Add map, flatMap, split and join streams
This commit is contained in:
parent
6bda77d668
commit
8bb6016fbf
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "mhysa",
|
"name": "mhysa",
|
||||||
"version": "0.3.6",
|
"version": "0.4.0",
|
||||||
"description": "Promise, Stream and EventEmitter utils for Node.js",
|
"description": "Promise, Stream and EventEmitter utils for Node.js",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"promise",
|
"promise",
|
||||||
@ -31,10 +31,11 @@
|
|||||||
"dependencies": {},
|
"dependencies": {},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/chai": "^4.1.7",
|
"@types/chai": "^4.1.7",
|
||||||
|
"@types/event-stream": "^3.3.34",
|
||||||
"@types/node": "^10.12.10",
|
"@types/node": "^10.12.10",
|
||||||
"ava": "^1.0.0-rc.2",
|
"ava": "^1.0.0-rc.2",
|
||||||
"chai": "^4.2.0",
|
"chai": "^4.2.0",
|
||||||
"mhysa": "^0.3.5",
|
"mhysa": "0.3.6",
|
||||||
"prettier": "^1.14.3",
|
"prettier": "^1.14.3",
|
||||||
"ts-node": "^7.0.1",
|
"ts-node": "^7.0.1",
|
||||||
"tslint": "^5.11.0",
|
"tslint": "^5.11.0",
|
||||||
|
@ -1,8 +1,15 @@
|
|||||||
import test from "ava";
|
import test from "ava";
|
||||||
import { expect } from "chai";
|
import { expect } from "chai";
|
||||||
import { Readable } from "stream";
|
import { Readable } from "stream";
|
||||||
import { fromArray, collect, concat } from "./stream";
|
import {
|
||||||
|
fromArray,
|
||||||
|
map,
|
||||||
|
flatMap,
|
||||||
|
split,
|
||||||
|
join,
|
||||||
|
collect,
|
||||||
|
concat,
|
||||||
|
} from "./stream";
|
||||||
test.cb("fromArray() streams array elements in flowing mode", t => {
|
test.cb("fromArray() streams array elements in flowing mode", t => {
|
||||||
t.plan(3);
|
t.plan(3);
|
||||||
const elements = ["a", "b", "c"];
|
const elements = ["a", "b", "c"];
|
||||||
@ -45,6 +52,269 @@ test.cb("fromArray() ends immediately if there are no array elements", t => {
|
|||||||
.on("end", t.end);
|
.on("end", t.end);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test.cb("map() maps elements synchronously", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["A", "B", "C"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(map((element: string) => element.toUpperCase()))
|
||||||
|
.on("data", element => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("map() maps elements asynchronously", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["A", "B", "C"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
map(async (element: string) => {
|
||||||
|
await Promise.resolve();
|
||||||
|
return element.toUpperCase();
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.on("data", element => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("map() emits errors during synchronous mapping", t => {
|
||||||
|
t.plan(2);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
map((element: string) => {
|
||||||
|
if (element !== "a") {
|
||||||
|
throw new Error("Failed mapping");
|
||||||
|
}
|
||||||
|
return element.toUpperCase();
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.resume()
|
||||||
|
.on("error", err => {
|
||||||
|
expect(err.message).to.equal("Failed mapping");
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("map() emits errors during asynchronous mapping", t => {
|
||||||
|
t.plan(2);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
map(async (element: string) => {
|
||||||
|
await Promise.resolve();
|
||||||
|
if (element !== "a") {
|
||||||
|
throw new Error("Failed mapping");
|
||||||
|
}
|
||||||
|
return element.toUpperCase();
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.resume()
|
||||||
|
.on("error", err => {
|
||||||
|
expect(err.message).to.equal("Failed mapping");
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("flatMap() maps elements synchronously", t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "A", "b", "B", "c", "C"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(flatMap((element: string) => [element, element.toUpperCase()]))
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("flatMap() maps elements asynchronously", t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "A", "b", "B", "c", "C"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
flatMap(async (element: string) => {
|
||||||
|
await Promise.resolve();
|
||||||
|
return [element, element.toUpperCase()];
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("flatMap() emits errors during synchronous mapping", t => {
|
||||||
|
t.plan(2);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
flatMap((element: string) => {
|
||||||
|
if (element !== "a") {
|
||||||
|
throw new Error("Failed mapping");
|
||||||
|
}
|
||||||
|
return [element, element.toUpperCase()];
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.resume()
|
||||||
|
.on("error", err => {
|
||||||
|
expect(err.message).to.equal("Failed mapping");
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("flatMap() emits errors during asynchronous mapping", t => {
|
||||||
|
t.plan(2);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
flatMap(async (element: string) => {
|
||||||
|
await Promise.resolve();
|
||||||
|
if (element !== "a") {
|
||||||
|
throw new Error("Failed mapping");
|
||||||
|
}
|
||||||
|
return [element, element.toUpperCase()];
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.resume()
|
||||||
|
.on("error", err => {
|
||||||
|
expect(err.message).to.equal("Failed mapping");
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("split() splits chunks using the default separator (\\n)", t => {
|
||||||
|
t.plan(5);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedParts = ["ab", "c", "d", "ef", ""];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(split())
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.equal(expectedParts[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("ab\n");
|
||||||
|
source.push("c");
|
||||||
|
source.push("\n");
|
||||||
|
source.push("d");
|
||||||
|
source.push("\nef\n");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("split() splits chunks using the specified separator", t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedParts = ["ab", "c", "d", "e", "f", ""];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(split("|"))
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.equal(expectedParts[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("ab|");
|
||||||
|
source.push("c|d");
|
||||||
|
source.push("|");
|
||||||
|
source.push("e");
|
||||||
|
source.push("|f|");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("join() joins chunks using the specified separator", t => {
|
||||||
|
t.plan(9);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedParts = ["ab|", "|", "c|d", "|", "|", "|", "e", "|", "|f|"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(join("|"))
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.equal(expectedParts[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("ab|");
|
||||||
|
source.push("c|d");
|
||||||
|
source.push("|");
|
||||||
|
source.push("e");
|
||||||
|
source.push("|f|");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
test.cb(
|
test.cb(
|
||||||
"collect() collects streamed elements into an array (object, flowing mode)",
|
"collect() collects streamed elements into an array (object, flowing mode)",
|
||||||
t => {
|
t => {
|
||||||
|
117
src/stream.ts
117
src/stream.ts
@ -20,7 +20,122 @@ export function fromArray(array: any[]): NodeJS.ReadableStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a ReadWrite stream that collects streamed objects or bytes into an array or buffer
|
* Return a ReadWrite stream that maps streamed chunks
|
||||||
|
* @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
|
||||||
|
* @param options
|
||||||
|
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
|
||||||
|
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
|
||||||
|
*/
|
||||||
|
export function map<T, R>(
|
||||||
|
mapper: (chunk: T, encoding: string) => R,
|
||||||
|
{ readableObjectMode = true, writableObjectMode = true } = {},
|
||||||
|
): NodeJS.ReadWriteStream {
|
||||||
|
return new Transform({
|
||||||
|
readableObjectMode,
|
||||||
|
writableObjectMode,
|
||||||
|
async transform(chunk, encoding, callback) {
|
||||||
|
let isPromise = false;
|
||||||
|
try {
|
||||||
|
const mapped = mapper(chunk, encoding);
|
||||||
|
isPromise = mapped instanceof Promise;
|
||||||
|
callback(undefined, await mapped);
|
||||||
|
} catch (err) {
|
||||||
|
if (isPromise) {
|
||||||
|
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
|
||||||
|
this.emit("error", err);
|
||||||
|
callback();
|
||||||
|
} else {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
type FlatMapper<T, R> =
|
||||||
|
| ((chunk: T, encoding: string) => R[])
|
||||||
|
| ((chunk: T, encoding: string) => Promise<R[]>);
|
||||||
|
/**
|
||||||
|
* Return a ReadWrite stream that flat maps streamed chunks
|
||||||
|
* @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
|
||||||
|
* @param options
|
||||||
|
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
|
||||||
|
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
|
||||||
|
*/
|
||||||
|
export function flatMap<T, R>(
|
||||||
|
mapper: FlatMapper<T, R>,
|
||||||
|
{ readableObjectMode = true, writableObjectMode = true } = {},
|
||||||
|
): NodeJS.ReadWriteStream {
|
||||||
|
return new Transform({
|
||||||
|
readableObjectMode,
|
||||||
|
writableObjectMode,
|
||||||
|
async transform(chunk, encoding, callback) {
|
||||||
|
let isPromise = false;
|
||||||
|
try {
|
||||||
|
const mapped = mapper(chunk, encoding);
|
||||||
|
isPromise = mapped instanceof Promise;
|
||||||
|
(await mapped).forEach(c => this.push(c));
|
||||||
|
callback();
|
||||||
|
} catch (err) {
|
||||||
|
if (isPromise) {
|
||||||
|
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
|
||||||
|
this.emit("error", err);
|
||||||
|
callback();
|
||||||
|
} else {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a ReadWrite stream that splits streamed chunks using the given separator
|
||||||
|
* @param separator The separator to split by, defaulting to "\n"
|
||||||
|
*/
|
||||||
|
export function split(separator: string = "\n"): NodeJS.ReadWriteStream {
|
||||||
|
let buffered: string = "";
|
||||||
|
return new Transform({
|
||||||
|
readableObjectMode: true,
|
||||||
|
writableObjectMode: true,
|
||||||
|
async transform(chunk, encoding, callback) {
|
||||||
|
const splitted = chunk.split(separator);
|
||||||
|
if (buffered.length > 0 && splitted.length > 1) {
|
||||||
|
splitted[0] = buffered.concat(splitted[0]);
|
||||||
|
buffered = "";
|
||||||
|
}
|
||||||
|
buffered += splitted[splitted.length - 1];
|
||||||
|
splitted.slice(0, -1).forEach((part: string) => this.push(part));
|
||||||
|
callback();
|
||||||
|
},
|
||||||
|
flush(callback) {
|
||||||
|
callback(undefined, buffered);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a ReadWrite stream that joins streamed chunks using the given separator
|
||||||
|
* @param separator The separator to join with
|
||||||
|
*/
|
||||||
|
export function join(separator: string): NodeJS.ReadWriteStream {
|
||||||
|
let isFirstChunk = true;
|
||||||
|
return new Transform({
|
||||||
|
readableObjectMode: true,
|
||||||
|
writableObjectMode: true,
|
||||||
|
async transform(chunk, encoding, callback) {
|
||||||
|
if (!isFirstChunk) {
|
||||||
|
this.push(separator);
|
||||||
|
}
|
||||||
|
this.push(chunk);
|
||||||
|
isFirstChunk = false;
|
||||||
|
callback();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
|
||||||
* @param options
|
* @param options
|
||||||
* @param options.objectMode Whether this stream should behave as a stream of objects
|
* @param options.objectMode Whether this stream should behave as a stream of objects
|
||||||
*/
|
*/
|
||||||
|
10
yarn.lock
10
yarn.lock
@ -2059,6 +2059,11 @@ meow@^5.0.0:
|
|||||||
trim-newlines "^2.0.0"
|
trim-newlines "^2.0.0"
|
||||||
yargs-parser "^10.0.0"
|
yargs-parser "^10.0.0"
|
||||||
|
|
||||||
|
mhysa@0.3.6:
|
||||||
|
version "0.3.6"
|
||||||
|
resolved "https://registry.yarnpkg.com/mhysa/-/mhysa-0.3.6.tgz#c33885ca34c5797486ff9af10e33d17e2cd2b2a9"
|
||||||
|
integrity sha512-X7AIhISZ/r5xOHaod3SwLCTXr11ttu/fDiKQPpsI0p/RsTTqweBR7hGJeRIF9Rt6XMW/UY21pJEf/ftUkfvkHg==
|
||||||
|
|
||||||
micromatch@^3.1.10, micromatch@^3.1.4:
|
micromatch@^3.1.10, micromatch@^3.1.4:
|
||||||
version "3.1.10"
|
version "3.1.10"
|
||||||
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-3.1.10.tgz#70859bc95c9840952f359a068a3fc49f9ecfac23"
|
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-3.1.10.tgz#70859bc95c9840952f359a068a3fc49f9ecfac23"
|
||||||
@ -2158,11 +2163,6 @@ multimatch@^2.1.0:
|
|||||||
arrify "^1.0.0"
|
arrify "^1.0.0"
|
||||||
minimatch "^3.0.0"
|
minimatch "^3.0.0"
|
||||||
|
|
||||||
mhysa@^0.3.5:
|
|
||||||
version "0.3.5"
|
|
||||||
resolved "https://registry.yarnpkg.com/mhysa/-/mhysa-0.3.5.tgz#79305e1834db166d1daa02dac93072adaa63d6c2"
|
|
||||||
integrity sha512-Fz31nflyS0G2uwKlunpnKU+M9HUPxK89IWYjsNL3ENgunMlMOq1ELtpGGiTSLrbVV/N5Mc9j51kvraEbdPKgzQ==
|
|
||||||
|
|
||||||
nan@^2.9.2:
|
nan@^2.9.2:
|
||||||
version "2.11.1"
|
version "2.11.1"
|
||||||
resolved "https://registry.yarnpkg.com/nan/-/nan-2.11.1.tgz#90e22bccb8ca57ea4cd37cc83d3819b52eea6766"
|
resolved "https://registry.yarnpkg.com/nan/-/nan-2.11.1.tgz#90e22bccb8ca57ea4cd37cc83d3819b52eea6766"
|
||||||
|
Loading…
Reference in New Issue
Block a user