More tests

This commit is contained in:
Jerry Kurian 2019-08-12 14:42:54 -04:00
parent c72ecaf219
commit 3a1fbf44d7
4 changed files with 187 additions and 11 deletions

View File

@ -26,3 +26,5 @@ export enum FlushStrategy {
rolling = "rolling",
sliding = "sliding",
}
export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;

View File

@ -25,7 +25,9 @@ import {
rate,
parallelMap,
accumulator,
accumulatorBy,
} from ".";
import { FlushStrategy } from "./definitions";
import { sleep } from "../helpers";
test.cb("fromArray() streams array elements in flowing mode", t => {
@ -1417,7 +1419,7 @@ test.cb("accumulator() rolling", t => {
const flushes = [firstFlush, secondFlush, thirdFlush];
source
.pipe(accumulator(2, undefined, "rolling"))
.pipe(accumulator(2, undefined, FlushStrategy.rolling))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
@ -1452,7 +1454,50 @@ test.cb("accumulator() rolling with key", t => {
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulator(3, undefined, "rolling", "ts"))
.pipe(accumulator(3, undefined, FlushStrategy.rolling, "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulatorBy() rolling", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(
accumulatorBy(
undefined,
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts;
},
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
@ -1498,7 +1543,7 @@ test.cb("accumulator() sliding", t => {
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source
.pipe(accumulator(3, undefined, "sliding"))
.pipe(accumulator(3, undefined, FlushStrategy.sliding))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
@ -1555,7 +1600,72 @@ test.cb("accumulator() sliding with key", t => {
sixthFlush,
];
source
.pipe(accumulator(3, undefined, "sliding", "ts"))
.pipe(accumulator(3, undefined, FlushStrategy.sliding, "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulatorBy() sliding", t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
];
source
.pipe(
accumulatorBy(
undefined,
FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts ? true : false;
},
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;

View File

@ -10,6 +10,7 @@ import {
JsonValue,
JsonParseOptions,
FlushStrategy,
AccumulatorByIteratee,
} from "./definitions";
import { sleep } from "../helpers";
@ -622,7 +623,7 @@ function _accumulator<T>(
});
}
function _slidingBy<T>(
function _sliding<T>(
windowLength: number,
rate: number | undefined,
key?: string,
@ -631,7 +632,7 @@ function _slidingBy<T>(
if (key) {
let index = 0;
while (
buffer.length > 0 &&
index < buffer.length &&
buffer[index][key] + windowLength <= event[key]
) {
index++;
@ -645,7 +646,37 @@ function _slidingBy<T>(
};
}
function _rollingBy<T>(
function _slidingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
let index = 0;
while (index < buffer.length && iteratee(event, buffer[index])) {
index++;
}
buffer.splice(0, index);
buffer.push(event);
stream.push(buffer);
};
}
function _rollingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (iteratee) {
if (buffer.length > 0 && iteratee(event, buffer[0])) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
}
buffer.push(event);
};
}
function _rolling<T>(
windowLength: number,
rate: number | undefined,
key?: string,
@ -691,13 +722,31 @@ export function accumulator(
}
}
export function accumulatorBy<T, S extends FlushStrategy>(
batchRate: number | undefined,
flushStrategy: S,
iteratee: AccumulatorByIteratee<T>,
): Transform {
if (flushStrategy === FlushStrategy.sliding) {
return slidingBy(batchRate, iteratee);
} else {
return rollingBy(batchRate, iteratee);
}
}
export function sliding(
windowLength: number,
rate: number | undefined,
key?: string,
): Transform {
const slidingByFn = _slidingBy(windowLength, rate, key);
return _accumulator(slidingByFn, false);
return _accumulator(_sliding(windowLength, rate, key), false);
}
export function slidingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_slidingByFunction(rate, iteratee), false);
}
export function rolling(
@ -705,6 +754,12 @@ export function rolling(
rate: number | undefined,
key?: string,
): Transform {
const rollingByFn = _rollingBy(windowLength, rate, key);
return _accumulator(rollingByFn);
return _accumulator(_rolling(windowLength, rate, key));
}
export function rollingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_rollingByFunction(rate, iteratee));
}

View File

@ -8,6 +8,7 @@ import {
WithEncoding,
JsonParseOptions,
FlushStrategy,
AccumulatorByIteratee,
} from "./definitions";
/**
@ -260,3 +261,11 @@ export function accumulator(
keyBy,
);
}
export function accumulatorBy<T, S extends FlushStrategy>(
batchRate: number | undefined,
flushStrategy: S,
iteratee: AccumulatorByIteratee<T>,
) {
return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee);
}