diff --git a/src/index.spec.ts b/src/index.spec.ts index 2578958..0a0bd2c 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -5,11 +5,14 @@ import { fromArray, map, flatMap, + filter, split, join, + replace, + parse, + stringify, collect, concat, - filter, merge, } from "."; @@ -416,6 +419,149 @@ test.cb("join() joins chunks using the specified separator", t => { source.push(null); }); +test.cb( + "replace() replaces occurrences of the given string in the streamed elements with the specified " + + "replacement string", + t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["abc", "xyf", "ghi"]; + let i = 0; + source + .pipe(replace("de", "xy")) + .on("data", part => { + expect(part).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push("def"); + source.push("ghi"); + source.push(null); + }, +); + +test.cb( + "replace() replaces occurrences of the given regular expression in the streamed elements with " + + "the specified replacement string", + t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["abc", "xyz", "ghi"]; + let i = 0; + source + .pipe(replace(/^def$/, "xyz")) + .on("data", part => { + expect(part).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push("def"); + source.push("ghi"); + source.push(null); + }, +); + +test.cb("parse() parses the streamed elements as JSON", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["abc", {}, []]; + let i = 0; + source + .pipe(parse()) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push('"abc"'); + source.push("{}"); + source.push("[]"); + source.push(null); +}); + +test.cb("parse() emits errors on invalid JSON", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe(parse()) + .resume() + .on("error", () => t.pass()) + .on("end", t.end); + + source.push("{}"); + source.push(""); + source.push([]); + source.push(null); +}); + +test.cb("stringify() stringifies the streamed elements as JSON", t => { + t.plan(4); + const source = new Readable({ objectMode: true }); + const expectedElements = [ + '"abc"', + "0", + '{"a":"a","b":"b","c":"c"}', + '["a","b","c"]', + ]; + let i = 0; + source + .pipe(stringify()) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push(0); + source.push({ a: "a", b: "b", c: "c" }); + source.push(["a", "b", "c"]); + source.push(null); +}); + +test.cb( + "stringify() stringifies the streamed elements as pretty-printed JSON", + t => { + t.plan(4); + const source = new Readable({ objectMode: true }); + const expectedElements = [ + '"abc"', + "0", + '{\n "a": "a",\n "b": "b",\n "c": "c"\n}', + '[\n "a",\n "b",\n "c"\n]', + ]; + let i = 0; + source + .pipe(stringify({ pretty: true })) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push(0); + source.push({ a: "a", b: "b", c: "c" }); + source.push(["a", "b", "c"]); + source.push(null); + }, +); + test.cb( "collect() collects streamed elements into an array (object, flowing mode)", t => { @@ -636,8 +782,8 @@ test.cb( "concat() concatenates multiple readable streams (non-object, paused mode)", t => { t.plan(6); - const source1 = new Readable({ objectMode: false }); - const source2 = new Readable({ objectMode: false }); + const source1 = new Readable({ objectMode: false, read: () => ({}) }); + const source2 = new Readable({ objectMode: false, read: () => ({}) }); const expectedElements = ["a", "b", "c", "d", "e", "f"]; let i = 0; const concatenation = concat(source1, source2) @@ -720,7 +866,7 @@ test.cb("concat() concatenates empty list of readable streams", t => { .on("end", t.end); }); -test.cb.only( +test.cb( "merge() merges multiple readable streams in chunk arrival order", t => { t.plan(6); diff --git a/src/index.ts b/src/index.ts index 3e0293c..1691a75 100644 --- a/src/index.ts +++ b/src/index.ts @@ -183,6 +183,67 @@ export function join(separator: string): NodeJS.ReadWriteStream { }); } +/** + * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in + * the streamed chunks with the specified replacement string + * @param searchValue The search string to use + * @param replaceValue The replacement string to use + */ +export function replace( + searchValue: string | RegExp, + replaceValue: string, +): NodeJS.ReadWriteStream { + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + transform(chunk: string, encoding, callback) { + callback(undefined, chunk.replace(searchValue, replaceValue)); + }, + }); +} + +/** + * Return a ReadWrite stream that parses the streamed chunks as JSON + */ +export function parse(): NodeJS.ReadWriteStream { + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk: string, encoding, callback) { + try { + // Using await causes parsing errors to be emitted + callback(undefined, await JSON.parse(chunk)); + } catch (err) { + callback(err); + } + }, + }); +} +type JsonPrimitive = string | number | object; +type JsonValue = JsonPrimitive | JsonPrimitive[]; +interface JsonParseOptions { + pretty: boolean; +} +/** + * Return a ReadWrite stream that stringifies the streamed chunks to JSON + */ +export function stringify( + options: JsonParseOptions = { pretty: false }, +): NodeJS.ReadWriteStream { + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + transform(chunk: JsonValue, encoding, callback) { + callback( + undefined, + options.pretty + ? JSON.stringify(chunk, null, 2) + : JSON.stringify(chunk), + ); + }, + }); +} + /** * Return a ReadWrite stream that collects streamed chunks into an array or buffer * @param options diff --git a/yarn.lock b/yarn.lock index 9e4d17f..b88ff0e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2067,7 +2067,7 @@ meow@^5.0.0: yargs-parser "^10.0.0" mhysa@./: - version "0.5.0-beta.0" + version "0.6.0-beta.0" micromatch@^3.1.10, micromatch@^3.1.4: version "3.1.10"