diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index fc97bd1..f18659a 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -15,10 +15,25 @@ export enum SerializationFormats { utf8 = "utf8", } - type JsonPrimitive = string | number | object; export type JsonValue = JsonPrimitive | JsonPrimitive[]; export interface JsonParseOptions { pretty: boolean; } + +export interface IBatchParams { + batchSize?: number; + maxBatchAge?: number; +} + +export interface IRateParams { + targetRate?: number; + period?: number; +} + +export interface IParallelMapParams { + mapper: (data: T) => R; + parallel?: number; + sleepTime?: number; +} diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 0107110..be2afff 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -547,7 +547,8 @@ export function unbatch() { /** * Limits date of data transferred into stream. - * @param rate Desired rate in ms + * @param targetRate Desired rate in ms + * @param period Period to sleep for when rate is above or equal to targetRate */ export function rate(targetRate: number = 50, period: number = 2) { const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period diff --git a/src/functions/index.ts b/src/functions/index.ts index f9c1e58..1ad0f10 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -6,14 +6,27 @@ import { ThroughOptions, TransformOptions, WithEncoding, - SerializationFormats, JsonParseOptions, + IBatchParams, + IRateParams, + IParallelMapParams, } from "./definitions"; +/** + * Convert an array into a Readable stream of its elements + * @param array Array of elements to stream + */ export function fromArray(array: any[]): NodeJS.ReadableStream { return baseFunctions.fromArray(array); } +/** + * Return a ReadWrite stream that maps streamed chunks + * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) + * @param options? + * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects + */ export function map( mapper: (chunk: T, encoding?: string) => R, options?: TransformOptions, @@ -21,6 +34,13 @@ export function map( return baseFunctions.map(mapper, options); } +/** + * Return a ReadWrite stream that flat maps streamed chunks + * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) + * @param options? + * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects + */ export function flatMap( mapper: | ((chunk: T, encoding: string) => R[]) @@ -30,6 +50,12 @@ export function flatMap( return baseFunctions.flatMap(mapper, options); } +/** + * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold + * @param predicate Predicate with which to filter scream chunks + * @param options? + * @param options.objectMode? Whether this stream should behave as a stream of objects. + */ export function filter( mapper: | ((chunk: T, encoding: string) => boolean) @@ -39,6 +65,15 @@ export function filter( return baseFunctions.filter(mapper, options); } +/** + * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that + * value + * @param iteratee Reducer function to apply on each streamed chunk + * @param initialValue Initial value + * @param options? + * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects + */ export function reduce( iteratee: | ((previousValue: R, chunk: T, encoding: string) => R) @@ -49,6 +84,12 @@ export function reduce( return baseFunctions.reduce(iteratee, initialValue, options); } +/** + * Return a ReadWrite stream that splits streamed chunks using the given separator + * @param separator? Separator to split by, defaulting to "\n" + * @param options? Defaults to encoding: utf8 + * @param options.encoding? Encoding written chunks are assumed to use + */ export function split( separator?: string | RegExp, options?: WithEncoding, @@ -56,6 +97,12 @@ export function split( return baseFunctions.split(separator, options); } +/** + * Return a ReadWrite stream that joins streamed chunks using the given separator + * @param separator Separator to join with + * @param options? Defaults to encoding: utf8 + * @param options.encoding? Encoding written chunks are assumed to use + */ export function join( separator: string, options?: WithEncoding, @@ -63,6 +110,14 @@ export function join( return baseFunctions.join(separator, options); } +/** + * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in + * the streamed chunks with the specified replacement string + * @param searchValue Search string to use + * @param replaceValue Replacement string to use + * @param options? Defaults to encoding: utf8 + * @param options.encoding Encoding written chunks are assumed to use + */ export function replace( searchValue: string | RegExp, replaceValue: string, @@ -71,30 +126,59 @@ export function replace( return baseFunctions.replace(searchValue, replaceValue, options); } -export function parse(format: SerializationFormats): NodeJS.ReadWriteStream { - return baseFunctions.parse(format); +/** + * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk + * must be a fully defined JSON string in utf8. + */ +export function parse(): NodeJS.ReadWriteStream { + return baseFunctions.parse(); } +/** + * Return a ReadWrite stream that stringifies the streamed chunks to JSON + * @param options? + * @param options.pretty If true, whitespace is inserted into the stringified chunks. + * + */ export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream { return baseFunctions.stringify(options); } +/** + * Return a ReadWrite stream that collects streamed chunks into an array or buffer + * @param options? + * @param options.objectMode? Whether this stream should behave as a stream of objects + */ export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream { return baseFunctions.collect(options); } +/** + * Return a Readable stream of readable streams concatenated together + * @param streams Readable streams to concatenate + */ export function concat( ...streams: NodeJS.ReadableStream[] ): NodeJS.ReadableStream { return baseFunctions.concat(...streams); } +/** + * Return a Readable stream of readable streams concatenated together + * @param streams Readable streams to merge + */ export function merge( ...streams: NodeJS.ReadableStream[] ): NodeJS.ReadableStream { return baseFunctions.merge(...streams); } +/** + * Return a Duplex stream from a writable stream that is assumed to somehow, when written to, + * cause the given readable stream to yield chunks + * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to + * @param readable Readable stream assumed to yield chunks when the writable stream is written to + */ export function duplex( writable: Writable, readable: Readable, @@ -102,36 +186,65 @@ export function duplex( return baseFunctions.duplex(writable, readable); } +/** + * Return a Duplex stream from a child process' stdin and stdout + * @param childProcess Child process from which to create duplex stream + */ export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream { return baseFunctions.child(childProcess); } +/** + * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has + * ended + * @param readable Readable stream to wait on + */ export function last(readable: Readable): Promise { return baseFunctions.last(readable); } -export function batch( - batchSize?: number, - maxBatchAge?: number, -): NodeJS.ReadWriteStream { +/** + * Stores chunks of data internally in array and batches when batchSize is reached. + * + * @param batchSize? Size of the batches, defaults to 1000. + * @param maxBatchAge? Max lifetime of a batch, defaults to 500 + */ +export function batch({ + batchSize, + maxBatchAge, +}: IBatchParams): NodeJS.ReadWriteStream { return baseFunctions.batch(batchSize, maxBatchAge); } +/** + * Unbatches and sends individual chunks of data + */ export function unbatch(): NodeJS.ReadWriteStream { return baseFunctions.unbatch(); } -export function rate( - targetRate?: number, - period?: number, -): NodeJS.ReadWriteStream { +/** + * Limits date of data transferred into stream. + * @param targetRate? Desired rate in ms + * @param period? Period to sleep for when rate is above or equal to targetRate + */ +export function rate({ + targetRate, + period, +}: IRateParams): NodeJS.ReadWriteStream { return baseFunctions.rate(targetRate, period); } -export function parallelMap( - mapper: (data: T) => R, - parallel?: number, - sleepTime?: number, -) { +/** + * Limits number of parallel processes in flight. + * @param parallel Max number of parallel processes. + * @param func Function to execute on each data chunk + * @param pause Amount of time to pause processing when max number of parallel processes are executing. + */ +export function parallelMap({ + mapper, + parallel, + sleepTime, +}: IParallelMapParams) { return baseFunctions.parallelMap(mapper, parallel, sleepTime); }