Clean up types

This commit is contained in:
Jerry Kurian 2019-08-12 11:59:27 -04:00
parent 5112ee9540
commit e8d672d903
2 changed files with 14 additions and 40 deletions

View File

@ -23,42 +23,6 @@ export interface JsonParseOptions {
} }
export enum FlushStrategy { export enum FlushStrategy {
sampling = "sampling",
rolling = "rolling", rolling = "rolling",
sliding = "sliding", sliding = "sliding",
} }
export type AccumulatorOptions<T, R, S> = S extends FlushStrategy.sampling
? SamplingFlushOptions<T, R>
: S extends FlushStrategy.sliding
? SlidingFlushOptions<T, R>
: S extends FlushStrategy.rolling
? RollingFlushOptions<T, R>
: never;
export interface RollingFlushOptions<T, R> {
windowLength: number;
flushMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
}
export interface SlidingFlushOptions<T, R> {
windowLength: number;
windowMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
}
export interface SlidingFlushResult<T> {
first: T;
}
export interface SamplingFlushOptions<T, R> {
condition: (event: T, buffer: Array<T>) => boolean;
flushMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
}
export interface SamplingFlushResult<T> {
flushed: boolean;
flush?: Array<T> | null;
}

View File

@ -9,6 +9,7 @@ import {
SerializationFormats, SerializationFormats,
JsonValue, JsonValue,
JsonParseOptions, JsonParseOptions,
FlushStrategy,
} from "./definitions"; } from "./definitions";
import { sleep } from "../helpers"; import { sleep } from "../helpers";
@ -651,7 +652,16 @@ function _rollingBy<T>(
): (event: T, buffer: T[], stream: Transform) => void { ): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
if (key) { if (key) {
if ( if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
} else if (
buffer.length > 0 && buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key] buffer[0][key] + windowLength <= event[key]
) { ) {
@ -669,12 +679,12 @@ function _rollingBy<T>(
export function accumulator( export function accumulator(
batchSize: number, batchSize: number,
batchRate: number | undefined, batchRate: number | undefined,
flushStrategy: "sliding" | "rolling", flushStrategy: FlushStrategy,
keyBy?: string, keyBy?: string,
): Transform { ): Transform {
if (flushStrategy === "sliding") { if (flushStrategy === FlushStrategy.sliding) {
return sliding(batchSize, batchRate, keyBy); return sliding(batchSize, batchRate, keyBy);
} else if (flushStrategy === "rolling") { } else if (flushStrategy === FlushStrategy.rolling) {
return rolling(batchSize, batchRate, keyBy); return rolling(batchSize, batchRate, keyBy);
} else { } else {
return batch(batchSize, batchRate); return batch(batchSize, batchRate);