Add remux options

This commit is contained in:
Jerry Kurian 2020-01-26 09:55:35 -05:00
parent bd178ce2f0
commit bff4e0d6ed

View File

@ -10,7 +10,7 @@ enum EventSubscription {
const eventsTarget = { const eventsTarget = {
close: EventSubscription.Self, close: EventSubscription.Self,
data: EventSubscription.All, data: EventSubscription.Self,
drain: EventSubscription.Self, drain: EventSubscription.Self,
end: EventSubscription.Self, end: EventSubscription.Self,
error: EventSubscription.Self, error: EventSubscription.Self,
@ -24,10 +24,14 @@ const eventsTarget = {
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
interface DemuxOptions extends WritableOptions {
remultiplex?: DemuxStreams;
}
export function demux( export function demux(
construct: (destKey?: string) => DemuxStreams, construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: WritableOptions, options?: DemuxOptions,
): Writable { ): Writable {
return new Demux(construct, demuxBy, options); return new Demux(construct, demuxBy, options);
} }
@ -37,18 +41,19 @@ class Demux extends Writable {
private streamsByKey: { private streamsByKey: {
[key: string]: DemuxStreams; [key: string]: DemuxStreams;
}; };
private destination: Writable;
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private construct: (destKey?: string) => DemuxStreams; private construct: (destKey?: string) => DemuxStreams;
private remultiplex?: DemuxStreams;
constructor( constructor(
construct: (destKey?: string) => DemuxStreams, construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options: WritableOptions = {}, options: DemuxOptions = {},
) { ) {
super(options); super(options);
this.demuxer = this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.construct = construct; this.construct = construct;
this.remultiplex = options.remultiplex;
this.streamsByKey = {}; this.streamsByKey = {};
} }
@ -56,8 +61,10 @@ class Demux extends Writable {
const destKey = this.demuxer(chunk); const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) { if (this.streamsByKey[destKey] === undefined) {
this.streamsByKey[destKey] = await this.construct(destKey); this.streamsByKey[destKey] = await this.construct(destKey);
if (this.destination !== undefined) { if (this.remultiplex) {
(this.streamsByKey[destKey] as any).pipe(this.destination); (this.streamsByKey[destKey] as NodeJS.ReadWriteStream).pipe(
this.remultiplex,
);
} }
} }
@ -70,11 +77,6 @@ class Demux extends Writable {
} }
} }
public pipe(dest: any) {
this.destination = dest;
return dest;
}
public on(event: string, cb: any) { public on(event: string, cb: any) {
switch (eventsTarget[event]) { switch (eventsTarget[event]) {
case EventSubscription.Self: case EventSubscription.Self: