This commit is contained in:
Jerry Kurian 2019-09-26 09:23:09 -04:00
parent 70edee51c4
commit a11aa10d16
12 changed files with 36 additions and 56 deletions

View File

@ -1,7 +1,13 @@
import { Transform, TransformOptions } from "stream"; import { Transform, TransformOptions } from "stream";
import { AccumulatorByIteratee, FlushStrategy } from "./baseDefinitions";
import { batch } from "."; import { batch } from ".";
export enum FlushStrategy {
rolling = "rolling",
sliding = "sliding",
}
export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;
function _accumulator<T>( function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void, accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true, shouldFlush: boolean = true,

View File

@ -1,12 +1,3 @@
export interface ThroughOptions {
objectMode?: boolean;
}
export interface TransformOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export interface WithEncoding { export interface WithEncoding {
encoding: string; encoding: string;
} }
@ -21,9 +12,3 @@ export type JsonValue = JsonPrimitive | JsonPrimitive[];
export interface JsonParseOptions { export interface JsonParseOptions {
pretty: boolean; pretty: boolean;
} }
export enum FlushStrategy {
rolling = "rolling",
sliding = "sliding",
}
export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;

View File

@ -1,12 +1,10 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { TransformOptions } from "./baseDefinitions";
export function batch( export function batch(
batchSize: number = 1000, batchSize: number = 1000,
maxBatchAge: number = 500, maxBatchAge: number = 500,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
): Transform { ): Transform {
let buffer: any[] = []; let buffer: any[] = [];

View File

@ -1,9 +1,6 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { ThroughOptions } from "./baseDefinitions";
export function collect( export function collect(options: TransformOptions = {}): Transform {
options: ThroughOptions = { objectMode: false },
): Transform {
const collected: any[] = []; const collected: any[] = [];
return new Transform({ return new Transform({
...options, ...options,

View File

@ -1,13 +1,11 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { TransformOptions } from "./baseDefinitions";
export function flatMap<T, R>( export function flatMap<T, R>(
mapper: mapper:
| ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>), | ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
): Transform { ): Transform {
return new Transform({ return new Transform({

View File

@ -1,12 +1,8 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { TransformOptions } from "./baseDefinitions";
export function map<T, R>( export function map<T, R>(
mapper: (chunk: T, encoding: string) => R, mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = { options: TransformOptions = { objectMode: true },
readableObjectMode: true,
writableObjectMode: true,
},
): Transform { ): Transform {
return new Transform({ return new Transform({
...options, ...options,

View File

@ -1,14 +1,12 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { sleep } from "../helpers"; import { sleep } from "../helpers";
import { TransformOptions } from "./baseDefinitions";
export function parallelMap<T, R>( export function parallelMap<T, R>(
mapper: (data: T) => R, mapper: (data: T) => R,
parallel: number = 10, parallel: number = 10,
sleepTime: number = 5, sleepTime: number = 5,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
) { ) {
let inflight = 0; let inflight = 0;

View File

@ -1,14 +1,12 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
import { sleep } from "../helpers"; import { sleep } from "../helpers";
import { TransformOptions } from "./baseDefinitions";
export function rate( export function rate(
targetRate: number = 50, targetRate: number = 50,
period: number = 1, period: number = 1,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
): Transform { ): Transform {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period

View File

@ -1,5 +1,4 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { TransformOptions } from "./baseDefinitions";
export function reduce<T, R>( export function reduce<T, R>(
iteratee: iteratee:
@ -7,14 +6,12 @@ export function reduce<T, R>(
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>), | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R, initialValue: R,
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
) { ) {
let value = initialValue; let value = initialValue;
return new Transform({ return new Transform({
readableObjectMode: options.readableObjectMode, ...options,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) { async transform(chunk: T, encoding, callback) {
value = await iteratee(value, chunk, encoding); value = await iteratee(value, chunk, encoding);
callback(); callback();

View File

@ -1,10 +1,8 @@
import { Transform } from "stream"; import { Transform, TransformOptions } from "stream";
import { TransformOptions } from "./baseDefinitions";
export function unbatch( export function unbatch(
options: TransformOptions = { options: TransformOptions = {
readableObjectMode: true, objectMode: true,
writableObjectMode: true,
}, },
) { ) {
return new Transform({ return new Transform({

View File

@ -2,7 +2,7 @@ import test from "ava";
import { expect } from "chai"; import { expect } from "chai";
import { Readable } from "stream"; import { Readable } from "stream";
import { accumulator, accumulatorBy } from "../src"; import { accumulator, accumulatorBy } from "../src";
import { FlushStrategy } from "../src/functions/baseDefinitions"; import { FlushStrategy } from "../src/functions/accumulator";
import { performance } from "perf_hooks"; import { performance } from "perf_hooks";
test.cb("accumulator() rolling", t => { test.cb("accumulator() rolling", t => {

View File

@ -379,6 +379,7 @@ test.cb.only(
const construct = (destKey: string) => { const construct = (destKey: string) => {
const first = map( const first = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("1: ", chunk);
chunk.mapped.push(1); chunk.mapped.push(1);
return chunk; return chunk;
}, },
@ -387,7 +388,9 @@ test.cb.only(
const second = map( const second = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
console.log("2: ", chunk);
await sleep(slowProcessorSpeed); await sleep(slowProcessorSpeed);
console.log("2 done ", chunk);
chunk.mapped.push(2); chunk.mapped.push(2);
return chunk; return chunk;
}, },
@ -408,6 +411,7 @@ test.cb.only(
// This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event) // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event)
_demux.on("drain", () => { _demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0); expect(_demux._writableState.length).to.be.equal(0);
console.log(performance.now() - start);
expect(performance.now() - start).to.be.greaterThan( expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * (input.length - 2), slowProcessorSpeed * (input.length - 2),
); );
@ -427,7 +431,7 @@ test.cb.only(
const start = performance.now(); const start = performance.now();
input.forEach(item => { input.forEach(item => {
_demux.write(item); console.log(_demux.write(item));
}); });
}, },
); );
@ -457,6 +461,7 @@ test.cb(
const construct = (destKey: string) => { const construct = (destKey: string) => {
const first = map( const first = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("1: ", chunk);
chunk.mapped.push(1); chunk.mapped.push(1);
return chunk; return chunk;
}, },
@ -464,6 +469,7 @@ test.cb(
); );
const second = map( const second = map(
(chunk: Chunk) => { (chunk: Chunk) => {
console.log("2: ", chunk);
chunk.mapped.push(2); chunk.mapped.push(2);
return chunk; return chunk;
}, },
@ -472,7 +478,9 @@ test.cb(
const third = map( const third = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
console.log("3: ", chunk);
await sleep(slowProcessorSpeed); await sleep(slowProcessorSpeed);
console.log(" 3 done ", chunk);
chunk.mapped.push(3); chunk.mapped.push(3);
return chunk; return chunk;
}, },
@ -496,6 +504,7 @@ test.cb(
// This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event) // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event)
_demux.on("drain", () => { _demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0); expect(_demux._writableState.length).to.be.equal(0);
console.log(performance.now() - start);
expect(performance.now() - start).to.be.greaterThan( expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * (input.length - 4), slowProcessorSpeed * (input.length - 4),
); );
@ -515,7 +524,7 @@ test.cb(
const start = performance.now(); const start = performance.now();
input.forEach(item => { input.forEach(item => {
_demux.write(item); console.log(_demux.write(item));
}); });
}, },
); );