diff --git a/src/functions/compose.ts b/src/functions/compose.ts index ee17101..da46ee8 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,11 +1,4 @@ -import { - pipeline, - Duplex, - Transform, - Readable, - Writable, - DuplexOptions, -} from "stream"; +import { pipeline, Duplex, DuplexOptions } from "stream"; /** * Return a Readable stream of readable streams concatenated together @@ -39,7 +32,7 @@ enum EventSubscription { const eventsTarget = { close: EventSubscription.Last, data: EventSubscription.Last, - drain: EventSubscription.First, + drain: EventSubscription.Self, end: EventSubscription.Last, error: EventSubscription.Self, finish: EventSubscription.Last, @@ -56,6 +49,7 @@ type AllStreams = | NodeJS.WritableStream; export class Compose extends Duplex { + public writable: boolean; private first: AllStreams; private last: AllStreams; private streams: AllStreams[]; @@ -75,9 +69,7 @@ export class Compose extends Duplex { } public _write(chunk: any, encoding: string, cb: any) { - const res = (this.first as NodeJS.WritableStream).write(chunk); - cb(); - return res; + (this.first as NodeJS.WritableStream).write(chunk, encoding, cb); } public bubble(...events: string[]) { diff --git a/src/functions/demux.ts b/src/functions/demux.ts index ef7a26a..138e6c7 100644 --- a/src/functions/demux.ts +++ b/src/functions/demux.ts @@ -14,6 +14,7 @@ export function demux( } class Demux extends Writable { + public isWritable: boolean; private streamsByKey: { [key: string]: { stream: NodeJS.WritableStream | NodeJS.ReadWriteStream; @@ -21,7 +22,6 @@ class Demux extends Writable { }; }; private demuxer: (chunk: any) => string; - private isWritable: boolean; private nonWritableStreams: Array; private construct: ( destKey?: string, @@ -43,9 +43,10 @@ class Demux extends Writable { this.construct = construct; this.streamsByKey = {}; this.isWritable = true; + this.nonWritableStreams = []; } - public _write(chunk: any, encoding: string, cb: any) { + public _write(chunk: any, encoding?: any, cb?: any) { const destKey = this.demuxer(chunk); if (this.streamsByKey[destKey] === undefined) { this.streamsByKey[destKey] = { @@ -57,35 +58,22 @@ class Demux extends Writable { // Set writable to false // keep state of all the streams, if one is not writable demux shouldnt be writable // Small optimization is to keep writing until you get a following event to the unwritable destination - let res = false; - if (this.isWritable && this.streamsByKey[destKey].writable) { + if (this.streamsByKey[destKey].writable && this.isWritable) { res = this.streamsByKey[destKey].stream.write(chunk, encoding, cb); - } else if (this.isWritable) { - this.isWritable = false; - // Buffer chunk? - return this.isWritable; } - - /* If write above returns false and the stream written to was writable previously, we need to make demux - * non-writable and update state to know the stream is nonWritable. - * If write returns true and the stream was previously not writable, we need to update which streams - * are non writable and determine if it is safe for demux to become writable (all streams are writable) - */ - if (!res) { + if (!res && this.isWritable) { + this.isWritable = false; this.streamsByKey[destKey].writable = false; this.nonWritableStreams.push(destKey); - this.isWritable = false; this.streamsByKey[destKey].stream.once("drain", () => { - this.streamsByKey[destKey].writable = true; - this.nonWritableStreams = this.nonWritableStreams.filter( - key => key !== destKey, - ); - + this.nonWritableStreams.filter(key => key !== destKey); this.isWritable = this.nonWritableStreams.length === 0; + this.streamsByKey[destKey].stream.write(chunk, encoding, cb); + if (this.isWritable) { + this.emit("drain"); + } }); } - - return this.writable; } } diff --git a/src/functions/filter.ts b/src/functions/filter.ts index 5714dd1..336db0c 100644 --- a/src/functions/filter.ts +++ b/src/functions/filter.ts @@ -1,5 +1,4 @@ -import { Transform } from "stream"; -import { ThroughOptions } from "./baseDefinitions"; +import { Transform, TransformOptions } from "stream"; /** * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold * @param predicate Predicate with which to filter scream chunks @@ -10,20 +9,17 @@ export function filter( predicate: | ((chunk: T, encoding: string) => boolean) | ((chunk: T, encoding: string) => Promise), - options: ThroughOptions = { - objectMode: true, - }, + options?: TransformOptions, ) { return new Transform({ - readableObjectMode: options.objectMode, - writableObjectMode: options.objectMode, - async transform(chunk: T, encoding, callback) { + ...options, + async transform(chunk: T, encoding?: any, callback?: any) { let isPromise = false; try { const result = predicate(chunk, encoding); isPromise = result instanceof Promise; if (!!(await result)) { - callback(undefined, chunk); + callback(null, chunk); } else { callback(); } diff --git a/src/functions/index.ts b/src/functions/index.ts index a5b53db..59ff9c3 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -59,7 +59,7 @@ export function filter( mapper: | ((chunk: T, encoding: string) => boolean) | ((chunk: T, encoding: string) => Promise), - options?: ThroughOptions, + options?: TransformOptions, ): Transform { return baseFunctions.filter(mapper, options); } diff --git a/src/functions/map.ts b/src/functions/map.ts index 61e84d5..05fe627 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -20,8 +20,7 @@ export function map( async transform(chunk: T, encoding, callback) { try { const mapped = await mapper(chunk, encoding); - this.push(mapped); - callback(); + callback(null, mapped); } catch (err) { console.log("caught error", err.message); callback(err); diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index ed2304b..362f484 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -1,6 +1,8 @@ const test = require("ava"); const { expect } = require("chai"); -const { compose, composeDuplex, map } = require("../src"); +const { compose, composeDuplex, map, rate } = require("../src"); +const { sleep } = require("../src/helpers"); +import { performance } from "perf_hooks"; test.cb("compose() chains two streams together in the correct order", t => { t.plan(3); @@ -66,3 +68,167 @@ test.cb( input.forEach(item => composed.write(item)); }, ); + +test.cb( + "compose() should emit drain event after 1 second when first stream is bottleneck", + t => { + t.plan(1); + const first = map( + async (chunk: number) => { + await sleep(200); + return chunk; + }, + { + objectMode: true, + }, + ); + + const second = map( + async (chunk: number) => { + return chunk; + }, + { objectMode: true }, + ); + + const composed = compose( + [first, second], + { objectMode: true, highWaterMark: 2 }, + ); + composed.on("error", err => { + t.end(err); + }); + + composed.on("drain", err => { + expect(performance.now() - start).to.be.greaterThan(1000); + t.pass(); + }); + + composed.on("data", chunk => { + if (chunk.data === 5) { + t.end(); + } + }); + + const input = [ + { data: 1 }, + { data: 2 }, + { data: 3 }, + { data: 4 }, + { data: 5 }, + ]; + + input.forEach(item => { + composed.write(item); + }); + const start = performance.now(); + }, +); + +test.cb( + "compose() should emit drain event immediately when second stream is bottleneck", + t => { + t.plan(1); + const first = map( + async (chunk: number) => { + return chunk; + }, + { + objectMode: true, + }, + ); + + const second = map( + async (chunk: number) => { + await sleep(500); + return chunk; + }, + { objectMode: true }, + ); + + const composed = compose( + [first, second], + { objectMode: true, highWaterMark: 2 }, + ); + composed.on("error", err => { + t.end(err); + }); + + composed.on("drain", err => { + expect(performance.now() - start).to.be.lessThan(100); + t.pass(); + }); + + composed.on("data", chunk => { + if (chunk.data === 5) { + t.end(); + } + }); + + const input = [ + { data: 1 }, + { data: 2 }, + { data: 3 }, + { data: 4 }, + { data: 5 }, + ]; + + input.forEach(item => { + composed.write(item); + }); + const start = performance.now(); + }, +); + +test.cb( + "first should contain up to highWaterMark items in readable state when second is bottleneck", + t => { + t.plan(10); + const first = map( + async (chunk: number) => { + expect(first._readableState.length).to.be.at.most(2); + t.pass(); + return chunk; + }, + { + objectMode: true, + highWaterMark: 2, + }, + ); + + const second = map( + async (chunk: number) => { + expect(second._writableState.length).to.be.equal(1); + t.pass(); + await sleep(100); + return chunk; + }, + { objectMode: true, highWaterMark: 2 }, + ); + + const composed = compose( + [first, second], + { objectMode: true }, + ); + composed.on("error", err => { + t.end(err); + }); + + composed.on("data", chunk => { + if (chunk.data === 5) { + t.end(); + } + }); + + const input = [ + { data: 1 }, + { data: 2 }, + { data: 3 }, + { data: 4 }, + { data: 5 }, + ]; + + input.forEach(item => { + composed.write(item); + }); + }, +); diff --git a/tests/filter.spec.ts b/tests/filter.spec.ts index 7fa2053..0732d06 100644 --- a/tests/filter.spec.ts +++ b/tests/filter.spec.ts @@ -9,7 +9,12 @@ test.cb("filter() filters elements synchronously", t => { const expectedElements = ["a", "c"]; let i = 0; source - .pipe(filter((element: string) => element !== "b")) + .pipe( + filter((element: string) => element !== "b", { + readableObjectMode: true, + writableObjectMode: true, + }), + ) .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); @@ -31,10 +36,13 @@ test.cb("filter() filters elements asynchronously", t => { let i = 0; source .pipe( - filter(async (element: string) => { - await Promise.resolve(); - return element !== "b"; - }), + filter( + async (element: string) => { + await Promise.resolve(); + return element !== "b"; + }, + { readableObjectMode: true, writableObjectMode: true }, + ), ) .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); @@ -55,12 +63,15 @@ test.cb("filter() emits errors during synchronous filtering", t => { const source = new Readable({ objectMode: true }); source .pipe( - filter((element: string) => { - if (element !== "a") { - throw new Error("Failed filtering"); - } - return true; - }), + filter( + (element: string) => { + if (element !== "a") { + throw new Error("Failed filtering"); + } + return true; + }, + { readableObjectMode: true, writableObjectMode: true }, + ), ) .resume() .on("error", err => { @@ -80,13 +91,16 @@ test.cb("filter() emits errors during asynchronous filtering", t => { const source = new Readable({ objectMode: true }); source .pipe( - filter(async (element: string) => { - await Promise.resolve(); - if (element !== "a") { - throw new Error("Failed filtering"); - } - return true; - }), + filter( + async (element: string) => { + await Promise.resolve(); + if (element !== "a") { + throw new Error("Failed filtering"); + } + return true; + }, + { readableObjectMode: true, writableObjectMode: true }, + ), ) .resume() .on("error", err => {