diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 17cb325..f05a248 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -6,9 +6,20 @@ import { performance } from "perf_hooks"; test.cb("compose() chains two streams together in the correct order", t => { t.plan(3); + interface Chunk { + visited: number[]; + key: string; + } + let i = 0; - const first = map((chunk: number) => chunk + 1); - const second = map((chunk: number) => chunk * 2); + const first = map((chunk: Chunk) => { + chunk.visited.push(1); + return chunk; + }); + const second = map((chunk: Chunk) => { + chunk.visited.push(2); + return chunk; + }); const composed = compose( [first, second], @@ -16,7 +27,7 @@ test.cb("compose() chains two streams together in the correct order", t => { ); composed.on("data", data => { - expect(data).to.equal(result[i]); + expect(data).to.deep.equal(result[i]); t.pass(); i++; if (i === 3) { @@ -30,59 +41,82 @@ test.cb("compose() chains two streams together in the correct order", t => { t.end(); }); - const input = [1, 2, 3]; - const result = [4, 6, 8]; + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "c", visited: [] }, + ]; + const result = [ + { key: "a", visited: [1, 2] }, + { key: "b", visited: [1, 2] }, + { key: "c", visited: [1, 2] }, + ]; input.forEach(item => composed.write(item)); }); -test.cb( - "compose() followed by pipe chains streams together in the correct order", - t => { - t.plan(3); - let i = 0; - const first = map((chunk: number) => chunk + 1); - const second = map((chunk: number) => chunk * 2); +test.cb("piping compose() maintains correct order", t => { + t.plan(3); + interface Chunk { + visited: number[]; + key: string; + } + let i = 0; + const first = map((chunk: Chunk) => { + chunk.visited.push(1); + return chunk; + }); + const second = map((chunk: Chunk) => { + chunk.visited.push(2); + return chunk; + }); - const composed = compose( - [first, second], - { objectMode: true }, - ); - const third = map((chunk: number) => chunk + 1); - composed.pipe(third).on("data", data => { - expect(data).to.equal(result[i]); - t.pass(); - i++; - if (i === 3) { - t.end(); - } - }); + const composed = compose( + [first, second], + { objectMode: true }, + ); + const third = map((chunk: Chunk) => { + chunk.visited.push(3); + return chunk; + }); - composed.on("error", err => { - t.end(err); - }); + composed.pipe(third).on("data", data => { + expect(data).to.deep.equal(result[i]); + t.pass(); + i++; + if (i === 3) { + t.end(); + } + }); - const input = [1, 2, 3]; - const result = [5, 7, 9]; + composed.on("error", err => { + t.end(err); + }); - input.forEach(item => composed.write(item)); - }, -); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "c", visited: [] }, + ]; + const result = [ + { key: "a", visited: [1, 2, 3] }, + { key: "b", visited: [1, 2, 3] }, + { key: "c", visited: [1, 2, 3] }, + ]; -test.cb( - "compose() should emit drain event after 1 second when first stream is bottleneck", - t => { - t.plan(6); - let passedBottleneck = 0; + input.forEach(item => composed.write(item)); +}); + +test("compose() writable length should be less than highWaterMark when handing writes", async t => { + t.plan(7); + return new Promise(async (resolve, reject) => { interface Chunk { - index: number; - mapped: string[]; + key: string; + mapped: number[]; } const first = map( async (chunk: Chunk) => { - await sleep(200); - passedBottleneck++; - chunk.mapped.push("first"); + chunk.mapped.push(1); return chunk; }, { @@ -92,7 +126,141 @@ test.cb( const second = map( async (chunk: Chunk) => { - chunk.mapped.push("second"); + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true }, + ); + + const composed = compose( + [first, second], + { objectMode: true, highWaterMark: 2 }, + ); + composed.on("error", err => { + reject(); + }); + + composed.on("drain", () => { + t.pass(); + expect(composed._writableState.length).to.be.equal(0); + }); + + composed.on("data", (chunk: Chunk) => { + if (chunk.key === "e") { + resolve(); + } + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + ]; + + for (const item of input) { + const res = composed.write(item); + expect(composed._writableState.length).to.be.at.most(2); + t.pass(); + if (!res) { + await sleep(10); + } + } + }); +}); + +test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => { + t.plan(7); + const _rate = 25; + return new Promise(async (resolve, reject) => { + interface Chunk { + key: string; + mapped: number[]; + } + const first = map( + async (chunk: Chunk) => { + await sleep(_rate); + chunk.mapped.push(1); + return chunk; + }, + { + objectMode: true, + }, + ); + + const second = map( + async (chunk: Chunk) => { + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true }, + ); + + const composed = compose( + [first, second], + { objectMode: true, highWaterMark: 2 }, + ); + composed.on("error", err => { + reject(); + }); + + composed.on("drain", () => { + t.pass(); + expect(composed._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.greaterThan(_rate); + }); + + composed.on("data", (chunk: Chunk) => { + if (chunk.key === "e") { + resolve(); + } + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + ]; + + let start = performance.now(); + for (const item of input) { + const res = composed.write(item); + expect(composed._writableState.length).to.be.at.most(2); + t.pass(); + if (!res) { + start = performance.now(); + await sleep(100); + } + } + }); +}); + +test.cb( + "compose() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ", + t => { + t.plan(6); + const _rate = 100; + interface Chunk { + key: string; + mapped: number[]; + } + const first = map( + async (chunk: Chunk) => { + await sleep(_rate); + chunk.mapped.push(1); + return chunk; + }, + { + objectMode: true, + }, + ); + + const second = map( + async (chunk: Chunk) => { + chunk.mapped.push(2); return chunk; }, { objectMode: true }, @@ -106,33 +274,28 @@ test.cb( t.end(err); }); - composed.on("drain", err => { + composed.on("drain", () => { expect(composed._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan(1000); + expect(performance.now() - start).to.be.greaterThan( + _rate * input.length, + ); t.pass(); }); composed.on("data", (chunk: Chunk) => { - // Since first is bottleneck, composed accumulates until cb is executed in first. Therefore buffer should contain 4, 3, 2, 1 then 0 elements - expect(composed._writableState.length).to.be.equal( - input.length - passedBottleneck, - ); - expect(chunk.mapped.length).to.equal(2); - expect(chunk.mapped).to.deep.equal(["first", "second"]); t.pass(); - if (chunk.index === 5) { + if (chunk.key === "e") { t.end(); } }); const input = [ - { index: 1, mapped: [] }, - { index: 2, mapped: [] }, - { index: 3, mapped: [] }, - { index: 4, mapped: [] }, - { index: 5, mapped: [] }, + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, ]; - input.forEach(item => { composed.write(item); }); @@ -177,7 +340,7 @@ test.cb( t.end(err); }); - composed.on("drain", err => { + composed.on("drain", () => { expect(composed._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.lessThan(100); t.pass(); @@ -275,3 +438,65 @@ test.cb( }); }, ); + +test.cb( + "compose() should not emit drain event writing 5 items to compose with a highWaterMark of 6", + t => { + t.plan(5); + const _rate = 100; + interface Chunk { + key: string; + mapped: number[]; + } + const first = map( + async (chunk: Chunk) => { + await sleep(_rate); + chunk.mapped.push(1); + return chunk; + }, + { + objectMode: true, + }, + ); + + const second = map( + async (chunk: Chunk) => { + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true }, + ); + + const composed = compose( + [first, second], + { objectMode: true, highWaterMark: 6 }, + ); + + composed.on("error", err => { + t.end(err); + }); + + composed.on("drain", () => { + t.end(new Error("Drain should not be emitted")); + }); + + composed.on("data", (chunk: Chunk) => { + t.pass(); + if (chunk.key === "e") { + t.end(); + } + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + ]; + + input.forEach(item => { + composed.write(item); + }); + }, +);