import { Transform, TransformOptions } from "stream";

/**
 * How an accumulator groups incoming events into arrays ("windows").
 *
 * - `rolling`: non-overlapping batches; a batch is pushed when the next event
 *   would start a new one, and any non-empty remainder is pushed when the
 *   input ends.
 * - `sliding`: every incoming event pushes a copy of the window ending at that
 *   event, so consecutive windows overlap; nothing extra is pushed at the end.
 */
export enum FlushStrategy {
  rolling = "rolling",
  sliding = "sliding",
}

/**
 * Predicate used by {@link accumulatorBy}, called with the incoming event and
 * an event already held in the window.
 *
 * - `rolling`: called with the window's first event; a truthy result pushes
 *   the current window and starts a new one with the incoming event.
 * - `sliding`: called on buffered events from oldest to newest; each truthy
 *   result evicts that event, stopping at the first falsy result.
 */
export type AccumulatorByIteratee<T = unknown> = (
  event: T,
  bufferChunk: T,
) => boolean;

/** Per-event step: updates `buffer` and may push windows onto `stream`. */
type Accumulate<T> = (event: T, buffer: T[], stream: Transform) => void;

/**
 * Builds the Transform shared by every strategy: each written chunk is handed
 * to `accumulateBy` together with the private window buffer.
 *
 * @param accumulateBy - Per-event step that maintains the buffer and pushes windows.
 * @param shouldFlush - When true, a non-empty leftover buffer is pushed as a
 *   final window when the input ends.
 * @param options - Extra Transform options, spread over the object-mode default.
 * @returns The accumulating Transform.
 */
function _accumulator<T>(
  accumulateBy: Accumulate<T>,
  shouldFlush: boolean = true,
  options: TransformOptions = {},
): Transform {
  const buffer: T[] = [];
  return new Transform({
    // Events are arbitrary values and windows are arrays, so both sides must
    // be in object mode; an explicit caller setting still wins.
    objectMode: true,
    ...options,
    transform(data: T, _encoding, callback) {
      try {
        accumulateBy(data, buffer, this);
      } catch (err) {
        // Report failures (e.g. a throwing iteratee) through the stream
        // instead of letting them escape from write().
        callback(err instanceof Error ? err : new Error(String(err)));
        return;
      }
      callback();
    },
    flush(callback) {
      // Skip the final push when nothing is pending, so an empty input does
      // not produce a spurious empty window.
      if (shouldFlush && buffer.length > 0) {
        this.push(buffer.slice(0));
        buffer.length = 0;
      }
      callback();
    },
  });
}

/**
 * Reads `event[key]`, returning `undefined` for a nullish event.
 * NOTE: key values are expected to be numeric (e.g. timestamps); this is not
 * validated here.
 */
function readKey(event: unknown, key: string): number | undefined {
  if (event === null || event === undefined) {
    return undefined;
  }
  return (event as Record<string, number | undefined>)[key];
}

/**
 * Reports an event that lacks `key` via an "error" event, then resumes the
 * stream. The offending event is not added to any window.
 */
function emitMissingKey(stream: Transform, key: string, event: unknown): void {
  stream.emit(
    "error",
    new Error(`Key is missing in event: (${key}, ${JSON.stringify(event)})`),
  );
  stream.resume();
}

/** Pushes a copy of the current window and empties the buffer in place. */
function emitAndClear<T>(buffer: T[], stream: Transform): void {
  stream.push(buffer.slice(0));
  buffer.length = 0;
}

/** Sliding step over the last `windowLength` events (count-based). */
function _slidingByCount<T>(windowLength: number): Accumulate<T> {
  return (event, buffer, stream) => {
    // `>=` plus the emptiness guard keeps the buffer bounded even for a
    // non-positive windowLength; for windowLength >= 1 it evicts exactly one.
    while (buffer.length > 0 && buffer.length >= windowLength) {
      buffer.shift();
    }
    buffer.push(event);
    stream.push([...buffer]);
  };
}

/** Sliding step over a key span: evicts leading events that fell out of range. */
function _slidingByKey<T>(windowLength: number, key: string): Accumulate<T> {
  return (event, buffer, stream) => {
    const eventKey = readKey(event, key);
    if (eventKey === undefined) {
      emitMissingKey(stream, key, event);
      return;
    }
    const isExpired = (chunk: T): boolean => {
      const chunkKey = readKey(chunk, key);
      return chunkKey !== undefined && chunkKey + windowLength <= eventKey;
    };
    // Only the leading run of expired events is dropped; eviction stops at
    // the first event still inside the span.
    const firstKept = buffer.findIndex((chunk) => !isExpired(chunk));
    buffer.splice(0, firstKept === -1 ? buffer.length : firstKept);
    buffer.push(event);
    stream.push([...buffer]);
  };
}

/** Sliding step: by key span when `key` is non-empty, else by event count. */
function _sliding<T>(windowLength: number, key?: string): Accumulate<T> {
  return key
    ? _slidingByKey<T>(windowLength, key)
    : _slidingByCount<T>(windowLength);
}

/** Sliding step whose eviction rule is a user predicate. */
function _slidingByFunction<T>(
  iteratee: AccumulatorByIteratee<T>,
): Accumulate<T> {
  return (event, buffer, stream) => {
    // Evict oldest events while the iteratee says so; stop at the first keep.
    const firstKept = buffer.findIndex((chunk) => !iteratee(event, chunk));
    buffer.splice(0, firstKept === -1 ? buffer.length : firstKept);
    buffer.push(event);
    stream.push([...buffer]);
  };
}

