diff --git a/.gitignore b/.gitignore
index 893af13..b1e470d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,15 @@ node_modules
 dist
 sample_output
 yarn-error.log
-TODO.md
\ No newline at end of file
+TODO.md
+
+
+#VIM
+## Swap
+[._]*.s[a-v][a-z]
+!*.svg # comment out if you don't need vector files
+[._]*.sw[a-p]
+[._]s[a-rt-v][a-z]
+[._]ss[a-gi-z]
+[._]sw[a-p]
+*.orig
diff --git a/package.json b/package.json
index 06f9734..d8fe45e 100644
--- a/package.json
+++ b/package.json
@@ -40,7 +40,7 @@
         "test:all": "NODE_PATH=src node node_modules/.bin/ava",
         "lint": "tslint -p tsconfig.json",
         "validate:tslint": "tslint-config-prettier-check ./tslint.json",
-        "prepublishOnly": "yarn lint && yarn test && yarn tsc"
+        "prepublishOnly": "yarn lint && yarn test && yarn tsc -d"
     },
     "dependencies": {},
     "devDependencies": {
diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts
index c4ac31a..58effa9 100644
--- a/src/functions/accumulator.ts
+++ b/src/functions/accumulator.ts
@@ -1,5 +1,4 @@
 import { Transform, TransformOptions } from "stream";
-import { batch } from ".";
 
 export enum FlushStrategy {
     rolling = "rolling",
@@ -129,24 +128,24 @@ export function accumulator(
     keyBy?: string,
     options?: TransformOptions,
 ): Transform {
-    if (flushStrategy === FlushStrategy.sliding) {
-        return sliding(batchSize, keyBy, options);
-    } else if (flushStrategy === FlushStrategy.rolling) {
-        return rolling(batchSize, keyBy, options);
-    } else {
-        return batch(batchSize);
+    switch (flushStrategy) {
+        case FlushStrategy.sliding:
+            return sliding(batchSize, keyBy, options);
+        case FlushStrategy.rolling:
+            return rolling(batchSize, keyBy, options);
     }
 }
 
-export function accumulatorBy<T, S extends FlushStrategy>(
-    flushStrategy: S,
+export function accumulatorBy<T>(
+    flushStrategy: FlushStrategy,
     iteratee: AccumulatorByIteratee<T>,
     options?: TransformOptions,
 ): Transform {
-    if (flushStrategy === FlushStrategy.sliding) {
-        return slidingBy(iteratee, options);
-    } else {
-        return rollingBy(iteratee, options);
+    switch (flushStrategy) {
+        case FlushStrategy.sliding:
+            return slidingBy(iteratee, options);
+        case FlushStrategy.rolling:
+            return rollingBy(iteratee, options);
     }
 }
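A brief usage sketch of the two remaining strategies (illustrative, not part of the patch; it assumes the parameter order `(flushStrategy, batchSize, keyBy, options)` implied by the `sliding`/`rolling` calls in the hunk above):

```ts
import { accumulator, FlushStrategy } from "./accumulator";

// Rolling: flush and clear the whole buffer once it holds 3 chunks.
const rollingStream = accumulator(FlushStrategy.rolling, 3, undefined, {
    objectMode: true,
});

// Sliding: pop the front of the buffer so it always stays within 3 items.
const slidingStream = accumulator(FlushStrategy.sliding, 3, undefined, {
    objectMode: true,
});
```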
diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts
deleted file mode 100644
index 6ff480a..0000000
--- a/src/functions/baseFunctions.ts
+++ /dev/null
@@ -1,23 +0,0 @@
-export { accumulator, accumulatorBy } from "./accumulator";
-export { batch } from "./batch";
-export { child } from "./child";
-export { collect } from "./collect";
-export { concat } from "./concat";
-export { duplex } from "./duplex";
-export { filter } from "./filter";
-export { flatMap } from "./flatMap";
-export { fromArray } from "./fromArray";
-export { join } from "./join";
-export { last } from "./last";
-export { map } from "./map";
-export { merge } from "./merge";
-export { parallelMap } from "./parallelMap";
-export { parse } from "./parse";
-export { rate } from "./rate";
-export { reduce } from "./reduce";
-export { replace } from "./replace";
-export { split } from "./split";
-export { stringify } from "./stringify";
-export { unbatch } from "./unbatch";
-export { compose } from "./compose";
-export { demux } from "./demux";
diff --git a/src/functions/batch.ts b/src/functions/batch.ts
index e9f8915..661301d 100644
--- a/src/functions/batch.ts
+++ b/src/functions/batch.ts
@@ -3,9 +3,7 @@ import { Transform, TransformOptions } from "stream";
 export function batch(
     batchSize: number = 1000,
     maxBatchAge: number = 500,
-    options: TransformOptions = {
-        objectMode: true,
-    },
+    options: TransformOptions = {},
 ): Transform {
     let buffer: any[] = [];
     let timer: NodeJS.Timer | null = null;
diff --git a/src/functions/child.ts b/src/functions/child.ts
index 73bdbef..a312562 100644
--- a/src/functions/child.ts
+++ b/src/functions/child.ts
@@ -1,5 +1,5 @@
 import { ChildProcess } from "child_process";
-import { duplex } from "./baseFunctions";
+import { duplex } from "./duplex";
 
 export function child(childProcess: ChildProcess) {
     if (childProcess.stdin === null) {
diff --git a/src/functions/demux.ts b/src/functions/demux.ts
index 9e26594..6435447 100644
--- a/src/functions/demux.ts
+++ b/src/functions/demux.ts
@@ -25,7 +25,7 @@ const eventsTarget = {
 type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
 
 export function demux(
-    construct: () => DemuxStreams,
+    construct: (destKey?: string) => DemuxStreams,
     demuxBy: string | ((chunk: any) => string),
     options?: WritableOptions,
 ): Writable {
diff --git a/src/functions/flatMap.ts b/src/functions/flatMap.ts
index dd7820d..3a87c52 100644
--- a/src/functions/flatMap.ts
+++ b/src/functions/flatMap.ts
@@ -4,9 +4,7 @@ export function flatMap<T, R>(
     mapper:
         | ((chunk: T, encoding: string) => R[])
         | ((chunk: T, encoding: string) => Promise<R[]>),
-    options: TransformOptions = {
-        objectMode: true,
-    },
+    options?: TransformOptions,
 ): Transform {
     return new Transform({
         ...options,
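Note on the `demux` signature change above: `construct` now receives the demultiplexing key, so each destination stream can be parameterized by it. A minimal sketch, assuming object chunks carrying a `topic` field (the fs sink and file naming are illustrative, not from the patch):

```ts
import { createWriteStream } from "fs";
import { demux } from "./demux";

// A new file sink is constructed lazily for each distinct `topic` value.
const sink = demux(
    (destKey?: string) => createWriteStream(`./out/${destKey}.ndjson`),
    "topic",
);
```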
diff --git a/src/functions/index.ts b/src/functions/index.ts
index 778b1b9..e64fe5e 100644
--- a/src/functions/index.ts
+++ b/src/functions/index.ts
@@ -1,214 +1,262 @@
-import { Transform } from "stream";
-import * as baseFunctions from "./baseFunctions";
+import {
+    Transform,
+    TransformOptions,
+    WritableOptions,
+    ReadableOptions,
+} from "stream";
+import { accumulator, accumulatorBy } from "./accumulator";
+import { batch } from "./batch";
+import { child } from "./child";
+import { collect } from "./collect";
+import { concat } from "./concat";
+import { duplex } from "./duplex";
+import { filter } from "./filter";
+import { flatMap } from "./flatMap";
+import { fromArray } from "./fromArray";
+import { join } from "./join";
+import { last } from "./last";
+import { map } from "./map";
+import { merge } from "./merge";
+import { parallelMap } from "./parallelMap";
+import { parse } from "./parse";
+import { rate } from "./rate";
+import { reduce } from "./reduce";
+import { replace } from "./replace";
+import { split } from "./split";
+import { stringify } from "./stringify";
+import { unbatch } from "./unbatch";
+import { compose } from "./compose";
+import { demux } from "./demux";
 
-/**
- * Convert an array into a Readable stream of its elements
- * @param array Array of elements to stream
- */
-export const fromArray = baseFunctions.fromArray;
+export default function mhysa(defaultOptions?: TransformOptions) {
+    function withDefaultOptions<T extends any[], R>(
+        n: number,
+        fn: (...args: T) => R,
+    ): (...args: T) => R {
+        return (...args) => {
+            const options = {
+                ...defaultOptions,
+                ...((args[n] || {}) as TransformOptions | {}),
+            };
+            const provided = args.slice(0, n);
+            const nextArgs = [
+                ...provided,
+                ...Array(n - provided.length).fill(undefined),
+                options,
+            ] as T;
+            return fn(...nextArgs) as R;
+        };
+    }
 
-/**
- * Return a ReadWrite stream that maps streamed chunks
- * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
- * @param options?
- * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
- */
-export const map = baseFunctions.map;
+    return {
+        /**
+         * Convert an array into a Readable stream of its elements
+         * @param array Array of elements to stream
+         */
+        fromArray,
 
-/**
- * Return a ReadWrite stream that flat maps streamed chunks
- * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
- * @param options?
- * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
- */
-export const flatMap = baseFunctions.flatMap;
+        /**
+         * Return a ReadWrite stream that maps streamed chunks
+         * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
+         * @param options?
+         * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
+         * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
+         */
+        map: withDefaultOptions(1, map),
 
-/**
- * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
- * @param predicate Predicate with which to filter scream chunks
- * @param options?
- * @param options.objectMode? Whether this stream should behave as a stream of objects.
- */
-export const filter = baseFunctions.filter;
+        /**
+         * Return a ReadWrite stream that flat maps streamed chunks
+         * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
+         * @param options?
+         * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
+         * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
+         */
+        flatMap: withDefaultOptions(1, flatMap),
 
-/**
- * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
- * value
- * @param iteratee Reducer function to apply on each streamed chunk
- * @param initialValue Initial value
- * @param options?
- * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
- * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
- */
-export const reduce = baseFunctions.reduce;
+        /**
+         * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
+         * @param predicate Predicate with which to filter stream chunks
+         * @param options?
+         * @param options.objectMode? Whether this stream should behave as a stream of objects.
+         */
+        filter: withDefaultOptions(1, filter),
 
-/**
- * Return a ReadWrite stream that splits streamed chunks using the given separator
- * @param separator? Separator to split by, defaulting to "\n"
- * @param options? Defaults to encoding: utf8
- * @param options.encoding? Encoding written chunks are assumed to use
- */
-export const split = baseFunctions.split;
+        /**
+         * Return a ReadWrite stream that reduces streamed chunks down to a single value and yields that
+         * value
+         * @param iteratee Reducer function to apply on each streamed chunk
+         * @param initialValue Initial value
+         * @param options?
+         * @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
+         * @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
+         */
+        reduce: withDefaultOptions(2, reduce),
 
-/**
- * Return a ReadWrite stream that joins streamed chunks using the given separator
- * @param separator Separator to join with
- * @param options? Defaults to encoding: utf8
- * @param options.encoding? Encoding written chunks are assumed to use
- */
-export const join = baseFunctions.join;
+        /**
+         * Return a ReadWrite stream that splits streamed chunks using the given separator
+         * @param separator? Separator to split by, defaulting to "\n"
+         * @param options? Defaults to encoding: utf8
+         * @param options.encoding? Encoding written chunks are assumed to use
+         */
+        split,
 
-/**
- * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
- * the streamed chunks with the specified replacement string
- * @param searchValue Search string to use
- * @param replaceValue Replacement string to use
- * @param options? Defaults to encoding: utf8
- * @param options.encoding Encoding written chunks are assumed to use
- */
-export const replace = baseFunctions.replace;
+        /**
+         * Return a ReadWrite stream that joins streamed chunks using the given separator
+         * @param separator Separator to join with
+         * @param options? Defaults to encoding: utf8
+         * @param options.encoding? Encoding written chunks are assumed to use
+         */
+        join: withDefaultOptions(1, join),
 
-/**
- * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
- * must be a fully defined JSON string in utf8.
- */
-export const parse = baseFunctions.parse;
+        /**
+         * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
+         * the streamed chunks with the specified replacement string
+         * @param searchValue Search string to use
+         * @param replaceValue Replacement string to use
+         * @param options? Defaults to encoding: utf8
+         * @param options.encoding Encoding written chunks are assumed to use
+         */
+        replace,
 
-/**
- * Return a ReadWrite stream that stringifies the streamed chunks to JSON
- * @param options?
- * @param options.pretty If true, whitespace is inserted into the stringified chunks.
- *
- */
-export const stringify = baseFunctions.stringify;
+        /**
+         * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
+         * must be a fully defined JSON string in utf8.
+         */
+        parse,
 
-/**
- * Return a ReadWrite stream that collects streamed chunks into an array or buffer
- * @param options?
- * @param options.objectMode? Whether this stream should behave as a stream of objects
- */
-export const collect = baseFunctions.collect;
+        /**
+         * Return a ReadWrite stream that stringifies the streamed chunks to JSON
+         * @param options?
+         * @param options.pretty If true, whitespace is inserted into the stringified chunks.
+         *
+         */
+        stringify,
 
-/**
- * Return a Readable stream of readable streams concatenated together
- * @param streams Readable streams to concatenate
- */
-export const concat = baseFunctions.concat;
+        /**
+         * Return a ReadWrite stream that collects streamed chunks into an array or buffer
+         * @param options?
+         * @param options.objectMode? Whether this stream should behave as a stream of objects
+         */
+        collect: withDefaultOptions(0, collect),
 
-/**
- * Return a Readable stream of readable streams concatenated together
- * @param streams Readable streams to merge
- */
-export const merge = baseFunctions.merge;
+        /**
+         * Return a Readable stream of readable streams concatenated together
+         * @param streams Readable streams to concatenate
+         */
+        concat,
 
-/**
- * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
- * cause the given readable stream to yield chunks
- * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
- * @param readable Readable stream assumed to yield chunks when the writable stream is written to
- */
-export const duplex = baseFunctions.duplex;
+        /**
+         * Return a Readable stream of readable streams merged together in chunk arrival order
+         * @param streams Readable streams to merge
+         */
+        merge,
 
-/**
- * Return a Duplex stream from a child process' stdin and stdout
- * @param childProcess Child process from which to create duplex stream
- */
-export const child = baseFunctions.child;
+        /**
+         * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
+         * cause the given readable stream to yield chunks
+         * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
+         * @param readable Readable stream assumed to yield chunks when the writable stream is written to
+         */
+        duplex,
 
-/**
- * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
- * ended
- * @param readable Readable stream to wait on
- */
-export const last = baseFunctions.last;
+        /**
+         * Return a Duplex stream from a child process' stdin and stdout
+         * @param childProcess Child process from which to create duplex stream
+         */
+        child,
 
-/**
- * Stores chunks of data internally in array and batches when batchSize is reached.
- * @param batchSize Size of the batches, defaults to 1000.
- * @param maxBatchAge? Max lifetime of a batch, defaults to 500
- */
-export function batch(batchSize?: number, maxBatchAge?: number): Transform {
-    return baseFunctions.batch(batchSize, maxBatchAge);
+        /**
+         * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
+         * ended
+         * @param readable Readable stream to wait on
+         */
+        last,
+
+        /**
+         * Stores chunks of data internally in an array and batches when batchSize is reached.
+         * @param batchSize Size of the batches, defaults to 1000.
+         * @param maxBatchAge? Max lifetime of a batch in milliseconds, defaults to 500
+         * @param options?
+         * @param options.objectMode? Whether this stream should behave as a stream of objects
+         */
+        batch: withDefaultOptions(2, batch),
+
+        /**
+         * Unbatches and sends individual chunks of data.
+         * @param options?
+         * @param options.objectMode? Whether this stream should behave as a stream of objects
+         */
+        unbatch: withDefaultOptions(0, unbatch),
+
+        /**
+         * Limits rate of data transferred into stream.
+         * @param targetRate? Desired rate in chunks per second.
+         * @param period? Period to sleep for when rate is above or equal to targetRate.
+         * @param options?
+         */
+        rate: withDefaultOptions(2, rate),
+
+        /**
+         * Limits number of parallel processes in flight.
+         * @param parallel Max number of parallel processes.
+         * @param mapper Function to execute on each data chunk.
+         * @param sleepTime Amount of time to pause processing when max number of parallel processes are executing.
+         */
+        parallelMap: withDefaultOptions(3, parallelMap),
+
+        /**
+         * Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items
+         * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
+         * 1. Sliding
+         *    - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain
+         *      the batchSize. When no key is provided, the batchSize is effectively the buffer length. When
+         *      a key is provided, the batchSize is based on the value at that key. For example, given a key
+         *      of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be
+         *      within 3000 timestamp units from the first element. This means that with a key, multiple elements
+         *      may be spliced off the front of the buffer. The buffer is then pushed into the stream.
+         * 2. Rolling
+         *    - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream.
+         *      When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
+         *      is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
+         *      each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
+         * @param flushStrategy Buffering strategy to use.
+         * @param batchSize Size of the batch (in units of buffer length or value at key).
+         * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
+         * @param options Transform stream options
+         */
+        accumulator: withDefaultOptions(3, accumulator),
+
+        /**
+         * Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items
+         * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
+         * 1. Sliding
+         *    - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The
+         *      item is pushed into the buffer and buffer is pushed into stream.
+         * 2. Rolling
+         *    - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
+         *      then pushed into the buffer.
+         * @param flushStrategy Buffering strategy to use.
+         * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
+         * or items need to be cleared from buffer.
+         * @param options Transform stream options
+         */
+        accumulatorBy: withDefaultOptions(2, accumulatorBy),
+
+        /**
+         * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
+         * @param streams Array of streams to compose. Minimum of two.
+         * @param options Transform stream options
+         */
+        compose: withDefaultOptions(1, compose),
+
+        /**
+         * Demultiplexes a source stream into multiple output streams based on a key fetched from each chunk.
+         * @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
+         * @param demuxBy
+         * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
+         * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
+         * @param options Writable stream options
+         */
+        demux: withDefaultOptions(2, demux),
+    };
 }
-
-/**
- * Unbatches and sends individual chunks of data.
- */
-export const unbatch = baseFunctions.unbatch;
-
-/**
- * Limits rate of data transferred into stream.
- * @param options?
- * @param targetRate? Desired rate in ms.
- * @param period? Period to sleep for when rate is above or equal to targetRate.
- */
-export function rate(targetRate?: number, period?: number): Transform {
-    return baseFunctions.rate(targetRate, period);
-}
-
-/**
- * Limits number of parallel processes in flight.
- * @param parallel Max number of parallel processes.
- * @param func Function to execute on each data chunk.
- * @param pause Amount of time to pause processing when max number of parallel processes are executing.
- */
-export const parallelMap = baseFunctions.parallelMap;
-
-/**
- * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
- * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
- * 1. Sliding
- *    - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain
- *      the batchSize. When no key is provided, the batchSize is effectively the buffer length. When
- *      a key is provided, the batchSize is based on the value at that key. For example, given a key
- *      of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be
- *      within 3000 timestamp units from the first element. This means that with a key, multiple elements
- *      may be spliced off the front of the buffer. The buffer is then pushed into the stream.
- * 2. Rolling
- *    - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream.
- *      When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
- *      is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
- *      each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
- * @param flushStrategy Buffering strategy to use.
- * @param batchSize Size of the batch (in units of buffer length or value at key).
- * @param batchRate Desired rate of data transfer to next stream.
- * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
- * @param options Transform stream options
- */
-export const accumulator = baseFunctions.accumulator;
-
-/**
- * Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
- * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
- * 1. Sliding
- *    - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The
- *      item is pushed into the buffer and buffer is pushed into stream.
- * 2. Rolling
- *    - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
- *      then pushed into the buffer.
- * @param flushStrategy Buffering strategy to use.
- * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
- * or items need to be cleared from buffer.
- * @param batchRate Desired rate of data transfer to next stream.
- * @param options Transform stream options
- */
-export const accumulatorBy = baseFunctions.accumulatorBy;
-
-/**
- * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
- * @param streams Array of streams to compose. Minimum of two.
- * @param options Transform stream options
- */
-export const compose = baseFunctions.compose;
-
-/**
- * Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
- * @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
- * @param demuxBy
- * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
- * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
- * @param options Writable stream options
- */
-export const demux = baseFunctions.demux;
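To make the argument padding in `withDefaultOptions` concrete, here is an illustrative trace, inferred from the code above, for an instance created with `mhysa({ objectMode: true })` and the `batch: withDefaultOptions(2, batch)` entry:

```ts
const { batch } = mhysa({ objectMode: true });

// n = 2: positions 0..1 are padded with undefined so that the merged
// options object always lands at parameter index 2.
batch();     // -> batch(undefined, undefined, { objectMode: true })
batch(3);    // -> batch(3, undefined, { objectMode: true })
batch(3, 1); // -> batch(3, 1, { objectMode: true })
batch(3, 1, { objectMode: false }); // per-call options win the spread merge
```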
diff --git a/src/functions/join.ts b/src/functions/join.ts
index b49022b..32dbc83 100644
--- a/src/functions/join.ts
+++ b/src/functions/join.ts
@@ -1,10 +1,10 @@
-import { Transform } from "stream";
+import { Transform, TransformOptions } from "stream";
 import { StringDecoder } from "string_decoder";
 import { WithEncoding } from "./baseDefinitions";
 
 export function join(
     separator: string,
-    options: WithEncoding = { encoding: "utf8" },
+    options: WithEncoding & TransformOptions = { encoding: "utf8" },
 ): Transform {
     let isFirstChunk = true;
     const decoder = new StringDecoder(options.encoding);
diff --git a/src/functions/parallelMap.ts b/src/functions/parallelMap.ts
index 8cb3e80..ead351d 100644
--- a/src/functions/parallelMap.ts
+++ b/src/functions/parallelMap.ts
@@ -4,10 +4,8 @@ import { sleep } from "../helpers";
 export function parallelMap<T, R>(
     mapper: (data: T) => R,
     parallel: number = 10,
-    sleepTime: number = 5,
-    options: TransformOptions = {
-        objectMode: true,
-    },
+    sleepTime: number = 1,
+    options?: TransformOptions,
 ) {
     let inflight = 0;
     return new Transform({
diff --git a/src/functions/rate.ts b/src/functions/rate.ts
index 083d854..8f0f734 100644
--- a/src/functions/rate.ts
+++ b/src/functions/rate.ts
@@ -5,9 +5,7 @@ import { sleep } from "../helpers";
 export function rate(
     targetRate: number = 50,
     period: number = 1,
-    options: TransformOptions = {
-        objectMode: true,
-    },
+    options?: TransformOptions,
 ): Transform {
     const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period
     let total = 0;
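For reference, the sleep interval in `rate()` follows directly from the `deltaMS` formula in the hunk above; with the defaults it works out as:

```ts
// deltaMS = ((1 / targetRate) * 1000) / period
// With targetRate = 50 and period = 1:
//   deltaMS = ((1 / 50) * 1000) / 1 = 20ms between chunks,
//   i.e. throughput is throttled to roughly 50 chunks per second.
```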
diff --git a/src/functions/reduce.ts b/src/functions/reduce.ts
index 9f19ca4..f595633 100644
--- a/src/functions/reduce.ts
+++ b/src/functions/reduce.ts
@@ -5,9 +5,7 @@ export function reduce<T, R>(
     iteratee:
         | ((previousValue: R, chunk: T, encoding: string) => R)
         | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
     initialValue: R,
-    options: TransformOptions = {
-        objectMode: true,
-    },
+    options?: TransformOptions,
 ) {
     let value = initialValue;
     return new Transform({
diff --git a/src/functions/replace.ts b/src/functions/replace.ts
index dc5a05e..f3938ba 100644
--- a/src/functions/replace.ts
+++ b/src/functions/replace.ts
@@ -1,7 +1,6 @@
 import { Transform } from "stream";
 import { StringDecoder } from "string_decoder";
 import { WithEncoding } from "./baseDefinitions";
-
 export function replace(
     searchValue: string | RegExp,
     replaceValue: string,
diff --git a/src/functions/unbatch.ts b/src/functions/unbatch.ts
index 93d6bfe..02375ed 100644
--- a/src/functions/unbatch.ts
+++ b/src/functions/unbatch.ts
@@ -1,10 +1,6 @@
 import { Transform, TransformOptions } from "stream";
 
-export function unbatch(
-    options: TransformOptions = {
-        objectMode: true,
-    },
-) {
+export function unbatch(options?: TransformOptions) {
     return new Transform({
         ...options,
         transform(data, encoding, callback) {
diff --git a/src/index.ts b/src/index.ts
index 924b246..b06e998 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,26 +1,2 @@
-export {
-    fromArray,
-    map,
-    flatMap,
-    filter,
-    reduce,
-    split,
-    join,
-    replace,
-    parse,
-    stringify,
-    collect,
-    concat,
-    merge,
-    duplex,
-    child,
-    last,
-    batch,
-    unbatch,
-    rate,
-    parallelMap,
-    accumulator,
-    accumulatorBy,
-    compose,
-    demux,
-} from "./functions";
+import mhysa from "./functions";
+export default mhysa;
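The package entry point is now the factory alone, which is a breaking change for consumers. A hedged migration sketch (the old named-import form is shown for contrast):

```ts
// Before: named imports from the package root
// import { map, batch } from "mhysa";

// After: instantiate first, optionally passing defaults for every stream
import mhysa from "mhysa";
const { map, batch } = mhysa({ objectMode: true });
```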
diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts
index 51d6098..bb07a3b 100644
--- a/tests/accumulator.spec.ts
+++ b/tests/accumulator.spec.ts
@@ -1,9 +1,10 @@
 import test from "ava";
 import { expect } from "chai";
 import { Readable } from "stream";
-import { accumulator, accumulatorBy } from "../src";
+import mhysa from "../src";
 import { FlushStrategy } from "../src/functions/accumulator";
 import { performance } from "perf_hooks";
+const { accumulator, accumulatorBy } = mhysa({ objectMode: true });
 
 test.cb("accumulator() rolling", t => {
     t.plan(3);
diff --git a/tests/batch.spec.ts b/tests/batch.spec.ts
index 0c2cd3a..d4a1ffe 100644
--- a/tests/batch.spec.ts
+++ b/tests/batch.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { batch } from "../src";
+import mhysa from "../src";
+const { batch } = mhysa({ objectMode: true });
 
 test.cb("batch() batches chunks together", t => {
     t.plan(3);
@@ -31,7 +32,9 @@ test.cb("batch() yields a batch after the timeout", t => {
     t.plan(3);
     const source = new Readable({
         objectMode: true,
-        read(size: number) {},
+        read(size: number) {
+            return;
+        },
     });
     const expectedElements = [["a", "b"], ["c"], ["d"]];
     let i = 0;
diff --git a/tests/child.spec.ts b/tests/child.spec.ts
index 7730790..0b837b7 100644
--- a/tests/child.spec.ts
+++ b/tests/child.spec.ts
@@ -2,7 +2,8 @@ import * as cp from "child_process";
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { child } from "../src";
+import mhysa from "../src";
+const { child } = mhysa();
 
 test.cb(
     "child() allows easily writing to child process stdin and reading from its stdout",
diff --git a/tests/collect.spec.ts b/tests/collect.spec.ts
index 1e4cd03..ae13dc2 100644
--- a/tests/collect.spec.ts
+++ b/tests/collect.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { collect } from "../src";
+import mhysa from "../src";
+const { collect } = mhysa();
 
 test.cb(
     "collect() collects streamed elements into an array (object, flowing mode)",
diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts
index 820f6bb..269badc 100644
--- a/tests/compose.spec.ts
+++ b/tests/compose.spec.ts
@@ -1,8 +1,9 @@
 const test = require("ava");
 const { expect } = require("chai");
-const { compose, map } = require("../src");
 const { sleep } = require("../src/helpers");
+import mhysa from "../src";
 import { performance } from "perf_hooks";
+const { compose, map } = mhysa({ objectMode: true });
 
 test.cb("compose() chains two streams together in the correct order", t => {
     t.plan(3);
@@ -211,7 +212,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
         expect(composed._writableState.length).to.be.equal(0);
         expect(performance.now() - start).to.be.closeTo(
             _rate * highWaterMark,
-            20,
+            40,
         );
     });
@@ -286,7 +287,7 @@ test.cb(
             expect(composed._writableState.length).to.be.equal(0);
             expect(performance.now() - start).to.be.closeTo(
                 _rate * input.length,
-                25,
+                50,
             );
             t.pass();
         });
diff --git a/tests/concat.spec.ts b/tests/concat.spec.ts
index 596fbad..350ab65 100644
--- a/tests/concat.spec.ts
+++ b/tests/concat.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { concat, collect } from "../src";
+import mhysa from "../src";
+const { concat, collect } = mhysa();
 
 test.cb(
     "concat() concatenates multiple readable streams (object, flowing mode)",
diff --git a/tests/defaultOptions.spec.ts b/tests/defaultOptions.spec.ts
new file mode 100644
index 0000000..3281001
--- /dev/null
+++ b/tests/defaultOptions.spec.ts
@@ -0,0 +1,21 @@
+import { Readable } from "stream";
+import test from "ava";
+import mhysa from "../src";
+
+const withDefaultOptions = mhysa({ objectMode: true });
+const withoutOptions = mhysa();
+
+test("Mhysa instances can have default options", t => {
+    let batch = withDefaultOptions.batch();
+    t.true(batch._readableState.objectMode);
+    t.true(batch._writableState.objectMode);
+    batch = withDefaultOptions.batch(3);
+    t.true(batch._readableState.objectMode);
+    t.true(batch._writableState.objectMode);
+    batch = withDefaultOptions.batch(3, 1);
+    t.true(batch._readableState.objectMode);
+    t.true(batch._writableState.objectMode);
+    batch = withDefaultOptions.batch(3, 1, { objectMode: false });
+    t.false(batch._readableState.objectMode);
+    t.false(batch._writableState.objectMode);
+});
diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts
index 2d43ec1..5c7e017 100644
--- a/tests/demux.spec.ts
+++ b/tests/demux.spec.ts
@@ -1,10 +1,11 @@
 import test from "ava";
 import { expect } from "chai";
-const { demux, map } = require("../src");
+import mhysa from "../src";
 import { Writable } from "stream";
 const sinon = require("sinon");
 const { sleep } = require("../src/helpers");
 import { performance } from "perf_hooks";
+const { demux, map } = mhysa();
 
 interface Test {
     key: string;
diff --git a/tests/duplex.spec.ts b/tests/duplex.spec.ts
index e5fafd7..f0838d4 100644
--- a/tests/duplex.spec.ts
+++ b/tests/duplex.spec.ts
@@ -2,7 +2,8 @@ import * as cp from "child_process";
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { duplex } from "../src";
+import mhysa from "../src";
+const { duplex } = mhysa();
 
 test.cb(
     "duplex() combines a writable and readable stream into a ReadWrite stream",
diff --git a/tests/filter.spec.ts b/tests/filter.spec.ts
index badfda7..9d3e7c0 100644
--- a/tests/filter.spec.ts
+++ b/tests/filter.spec.ts
@@ -1,7 +1,8 @@
 import test from "ava";
 import { expect } from "chai";
 import { Readable } from "stream";
-import { filter } from "../src";
+import mhysa from "../src";
+const { filter } = mhysa();
 
 test.cb("filter() filters elements synchronously", t => {
     t.plan(2);
diff --git a/tests/flatMap.spec.ts b/tests/flatMap.spec.ts
index 84cebfb..6a1b468 100644
--- a/tests/flatMap.spec.ts
+++ b/tests/flatMap.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { flatMap } from "../src";
+import mhysa from "../src";
+const { flatMap } = mhysa({ objectMode: true });
 
 test.cb("flatMap() maps elements synchronously", t => {
     t.plan(6);
diff --git a/tests/fromArray.spec.ts b/tests/fromArray.spec.ts
index 3e4c93e..7416923 100644
--- a/tests/fromArray.spec.ts
+++ b/tests/fromArray.spec.ts
@@ -1,6 +1,7 @@
 import test from "ava";
 import { expect } from "chai";
-import { fromArray } from "../src";
+import mhysa from "../src";
+const { fromArray } = mhysa();
 
 test.cb("fromArray() streams array elements in flowing mode", t => {
     t.plan(3);
diff --git a/tests/join.spec.ts b/tests/join.spec.ts
index 6b0be52..06cf717 100644
--- a/tests/join.spec.ts
+++ b/tests/join.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { join } from "../src";
+import mhysa from "../src";
+const { join } = mhysa();
 
 test.cb("join() joins chunks using the specified separator", t => {
     t.plan(9);
diff --git a/tests/last.spec.ts b/tests/last.spec.ts
index 033c9d8..7c6f321 100644
--- a/tests/last.spec.ts
+++ b/tests/last.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { last } from "../src";
+import mhysa from "../src";
+const { last } = mhysa();
 
 test("last() resolves to the last chunk streamed by the given readable stream", async t => {
     const source = new Readable({ objectMode: true });
diff --git a/tests/map.spec.ts b/tests/map.spec.ts
index 35c8e84..7c416d6 100644
--- a/tests/map.spec.ts
+++ b/tests/map.spec.ts
@@ -1,12 +1,15 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { map } from "../src";
+import mhysa from "../src";
+const { map } = mhysa();
 
 test.cb("map() maps elements synchronously", t => {
     t.plan(3);
     const source = new Readable({ objectMode: true });
-    const mapStream = map((element: string) => element.toUpperCase());
+    const mapStream = map((element: string) => element.toUpperCase(), {
+        objectMode: true,
+    });
     const expectedElements = ["A", "B", "C"];
     let i = 0;
     source
@@ -28,10 +31,13 @@ test.cb("map() maps elements synchronously", t => {
 test.cb("map() maps elements asynchronously", t => {
     t.plan(3);
     const source = new Readable({ objectMode: true });
-    const mapStream = map(async (element: string) => {
-        await Promise.resolve();
-        return element.toUpperCase();
-    });
+    const mapStream = map(
+        async (element: string) => {
+            await Promise.resolve();
+            return element.toUpperCase();
+        },
+        { objectMode: true },
+    );
     const expectedElements = ["A", "B", "C"];
     let i = 0;
     source
diff --git a/tests/merge.spec.ts b/tests/merge.spec.ts
index dbbfd79..3fe769f 100644
--- a/tests/merge.spec.ts
+++ b/tests/merge.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { merge } from "../src";
+import mhysa from "../src";
+const { merge } = mhysa();
 
 test.cb(
     "merge() merges multiple readable streams in chunk arrival order",
diff --git a/tests/parallelMap.spec.ts b/tests/parallelMap.spec.ts
index dff719a..89a80ca 100644
--- a/tests/parallelMap.spec.ts
+++ b/tests/parallelMap.spec.ts
@@ -2,8 +2,9 @@ import { Readable } from "stream";
 import { performance } from "perf_hooks";
 import test from "ava";
 import { expect } from "chai";
-import { parallelMap } from "../src";
+import mhysa from "../src";
 import { sleep } from "../src/helpers";
+const { parallelMap } = mhysa({ objectMode: true });
 
 test.cb("parallelMap() parallel mapping", t => {
     t.plan(6);
diff --git a/tests/parse.spec.ts b/tests/parse.spec.ts
index d9aebbb..ea18be3 100644
--- a/tests/parse.spec.ts
+++ b/tests/parse.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { parse } from "../src";
+import mhysa from "../src";
+const { parse } = mhysa();
 
 test.cb("parse() parses the streamed elements as JSON", t => {
     t.plan(3);
diff --git a/tests/rate.spec.ts b/tests/rate.spec.ts
index 1c26554..8d88841 100644
--- a/tests/rate.spec.ts
+++ b/tests/rate.spec.ts
@@ -2,7 +2,8 @@ import { Readable } from "stream";
 import { performance } from "perf_hooks";
 import test from "ava";
 import { expect } from "chai";
-import { rate } from "../src";
+import mhysa from "../src";
+const { rate } = mhysa({ objectMode: true });
 
 test.cb("rate() sends data at a rate of 150", t => {
     t.plan(5);
diff --git a/tests/reduce.spec.ts b/tests/reduce.spec.ts
index 8d504db..de70740 100644
--- a/tests/reduce.spec.ts
+++ b/tests/reduce.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { reduce } from "../src";
+import mhysa from "../src";
+const { reduce } = mhysa({ objectMode: true });
 
 test.cb("reduce() reduces elements synchronously", t => {
     t.plan(1);
diff --git a/tests/replace.spec.ts b/tests/replace.spec.ts
index 5829f8e..7297759 100644
--- a/tests/replace.spec.ts
+++ b/tests/replace.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { replace } from "../src";
+import mhysa from "../src";
+const { replace } = mhysa();
 
 test.cb(
     "replace() replaces occurrences of the given string in the streamed elements with the specified " +
diff --git a/tests/split.spec.ts b/tests/split.spec.ts
index 1819e2b..9f287d0 100644
--- a/tests/split.spec.ts
+++ b/tests/split.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { split } from "../src";
+import mhysa from "../src";
+const { split } = mhysa();
 
 test.cb("split() splits chunks using the default separator (\\n)", t => {
     t.plan(5);
diff --git a/tests/stringify.spec.ts b/tests/stringify.spec.ts
index 7452e99..fb138ef 100644
--- a/tests/stringify.spec.ts
+++ b/tests/stringify.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { stringify } from "../src";
+import mhysa from "../src";
+const { stringify } = mhysa();
 
 test.cb("stringify() stringifies the streamed elements as JSON", t => {
     t.plan(4);
diff --git a/tests/unbatch.spec.ts b/tests/unbatch.spec.ts
index d48b1b9..c3351cf 100644
--- a/tests/unbatch.spec.ts
+++ b/tests/unbatch.spec.ts
@@ -1,7 +1,8 @@
 import { Readable } from "stream";
 import test from "ava";
 import { expect } from "chai";
-import { unbatch, batch } from "../src";
+import mhysa from "../src";
+const { unbatch, batch } = mhysa({ objectMode: true });
 
 test.cb("unbatch() unbatches", t => {
     t.plan(3);