diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 6d4d093..b6a94fc 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -6,6 +6,8 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; export interface DemuxOptions extends DuplexOptions { remultiplex?: boolean; + purgeIdleInterval?: number; + maxIdleTime?: number; } export function demux( @@ -18,12 +20,13 @@ export function demux( class Demux extends Duplex { private streamsByKey: { - [key: string]: DemuxStreams; + [key: string]: { stream: DemuxStreams; lastWrite: number }; }; private demuxer: (chunk: any) => string; private construct: (destKey?: string, chunk?: any) => DemuxStreams; private remultiplex: boolean; private transform: Transform; + private maxIdleTime: number; constructor( construct: (destKey?: string) => DemuxStreams, demuxBy: string | ((chunk: any) => string), @@ -43,10 +46,25 @@ class Demux extends Duplex { cb(null); }, }); + this.maxIdleTime = options.maxIdleTime || 600000; + const purgeIdleInterval = options.purgeIdleInterval || 600000; + setInterval(() => { + this._destroyIdle(); + }, purgeIdleInterval); this.on("unpipe", () => this._flush()); } + private _destroyIdle() { + for (let key in this.streamsByKey) { + const curTime = Date.now(); + const pipeline = this.streamsByKey[key]; + + if (curTime - pipeline.lastWrite > this.maxIdleTime) { + delete this.streamsByKey[key]; + } + } + } // tslint:disable-next-line public _read(size: number) {} @@ -54,7 +72,10 @@ class Demux extends Duplex { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { const newPipeline = await this.construct(destKey, chunk); - this.streamsByKey[destKey] = newPipeline; + this.streamsByKey[destKey] = { + stream: newPipeline, + lastWrite: Date.now(), + }; if (this.remultiplex && isReadable(newPipeline)) { (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); } else if (this.remultiplex) { @@ -62,10 +83,12 @@ class Demux extends Duplex { `Pipeline construct for ${destKey} does not implement readable interface`, ); } + } else { + this.streamsByKey[destKey].lastWrite = Date.now(); } - if (!this.streamsByKey[destKey].write(chunk, encoding)) { - this.streamsByKey[destKey].once("drain", () => { + if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) { + this.streamsByKey[destKey].stream.once("drain", () => { cb(); }); } else { @@ -76,8 +99,8 @@ class Demux extends Duplex { public _flush() { const pipelines = Object.values(this.streamsByKey); let totalEnded = 0; - pipelines.forEach(pipeline => { - pipeline.once("end", () => { + pipelines.forEach(({ stream }) => { + stream.once("end", () => { totalEnded++; if (pipelines.length === totalEnded) { this.push(null); @@ -85,12 +108,12 @@ class Demux extends Duplex { } }); }); - pipelines.forEach(pipeline => pipeline.end()); + pipelines.forEach(({ stream }) => stream.end()); } public _destroy(error: any, cb: (error?: any) => void) { const pipelines = Object.values(this.streamsByKey); - pipelines.forEach(p => (p as any).destroy()); + pipelines.forEach(({ stream }) => (stream as any).destroy()); cb(error); } } diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index b8c8a5f..1fd4f22 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -862,3 +862,42 @@ test.cb("demux() should be 'destroyable'", t => { fakeSource.push(input[3]); fakeSource.push(input[4]); }); + +test.cb("Should delete idle pipelines", t => { + t.plan(6); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "c", visited: [] }, + { key: "c", visited: [] }, + { key: "b", visited: [] }, + { key: "d", visited: [] }, + ]; + const construct = sinon.spy((destKey: string) => { + const dest = map((chunk: Test) => { + chunk.visited.push(1); + return chunk; + }); + t.pass(); + + return dest; + }); + + const demuxed = demux(construct, "key", { + maxIdleTime: 110, + purgeIdleInterval: 110, + }); + + demuxed.on("data", data => { + if (data.key === "d") t.end(); + }); + + for (let i = 0; i < input.length; i++) { + setTimeout(() => { + demuxed.write(input[i]); + }, i * 100); + } +});