DefaultOptions implemented as module factory

This commit is contained in:
Lewis Diamond 2019-12-02 16:02:40 -05:00
parent c690185ab7
commit a85054fd82
40 changed files with 357 additions and 316 deletions

View File

@ -40,7 +40,7 @@
"test:all": "NODE_PATH=src node node_modules/.bin/ava", "test:all": "NODE_PATH=src node node_modules/.bin/ava",
"lint": "tslint -p tsconfig.json", "lint": "tslint -p tsconfig.json",
"validate:tslint": "tslint-config-prettier-check ./tslint.json", "validate:tslint": "tslint-config-prettier-check ./tslint.json",
"prepublishOnly": "yarn lint && yarn test && yarn tsc" "prepublishOnly": "yarn lint && yarn test && yarn tsc -d"
}, },
"dependencies": {}, "dependencies": {},
"devDependencies": { "devDependencies": {

View File

@ -1,5 +1,4 @@
import { Transform, TransformOptions } from "stream"; import { Transform, TransformOptions } from "stream";
import { batch } from ".";
export enum FlushStrategy { export enum FlushStrategy {
rolling = "rolling", rolling = "rolling",
@ -129,23 +128,23 @@ export function accumulator(
keyBy?: string, keyBy?: string,
options?: TransformOptions, options?: TransformOptions,
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { switch (flushStrategy) {
case FlushStrategy.sliding:
return sliding(batchSize, keyBy, options); return sliding(batchSize, keyBy, options);
} else if (flushStrategy === FlushStrategy.rolling) { case FlushStrategy.rolling:
return rolling(batchSize, keyBy, options); return rolling(batchSize, keyBy, options);
} else {
return batch(batchSize);
} }
} }
export function accumulatorBy<T, S extends FlushStrategy>( export function accumulatorBy<T, S extends FlushStrategy>(
flushStrategy: S, flushStrategy: FlushStrategy,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions, options?: TransformOptions,
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { switch (flushStrategy) {
case FlushStrategy.sliding:
return slidingBy(iteratee, options); return slidingBy(iteratee, options);
} else { case FlushStrategy.rolling:
return rollingBy(iteratee, options); return rollingBy(iteratee, options);
} }
} }

View File

@ -1,23 +0,0 @@
export { accumulator, accumulatorBy } from "./accumulator";
export { batch } from "./batch";
export { child } from "./child";
export { collect } from "./collect";
export { concat } from "./concat";
export { duplex } from "./duplex";
export { filter } from "./filter";
export { flatMap } from "./flatMap";
export { fromArray } from "./fromArray";
export { join } from "./join";
export { last } from "./last";
export { map } from "./map";
export { merge } from "./merge";
export { parallelMap } from "./parallelMap";
export { parse } from "./parse";
export { rate } from "./rate";
export { reduce } from "./reduce";
export { replace } from "./replace";
export { split } from "./split";
export { stringify } from "./stringify";
export { unbatch } from "./unbatch";
export { compose } from "./compose";
export { demux } from "./demux";

View File

@ -3,9 +3,7 @@ import { Transform, TransformOptions } from "stream";
export function batch( export function batch(
batchSize: number = 1000, batchSize: number = 1000,
maxBatchAge: number = 500, maxBatchAge: number = 500,
options: TransformOptions = { options: TransformOptions = {},
objectMode: true,
},
): Transform { ): Transform {
let buffer: any[] = []; let buffer: any[] = [];
let timer: NodeJS.Timer | null = null; let timer: NodeJS.Timer | null = null;

View File

@ -1,5 +1,5 @@
import { ChildProcess } from "child_process"; import { ChildProcess } from "child_process";
import { duplex } from "./baseFunctions"; import { duplex } from "./duplex";
export function child(childProcess: ChildProcess) { export function child(childProcess: ChildProcess) {
if (childProcess.stdin === null) { if (childProcess.stdin === null) {

View File

@ -25,7 +25,7 @@ const eventsTarget = {
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
export function demux( export function demux(
construct: () => DemuxStreams, construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: WritableOptions, options?: WritableOptions,
): Writable { ): Writable {

View File

@ -4,9 +4,7 @@ export function flatMap<T, R>(
mapper: mapper:
| ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>), | ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = { options?: TransformOptions,
objectMode: true,
},
): Transform { ): Transform {
return new Transform({ return new Transform({
...options, ...options,

View File

@ -1,39 +1,87 @@
import { Transform } from "stream"; import {
import * as baseFunctions from "./baseFunctions"; Transform,
TransformOptions,
WritableOptions,
ReadableOptions,
} from "stream";
import { accumulator, accumulatorBy } from "./accumulator";
import { batch } from "./batch";
import { child } from "./child";
import { collect } from "./collect";
import { concat } from "./concat";
import { duplex } from "./duplex";
import { filter } from "./filter";
import { flatMap } from "./flatMap";
import { fromArray } from "./fromArray";
import { join } from "./join";
import { last } from "./last";
import { map } from "./map";
import { merge } from "./merge";
import { parallelMap } from "./parallelMap";
import { parse } from "./parse";
import { rate } from "./rate";
import { reduce } from "./reduce";
import { replace } from "./replace";
import { split } from "./split";
import { stringify } from "./stringify";
import { unbatch } from "./unbatch";
import { compose } from "./compose";
import { demux } from "./demux";
/** export default function mhysa(defaultOptions?: TransformOptions) {
function withDefaultOptions<T extends any[], R>(
n: number,
fn: (...args: T) => R,
): (...args: T) => R {
return (...args) => {
const options = {
...defaultOptions,
...((args[n] || {}) as TransformOptions | {}),
};
const provided = args.slice(0, n);
const nextArgs = [
...provided,
...Array(n - provided.length).fill(undefined),
options,
] as T;
return fn(...nextArgs) as R;
};
}
return {
/**
* Convert an array into a Readable stream of its elements * Convert an array into a Readable stream of its elements
* @param array Array of elements to stream * @param array Array of elements to stream
*/ */
export const fromArray = baseFunctions.fromArray; fromArray,
/** /**
* Return a ReadWrite stream that maps streamed chunks * Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options? * @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/ */
export const map = baseFunctions.map; map: withDefaultOptions(1, map),
/** /**
* Return a ReadWrite stream that flat maps streamed chunks * Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options? * @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/ */
export const flatMap = baseFunctions.flatMap; flatMap: withDefaultOptions(1, flatMap),
/** /**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter scream chunks * @param predicate Predicate with which to filter scream chunks
* @param options? * @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects. * @param options.objectMode? Whether this stream should behave as a stream of objects.
*/ */
export const filter = baseFunctions.filter; filter: withDefaultOptions(1, filter),
/** /**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value * value
* @param iteratee Reducer function to apply on each streamed chunk * @param iteratee Reducer function to apply on each streamed chunk
@ -42,25 +90,25 @@ export const filter = baseFunctions.filter;
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/ */
export const reduce = baseFunctions.reduce; reduce: withDefaultOptions(2, reduce),
/** /**
* Return a ReadWrite stream that splits streamed chunks using the given separator * Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator? Separator to split by, defaulting to "\n" * @param separator? Separator to split by, defaulting to "\n"
* @param options? Defaults to encoding: utf8 * @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use * @param options.encoding? Encoding written chunks are assumed to use
*/ */
export const split = baseFunctions.split; split,
/** /**
* Return a ReadWrite stream that joins streamed chunks using the given separator * Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with * @param separator Separator to join with
* @param options? Defaults to encoding: utf8 * @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use * @param options.encoding? Encoding written chunks are assumed to use
*/ */
export const join = baseFunctions.join; join: withDefaultOptions(1, join),
/** /**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string * the streamed chunks with the specified replacement string
* @param searchValue Search string to use * @param searchValue Search string to use
@ -68,95 +116,95 @@ export const join = baseFunctions.join;
* @param options? Defaults to encoding: utf8 * @param options? Defaults to encoding: utf8
* @param options.encoding Encoding written chunks are assumed to use * @param options.encoding Encoding written chunks are assumed to use
*/ */
export const replace = baseFunctions.replace; replace,
/** /**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string in utf8. * must be a fully defined JSON string in utf8.
*/ */
export const parse = baseFunctions.parse; parse,
/** /**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON * Return a ReadWrite stream that stringifies the streamed chunks to JSON
* @param options? * @param options?
* @param options.pretty If true, whitespace is inserted into the stringified chunks. * @param options.pretty If true, whitespace is inserted into the stringified chunks.
* *
*/ */
export const stringify = baseFunctions.stringify; stringify,
/** /**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer * Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options? * @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects * @param options.objectMode? Whether this stream should behave as a stream of objects
*/ */
export const collect = baseFunctions.collect; collect: withDefaultOptions(0, collect),
/** /**
* Return a Readable stream of readable streams concatenated together * Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate * @param streams Readable streams to concatenate
*/ */
export const concat = baseFunctions.concat; concat,
/** /**
* Return a Readable stream of readable streams concatenated together * Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to merge * @param streams Readable streams to merge
*/ */
export const merge = baseFunctions.merge; merge,
/** /**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to, * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks * cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to * @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/ */
export const duplex = baseFunctions.duplex; duplex,
/** /**
* Return a Duplex stream from a child process' stdin and stdout * Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream * @param childProcess Child process from which to create duplex stream
*/ */
export const child = baseFunctions.child; child,
/** /**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended * ended
* @param readable Readable stream to wait on * @param readable Readable stream to wait on
*/ */
export const last = baseFunctions.last; last,
/** /**
* Stores chunks of data internally in array and batches when batchSize is reached. * Stores chunks of data internally in array and batches when batchSize is reached.
* @param batchSize Size of the batches, defaults to 1000. * @param batchSize Size of the batches, defaults to 1000.
* @param maxBatchAge? Max lifetime of a batch, defaults to 500 * @param maxBatchAge? Max lifetime of a batch, defaults to 500
*/
export function batch(batchSize?: number, maxBatchAge?: number): Transform {
return baseFunctions.batch(batchSize, maxBatchAge);
}
/**
* Unbatches and sends individual chunks of data.
*/
export const unbatch = baseFunctions.unbatch;
/**
* Limits rate of data transferred into stream.
* @param options? * @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
batch: withDefaultOptions(2, batch),
/**
* Unbatches and sends individual chunks of data.
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
unbatch: withDefaultOptions(0, unbatch),
/**
* Limits rate of data transferred into stream.
* @param targetRate? Desired rate in ms. * @param targetRate? Desired rate in ms.
* @param period? Period to sleep for when rate is above or equal to targetRate. * @param period? Period to sleep for when rate is above or equal to targetRate.
* @param options?
*/ */
export function rate(targetRate?: number, period?: number): Transform { rate: withDefaultOptions(2, rate),
return baseFunctions.rate(targetRate, period);
}
/** /**
* Limits number of parallel processes in flight. * Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes. * @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk. * @param func Function to execute on each data chunk.
* @param pause Amount of time to pause processing when max number of parallel processes are executing. * @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/ */
export const parallelMap = baseFunctions.parallelMap; parallelMap: withDefaultOptions(3, parallelMap),
/** /**
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies: * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
* 1. Sliding * 1. Sliding
@ -173,13 +221,12 @@ export const parallelMap = baseFunctions.parallelMap;
* each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element. * each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
* @param flushStrategy Buffering strategy to use. * @param flushStrategy Buffering strategy to use.
* @param batchSize Size of the batch (in units of buffer length or value at key). * @param batchSize Size of the batch (in units of buffer length or value at key).
* @param batchRate Desired rate of data transfer to next stream.
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer. * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
* @param options Transform stream options * @param options Transform stream options
*/ */
export const accumulator = baseFunctions.accumulator; accumulator: withDefaultOptions(3, accumulator),
/** /**
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies: * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
* 1. Sliding * 1. Sliding
@ -191,19 +238,18 @@ export const accumulator = baseFunctions.accumulator;
* @param flushStrategy Buffering strategy to use. * @param flushStrategy Buffering strategy to use.
* @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
* or items need to be cleared from buffer. * or items need to be cleared from buffer.
* @param batchRate Desired rate of data transfer to next stream.
* @param options Transform stream options * @param options Transform stream options
*/ */
export const accumulatorBy = baseFunctions.accumulatorBy; accumulatorBy: withDefaultOptions(2, accumulatorBy),
/** /**
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream. * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
* @param streams Array of streams to compose. Minimum of two. * @param streams Array of streams to compose. Minimum of two.
* @param options Transform stream options * @param options Transform stream options
*/ */
export const compose = baseFunctions.compose; compose: withDefaultOptions(1, compose),
/** /**
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream. * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream. * @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
* @param demuxBy * @param demuxBy
@ -211,4 +257,6 @@ export const compose = baseFunctions.compose;
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source. * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
* @param options Writable stream options * @param options Writable stream options
*/ */
export const demux = baseFunctions.demux; demux: withDefaultOptions(2, demux),
};
}

View File

@ -1,10 +1,10 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { StringDecoder } from "string_decoder"; import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions"; import { WithEncoding } from "./baseDefinitions";
export function join( export function join(
separator: string, separator: string,
options: WithEncoding = { encoding: "utf8" }, options?: WithEncoding & TransformOptions,
): Transform { ): Transform {
let isFirstChunk = true; let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding); const decoder = new StringDecoder(options.encoding);

View File

@ -4,10 +4,8 @@ import { sleep } from "../helpers";
export function parallelMap<T, R>( export function parallelMap<T, R>(
mapper: (data: T) => R, mapper: (data: T) => R,
parallel: number = 10, parallel: number = 10,
sleepTime: number = 5, sleepTime: number = 1,
options: TransformOptions = { options?: TransformOptions,
objectMode: true,
},
) { ) {
let inflight = 0; let inflight = 0;
return new Transform({ return new Transform({

View File

@ -5,13 +5,12 @@ import { sleep } from "../helpers";
export function rate( export function rate(
targetRate: number = 50, targetRate: number = 50,
period: number = 1, period: number = 1,
options: TransformOptions = { options?: TransformOptions,
objectMode: true,
},
): Transform { ): Transform {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period
let total = 0; let total = 0;
const start = performance.now(); const start = performance.now();
console.log(options);
return new Transform({ return new Transform({
...options, ...options,
async transform(data, encoding, callback) { async transform(data, encoding, callback) {

View File

@ -5,9 +5,7 @@ export function reduce<T, R>(
| ((previousValue: R, chunk: T, encoding: string) => R) | ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>), | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R, initialValue: R,
options: TransformOptions = { options?: TransformOptions,
objectMode: true,
},
) { ) {
let value = initialValue; let value = initialValue;
return new Transform({ return new Transform({

View File

@ -1,7 +1,6 @@
import { Transform } from "stream"; import { Transform } from "stream";
import { StringDecoder } from "string_decoder"; import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions"; import { WithEncoding } from "./baseDefinitions";
export function replace( export function replace(
searchValue: string | RegExp, searchValue: string | RegExp,
replaceValue: string, replaceValue: string,

View File

@ -2,6 +2,10 @@ import { Transform } from "stream";
import { StringDecoder } from "string_decoder"; import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions"; import { WithEncoding } from "./baseDefinitions";
interface SplitParams {
separator?: string | RegExp;
options?: WithEncoding;
}
export function split( export function split(
separator: string | RegExp = "\n", separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" }, options: WithEncoding = { encoding: "utf8" },

View File

@ -1,10 +1,6 @@
import { Transform, TransformOptions } from "stream"; import { Transform, TransformOptions } from "stream";
export function unbatch( export function unbatch(options?: TransformOptions) {
options: TransformOptions = {
objectMode: true,
},
) {
return new Transform({ return new Transform({
...options, ...options,
transform(data, encoding, callback) { transform(data, encoding, callback) {

View File

@ -1,26 +1,2 @@
export { import mhysa from "./functions";
fromArray, export default mhysa;
map,
flatMap,
filter,
reduce,
split,
join,
replace,
parse,
stringify,
collect,
concat,
merge,
duplex,
child,
last,
batch,
unbatch,
rate,
parallelMap,
accumulator,
accumulatorBy,
compose,
demux,
} from "./functions";

View File

@ -1,9 +1,10 @@
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { Readable } from "stream"; import { Readable } from "stream";
import { accumulator, accumulatorBy } from "../src"; import mhysa from "../src";
import { FlushStrategy } from "../src/functions/accumulator"; import { FlushStrategy } from "../src/functions/accumulator";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
const { accumulator, accumulatorBy } = mhysa({ objectMode: true });
test.cb("accumulator() rolling", t => { test.cb("accumulator() rolling", t => {
t.plan(3); t.plan(3);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { batch } from "../src"; import mhysa from "../src";
const { batch } = mhysa({ objectMode: true });
test.cb("batch() batches chunks together", t => { test.cb("batch() batches chunks together", t => {
t.plan(3); t.plan(3);
@ -31,7 +32,9 @@ test.cb("batch() yields a batch after the timeout", t => {
t.plan(3); t.plan(3);
const source = new Readable({ const source = new Readable({
objectMode: true, objectMode: true,
read(size: number) {}, read(size: number) {
return;
},
}); });
const expectedElements = [["a", "b"], ["c"], ["d"]]; const expectedElements = [["a", "b"], ["c"], ["d"]];
let i = 0; let i = 0;

View File

@ -2,7 +2,8 @@ import * as cp from "child_process";
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { child } from "../src"; import mhysa from "../src";
const { child } = mhysa();
test.cb( test.cb(
"child() allows easily writing to child process stdin and reading from its stdout", "child() allows easily writing to child process stdin and reading from its stdout",

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { collect } from "../src"; import mhysa from "../src";
const { collect } = mhysa();
test.cb( test.cb(
"collect() collects streamed elements into an array (object, flowing mode)", "collect() collects streamed elements into an array (object, flowing mode)",

View File

@ -1,8 +1,9 @@
const test = require("ava"); const test = require("ava");
const { expect } = require("chai"); const { expect } = require("chai");
const { compose, map } = require("../src");
const { sleep } = require("../src/helpers"); const { sleep } = require("../src/helpers");
import mhysa from "../src";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
const { compose, map } = mhysa({ objectMode: true });
test.cb("compose() chains two streams together in the correct order", t => { test.cb("compose() chains two streams together in the correct order", t => {
t.plan(3); t.plan(3);
@ -211,7 +212,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.closeTo( expect(performance.now() - start).to.be.closeTo(
_rate * highWaterMark, _rate * highWaterMark,
20, 40,
); );
}); });
@ -286,7 +287,7 @@ test.cb(
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.closeTo( expect(performance.now() - start).to.be.closeTo(
_rate * input.length, _rate * input.length,
25, 50,
); );
t.pass(); t.pass();
}); });

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { concat, collect } from "../src"; import mhysa from "../src";
const { concat, collect } = mhysa();
test.cb( test.cb(
"concat() concatenates multiple readable streams (object, flowing mode)", "concat() concatenates multiple readable streams (object, flowing mode)",

View File

@ -0,0 +1,21 @@
import { Readable } from "stream";
import test from "ava";
import mhysa from "../src";
const withDefaultOptions = mhysa({ objectMode: true });
const withoutOptions = mhysa();
test("Mhysa instances can have default options", t => {
let batch = withDefaultOptions.batch();
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3);
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3, 1);
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3, 1, { objectMode: false });
t.false(batch._readableState.objectMode);
t.false(batch._writableState.objectMode);
});

View File

@ -1,10 +1,11 @@
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
const { demux, map } = require("../src"); import mhysa from "../src";
import { Writable } from "stream"; import { Writable } from "stream";
const sinon = require("sinon"); const sinon = require("sinon");
const { sleep } = require("../src/helpers"); const { sleep } = require("../src/helpers");
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
const { demux, map } = mhysa();
interface Test { interface Test {
key: string; key: string;

View File

@ -2,7 +2,8 @@ import * as cp from "child_process";
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { duplex } from "../src"; import mhysa from "../src";
const { duplex } = mhysa();
test.cb( test.cb(
"duplex() combines a writable and readable stream into a ReadWrite stream", "duplex() combines a writable and readable stream into a ReadWrite stream",

View File

@ -1,7 +1,8 @@
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { Readable } from "stream"; import { Readable } from "stream";
import { filter } from "../src"; import mhysa from "../src";
const { filter } = mhysa();
test.cb("filter() filters elements synchronously", t => { test.cb("filter() filters elements synchronously", t => {
t.plan(2); t.plan(2);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { flatMap } from "../src"; import mhysa from "../src";
const { flatMap } = mhysa({ objectMode: true });
test.cb("flatMap() maps elements synchronously", t => { test.cb("flatMap() maps elements synchronously", t => {
t.plan(6); t.plan(6);

View File

@ -1,6 +1,7 @@
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { fromArray } from "../src"; import mhysa from "../src";
const { fromArray } = mhysa();
test.cb("fromArray() streams array elements in flowing mode", t => { test.cb("fromArray() streams array elements in flowing mode", t => {
t.plan(3); t.plan(3);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { join } from "../src"; import mhysa from "../src";
const { join } = mhysa();
test.cb("join() joins chunks using the specified separator", t => { test.cb("join() joins chunks using the specified separator", t => {
t.plan(9); t.plan(9);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { last } from "../src"; import mhysa from "../src";
const { last } = mhysa();
test("last() resolves to the last chunk streamed by the given readable stream", async t => { test("last() resolves to the last chunk streamed by the given readable stream", async t => {
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });

View File

@ -1,12 +1,15 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { map } from "../src"; import mhysa from "../src";
const { map } = mhysa();
test.cb("map() maps elements synchronously", t => { test.cb("map() maps elements synchronously", t => {
t.plan(3); t.plan(3);
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const mapStream = map((element: string) => element.toUpperCase()); const mapStream = map((element: string) => element.toUpperCase(), {
objectMode: true,
});
const expectedElements = ["A", "B", "C"]; const expectedElements = ["A", "B", "C"];
let i = 0; let i = 0;
source source
@ -28,10 +31,13 @@ test.cb("map() maps elements synchronously", t => {
test.cb("map() maps elements asynchronously", t => { test.cb("map() maps elements asynchronously", t => {
t.plan(3); t.plan(3);
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const mapStream = map(async (element: string) => { const mapStream = map(
async (element: string) => {
await Promise.resolve(); await Promise.resolve();
return element.toUpperCase(); return element.toUpperCase();
}); },
{ objectMode: true },
);
const expectedElements = ["A", "B", "C"]; const expectedElements = ["A", "B", "C"];
let i = 0; let i = 0;
source source

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { merge } from "../src"; import mhysa from "../src";
const { merge } = mhysa();
test.cb( test.cb(
"merge() merges multiple readable streams in chunk arrival order", "merge() merges multiple readable streams in chunk arrival order",

View File

@ -2,8 +2,9 @@ import { Readable } from "stream";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { parallelMap } from "../src"; import mhysa from "../src";
import { sleep } from "../src/helpers"; import { sleep } from "../src/helpers";
const { parallelMap } = mhysa({ objectMode: true });
test.cb("parallelMap() parallel mapping", t => { test.cb("parallelMap() parallel mapping", t => {
t.plan(6); t.plan(6);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { parse } from "../src"; import mhysa from "../src";
const { parse } = mhysa();
test.cb("parse() parses the streamed elements as JSON", t => { test.cb("parse() parses the streamed elements as JSON", t => {
t.plan(3); t.plan(3);

View File

@ -2,7 +2,8 @@ import { Readable } from "stream";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { rate } from "../src"; import mhysa from "../src";
const { rate } = mhysa({ objectMode: true });
test.cb("rate() sends data at a rate of 150", t => { test.cb("rate() sends data at a rate of 150", t => {
t.plan(5); t.plan(5);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { reduce } from "../src"; import mhysa from "../src";
const { reduce } = mhysa({ objectMode: true });
test.cb("reduce() reduces elements synchronously", t => { test.cb("reduce() reduces elements synchronously", t => {
t.plan(1); t.plan(1);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { replace } from "../src"; import mhysa from "../src";
const { replace } = mhysa();
test.cb( test.cb(
"replace() replaces occurrences of the given string in the streamed elements with the specified " + "replace() replaces occurrences of the given string in the streamed elements with the specified " +

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { split } from "../src"; import mhysa from "../src";
const { split } = mhysa();
test.cb("split() splits chunks using the default separator (\\n)", t => { test.cb("split() splits chunks using the default separator (\\n)", t => {
t.plan(5); t.plan(5);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { stringify } from "../src"; import mhysa from "../src";
const { stringify } = mhysa();
test.cb("stringify() stringifies the streamed elements as JSON", t => { test.cb("stringify() stringifies the streamed elements as JSON", t => {
t.plan(4); t.plan(4);

View File

@ -1,7 +1,8 @@
import { Readable } from "stream"; import { Readable } from "stream";
import test from "ava"; import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { unbatch, batch } from "../src"; import mhysa from "../src";
const { unbatch, batch } = mhysa({ objectMode: true });
test.cb("unbatch() unbatches", t => { test.cb("unbatch() unbatches", t => {
t.plan(3); t.plan(3);
@ -9,7 +10,7 @@ test.cb("unbatch() unbatches", t => {
const expectedElements = ["a", "b", "c"]; const expectedElements = ["a", "b", "c"];
let i = 0; let i = 0;
source source
.pipe(batch(3)) .pipe(batch(3, undefined, { objectMode: true }))
.pipe(unbatch()) .pipe(unbatch())
.on("data", (element: string) => { .on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]); expect(element).to.equal(expectedElements[i]);