Add description + interfaces

This commit is contained in:
Jerry Kurian 2019-06-04 11:25:33 -04:00
parent eaeed0dde6
commit 5eeae17559
3 changed files with 147 additions and 18 deletions

View File

@ -15,10 +15,25 @@ export enum SerializationFormats {
utf8 = "utf8", utf8 = "utf8",
} }
type JsonPrimitive = string | number | object; type JsonPrimitive = string | number | object;
export type JsonValue = JsonPrimitive | JsonPrimitive[]; export type JsonValue = JsonPrimitive | JsonPrimitive[];
export interface JsonParseOptions { export interface JsonParseOptions {
pretty: boolean; pretty: boolean;
} }
export interface IBatchParams {
batchSize?: number;
maxBatchAge?: number;
}
export interface IRateParams {
targetRate?: number;
period?: number;
}
export interface IParallelMapParams<T, R> {
mapper: (data: T) => R;
parallel?: number;
sleepTime?: number;
}

View File

@ -547,7 +547,8 @@ export function unbatch() {
/** /**
* Limits date of data transferred into stream. * Limits date of data transferred into stream.
* @param rate Desired rate in ms * @param targetRate Desired rate in ms
* @param period Period to sleep for when rate is above or equal to targetRate
*/ */
export function rate(targetRate: number = 50, period: number = 2) { export function rate(targetRate: number = 50, period: number = 2) {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period

View File

@ -6,14 +6,27 @@ import {
ThroughOptions, ThroughOptions,
TransformOptions, TransformOptions,
WithEncoding, WithEncoding,
SerializationFormats,
JsonParseOptions, JsonParseOptions,
IBatchParams,
IRateParams,
IParallelMapParams,
} from "./definitions"; } from "./definitions";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream { export function fromArray(array: any[]): NodeJS.ReadableStream {
return baseFunctions.fromArray(array); return baseFunctions.fromArray(array);
} }
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>( export function map<T, R>(
mapper: (chunk: T, encoding?: string) => R, mapper: (chunk: T, encoding?: string) => R,
options?: TransformOptions, options?: TransformOptions,
@ -21,6 +34,13 @@ export function map<T, R>(
return baseFunctions.map(mapper, options); return baseFunctions.map(mapper, options);
} }
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>( export function flatMap<T, R>(
mapper: mapper:
| ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => R[])
@ -30,6 +50,12 @@ export function flatMap<T, R>(
return baseFunctions.flatMap(mapper, options); return baseFunctions.flatMap(mapper, options);
} }
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter scream chunks
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects.
*/
export function filter<T>( export function filter<T>(
mapper: mapper:
| ((chunk: T, encoding: string) => boolean) | ((chunk: T, encoding: string) => boolean)
@ -39,6 +65,15 @@ export function filter<T>(
return baseFunctions.filter(mapper, options); return baseFunctions.filter(mapper, options);
} }
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>( export function reduce<T, R>(
iteratee: iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R) | ((previousValue: R, chunk: T, encoding: string) => R)
@ -49,6 +84,12 @@ export function reduce<T, R>(
return baseFunctions.reduce(iteratee, initialValue, options); return baseFunctions.reduce(iteratee, initialValue, options);
} }
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator? Separator to split by, defaulting to "\n"
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
export function split( export function split(
separator?: string | RegExp, separator?: string | RegExp,
options?: WithEncoding, options?: WithEncoding,
@ -56,6 +97,12 @@ export function split(
return baseFunctions.split(separator, options); return baseFunctions.split(separator, options);
} }
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
export function join( export function join(
separator: string, separator: string,
options?: WithEncoding, options?: WithEncoding,
@ -63,6 +110,14 @@ export function join(
return baseFunctions.join(separator, options); return baseFunctions.join(separator, options);
} }
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options? Defaults to encoding: utf8
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace( export function replace(
searchValue: string | RegExp, searchValue: string | RegExp,
replaceValue: string, replaceValue: string,
@ -71,30 +126,59 @@ export function replace(
return baseFunctions.replace(searchValue, replaceValue, options); return baseFunctions.replace(searchValue, replaceValue, options);
} }
export function parse(format: SerializationFormats): NodeJS.ReadWriteStream { /**
return baseFunctions.parse(format); * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string in utf8.
*/
export function parse(): NodeJS.ReadWriteStream {
return baseFunctions.parse();
} }
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
* @param options?
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
*
*/
export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream { export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
return baseFunctions.stringify(options); return baseFunctions.stringify(options);
} }
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream { export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
return baseFunctions.collect(options); return baseFunctions.collect(options);
} }
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat( export function concat(
...streams: NodeJS.ReadableStream[] ...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream { ): NodeJS.ReadableStream {
return baseFunctions.concat(...streams); return baseFunctions.concat(...streams);
} }
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to merge
*/
export function merge( export function merge(
...streams: NodeJS.ReadableStream[] ...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream { ): NodeJS.ReadableStream {
return baseFunctions.merge(...streams); return baseFunctions.merge(...streams);
} }
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex( export function duplex(
writable: Writable, writable: Writable,
readable: Readable, readable: Readable,
@ -102,36 +186,65 @@ export function duplex(
return baseFunctions.duplex(writable, readable); return baseFunctions.duplex(writable, readable);
} }
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream { export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
return baseFunctions.child(childProcess); return baseFunctions.child(childProcess);
} }
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> { export function last<T>(readable: Readable): Promise<T | null> {
return baseFunctions.last(readable); return baseFunctions.last(readable);
} }
export function batch( /**
batchSize?: number, * Stores chunks of data internally in array and batches when batchSize is reached.
maxBatchAge?: number, *
): NodeJS.ReadWriteStream { * @param batchSize? Size of the batches, defaults to 1000.
* @param maxBatchAge? Max lifetime of a batch, defaults to 500
*/
export function batch({
batchSize,
maxBatchAge,
}: IBatchParams): NodeJS.ReadWriteStream {
return baseFunctions.batch(batchSize, maxBatchAge); return baseFunctions.batch(batchSize, maxBatchAge);
} }
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch(): NodeJS.ReadWriteStream { export function unbatch(): NodeJS.ReadWriteStream {
return baseFunctions.unbatch(); return baseFunctions.unbatch();
} }
export function rate( /**
targetRate?: number, * Limits date of data transferred into stream.
period?: number, * @param targetRate? Desired rate in ms
): NodeJS.ReadWriteStream { * @param period? Period to sleep for when rate is above or equal to targetRate
*/
export function rate({
targetRate,
period,
}: IRateParams): NodeJS.ReadWriteStream {
return baseFunctions.rate(targetRate, period); return baseFunctions.rate(targetRate, period);
} }
export function parallelMap<T, R>( /**
mapper: (data: T) => R, * Limits number of parallel processes in flight.
parallel?: number, * @param parallel Max number of parallel processes.
sleepTime?: number, * @param func Function to execute on each data chunk
) { * @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/
export function parallelMap<T, R>({
mapper,
parallel,
sleepTime,
}: IParallelMapParams<T, R>) {
return baseFunctions.parallelMap(mapper, parallel, sleepTime); return baseFunctions.parallelMap(mapper, parallel, sleepTime);
} }