This commit is contained in:
Jerry Kurian 2019-08-22 12:07:30 -04:00
parent 1e7fad2403
commit d097fa6aa5
5 changed files with 93 additions and 43 deletions

View File

@ -19,4 +19,4 @@ export { replace } from "./replace";
export { split } from "./split"; export { split } from "./split";
export { stringify } from "./stringify"; export { stringify } from "./stringify";
export { unbatch } from "./unbatch"; export { unbatch } from "./unbatch";
export { compose } from "./compose"; export { compose, composeDuplex } from "./compose";

View File

@ -1,45 +1,54 @@
import { Transform, Writable, Pipe, WritableOptions } from "stream"; import {
pipeline,
class Compose extends Writable implements Pipe { Transform,
private head: Writable | Transform; Writable,
private tail: Writable | Transform; Pipe,
constructor( WritableOptions,
streams: Array<Transform | Writable>, Readable,
options?: WritableOptions, Duplex,
) { } from "stream";
super(options);
if (streams.length < 2) {
throw new Error("Cannot compose 1 or less streams");
}
this.head = streams[0];
for (let i = 1; i < streams.length; i++) {
streams[i - 1].pipe(streams[i]);
}
this.tail = streams[streams.length - 1];
}
public pipe<T extends NodeJS.WritableStream>(
destination: T,
options: { end?: boolean } | undefined,
) {
return this.tail.pipe(
destination,
options,
);
}
public _write(chunk: any, enc: string, cb: any) {
this.head.write(chunk.toString ? chunk.toString() : chunk, cb);
}
}
/** /**
* Return a Readable stream of readable streams concatenated together * Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate * @param streams Readable streams to concatenate
*/ */
// First Readable --> Readable
// First Transform | Duplex, Last Writable --> Writable
//
export function compose( export function compose(
streams: Array<Transform | Writable>, streams: Array<Readable | Duplex | Transform | Writable>,
options?: WritableOptions, options?: WritableOptions,
): Compose { ): Duplex {
return new Compose(streams, options); // Maybe just return a new stream here
if (streams.length < 2) {
throw new Error("Not enough");
}
const duplex = new Duplex({
objectMode: true,
write(chunk, enc, cb) {
const first = streams[0] as Writable;
if (!first.write(chunk)) {
first.on("drain", cb);
} else {
cb();
}
},
read(size) {
let chunk;
while (
null !==
(chunk = (streams[streams.length - 1] as Readable).read())
) {
this.push(chunk);
}
},
});
pipeline(streams, (err: any) => {
duplex.emit("error", err);
});
return duplex;
} }

View File

@ -297,3 +297,9 @@ export function compose(
options, options,
); );
} }
export function composeDuplex(
streams: Array<Writable | Transform>,
options?: WritableOptions,
) {
return baseFunctions.composeDuplex(streams, options);
}

View File

@ -22,4 +22,5 @@ export {
accumulator, accumulator,
accumulatorBy, accumulatorBy,
compose, compose,
composeDuplex,
} from "./functions"; } from "./functions";

View File

@ -1,20 +1,54 @@
const test = require("ava"); const test = require("ava");
const { compose, map } = require("../src"); const { compose, composeDuplex, map } = require("../src");
test.cb("compose()", t => { test.cb("compose()", t => {
const first = map((chunk: number) => chunk * 2); const first = map((chunk: number) => chunk + "x");
const second = map((chunk: number) => chunk + 1); const second = map((chunk: number) => chunk + "y");
const composed = compose( const composed = compose(
[first, second], [first, second],
{ objectMode: true }, { objectMode: true },
); );
const third = map((chunk: number) => chunk + "z");
composed
.pipe(third)
.on("data", data => console.log("Piped composed: ", data));
composed.on("data", data => {
console.log("data on composed", data);
t.end();
});
composed.on("error", data => {
console.log("ERROR", data);
});
composed.on("end", data => {
console.log("end", data);
});
composed.write(1); composed.write(1);
composed.write(2); composed.write(2);
composed.write(3); });
test.cb.only("composeDuplex()", t => {
const first = map((chunk: number) => chunk + "x");
const second = map((chunk: number) => chunk + "y");
const composed = composeDuplex([first, second], { objectMode: true });
const third = map((chunk: number) => chunk + "z");
// composed
// .pipe(third)
// .on("data", data => console.log("Piped composed: ", data));
composed.on("data", data => { composed.on("data", data => {
console.log("DATA", data); console.log("data on composed", data);
t.end();
}); });
composed.on("error", data => {
console.log("ERROR", data);
});
composed.on("end", data => {
console.log("end", data);
});
composed.write(1);
composed.write(2);
}); });