Purge idle pipelines
This commit is contained in:
@@ -862,3 +862,42 @@ test.cb("demux() should be 'destroyable'", t => {
|
||||
fakeSource.push(input[3]);
|
||||
fakeSource.push(input[4]);
|
||||
});
|
||||
|
||||
test.cb("Should delete idle pipelines", t => {
|
||||
t.plan(6);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "d", visited: [] },
|
||||
];
|
||||
const construct = sinon.spy((destKey: string) => {
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
t.pass();
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {
|
||||
maxIdleTime: 110,
|
||||
purgeIdleInterval: 110,
|
||||
});
|
||||
|
||||
demuxed.on("data", data => {
|
||||
if (data.key === "d") t.end();
|
||||
});
|
||||
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
setTimeout(() => {
|
||||
demuxed.write(input[i]);
|
||||
}, i * 100);
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user