From 1227ce70953903d60209e43283d8a1ffba1784a8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sat, 25 Jan 2020 12:33:09 -0500 Subject: [PATCH] Allow demux to be piped (mux) --- src/functions/demux.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 6435447..25e16bd 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -37,6 +37,7 @@ class Demux extends Writable { private streamsByKey: { [key: string]: DemuxStreams; }; + private destination: Writable; private demuxer: (chunk: any) => string; private construct: (destKey?: string) => DemuxStreams; constructor( @@ -55,8 +56,12 @@ class Demux extends Writable { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = await this.construct(destKey); + if (this.destination !== undefined) { + (this.streamsByKey[destKey] as any).pipe(this.destination); + } } - if (!this.streamsByKey[destKey].write(chunk, encoding)) { + const writeRes = this.streamsByKey[destKey].write(chunk, encoding); + if (!writeRes) { this.streamsByKey[destKey].once("drain", () => { cb(); }); @@ -65,6 +70,11 @@ class Demux extends Writable { } } + public pipe(dest: any) { + this.destination = dest; + return dest; + } + public on(event: string, cb: any) { switch (eventsTarget[event]) { case EventSubscription.Self: