diff --git a/src/functions/demux.ts b/src/functions/demux.ts index feeb691..1c39f5d 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -10,7 +10,7 @@ enum EventSubscription { const eventsTarget = { close: EventSubscription.Self, - data: EventSubscription.Self, + data: EventSubscription.All, drain: EventSubscription.Self, end: EventSubscription.Self, error: EventSubscription.Self, diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 5c7e017..dc71ca3 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -5,7 +5,7 @@ import { Writable } from "stream"; const sinon = require("sinon"); const { sleep } = require("../src/helpers"); import { performance } from "perf_hooks"; -const { demux, map } = mhysa(); +const { demux, map } = mhysa({ objectMode: true }); interface Test { key: string; @@ -655,3 +655,47 @@ test("demux() should emit drain event when second stream in pipeline is bottlene }); }); }); + +test.cb("Demux should remux to sink", t => { + t.plan(6); + let i = 0; + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const result = [ + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + { key: "a", visited: ["a"] }, + { key: "c", visited: ["c"] }, + { key: "a", visited: ["a"] }, + { key: "b", visited: ["b"] }, + ]; + const construct = (destKey: string) => { + const dest = map((chunk: any) => { + chunk.visited.push(destKey); + return chunk; + }); + + return dest; + }; + + const remux = map(d => { + t.deepEqual(d, result[i]); + i++; + if (i === input.length) { + t.end(); + } + }); + + const demuxed = demux(construct, "key", { + objectMode: true, + remultiplex: remux, + }); + + input.forEach(event => demuxed.write(event)); +});