diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index aedbe6c..f01723a 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -172,6 +172,75 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { demuxed.end(); }); +test("demux() should return false after if it has >= highWaterMark items buffered and drain should be emitted", t => { + return new Promise(async (resolve, reject) => { + t.plan(7); + interface Chunk { + key: string; + mapped: number[]; + } + const input: Chunk[] = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; + const highWaterMark = 5; + const slowProcessorSpeed = 25; + const construct = (destKey: string) => { + const first = map( + async (chunk: Chunk) => { + await sleep(slowProcessorSpeed); + return { ...chunk, mapped: [1] }; + }, + { highWaterMark: 1, objectMode: true }, + ); + + // to clear first + first.on("data", chunk => { + expect(chunk.mapped).to.deep.equal([1]); + pendingReads--; + if (pendingReads === 0) { + resolve(); + } + t.pass(); + }); + + return first; + }; + + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + + _demux.on("error", err => { + reject(); + }); + + for (const item of input) { + const res = _demux.write(item); + expect(_demux._writableState.length).to.be.at.most(highWaterMark); + if (!res) { + await new Promise((resolv, rej) => { + _demux.once("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + t.pass(); + resolv(); + }); + }); + } + } + }); +}); + test("demux() when write returns false, drain event should be emitted after at least slowProcessorSpeed * highWaterMark", t => { return new Promise(async (resolve, reject) => { t.plan(7); @@ -211,12 +280,7 @@ test("demux() when write returns false, drain event should be emitted after at l { highWaterMark: 1, objectMode: true }, ); - const second = map(async (chunk: Chunk) => { - chunk.mapped.push(2); - return chunk; - }); - - first.pipe(second).pipe(sink); + first.pipe(sink); return first; }; const _demux = demux( @@ -231,20 +295,18 @@ test("demux() when write returns false, drain event should be emitted after at l reject(); }); - let start = null; + let start = performance.now(); for (const item of input) { const res = _demux.write(item); - expect(_demux._writableState.length).to.be.at.most(highWaterMark); if (!res) { - start = performance.now(); await new Promise((resolv, rej) => { _demux.once("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( slowProcessorSpeed * highWaterMark, ); t.pass(); resolv(); + start = performance.now(); }); }); }