diff --git a/src/functions/compose.ts b/src/functions/compose.ts index 96008ea..f53aa6c 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,11 +1,4 @@ -import { - pipeline, - Transform, - Writable, - WritableOptions, - Readable, - Duplex, -} from "stream"; +import { pipeline, Duplex, DuplexOptions } from "stream"; /** * Return a Readable stream of readable streams concatenated together @@ -15,35 +8,59 @@ import { // First Transform | Duplex, Last Writable --> Writable // export function compose( - streams: Array, - options?: WritableOptions, + streams: Array< + NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream + >, + options?: DuplexOptions, ): Duplex { // Maybe just return a new stream here if (streams.length < 2) { throw new Error("At least two streams are required to compose"); } - const first = streams[0] as Writable; - const last = streams[streams.length - 1] as Readable; - const duplex = new Duplex({ - ...options, - write(chunk, enc, cb) { - if (!first.write(chunk)) { - first.once("drain", cb); - } else { - cb(); - } - }, - read(size) { - if (last.readable) { - this.push(last.read(size)); - } - }, - }); + const composed = new Compose(streams, options); - pipeline(streams, (err: any) => { - duplex.emit("error", err); - }); - - return duplex; + return composed; +} + +class Compose extends Duplex { + private first: + | NodeJS.ReadableStream + | NodeJS.ReadWriteStream + | NodeJS.WritableStream; + private last: + | NodeJS.ReadableStream + | NodeJS.ReadWriteStream + | NodeJS.WritableStream; + constructor( + streams: Array< + | NodeJS.ReadableStream + | NodeJS.ReadWriteStream + | NodeJS.WritableStream + >, + options?: DuplexOptions, + ) { + super(options); + this.first = streams[0]; + this.last = streams[streams.length - 1]; + pipeline(streams, (err: any) => { + this.emit("error", err); + }); + } + + public pipe(dest: T) { + return (this.last as NodeJS.ReadableStream).pipe(dest); + } + + public write(chunk: any) { + return (this.first as NodeJS.WritableStream).write(chunk); + } + + public on(event: string, cb: any) { + if (event === "error") { + super.on(event, cb); + } + this.last.on(event, cb); + return this; + } }