From 2cbeae38e7e1ceced2c20697f47c82708c1966fb Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sat, 7 Sep 2019 17:14:08 -0400 Subject: [PATCH] Test readable length in first --- tests/compose.spec.ts | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 1e8b57a..17cb325 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -73,6 +73,7 @@ test.cb( "compose() should emit drain event after 1 second when first stream is bottleneck", t => { t.plan(6); + let passedBottleneck = 0; interface Chunk { index: number; mapped: string[]; @@ -80,6 +81,7 @@ test.cb( const first = map( async (chunk: Chunk) => { await sleep(200); + passedBottleneck++; chunk.mapped.push("first"); return chunk; }, @@ -98,18 +100,23 @@ test.cb( const composed = compose( [first, second], - { objectMode: true, highWaterMark: 2 }, + { objectMode: true, highWaterMark: 5 }, ); composed.on("error", err => { t.end(err); }); composed.on("drain", err => { + expect(composed._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan(1000); t.pass(); }); composed.on("data", (chunk: Chunk) => { + // Since first is bottleneck, composed accumulates until cb is executed in first. Therefore buffer should contain 4, 3, 2, 1 then 0 elements + expect(composed._writableState.length).to.be.equal( + input.length - passedBottleneck, + ); expect(chunk.mapped.length).to.equal(2); expect(chunk.mapped).to.deep.equal(["first", "second"]); t.pass(); @@ -119,11 +126,11 @@ test.cb( }); const input = [ - { data: 1 }, - { data: 2 }, - { data: 3 }, - { data: 4 }, - { data: 5 }, + { index: 1, mapped: [] }, + { index: 2, mapped: [] }, + { index: 3, mapped: [] }, + { index: 4, mapped: [] }, + { index: 5, mapped: [] }, ]; input.forEach(item => { @@ -153,27 +160,32 @@ test.cb( const second = map( async (chunk: Chunk) => { + pendingReads--; await sleep(500); + expect(first._readableState.length).to.equal(pendingReads); chunk.mapped.push("second"); return chunk; }, - { objectMode: true }, + { objectMode: true, highWaterMark: 1 }, ); const composed = compose( [first, second], - { objectMode: true, highWaterMark: 2 }, + { objectMode: true, highWaterMark: 5 }, ); composed.on("error", err => { t.end(err); }); composed.on("drain", err => { + expect(composed._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.lessThan(100); t.pass(); }); composed.on("data", (chunk: Chunk) => { + // Since second is bottleneck, composed will write into first immediately. Buffer should be empty. + expect(composed._writableState.length).to.be.equal(0); expect(chunk.mapped.length).to.equal(2); expect(chunk.mapped).to.deep.equal(["first", "second"]); t.pass(); @@ -189,6 +201,7 @@ test.cb( { index: 4, mapped: [] }, { index: 5, mapped: [] }, ]; + let pendingReads = input.length; input.forEach(item => { composed.write(item); @@ -229,7 +242,7 @@ test.cb( const composed = compose( [first, second], - { objectMode: true, highWaterMark: 3 }, + { objectMode: true, highWaterMark: 5 }, ); composed.on("error", err => { t.end(err); @@ -245,6 +258,7 @@ test.cb( }); composed.on("drain", () => { + expect(composed._writableState.length).to.be.equal(0); t.pass(); });