Add batch to Transform

Add tests for batched stream

Add unbatching

Update comment

Add Rate

Add parallel processing

Remove only

Remove deep

Cleanup

Cleanup
This commit is contained in:
Jerry Kurian 2019-05-31 15:06:26 -04:00
parent 002e9b7bb8
commit 81bc9e6bc5
3 changed files with 248 additions and 1 deletions

View File

@ -1,6 +1,7 @@
import * as cp from "child_process"; import * as cp from "child_process";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { performance } from "perf_hooks";
import { Readable } from "stream"; import { Readable } from "stream";
import { import {
fromArray, fromArray,
@ -19,6 +20,10 @@ import {
child, child,
reduce, reduce,
last, last,
batch,
unbatch,
rate,
parallelMap,
} from "."; } from ".";
test.cb("fromArray() streams array elements in flowing mode", t => { test.cb("fromArray() streams array elements in flowing mode", t => {
@ -1180,3 +1185,147 @@ test("last() resolves to the last chunk streamed by the given readable stream",
const lastChunk = await lastPromise; const lastChunk = await lastPromise;
expect(lastChunk).to.equal("ef"); expect(lastChunk).to.equal("ef");
}); });
test.cb("batch() batches chunks together", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]];
let i = 0;
source
.pipe(batch(3))
.on("data", (element: string[]) => {
expect(element).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push("f");
source.push("g");
source.push(null);
});
test.cb("unbatch() unbatches", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c"];
let i = 0;
source
.pipe(batch(3))
.pipe(unbatch())
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("rate() sends data at desired rate", t => {
t.plan(9);
const fastRate = 500;
const medRate = 50;
const slowRate = 1;
const sourceFast = new Readable({ objectMode: true });
const sourceMed = new Readable({ objectMode: true });
const sourceSlow = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c"];
const start = performance.now();
let i = 0;
let j = 0;
let k = 0;
sourceFast
.pipe(rate(fastRate))
.on("data", (element: string[]) => {
const currentRate = (i / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[i]);
expect(currentRate).lessThan(fastRate);
t.pass();
i++;
})
.on("error", t.end);
sourceMed
.pipe(rate(medRate))
.on("data", (element: string[]) => {
const currentRate = (j / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[j]);
expect(currentRate).lessThan(medRate);
t.pass();
j++;
})
.on("error", t.end);
sourceSlow
.pipe(rate(slowRate))
.on("data", (element: string[]) => {
const currentRate = (k / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[k]);
expect(currentRate).lessThan(slowRate);
t.pass();
k++;
})
.on("error", t.end)
.on("end", t.end);
sourceFast.push("a");
sourceFast.push("b");
sourceFast.push("c");
sourceFast.push(null);
sourceMed.push("a");
sourceMed.push("b");
sourceMed.push("c");
sourceMed.push(null);
sourceSlow.push("a");
sourceSlow.push("b");
sourceSlow.push("c");
sourceSlow.push(null);
});
test.cb("parallel() parallel mapping", t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedElements = [
"a_processed",
"b_processed",
"c_processed",
"d_processed",
"e_processed",
];
const orderedResults: string[] = [];
source
.pipe(parallelMap(2, data => data + "_processed"))
.on("data", (element: string) => {
t.true(expectedElements.includes(element));
orderedResults.push(element);
})
.on("error", t.end)
.on("end", () => {
expect(orderedResults[0]).to.equal("a_processed")
expect(orderedResults[1]).to.equal("b_processed")
expect(orderedResults[2]).to.equal("d_processed")
expect(orderedResults[3]).to.equal("c_processed")
expect(orderedResults[4]).to.equal("e_processed")
t.end();
});
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push(null);
});

View File

@ -1,4 +1,5 @@
import { Transform, Readable, Writable, Duplex } from "stream"; import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process"; import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder"; import { StringDecoder } from "string_decoder";
@ -13,6 +14,10 @@ export interface WithEncoding {
encoding: string; encoding: string;
} }
/**
 * Resolve after `time` milliseconds.
 *
 * Resolves immediately when the requested delay is zero or negative, and
 * always returns a `Promise<void>` (the original returned `null` for
 * non-positive delays, giving it the type `Promise<{} | null>`).
 *
 * @param time Delay in milliseconds
 */
