From bff4e0d6ed7f563e3c313fb990bc706565a09f84 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Sun, 26 Jan 2020 09:55:35 -0500 Subject: [PATCH] Add remux options --- src/functions/demux.ts | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/functions/demux.ts b/src/functions/demux.ts index cd287dd..feeb691 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -10,7 +10,7 @@ enum EventSubscription { const eventsTarget = { close: EventSubscription.Self, - data: EventSubscription.All, + data: EventSubscription.Self, drain: EventSubscription.Self, end: EventSubscription.Self, error: EventSubscription.Self, @@ -24,10 +24,14 @@ const eventsTarget = { type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; +interface DemuxOptions extends WritableOptions { + remultiplex?: DemuxStreams; +} + export function demux( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options?: WritableOptions, + options?: DemuxOptions, ): Writable { return new Demux(construct, demuxBy, options); } @@ -37,18 +41,19 @@ class Demux extends Writable { private streamsByKey: { [key: string]: DemuxStreams; }; - private destination: Writable; private demuxer: (chunk: any) => string; private construct: (destKey?: string) => DemuxStreams; + private remultiplex?: DemuxStreams; constructor( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), - options: WritableOptions = {}, + options: DemuxOptions = {}, ) { super(options); this.demuxer = typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; this.construct = construct; + this.remultiplex = options.remultiplex; this.streamsByKey = {}; } @@ -56,8 +61,10 @@ class Demux extends Writable { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = await this.construct(destKey); - if (this.destination !== undefined) { - (this.streamsByKey[destKey] as any).pipe(this.destination); + if (this.remultiplex) { + (this.streamsByKey[destKey] as NodeJS.ReadWriteStream).pipe( + this.remultiplex, + ); } } @@ -70,11 +77,6 @@ class Demux extends Writable { } } - public pipe(dest: any) { - this.destination = dest; - return dest; - } - public on(event: string, cb: any) { switch (eventsTarget[event]) { case EventSubscription.Self: