3 Commits

Author SHA1 Message Date
Jerry Kurian
46d118d4a5 Update docs 2020-04-27 14:28:55 -04:00
Jerry Kurian
5dd856deed Update test 2020-04-27 14:23:58 -04:00
Jerry Kurian
d0a9d35fe7 Purge idle pipelines 2020-04-27 14:20:07 -04:00
4 changed files with 103 additions and 76 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "@jogogo/mhysa", "name": "@jogogo/mhysa",
"version": "2.0.0-alpha.4", "version": "2.0.0-alpha.3",
"description": "Streams and event emitter utils for Node.js", "description": "Streams and event emitter utils for Node.js",
"keywords": [ "keywords": [
"promise", "promise",

View File

@@ -6,45 +6,36 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
export interface DemuxOptions extends DuplexOptions { export interface DemuxOptions extends DuplexOptions {
remultiplex?: boolean; remultiplex?: boolean;
purgeIdleInterval?: number;
maxIdleTime?: number;
} }
export function demux( export function demux(
pipelineConstructor: ( construct: (destKey?: string, chunk?: any) => DemuxStreams,
destKey?: string,
chunk?: any,
) => DemuxStreams | DemuxStreams[],
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: DemuxOptions, options?: DemuxOptions,
): Duplex { ): Duplex {
return new Demux(pipelineConstructor, demuxBy, options); return new Demux(construct, demuxBy, options);
} }
class Demux extends Duplex { class Demux extends Duplex {
private streamsByKey: { private streamsByKey: {
[key: string]: DemuxStreams[]; [key: string]: { stream: DemuxStreams; lastWrite: number };
}; };
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private pipelineConstructor: ( private construct: (destKey?: string, chunk?: any) => DemuxStreams;
destKey?: string,
chunk?: any,
) => DemuxStreams[];
private remultiplex: boolean; private remultiplex: boolean;
private transform: Transform; private transform: Transform;
private maxIdleTime: number;
constructor( constructor(
pipelineConstructor: ( construct: (destKey?: string) => DemuxStreams,
destKey?: string,
chunk?: any,
) => DemuxStreams | DemuxStreams[],
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options: DemuxOptions = {}, options: DemuxOptions = {},
) { ) {
super(options); super(options);
this.demuxer = this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy; typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.pipelineConstructor = (destKey: string, chunk?: any) => { this.construct = construct;
const pipeline = pipelineConstructor(destKey, chunk);
return Array.isArray(pipeline) ? pipeline : [pipeline];
};
this.remultiplex = this.remultiplex =
options.remultiplex === undefined ? true : options.remultiplex; options.remultiplex === undefined ? true : options.remultiplex;
this.streamsByKey = {}; this.streamsByKey = {};
@@ -55,77 +46,74 @@ class Demux extends Duplex {
cb(null); cb(null);
}, },
}); });
this.maxIdleTime = options.maxIdleTime || 600000;
const purgeIdleInterval = options.purgeIdleInterval || 600000;
setInterval(() => {
this._destroyIdle();
}, purgeIdleInterval);
this.on("unpipe", () => this._flush()); this.on("unpipe", () => this._flush());
} }
private _destroyIdle() {
for (let key in this.streamsByKey) {
const curTime = Date.now();
const pipeline = this.streamsByKey[key];
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
delete this.streamsByKey[key];
}
}
}
// tslint:disable-next-line // tslint:disable-next-line
public _read(size: number) {} public _read(size: number) {}
public async _write(chunk: any, encoding: any, cb: any) { public async _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk); const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) { if (this.streamsByKey[destKey] === undefined) {
const newPipelines = this.pipelineConstructor(destKey, chunk); const newPipeline = await this.construct(destKey, chunk);
this.streamsByKey[destKey] = newPipelines; this.streamsByKey[destKey] = {
stream: newPipeline,
newPipelines.forEach(newPipeline => { lastWrite: Date.now(),
};
if (this.remultiplex && isReadable(newPipeline)) { if (this.remultiplex && isReadable(newPipeline)) {
(newPipeline as NodeJS.ReadWriteStream).pipe( (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
this.transform,
);
} else if (this.remultiplex) { } else if (this.remultiplex) {
console.error( console.error(
`Pipeline construct for ${destKey} does not implement readable interface`, `Pipeline construct for ${destKey} does not implement readable interface`,
); );
} }
}); } else {
this.streamsByKey[destKey].lastWrite = Date.now();
} }
const pipelines = this.streamsByKey[destKey];
const pendingDrains: Array<Promise<any>> = [];
pipelines.forEach(pipeline => { if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
if (!pipeline.write(chunk, encoding)) { this.streamsByKey[destKey].stream.once("drain", () => {
pendingDrains.push(
new Promise(resolve => {
pipeline.once("drain", () => {
resolve();
});
}),
);
}
});
await Promise.all(pendingDrains);
cb(); cb();
});
} else {
cb();
}
} }
public _flush() { public _flush() {
const pipelines: DemuxStreams[] = [].concat.apply( const pipelines = Object.values(this.streamsByKey);
[], let totalEnded = 0;
Object.values(this.streamsByKey), pipelines.forEach(({ stream }) => {
); stream.once("end", () => {
const flushPromises: Array<Promise<void>> = []; totalEnded++;
pipelines.forEach(pipeline => { if (pipelines.length === totalEnded) {
flushPromises.push(
new Promise(resolve => {
pipeline.once("end", () => {
resolve();
});
}),
);
});
pipelines.forEach(pipeline => pipeline.end());
Promise.all(flushPromises).then(() => {
this.push(null); this.push(null);
this.emit("end"); this.emit("end");
}
}); });
});
pipelines.forEach(({ stream }) => stream.end());
} }
public _destroy(error: any, cb: (error?: any) => void) { public _destroy(error: any, cb: (error?: any) => void) {
const pipelines: DemuxStreams[] = [].concat.apply( const pipelines = Object.values(this.streamsByKey);
[], pipelines.forEach(({ stream }) => (stream as any).destroy());
Object.values(this.streamsByKey),
);
pipelines.forEach(p => (p as any).destroy());
cb(error); cb(error);
} }
} }

View File

@@ -256,11 +256,14 @@ export default function mhysa(defaultOptions?: TransformOptions) {
/** /**
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream. * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream. * @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
* @param demuxBy * @param {String | Function} demuxBy
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source. * @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source. * @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
* @param options Writable stream options * @param {Object} options Demux stream options
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
*/ */
demux: withDefaultOptions(2, demux), demux: withDefaultOptions(2, demux),
}; };

View File

@@ -862,3 +862,39 @@ test.cb("demux() should be 'destroyable'", t => {
fakeSource.push(input[3]); fakeSource.push(input[3]);
fakeSource.push(input[4]); fakeSource.push(input[4]);
}); });
test.cb("Should delete idle pipelines", t => {
t.plan(5);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const construct = sinon.spy((destKey: string) => {
const dest = map((chunk: Test) => {
chunk.visited.push(1);
return chunk;
});
t.pass();
return dest;
});
const demuxed = demux(construct, "key", {
maxIdleTime: 110,
purgeIdleInterval: 110,
});
demuxed.on("data", data => {
if (data.key === "c") t.end();
});
for (let i = 0; i < input.length; i++) {
setTimeout(() => {
demuxed.write(input[i]);
}, i * 100);
}
});