Add tests

This commit is contained in:
Jerry Kurian 2019-08-08 10:58:56 -04:00
parent d918d8ca10
commit a60b23496b
4 changed files with 108 additions and 12 deletions

View File

@ -38,12 +38,14 @@ export type AccumulatorOptions<T, R, S> = S extends FlushStrategy.sampling
export interface RollingFlushOptions<T, R> { export interface RollingFlushOptions<T, R> {
windowLength: number; windowLength: number;
afterFlush?: (flushed: Array<T>) => Array<R>; flushMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
} }
export interface SlidingFlushOptions<T, R> { export interface SlidingFlushOptions<T, R> {
windowLength: number; windowLength: number;
afterFlush?: (flushed: Array<T>) => Array<R>; flushMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
} }
export interface SlidingFlushResult<T> { export interface SlidingFlushResult<T> {
@ -53,6 +55,7 @@ export interface SlidingFlushResult<T> {
export interface SamplingFlushOptions<T, R> { export interface SamplingFlushOptions<T, R> {
condition: (event: T, buffer: Array<T>) => boolean; condition: (event: T, buffer: Array<T>) => boolean;
flushMapper?: (flushed: Array<T>) => Array<R>; flushMapper?: (flushed: Array<T>) => Array<R>;
timeout?: number;
} }
export interface SamplingFlushResult<T> { export interface SamplingFlushResult<T> {

View File

@ -1404,20 +1404,21 @@ test.cb("parallel() parallel mapping", t => {
source.push(null); source.push(null);
}); });
test.cb.only("accumulator() buffering strategy", t => { test.cb("accumulator() buffering strategy clears buffer on condition", t => {
t.plan(2);
let chunkIndex = 0; let chunkIndex = 0;
interface TestObject { interface TestObject {
ts: number; ts: number;
key: string; key: string;
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const expectedElements = [ const firstFlush = [
{ ts: 0, key: "a" }, { ts: 0, key: "a" },
{ ts: 1, key: "b" }, { ts: 1, key: "b" },
{ ts: 2, key: "c" }, { ts: 2, key: "c" },
{ ts: 2, key: "d" }, { ts: 2, key: "d" },
{ ts: 3, key: "e" },
]; ];
const secondFlush = [{ ts: 3, key: "e" }];
source source
.pipe( .pipe(
@ -1428,15 +1429,98 @@ test.cb.only("accumulator() buffering strategy", t => {
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) { if (chunkIndex === 0) {
chunkIndex++; chunkIndex++;
t.deepEqual(flush, expectedElements.slice(0, 4)); t.deepEqual(flush, firstFlush);
} else { } else {
t.deepEqual(flush, expectedElements.slice(4)); t.deepEqual(flush, secondFlush);
} }
}) })
.on("error", e => t.end) .on("error", e => t.end)
.on("end", () => { .on("end", () => {
t.end(); t.end();
}); });
expectedElements.forEach(element => source.push(element)); source.push([...firstFlush, ...secondFlush]);
source.push(null); source.push(null);
}); });
test.cb("accumulator() buffering strategy clears buffer on timeout", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true, read: () => {} });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
{ ts: 3, key: "e" },
];
source
.pipe(
accumulator(FlushStrategy.sampling, {
condition: (event: TestObject) => event.ts > 3,
timeout: 1000,
}),
)
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
t.deepEqual(flush, firstFlush);
} else {
t.deepEqual(flush, secondFlush);
}
})
.on("error", e => t.end)
.on("end", () => {
t.end();
});
source.push(firstFlush);
setTimeout(() => {
source.push(secondFlush);
source.push(null);
}, 2000);
});
test.cb(
"accumulator() buffering strategy clears buffer on condition or timeout",
t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true, read: () => {} });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [{ ts: 2, key: "c" }, { ts: 2, key: "d" }];
const thirdFlush = [{ ts: 3, key: "e" }];
source
.pipe(
accumulator(FlushStrategy.sampling, {
condition: (event: TestObject) => event.ts > 2,
timeout: 1000,
}),
)
.on("data", (flush: TestObject[]) => {
if (chunkIndex === 0) {
chunkIndex++;
t.deepEqual(flush, firstFlush);
} else if (chunkIndex === 1) {
chunkIndex++;
t.deepEqual(flush, secondFlush);
} else {
t.deepEqual(flush, thirdFlush);
}
})
.on("error", e => t.end)
.on("end", () => {
t.end();
});
source.push(firstFlush);
setTimeout(() => {
source.push([...secondFlush, ...thirdFlush]);
source.push(null);
}, 2000);
},
);

View File

@ -525,7 +525,6 @@ export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
callback(); callback();
}, },
flush(callback) { flush(callback) {
console.error("flushing");
sendChunk(this); sendChunk(this);
callback(); callback();
}, },
@ -642,10 +641,18 @@ export function accumulator<T, R, S extends FlushStrategy>(
options: AccumulatorOptions<T, R, S>, options: AccumulatorOptions<T, R, S>,
) { ) {
const buffer: Array<T> = []; const buffer: Array<T> = [];
return new Transform({ let handle: NodeJS.Timer | null = null;
if (options.timeout) {
handle = setInterval(() => {
if (buffer.length > 0) {
transform.push(buffer);
buffer.length = 0;
}
}, options.timeout);
}
const transform = new Transform({
objectMode: true, objectMode: true,
async transform(data: T[] | T, encoding, callback) { async transform(data: T[] | T, encoding, callback) {
callback();
switch (flushStrategy) { switch (flushStrategy) {
case FlushStrategy.sampling: { case FlushStrategy.sampling: {
if (!Array.isArray(data)) data = [data]; if (!Array.isArray(data)) data = [data];
@ -655,6 +662,7 @@ export function accumulator<T, R, S extends FlushStrategy>(
buffer, buffer,
this, this,
); );
callback();
break; break;
} }
case FlushStrategy.sliding: { case FlushStrategy.sliding: {
@ -663,8 +671,10 @@ export function accumulator<T, R, S extends FlushStrategy>(
} }
}, },
flush(callback) { flush(callback) {
handle && clearInterval(handle);
this.push(buffer); this.push(buffer);
callback(); callback();
}, },
}); });
return transform;
} }

View File

@ -245,7 +245,6 @@ export function parallelMap<T, R>(
parallel?: number, parallel?: number,
sleepTime?: number, sleepTime?: number,
) { ) {
console.log("hi");
return baseFunctions.parallelMap(mapper, parallel, sleepTime); return baseFunctions.parallelMap(mapper, parallel, sleepTime);
} }