Compare commits
3 Commits
master
...
feature/de
Author | SHA1 | Date | |
---|---|---|---|
|
46d118d4a5 | ||
|
5dd856deed | ||
|
d0a9d35fe7 |
@ -6,6 +6,8 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
|
|||||||
|
|
||||||
export interface DemuxOptions extends DuplexOptions {
|
export interface DemuxOptions extends DuplexOptions {
|
||||||
remultiplex?: boolean;
|
remultiplex?: boolean;
|
||||||
|
purgeIdleInterval?: number;
|
||||||
|
maxIdleTime?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function demux(
|
export function demux(
|
||||||
@ -18,12 +20,13 @@ export function demux(
|
|||||||
|
|
||||||
class Demux extends Duplex {
|
class Demux extends Duplex {
|
||||||
private streamsByKey: {
|
private streamsByKey: {
|
||||||
[key: string]: DemuxStreams;
|
[key: string]: { stream: DemuxStreams; lastWrite: number };
|
||||||
};
|
};
|
||||||
private demuxer: (chunk: any) => string;
|
private demuxer: (chunk: any) => string;
|
||||||
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
||||||
private remultiplex: boolean;
|
private remultiplex: boolean;
|
||||||
private transform: Transform;
|
private transform: Transform;
|
||||||
|
private maxIdleTime: number;
|
||||||
constructor(
|
constructor(
|
||||||
construct: (destKey?: string) => DemuxStreams,
|
construct: (destKey?: string) => DemuxStreams,
|
||||||
demuxBy: string | ((chunk: any) => string),
|
demuxBy: string | ((chunk: any) => string),
|
||||||
@ -43,10 +46,25 @@ class Demux extends Duplex {
|
|||||||
cb(null);
|
cb(null);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
this.maxIdleTime = options.maxIdleTime || 600000;
|
||||||
|
const purgeIdleInterval = options.purgeIdleInterval || 600000;
|
||||||
|
setInterval(() => {
|
||||||
|
this._destroyIdle();
|
||||||
|
}, purgeIdleInterval);
|
||||||
|
|
||||||
this.on("unpipe", () => this._flush());
|
this.on("unpipe", () => this._flush());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _destroyIdle() {
|
||||||
|
for (let key in this.streamsByKey) {
|
||||||
|
const curTime = Date.now();
|
||||||
|
const pipeline = this.streamsByKey[key];
|
||||||
|
|
||||||
|
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
|
||||||
|
delete this.streamsByKey[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// tslint:disable-next-line
|
// tslint:disable-next-line
|
||||||
public _read(size: number) {}
|
public _read(size: number) {}
|
||||||
|
|
||||||
@ -54,7 +72,10 @@ class Demux extends Duplex {
|
|||||||
const destKey = this.demuxer(chunk);
|
const destKey = this.demuxer(chunk);
|
||||||
if (this.streamsByKey[destKey] === undefined) {
|
if (this.streamsByKey[destKey] === undefined) {
|
||||||
const newPipeline = await this.construct(destKey, chunk);
|
const newPipeline = await this.construct(destKey, chunk);
|
||||||
this.streamsByKey[destKey] = newPipeline;
|
this.streamsByKey[destKey] = {
|
||||||
|
stream: newPipeline,
|
||||||
|
lastWrite: Date.now(),
|
||||||
|
};
|
||||||
if (this.remultiplex && isReadable(newPipeline)) {
|
if (this.remultiplex && isReadable(newPipeline)) {
|
||||||
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
||||||
} else if (this.remultiplex) {
|
} else if (this.remultiplex) {
|
||||||
@ -62,10 +83,12 @@ class Demux extends Duplex {
|
|||||||
`Pipeline construct for ${destKey} does not implement readable interface`,
|
`Pipeline construct for ${destKey} does not implement readable interface`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
this.streamsByKey[destKey].lastWrite = Date.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this.streamsByKey[destKey].write(chunk, encoding)) {
|
if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
|
||||||
this.streamsByKey[destKey].once("drain", () => {
|
this.streamsByKey[destKey].stream.once("drain", () => {
|
||||||
cb();
|
cb();
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
@ -76,8 +99,8 @@ class Demux extends Duplex {
|
|||||||
public _flush() {
|
public _flush() {
|
||||||
const pipelines = Object.values(this.streamsByKey);
|
const pipelines = Object.values(this.streamsByKey);
|
||||||
let totalEnded = 0;
|
let totalEnded = 0;
|
||||||
pipelines.forEach(pipeline => {
|
pipelines.forEach(({ stream }) => {
|
||||||
pipeline.once("end", () => {
|
stream.once("end", () => {
|
||||||
totalEnded++;
|
totalEnded++;
|
||||||
if (pipelines.length === totalEnded) {
|
if (pipelines.length === totalEnded) {
|
||||||
this.push(null);
|
this.push(null);
|
||||||
@ -85,12 +108,12 @@ class Demux extends Duplex {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
pipelines.forEach(pipeline => pipeline.end());
|
pipelines.forEach(({ stream }) => stream.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
public _destroy(error: any, cb: (error?: any) => void) {
|
public _destroy(error: any, cb: (error?: any) => void) {
|
||||||
const pipelines = Object.values(this.streamsByKey);
|
const pipelines = Object.values(this.streamsByKey);
|
||||||
pipelines.forEach(p => (p as any).destroy());
|
pipelines.forEach(({ stream }) => (stream as any).destroy());
|
||||||
cb(error);
|
cb(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -256,11 +256,14 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||||
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
* @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||||
* @param demuxBy
|
* @param {String | Function} demuxBy
|
||||||
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
* @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||||
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
* @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||||
* @param options Writable stream options
|
* @param {Object} options Demux stream options
|
||||||
|
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
|
||||||
|
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
|
||||||
|
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
|
||||||
*/
|
*/
|
||||||
demux: withDefaultOptions(2, demux),
|
demux: withDefaultOptions(2, demux),
|
||||||
};
|
};
|
||||||
|
@ -862,3 +862,39 @@ test.cb("demux() should be 'destroyable'", t => {
|
|||||||
fakeSource.push(input[3]);
|
fakeSource.push(input[3]);
|
||||||
fakeSource.push(input[4]);
|
fakeSource.push(input[4]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test.cb("Should delete idle pipelines", t => {
|
||||||
|
t.plan(5);
|
||||||
|
const input = [
|
||||||
|
{ key: "a", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "a", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "c", visited: [] },
|
||||||
|
];
|
||||||
|
const construct = sinon.spy((destKey: string) => {
|
||||||
|
const dest = map((chunk: Test) => {
|
||||||
|
chunk.visited.push(1);
|
||||||
|
return chunk;
|
||||||
|
});
|
||||||
|
t.pass();
|
||||||
|
|
||||||
|
return dest;
|
||||||
|
});
|
||||||
|
|
||||||
|
const demuxed = demux(construct, "key", {
|
||||||
|
maxIdleTime: 110,
|
||||||
|
purgeIdleInterval: 110,
|
||||||
|
});
|
||||||
|
|
||||||
|
demuxed.on("data", data => {
|
||||||
|
if (data.key === "c") t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
for (let i = 0; i < input.length; i++) {
|
||||||
|
setTimeout(() => {
|
||||||
|
demuxed.write(input[i]);
|
||||||
|
}, i * 100);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user