From 48a231d61c34f15a2773c09a115b4ab21bfbf1d8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 12 Sep 2019 15:34:42 -0400 Subject: [PATCH] Remote rate from accumulator --- src/functions/accumulator.ts | 51 +++++++++--------------------------- tests/accumulator.spec.ts | 1 + 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts index 0bef8e1..3176304 100644 --- a/src/functions/accumulator.ts +++ b/src/functions/accumulator.ts @@ -4,11 +4,10 @@ import { FlushStrategy, TransformOptions, } from "./baseDefinitions"; -import { batch, rate as _rate } from "."; +import { batch } from "."; function _accumulator( accumulateBy: (data: T, buffer: T[], stream: Transform) => void, - rate?: number, shouldFlush: boolean = true, options: TransformOptions = { readableObjectMode: true, @@ -16,7 +15,7 @@ function _accumulator( }, ) { const buffer: T[] = []; - const stream = new Transform({ + return new Transform({ ...options, transform(data: T, encoding, callback) { accumulateBy(data, buffer, this); @@ -29,10 +28,6 @@ function _accumulator( callback(); }, }); - if (rate) { - stream.pipe(_rate(rate)); - } - return stream; } function _sliding( @@ -133,7 +128,7 @@ export function accumulator( flushStrategy: FlushStrategy, batchSize: number, keyBy?: string, - options: TransformOptions & { rate?: number } = { + options: TransformOptions = { readableObjectMode: true, writableObjectMode: true, }, @@ -143,14 +138,14 @@ export function accumulator( } else if (flushStrategy === FlushStrategy.rolling) { return rolling(batchSize, keyBy, options); } else { - return batch(batchSize, options.rate); + return batch(batchSize); } } export function accumulatorBy( flushStrategy: S, iteratee: AccumulatorByIteratee, - options: TransformOptions & { rate?: number } = { + options: TransformOptions = { readableObjectMode: true, writableObjectMode: true, }, @@ -165,49 +160,29 @@ export function accumulatorBy( function sliding( windowLength: number, key?: string, - options?: TransformOptions & { rate?: number }, + options?: TransformOptions, ): Transform { - return _accumulator( - _sliding(windowLength, key), - options && options.rate, - false, - options, - ); + return _accumulator(_sliding(windowLength, key), false, options); } function slidingBy( iteratee: AccumulatorByIteratee, - options?: TransformOptions & { rate?: number }, + options?: TransformOptions, ): Transform { - return _accumulator( - _slidingByFunction(iteratee), - options && options.rate, - false, - options, - ); + return _accumulator(_slidingByFunction(iteratee), false, options); } function rolling( windowLength: number, key?: string, - options?: TransformOptions & { rate?: number }, + options?: TransformOptions, ): Transform { - return _accumulator( - _rolling(windowLength, key), - options && options.rate, - true, - options, - ); + return _accumulator(_rolling(windowLength, key), true, options); } function rollingBy( iteratee: AccumulatorByIteratee, - options?: TransformOptions & { rate?: number }, + options?: TransformOptions, ): Transform { - return _accumulator( - _rollingByFunction(iteratee), - options && options.rate, - true, - options, - ); + return _accumulator(_rollingByFunction(iteratee), true, options); } diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts index 2ee455c..feac408 100644 --- a/tests/accumulator.spec.ts +++ b/tests/accumulator.spec.ts @@ -3,6 +3,7 @@ import { expect } from "chai"; import { Readable } from "stream"; import { accumulator, accumulatorBy } from "../src"; import { FlushStrategy } from "../src/functions/baseDefinitions"; +import { performance } from "perf_hooks"; test.cb("accumulator() rolling", t => { t.plan(3);