diff --git a/package.json b/package.json index 7683ec4..f7068f3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@jogogo/mhysa", - "version": "2.0.0-alpha.3", + "version": "2.0.0-alpha.4", "description": "Streams and event emitter utils for Node.js", "keywords": [ "promise", diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 6d4d093..fdc9063 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -9,30 +9,42 @@ export interface DemuxOptions extends DuplexOptions { } export function demux( - construct: (destKey?: string, chunk?: any) => DemuxStreams, + pipelineConstructor: ( + destKey?: string, + chunk?: any, + ) => DemuxStreams | DemuxStreams[], demuxBy: string | ((chunk: any) => string), options?: DemuxOptions, ): Duplex { - return new Demux(construct, demuxBy, options); + return new Demux(pipelineConstructor, demuxBy, options); } class Demux extends Duplex { private streamsByKey: { - [key: string]: DemuxStreams; + [key: string]: DemuxStreams[]; }; private demuxer: (chunk: any) => string; - private construct: (destKey?: string, chunk?: any) => DemuxStreams; + private pipelineConstructor: ( + destKey?: string, + chunk?: any, + ) => DemuxStreams[]; private remultiplex: boolean; private transform: Transform; constructor( - construct: (destKey?: string) => DemuxStreams, + pipelineConstructor: ( + destKey?: string, + chunk?: any, + ) => DemuxStreams | DemuxStreams[], demuxBy: string | ((chunk: any) => string), options: DemuxOptions = {}, ) { super(options); this.demuxer = typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; - this.construct = construct; + this.pipelineConstructor = (destKey: string, chunk?: any) => { + const pipeline = pipelineConstructor(destKey, chunk); + return Array.isArray(pipeline) ? pipeline : [pipeline]; + }; this.remultiplex = options.remultiplex === undefined ? true : options.remultiplex; this.streamsByKey = {}; @@ -53,43 +65,66 @@ class Demux extends Duplex { public async _write(chunk: any, encoding: any, cb: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { - const newPipeline = await this.construct(destKey, chunk); - this.streamsByKey[destKey] = newPipeline; - if (this.remultiplex && isReadable(newPipeline)) { - (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); - } else if (this.remultiplex) { - console.error( - `Pipeline construct for ${destKey} does not implement readable interface`, + const newPipelines = this.pipelineConstructor(destKey, chunk); + this.streamsByKey[destKey] = newPipelines; + + newPipelines.forEach(newPipeline => { + if (this.remultiplex && isReadable(newPipeline)) { + (newPipeline as NodeJS.ReadWriteStream).pipe( + this.transform, + ); + } else if (this.remultiplex) { + console.error( + `Pipeline construct for ${destKey} does not implement readable interface`, + ); + } + }); + } + const pipelines = this.streamsByKey[destKey]; + const pendingDrains: Array> = []; + + pipelines.forEach(pipeline => { + if (!pipeline.write(chunk, encoding)) { + pendingDrains.push( + new Promise(resolve => { + pipeline.once("drain", () => { + resolve(); + }); + }), ); } - } - - if (!this.streamsByKey[destKey].write(chunk, encoding)) { - this.streamsByKey[destKey].once("drain", () => { - cb(); - }); - } else { - cb(); - } + }); + await Promise.all(pendingDrains); + cb(); } public _flush() { - const pipelines = Object.values(this.streamsByKey); - let totalEnded = 0; + const pipelines: DemuxStreams[] = [].concat.apply( + [], + Object.values(this.streamsByKey), + ); + const flushPromises: Array> = []; pipelines.forEach(pipeline => { - pipeline.once("end", () => { - totalEnded++; - if (pipelines.length === totalEnded) { - this.push(null); - this.emit("end"); - } - }); + flushPromises.push( + new Promise(resolve => { + pipeline.once("end", () => { + resolve(); + }); + }), + ); }); pipelines.forEach(pipeline => pipeline.end()); + Promise.all(flushPromises).then(() => { + this.push(null); + this.emit("end"); + }); } public _destroy(error: any, cb: (error?: any) => void) { - const pipelines = Object.values(this.streamsByKey); + const pipelines: DemuxStreams[] = [].concat.apply( + [], + Object.values(this.streamsByKey), + ); pipelines.forEach(p => (p as any).destroy()); cb(error); }