Use class

This commit is contained in:
Jerry Kurian 2019-08-22 15:35:36 -04:00
parent 1d0e15890c
commit f35f025dbc

View File

@ -1,11 +1,4 @@
import { import { pipeline, Duplex, DuplexOptions } from "stream";
pipeline,
Transform,
Writable,
WritableOptions,
Readable,
Duplex,
} from "stream";
/** /**
* Return a Readable stream of readable streams concatenated together * Return a Readable stream of readable streams concatenated together
@ -15,35 +8,59 @@ import {
// First Transform | Duplex, Last Writable --> Writable // First Transform | Duplex, Last Writable --> Writable
// //
export function compose( export function compose(
streams: Array<Readable | Duplex | Transform | Writable>, streams: Array<
options?: WritableOptions, NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
>,
options?: DuplexOptions,
): Duplex { ): Duplex {
// Maybe just return a new stream here // Maybe just return a new stream here
if (streams.length < 2) { if (streams.length < 2) {
throw new Error("At least two streams are required to compose"); throw new Error("At least two streams are required to compose");
} }
const first = streams[0] as Writable; const composed = new Compose(streams, options);
const last = streams[streams.length - 1] as Readable;
const duplex = new Duplex({
...options,
write(chunk, enc, cb) {
if (!first.write(chunk)) {
first.once("drain", cb);
} else {
cb();
}
},
read(size) {
if (last.readable) {
this.push(last.read(size));
}
},
});
return composed;
}
class Compose extends Duplex {
private first:
| NodeJS.ReadableStream
| NodeJS.ReadWriteStream
| NodeJS.WritableStream;
private last:
| NodeJS.ReadableStream
| NodeJS.ReadWriteStream
| NodeJS.WritableStream;
constructor(
streams: Array<
| NodeJS.ReadableStream
| NodeJS.ReadWriteStream
| NodeJS.WritableStream
>,
options?: DuplexOptions,
) {
super(options);
this.first = streams[0];
this.last = streams[streams.length - 1];
pipeline(streams, (err: any) => { pipeline(streams, (err: any) => {
duplex.emit("error", err); this.emit("error", err);
}); });
}
return duplex;
public pipe<T extends NodeJS.WritableStream>(dest: T) {
return (this.last as NodeJS.ReadableStream).pipe(dest);
}
public write(chunk: any) {
return (this.first as NodeJS.WritableStream).write(chunk);
}
public on(event: string, cb: any) {
if (event === "error") {
super.on(event, cb);
}
this.last.on(event, cb);
return this;
}
} }