From 9b09a3f9490896851936f3895c91a8b99b156de8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Wed, 28 Aug 2019 17:01:51 -0400 Subject: [PATCH 1/5] Add demux --- src/functions/baseFunctions.ts | 1 + src/functions/demux.ts | 49 +++++++++++++++++++++++++++++ src/functions/index.ts | 10 ++++++ src/index.ts | 1 + tests/demux.spec.ts | 57 ++++++++++++++++++++++++++++++++++ tslint.json | 3 +- 6 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 src/functions/demux.ts create mode 100644 tests/demux.spec.ts diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 005aa3a..6ff480a 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -20,3 +20,4 @@ export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; export { compose } from "./compose"; +export { demux } from "./demux"; diff --git a/src/functions/demux.ts b/src/functions/demux.ts new file mode 100644 index 0000000..a0fead2 --- /dev/null +++ b/src/functions/demux.ts @@ -0,0 +1,49 @@ +import { WritableOptions, Writable } from "stream"; + +/** + * Return a Duplex stream that is pushed data from multiple sources + * @param streams Source streams to multiplex + * @param options Duplex stream options + */ +export function demux( + construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, +): Writable { + return new Demux(construct, demuxBy, options); +} + +class Demux extends Writable { + private keyMap: object; + private demuxer: (chunk: any) => string; + private construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; + constructor( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, + ) { + super(options); + if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { + throw new Error("Need one"); + } + this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); + this.construct = construct; + this.keyMap = {}; + } + + public write(chunk: any, encoding?: any, cb?: any): boolean { + const destKey = this.demuxer(chunk); + if (this.keyMap[destKey] === undefined) { + this.keyMap[destKey] = this.construct(destKey); + } + const writeRes = this.keyMap[destKey].write(chunk); + if (cb !== undefined) { + cb(); + } + return writeRes; + } +} diff --git a/src/functions/index.ts b/src/functions/index.ts index 21ad7f9..ce50218 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -297,3 +297,13 @@ export function compose( options, ); } + +export function demux( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxer: { key?: string; keyBy?: (chunk: any) => string }, + options?: DuplexOptions, +) { + return baseFunctions.demux(construct, demuxer, options); +} diff --git a/src/index.ts b/src/index.ts index 98b0a45..924b246 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,4 +22,5 @@ export { accumulator, accumulatorBy, compose, + demux, } from "./functions"; diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts new file mode 100644 index 0000000..3c124b9 --- /dev/null +++ b/tests/demux.spec.ts @@ -0,0 +1,57 @@ +import test from "ava"; +import { expect } from "chai"; +import { demux, map } from "../src"; +import { Readable, Transform } from "stream"; + +interface Test { + key: string; + val: number; +} +test.cb("should spread per key", t => { + t.plan(5); + const input = [ + { key: "a", val: 1 }, + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "c", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "a", val: 3 }, + { key: "b", val: 4 }, + { key: "c", val: 5 }, + ]; + const destKeys = []; + const dests = []; + let i = 0; + + const construct = (destKey: string) => { + destKeys.push(destKey); + const dest = map((chunk: Test) => ({ + ...chunk, + val: chunk.val + 1, + })) + .on("data", (d: Test) => { + expect(results).to.deep.include(d); + t.pass(); + }) + .on("end", () => { + i++; + if (i === dests.length) { + t.end(); + } + }); + dests.push(dest); + return dest; + }; + + const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + demuxed.on("finish", () => { + expect(destKeys).to.deep.equal(["a", "b", "c"]); + t.pass(); + dests.forEach(dest => dest.end()); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); diff --git a/tslint.json b/tslint.json index becd92c..03b2e44 100644 --- a/tslint.json +++ b/tslint.json @@ -9,6 +9,7 @@ "no-implicit-dependencies": [true, "dev"], "prettier": [true, ".prettierrc"], "ordered-imports": false, - "interface-name": false + "interface-name": false, + "object-literal-sort-keys": false } } From 685215bee6d5652ff1e571467c4ef6fd256dee3e Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Wed, 28 Aug 2019 17:04:31 -0400 Subject: [PATCH 2/5] Add test for keyBy --- tests/demux.spec.ts | 54 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 3c124b9..baf29aa 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -1,7 +1,6 @@ import test from "ava"; import { expect } from "chai"; import { demux, map } from "../src"; -import { Readable, Transform } from "stream"; interface Test { key: string; @@ -55,3 +54,56 @@ test.cb("should spread per key", t => { input.forEach(event => demuxed.write(event)); demuxed.end(); }); + +test.cb("should spread per key using keyBy", t => { + t.plan(5); + const input = [ + { key: "a", val: 1 }, + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "c", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "a", val: 3 }, + { key: "b", val: 4 }, + { key: "c", val: 5 }, + ]; + const destKeys = []; + const dests = []; + let i = 0; + + const construct = (destKey: string) => { + destKeys.push(destKey); + const dest = map((chunk: Test) => ({ + ...chunk, + val: chunk.val + 1, + })) + .on("data", (d: Test) => { + expect(results).to.deep.include(d); + t.pass(); + }) + .on("end", () => { + i++; + if (i === dests.length) { + t.end(); + } + }); + dests.push(dest); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("finish", () => { + expect(destKeys).to.deep.equal(["a", "b", "c"]); + t.pass(); + dests.forEach(dest => dest.end()); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); From 9765e6cb49fee9636ba0cdf1c1343206836a8cc7 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 29 Aug 2019 08:50:11 -0400 Subject: [PATCH 3/5] Update tests to write to sink --- tests/demux.spec.ts | 107 +++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index baf29aa..dcd8ad0 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -1,6 +1,7 @@ import test from "ava"; import { expect } from "chai"; import { demux, map } from "../src"; +import { Writable } from "stream"; interface Test { key: string; @@ -10,45 +11,48 @@ test.cb("should spread per key", t => { t.plan(5); const input = [ { key: "a", val: 1 }, - { key: "a", val: 2 }, - { key: "b", val: 3 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, { key: "c", val: 4 }, ]; const results = [ { key: "a", val: 2 }, - { key: "a", val: 3 }, - { key: "b", val: 4 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, { key: "c", val: 5 }, ]; - const destKeys = []; - const dests = []; + const destinationStreamKeys = []; let i = 0; - + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + i++; + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + if (i === 4) { + t.end(); + } + }, + }); const construct = (destKey: string) => { - destKeys.push(destKey); - const dest = map((chunk: Test) => ({ - ...chunk, - val: chunk.val + 1, - })) - .on("data", (d: Test) => { - expect(results).to.deep.include(d); - t.pass(); - }) - .on("end", () => { - i++; - if (i === dests.length) { - t.end(); - } - }); - dests.push(dest); + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); return dest; }; const demuxed = demux(construct, { key: "key" }, { objectMode: true }); demuxed.on("finish", () => { - expect(destKeys).to.deep.equal(["a", "b", "c"]); + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); - dests.forEach(dest => dest.end()); }); input.forEach(event => demuxed.write(event)); @@ -59,37 +63,41 @@ test.cb("should spread per key using keyBy", t => { t.plan(5); const input = [ { key: "a", val: 1 }, - { key: "a", val: 2 }, - { key: "b", val: 3 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, { key: "c", val: 4 }, ]; const results = [ { key: "a", val: 2 }, - { key: "a", val: 3 }, - { key: "b", val: 4 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, { key: "c", val: 5 }, ]; - const destKeys = []; - const dests = []; + const destinationStreamKeys = []; let i = 0; - + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + i++; + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + if (i === 4) { + t.end(); + } + }, + }); const construct = (destKey: string) => { - destKeys.push(destKey); - const dest = map((chunk: Test) => ({ - ...chunk, - val: chunk.val + 1, - })) - .on("data", (d: Test) => { - expect(results).to.deep.include(d); - t.pass(); - }) - .on("end", () => { - i++; - if (i === dests.length) { - t.end(); - } - }); - dests.push(dest); + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); return dest; }; @@ -99,9 +107,8 @@ test.cb("should spread per key using keyBy", t => { { objectMode: true }, ); demuxed.on("finish", () => { - expect(destKeys).to.deep.equal(["a", "b", "c"]); + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); - dests.forEach(dest => dest.end()); }); input.forEach(event => demuxed.write(event)); From 2524d51aa7b778d292450430775a54d184fbb384 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 29 Aug 2019 14:39:08 -0400 Subject: [PATCH 4/5] Allow CB to be called by construction streams --- src/functions/compose.ts | 6 ++++-- src/functions/demux.ts | 20 +++++++++++--------- tests/demux.spec.ts | 11 ++--------- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/functions/compose.ts b/src/functions/compose.ts index f53aa6c..40525a0 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -52,8 +52,10 @@ class Compose extends Duplex { return (this.last as NodeJS.ReadableStream).pipe(dest); } - public write(chunk: any) { - return (this.first as NodeJS.WritableStream).write(chunk); + public _write(chunk: any, encoding: string, cb: any) { + const res = (this.first as NodeJS.WritableStream).write(chunk); + cb(); + return res; } public on(event: string, cb: any) { diff --git a/src/functions/demux.ts b/src/functions/demux.ts index a0fead2..ef39284 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -14,7 +14,9 @@ export function demux( } class Demux extends Writable { - private keyMap: object; + private keyMap: { + [key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; + }; private demuxer: (chunk: any) => string; private construct: ( destKey?: string, @@ -28,22 +30,22 @@ class Demux extends Writable { ) { super(options); if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { - throw new Error("Need one"); + throw new Error( + "keyBy or key must be provided in second parameter", + ); } this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); this.construct = construct; this.keyMap = {}; } - public write(chunk: any, encoding?: any, cb?: any): boolean { + public _write(chunk: any, encoding: string, cb: any) { const destKey = this.demuxer(chunk); if (this.keyMap[destKey] === undefined) { - this.keyMap[destKey] = this.construct(destKey); + this.keyMap[destKey] = this.construct(destKey).on("error", e => { + this.emit("error", e); + }); } - const writeRes = this.keyMap[destKey].write(chunk); - if (cb !== undefined) { - cb(); - } - return writeRes; + return this.keyMap[destKey].write(chunk, encoding, cb); } } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index dcd8ad0..4ed3e3f 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -26,14 +26,10 @@ test.cb("should spread per key", t => { const sink = new Writable({ objectMode: true, write(chunk, enc, cb) { - i++; expect(results).to.deep.include(chunk); expect(input).to.not.deep.include(chunk); t.pass(); cb(); - if (i === 4) { - t.end(); - } }, }); const construct = (destKey: string) => { @@ -53,6 +49,7 @@ test.cb("should spread per key", t => { demuxed.on("finish", () => { expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); + t.end(); }); input.forEach(event => demuxed.write(event)); @@ -74,18 +71,13 @@ test.cb("should spread per key using keyBy", t => { { key: "c", val: 5 }, ]; const destinationStreamKeys = []; - let i = 0; const sink = new Writable({ objectMode: true, write(chunk, enc, cb) { - i++; expect(results).to.deep.include(chunk); expect(input).to.not.deep.include(chunk); t.pass(); cb(); - if (i === 4) { - t.end(); - } }, }); const construct = (destKey: string) => { @@ -109,6 +101,7 @@ test.cb("should spread per key using keyBy", t => { demuxed.on("finish", () => { expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); + t.end(); }); input.forEach(event => demuxed.write(event)); From fe0e53147c39c6ebd3e5dbeb81842118d58792f3 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Fri, 30 Aug 2019 09:33:29 -0400 Subject: [PATCH 5/5] Handle backpressure --- package.json | 2 +- src/functions/demux.ts | 54 +++++++++++++++++++++++++++++++++++------ src/functions/map.ts | 2 ++ tests/demux.spec.ts | 55 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index d5a2ee0..58a655f 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ }, "scripts": { "test": "NODE_PATH=src node node_modules/.bin/ava 'tests/*.spec.ts' -e", - "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.ts", + "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js", "test:all": "NODE_PATH=src node node_modules/.bin/ava", "lint": "tslint -p tsconfig.json", "validate:tslint": "tslint-config-prettier-check ./tslint.json", diff --git a/src/functions/demux.ts b/src/functions/demux.ts index ef39284..ef7a26a 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -14,10 +14,15 @@ export function demux( } class Demux extends Writable { - private keyMap: { - [key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; + private streamsByKey: { + [key: string]: { + stream: NodeJS.WritableStream | NodeJS.ReadWriteStream; + writable: boolean; + }; }; private demuxer: (chunk: any) => string; + private isWritable: boolean; + private nonWritableStreams: Array; private construct: ( destKey?: string, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; @@ -36,16 +41,51 @@ class Demux extends Writable { } this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); this.construct = construct; - this.keyMap = {}; + this.streamsByKey = {}; + this.isWritable = true; } public _write(chunk: any, encoding: string, cb: any) { const destKey = this.demuxer(chunk); - if (this.keyMap[destKey] === undefined) { - this.keyMap[destKey] = this.construct(destKey).on("error", e => { - this.emit("error", e); + if (this.streamsByKey[destKey] === undefined) { + this.streamsByKey[destKey] = { + stream: this.construct(destKey), + writable: true, + }; + } + // Throttle when one stream is not writable anymore + // Set writable to false + // keep state of all the streams, if one is not writable demux shouldnt be writable + // Small optimization is to keep writing until you get a following event to the unwritable destination + + let res = false; + if (this.isWritable && this.streamsByKey[destKey].writable) { + res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb); + } else if (this.isWritable) { + this.isWritable = false; + // Buffer chunk? + return this.isWritable; + } + + /* If write above returns false and the stream written to was writable previously, we need to make demux + * non-writable and update state to know the stream is nonWritable. + * If write returns true and the stream was previously not writable, we need to update which streams + * are non writable and determine if it is safe for demux to become writable (all streams are writable) + */ + if (!res) { + this.streamsByKey[destKey].writable = false; + this.nonWritableStreams.push(destKey); + this.isWritable = false; + this.streamsByKey[destKey].stream.once("drain", () => { + this.streamsByKey[destKey].writable = true; + this.nonWritableStreams = this.nonWritableStreams.filter( + key => key !== destKey, + ); + + this.isWritable = this.nonWritableStreams.length === 0; }); } - return this.keyMap[destKey].write(chunk, encoding, cb); + + return this.writable; } } diff --git a/src/functions/map.ts b/src/functions/map.ts index 0bf708d..61e84d5 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -14,6 +14,7 @@ export function map( writableObjectMode: true, }, ): Transform { + // remove try catch return new Transform({ ...options, async transform(chunk: T, encoding, callback) { @@ -22,6 +23,7 @@ export function map( this.push(mapped); callback(); } catch (err) { + console.log("caught error", err.message); callback(err); } }, diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 4ed3e3f..cdc91d5 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -107,3 +107,58 @@ test.cb("should spread per key using keyBy", t => { input.forEach(event => demuxed.write(event)); demuxed.end(); }); + +test.cb("should emit errors", t => { + t.plan(2); + const input = [ + { key: "a", val: 1 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, + { key: "a", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, + { key: "a", val: 5 }, + ]; + const destinationStreamKeys = []; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }).on("unpipe", e => console.log("sink err")); + + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + if (chunk.key === "b") { + throw new Error("Caught object with key 'b'"); + } + return { + ...chunk, + val: chunk.val + 1, + }; + }).on("error", e => console.log("got err")); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("error", e => { + expect(e.message).to.equal("Caught object with key 'b'"); + t.pass(); + t.end(); + }); + input.forEach(event => demuxed.write(event)); + demuxed.end(); +});