diff --git a/src/functions/demux.ts b/src/functions/demux.ts index a9b1011..4b6312a 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -24,7 +24,7 @@ const eventsTarget = { export function demux( construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, - demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + demuxBy: string | ((chunk: any) => string), options?: WritableOptions, ): Writable { return new Demux(construct, demuxBy, options); @@ -42,19 +42,17 @@ class Demux extends Writable { construct: ( destKey?: string, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, - demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + demuxBy: string | ((chunk: any) => string), options?: WritableOptions, ) { super(options); - if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { - throw new Error("keyBy or key must be provided in second argument"); - } - this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); + this.demuxer = + typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; this.construct = construct; this.streamsByKey = {}; } - public async _write(chunk: any, encoding: any, cb: any) { + public _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = this.construct(destKey); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 4201375..725ec43 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -30,7 +30,7 @@ test.cb("demux() constructor should be called once per key", t => { return dest; }); - const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + const demuxed = demux(construct, "key", { objectMode: true }); demuxed.on("finish", () => { expect(construct.withArgs("a").callCount).to.equal(1); @@ -65,7 +65,7 @@ test.cb("demux() should send input through correct pipeline", t => { return dest; }; - const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + const demuxed = demux(construct, "key", { objectMode: true }); demuxed.on("finish", () => { pipelineSpies["a"].getCalls().forEach(call => { @@ -107,11 +107,7 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { return dest; }); - const demuxed = demux( - construct, - { keyBy: item => item.key }, - { objectMode: true }, - ); + const demuxed = demux(construct, item => item.key, { objectMode: true }); demuxed.on("finish", () => { expect(construct.withArgs("a").callCount).to.equal(1); @@ -146,11 +142,7 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { return dest; }; - const demuxed = demux( - construct, - { keyBy: item => item.key }, - { objectMode: true }, - ); + const demuxed = demux(construct, item => item.key, { objectMode: true }); demuxed.on("finish", () => { pipelineSpies["a"].getCalls().forEach(call => { @@ -211,14 +203,10 @@ test("demux() write should return false after if it has >= highWaterMark items b return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); + const _demux = demux(construct, "key", { + objectMode: true, + highWaterMark, + }); _demux.on("error", err => { reject(); @@ -278,14 +266,10 @@ test("demux() should emit one drain event after slowProcessorSpeed * highWaterMa }); return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); + const _demux = demux(construct, "key", { + objectMode: true, + highWaterMark, + }); _demux.on("error", err => { reject(); }); @@ -345,14 +329,10 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark: 5, - }, - ); + const _demux = demux(construct, "key", { + objectMode: true, + highWaterMark: 5, + }); _demux.on("error", err => { reject(); @@ -375,9 +355,9 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); test.cb( - "demux() should emit drain event when second stream is bottleneck", + "demux() should emit drain event when third stream is bottleneck", t => { - t.plan(6); + t.plan(8); const slowProcessorSpeed = 100; const highWaterMark = 5; interface Chunk { @@ -417,20 +397,15 @@ test.cb( first.pipe(second).pipe(sink); return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); + const _demux = demux(construct, () => "a", { + objectMode: true, + highWaterMark, + }); _demux.on("error", err => { t.end(err); }); // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first) - // @TODO Verify this is correct behaviour _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( @@ -441,10 +416,100 @@ test.cb( const input = [ { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + { key: "f", mapped: [] }, + { key: "g", mapped: [] }, + ]; + let pendingReads = input.length; + + const start = performance.now(); + input.forEach(item => { + _demux.write(item); + }); + }, +); + +test.cb( + "demux() should emit drain event when second stream is bottleneck", + t => { + t.plan(8); + const slowProcessorSpeed = 100; + const highWaterMark = 5; + interface Chunk { + key: string; + mapped: number[]; + } + const sink = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + expect(chunk.mapped).to.deep.equal([1, 2, 3]); + t.pass(); + pendingReads--; + if (pendingReads === 0) { + t.end(); + } + cb(); + }, + }); + const construct = (destKey: string) => { + const first = map( + (chunk: Chunk) => { + chunk.mapped.push(1); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + const second = map( + (chunk: Chunk) => { + chunk.mapped.push(2); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + const third = map( + async (chunk: Chunk) => { + await sleep(slowProcessorSpeed); + chunk.mapped.push(3); + return chunk; + }, + { objectMode: true, highWaterMark: 1 }, + ); + + first + .pipe(second) + .pipe(third) + .pipe(sink); + return first; + }; + const _demux = demux(construct, () => "a", { + objectMode: true, + highWaterMark, + }); + _demux.on("error", err => { + t.end(err); + }); + + // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first) + _demux.on("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * (input.length - 4), + ); + t.pass(); + }); + + const input = [ { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, + { key: "b", mapped: [] }, + { key: "c", mapped: [] }, + { key: "d", mapped: [] }, + { key: "e", mapped: [] }, + { key: "f", mapped: [] }, + { key: "g", mapped: [] }, ]; let pendingReads = input.length; @@ -486,14 +551,10 @@ test("demux() should be blocked by slowest pipeline", t => { }); return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark: 1, - }, - ); + const _demux = demux(construct, "key", { + objectMode: true, + highWaterMark: 1, + }); _demux.on("error", err => { reject(err); }); @@ -567,14 +628,10 @@ test("demux() should emit drain event when second stream in pipeline is bottlene return first; }; - const _demux = demux( - construct, - { key: "key" }, - { - objectMode: true, - highWaterMark, - }, - ); + const _demux = demux(construct, "key", { + objectMode: true, + highWaterMark, + }); _demux.on("error", err => { reject(); });