From 9b09a3f9490896851936f3895c91a8b99b156de8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Wed, 28 Aug 2019 17:01:51 -0400 Subject: [PATCH] Add demux --- src/functions/baseFunctions.ts | 1 + src/functions/demux.ts | 49 +++++++++++++++++++++++++++++ src/functions/index.ts | 10 ++++++ src/index.ts | 1 + tests/demux.spec.ts | 57 ++++++++++++++++++++++++++++++++++ tslint.json | 3 +- 6 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 src/functions/demux.ts create mode 100644 tests/demux.spec.ts diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 005aa3a..6ff480a 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -20,3 +20,4 @@ export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; export { compose } from "./compose"; +export { demux } from "./demux"; diff --git a/src/functions/demux.ts b/src/functions/demux.ts new file mode 100644 index 0000000..a0fead2 --- /dev/null +++ b/src/functions/demux.ts @@ -0,0 +1,49 @@ +import { WritableOptions, Writable } from "stream"; + +/** + * Return a Duplex stream that is pushed data from multiple sources + * @param streams Source streams to multiplex + * @param options Duplex stream options + */ +export function demux( + construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, +): Writable { + return new Demux(construct, demuxBy, options); +} + +class Demux extends Writable { + private keyMap: object; + private demuxer: (chunk: any) => string; + private construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream; + constructor( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxBy: { key?: string; keyBy?: (chunk: any) => string }, + options?: WritableOptions, + ) { + super(options); + if (demuxBy.keyBy === undefined && demuxBy.key === undefined) { + throw new Error("Need one"); + } + this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]); + this.construct = construct; + this.keyMap = {}; + } + + public write(chunk: any, encoding?: any, cb?: any): boolean { + const destKey = this.demuxer(chunk); + if (this.keyMap[destKey] === undefined) { + this.keyMap[destKey] = this.construct(destKey); + } + const writeRes = this.keyMap[destKey].write(chunk); + if (cb !== undefined) { + cb(); + } + return writeRes; + } +} diff --git a/src/functions/index.ts b/src/functions/index.ts index 21ad7f9..ce50218 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -297,3 +297,13 @@ export function compose( options, ); } + +export function demux( + construct: ( + destKey?: string, + ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, + demuxer: { key?: string; keyBy?: (chunk: any) => string }, + options?: DuplexOptions, +) { + return baseFunctions.demux(construct, demuxer, options); +} diff --git a/src/index.ts b/src/index.ts index 98b0a45..924b246 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,4 +22,5 @@ export { accumulator, accumulatorBy, compose, + demux, } from "./functions"; diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts new file mode 100644 index 0000000..3c124b9 --- /dev/null +++ b/tests/demux.spec.ts @@ -0,0 +1,57 @@ +import test from "ava"; +import { expect } from "chai"; +import { demux, map } from "../src"; +import { Readable, Transform } from "stream"; + +interface Test { + key: string; + val: number; +} +test.cb("should spread per key", t => { + t.plan(5); + const input = [ + { key: "a", val: 1 }, + { key: "a", val: 2 }, + { key: "b", val: 3 }, + { key: "c", val: 4 }, + ]; + const results = [ + { key: "a", val: 2 }, + { key: "a", val: 3 }, + { key: "b", val: 4 }, + { key: "c", val: 5 }, + ]; + const destKeys = []; + const dests = []; + let i = 0; + + const construct = (destKey: string) => { + destKeys.push(destKey); + const dest = map((chunk: Test) => ({ + ...chunk, + val: chunk.val + 1, + })) + .on("data", (d: Test) => { + expect(results).to.deep.include(d); + t.pass(); + }) + .on("end", () => { + i++; + if (i === dests.length) { + t.end(); + } + }); + dests.push(dest); + return dest; + }; + + const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + demuxed.on("finish", () => { + expect(destKeys).to.deep.equal(["a", "b", "c"]); + t.pass(); + dests.forEach(dest => dest.end()); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); diff --git a/tslint.json b/tslint.json index becd92c..03b2e44 100644 --- a/tslint.json +++ b/tslint.json @@ -9,6 +9,7 @@ "no-implicit-dependencies": [true, "dev"], "prettier": [true, ".prettierrc"], "ordered-imports": false, - "interface-name": false + "interface-name": false, + "object-literal-sort-keys": false } }