diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 2b2b067..005aa3a 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -19,3 +19,4 @@ export { replace } from "./replace"; export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; +export { compose } from "./compose"; diff --git a/src/functions/compose.ts b/src/functions/compose.ts new file mode 100644 index 0000000..ed6df07 --- /dev/null +++ b/src/functions/compose.ts @@ -0,0 +1,45 @@ +import { Transform, Writable, Pipe, WritableOptions } from "stream"; + +class Compose extends Writable implements Pipe { + private head: Writable | Transform; + private tail: Writable | Transform; + constructor( + streams: Array, + options?: WritableOptions, + ) { + super(options); + if (streams.length < 2) { + throw new Error("Cannot compose 1 or less streams"); + } + this.head = streams[0]; + for (let i = 1; i < streams.length; i++) { + streams[i - 1].pipe(streams[i]); + } + this.tail = streams[streams.length - 1]; + } + + public pipe( + destination: T, + options: { end?: boolean } | undefined, + ) { + return this.tail.pipe( + destination, + options, + ); + } + + public _write(chunk: any, enc: string, cb: any) { + this.head.write(chunk.toString ? chunk.toString() : chunk, cb); + } +} + +/** + * Return a Readable stream of readable streams concatenated together + * @param streams Readable streams to concatenate + */ +export function compose( + streams: Array, + options?: WritableOptions, +): Compose { + return new Compose(streams, options); +} diff --git a/src/functions/index.ts b/src/functions/index.ts index 8bdbf75..ceba200 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -1,4 +1,4 @@ -import { Readable, Writable, Transform, Duplex } from "stream"; +import { Readable, Writable, WritableOptions, Transform, Duplex } from "stream"; import { ChildProcess } from "child_process"; import * as baseFunctions from "./baseFunctions"; @@ -287,3 +287,13 @@ export function accumulatorBy( ) { return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee); } + +export function compose( + streams: Array, + options?: WritableOptions, +) { + return baseFunctions.compose( + streams, + options, + ); +} diff --git a/src/index.ts b/src/index.ts index c0eabe4..98b0a45 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,4 +21,5 @@ export { parallelMap, accumulator, accumulatorBy, + compose, } from "./functions"; diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts new file mode 100644 index 0000000..001dbe0 --- /dev/null +++ b/tests/compose.spec.ts @@ -0,0 +1,20 @@ +const test = require("ava"); +const { compose, map } = require("../src"); + +test.cb("compose()", t => { + const first = map((chunk: number) => chunk * 2); + const second = map((chunk: number) => chunk + 1); + + const composed = compose( + [first, second], + { objectMode: true }, + ); + + composed.write(1); + composed.write(2); + composed.write(3); + + composed.on("data", data => { + console.log("DATA", data); + }); +}); diff --git a/tests/composed.spec.ts b/tests/composed.spec.ts new file mode 100644 index 0000000..e69de29 diff --git a/tsconfig.json b/tsconfig.json index fabdff4..3a85e23 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -15,5 +15,7 @@ "es2016" ], "sourceMap": true, - "declaration": true + "declaration": true, + "include": ["src/**/*"], + "exclude": ["tests", "node_modules"] }