2019-12-06 21:38:52 +00:00
|
|
|
import * as test from "ava";
|
|
|
|
import { expect } from "chai";
|
|
|
|
import { sleep } from "../src/helpers";
|
|
|
|
import { Readable, Writable } from "stream";
|
2020-07-04 14:43:52 +00:00
|
|
|
import { strom } from "../src";
|
2019-09-07 15:04:33 +00:00
|
|
|
import { performance } from "perf_hooks";
|
2020-07-04 14:43:52 +00:00
|
|
|
const { compose, map, fromArray } = strom({ objectMode: true });
|
2019-08-21 19:40:19 +00:00
|
|
|
|
2019-08-22 18:52:39 +00:00
|
|
|
test.cb("compose() chains two streams together in the correct order", t => {
|
|
|
|
t.plan(3);
|
2019-09-09 12:58:04 +00:00
|
|
|
interface Chunk {
|
|
|
|
visited: number[];
|
|
|
|
key: string;
|
|
|
|
}
|
|
|
|
|
2019-08-22 18:52:39 +00:00
|
|
|
let i = 0;
|
2019-09-09 12:58:04 +00:00
|
|
|
const first = map((chunk: Chunk) => {
|
|
|
|
chunk.visited.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
const second = map((chunk: Chunk) => {
|
|
|
|
chunk.visited.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-08-21 19:40:19 +00:00
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const composed = compose([first, second]);
|
2019-08-22 16:07:30 +00:00
|
|
|
|
|
|
|
composed.on("data", data => {
|
2019-09-09 12:58:04 +00:00
|
|
|
expect(data).to.deep.equal(result[i]);
|
2019-08-22 18:52:39 +00:00
|
|
|
t.pass();
|
|
|
|
i++;
|
|
|
|
if (i === 3) {
|
|
|
|
t.end();
|
|
|
|
}
|
2019-08-22 16:07:30 +00:00
|
|
|
});
|
2019-08-21 19:40:19 +00:00
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
const input = [
|
|
|
|
{ key: "a", visited: [] },
|
|
|
|
{ key: "b", visited: [] },
|
|
|
|
{ key: "c", visited: [] },
|
|
|
|
];
|
|
|
|
const result = [
|
|
|
|
{ key: "a", visited: [1, 2] },
|
|
|
|
{ key: "b", visited: [1, 2] },
|
|
|
|
{ key: "c", visited: [1, 2] },
|
|
|
|
];
|
2019-08-22 18:52:39 +00:00
|
|
|
|
|
|
|
input.forEach(item => composed.write(item));
|
2019-08-22 16:07:30 +00:00
|
|
|
});
|
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
test.cb("piping compose() maintains correct order", t => {
|
|
|
|
t.plan(3);
|
|
|
|
interface Chunk {
|
|
|
|
visited: number[];
|
|
|
|
key: string;
|
|
|
|
}
|
|
|
|
let i = 0;
|
|
|
|
const first = map((chunk: Chunk) => {
|
|
|
|
chunk.visited.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
const second = map((chunk: Chunk) => {
|
|
|
|
chunk.visited.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const composed = compose([first, second]);
|
2019-09-09 12:58:04 +00:00
|
|
|
const third = map((chunk: Chunk) => {
|
|
|
|
chunk.visited.push(3);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.pipe(third).on("data", data => {
|
|
|
|
expect(data).to.deep.equal(result[i]);
|
|
|
|
t.pass();
|
|
|
|
i++;
|
|
|
|
if (i === 3) {
|
|
|
|
t.end();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("error", err => {
|
|
|
|
t.end(err);
|
|
|
|
});
|
|
|
|
|
|
|
|
const input = [
|
|
|
|
{ key: "a", visited: [] },
|
|
|
|
{ key: "b", visited: [] },
|
|
|
|
{ key: "c", visited: [] },
|
|
|
|
];
|
|
|
|
const result = [
|
|
|
|
{ key: "a", visited: [1, 2, 3] },
|
|
|
|
{ key: "b", visited: [1, 2, 3] },
|
|
|
|
{ key: "c", visited: [1, 2, 3] },
|
|
|
|
];
|
|
|
|
|
|
|
|
input.forEach(item => composed.write(item));
|
|
|
|
});
|
|
|
|
|
|
|
|
test("compose() writable length should be less than highWaterMark when handing writes", async t => {
|
2020-03-02 15:07:20 +00:00
|
|
|
t.plan(2);
|
2019-09-09 12:58:04 +00:00
|
|
|
return new Promise(async (resolve, reject) => {
|
|
|
|
interface Chunk {
|
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
|
|
|
}
|
2019-12-06 21:38:52 +00:00
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-08-21 19:40:19 +00:00
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 2,
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
composed.on("error", err => {
|
|
|
|
reject();
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("drain", () => {
|
2019-08-22 18:52:39 +00:00
|
|
|
t.pass();
|
2019-09-09 12:58:04 +00:00
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("data", (chunk: Chunk) => {
|
|
|
|
if (chunk.key === "e") {
|
|
|
|
resolve();
|
2019-08-22 18:52:39 +00:00
|
|
|
}
|
|
|
|
});
|
2019-08-22 16:07:30 +00:00
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
const input = [
|
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
|
|
|
];
|
|
|
|
|
2020-03-02 15:07:20 +00:00
|
|
|
fromArray(input).pipe(composed);
|
2019-09-09 12:58:04 +00:00
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => {
|
2020-03-02 15:07:20 +00:00
|
|
|
t.plan(2);
|
2019-09-26 14:36:36 +00:00
|
|
|
const _rate = 100;
|
|
|
|
const highWaterMark = 2;
|
2019-09-09 12:58:04 +00:00
|
|
|
return new Promise(async (resolve, reject) => {
|
|
|
|
interface Chunk {
|
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
|
|
|
}
|
2019-12-06 21:38:52 +00:00
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
await sleep(_rate);
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark,
|
|
|
|
});
|
2019-08-22 18:52:39 +00:00
|
|
|
composed.on("error", err => {
|
2019-09-09 12:58:04 +00:00
|
|
|
reject();
|
2019-08-22 18:52:39 +00:00
|
|
|
});
|
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
composed.on("drain", () => {
|
|
|
|
t.pass();
|
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
|
|
|
});
|
2019-08-22 18:52:39 +00:00
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
composed.on("data", (chunk: Chunk) => {
|
2020-03-02 15:07:20 +00:00
|
|
|
t.deepEqual(chunk.mapped, [1, 2]);
|
2019-09-09 12:58:04 +00:00
|
|
|
});
|
|
|
|
|
2020-03-02 15:07:20 +00:00
|
|
|
composed.on("finish", () => resolve());
|
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
const input = [
|
|
|
|
{ key: "a", mapped: [] },
|
2019-09-26 14:36:36 +00:00
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
2019-09-09 12:58:04 +00:00
|
|
|
];
|
2020-03-02 15:07:20 +00:00
|
|
|
fromArray(input).pipe(composed);
|
2019-09-09 12:58:04 +00:00
|
|
|
});
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
|
|
|
|
test.cb(
|
2019-09-09 12:58:04 +00:00
|
|
|
"compose() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ",
|
2019-09-07 15:04:33 +00:00
|
|
|
t => {
|
2019-09-07 18:27:55 +00:00
|
|
|
t.plan(6);
|
2019-09-09 12:58:04 +00:00
|
|
|
const _rate = 100;
|
2019-09-07 18:27:55 +00:00
|
|
|
interface Chunk {
|
2019-09-09 12:58:04 +00:00
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
2019-09-07 18:27:55 +00:00
|
|
|
}
|
2019-12-06 21:38:52 +00:00
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
await sleep(_rate);
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 5,
|
|
|
|
});
|
2019-09-10 16:09:26 +00:00
|
|
|
|
2019-09-07 15:04:33 +00:00
|
|
|
composed.on("error", err => {
|
|
|
|
t.end(err);
|
|
|
|
});
|
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
composed.on("drain", () => {
|
2019-09-07 21:14:08 +00:00
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
2019-09-07 15:04:33 +00:00
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
|
2019-09-07 18:27:55 +00:00
|
|
|
composed.on("data", (chunk: Chunk) => {
|
|
|
|
t.pass();
|
2019-09-09 12:58:04 +00:00
|
|
|
if (chunk.key === "e") {
|
2019-09-07 15:04:33 +00:00
|
|
|
t.end();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
const input = [
|
2019-09-09 12:58:04 +00:00
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
2019-09-07 15:04:33 +00:00
|
|
|
];
|
|
|
|
input.forEach(item => {
|
|
|
|
composed.write(item);
|
|
|
|
});
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
test.cb(
|
|
|
|
"compose() should emit drain event immediately when second stream is bottleneck",
|
|
|
|
t => {
|
2019-09-07 18:27:55 +00:00
|
|
|
t.plan(6);
|
2019-09-10 16:09:26 +00:00
|
|
|
const _rate = 200;
|
2019-09-07 18:27:55 +00:00
|
|
|
interface Chunk {
|
2019-09-09 15:53:21 +00:00
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
2019-09-07 18:27:55 +00:00
|
|
|
}
|
2019-12-06 21:38:52 +00:00
|
|
|
const first = map((chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
|
|
|
|
const second = map(
|
2019-09-07 18:27:55 +00:00
|
|
|
async (chunk: Chunk) => {
|
2019-09-07 21:14:08 +00:00
|
|
|
pendingReads--;
|
2019-09-10 16:09:26 +00:00
|
|
|
await sleep(_rate);
|
2019-09-09 15:53:21 +00:00
|
|
|
expect(second._writableState.length).to.be.equal(1);
|
2019-09-07 21:14:08 +00:00
|
|
|
expect(first._readableState.length).to.equal(pendingReads);
|
2019-09-09 15:53:21 +00:00
|
|
|
chunk.mapped.push(2);
|
2019-09-07 15:04:33 +00:00
|
|
|
return chunk;
|
|
|
|
},
|
2019-12-06 21:38:52 +00:00
|
|
|
{ highWaterMark: 1 },
|
2019-09-07 15:04:33 +00:00
|
|
|
);
|
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 5,
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
composed.on("error", err => {
|
|
|
|
t.end(err);
|
|
|
|
});
|
|
|
|
|
2019-09-09 12:58:04 +00:00
|
|
|
composed.on("drain", () => {
|
2019-09-07 21:14:08 +00:00
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
2019-09-10 16:09:26 +00:00
|
|
|
expect(performance.now() - start).to.be.lessThan(_rate);
|
2019-09-07 15:04:33 +00:00
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
|
2019-09-07 18:27:55 +00:00
|
|
|
composed.on("data", (chunk: Chunk) => {
|
2019-09-07 21:14:08 +00:00
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
2019-09-07 18:27:55 +00:00
|
|
|
t.pass();
|
2019-09-09 15:53:21 +00:00
|
|
|
if (chunk.key === "e") {
|
2019-09-07 15:04:33 +00:00
|
|
|
t.end();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
const input = [
|
2019-09-09 15:53:21 +00:00
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
2019-09-07 15:04:33 +00:00
|
|
|
];
|
2019-09-07 21:14:08 +00:00
|
|
|
let pendingReads = input.length;
|
2019-09-07 15:04:33 +00:00
|
|
|
|
|
|
|
input.forEach(item => {
|
|
|
|
composed.write(item);
|
|
|
|
});
|
2019-09-10 16:09:26 +00:00
|
|
|
|
2019-09-07 15:04:33 +00:00
|
|
|
const start = performance.now();
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
test.cb(
|
2019-09-07 18:27:55 +00:00
|
|
|
"compose() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck",
|
2019-09-07 15:04:33 +00:00
|
|
|
t => {
|
2019-09-07 18:27:55 +00:00
|
|
|
t.plan(6);
|
|
|
|
interface Chunk {
|
|
|
|
index: number;
|
|
|
|
mapped: string[];
|
|
|
|
}
|
2019-09-07 15:04:33 +00:00
|
|
|
const first = map(
|
2019-09-07 18:27:55 +00:00
|
|
|
async (chunk: Chunk) => {
|
2019-09-07 15:04:33 +00:00
|
|
|
expect(first._readableState.length).to.be.at.most(2);
|
2019-09-07 18:27:55 +00:00
|
|
|
chunk.mapped.push("first");
|
2019-09-07 15:04:33 +00:00
|
|
|
return chunk;
|
|
|
|
},
|
|
|
|
{
|
|
|
|
highWaterMark: 2,
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
const second = map(
|
2019-09-07 18:27:55 +00:00
|
|
|
async (chunk: Chunk) => {
|
2019-09-07 15:04:33 +00:00
|
|
|
expect(second._writableState.length).to.be.equal(1);
|
|
|
|
await sleep(100);
|
2019-09-07 18:27:55 +00:00
|
|
|
chunk.mapped.push("second");
|
2019-09-07 15:04:33 +00:00
|
|
|
return chunk;
|
|
|
|
},
|
2019-12-06 21:38:52 +00:00
|
|
|
{ highWaterMark: 2 },
|
2019-09-07 15:04:33 +00:00
|
|
|
);
|
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 5,
|
|
|
|
});
|
2019-09-07 15:04:33 +00:00
|
|
|
composed.on("error", err => {
|
|
|
|
t.end(err);
|
|
|
|
});
|
|
|
|
|
2019-09-07 18:27:55 +00:00
|
|
|
composed.on("data", (chunk: Chunk) => {
|
|
|
|
expect(chunk.mapped.length).to.equal(2);
|
|
|
|
expect(chunk.mapped).to.deep.equal(["first", "second"]);
|
|
|
|
t.pass();
|
|
|
|
if (chunk.index === 5) {
|
2019-09-07 15:04:33 +00:00
|
|
|
t.end();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2019-09-07 18:27:55 +00:00
|
|
|
composed.on("drain", () => {
|
2019-09-07 21:14:08 +00:00
|
|
|
expect(composed._writableState.length).to.be.equal(0);
|
2019-09-07 18:27:55 +00:00
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
|
2019-09-07 15:04:33 +00:00
|
|
|
const input = [
|
2019-09-07 18:27:55 +00:00
|
|
|
{ index: 1, mapped: [] },
|
|
|
|
{ index: 2, mapped: [] },
|
|
|
|
{ index: 3, mapped: [] },
|
|
|
|
{ index: 4, mapped: [] },
|
|
|
|
{ index: 5, mapped: [] },
|
2019-09-07 15:04:33 +00:00
|
|
|
];
|
|
|
|
|
|
|
|
input.forEach(item => {
|
|
|
|
composed.write(item);
|
|
|
|
});
|
|
|
|
},
|
|
|
|
);
|
2019-09-09 12:58:04 +00:00
|
|
|
|
|
|
|
test.cb(
|
|
|
|
"compose() should not emit drain event writing 5 items to compose with a highWaterMark of 6",
|
|
|
|
t => {
|
|
|
|
t.plan(5);
|
|
|
|
const _rate = 100;
|
|
|
|
interface Chunk {
|
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
|
|
|
}
|
2019-12-06 21:38:52 +00:00
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
await sleep(_rate);
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
2019-12-06 21:38:52 +00:00
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 6,
|
|
|
|
});
|
2019-09-09 12:58:04 +00:00
|
|
|
|
|
|
|
composed.on("error", err => {
|
|
|
|
t.end(err);
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("drain", () => {
|
|
|
|
t.end(new Error("Drain should not be emitted"));
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("data", (chunk: Chunk) => {
|
|
|
|
t.pass();
|
|
|
|
if (chunk.key === "e") {
|
|
|
|
t.end();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
const input = [
|
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
|
|
|
];
|
|
|
|
|
|
|
|
input.forEach(item => {
|
|
|
|
composed.write(item);
|
|
|
|
});
|
|
|
|
},
|
|
|
|
);
|
2019-12-06 21:38:52 +00:00
|
|
|
|
|
|
|
test.cb("compose() should be 'destroyable'", t => {
|
|
|
|
t.plan(3);
|
|
|
|
const _sleep = 100;
|
|
|
|
interface Chunk {
|
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
|
|
|
}
|
|
|
|
|
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
await sleep(_sleep);
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], (err: any) => {
|
|
|
|
t.pass();
|
|
|
|
});
|
2019-12-06 21:38:52 +00:00
|
|
|
|
|
|
|
const fakeSource = new Readable({
|
|
|
|
objectMode: true,
|
|
|
|
read() {
|
|
|
|
return;
|
|
|
|
},
|
|
|
|
});
|
|
|
|
|
|
|
|
const fakeSink = new Writable({
|
|
|
|
objectMode: true,
|
|
|
|
write(data, enc, cb) {
|
|
|
|
const cur = input.shift();
|
|
|
|
t.is(cur.key, data.key);
|
|
|
|
t.deepEqual(cur.mapped, [1, 2]);
|
|
|
|
if (cur.key === "a") {
|
|
|
|
composed.destroy();
|
|
|
|
}
|
|
|
|
cb();
|
|
|
|
},
|
|
|
|
});
|
|
|
|
|
|
|
|
composed.on("close", t.end);
|
|
|
|
fakeSource.pipe(composed).pipe(fakeSink);
|
|
|
|
|
|
|
|
const input = [
|
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
|
|
|
];
|
|
|
|
fakeSource.push(input[0]);
|
|
|
|
fakeSource.push(input[1]);
|
|
|
|
fakeSource.push(input[2]);
|
|
|
|
fakeSource.push(input[3]);
|
|
|
|
fakeSource.push(input[4]);
|
|
|
|
});
|
|
|
|
|
|
|
|
test.cb("compose() `finish` and `end` propagates", t => {
|
|
|
|
interface Chunk {
|
|
|
|
key: string;
|
|
|
|
mapped: number[];
|
|
|
|
}
|
|
|
|
|
|
|
|
t.plan(8);
|
|
|
|
const first = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(1);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
|
|
|
const second = map(async (chunk: Chunk) => {
|
|
|
|
chunk.mapped.push(2);
|
|
|
|
return chunk;
|
|
|
|
});
|
|
|
|
|
2020-07-04 14:43:52 +00:00
|
|
|
const composed = compose([first, second], undefined, {
|
|
|
|
highWaterMark: 3,
|
|
|
|
});
|
2019-12-06 21:38:52 +00:00
|
|
|
|
|
|
|
const fakeSource = new Readable({
|
|
|
|
objectMode: true,
|
|
|
|
read() {
|
|
|
|
return;
|
|
|
|
},
|
|
|
|
});
|
|
|
|
const sink = map((d: Chunk) => {
|
|
|
|
const curr = input.shift();
|
|
|
|
t.is(curr.key, d.key);
|
|
|
|
t.deepEqual(d.mapped, [1, 2]);
|
|
|
|
});
|
|
|
|
|
|
|
|
fakeSource.pipe(composed).pipe(sink);
|
|
|
|
|
|
|
|
fakeSource.on("end", () => {
|
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
composed.on("finish", () => {
|
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
composed.on("end", () => {
|
|
|
|
t.pass();
|
|
|
|
t.end();
|
|
|
|
});
|
|
|
|
sink.on("finish", () => {
|
|
|
|
t.pass();
|
|
|
|
});
|
|
|
|
|
|
|
|
const input = [
|
|
|
|
{ key: "a", mapped: [] },
|
|
|
|
{ key: "b", mapped: [] },
|
|
|
|
{ key: "c", mapped: [] },
|
|
|
|
{ key: "d", mapped: [] },
|
|
|
|
{ key: "e", mapped: [] },
|
|
|
|
];
|
|
|
|
fakeSource.push(input[0]);
|
|
|
|
fakeSource.push(input[1]);
|
|
|
|
fakeSource.push(null);
|
|
|
|
});
|