diff --git a/src/functions/batch.ts b/src/functions/batch.ts index 76551eb..4b56b4c 100644 --- a/src/functions/batch.ts +++ b/src/functions/batch.ts @@ -4,7 +4,7 @@ import { TransformOptions } from "./baseDefinitions"; * Stores chunks of data internally in array and batches when batchSize is reached. * * @param batchSize Size of the batches - * @param maxBatchAge Max lifetime of a batch + * @param maxBatchAge Max lifetime of a batch in seconds */ export function batch( batchSize: number = 1000, diff --git a/src/functions/index.ts b/src/functions/index.ts index 426d03d..8bdbf75 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -202,17 +202,17 @@ export function batch(batchSize: number, maxBatchAge?: number): Transform { } /** - * Unbatches and sends individual chunks of data + * Unbatches and sends individual chunks of data. */ export function unbatch(): Transform { return baseFunctions.unbatch(); } /** - * Limits date of data transferred into stream. + * Limits rate of data transferred into stream. * @param options? - * @param targetRate? Desired rate in ms - * @param period? Period to sleep for when rate is above or equal to targetRate + * @param targetRate? Desired rate in ms. + * @param period? Period to sleep for when rate is above or equal to targetRate. */ export function rate(targetRate?: number, period?: number): Transform { return baseFunctions.rate(targetRate, period); @@ -221,7 +221,7 @@ export function rate(targetRate?: number, period?: number): Transform { /** * Limits number of parallel processes in flight. * @param parallel Max number of parallel processes. - * @param func Function to execute on each data chunk + * @param func Function to execute on each data chunk. * @param pause Amount of time to pause processing when max number of parallel processes are executing. 
*/ export function parallelMap( @@ -232,6 +232,26 @@ return baseFunctions.parallelMap(mapper, parallel, sleepTime); } +/** + * Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items + * in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies: + * 1. Sliding + * - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain + * the batchSize. When no key is provided, the batchSize is effectively the buffer length. When + * a key is provided, the batchSize is based on the value at that key. For example, given a key + * of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be + * within 3000 timestamp units from the first element. This means that with a key, multiple elements + * may be spliced off the front of the buffer. The buffer is then pushed into the stream. + * 2. Rolling + * - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream. + * When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize + * is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000, + * each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element. + * @param batchSize Size of the batch (in units of buffer length or value at key). + * @param batchRate Desired rate of data transfer to next stream. + * @param flushStrategy Buffering strategy to use. + * @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer. + */ export function accumulator( batchSize: number, batchRate: number | undefined, @@ -246,6 +266,20 @@ ); } +/** + * Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items + * in the buffer. 
How the buffer is mutated is based on 1 of 2 possible buffering strategies: + * 1. Sliding + * - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The + * item is pushed into the buffer and buffer is pushed into stream. + * 2. Rolling + * - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is + * then pushed into the buffer. + * @param batchRate Desired rate of data transfer to next stream. + * @param flushStrategy Buffering strategy to use. + * @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into buffer + * or items need to be cleared from buffer. + */ export function accumulatorBy( batchRate: number | undefined, flushStrategy: S, diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts index c5a5ae3..523455a 100644 --- a/tests/accumulator.spec.ts +++ b/tests/accumulator.spec.ts @@ -26,9 +26,7 @@ test.cb("accumulator() rolling", t => { .on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); [...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => { source.push(item); }); @@ -61,9 +59,7 @@ test.cb("accumulator() rolling with key", t => { .on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); [...firstFlush, ...secondFlush].forEach(item => { source.push(item); }); @@ -105,9 +101,7 @@ test.cb( index++; t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -162,9 +156,7 @@ test.cb( ); t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -209,9 +201,7 @@ test.cb("accumulator() sliding", t => { .on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -266,9 +256,7 @@ test.cb("accumulator() sliding with key", t => { 
.on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -309,9 +297,7 @@ test.cb( index++; t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -372,9 +358,7 @@ test.cb( ); t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -416,9 +400,7 @@ test.cb("accumulatorBy() rolling", t => { .on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); [...firstFlush, ...secondFlush].forEach(item => { source.push(item); }); @@ -457,9 +439,7 @@ test.cb( expect(err.message).to.equal("Failed mapping"); t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); @@ -524,9 +504,7 @@ test.cb("accumulatorBy() sliding", t => { .on("error", (e: any) => { t.end(e); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); }); @@ -565,9 +543,7 @@ test.cb( expect(err.message).to.equal("Failed mapping"); t.pass(); }) - .on("end", () => { - t.end(); - }); + .on("end", t.end); input.forEach(item => { source.push(item); diff --git a/tests/batch.spec.ts b/tests/batch.spec.ts index bccd1b6..0c2cd3a 100644 --- a/tests/batch.spec.ts +++ b/tests/batch.spec.ts @@ -11,8 +11,7 @@ test.cb("batch() batches chunks together", t => { source .pipe(batch(3)) .on("data", (element: string[]) => { - expect(element).to.deep.equal(expectedElements[i]); - t.pass(); + t.deepEqual(element, expectedElements[i]); i++; }) .on("error", t.end) @@ -39,8 +38,7 @@ test.cb("batch() yields a batch after the timeout", t => { source .pipe(batch(3)) .on("data", (element: string[]) => { - expect(element).to.deep.equal(expectedElements[i]); - t.pass(); + t.deepEqual(element, expectedElements[i]); i++; }) .on("error", t.fail) diff --git a/tsconfig.json 
b/tsconfig.json index c9faf21..fabdff4 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,15 +9,11 @@ "suppressImplicitAnyIndexErrors": true, "outDir": "./dist", "module": "commonjs" - }, - "target": "es5", - "lib": [ - "es2016" - ], - "sourceMap": true, - "declaration": true }, - "include": [ - "src/**/*.ts" - ] + "target": "es5", + "lib": [ + "es2016" + ], + "sourceMap": true, + "declaration": true }