From 1227ce70953903d60209e43283d8a1ffba1784a8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sat, 25 Jan 2020 12:33:09 -0500 Subject: [PATCH 01/11] Allow demux to be piped (mux) --- src/functions/demux.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 6435447..25e16bd 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -37,6 +37,7 @@ class Demux extends Writable { private streamsByKey: { [key: string]: DemuxStreams; }; + private destination: Writable; private demuxer: (chunk: any) => string; private construct: (destKey?: string) => DemuxStreams; constructor( @@ -55,8 +56,12 @@ class Demux extends Writable { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = await this.construct(destKey); + if (this.destination !== undefined) { + (this.streamsByKey[destKey] as any).pipe(this.destination); + } } - if (!this.streamsByKey[destKey].write(chunk, encoding)) { + const writeRes = this.streamsByKey[destKey].write(chunk, encoding); + if (!writeRes) { this.streamsByKey[destKey].once("drain", () => { cb(); }); @@ -65,6 +70,11 @@ class Demux extends Writable { } } + public pipe(dest: any) { + this.destination = dest; + return dest; + } + public on(event: string, cb: any) { switch (eventsTarget[event]) { case EventSubscription.Self: From bd178ce2f0f508db1aef7a8ab0afec98e57d21c6 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sat, 25 Jan 2020 12:36:21 -0500 Subject: [PATCH 02/11] Revert to old --- src/functions/demux.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 25e16bd..cd287dd 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -60,8 +60,8 @@ class Demux extends Writable { (this.streamsByKey[destKey] as any).pipe(this.destination); } } - const writeRes = this.streamsByKey[destKey].write(chunk, encoding); - if (!writeRes) { + + if (!this.streamsByKey[destKey].write(chunk, encoding)) { this.streamsByKey[destKey].once("drain", () => { cb(); }); From bff4e0d6ed7f563e3c313fb990bc706565a09f84 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sun, 26 Jan 2020 09:55:35 -0500 Subject: [PATCH 03/11] Add remux options --- src/functions/demux.ts | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index cd287dd..feeb691 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -10,7 +10,7 @@ enum EventSubscription { const eventsTarget = { close: EventSubscription.Self, - data: EventSubscription.All, + data: EventSubscription.Self, drain: EventSubscription.Self, end: EventSubscription.Self, error: EventSubscription.Self, @@ -24,10 +24,14 @@ const eventsTarget = { type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; +interface DemuxOptions extends WritableOptions { + remultiplex?: DemuxStreams; +} + export function demux( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options?: WritableOptions, + options?: DemuxOptions, ): Writable { return new Demux(construct, demuxBy, options); } @@ -37,18 +41,19 @@ class Demux extends Writable { private streamsByKey: { [key: string]: DemuxStreams; }; - private destination: Writable; private demuxer: (chunk: any) => string; private construct: (destKey?: string) => DemuxStreams; + private remultiplex?: DemuxStreams; constructor( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options: WritableOptions = {}, + options: DemuxOptions = {}, ) { super(options); this.demuxer = typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; this.construct = construct; + this.remultiplex = options.remultiplex; this.streamsByKey = {}; } @@ -56,8 +61,10 @@ class Demux extends Writable { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = await this.construct(destKey); - if (this.destination !== undefined) { - (this.streamsByKey[destKey] as any).pipe(this.destination); + if (this.remultiplex) { + (this.streamsByKey[destKey] as NodeJS.ReadWriteStream).pipe( + this.remultiplex, + ); } } @@ -70,11 +77,6 @@ class Demux extends Writable { } } - public pipe(dest: any) { - this.destination = dest; - return dest; - } - public on(event: string, cb: any) { switch (eventsTarget[event]) { case EventSubscription.Self: From 9c09957775a852397ab5c76d8ff1aa22c7fc5129 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sun, 26 Jan 2020 10:26:54 -0500 Subject: [PATCH 04/11] Add test for remux --- src/functions/demux.ts | 2 +- tests/demux.spec.ts | 46 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index feeb691..1c39f5d 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -10,7 +10,7 @@ enum EventSubscription { const eventsTarget = { close: EventSubscription.Self, - data: EventSubscription.Self, + data: EventSubscription.All, drain: EventSubscription.Self, end: EventSubscription.Self, error: EventSubscription.Self, diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 5c7e017..dc71ca3 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -5,7 +5,7 @@ import { Writable } from "stream"; const sinon = require("sinon"); const { sleep } = require("../src/helpers"); import { performance } from "perf_hooks"; -const { demux, map } = mhysa(); +const { demux, map } = mhysa({ objectMode: true }); interface Test { key: string; @@ -655,3 +655,47 @@ test("demux() should emit drain event when second stream in pipeline is bottlene }); }); }); + +test.cb("Demux should remux to sink", t => { + t.plan(6); + let i = 0; + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const result = [ + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + { key: "a", visited: ["a"] }, + { key: "c", visited: ["c"] }, + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + ]; + const construct = (destKey: string) => { + const dest = map((chunk: any) => { + chunk.visited.push(destKey); + return chunk; + }); + + return dest; + }; + + const remux = map(d => { + t.deepEqual(d, result[i]); + i++; + if (i === input.length) { + t.end(); + } + }); + + const demuxed = demux(construct, "key", { + objectMode: true, + remultiplex: remux, + }); + + input.forEach(event => demuxed.write(event)); +}); From cf719b25cf8f876db14619e4188762e72668f7b2 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sun, 26 Jan 2020 10:37:59 -0500 Subject: [PATCH 05/11] Fix broken test --- tests/demux.spec.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index dc71ca3..929d04f 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -355,7 +355,7 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); }); -test.cb.only( +test.cb( "demux() should emit drain event when third stream is bottleneck", t => { t.plan(8); @@ -446,7 +446,7 @@ test.cb( const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { - expect(chunk.mapped).to.deep.equal([1, 2]); + expect(chunk.mapped).to.deep.equal([1, 2, 3]); t.pass(); pendingReads--; if (pendingReads === 0) { @@ -685,6 +685,7 @@ test.cb("Demux should remux to sink", t => { }; const remux = map(d => { + console.log(d); t.deepEqual(d, result[i]); i++; if (i === input.length) { From 11ed6f81e73019cac61d1ae04e9382f7a44d5f54 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sun, 26 Jan 2020 10:43:33 -0500 Subject: [PATCH 06/11] remove console log --- tests/demux.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 929d04f..2102d33 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -685,7 +685,6 @@ test.cb("Demux should remux to sink", t => { }; const remux = map(d => { - console.log(d); t.deepEqual(d, result[i]); i++; if (i === input.length) { From 2b1308a605737bdab5e66098545bfd1a9676cca5 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 27 Jan 2020 09:29:59 -0500 Subject: [PATCH 07/11] use fromArray --- tests/demux.spec.ts | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 2102d33..abb4ec9 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -5,7 +5,7 @@ import { Writable } from "stream"; const sinon = require("sinon"); const { sleep } = require("../src/helpers"); import { performance } from "perf_hooks"; -const { demux, map } = mhysa({ objectMode: true }); +const { demux, map, fromArray } = mhysa({ objectMode: true }); interface Test { key: string; @@ -41,8 +41,7 @@ test.cb("demux() constructor should be called once per key", t => { t.end(); }); - input.forEach(event => demuxed.write(event)); - demuxed.end(); + fromArray(input).pipe(demuxed); }); test.cb("demux() should send input through correct pipeline", t => { @@ -84,8 +83,7 @@ test.cb("demux() should send input through correct pipeline", t => { t.end(); }); - input.forEach(event => demuxed.write(event)); - demuxed.end(); + fromArray(input).pipe(demuxed); }); test.cb("demux() constructor should be called once per key using keyBy", t => { @@ -108,7 +106,9 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { return dest; }); - const demuxed = demux(construct, item => item.key, { objectMode: true }); + const demuxed = demux(construct, item => item.key, { + objectMode: true, + }); demuxed.on("finish", () => { expect(construct.withArgs("a").callCount).to.equal(1); @@ -118,8 +118,7 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { t.end(); }); - input.forEach(event => demuxed.write(event)); - demuxed.end(); + fromArray(input).pipe(demuxed); }); test.cb("demux() should send input through correct pipeline using keyBy", t => { @@ -161,8 +160,7 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { t.end(); }); - input.forEach(event => demuxed.write(event)); - demuxed.end(); + fromArray(input).pipe(demuxed); }); test("demux() write should return false after if it has >= highWaterMark items buffered and drain should be emitted", t => { @@ -697,5 +695,5 @@ test.cb("Demux should remux to sink", t => { remultiplex: remux, }); - input.forEach(event => demuxed.write(event)); + fromArray(input).pipe(demuxed); }); From 8856cb8d3b2ed10c5e7f5016415fb61ff6cd4ef2 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 27 Jan 2020 13:07:37 -0500 Subject: [PATCH 08/11] Update demux --- src/functions/compose.ts | 13 +- src/functions/demux.ts | 101 ++++++------ src/helpers.ts | 14 ++ tests/compose.spec.ts | 79 +++++++--- tests/demux.spec.ts | 320 ++++++++++++++++++++++++++++++++------- 5 files changed, 379 insertions(+), 148 deletions(-) diff --git a/src/functions/compose.ts b/src/functions/compose.ts index b2276b7..64789ef 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,4 +1,5 @@ import { pipeline, TransformOptions, Transform } from "stream"; +import { AllStreams, isReadable } from "../helpers"; export function compose( streams: Array< @@ -21,18 +22,6 @@ enum EventSubscription { Self, } -type AllStreams = - | NodeJS.ReadableStream - | NodeJS.ReadWriteStream - | NodeJS.WritableStream; - -function isReadable(stream: AllStreams): stream is NodeJS.WritableStream { - return ( - (stream as NodeJS.ReadableStream).pipe !== undefined && - (stream as any).readable === true - ); -} - export class Compose extends Transform { private first: AllStreams; private last: AllStreams; diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 1c39f5d..aaf2914 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -1,4 +1,6 @@ -import { WritableOptions, Writable } from "stream"; +import { DuplexOptions, Duplex, Transform } from "stream"; + +import { isReadable } from "../helpers"; enum EventSubscription { Last = 0, @@ -8,62 +10,62 @@ enum EventSubscription { Unhandled, } -const eventsTarget = { - close: EventSubscription.Self, - data: EventSubscription.All, - drain: EventSubscription.Self, - end: EventSubscription.Self, - error: EventSubscription.Self, - finish: EventSubscription.Self, - pause: EventSubscription.Self, - pipe: EventSubscription.Self, - readable: EventSubscription.Self, - resume: EventSubscription.Self, - unpipe: EventSubscription.Self, -}; - type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; -interface DemuxOptions extends WritableOptions { - remultiplex?: DemuxStreams; +interface DemuxOptions extends DuplexOptions { + remultiplex?: boolean; } export function demux( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), options?: DemuxOptions, -): Writable { +): Duplex { return new Demux(construct, demuxBy, options); } -// @TODO handle pipe event ie) Multiplex -class Demux extends Writable { +class Demux extends Duplex { private streamsByKey: { [key: string]: DemuxStreams; }; private demuxer: (chunk: any) => string; private construct: (destKey?: string) => DemuxStreams; - private remultiplex?: DemuxStreams; + private remultiplex: boolean; + private transform: Transform; constructor( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options: DemuxOptions = {}, + options: DemuxOptions, ) { super(options); this.demuxer = typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; this.construct = construct; - this.remultiplex = options.remultiplex; + this.remultiplex = + options.remultiplex === undefined ? true : options.remultiplex; this.streamsByKey = {}; + this.transform = new Transform({ + ...options, + transform: (d, _, cb) => { + this.push(d); + cb(null, d); + }, + }); + this.once("unpipe", () => this._flush()); } + public _read(size: number) {} + public async _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { - this.streamsByKey[destKey] = await this.construct(destKey); - if (this.remultiplex) { - (this.streamsByKey[destKey] as NodeJS.ReadWriteStream).pipe( - this.remultiplex, + const newPipeline = await this.construct(destKey); + this.streamsByKey[destKey] = newPipeline; + if (this.remultiplex && isReadable(newPipeline)) { + (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); + } else if (this.remultiplex) { + console.error( + `Pipeline construct for ${destKey} does not implement readable interface`, ); } } @@ -77,35 +79,24 @@ class Demux extends Writable { } } - public on(event: string, cb: any) { - switch (eventsTarget[event]) { - case EventSubscription.Self: - super.on(event, cb); - break; - case EventSubscription.All: - Object.keys(this.streamsByKey).forEach(key => - this.streamsByKey[key].on(event, cb), - ); - break; - default: - super.on(event, cb); - } - return this; + public _flush() { + const pipelines = Object.values(this.streamsByKey); + let totalEnded = 0; + pipelines.forEach(pipeline => { + pipeline.once("end", () => { + totalEnded++; + if (pipelines.length === totalEnded) { + this.push(null); + this.emit("finished"); + } + }); + }); + pipelines.forEach(pipeline => pipeline.end()); } - public once(event: string, cb: any) { - switch (eventsTarget[event]) { - case EventSubscription.Self: - super.once(event, cb); - break; - case EventSubscription.All: - Object.keys(this.streamsByKey).forEach(key => - this.streamsByKey[key].once(event, cb), - ); - break; - default: - super.once(event, cb); - } - return this; + public _destroy(error: any, cb: (error?: any) => void) { + const pipelines = Object.values(this.streamsByKey); + pipelines.forEach(p => (p as any).destroy()); + cb(error); } } diff --git a/src/helpers.ts b/src/helpers.ts index 242d264..4b255d9 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -1,3 +1,17 @@ export async function sleep(time: number): Promise<{} | null> { return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null; } + +export type AllStreams = + | NodeJS.ReadableStream + | NodeJS.ReadWriteStream + | NodeJS.WritableStream; + +export function isReadable( + stream: AllStreams, +): stream is NodeJS.WritableStream { + return ( + (stream as NodeJS.ReadableStream).pipe !== undefined && + (stream as any).readable === true + ); +} diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 56a20de..508c112 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -114,9 +114,13 @@ test("compose() writable length should be less than highWaterMark when handing w return chunk; }); - const composed = compose([first, second], undefined, { - highWaterMark: 2, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 2, + }, + ); composed.on("error", err => { reject(); }); @@ -171,9 +175,13 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write return chunk; }); - const composed = compose([first, second], undefined, { - highWaterMark, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark, + }, + ); composed.on("error", err => { reject(); }); @@ -237,9 +245,13 @@ test.cb( return chunk; }); - const composed = compose([first, second], undefined, { - highWaterMark: 5, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 5, + }, + ); composed.on("error", err => { t.end(err); @@ -301,9 +313,13 @@ test.cb( { highWaterMark: 1 }, ); - const composed = compose([first, second], undefined, { - highWaterMark: 5, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 5, + }, + ); composed.on("error", err => { t.end(err); }); @@ -368,9 +384,13 @@ test.cb( { highWaterMark: 2 }, ); - const composed = compose([first, second], undefined, { - highWaterMark: 5, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 5, + }, + ); composed.on("error", err => { t.end(err); }); @@ -423,9 +443,13 @@ test.cb( return chunk; }); - const composed = compose([first, second], undefined, { - highWaterMark: 6, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 6, + }, + ); composed.on("error", err => { t.end(err); @@ -475,9 +499,12 @@ test.cb("compose() should be 'destroyable'", t => { return chunk; }); - const composed = compose([first, second], (err: any) => { - t.pass(); - }); + const composed = compose( + [first, second], + (err: any) => { + t.pass(); + }, + ); const fakeSource = new Readable({ objectMode: true, @@ -533,9 +560,13 @@ test.cb("compose() `finish` and `end` propagates", t => { return chunk; }); - const composed = compose([first, second], undefined, { - highWaterMark: 3, - }); + const composed = compose( + [first, second], + undefined, + { + highWaterMark: 3, + }, + ); const fakeSource = new Readable({ objectMode: true, diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index abb4ec9..3e1dae0 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -1,7 +1,7 @@ import test from "ava"; import { expect } from "chai"; import mhysa from "../src"; -import { Writable } from "stream"; +import { Writable, Readable } from "stream"; const sinon = require("sinon"); const { sleep } = require("../src/helpers"); import { performance } from "perf_hooks"; @@ -31,7 +31,7 @@ test.cb("demux() constructor should be called once per key", t => { return dest; }); - const demuxed = demux(construct, "key", { objectMode: true }); + const demuxed = demux(construct, "key", {}); demuxed.on("finish", () => { expect(construct.withArgs("a").callCount).to.equal(1); @@ -65,7 +65,7 @@ test.cb("demux() should send input through correct pipeline", t => { return dest; }; - const demuxed = demux(construct, "key", { objectMode: true }); + const demuxed = demux(construct, "key", {}); demuxed.on("finish", () => { pipelineSpies["a"].getCalls().forEach(call => { @@ -106,9 +106,7 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { return dest; }); - const demuxed = demux(construct, item => item.key, { - objectMode: true, - }); + const demuxed = demux(construct, item => item.key, {}); demuxed.on("finish", () => { expect(construct.withArgs("a").callCount).to.equal(1); @@ -142,7 +140,7 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { return dest; }; - const demuxed = demux(construct, item => item.key, { objectMode: true }); + const demuxed = demux(construct, item => item.key, {}); demuxed.on("finish", () => { pipelineSpies["a"].getCalls().forEach(call => { @@ -187,7 +185,7 @@ test("demux() write should return false after if it has >= highWaterMark items b await sleep(slowProcessorSpeed); return { ...chunk, mapped: [1] }; }, - { highWaterMark: 1, objectMode: true }, + { highWaterMark: 1 }, ); first.on("data", chunk => { @@ -203,7 +201,6 @@ test("demux() write should return false after if it has >= highWaterMark items b }; const _demux = demux(construct, "key", { - objectMode: true, highWaterMark, }); @@ -253,7 +250,7 @@ test("demux() should emit one drain event after slowProcessorSpeed * highWaterMa chunk.mapped.push(1); return chunk; }, - { highWaterMark: 1, objectMode: true }, + { highWaterMark: 1 }, ); first.on("data", () => { @@ -266,7 +263,6 @@ test("demux() should emit one drain event after slowProcessorSpeed * highWaterMa return first; }; const _demux = demux(construct, "key", { - objectMode: true, highWaterMark, }); _demux.on("error", err => { @@ -316,7 +312,7 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar chunk.mapped.push(2); return chunk; }, - { highWaterMark: 1, objectMode: true }, + { highWaterMark: 1 }, ); first.on("data", () => { @@ -329,7 +325,6 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar return first; }; const _demux = demux(construct, "key", { - objectMode: true, highWaterMark: 5, }); @@ -354,7 +349,7 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); test.cb( - "demux() should emit drain event when third stream is bottleneck", + "demux() should emit drain event when second stream is bottleneck", t => { t.plan(8); const slowProcessorSpeed = 100; @@ -381,7 +376,7 @@ test.cb( chunk.mapped.push(1); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); const second = map( @@ -390,25 +385,23 @@ test.cb( chunk.mapped.push(2); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); first.pipe(second).pipe(sink); return first; }; const _demux = demux(construct, () => "a", { - objectMode: true, highWaterMark, }); _demux.on("error", err => { t.end(err); }); - // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( - slowProcessorSpeed * (input.length - 2), + slowProcessorSpeed, ); t.pass(); }); @@ -425,14 +418,12 @@ test.cb( let pendingReads = input.length; const start = performance.now(); - input.forEach(item => { - _demux.write(item); - }); + fromArray(input).pipe(_demux); }, ); test.cb( - "demux() should emit drain event when second stream is bottleneck", + "demux() should emit drain event when third stream is bottleneck", t => { t.plan(8); const slowProcessorSpeed = 100; @@ -459,14 +450,14 @@ test.cb( chunk.mapped.push(1); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); const second = map( (chunk: Chunk) => { chunk.mapped.push(2); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); const third = map( @@ -475,7 +466,7 @@ test.cb( chunk.mapped.push(3); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); first @@ -485,7 +476,6 @@ test.cb( return first; }; const _demux = demux(construct, () => "a", { - objectMode: true, highWaterMark, }); _demux.on("error", err => { @@ -496,7 +486,7 @@ test.cb( _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( - slowProcessorSpeed * (input.length - 4), + slowProcessorSpeed, ); t.pass(); }); @@ -513,13 +503,11 @@ test.cb( let pendingReads = input.length; const start = performance.now(); - input.forEach(item => { - _demux.write(item); - }); + fromArray(input).pipe(_demux); }, ); -test("demux() should be blocked by slowest pipeline", t => { +test.skip("demux() should be blocked by slowest pipeline", t => { t.plan(1); const slowProcessorSpeed = 100; interface Chunk { @@ -534,36 +522,37 @@ test("demux() should be blocked by slowest pipeline", t => { chunk.mapped.push(1); return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); - first.on("data", chunk => { - pendingReads--; - if (chunk.key === "b") { - expect(performance.now() - start).to.be.greaterThan( - slowProcessorSpeed * totalItems, - ); - t.pass(); - expect(pendingReads).to.equal(0); - resolve(); - } - }); return first; }; + const _demux = demux(construct, "key", { - objectMode: true, highWaterMark: 1, }); + _demux.on("error", err => { reject(err); }); + _demux.on("data", async chunk => { + pendingReads--; + if (chunk.key === "b") { + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * totalItems, + ); + t.pass(); + expect(pendingReads).to.equal(0); + resolve(); + } + }); + const input = [ { key: "a", mapped: [] }, { key: "a", mapped: [] }, { key: "c", mapped: [] }, { key: "c", mapped: [] }, - { key: "c", mapped: [] }, { key: "b", mapped: [] }, ]; @@ -609,7 +598,7 @@ test("demux() should emit drain event when second stream in pipeline is bottlene chunk.mapped.push(1); return chunk; }, - { objectMode: true, highWaterMark: 2 }, + { highWaterMark: 2 }, ); const second = map( @@ -620,7 +609,7 @@ test("demux() should emit drain event when second stream in pipeline is bottlene pendingReads--; return chunk; }, - { objectMode: true, highWaterMark: 1 }, + { highWaterMark: 1 }, ); first.pipe(second).pipe(sink); @@ -628,7 +617,6 @@ test("demux() should emit drain event when second stream in pipeline is bottlene }; const _demux = demux(construct, "key", { - objectMode: true, highWaterMark, }); _demux.on("error", err => { @@ -648,9 +636,7 @@ test("demux() should emit drain event when second stream in pipeline is bottlene ]; let pendingReads = input.length; - input.forEach(item => { - _demux.write(item); - }); + fromArray(input).pipe(_demux); }); }); @@ -682,7 +668,7 @@ test.cb("Demux should remux to sink", t => { return dest; }; - const remux = map(d => { + const sink = map(d => { t.deepEqual(d, result[i]); i++; if (i === input.length) { @@ -690,10 +676,230 @@ test.cb("Demux should remux to sink", t => { } }); - const demuxed = demux(construct, "key", { - objectMode: true, - remultiplex: remux, - }); + const demuxed = demux(construct, "key", {}); + + fromArray(input) + .pipe(demuxed) + .pipe(sink); +}); + +test.cb("Demux should send data events", t => { + t.plan(6); + let i = 0; + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const result = [ + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + { key: "a", visited: ["a"] }, + { key: "c", visited: ["c"] }, + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + ]; + const construct = (destKey: string) => { + const dest = map((chunk: any) => { + chunk.visited.push(destKey); + return chunk; + }); + + return dest; + }; + + const demuxed = demux(construct, "key", {}); fromArray(input).pipe(demuxed); + + demuxed.on("data", d => { + t.deepEqual(d, result[i]); + i++; + if (i === input.length) { + t.end(); + } + }); +}); + +test.cb("demux() `finish` and `end` propagates", t => { + interface Chunk { + key: string; + mapped: number[]; + } + + t.plan(9); + + const construct = (destKey: string) => { + const dest = map((chunk: any) => { + chunk.mapped.push(destKey); + return chunk; + }); + return dest; + }; + + const _demux = demux(construct, "key", { + highWaterMark: 3, + }); + + const fakeSource = new Readable({ + objectMode: true, + read() { + return; + }, + }); + + const sink = map((d: any) => { + const curr = input.shift(); + t.is(curr.key, d.key); + t.deepEqual(d.mapped, [d.key]); + }); + + fakeSource.pipe(_demux).pipe(sink); + + fakeSource.on("end", () => { + t.pass(); + }); + _demux.on("finish", () => { + t.pass(); + }); + _demux.on("unpipe", () => { + t.pass(); + }); + _demux.on("end", () => { + t.pass(); + t.end(); + }); + sink.on("finish", () => { + t.pass(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + ]; + fakeSource.push(input[0]); + fakeSource.push(input[1]); + fakeSource.push(null); +}); + +test.cb("demux() `unpipe` propagates", t => { + interface Chunk { + key: string; + mapped: number[]; + } + + t.plan(7); + + const construct = (destKey: string) => { + const dest = map((chunk: any) => { + chunk.mapped.push(destKey); + return chunk; + }); + return dest; + }; + + const _demux = demux(construct, "key", { + highWaterMark: 3, + }); + + const fakeSource = new Readable({ + objectMode: true, + read() { + return; + }, + }); + + const sink = map((d: any) => { + const curr = input.shift(); + t.is(curr.key, d.key); + t.deepEqual(d.mapped, [d.key]); + }); + + fakeSource.pipe(_demux).pipe(sink); + + _demux.on("unpipe", () => { + t.pass(); + }); + + sink.on("unpipe", () => { + t.pass(); + }); + + sink.on("finish", () => { + t.pass(); + t.end(); + }); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "a", mapped: [] }, + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + ]; + fakeSource.push(input[0]); + fakeSource.push(input[1]); + fakeSource.push(null); +}); + +test.cb("demux() should be 'destroyable'", t => { + t.plan(2); + const _sleep = 100; + interface Chunk { + key: string; + mapped: string[]; + } + + const construct = (destKey: string) => { + const first = map(async (chunk: Chunk) => { + await sleep(_sleep); + chunk.mapped.push(destKey); + return chunk; + }); + return first; + }; + + const _demux = demux(construct, "key"); + + const fakeSource = new Readable({ + objectMode: true, + read() { + return; + }, + }); + + const fakeSink = new Writable({ + objectMode: true, + write(data, enc, cb) { + const cur = input.shift(); + t.is(cur.key, data.key); + t.deepEqual(cur.mapped, ["a"]); + if (cur.key === "a") { + _demux.destroy(); + } + cb(); + }, + }); + + _demux.on("close", t.end); + fakeSource.pipe(_demux).pipe(fakeSink); + + const input = [ + { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + ]; + fakeSource.push(input[0]); + fakeSource.push(input[1]); + fakeSource.push(input[2]); + fakeSource.push(input[3]); + fakeSource.push(input[4]); }); From 2bbc5c9e0fa0c2600a9d8d454c784e0cc74d500b Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 27 Jan 2020 13:11:51 -0500 Subject: [PATCH 09/11] types --- src/functions/demux.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index aaf2914..963b6c3 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -35,7 +35,7 @@ class Demux extends Duplex { constructor( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options: DemuxOptions, + options: DemuxOptions = {}, ) { super(options); this.demuxer = From ce2bb55b2430ec732a13a9fbca136b26db564ea3 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 27 Jan 2020 16:10:00 -0500 Subject: [PATCH 10/11] Emit correct event --- src/functions/demux.ts | 9 +++++---- tests/demux.spec.ts | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 963b6c3..35b3a1f 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -1,4 +1,4 @@ -import { DuplexOptions, Duplex, Transform } from "stream"; +import { DuplexOptions, Duplex, Transform, Writable } from "stream"; import { isReadable } from "../helpers"; @@ -48,10 +48,11 @@ class Demux extends Duplex { ...options, transform: (d, _, cb) => { this.push(d); - cb(null, d); + cb(null); }, }); - this.once("unpipe", () => this._flush()); + + this.on("unpipe", () => this._flush()); } public _read(size: number) {} @@ -87,7 +88,7 @@ class Demux extends Duplex { totalEnded++; if (pipelines.length === totalEnded) { this.push(null); - this.emit("finished"); + this.emit("end"); } }); }); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 3e1dae0..93be081 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -724,7 +724,7 @@ test.cb("Demux should send data events", t => { }); }); -test.cb("demux() `finish` and `end` propagates", t => { +test.cb.only("demux() `finish` and `end` propagates", t => { interface Chunk { key: string; mapped: number[]; From 12efbec698da1fe1399937310d2f9e8808cc7db5 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Tue, 28 Jan 2020 09:48:29 -0500 Subject: [PATCH 11/11] Update tests --- tests/demux.spec.ts | 87 +++++---------------------------------------- 1 file changed, 9 insertions(+), 78 deletions(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 93be081..6d5e628 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -161,7 +161,7 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { fromArray(input).pipe(demuxed); }); -test("demux() write should return false after if it has >= highWaterMark items buffered and drain should be emitted", t => { +test("demux() write should return false and emit drain if more than highWaterMark items are buffered", t => { return new Promise(async (resolve, reject) => { t.plan(7); interface Chunk { @@ -224,7 +224,7 @@ test("demux() write should return false after if it has >= highWaterMark items b }); }); -test("demux() should emit one drain event after slowProcessorSpeed * highWaterMark ms", t => { +test("demux() should emit one drain event after slowProcessorSpeed * highWaterMark ms when first stream is bottleneck", t => { return new Promise(async (resolve, reject) => { t.plan(7); interface Chunk { @@ -290,7 +290,7 @@ test("demux() should emit one drain event after slowProcessorSpeed * highWaterMa test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => { return new Promise(async (resolve, reject) => { - t.plan(7); + t.plan(1); interface Chunk { key: string; mapped: number[]; @@ -317,7 +317,6 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar first.on("data", () => { pendingReads--; - t.pass(); if (pendingReads === 0) { resolve(); } @@ -349,8 +348,9 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); test.cb( - "demux() should emit drain event when second stream is bottleneck", + "demux() should emit drain event when second stream is bottleneck after (highWaterMark - 2) * slowProcessorSpeed ms", t => { + // ie) first two items are pushed directly into first and second streams (highWaterMark - 2 remain in demux) t.plan(8); const slowProcessorSpeed = 100; const highWaterMark = 5; @@ -401,7 +401,7 @@ test.cb( _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( - slowProcessorSpeed, + slowProcessorSpeed * 3, ); t.pass(); }); @@ -425,6 +425,7 @@ test.cb( test.cb( "demux() should emit drain event when third stream is bottleneck", t => { + // @TODO investigate why drain is emitted after slowProcessorSpeed t.plan(8); const slowProcessorSpeed = 100; const highWaterMark = 5; @@ -482,7 +483,6 @@ test.cb( t.end(err); }); - // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( @@ -507,7 +507,7 @@ test.cb( }, ); -test.skip("demux() should be blocked by slowest pipeline", t => { +test("demux() should be blocked by slowest pipeline", t => { t.plan(1); const slowProcessorSpeed = 100; interface Chunk { @@ -571,75 +571,6 @@ test.skip("demux() should be blocked by slowest pipeline", t => { }); }); -test("demux() should emit drain event when second stream in pipeline is bottleneck", t => { - t.plan(5); - const highWaterMark = 3; - return new Promise(async (resolve, reject) => { - interface Chunk { - key: string; - mapped: number[]; - } - const sink = new Writable({ - objectMode: true, - write(chunk, encoding, cb) { - expect(chunk.mapped).to.deep.equal([1, 2]); - t.pass(); - cb(); - if (pendingReads === 0) { - resolve(); - } - }, - }); - - const construct = (destKey: string) => { - const first = map( - (chunk: Chunk) => { - expect(first._readableState.length).to.be.at.most(2); - chunk.mapped.push(1); - return chunk; - }, - { highWaterMark: 2 }, - ); - - const second = map( - async (chunk: Chunk) => { - await sleep(100); - chunk.mapped.push(2); - expect(second._writableState.length).to.be.equal(1); - pendingReads--; - return chunk; - }, - { highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); - return first; - }; - - const _demux = demux(construct, "key", { - highWaterMark, - }); - _demux.on("error", err => { - reject(); - }); - - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - t.pass(); - }); - - const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - ]; - let pendingReads = input.length; - - fromArray(input).pipe(_demux); - }); -}); - test.cb("Demux should remux to sink", t => { t.plan(6); let i = 0; @@ -724,7 +655,7 @@ test.cb("Demux should send data events", t => { }); }); -test.cb.only("demux() `finish` and `end` propagates", t => { +test.cb("demux() `finish` and `end` propagates", t => { interface Chunk { key: string; mapped: number[];