Update demux

This commit is contained in:
Jerry Kurian
2020-01-27 13:07:37 -05:00
parent 2b1308a605
commit 8856cb8d3b
5 changed files with 379 additions and 148 deletions

View File

@@ -1,4 +1,6 @@
import { WritableOptions, Writable } from "stream";
import { DuplexOptions, Duplex, Transform } from "stream";
import { isReadable } from "../helpers";
enum EventSubscription {
Last = 0,
@@ -8,62 +10,62 @@ enum EventSubscription {
Unhandled,
}
const eventsTarget = {
close: EventSubscription.Self,
data: EventSubscription.All,
drain: EventSubscription.Self,
end: EventSubscription.Self,
error: EventSubscription.Self,
finish: EventSubscription.Self,
pause: EventSubscription.Self,
pipe: EventSubscription.Self,
readable: EventSubscription.Self,
resume: EventSubscription.Self,
unpipe: EventSubscription.Self,
};
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
interface DemuxOptions extends WritableOptions {
remultiplex?: DemuxStreams;
interface DemuxOptions extends DuplexOptions {
remultiplex?: boolean;
}
export function demux(
construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string),
options?: DemuxOptions,
): Writable {
): Duplex {
return new Demux(construct, demuxBy, options);
}
// @TODO handle pipe event ie) Multiplex
class Demux extends Writable {
class Demux extends Duplex {
private streamsByKey: {
[key: string]: DemuxStreams;
};
private demuxer: (chunk: any) => string;
private construct: (destKey?: string) => DemuxStreams;
private remultiplex?: DemuxStreams;
private remultiplex: boolean;
private transform: Transform;
constructor(
construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string),
options: DemuxOptions = {},
options: DemuxOptions,
) {
super(options);
this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.construct = construct;
this.remultiplex = options.remultiplex;
this.remultiplex =
options.remultiplex === undefined ? true : options.remultiplex;
this.streamsByKey = {};
this.transform = new Transform({
...options,
transform: (d, _, cb) => {
this.push(d);
cb(null, d);
},
});
this.once("unpipe", () => this._flush());
}
public _read(size: number) {}
public async _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) {
this.streamsByKey[destKey] = await this.construct(destKey);
if (this.remultiplex) {
(this.streamsByKey[destKey] as NodeJS.ReadWriteStream).pipe(
this.remultiplex,
const newPipeline = await this.construct(destKey);
this.streamsByKey[destKey] = newPipeline;
if (this.remultiplex && isReadable(newPipeline)) {
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
} else if (this.remultiplex) {
console.error(
`Pipeline construct for ${destKey} does not implement readable interface`,
);
}
}
@@ -77,35 +79,24 @@ class Demux extends Writable {
}
}
public on(event: string, cb: any) {
switch (eventsTarget[event]) {
case EventSubscription.Self:
super.on(event, cb);
break;
case EventSubscription.All:
Object.keys(this.streamsByKey).forEach(key =>
this.streamsByKey[key].on(event, cb),
);
break;
default:
super.on(event, cb);
}
return this;
public _flush() {
const pipelines = Object.values(this.streamsByKey);
let totalEnded = 0;
pipelines.forEach(pipeline => {
pipeline.once("end", () => {
totalEnded++;
if (pipelines.length === totalEnded) {
this.push(null);
this.emit("finished");
}
});
});
pipelines.forEach(pipeline => pipeline.end());
}
public once(event: string, cb: any) {
switch (eventsTarget[event]) {
case EventSubscription.Self:
super.once(event, cb);
break;
case EventSubscription.All:
Object.keys(this.streamsByKey).forEach(key =>
this.streamsByKey[key].once(event, cb),
);
break;
default:
super.once(event, cb);
}
return this;
public _destroy(error: any, cb: (error?: any) => void) {
const pipelines = Object.values(this.streamsByKey);
pipelines.forEach(p => (p as any).destroy());
cb(error);
}
}