Compare commits

...

4 Commits

Author SHA1 Message Date
Jerry Kurian
3c75ef88b4 Minor changes to types 2020-05-11 21:13:03 -04:00
Jerry Kurian
7113400cb1 version 2020-05-11 10:33:17 -04:00
Jerry Kurian
a42560edfc Rename construct 2020-05-08 16:21:20 -04:00
Jerry Kurian
58a95a91d0 Demux handles arrays by key 2020-05-08 15:58:31 -04:00
2 changed files with 68 additions and 33 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@jogogo/mhysa", "name": "@jogogo/mhysa",
"version": "2.0.0-alpha.3", "version": "2.0.0-alpha.4",
"description": "Streams and event emitter utils for Node.js", "description": "Streams and event emitter utils for Node.js",
"keywords": [ "keywords": [
"promise", "promise",

View File

@ -9,30 +9,42 @@ export interface DemuxOptions extends DuplexOptions {
} }
export function demux( export function demux(
construct: (destKey?: string, chunk?: any) => DemuxStreams, pipelineConstructor: (
destKey?: string,
chunk?: any,
) => DemuxStreams | DemuxStreams[],
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: DemuxOptions, options?: DemuxOptions,
): Duplex { ): Duplex {
return new Demux(construct, demuxBy, options); return new Demux(pipelineConstructor, demuxBy, options);
} }
class Demux extends Duplex { class Demux extends Duplex {
private streamsByKey: { private streamsByKey: {
[key: string]: DemuxStreams; [key: string]: DemuxStreams[];
}; };
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private construct: (destKey?: string, chunk?: any) => DemuxStreams; private pipelineConstructor: (
destKey?: string,
chunk?: any,
) => DemuxStreams[];
private remultiplex: boolean; private remultiplex: boolean;
private transform: Transform; private transform: Transform;
constructor( constructor(
construct: (destKey?: string) => DemuxStreams, pipelineConstructor: (
destKey?: string,
chunk?: any,
) => DemuxStreams | DemuxStreams[],
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options: DemuxOptions = {}, options: DemuxOptions = {},
) { ) {
super(options); super(options);
this.demuxer = this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.construct = construct; this.pipelineConstructor = (destKey: string, chunk?: any) => {
const pipeline = pipelineConstructor(destKey, chunk);
return Array.isArray(pipeline) ? pipeline : [pipeline];
};
this.remultiplex = this.remultiplex =
options.remultiplex === undefined ? true : options.remultiplex; options.remultiplex === undefined ? true : options.remultiplex;
this.streamsByKey = {}; this.streamsByKey = {};
@ -53,43 +65,66 @@ class Demux extends Duplex {
public async _write(chunk: any, encoding: any, cb: any) { public async _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk); const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) { if (this.streamsByKey[destKey] === undefined) {
const newPipeline = await this.construct(destKey, chunk); const newPipelines = this.pipelineConstructor(destKey, chunk);
this.streamsByKey[destKey] = newPipeline; this.streamsByKey[destKey] = newPipelines;
newPipelines.forEach(newPipeline => {
if (this.remultiplex && isReadable(newPipeline)) { if (this.remultiplex && isReadable(newPipeline)) {
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); (newPipeline as NodeJS.ReadWriteStream).pipe(
this.transform,
);
} else if (this.remultiplex) { } else if (this.remultiplex) {
console.error( console.error(
`Pipeline construct for ${destKey} does not implement readable interface`, `Pipeline construct for ${destKey} does not implement readable interface`,
); );
} }
}
if (!this.streamsByKey[destKey].write(chunk, encoding)) {
this.streamsByKey[destKey].once("drain", () => {
cb();
}); });
} else {
cb();
} }
const pipelines = this.streamsByKey[destKey];
const pendingDrains: Array<Promise<any>> = [];
pipelines.forEach(pipeline => {
if (!pipeline.write(chunk, encoding)) {
pendingDrains.push(
new Promise(resolve => {
pipeline.once("drain", () => {
resolve();
});
}),
);
}
});
await Promise.all(pendingDrains);
cb();
} }
public _flush() { public _flush() {
const pipelines = Object.values(this.streamsByKey); const pipelines: DemuxStreams[] = [].concat.apply(
let totalEnded = 0; [],
Object.values(this.streamsByKey),
);
const flushPromises: Array<Promise<void>> = [];
pipelines.forEach(pipeline => { pipelines.forEach(pipeline => {
flushPromises.push(
new Promise(resolve => {
pipeline.once("end", () => { pipeline.once("end", () => {
totalEnded++; resolve();
if (pipelines.length === totalEnded) {
this.push(null);
this.emit("end");
}
}); });
}),
);
}); });
pipelines.forEach(pipeline => pipeline.end()); pipelines.forEach(pipeline => pipeline.end());
Promise.all(flushPromises).then(() => {
this.push(null);
this.emit("end");
});
} }
public _destroy(error: any, cb: (error?: any) => void) { public _destroy(error: any, cb: (error?: any) => void) {
const pipelines = Object.values(this.streamsByKey); const pipelines: DemuxStreams[] = [].concat.apply(
[],
Object.values(this.streamsByKey),
);
pipelines.forEach(p => (p as any).destroy()); pipelines.forEach(p => (p as any).destroy());
cb(error); cb(error);
} }