Update interface

This commit is contained in:
Jerry Kurian 2019-09-13 08:57:19 -04:00
parent 158475183a
commit 70edee51c4
3 changed files with 30 additions and 25 deletions

View File

@ -1,18 +1,11 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { import { AccumulatorByIteratee, FlushStrategy } from "./baseDefinitions";
AccumulatorByIteratee,
FlushStrategy,
TransformOptions,
} from "./baseDefinitions";
import { batch } from "."; import { batch } from ".";
function _accumulator<T>( function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void, accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true, shouldFlush: boolean = true,
options: TransformOptions = { options: TransformOptions = {},
readableObjectMode: true,
writableObjectMode: true,
},
) { ) {
const buffer: T[] = []; const buffer: T[] = [];
return new Transform({ return new Transform({
@ -128,10 +121,7 @@ export function accumulator(
flushStrategy: FlushStrategy, flushStrategy: FlushStrategy,
batchSize: number, batchSize: number,
keyBy?: string, keyBy?: string,
options: TransformOptions = { options?: TransformOptions,
readableObjectMode: true,
writableObjectMode: true,
},
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { if (flushStrategy === FlushStrategy.sliding) {
return sliding(batchSize, keyBy, options); return sliding(batchSize, keyBy, options);
@ -145,10 +135,7 @@ export function accumulator(
export function accumulatorBy<T, S extends FlushStrategy>( export function accumulatorBy<T, S extends FlushStrategy>(
flushStrategy: S, flushStrategy: S,
iteratee: AccumulatorByIteratee<T>, iteratee: AccumulatorByIteratee<T>,
options: TransformOptions = { options?: TransformOptions,
readableObjectMode: true,
writableObjectMode: true,
},
): Transform { ): Transform {
if (flushStrategy === FlushStrategy.sliding) { if (flushStrategy === FlushStrategy.sliding) {
return slidingBy(iteratee, options); return slidingBy(iteratee, options);

View File

@ -43,7 +43,7 @@ class Demux extends Writable {
destKey?: string, destKey?: string,
) => NodeJS.WritableStream | NodeJS.ReadWriteStream, ) => NodeJS.WritableStream | NodeJS.ReadWriteStream,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: WritableOptions, options: WritableOptions = {},
) { ) {
super(options); super(options);
this.demuxer = this.demuxer =

View File

@ -19,7 +19,11 @@ test.cb("accumulator() rolling", t => {
const flushes = [firstFlush, secondFlush, thirdFlush]; const flushes = [firstFlush, secondFlush, thirdFlush];
source source
.pipe(accumulator(FlushStrategy.rolling, 2)) .pipe(
accumulator(FlushStrategy.rolling, 2, undefined, {
objectMode: true,
}),
)
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -52,7 +56,7 @@ test.cb("accumulator() rolling with key", t => {
const flushes = [firstFlush, secondFlush]; const flushes = [firstFlush, secondFlush];
source source
.pipe(accumulator(FlushStrategy.rolling, 3, "ts")) .pipe(accumulator(FlushStrategy.rolling, 3, "ts", { objectMode: true }))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -81,6 +85,7 @@ test.cb(
FlushStrategy.rolling, FlushStrategy.rolling,
3, 3,
"nonExistingKey", "nonExistingKey",
{ objectMode: true },
); );
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@ -119,7 +124,9 @@ test.cb(
key: string; key: string;
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts"); const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts", {
objectMode: true,
});
const input = [ const input = [
{ ts: 0, key: "a" }, { ts: 0, key: "a" },
{ ts: 1, key: "b" }, { ts: 1, key: "b" },
@ -188,7 +195,11 @@ test.cb("accumulator() sliding", t => {
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush]; const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source source
.pipe(accumulator(FlushStrategy.sliding, 3)) .pipe(
accumulator(FlushStrategy.sliding, 3, undefined, {
objectMode: true,
}),
)
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -243,7 +254,7 @@ test.cb("accumulator() sliding with key", t => {
sixthFlush, sixthFlush,
]; ];
source source
.pipe(accumulator(FlushStrategy.sliding, 3, "ts")) .pipe(accumulator(FlushStrategy.sliding, 3, "ts", { objectMode: true }))
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]); t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++; chunkIndex++;
@ -272,6 +283,7 @@ test.cb(
FlushStrategy.sliding, FlushStrategy.sliding,
3, 3,
"nonExistingKey", "nonExistingKey",
{ objectMode: true },
); );
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
@ -309,7 +321,9 @@ test.cb(
key: string; key: string;
} }
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts"); const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts", {
objectMode: true,
});
const input = [ const input = [
{ ts: 0, key: "a" }, { ts: 0, key: "a" },
{ key: "b" }, { key: "b" },
@ -379,6 +393,7 @@ test.cb("accumulatorBy() rolling", t => {
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts; return bufferChunk.ts + 3 <= event.ts;
}, },
{ objectMode: true },
), ),
) )
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
@ -417,6 +432,7 @@ test.cb.skip(
} }
return bufferChunk.ts + 3 <= event.ts; return bufferChunk.ts + 3 <= event.ts;
}, },
{ objectMode: true },
); );
source source
.pipe(accumulaterStream) .pipe(accumulaterStream)
@ -481,6 +497,7 @@ test.cb("accumulatorBy() sliding", t => {
(event: TestObject, bufferChunk: TestObject) => { (event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts ? true : false; return bufferChunk.ts + 3 <= event.ts ? true : false;
}, },
{ objectMode: true },
), ),
) )
.on("data", (flush: TestObject[]) => { .on("data", (flush: TestObject[]) => {
@ -519,6 +536,7 @@ test.cb.skip(
} }
return bufferChunk.ts + 3 <= event.ts ? true : false; return bufferChunk.ts + 3 <= event.ts ? true : false;
}, },
{ objectMode: true },
); );
source source
.pipe(accumulaterStream) .pipe(accumulaterStream)