Add more tests for compose

This commit is contained in:
Jerry Kurian 2019-09-09 08:58:04 -04:00
parent 2cbeae38e7
commit 599ba16d48

View File

@ -6,9 +6,20 @@ import { performance } from "perf_hooks";
test.cb("compose() chains two streams together in the correct order", t => { test.cb("compose() chains two streams together in the correct order", t => {
t.plan(3); t.plan(3);
interface Chunk {
visited: number[];
key: string;
}
let i = 0; let i = 0;
const first = map((chunk: number) => chunk + 1); const first = map((chunk: Chunk) => {
const second = map((chunk: number) => chunk * 2); chunk.visited.push(1);
return chunk;
});
const second = map((chunk: Chunk) => {
chunk.visited.push(2);
return chunk;
});
const composed = compose( const composed = compose(
[first, second], [first, second],
@ -16,7 +27,7 @@ test.cb("compose() chains two streams together in the correct order", t => {
); );
composed.on("data", data => { composed.on("data", data => {
expect(data).to.equal(result[i]); expect(data).to.deep.equal(result[i]);
t.pass(); t.pass();
i++; i++;
if (i === 3) { if (i === 3) {
@ -30,27 +41,47 @@ test.cb("compose() chains two streams together in the correct order", t => {
t.end(); t.end();
}); });
const input = [1, 2, 3]; const input = [
const result = [4, 6, 8]; { key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const result = [
{ key: "a", visited: [1, 2] },
{ key: "b", visited: [1, 2] },
{ key: "c", visited: [1, 2] },
];
input.forEach(item => composed.write(item)); input.forEach(item => composed.write(item));
}); });
test.cb( test.cb("piping compose() maintains correct order", t => {
"compose() followed by pipe chains streams together in the correct order",
t => {
t.plan(3); t.plan(3);
interface Chunk {
visited: number[];
key: string;
}
let i = 0; let i = 0;
const first = map((chunk: number) => chunk + 1); const first = map((chunk: Chunk) => {
const second = map((chunk: number) => chunk * 2); chunk.visited.push(1);
return chunk;
});
const second = map((chunk: Chunk) => {
chunk.visited.push(2);
return chunk;
});
const composed = compose( const composed = compose(
[first, second], [first, second],
{ objectMode: true }, { objectMode: true },
); );
const third = map((chunk: number) => chunk + 1); const third = map((chunk: Chunk) => {
chunk.visited.push(3);
return chunk;
});
composed.pipe(third).on("data", data => { composed.pipe(third).on("data", data => {
expect(data).to.equal(result[i]); expect(data).to.deep.equal(result[i]);
t.pass(); t.pass();
i++; i++;
if (i === 3) { if (i === 3) {
@ -62,27 +93,30 @@ test.cb(
t.end(err); t.end(err);
}); });
const input = [1, 2, 3]; const input = [
const result = [5, 7, 9]; { key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const result = [
{ key: "a", visited: [1, 2, 3] },
{ key: "b", visited: [1, 2, 3] },
{ key: "c", visited: [1, 2, 3] },
];
input.forEach(item => composed.write(item)); input.forEach(item => composed.write(item));
}, });
);
test.cb( test("compose() writable length should be less than highWaterMark when handing writes", async t => {
"compose() should emit drain event after 1 second when first stream is bottleneck", t.plan(7);
t => { return new Promise(async (resolve, reject) => {
t.plan(6);
let passedBottleneck = 0;
interface Chunk { interface Chunk {
index: number; key: string;
mapped: string[]; mapped: number[];
} }
const first = map( const first = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
await sleep(200); chunk.mapped.push(1);
passedBottleneck++;
chunk.mapped.push("first");
return chunk; return chunk;
}, },
{ {
@ -92,7 +126,141 @@ test.cb(
const second = map( const second = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
chunk.mapped.push("second"); chunk.mapped.push(2);
return chunk;
},
{ objectMode: true },
);
const composed = compose(
[first, second],
{ objectMode: true, highWaterMark: 2 },
);
composed.on("error", err => {
reject();
});
composed.on("drain", () => {
t.pass();
expect(composed._writableState.length).to.be.equal(0);
});
composed.on("data", (chunk: Chunk) => {
if (chunk.key === "e") {
resolve();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
for (const item of input) {
const res = composed.write(item);
expect(composed._writableState.length).to.be.at.most(2);
t.pass();
if (!res) {
await sleep(10);
}
}
});
});
test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => {
t.plan(7);
const _rate = 25;
return new Promise(async (resolve, reject) => {
interface Chunk {
key: string;
mapped: number[];
}
const first = map(
async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
},
{
objectMode: true,
},
);
const second = map(
async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
},
{ objectMode: true },
);
const composed = compose(
[first, second],
{ objectMode: true, highWaterMark: 2 },
);
composed.on("error", err => {
reject();
});
composed.on("drain", () => {
t.pass();
expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.greaterThan(_rate);
});
composed.on("data", (chunk: Chunk) => {
if (chunk.key === "e") {
resolve();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
let start = performance.now();
for (const item of input) {
const res = composed.write(item);
expect(composed._writableState.length).to.be.at.most(2);
t.pass();
if (!res) {
start = performance.now();
await sleep(100);
}
}
});
});
test.cb(
"compose() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ",
t => {
t.plan(6);
const _rate = 100;
interface Chunk {
key: string;
mapped: number[];
}
const first = map(
async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
},
{
objectMode: true,
},
);
const second = map(
async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk; return chunk;
}, },
{ objectMode: true }, { objectMode: true },
@ -106,33 +274,28 @@ test.cb(
t.end(err); t.end(err);
}); });
composed.on("drain", err => { composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.greaterThan(1000); expect(performance.now() - start).to.be.greaterThan(
_rate * input.length,
);
t.pass(); t.pass();
}); });
composed.on("data", (chunk: Chunk) => { composed.on("data", (chunk: Chunk) => {
// Since first is bottleneck, composed accumulates until cb is executed in first. Therefore buffer should contain 4, 3, 2, 1 then 0 elements
expect(composed._writableState.length).to.be.equal(
input.length - passedBottleneck,
);
expect(chunk.mapped.length).to.equal(2);
expect(chunk.mapped).to.deep.equal(["first", "second"]);
t.pass(); t.pass();
if (chunk.index === 5) { if (chunk.key === "e") {
t.end(); t.end();
} }
}); });
const input = [ const input = [
{ index: 1, mapped: [] }, { key: "a", mapped: [] },
{ index: 2, mapped: [] }, { key: "b", mapped: [] },
{ index: 3, mapped: [] }, { key: "c", mapped: [] },
{ index: 4, mapped: [] }, { key: "d", mapped: [] },
{ index: 5, mapped: [] }, { key: "e", mapped: [] },
]; ];
input.forEach(item => { input.forEach(item => {
composed.write(item); composed.write(item);
}); });
@ -177,7 +340,7 @@ test.cb(
t.end(err); t.end(err);
}); });
composed.on("drain", err => { composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.lessThan(100); expect(performance.now() - start).to.be.lessThan(100);
t.pass(); t.pass();
@ -275,3 +438,65 @@ test.cb(
}); });
}, },
); );
test.cb(
"compose() should not emit drain event writing 5 items to compose with a highWaterMark of 6",
t => {
t.plan(5);
const _rate = 100;
interface Chunk {
key: string;
mapped: number[];
}
const first = map(
async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
},
{
objectMode: true,
},
);
const second = map(
async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
},
{ objectMode: true },
);
const composed = compose(
[first, second],
{ objectMode: true, highWaterMark: 6 },
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", () => {
t.end(new Error("Drain should not be emitted"));
});
composed.on("data", (chunk: Chunk) => {
t.pass();
if (chunk.key === "e") {
t.end();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
input.forEach(item => {
composed.write(item);
});
},
);