From eaeed0dde699f6bac2d41d02db596597744fb4e4 Mon Sep 17 00:00:00 2001
From: Jerry Kurian
Date: Tue, 4 Jun 2019 10:21:15 -0400
Subject: [PATCH] Refactor lib structure

---
 src/functions/definitions.ts                 |  24 +
 .../functions.spec.ts}                       |  22 +-
 src/functions/functions.ts                   | 605 +++++++++++++++++
 src/functions/index.ts                       | 137 ++++
 src/helpers.ts                               |   3 +
 src/index.ts                                 | 621 +-----------------
 6 files changed, 803 insertions(+), 609 deletions(-)
 create mode 100644 src/functions/definitions.ts
 rename src/{index.spec.ts => functions/functions.spec.ts} (98%)
 create mode 100644 src/functions/functions.ts
 create mode 100644 src/functions/index.ts
 create mode 100644 src/helpers.ts

diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts
new file mode 100644
index 0000000..fc97bd1
--- /dev/null
+++ b/src/functions/definitions.ts
@@ -0,0 +1,24 @@
+export interface ThroughOptions {
+    objectMode?: boolean;
+}
+
+export interface TransformOptions {
+    readableObjectMode?: boolean;
+    writableObjectMode?: boolean;
+}
+
+export interface WithEncoding {
+    encoding: string;
+}
+
+export enum SerializationFormats {
+    utf8 = "utf8",
+}
+
+
+type JsonPrimitive = string | number | object;
+export type JsonValue = JsonPrimitive | JsonPrimitive[];
+
+export interface JsonParseOptions {
+    pretty: boolean;
+}
diff --git a/src/index.spec.ts b/src/functions/functions.spec.ts
similarity index 98%
rename from src/index.spec.ts
rename to src/functions/functions.spec.ts
index 051fdd7..50e5916 100644
--- a/src/index.spec.ts
+++ b/src/functions/functions.spec.ts
@@ -25,6 +25,7 @@ import {
     rate,
     parallelMap,
 } from ".";
+import { SerializationFormats } from "./definitions";
 
 test.cb("fromArray() streams array elements in flowing mode", t => {
     t.plan(3);
@@ -682,7 +683,7 @@ test.cb("parse() parses the streamed elements as JSON", t => {
     const expectedElements = ["abc", {}, []];
     let i = 0;
     source
-        .pipe(parse())
+        .pipe(parse(SerializationFormats.utf8))
         .on("data", part => {
             expect(part).to.deep.equal(expectedElements[i]);
             t.pass();
@@ -701,7 +702,7 @@ test.cb("parse() emits errors on invalid JSON", t => {
     t.plan(2);
     const source = new Readable({ objectMode: true });
     source
-        .pipe(parse(SerializationFormats.utf8))
+        .pipe(parse(SerializationFormats.utf8))
        .resume()
         .on("error", () => t.pass())
         .on("end", t.end);
@@ -1235,7 +1236,7 @@ test.cb("unbatch() unbatches", t => {
 
 test.cb("rate() sends data at desired rate", t => {
     t.plan(9);
-    const fastRate = 500;
+    const fastRate = 150;
     const medRate = 50;
     const slowRate = 1;
     const sourceFast = new Readable({ objectMode: true });
@@ -1270,7 +1271,7 @@ test.cb("rate() sends data at desired rate", t => {
         .on("error", t.end);
 
     sourceSlow
-        .pipe(rate(slowRate))
+        .pipe(rate(slowRate, 1))
        .on("data", (element: string[]) => {
             const currentRate = (k / (performance.now() - start)) * 1000;
             expect(element).to.deep.equal(expectedElements[k]);
@@ -1306,19 +1307,20 @@ test.cb("parallel() parallel mapping", t => {
         "e_processed",
     ];
     const orderedResults: string[] = [];
+    // Record start / end times of each process and then compare to figure out # of processes occurring and order
     source
-        .pipe(parallelMap(2, data => data + "_processed"))
+        .pipe(parallelMap(data => data + "_processed"))
         .on("data", (element: string) => {
             t.true(expectedElements.includes(element));
             orderedResults.push(element);
         })
         .on("error", t.end)
         .on("end", () => {
-            expect(orderedResults[0]).to.equal("a_processed")
-            expect(orderedResults[1]).to.equal("b_processed")
-            expect(orderedResults[2]).to.equal("d_processed")
-            expect(orderedResults[3]).to.equal("c_processed")
-            expect(orderedResults[4]).to.equal("e_processed")
+            expect(orderedResults[0]).to.equal("a_processed");
+            expect(orderedResults[1]).to.equal("b_processed");
+            expect(orderedResults[2]).to.equal("d_processed");
+            expect(orderedResults[3]).to.equal("c_processed");
+            expect(orderedResults[4]).to.equal("e_processed");
             t.end();
         });
 });
diff --git a/src/functions/functions.ts b/src/functions/functions.ts
new file mode 100644
index 0000000..0107110
--- /dev/null
+++ b/src/functions/functions.ts
@@ -0,0 +1,605 @@
+import { Transform, Readable, Writable, Duplex } from "stream";
+import { performance } from "perf_hooks";
+import { ChildProcess } from "child_process";
+import { StringDecoder } from "string_decoder";
+
+import {
+    TransformOptions,
+    ThroughOptions,
+    WithEncoding,
+    SerializationFormats,
+    JsonValue,
+    JsonParseOptions,
+} from "./definitions";
+import { sleep } from "../helpers";
+
+/**
+ * Convert an array into a Readable stream of its elements
+ * @param array Array of elements to stream
+ */
+export function fromArray(array: any[]): NodeJS.ReadableStream {
+    let cursor = 0;
+    return new Readable({
+        objectMode: true,
+        read() {
+            if (cursor < array.length) {
+                this.push(array[cursor]);
+                cursor++;
+            } else {
+                this.push(null);
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that maps streamed chunks
+ * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
+ * @param options
+ * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
+ * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
+ */
+export function map<T, R>(
+    mapper: (chunk: T, encoding: string) => R,
+    options: TransformOptions = {
+        readableObjectMode: true,
+        writableObjectMode: true,
+    },
+): NodeJS.ReadWriteStream {
+    return new Transform({
+        ...options,
+        async transform(chunk: T, encoding, callback) {
+            let isPromise = false;
+            try {
+                const mapped = mapper(chunk, encoding);
+                isPromise = mapped instanceof Promise;
+                callback(undefined, await mapped);
+            } catch (err) {
+                if (isPromise) {
+                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
+                    this.emit("error", err);
+                    callback();
+                } else {
+                    callback(err);
+                }
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that flat maps streamed chunks
+ * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
+ * @param options
+ * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
+ * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
+ */
+export function flatMap<T, R>(
+    mapper:
+        | ((chunk: T, encoding: string) => R[])
+        | ((chunk: T, encoding: string) => Promise<R[]>),
+    options: TransformOptions = {
+        readableObjectMode: true,
+        writableObjectMode: true,
+    },
+): NodeJS.ReadWriteStream {
+    return new Transform({
+        ...options,
+        async transform(chunk: T, encoding, callback) {
+            let isPromise = false;
+            try {
+                const mapped = mapper(chunk, encoding);
+                isPromise = mapped instanceof Promise;
+                (await mapped).forEach(c => this.push(c));
+                callback();
+            } catch (err) {
+                if (isPromise) {
+                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
+                    this.emit("error", err);
+                    callback();
+                } else {
+                    callback(err);
+                }
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
+ * @param predicate Predicate with which to filter stream chunks
+ * @param options
+ * @param options.objectMode Whether this stream should behave as a stream of objects
+ */
+export function filter<T>(
+    predicate:
+        | ((chunk: T, encoding: string) => boolean)
+        | ((chunk: T, encoding: string) => Promise<boolean>),
+    options: ThroughOptions = {
+        objectMode: true,
+    },
+) {
+    return new Transform({
+        readableObjectMode: options.objectMode,
+        writableObjectMode: options.objectMode,
+        async transform(chunk: T, encoding, callback) {
+            let isPromise = false;
+            try {
+                const result = predicate(chunk, encoding);
+                isPromise = result instanceof Promise;
+                if (!!(await result)) {
+                    callback(undefined, chunk);
+                } else {
+                    callback();
+                }
+            } catch (err) {
+                if (isPromise) {
+                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
+                    this.emit("error", err);
+                    callback();
+                } else {
+                    callback(err);
+                }
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that reduces streamed chunks down to a single value and yields that
+ * value
+ * @param iteratee Reducer function to apply on each streamed chunk
+ * @param initialValue Initial value
+ * @param options
+ * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
+ * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
+ */
+export function reduce<T, R>(
+    iteratee:
+        | ((previousValue: R, chunk: T, encoding: string) => R)
+        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
+    initialValue: R,
+    options: TransformOptions = {
+        readableObjectMode: true,
+        writableObjectMode: true,
+    },
+) {
+    let value = initialValue;
+    return new Transform({
+        readableObjectMode: options.readableObjectMode,
+        writableObjectMode: options.writableObjectMode,
+        async transform(chunk: T, encoding, callback) {
+            let isPromise = false;
+            try {
+                const result = iteratee(value, chunk, encoding);
+                isPromise = result instanceof Promise;
+                value = await result;
+                callback();
+            } catch (err) {
+                if (isPromise) {
+                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
+                    this.emit("error", err);
+                    callback();
+                } else {
+                    callback(err);
+                }
+            }
+        },
+        flush(callback) {
+            // Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
+            // downstream doesn't expect objects)
+            try {
+                callback(undefined, value);
+            } catch (err) {
+                try {
+                    this.emit("error", err);
+                } catch {
+                    // Best effort was made
+                }
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that splits streamed chunks using the given separator
+ * @param separator Separator to split by, defaulting to "\n"
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
+ */
+export function split(
+    separator: string | RegExp = "\n",
+    options: WithEncoding = { encoding: "utf8" },
+): NodeJS.ReadWriteStream {
+    let buffered = "";
+    const decoder = new StringDecoder(options.encoding);
+
+    return new Transform({
+        readableObjectMode: true,
+        transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
+            const splitted = asString.split(separator);
+            if (splitted.length > 1) {
+                splitted[0] = buffered.concat(splitted[0]);
+                buffered = "";
+            }
+            buffered += splitted[splitted.length - 1];
+            splitted.slice(0, -1).forEach((part: string) => this.push(part));
+            callback();
+        },
+        flush(callback) {
+            callback(undefined, buffered + decoder.end());
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that joins streamed chunks using the given separator
+ * @param separator Separator to join with
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
+ */
+export function join(
+    separator: string,
+    options: WithEncoding = { encoding: "utf8" },
+): NodeJS.ReadWriteStream {
+    let isFirstChunk = true;
+    const decoder = new StringDecoder(options.encoding);
+    return new Transform({
+        readableObjectMode: true,
+        async transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
+            // Take care not to break up multi-byte characters spanning multiple chunks
+            if (asString !== "" || chunk.length === 0) {
+                if (!isFirstChunk) {
+                    this.push(separator);
+                }
+                this.push(asString);
+                isFirstChunk = false;
+            }
+            callback();
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
+ * the streamed chunks with the specified replacement string
+ * @param searchValue Search string to use
+ * @param replaceValue Replacement string to use
+ * @param options
+ * @param options.encoding Encoding written chunks are assumed to use
+ */
+export function replace(
+    searchValue: string | RegExp,
+    replaceValue: string,
+    options: WithEncoding = { encoding: "utf8" },
+): NodeJS.ReadWriteStream {
+    const decoder = new StringDecoder(options.encoding);
+    return new Transform({
+        readableObjectMode: true,
+        transform(chunk: Buffer, encoding, callback) {
+            const asString = decoder.write(chunk);
+            // Take care not to break up multi-byte characters spanning multiple chunks
+            if (asString !== "" || chunk.length === 0) {
+                callback(
+                    undefined,
+                    asString.replace(searchValue, replaceValue),
+                );
+            } else {
+                callback();
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
+ * must be a fully defined JSON string.
+ * @param format Format of serialized data, only utf8 supported.
+ */
+export function parse(
+    format: SerializationFormats = SerializationFormats.utf8,
+): NodeJS.ReadWriteStream {
+    const decoder = new StringDecoder(format);
+    return new Transform({
+        readableObjectMode: true,
+        writableObjectMode: true,
+        async transform(chunk: Buffer, encoding, callback) {
+            try {
+                const asString = decoder.write(chunk);
+                // Using await causes parsing errors to be emitted
+                callback(undefined, await JSON.parse(asString));
+            } catch (err) {
+                callback(err);
+            }
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that stringifies the streamed chunks to JSON
+ */
+export function stringify(
+    options: JsonParseOptions = { pretty: false },
+): NodeJS.ReadWriteStream {
+    return new Transform({
+        readableObjectMode: true,
+        writableObjectMode: true,
+        transform(chunk: JsonValue, encoding, callback) {
+            callback(
+                undefined,
+                options.pretty
+                    ? JSON.stringify(chunk, null, 2)
+                    : JSON.stringify(chunk),
+            );
+        },
+    });
+}
+
+/**
+ * Return a ReadWrite stream that collects streamed chunks into an array or buffer
+ * @param options
+ * @param options.objectMode Whether this stream should behave as a stream of objects
+ */
+export function collect(
+    options: ThroughOptions = { objectMode: false },
+): NodeJS.ReadWriteStream {
+    const collected: any[] = [];
+    return new Transform({
+        readableObjectMode: options.objectMode,
+        writableObjectMode: options.objectMode,
+        transform(data, encoding, callback) {
+            collected.push(data);
+            callback();
+        },
+        flush(callback) {
+            this.push(
+                options.objectMode ? collected : Buffer.concat(collected),
+            );
+            callback();
+        },
+    });
+}
+
+/**
+ * Return a Readable stream of readable streams concatenated together
+ * @param streams Readable streams to concatenate
+ */
+export function concat(
+    ...streams: NodeJS.ReadableStream[]
+): NodeJS.ReadableStream {
+    let isStarted = false;
+    let currentStreamIndex = 0;
+    const startCurrentStream = () => {
+        if (currentStreamIndex >= streams.length) {
+            wrapper.push(null);
+        } else {
+            streams[currentStreamIndex]
+                .on("data", chunk => {
+                    if (!wrapper.push(chunk)) {
+                        streams[currentStreamIndex].pause();
+                    }
+                })
+                .on("error", err => wrapper.emit("error", err))
+                .on("end", () => {
+                    currentStreamIndex++;
+                    startCurrentStream();
+                });
+        }
+    };
+
+    const wrapper = new Readable({
+        objectMode: true,
+        read() {
+            if (!isStarted) {
+                isStarted = true;
+                startCurrentStream();
+            }
+            if (currentStreamIndex < streams.length) {
+                streams[currentStreamIndex].resume();
+            }
+        },
+    });
+    return wrapper;
+}
+
+/**
+ * Return a Readable stream of readable streams merged together in chunk arrival order
+ * @param streams Readable streams to merge
+ */
+export function merge(
+    ...streams: NodeJS.ReadableStream[]
+): NodeJS.ReadableStream {
+    let isStarted = false;
+    let streamEndedCount = 0;
+    return new Readable({
+        objectMode: true,
+        read() {
+            if (streamEndedCount >= streams.length) {
+                this.push(null);
+            } else if (!isStarted) {
+                isStarted = true;
+                streams.forEach(stream =>
+                    stream
+                        .on("data", chunk => {
+                            if (!this.push(chunk)) {
+                                streams.forEach(s => s.pause());
+                            }
+                        })
+                        .on("error", err => this.emit("error", err))
+                        .on("end", () => {
+                            streamEndedCount++;
+                            if (streamEndedCount === streams.length) {
+                                this.push(null);
+                            }
+                        }),
+                );
+            } else {
+                streams.forEach(s => s.resume());
+            }
+        },
+    });
+}
+
+/**
+ * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
+ * cause the given readable stream to yield chunks
+ * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
+ * @param readable Readable stream assumed to yield chunks when the writable stream is written to
+ */
+export function duplex(writable: Writable, readable: Readable) {
+    const wrapper = new Duplex({
+        readableObjectMode: true,
+        writableObjectMode: true,
+        read() {
+            readable.resume();
+        },
+        write(chunk, encoding, callback) {
+            return writable.write(chunk, encoding, callback);
+        },
+        final(callback) {
+            writable.end(callback);
+        },
+    });
+    readable
+        .on("data", chunk => {
+            if (!wrapper.push(chunk)) {
+                readable.pause();
+            }
+        })
+        .on("error", err => wrapper.emit("error", err))
+        .on("end", () => wrapper.push(null));
+    writable.on("drain", () => wrapper.emit("drain"));
+    writable.on("error", err => wrapper.emit("error", err));
+    return wrapper;
+}
+
+/**
+ * Return a Duplex stream from a child process' stdin and stdout
+ * @param childProcess Child process from which to create duplex stream
+ */
+export function child(childProcess: ChildProcess) {
+    return duplex(childProcess.stdin, childProcess.stdout);
+}
+
+/**
+ * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
+ * ended
+ * @param readable Readable stream to wait on
+ */
+export function last<T>(readable: Readable): Promise<T | null> {
+    let lastChunk: T | null = null;
+    return new Promise((resolve, reject) => {
+        readable
+            .on("data", chunk => (lastChunk = chunk))
+            .on("end", () => resolve(lastChunk));
+    });
+}
+
+/**
+ * Stores chunks of data internally in array and batches when batchSize is reached,
+ * or when the batch is older than maxBatchAge milliseconds.
+ *
+ * @param batchSize Size of the batches
+ * @param maxBatchAge Max lifetime of a batch in milliseconds
+ */
+export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
+    const buffer: any[] = [];
+    let startTime: number | null = null;
+    return new Transform({
+        objectMode: true,
+        transform(chunk, encoding, callback) {
+            if (
+                buffer.length === batchSize - 1 ||
+                (startTime !== null &&
+                    performance.now() - startTime >= maxBatchAge)
+            ) {
+                buffer.push(chunk);
+                callback(undefined, buffer.splice(0));
+            } else {
+                if (startTime === null) {
+                    startTime = performance.now();
+                }
+                buffer.push(chunk);
+                callback();
+            }
+        },
+        flush(callback) {
+            callback(undefined, buffer.splice(0));
+        },
+    });
+}
+
+/**
+ * Unbatches and sends individual chunks of data
+ */
+export function unbatch() {
+    return new Transform({
+        objectMode: true,
+        transform(data, encoding, callback) {
+            for (const d of data) {
+                this.push(d);
+            }
+            callback();
+        },
+    });
+}
+
+/**
+ * Limits rate of data transferred into stream.
+ * @param targetRate Desired rate in chunks per second
+ */
+export function rate(targetRate: number = 50, period: number = 2) {
+    const deltaMS = ((1 / targetRate) * 1000) / period; // Sleep for a fraction (1 / period) of the per-chunk interval
+    let total = 0;
+    const start = performance.now();
+    return new Transform({
+        objectMode: true,
+        async transform(data, encoding, callback) {
+            const currentRate = (total / (performance.now() - start)) * 1000;
+            if (targetRate && currentRate > targetRate) {
+                await sleep(deltaMS);
+            }
+            total += 1;
+            callback(undefined, data);
+        },
+    });
+}
+
+/**
+ * Limits number of parallel processes in flight.
+ * @param mapper Function to execute on each data chunk
+ * @param parallel Max number of parallel processes.
+ * @param sleepTime Amount of time to pause processing when max number of parallel processes are executing.
+ */
+export function parallelMap<T, R>(
+    mapper: (data: T) => R,
+    parallel: number = 10,
+    sleepTime: number = 5,
+) {
+    let inflight = 0;
+    return new Transform({
+        objectMode: true,
+        async transform(data, encoding, callback) {
+            while (parallel <= inflight) {
+                await sleep(sleepTime);
+            }
+            inflight += 1;
+            callback();
+            try {
+                const res = await mapper(data);
+                this.push(res);
+            } catch (e) {
+                this.emit("error", e);
+            } finally {
+                inflight -= 1;
+            }
+        },
+        async flush(callback) {
+            while (inflight > 0) {
+                await sleep(sleepTime);
+            }
+            callback();
+        },
+    });
+}
diff --git a/src/functions/index.ts b/src/functions/index.ts
new file mode 100644
index 0000000..f9c1e58
--- /dev/null
+++ b/src/functions/index.ts
@@ -0,0 +1,137 @@
+import { Readable, Writable } from "stream";
+import { ChildProcess } from "child_process";
+import * as baseFunctions from "./functions";
+
+import {
+    ThroughOptions,
+    TransformOptions,
+    WithEncoding,
+    SerializationFormats,
+    JsonParseOptions,
+} from "./definitions";
+
+export function fromArray(array: any[]): NodeJS.ReadableStream {
+    return baseFunctions.fromArray(array);
+}
+
+export function map<T, R>(
+    mapper: (chunk: T, encoding?: string) => R,
+    options?: TransformOptions,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.map(mapper, options);
+}
+
+export function flatMap<T, R>(
+    mapper:
+        | ((chunk: T, encoding: string) => R[])
+        | ((chunk: T, encoding: string) => Promise<R[]>),
+    options?: TransformOptions,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.flatMap(mapper, options);
+}
+
+export function filter<T>(
+    mapper:
+        | ((chunk: T, encoding: string) => boolean)
+        | ((chunk: T, encoding: string) => Promise<boolean>),
+    options?: ThroughOptions,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.filter(mapper, options);
+}
+
+export function reduce<T, R>(
+    iteratee:
+        | ((previousValue: R, chunk: T, encoding: string) => R)
+        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
+    initialValue: R,
+    options?: TransformOptions,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.reduce(iteratee, initialValue, options);
+}
+
+export function split(
+    separator?: string | RegExp,
+    options?: WithEncoding,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.split(separator, options);
+}
+
+export function join(
+    separator: string,
+    options?: WithEncoding,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.join(separator, options);
+}
+
+export function replace(
+    searchValue: string | RegExp,
+    replaceValue: string,
+    options?: WithEncoding,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.replace(searchValue, replaceValue, options);
+}
+
+export function parse(format: SerializationFormats): NodeJS.ReadWriteStream {
+    return baseFunctions.parse(format);
+}
+
+export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
+    return baseFunctions.stringify(options);
+}
+
+export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
+    return baseFunctions.collect(options);
+}
+
+export function concat(
+    ...streams: NodeJS.ReadableStream[]
+): NodeJS.ReadableStream {
+    return baseFunctions.concat(...streams);
+}
+
+export function merge(
+    ...streams: NodeJS.ReadableStream[]
+): NodeJS.ReadableStream {
+    return baseFunctions.merge(...streams);
+}
+
+export function duplex(
+    writable: Writable,
+    readable: Readable,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.duplex(writable, readable);
+}
+
+export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
+    return baseFunctions.child(childProcess);
+}
+
+export function last<T>(readable: Readable): Promise<T | null> {
+    return baseFunctions.last(readable);
+}
+
+export function batch(
+    batchSize?: number,
+    maxBatchAge?: number,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.batch(batchSize, maxBatchAge);
+}
+
+export function unbatch(): NodeJS.ReadWriteStream {
+    return baseFunctions.unbatch();
+}
+
+export function rate(
+    targetRate?: number,
+    period?: number,
+): NodeJS.ReadWriteStream {
+    return baseFunctions.rate(targetRate, period);
+}
+
+export function parallelMap<T, R>(
+    mapper: (data: T) => R,
+    parallel?: number,
+    sleepTime?: number,
+) {
+    return baseFunctions.parallelMap(mapper, parallel, sleepTime);
+}
diff --git a/src/helpers.ts b/src/helpers.ts
new file mode 100644
index 0000000..242d264
--- /dev/null
+++ b/src/helpers.ts
@@ -0,0 +1,3 @@
+export async function sleep(time: number): Promise<{} | null> {
+    return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
+}
diff --git a/src/index.ts b/src/index.ts
index dfbee1b..e9f1369 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,599 +1,22 @@
-import { Transform, Readable, Writable, Duplex } from "stream";
-import { performance } from "perf_hooks";
-import { ChildProcess } from "child_process";
-import { StringDecoder } from "string_decoder";
-
-export interface ThroughOptions {
-    objectMode?: boolean;
-}
-export interface TransformOptions {
-    readableObjectMode?: boolean;
-    writableObjectMode?: boolean;
-}
-export interface WithEncoding {
-    encoding: string;
-}
-
-async function sleep(time: number) {
-    return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
-}
-
-/**
- * Convert an array into a Readable stream of its elements
- * @param array Array of elements to stream
- */
-export function fromArray(array: any[]): NodeJS.ReadableStream {
-    let cursor = 0;
-    return new Readable({
-        objectMode: true,
-        read() {
-            if (cursor < array.length) {
-                this.push(array[cursor]);
-                cursor++;
-            } else {
-                this.push(null);
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that maps streamed chunks
- * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
- * @param options
- * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
- */
-export function map<T, R>(
-    mapper: (chunk: T, encoding: string) => R,
-    options: TransformOptions = {
-        readableObjectMode: true,
-        writableObjectMode: true,
-    },
-): NodeJS.ReadWriteStream {
-    return new Transform({
-        ...options,
-        async transform(chunk: T, encoding, callback) {
-            let isPromise = false;
-            try {
-                const mapped = mapper(chunk, encoding);
-                isPromise = mapped instanceof Promise;
-                callback(undefined, await mapped);
-            } catch (err) {
-                if (isPromise) {
-                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
-                    this.emit("error", err);
-                    callback();
-                } else {
-                    callback(err);
-                }
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that flat maps streamed chunks
- * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
- * @param options
- * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
- */
-export function flatMap<T, R>(
-    mapper:
-        | ((chunk: T, encoding: string) => R[])
-        | ((chunk: T, encoding: string) => Promise<R[]>),
-    options: TransformOptions = {
-        readableObjectMode: true,
-        writableObjectMode: true,
-    },
-): NodeJS.ReadWriteStream {
-    return new Transform({
-        ...options,
-        async transform(chunk: T, encoding, callback) {
-            let isPromise = false;
-            try {
-                const mapped = mapper(chunk, encoding);
-                isPromise = mapped instanceof Promise;
-                (await mapped).forEach(c => this.push(c));
-                callback();
-            } catch (err) {
-                if (isPromise) {
-                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
-                    this.emit("error", err);
-                    callback();
-                } else {
-                    callback(err);
-                }
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
- * @param predicate Predicate with which to filter scream chunks
- * @param options
- * @param options.objectMode Whether this stream should behave as a stream of objects
- */
-export function filter<T>(
-    predicate:
-        | ((chunk: T, encoding: string) => boolean)
-        | ((chunk: T, encoding: string) => Promise<boolean>),
-    options: ThroughOptions = {
-        objectMode: true,
-    },
-) {
-    return new Transform({
-        readableObjectMode: options.objectMode,
-        writableObjectMode: options.objectMode,
-        async transform(chunk: T, encoding, callback) {
-            let isPromise = false;
-            try {
-                const result = predicate(chunk, encoding);
-                isPromise = result instanceof Promise;
-                if (!!(await result)) {
-                    callback(undefined, chunk);
-                } else {
-                    callback();
-                }
-            } catch (err) {
-                if (isPromise) {
-                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
-                    this.emit("error", err);
-                    callback();
-                } else {
-                    callback(err);
-                }
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
- * value
- * @param iteratee Reducer function to apply on each streamed chunk
- * @param initialValue Initial value
- * @param options
- * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
- */
-export function reduce<T, R>(
-    iteratee:
-        | ((previousValue: R, chunk: T, encoding: string) => R)
-        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
-    initialValue: R,
-    options: TransformOptions = {
-        readableObjectMode: true,
-        writableObjectMode: true,
-    },
-) {
-    let value = initialValue;
-    return new Transform({
-        readableObjectMode: options.readableObjectMode,
-        writableObjectMode: options.writableObjectMode,
-        async transform(chunk: T, encoding, callback) {
-            let isPromise = false;
-            try {
-                const result = iteratee(value, chunk, encoding);
-                isPromise = result instanceof Promise;
-                value = await result;
-                callback();
-            } catch (err) {
-                if (isPromise) {
-                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
-                    this.emit("error", err);
-                    callback();
-                } else {
-                    callback(err);
-                }
-            }
-        },
-        flush(callback) {
-            // Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
-            // downstream doesn't expect objects)
-            try {
-                callback(undefined, value);
-            } catch (err) {
-                try {
-                    this.emit("error", err);
-                } catch {
-                    // Best effort was made
-                }
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that splits streamed chunks using the given separator
- * @param separator Separator to split by, defaulting to "\n"
- * @param options
- * @param options.encoding Encoding written chunks are assumed to use
- */
-export function split(
-    separator: string | RegExp = "\n",
-    options: WithEncoding = { encoding: "utf8" },
-): NodeJS.ReadWriteStream {
-    let buffered = "";
-    const decoder = new StringDecoder(options.encoding);
-
-    return new Transform({
-        readableObjectMode: true,
-        transform(chunk: Buffer, encoding, callback) {
-            const asString = decoder.write(chunk);
-            const splitted = asString.split(separator);
-            if (splitted.length > 1) {
-                splitted[0] = buffered.concat(splitted[0]);
-                buffered = "";
-            }
-            buffered += splitted[splitted.length - 1];
-            splitted.slice(0, -1).forEach((part: string) => this.push(part));
-            callback();
-        },
-        flush(callback) {
-            callback(undefined, buffered + decoder.end());
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that joins streamed chunks using the given separator
- * @param separator Separator to join with
- * @param options
- * @param options.encoding Encoding written chunks are assumed to use
- */
-export function join(
-    separator: string,
-    options: WithEncoding = { encoding: "utf8" },
-): NodeJS.ReadWriteStream {
-    let isFirstChunk = true;
-    const decoder = new StringDecoder(options.encoding);
-    return new Transform({
-        readableObjectMode: true,
-        async transform(chunk: Buffer, encoding, callback) {
-            const asString = decoder.write(chunk);
-            // Take care not to break up multi-byte characters spanning multiple chunks
-            if (asString !== "" || chunk.length === 0) {
-                if (!isFirstChunk) {
-                    this.push(separator);
-                }
-                this.push(asString);
-                isFirstChunk = false;
-            }
-            callback();
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
- * the streamed chunks with the specified replacement string
- * @param searchValue Search string to use
- * @param replaceValue Replacement string to use
- * @param options
- * @param options.encoding Encoding written chunks are assumed to use
- */
-export function replace(
-    searchValue: string | RegExp,
-    replaceValue: string,
-    options: WithEncoding = { encoding: "utf8" },
-): NodeJS.ReadWriteStream {
-    const decoder = new StringDecoder(options.encoding);
-    return new Transform({
-        readableObjectMode: true,
-        transform(chunk: Buffer, encoding, callback) {
-            const asString = decoder.write(chunk);
-            // Take care not to break up multi-byte characters spanning multiple chunks
-            if (asString !== "" || chunk.length === 0) {
-                callback(
-                    undefined,
-                    asString.replace(searchValue, replaceValue),
-                );
-            } else {
-                callback();
-            }
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
- * must be a fully defined JSON string.
- */
-export function parse(): NodeJS.ReadWriteStream {
-    const decoder = new StringDecoder("utf8"); // JSON must be utf8
-    return new Transform({
-        readableObjectMode: true,
-        writableObjectMode: true,
-        async transform(chunk: Buffer, encoding, callback) {
-            try {
-                const asString = decoder.write(chunk);
-                // Using await causes parsing errors to be emitted
-                callback(undefined, await JSON.parse(asString));
-            } catch (err) {
-                callback(err);
-            }
-        },
-    });
-}
-
-type JsonPrimitive = string | number | object;
-type JsonValue = JsonPrimitive | JsonPrimitive[];
-interface JsonParseOptions {
-    pretty: boolean;
-}
-
-/**
- * Return a ReadWrite stream that stringifies the streamed chunks to JSON
- */
-export function stringify(
-    options: JsonParseOptions = { pretty: false },
-): NodeJS.ReadWriteStream {
-    return new Transform({
-        readableObjectMode: true,
-        writableObjectMode: true,
-        transform(chunk: JsonValue, encoding, callback) {
-            callback(
-                undefined,
-                options.pretty
-                    ? JSON.stringify(chunk, null, 2)
-                    : JSON.stringify(chunk),
-            );
-        },
-    });
-}
-
-/**
- * Return a ReadWrite stream that collects streamed chunks into an array or buffer
- * @param options
- * @param options.objectMode Whether this stream should behave as a stream of objects
- */
-export function collect(
-    options: ThroughOptions = { objectMode: false },
-): NodeJS.ReadWriteStream {
-    const collected: any[] = [];
-    return new Transform({
-        readableObjectMode: options.objectMode,
-        writableObjectMode: options.objectMode,
-        transform(data, encoding, callback) {
-            collected.push(data);
-            callback();
-        },
-        flush(callback) {
-            this.push(
-                options.objectMode ? collected : Buffer.concat(collected),
-            );
-            callback();
-        },
-    });
-}
-
-/**
- * Return a Readable stream of readable streams concatenated together
- * @param streams Readable streams to concatenate
- */
-export function concat(
-    ...streams: NodeJS.ReadableStream[]
-): NodeJS.ReadableStream {
-    let isStarted = false;
-    let currentStreamIndex = 0;
-    const startCurrentStream = () => {
-        if (currentStreamIndex >= streams.length) {
-            wrapper.push(null);
-        } else {
-            streams[currentStreamIndex]
-                .on("data", chunk => {
-                    if (!wrapper.push(chunk)) {
-                        streams[currentStreamIndex].pause();
-                    }
-                })
-                .on("error", err => wrapper.emit("error", err))
-                .on("end", () => {
-                    currentStreamIndex++;
-                    startCurrentStream();
-                });
-        }
-    };
-
-    const wrapper = new Readable({
-        objectMode: true,
-        read() {
-            if (!isStarted) {
-                isStarted = true;
-                startCurrentStream();
-            }
-            if (currentStreamIndex < streams.length) {
-                streams[currentStreamIndex].resume();
-            }
-        },
-    });
-    return wrapper;
-}
-
-/**
- * Return a Readable stream of readable streams merged together in chunk arrival order
- * @param streams Readable streams to merge
- */
-export function merge(
-    ...streams: NodeJS.ReadableStream[]
-): NodeJS.ReadableStream {
-    let isStarted = false;
-    let streamEndedCount = 0;
-    return new Readable({
-        objectMode: true,
-        read() {
-            if (streamEndedCount >= streams.length) {
-                this.push(null);
-            } else if (!isStarted) {
-                isStarted = true;
-                streams.forEach(stream =>
-                    stream
-                        .on("data", chunk => {
-                            if (!this.push(chunk)) {
-                                streams.forEach(s => s.pause());
-                            }
-                        })
-                        .on("error", err => this.emit("error", err))
-                        .on("end", () => {
-                            streamEndedCount++;
-                            if (streamEndedCount === streams.length) {
-                                this.push(null);
-                            }
-                        }),
-                );
-            } else {
-                streams.forEach(s => s.resume());
-            }
-        },
-    });
-}
-
-/**
- * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
- * cause the given readable stream to yield chunks
- * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
- * @param readable Readable stream assumed to yield chunks when the writable stream is written to
- */
-export function duplex(writable: Writable, readable: Readable) {
-    const wrapper = new Duplex({
-        readableObjectMode: true,
-        writableObjectMode: true,
-        read() {
-            readable.resume();
-        },
-        write(chunk, encoding, callback) {
-            return writable.write(chunk, encoding, callback);
-        },
-        final(callback) {
-            writable.end(callback);
-        },
-    });
-    readable
-        .on("data", chunk => {
-            if (!wrapper.push(chunk)) {
-                readable.pause();
-            }
-        })
-        .on("error", err => wrapper.emit("error", err))
-        .on("end", () => wrapper.push(null));
-    writable.on("drain", () => wrapper.emit("drain"));
-    writable.on("error", err => wrapper.emit("error", err));
-    return wrapper;
-}
-
-/**
- * Return a Duplex stream from a child process' stdin and stdout
- * @param childProcess Child process from which to create duplex stream
- */
-export function child(childProcess: ChildProcess) {
-    return duplex(childProcess.stdin, childProcess.stdout);
-}
-
-/**
- * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
- * ended
- * @param readable Readable stream to wait on
- */
-export function last<T>(readable: Readable): Promise<T | null> {
-    let lastChunk: T | null = null;
-    return new Promise((resolve, reject) => {
-        readable
-            .on("data", chunk => (lastChunk = chunk))
-            .on("end", () => resolve(lastChunk));
-    });
-}
-
-/**
- * Stores chunks of data internally in array and batches when batchSize is reached.
- *
- * @param batchSize Size of the batches
- */
-export function batch(batchSize: number) {
-    const buffer: any[] = [];
-    return new Transform({
-        objectMode: true,
-        transform(chunk, encoding, callback) {
-            if (buffer.length === batchSize - 1) {
-                buffer.push(chunk);
-                callback(undefined, buffer.splice(0));
-            } else {
-                buffer.push(chunk);
-                callback();
-            }
-        },
-        flush(callback) {
-            callback(undefined, buffer.splice(0));
-        },
-    });
-}
-
-/**
- * Unbatches and sends individual chunks of data
- */
-export function unbatch() {
-    return new Transform({
-        objectMode: true,
-        transform(data, encoding, callback) {
-            for (const d of data) {
-                this.push(d);
-            }
-            callback();
-        },
-    });
-}
-
-/**
- * Limits date of data transferred into stream.
- * @param rate Desired rate in ms
- */
-export function rate(targetRate: number) {
-    const deltaMS = (1 / targetRate) * 1000;
-    let total = 0;
-    const start = performance.now();
-    return new Transform({
-        objectMode: true,
-        async transform(data, encoding, callback) {
-            const currentRate = (total / (performance.now() - start)) * 1000;
-            if (targetRate && currentRate > targetRate) {
-                await sleep(deltaMS);
-            }
-            total += 1;
-            callback(undefined, data);
-        },
-    });
-}
-
-/**
- * Limits number of parallel processes in flight.
- * @param parallel Max number of parallel processes.
- * @param func Function to execute on each data chunk
- */
-export function parallelMap<T, R>(parallel: number, func: (data: T) => R) {
-    let inflight = 0;
-    return new Transform({
-        objectMode: true,
-        async transform(data, encoding, callback) {
-            while (parallel <= inflight) {
-                await sleep(5);
-            }
-            inflight += 1;
-            callback();
-            try {
-                const res = await func(data);
-                this.push(res);
-            } catch (e) {
-                this.emit(e);
-            } finally {
-                inflight -= 1;
-            }
-        },
-        async flush(callback) {
-            while (inflight > 0) {
-                await sleep(5);
-            }
-            callback();
-        },
-    });
-}
+export {
+    fromArray,
+    map,
+    flatMap,
+    filter,
+    reduce,
+    split,
+    join,
+    replace,
+    parse,
+    stringify,
+    collect,
+    concat,
+    merge,
+    duplex,
+    child,
+    last,
+    batch,
+    unbatch,
+    rate,
+    parallelMap,
+} from "./functions";
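
For reference, a minimal usage sketch of the surface after this refactor. It assumes a consumer importing from the package root (shown here as "."), whose index now simply re-exports src/functions; the values are illustrative only.

import { fromArray, map, filter, collect } from ".";

// Square a list of numbers, keep the even squares, and collect them into a
// single array that is emitted once the source ends.
fromArray([1, 2, 3, 4, 5])
    .pipe(map((n: number) => n * n))
    .pipe(filter((n: number) => n % 2 === 0))
    .pipe(collect({ objectMode: true }))
    .on("data", (result: number[]) => {
        console.log(result); // [4, 16]
    });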
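The spec changes above track the one visible signature change in this move: parse() now takes an explicit SerializationFormats argument rather than hardcoding utf8. A sketch mirroring the updated tests (the import path for SerializationFormats is an assumption):

import { Readable } from "stream";
import { parse } from ".";
import { SerializationFormats } from "./functions/definitions";

// Each chunk written to parse() must be a complete JSON string.
const source = new Readable({ objectMode: true, read() {} });
source
    .pipe(parse(SerializationFormats.utf8))
    .on("data", parsed => console.log(parsed)) // { a: 1 } then [1, 2]
    .on("error", err => console.error("invalid JSON", err));

source.push('{"a":1}');
source.push("[1,2]");
source.push(null);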
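The flow-control helpers also gain optional parameters with defaults: rate(targetRate = 50, period = 2), batch(batchSize = 1000, maxBatchAge = 500), and parallelMap(mapper, parallel = 10, sleepTime = 5), which is why the updated tests call them with fewer arguments. A sketch of the new shapes, under the same import assumption as above:

import { fromArray, batch, parallelMap } from ".";

// Group chunks in pairs; a partial batch is flushed when the stream ends, and
// a batch older than maxBatchAge (ms) is flushed when the next chunk arrives.
fromArray(["a", "b", "c"])
    .pipe(batch(2, 100))
    .on("data", (group: string[]) => console.log(group)); // ["a", "b"] then ["c"]

// Process up to 2 chunks concurrently; with an async mapper, results may be
// emitted out of order, which is what the reworked parallelMap test asserts.
fromArray(["a", "b", "c", "d", "e"])
    .pipe(parallelMap((s: string) => s + "_processed", 2))
    .on("data", (s: string) => console.log(s));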