diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index 2729b65..6cc97a8 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -26,3 +26,5 @@ export enum FlushStrategy { rolling = "rolling", sliding = "sliding", } + +export type AccumulatorByIteratee = (event: T, bufferChunk: T) => boolean; diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index 6d1b31e..f174928 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -25,7 +25,9 @@ import { rate, parallelMap, accumulator, + accumulatorBy, } from "."; +import { FlushStrategy } from "./definitions"; import { sleep } from "../helpers"; test.cb("fromArray() streams array elements in flowing mode", t => { @@ -1417,7 +1419,7 @@ test.cb("accumulator() rolling", t => { const flushes = [firstFlush, secondFlush, thirdFlush]; source - .pipe(accumulator(2, undefined, "rolling")) + .pipe(accumulator(2, undefined, FlushStrategy.rolling)) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -1452,7 +1454,50 @@ test.cb("accumulator() rolling with key", t => { const flushes = [firstFlush, secondFlush]; source - .pipe(accumulator(3, undefined, "rolling", "ts")) + .pipe(accumulator(3, undefined, FlushStrategy.rolling, "ts")) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (e: any) => { + t.end(e); + }) + .on("end", () => { + t.end(); + }); + [...firstFlush, ...secondFlush].forEach(item => { + source.push(item); + }); + source.push(null); +}); + +test.cb("accumulatorBy() rolling", t => { + t.plan(2); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const firstFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 2, key: "d" }, + ]; + const secondFlush = [{ ts: 3, key: "e" }]; + const flushes = [firstFlush, secondFlush]; + + source + .pipe( + accumulatorBy( + undefined, + FlushStrategy.rolling, + (event: TestObject, bufferChunk: TestObject) => { + return bufferChunk.ts + 3 <= event.ts; + }, + ), + ) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -1498,7 +1543,7 @@ test.cb("accumulator() sliding", t => { const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush]; source - .pipe(accumulator(3, undefined, "sliding")) + .pipe(accumulator(3, undefined, FlushStrategy.sliding)) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -1555,7 +1600,72 @@ test.cb("accumulator() sliding with key", t => { sixthFlush, ]; source - .pipe(accumulator(3, undefined, "sliding", "ts")) + .pipe(accumulator(3, undefined, FlushStrategy.sliding, "ts")) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (e: any) => { + t.end(e); + }) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); + }); + source.push(null); +}); + +test.cb("accumulatorBy() sliding", t => { + t.plan(6); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 3, key: "d" }, + { ts: 5, key: "f" }, + { ts: 6, key: "g" }, + ]; + const firstFlush = [{ ts: 0, key: "a" }]; + const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const thirdFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const fourthFlush = [ + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 3, key: "d" }, + ]; + const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }]; + const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }]; + + const flushes = [ + firstFlush, + secondFlush, + thirdFlush, + fourthFlush, + fifthFlush, + sixthFlush, + ]; + source + .pipe( + accumulatorBy( + undefined, + FlushStrategy.sliding, + (event: TestObject, bufferChunk: TestObject) => { + return bufferChunk.ts + 3 <= event.ts ? true : false; + }, + ), + ) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 2df000e..49eea1f 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -10,6 +10,7 @@ import { JsonValue, JsonParseOptions, FlushStrategy, + AccumulatorByIteratee, } from "./definitions"; import { sleep } from "../helpers"; @@ -622,7 +623,7 @@ function _accumulator( }); } -function _slidingBy( +function _sliding( windowLength: number, rate: number | undefined, key?: string, @@ -631,7 +632,7 @@ function _slidingBy( if (key) { let index = 0; while ( - buffer.length > 0 && + index < buffer.length && buffer[index][key] + windowLength <= event[key] ) { index++; @@ -645,7 +646,37 @@ function _slidingBy( }; } -function _rollingBy( +function _slidingByFunction( + rate: number | undefined, + iteratee: AccumulatorByIteratee, +): (event: T, buffer: T[], stream: Transform) => void { + return (event: T, buffer: T[], stream: Transform) => { + let index = 0; + while (index < buffer.length && iteratee(event, buffer[index])) { + index++; + } + buffer.splice(0, index); + buffer.push(event); + stream.push(buffer); + }; +} + +function _rollingByFunction( + rate: number | undefined, + iteratee: AccumulatorByIteratee, +): (event: T, buffer: T[], stream: Transform) => void { + return (event: T, buffer: T[], stream: Transform) => { + if (iteratee) { + if (buffer.length > 0 && iteratee(event, buffer[0])) { + stream.push(buffer.slice(0)); + buffer.length = 0; + } + } + buffer.push(event); + }; +} + +function _rolling( windowLength: number, rate: number | undefined, key?: string, @@ -691,13 +722,31 @@ export function accumulator( } } +export function accumulatorBy( + batchRate: number | undefined, + flushStrategy: S, + iteratee: AccumulatorByIteratee, +): Transform { + if (flushStrategy === FlushStrategy.sliding) { + return slidingBy(batchRate, iteratee); + } else { + return rollingBy(batchRate, iteratee); + } +} + export function sliding( windowLength: number, rate: number | undefined, key?: string, ): Transform { - const slidingByFn = _slidingBy(windowLength, rate, key); - return _accumulator(slidingByFn, false); + return _accumulator(_sliding(windowLength, rate, key), false); +} + +export function slidingBy( + rate: number | undefined, + iteratee: AccumulatorByIteratee, +): Transform { + return _accumulator(_slidingByFunction(rate, iteratee), false); } export function rolling( @@ -705,6 +754,12 @@ export function rolling( rate: number | undefined, key?: string, ): Transform { - const rollingByFn = _rollingBy(windowLength, rate, key); - return _accumulator(rollingByFn); + return _accumulator(_rolling(windowLength, rate, key)); +} + +export function rollingBy( + rate: number | undefined, + iteratee: AccumulatorByIteratee, +): Transform { + return _accumulator(_rollingByFunction(rate, iteratee)); } diff --git a/src/functions/index.ts b/src/functions/index.ts index 6265a92..2c511c0 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -8,6 +8,7 @@ import { WithEncoding, JsonParseOptions, FlushStrategy, + AccumulatorByIteratee, } from "./definitions"; /** @@ -260,3 +261,11 @@ export function accumulator( keyBy, ); } + +export function accumulatorBy( + batchRate: number | undefined, + flushStrategy: S, + iteratee: AccumulatorByIteratee, +) { + return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee); +}