Improve interface for accumulator

This commit is contained in:
Jerry Kurian 2019-09-12 14:40:47 -04:00
parent 517e281ce5
commit 4c7e9ceb7e
5 changed files with 86 additions and 50 deletions

View File

@ -4,10 +4,11 @@ import {
FlushStrategy, FlushStrategy,
TransformOptions, TransformOptions,
} from "./baseDefinitions"; } from "./baseDefinitions";
import { batch } from "."; import { batch, rate as _rate } from ".";
function _accumulator<T>( function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void, accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
rate?: number,
shouldFlush: boolean = true, shouldFlush: boolean = true,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, readableObjectMode: true,
@ -15,7 +16,7 @@ function _accumulator<T>(
}, },
) { ) {
const buffer: T[] = []; const buffer: T[] = [];
return new Transform({ const stream = new Transform({
...options, ...options,
transform(data: T, encoding, callback) { transform(data: T, encoding, callback) {
accumulateBy(data, buffer, this); accumulateBy(data, buffer, this);
@ -28,11 +29,14 @@ function _accumulator<T>(
callback(); callback();
}, },
}); });
if (rate) {
stream.pipe(_rate(rate));
}
return stream;
} }
function _sliding<T>( function _sliding<T>(
windowLength: number, windowLength: number,
rate: number | undefined,
key?: string, key?: string,
): (event: T, buffer: T[], stream: Transform) => void { ): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
@ -66,7 +70,6 @@ function _sliding<T>(
} }
function _slidingByFunction<T>( function _slidingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void { ): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
@ -81,7 +84,6 @@ function _slidingByFunction<T>(
} }
function _rollingByFunction<T>( function _rollingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void { ): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
@ -97,7 +99,6 @@ function _rollingByFunction<T>(
function _rolling<T>( function _rolling<T>(
windowLength: number, windowLength: number,
rate: number | undefined,
key?: string, key?: string,
): (event: T, buffer: T[], stream: Transform) => void { ): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
@ -129,58 +130,84 @@ function _rolling<T>(
} }
export function accumulator( export function accumulator(
batchSize: number,
batchRate: number | undefined,
flushStrategy: FlushStrategy, flushStrategy: FlushStrategy,
batchSize: number,
keyBy?: string, keyBy?: string,
options: TransformOptions & { rate?: number } = {
readableObjectMode: true,
writableObjectMode: true,
},
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { if (flushStrategy === FlushStrategy.sliding) {
return sliding(batchSize, batchRate, keyBy); return sliding(batchSize, keyBy, options);
} else if (flushStrategy === FlushStrategy.rolling) { } else if (flushStrategy === FlushStrategy.rolling) {
return rolling(batchSize, batchRate, keyBy); return rolling(batchSize, keyBy, options);
} else { } else {
return batch(batchSize, batchRate); return batch(batchSize, options.rate);
} }
} }
export function accumulatorBy<T, S extends FlushStrategy>( export function accumulatorBy<T, S extends FlushStrategy>(
batchRate: number | undefined,
flushStrategy: S, flushStrategy: S,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options: TransformOptions & { rate?: number } = {
readableObjectMode: true,
writableObjectMode: true,
},
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { if (flushStrategy === FlushStrategy.sliding) {
return slidingBy(batchRate, iteratee); return slidingBy(iteratee, options);
} else { } else {
return rollingBy(batchRate, iteratee); return rollingBy(iteratee, options);
} }
} }
function sliding( function sliding(
windowLength: number, windowLength: number,
rate: number | undefined,
key?: string, key?: string,
options?: TransformOptions & { rate?: number },
): Transform { ): Transform {
return _accumulator(_sliding(windowLength, rate, key), false); return _accumulator(
_sliding(windowLength, key),
options && options.rate,
false,
options,
);
} }
function slidingBy<T>( function slidingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions & { rate?: number },
): Transform { ): Transform {
return _accumulator(_slidingByFunction(rate, iteratee), false); return _accumulator(
_slidingByFunction(iteratee),
options && options.rate,
false,
options,
);
} }
function rolling( function rolling(
windowLength: number, windowLength: number,
rate: number | undefined,
key?: string, key?: string,
options?: TransformOptions & { rate?: number },
): Transform { ): Transform {
return _accumulator(_rolling(windowLength, rate, key)); return _accumulator(
_rolling(windowLength, key),
options && options.rate,
true,
options,
);
} }
function rollingBy<T>( function rollingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions & { rate?: number },
): Transform { ): Transform {
return _accumulator(_rollingByFunction(rate, iteratee)); return _accumulator(
_rollingByFunction(iteratee),
options && options.rate,
true,
options,
);
} }

