diff --git a/src/functions/functions.spec.ts b/src/functions/functions.spec.ts index fc64ada..decd771 100644 --- a/src/functions/functions.spec.ts +++ b/src/functions/functions.spec.ts @@ -1405,6 +1405,7 @@ test.cb("parallel() parallel mapping", t => { }); test.cb.only("accumulator() buffering strategy", t => { + let chunkIndex = 0; interface TestObject { ts: number; key: string; @@ -1417,6 +1418,7 @@ test.cb.only("accumulator() buffering strategy", t => { { ts: 2, key: "d" }, { ts: 3, key: "e" }, ]; + source .pipe( accumulator(FlushStrategy.sampling, { @@ -1424,17 +1426,17 @@ test.cb.only("accumulator() buffering strategy", t => { }), ) .on("data", (flush: TestObject[]) => { - console.log("FLUSH", flush); - flush.forEach(item => expectedElements.includes(item)); - }) - .on("error", e => { - console.log("Got error: ", e); - t.end(); + if (chunkIndex === 0) { + chunkIndex++; + t.deepEqual(flush, expectedElements.slice(0, 4)); + } else { + t.deepEqual(flush, expectedElements.slice(4)); + } }) + .on("error", e => t.end) .on("end", () => { - console.log("end"); t.end(); }); - source.push(expectedElements); + expectedElements.forEach(element => source.push(element)); source.push(null); }); diff --git a/src/functions/functions.ts b/src/functions/functions.ts index 2cc2d78..40a534c 100644 --- a/src/functions/functions.ts +++ b/src/functions/functions.ts @@ -644,10 +644,11 @@ export function accumulator( const buffer: Array = []; return new Transform({ objectMode: true, - async transform(data, encoding, callback) { + async transform(data: T[] | T, encoding, callback) { callback(); switch (flushStrategy) { case FlushStrategy.sampling: { + if (!Array.isArray(data)) data = [data]; executeSamplingStrategy( data, options as SamplingFlushOptions,