From ea2ffdb38cf2cfdb8431dde97129067be5f21ba8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 9 Sep 2019 13:47:38 -0400 Subject: [PATCH] Add test for unwritable streams in demux --- 1:p | 589 ++++++++++++++++++++++++++++++++++++++++++ tests/compose.spec.ts | 8 +- tests/demux.spec.ts | 102 +++++++- 3 files changed, 684 insertions(+), 15 deletions(-) create mode 100644 1:p diff --git a/1:p b/1:p new file mode 100644 index 0000000..5f5fc0a --- /dev/null +++ b/1:p @@ -0,0 +1,589 @@ +import test from "ava"; +import { expect } from "chai"; +const { demux, map } = require("../src"); +import { Writable } from "stream"; +const sinon = require("sinon"); +const { sleep } = require("../src/helpers"); +import { performance } from "perf_hooks"; + +interface Test { + key: string; + visited: number[]; +} +test.cb("demux() constructor should be called once per key", t => { + t.plan(1); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const construct = sinon.spy((destKey: string) => { + const dest = map((chunk: Test) => { + chunk.visited.push(1); + return chunk; + }); + + return dest; + }); + + const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + + demuxed.on("finish", () => { + expect(construct.withArgs("a").callCount).to.equal(1); + expect(construct.withArgs("b").callCount).to.equal(1); + expect(construct.withArgs("c").callCount).to.equal(1); + t.pass(); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + +test.cb("demux() constructor should be called once per key using keyBy", t => { + t.plan(1); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + + const construct = sinon.spy((destKey: string) => { + const dest = map((chunk: Test) => { + chunk.visited.push(1); + return chunk; + }); + + return dest; + }); + + const demuxed = demux( + construct, + { keyBy: item => item.key }, + { objectMode: true }, + ); + + demuxed.on("finish", () => { + expect(construct.withArgs("a").callCount).to.equal(1); + expect(construct.withArgs("b").callCount).to.equal(1); + expect(construct.withArgs("c").callCount).to.equal(1); + t.pass(); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + +test.cb("should emit errors", t => { + t.plan(2); + let index = 0; + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "a", visited: [] }, + ]; + const results = [ + { key: "a", visited: [0] }, + { key: "b", visited: [1] }, + { key: "a", visited: [2] }, + { key: "a", visited: [3] }, + ]; + const destinationStreamKeys = []; + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + }, + }); + + const construct = (destKey: string) => { + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + if (chunk.key === "b") { + throw new Error("Caught object with key 'b'"); + } + + const _chunk = { ...chunk, visited: [] }; + _chunk.visited.push(index); + index++; + return _chunk; + }).on("error", () => {}); + + dest.pipe(sink); + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: (chunk: any) => chunk.key }, + { objectMode: true }, + ); + demuxed.on("error", e => { + expect(e.message).to.equal("Caught object with key 'b'"); + t.pass(); + t.end(); + }); + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + +test("demux() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", t => { + t.plan(7); + interface Chunk { + key: string; + mapped: number[]; + } + const highWaterMark = 5; + const _rate = 25; + return new Promise(async (resolve, reject) => { + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + cb(); + t.pass(); + pendingReads--; + if (pendingReads === 0) { + resolve(); + } + }, + }); + const construct = (destKey: string) => { + const first = map(async (chunk: Chunk) => { + await sleep(_rate); + chunk.mapped.push(1); + return chunk; + }); + + const second = map(async (chunk: Chunk) => { + chunk.mapped.push(2); + return chunk; + }); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + reject(); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.greaterThan(_rate); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; + + let start = performance.now(); + for (const item of input) { + const res = _demux.write(item); + expect(_demux._writableState.length).to.be.at.most(highWaterMark); + if (!res) { + start = performance.now(); + await sleep(100); + } + } + }); +}); + +test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => { + t.plan(7); + const highWaterMark = 5; + return new Promise(async (resolve, reject) => { + interface Chunk { + key: string; + mapped: number[]; + } + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + cb(); + t.pass(); + pendingReads--; + if (pendingReads === 0) { + resolve(); + } + }, + }); + const construct = (destKey: string) => { + const first = map(async (chunk: Chunk) => { + chunk.mapped.push(1); + return chunk; + }); + + const second = map(async (chunk: Chunk) => { + chunk.mapped.push(2); + return chunk; + }); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + reject(); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; + + for (const item of input) { + const res = _demux.write(item); + expect(_demux._writableState.length).to.be.at.most(highWaterMark); + if (!res) { + await sleep(10); + } + } + }); +}); + +test.cb( + "demux() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ", + t => { + t.plan(6); + const _rate = 100; + const highWaterMark = 5; + interface Chunk { + key: string; + mapped: number[]; + } + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + t.pass(); + cb(); + if (pendingReads === 0) { + t.end(); + } + }, + }); + const construct = (destKey: string) => { + const first = map( + async (chunk: Chunk) => { + chunk.mapped.push(1); + await sleep(_rate); + return chunk; + }, + { objectMode: true }, + ); + + const second = map( + (chunk: Chunk) => { + pendingReads--; + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + t.end(err); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.greaterThan( + _rate * input.length, + ); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + + let pendingReads = input.length; + input.forEach(item => { + _demux.write(item); + }); + const start = performance.now(); + }, +); + +test.cb( + "demux() should emit drain event immediately when second stream is bottleneck", + t => { + t.plan(6); + const highWaterMark = 5; + interface Chunk { + key: string; + mapped: number[]; + } + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + t.pass(); + cb(); + if (pendingReads === 0) { + t.end(); + } + }, + }); + const construct = (destKey: string) => { + const first = map( + (chunk: Chunk) => { + chunk.mapped.push(1); + return chunk; + }, + { objectMode: true }, + ); + + const second = map( + async (chunk: Chunk) => { + pendingReads--; + await sleep(200); + chunk.mapped.push(2); + expect(second._writableState.length).to.be.equal(1); + expect(first._readableState.length).to.equal(pendingReads); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + t.end(err); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.lessThan(50); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + + let pendingReads = input.length; + input.forEach(item => { + _demux.write(item); + }); + const start = performance.now(); + }, +); + +test.only("demux() should only emit drain event when all streams are writable", t => { + t.plan(3); + const highWaterMark = 2; + interface Chunk { + key: string; + mapped: number[]; + } + return new Promise(async (resolve, reject) => { + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + t.pass(); + cb(); + console.log(chunk); + pendingReads--; + if (pendingReads === 0) { + resolve(); + } + }, + }); + const construct = (destKey: string) => { + const first = map( + (chunk: Chunk) => { + chunk.mapped.push(1); + return chunk; + }, + { objectMode: true }, + ); + + const second = map( + async (chunk: Chunk) => { + await sleep(25); + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + reject(); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.lessThan(50); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, // should only be recieved after a becomes writable + ]; + + let pendingReads = input.length; + let start = performance.now(); + for (const item of input) { + const res = _demux.write(item); + if (!res) { + await sleep(100); + start = performance.now(); + } + } + }); +}); +test("demux() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck", t => { + t.plan(6); + const highWaterMark = 5; + return new Promise(async (resolve, reject) => { + interface Chunk { + key: string; + mapped: number[]; + } + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + t.pass(); + cb(); + if (pendingReads === 0) { + resolve(); + } + }, + }); + const construct = (destKey: string) => { + const first = map( + (chunk: Chunk) => { + expect(first._readableState.length).to.be.at.most(2); + chunk.mapped.push(1); + return chunk; + }, + { objectMode: 2, highWaterMark: 2 }, + ); + + const second = map( + async (chunk: Chunk) => { + chunk.mapped.push(2); + expect(second._writableState.length).to.be.equal(1); + await sleep(100); + pendingReads--; + return chunk; + }, + { objectMode: 2, highWaterMark: 2 }, + ); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + reject(); + }); + + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + ]; + let pendingReads = input.length; + + input.forEach(item => { + _demux.write(item); + }); + }); +}); diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 6b97798..2c5feb6 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -219,10 +219,10 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write const input = [ { key: "a", mapped: [] }, - { key: "b", mapped: [] }, - { key: "c", mapped: [] }, - { key: "d", mapped: [] }, - { key: "e", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, ]; let start = performance.now(); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 9ff3498..5504c20 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -17,6 +17,8 @@ test.cb("demux() constructor should be called once per key", t => { { key: "b", visited: [] }, { key: "a", visited: [] }, { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, ]; const construct = sinon.spy((destKey: string) => { const dest = map((chunk: Test) => { @@ -48,6 +50,8 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { { key: "b", visited: [] }, { key: "a", visited: [] }, { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, ]; const construct = sinon.spy((destKey: string) => { @@ -134,15 +138,15 @@ test.cb("should emit errors", t => { demuxed.end(); }); -test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => { +test("demux() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", t => { t.plan(7); + interface Chunk { + key: string; + mapped: number[]; + } const highWaterMark = 5; const _rate = 25; return new Promise(async (resolve, reject) => { - interface Chunk { - key: string; - mapped: number[]; - } const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { @@ -222,7 +226,8 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar write(chunk, encoding, cb) { cb(); t.pass(); - if (chunk.key === "f") { + pendingReads--; + if (pendingReads === 0) { resolve(); } }, @@ -260,12 +265,13 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar const input = [ { key: "a", mapped: [] }, - { key: "b", mapped: [] }, - { key: "c", mapped: [] }, - { key: "d", mapped: [] }, - { key: "e", mapped: [] }, - { key: "f", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, ]; + let pendingReads = input.length; for (const item of input) { const res = _demux.write(item); @@ -354,6 +360,7 @@ test.cb( const start = performance.now(); }, ); + test.cb( "demux() should emit drain event immediately when second stream is bottleneck", t => { @@ -431,6 +438,79 @@ test.cb( }, ); +test("demux() should only emit drain event when all streams are writable", t => { + t.plan(1); + const highWaterMark = 2; + interface Chunk { + key: string; + mapped: number[]; + } + return new Promise(async (resolve, reject) => { + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + cb(); + pendingReads--; + if (chunk.key === "b") { + expect(performance.now() - start).to.be.greaterThan(150); + t.pass(); + } + if (pendingReads === 0) { + resolve(); + } + }, + }); + const construct = (destKey: string) => { + const first = map( + (chunk: Chunk) => { + chunk.mapped.push(1); + return chunk; + }, + { objectMode: true }, + ); + + const second = map( + async (chunk: Chunk) => { + await sleep(50); + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + first.pipe(second).pipe(sink); + return first; + }; + const _demux = demux( + construct, + { key: "key" }, + { + objectMode: true, + highWaterMark, + }, + ); + _demux.on("error", err => { + reject(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "c", mapped: [] }, + { key: "c", mapped: [] }, + { key: "b", mapped: [] }, // should only be recieved after a becomes writable + ]; + + let pendingReads = input.length; + let start = performance.now(); + for (const item of input) { + const res = _demux.write(item); + if (!res) { + await sleep(50); + } + } + }); +}); test("demux() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck", t => { t.plan(6); const highWaterMark = 5;