From e932adde670f1ff9aee6ad1109429610ebd40766 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Mon, 12 Aug 2019 11:07:39 -0400 Subject: [PATCH] Update tests --- src/functions/functions.spec.ts | 34 +++++++++++++-------------------- src/functions/functions.ts | 17 ++++++++++------- src/functions/index.ts | 2 +- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index d5b2072..6fe8384 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -1417,12 +1417,14 @@ test.cb("accumulator() rolling", t => { const flushes = [firstFlush, secondFlush, thirdFlush]; source - .pipe(accumulator(2, 999, "rolling")) + .pipe(accumulator(2, undefined, "rolling")) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; }) - .on("error", (e: any) => t.end) + .on("error", (e: any) => { + t.end(e); + }) .on("end", () => { t.end(); }); @@ -1447,16 +1449,13 @@ test.cb("accumulator() rolling with key", t => { { ts: 2, key: "d" }, ]; const secondFlush = [{ ts: 3, key: "e" }]; + const flushes = [firstFlush, secondFlush]; source - .pipe(accumulator(3, 999, "rolling", "ts")) + .pipe(accumulator(3, undefined, "rolling", "ts")) .on("data", (flush: TestObject[]) => { - if (chunkIndex === 0) { - chunkIndex++; - t.deepEqual(flush, firstFlush); - } else { - t.deepEqual(flush, secondFlush); - } + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; }) .on("error", (e: any) => t.end) .on("end", () => { @@ -1469,7 +1468,7 @@ test.cb("accumulator() rolling with key", t => { }); test.cb("accumulator() sliding", t => { - t.plan(5); + t.plan(4); let chunkIndex = 0; interface TestObject { ts: number; @@ -1495,15 +1494,9 @@ test.cb("accumulator() sliding", t => { { ts: 4, key: "d" }, ]; - const flushes = [ - firstFlush, - secondFlush, - thirdFlush, - fourthFlush, - fourthFlush, - ]; + const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush]; source - .pipe(accumulator(3, 999, "sliding")) + .pipe(accumulator(3, undefined, "sliding")) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -1519,7 +1512,7 @@ test.cb("accumulator() sliding", t => { }); test.cb("accumulator() sliding with key", t => { - t.plan(7); + t.plan(6); let chunkIndex = 0; interface TestObject { ts: number; @@ -1556,10 +1549,9 @@ test.cb("accumulator() sliding with key", t => { fourthFlush, fifthFlush, sixthFlush, - sixthFlush, ]; source - .pipe(accumulator(3, 999, "sliding", "ts")) + .pipe(accumulator(3, undefined, "sliding", "ts")) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 219fed0..143a5cc 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -603,6 +603,7 @@ export function parallelMap( function _accumulator( accumulateBy: (data: T, buffer: T[], stream: Transform) => void, + shouldFlush: boolean = true, ) { const buffer: T[] = []; return new Transform({ @@ -612,7 +613,9 @@ function _accumulator( callback(); }, flush(callback) { - this.push(buffer); + if (shouldFlush) { + this.push(buffer); + } callback(); }, }); @@ -620,7 +623,7 @@ function _accumulator( function _slidingBy( windowLength: number, - rate: number, + rate: number | undefined, key?: string, ): (event: T, buffer: T[], stream: Transform) => void { return (event: T, buffer: T[], stream: Transform) => { @@ -643,7 +646,7 @@ function _slidingBy( function _rollingBy( windowLength: number, - rate: number, + rate: number | undefined, key?: string, ): (event: T, buffer: T[], stream: Transform) => void { return (event: T, buffer: T[], stream: Transform) => { @@ -665,7 +668,7 @@ function _rollingBy( export function accumulator( batchSize: number, - batchRate: number, + batchRate: number | undefined, flushStrategy: "sliding" | "rolling", keyBy?: string, ): Transform { @@ -680,16 +683,16 @@ export function accumulator( export function sliding( windowLength: number, - rate: number, + rate: number | undefined, key?: string, ): Transform { const slidingByFn = _slidingBy(windowLength, rate, key); - return _accumulator(slidingByFn); + return _accumulator(slidingByFn, false); } export function rolling( windowLength: number, - rate: number, + rate: number | undefined, key?: string, ): Transform { const rollingByFn = _rollingBy(windowLength, rate, key); diff --git a/src/functions/index.ts b/src/functions/index.ts index 3fe17b2..5599934 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -248,7 +248,7 @@ export function parallelMap( export function accumulator( batchSize: number, - batchRate: number, + batchRate: number | undefined, flushStrategy: "sliding" | "rolling", keyBy?: string, ) {