diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index 6593ec4..460e4ca 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -38,12 +38,14 @@ export type AccumulatorOptions = S extends FlushStrategy.sampling export interface RollingFlushOptions { windowLength: number; - afterFlush?: (flushed: Array) => Array; + flushMapper?: (flushed: Array) => Array; + timeout?: number; } export interface SlidingFlushOptions { windowLength: number; - afterFlush?: (flushed: Array) => Array; + flushMapper?: (flushed: Array) => Array; + timeout?: number; } export interface SlidingFlushResult { @@ -53,6 +55,7 @@ export interface SlidingFlushResult { export interface SamplingFlushOptions { condition: (event: T, buffer: Array) => boolean; flushMapper?: (flushed: Array) => Array; + timeout?: number; } export interface SamplingFlushResult { diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index decd771..5289065 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -1404,20 +1404,21 @@ test.cb("parallel() parallel mapping", t => { source.push(null); }); -test.cb.only("accumulator() buffering strategy", t => { +test.cb("accumulator() buffering strategy clears buffer on condition", t => { + t.plan(2); let chunkIndex = 0; interface TestObject { ts: number; key: string; } const source = new Readable({ objectMode: true }); - const expectedElements = [ + const firstFlush = [ { ts: 0, key: "a" }, { ts: 1, key: "b" }, { ts: 2, key: "c" }, { ts: 2, key: "d" }, - { ts: 3, key: "e" }, ]; + const secondFlush = [{ ts: 3, key: "e" }]; source .pipe( @@ -1428,15 +1429,98 @@ test.cb.only("accumulator() buffering strategy", t => { .on("data", (flush: TestObject[]) => { if (chunkIndex === 0) { chunkIndex++; - t.deepEqual(flush, expectedElements.slice(0, 4)); + t.deepEqual(flush, firstFlush); } else { - t.deepEqual(flush, expectedElements.slice(4)); + t.deepEqual(flush, secondFlush); } }) .on("error", e => t.end) .on("end", () => { t.end(); }); - expectedElements.forEach(element => source.push(element)); + source.push([...firstFlush, ...secondFlush]); source.push(null); }); + +test.cb("accumulator() buffering strategy clears buffer on timeout", t => { + t.plan(2); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true, read: () => {} }); + const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const secondFlush = [ + { ts: 2, key: "c" }, + { ts: 2, key: "d" }, + { ts: 3, key: "e" }, + ]; + source + .pipe( + accumulator(FlushStrategy.sampling, { + condition: (event: TestObject) => event.ts > 3, + timeout: 1000, + }), + ) + .on("data", (flush: TestObject[]) => { + if (chunkIndex === 0) { + chunkIndex++; + t.deepEqual(flush, firstFlush); + } else { + t.deepEqual(flush, secondFlush); + } + }) + .on("error", e => t.end) + .on("end", () => { + t.end(); + }); + source.push(firstFlush); + setTimeout(() => { + source.push(secondFlush); + source.push(null); + }, 2000); +}); + +test.cb( + "accumulator() buffering strategy clears buffer on condition or timeout", + t => { + t.plan(3); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true, read: () => {} }); + const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + const secondFlush = [{ ts: 2, key: "c" }, { ts: 2, key: "d" }]; + const thirdFlush = [{ ts: 3, key: "e" }]; + source + .pipe( + accumulator(FlushStrategy.sampling, { + condition: (event: TestObject) => event.ts > 2, + timeout: 1000, + }), + ) + .on("data", (flush: TestObject[]) => { + if (chunkIndex === 0) { + chunkIndex++; + t.deepEqual(flush, firstFlush); + } else if (chunkIndex === 1) { + chunkIndex++; + t.deepEqual(flush, secondFlush); + } else { + t.deepEqual(flush, thirdFlush); + } + }) + .on("error", e => t.end) + .on("end", () => { + t.end(); + }); + source.push(firstFlush); + setTimeout(() => { + source.push([...secondFlush, ...thirdFlush]); + source.push(null); + }, 2000); + }, +); diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 40a534c..b9fd812 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -525,7 +525,6 @@ export function batch(batchSize: number = 1000, maxBatchAge: number = 500) { callback(); }, flush(callback) { - console.error("flushing"); sendChunk(this); callback(); }, @@ -642,10 +641,18 @@ export function accumulator( options: AccumulatorOptions, ) { const buffer: Array = []; - return new Transform({ + let handle: NodeJS.Timer | null = null; + if (options.timeout) { + handle = setInterval(() => { + if (buffer.length > 0) { + transform.push(buffer); + buffer.length = 0; + } + }, options.timeout); + } + const transform = new Transform({ objectMode: true, async transform(data: T[] | T, encoding, callback) { - callback(); switch (flushStrategy) { case FlushStrategy.sampling: { if (!Array.isArray(data)) data = [data]; @@ -655,6 +662,7 @@ export function accumulator( buffer, this, ); + callback(); break; } case FlushStrategy.sliding: { @@ -663,8 +671,10 @@ export function accumulator( } }, flush(callback) { + handle && clearInterval(handle); this.push(buffer); callback(); }, }); + return transform; } diff --git a/src/functions/index.ts b/src/functions/index.ts index 6f2c058..a6ae32e 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -245,7 +245,6 @@ export function parallelMap( parallel?: number, sleepTime?: number, ) { - console.log("hi"); return baseFunctions.parallelMap(mapper, parallel, sleepTime); }