Update tests for parallelMap
This commit is contained in:
parent
712e538c3e
commit
86020a50ad
@ -1298,6 +1298,7 @@ test.cb("rate() sends data at desired rate", t => {
|
|||||||
|
|
||||||
test.cb("parallel() parallel mapping", t => {
|
test.cb("parallel() parallel mapping", t => {
|
||||||
t.plan(6);
|
t.plan(6);
|
||||||
|
const offset = 50;
|
||||||
const source = new Readable({ objectMode: true });
|
const source = new Readable({ objectMode: true });
|
||||||
const expectedElements = [
|
const expectedElements = [
|
||||||
"a_processed",
|
"a_processed",
|
||||||
@ -1307,17 +1308,21 @@ test.cb("parallel() parallel mapping", t => {
|
|||||||
"e_processed",
|
"e_processed",
|
||||||
"f_processed",
|
"f_processed",
|
||||||
];
|
];
|
||||||
const orderedResults: Array<{ output: string; processed: number }> = [];
|
interface IPerfData {
|
||||||
// Record start / end times of each process and then compare to figure out # of processes ocurring and order
|
start: number;
|
||||||
|
output?: string;
|
||||||
|
finish?: number;
|
||||||
|
}
|
||||||
|
const orderedResults: IPerfData[] = [];
|
||||||
source
|
source
|
||||||
.pipe(
|
.pipe(
|
||||||
parallelMap(async (data: any) => {
|
parallelMap(async (data: any) => {
|
||||||
|
const perfData: IPerfData = { start: performance.now() };
|
||||||
const c = data + "_processed";
|
const c = data + "_processed";
|
||||||
await sleep(50);
|
perfData.output = c;
|
||||||
orderedResults.push({
|
await sleep(offset);
|
||||||
output: c,
|
perfData.finish = performance.now();
|
||||||
processed: performance.now(),
|
orderedResults.push(perfData);
|
||||||
});
|
|
||||||
return c;
|
return c;
|
||||||
}, 2),
|
}, 2),
|
||||||
)
|
)
|
||||||
@ -1326,26 +1331,29 @@ test.cb("parallel() parallel mapping", t => {
|
|||||||
})
|
})
|
||||||
.on("error", t.end)
|
.on("error", t.end)
|
||||||
.on("end", async () => {
|
.on("end", async () => {
|
||||||
expect(orderedResults[0].processed).to.be.lessThan(
|
expect(orderedResults[0].finish).to.be.lessThan(
|
||||||
orderedResults[1].processed + 50,
|
orderedResults[2].start,
|
||||||
);
|
);
|
||||||
expect(orderedResults[2].processed).to.be.lessThan(
|
expect(orderedResults[1].finish).to.be.lessThan(
|
||||||
orderedResults[3].processed + 50,
|
orderedResults[3].start,
|
||||||
);
|
);
|
||||||
expect(orderedResults[4].processed).to.be.lessThan(
|
expect(orderedResults[2].finish).to.be.lessThan(
|
||||||
orderedResults[5].processed + 50,
|
orderedResults[4].start,
|
||||||
);
|
);
|
||||||
expect(orderedResults[2].processed).to.be.greaterThan(
|
expect(orderedResults[3].finish).to.be.lessThan(
|
||||||
orderedResults[0].processed + 50,
|
orderedResults[5].start,
|
||||||
);
|
);
|
||||||
expect(orderedResults[3].processed).to.be.greaterThan(
|
expect(orderedResults[0].start).to.be.lessThan(
|
||||||
orderedResults[1].processed + 50,
|
orderedResults[2].start + offset
|
||||||
);
|
);
|
||||||
expect(orderedResults[4].processed).to.be.greaterThan(
|
expect(orderedResults[1].start).to.be.lessThan(
|
||||||
orderedResults[2].processed + 50,
|
orderedResults[3].start + offset
|
||||||
);
|
);
|
||||||
expect(orderedResults[5].processed).to.be.greaterThan(
|
expect(orderedResults[2].start).to.be.lessThan(
|
||||||
orderedResults[3].processed + 50,
|
orderedResults[4].start + offset
|
||||||
|
);
|
||||||
|
expect(orderedResults[3].start).to.be.lessThan(
|
||||||
|
orderedResults[5].start + offset
|
||||||
);
|
);
|
||||||
t.end();
|
t.end();
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user