diff --git a/src/index.spec.ts b/src/index.spec.ts index a15050d..051fdd7 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -1,6 +1,7 @@ import * as cp from "child_process"; import test from "ava"; import { expect } from "chai"; +import { performance } from "perf_hooks"; import { Readable } from "stream"; import { fromArray, @@ -19,6 +20,10 @@ import { child, reduce, last, + batch, + unbatch, + rate, + parallelMap, } from "."; test.cb("fromArray() streams array elements in flowing mode", t => { @@ -1180,3 +1185,147 @@ test("last() resolves to the last chunk streamed by the given readable stream", const lastChunk = await lastPromise; expect(lastChunk).to.equal("ef"); }); + +test.cb("batch() batches chunks together", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]]; + let i = 0; + source + .pipe(batch(3)) + .on("data", (element: string[]) => { + expect(element).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push("d"); + source.push("e"); + source.push("f"); + source.push("g"); + source.push(null); +}); + +test.cb("unbatch() unbatches", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c"]; + let i = 0; + source + .pipe(batch(3)) + .pipe(unbatch()) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("rate() sends data at desired rate", t => { + t.plan(9); + const fastRate = 500; + const medRate = 50; + const slowRate = 1; + const sourceFast = new Readable({ objectMode: true }); + const sourceMed = new Readable({ objectMode: true }); + const sourceSlow = new Readable({ objectMode: true 
}); + const expectedElements = ["a", "b", "c"]; + const start = performance.now(); + let i = 0; + let j = 0; + let k = 0; + + sourceFast + .pipe(rate(fastRate)) + .on("data", (element: string) => { + const currentRate = (i / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[i]); + expect(currentRate).lessThan(fastRate); + t.pass(); + i++; + }) + .on("error", t.end); + + sourceMed + .pipe(rate(medRate)) + .on("data", (element: string) => { + const currentRate = (j / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[j]); + expect(currentRate).lessThan(medRate); + t.pass(); + j++; + }) + .on("error", t.end); + + sourceSlow + .pipe(rate(slowRate)) + .on("data", (element: string) => { + const currentRate = (k / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[k]); + expect(currentRate).lessThan(slowRate); + t.pass(); + k++; + }) + .on("error", t.end) + .on("end", t.end); + + sourceFast.push("a"); + sourceFast.push("b"); + sourceFast.push("c"); + sourceFast.push(null); + sourceMed.push("a"); + sourceMed.push("b"); + sourceMed.push("c"); + sourceMed.push(null); + sourceSlow.push("a"); + sourceSlow.push("b"); + sourceSlow.push("c"); + sourceSlow.push(null); +}); + +test.cb("parallelMap() parallel mapping", t => { + t.plan(5); + const source = new Readable({ objectMode: true }); + const expectedElements = [ + "a_processed", + "b_processed", + "c_processed", + "d_processed", + "e_processed", + ]; + const orderedResults: string[] = []; + source + .pipe(parallelMap(2, data => data + "_processed")) + .on("data", (element: string) => { + t.true(expectedElements.includes(element)); + orderedResults.push(element); + }) + .on("error", t.end) + .on("end", () => { + expect(orderedResults[0]).to.equal("a_processed"); + expect(orderedResults[1]).to.equal("b_processed"); + expect(orderedResults[2]).to.equal("d_processed"); + expect(orderedResults[3]).to.equal("c_processed"); + 
expect(orderedResults[4]).to.equal("e_processed"); + t.end(); + }); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push("d"); + source.push("e"); + source.push(null); +}); diff --git a/src/index.ts b/src/index.ts index 6da9256..dfbee1b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ import { Transform, Readable, Writable, Duplex } from "stream"; +import { performance } from "perf_hooks"; import { ChildProcess } from "child_process"; import { StringDecoder } from "string_decoder"; @@ -13,6 +14,10 @@ export interface WithEncoding { encoding: string; } +async function sleep(time: number) { + return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null; +} + /** * Convert an array into a Readable stream of its elements * @param array Array of elements to stream */ @@ -499,3 +504,96 @@ export function last(readable: Readable): Promise { .on("end", () => resolve(lastChunk)); }); } + +/** + * Stores chunks of data internally in array and batches when batchSize is reached. + * + * @param batchSize Size of the batches + */ +export function batch(batchSize: number) { + const buffer: any[] = []; + return new Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + if (buffer.length === batchSize - 1) { + buffer.push(chunk); + callback(undefined, buffer.splice(0)); + } else { + buffer.push(chunk); + callback(); + } + }, + flush(callback) { + buffer.length > 0 ? callback(undefined, buffer.splice(0)) : callback(); + }, + }); +} + +/** + * Unbatches and sends individual chunks of data + */ +export function unbatch() { + return new Transform({ + objectMode: true, + transform(data, encoding, callback) { + for (const d of data) { + this.push(d); + } + callback(); + }, + }); +} + +/** + * Limits rate of data transferred into stream. 
+ * @param targetRate Desired rate in chunks per second + */ +export function rate(targetRate: number) { + const deltaMS = (1 / targetRate) * 1000; + let total = 0; + const start = performance.now(); + return new Transform({ + objectMode: true, + async transform(data, encoding, callback) { + const currentRate = (total / (performance.now() - start)) * 1000; + if (targetRate && currentRate > targetRate) { + await sleep(deltaMS); + } + total += 1; + callback(undefined, data); + }, + }); +} + +/** + * Limits number of parallel processes in flight. + * @param parallel Max number of parallel processes. + * @param func Function to execute on each data chunk + */ +export function parallelMap<T, R>(parallel: number, func: (data: T) => R) { + let inflight = 0; + return new Transform({ + objectMode: true, + async transform(data, encoding, callback) { + while (parallel <= inflight) { + await sleep(5); + } + inflight += 1; + callback(); + try { + const res = await func(data); + this.push(res); + } catch (e) { + this.emit("error", e); + } finally { + inflight -= 1; + } + }, + async flush(callback) { + while (inflight > 0) { + await sleep(5); + } + callback(); + }, + }); +} diff --git a/tslint.json b/tslint.json index b1c37a5..becd92c 100644 --- a/tslint.json +++ b/tslint.json @@ -7,7 +7,7 @@ "rules": { "no-console": false, "no-implicit-dependencies": [true, "dev"], - "prettier": true, + "prettier": [true, ".prettierrc"], "ordered-imports": false, "interface-name": false }