diff --git a/src/functions/collect.ts b/src/functions/collect.ts
index 33b7330..9507565 100644
--- a/src/functions/collect.ts
+++ b/src/functions/collect.ts
@@ -10,8 +10,7 @@ export function collect(
 ): Transform {
     const collected: any[] = [];
     return new Transform({
-        readableObjectMode: options.objectMode,
-        writableObjectMode: options.objectMode,
+        ...options,
         transform(data, encoding, callback) {
             collected.push(data);
             callback();
diff --git a/src/functions/compose.ts b/src/functions/compose.ts
index 00ff00e..3212fb9 100644
--- a/src/functions/compose.ts
+++ b/src/functions/compose.ts
@@ -4,23 +4,17 @@ import { pipeline, Duplex, DuplexOptions } from "stream";
  * Return a Readable stream of readable streams concatenated together
  * @param streams Readable streams to concatenate
  */
-// First Readable --> Readable
-// First Transform | Duplex, Last Writable --> Writable
-//
 export function compose(
     streams: Array<
         NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
     >,
     options?: DuplexOptions,
 ): Compose {
-    // Maybe just return a new stream here
     if (streams.length < 2) {
         throw new Error("At least two streams are required to compose");
     }
-    const composed = new Compose(streams, options);
-
-    return composed;
+    return new Compose(streams, options);
 }
 
 enum EventSubscription {
@@ -97,4 +91,21 @@
         }
         return this;
     }
+
+    public once(event: string, cb: any) {
+        switch (eventsTarget[event]) {
+            case EventSubscription.First:
+                this.first.once(event, cb);
+                break;
+            case EventSubscription.Last:
+                this.last.once(event, cb);
+                break;
+            case EventSubscription.All:
+                this.streams.forEach(s => s.once(event, cb));
+                break;
+            default:
+                super.once(event, cb);
+        }
+        return this;
+    }
 }
diff --git a/src/functions/demux.ts b/src/functions/demux.ts
index a72eb69..77e7d48 100644
--- a/src/functions/demux.ts
+++ b/src/functions/demux.ts
@@ -57,9 +57,7 @@
     ) {
         super(options);
         if (demuxBy.keyBy === undefined && demuxBy.key === undefined) {
-            throw new Error(
-                "keyBy or key must be provided in second parameter",
-            );
+            throw new Error("keyBy or key must be provided in second argument");
         }
         this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]);
         this.construct = construct;
@@ -68,6 +66,7 @@
         this.nonWritableStreams = [];
     }
 
+    // Throttles when one stream is not writable
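+    // Each destination keeps its own writable flag: when a destination's
+    // write() returns false, the demux as a whole reports backpressure and
+    // stops accepting chunks until that destination is writable again.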
     public _write(chunk: any, encoding?: any, cb?: any) {
         const destKey = this.demuxer(chunk);
         if (this.streamsByKey[destKey] === undefined) {
@@ -76,10 +75,6 @@
             writable: true,
         };
     }
-        // Throttle when one stream is not writable anymore
-        // Set writable to false
-        // keep state of all the streams, if one is not writable demux shouldnt be writable
-        // Small optimization is to keep writing until you get a following event to the unwritable destination
         let res = false;
         if (this.streamsByKey[destKey].writable && this.isWritable) {
             res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb);
diff --git a/src/functions/index.ts b/src/functions/index.ts
index 59ff9c3..0998e26 100644
--- a/src/functions/index.ts
+++ b/src/functions/index.ts
@@ -1,4 +1,4 @@
-import { Readable, Writable, DuplexOptions, Transform, Duplex } from "stream";
+import { Readable, Writable, Transform, Duplex } from "stream";
 import { ChildProcess } from "child_process";
 import * as baseFunctions from "./baseFunctions";
 
@@ -7,17 +7,13 @@
     TransformOptions,
     WithEncoding,
     JsonParseOptions,
-    FlushStrategy,
-    AccumulatorByIteratee,
 } from "./baseDefinitions";
 
 /**
  * Convert an array into a Readable stream of its elements
  * @param array Array of elements to stream
  */
-export function fromArray(array: any[]): Readable {
-    return baseFunctions.fromArray(array);
-}
+export const fromArray = baseFunctions.fromArray;
 
 /**
  * Return a ReadWrite stream that maps streamed chunks
@@ -26,12 +22,7 @@ export function fromArray(array: any[]): Readable {
  * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
  * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
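+ * @example
+ * // Illustrative sketch, assuming object-mode defaults:
+ * // fromArray([1, 2, 3]).pipe(map((n: number) => n * 2)) emits 2, 4, 6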
  */
-export function map<T, R>(
-    mapper: (chunk: T, encoding?: string) => R,
-    options?: TransformOptions,
-): Transform {
-    return baseFunctions.map(mapper, options);
-}
+export const map = baseFunctions.map;
 
 /**
  * Return a ReadWrite stream that flat maps streamed chunks
@@ -40,14 +31,7 @@ export function map<T, R>(
  * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
  * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
  */
-export function flatMap<T, R>(
-    mapper:
-        | ((chunk: T, encoding: string) => R[])
-        | ((chunk: T, encoding: string) => Promise<R[]>),
-    options?: TransformOptions,
-): Transform {
-    return baseFunctions.flatMap(mapper, options);
-}
+export const flatMap = baseFunctions.flatMap;
 
 /**
  * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
@@ -55,14 +39,7 @@ export function flatMap<T, R>(
  * @param options?
  * @param options.objectMode? Whether this stream should behave as a stream of objects.
  */
-export function filter<T>(
-    mapper:
-        | ((chunk: T, encoding: string) => boolean)
-        | ((chunk: T, encoding: string) => Promise<boolean>),
-    options?: TransformOptions,
-): Transform {
-    return baseFunctions.filter(mapper, options);
-}
+export const filter = baseFunctions.filter;
 
 /**
  * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
@@ -73,15 +50,7 @@ export function filter<T>(
  * @param iteratee Reducer function to apply on each streamed chunk
  * @param initialValue Initial value
  * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
  */
-export function reduce<T, R>(
-    iteratee:
-        | ((previousValue: R, chunk: T, encoding: string) => R)
-        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
-    initialValue: R,
-    options?: TransformOptions,
-): Transform {
-    return baseFunctions.reduce(iteratee, initialValue, options);
-}
+export const reduce = baseFunctions.reduce;
 
 /**
  * Return a ReadWrite stream that splits streamed chunks using the given separator
@@ -89,12 +58,7 @@ export function reduce<T, R>(
  * @param options? Defaults to encoding: utf8
  * @param options.encoding? Encoding written chunks are assumed to use
  */
-export function split(
-    separator?: string | RegExp,
-    options?: WithEncoding,
-): Transform {
-    return baseFunctions.split(separator, options);
-}
+export const split = baseFunctions.split;
 
 /**
  * Return a ReadWrite stream that joins streamed chunks using the given separator
@@ -102,9 +66,7 @@ export function split(
  * @param options? Defaults to encoding: utf8
  * @param options.encoding? Encoding written chunks are assumed to use
  */
-export function join(separator: string, options?: WithEncoding): Transform {
-    return baseFunctions.join(separator, options);
-}
+export const join = baseFunctions.join;
 
 /**
  * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
@@ -114,21 +76,13 @@ export function join(separator: string, options?: WithEncoding): Transform {
  * @param searchValue Search string or regular expression
  * @param replaceValue Replacement string
  * @param options? Defaults to encoding: utf8
  * @param options.encoding Encoding written chunks are assumed to use
  */
-export function replace(
-    searchValue: string | RegExp,
-    replaceValue: string,
-    options?: WithEncoding,
-): Transform {
-    return baseFunctions.replace(searchValue, replaceValue, options);
-}
+export const replace = baseFunctions.replace;
 
 /**
  * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
  * must be a fully defined JSON string in utf8.
  */
-export function parse(): Transform {
-    return baseFunctions.parse();
-}
+export const parse = baseFunctions.parse;
 
 /**
  * Return a ReadWrite stream that stringifies the streamed chunks to JSON
@@ -136,34 +90,26 @@ export function parse(): Transform {
  * @param options.pretty If true, whitespace is inserted into the stringified chunks.
  *
  */
-export function stringify(options?: JsonParseOptions): Transform {
-    return baseFunctions.stringify(options);
-}
+export const stringify = baseFunctions.stringify;
 
 /**
  * Return a ReadWrite stream that collects streamed chunks into an array or buffer
  * @param options?
  * @param options.objectMode? Whether this stream should behave as a stream of objects
  */
-export function collect(options?: ThroughOptions): Transform {
-    return baseFunctions.collect(options);
-}
+export const collect = baseFunctions.collect;
 
 /**
  * Return a Readable stream of readable streams concatenated together
  * @param streams Readable streams to concatenate
  */
-export function concat(...streams: Readable[]): Readable {
-    return baseFunctions.concat(...streams);
-}
+export const concat = baseFunctions.concat;
 
 /**
  * Return a Readable stream of readable streams concatenated together
  * @param streams Readable streams to merge
  */
-export function merge(...streams: Readable[]): Readable {
-    return baseFunctions.merge(...streams);
-}
+export const merge = baseFunctions.merge;
 
 /**
  * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
@@ -171,42 +117,34 @@ export function merge(...streams: Readable[]): Readable {
  * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
  * @param readable Readable stream assumed to yield chunks when the writable stream is written to
  */
-export function duplex(writable: Writable, readable: Readable): Duplex {
-    return baseFunctions.duplex(writable, readable);
-}
+export const duplex = baseFunctions.duplex;
 
 /**
  * Return a Duplex stream from a child process' stdin and stdout
  * @param childProcess Child process from which to create duplex stream
  */
-export function child(childProcess: ChildProcess): Duplex {
-    return baseFunctions.child(childProcess);
-}
+export const child = baseFunctions.child;
 
 /**
  * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
  * ended
  * @param readable Readable stream to wait on
  */
-export function last<T>(readable: Readable): Promise<T> {
-    return baseFunctions.last(readable);
-}
+export const last = baseFunctions.last;
 
 /**
  * Stores chunks of data internally in array and batches when batchSize is reached.
  * @param batchSize Size of the batches, defaults to 1000.
  * @param maxBatchAge? Max lifetime of a batch, defaults to 500
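+ * @example
+ * // Illustrative, per the defaults documented above: with batch(2),
+ * // chunks 1, 2, 3 are emitted as [1, 2], then [3] once maxBatchAge elapses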
  */
-export function batch(batchSize: number, maxBatchAge?: number): Transform {
+export function batch(batchSize?: number, maxBatchAge?: number): Transform {
     return baseFunctions.batch(batchSize, maxBatchAge);
 }
 
 /**
  * Unbatches and sends individual chunks of data.
  */
-export function unbatch(): Transform {
-    return baseFunctions.unbatch();
-}
+export const unbatch = baseFunctions.unbatch;
 
 /**
  * Limits rate of data transferred into stream.
@@ -224,13 +162,7 @@ export function rate(targetRate?: number, period?: number): Transform {
  * @param func Function to execute on each data chunk.
  * @param pause Amount of time to pause processing when max number of parallel processes are executing.
  */
-export function parallelMap<T, R>(
-    mapper: (chunk: T) => R,
-    parallel?: number,
-    sleepTime?: number,
-) {
-    return baseFunctions.parallelMap(mapper, parallel, sleepTime);
-}
+export const parallelMap = baseFunctions.parallelMap;
 
 /**
  * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
@@ -252,19 +184,7 @@ export function parallelMap<T, R>(
  * @param batchSize Size of the batch.
  * @param batchRate Desired rate of data transfer to next stream.
  * @param flushStrategy Buffering strategy to use.
  * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
  */
-export function accumulator(
-    batchSize: number,
-    batchRate: number | undefined,
-    flushStrategy: FlushStrategy,
-    keyBy?: string,
-) {
-    return baseFunctions.accumulator(
-        batchSize,
-        batchRate,
-        flushStrategy,
-        keyBy,
-    );
-}
+export const accumulator = baseFunctions.accumulator;
 
 /**
  * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
@@ -280,22 +200,21 @@ export function accumulator(
  * @param batchRate Desired rate of data transfer to next stream.
  * @param flushStrategy Buffering strategy to use.
  * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
  * or items need to be cleared from buffer.
  */
-export function accumulatorBy<T, S extends FlushStrategy>(
-    batchRate: number | undefined,
-    flushStrategy: S,
-    iteratee: AccumulatorByIteratee<T>,
-) {
-    return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee);
-}
+export const accumulatorBy = baseFunctions.accumulatorBy;
 
+/**
+ * Composes multiple streams together. Writes go to the first stream; reads come from the last stream.
+ * @param streams Array of streams to compose. Minimum of two.
+ * @param options Duplex stream options
+ */
 export const compose = baseFunctions.compose;
 
-export function demux(
-    construct: (
-        destKey?: string,
-    ) => NodeJS.WritableStream | NodeJS.ReadWriteStream,
-    demuxer: { key?: string; keyBy?: (chunk: any) => string },
-    options?: DuplexOptions,
-) {
-    return baseFunctions.demux(construct, demuxer, options);
-}
+/**
+ * Demultiplexes a source stream into multiple destination streams, routing each chunk by its key.
+ * @param construct Constructor for a new output destination. Should return a Writable or ReadWrite stream.
+ * @param demuxBy
+ * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
+ * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
+ * @param options Writable stream options
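+ * @example
+ * // Illustrative sketch; the "user" key and fs.createWriteStream sink are
+ * // hypothetical stand-ins for real destinations:
+ * // demux(key => fs.createWriteStream(`${key}.log`), { key: "user" })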
+ */
+export const demux = baseFunctions.demux;
diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts
index 2c5feb6..acc6e2e 100644
--- a/tests/compose.spec.ts
+++ b/tests/compose.spec.ts
@@ -212,7 +212,8 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
     });
 
     composed.on("data", (chunk: Chunk) => {
-        if (chunk.key === "e") {
+        pendingReads--;
+        if (pendingReads === 0) {
             resolve();
         }
     });
@@ -226,6 +227,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
     ];
 
     let start = performance.now();
+    let pendingReads = input.length;
     for (const item of input) {
         const res = composed.write(item);
         expect(composed._writableState.length).to.be.at.most(2);