From a11aa10d166d3a35a210cf886e56a18d69120f31 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 26 Sep 2019 09:23:09 -0400 Subject: [PATCH] Clean up --- src/functions/accumulator.ts | 8 +++++++- src/functions/baseDefinitions.ts | 15 --------------- src/functions/batch.ts | 6 ++---- src/functions/collect.ts | 7 ++----- src/functions/flatMap.ts | 6 ++---- src/functions/map.ts | 8 ++------ src/functions/parallelMap.ts | 6 ++---- src/functions/rate.ts | 6 ++---- src/functions/reduce.ts | 9 +++------ src/functions/unbatch.ts | 6 ++---- tests/accumulator.spec.ts | 2 +- tests/demux.spec.ts | 13 +++++++++++-- 12 files changed, 36 insertions(+), 56 deletions(-) diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts index 020d900..82ca9ae 100644 --- a/src/functions/accumulator.ts +++ b/src/functions/accumulator.ts @@ -1,7 +1,13 @@ import { Transform, TransformOptions } from "stream"; -import { AccumulatorByIteratee, FlushStrategy } from "./baseDefinitions"; import { batch } from "."; +export enum FlushStrategy { + rolling = "rolling", + sliding = "sliding", +} + +export type AccumulatorByIteratee = (event: T, bufferChunk: T) => boolean; + function _accumulator( accumulateBy: (data: T, buffer: T[], stream: Transform) => void, shouldFlush: boolean = true, diff --git a/src/functions/baseDefinitions.ts b/src/functions/baseDefinitions.ts index c3f5461..b02dd10 100644 --- a/src/functions/baseDefinitions.ts +++ b/src/functions/baseDefinitions.ts @@ -1,12 +1,3 @@ -export interface ThroughOptions { - objectMode?: boolean; -} - -export interface TransformOptions { - readableObjectMode?: boolean; - writableObjectMode?: boolean; -} - export interface WithEncoding { encoding: string; } @@ -21,9 +12,3 @@ export type JsonValue = JsonPrimitive | JsonPrimitive[]; export interface JsonParseOptions { pretty: boolean; } -export enum FlushStrategy { - rolling = "rolling", - sliding = "sliding", -} - -export type AccumulatorByIteratee = (event: T, bufferChunk: T) => boolean; diff --git a/src/functions/batch.ts b/src/functions/batch.ts index 0d0f314..e9f8915 100644 --- a/src/functions/batch.ts +++ b/src/functions/batch.ts @@ -1,12 +1,10 @@ -import { Transform } from "stream"; -import { TransformOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; export function batch( batchSize: number = 1000, maxBatchAge: number = 500, options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ): Transform { let buffer: any[] = []; diff --git a/src/functions/collect.ts b/src/functions/collect.ts index 38cd6ea..3c081bb 100644 --- a/src/functions/collect.ts +++ b/src/functions/collect.ts @@ -1,9 +1,6 @@ -import { Transform } from "stream"; -import { ThroughOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; -export function collect( - options: ThroughOptions = { objectMode: false }, -): Transform { +export function collect(options: TransformOptions = {}): Transform { const collected: any[] = []; return new Transform({ ...options, diff --git a/src/functions/flatMap.ts b/src/functions/flatMap.ts index 2abb726..dd7820d 100644 --- a/src/functions/flatMap.ts +++ b/src/functions/flatMap.ts @@ -1,13 +1,11 @@ -import { Transform } from "stream"; -import { TransformOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; export function flatMap( mapper: | ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => Promise), options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ): Transform { return new Transform({ diff --git a/src/functions/map.ts b/src/functions/map.ts index 589f0a9..38d6a59 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -1,12 +1,8 @@ -import { Transform } from "stream"; -import { TransformOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; export function map( mapper: (chunk: T, encoding: string) => R, - options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, - }, + options: TransformOptions = { objectMode: true }, ): Transform { return new Transform({ ...options, diff --git a/src/functions/parallelMap.ts b/src/functions/parallelMap.ts index 6bc6b79..8cb3e80 100644 --- a/src/functions/parallelMap.ts +++ b/src/functions/parallelMap.ts @@ -1,14 +1,12 @@ -import { Transform } from "stream"; +import { Transform, TransformOptions } from "stream"; import { sleep } from "../helpers"; -import { TransformOptions } from "./baseDefinitions"; export function parallelMap( mapper: (data: T) => R, parallel: number = 10, sleepTime: number = 5, options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ) { let inflight = 0; diff --git a/src/functions/rate.ts b/src/functions/rate.ts index cb5cbfb..083d854 100644 --- a/src/functions/rate.ts +++ b/src/functions/rate.ts @@ -1,14 +1,12 @@ -import { Transform } from "stream"; +import { Transform, TransformOptions } from "stream"; import { performance } from "perf_hooks"; import { sleep } from "../helpers"; -import { TransformOptions } from "./baseDefinitions"; export function rate( targetRate: number = 50, period: number = 1, options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ): Transform { const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period diff --git a/src/functions/reduce.ts b/src/functions/reduce.ts index ff76025..9f19ca4 100644 --- a/src/functions/reduce.ts +++ b/src/functions/reduce.ts @@ -1,5 +1,4 @@ -import { Transform } from "stream"; -import { TransformOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; export function reduce( iteratee: @@ -7,14 +6,12 @@ export function reduce( | ((previousValue: R, chunk: T, encoding: string) => Promise), initialValue: R, options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ) { let value = initialValue; return new Transform({ - readableObjectMode: options.readableObjectMode, - writableObjectMode: options.writableObjectMode, + ...options, async transform(chunk: T, encoding, callback) { value = await iteratee(value, chunk, encoding); callback(); diff --git a/src/functions/unbatch.ts b/src/functions/unbatch.ts index 0f9b3f6..93d6bfe 100644 --- a/src/functions/unbatch.ts +++ b/src/functions/unbatch.ts @@ -1,10 +1,8 @@ -import { Transform } from "stream"; -import { TransformOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; export function unbatch( options: TransformOptions = { - readableObjectMode: true, - writableObjectMode: true, + objectMode: true, }, ) { return new Transform({ diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts index 9e08c74..51d6098 100644 --- a/tests/accumulator.spec.ts +++ b/tests/accumulator.spec.ts @@ -2,7 +2,7 @@ import test from "ava"; import { expect } from "chai"; import { Readable } from "stream"; import { accumulator, accumulatorBy } from "../src"; -import { FlushStrategy } from "../src/functions/baseDefinitions"; +import { FlushStrategy } from "../src/functions/accumulator"; import { performance } from "perf_hooks"; test.cb("accumulator() rolling", t => { diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 2d43ec1..370e117 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -379,6 +379,7 @@ test.cb.only( const construct = (destKey: string) => { const first = map( (chunk: Chunk) => { + console.log("1: ", chunk); chunk.mapped.push(1); return chunk; }, @@ -387,7 +388,9 @@ test.cb.only( const second = map( async (chunk: Chunk) => { + console.log("2: ", chunk); await sleep(slowProcessorSpeed); + console.log("2 done ", chunk); chunk.mapped.push(2); return chunk; }, @@ -408,6 +411,7 @@ test.cb.only( // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); + console.log(performance.now() - start); expect(performance.now() - start).to.be.greaterThan( slowProcessorSpeed * (input.length - 2), ); @@ -427,7 +431,7 @@ test.cb.only( const start = performance.now(); input.forEach(item => { - _demux.write(item); + console.log(_demux.write(item)); }); }, ); @@ -457,6 +461,7 @@ test.cb( const construct = (destKey: string) => { const first = map( (chunk: Chunk) => { + console.log("1: ", chunk); chunk.mapped.push(1); return chunk; }, @@ -464,6 +469,7 @@ test.cb( ); const second = map( (chunk: Chunk) => { + console.log("2: ", chunk); chunk.mapped.push(2); return chunk; }, @@ -472,7 +478,9 @@ test.cb( const third = map( async (chunk: Chunk) => { + console.log("3: ", chunk); await sleep(slowProcessorSpeed); + console.log(" 3 done ", chunk); chunk.mapped.push(3); return chunk; }, @@ -496,6 +504,7 @@ test.cb( // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); + console.log(performance.now() - start); expect(performance.now() - start).to.be.greaterThan( slowProcessorSpeed * (input.length - 4), ); @@ -515,7 +524,7 @@ test.cb( const start = performance.now(); input.forEach(item => { - _demux.write(item); + console.log(_demux.write(item)); }); }, );