From fe0e53147c39c6ebd3e5dbeb81842118d58792f3 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Fri, 30 Aug 2019 09:33:29 -0400 Subject: [PATCH] Handle backpressure --- package.json | 2 +- src/functions/demux.ts | 54 +++++++++++++++++++++++++++++++++++------ src/functions/map.ts | 2 ++ tests/demux.spec.ts | 55 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index d5a2ee0..58a655f 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ }, "scripts": { "test": "NODE_PATH=src node node_modules/.bin/ava 'tests/*.spec.ts' -e", - "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.ts", + "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js", "test:all": "NODE_PATH=src node node_modules/.bin/ava", "lint": "tslint -p tsconfig.json", "validate:tslint": "tslint-config-prettier-check ./tslint.json", diff --git a/src/functions/demux.ts b/src/functions/demux.ts index ef39284..ef7a26a 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -14,10 +14,15 @@ export function demux( } class Demux extends Writable { - private keyMap: { - [key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; + private streamsByKey: { + [key: string]: { + stream: NodeJS.WritableStream | NodeJS.ReadWriteStream; + writable: boolean; + }; }; private demuxer: (chunk: any) => string; + private isWritable: boolean; + private nonWritableStreams: Array; private construct: ( destKey?: string, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; @@ -36,16 +41,51 @@ class Demux extends Writable { } this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); this.construct = construct; - this.keyMap = {}; + this.streamsByKey = {}; + this.isWritable = true; } public _write(chunk: any, encoding: string, cb: any) { const destKey = this.demuxer(chunk); - if (this.keyMap[destKey] === undefined) { - this.keyMap[destKey] = this.construct(destKey).on("error", e => { - this.emit("error", e); + if (this.streamsByKey[destKey] === undefined) { + this.streamsByKey[destKey] = { + stream: this.construct(destKey), + writable: true, + }; + } + // Throttle when one stream is not writable anymore + // Set writable to false + // keep state of all the streams, if one is not writable demux shouldnt be writable + // Small optimization is to keep writing until you get a following event to the unwritable destination + + let res = false; + if (this.isWritable && this.streamsByKey[destKey].writable) { + res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb); + } else if (this.isWritable) { + this.isWritable = false; + // Buffer chunk? + return this.isWritable; + } + + /* If write above returns false and the stream written to was writable previously, we need to make demux + * non-writable and update state to know the stream is nonWritable. + * If write returns true and the stream was previously not writable, we need to update which streams + * are non writable and determine if it is safe for demux to become writable (all streams are writable) + */ + if (!res) { + this.streamsByKey[destKey].writable = false; + this.nonWritableStreams.push(destKey); + this.isWritable = false; + this.streamsByKey[destKey].stream.once("drain", () => { + this.streamsByKey[destKey].writable = true; + this.nonWritableStreams = this.nonWritableStreams.filter( + key => key !== destKey, + ); + + this.isWritable = this.nonWritableStreams.length === 0; }); } - return this.keyMap[destKey].write(chunk, encoding, cb); + + return this.writable; } } diff --git a/src/functions/map.ts b/src/functions/map.ts index 0bf708d..61e84d5 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -14,6 +14,7 @@ export function map( writableObjectMode: true, }, ): Transform { + // remove try catch return new Transform({ ...options, async transform(chunk: T, encoding, callback) { @@ -22,6 +23,7 @@ export function map( this.push(mapped); callback(); } catch (err) { + console.log("caught error", err.message); callback(err); } }, diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 4ed3e3f..cdc91d5 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -107,3 +107,58 @@ test.cb("should spread per key using keyBy", t => { input.forEach(event => demuxed.write(event)); demuxed.end(); }); + +test.cb("should emit errors", t => { + t.plan(2); + const input = [ + { key: "a", val: 1 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, + { key: "a", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, + { key: "a", val: 5 }, + ]; + const destinationStreamKeys = []; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }).on("unpipe", e => console.log("sink err")); + + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + if (chunk.key === "b") { + throw new Error("Caught object with key 'b'"); + } + return { + ...chunk, + val: chunk.val + 1, + }; + }).on("error", e => console.log("got err")); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("error", e => { + expect(e.message).to.equal("Caught object with key 'b'"); + t.pass(); + t.end(); + }); + input.forEach(event => demuxed.write(event)); + demuxed.end(); +});