diff --git a/src/functions/compose.ts b/src/functions/compose.ts index f53aa6c..40525a0 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -52,8 +52,10 @@ class Compose extends Duplex { return (this.last as NodeJS.ReadableStream).pipe(dest); } - public write(chunk: any) { - return (this.first as NodeJS.WritableStream).write(chunk); + public _write(chunk: any, encoding: string, cb: any) { + const res = (this.first as NodeJS.WritableStream).write(chunk); + cb(); + return res; } public on(event: string, cb: any) { diff --git a/src/functions/demux.ts b/src/functions/demux.ts index a0fead2..ef39284 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -14,7 +14,9 @@ export function demux( } class Demux extends Writable { - private keyMap: object; + private keyMap: { + [key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; + }; private demuxer: (chunk: any) => string; private construct: ( destKey?: string, @@ -28,22 +30,22 @@ class Demux extends Writable { ) { super(options); if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { - throw new Error("Need one"); + throw new Error( + "keyBy or key must be provided in second parameter", + ); } this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); this.construct = construct; this.keyMap = {}; } - public write(chunk: any, encoding?: any, cb?: any): boolean { + public _write(chunk: any, encoding: string, cb: any) { const destKey = this.demuxer(chunk); if (this.keyMap[destKey] === undefined) { - this.keyMap[destKey] = this.construct(destKey); + this.keyMap[destKey] = this.construct(destKey).on("error", e => { + this.emit("error", e); + }); } - const writeRes = this.keyMap[destKey].write(chunk); - if (cb !== undefined) { - cb(); - } - return writeRes; + return this.keyMap[destKey].write(chunk, encoding, cb); } } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index dcd8ad0..4ed3e3f 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -26,14 +26,10 @@ test.cb("should spread per key", t => { const sink = new Writable({ objectMode: true, write(chunk, enc, cb) { - i++; expect(results).to.deep.include(chunk); expect(input).to.not.deep.include(chunk); t.pass(); cb(); - if (i === 4) { - t.end(); - } }, }); const construct = (destKey: string) => { @@ -53,6 +49,7 @@ test.cb("should spread per key", t => { demuxed.on("finish", () => { expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); + t.end(); }); input.forEach(event => demuxed.write(event)); @@ -74,18 +71,13 @@ test.cb("should spread per key using keyBy", t => { { key: "c", val: 5 }, ]; const destinationStreamKeys = []; - let i = 0; const sink = new Writable({ objectMode: true, write(chunk, enc, cb) { - i++; expect(results).to.deep.include(chunk); expect(input).to.not.deep.include(chunk); t.pass(); cb(); - if (i === 4) { - t.end(); - } }, }); const construct = (destKey: string) => { @@ -109,6 +101,7 @@ test.cb("should spread per key using keyBy", t => { demuxed.on("finish", () => { expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); + t.end(); }); input.forEach(event => demuxed.write(event));