From 71b03678ba7fb6ea51cc1c969e52d5d7cd1f3dc6 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 27 Apr 2020 12:25:36 -0400 Subject: [PATCH] Add chunk to constructor (#5) * Add chunk to constructor * Add test * Bump version --- package.json | 2 +- src/functions/demux.ts | 16 ++++------------ tests/demux.spec.ts | 28 ++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index c0ec76b..7683ec4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@jogogo/mhysa", - "version": "2.0.0-alpha.2", + "version": "2.0.0-alpha.3", "description": "Streams and event emitter utils for Node.js", "keywords": [ "promise", diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 9fb5735..6d4d093 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -1,15 +1,7 @@ -import { DuplexOptions, Duplex, Transform, Writable } from "stream"; +import { DuplexOptions, Duplex, Transform } from "stream"; import { isReadable } from "../helpers"; -enum EventSubscription { - Last = 0, - First, - All, - Self, - Unhandled, -} - type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; export interface DemuxOptions extends DuplexOptions { @@ -17,7 +9,7 @@ export interface DemuxOptions extends DuplexOptions { } export function demux( - construct: (destKey?: string) => DemuxStreams, + construct: (destKey?: string, chunk?: any) => DemuxStreams, demuxBy: string | ((chunk: any) => string), options?: DemuxOptions, ): Duplex { @@ -29,7 +21,7 @@ class Demux extends Duplex { [key: string]: DemuxStreams; }; private demuxer: (chunk: any) => string; - private construct: (destKey?: string) => DemuxStreams; + private construct: (destKey?: string, chunk?: any) => DemuxStreams; private remultiplex: boolean; private transform: Transform; constructor( @@ -61,7 +53,7 @@ class Demux extends Duplex { public async _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { - const newPipeline = await this.construct(destKey); + const newPipeline = await this.construct(destKey, chunk); this.streamsByKey[destKey] = newPipeline; if (this.remultiplex && isReadable(newPipeline)) { (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 6d5e628..b8c8a5f 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -44,6 +44,34 @@ test.cb("demux() constructor should be called once per key", t => { fromArray(input).pipe(demuxed); }); +test.cb("demux() item written passed in constructor", t => { + t.plan(4); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "c", visited: [] }, + ]; + const construct = sinon.spy((destKey: string, item: any) => { + expect(item).to.deep.equal({ key: destKey, visited: [] }); + t.pass(); + const dest = map((chunk: Test) => { + chunk.visited.push(1); + return chunk; + }); + + return dest; + }); + + const demuxed = demux(construct, "key", {}); + + demuxed.on("finish", () => { + t.pass(); + t.end(); + }); + + fromArray(input).pipe(demuxed); +}); + test.cb("demux() should send input through correct pipeline", t => { t.plan(6); const input = [