Compare commits
3 Commits
master
...
feature/de
Author | SHA1 | Date | |
---|---|---|---|
|
46d118d4a5 | ||
|
5dd856deed | ||
|
d0a9d35fe7 |
@ -6,6 +6,8 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
|
||||
|
||||
export interface DemuxOptions extends DuplexOptions {
|
||||
remultiplex?: boolean;
|
||||
purgeIdleInterval?: number;
|
||||
maxIdleTime?: number;
|
||||
}
|
||||
|
||||
export function demux(
|
||||
@ -18,12 +20,13 @@ export function demux(
|
||||
|
||||
class Demux extends Duplex {
|
||||
private streamsByKey: {
|
||||
[key: string]: DemuxStreams;
|
||||
[key: string]: { stream: DemuxStreams; lastWrite: number };
|
||||
};
|
||||
private demuxer: (chunk: any) => string;
|
||||
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
||||
private remultiplex: boolean;
|
||||
private transform: Transform;
|
||||
private maxIdleTime: number;
|
||||
constructor(
|
||||
construct: (destKey?: string) => DemuxStreams,
|
||||
demuxBy: string | ((chunk: any) => string),
|
||||
@ -43,10 +46,25 @@ class Demux extends Duplex {
|
||||
cb(null);
|
||||
},
|
||||
});
|
||||
this.maxIdleTime = options.maxIdleTime || 600000;
|
||||
const purgeIdleInterval = options.purgeIdleInterval || 600000;
|
||||
setInterval(() => {
|
||||
this._destroyIdle();
|
||||
}, purgeIdleInterval);
|
||||
|
||||
this.on("unpipe", () => this._flush());
|
||||
}
|
||||
|
||||
private _destroyIdle() {
|
||||
for (let key in this.streamsByKey) {
|
||||
const curTime = Date.now();
|
||||
const pipeline = this.streamsByKey[key];
|
||||
|
||||
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
|
||||
delete this.streamsByKey[key];
|
||||
}
|
||||
}
|
||||
}
|
||||
// tslint:disable-next-line
|
||||
public _read(size: number) {}
|
||||
|
||||
@ -54,7 +72,10 @@ class Demux extends Duplex {
|
||||
const destKey = this.demuxer(chunk);
|
||||
if (this.streamsByKey[destKey] === undefined) {
|
||||
const newPipeline = await this.construct(destKey, chunk);
|
||||
this.streamsByKey[destKey] = newPipeline;
|
||||
this.streamsByKey[destKey] = {
|
||||
stream: newPipeline,
|
||||
lastWrite: Date.now(),
|
||||
};
|
||||
if (this.remultiplex && isReadable(newPipeline)) {
|
||||
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
||||
} else if (this.remultiplex) {
|
||||
@ -62,10 +83,12 @@ class Demux extends Duplex {
|
||||
`Pipeline construct for ${destKey} does not implement readable interface`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
this.streamsByKey[destKey].lastWrite = Date.now();
|
||||
}
|
||||
|
||||
if (!this.streamsByKey[destKey].write(chunk, encoding)) {
|
||||
this.streamsByKey[destKey].once("drain", () => {
|
||||
if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
|
||||
this.streamsByKey[destKey].stream.once("drain", () => {
|
||||
cb();
|
||||
});
|
||||
} else {
|
||||
@ -76,8 +99,8 @@ class Demux extends Duplex {
|
||||
public _flush() {
|
||||
const pipelines = Object.values(this.streamsByKey);
|
||||
let totalEnded = 0;
|
||||
pipelines.forEach(pipeline => {
|
||||
pipeline.once("end", () => {
|
||||
pipelines.forEach(({ stream }) => {
|
||||
stream.once("end", () => {
|
||||
totalEnded++;
|
||||
if (pipelines.length === totalEnded) {
|
||||
this.push(null);
|
||||
@ -85,12 +108,12 @@ class Demux extends Duplex {
|
||||
}
|
||||
});
|
||||
});
|
||||
pipelines.forEach(pipeline => pipeline.end());
|
||||
pipelines.forEach(({ stream }) => stream.end());
|
||||
}
|
||||
|
||||
public _destroy(error: any, cb: (error?: any) => void) {
|
||||
const pipelines = Object.values(this.streamsByKey);
|
||||
pipelines.forEach(p => (p as any).destroy());
|
||||
pipelines.forEach(({ stream }) => (stream as any).destroy());
|
||||
cb(error);
|
||||
}
|
||||
}
|
||||
|
@ -256,11 +256,14 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
||||
|
||||
/**
|
||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||
* @param demuxBy
|
||||
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||
* @param options Writable stream options
|
||||
* @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||
* @param {String | Function} demuxBy
|
||||
* @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||
* @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||
* @param {Object} options Demux stream options
|
||||
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
|
||||
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
|
||||
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
|
||||
*/
|
||||
demux: withDefaultOptions(2, demux),
|
||||
};
|
||||
|
@ -862,3 +862,39 @@ test.cb("demux() should be 'destroyable'", t => {
|
||||
fakeSource.push(input[3]);
|
||||
fakeSource.push(input[4]);
|
||||
});
|
||||
|
||||
test.cb("Should delete idle pipelines", t => {
|
||||
t.plan(5);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
];
|
||||
const construct = sinon.spy((destKey: string) => {
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
t.pass();
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {
|
||||
maxIdleTime: 110,
|
||||
purgeIdleInterval: 110,
|
||||
});
|
||||
|
||||
demuxed.on("data", data => {
|
||||
if (data.key === "c") t.end();
|
||||
});
|
||||
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
setTimeout(() => {
|
||||
demuxed.write(input[i]);
|
||||
}, i * 100);
|
||||
}
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user