Update tests

This commit is contained in:
Jerry Kurian 2019-08-12 11:07:39 -04:00
parent fdcc5bafc6
commit e932adde67
3 changed files with 24 additions and 29 deletions

View File

@ -1417,12 +1417,14 @@ test.cb("accumulator() rolling", t => {
const flushes = [firstFlush, secondFlush, thirdFlush];
source
.pipe(accumulator(2, 999, "rolling"))
.pipe(accumulator(2, undefined, "rolling"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => t.end)
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
@ -1447,16 +1449,13 @@ test.cb("accumulator() rolling with key", t => {
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulator(3, 999, "rolling", "ts"))
.pipe(accumulator(3, undefined, "rolling", "ts"))
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
t.deepEqual(flush, firstFlush);
} else {
t.deepEqual(flush, secondFlush);
}
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => t.end)
.on("end", () => {
@ -1469,7 +1468,7 @@ test.cb("accumulator() rolling with key", t => {
});
test.cb("accumulator() sliding", t => {
t.plan(5);
t.plan(4);
let chunkIndex = 0;
interface TestObject {
ts: number;
@ -1495,15 +1494,9 @@ test.cb("accumulator() sliding", t => {
{ ts: 4, key: "d" },
];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fourthFlush,
];
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source
.pipe(accumulator(3, 999, "sliding"))
.pipe(accumulator(3, undefined, "sliding"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
@ -1519,7 +1512,7 @@ test.cb("accumulator() sliding", t => {
});
test.cb("accumulator() sliding with key", t => {
t.plan(7);
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
@ -1556,10 +1549,9 @@ test.cb("accumulator() sliding with key", t => {
fourthFlush,
fifthFlush,
sixthFlush,
sixthFlush,
];
source
.pipe(accumulator(3, 999, "sliding", "ts"))
.pipe(accumulator(3, undefined, "sliding", "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;

View File

@ -603,6 +603,7 @@ export function parallelMap<T, R>(
function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true,
) {
const buffer: T[] = [];
return new Transform({
@ -612,7 +613,9 @@ function _accumulator<T>(
callback();
},
flush(callback) {
this.push(buffer);
if (shouldFlush) {
this.push(buffer);
}
callback();
},
});
@ -620,7 +623,7 @@ function _accumulator<T>(
function _slidingBy<T>(
windowLength: number,
rate: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
@ -643,7 +646,7 @@ function _slidingBy<T>(
function _rollingBy<T>(
windowLength: number,
rate: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
@ -665,7 +668,7 @@ function _rollingBy<T>(
export function accumulator(
batchSize: number,
batchRate: number,
batchRate: number | undefined,
flushStrategy: "sliding" | "rolling",
keyBy?: string,
): Transform {
@ -680,16 +683,16 @@ export function accumulator(
export function sliding(
windowLength: number,
rate: number,
rate: number | undefined,
key?: string,
): Transform {
const slidingByFn = _slidingBy(windowLength, rate, key);
return _accumulator(slidingByFn);
return _accumulator(slidingByFn, false);
}
export function rolling(
windowLength: number,
rate: number,
rate: number | undefined,
key?: string,
): Transform {
const rollingByFn = _rollingBy(windowLength, rate, key);

View File

@ -248,7 +248,7 @@ export function parallelMap<T, R>(
export function accumulator(
batchSize: number,
batchRate: number,
batchRate: number | undefined,
flushStrategy: "sliding" | "rolling",
keyBy?: string,
) {