From fdcc5bafc61e421f82bbacd675f483cec1717391 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Fri, 9 Aug 2019 17:13:48 -0400 Subject: [PATCH] Add sliding, rolling functions with tests --- src/functions/definitions.ts | 2 +- src/functions/functions.spec.ts | 201 ++++++++++++++++++++------------ src/functions/functions.ts | 153 +++++++++++++----------- src/functions/index.ts | 17 ++- 4 files changed, 222 insertions(+), 151 deletions(-) diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index a328242..962a7a4 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -44,7 +44,7 @@ export interface RollingFlushOptions { export interface SlidingFlushOptions { windowLength: number; - flushMapper?: (flushed: Array) => Array; + windowMapper?: (flushed: Array) => Array; timeout?: number; } diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index 5289065..d5b2072 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -3,7 +3,6 @@ import test from "ava"; import { expect } from "chai"; import { performance } from "perf_hooks"; import { Readable } from "stream"; -import { FlushStrategy } from "./definitions"; import { fromArray, map, @@ -1404,7 +1403,36 @@ test.cb("parallel() parallel mapping", t => { source.push(null); }); -test.cb("accumulator() buffering strategy clears buffer on condition", t => { +test.cb("accumulator() rolling", t => { + t.plan(3); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const secondFlush = [{ ts: 2, key: "d" }, { ts: 3, key: "e" }]; + const thirdFlush = [{ ts: 4, key: "f" }]; + const flushes = [firstFlush, secondFlush, thirdFlush]; + + source + .pipe(accumulator(2, 999, "rolling")) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (e: any) => t.end) + .on("end", () => { + t.end(); + }); + [...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => { + source.push(item); + }); + source.push(null); +}); + +test.cb("accumulator() rolling with key", t => { t.plan(2); let chunkIndex = 0; interface TestObject { @@ -1421,11 +1449,7 @@ test.cb("accumulator() buffering strategy clears buffer on condition", t => { const secondFlush = [{ ts: 3, key: "e" }]; source - .pipe( - accumulator(FlushStrategy.sampling, { - condition: (event: TestObject) => event.ts > 2, - }), - ) + .pipe(accumulator(3, 999, "rolling", "ts")) .on("data", (flush: TestObject[]) => { if (chunkIndex === 0) { chunkIndex++; @@ -1434,93 +1458,118 @@ test.cb("accumulator() buffering strategy clears buffer on condition", t => { t.deepEqual(flush, secondFlush); } }) - .on("error", e => t.end) + .on("error", (e: any) => t.end) .on("end", () => { t.end(); }); - source.push([...firstFlush, ...secondFlush]); + [...firstFlush, ...secondFlush].forEach(item => { + source.push(item); + }); source.push(null); }); -test.cb("accumulator() buffering strategy clears buffer on timeout", t => { - t.plan(2); +test.cb("accumulator() sliding", t => { + t.plan(5); let chunkIndex = 0; interface TestObject { ts: number; key: string; } - const source = new Readable({ objectMode: true, read: () => {} }); - const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; - const secondFlush = [ + const source = new Readable({ objectMode: true }); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, { ts: 2, key: "c" }, - { ts: 2, key: "d" }, - { ts: 3, key: "e" }, + { ts: 4, key: "d" }, + ]; + const firstFlush = [{ ts: 0, key: "a" }]; + const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const thirdFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const fourthFlush = [ + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 4, key: "d" }, + ]; + + const flushes = [ + firstFlush, + secondFlush, + thirdFlush, + fourthFlush, + fourthFlush, ]; source - .pipe( - accumulator(FlushStrategy.sampling, { - condition: (event: TestObject) => event.ts > 3, - timeout: 1000, - }), - ) + .pipe(accumulator(3, 999, "sliding")) .on("data", (flush: TestObject[]) => { - if (chunkIndex === 0) { - chunkIndex++; - t.deepEqual(flush, firstFlush); - } else { - t.deepEqual(flush, secondFlush); - } + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; }) - .on("error", e => t.end) + .on("error", (e: any) => t.end) .on("end", () => { t.end(); }); - source.push(firstFlush); - setTimeout(() => { - source.push(secondFlush); - source.push(null); - }, 2000); + input.forEach(item => { + source.push(item); + }); + source.push(null); }); -test.cb( - "accumulator() buffering strategy clears buffer on condition or timeout", - t => { - t.plan(3); - let chunkIndex = 0; - interface TestObject { - ts: number; - key: string; - } - const source = new Readable({ objectMode: true, read: () => {} }); - const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; - const secondFlush = [{ ts: 2, key: "c" }, { ts: 2, key: "d" }]; - const thirdFlush = [{ ts: 3, key: "e" }]; - source - .pipe( - accumulator(FlushStrategy.sampling, { - condition: (event: TestObject) => event.ts > 2, - timeout: 1000, - }), - ) - .on("data", (flush: TestObject[]) => { - if (chunkIndex === 0) { - chunkIndex++; - t.deepEqual(flush, firstFlush); - } else if (chunkIndex === 1) { - chunkIndex++; - t.deepEqual(flush, secondFlush); - } else { - t.deepEqual(flush, thirdFlush); - } - }) - .on("error", e => t.end) - .on("end", () => { - t.end(); - }); - source.push(firstFlush); - setTimeout(() => { - source.push([...secondFlush, ...thirdFlush]); - source.push(null); - }, 2000); - }, -); +test.cb("accumulator() sliding with key", t => { + t.plan(7); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 3, key: "d" }, + { ts: 5, key: "f" }, + { ts: 6, key: "g" }, + ]; + const firstFlush = [{ ts: 0, key: "a" }]; + const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const thirdFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const fourthFlush = [ + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 3, key: "d" }, + ]; + const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }]; + const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }]; + + const flushes = [ + firstFlush, + secondFlush, + thirdFlush, + fourthFlush, + fifthFlush, + sixthFlush, + sixthFlush, + ]; + source + .pipe(accumulator(3, 999, "sliding", "ts")) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (e: any) => t.end) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); + }); + source.push(null); +}); diff --git a/src/functions/functions.ts b/src/functions/functions.ts index b9fd812..219fed0 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -2,12 +2,7 @@ import { Transform, Readable, Writable, Duplex } from "stream"; import { performance } from "perf_hooks"; import { ChildProcess } from "child_process"; import { StringDecoder } from "string_decoder"; - import { - FlushStrategy, - AccumulatorOptions, - SamplingFlushOptions, - SamplingFlushResult, TransformOptions, ThroughOptions, WithEncoding, @@ -606,75 +601,97 @@ export function parallelMap( }); } -function samplingFlush( - event: T, - options: SamplingFlushOptions, - buffer: Array, -): SamplingFlushResult { - let flush = null; - if (options.condition(event, buffer)) { - flush = buffer.slice(0); - buffer.length = 0; - } - buffer.push(event); - return { flushed: true, flush }; -} - -function executeSamplingStrategy( - events: T[], - options: SamplingFlushOptions, - buffer: Array, - stream: Transform, -): void { - events.forEach(event => { - const sample = samplingFlush(event, options, buffer); - if (sample.flushed && sample.flush && options.flushMapper) { - stream.push(options.flushMapper(sample.flush)); - } else if (sample.flushed && sample.flush) { - stream.push(sample.flush); - } - }); -} - -export function accumulator( - flushStrategy: S, - options: AccumulatorOptions, +function _accumulator( + accumulateBy: (data: T, buffer: T[], stream: Transform) => void, ) { - const buffer: Array = []; - let handle: NodeJS.Timer | null = null; - if (options.timeout) { - handle = setInterval(() => { - if (buffer.length > 0) { - transform.push(buffer); - buffer.length = 0; - } - }, options.timeout); - } - const transform = new Transform({ + const buffer: T[] = []; + return new Transform({ objectMode: true, - async transform(data: T[] | T, encoding, callback) { - switch (flushStrategy) { - case FlushStrategy.sampling: { - if (!Array.isArray(data)) data = [data]; - executeSamplingStrategy( - data, - options as SamplingFlushOptions, - buffer, - this, - ); - callback(); - break; - } - case FlushStrategy.sliding: { - break; - } - } + async transform(data: any, encoding, callback) { + accumulateBy(data, buffer, this); + callback(); }, flush(callback) { - handle && clearInterval(handle); this.push(buffer); callback(); }, }); - return transform; +} + +function _slidingBy( + windowLength: number, + rate: number, + key?: string, +): (event: T, buffer: T[], stream: Transform) => void { + return (event: T, buffer: T[], stream: Transform) => { + if (key) { + let index = 0; + while ( + buffer.length > 0 && + buffer[index][key] + windowLength <= event[key] + ) { + index++; + } + buffer.splice(0, index); + } else if (buffer.length === windowLength) { + buffer.shift(); + } + buffer.push(event); + stream.push(buffer); + }; +} + +function _rollingBy( + windowLength: number, + rate: number, + key?: string, +): (event: T, buffer: T[], stream: Transform) => void { + return (event: T, buffer: T[], stream: Transform) => { + if (key) { + if ( + buffer.length > 0 && + buffer[0][key] + windowLength <= event[key] + ) { + stream.push(buffer.slice(0)); + buffer.length = 0; + } + } else if (buffer.length === windowLength) { + stream.push(buffer.slice(0)); + buffer.length = 0; + } + buffer.push(event); + }; +} + +export function accumulator( + batchSize: number, + batchRate: number, + flushStrategy: "sliding" | "rolling", + keyBy?: string, +): Transform { + if (flushStrategy === "sliding") { + return sliding(batchSize, batchRate, keyBy); + } else if (flushStrategy === "rolling") { + return rolling(batchSize, batchRate, keyBy); + } else { + return batch(batchSize, batchRate); + } +} + +export function sliding( + windowLength: number, + rate: number, + key?: string, +): Transform { + const slidingByFn = _slidingBy(windowLength, rate, key); + return _accumulator(slidingByFn); +} + +export function rolling( + windowLength: number, + rate: number, + key?: string, +): Transform { + const rollingByFn = _rollingBy(windowLength, rate, key); + return _accumulator(rollingByFn); } diff --git a/src/functions/index.ts b/src/functions/index.ts index a6ae32e..3fe17b2 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -3,8 +3,6 @@ import { ChildProcess } from "child_process"; import * as baseFunctions from "./functions"; import { - AccumulatorOptions, - FlushStrategy, ThroughOptions, TransformOptions, WithEncoding, @@ -248,9 +246,16 @@ export function parallelMap( return baseFunctions.parallelMap(mapper, parallel, sleepTime); } -export function accumulator( - flushStrategy: S, - options: AccumulatorOptions, +export function accumulator( + batchSize: number, + batchRate: number, + flushStrategy: "sliding" | "rolling", + keyBy?: string, ) { - return baseFunctions.accumulator(flushStrategy, options); + return baseFunctions.accumulator( + batchSize, + batchRate, + flushStrategy, + keyBy, + ); }