diff --git a/.gitignore b/.gitignore index e14fcc3..893af13 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ node_modules dist sample_output -yarn-error.log \ No newline at end of file +yarn-error.log +TODO.md \ No newline at end of file diff --git a/package.json b/package.json index 8df65ad..ff226d1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "mhysa", - "version": "0.5.0", - "description": "Promise, Stream and EventEmitter utils for Node.js", + "version": "0.6.0-beta.0", + "description": "Streams and event emitter utils for Node.js", "keywords": [ "promise", "stream", @@ -9,8 +9,7 @@ "utils" ], "author": { - "name": "Sami Turcotte", - "email": "samiturcotte@gmail.com" + "name": "Wenzil" }, "license": "MIT", "main": "dist/index.js", diff --git a/src/index.spec.ts b/src/index.spec.ts index 035667b..2578958 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -1,7 +1,17 @@ import test from "ava"; import { expect } from "chai"; import { Readable } from "stream"; -import { fromArray, map, flatMap, split, join, collect, concat } from "./"; +import { + fromArray, + map, + flatMap, + split, + join, + collect, + concat, + filter, + merge, +} from "."; test.cb("fromArray() streams array elements in flowing mode", t => { t.plan(3); @@ -9,7 +19,7 @@ test.cb("fromArray() streams array elements in flowing mode", t => { const stream = fromArray(elements); let i = 0; stream - .on("data", element => { + .on("data", (element: string) => { expect(element).to.equal(elements[i]); t.pass(); i++; @@ -52,7 +62,7 @@ test.cb("map() maps elements synchronously", t => { let i = 0; source .pipe(map((element: string) => element.toUpperCase())) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); i++; @@ -78,7 +88,7 @@ test.cb("map() maps elements asynchronously", t => { return element.toUpperCase(); }), ) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); i++; @@ -239,6 +249,104 @@ test.cb("flatMap() emits errors during asynchronous mapping", t => { source.push(null); }); +test.cb("filter() filters elements synchronously", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "c"]; + let i = 0; + source + .pipe(filter((element: string) => element !== "b")) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("filter() filters elements asynchronously", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "c"]; + let i = 0; + source + .pipe( + filter(async (element: string) => { + await Promise.resolve(); + return element !== "b"; + }), + ) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("filter() emits errors during synchronous filtering", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + filter((element: string) => { + if (element !== "a") { + throw new Error("Failed filtering"); + } + return true; + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed filtering"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("filter() emits errors during asynchronous filtering", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + filter(async (element: string) => { + await Promise.resolve(); + if (element !== "a") { + throw new Error("Failed filtering"); + } + return true; + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed filtering"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + test.cb("split() splits chunks using the default separator (\\n)", t => { t.plan(5); const source = new Readable({ objectMode: true }); @@ -445,7 +553,7 @@ test.cb( const expectedElements = ["a", "b", "c", "d", "e", "f"]; let i = 0; concat(source1, source2) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); i++; @@ -505,7 +613,7 @@ test.cb( const expectedElements = ["a", "b", "c", "d", "e", "f"]; let i = 0; concat(source1, source2) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.deep.equal(Buffer.from(expectedElements[i])); t.pass(); i++; @@ -548,13 +656,13 @@ test.cb( .on("end", t.end); source1.push("a"); - source2.push("d"); - source1.push("b"); - source2.push("e"); - source1.push("c"); - source2.push("f"); - source2.push(null); - source1.push(null); + setTimeout(() => source2.push("d"), 10); + setTimeout(() => source1.push("b"), 20); + setTimeout(() => source2.push("e"), 30); + setTimeout(() => source1.push("c"), 40); + setTimeout(() => source2.push("f"), 50); + setTimeout(() => source2.push(null), 60); + setTimeout(() => source1.push(null), 70); }, ); @@ -564,7 +672,7 @@ test.cb("concat() concatenates a single readable stream (object mode)", t => { const expectedElements = ["a", "b", "c", "d", "e", "f"]; let i = 0; concat(source) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); i++; @@ -586,7 +694,7 @@ test.cb( const expectedElements = ["a", "b", "c", "d", "e", "f"]; let i = 0; concat(source) - .on("data", element => { + .on("data", (element: string) => { expect(element).to.deep.equal(Buffer.from(expectedElements[i])); t.pass(); i++; @@ -611,3 +719,59 @@ test.cb("concat() concatenates empty list of readable streams", t => { .on("error", t.end) .on("end", t.end); }); + +test.cb.only( + "merge() merges multiple readable streams in chunk arrival order", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: true, read: () => ({}) }); + const source2 = new Readable({ objectMode: true, read: () => ({}) }); + const expectedElements = ["a", "d", "b", "e", "c", "f"]; + let i = 0; + merge(source1, source2) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + setTimeout(() => source2.push("d"), 10); + setTimeout(() => source1.push("b"), 20); + setTimeout(() => source2.push("e"), 30); + setTimeout(() => source1.push("c"), 40); + setTimeout(() => source2.push("f"), 50); + setTimeout(() => source2.push(null), 60); + setTimeout(() => source1.push(null), 70); + }, +); + +test.cb("merge() merges a readable stream", t => { + t.plan(3); + const source = new Readable({ objectMode: true, read: () => ({}) }); + const expectedElements = ["a", "b", "c"]; + let i = 0; + merge(source) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("merge() merges an empty list of readable streams", t => { + t.plan(0); + merge() + .on("data", () => t.pass()) + .on("error", t.end) + .on("end", t.end); +}); diff --git a/src/index.ts b/src/index.ts index 8eca646..3e0293c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,10 +2,10 @@ import { Transform, Readable } from "stream"; import * as _utils from "./utils"; export const utils = _utils; -export interface ReadableOptions { +export interface ThroughOptions { objectMode?: boolean; } -export interface ThroughOptions { +export interface TransformOptions { readableObjectMode?: boolean; writableObjectMode?: boolean; } @@ -38,14 +38,14 @@ export function fromArray(array: any[]): NodeJS.ReadableStream { */ export function map( mapper: (chunk: T, encoding: string) => R, - options: ThroughOptions = { + options: TransformOptions = { readableObjectMode: true, writableObjectMode: true, }, ): NodeJS.ReadWriteStream { return new Transform({ ...options, - async transform(chunk, encoding, callback) { + async transform(chunk: T, encoding, callback) { let isPromise = false; try { const mapped = mapper(chunk, encoding); @@ -75,14 +75,14 @@ export function flatMap( mapper: | ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => Promise), - options: ThroughOptions = { + options: TransformOptions = { readableObjectMode: true, writableObjectMode: true, }, ): NodeJS.ReadWriteStream { return new Transform({ ...options, - async transform(chunk, encoding, callback) { + async transform(chunk: T, encoding, callback) { let isPromise = false; try { const mapped = mapper(chunk, encoding); @@ -102,6 +102,40 @@ export function flatMap( }); } +export function filter( + predicate: + | ((chunk: T, encoding: string) => boolean) + | ((chunk: T, encoding: string) => Promise), + options: ThroughOptions = { + objectMode: true, + }, +) { + return new Transform({ + readableObjectMode: options.objectMode, + writableObjectMode: options.objectMode, + async transform(chunk: T, encoding, callback) { + let isPromise = false; + try { + const result = predicate(chunk, encoding); + isPromise = result instanceof Promise; + if (!!(await result)) { + callback(undefined, chunk); + } else { + callback(); + } + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + }); +} + /** * Return a ReadWrite stream that splits streamed chunks using the given separator * @param separator The separator to split by, defaulting to "\n" @@ -138,7 +172,7 @@ export function join(separator: string): NodeJS.ReadWriteStream { return new Transform({ readableObjectMode: true, writableObjectMode: true, - async transform(chunk, encoding, callback) { + async transform(chunk: string, encoding, callback) { if (!isFirstChunk) { this.push(separator); } @@ -155,7 +189,7 @@ export function join(separator: string): NodeJS.ReadWriteStream { * @param options.objectMode Whether this stream should behave as a stream of objects */ export function collect( - options: ReadableOptions = { objectMode: false }, + options: ThroughOptions = { objectMode: false }, ): NodeJS.ReadWriteStream { const collected: any[] = []; return new Transform({ @@ -215,3 +249,41 @@ export function concat( }); return wrapper; } + +/** + * Return a stream of readable streams merged together in chunk arrival order + * @param streams The readable streams to merge + */ +export function merge( + ...streams: NodeJS.ReadableStream[] +): NodeJS.ReadableStream { + let isStarted = false; + let streamEndedCount = 0; + return new Readable({ + objectMode: true, + read() { + if (streamEndedCount >= streams.length) { + this.push(null); + } else if (!isStarted) { + isStarted = true; + streams.forEach(stream => + stream + .on("data", chunk => { + if (!this.push(chunk)) { + streams.forEach(s => s.pause()); + } + }) + .on("error", err => this.emit("error", err)) + .on("end", () => { + streamEndedCount++; + if (streamEndedCount === streams.length) { + this.push(null); + } + }), + ); + } else { + streams.forEach(s => s.resume()); + } + }, + }); +}