diff --git a/src/functions/batch.ts b/src/functions/batch.ts index 4b56b4c..0d0f314 100644 --- a/src/functions/batch.ts +++ b/src/functions/batch.ts @@ -1,11 +1,6 @@ import { Transform } from "stream"; import { TransformOptions } from "./baseDefinitions"; -/** - * Stores chunks of data internally in array and batches when batchSize is reached. - * - * @param batchSize Size of the batches - * @param maxBatchAge Max lifetime of a batch in seconds - */ + export function batch( batchSize: number = 1000, maxBatchAge: number = 500, diff --git a/src/functions/child.ts b/src/functions/child.ts index e2e0c22..73bdbef 100644 --- a/src/functions/child.ts +++ b/src/functions/child.ts @@ -1,9 +1,6 @@ import { ChildProcess } from "child_process"; import { duplex } from "./baseFunctions"; -/** - * Return a Duplex stream from a child process' stdin and stdout - * @param childProcess Child process from which to create duplex stream - */ + export function child(childProcess: ChildProcess) { if (childProcess.stdin === null) { throw new Error("childProcess.stdin is null"); diff --git a/src/functions/collect.ts b/src/functions/collect.ts index 9507565..38cd6ea 100644 --- a/src/functions/collect.ts +++ b/src/functions/collect.ts @@ -1,10 +1,6 @@ import { Transform } from "stream"; import { ThroughOptions } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that collects streamed chunks into an array or buffer - * @param options - * @param options.objectMode Whether this stream should behave as a stream of objects - */ + export function collect( options: ThroughOptions = { objectMode: false }, ): Transform { diff --git a/src/functions/compose.ts b/src/functions/compose.ts index 3212fb9..7a4fdb4 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,9 +1,5 @@ import { pipeline, Duplex, DuplexOptions } from "stream"; -/** - * Return a Readable stream of readable streams concatenated together - * @param streams Readable streams to concatenate - */ export function compose( streams: Array< NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream diff --git a/src/functions/concat.ts b/src/functions/concat.ts index af79db9..d15f936 100644 --- a/src/functions/concat.ts +++ b/src/functions/concat.ts @@ -1,8 +1,5 @@ import { Readable } from "stream"; -/** - * Return a Readable stream of readable streams concatenated together - * @param streams Readable streams to concatenate - */ + export function concat(...streams: NodeJS.ReadableStream[]): Readable { let isStarted = false; let currentStreamIndex = 0; diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 77e7d48..e26dafb 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -22,11 +22,6 @@ const eventsTarget = { unpipe: EventSubscription.Unhandled, }; -/** - * Return a Duplex stream that is pushed data from multiple sources - * @param streams Source streams to multiplex - * @param options Duplex stream options - */ export function demux( construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, demuxBy: { key?: string; keyBy?: (chunk: any) => string }, diff --git a/src/functions/duplex.ts b/src/functions/duplex.ts index b1e967a..b72fd0d 100644 --- a/src/functions/duplex.ts +++ b/src/functions/duplex.ts @@ -1,10 +1,5 @@ import { Duplex } from "stream"; -/** - * Return a Duplex stream from a writable stream that is assumed to somehow, when written to, - * cause the given readable stream to yield chunks - * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to - * @param readable Readable stream assumed to yield chunks when the writable stream is written to - */ + export function duplex( writable: NodeJS.WritableStream, readable: NodeJS.ReadableStream, diff --git a/src/functions/filter.ts b/src/functions/filter.ts index 336db0c..e7578b3 100644 --- a/src/functions/filter.ts +++ b/src/functions/filter.ts @@ -1,10 +1,5 @@ import { Transform, TransformOptions } from "stream"; -/** - * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold - * @param predicate Predicate with which to filter scream chunks - * @param options - * @param options.objectMode Whether this stream should behave as a stream of objects - */ + export function filter( predicate: | ((chunk: T, encoding: string) => boolean) diff --git a/src/functions/flatMap.ts b/src/functions/flatMap.ts index ba8915f..99f38a6 100644 --- a/src/functions/flatMap.ts +++ b/src/functions/flatMap.ts @@ -1,12 +1,6 @@ import { Transform } from "stream"; import { TransformOptions } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that flat maps streamed chunks - * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) - * @param options - * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects - * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects - */ + export function flatMap( mapper: | ((chunk: T, encoding: string) => R[]) diff --git a/src/functions/fromArray.ts b/src/functions/fromArray.ts index 54e01a9..a757354 100644 --- a/src/functions/fromArray.ts +++ b/src/functions/fromArray.ts @@ -1,8 +1,5 @@ import { Readable } from "stream"; -/** - * Convert an array into a Readable stream of its elements - * @param array Array of elements to stream - */ + export function fromArray(array: any[]): Readable { let cursor = 0; return new Readable({ diff --git a/src/functions/index.ts b/src/functions/index.ts index 0998e26..48d8062 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -1,14 +1,6 @@ -import { Readable, Writable, Transform, Duplex } from "stream"; -import { ChildProcess } from "child_process"; +import { Transform } from "stream"; import * as baseFunctions from "./baseFunctions"; -import { - ThroughOptions, - TransformOptions, - WithEncoding, - JsonParseOptions, -} from "./baseDefinitions"; - /** * Convert an array into a Readable stream of its elements * @param array Array of elements to stream @@ -206,7 +198,7 @@ export const accumulatorBy = baseFunctions.accumulatorBy; * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream. * @param streams Array of streams to compose. Minimum of two. * @param options Transform stream options - **/ + */ export const compose = baseFunctions.compose; /** @@ -216,5 +208,5 @@ export const compose = baseFunctions.compose; * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source. * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source. * @param options Writable stream options - **/ + */ export const demux = baseFunctions.demux; diff --git a/src/functions/join.ts b/src/functions/join.ts index c1a28b6..b49022b 100644 --- a/src/functions/join.ts +++ b/src/functions/join.ts @@ -1,12 +1,7 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; import { WithEncoding } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that joins streamed chunks using the given separator - * @param separator Separator to join with - * @param options - * @param options.encoding Encoding written chunks are assumed to use - */ + export function join( separator: string, options: WithEncoding = { encoding: "utf8" }, diff --git a/src/functions/last.ts b/src/functions/last.ts index 98422a7..412a34c 100644 --- a/src/functions/last.ts +++ b/src/functions/last.ts @@ -1,8 +1,3 @@ -/** - * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has - * ended - * @param readable Readable stream to wait on - */ export function last(readable: NodeJS.ReadableStream): Promise { let lastChunk: T | null = null; return new Promise((resolve, _) => { diff --git a/src/functions/map.ts b/src/functions/map.ts index 05fe627..5848ca5 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -1,12 +1,6 @@ import { Transform } from "stream"; import { TransformOptions } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that maps streamed chunks - * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) - * @param options - * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects - * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects - */ + export function map( mapper: (chunk: T, encoding: string) => R, options: TransformOptions = { diff --git a/src/functions/merge.ts b/src/functions/merge.ts index 7166006..ff4d5f6 100644 --- a/src/functions/merge.ts +++ b/src/functions/merge.ts @@ -1,8 +1,5 @@ import { Readable } from "stream"; -/** - * Return a Readable stream of readable streams merged together in chunk arrival order - * @param streams Readable streams to merge - */ + export function merge(...streams: Readable[]): Readable { let isStarted = false; let streamEndedCount = 0; diff --git a/src/functions/parallelMap.ts b/src/functions/parallelMap.ts index 7610f49..56c9f41 100644 --- a/src/functions/parallelMap.ts +++ b/src/functions/parallelMap.ts @@ -1,12 +1,7 @@ import { Transform } from "stream"; import { sleep } from "../helpers"; import { TransformOptions } from "./baseDefinitions"; -/** - * Limits number of parallel processes in flight. - * @param parallel Max number of parallel processes. - * @param func Function to execute on each data chunk - * @param pause Amount of time to pause processing when max number of parallel processes are executing. - */ + export function parallelMap( mapper: (data: T) => R, parallel: number = 10, diff --git a/src/functions/parse.ts b/src/functions/parse.ts index da2ccee..451e86c 100644 --- a/src/functions/parse.ts +++ b/src/functions/parse.ts @@ -1,11 +1,7 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; import { SerializationFormats } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk - * must be a fully defined JSON string. - * @param format Format of serialized data, only utf8 supported. - */ + export function parse( format: SerializationFormats = SerializationFormats.utf8, ): Transform { diff --git a/src/functions/rate.ts b/src/functions/rate.ts index e322744..cb5cbfb 100644 --- a/src/functions/rate.ts +++ b/src/functions/rate.ts @@ -2,11 +2,7 @@ import { Transform } from "stream"; import { performance } from "perf_hooks"; import { sleep } from "../helpers"; import { TransformOptions } from "./baseDefinitions"; -/** - * Limits date of data transferred into stream. - * @param targetRate Desired rate in ms - * @param period Period to sleep for when rate is above or equal to targetRate - */ + export function rate( targetRate: number = 50, period: number = 1, diff --git a/src/functions/reduce.ts b/src/functions/reduce.ts index 6dfcdf9..6ee665f 100644 --- a/src/functions/reduce.ts +++ b/src/functions/reduce.ts @@ -1,14 +1,6 @@ import { Transform } from "stream"; import { TransformOptions } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that - * value - * @param iteratee Reducer function to apply on each streamed chunk - * @param initialValue Initial value - * @param options - * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects - * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects - */ + export function reduce( iteratee: | ((previousValue: R, chunk: T, encoding: string) => R) diff --git a/src/functions/replace.ts b/src/functions/replace.ts index e8bc0e7..dc5a05e 100644 --- a/src/functions/replace.ts +++ b/src/functions/replace.ts @@ -1,14 +1,7 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; import { WithEncoding } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in - * the streamed chunks with the specified replacement string - * @param searchValue Search string to use - * @param replaceValue Replacement string to use - * @param options - * @param options.encoding Encoding written chunks are assumed to use - */ + export function replace( searchValue: string | RegExp, replaceValue: string, diff --git a/src/functions/split.ts b/src/functions/split.ts index fe31d65..8e517ed 100644 --- a/src/functions/split.ts +++ b/src/functions/split.ts @@ -1,12 +1,7 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; import { WithEncoding } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that splits streamed chunks using the given separator - * @param separator Separator to split by, defaulting to "\n" - * @param options - * @param options.encoding Encoding written chunks are assumed to use - */ + export function split( separator: string | RegExp = "\n", options: WithEncoding = { encoding: "utf8" }, diff --git a/src/functions/stringify.ts b/src/functions/stringify.ts index 21996ad..34eb302 100644 --- a/src/functions/stringify.ts +++ b/src/functions/stringify.ts @@ -1,9 +1,6 @@ import { Transform } from "stream"; import { JsonValue, JsonParseOptions } from "./baseDefinitions"; -/** - * Return a ReadWrite stream that stringifies the streamed chunks to JSON - */ export function stringify( options: JsonParseOptions = { pretty: false }, ): Transform { diff --git a/src/functions/unbatch.ts b/src/functions/unbatch.ts index d8fc25f..0f9b3f6 100644 --- a/src/functions/unbatch.ts +++ b/src/functions/unbatch.ts @@ -1,8 +1,6 @@ import { Transform } from "stream"; import { TransformOptions } from "./baseDefinitions"; -/** - * Unbatches and sends individual chunks of data - */ + export function unbatch( options: TransformOptions = { readableObjectMode: true,