Add sliding and rolling functions with tests

Jerry Kurian 2019-08-09 17:13:48 -04:00
parent c1ef5fec4b
commit fdcc5bafc6
4 changed files with 222 additions and 151 deletions
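
A minimal usage sketch of the API this commit introduces, for orientation before the diffs; the import path and the event shape are assumptions, not part of the commit:

import { Readable } from "stream";
import { accumulator } from "./functions"; // import path assumed

const source = new Readable({ objectMode: true, read: () => {} });

source
    // Flush a window every 2 events ("rolling"); 999 is the batchRate argument,
    // which the rolling path in this diff accepts but does not use.
    .pipe(accumulator(2, 999, "rolling"))
    .on("data", (window: Array<{ ts: number; key: string }>) => {
        console.log(window.map(e => e.key)); // ["a", "b"], then ["c"] on end
    });

[{ ts: 0, key: "a" }, { ts: 1, key: "b" }, { ts: 2, key: "c" }].forEach(e => source.push(e));
source.push(null); // ending the stream flushes the remaining ["c"] window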

View File

@@ -44,7 +44,7 @@ export interface RollingFlushOptions<T, R> {
export interface SlidingFlushOptions<T, R> {
windowLength: number;
flushMapper?: (flushed: Array<T>) => Array<R>;
windowMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
}
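
To make the rename above concrete, a hedged illustration of an options object under the updated interface (the generic parameters and values are invented for the example):

const opts: SlidingFlushOptions<number, string> = {
    windowLength: 5,
    windowMapper: (flushed: Array<number>) => flushed.map(n => n.toString()),
    timeout: 1000,
};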

View File

@@ -3,7 +3,6 @@ import test from "ava";
import { expect } from "chai";
import { performance } from "perf_hooks";
import { Readable } from "stream";
import { FlushStrategy } from "./definitions";
import {
fromArray,
map,
@@ -1404,7 +1403,36 @@ test.cb("parallel() parallel mapping", t => {
source.push(null);
});
test.cb("accumulator() buffering strategy clears buffer on condition", t => {
test.cb("accumulator() rolling", t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [{ ts: 2, key: "d" }, { ts: 3, key: "e" }];
const thirdFlush = [{ ts: 4, key: "f" }];
const flushes = [firstFlush, secondFlush, thirdFlush];
source
.pipe(accumulator(2, 999, "rolling"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => t.end)
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() rolling with key", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
@@ -1421,11 +1449,7 @@ test.cb("accumulator() buffering strategy clears buffer on condition", t => {
const secondFlush = [{ ts: 3, key: "e" }];
source
.pipe(
accumulator(FlushStrategy.sampling, {
condition: (event: TestObject) => event.ts > 2,
}),
)
.pipe(accumulator(3, 999, "rolling", "ts"))
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
@@ -1434,93 +1458,118 @@ test.cb("accumulator() buffering strategy clears buffer on condition", t => {
t.deepEqual(flush, secondFlush);
}
})
.on("error", e => t.end)
.on("error", (e: any) => t.end)
.on("end", () => {
t.end();
});
source.push([...firstFlush, ...secondFlush]);
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() buffering strategy clears buffer on timeout", t => {
t.plan(2);
test.cb("accumulator() sliding", t => {
t.plan(5);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true, read: () => {} });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
{ ts: 3, key: "e" },
{ ts: 4, key: "d" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 4, key: "d" },
];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fourthFlush,
];
source
.pipe(
accumulator(FlushStrategy.sampling, {
condition: (event: TestObject) => event.ts > 3,
timeout: 1000,
}),
)
.pipe(accumulator(3, 999, "sliding"))
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
t.deepEqual(flush, firstFlush);
} else {
t.deepEqual(flush, secondFlush);
}
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", e => t.end)
.on("error", (e: any) => t.end)
.on("end", () => {
t.end();
});
source.push(firstFlush);
setTimeout(() => {
source.push(secondFlush);
source.push(null);
}, 2000);
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb(
"accumulator() buffering strategy clears buffer on condition or timeout",
t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true, read: () => {} });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [{ ts: 2, key: "c" }, { ts: 2, key: "d" }];
const thirdFlush = [{ ts: 3, key: "e" }];
source
.pipe(
accumulator(FlushStrategy.sampling, {
condition: (event: TestObject) => event.ts > 2,
timeout: 1000,
}),
)
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
t.deepEqual(flush, firstFlush);
} else if (chunkIndex === 1) {
chunkIndex++;
t.deepEqual(flush, secondFlush);
} else {
t.deepEqual(flush, thirdFlush);
}
})
.on("error", e => t.end)
.on("end", () => {
t.end();
});
source.push(firstFlush);
setTimeout(() => {
source.push([...secondFlush, ...thirdFlush]);
source.push(null);
}, 2000);
},
);
test.cb("accumulator() sliding with key", t => {
t.plan(7);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
sixthFlush,
];
source
.pipe(accumulator(3, 999, "sliding", "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => t.end)
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});

View File

@@ -2,12 +2,7 @@ import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
import {
FlushStrategy,
AccumulatorOptions,
SamplingFlushOptions,
SamplingFlushResult,
TransformOptions,
ThroughOptions,
WithEncoding,
@@ -606,75 +601,97 @@ export function parallelMap<T, R>(
});
}
function samplingFlush<T, R>(
event: T,
options: SamplingFlushOptions<T, R>,
buffer: Array<T>,
): SamplingFlushResult<T> {
let flush = null;
if (options.condition(event, buffer)) {
flush = buffer.slice(0);
buffer.length = 0;
}
buffer.push(event);
return { flushed: true, flush };
}
function executeSamplingStrategy<T, R>(
events: T[],
options: SamplingFlushOptions<T, R>,
buffer: Array<T>,
stream: Transform,
): void {
events.forEach(event => {
const sample = samplingFlush(event, options, buffer);
if (sample.flushed && sample.flush && options.flushMapper) {
stream.push(options.flushMapper(sample.flush));
} else if (sample.flushed && sample.flush) {
stream.push(sample.flush);
}
});
}
export function accumulator<T, R, S extends FlushStrategy>(
flushStrategy: S,
options: AccumulatorOptions<T, R, S>,
function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
) {
const buffer: Array<T> = [];
let handle: NodeJS.Timer | null = null;
if (options.timeout) {
handle = setInterval(() => {
if (buffer.length > 0) {
transform.push(buffer);
buffer.length = 0;
}
}, options.timeout);
}
const transform = new Transform({
const buffer: T[] = [];
return new Transform({
objectMode: true,
async transform(data: T[] | T, encoding, callback) {
switch (flushStrategy) {
case FlushStrategy.sampling: {
if (!Array.isArray(data)) data = [data];
executeSamplingStrategy(
data,
options as SamplingFlushOptions<T, R>,
buffer,
this,
);
callback();
break;
}
case FlushStrategy.sliding: {
break;
}
}
async transform(data: any, encoding, callback) {
accumulateBy(data, buffer, this);
callback();
},
flush(callback) {
handle && clearInterval(handle);
this.push(buffer);
callback();
},
});
return transform;
}
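// Sliding strategy: with a key, drop buffered events whose key value trails the
// incoming event's by windowLength or more; without a key, cap the buffer at
// windowLength items. Every incoming event is appended and the current window emitted.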
function _slidingBy<T>(
windowLength: number,
rate: number,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
let index = 0;
while (
buffer.length > 0 &&
buffer[index][key] + windowLength <= event[key]
) {
index++;
}
buffer.splice(0, index);
} else if (buffer.length === windowLength) {
buffer.shift();
}
buffer.push(event);
stream.push(buffer);
};
}
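// Rolling strategy: flush and clear the whole buffer once it is full (no key) or once
// the incoming event's key value is windowLength or more past the first buffered
// event's; the incoming event then starts the next window.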
function _rollingBy<T>(
windowLength: number,
rate: number,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
if (
buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key]
) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
} else if (buffer.length === windowLength) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
buffer.push(event);
};
}
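// Dispatch on the flush strategy; any value other than "sliding" or "rolling" falls
// back to batch().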
export function accumulator(
batchSize: number,
batchRate: number,
flushStrategy: "sliding" | "rolling",
keyBy?: string,
): Transform {
if (flushStrategy === "sliding") {
return sliding(batchSize, batchRate, keyBy);
} else if (flushStrategy === "rolling") {
return rolling(batchSize, batchRate, keyBy);
} else {
return batch(batchSize, batchRate);
}
}
export function sliding(
windowLength: number,
rate: number,
key?: string,
): Transform {
const slidingByFn = _slidingBy(windowLength, rate, key);
return _accumulator(slidingByFn);
}
export function rolling(
windowLength: number,
rate: number,
key?: string,
): Transform {
const rollingByFn = _rollingBy(windowLength, rate, key);
return _accumulator(rollingByFn);
}
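
For completeness, a hedged sketch of the keyed sliding export used directly (the tests above reach it only through accumulator); again, the import path and event shape are assumptions:

import { Readable } from "stream";
import { sliding } from "./functions"; // import path assumed

const source = new Readable({ objectMode: true, read: () => {} });

source
    // Evict buffered events whose ts trails the incoming event's ts by 3 or more,
    // then emit the current window. Unlike the rolling flush (which pushes a slice),
    // the sliding strategy pushes the live buffer by reference.
    .pipe(sliding(3, 999, "ts"))
    .on("data", (window: Array<{ ts: number; key: string }>) => {
        console.log(window.map(e => e.key));
    });

[{ ts: 0, key: "a" }, { ts: 1, key: "b" }, { ts: 3, key: "c" }].forEach(e => source.push(e));
source.push(null);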

View File

@@ -3,8 +3,6 @@ import { ChildProcess } from "child_process";
import * as baseFunctions from "./functions";
import {
AccumulatorOptions,
FlushStrategy,
ThroughOptions,
TransformOptions,
WithEncoding,
@@ -248,9 +246,16 @@ export function parallelMap<T, R>(
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
}
export function accumulator<T, R, S extends FlushStrategy>(
flushStrategy: S,
options: AccumulatorOptions<T, R, S>,
export function accumulator(
batchSize: number,
batchRate: number,
flushStrategy: "sliding" | "rolling",
keyBy?: string,
) {
return baseFunctions.accumulator(flushStrategy, options);
return baseFunctions.accumulator(
batchSize,
batchRate,
flushStrategy,
keyBy,
);
}