Add test for highwatermark

This commit is contained in:
Jerry Kurian 2019-09-07 14:27:55 -04:00
parent cd10649d44
commit ae7c9d6b09

View File

@ -72,10 +72,15 @@ test.cb(
test.cb( test.cb(
"compose() should emit drain event after 1 second when first stream is bottleneck", "compose() should emit drain event after 1 second when first stream is bottleneck",
t => { t => {
t.plan(1); t.plan(6);
interface Chunk {
index: number;
mapped: string[];
}
const first = map( const first = map(
async (chunk: number) => { async (chunk: Chunk) => {
await sleep(200); await sleep(200);
chunk.mapped.push("first");
return chunk; return chunk;
}, },
{ {
@ -84,7 +89,8 @@ test.cb(
); );
const second = map( const second = map(
async (chunk: number) => { async (chunk: Chunk) => {
chunk.mapped.push("second");
return chunk; return chunk;
}, },
{ objectMode: true }, { objectMode: true },
@ -103,8 +109,11 @@ test.cb(
t.pass(); t.pass();
}); });
composed.on("data", chunk => { composed.on("data", (chunk: Chunk) => {
if (chunk.data === 5) { expect(chunk.mapped.length).to.equal(2);
expect(chunk.mapped).to.deep.equal(["first", "second"]);
t.pass();
if (chunk.index === 5) {
t.end(); t.end();
} }
}); });
@ -127,9 +136,14 @@ test.cb(
test.cb( test.cb(
"compose() should emit drain event immediately when second stream is bottleneck", "compose() should emit drain event immediately when second stream is bottleneck",
t => { t => {
t.plan(1); t.plan(6);
interface Chunk {
index: number;
mapped: string[];
}
const first = map( const first = map(
async (chunk: number) => { async (chunk: Chunk) => {
chunk.mapped.push("first");
return chunk; return chunk;
}, },
{ {
@ -138,8 +152,9 @@ test.cb(
); );
const second = map( const second = map(
async (chunk: number) => { async (chunk: Chunk) => {
await sleep(500); await sleep(500);
chunk.mapped.push("second");
return chunk; return chunk;
}, },
{ objectMode: true }, { objectMode: true },
@ -158,18 +173,21 @@ test.cb(
t.pass(); t.pass();
}); });
composed.on("data", chunk => { composed.on("data", (chunk: Chunk) => {
if (chunk.data === 5) { expect(chunk.mapped.length).to.equal(2);
expect(chunk.mapped).to.deep.equal(["first", "second"]);
t.pass();
if (chunk.index === 5) {
t.end(); t.end();
} }
}); });
const input = [ const input = [
{ data: 1 }, { index: 1, mapped: [] },
{ data: 2 }, { index: 2, mapped: [] },
{ data: 3 }, { index: 3, mapped: [] },
{ data: 4 }, { index: 4, mapped: [] },
{ data: 5 }, { index: 5, mapped: [] },
]; ];
input.forEach(item => { input.forEach(item => {
@ -180,13 +198,17 @@ test.cb(
); );
test.cb( test.cb(
"first should contain up to highWaterMark items in readable state when second is bottleneck", "compose() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck",
t => { t => {
t.plan(10); t.plan(6);
interface Chunk {
index: number;
mapped: string[];
}
const first = map( const first = map(
async (chunk: number) => { async (chunk: Chunk) => {
expect(first._readableState.length).to.be.at.most(2); expect(first._readableState.length).to.be.at.most(2);
t.pass(); chunk.mapped.push("first");
return chunk; return chunk;
}, },
{ {
@ -196,10 +218,10 @@ test.cb(
); );
const second = map( const second = map(
async (chunk: number) => { async (chunk: Chunk) => {
expect(second._writableState.length).to.be.equal(1); expect(second._writableState.length).to.be.equal(1);
t.pass();
await sleep(100); await sleep(100);
chunk.mapped.push("second");
return chunk; return chunk;
}, },
{ objectMode: true, highWaterMark: 2 }, { objectMode: true, highWaterMark: 2 },
@ -207,24 +229,31 @@ test.cb(
const composed = compose( const composed = compose(
[first, second], [first, second],
{ objectMode: true }, { objectMode: true, highWaterMark: 3 },
); );
composed.on("error", err => { composed.on("error", err => {
t.end(err); t.end(err);
}); });
composed.on("data", chunk => { composed.on("data", (chunk: Chunk) => {
if (chunk.data === 5) { expect(chunk.mapped.length).to.equal(2);
expect(chunk.mapped).to.deep.equal(["first", "second"]);
t.pass();
if (chunk.index === 5) {
t.end(); t.end();
} }
}); });
composed.on("drain", () => {
t.pass();
});
const input = [ const input = [
{ data: 1 }, { index: 1, mapped: [] },
{ data: 2 }, { index: 2, mapped: [] },
{ data: 3 }, { index: 3, mapped: [] },
{ data: 4 }, { index: 4, mapped: [] },
{ data: 5 }, { index: 5, mapped: [] },
]; ];
input.forEach(item => { input.forEach(item => {