From ae7c9d6b09a2966ab97c6768ba92aad0c514134b Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sat, 7 Sep 2019 14:27:55 -0400 Subject: [PATCH] Add test for highwatermark --- tests/compose.spec.ts | 87 ++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 362f484..1e8b57a 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -72,10 +72,15 @@ test.cb( test.cb( "compose() should emit drain event after 1 second when first stream is bottleneck", t => { - t.plan(1); + t.plan(6); + interface Chunk { + index: number; + mapped: string[]; + } const first = map( - async (chunk: number) => { + async (chunk: Chunk) => { await sleep(200); + chunk.mapped.push("first"); return chunk; }, { @@ -84,7 +89,8 @@ test.cb( ); const second = map( - async (chunk: number) => { + async (chunk: Chunk) => { + chunk.mapped.push("second"); return chunk; }, { objectMode: true }, @@ -103,8 +109,11 @@ test.cb( t.pass(); }); - composed.on("data", chunk => { - if (chunk.data === 5) { + composed.on("data", (chunk: Chunk) => { + expect(chunk.mapped.length).to.equal(2); + expect(chunk.mapped).to.deep.equal(["first", "second"]); + t.pass(); + if (chunk.index === 5) { t.end(); } }); @@ -127,9 +136,14 @@ test.cb( test.cb( "compose() should emit drain event immediately when second stream is bottleneck", t => { - t.plan(1); + t.plan(6); + interface Chunk { + index: number; + mapped: string[]; + } const first = map( - async (chunk: number) => { + async (chunk: Chunk) => { + chunk.mapped.push("first"); return chunk; }, { @@ -138,8 +152,9 @@ test.cb( ); const second = map( - async (chunk: number) => { + async (chunk: Chunk) => { await sleep(500); + chunk.mapped.push("second"); return chunk; }, { objectMode: true }, @@ -158,18 +173,21 @@ test.cb( t.pass(); }); - composed.on("data", chunk => { - if (chunk.data === 5) { + composed.on("data", (chunk: Chunk) => { + expect(chunk.mapped.length).to.equal(2); + expect(chunk.mapped).to.deep.equal(["first", "second"]); + t.pass(); + if (chunk.index === 5) { t.end(); } }); const input = [ - { data: 1 }, - { data: 2 }, - { data: 3 }, - { data: 4 }, - { data: 5 }, + { index: 1, mapped: [] }, + { index: 2, mapped: [] }, + { index: 3, mapped: [] }, + { index: 4, mapped: [] }, + { index: 5, mapped: [] }, ]; input.forEach(item => { @@ -180,13 +198,17 @@ test.cb( ); test.cb( - "first should contain up to highWaterMark items in readable state when second is bottleneck", + "compose() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck", t => { - t.plan(10); + t.plan(6); + interface Chunk { + index: number; + mapped: string[]; + } const first = map( - async (chunk: number) => { + async (chunk: Chunk) => { expect(first._readableState.length).to.be.at.most(2); - t.pass(); + chunk.mapped.push("first"); return chunk; }, { @@ -196,10 +218,10 @@ test.cb( ); const second = map( - async (chunk: number) => { + async (chunk: Chunk) => { expect(second._writableState.length).to.be.equal(1); - t.pass(); await sleep(100); + chunk.mapped.push("second"); return chunk; }, { objectMode: true, highWaterMark: 2 }, @@ -207,24 +229,31 @@ test.cb( const composed = compose( [first, second], - { objectMode: true }, + { objectMode: true, highWaterMark: 3 }, ); composed.on("error", err => { t.end(err); }); - composed.on("data", chunk => { - if (chunk.data === 5) { + composed.on("data", (chunk: Chunk) => { + expect(chunk.mapped.length).to.equal(2); + expect(chunk.mapped).to.deep.equal(["first", "second"]); + t.pass(); + if (chunk.index === 5) { t.end(); } }); + composed.on("drain", () => { + t.pass(); + }); + const input = [ - { data: 1 }, - { data: 2 }, - { data: 3 }, - { data: 4 }, - { data: 5 }, + { index: 1, mapped: [] }, + { index: 2, mapped: [] }, + { index: 3, mapped: [] }, + { index: 4, mapped: [] }, + { index: 5, mapped: [] }, ]; input.forEach(item => {