This commit is contained in:
Jerry Kurian 2019-08-07 18:46:33 -04:00
parent af9293ab52
commit d918d8ca10
2 changed files with 12 additions and 9 deletions

View File

@ -1405,6 +1405,7 @@ test.cb("parallel() parallel mapping", t => {
}); });
test.cb.only("accumulator() buffering strategy", t => { test.cb.only("accumulator() buffering strategy", t => {
let chunkIndex = 0;
interface TestObject { interface TestObject {
ts: number; ts: number;
key: string; key: string;
@ -1417,6 +1418,7 @@ test.cb.only("accumulator() buffering strategy", t => {
{ ts: 2, key: "d" }, { ts: 2, key: "d" },
{ ts: 3, key: "e" }, { ts: 3, key: "e" },
]; ];
source source
.pipe( .pipe(
accumulator(FlushStrategy.sampling, { accumulator(FlushStrategy.sampling, {
@ -1424,17 +1426,17 @@ test.cb.only("accumulator() buffering strategy", t => {
}), }),
) )
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
console.log("FLUSH", flush); if (chunkIndex === 0) {
flush.forEach(item => expectedElements.includes(item)); chunkIndex++;
}) t.deepEqual(flush, expectedElements.slice(0, 4));
.on("error", e => { } else {
console.log("Got error: ", e); t.deepEqual(flush, expectedElements.slice(4));
t.end(); }
}) })
.on("error", e => t.end)
.on("end", () => { .on("end", () => {
console.log("end");
t.end(); t.end();
}); });
source.push(expectedElements); expectedElements.forEach(element => source.push(element));
source.push(null); source.push(null);
}); });

View File

@ -644,10 +644,11 @@ export function accumulator<T, R, S extends FlushStrategy>(
const buffer: Array<T> = []; const buffer: Array<T> = [];
return new Transform({ return new Transform({
objectMode: true, objectMode: true,
async transform(data, encoding, callback) { async transform(data: T[] | T, encoding, callback) {
callback(); callback();
switch (flushStrategy) { switch (flushStrategy) {
case FlushStrategy.sampling: { case FlushStrategy.sampling: {
if (!Array.isArray(data)) data = [data];
executeSamplingStrategy( executeSamplingStrategy(
data, data,
options as SamplingFlushOptions<T, R>, options as SamplingFlushOptions<T, R>,