Allow demux to be piped (mux)

This commit is contained in:
Jerry Kurian 2020-01-25 12:33:09 -05:00
parent 179d526c6c
commit 1227ce7095

View File

@ -37,6 +37,7 @@ class Demux extends Writable {
private streamsByKey: { private streamsByKey: {
[key: string]: DemuxStreams; [key: string]: DemuxStreams;
}; };
private destination: Writable;
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private construct: (destKey?: string) => DemuxStreams; private construct: (destKey?: string) => DemuxStreams;
constructor( constructor(
@ -55,8 +56,12 @@ class Demux extends Writable {
const destKey = this.demuxer(chunk); const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) { if (this.streamsByKey[destKey] === undefined) {
this.streamsByKey[destKey] = await this.construct(destKey); this.streamsByKey[destKey] = await this.construct(destKey);
if (this.destination !== undefined) {
(this.streamsByKey[destKey] as any).pipe(this.destination);
}
} }
if (!this.streamsByKey[destKey].write(chunk, encoding)) { const writeRes = this.streamsByKey[destKey].write(chunk, encoding);
if (!writeRes) {
this.streamsByKey[destKey].once("drain", () => { this.streamsByKey[destKey].once("drain", () => {
cb(); cb();
}); });
@ -65,6 +70,11 @@ class Demux extends Writable {
} }
} }
public pipe(dest: any) {
this.destination = dest;
return dest;
}
public on(event: string, cb: any) { public on(event: string, cb: any) {
switch (eventsTarget[event]) { switch (eventsTarget[event]) {
case EventSubscription.Self: case EventSubscription.Self: