WIP Add some backpressure tests for compose

This commit is contained in:
Jerry Kurian
2019-09-07 11:04:33 -04:00
parent d5f3fd8bd8
commit cd10649d44
7 changed files with 221 additions and 66 deletions

View File

@@ -1,6 +1,8 @@
const test = require("ava");
const { expect } = require("chai");
const { compose, composeDuplex, map } = require("../src");
const { compose, composeDuplex, map, rate } = require("../src");
const { sleep } = require("../src/helpers");
import { performance } from "perf_hooks";
test.cb("compose() chains two streams together in the correct order", t => {
t.plan(3);
@@ -66,3 +68,167 @@ test.cb(
input.forEach(item => composed.write(item));
},
);
test.cb(
"compose() should emit drain event after 1 second when first stream is bottleneck",
t => {
t.plan(1);
const first = map(
async (chunk: number) => {
await sleep(200);
return chunk;
},
{
objectMode: true,
},
);
const second = map(
async (chunk: number) => {
return chunk;
},
{ objectMode: true },
);
const composed = compose(
[first, second],
{ objectMode: true, highWaterMark: 2 },
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", err => {
expect(performance.now() - start).to.be.greaterThan(1000);
t.pass();
});
composed.on("data", chunk => {
if (chunk.data === 5) {
t.end();
}
});
const input = [
{ data: 1 },
{ data: 2 },
{ data: 3 },
{ data: 4 },
{ data: 5 },
];
input.forEach(item => {
composed.write(item);
});
const start = performance.now();
},
);
test.cb(
"compose() should emit drain event immediately when second stream is bottleneck",
t => {
t.plan(1);
const first = map(
async (chunk: number) => {
return chunk;
},
{
objectMode: true,
},
);
const second = map(
async (chunk: number) => {
await sleep(500);
return chunk;
},
{ objectMode: true },
);
const composed = compose(
[first, second],
{ objectMode: true, highWaterMark: 2 },
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", err => {
expect(performance.now() - start).to.be.lessThan(100);
t.pass();
});
composed.on("data", chunk => {
if (chunk.data === 5) {
t.end();
}
});
const input = [
{ data: 1 },
{ data: 2 },
{ data: 3 },
{ data: 4 },
{ data: 5 },
];
input.forEach(item => {
composed.write(item);
});
const start = performance.now();
},
);
test.cb(
"first should contain up to highWaterMark items in readable state when second is bottleneck",
t => {
t.plan(10);
const first = map(
async (chunk: number) => {
expect(first._readableState.length).to.be.at.most(2);
t.pass();
return chunk;
},
{
objectMode: true,
highWaterMark: 2,
},
);
const second = map(
async (chunk: number) => {
expect(second._writableState.length).to.be.equal(1);
t.pass();
await sleep(100);
return chunk;
},
{ objectMode: true, highWaterMark: 2 },
);
const composed = compose(
[first, second],
{ objectMode: true },
);
composed.on("error", err => {
t.end(err);
});
composed.on("data", chunk => {
if (chunk.data === 5) {
t.end();
}
});
const input = [
{ data: 1 },
{ data: 2 },
{ data: 3 },
{ data: 4 },
{ data: 5 },
];
input.forEach(item => {
composed.write(item);
});
},
);

View File

@@ -9,7 +9,12 @@ test.cb("filter() filters elements synchronously", t => {
const expectedElements = ["a", "c"];
let i = 0;
source
.pipe(filter((element: string) => element !== "b"))
.pipe(
filter((element: string) => element !== "b", {
readableObjectMode: true,
writableObjectMode: true,
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
@@ -31,10 +36,13 @@ test.cb("filter() filters elements asynchronously", t => {
let i = 0;
source
.pipe(
filter(async (element: string) => {
await Promise.resolve();
return element !== "b";
}),
filter(
async (element: string) => {
await Promise.resolve();
return element !== "b";
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
@@ -55,12 +63,15 @@ test.cb("filter() emits errors during synchronous filtering", t => {
const source = new Readable({ objectMode: true });
source
.pipe(
filter((element: string) => {
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
}),
filter(
(element: string) => {
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.resume()
.on("error", err => {
@@ -80,13 +91,16 @@ test.cb("filter() emits errors during asynchronous filtering", t => {
const source = new Readable({ objectMode: true });
source
.pipe(
filter(async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
}),
filter(
async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.resume()
.on("error", err => {