From d6d974ee0d0b4312fb4de690911ad9a5acd34563 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 15 Aug 2019 15:54:53 -0400 Subject: [PATCH] baseDefinitions --- src/functions/accumulator/index.ts | 8 ++++++-- src/functions/{definitions.ts => baseDefinitions.ts} | 7 ------- src/functions/batch/index.ts | 2 +- src/functions/collect/index.ts | 2 +- src/functions/filter/index.ts | 2 +- src/functions/flatMap/index.ts | 2 +- src/functions/join/index.ts | 2 +- src/functions/map/index.ts | 2 +- src/functions/parallelMap/index.ts | 2 +- src/functions/parse/index.ts | 2 +- src/functions/rate/index.ts | 2 +- src/functions/reduce/index.ts | 2 +- src/functions/replace/index.ts | 2 +- src/functions/split/index.ts | 2 +- src/functions/stringify/index.ts | 2 +- src/functions/unbatch/index.ts | 2 +- 16 files changed, 20 insertions(+), 23 deletions(-) rename src/functions/{definitions.ts => baseDefinitions.ts} (73%) diff --git a/src/functions/accumulator/index.ts b/src/functions/accumulator/index.ts index c7b9f51..81531c7 100644 --- a/src/functions/accumulator/index.ts +++ b/src/functions/accumulator/index.ts @@ -1,15 +1,19 @@ import { Transform } from "stream"; import { AccumulatorByIteratee, FlushStrategy } from "./definitions"; +import { TransformOptions } from "../baseDefinitions"; import { batch } from "../../index"; function _accumulator( accumulateBy: (data: T, buffer: T[], stream: Transform) => void, shouldFlush: boolean = true, + options: TransformOptions = { + readableObjectMode: true, + writableObjectMode: true, + }, ) { const buffer: T[] = []; return new Transform({ - readableObjectMode: true, - writableObjectMode: true, + ...options, transform(data: any, encoding, callback) { try { accumulateBy(data, buffer, this); diff --git a/src/functions/definitions.ts b/src/functions/baseDefinitions.ts similarity index 73% rename from src/functions/definitions.ts rename to src/functions/baseDefinitions.ts index 6cc97a8..791ada9 100644 --- a/src/functions/definitions.ts +++ b/src/functions/baseDefinitions.ts @@ -21,10 +21,3 @@ export type JsonValue = JsonPrimitive | JsonPrimitive[]; export interface JsonParseOptions { pretty: boolean; } - -export enum FlushStrategy { - rolling = "rolling", - sliding = "sliding", -} - -export type AccumulatorByIteratee = (event: T, bufferChunk: T) => boolean; diff --git a/src/functions/batch/index.ts b/src/functions/batch/index.ts index 6ff3d87..4dad33a 100644 --- a/src/functions/batch/index.ts +++ b/src/functions/batch/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Stores chunks of data internally in array and batches when batchSize is reached. * diff --git a/src/functions/collect/index.ts b/src/functions/collect/index.ts index 11ad423..fbed881 100644 --- a/src/functions/collect/index.ts +++ b/src/functions/collect/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { ThroughOptions } from "../definitions"; +import { ThroughOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that collects streamed chunks into an array or buffer * @param options diff --git a/src/functions/filter/index.ts b/src/functions/filter/index.ts index 49e7a05..fad186d 100644 --- a/src/functions/filter/index.ts +++ b/src/functions/filter/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { ThroughOptions } from "../definitions"; +import { ThroughOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold * @param predicate Predicate with which to filter scream chunks diff --git a/src/functions/flatMap/index.ts b/src/functions/flatMap/index.ts index 9e90c04..32244a2 100644 --- a/src/functions/flatMap/index.ts +++ b/src/functions/flatMap/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that flat maps streamed chunks * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) diff --git a/src/functions/join/index.ts b/src/functions/join/index.ts index 0bd22d3..8c7352b 100644 --- a/src/functions/join/index.ts +++ b/src/functions/join/index.ts @@ -1,6 +1,6 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; -import { WithEncoding } from "../definitions"; +import { WithEncoding } from "../baseDefinitions"; /** * Return a ReadWrite stream that joins streamed chunks using the given separator * @param separator Separator to join with diff --git a/src/functions/map/index.ts b/src/functions/map/index.ts index 7ddfbed..4941b98 100644 --- a/src/functions/map/index.ts +++ b/src/functions/map/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that maps streamed chunks * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) diff --git a/src/functions/parallelMap/index.ts b/src/functions/parallelMap/index.ts index ec82f35..353ab29 100644 --- a/src/functions/parallelMap/index.ts +++ b/src/functions/parallelMap/index.ts @@ -1,6 +1,6 @@ import { Transform } from "stream"; import { sleep } from "../../helpers"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Limits number of parallel processes in flight. * @param parallel Max number of parallel processes. diff --git a/src/functions/parse/index.ts b/src/functions/parse/index.ts index 1e32cb2..d6ac299 100644 --- a/src/functions/parse/index.ts +++ b/src/functions/parse/index.ts @@ -1,6 +1,6 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; -import { SerializationFormats } from "../definitions"; +import { SerializationFormats } from "../baseDefinitions"; /** * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk * must be a fully defined JSON string. diff --git a/src/functions/rate/index.ts b/src/functions/rate/index.ts index 5e88950..febcc1e 100644 --- a/src/functions/rate/index.ts +++ b/src/functions/rate/index.ts @@ -1,7 +1,7 @@ import { Transform } from "stream"; import { performance } from "perf_hooks"; import { sleep } from "../../helpers"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Limits date of data transferred into stream. * @param targetRate Desired rate in ms diff --git a/src/functions/reduce/index.ts b/src/functions/reduce/index.ts index f7654fb..743d156 100644 --- a/src/functions/reduce/index.ts +++ b/src/functions/reduce/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that * value diff --git a/src/functions/replace/index.ts b/src/functions/replace/index.ts index 462103c..c31f369 100644 --- a/src/functions/replace/index.ts +++ b/src/functions/replace/index.ts @@ -1,6 +1,6 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; -import { WithEncoding } from "../definitions"; +import { WithEncoding } from "../baseDefinitions"; /** * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in * the streamed chunks with the specified replacement string diff --git a/src/functions/split/index.ts b/src/functions/split/index.ts index fb0f319..a031c8c 100644 --- a/src/functions/split/index.ts +++ b/src/functions/split/index.ts @@ -1,6 +1,6 @@ import { Transform } from "stream"; import { StringDecoder } from "string_decoder"; -import { WithEncoding } from "../definitions"; +import { WithEncoding } from "../baseDefinitions"; /** * Return a ReadWrite stream that splits streamed chunks using the given separator * @param separator Separator to split by, defaulting to "\n" diff --git a/src/functions/stringify/index.ts b/src/functions/stringify/index.ts index 3ac5a6f..5b476af 100644 --- a/src/functions/stringify/index.ts +++ b/src/functions/stringify/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { JsonValue, JsonParseOptions } from "../definitions"; +import { JsonValue, JsonParseOptions } from "../baseDefinitions"; /** * Return a ReadWrite stream that stringifies the streamed chunks to JSON diff --git a/src/functions/unbatch/index.ts b/src/functions/unbatch/index.ts index b0dd51c..946c754 100644 --- a/src/functions/unbatch/index.ts +++ b/src/functions/unbatch/index.ts @@ -1,5 +1,5 @@ import { Transform } from "stream"; -import { TransformOptions } from "../definitions"; +import { TransformOptions } from "../baseDefinitions"; /** * Unbatches and sends individual chunks of data */