From 70edee51c4a7d8f968181098fc98562dbe7d8c6f Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Fri, 13 Sep 2019 08:57:19 -0400 Subject: [PATCH] Update interface --- src/functions/accumulator.ts | 23 +++++------------------ src/functions/demux.ts | 2 +- tests/accumulator.spec.ts | 30 ++++++++++++++++++++++++------ 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts index 3176304..020d900 100644 --- a/src/functions/accumulator.ts +++ b/src/functions/accumulator.ts @@ -1,18 +1,11 @@ -import { Transform } from "stream"; -import { - AccumulatorByIteratee, - FlushStrategy, - TransformOptions, -} from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; +import { AccumulatorByIteratee, FlushStrategy } from "./baseDefinitions"; import { batch } from "."; function _accumulator( accumulateBy: (data: T, buffer: T[], stream: Transform) => void, shouldFlush: boolean = true, - options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, - }, + options: TransformOptions = {}, ) { const buffer: T[] = []; return new Transform({ @@ -128,10 +121,7 @@ export function accumulator( flushStrategy: FlushStrategy, batchSize: number, keyBy?: string, - options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, - }, + options?: TransformOptions, ): Transform { if (flushStrategy === FlushStrategy.sliding) { return sliding(batchSize, keyBy, options); @@ -145,10 +135,7 @@ export function accumulator( export function accumulatorBy( flushStrategy: S, iteratee: AccumulatorByIteratee, - options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, - }, + options?: TransformOptions, ): Transform { if (flushStrategy === FlushStrategy.sliding) { return slidingBy(iteratee, options); diff --git a/src/functions/demux.ts b/src/functions/demux.ts index 4b6312a..98a1225 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -43,7 +43,7 @@ class Demux extends Writable { destKey?: string, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream, demuxBy: string | ((chunk: any) => string), - options?: WritableOptions, + options: WritableOptions = {}, ) { super(options); this.demuxer = diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts index feac408..9e08c74 100644 --- a/tests/accumulator.spec.ts +++ b/tests/accumulator.spec.ts @@ -19,7 +19,11 @@ test.cb("accumulator() rolling", t => { const flushes = [firstFlush, secondFlush, thirdFlush]; source - .pipe(accumulator(FlushStrategy.rolling, 2)) + .pipe( + accumulator(FlushStrategy.rolling, 2, undefined, { + objectMode: true, + }), + ) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -52,7 +56,7 @@ test.cb("accumulator() rolling with key", t => { const flushes = [firstFlush, secondFlush]; source - .pipe(accumulator(FlushStrategy.rolling, 3, "ts")) + .pipe(accumulator(FlushStrategy.rolling, 3, "ts", { objectMode: true })) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -81,6 +85,7 @@ test.cb( FlushStrategy.rolling, 3, "nonExistingKey", + { objectMode: true }, ); const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; @@ -119,7 +124,9 @@ test.cb( key: string; } const source = new Readable({ objectMode: true }); - const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts"); + const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts", { + objectMode: true, + }); const input = [ { ts: 0, key: "a" }, { ts: 1, key: "b" }, @@ -188,7 +195,11 @@ test.cb("accumulator() sliding", t => { const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush]; source - .pipe(accumulator(FlushStrategy.sliding, 3)) + .pipe( + accumulator(FlushStrategy.sliding, 3, undefined, { + objectMode: true, + }), + ) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -243,7 +254,7 @@ test.cb("accumulator() sliding with key", t => { sixthFlush, ]; source - .pipe(accumulator(FlushStrategy.sliding, 3, "ts")) + .pipe(accumulator(FlushStrategy.sliding, 3, "ts", { objectMode: true })) .on("data", (flush: TestObject[]) => { t.deepEqual(flush, flushes[chunkIndex]); chunkIndex++; @@ -272,6 +283,7 @@ test.cb( FlushStrategy.sliding, 3, "nonExistingKey", + { objectMode: true }, ); const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; @@ -309,7 +321,9 @@ test.cb( key: string; } const source = new Readable({ objectMode: true }); - const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts"); + const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts", { + objectMode: true, + }); const input = [ { ts: 0, key: "a" }, { key: "b" }, @@ -379,6 +393,7 @@ test.cb("accumulatorBy() rolling", t => { (event: TestObject, bufferChunk: TestObject) => { return bufferChunk.ts + 3 <= event.ts; }, + { objectMode: true }, ), ) .on("data", (flush: TestObject[]) => { @@ -417,6 +432,7 @@ test.cb.skip( } return bufferChunk.ts + 3 <= event.ts; }, + { objectMode: true }, ); source .pipe(accumulaterStream) @@ -481,6 +497,7 @@ test.cb("accumulatorBy() sliding", t => { (event: TestObject, bufferChunk: TestObject) => { return bufferChunk.ts + 3 <= event.ts ? true : false; }, + { objectMode: true }, ), ) .on("data", (flush: TestObject[]) => { @@ -519,6 +536,7 @@ test.cb.skip( } return bufferChunk.ts + 3 <= event.ts ? true : false; }, + { objectMode: true }, ); source .pipe(accumulaterStream)