Add replace(), parse() and stringify() methods

Sami Turcotte 2018-12-01 02:18:03 -05:00
parent 46597f3185
commit 6201d7812f
3 changed files with 212 additions and 5 deletions


@@ -5,11 +5,14 @@ import {
fromArray,
map,
flatMap,
filter,
split,
join,
replace,
parse,
stringify,
collect,
concat,
filter,
merge,
} from ".";
@@ -416,6 +419,149 @@ test.cb("join() joins chunks using the specified separator", t => {
source.push(null);
});
test.cb(
"replace() replaces occurrences of the given string in the streamed elements with the specified " +
"replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyf", "ghi"];
let i = 0;
source
.pipe(replace("de", "xy"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb(
"replace() replaces occurrences of the given regular expression in the streamed elements with " +
"the specified replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyz", "ghi"];
let i = 0;
source
.pipe(replace(/^def$/, "xyz"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb("parse() parses the streamed elements as JSON", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", {}, []];
let i = 0;
source
.pipe(parse())
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push('"abc"');
source.push("{}");
source.push("[]");
source.push(null);
});
test.cb("parse() emits errors on invalid JSON", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(parse())
.resume()
.on("error", () => t.pass())
.on("end", t.end);
source.push("{}");
source.push("");
source.push([]);
source.push(null);
});
test.cb("stringify() stringifies the streamed elements as JSON", t => {
t.plan(4);
const source = new Readable({ objectMode: true });
const expectedElements = [
'"abc"',
"0",
'{"a":"a","b":"b","c":"c"}',
'["a","b","c"]',
];
let i = 0;
source
.pipe(stringify())
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push(0);
source.push({ a: "a", b: "b", c: "c" });
source.push(["a", "b", "c"]);
source.push(null);
});
test.cb(
"stringify() stringifies the streamed elements as pretty-printed JSON",
t => {
t.plan(4);
const source = new Readable({ objectMode: true });
const expectedElements = [
'"abc"',
"0",
'{\n "a": "a",\n "b": "b",\n "c": "c"\n}',
'[\n "a",\n "b",\n "c"\n]',
];
let i = 0;
source
.pipe(stringify({ pretty: true }))
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push(0);
source.push({ a: "a", b: "b", c: "c" });
source.push(["a", "b", "c"]);
source.push(null);
},
);
test.cb(
"collect() collects streamed elements into an array (object, flowing mode)",
t => {
@@ -636,8 +782,8 @@ test.cb(
"concat() concatenates multiple readable streams (non-object, paused mode)",
t => {
t.plan(6);
-const source1 = new Readable({ objectMode: false });
-const source2 = new Readable({ objectMode: false });
+const source1 = new Readable({ objectMode: false, read: () => ({}) });
+const source2 = new Readable({ objectMode: false, read: () => ({}) });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
const concatenation = concat(source1, source2)
@@ -720,7 +866,7 @@ test.cb("concat() concatenates empty list of readable streams", t => {
.on("end", t.end);
});
-test.cb.only(
+test.cb(
"merge() merges multiple readable streams in chunk arrival order",
t => {
t.plan(6);


@@ -183,6 +183,67 @@ export function join(separator: string): NodeJS.ReadWriteStream {
});
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue The string or regular expression to search for
* @param replaceValue The replacement string to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: string, encoding, callback) {
callback(undefined, chunk.replace(searchValue, replaceValue));
},
});
}
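A minimal usage sketch for replace() (illustrative only, not part of this diff; the mhysa import path is assumed from the lockfile entry below):

import { Readable } from "stream";
import { replace } from "mhysa";

// Object-mode source of string chunks; read() is a no-op because data is pushed manually
const source = new Readable({ objectMode: true, read() {} });
source
    .pipe(replace(/world/g, "there"))
    .on("data", chunk => console.log(chunk)); // "hello there"
source.push("hello world");
source.push(null);

Note that replacement is applied chunk by chunk, so a match that straddles two chunks is not rewritten.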
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON
*/
export function parse(): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: string, encoding, callback) {
try {
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(chunk));
} catch (err) {
callback(err);
}
},
});
}
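A similar sketch for parse(); each pushed chunk is assumed to be a complete JSON document, and invalid chunks surface as "error" events, as the tests above exercise:

import { Readable } from "stream";
import { parse } from "mhysa";

// String chunks in, parsed JavaScript values out
const source = new Readable({ objectMode: true, read() {} });
source
    .pipe(parse())
    .on("data", value => console.log(value)) // { greeting: "hello" }
    .on("error", err => console.error("invalid JSON", err));
source.push('{"greeting":"hello"}');
source.push(null);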
type JsonPrimitive = string | number | object;
type JsonValue = JsonPrimitive | JsonPrimitive[];
interface JsonParseOptions {
pretty: boolean;
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
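And a sketch for stringify() with the pretty option enabled (identifiers illustrative):

import { Readable } from "stream";
import { stringify } from "mhysa";

// Objects in, JSON strings out; pretty: true delegates to JSON.stringify(chunk, null, 2)
const source = new Readable({ objectMode: true, read() {} });
source
    .pipe(stringify({ pretty: true }))
    .pipe(process.stdout);
source.push({ a: "a", b: "b" });
source.push(null);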
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options


@@ -2067,7 +2067,7 @@ meow@^5.0.0:
yargs-parser "^10.0.0"
mhysa@./:
version "0.5.0-beta.0"
version "0.6.0-beta.0"
micromatch@^3.1.10, micromatch@^3.1.4:
version "3.1.10"