Remote rate from accumulator

This commit is contained in:
Jerry Kurian 2019-09-12 15:34:42 -04:00
parent 4c7e9ceb7e
commit 48a231d61c
2 changed files with 14 additions and 38 deletions

View File

@ -4,11 +4,10 @@ import {
FlushStrategy, FlushStrategy,
TransformOptions, TransformOptions,
} from "./baseDefinitions"; } from "./baseDefinitions";
import { batch, rate as _rate } from "."; import { batch } from ".";
function _accumulator<T>( function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void, accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
rate?: number,
shouldFlush: boolean = true, shouldFlush: boolean = true,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, readableObjectMode: true,
@ -16,7 +15,7 @@ function _accumulator<T>(
}, },
) { ) {
const buffer: T[] = []; const buffer: T[] = [];
const stream = new Transform({ return new Transform({
...options, ...options,
transform(data: T, encoding, callback) { transform(data: T, encoding, callback) {
accumulateBy(data, buffer, this); accumulateBy(data, buffer, this);
@ -29,10 +28,6 @@ function _accumulator<T>(
callback(); callback();
}, },
}); });
if (rate) {
stream.pipe(_rate(rate));
}
return stream;
} }
function _sliding<T>( function _sliding<T>(
@ -133,7 +128,7 @@ export function accumulator(
flushStrategy: FlushStrategy, flushStrategy: FlushStrategy,
batchSize: number, batchSize: number,
keyBy?: string, keyBy?: string,
options: TransformOptions & { rate?: number } = { options: TransformOptions = {
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
}, },
@ -143,14 +138,14 @@ export function accumulator(
} else if (flushStrategy === FlushStrategy.rolling) { } else if (flushStrategy === FlushStrategy.rolling) {
return rolling(batchSize, keyBy, options); return rolling(batchSize, keyBy, options);
} else { } else {
return batch(batchSize, options.rate); return batch(batchSize);
} }
} }
export function accumulatorBy<T, S extends FlushStrategy>( export function accumulatorBy<T, S extends FlushStrategy>(
flushStrategy: S, flushStrategy: S,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options: TransformOptions & { rate?: number } = { options: TransformOptions = {
readableObjectMode: true, readableObjectMode: true,
writableObjectMode: true, writableObjectMode: true,
}, },
@ -165,49 +160,29 @@ export function accumulatorBy<T, S extends FlushStrategy>(
function sliding( function sliding(
windowLength: number, windowLength: number,
key?: string, key?: string,
options?: TransformOptions & { rate?: number }, options?: TransformOptions,
): Transform { ): Transform {
return _accumulator( return _accumulator(_sliding(windowLength, key), false, options);
_sliding(windowLength, key),
options && options.rate,
false,
options,
);
} }
function slidingBy<T>( function slidingBy<T>(
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions & { rate?: number }, options?: TransformOptions,
): Transform { ): Transform {
return _accumulator( return _accumulator(_slidingByFunction(iteratee), false, options);
_slidingByFunction(iteratee),
options && options.rate,
false,
options,
);
} }
function rolling( function rolling(
windowLength: number, windowLength: number,
key?: string, key?: string,
options?: TransformOptions & { rate?: number }, options?: TransformOptions,
): Transform { ): Transform {
return _accumulator( return _accumulator(_rolling(windowLength, key), true, options);
_rolling(windowLength, key),
options && options.rate,
true,
options,
);
} }
function rollingBy<T>( function rollingBy<T>(
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions & { rate?: number }, options?: TransformOptions,
): Transform { ): Transform {
return _accumulator( return _accumulator(_rollingByFunction(iteratee), true, options);
_rollingByFunction(iteratee),
options && options.rate,
true,
options,
);
} }

View File

@ -3,6 +3,7 @@ import { expect } from "chai";
import { Readable } from "stream"; import { Readable } from "stream";
import { accumulator, accumulatorBy } from "../src"; import { accumulator, accumulatorBy } from "../src";
import { FlushStrategy } from "../src/functions/baseDefinitions"; import { FlushStrategy } from "../src/functions/baseDefinitions";
import { performance } from "perf_hooks";
test.cb("accumulator() rolling", t => { test.cb("accumulator() rolling", t => {
t.plan(3); t.plan(3);