diff --git a/package.json b/package.json index d5a2ee0..58a655f 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ }, "scripts": { "test": "NODE_PATH=src node node_modules/.bin/ava 'tests/*.spec.ts' -e", - "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.ts", + "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js", "test:all": "NODE_PATH=src node node_modules/.bin/ava", "lint": "tslint -p tsconfig.json", "validate:tslint": "tslint-config-prettier-check ./tslint.json", diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 005aa3a..6ff480a 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -20,3 +20,4 @@ export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; export { compose } from "./compose"; +export { demux } from "./demux"; diff --git a/src/functions/compose.ts b/src/functions/compose.ts index fea69eb..ee17101 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -74,8 +74,10 @@ export class Compose extends Duplex { return (this.last as NodeJS.ReadableStream).pipe(dest); } - public write(chunk: any) { - return (this.first as NodeJS.WritableStream).write(chunk); + public _write(chunk: any, encoding: string, cb: any) { + const res = (this.first as NodeJS.WritableStream).write(chunk); + cb(); + return res; } public bubble(...events: string[]) { diff --git a/src/functions/demux.ts b/src/functions/demux.ts new file mode 100644 index 0000000..ef7a26a --- /dev/null +++ b/src/functions/demux.ts @@ -0,0 +1,91 @@ +import { WritableOptions, Writable } from "stream"; + +/** + * Return a Duplex stream that is pushed data from multiple sources + * @param streams Source streams to multiplex + * @param options Duplex stream options + */ +export function demux( + construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, +): Writable { + return new Demux(construct, demuxBy, options); +} + +class Demux extends Writable { + private streamsByKey: { + [key: string]: { + stream: NodeJS.WritableStream | NodeJS.ReadWriteStream; + writable: boolean; + }; + }; + private demuxer: (chunk: any) => string; + private isWritable: boolean; + private nonWritableStreams: Array; + private construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; + constructor( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, + ) { + super(options); + if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { + throw new Error( + "keyBy or key must be provided in second parameter", + ); + } + this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); + this.construct = construct; + this.streamsByKey = {}; + this.isWritable = true; + } + + public _write(chunk: any, encoding: string, cb: any) { + const destKey = this.demuxer(chunk); + if (this.streamsByKey[destKey] === undefined) { + this.streamsByKey[destKey] = { + stream: this.construct(destKey), + writable: true, + }; + } + // Throttle when one stream is not writable anymore + // Set writable to false + // keep state of all the streams, if one is not writable demux shouldnt be writable + // Small optimization is to keep writing until you get a following event to the unwritable destination + + let res = false; + if (this.isWritable && this.streamsByKey[destKey].writable) { + res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb); + } else if (this.isWritable) { + this.isWritable = false; + // Buffer chunk? + return this.isWritable; + } + + /* If write above returns false and the stream written to was writable previously, we need to make demux + * non-writable and update state to know the stream is nonWritable. + * If write returns true and the stream was previously not writable, we need to update which streams + * are non writable and determine if it is safe for demux to become writable (all streams are writable) + */ + if (!res) { + this.streamsByKey[destKey].writable = false; + this.nonWritableStreams.push(destKey); + this.isWritable = false; + this.streamsByKey[destKey].stream.once("drain", () => { + this.streamsByKey[destKey].writable = true; + this.nonWritableStreams = this.nonWritableStreams.filter( + key => key !== destKey, + ); + + this.isWritable = this.nonWritableStreams.length === 0; + }); + } + + return this.writable; + } +} diff --git a/src/functions/index.ts b/src/functions/index.ts index 2e4b8ad..a5b53db 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -289,3 +289,13 @@ export function accumulatorBy( } export const compose = baseFunctions.compose; + +export function demux( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxer: { key?: string; keyBy?: (chunk: any) => string }, + options?: DuplexOptions, +) { + return baseFunctions.demux(construct, demuxer, options); +} diff --git a/src/functions/map.ts b/src/functions/map.ts index 0bf708d..61e84d5 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -14,6 +14,7 @@ export function map( writableObjectMode: true, }, ): Transform { + // remove try catch return new Transform({ ...options, async transform(chunk: T, encoding, callback) { @@ -22,6 +23,7 @@ export function map( this.push(mapped); callback(); } catch (err) { + console.log("caught error", err.message); callback(err); } }, diff --git a/src/index.ts b/src/index.ts index 98b0a45..924b246 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,4 +22,5 @@ export { accumulator, accumulatorBy, compose, + demux, } from "./functions"; diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts new file mode 100644 index 0000000..cdc91d5 --- /dev/null +++ b/tests/demux.spec.ts @@ -0,0 +1,164 @@ +import test from "ava"; +import { expect } from "chai"; +import { demux, map } from "../src"; +import { Writable } from "stream"; + +interface Test { + key: string; + val: number; +} +test.cb("should spread per key", t => { + t.plan(5); + const input = [ + { key: "a", val: 1 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, + { key: "c", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, + { key: "c", val: 5 }, + ]; + const destinationStreamKeys = []; + let i = 0; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }); + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + demuxed.on("finish", () => { + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); + t.pass(); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + +test.cb("should spread per key using keyBy", t => { + t.plan(5); + const input = [ + { key: "a", val: 1 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, + { key: "c", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, + { key: "c", val: 5 }, + ]; + const destinationStreamKeys = []; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }); + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("finish", () => { + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); + t.pass(); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + +test.cb("should emit errors", t => { + t.plan(2); + const input = [ + { key: "a", val: 1 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, + { key: "a", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, + { key: "a", val: 5 }, + ]; + const destinationStreamKeys = []; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }).on("unpipe", e => console.log("sink err")); + + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + if (chunk.key === "b") { + throw new Error("Caught object with key 'b'"); + } + return { + ...chunk, + val: chunk.val + 1, + }; + }).on("error", e => console.log("got err")); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("error", e => { + expect(e.message).to.equal("Caught object with key 'b'"); + t.pass(); + t.end(); + }); + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); diff --git a/tslint.json b/tslint.json index becd92c..03b2e44 100644 --- a/tslint.json +++ b/tslint.json @@ -9,6 +9,7 @@ "no-implicit-dependencies": [true, "dev"], "prettier": [true, ".prettierrc"], "ordered-imports": false, - "interface-name": false + "interface-name": false, + "object-literal-sort-keys": false } }