View File

@ -58,7 +58,21 @@ class Demux extends Writable {
this.streamsByKey[destKey] = this.construct(destKey); this.streamsByKey[destKey] = this.construct(destKey);
} }
if (!this.streamsByKey[destKey].write(chunk, encoding)) { if (!this.streamsByKey[destKey].write(chunk, encoding)) {
console.log(
"waiting drain",
chunk,
this._writableState.length,
this.streamsByKey[destKey]._writableState.length,
this.streamsByKey[destKey]._readableState.length,
);
this.streamsByKey[destKey].once("drain", () => { this.streamsByKey[destKey].once("drain", () => {
console.log(
"calling cb after drain",
chunk,
this._writableState.length,
this.streamsByKey[destKey]._writableState.length,
this.streamsByKey[destKey]._readableState.length,
);
cb(); cb();
}); });
} else { } else {

View File

@ -171,10 +171,11 @@ export const parallelMap = baseFunctions.parallelMap;
* When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize * When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
* is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000, * is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
* each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element. * each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
* @param flushStrategy Buffering strategy to use.
* @param batchSize Size of the batch (in units of buffer length or value at key). * @param batchSize Size of the batch (in units of buffer length or value at key).
* @param batchRate Desired rate of data transfer to next stream. * @param batchRate Desired rate of data transfer to next stream.
* @param flushStrategy Buffering strategy to use.
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer. * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
* @param options Transform stream options
*/ */
export const accumulator = baseFunctions.accumulator; export const accumulator = baseFunctions.accumulator;
@ -187,10 +188,11 @@ export const accumulator = baseFunctions.accumulator;
* 2. Rolling * 2. Rolling
* - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is * - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
* then pushed into the buffer. * then pushed into the buffer.
* @param batchRate Desired rate of data transfer to next stream.
* @param flushStrategy Buffering strategy to use. * @param flushStrategy Buffering strategy to use.
* @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
* or items need to be cleared from buffer. * or items need to be cleared from buffer.
* @param batchRate Desired rate of data transfer to next stream.
* @param options Transform stream options
*/ */
export const accumulatorBy = baseFunctions.accumulatorBy; export const accumulatorBy = baseFunctions.accumulatorBy;

View File

@ -18,7 +18,7 @@ test.cb("accumulator() rolling", t => {
const flushes = [firstFlush, secondFlush, thirdFlush]; const flushes = [firstFlush, secondFlush, thirdFlush];
source source
.pipe(accumulator(2, undefined, FlushStrategy.rolling)) .pipe(accumulator(FlushStrategy.rolling, 2))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -51,7 +51,7 @@ test.cb("accumulator() rolling with key", t => {
const flushes = [firstFlush, secondFlush]; const flushes = [firstFlush, secondFlush];
source source
.pipe(accumulator(3, undefined, FlushStrategy.rolling, "ts")) .pipe(accumulator(FlushStrategy.rolling, 3, "ts"))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -77,9 +77,8 @@ test.cb(
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator( const accumulatorStream = accumulator(
3,
undefined,
FlushStrategy.rolling, FlushStrategy.rolling,
3,
"nonExistingKey", "nonExistingKey",
); );
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@ -119,12 +118,7 @@ test.cb(
key: string; key: string;
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator( const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts");
3,
undefined,
FlushStrategy.rolling,
"ts",
);
const input = [ const input = [
{ ts: 0, key: "a" }, { ts: 0, key: "a" },
{ ts: 1, key: "b" }, { ts: 1, key: "b" },
@ -193,7 +187,7 @@ test.cb("accumulator() sliding", t => {
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush]; const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source source
.pipe(accumulator(3, undefined, FlushStrategy.sliding)) .pipe(accumulator(FlushStrategy.sliding, 3))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -248,7 +242,7 @@ test.cb("accumulator() sliding with key", t => {
sixthFlush, sixthFlush,
]; ];
source source
.pipe(accumulator(3, undefined, FlushStrategy.sliding, "ts")) .pipe(accumulator(FlushStrategy.sliding, 3, "ts"))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -274,9 +268,8 @@ test.cb(
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator( const accumulatorStream = accumulator(
3,
undefined,
FlushStrategy.sliding, FlushStrategy.sliding,
3,
"nonExistingKey", "nonExistingKey",
); );
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@ -315,12 +308,7 @@ test.cb(
key: string; key: string;
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator( const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts");
3,
undefined,
FlushStrategy.sliding,
"ts",
);
const input = [ const input = [
{ ts: 0, key: "a" }, { ts: 0, key: "a" },
{ key: "b" }, { key: "b" },
@ -386,7 +374,6 @@ test.cb("accumulatorBy() rolling", t => {
source source
.pipe( .pipe(
accumulatorBy( accumulatorBy(
undefined,
FlushStrategy.rolling, FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts; return bufferChunk.ts + 3 <= event.ts;
@ -422,7 +409,6 @@ test.cb.skip(
{ ts: 2, key: "c" }, { ts: 2, key: "c" },
]; ];
const accumulaterStream = accumulatorBy( const accumulaterStream = accumulatorBy(
undefined,
FlushStrategy.rolling, FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a") { if (event.key !== "a") {
@ -490,7 +476,6 @@ test.cb("accumulatorBy() sliding", t => {
source source
.pipe( .pipe(
accumulatorBy( accumulatorBy(
undefined,
FlushStrategy.sliding, FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts ? true : false; return bufferChunk.ts + 3 <= event.ts ? true : false;
@ -526,7 +511,6 @@ test.cb.skip(
{ ts: 2, key: "c" }, { ts: 2, key: "c" },
]; ];
const accumulaterStream = accumulatorBy( const accumulaterStream = accumulatorBy(
undefined,
FlushStrategy.sliding, FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a") { if (event.key !== "a") {

View File

@ -379,6 +379,7 @@ test.cb.only(
const construct = (destKey: string) => { const construct = (destKey: string) => {
const first = map( const first = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("1: ", chunk);
chunk.mapped.push(1); chunk.mapped.push(1);
return chunk; return chunk;
}, },
@ -387,7 +388,9 @@ test.cb.only(
const second = map( const second = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
console.log("2: ", chunk);
await sleep(slowProcessorSpeed); await sleep(slowProcessorSpeed);
console.log("2 done ", chunk);
chunk.mapped.push(2); chunk.mapped.push(2);
return chunk; return chunk;
}, },
@ -408,6 +411,7 @@ test.cb.only(
// This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event) // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event)
_demux.on("drain", () => { _demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0); expect(_demux._writableState.length).to.be.equal(0);
console.log(performance.now() - start);
expect(performance.now() - start).to.be.greaterThan( expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * (input.length - 2), slowProcessorSpeed * (input.length - 2),
); );
@ -427,7 +431,7 @@ test.cb.only(
const start = performance.now(); const start = performance.now();
input.forEach(item => { input.forEach(item => {
_demux.write(item); console.log(_demux.write(item));
}); });
}, },
); );
@ -457,6 +461,7 @@ test.cb(
const construct = (destKey: string) => { const construct = (destKey: string) => {
const first = map( const first = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("1: ", chunk);
chunk.mapped.push(1); chunk.mapped.push(1);
return chunk; return chunk;
}, },
@ -464,6 +469,7 @@ test.cb(
); );
const second = map( const second = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("2: ", chunk);
chunk.mapped.push(2); chunk.mapped.push(2);
return chunk; return chunk;
}, },
@ -472,7 +478,9 @@ test.cb(
const third = map( const third = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
console.log("3: ", chunk);
await sleep(slowProcessorSpeed); await sleep(slowProcessorSpeed);
console.log(" 3 done ", chunk);
chunk.mapped.push(3); chunk.mapped.push(3);
return chunk; return chunk;
}, },
@ -496,6 +504,7 @@ test.cb(
// This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event) // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event)
_demux.on("drain", () => { _demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0); expect(_demux._writableState.length).to.be.equal(0);
console.log(performance.now() - start);
expect(performance.now() - start).to.be.greaterThan( expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * (input.length - 4), slowProcessorSpeed * (input.length - 4),
); );
@ -515,7 +524,7 @@ test.cb(
const start = performance.now(); const start = performance.now();
input.forEach(item => { input.forEach(item => {
_demux.write(item); console.log(_demux.write(item));
}); });
}, },
); );