From 9d280b16626f488c56806cd5f41876e40db2599e Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Tue, 10 Sep 2019 18:13:13 -0400 Subject: [PATCH] Wait for drain when write returns false in demux --- src/functions/demux.ts | 21 ++--- tests/demux.spec.ts | 184 +++++++++++++---------------------------- 2 files changed, 62 insertions(+), 143 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index e26dafb..0ad2e57 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -62,7 +62,7 @@ class Demux extends Writable { } // Throttles when one stream is not writable - public _write(chunk: any, encoding?: any, cb?: any) { + public async _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = { @@ -70,21 +70,12 @@ class Demux extends Writable { writable: true, }; } - let res = false; - if (this.streamsByKey[destKey].writable && this.isWritable) { - res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb); - } - if (!res && this.isWritable) { - this.isWritable = false; - this.streamsByKey[destKey].writable = false; - this.nonWritableStreams.push(destKey); - this.streamsByKey[destKey].stream.once("drain", () => { - this.nonWritableStreams.filter(key => key !== destKey); - this.isWritable = this.nonWritableStreams.length === 0; - this.streamsByKey[destKey].stream.write(chunk, encoding, cb); - if (this.isWritable) { + if (!this.streamsByKey[destKey].stream.write(chunk, encoding, cb)) { + await new Promise((resolve, reject) => { + this.streamsByKey[destKey].stream.once("drain", () => { + resolve(); this.emit("drain"); - } + }); }); } } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 06cec0e..bde4ffe 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -172,6 +172,7 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { demuxed.end(); }); +// Probably needs to be removed test.cb("should emit errors", t => { t.plan(2); let index = 0; @@ -209,7 +210,7 @@ test.cb("should emit errors", t => { _chunk.visited.push(index); index++; return _chunk; - }).on("error", () => {}); + }).on("error", () => {}); // Otherwise ava complains dest.pipe(sink); return dest; @@ -226,18 +227,26 @@ test.cb("should emit errors", t => { t.end(); }); input.forEach(event => demuxed.write(event)); - demuxed.end(); }); -test("demux() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", t => { - t.plan(7); - interface Chunk { - key: string; - mapped: number[]; - } - const highWaterMark = 5; - const _rate = 25; +test("demux() when write returns false, drain event should be emitted after at least slowProcessorSpeed * highWaterMark", t => { return new Promise(async (resolve, reject) => { + t.plan(7); + interface Chunk { + key: string; + mapped: number[]; + } + const input: Chunk[] = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; + const highWaterMark = 5; + const slowProcessorSpeed = 25; const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { @@ -251,7 +260,7 @@ test("demux() should emit drain event ~rate * highWaterMark ms for every write t }); const construct = (destKey: string) => { const first = map(async (chunk: Chunk) => { - await sleep(_rate); + await sleep(slowProcessorSpeed); chunk.mapped.push(1); return chunk; }); @@ -278,40 +287,45 @@ test("demux() should emit drain event ~rate * highWaterMark ms for every write t _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan(_rate); + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * highWaterMark, + ); t.pass(); }); - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - - let start = performance.now(); + let start = null; for (const item of input) { const res = _demux.write(item); expect(_demux._writableState.length).to.be.at.most(highWaterMark); if (!res) { start = performance.now(); - await sleep(100); + await new Promise((resolv, rej) => { + _demux.once("drain", () => { + resolv(); + }); + }); } } }); }); test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => { - t.plan(7); - const highWaterMark = 5; return new Promise(async (resolve, reject) => { + t.plan(7); interface Chunk { key: string; mapped: number[]; } + const highWaterMark = 5; + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { @@ -354,110 +368,26 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar t.pass(); }); - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - for (const item of input) { const res = _demux.write(item); expect(_demux._writableState.length).to.be.at.most(highWaterMark); if (!res) { - await sleep(10); + await new Promise((_resolve, _reject) => { + _demux.once("drain", () => { + _resolve(); + }); + }); } } }); }); -test.cb( - "demux() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ", - t => { - t.plan(6); - const _rate = 100; - const highWaterMark = 5; - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - t.pass(); - cb(); - if (pendingReads === 0) { - t.end(); - } - }, - }); - const construct = (destKey: string) => { - const first = map( - async (chunk: Chunk) => { - chunk.mapped.push(1); - await sleep(_rate); - return chunk; - }, - { objectMode: true }, - ); - - const second = map( - (chunk: Chunk) => { - pendingReads--; - chunk.mapped.push(2); - return chunk; - }, - { objectMode: true, highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - t.end(err); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan( - _rate * input.length, - ); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - - let pendingReads = input.length; - input.forEach(item => { - _demux.write(item); - }); - const start = performance.now(); - }, -); - test.cb( "demux() should emit drain event immediately when second stream is bottleneck", t => { t.plan(6); const highWaterMark = 5; - const _rate = 200; + const slowProcessorSpeed = 200; interface Chunk { key: string; mapped: number[]; @@ -484,7 +414,7 @@ test.cb( const second = map( async (chunk: Chunk) => { pendingReads--; - await sleep(_rate); + await sleep(slowProcessorSpeed); chunk.mapped.push(2); expect(second._writableState.length).to.be.equal(1); expect(first._readableState.length).to.equal(pendingReads); @@ -510,7 +440,9 @@ test.cb( _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan(_rate); + expect(performance.now() - start).to.be.lessThan( + slowProcessorSpeed, + ); t.pass(); }); @@ -530,7 +462,7 @@ test.cb( }, ); -test("demux() should only emit drain event when all streams are writable", t => { +test.only("demux() should only emit drain event when all streams are writable", t => { t.plan(1); const highWaterMark = 2; interface Chunk { @@ -558,12 +490,12 @@ test("demux() should only emit drain event when all streams are writable", t => chunk.mapped.push(1); return chunk; }, - { objectMode: true }, + { objectMode: true, highWaterMark: 1 }, ); const second = map( async (chunk: Chunk) => { - await sleep(50); + await sleep(2000); chunk.mapped.push(2); return chunk; }, @@ -578,11 +510,10 @@ test("demux() should only emit drain event when all streams are writable", t => { key: "key" }, { objectMode: true, - highWaterMark, }, ); _demux.on("error", err => { - reject(); + reject(err); }); const input = [ @@ -590,16 +521,13 @@ test("demux() should only emit drain event when all streams are writable", t => { key: "a", mapped: [] }, { key: "c", mapped: [] }, { key: "c", mapped: [] }, - { key: "b", mapped: [] }, // should only be recieved after a becomes writable + { key: "b", mapped: [] }, // should only be recieved after a and c become writable ]; let pendingReads = input.length; const start = performance.now(); for (const item of input) { - const res = _demux.write(item); - if (!res) { - await sleep(50); - } + console.log("DEMUX", _demux.write(item)); } }); });