Add descriptions for demux and compose

parent d33d8dcad3
commit 7aeea4815a
@@ -10,8 +10,7 @@ export function collect(
 ): Transform {
     const collected: any[] = [];
     return new Transform({
-        readableObjectMode: options.objectMode,
-        writableObjectMode: options.objectMode,
+        ...options,
         transform(data, encoding, callback) {
            collected.push(data);
            callback();
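The rewrite above spreads the caller's options into the Transform rather than copying only objectMode, so settings such as highWaterMark now pass through as well. A minimal sketch of the effect; the flush behavior is an assumption, since it sits outside this hunk:

    import { Readable, Transform } from "stream";

    // Sketch of the rewritten collect; flush logic assumed, not shown in the hunk.
    function collect(options: { objectMode?: boolean; highWaterMark?: number } = {}): Transform {
        const collected: any[] = [];
        return new Transform({
            ...options, // objectMode, highWaterMark, etc. are all forwarded now
            transform(data, _encoding, callback) {
                collected.push(data);
                callback();
            },
            flush(callback) {
                this.push(collected); // assumed: emit everything collected on end
                callback();
            },
        });
    }

    Readable.from(["a", "b"]).pipe(collect({ objectMode: true, highWaterMark: 8 }));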
@ -4,23 +4,17 @@ import { pipeline, Duplex, DuplexOptions } from "stream";
|
|||||||
* Return a Readable stream of readable streams concatenated together
|
* Return a Readable stream of readable streams concatenated together
|
||||||
* @param streams Readable streams to concatenate
|
* @param streams Readable streams to concatenate
|
||||||
*/
|
*/
|
||||||
// First Readable --> Readable
|
|
||||||
// First Transform | Duplex, Last Writable --> Writable
|
|
||||||
//
|
|
||||||
export function compose(
|
export function compose(
|
||||||
streams: Array<
|
streams: Array<
|
||||||
NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
|
NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
|
||||||
>,
|
>,
|
||||||
options?: DuplexOptions,
|
options?: DuplexOptions,
|
||||||
): Compose {
|
): Compose {
|
||||||
// Maybe just return a new stream here
|
|
||||||
if (streams.length < 2) {
|
if (streams.length < 2) {
|
||||||
throw new Error("At least two streams are required to compose");
|
throw new Error("At least two streams are required to compose");
|
||||||
}
|
}
|
||||||
|
|
||||||
const composed = new Compose(streams, options);
|
return new Compose(streams, options);
|
||||||
|
|
||||||
return composed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum EventSubscription {
|
enum EventSubscription {
|
||||||
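With the simplification above, compose returns the Compose duplex directly. A usage sketch, assuming Compose pipes its member streams together internally (the pipeline import suggests as much), with writes entering the first stream and reads coming from the last:

    import { Transform } from "stream";
    import { compose } from "./compose"; // hypothetical path to the module above

    const toUpper = new Transform({
        objectMode: true,
        transform(chunk, _enc, cb) {
            cb(null, String(chunk).toUpperCase());
        },
    });
    const exclaim = new Transform({
        objectMode: true,
        transform(chunk, _enc, cb) {
            cb(null, chunk + "!");
        },
    });

    // Writes enter toUpper; reads come out of exclaim.
    const combined = compose([toUpper, exclaim], { objectMode: true });
    combined.on("data", d => console.log(d)); // "HELLO!"
    combined.write("hello");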
@@ -97,4 +91,21 @@ export class Compose extends Duplex {
         }
         return this;
     }
+
+    public once(event: string, cb: any) {
+        switch (eventsTarget[event]) {
+            case EventSubscription.First:
+                this.first.once(event, cb);
+                break;
+            case EventSubscription.Last:
+                this.last.once(event, cb);
+                break;
+            case EventSubscription.All:
+                this.streams.forEach(s => s.once(event, cb));
+                break;
+            default:
+                super.once(event, cb);
+        }
+        return this;
+    }
 }
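The new once override mirrors the on override implied by the surrounding context: a lookup table decides whether a listener attaches to the first stream, the last stream, every member stream, or (by default) the Compose instance itself. A sketch of what that table plausibly looks like; the event assignments below are assumptions, since only the names eventsTarget and EventSubscription appear in this diff:

    // Assumed shape of the dispatch table consulted by on()/once(); illustrative only.
    enum EventSubscription {
        Last,
        First,
        All,
        Self,
    }

    const eventsTarget: { [event: string]: EventSubscription } = {
        data: EventSubscription.Last,     // readable-side events come from the tail
        readable: EventSubscription.Last,
        end: EventSubscription.Last,
        drain: EventSubscription.First,   // writable-side events come from the head
        pipe: EventSubscription.First,
        unpipe: EventSubscription.First,
        error: EventSubscription.All,     // any member stream can fail
        // events not listed fall through to super.once() in the switch above
    };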
@@ -57,9 +57,7 @@ class Demux extends Writable {
     ) {
         super(options);
         if (demuxBy.keyBy === undefined && demuxBy.key === undefined) {
-            throw new Error(
-                "keyBy or key must be provided in second parameter",
-            );
+            throw new Error("keyBy or key must be provided in second argument");
         }
         this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]);
         this.construct = construct;
@@ -68,6 +66,7 @@ class Demux extends Writable {
         this.nonWritableStreams = [];
     }
 
+    // Throttles when one stream is not writable
     public _write(chunk: any, encoding?: any, cb?: any) {
         const destKey = this.demuxer(chunk);
         if (this.streamsByKey[destKey] === undefined) {
@@ -76,10 +75,6 @@ class Demux extends Writable {
                 writable: true,
             };
         }
-        // Throttle when one stream is not writable anymore
-        // Set writable to false
-        // keep state of all the streams, if one is not writable demux shouldnt be writable
-        // Small optimization is to keep writing until you get a following event to the unwritable destination
        let res = false;
        if (this.streamsByKey[destKey].writable && this.isWritable) {
            res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb);
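The deleted comments describe the throttling rule _write enforces: track writability per destination, and make the whole demux unwritable while any destination is saturated. A standalone sketch of that rule with a single destination; the semantics are taken from the comments, not from the implementation:

    import { PassThrough } from "stream";

    const dest = new PassThrough({ objectMode: true, highWaterMark: 2 });
    let demuxWritable = true; // mirrors this.isWritable in the class above

    function demuxWrite(chunk: unknown): boolean {
        if (!demuxWritable) {
            return false; // throttled until the saturated destination drains
        }
        const ok = dest.write(chunk);
        if (!ok) {
            demuxWritable = false;
            dest.once("drain", () => (demuxWritable = true));
        }
        return ok;
    }

    demuxWrite("a"); // true: buffer still below highWaterMark
    demuxWrite("b"); // false: buffer full, so the demux throttles itself
    console.log(demuxWrite("c")); // false until "drain" fires on dest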
@@ -1,4 +1,4 @@
-import { Readable, Writable, DuplexOptions, Transform, Duplex } from "stream";
+import { Readable, Writable, Transform, Duplex } from "stream";
 import { ChildProcess } from "child_process";
 import * as baseFunctions from "./baseFunctions";
 
@@ -7,17 +7,13 @@ import {
     TransformOptions,
     WithEncoding,
     JsonParseOptions,
-    FlushStrategy,
-    AccumulatorByIteratee,
 } from "./baseDefinitions";
 
 /**
  * Convert an array into a Readable stream of its elements
  * @param array Array of elements to stream
  */
-export function fromArray(array: any[]): Readable {
-    return baseFunctions.fromArray(array);
-}
+export const fromArray = baseFunctions.fromArray;
 
 /**
  * Return a ReadWrite stream that maps streamed chunks
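This change sets the pattern for the rest of the file: wrapper functions that merely restated a signature and forwarded arguments become direct aliases of the base implementations. Call sites are unchanged and the JSDoc stays attached to the exported const; the trade-off is that the facade no longer pins its own, possibly narrower, signature, since the base function's types now leak through as-is. For example:

    import { Readable } from "stream";
    import * as baseFunctions from "./baseFunctions";

    // Direct alias, as in the hunk above: same call shape, one less stack frame.
    export const fromArray = baseFunctions.fromArray;

    const stream: Readable = fromArray([1, 2, 3]);
    stream.on("data", n => console.log(n)); // 1, 2, 3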
@@ -26,12 +22,7 @@ export function fromArray(array: any[]): Readable {
  * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
  */
-export function map<T, R>(
-    mapper: (chunk: T, encoding?: string) => R,
-    options?: TransformOptions,
-): Transform {
-    return baseFunctions.map(mapper, options);
-}
+export const map = baseFunctions.map;
 
 /**
  * Return a ReadWrite stream that flat maps streamed chunks
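Usage of the aliased map is unchanged. A sketch, with the entry-point path assumed and the option names taken from the JSDoc above:

    import { Readable } from "stream";
    import { map } from "./index"; // hypothetical entry point

    Readable.from(["a", "b", "c"])
        .pipe(map((s: string) => s.toUpperCase(), {
            readableObjectMode: true,
            writableObjectMode: true,
        }))
        .on("data", d => console.log(d)); // "A", "B", "C"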
@ -40,14 +31,7 @@ export function map<T, R>(
|
|||||||
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
||||||
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
||||||
*/
|
*/
|
||||||
export function flatMap<T, R>(
|
export const flatMap = baseFunctions.flatMap;
|
||||||
mapper:
|
|
||||||
| ((chunk: T, encoding: string) => R[])
|
|
||||||
| ((chunk: T, encoding: string) => Promise<R[]>),
|
|
||||||
options?: TransformOptions,
|
|
||||||
): Transform {
|
|
||||||
return baseFunctions.flatMap(mapper, options);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
|
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
|
||||||
@ -55,14 +39,7 @@ export function flatMap<T, R>(
|
|||||||
* @param options?
|
* @param options?
|
||||||
* @param options.objectMode? Whether this stream should behave as a stream of objects.
|
* @param options.objectMode? Whether this stream should behave as a stream of objects.
|
||||||
*/
|
*/
|
||||||
export function filter<T>(
|
export const filter = baseFunctions.filter;
|
||||||
mapper:
|
|
||||||
| ((chunk: T, encoding: string) => boolean)
|
|
||||||
| ((chunk: T, encoding: string) => Promise<boolean>),
|
|
||||||
options?: TransformOptions,
|
|
||||||
): Transform {
|
|
||||||
return baseFunctions.filter(mapper, options);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
|
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
|
||||||
@ -73,15 +50,7 @@ export function filter<T>(
|
|||||||
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
||||||
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
||||||
*/
|
*/
|
||||||
export function reduce<T, R>(
|
export const reduce = baseFunctions.reduce;
|
||||||
iteratee:
|
|
||||||
| ((previousValue: R, chunk: T, encoding: string) => R)
|
|
||||||
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
|
|
||||||
initialValue: R,
|
|
||||||
options?: TransformOptions,
|
|
||||||
): Transform {
|
|
||||||
return baseFunctions.reduce(iteratee, initialValue, options);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a ReadWrite stream that splits streamed chunks using the given separator
|
* Return a ReadWrite stream that splits streamed chunks using the given separator
|
||||||
@ -89,12 +58,7 @@ export function reduce<T, R>(
|
|||||||
* @param options? Defaults to encoding: utf8
|
* @param options? Defaults to encoding: utf8
|
||||||
* @param options.encoding? Encoding written chunks are assumed to use
|
* @param options.encoding? Encoding written chunks are assumed to use
|
||||||
*/
|
*/
|
||||||
export function split(
|
export const split = baseFunctions.split;
|
||||||
separator?: string | RegExp,
|
|
||||||
options?: WithEncoding,
|
|
||||||
): Transform {
|
|
||||||
return baseFunctions.split(separator, options);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a ReadWrite stream that joins streamed chunks using the given separator
|
* Return a ReadWrite stream that joins streamed chunks using the given separator
|
||||||
@@ -102,9 +66,7 @@ export function split(
  * @param options? Defaults to encoding: utf8
  * @param options.encoding? Encoding written chunks are assumed to use
  */
-export function join(separator: string, options?: WithEncoding): Transform {
-    return baseFunctions.join(separator, options);
-}
+export const join = baseFunctions.join;
 
 /**
  * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
@@ -114,21 +76,13 @@ export function join(separator: string, options?: WithEncoding): Transform {
  * @param options? Defaults to encoding: utf8
  * @param options.encoding Encoding written chunks are assumed to use
  */
-export function replace(
-    searchValue: string | RegExp,
-    replaceValue: string,
-    options?: WithEncoding,
-): Transform {
-    return baseFunctions.replace(searchValue, replaceValue, options);
-}
+export const replace = baseFunctions.replace;
 
 /**
  * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
  * must be a fully defined JSON string in utf8.
  */
-export function parse(): Transform {
-    return baseFunctions.parse();
-}
+export const parse = baseFunctions.parse;
 
 /**
  * Return a ReadWrite stream that stringifies the streamed chunks to JSON
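A quick sketch of the aliased replace; the entry-point path is assumed:

    import { Readable } from "stream";
    import { replace } from "./index"; // hypothetical entry point

    // Redact every digit in a utf8 text stream.
    Readable.from(["card 1234\n", "pin 9876\n"])
        .pipe(replace(/\d/g, "*"))
        .on("data", d => console.log(d.toString()));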
@@ -136,34 +90,26 @@ export function parse(): Transform {
  * @param options.pretty If true, whitespace is inserted into the stringified chunks.
  *
  */
-export function stringify(options?: JsonParseOptions): Transform {
-    return baseFunctions.stringify(options);
-}
+export const stringify = baseFunctions.stringify;
 
 /**
  * Return a ReadWrite stream that collects streamed chunks into an array or buffer
  * @param options?
  * @param options.objectMode? Whether this stream should behave as a stream of objects
  */
-export function collect(options?: ThroughOptions): Transform {
-    return baseFunctions.collect(options);
-}
+export const collect = baseFunctions.collect;
 
 /**
  * Return a Readable stream of readable streams concatenated together
  * @param streams Readable streams to concatenate
  */
-export function concat(...streams: Readable[]): Readable {
-    return baseFunctions.concat(...streams);
-}
+export const concat = baseFunctions.concat;
 
 /**
  * Return a Readable stream of readable streams concatenated together
  * @param streams Readable streams to merge
  */
-export function merge(...streams: Readable[]): Readable {
-    return baseFunctions.merge(...streams);
-}
+export const merge = baseFunctions.merge;
 
 /**
  * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
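split, parse, and stringify compose into the usual newline-delimited JSON round trip. A sketch, with the entry-point path assumed and the pretty option taken from the JSDoc above:

    import { Readable } from "stream";
    import { split, parse, stringify } from "./index"; // hypothetical entry point

    Readable.from(['{"a":1}\n{"a":2}'])
        .pipe(split("\n"))                 // one JSON document per chunk
        .pipe(parse())                     // chunk -> object
        .pipe(stringify({ pretty: true })) // object -> indented JSON text
        .on("data", d => console.log(d.toString()));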
@@ -171,42 +117,34 @@ export function merge(...streams: Readable[]): Readable {
  * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
  * @param readable Readable stream assumed to yield chunks when the writable stream is written to
  */
-export function duplex(writable: Writable, readable: Readable): Duplex {
-    return baseFunctions.duplex(writable, readable);
-}
+export const duplex = baseFunctions.duplex;
 
 /**
  * Return a Duplex stream from a child process' stdin and stdout
  * @param childProcess Child process from which to create duplex stream
  */
-export function child(childProcess: ChildProcess): Duplex {
-    return baseFunctions.child(childProcess);
-}
+export const child = baseFunctions.child;
 
 /**
  * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
  * ended
  * @param readable Readable stream to wait on
  */
-export function last<T>(readable: Readable): Promise<T | null> {
-    return baseFunctions.last(readable);
-}
+export const last = baseFunctions.last;
 
 /**
  * Stores chunks of data internally in array and batches when batchSize is reached.
  * @param batchSize Size of the batches, defaults to 1000.
  * @param maxBatchAge? Max lifetime of a batch, defaults to 500
  */
-export function batch(batchSize: number, maxBatchAge?: number): Transform {
+export function batch(batchSize?: number, maxBatchAge?: number): Transform {
     return baseFunctions.batch(batchSize, maxBatchAge);
 }
 
 /**
  * Unbatches and sends individual chunks of data.
  */
-export function unbatch(): Transform {
-    return baseFunctions.unbatch();
-}
+export const unbatch = baseFunctions.unbatch;
 
 /**
  * Limits rate of data transferred into stream.
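Making batchSize optional brings the signature in line with its JSDoc, which already promised a default of 1000. Both calls below now type-check; the entry-point path is assumed:

    import { Readable } from "stream";
    import { batch } from "./index"; // hypothetical entry point

    Readable.from([1, 2, 3])
        .pipe(batch(2)) // emits [1, 2], then [3] once maxBatchAge (default 500ms) elapses
        .on("data", b => console.log(b));

    const withDefaults = batch(); // now legal: batchSize falls back to 1000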
@@ -224,13 +162,7 @@ export function rate(targetRate?: number, period?: number): Transform {
  * @param func Function to execute on each data chunk.
  * @param pause Amount of time to pause processing when max number of parallel processes are executing.
  */
-export function parallelMap<T, R>(
-    mapper: (chunk: T) => R,
-    parallel?: number,
-    sleepTime?: number,
-) {
-    return baseFunctions.parallelMap(mapper, parallel, sleepTime);
-}
+export const parallelMap = baseFunctions.parallelMap;
 
 /**
  * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
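The aliased parallelMap keeps the removed wrapper's shape: the mapper, then the concurrency limit, then the sleep time used while saturated. A sketch; the entry-point path is assumed:

    import { Readable } from "stream";
    import { parallelMap } from "./index"; // hypothetical entry point

    // Up to 4 mapper invocations in flight at once.
    const slowDouble = parallelMap(async (n: number) => {
        await new Promise(resolve => setTimeout(resolve, 10));
        return n * 2;
    }, 4);

    Readable.from([1, 2, 3, 4]).pipe(slowDouble).on("data", d => console.log(d));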
@ -252,19 +184,7 @@ export function parallelMap<T, R>(
|
|||||||
* @param flushStrategy Buffering strategy to use.
|
* @param flushStrategy Buffering strategy to use.
|
||||||
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
|
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
|
||||||
*/
|
*/
|
||||||
export function accumulator(
|
export const accumulator = baseFunctions.accumulator;
|
||||||
batchSize: number,
|
|
||||||
batchRate: number | undefined,
|
|
||||||
flushStrategy: FlushStrategy,
|
|
||||||
keyBy?: string,
|
|
||||||
) {
|
|
||||||
return baseFunctions.accumulator(
|
|
||||||
batchSize,
|
|
||||||
batchRate,
|
|
||||||
flushStrategy,
|
|
||||||
keyBy,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
||||||
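The removed wrapper documents the accumulator signature: batchSize, batchRate, flushStrategy, keyBy. A sketch; the FlushStrategy variant name below is an assumption, since the enum's members live in baseDefinitions and are not shown in this diff:

    import { Readable } from "stream";
    import { accumulator } from "./index"; // hypothetical entry point
    import { FlushStrategy } from "./baseDefinitions";

    // Batches of 3, no rate limit, keyed by "ts"; variant name assumed.
    const acc = accumulator(3, undefined, FlushStrategy.Rolling, "ts");

    Readable.from([{ ts: 1 }, { ts: 2 }, { ts: 3 }])
        .pipe(acc)
        .on("data", batch => console.log(batch));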
@@ -280,22 +200,21 @@ export function accumulator(
  * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
  * or items need to be cleared from buffer.
  */
-export function accumulatorBy<T, S extends FlushStrategy>(
-    batchRate: number | undefined,
-    flushStrategy: S,
-    iteratee: AccumulatorByIteratee<T>,
-) {
-    return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee);
-}
+export const accumulatorBy = baseFunctions.accumulatorBy;
 
+/**
+ * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
+ * @param streams Array of streams to compose. Minimum of two.
+ * @param options Transform stream options
+ **/
 export const compose = baseFunctions.compose;
 
-export function demux(
-    construct: (
-        destKey?: string,
-    ) => NodeJS.WritableStream | NodeJS.ReadWriteStream,
-    demuxer: { key?: string; keyBy?: (chunk: any) => string },
-    options?: DuplexOptions,
-) {
-    return baseFunctions.demux(construct, demuxer, options);
-}
+/**
+ * Demultiplexes a stream into multiple output streams, keyed by the given key or keyBy function.
+ * @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
+ * @param demuxBy
+ * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
+ * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
+ * @param options Writable stream options
+ **/
+export const demux = baseFunctions.demux;
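Per the new JSDoc, demux builds one output stream per distinct key value: construct runs the first time a key is seen, and every later chunk with the same key is routed to the same destination. A usage sketch; the entry-point path is assumed, and the third argument follows the "@param options Writable stream options" line above:

    import { Writable } from "stream";
    import { demux } from "./index"; // hypothetical entry point

    // One Writable per user id; chunks are routed by their "userId" field.
    const sink = demux(
        (destKey?: string) =>
            new Writable({
                objectMode: true,
                write(chunk, _enc, cb) {
                    console.log(`[${destKey}]`, chunk);
                    cb();
                },
            }),
        { key: "userId" },
        { objectMode: true },
    );

    sink.write({ userId: "a", n: 1 }); // construct("a") runs once
    sink.write({ userId: "b", n: 2 }); // construct("b") runs once
    sink.write({ userId: "a", n: 3 }); // reuses the "a" destination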
@ -212,7 +212,8 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
|
|||||||
});
|
});
|
||||||
|
|
||||||
composed.on("data", (chunk: Chunk) => {
|
composed.on("data", (chunk: Chunk) => {
|
||||||
if (chunk.key === "e") {
|
pendingReads--;
|
||||||
|
if (pendingReads === 0) {
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -226,6 +227,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
|
|||||||
];
|
];
|
||||||
|
|
||||||
let start = performance.now();
|
let start = performance.now();
|
||||||
|
let pendingReads = input.length;
|
||||||
for (const item of input) {
|
for (const item of input) {
|
||||||
const res = composed.write(item);
|
const res = composed.write(item);
|
||||||
expect(composed._writableState.length).to.be.at.most(2);
|
expect(composed._writableState.length).to.be.at.most(2);
|
||||||
|