From d097fa6aa50743fd0114a467264668fd84612b69 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 22 Aug 2019 12:07:30 -0400 Subject: [PATCH] Save --- src/functions/baseFunctions.ts | 2 +- src/functions/compose.ts | 83 +++++++++++++++++++--------------- src/functions/index.ts | 6 +++ src/index.ts | 1 + tests/compose.spec.ts | 44 ++++++++++++++++-- 5 files changed, 93 insertions(+), 43 deletions(-) diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 005aa3a..6bf75a6 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -19,4 +19,4 @@ export { replace } from "./replace"; export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; -export { compose } from "./compose"; +export { compose, composeDuplex } from "./compose"; diff --git a/src/functions/compose.ts b/src/functions/compose.ts index ed6df07..8d21283 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,45 +1,54 @@ -import { Transform, Writable, Pipe, WritableOptions } from "stream"; - -class Compose extends Writable implements Pipe { - private head: Writable | Transform; - private tail: Writable | Transform; - constructor( - streams: Array, - options?: WritableOptions, - ) { - super(options); - if (streams.length < 2) { - throw new Error("Cannot compose 1 or less streams"); - } - this.head = streams[0]; - for (let i = 1; i < streams.length; i++) { - streams[i - 1].pipe(streams[i]); - } - this.tail = streams[streams.length - 1]; - } - - public pipe( - destination: T, - options: { end?: boolean } | undefined, - ) { - return this.tail.pipe( - destination, - options, - ); - } - - public _write(chunk: any, enc: string, cb: any) { - this.head.write(chunk.toString ? chunk.toString() : chunk, cb); - } -} +import { + pipeline, + Transform, + Writable, + Pipe, + WritableOptions, + Readable, + Duplex, +} from "stream"; /** * Return a Readable stream of readable streams concatenated together * @param streams Readable streams to concatenate */ + +// First Readable --> Readable +// First Transform | Duplex, Last Writable --> Writable +// export function compose( - streams: Array, + streams: Array, options?: WritableOptions, -): Compose { - return new Compose(streams, options); +): Duplex { + // Maybe just return a new stream here + if (streams.length < 2) { + throw new Error("Not enough"); + } + + const duplex = new Duplex({ + objectMode: true, + write(chunk, enc, cb) { + const first = streams[0] as Writable; + if (!first.write(chunk)) { + first.on("drain", cb); + } else { + cb(); + } + }, + read(size) { + let chunk; + while ( + null !== + (chunk = (streams[streams.length - 1] as Readable).read()) + ) { + this.push(chunk); + } + }, + }); + + pipeline(streams, (err: any) => { + duplex.emit("error", err); + }); + + return duplex; } diff --git a/src/functions/index.ts b/src/functions/index.ts index ceba200..9d08eba 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -297,3 +297,9 @@ export function compose( options, ); } +export function composeDuplex( + streams: Array, + options?: WritableOptions, +) { + return baseFunctions.composeDuplex(streams, options); +} diff --git a/src/index.ts b/src/index.ts index 98b0a45..d6b784c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,4 +22,5 @@ export { accumulator, accumulatorBy, compose, + composeDuplex, } from "./functions"; diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 001dbe0..9fa8d82 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -1,20 +1,54 @@ const test = require("ava"); -const { compose, map } = require("../src"); +const { compose, composeDuplex, map } = require("../src"); test.cb("compose()", t => { - const first = map((chunk: number) => chunk * 2); - const second = map((chunk: number) => chunk + 1); + const first = map((chunk: number) => chunk + "x"); + const second = map((chunk: number) => chunk + "y"); const composed = compose( [first, second], { objectMode: true }, ); + const third = map((chunk: number) => chunk + "z"); + composed + .pipe(third) + .on("data", data => console.log("Piped composed: ", data)); + + composed.on("data", data => { + console.log("data on composed", data); + t.end(); + }); + composed.on("error", data => { + console.log("ERROR", data); + }); + composed.on("end", data => { + console.log("end", data); + }); composed.write(1); composed.write(2); - composed.write(3); +}); +test.cb.only("composeDuplex()", t => { + const first = map((chunk: number) => chunk + "x"); + const second = map((chunk: number) => chunk + "y"); + + const composed = composeDuplex([first, second], { objectMode: true }); + const third = map((chunk: number) => chunk + "z"); + // composed + // .pipe(third) + // .on("data", data => console.log("Piped composed: ", data)); composed.on("data", data => { - console.log("DATA", data); + console.log("data on composed", data); + t.end(); }); + composed.on("error", data => { + console.log("ERROR", data); + }); + composed.on("end", data => { + console.log("end", data); + }); + + composed.write(1); + composed.write(2); });