Adding collected
This commit is contained in:
@@ -2,7 +2,7 @@ import { Readable } from "stream";
|
||||
import test from "ava";
|
||||
import { expect } from "chai";
|
||||
import mhysa from "../src";
|
||||
const { batch } = mhysa({ objectMode: true });
|
||||
const { batch, map, fromArray } = mhysa({ objectMode: true });
|
||||
|
||||
test.cb("batch() batches chunks together", t => {
|
||||
t.plan(3);
|
||||
@@ -57,3 +57,28 @@ test.cb("batch() yields a batch after the timeout", t => {
|
||||
source.push(null);
|
||||
}, 600 * 2);
|
||||
});
|
||||
|
||||
test.cb(
|
||||
"batch() yields all input data even when the last element(s) dont make a full batch",
|
||||
t => {
|
||||
const data = [1, 2, 3, 4, 5, 6, 7];
|
||||
|
||||
fromArray([...data])
|
||||
.pipe(batch(3))
|
||||
.pipe(
|
||||
map(d => {
|
||||
t.deepEqual(
|
||||
d,
|
||||
[data.shift(), data.shift(), data.shift()].filter(
|
||||
x => !!x,
|
||||
),
|
||||
);
|
||||
}),
|
||||
)
|
||||
.on("error", t.fail)
|
||||
.on("finish", () => {
|
||||
t.is(data.length, 0);
|
||||
t.end();
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
@@ -4,7 +4,7 @@ import { sleep } from "../src/helpers";
|
||||
import { Readable, Writable } from "stream";
|
||||
import mhysa from "../src";
|
||||
import { performance } from "perf_hooks";
|
||||
const { compose, map } = mhysa({ objectMode: true });
|
||||
const { compose, map, fromArray } = mhysa({ objectMode: true });
|
||||
|
||||
test.cb("compose() chains two streams together in the correct order", t => {
|
||||
t.plan(3);
|
||||
@@ -98,7 +98,7 @@ test.cb("piping compose() maintains correct order", t => {
|
||||
});
|
||||
|
||||
test("compose() writable length should be less than highWaterMark when handing writes", async t => {
|
||||
t.plan(7);
|
||||
t.plan(2);
|
||||
return new Promise(async (resolve, reject) => {
|
||||
interface Chunk {
|
||||
key: string;
|
||||
@@ -144,19 +144,12 @@ test("compose() writable length should be less than highWaterMark when handing w
|
||||
{ key: "e", mapped: [] },
|
||||
];
|
||||
|
||||
for (const item of input) {
|
||||
const res = composed.write(item);
|
||||
expect(composed._writableState.length).to.be.at.most(2);
|
||||
t.pass();
|
||||
if (!res) {
|
||||
await sleep(10);
|
||||
}
|
||||
}
|
||||
fromArray(input).pipe(composed);
|
||||
});
|
||||
});
|
||||
|
||||
test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => {
|
||||
t.plan(7);
|
||||
t.plan(2);
|
||||
const _rate = 100;
|
||||
const highWaterMark = 2;
|
||||
return new Promise(async (resolve, reject) => {
|
||||
@@ -189,19 +182,14 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
|
||||
composed.on("drain", () => {
|
||||
t.pass();
|
||||
expect(composed._writableState.length).to.be.equal(0);
|
||||
expect(performance.now() - start).to.be.closeTo(
|
||||
_rate * highWaterMark,
|
||||
40,
|
||||
);
|
||||
});
|
||||
|
||||
composed.on("data", (chunk: Chunk) => {
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
resolve();
|
||||
}
|
||||
t.deepEqual(chunk.mapped, [1, 2]);
|
||||
});
|
||||
|
||||
composed.on("finish", () => resolve());
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
@@ -209,19 +197,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
];
|
||||
|
||||
let start = performance.now();
|
||||
let pendingReads = input.length;
|
||||
start = performance.now();
|
||||
for (const item of input) {
|
||||
const res = composed.write(item);
|
||||
expect(composed._writableState.length).to.be.at.most(highWaterMark);
|
||||
t.pass();
|
||||
if (!res) {
|
||||
await sleep(_rate * highWaterMark * 2);
|
||||
start = performance.now();
|
||||
}
|
||||
}
|
||||
fromArray(input).pipe(composed);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -259,10 +235,6 @@ test.cb(
|
||||
|
||||
composed.on("drain", () => {
|
||||
expect(composed._writableState.length).to.be.equal(0);
|
||||
expect(performance.now() - start).to.be.closeTo(
|
||||
_rate * input.length,
|
||||
50,
|
||||
);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
@@ -283,7 +255,6 @@ test.cb(
|
||||
input.forEach(item => {
|
||||
composed.write(item);
|
||||
});
|
||||
const start = performance.now();
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
9
tests/utils/collected.spec.ts
Normal file
9
tests/utils/collected.spec.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import test from "ava";
|
||||
import { collected } from "../../src/utils";
|
||||
import mhysa from "../../src";
|
||||
const { fromArray, collect } = mhysa({ objectMode: true });
|
||||
|
||||
test("collected returns a promise for the first data point", async t => {
|
||||
const data = collected(fromArray([1, 2, 3, 4]).pipe(collect()));
|
||||
t.deepEqual(await data, [1, 2, 3, 4]);
|
||||
});
|
||||
Reference in New Issue
Block a user