This commit is contained in:
Jerry Kurian 2019-08-21 15:40:19 -04:00
parent 7394b6ef84
commit 6581e1d745
7 changed files with 81 additions and 2 deletions

View File

@ -19,3 +19,4 @@ export { replace } from "./replace";
export { split } from "./split"; export { split } from "./split";
export { stringify } from "./stringify"; export { stringify } from "./stringify";
export { unbatch } from "./unbatch"; export { unbatch } from "./unbatch";
export { compose } from "./compose";

45
src/functions/compose.ts Normal file
View File

@ -0,0 +1,45 @@
import { Transform, Writable, Pipe, WritableOptions } from "stream";
class Compose extends Writable implements Pipe {
private head: Writable | Transform;
private tail: Writable | Transform;
constructor(
streams: Array<Transform | Writable>,
options?: WritableOptions,
) {
super(options);
if (streams.length < 2) {
throw new Error("Cannot compose 1 or less streams");
}
this.head = streams[0];
for (let i = 1; i < streams.length; i++) {
streams[i - 1].pipe(streams[i]);
}
this.tail = streams[streams.length - 1];
}
public pipe<T extends NodeJS.WritableStream>(
destination: T,
options: { end?: boolean } | undefined,
) {
return this.tail.pipe(
destination,
options,
);
}
public _write(chunk: any, enc: string, cb: any) {
this.head.write(chunk.toString ? chunk.toString() : chunk, cb);
}
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function compose(
streams: Array<Transform | Writable>,
options?: WritableOptions,
): Compose {
return new Compose(streams, options);
}

View File

@ -1,4 +1,4 @@
import { Readable, Writable, Transform, Duplex } from "stream"; import { Readable, Writable, WritableOptions, Transform, Duplex } from "stream";
import { ChildProcess } from "child_process"; import { ChildProcess } from "child_process";
import * as baseFunctions from "./baseFunctions"; import * as baseFunctions from "./baseFunctions";
@ -287,3 +287,13 @@ export function accumulatorBy<T, S extends FlushStrategy>(
) { ) {
return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee); return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee);
} }
export function compose(
streams: Array<Writable | Transform>,
options?: WritableOptions,
) {
return baseFunctions.compose(
streams,
options,
);
}

View File

@ -21,4 +21,5 @@ export {
parallelMap, parallelMap,
accumulator, accumulator,
accumulatorBy, accumulatorBy,
compose,
} from "./functions"; } from "./functions";

20
tests/compose.spec.ts Normal file
View File

@ -0,0 +1,20 @@
const test = require("ava");
const { compose, map } = require("../src");
test.cb("compose()", t => {
const first = map((chunk: number) => chunk * 2);
const second = map((chunk: number) => chunk + 1);
const composed = compose(
[first, second],
{ objectMode: true },
);
composed.write(1);
composed.write(2);
composed.write(3);
composed.on("data", data => {
console.log("DATA", data);
});
});

0
tests/composed.spec.ts Normal file
View File

View File

@ -15,5 +15,7 @@
"es2016" "es2016"
], ],
"sourceMap": true, "sourceMap": true,
"declaration": true "declaration": true,
"include": ["src/**/*"],
"exclude": ["tests", "node_modules"]
} }