diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index 962a7a4..2729b65 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -23,42 +23,6 @@ export interface JsonParseOptions { } export enum FlushStrategy { - sampling = "sampling", rolling = "rolling", sliding = "sliding", } - -export type AccumulatorOptions = S extends FlushStrategy.sampling - ? SamplingFlushOptions - : S extends FlushStrategy.sliding - ? SlidingFlushOptions - : S extends FlushStrategy.rolling - ? RollingFlushOptions - : never; - -export interface RollingFlushOptions { - windowLength: number; - flushMapper?: (flushed: Array) => Array; - timeout?: number; -} - -export interface SlidingFlushOptions { - windowLength: number; - windowMapper?: (flushed: Array) => Array; - timeout?: number; -} - -export interface SlidingFlushResult { - first: T; -} - -export interface SamplingFlushOptions { - condition: (event: T, buffer: Array) => boolean; - flushMapper?: (flushed: Array) => Array; - timeout?: number; -} - -export interface SamplingFlushResult { - flushed: boolean; - flush?: Array | null; -} diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 143a5cc..2df000e 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -9,6 +9,7 @@ import { SerializationFormats, JsonValue, JsonParseOptions, + FlushStrategy, } from "./definitions"; import { sleep } from "../helpers"; @@ -651,7 +652,16 @@ function _rollingBy( ): (event: T, buffer: T[], stream: Transform) => void { return (event: T, buffer: T[], stream: Transform) => { if (key) { - if ( + if (event[key] === undefined) { + stream.emit( + "error", + new Error( + `Key is missing in event: (${key}, ${JSON.stringify( + event, + )})`, + ), + ); + } else if ( buffer.length > 0 && buffer[0][key] + windowLength <= event[key] ) { @@ -669,12 +679,12 @@ function _rollingBy( export function accumulator( batchSize: number, batchRate: number | undefined, - flushStrategy: "sliding" | "rolling", + flushStrategy: FlushStrategy, keyBy?: string, ): Transform { - if (flushStrategy === "sliding") { + if (flushStrategy === FlushStrategy.sliding) { return sliding(batchSize, batchRate, keyBy); - } else if (flushStrategy === "rolling") { + } else if (flushStrategy === FlushStrategy.rolling) { return rolling(batchSize, batchRate, keyBy); } else { return batch(batchSize, batchRate);