diff --git a/package.json b/package.json index 217a686..b42b36a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mhysa", - "version": "0.3.6", + "version": "0.4.0", "description": "Promise, Stream and EventEmitter utils for Node.js", "keywords": [ "promise", @@ -31,10 +31,11 @@ "dependencies": {}, "devDependencies": { "@types/chai": "^4.1.7", + "@types/event-stream": "^3.3.34", "@types/node": "^10.12.10", "ava": "^1.0.0-rc.2", "chai": "^4.2.0", - "mhysa": "^0.3.5", + "mhysa": "0.3.6", "prettier": "^1.14.3", "ts-node": "^7.0.1", "tslint": "^5.11.0", diff --git a/src/stream.spec.ts b/src/stream.spec.ts index a17445d..1cb3f35 100644 --- a/src/stream.spec.ts +++ b/src/stream.spec.ts @@ -1,8 +1,15 @@ import test from "ava"; import { expect } from "chai"; import { Readable } from "stream"; -import { fromArray, collect, concat } from "./stream"; - +import { + fromArray, + map, + flatMap, + split, + join, + collect, + concat, +} from "./stream"; test.cb("fromArray() streams array elements in flowing mode", t => { t.plan(3); const elements = ["a", "b", "c"]; @@ -45,6 +52,269 @@ test.cb("fromArray() ends immediately if there are no array elements", t => { .on("end", t.end); }); +test.cb("map() maps elements synchronously", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["A", "B", "C"]; + let i = 0; + source + .pipe(map((element: string) => element.toUpperCase())) + .on("data", element => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("map() maps elements asynchronously", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["A", "B", "C"]; + let i = 0; + source + .pipe( + map(async (element: string) => { + await Promise.resolve(); + return element.toUpperCase(); + }), + ) + .on("data", element => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("map() emits errors during synchronous mapping", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + map((element: string) => { + if (element !== "a") { + throw new Error("Failed mapping"); + } + return element.toUpperCase(); + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("map() emits errors during asynchronous mapping", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + map(async (element: string) => { + await Promise.resolve(); + if (element !== "a") { + throw new Error("Failed mapping"); + } + return element.toUpperCase(); + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("flatMap() maps elements synchronously", t => { + t.plan(6); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "A", "b", "B", "c", "C"]; + let i = 0; + source + .pipe(flatMap((element: string) => [element, element.toUpperCase()])) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("flatMap() maps elements asynchronously", t => { + t.plan(6); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "A", "b", "B", "c", "C"]; + let i = 0; + source + .pipe( + flatMap(async (element: string) => { + await Promise.resolve(); + return [element, element.toUpperCase()]; + }), + ) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("flatMap() emits errors during synchronous mapping", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + flatMap((element: string) => { + if (element !== "a") { + throw new Error("Failed mapping"); + } + return [element, element.toUpperCase()]; + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("flatMap() emits errors during asynchronous mapping", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + flatMap(async (element: string) => { + await Promise.resolve(); + if (element !== "a") { + throw new Error("Failed mapping"); + } + return [element, element.toUpperCase()]; + }), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("split() splits chunks using the default separator (\\n)", t => { + t.plan(5); + const source = new Readable({ objectMode: true }); + const expectedParts = ["ab", "c", "d", "ef", ""]; + let i = 0; + source + .pipe(split()) + .on("data", part => { + expect(part).to.equal(expectedParts[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("ab\n"); + source.push("c"); + source.push("\n"); + source.push("d"); + source.push("\nef\n"); + source.push(null); +}); + +test.cb("split() splits chunks using the specified separator", t => { + t.plan(6); + const source = new Readable({ objectMode: true }); + const expectedParts = ["ab", "c", "d", "e", "f", ""]; + let i = 0; + source + .pipe(split("|")) + .on("data", part => { + expect(part).to.equal(expectedParts[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("ab|"); + source.push("c|d"); + source.push("|"); + source.push("e"); + source.push("|f|"); + source.push(null); +}); + +test.cb("join() joins chunks using the specified separator", t => { + t.plan(9); + const source = new Readable({ objectMode: true }); + const expectedParts = ["ab|", "|", "c|d", "|", "|", "|", "e", "|", "|f|"]; + let i = 0; + source + .pipe(join("|")) + .on("data", part => { + expect(part).to.equal(expectedParts[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("ab|"); + source.push("c|d"); + source.push("|"); + source.push("e"); + source.push("|f|"); + source.push(null); +}); + test.cb( "collect() collects streamed elements into an array (object, flowing mode)", t => { diff --git a/src/stream.ts b/src/stream.ts index 8a2b32f..bc25580 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -20,7 +20,122 @@ export function fromArray(array: any[]): NodeJS.ReadableStream { } /** - * Return a ReadWrite stream that collects streamed objects or bytes into an array or buffer + * Return a ReadWrite stream that maps streamed chunks + * @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such) + * @param options + * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects + */ +export function map( + mapper: (chunk: T, encoding: string) => R, + { readableObjectMode = true, writableObjectMode = true } = {}, +): NodeJS.ReadWriteStream { + return new Transform({ + readableObjectMode, + writableObjectMode, + async transform(chunk, encoding, callback) { + let isPromise = false; + try { + const mapped = mapper(chunk, encoding); + isPromise = mapped instanceof Promise; + callback(undefined, await mapped); + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + }); +} + +type FlatMapper = + | ((chunk: T, encoding: string) => R[]) + | ((chunk: T, encoding: string) => Promise); +/** + * Return a ReadWrite stream that flat maps streamed chunks + * @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such) + * @param options + * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects + * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects + */ +export function flatMap( + mapper: FlatMapper, + { readableObjectMode = true, writableObjectMode = true } = {}, +): NodeJS.ReadWriteStream { + return new Transform({ + readableObjectMode, + writableObjectMode, + async transform(chunk, encoding, callback) { + let isPromise = false; + try { + const mapped = mapper(chunk, encoding); + isPromise = mapped instanceof Promise; + (await mapped).forEach(c => this.push(c)); + callback(); + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + }); +} + +/** + * Return a ReadWrite stream that splits streamed chunks using the given separator + * @param separator The separator to split by, defaulting to "\n" + */ +export function split(separator: string = "\n"): NodeJS.ReadWriteStream { + let buffered: string = ""; + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk, encoding, callback) { + const splitted = chunk.split(separator); + if (buffered.length > 0 && splitted.length > 1) { + splitted[0] = buffered.concat(splitted[0]); + buffered = ""; + } + buffered += splitted[splitted.length - 1]; + splitted.slice(0, -1).forEach((part: string) => this.push(part)); + callback(); + }, + flush(callback) { + callback(undefined, buffered); + }, + }); +} + +/** + * Return a ReadWrite stream that joins streamed chunks using the given separator + * @param separator The separator to join with + */ +export function join(separator: string): NodeJS.ReadWriteStream { + let isFirstChunk = true; + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk, encoding, callback) { + if (!isFirstChunk) { + this.push(separator); + } + this.push(chunk); + isFirstChunk = false; + callback(); + }, + }); +} + +/** + * Return a ReadWrite stream that collects streamed chunks into an array or buffer * @param options * @param options.objectMode Whether this stream should behave as a stream of objects */ diff --git a/yarn.lock b/yarn.lock index f80f174..8cedf50 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2059,6 +2059,11 @@ meow@^5.0.0: trim-newlines "^2.0.0" yargs-parser "^10.0.0" +mhysa@0.3.6: + version "0.3.6" + resolved "https://registry.yarnpkg.com/mhysa/-/mhysa-0.3.6.tgz#c33885ca34c5797486ff9af10e33d17e2cd2b2a9" + integrity sha512-X7AIhISZ/r5xOHaod3SwLCTXr11ttu/fDiKQPpsI0p/RsTTqweBR7hGJeRIF9Rt6XMW/UY21pJEf/ftUkfvkHg== + micromatch@^3.1.10, micromatch@^3.1.4: version "3.1.10" resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-3.1.10.tgz#70859bc95c9840952f359a068a3fc49f9ecfac23" @@ -2158,11 +2163,6 @@ multimatch@^2.1.0: arrify "^1.0.0" minimatch "^3.0.0" -mhysa@^0.3.5: - version "0.3.5" - resolved "https://registry.yarnpkg.com/mhysa/-/mhysa-0.3.5.tgz#79305e1834db166d1daa02dac93072adaa63d6c2" - integrity sha512-Fz31nflyS0G2uwKlunpnKU+M9HUPxK89IWYjsNL3ENgunMlMOq1ELtpGGiTSLrbVV/N5Mc9j51kvraEbdPKgzQ== - nan@^2.9.2: version "2.11.1" resolved "https://registry.yarnpkg.com/nan/-/nan-2.11.1.tgz#90e22bccb8ca57ea4cd37cc83d3819b52eea6766"