diff --git a/package.json b/package.json index 8009f9f..9f86034 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ "tslint": "^5.11.0", "tslint-config-prettier": "^1.16.0", "tslint-plugin-prettier": "^2.0.1", - "typescript": "^3.1.6" + "typescript": "^3.5.3" }, "ava": { "files": [ diff --git a/src/functions/definitions.ts b/src/functions/definitions.ts index 8abb2ea..6593ec4 100644 --- a/src/functions/definitions.ts +++ b/src/functions/definitions.ts @@ -22,3 +22,40 @@ export interface JsonParseOptions { pretty: boolean; } +export enum FlushStrategy { + sampling, + rolling, + sliding, +} + +export type AccumulatorOptions = S extends FlushStrategy.sampling + ? SamplingFlushOptions + : S extends FlushStrategy.sliding + ? SlidingFlushOptions + : S extends FlushStrategy.rolling + ? RollingFlushOptions + : never; + +export interface RollingFlushOptions { + windowLength: number; + afterFlush?: (flushed: Array) => Array; +} + +export interface SlidingFlushOptions { + windowLength: number; + afterFlush?: (flushed: Array) => Array; +} + +export interface SlidingFlushResult { + first: T; +} + +export interface SamplingFlushOptions { + condition: (event: T, buffer: Array) => boolean; + flushMapper?: (flushed: Array) => Array; +} + +export interface SamplingFlushResult { + flushed: boolean; + flush?: Array | null; +} diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index 5bc2f9f..fc64ada 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -3,6 +3,7 @@ import test from "ava"; import { expect } from "chai"; import { performance } from "perf_hooks"; import { Readable } from "stream"; +import { FlushStrategy } from "./definitions"; import { fromArray, map, @@ -24,6 +25,7 @@ import { unbatch, rate, parallelMap, + accumulator, } from "."; import { sleep } from "../helpers"; @@ -1401,3 +1403,38 @@ test.cb("parallel() parallel mapping", t => { source.push("f"); source.push(null); }); + +test.cb.only("accumulator() buffering strategy", t => { + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const expectedElements = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 2, key: "d" }, + { ts: 3, key: "e" }, + ]; + source + .pipe( + accumulator(FlushStrategy.sampling, { + condition: (event: TestObject) => event.ts > 2, + }), + ) + .on("data", (flush: TestObject[]) => { + console.log("FLUSH", flush); + flush.forEach(item => expectedElements.includes(item)); + }) + .on("error", e => { + console.log("Got error: ", e); + t.end(); + }) + .on("end", () => { + console.log("end"); + t.end(); + }); + source.push(expectedElements); + source.push(null); +}); diff --git a/src/functions/functions.ts b/src/functions/functions.ts index fd10e73..2cc2d78 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -4,6 +4,10 @@ import { ChildProcess } from "child_process"; import { StringDecoder } from "string_decoder"; import { + FlushStrategy, + AccumulatorOptions, + SamplingFlushOptions, + SamplingFlushResult, TransformOptions, ThroughOptions, WithEncoding, @@ -602,3 +606,64 @@ export function parallelMap( }, }); } + +function samplingFlush( + event: T, + options: SamplingFlushOptions, + buffer: Array, +): SamplingFlushResult { + let flush = null; + if (options.condition(event, buffer)) { + flush = buffer.slice(0); + buffer.length = 0; + } + buffer.push(event); + return { flushed: true, flush }; +} + +function executeSamplingStrategy( + events: T[], + options: SamplingFlushOptions, + buffer: Array, + stream: Transform, +): void { + events.forEach(event => { + const sample = samplingFlush(event, options, buffer); + if (sample.flushed && sample.flush && options.flushMapper) { + stream.push(options.flushMapper(sample.flush)); + } else if (sample.flushed && sample.flush) { + stream.push(sample.flush); + } + }); +} + +export function accumulator( + flushStrategy: S, + options: AccumulatorOptions, +) { + const buffer: Array = []; + return new Transform({ + objectMode: true, + async transform(data, encoding, callback) { + callback(); + switch (flushStrategy) { + case FlushStrategy.sampling: { + executeSamplingStrategy( + data, + options as SamplingFlushOptions, + buffer, + this, + ); + break; + } + case FlushStrategy.sliding: { + break; + } + } + }, + flush(callback) { + this.push(buffer); + callback(); + }, + }); +} diff --git a/src/functions/index.ts b/src/functions/index.ts index 9085930..6f2c058 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -3,6 +3,8 @@ import { ChildProcess } from "child_process"; import * as baseFunctions from "./functions"; import { + AccumulatorOptions, + FlushStrategy, ThroughOptions, TransformOptions, WithEncoding, @@ -205,7 +207,10 @@ export function last(readable: Readable): Promise { * @param batchSize Size of the batches, defaults to 1000. * @param maxBatchAge? Max lifetime of a batch, defaults to 500 */ -export function batch(batchSize: number, maxBatchAge?: number): NodeJS.ReadWriteStream { +export function batch( + batchSize: number, + maxBatchAge?: number, +): NodeJS.ReadWriteStream { return baseFunctions.batch(batchSize, maxBatchAge); } @@ -222,7 +227,10 @@ export function unbatch(): NodeJS.ReadWriteStream { * @param targetRate? Desired rate in ms * @param period? Period to sleep for when rate is above or equal to targetRate */ -export function rate(targetRate?: number, period?: number): NodeJS.ReadWriteStream { +export function rate( + targetRate?: number, + period?: number, +): NodeJS.ReadWriteStream { return baseFunctions.rate(targetRate, period); } @@ -237,5 +245,13 @@ export function parallelMap( parallel?: number, sleepTime?: number, ) { + console.log("hi"); return baseFunctions.parallelMap(mapper, parallel, sleepTime); } + +export function accumulator( + flushStrategy: S, + options: AccumulatorOptions, +) { + return baseFunctions.accumulator(flushStrategy, options); +} diff --git a/yarn.lock b/yarn.lock index a103462..e0b7aca 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3309,11 +3309,16 @@ type-fest@^0.3.0: resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.3.1.tgz#63d00d204e059474fe5e1b7c011112bbd1dc29e1" integrity sha512-cUGJnCdr4STbePCgqNFbpVNCepa+kAVohJs1sLhxzdH+gnEoOd8VhbYa7pD3zZYGiURWM2xzEII3fQcRizDkYQ== -typescript@*, typescript@^3.1.6: +typescript@*: version "3.5.2" resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.2.tgz#a09e1dc69bc9551cadf17dba10ee42cf55e5d56c" integrity sha512-7KxJovlYhTX5RaRbUdkAXN1KUZ8PwWlTzQdHV6xNqvuFOs7+WBo10TQUqT19Q/Jz2hk5v9TQDIhyLhhJY4p5AA== +typescript@^3.5.3: + version "3.5.3" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977" + integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g== + uid2@0.0.3: version "0.0.3" resolved "https://registry.yarnpkg.com/uid2/-/uid2-0.0.3.tgz#483126e11774df2f71b8b639dcd799c376162b82"