Add test for drain events
This commit is contained in:
parent
f06cb1c33e
commit
ce19c5e987
@ -172,6 +172,75 @@ test.cb("demux() should send input through correct pipeline using keyBy", t => {
|
|||||||
demuxed.end();
|
demuxed.end();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("demux() should return false after if it has >= highWaterMark items buffered and drain should be emitted", t => {
|
||||||
|
return new Promise(async (resolve, reject) => {
|
||||||
|
t.plan(7);
|
||||||
|
interface Chunk {
|
||||||
|
key: string;
|
||||||
|
mapped: number[];
|
||||||
|
}
|
||||||
|
const input: Chunk[] = [
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
{ key: "a", mapped: [] },
|
||||||
|
];
|
||||||
|
let pendingReads = input.length;
|
||||||
|
const highWaterMark = 5;
|
||||||
|
const slowProcessorSpeed = 25;
|
||||||
|
const construct = (destKey: string) => {
|
||||||
|
const first = map(
|
||||||
|
async (chunk: Chunk) => {
|
||||||
|
await sleep(slowProcessorSpeed);
|
||||||
|
return { ...chunk, mapped: [1] };
|
||||||
|
},
|
||||||
|
{ highWaterMark: 1, objectMode: true },
|
||||||
|
);
|
||||||
|
|
||||||
|
// to clear first
|
||||||
|
first.on("data", chunk => {
|
||||||
|
expect(chunk.mapped).to.deep.equal([1]);
|
||||||
|
pendingReads--;
|
||||||
|
if (pendingReads === 0) {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
t.pass();
|
||||||
|
});
|
||||||
|
|
||||||
|
return first;
|
||||||
|
};
|
||||||
|
|
||||||
|
const _demux = demux(
|
||||||
|
construct,
|
||||||
|
{ key: "key" },
|
||||||
|
{
|
||||||
|
objectMode: true,
|
||||||
|
highWaterMark,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
_demux.on("error", err => {
|
||||||
|
reject();
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const item of input) {
|
||||||
|
const res = _demux.write(item);
|
||||||
|
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
|
||||||
|
if (!res) {
|
||||||
|
await new Promise((resolv, rej) => {
|
||||||
|
_demux.once("drain", () => {
|
||||||
|
expect(_demux._writableState.length).to.be.equal(0);
|
||||||
|
t.pass();
|
||||||
|
resolv();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
test("demux() when write returns false, drain event should be emitted after at least slowProcessorSpeed * highWaterMark", t => {
|
test("demux() when write returns false, drain event should be emitted after at least slowProcessorSpeed * highWaterMark", t => {
|
||||||
return new Promise(async (resolve, reject) => {
|
return new Promise(async (resolve, reject) => {
|
||||||
t.plan(7);
|
t.plan(7);
|
||||||
@ -211,12 +280,7 @@ test("demux() when write returns false, drain event should be emitted after at l
|
|||||||
{ highWaterMark: 1, objectMode: true },
|
{ highWaterMark: 1, objectMode: true },
|
||||||
);
|
);
|
||||||
|
|
||||||
const second = map(async (chunk: Chunk) => {
|
first.pipe(sink);
|
||||||
chunk.mapped.push(2);
|
|
||||||
return chunk;
|
|
||||||
});
|
|
||||||
|
|
||||||
first.pipe(second).pipe(sink);
|
|
||||||
return first;
|
return first;
|
||||||
};
|
};
|
||||||
const _demux = demux(
|
const _demux = demux(
|
||||||
@ -231,20 +295,18 @@ test("demux() when write returns false, drain event should be emitted after at l
|
|||||||
reject();
|
reject();
|
||||||
});
|
});
|
||||||
|
|
||||||
let start = null;
|
let start = performance.now();
|
||||||
for (const item of input) {
|
for (const item of input) {
|
||||||
const res = _demux.write(item);
|
const res = _demux.write(item);
|
||||||
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
|
|
||||||
if (!res) {
|
if (!res) {
|
||||||
start = performance.now();
|
|
||||||
await new Promise((resolv, rej) => {
|
await new Promise((resolv, rej) => {
|
||||||
_demux.once("drain", () => {
|
_demux.once("drain", () => {
|
||||||
expect(_demux._writableState.length).to.be.equal(0);
|
|
||||||
expect(performance.now() - start).to.be.greaterThan(
|
expect(performance.now() - start).to.be.greaterThan(
|
||||||
slowProcessorSpeed * highWaterMark,
|
slowProcessorSpeed * highWaterMark,
|
||||||
);
|
);
|
||||||
t.pass();
|
t.pass();
|
||||||
resolv();
|
resolv();
|
||||||
|
start = performance.now();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user