diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts
index bb8a7fb..0bef8e1 100644
--- a/src/functions/accumulator.ts
+++ b/src/functions/accumulator.ts
@@ -4,10 +4,11 @@ import {
     FlushStrategy,
     TransformOptions,
 } from "./baseDefinitions";
-import { batch } from ".";
+import { batch, rate as _rate } from ".";
 
 function _accumulator<T>(
     accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
+    rate?: number,
     shouldFlush: boolean = true,
     options: TransformOptions = {
         readableObjectMode: true,
@@ -15,7 +16,7 @@ function _accumulator<T>(
     },
 ) {
     const buffer: T[] = [];
-    return new Transform({
+    const stream = new Transform({
         ...options,
         transform(data: T, encoding, callback) {
             accumulateBy(data, buffer, this);
@@ -28,11 +29,14 @@ function _accumulator<T>(
             callback();
         },
     });
+    if (rate) {
+        stream.pipe(_rate(rate));
+    }
+    return stream;
 }
 
 function _sliding<T>(
     windowLength: number,
-    rate: number | undefined,
     key?: string,
 ): (event: T, buffer: T[], stream: Transform) => void {
     return (event: T, buffer: T[], stream: Transform) => {
@@ -66,7 +70,6 @@ function _sliding<T>(
 }
 
 function _slidingByFunction<T>(
-    rate: number | undefined,
     iteratee: AccumulatorByIteratee<T>,
 ): (event: T, buffer: T[], stream: Transform) => void {
     return (event: T, buffer: T[], stream: Transform) => {
@@ -81,7 +84,6 @@
 }
 
 function _rollingByFunction<T>(
-    rate: number | undefined,
     iteratee: AccumulatorByIteratee<T>,
 ): (event: T, buffer: T[], stream: Transform) => void {
     return (event: T, buffer: T[], stream: Transform) => {
@@ -97,7 +99,6 @@
 
 function _rolling<T>(
     windowLength: number,
-    rate: number | undefined,
     key?: string,
 ): (event: T, buffer: T[], stream: Transform) => void {
     return (event: T, buffer: T[], stream: Transform) => {
@@ -129,58 +130,84 @@ function _rolling<T>(
 }
 
 export function accumulator(
-    batchSize: number,
-    batchRate: number | undefined,
     flushStrategy: FlushStrategy,
+    batchSize: number,
     keyBy?: string,
+    options: TransformOptions & { rate?: number } = {
+        readableObjectMode: true,
+        writableObjectMode: true,
+    },
 ): Transform {
     if (flushStrategy === FlushStrategy.sliding) {
-        return sliding(batchSize, batchRate, keyBy);
+        return sliding(batchSize, keyBy, options);
     } else if (flushStrategy === FlushStrategy.rolling) {
-        return rolling(batchSize, batchRate, keyBy);
+        return rolling(batchSize, keyBy, options);
     } else {
-        return batch(batchSize, batchRate);
+        return batch(batchSize, options.rate);
     }
 }
 
 export function accumulatorBy<T, S extends FlushStrategy>(
-    batchRate: number | undefined,
     flushStrategy: S,
     iteratee: AccumulatorByIteratee<T>,
+    options: TransformOptions & { rate?: number } = {
+        readableObjectMode: true,
+        writableObjectMode: true,
+    },
 ): Transform {
     if (flushStrategy === FlushStrategy.sliding) {
-        return slidingBy(batchRate, iteratee);
+        return slidingBy(iteratee, options);
     } else {
-        return rollingBy(batchRate, iteratee);
+        return rollingBy(iteratee, options);
     }
 }
 
 function sliding(
     windowLength: number,
-    rate: number | undefined,
     key?: string,
+    options?: TransformOptions & { rate?: number },
 ): Transform {
-    return _accumulator(_sliding(windowLength, rate, key), false);
+    return _accumulator(
+        _sliding(windowLength, key),
+        options && options.rate,
+        false,
+        options,
+    );
 }
 
 function slidingBy<T>(
-    rate: number | undefined,
     iteratee: AccumulatorByIteratee<T>,
+    options?: TransformOptions & { rate?: number },
 ): Transform {
-    return _accumulator(_slidingByFunction(rate, iteratee), false);
+    return _accumulator(
+        _slidingByFunction(iteratee),
+        options && options.rate,
+        false,
+        options,
+    );
 }
 
 function rolling(
     windowLength: number,
-    rate: number | undefined,
     key?: string,
+    options?: TransformOptions & { rate?: number },
 ): Transform {
-    return _accumulator(_rolling(windowLength, rate, key));
+    return _accumulator(
+        _rolling(windowLength, key),
+        options && options.rate,
+        true,
+        options,
+    );
 }
 
 function rollingBy<T>(
-    rate: number | undefined,
     iteratee: AccumulatorByIteratee<T>,
+    options?: TransformOptions & { rate?: number },
 ): Transform {
-    return _accumulator(_rollingByFunction(rate, iteratee));
+    return _accumulator(
+        _rollingByFunction(iteratee),
+        options && options.rate,
+        true,
+        options,
+    );
 }
diff --git a/src/functions/demux.ts b/src/functions/demux.ts
index 4b6312a..2a515ec 100644
--- a/src/functions/demux.ts
+++ b/src/functions/demux.ts
@@ -58,7 +58,21 @@ class Demux extends Writable {
             this.streamsByKey[destKey] = this.construct(destKey);
         }
         if (!this.streamsByKey[destKey].write(chunk, encoding)) {
+            console.log(
+                "waiting drain",
+                chunk,
+                this._writableState.length,
+                this.streamsByKey[destKey]._writableState.length,
+                this.streamsByKey[destKey]._readableState.length,
+            );
             this.streamsByKey[destKey].once("drain", () => {
+                console.log(
+                    "calling cb after drain",
+                    chunk,
+                    this._writableState.length,
+                    this.streamsByKey[destKey]._writableState.length,
+                    this.streamsByKey[destKey]._readableState.length,
+                );
                 cb();
             });
         } else {
diff --git a/src/functions/index.ts b/src/functions/index.ts
index 48d8062..778b1b9 100644
--- a/src/functions/index.ts
+++ b/src/functions/index.ts
@@ -171,10 +171,11 @@ export const parallelMap = baseFunctions.parallelMap;
  * When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
  * is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
  * each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
+ * @param flushStrategy Buffering strategy to use.
  * @param batchSize Size of the batch (in units of buffer length or value at key).
  * @param batchRate Desired rate of data transfer to next stream.
- * @param flushStrategy Buffering strategy to use.
  * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
+ * @param options Transform stream options
  */
 export const accumulator = baseFunctions.accumulator;
 
@@ -187,10 +188,11 @@
  * 2. Rolling
  *  - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
  * then pushed into the buffer.
- * @param batchRate Desired rate of data transfer to next stream.
  * @param flushStrategy Buffering strategy to use.
  * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
  * or items need to be cleared from buffer.
+ * @param batchRate Desired rate of data transfer to next stream.
+ * @param options Transform stream options
  */
 export const accumulatorBy = baseFunctions.accumulatorBy;
 
diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts
index 71fdb1f..2ee455c 100644
--- a/tests/accumulator.spec.ts
+++ b/tests/accumulator.spec.ts
@@ -18,7 +18,7 @@ test.cb("accumulator() rolling", t => {
     const flushes = [firstFlush, secondFlush, thirdFlush];
 
     source
-        .pipe(accumulator(2, undefined, FlushStrategy.rolling))
+        .pipe(accumulator(FlushStrategy.rolling, 2))
         .on("data", (flush: TestObject[]) => {
             t.deepEqual(flush, flushes[chunkIndex]);
             chunkIndex++;
@@ -51,7 +51,7 @@ test.cb("accumulator() rolling with key", t => {
     const flushes = [firstFlush, secondFlush];
 
     source
-        .pipe(accumulator(3, undefined, FlushStrategy.rolling, "ts"))
+        .pipe(accumulator(FlushStrategy.rolling, 3, "ts"))
         .on("data", (flush: TestObject[]) => {
             t.deepEqual(flush, flushes[chunkIndex]);
             chunkIndex++;
@@ -77,9 +77,8 @@ test.cb(
         }
         const source = new Readable({ objectMode: true });
         const accumulatorStream = accumulator(
-            3,
-            undefined,
             FlushStrategy.rolling,
+            3,
             "nonExistingKey",
         );
         const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@@ -119,12 +118,7 @@ test.cb(
             key: string;
         }
         const source = new Readable({ objectMode: true });
-        const accumulatorStream = accumulator(
-            3,
-            undefined,
-            FlushStrategy.rolling,
-            "ts",
-        );
+        const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts");
         const input = [
             { ts: 0, key: "a" },
             { ts: 1, key: "b" },
@@ -193,7 +187,7 @@ test.cb("accumulator() sliding", t => {
     const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
 
     source
-        .pipe(accumulator(3, undefined, FlushStrategy.sliding))
+        .pipe(accumulator(FlushStrategy.sliding, 3))
         .on("data", (flush: TestObject[]) => {
             t.deepEqual(flush, flushes[chunkIndex]);
             chunkIndex++;
@@ -248,7 +242,7 @@ test.cb("accumulator() sliding with key", t => {
         sixthFlush,
     ];
     source
-        .pipe(accumulator(3, undefined, FlushStrategy.sliding, "ts"))
+        .pipe(accumulator(FlushStrategy.sliding, 3, "ts"))
         .on("data", (flush: TestObject[]) => {
             t.deepEqual(flush, flushes[chunkIndex]);
             chunkIndex++;
@@ -274,9 +268,8 @@ test.cb(
         }
         const source = new Readable({ objectMode: true });
         const accumulatorStream = accumulator(
-            3,
-            undefined,
             FlushStrategy.sliding,
+            3,
             "nonExistingKey",
         );
         const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@@ -315,12 +308,7 @@ test.cb(
             key: string;
         }
         const source = new Readable({ objectMode: true });
-        const accumulatorStream = accumulator(
-            3,
-            undefined,
-            FlushStrategy.sliding,
-            "ts",
-        );
+        const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts");
         const input = [
             { ts: 0, key: "a" },
             { key: "b" },
@@ -386,7 +374,6 @@ test.cb("accumulatorBy() rolling", t => {
     source
         .pipe(
             accumulatorBy(
-                undefined,
                 FlushStrategy.rolling,
                 (event: TestObject, bufferChunk: TestObject) => {
                     return bufferChunk.ts + 3 <= event.ts;
@@ -422,7 +409,6 @@ test.cb.skip(
             { ts: 2, key: "c" },
         ];
         const accumulaterStream = accumulatorBy(
-            undefined,
             FlushStrategy.rolling,
             (event: TestObject, bufferChunk: TestObject) => {
                 if (event.key !== "a") {
@@ -490,7 +476,6 @@ test.cb("accumulatorBy() sliding", t => {
     source
         .pipe(
             accumulatorBy(
-                undefined,
                 FlushStrategy.sliding,
                 (event: TestObject, bufferChunk: TestObject) => {
                     return bufferChunk.ts + 3 <= event.ts ? true : false;
@@ -526,7 +511,6 @@ test.cb.skip(
             { ts: 2, key: "c" },
         ];
         const accumulaterStream = accumulatorBy(
-            undefined,
             FlushStrategy.sliding,
             (event: TestObject, bufferChunk: TestObject) => {
                 if (event.key !== "a") {
diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts
index 2d43ec1..370e117 100644
--- a/tests/demux.spec.ts
+++ b/tests/demux.spec.ts
@@ -379,6 +379,7 @@ test.cb.only(
         const construct = (destKey: string) => {
             const first = map(
                 (chunk: Chunk) => {
+                    console.log("1: ", chunk);
                     chunk.mapped.push(1);
                     return chunk;
                 },
@@ -387,7 +388,9 @@
 
             const second = map(
                 async (chunk: Chunk) => {
+                    console.log("2: ", chunk);
                     await sleep(slowProcessorSpeed);
+                    console.log("2 done ", chunk);
                     chunk.mapped.push(2);
                     return chunk;
                 },
@@ -408,6 +411,7 @@
         // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event)
         _demux.on("drain", () => {
             expect(_demux._writableState.length).to.be.equal(0);
+            console.log(performance.now() - start);
             expect(performance.now() - start).to.be.greaterThan(
                 slowProcessorSpeed * (input.length - 2),
             );
@@ -427,7 +431,7 @@
 
         const start = performance.now();
         input.forEach(item => {
-            _demux.write(item);
+            console.log(_demux.write(item));
         });
     },
 );
@@ -457,6 +461,7 @@ test.cb(
         const construct = (destKey: string) => {
            const first = map(
                 (chunk: Chunk) => {
+                    console.log("1: ", chunk);
                     chunk.mapped.push(1);
                     return chunk;
                 },
@@ -464,6 +469,7 @@
             );
             const second = map(
                 (chunk: Chunk) => {
+                    console.log("2: ", chunk);
                     chunk.mapped.push(2);
                     return chunk;
                 },
@@ -472,7 +478,9 @@
 
             const third = map(
                 async (chunk: Chunk) => {
+                    console.log("3: ", chunk);
                     await sleep(slowProcessorSpeed);
+                    console.log(" 3 done ", chunk);
                     chunk.mapped.push(3);
                     return chunk;
                 },
@@ -496,6 +504,7 @@
         // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event)
         _demux.on("drain", () => {
             expect(_demux._writableState.length).to.be.equal(0);
+            console.log(performance.now() - start);
             expect(performance.now() - start).to.be.greaterThan(
                 slowProcessorSpeed * (input.length - 4),
             );
@@ -515,7 +524,7 @@
 
         const start = performance.now();
         input.forEach(item => {
-            _demux.write(item);
+            console.log(_demux.write(item));
         });
     },
 );
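For reference, a minimal usage sketch of the reordered signatures introduced above, mirroring how the updated tests call the API. The import path, the `TestObject` shape, the sample events, and the `rate` value are illustrative assumptions only, not part of this change.

```ts
import { Readable } from "stream";
// Hypothetical import path; use whatever entry point the library actually exposes.
import { accumulator, accumulatorBy, FlushStrategy } from "../src/functions";

interface TestObject {
    ts: number;
    key: string;
}

const source = new Readable({ objectMode: true, read() {} });

// accumulator(): the flush strategy now comes first, batchSize second, and the
// optional rate is carried in the trailing options object instead of a positional argument.
const rolling = source.pipe(
    accumulator(FlushStrategy.rolling, 3, "ts", {
        readableObjectMode: true,
        writableObjectMode: true,
        rate: 100, // illustrative value
    }),
);

// accumulatorBy(): the leading batchRate positional argument is gone; the iteratee
// directly follows the flush strategy.
const sliding = accumulatorBy(
    FlushStrategy.sliding,
    (event: TestObject, bufferChunk: TestObject) => bufferChunk.ts + 3 <= event.ts,
);

rolling.on("data", (flush: TestObject[]) => console.log(flush));
sliding.on("data", (flush: TestObject[]) => console.log(flush));
```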