function sleep(time: number): Promise<void> {
  return new Promise<void>(resolve => {
    if (time > 0) {
      setTimeout(resolve, time);
    } else {
      resolve();
    }
  });
}
/** /**
* Convert an array into a Readable stream of its elements * Convert an array into a Readable stream of its elements
* @param array Array of elements to stream * @param array Array of elements to stream
@ -499,3 +504,96 @@ export function last<T>(readable: Readable): Promise<T | null> {
.on("end", () => resolve(lastChunk)); .on("end", () => resolve(lastChunk));
}); });
} }
/**
 * Stores chunks of data internally in array and batches when batchSize is reached.
 *
 * @param batchSize Size of the batches
 */
export function batch(batchSize: number) {
  const buffer: any[] = [];
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      buffer.push(chunk);
      if (buffer.length === batchSize) {
        // Emit a full batch; splice(0) empties the buffer for the next one.
        callback(undefined, buffer.splice(0));
      } else {
        callback();
      }
    },
    flush(callback) {
      // Only emit the trailing partial batch when there is one. The original
      // pushed unconditionally, which emitted a spurious empty array whenever
      // the input length was an exact multiple of batchSize.
      if (buffer.length > 0) {
        this.push(buffer.splice(0));
      }
      callback();
    },
  });
}
/**
 * Unbatches and sends individual chunks of data
 */
export function unbatch() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      // Emit every element of the incoming batch as its own chunk.
      chunk.forEach((element: any) => this.push(element));
      callback();
    },
  });
}
/**
 * Limits the rate at which chunks flow through the stream.
 *
 * @param targetRate Desired maximum rate in chunks per second
 */
export function rate(targetRate: number) {
  // Interval (ms) between chunks when flowing exactly at targetRate.
  const deltaMS = (1 / targetRate) * 1000;
  let total = 0;
  const start = performance.now();
  return new Transform({
    objectMode: true,
    async transform(data, encoding, callback) {
      // Wait until the observed throughput falls back under the target.
      // The original slept only once per chunk, which could still pass
      // chunks above the target rate when the producer was much faster.
      while (
        targetRate &&
        (total / (performance.now() - start)) * 1000 > targetRate
      ) {
        await sleep(deltaMS);
      }
      total += 1;
      callback(undefined, data);
    },
  });
}
/**
 * Limits number of parallel processes in flight.
 *
 * @param parallel Max number of parallel processes.
 * @param func Function to execute on each data chunk
 */
export function parallelMap<T, R>(parallel: number, func: (data: T) => R) {
  let inflight = 0;
  return new Transform({
    objectMode: true,
    async transform(data, encoding, callback) {
      // Poll (5ms) until a processing slot frees up.
      while (parallel <= inflight) {
        await sleep(5);
      }
      inflight += 1;
      // Signal readiness for the next chunk *before* processing, so up to
      // `parallel` chunks can be mapped concurrently.
      callback();
      try {
        const res = await func(data);
        this.push(res);
      } catch (e) {
        // Fix: the original called this.emit(e), which emits an event
        // *named* by the error object and silently drops the failure.
        this.emit("error", e);
      } finally {
        inflight -= 1;
      }
    },
    async flush(callback) {
      // Hold the stream open until all in-flight work has drained.
      while (inflight > 0) {
        await sleep(5);
      }
      callback();
    },
  });
}

View File

@ -7,7 +7,7 @@
"rules": { "rules": {
"no-console": false, "no-console": false,
"no-implicit-dependencies": [true, "dev"], "no-implicit-dependencies": [true, "dev"],
"prettier": true, "prettier": [true, ".prettierrc"],
"ordered-imports": false, "ordered-imports": false,
"interface-name": false "interface-name": false
} }