From eed36a4fe91285171627996263664854ba4b76bc Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 9 Sep 2019 14:43:18 -0400 Subject: [PATCH] Lots of stuff --- 1:p | 589 ----------------------------------------- src/functions/demux.ts | 19 ++ tests/demux.spec.ts | 4 +- 3 files changed, 22 insertions(+), 590 deletions(-) delete mode 100644 1:p diff --git a/1:p b/1:p deleted file mode 100644 index 5f5fc0a..0000000 --- a/1:p +++ /dev/null @@ -1,589 +0,0 @@ -import test from "ava"; -import { expect } from "chai"; -const { demux, map } = require("../src"); -import { Writable } from "stream"; -const sinon = require("sinon"); -const { sleep } = require("../src/helpers"); -import { performance } from "perf_hooks"; - -interface Test { - key: string; - visited: number[]; -} -test.cb("demux() constructor should be called once per key", t => { - t.plan(1); - const input = [ - { key: "a", visited: [] }, - { key: "b", visited: [] }, - { key: "a", visited: [] }, - { key: "c", visited: [] }, - { key: "a", visited: [] }, - { key: "b", visited: [] }, - ]; - const construct = sinon.spy((destKey: string) => { - const dest = map((chunk: Test) => { - chunk.visited.push(1); - return chunk; - }); - - return dest; - }); - - const demuxed = demux(construct, { key: "key" }, { objectMode: true }); - - demuxed.on("finish", () => { - expect(construct.withArgs("a").callCount).to.equal(1); - expect(construct.withArgs("b").callCount).to.equal(1); - expect(construct.withArgs("c").callCount).to.equal(1); - t.pass(); - t.end(); - }); - - input.forEach(event => demuxed.write(event)); - demuxed.end(); -}); - -test.cb("demux() constructor should be called once per key using keyBy", t => { - t.plan(1); - const input = [ - { key: "a", visited: [] }, - { key: "b", visited: [] }, - { key: "a", visited: [] }, - { key: "c", visited: [] }, - { key: "a", visited: [] }, - { key: "b", visited: [] }, - ]; - - const construct = sinon.spy((destKey: string) => { - const dest = map((chunk: Test) => { - chunk.visited.push(1); - return chunk; - }); - - return dest; - }); - - const demuxed = demux( - construct, - { keyBy: item => item.key }, - { objectMode: true }, - ); - - demuxed.on("finish", () => { - expect(construct.withArgs("a").callCount).to.equal(1); - expect(construct.withArgs("b").callCount).to.equal(1); - expect(construct.withArgs("c").callCount).to.equal(1); - t.pass(); - t.end(); - }); - - input.forEach(event => demuxed.write(event)); - demuxed.end(); -}); - -test.cb("should emit errors", t => { - t.plan(2); - let index = 0; - const input = [ - { key: "a", visited: [] }, - { key: "b", visited: [] }, - { key: "a", visited: [] }, - { key: "a", visited: [] }, - ]; - const results = [ - { key: "a", visited: [0] }, - { key: "b", visited: [1] }, - { key: "a", visited: [2] }, - { key: "a", visited: [3] }, - ]; - const destinationStreamKeys = []; - const sink = new Writable({ - objectMode: true, - write(chunk, enc, cb) { - expect(results).to.deep.include(chunk); - expect(input).to.not.deep.include(chunk); - t.pass(); - cb(); - }, - }); - - const construct = (destKey: string) => { - destinationStreamKeys.push(destKey); - const dest = map((chunk: Test) => { - if (chunk.key === "b") { - throw new Error("Caught object with key 'b'"); - } - - const _chunk = { ...chunk, visited: [] }; - _chunk.visited.push(index); - index++; - return _chunk; - }).on("error", () => {}); - - dest.pipe(sink); - return dest; - }; - - const demuxed = demux( - construct, - { keyBy: (chunk: any) => chunk.key }, - { objectMode: true }, - ); - demuxed.on("error", e => { - expect(e.message).to.equal("Caught object with key 'b'"); - t.pass(); - t.end(); - }); - input.forEach(event => demuxed.write(event)); - demuxed.end(); -}); - -test("demux() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", t => { - t.plan(7); - interface Chunk { - key: string; - mapped: number[]; - } - const highWaterMark = 5; - const _rate = 25; - return new Promise(async (resolve, reject) => { - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - cb(); - t.pass(); - pendingReads--; - if (pendingReads === 0) { - resolve(); - } - }, - }); - const construct = (destKey: string) => { - const first = map(async (chunk: Chunk) => { - await sleep(_rate); - chunk.mapped.push(1); - return chunk; - }); - - const second = map(async (chunk: Chunk) => { - chunk.mapped.push(2); - return chunk; - }); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - reject(); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan(_rate); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - - let start = performance.now(); - for (const item of input) { - const res = _demux.write(item); - expect(_demux._writableState.length).to.be.at.most(highWaterMark); - if (!res) { - start = performance.now(); - await sleep(100); - } - } - }); -}); - -test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => { - t.plan(7); - const highWaterMark = 5; - return new Promise(async (resolve, reject) => { - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - cb(); - t.pass(); - pendingReads--; - if (pendingReads === 0) { - resolve(); - } - }, - }); - const construct = (destKey: string) => { - const first = map(async (chunk: Chunk) => { - chunk.mapped.push(1); - return chunk; - }); - - const second = map(async (chunk: Chunk) => { - chunk.mapped.push(2); - return chunk; - }); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - reject(); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - - for (const item of input) { - const res = _demux.write(item); - expect(_demux._writableState.length).to.be.at.most(highWaterMark); - if (!res) { - await sleep(10); - } - } - }); -}); - -test.cb( - "demux() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ", - t => { - t.plan(6); - const _rate = 100; - const highWaterMark = 5; - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - t.pass(); - cb(); - if (pendingReads === 0) { - t.end(); - } - }, - }); - const construct = (destKey: string) => { - const first = map( - async (chunk: Chunk) => { - chunk.mapped.push(1); - await sleep(_rate); - return chunk; - }, - { objectMode: true }, - ); - - const second = map( - (chunk: Chunk) => { - pendingReads--; - chunk.mapped.push(2); - return chunk; - }, - { objectMode: true, highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - t.end(err); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan( - _rate * input.length, - ); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - - let pendingReads = input.length; - input.forEach(item => { - _demux.write(item); - }); - const start = performance.now(); - }, -); - -test.cb( - "demux() should emit drain event immediately when second stream is bottleneck", - t => { - t.plan(6); - const highWaterMark = 5; - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - t.pass(); - cb(); - if (pendingReads === 0) { - t.end(); - } - }, - }); - const construct = (destKey: string) => { - const first = map( - (chunk: Chunk) => { - chunk.mapped.push(1); - return chunk; - }, - { objectMode: true }, - ); - - const second = map( - async (chunk: Chunk) => { - pendingReads--; - await sleep(200); - chunk.mapped.push(2); - expect(second._writableState.length).to.be.equal(1); - expect(first._readableState.length).to.equal(pendingReads); - return chunk; - }, - { objectMode: true, highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - t.end(err); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan(50); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - - let pendingReads = input.length; - input.forEach(item => { - _demux.write(item); - }); - const start = performance.now(); - }, -); - -test.only("demux() should only emit drain event when all streams are writable", t => { - t.plan(3); - const highWaterMark = 2; - interface Chunk { - key: string; - mapped: number[]; - } - return new Promise(async (resolve, reject) => { - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - t.pass(); - cb(); - console.log(chunk); - pendingReads--; - if (pendingReads === 0) { - resolve(); - } - }, - }); - const construct = (destKey: string) => { - const first = map( - (chunk: Chunk) => { - chunk.mapped.push(1); - return chunk; - }, - { objectMode: true }, - ); - - const second = map( - async (chunk: Chunk) => { - await sleep(25); - chunk.mapped.push(2); - return chunk; - }, - { objectMode: true, highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - reject(); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan(50); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "b", mapped: [] }, // should only be recieved after a becomes writable - ]; - - let pendingReads = input.length; - let start = performance.now(); - for (const item of input) { - const res = _demux.write(item); - if (!res) { - await sleep(100); - start = performance.now(); - } - } - }); -}); -test("demux() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck", t => { - t.plan(6); - const highWaterMark = 5; - return new Promise(async (resolve, reject) => { - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - t.pass(); - cb(); - if (pendingReads === 0) { - resolve(); - } - }, - }); - const construct = (destKey: string) => { - const first = map( - (chunk: Chunk) => { - expect(first._readableState.length).to.be.at.most(2); - chunk.mapped.push(1); - return chunk; - }, - { objectMode: 2, highWaterMark: 2 }, - ); - - const second = map( - async (chunk: Chunk) => { - chunk.mapped.push(2); - expect(second._writableState.length).to.be.equal(1); - await sleep(100); - pendingReads--; - return chunk; - }, - { objectMode: 2, highWaterMark: 2 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); - _demux.on("error", err => { - reject(); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - - input.forEach(item => { - _demux.write(item); - }); - }); -}); diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 9c9e624..647cd61 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -118,4 +118,23 @@ class Demux extends Writable { } return this; } + public once(event: string, cb: any) { + switch (eventsTarget[event]) { + case EventSubscription.Self: + super.once(event, cb); + break; + case EventSubscription.All: + Object.keys(this.streamsByKey).forEach(key => + this.streamsByKey[key].stream.once(event, cb), + ); + break; + case EventSubscription.Unhandled: + throw new Error( + "Stream must be multiplexed before handling this event", + ); + default: + super.once(event, cb); + } + return this; + } } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 5504c20..15d89c3 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -10,6 +10,7 @@ interface Test { key: string; visited: number[]; } + test.cb("demux() constructor should be called once per key", t => { t.plan(1); const input = [ @@ -502,7 +503,7 @@ test("demux() should only emit drain event when all streams are writable", t => ]; let pendingReads = input.length; - let start = performance.now(); + const start = performance.now(); for (const item of input) { const res = _demux.write(item); if (!res) { @@ -511,6 +512,7 @@ test("demux() should only emit drain event when all streams are writable", t => } }); }); + test("demux() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck", t => { t.plan(6); const highWaterMark = 5;