/** Rolling step whose batch boundary is a user predicate. */
function _rollingByFunction<T>(
  iteratee: AccumulatorByIteratee<T>,
): Accumulate<T> {
  return (event, buffer, stream) => {
    // The nullish guard preserves the original tolerance of a missing iteratee
    // from untyped callers: events then gather into one window pushed on flush.
    if (iteratee != null && buffer.length > 0 && iteratee(event, buffer[0])) {
      emitAndClear(buffer, stream);
    }
    buffer.push(event);
  };
}

/** Rolling step producing batches of `windowLength` events (count-based). */
function _rollingByCount<T>(windowLength: number): Accumulate<T> {
  return (event, buffer, stream) => {
    // `>=` plus the emptiness guard avoids unbounded growth for a
    // non-positive windowLength; identical to `===` for windowLength >= 1.
    if (buffer.length > 0 && buffer.length >= windowLength) {
      emitAndClear(buffer, stream);
    }
    buffer.push(event);
  };
}

/** Rolling step: a batch closes once an event's key reaches first key + span. */
function _rollingByKey<T>(windowLength: number, key: string): Accumulate<T> {
  return (event, buffer, stream) => {
    const eventKey = readKey(event, key);
    if (eventKey === undefined) {
      emitMissingKey(stream, key, event);
      return;
    }
    const windowStart = buffer.length > 0 ? readKey(buffer[0], key) : undefined;
    if (windowStart !== undefined && windowStart + windowLength <= eventKey) {
      emitAndClear(buffer, stream);
    }
    buffer.push(event);
  };
}

/** Rolling step: by key span when `key` is non-empty, else by event count. */
function _rolling<T>(windowLength: number, key?: string): Accumulate<T> {
  return key
    ? _rollingByKey<T>(windowLength, key)
    : _rollingByCount<T>(windowLength);
}

/** Fails loudly for a strategy value outside {@link FlushStrategy}. */
function unknownFlushStrategy(strategy: never): never {
  throw new Error(`Unknown flush strategy: ${String(strategy)}`);
}

/**
 * Creates an object-mode Transform that groups events into array windows.
 *
 * Without `keyBy`, windows are counted in events. `rolling` pushes consecutive
 * batches of `batchSize` events, plus the remainder when the input ends.
 * `sliding` pushes, for every event, the last `batchSize` events including it.
 *
 * With `keyBy`, windows span a range of the value stored under that key
 * (assumed numeric, e.g. a timestamp). `rolling` pushes the current batch when
 * an event's key is >= the batch's first key + `batchSize`. `sliding` drops the
 * leading buffered events whose key + `batchSize` is <= the new event's key,
 * then pushes the window. An event whose key is `undefined` is not buffered
 * and is reported through an "error" event.
 *
 * @param flushStrategy - `rolling` or `sliding` (see {@link FlushStrategy}).
 * @param batchSize - Event count, or key span when `keyBy` is set.
 * @param keyBy - Optional property whose value defines the window.
 * @param options - Extra Transform options; object mode is on unless overridden.
 * @returns The accumulating Transform.
 * @throws Error when `flushStrategy` is not a known {@link FlushStrategy}.
 */
export function accumulator(
  flushStrategy: FlushStrategy,
  batchSize: number,
  keyBy?: string,
  options?: TransformOptions,
): Transform {
  switch (flushStrategy) {
    case FlushStrategy.sliding:
      return sliding(batchSize, keyBy, options);
    case FlushStrategy.rolling:
      return rolling(batchSize, keyBy, options);
    default:
      return unknownFlushStrategy(flushStrategy);
  }
}

/**
 * Like {@link accumulator}, but window membership is decided by `iteratee`
 * (see {@link AccumulatorByIteratee} for how each strategy calls it).
 *
 * @param flushStrategy - `rolling` or `sliding` (see {@link FlushStrategy}).
 * @param iteratee - Predicate comparing the incoming event to a buffered one.
 * @param options - Extra Transform options; object mode is on unless overridden.
 * @returns The accumulating Transform.
 * @throws Error when `flushStrategy` is not a known {@link FlushStrategy}.
 */
export function accumulatorBy<T>(
  flushStrategy: FlushStrategy,
  iteratee: AccumulatorByIteratee<T>,
  options?: TransformOptions,
): Transform {
  switch (flushStrategy) {
    case FlushStrategy.sliding:
      return slidingBy(iteratee, options);
    case FlushStrategy.rolling:
      return rollingBy(iteratee, options);
    default:
      return unknownFlushStrategy(flushStrategy);
  }
}

/** Sliding accumulator; every window is pushed per event, so no final flush. */
function sliding(
  windowLength: number,
  key?: string,
  options?: TransformOptions,
): Transform {
  return _accumulator(_sliding(windowLength, key), false, options);
}

/** Sliding accumulator driven by a predicate; no final flush. */
function slidingBy<T>(
  iteratee: AccumulatorByIteratee<T>,
  options?: TransformOptions,
): Transform {
  return _accumulator(_slidingByFunction(iteratee), false, options);
}

/** Rolling accumulator; the pending batch is flushed when the input ends. */
function rolling(
  windowLength: number,
  key?: string,
  options?: TransformOptions,
): Transform {
  return _accumulator(_rolling(windowLength, key), true, options);
}

/** Rolling accumulator driven by a predicate; pending batch flushed at end. */
function rollingBy<T>(
  iteratee: AccumulatorByIteratee<T>,
  options?: TransformOptions,
): Transform {
  return _accumulator(_rollingByFunction(iteratee), true, options);
}