diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 0ad2e57..a9b1011 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -31,15 +31,10 @@ export function demux( } class Demux extends Writable { - public isWritable: boolean; private streamsByKey: { - [key: string]: { - stream: NodeJS.WritableStream | NodeJS.ReadWriteStream; - writable: boolean; - }; + [key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; }; private demuxer: (chunk: any) => string; - private nonWritableStreams: Array; private construct: ( destKey?: string, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; @@ -57,26 +52,19 @@ class Demux extends Writable { this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); this.construct = construct; this.streamsByKey = {}; - this.isWritable = true; - this.nonWritableStreams = []; } - // Throttles when one stream is not writable public async _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { - this.streamsByKey[destKey] = { - stream: this.construct(destKey), - writable: true, - }; + this.streamsByKey[destKey] = this.construct(destKey); } - if (!this.streamsByKey[destKey].stream.write(chunk, encoding, cb)) { - await new Promise((resolve, reject) => { - this.streamsByKey[destKey].stream.once("drain", () => { - resolve(); - this.emit("drain"); - }); + if (!this.streamsByKey[destKey].write(chunk, encoding)) { + this.streamsByKey[destKey].once("drain", () => { + cb(); }); + } else { + cb(); } } @@ -87,7 +75,7 @@ class Demux extends Writable { break; case EventSubscription.All: Object.keys(this.streamsByKey).forEach(key => - this.streamsByKey[key].stream.on(event, cb), + this.streamsByKey[key].on(event, cb), ); break; case EventSubscription.Unhandled: @@ -107,7 +95,7 @@ class Demux extends Writable { break; case EventSubscription.All: Object.keys(this.streamsByKey).forEach(key => - this.streamsByKey[key].stream.once(event, cb), + this.streamsByKey[key].once(event, cb), ); break; case EventSubscription.Unhandled: diff --git a/src/functions/map.ts b/src/functions/map.ts index c088c72..13834af 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -12,12 +12,7 @@ export function map( return new Transform({ ...options, async transform(chunk: T, encoding, callback) { - try { - const mapped = await mapper(chunk, encoding); - callback(null, mapped); - } catch (err) { - callback(err); - } + callback(null, await mapper(chunk, encoding)); }, }); } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index bde4ffe..aedbe6c 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -172,63 +172,6 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => { demuxed.end(); }); -// Probably needs to be removed -test.cb("should emit errors", t => { - t.plan(2); - let index = 0; - const input = [ - { key: "a", visited: [] }, - { key: "b", visited: [] }, - { key: "a", visited: [] }, - { key: "a", visited: [] }, - ]; - const results = [ - { key: "a", visited: [0] }, - { key: "b", visited: [1] }, - { key: "a", visited: [2] }, - { key: "a", visited: [3] }, - ]; - const destinationStreamKeys = []; - const sink = new Writable({ - objectMode: true, - write(chunk, enc, cb) { - expect(results).to.deep.include(chunk); - expect(input).to.not.deep.include(chunk); - t.pass(); - cb(); - }, - }); - - const construct = (destKey: string) => { - destinationStreamKeys.push(destKey); - const dest = map((chunk: Test) => { - if (chunk.key === "b") { - throw new Error("Caught object with key 'b'"); - } - - const _chunk = { ...chunk, visited: [] }; - _chunk.visited.push(index); - index++; - return _chunk; - }).on("error", () => {}); // Otherwise ava complains - - dest.pipe(sink); - return dest; - }; - - const demuxed = demux( - construct, - { keyBy: (chunk: any) => chunk.key }, - { objectMode: true }, - ); - demuxed.on("error", e => { - expect(e.message).to.equal("Caught object with key 'b'"); - t.pass(); - t.end(); - }); - input.forEach(event => demuxed.write(event)); -}); - test("demux() when write returns false, drain event should be emitted after at least slowProcessorSpeed * highWaterMark", t => { return new Promise(async (resolve, reject) => { t.plan(7); @@ -259,11 +202,14 @@ test("demux() when write returns false, drain event should be emitted after at l }, }); const construct = (destKey: string) => { - const first = map(async (chunk: Chunk) => { - await sleep(slowProcessorSpeed); - chunk.mapped.push(1); - return chunk; - }); + const first = map( + async (chunk: Chunk) => { + await sleep(slowProcessorSpeed); + chunk.mapped.push(1); + return chunk; + }, + { highWaterMark: 1, objectMode: true }, + ); const second = map(async (chunk: Chunk) => { chunk.mapped.push(2); @@ -285,14 +231,6 @@ test("demux() when write returns false, drain event should be emitted after at l reject(); }); - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.greaterThan( - slowProcessorSpeed * highWaterMark, - ); - t.pass(); - }); - let start = null; for (const item of input) { const res = _demux.write(item); @@ -301,6 +239,11 @@ test("demux() when write returns false, drain event should be emitted after at l start = performance.now(); await new Promise((resolv, rej) => { _demux.once("drain", () => { + expect(_demux._writableState.length).to.be.equal(0); + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * highWaterMark, + ); + t.pass(); resolv(); }); }); @@ -318,63 +261,60 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar } const highWaterMark = 5; const input = [ - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, - { key: "a", mapped: [] }, + { key: "a", val: 1, mapped: [] }, + { key: "a", val: 2, mapped: [] }, + { key: "a", val: 3, mapped: [] }, + { key: "a", val: 4, mapped: [] }, + { key: "a", val: 5, mapped: [] }, + { key: "a", val: 6, mapped: [] }, ]; let pendingReads = input.length; const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { cb(); - t.pass(); pendingReads--; + t.pass(); if (pendingReads === 0) { resolve(); } }, }); const construct = (destKey: string) => { - const first = map(async (chunk: Chunk) => { - chunk.mapped.push(1); - return chunk; - }); + const pipeline = map( + async (chunk: Chunk) => { + await sleep(50); + chunk.mapped.push(2); + return chunk; + }, + { highWaterMark: 1, objectMode: true }, + ); - const second = map(async (chunk: Chunk) => { - chunk.mapped.push(2); - return chunk; - }); - - first.pipe(second).pipe(sink); - return first; + pipeline.pipe(sink); + return pipeline; }; const _demux = demux( construct, { key: "key" }, { objectMode: true, - highWaterMark, + highWaterMark: 5, }, ); + _demux.on("error", err => { reject(); }); - _demux.on("drain", () => { - expect(_demux._writableState.length).to.be.equal(0); - t.pass(); - }); - for (const item of input) { const res = _demux.write(item); expect(_demux._writableState.length).to.be.at.most(highWaterMark); if (!res) { - await new Promise((_resolve, _reject) => { + await new Promise(_resolve => { _demux.once("drain", () => { _resolve(); + expect(_demux._writableState.length).to.be.equal(0); + t.pass(); }); }); } @@ -386,8 +326,8 @@ test.cb( "demux() should emit drain event immediately when second stream is bottleneck", t => { t.plan(6); - const highWaterMark = 5; - const slowProcessorSpeed = 200; + const slowProcessorSpeed = 100; + const highWaterMark = 3; interface Chunk { key: string; mapped: number[]; @@ -395,11 +335,13 @@ test.cb( const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { + expect(chunk.mapped).to.deep.equal([1, 2]); t.pass(); - cb(); + pendingReads--; if (pendingReads === 0) { t.end(); } + cb(); }, }); const construct = (destKey: string) => { @@ -408,16 +350,13 @@ test.cb( chunk.mapped.push(1); return chunk; }, - { objectMode: true }, + { objectMode: true, highWaterMark: 1 }, ); const second = map( async (chunk: Chunk) => { - pendingReads--; await sleep(slowProcessorSpeed); chunk.mapped.push(2); - expect(second._writableState.length).to.be.equal(1); - expect(first._readableState.length).to.equal(pendingReads); return chunk; }, { objectMode: true, highWaterMark: 1 }, @@ -440,8 +379,9 @@ test.cb( _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan( - slowProcessorSpeed, + // Should take longer than the amount of items needed to be processed until we are under highWaterMark + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * (input.length - highWaterMark - 1), ); t.pass(); }); @@ -453,18 +393,18 @@ test.cb( { key: "a", mapped: [] }, { key: "a", mapped: [] }, ]; - let pendingReads = input.length; + + const start = performance.now(); input.forEach(item => { _demux.write(item); }); - const start = performance.now(); }, ); -test.only("demux() should only emit drain event when all streams are writable", t => { +test("demux() should be blocked by slowest pipeline", t => { t.plan(1); - const highWaterMark = 2; + const slowProcessorSpeed = 100; interface Chunk { key: string; mapped: number[]; @@ -476,33 +416,26 @@ test.only("demux() should only emit drain event when all streams are writable", cb(); pendingReads--; if (chunk.key === "b") { - expect(performance.now() - start).to.be.greaterThan(150); + expect(performance.now() - start).to.be.greaterThan( + slowProcessorSpeed * totalItems, + ); t.pass(); - } - if (pendingReads === 0) { + expect(pendingReads).to.equal(0); resolve(); } }, }); const construct = (destKey: string) => { const first = map( - (chunk: Chunk) => { + async (chunk: Chunk) => { + await sleep(slowProcessorSpeed); chunk.mapped.push(1); return chunk; }, { objectMode: true, highWaterMark: 1 }, ); - const second = map( - async (chunk: Chunk) => { - await sleep(2000); - chunk.mapped.push(2); - return chunk; - }, - { objectMode: true, highWaterMark: 1 }, - ); - - first.pipe(second).pipe(sink); + first.pipe(sink); return first; }; const _demux = demux( @@ -510,6 +443,7 @@ test.only("demux() should only emit drain event when all streams are writable", { key: "key" }, { objectMode: true, + highWaterMark: 1, }, ); _demux.on("error", err => { @@ -521,13 +455,21 @@ test.only("demux() should only emit drain event when all streams are writable", { key: "a", mapped: [] }, { key: "c", mapped: [] }, { key: "c", mapped: [] }, - { key: "b", mapped: [] }, // should only be recieved after a and c become writable + { key: "c", mapped: [] }, + { key: "b", mapped: [] }, ]; let pendingReads = input.length; + const totalItems = input.length; const start = performance.now(); for (const item of input) { - console.log("DEMUX", _demux.write(item)); + if (!_demux.write(item)) { + await new Promise(_resolve => { + _demux.once("drain", () => { + _resolve(); + }); + }); + } } }); }); @@ -543,6 +485,7 @@ test("demux() should emit drain event and first should contain up to highWaterMa const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { + expect(chunk.mapped).to.deep.equal([1, 2]); t.pass(); cb(); if (pendingReads === 0) { @@ -557,7 +500,7 @@ test("demux() should emit drain event and first should contain up to highWaterMa chunk.mapped.push(1); return chunk; }, - { objectMode: 2, highWaterMark: 2 }, + { objectMode: true, highWaterMark: 1 }, ); const second = map( @@ -568,7 +511,7 @@ test("demux() should emit drain event and first should contain up to highWaterMa pendingReads--; return chunk; }, - { objectMode: 2, highWaterMark: 2 }, + { objectMode: true, highWaterMark: 1 }, ); first.pipe(second).pipe(sink); diff --git a/tests/map.spec.ts b/tests/map.spec.ts index 75210ff..35c8e84 100644 --- a/tests/map.spec.ts +++ b/tests/map.spec.ts @@ -49,61 +49,3 @@ test.cb("map() maps elements asynchronously", t => { source.push("c"); source.push(null); }); - -test.cb("map() emits errors during synchronous mapping", t => { - t.plan(3); - const source = new Readable({ objectMode: true }); - const mapStream = map((element: string) => { - if (element !== "b") { - throw new Error("Failed mapping"); - } - return element.toUpperCase(); - }); - source - .pipe(mapStream) - .on("data", data => { - expect(data).to.equal("B"); - t.pass(); - }) - .on("error", err => { - source.pipe(mapStream); - mapStream.resume(); - expect(err.message).to.equal("Failed mapping"); - t.pass(); - }) - .on("end", t.end); - - source.push("a"); - source.push("b"); - source.push("c"); - source.push(null); -}); - -test("map() emits errors during asynchronous mapping", t => { - t.plan(1); - return new Promise((resolve, _) => { - const source = new Readable({ objectMode: true }); - const mapStream = map(async (element: string) => { - await Promise.resolve(); - if (element === "b") { - throw new Error("Failed mapping"); - } - return element.toUpperCase(); - }); - source - .pipe(mapStream) - .on("error", err => { - expect(err.message).to.equal("Failed mapping"); - t.pass(); - resolve(); - }) - .on("end", () => t.fail); - - source.push("a"); - source.push("b"); - source.push("c"); - source.push(null); - source.push(null); - source.push(null); - }); -});