Compare commits
97 commits
feature/ba... → feature/de...
| Author | SHA1 | Date |
|---|---|---|
| | 3c75ef88b4 | |
| | 7113400cb1 | |
| | a42560edfc | |
| | 58a95a91d0 | |
| | 71b03678ba | |
| | f661f9be6b | |
| | ed73bd2887 | |
| | 2841f4e182 | |
| | 12efbec698 | |
| | ce2bb55b24 | |
| | 2bbc5c9e0f | |
| | 8856cb8d3b | |
| | 2b1308a605 | |
| | 11ed6f81e7 | |
| | cf719b25cf | |
| | 9c09957775 | |
| | bff4e0d6ed | |
| | bd178ce2f0 | |
| | 1227ce7095 | |
| | 179d526c6c | |
| | 4b806c4d4e | |
| | ff2b652ddf | |
| | 107bc17bd4 | |
| | 1b3be03db3 | |
| | fddaa03300 | |
| | 57645c68da | |
| | a45a144854 | |
| | 7ab8541cf6 | |
| | a85054fd82 | |
| | c690185ab7 | |
| | 9b31479406 | |
| | 298a8b328d | |
| | 1794910b64 | |
| | e08558ca88 | |
| | ac21fb7ea6 | |
| | 9e14d8c044 | |
| | 4f80d44ed8 | |
| | b8bd69eb01 | |
| | f6e3a03eb7 | |
| | f177f95f52 | |
| | a11aa10d16 | |
| | 70edee51c4 | |
| | 158475183a | |
| | 48a231d61c | |
| | 4c7e9ceb7e | |
| | 517e281ce5 | |
| | 586f618e95 | |
| | 65c36a8f22 | |
| | ce19c5e987 | |
| | f06cb1c33e | |
| | dcfd6fe4c2 | |
| | 9d280b1662 | |
| | ee3d9b9ded | |
| | 83ef6e9734 | |
| | 7aeea4815a | |
| | d33d8dcad3 | |
| | eed36a4fe9 | |
| | ea2ffdb38c | |
| | 0067ba6a7c | |
| | 599ba16d48 | |
| | 2cbeae38e7 | |
| | ae7c9d6b09 | |
| | cd10649d44 | |
| | d5f3fd8bd8 | |
| | 2ee04a2d79 | |
| | fe0e53147c | |
| | 2524d51aa7 | |
| | 9765e6cb49 | |
| | 685215bee6 | |
| | 9b09a3f949 | |
| | c7903376e9 | |
| | f35f025dbc | |
| | 1d0e15890c | |
| | d097fa6aa5 | |
| | 1e7fad2403 | |
| | 6581e1d745 | |
| | 7394b6ef84 | |
| | 50f6886b4b | |
| | 4e80e48fa4 | |
| | 047ff66ee1 | |
| | faac6134af | |
| | 505fefeeb5 | |
| | d6d974ee0d | |
| | 5a9fcc94a6 | |
| | 27b4b2427b | |
| | 6a9f6ff919 | |
| | a40b1bf38c | |
| | 3a1fbf44d7 | |
| | c72ecaf219 | |
| | e8d672d903 | |
| | 5112ee9540 | |
| | e932adde67 | |
| | fdcc5bafc6 | |
| | c1ef5fec4b | |
| | a60b23496b | |
| | d918d8ca10 | |
| | af9293ab52 | |
.gitignore (vendored, 11 changes)

```diff
@@ -4,3 +4,14 @@ dist
 sample_output
 yarn-error.log
 TODO.md
+
+#VIM
+## Swap
+[._]*.s[a-v][a-z]
+!*.svg # comment out if you don't need vector files
+[._]*.sw[a-p]
+[._]s[a-rt-v][a-z]
+[._]ss[a-gi-z]
+[._]sw[a-p]
+*.orig
+
```
package.json (35 changes)

```diff
@@ -1,6 +1,6 @@
 {
-    "name": "mhysa",
-    "version": "1.0.2",
+    "name": "@jogogo/mhysa",
+    "version": "2.0.0-alpha.4",
     "description": "Streams and event emitter utils for Node.js",
     "keywords": [
         "promise",
@@ -11,40 +11,55 @@
     "author": {
         "name": "Wenzil"
     },
+    "contributors": [
+        {
+            "name": "jerry",
+            "email": "jerry@jogogo.co"
+        },
+        {
+            "name": "lewis",
+            "email": "lewis@jogogo.co"
+        }
+    ],
     "license": "MIT",
     "main": "dist/index.js",
+    "types": "dist/index.d.ts",
     "files": [
         "dist"
     ],
+    "publishConfig": {
+        "registry": "https://npm.dev.jogogo.co/"
+    },
     "repository": {
-        "url": "git@github.com:Wenzil/Mhysa.git",
+        "url": "git@github.com:Jogogoplay/mhysa.git",
         "type": "git"
     },
     "scripts": {
         "test": "ava",
         "lint": "tslint -p tsconfig.json",
         "validate:tslint": "tslint-config-prettier-check ./tslint.json",
-        "prepublishOnly": "yarn lint && yarn test && yarn tsc"
+        "prepublishOnly": "yarn lint && yarn test && yarn tsc -d"
     },
     "dependencies": {},
     "devDependencies": {
         "@types/chai": "^4.1.7",
-        "@types/node": "^10.12.10",
-        "@types/typescript": "^2.0.0",
-        "ava": "^1.0.0-rc.2",
+        "@types/node": "^12.12.15",
+        "@types/sinon": "^7.0.13",
+        "ava": "^2.4.0",
         "chai": "^4.2.0",
         "mhysa": "./",
         "prettier": "^1.14.3",
-        "ts-node": "^7.0.1",
+        "sinon": "^7.4.2",
+        "ts-node": "^8.3.0",
         "tslint": "^5.11.0",
         "tslint-config-prettier": "^1.16.0",
         "tslint-plugin-prettier": "^2.0.1",
-        "typescript": "^3.1.6"
+        "typescript": "^3.5.3"
     },
     "ava": {
         "files": [
-            "src/**/*.spec.ts"
+            "tests/*.spec.ts",
+            "tests/utils/*.spec.ts"
         ],
         "sources": [
             "src/**/*.ts"
```
src/functions/accumulator.ts (new file, 180 lines)

```typescript
import { Transform, TransformOptions } from "stream";

export enum FlushStrategy {
    rolling = "rolling",
    sliding = "sliding",
}

export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;

function _accumulator<T>(
    accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
    shouldFlush: boolean = true,
    options: TransformOptions = {},
) {
    const buffer: T[] = [];
    return new Transform({
        ...options,
        transform(data: T, encoding, callback) {
            accumulateBy(data, buffer, this);
            callback();
        },
        flush(callback) {
            if (shouldFlush) {
                this.push(buffer);
            }
            callback();
        },
    });
}

function _sliding<T>(
    windowLength: number,
    key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
    return (event: T, buffer: T[], stream: Transform) => {
        if (key) {
            let index = 0;
            if (event[key] === undefined) {
                stream.emit(
                    "error",
                    new Error(
                        `Key is missing in event: (${key}, ${JSON.stringify(
                            event,
                        )})`,
                    ),
                );
                stream.resume();
                return;
            }
            while (
                index < buffer.length &&
                buffer[index][key] + windowLength <= event[key]
            ) {
                index++;
            }
            buffer.splice(0, index);
        } else if (buffer.length === windowLength) {
            buffer.shift();
        }
        buffer.push(event);
        stream.push([...buffer]);
    };
}

function _slidingByFunction<T>(
    iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
    return (event: T, buffer: T[], stream: Transform) => {
        let index = 0;
        while (index < buffer.length && iteratee(event, buffer[index])) {
            index++;
        }
        buffer.splice(0, index);
        buffer.push(event);
        stream.push([...buffer]);
    };
}

function _rollingByFunction<T>(
    iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
    return (event: T, buffer: T[], stream: Transform) => {
        if (iteratee) {
            if (buffer.length > 0 && iteratee(event, buffer[0])) {
                stream.push(buffer.slice(0));
                buffer.length = 0;
            }
        }
        buffer.push(event);
    };
}

function _rolling<T>(
    windowLength: number,
    key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
    return (event: T, buffer: T[], stream: Transform) => {
        if (key) {
            if (event[key] === undefined) {
                stream.emit(
                    "error",
                    new Error(
                        `Key is missing in event: (${key}, ${JSON.stringify(
                            event,
                        )})`,
                    ),
                );
                stream.resume();
                return;
            } else if (
                buffer.length > 0 &&
                buffer[0][key] + windowLength <= event[key]
            ) {
                stream.push(buffer.slice(0));
                buffer.length = 0;
            }
        } else if (buffer.length === windowLength) {
            stream.push(buffer.slice(0));
            buffer.length = 0;
        }
        buffer.push(event);
    };
}

export function accumulator(
    flushStrategy: FlushStrategy,
    batchSize: number,
    keyBy?: string,
    options?: TransformOptions,
): Transform {
    switch (flushStrategy) {
        case FlushStrategy.sliding:
            return sliding(batchSize, keyBy, options);
        case FlushStrategy.rolling:
            return rolling(batchSize, keyBy, options);
    }
}

export function accumulatorBy<T>(
    flushStrategy: FlushStrategy,
    iteratee: AccumulatorByIteratee<T>,
    options?: TransformOptions,
): Transform {
    switch (flushStrategy) {
        case FlushStrategy.sliding:
            return slidingBy(iteratee, options);
        case FlushStrategy.rolling:
            return rollingBy(iteratee, options);
    }
}

function sliding(
    windowLength: number,
    key?: string,
    options?: TransformOptions,
): Transform {
    return _accumulator(_sliding(windowLength, key), false, options);
}

function slidingBy<T>(
    iteratee: AccumulatorByIteratee<T>,
    options?: TransformOptions,
): Transform {
    return _accumulator(_slidingByFunction(iteratee), false, options);
}

function rolling(
    windowLength: number,
    key?: string,
    options?: TransformOptions,
): Transform {
    return _accumulator(_rolling(windowLength, key), true, options);
}

function rollingBy<T>(
    iteratee: AccumulatorByIteratee<T>,
    options?: TransformOptions,
): Transform {
    return _accumulator(_rollingByFunction(iteratee), true, options);
}
```
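Not part of the diff: a minimal usage sketch of the two entry points above, assuming the relative import paths and execution with ts-node from the repo root.

```typescript
import { accumulator, FlushStrategy } from "./src/functions/accumulator";
import { fromArray } from "./src/functions/fromArray";

// Rolling windows keyed on "ts" with a window length of 3: the buffer is
// flushed downstream as soon as an event arrives 3 or more ts units after
// the first buffered event; whatever remains is flushed when the stream ends.
const windows = accumulator(FlushStrategy.rolling, 3, "ts", {
    objectMode: true,
});
fromArray([{ ts: 0 }, { ts: 1 }, { ts: 2 }, { ts: 3 }])
    .pipe(windows)
    .on("data", (batch: Array<{ ts: number }>) => console.log(batch));
// [{ts:0},{ts:1},{ts:2}] then, on end, [{ts:3}]
```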
src/functions/baseDefinitions.ts (file name inferred from content; header not captured)

```diff
@@ -1,12 +1,3 @@
-export interface ThroughOptions {
-    objectMode?: boolean;
-}
-
-export interface TransformOptions {
-    readableObjectMode?: boolean;
-    writableObjectMode?: boolean;
-}
-
 export interface WithEncoding {
     encoding: string;
 }
@@ -21,4 +12,3 @@ export type JsonValue = JsonPrimitive | JsonPrimitive[];
 export interface JsonParseOptions {
     pretty: boolean;
 }
-
```
src/functions/batch.ts (new file, 40 lines)

```typescript
import { Transform, TransformOptions } from "stream";

export function batch(
    batchSize: number = 1000,
    maxBatchAge: number = 500,
    options: TransformOptions = {},
): Transform {
    let buffer: any[] = [];
    let timer: NodeJS.Timer | null = null;
    const sendChunk = (self: Transform) => {
        if (timer) {
            clearTimeout(timer);
        }
        timer = null;
        if (buffer.length > 0) {
            self.push(buffer);
        }
        buffer = [];
    };
    return new Transform({
        ...options,
        transform(chunk, encoding, callback) {
            buffer.push(chunk);
            if (buffer.length === batchSize) {
                sendChunk(this);
            } else {
                if (timer === null) {
                    timer = setInterval(() => {
                        sendChunk(this);
                    }, maxBatchAge);
                }
            }
            callback();
        },
        flush(callback) {
            sendChunk(this);
            callback();
        },
    });
}
```
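A sketch of the timer behaviour (not part of the diff; relative imports assumed): a full batch flushes immediately, a partial one when maxBatchAge elapses or the stream ends.

```typescript
import { batch } from "./src/functions/batch";
import { fromArray } from "./src/functions/fromArray";

// Batches of up to 2 chunks; the trailing partial batch is flushed by the
// 100 ms timer or by end-of-stream, whichever comes first.
fromArray(["a", "b", "c"])
    .pipe(batch(2, 100, { objectMode: true }))
    .on("data", (group: string[]) => console.log(group)); // ["a","b"] ["c"]
```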
src/functions/child.ts (new file, 11 lines)

```typescript
import { ChildProcess } from "child_process";
import { duplex } from "./duplex";

export function child(childProcess: ChildProcess) {
    if (childProcess.stdin === null) {
        throw new Error("childProcess.stdin is null");
    } else if (childProcess.stdout === null) {
        throw new Error("childProcess.stdout is null");
    }
    return duplex(childProcess.stdin, childProcess.stdout);
}
```
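For illustration only (a Unix-like environment with `cat` on PATH is assumed), the wrapper exposes a spawned process as one duplex stream:

```typescript
import { spawn } from "child_process";
import { child } from "./src/functions/child";

// stdin/stdout of `cat` exposed as a single stream: what goes in comes back out.
const echo = child(spawn("cat"));
echo.on("data", chunk => process.stdout.write(chunk)); // hello
echo.write("hello\n");
echo.end();
```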
src/functions/collect.ts (new file, 18 lines)

```typescript
import { Transform, TransformOptions } from "stream";

export function collect(options: TransformOptions = {}): Transform {
    const collected: any[] = [];
    return new Transform({
        ...options,
        transform(data, encoding, callback) {
            collected.push(data);
            callback();
        },
        flush(callback) {
            this.push(
                options.objectMode ? collected : Buffer.concat(collected),
            );
            callback();
        },
    });
}
```
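A quick sketch (not part of the diff; relative imports assumed): in object mode the flush emits one array holding every chunk seen.

```typescript
import { collect } from "./src/functions/collect";
import { fromArray } from "./src/functions/fromArray";

fromArray([1, 2, 3])
    .pipe(collect({ objectMode: true }))
    .once("data", (all: number[]) => console.log(all)); // [1, 2, 3]
```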
src/functions/compose.ts (new file, 88 lines)

```typescript
import { AllStreams, isReadable } from "../helpers";
import { PassThrough, pipeline, TransformOptions, Transform } from "stream";

export function compose(
    streams: Array<
        NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
    >,
    errorCallback?: (err: any) => void,
    options?: TransformOptions,
): Compose {
    if (streams.length < 2) {
        throw new Error("At least two streams are required to compose");
    }

    return new Compose(streams, errorCallback, options);
}

enum EventSubscription {
    Last = 0,
    First,
    All,
    Self,
}

export class Compose extends Transform {
    private first: AllStreams;
    private last: AllStreams;
    private streams: AllStreams[];
    private inputStream: ReadableStream;

    constructor(
        streams: AllStreams[],
        errorCallback?: (err: any) => void,
        options?: TransformOptions,
    ) {
        super(options);
        this.first = new PassThrough(options);
        this.last = streams[streams.length - 1];
        this.streams = streams;
        pipeline(
            [this.first, ...streams],
            errorCallback ||
                ((error: any) => {
                    if (error) {
                        this.emit("error", error);
                    }
                }),
        );

        if (isReadable(this.last)) {
            (this.last as NodeJS.ReadWriteStream).pipe(
                new Transform({
                    ...options,
                    transform: (d: any, encoding, cb) => {
                        this.push(d);
                        cb();
                    },
                }),
            );
        }
    }

    public _transform(chunk: any, encoding: string, cb: any) {
        (this.first as NodeJS.WritableStream).write(chunk, encoding, cb);
    }

    public _flush(cb: any) {
        if (isReadable(this.first)) {
            (this.first as any).push(null);
        }
        this.last.once("end", () => {
            cb();
        });
    }

    public _destroy(error: any, cb: (error?: any) => void) {
        this.streams.forEach(s => (s as any).destroy());
        cb(error);
    }

    public bubble(...events: string[]) {
        this.streams.forEach(s => {
            events.forEach(e => {
                s.on(e, (...args) => super.emit(e, ...args));
            });
        });
    }
}
```
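A usage sketch (not part of the diff; relative imports assumed): two transforms packaged as one writable/readable unit, with a single error callback covering every stage.

```typescript
import { compose } from "./src/functions/compose";
import { map } from "./src/functions/map";
import { fromArray } from "./src/functions/fromArray";

// pipeline() wires the stages together internally; the composed stream is
// written to at the front and read from at the back.
const doubleThenLabel = compose(
    [map((n: number) => n * 2), map((n: number) => `n=${n}`)],
    err => err && console.error("pipeline failed", err),
    { objectMode: true },
);
fromArray([1, 2, 3])
    .pipe(doubleThenLabel)
    .on("data", label => console.log(label)); // n=2 n=4 n=6
```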
src/functions/concat.ts (new file, 37 lines)

```typescript
import { Readable } from "stream";

export function concat(...streams: NodeJS.ReadableStream[]): Readable {
    let isStarted = false;
    let currentStreamIndex = 0;
    const startCurrentStream = () => {
        if (currentStreamIndex >= streams.length) {
            wrapper.push(null);
        } else {
            streams[currentStreamIndex]
                .on("data", chunk => {
                    if (!wrapper.push(chunk)) {
                        streams[currentStreamIndex].pause();
                    }
                })
                .on("error", err => wrapper.emit("error", err))
                .on("end", () => {
                    currentStreamIndex++;
                    startCurrentStream();
                });
        }
    };

    const wrapper = new Readable({
        objectMode: true,
        read() {
            if (!isStarted) {
                isStarted = true;
                startCurrentStream();
            }
            if (currentStreamIndex < streams.length) {
                streams[currentStreamIndex].resume();
            }
        },
    });
    return wrapper;
}
```
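A sketch (not part of the diff; relative imports assumed): streams are drained strictly in order, all of the first and then all of the second.

```typescript
import { concat } from "./src/functions/concat";
import { fromArray } from "./src/functions/fromArray";

concat(fromArray(["a", "b"]), fromArray(["c"])).on("data", chunk =>
    console.log(chunk),
); // a b c
```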
src/functions/demux.ts (new file, 131 lines)

```typescript
import { DuplexOptions, Duplex, Transform } from "stream";

import { isReadable } from "../helpers";

type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;

export interface DemuxOptions extends DuplexOptions {
    remultiplex?: boolean;
}

export function demux(
    pipelineConstructor: (
        destKey?: string,
        chunk?: any,
    ) => DemuxStreams | DemuxStreams[],
    demuxBy: string | ((chunk: any) => string),
    options?: DemuxOptions,
): Duplex {
    return new Demux(pipelineConstructor, demuxBy, options);
}

class Demux extends Duplex {
    private streamsByKey: {
        [key: string]: DemuxStreams[];
    };
    private demuxer: (chunk: any) => string;
    private pipelineConstructor: (
        destKey?: string,
        chunk?: any,
    ) => DemuxStreams[];
    private remultiplex: boolean;
    private transform: Transform;
    constructor(
        pipelineConstructor: (
            destKey?: string,
            chunk?: any,
        ) => DemuxStreams | DemuxStreams[],
        demuxBy: string | ((chunk: any) => string),
        options: DemuxOptions = {},
    ) {
        super(options);
        this.demuxer =
            typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
        this.pipelineConstructor = (destKey: string, chunk?: any) => {
            const pipeline = pipelineConstructor(destKey, chunk);
            return Array.isArray(pipeline) ? pipeline : [pipeline];
        };
        this.remultiplex =
            options.remultiplex === undefined ? true : options.remultiplex;
        this.streamsByKey = {};
        this.transform = new Transform({
            ...options,
            transform: (d, _, cb) => {
                this.push(d);
                cb(null);
            },
        });

        this.on("unpipe", () => this._flush());
    }

    // tslint:disable-next-line
    public _read(size: number) {}

    public async _write(chunk: any, encoding: any, cb: any) {
        const destKey = this.demuxer(chunk);
        if (this.streamsByKey[destKey] === undefined) {
            const newPipelines = this.pipelineConstructor(destKey, chunk);
            this.streamsByKey[destKey] = newPipelines;

            newPipelines.forEach(newPipeline => {
                if (this.remultiplex && isReadable(newPipeline)) {
                    (newPipeline as NodeJS.ReadWriteStream).pipe(
                        this.transform,
                    );
                } else if (this.remultiplex) {
                    console.error(
                        `Pipeline construct for ${destKey} does not implement readable interface`,
                    );
                }
            });
        }
        const pipelines = this.streamsByKey[destKey];
        const pendingDrains: Array<Promise<any>> = [];

        pipelines.forEach(pipeline => {
            if (!pipeline.write(chunk, encoding)) {
                pendingDrains.push(
                    new Promise(resolve => {
                        pipeline.once("drain", () => {
                            resolve();
                        });
                    }),
                );
            }
        });
        await Promise.all(pendingDrains);
        cb();
    }

    public _flush() {
        const pipelines: DemuxStreams[] = [].concat.apply(
            [],
            Object.values(this.streamsByKey),
        );
        const flushPromises: Array<Promise<void>> = [];
        pipelines.forEach(pipeline => {
            flushPromises.push(
                new Promise(resolve => {
                    pipeline.once("end", () => {
                        resolve();
                    });
                }),
            );
        });
        pipelines.forEach(pipeline => pipeline.end());
        Promise.all(flushPromises).then(() => {
            this.push(null);
            this.emit("end");
        });
    }

    public _destroy(error: any, cb: (error?: any) => void) {
        const pipelines: DemuxStreams[] = [].concat.apply(
            [],
            Object.values(this.streamsByKey),
        );
        pipelines.forEach(p => (p as any).destroy());
        cb(error);
    }
}
```
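A demultiplexing sketch (not part of the diff; relative imports assumed): one pipeline is constructed lazily per distinct key value, and because remultiplex defaults to true their outputs merge back into the demux stream.

```typescript
import { demux } from "./src/functions/demux";
import { map } from "./src/functions/map";
import { fromArray } from "./src/functions/fromArray";

// A fresh map() pipeline per distinct `user` value.
const byUser = demux(
    () =>
        map((event: { user: string; n: number }) => ({
            ...event,
            n: event.n + 1,
        })),
    "user",
    { objectMode: true },
);
fromArray([
    { user: "a", n: 0 },
    { user: "b", n: 0 },
    { user: "a", n: 1 },
]).pipe(byUser);
byUser.on("data", event => console.log(event));
```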
src/functions/duplex.ts (new file, 31 lines)

```typescript
import { Duplex } from "stream";

export function duplex(
    writable: NodeJS.WritableStream,
    readable: NodeJS.ReadableStream,
) {
    const wrapper = new Duplex({
        readableObjectMode: true,
        writableObjectMode: true,
        read() {
            readable.resume();
        },
        write(chunk, encoding, callback) {
            return writable.write(chunk, encoding, callback);
        },
        final(callback) {
            writable.end(callback);
        },
    });
    readable
        .on("data", chunk => {
            if (!wrapper.push(chunk)) {
                readable.pause();
            }
        })
        .on("error", err => wrapper.emit("error", err))
        .on("end", () => wrapper.push(null));
    writable.on("drain", () => wrapper.emit("drain"));
    writable.on("error", err => wrapper.emit("error", err));
    return wrapper;
}
```
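A sketch (not part of the diff): the writable and readable halves can be any pair; here a single PassThrough plays both roles, so writes come straight back out.

```typescript
import { duplex } from "./src/functions/duplex";
import { PassThrough } from "stream";

const loop = new PassThrough({ objectMode: true });
const wrapped = duplex(loop, loop);
wrapped.on("data", chunk => console.log(chunk)); // ping
wrapped.write("ping");
wrapped.end();
```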
src/functions/filter.ts (new file, 20 lines)

```typescript
import { Transform, TransformOptions } from "stream";

export function filter<T>(
    predicate:
        | ((chunk: T, encoding: string) => boolean)
        | ((chunk: T, encoding: string) => Promise<boolean>),
    options?: TransformOptions,
) {
    return new Transform({
        ...options,
        async transform(chunk: T, encoding?: any, callback?: any) {
            const result = await predicate(chunk, encoding);
            if (result === true) {
                callback(null, chunk);
            } else {
                callback();
            }
        },
    });
}
```
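A sketch (not part of the diff; relative imports assumed): the predicate may return a boolean or a promise of one, and failing chunks are dropped.

```typescript
import { filter } from "./src/functions/filter";
import { fromArray } from "./src/functions/fromArray";

fromArray([1, 2, 3, 4])
    .pipe(filter(async (n: number) => n % 2 === 0, { objectMode: true }))
    .on("data", (n: number) => console.log(n)); // 2 4
```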
src/functions/flatMap.ts (new file, 16 lines)

```typescript
import { Transform, TransformOptions } from "stream";

export function flatMap<T, R>(
    mapper:
        | ((chunk: T, encoding: string) => R[])
        | ((chunk: T, encoding: string) => Promise<R[]>),
    options?: TransformOptions,
): Transform {
    return new Transform({
        ...options,
        async transform(chunk: T, encoding, callback) {
            (await mapper(chunk, encoding)).forEach(c => this.push(c));
            callback();
        },
    });
}
```
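A sketch (not part of the diff; relative imports assumed): each chunk fans out into zero or more downstream chunks.

```typescript
import { flatMap } from "./src/functions/flatMap";
import { fromArray } from "./src/functions/fromArray";

fromArray(["a b", "c"])
    .pipe(flatMap((line: string) => line.split(" "), { objectMode: true }))
    .on("data", (word: string) => console.log(word)); // a b c
```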
src/functions/fromArray.ts (new file, 16 lines)

```typescript
import { Readable } from "stream";

export function fromArray(array: any[]): Readable {
    let cursor = 0;
    return new Readable({
        objectMode: true,
        read() {
            if (cursor < array.length) {
                this.push(array[cursor]);
                cursor++;
            } else {
                this.push(null);
            }
        },
    });
}
```
src/functions/functions.ts, deleted (604 lines; name inferred, diff collapsed as "too large" in the original view)

```typescript
import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";

import {
    TransformOptions,
    ThroughOptions,
    WithEncoding,
    SerializationFormats,
    JsonValue,
    JsonParseOptions,
} from "./definitions";
import { sleep } from "../helpers";

/**
 * Convert an array into a Readable stream of its elements
 * @param array Array of elements to stream
 */
export function fromArray(array: any[]): NodeJS.ReadableStream {
    let cursor = 0;
    return new Readable({
        objectMode: true,
        read() {
            if (cursor < array.length) {
                this.push(array[cursor]);
                cursor++;
            } else {
                this.push(null);
            }
        },
    });
}

/**
 * Return a ReadWrite stream that maps streamed chunks
 * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
 * @param options
 * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
 * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
 */
export function map<T, R>(
    mapper: (chunk: T, encoding: string) => R,
    options: TransformOptions = {
        readableObjectMode: true,
        writableObjectMode: true,
    },
): NodeJS.ReadWriteStream {
    return new Transform({
        ...options,
        async transform(chunk: T, encoding, callback) {
            try {
                const mapped = await mapper(chunk, encoding);
                this.push(mapped);
                callback();
            } catch (err) {
                callback(err);
            }
        },
    });
}

/**
 * Return a ReadWrite stream that flat maps streamed chunks
 * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
 * @param options
 * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
 * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
 */
export function flatMap<T, R>(
    mapper:
        | ((chunk: T, encoding: string) => R[])
        | ((chunk: T, encoding: string) => Promise<R[]>),
    options: TransformOptions = {
        readableObjectMode: true,
        writableObjectMode: true,
    },
): NodeJS.ReadWriteStream {
    return new Transform({
        ...options,
        async transform(chunk: T, encoding, callback) {
            let isPromise = false;
            try {
                const mapped = mapper(chunk, encoding);
                isPromise = mapped instanceof Promise;
                (await mapped).forEach(c => this.push(c));
                callback();
            } catch (err) {
                if (isPromise) {
                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
                    this.emit("error", err);
                    callback();
                } else {
                    callback(err);
                }
            }
        },
    });
}

/**
 * Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
 * @param predicate Predicate with which to filter scream chunks
 * @param options
 * @param options.objectMode Whether this stream should behave as a stream of objects
 */
export function filter<T>(
    predicate:
        | ((chunk: T, encoding: string) => boolean)
        | ((chunk: T, encoding: string) => Promise<boolean>),
    options: ThroughOptions = {
        objectMode: true,
    },
) {
    return new Transform({
        readableObjectMode: options.objectMode,
        writableObjectMode: options.objectMode,
        async transform(chunk: T, encoding, callback) {
            let isPromise = false;
            try {
                const result = predicate(chunk, encoding);
                isPromise = result instanceof Promise;
                if (!!(await result)) {
                    callback(undefined, chunk);
                } else {
                    callback();
                }
            } catch (err) {
                if (isPromise) {
                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
                    this.emit("error", err);
                    callback();
                } else {
                    callback(err);
                }
            }
        },
    });
}

/**
 * Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
 * value
 * @param iteratee Reducer function to apply on each streamed chunk
 * @param initialValue Initial value
 * @param options
 * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
 * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
 */
export function reduce<T, R>(
    iteratee:
        | ((previousValue: R, chunk: T, encoding: string) => R)
        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
    initialValue: R,
    options: TransformOptions = {
        readableObjectMode: true,
        writableObjectMode: true,
    },
) {
    let value = initialValue;
    return new Transform({
        readableObjectMode: options.readableObjectMode,
        writableObjectMode: options.writableObjectMode,
        async transform(chunk: T, encoding, callback) {
            let isPromise = false;
            try {
                const result = iteratee(value, chunk, encoding);
                isPromise = result instanceof Promise;
                value = await result;
                callback();
            } catch (err) {
                if (isPromise) {
                    // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
                    this.emit("error", err);
                    callback();
                } else {
                    callback(err);
                }
            }
        },
        flush(callback) {
            // Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
            // downstream doesn't expect objects)
            try {
                callback(undefined, value);
            } catch (err) {
                try {
                    this.emit("error", err);
                } catch {
                    // Best effort was made
                }
            }
        },
    });
}

/**
 * Return a ReadWrite stream that splits streamed chunks using the given separator
 * @param separator Separator to split by, defaulting to "\n"
 * @param options
 * @param options.encoding Encoding written chunks are assumed to use
 */
export function split(
    separator: string | RegExp = "\n",
    options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
    let buffered = "";
    const decoder = new StringDecoder(options.encoding);

    return new Transform({
        readableObjectMode: true,
        transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            const splitted = asString.split(separator);
            if (splitted.length > 1) {
                splitted[0] = buffered.concat(splitted[0]);
                buffered = "";
            }
            buffered += splitted[splitted.length - 1];
            splitted.slice(0, -1).forEach((part: string) => this.push(part));
            callback();
        },
        flush(callback) {
            callback(undefined, buffered + decoder.end());
        },
    });
}

/**
 * Return a ReadWrite stream that joins streamed chunks using the given separator
 * @param separator Separator to join with
 * @param options
 * @param options.encoding Encoding written chunks are assumed to use
 */
export function join(
    separator: string,
    options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
    let isFirstChunk = true;
    const decoder = new StringDecoder(options.encoding);
    return new Transform({
        readableObjectMode: true,
        async transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            // Take care not to break up multi-byte characters spanning multiple chunks
            if (asString !== "" || chunk.length === 0) {
                if (!isFirstChunk) {
                    this.push(separator);
                }
                this.push(asString);
                isFirstChunk = false;
            }
            callback();
        },
    });
}

/**
 * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
 * the streamed chunks with the specified replacement string
 * @param searchValue Search string to use
 * @param replaceValue Replacement string to use
 * @param options
 * @param options.encoding Encoding written chunks are assumed to use
 */
export function replace(
    searchValue: string | RegExp,
    replaceValue: string,
    options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
    const decoder = new StringDecoder(options.encoding);
    return new Transform({
        readableObjectMode: true,
        transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            // Take care not to break up multi-byte characters spanning multiple chunks
            if (asString !== "" || chunk.length === 0) {
                callback(
                    undefined,
                    asString.replace(searchValue, replaceValue),
                );
            } else {
                callback();
            }
        },
    });
}

/**
 * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
 * must be a fully defined JSON string.
 * @param format Format of serialized data, only utf8 supported.
 */
export function parse(
    format: SerializationFormats = SerializationFormats.utf8,
): NodeJS.ReadWriteStream {
    const decoder = new StringDecoder(format);
    return new Transform({
        readableObjectMode: true,
        writableObjectMode: true,
        async transform(chunk: Buffer, encoding, callback) {
            try {
                const asString = decoder.write(chunk);
                // Using await causes parsing errors to be emitted
                callback(undefined, await JSON.parse(asString));
            } catch (err) {
                callback(err);
            }
        },
    });
}

/**
 * Return a ReadWrite stream that stringifies the streamed chunks to JSON
 */
export function stringify(
    options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
    return new Transform({
        readableObjectMode: true,
        writableObjectMode: true,
        transform(chunk: JsonValue, encoding, callback) {
            callback(
                undefined,
                options.pretty
                    ? JSON.stringify(chunk, null, 2)
                    : JSON.stringify(chunk),
            );
        },
    });
}

/**
 * Return a ReadWrite stream that collects streamed chunks into an array or buffer
 * @param options
 * @param options.objectMode Whether this stream should behave as a stream of objects
 */
export function collect(
    options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
    const collected: any[] = [];
    return new Transform({
        readableObjectMode: options.objectMode,
        writableObjectMode: options.objectMode,
        transform(data, encoding, callback) {
            collected.push(data);
            callback();
        },
        flush(callback) {
            this.push(
                options.objectMode ? collected : Buffer.concat(collected),
            );
            callback();
        },
    });
}

/**
 * Return a Readable stream of readable streams concatenated together
 * @param streams Readable streams to concatenate
 */
export function concat(
    ...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
    let isStarted = false;
    let currentStreamIndex = 0;
    const startCurrentStream = () => {
        if (currentStreamIndex >= streams.length) {
            wrapper.push(null);
        } else {
            streams[currentStreamIndex]
                .on("data", chunk => {
                    if (!wrapper.push(chunk)) {
                        streams[currentStreamIndex].pause();
                    }
                })
                .on("error", err => wrapper.emit("error", err))
                .on("end", () => {
                    currentStreamIndex++;
                    startCurrentStream();
                });
        }
    };

    const wrapper = new Readable({
        objectMode: true,
        read() {
            if (!isStarted) {
                isStarted = true;
                startCurrentStream();
            }
            if (currentStreamIndex < streams.length) {
                streams[currentStreamIndex].resume();
            }
        },
    });
    return wrapper;
}

/**
 * Return a Readable stream of readable streams merged together in chunk arrival order
 * @param streams Readable streams to merge
 */
export function merge(
    ...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
    let isStarted = false;
    let streamEndedCount = 0;
    return new Readable({
        objectMode: true,
        read() {
            if (streamEndedCount >= streams.length) {
                this.push(null);
            } else if (!isStarted) {
                isStarted = true;
                streams.forEach(stream =>
                    stream
                        .on("data", chunk => {
                            if (!this.push(chunk)) {
                                streams.forEach(s => s.pause());
                            }
                        })
                        .on("error", err => this.emit("error", err))
                        .on("end", () => {
                            streamEndedCount++;
                            if (streamEndedCount === streams.length) {
                                this.push(null);
                            }
                        }),
                );
            } else {
                streams.forEach(s => s.resume());
            }
        },
    });
}

/**
 * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
 * cause the given readable stream to yield chunks
 * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
 * @param readable Readable stream assumed to yield chunks when the writable stream is written to
 */
export function duplex(writable: Writable, readable: Readable) {
    const wrapper = new Duplex({
        readableObjectMode: true,
        writableObjectMode: true,
        read() {
            readable.resume();
        },
        write(chunk, encoding, callback) {
            return writable.write(chunk, encoding, callback);
        },
        final(callback) {
            writable.end(callback);
        },
    });
    readable
        .on("data", chunk => {
            if (!wrapper.push(chunk)) {
                readable.pause();
            }
        })
        .on("error", err => wrapper.emit("error", err))
        .on("end", () => wrapper.push(null));
    writable.on("drain", () => wrapper.emit("drain"));
    writable.on("error", err => wrapper.emit("error", err));
    return wrapper;
}

/**
 * Return a Duplex stream from a child process' stdin and stdout
 * @param childProcess Child process from which to create duplex stream
 */
export function child(childProcess: ChildProcess) {
    return duplex(childProcess.stdin, childProcess.stdout);
}

/**
 * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
 * ended
 * @param readable Readable stream to wait on
 */
export function last<T>(readable: Readable): Promise<T | null> {
    let lastChunk: T | null = null;
    return new Promise((resolve, reject) => {
        readable
            .on("data", chunk => (lastChunk = chunk))
            .on("end", () => resolve(lastChunk));
    });
}

/**
 * Stores chunks of data internally in array and batches when batchSize is reached.
 *
 * @param batchSize Size of the batches
 * @param maxBatchAge Max lifetime of a batch
 */
export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
    let buffer: any[] = [];
    let timer: NodeJS.Timer | null = null;
    let sendChunk = (self: Transform) => {
        timer && clearTimeout(timer);
        timer = null;
        self.push(buffer);
        buffer = [];
    };
    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            buffer.push(chunk);
            if (buffer.length === batchSize) {
                sendChunk(this);
            } else {
                if (timer === null) {
                    timer = setInterval(() => {
                        sendChunk(this);
                    }, maxBatchAge);
                }
            }
            callback();
        },
        flush(callback) {
            console.error("flushing");
            sendChunk(this);
            callback();
        },
    });
}

/**
 * Unbatches and sends individual chunks of data
 */
export function unbatch() {
    return new Transform({
        objectMode: true,
        transform(data, encoding, callback) {
            for (const d of data) {
                this.push(d);
            }
            callback();
        },
    });
}

/**
 * Limits date of data transferred into stream.
 * @param targetRate Desired rate in ms
 * @param period Period to sleep for when rate is above or equal to targetRate
 */
export function rate(targetRate: number = 50, period: number = 2) {
    const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
    let total = 0;
    const start = performance.now();
    return new Transform({
        objectMode: true,
        async transform(data, encoding, callback) {
            const currentRate = (total / (performance.now() - start)) * 1000;
            if (targetRate && currentRate > targetRate) {
                await sleep(deltaMS);
            }
            total += 1;
            callback(undefined, data);
        },
    });
}

/**
 * Limits number of parallel processes in flight.
 * @param parallel Max number of parallel processes.
 * @param func Function to execute on each data chunk
 * @param pause Amount of time to pause processing when max number of parallel processes are executing.
 */
export function parallelMap<T, R>(
    mapper: (data: T) => R,
    parallel: number = 10,
    sleepTime: number = 5,
) {
    let inflight = 0;
    return new Transform({
        objectMode: true,
        async transform(data, encoding, callback) {
            while (parallel <= inflight) {
                await sleep(sleepTime);
            }
            inflight += 1;
            callback();
            try {
                const res = await mapper(data);
                this.push(res);
            } catch (e) {
                this.emit(e);
            } finally {
                inflight -= 1;
            }
        },
        async flush(callback) {
            while (inflight > 0) {
                await sleep(sleepTime);
            }
            callback();
        },
    });
}
```
@@ -1,21 +1,59 @@
|
||||
import { Readable, Writable } from "stream";
|
||||
import { ChildProcess } from "child_process";
|
||||
import * as baseFunctions from "./functions";
|
||||
|
||||
import {
|
||||
ThroughOptions,
|
||||
Transform,
|
||||
TransformOptions,
|
||||
WithEncoding,
|
||||
JsonParseOptions,
|
||||
} from "./definitions";
|
||||
WritableOptions,
|
||||
ReadableOptions,
|
||||
} from "stream";
|
||||
import { accumulator, accumulatorBy } from "./accumulator";
|
||||
import { batch } from "./batch";
|
||||
import { child } from "./child";
|
||||
import { collect } from "./collect";
|
||||
import { concat } from "./concat";
|
||||
import { duplex } from "./duplex";
|
||||
import { filter } from "./filter";
|
||||
import { flatMap } from "./flatMap";
|
||||
import { fromArray } from "./fromArray";
|
||||
import { join } from "./join";
|
||||
import { last } from "./last";
|
||||
import { map } from "./map";
|
||||
import { merge } from "./merge";
|
||||
import { parallelMap } from "./parallelMap";
|
||||
import { parse } from "./parse";
|
||||
import { rate } from "./rate";
|
||||
import { reduce } from "./reduce";
|
||||
import { replace } from "./replace";
|
||||
import { split } from "./split";
|
||||
import { stringify } from "./stringify";
|
||||
import { unbatch } from "./unbatch";
|
||||
import { compose } from "./compose";
|
||||
import { demux } from "./demux";
|
||||
|
||||
export default function mhysa(defaultOptions?: TransformOptions) {
|
||||
function withDefaultOptions<T extends any[], R>(
|
||||
n: number,
|
||||
fn: (...args: T) => R,
|
||||
): (...args: T) => R {
|
||||
return (...args) => {
|
||||
const options = {
|
||||
...defaultOptions,
|
||||
...((args[n] || {}) as TransformOptions | {}),
|
||||
};
|
||||
const provided = args.slice(0, n);
|
||||
const nextArgs = [
|
||||
...provided,
|
||||
...Array(n - provided.length).fill(undefined),
|
||||
options,
|
||||
] as T;
|
||||
return fn(...nextArgs) as R;
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
/**
|
||||
* Convert an array into a Readable stream of its elements
|
||||
* @param array Array of elements to stream
|
||||
*/
|
||||
export function fromArray(array: any[]): NodeJS.ReadableStream {
|
||||
return baseFunctions.fromArray(array);
|
||||
}
|
||||
fromArray,
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that maps streamed chunks
|
||||
@@ -24,12 +62,7 @@ export function fromArray(array: any[]): NodeJS.ReadableStream {
|
||||
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
||||
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
||||
*/
|
||||
export function map<T, R>(
|
||||
mapper: (chunk: T, encoding?: string) => R,
|
||||
options?: TransformOptions,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.map(mapper, options);
|
||||
}
|
||||
map: withDefaultOptions(1, map),
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that flat maps streamed chunks
|
||||
@@ -38,14 +71,7 @@ export function map<T, R>(
|
||||
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
||||
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
||||
*/
|
||||
export function flatMap<T, R>(
|
||||
mapper:
|
||||
| ((chunk: T, encoding: string) => R[])
|
||||
| ((chunk: T, encoding: string) => Promise<R[]>),
|
||||
options?: TransformOptions,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.flatMap(mapper, options);
|
||||
}
|
||||
flatMap: withDefaultOptions(1, flatMap),
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
|
||||
@@ -53,14 +79,7 @@ export function flatMap<T, R>(
|
||||
* @param options?
|
||||
* @param options.objectMode? Whether this stream should behave as a stream of objects.
|
||||
*/
|
||||
export function filter<T>(
|
||||
mapper:
|
||||
| ((chunk: T, encoding: string) => boolean)
|
||||
| ((chunk: T, encoding: string) => Promise<boolean>),
|
||||
options?: ThroughOptions,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.filter(mapper, options);
|
||||
}
|
||||
filter: withDefaultOptions(1, filter),
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
|
||||
@@ -71,15 +90,7 @@ export function filter<T>(
|
||||
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
|
||||
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
|
||||
*/
|
||||
export function reduce<T, R>(
|
||||
iteratee:
|
||||
| ((previousValue: R, chunk: T, encoding: string) => R)
|
||||
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
|
||||
initialValue: R,
|
||||
options?: TransformOptions,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.reduce(iteratee, initialValue, options);
|
||||
}
|
||||
reduce: withDefaultOptions(2, reduce),
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that splits streamed chunks using the given separator
|
||||
@@ -87,12 +98,7 @@ export function reduce<T, R>(
|
||||
* @param options? Defaults to encoding: utf8
|
||||
* @param options.encoding? Encoding written chunks are assumed to use
|
||||
*/
|
||||
export function split(
|
||||
separator?: string | RegExp,
|
||||
options?: WithEncoding,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.split(separator, options);
|
||||
}
|
||||
split,
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that joins streamed chunks using the given separator
|
||||
@@ -100,12 +106,7 @@ export function split(
|
||||
* @param options? Defaults to encoding: utf8
|
||||
* @param options.encoding? Encoding written chunks are assumed to use
|
||||
*/
|
||||
export function join(
|
||||
separator: string,
|
||||
options?: WithEncoding,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.join(separator, options);
|
||||
}
|
||||
join: withDefaultOptions(1, join),
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
|
||||
@@ -115,21 +116,17 @@ export function join(
|
||||
* @param options? Defaults to encoding: utf8
|
||||
* @param options.encoding Encoding written chunks are assumed to use
|
||||
*/
|
||||
export function replace(
|
||||
searchValue: string | RegExp,
|
||||
replaceValue: string,
|
||||
options?: WithEncoding,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.replace(searchValue, replaceValue, options);
|
||||
}
|
||||
replace,
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
|
||||
* must be a fully defined JSON string in utf8.
|
||||
* @param format: @type SerializationFormats defaults SerializationFormats.utf8
|
||||
* @param emitError: @type boolean Whether or not to emit an error when
|
||||
* failing to parse. An error will automatically close the stream.
|
||||
* Defaults to true.
|
||||
*/
|
||||
export function parse(): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.parse();
|
||||
}
|
||||
parse,
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
|
||||
@@ -137,38 +134,26 @@ export function parse(): NodeJS.ReadWriteStream {
|
||||
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
|
||||
*
|
||||
*/
|
||||
export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.stringify(options);
|
||||
}
|
||||
stringify,
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
|
||||
* @param options?
|
||||
* @param options.objectMode? Whether this stream should behave as a stream of objects
|
||||
*/
|
||||
export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.collect(options);
|
||||
}
|
||||
collect: withDefaultOptions(0, collect),
|
||||
|
||||
/**
|
||||
* Return a Readable stream of readable streams concatenated together
|
||||
* @param streams Readable streams to concatenate
|
||||
*/
|
||||
export function concat(
|
||||
...streams: NodeJS.ReadableStream[]
|
||||
): NodeJS.ReadableStream {
|
||||
return baseFunctions.concat(...streams);
|
||||
}
|
||||
concat,
|
||||
|
||||
/**
|
||||
* Return a Readable stream of readable streams concatenated together
|
||||
* @param streams Readable streams to merge
|
||||
*/
|
||||
export function merge(
|
||||
...streams: NodeJS.ReadableStream[]
|
||||
): NodeJS.ReadableStream {
|
||||
return baseFunctions.merge(...streams);
|
||||
}
|
||||
merge,
|
||||
|
||||
/**
|
||||
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
|
||||
@@ -176,66 +161,107 @@ export function merge(
|
||||
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
|
||||
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
|
||||
*/
|
||||
export function duplex(
|
||||
writable: Writable,
|
||||
readable: Readable,
|
||||
): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.duplex(writable, readable);
|
||||
}
|
||||
duplex,
|
||||
|
||||
/**
|
||||
* Return a Duplex stream from a child process' stdin and stdout
|
||||
* @param childProcess Child process from which to create duplex stream
|
||||
*/
|
||||
export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.child(childProcess);
|
||||
}
|
||||
child,
|
||||
|
||||
/**
|
||||
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
|
||||
* ended
|
||||
* @param readable Readable stream to wait on
|
||||
*/
|
||||
export function last<T>(readable: Readable): Promise<T | null> {
|
||||
return baseFunctions.last(readable);
|
||||
}
|
||||
last,
|
||||
|
||||
/**
|
||||
* Stores chunks of data internally in array and batches when batchSize is reached.
|
||||
* @param batchSize Size of the batches, defaults to 1000.
|
||||
* @param maxBatchAge? Max lifetime of a batch, defaults to 500
|
||||
*/
|
||||
export function batch(batchSize: number, maxBatchAge?: number): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.batch(batchSize, maxBatchAge);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unbatches and sends individual chunks of data
|
||||
*/
|
||||
export function unbatch(): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.unbatch();
|
||||
}
|
||||
|
||||
/**
|
||||
* Limits date of data transferred into stream.
|
||||
* @param options?
|
||||
* @param targetRate? Desired rate in ms
|
||||
* @param period? Period to sleep for when rate is above or equal to targetRate
|
||||
* @param options.objectMode? Whether this stream should behave as a stream of objects
|
||||
*/
|
||||
export function rate(targetRate?: number, period?: number): NodeJS.ReadWriteStream {
|
||||
return baseFunctions.rate(targetRate, period);
|
||||
}
|
||||
batch: withDefaultOptions(2, batch),
|
||||
|
||||
/**
|
||||
* Unbatches and sends individual chunks of data.
|
||||
* @param options?
|
||||
* @param options.objectMode? Whether this stream should behave as a stream of objects
|
||||
*/
|
||||
unbatch: withDefaultOptions(0, unbatch),
|
||||
|
||||
/**
|
||||
* Limits rate of data transferred into stream.
|
||||
* @param targetRate? Desired rate in ms.
|
||||
* @param period? Period to sleep for when rate is above or equal to targetRate.
|
||||
* @param options?
|
||||
*/
|
||||
rate: withDefaultOptions(2, rate),
|
||||
|
||||
/**
|
||||
* Limits number of parallel processes in flight.
|
||||
* @param parallel Max number of parallel processes.
|
||||
* @param func Function to execute on each data chunk
|
||||
* @param func Function to execute on each data chunk.
|
||||
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
|
||||
*/
|
||||
export function parallelMap<T, R>(
|
||||
mapper: (chunk: T) => R,
|
||||
parallel?: number,
|
||||
sleepTime?: number,
|
||||
) {
|
||||
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
|
||||
parallelMap: withDefaultOptions(3, parallelMap),
|
||||
|
||||
/**
|
||||
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
||||
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
|
||||
* 1. Sliding
|
||||
* - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain
|
||||
* the batchSize. When no key is provided, the batchSize is effectively the buffer length. When
|
||||
* a key is provided, the batchSize is based on the value at that key. For example, given a key
|
||||
* of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be
|
||||
* within 3000 timestamp units from the first element. This means that with a key, multiple elements
|
||||
* may be spliced off the front of the buffer. The buffer is then pushed into the stream.
|
||||
* 2. Rolling
|
||||
* - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream.
|
||||
* When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
|
||||
* is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
|
||||
* each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
|
||||
* @param flushStrategy Buffering strategy to use.
|
||||
* @param batchSize Size of the batch (in units of buffer length or value at key).
|
||||
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
|
||||
* @param options Transform stream options
|
||||
*/
|
||||
accumulator: withDefaultOptions(3, accumulator),
|
||||
|
||||
/**
|
||||
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
||||
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
|
||||
* 1. Sliding
|
||||
* - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The
|
||||
* item is pushed into the buffer and buffer is pushed into stream.
|
||||
* 2. Rolling
|
||||
* - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
|
||||
* then pushed into the buffer.
|
||||
* @param flushStrategy Buffering strategy to use.
|
||||
* @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
|
||||
* or items need to be cleared from buffer.
|
||||
* @param options Transform stream options
|
||||
*/
|
||||
accumulatorBy: withDefaultOptions(2, accumulatorBy),

        /**
         * Composes multiple streams together. Writing occurs on the first stream, piping occurs from the last stream.
         * @param streams Array of streams to compose. Minimum of two.
         * @param errorCallback A function that handles any error coming out of the pipeline.
         * @param options Transform stream options.
         */
        compose: withDefaultOptions(2, compose),
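
        // Usage sketch (mirrors tests/compose.spec.ts below; `f` and `g` are
        // hypothetical mappers): fuse two map streams into one duplex stream
        // with its own highWaterMark.
        //
        //     const { compose, map } = mhysa({ objectMode: true });
        //     const pipeline = compose([map(f), map(g)], undefined, { highWaterMark: 2 });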

        /**
         * Demultiplexes the source stream into one destination pipeline per key, creating a new destination
         * via `construct` the first time each key is seen.
         * @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
         * @param demuxBy
         * @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
         * @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
         * @param options Writable stream options.
         */
        demux: withDefaultOptions(2, demux),
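
        // Usage sketch (mirrors tests/demux.spec.ts below; `transform` is a
        // hypothetical mapper): build one pipeline per value of `key`, constructed
        // lazily the first time each key is seen.
        //
        //     const { demux, map } = mhysa({ objectMode: true });
        //     source.pipe(demux(destKey => map(chunk => transform(chunk)), "key", {}));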
    };
}

26 src/functions/join.ts Normal file
@@ -0,0 +1,26 @@
import { Transform, TransformOptions } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";

export function join(
    separator: string,
    options: WithEncoding & TransformOptions = { encoding: "utf8" },
): Transform {
    let isFirstChunk = true;
    const decoder = new StringDecoder(options.encoding);
    return new Transform({
        readableObjectMode: true,
        async transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            // Take care not to break up multi-byte characters spanning multiple chunks
            if (asString !== "" || chunk.length === 0) {
                if (!isFirstChunk) {
                    this.push(separator);
                }
                this.push(asString);
                isFirstChunk = false;
            }
            callback();
        },
    });
}
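
A quick usage sketch (not part of the diff; `source` is a hypothetical non-object-mode Readable emitting "a", "b", "c"): join() re-emits string chunks with the separator inserted between them, taking care around multi-byte characters.

    const { join } = mhysa();
    source.pipe(join(",")).pipe(process.stdout); // a,b,c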

8 src/functions/last.ts Normal file
@@ -0,0 +1,8 @@
export function last<T>(readable: NodeJS.ReadableStream): Promise<T | null> {
    let lastChunk: T | null = null;
    return new Promise((resolve, _) => {
        readable
            .on("data", chunk => (lastChunk = chunk))
            .on("end", () => resolve(lastChunk));
    });
}

13 src/functions/map.ts Normal file
@@ -0,0 +1,13 @@
import { Transform, TransformOptions } from "stream";

export function map<T, R>(
    mapper: (chunk: T, encoding: string) => R,
    options: TransformOptions = { objectMode: true },
): Transform {
    return new Transform({
        ...options,
        async transform(chunk: T, encoding, callback) {
            callback(null, await mapper(chunk, encoding));
        },
    });
}

33 src/functions/merge.ts Normal file
@@ -0,0 +1,33 @@
import { Readable } from "stream";

export function merge(...streams: Readable[]): Readable {
    let isStarted = false;
    let streamEndedCount = 0;
    return new Readable({
        objectMode: true,
        read() {
            if (streamEndedCount >= streams.length) {
                this.push(null);
            } else if (!isStarted) {
                isStarted = true;
                streams.forEach(stream =>
                    stream
                        .on("data", chunk => {
                            if (!this.push(chunk)) {
                                streams.forEach(s => s.pause());
                            }
                        })
                        .on("error", err => this.emit("error", err))
                        .on("end", () => {
                            streamEndedCount++;
                            if (streamEndedCount === streams.length) {
                                this.push(null);
                            }
                        }),
                );
            } else {
                streams.forEach(s => s.resume());
            }
        },
    });
}
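
A usage sketch (illustrative; `sourceA`, `sourceB` and `sink` are hypothetical object-mode streams): merge() interleaves chunks from all sources as they arrive, pausing every source when the reader applies backpressure, and ends once every source has ended.

    const { merge } = mhysa({ objectMode: true });
    merge(sourceA, sourceB).pipe(sink);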

30 src/functions/parallelMap.ts Normal file
@@ -0,0 +1,30 @@
import { Transform, TransformOptions } from "stream";
import { sleep } from "../helpers";

export function parallelMap<T, R>(
    mapper: (data: T) => R,
    parallel: number = 10,
    sleepTime: number = 1,
    options?: TransformOptions,
) {
    let inflight = 0;
    return new Transform({
        ...options,
        async transform(data, encoding, callback) {
            while (parallel <= inflight) {
                await sleep(sleepTime);
            }
            inflight += 1;
            callback();
            const res = await mapper(data);
            this.push(res);
            inflight -= 1;
        },
        async flush(callback) {
            while (inflight > 0) {
                await sleep(sleepTime);
            }
            callback();
        },
    });
}

27 src/functions/parse.ts Normal file
@@ -0,0 +1,27 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { SerializationFormats } from "./baseDefinitions";

export function parse(
    format: SerializationFormats = SerializationFormats.utf8,
    emitError: boolean = true,
): Transform {
    const decoder = new StringDecoder(format);
    return new Transform({
        readableObjectMode: true,
        writableObjectMode: true,
        async transform(chunk: Buffer, encoding, callback) {
            try {
                const asString = decoder.write(chunk);
                // Using await causes parsing errors to be emitted
                callback(null, await JSON.parse(asString));
            } catch (err) {
                if (emitError) {
                    callback(err);
                } else {
                    callback();
                }
            }
        },
    });
}

24 src/functions/rate.ts Normal file
@@ -0,0 +1,24 @@
import { Transform, TransformOptions } from "stream";
import { performance } from "perf_hooks";
import { sleep } from "../helpers";

export function rate(
    targetRate: number = 50,
    period: number = 1,
    options?: TransformOptions,
): Transform {
    const deltaMS = ((1 / targetRate) * 1000) / period; // Skip a full period
    let total = 0;
    const start = performance.now();
    return new Transform({
        ...options,
        async transform(data, encoding, callback) {
            const currentRate = (total / (performance.now() - start)) * 1000;
            if (targetRate && currentRate > targetRate) {
                await sleep(deltaMS);
            }
            total += 1;
            callback(undefined, data);
        },
    });
}

31 src/functions/reduce.ts Normal file
@@ -0,0 +1,31 @@
import { Transform, TransformOptions } from "stream";

export function reduce<T, R>(
    iteratee:
        | ((previousValue: R, chunk: T, encoding: string) => R)
        | ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
    initialValue: R,
    options?: TransformOptions,
) {
    let value = initialValue;
    return new Transform({
        ...options,
        async transform(chunk: T, encoding, callback) {
            value = await iteratee(value, chunk, encoding);
            callback();
        },
        flush(callback) {
            // Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
            // downstream doesn't expect objects)
            try {
                callback(undefined, value);
            } catch (err) {
                try {
                    this.emit("error", err);
                } catch {
                    // Best effort was made
                }
            }
        },
    });
}
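
A usage sketch (illustrative): reduce() folds every chunk into an accumulator and only emits the final value when the stream flushes, much like Array.prototype.reduce over a stream.

    const { fromArray, reduce } = mhysa({ objectMode: true });
    fromArray([1, 2, 3])
        .pipe(reduce((sum: number, chunk: number) => sum + chunk, 0))
        .on("data", total => console.log(total)); // 6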

25 src/functions/replace.ts Normal file
@@ -0,0 +1,25 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";
export function replace(
    searchValue: string | RegExp,
    replaceValue: string,
    options: WithEncoding = { encoding: "utf8" },
): Transform {
    const decoder = new StringDecoder(options.encoding);
    return new Transform({
        readableObjectMode: true,
        transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            // Take care not to break up multi-byte characters spanning multiple chunks
            if (asString !== "" || chunk.length === 0) {
                callback(
                    undefined,
                    asString.replace(searchValue, replaceValue),
                );
            } else {
                callback();
            }
        },
    });
}

29 src/functions/split.ts Normal file
@@ -0,0 +1,29 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";

export function split(
    separator: string | RegExp = "\n",
    options: WithEncoding = { encoding: "utf8" },
): Transform {
    let buffered = "";
    const decoder = new StringDecoder(options.encoding);

    return new Transform({
        readableObjectMode: true,
        transform(chunk: Buffer, encoding, callback) {
            const asString = decoder.write(chunk);
            const splitted = asString.split(separator);
            if (splitted.length > 1) {
                splitted[0] = buffered.concat(splitted[0]);
                buffered = "";
            }
            buffered += splitted[splitted.length - 1];
            splitted.slice(0, -1).forEach((part: string) => this.push(part));
            callback();
        },
        flush(callback) {
            callback(undefined, buffered + decoder.end());
        },
    });
}
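
A usage sketch (illustrative; the file path is hypothetical): split() buffers partial lines across chunk boundaries and emits one string per line, flushing whatever remains at end of stream.

    import * as fs from "fs";
    const { split } = mhysa();
    fs.createReadStream("input.txt")
        .pipe(split("\n"))
        .on("data", line => console.log(line));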

19 src/functions/stringify.ts Normal file
@@ -0,0 +1,19 @@
import { Transform } from "stream";
import { JsonValue, JsonParseOptions } from "./baseDefinitions";

export function stringify(
    options: JsonParseOptions = { pretty: false },
): Transform {
    return new Transform({
        readableObjectMode: true,
        writableObjectMode: true,
        transform(chunk: JsonValue, encoding, callback) {
            callback(
                undefined,
                options.pretty
                    ? JSON.stringify(chunk, null, 2)
                    : JSON.stringify(chunk),
            );
        },
    });
}

13 src/functions/unbatch.ts Normal file
@@ -0,0 +1,13 @@
import { Transform, TransformOptions } from "stream";

export function unbatch(options?: TransformOptions) {
    return new Transform({
        ...options,
        transform(data, encoding, callback) {
            for (const d of data) {
                this.push(d);
            }
            callback();
        },
    });
}

src/helpers.ts
@@ -1,3 +1,17 @@
export async function sleep(time: number): Promise<{} | null> {
    return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
}

export type AllStreams =
    | NodeJS.ReadableStream
    | NodeJS.ReadWriteStream
    | NodeJS.WritableStream;

// Type guard: a stream is readable if it exposes pipe() and its readable flag is set
export function isReadable(
    stream: AllStreams,
): stream is NodeJS.ReadableStream {
    return (
        (stream as NodeJS.ReadableStream).pipe !== undefined &&
        (stream as any).readable === true
    );
}

28 src/index.ts
@@ -1,22 +1,6 @@
export {
|
||||
fromArray,
|
||||
map,
|
||||
flatMap,
|
||||
filter,
|
||||
reduce,
|
||||
split,
|
||||
join,
|
||||
replace,
|
||||
parse,
|
||||
stringify,
|
||||
collect,
|
||||
concat,
|
||||
merge,
|
||||
duplex,
|
||||
child,
|
||||
last,
|
||||
batch,
|
||||
unbatch,
|
||||
rate,
|
||||
parallelMap,
|
||||
} from "./functions";
|
||||
import mhysa from "./functions";
|
||||
import * as _utils from "./utils";
|
||||
export default mhysa;
|
||||
|
||||
// @TODO fix this with proper import export
|
||||
export const utils = { ..._utils };
|
||||
|
||||

12 src/utils/collected.ts Normal file
@@ -0,0 +1,12 @@
import { Transform } from "stream";

export function collected(stream: Transform): Promise<any> {
    return new Promise((resolve, reject) => {
        stream.once("data", d => {
            resolve(d);
        });
        stream.once("error", e => {
            reject(e);
        });
    });
}

1 src/utils/index.ts Normal file
@@ -0,0 +1 @@
export { collected } from "./collected";

557 tests/accumulator.spec.ts Normal file
@@ -0,0 +1,557 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import mhysa from "../src";
import { FlushStrategy } from "../src/functions/accumulator";
import { performance } from "perf_hooks";
const { accumulator, accumulatorBy } = mhysa({ objectMode: true });

test.cb("accumulator() rolling", t => {
    t.plan(3);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
    const secondFlush = [{ ts: 2, key: "d" }, { ts: 3, key: "e" }];
    const thirdFlush = [{ ts: 4, key: "f" }];
    const flushes = [firstFlush, secondFlush, thirdFlush];

    source
        .pipe(
            accumulator(FlushStrategy.rolling, 2, undefined, {
                objectMode: true,
            }),
        )
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    [...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb("accumulator() rolling with key", t => {
    t.plan(2);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const firstFlush = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 2, key: "d" },
    ];
    const secondFlush = [{ ts: 3, key: "e" }];
    const flushes = [firstFlush, secondFlush];

    source
        .pipe(accumulator(FlushStrategy.rolling, 3, "ts", { objectMode: true }))
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    [...firstFlush, ...secondFlush].forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb(
    "accumulator() rolling should emit error and ignore chunk when it's missing the key",
    t => {
        t.plan(2);
        let index = 0;
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const accumulatorStream = accumulator(
            FlushStrategy.rolling,
            3,
            "nonExistingKey",
            { objectMode: true },
        );
        const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];

        source
            .pipe(accumulatorStream)
            .on("data", (flush: TestObject[]) => {
                // No valid data output
                expect(flush).to.deep.equal([]);
            })
            .on("error", (err: any) => {
                source.pipe(accumulatorStream);
                accumulatorStream.resume();
                expect(err.message).to.equal(
                    `Key is missing in event: (nonExistingKey, ${JSON.stringify(
                        input[index],
                    )})`,
                );
                index++;
                t.pass();
            })
            .on("end", t.end);
        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

test.cb(
    "accumulator() rolling should emit error, ignore chunk when key is missing and continue processing chunks correctly",
    t => {
        t.plan(3);
        let chunkIndex = 0;
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts", {
            objectMode: true,
        });
        const input = [
            { ts: 0, key: "a" },
            { ts: 1, key: "b" },
            { ts: 2, key: "c" },
            { key: "d" },
            { ts: 3, key: "e" },
        ];
        const firstFlush = [
            { ts: 0, key: "a" },
            { ts: 1, key: "b" },
            { ts: 2, key: "c" },
        ];
        const secondFlush = [{ ts: 3, key: "e" }];
        const flushes = [firstFlush, secondFlush];

        source
            .pipe(accumulatorStream)
            .on("data", (flush: TestObject[]) => {
                t.deepEqual(flush, flushes[chunkIndex]);
                chunkIndex++;
            })
            .on("error", (err: any) => {
                source.pipe(accumulatorStream);
                accumulatorStream.resume();
                expect(err.message).to.equal(
                    `Key is missing in event: (ts, ${JSON.stringify(
                        input[3],
                    )})`,
                );
                t.pass();
            })
            .on("end", t.end);
        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

test.cb("accumulator() sliding", t => {
    t.plan(4);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const input = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 4, key: "d" },
    ];
    const firstFlush = [{ ts: 0, key: "a" }];
    const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
    const thirdFlush = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
    ];
    const fourthFlush = [
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 4, key: "d" },
    ];

    const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
    source
        .pipe(
            accumulator(FlushStrategy.sliding, 3, undefined, {
                objectMode: true,
            }),
        )
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    input.forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb("accumulator() sliding with key", t => {
    t.plan(6);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const input = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 3, key: "d" },
        { ts: 5, key: "f" },
        { ts: 6, key: "g" },
    ];
    const firstFlush = [{ ts: 0, key: "a" }];
    const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
    const thirdFlush = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
    ];
    const fourthFlush = [
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 3, key: "d" },
    ];
    const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
    const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];

    const flushes = [
        firstFlush,
        secondFlush,
        thirdFlush,
        fourthFlush,
        fifthFlush,
        sixthFlush,
    ];
    source
        .pipe(accumulator(FlushStrategy.sliding, 3, "ts", { objectMode: true }))
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    input.forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb(
    "accumulator() sliding should emit error and ignore chunk when key is missing",
    t => {
        t.plan(2);
        let index = 0;
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const accumulatorStream = accumulator(
            FlushStrategy.sliding,
            3,
            "nonExistingKey",
            { objectMode: true },
        );
        const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];

        source
            .pipe(accumulatorStream)
            .on("data", (flush: TestObject[]) => {
                expect(flush).to.deep.equal([]);
            })
            .on("error", (err: any) => {
                source.pipe(accumulatorStream);
                accumulatorStream.resume();
                expect(err.message).to.equal(
                    `Key is missing in event: (nonExistingKey, ${JSON.stringify(
                        input[index],
                    )})`,
                );
                index++;
                t.pass();
            })
            .on("end", t.end);
        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

test.cb(
    "accumulator() sliding should emit error, ignore chunk when key is missing and continue processing chunks correctly",
    t => {
        t.plan(6);
        let chunkIndex = 0;
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts", {
            objectMode: true,
        });
        const input = [
            { ts: 0, key: "a" },
            { key: "b" },
            { ts: 2, key: "c" },
            { ts: 3, key: "d" },
            { ts: 5, key: "f" },
            { ts: 6, key: "g" },
        ];
        const firstFlush = [{ ts: 0, key: "a" }];
        const secondFlush = [{ ts: 0, key: "a" }, { ts: 2, key: "c" }];
        const thirdFlush = [{ ts: 2, key: "c" }, { ts: 3, key: "d" }];
        const fourthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
        const fifthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];

        const flushes = [
            firstFlush,
            secondFlush,
            thirdFlush,
            fourthFlush,
            fifthFlush,
        ];
        source
            .pipe(accumulatorStream)
            .on("data", (flush: TestObject[]) => {
                t.deepEqual(flush, flushes[chunkIndex]);
                chunkIndex++;
            })
            .on("error", (err: any) => {
                source.pipe(accumulatorStream);
                accumulatorStream.resume();
                expect(err.message).to.equal(
                    `Key is missing in event: (ts, ${JSON.stringify(
                        input[1],
                    )})`,
                );
                t.pass();
            })
            .on("end", t.end);
        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

test.cb("accumulatorBy() rolling", t => {
    t.plan(2);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const firstFlush = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 2, key: "d" },
    ];
    const secondFlush = [{ ts: 3, key: "e" }];
    const flushes = [firstFlush, secondFlush];

    source
        .pipe(
            accumulatorBy(
                FlushStrategy.rolling,
                (event: TestObject, bufferChunk: TestObject) => {
                    return bufferChunk.ts + 3 <= event.ts;
                },
                { objectMode: true },
            ),
        )
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    [...firstFlush, ...secondFlush].forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb.skip(
    "accumulatorBy() rolling should emit error when key iteratee throws",
    t => {
        t.plan(2);
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const input = [
            { ts: 0, key: "a" },
            { ts: 1, key: "b" },
            { ts: 2, key: "c" },
        ];
        const accumulaterStream = accumulatorBy(
            FlushStrategy.rolling,
            (event: TestObject, bufferChunk: TestObject) => {
                if (event.key !== "a") {
                    throw new Error("Failed mapping");
                }
                return bufferChunk.ts + 3 <= event.ts;
            },
            { objectMode: true },
        );
        source
            .pipe(accumulaterStream)
            .on("error", (err: any) => {
                source.pipe(accumulaterStream);
                accumulaterStream.resume();
                expect(err.message).to.equal("Failed mapping");
                t.pass();
            })
            .on("end", t.end);

        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

test.cb("accumulatorBy() sliding", t => {
    t.plan(6);
    let chunkIndex = 0;
    interface TestObject {
        ts: number;
        key: string;
    }
    const source = new Readable({ objectMode: true });
    const input = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 3, key: "d" },
        { ts: 5, key: "f" },
        { ts: 6, key: "g" },
    ];
    const firstFlush = [{ ts: 0, key: "a" }];
    const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
    const thirdFlush = [
        { ts: 0, key: "a" },
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
    ];
    const fourthFlush = [
        { ts: 1, key: "b" },
        { ts: 2, key: "c" },
        { ts: 3, key: "d" },
    ];
    const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
    const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];

    const flushes = [
        firstFlush,
        secondFlush,
        thirdFlush,
        fourthFlush,
        fifthFlush,
        sixthFlush,
    ];
    source
        .pipe(
            accumulatorBy(
                FlushStrategy.sliding,
                (event: TestObject, bufferChunk: TestObject) => {
                    return bufferChunk.ts + 3 <= event.ts ? true : false;
                },
                { objectMode: true },
            ),
        )
        .on("data", (flush: TestObject[]) => {
            t.deepEqual(flush, flushes[chunkIndex]);
            chunkIndex++;
        })
        .on("error", (e: any) => {
            t.end(e);
        })
        .on("end", t.end);
    input.forEach(item => {
        source.push(item);
    });
    source.push(null);
});

test.cb.skip(
    "accumulatorBy() sliding should emit error when key iteratee throws",
    t => {
        t.plan(2);
        interface TestObject {
            ts: number;
            key: string;
        }
        const source = new Readable({ objectMode: true });
        const input = [
            { ts: 0, key: "a" },
            { ts: 1, key: "b" },
            { ts: 2, key: "c" },
        ];
        const accumulaterStream = accumulatorBy(
            FlushStrategy.sliding,
            (event: TestObject, bufferChunk: TestObject) => {
                if (event.key !== "a") {
                    throw new Error("Failed mapping");
                }
                return bufferChunk.ts + 3 <= event.ts ? true : false;
            },
            { objectMode: true },
        );
        source
            .pipe(accumulaterStream)
            .on("error", (err: any) => {
                source.pipe(accumulaterStream);
                accumulaterStream.resume();
                expect(err.message).to.equal("Failed mapping");
                t.pass();
            })
            .on("end", t.end);

        input.forEach(item => {
            source.push(item);
        });
        source.push(null);
    },
);

84 tests/batch.spec.ts Normal file
@@ -0,0 +1,84 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { batch, map, fromArray } = mhysa({ objectMode: true });

test.cb("batch() batches chunks together", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]];
    let i = 0;
    source
        .pipe(batch(3))
        .on("data", (element: string[]) => {
            t.deepEqual(element, expectedElements[i]);
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push("d");
    source.push("e");
    source.push("f");
    source.push("g");
    source.push(null);
});

test.cb("batch() yields a batch after the timeout", t => {
    t.plan(3);
    const source = new Readable({
        objectMode: true,
        read(size: number) {
            return;
        },
    });
    const expectedElements = [["a", "b"], ["c"], ["d"]];
    let i = 0;
    source
        .pipe(batch(3))
        .on("data", (element: string[]) => {
            t.deepEqual(element, expectedElements[i]);
            i++;
        })
        .on("error", t.fail)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    setTimeout(() => {
        source.push("c");
    }, 600);
    setTimeout(() => {
        source.push("d");
        source.push(null);
    }, 600 * 2);
});

test.cb(
    "batch() yields all input data even when the last element(s) don't make a full batch",
    t => {
        const data = [1, 2, 3, 4, 5, 6, 7];

        fromArray([...data])
            .pipe(batch(3))
            .pipe(
                map(d => {
                    t.deepEqual(
                        d,
                        [data.shift(), data.shift(), data.shift()].filter(
                            x => !!x,
                        ),
                    );
                }),
            )
            .on("error", t.fail)
            .on("finish", () => {
                t.is(data.length, 0);
                t.end();
            });
    },
);

29 tests/child.spec.ts Normal file
@@ -0,0 +1,29 @@
import * as cp from "child_process";
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { child } = mhysa();

test.cb(
    "child() allows easily writing to child process stdin and reading from its stdout",
    t => {
        t.plan(1);
        const source = new Readable();
        const catProcess = cp.exec("cat");
        let out = "";
        source
            .pipe(child(catProcess))
            .on("data", chunk => (out += chunk))
            .on("error", t.end)
            .on("end", () => {
                expect(out).to.equal("abcdef");
                t.pass();
                t.end();
            });
        source.push("ab");
        source.push("cd");
        source.push("ef");
        source.push(null);
    },
);

133 tests/collect.spec.ts Normal file
@@ -0,0 +1,133 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { collect } = mhysa();

test.cb(
    "collect() collects streamed elements into an array (object, flowing mode)",
    t => {
        t.plan(1);
        const source = new Readable({ objectMode: true });

        source
            .pipe(collect({ objectMode: true }))
            .on("data", collected => {
                expect(collected).to.deep.equal(["a", "b", "c"]);
                t.pass();
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("a");
        source.push("b");
        source.push("c");
        source.push(null);
    },
);

test.cb(
    "collect() collects streamed elements into an array (object, paused mode)",
    t => {
        t.plan(1);
        const source = new Readable({ objectMode: true });
        const collector = source.pipe(collect({ objectMode: true }));

        collector
            .on("readable", () => {
                let collected = collector.read();
                while (collected !== null) {
                    expect(collected).to.deep.equal(["a", "b", "c"]);
                    t.pass();
                    collected = collector.read();
                }
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("a");
        source.push("b");
        source.push("c");
        source.push(null);
    },
);

test.cb(
    "collect() collects streamed bytes into a buffer (non-object, flowing mode)",
    t => {
        t.plan(1);
        const source = new Readable({ objectMode: false });

        source
            .pipe(collect())
            .on("data", collected => {
                expect(collected).to.deep.equal(Buffer.from("abc"));
                t.pass();
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("a");
        source.push("b");
        source.push("c");
        source.push(null);
    },
);

test.cb(
    "collect() collects streamed bytes into a buffer (non-object, paused mode)",
    t => {
        t.plan(1);
        const source = new Readable({ objectMode: false });
        const collector = source.pipe(collect({ objectMode: false }));
        collector
            .on("readable", () => {
                let collected = collector.read();
                while (collected !== null) {
                    expect(collected).to.deep.equal(Buffer.from("abc"));
                    t.pass();
                    collected = collector.read();
                }
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("a");
        source.push("b");
        source.push("c");
        source.push(null);
    },
);

test.cb(
    "collect() emits an empty array if the source was empty (object mode)",
    t => {
        t.plan(1);
        const source = new Readable({ objectMode: true });
        const collector = source.pipe(collect({ objectMode: true }));
        collector
            .on("data", collected => {
                expect(collected).to.deep.equal([]);
                t.pass();
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push(null);
    },
);

test.cb(
    "collect() emits nothing if the source was empty (non-object mode)",
    t => {
        t.plan(0);
        const source = new Readable({ objectMode: false });
        const collector = source.pipe(collect({ objectMode: false }));
        collector
            .on("data", () => t.fail())
            .on("error", t.end)
            .on("end", t.end);

        source.push(null);
    },
);

580 tests/compose.spec.ts Normal file
@@ -0,0 +1,580 @@
import * as test from "ava";
import { expect } from "chai";
import { sleep } from "../src/helpers";
import { Readable, Writable } from "stream";
import mhysa from "../src";
import { performance } from "perf_hooks";
const { compose, map, fromArray } = mhysa({ objectMode: true });

test.cb("compose() chains two streams together in the correct order", t => {
    t.plan(3);
    interface Chunk {
        visited: number[];
        key: string;
    }

    let i = 0;
    const first = map((chunk: Chunk) => {
        chunk.visited.push(1);
        return chunk;
    });
    const second = map((chunk: Chunk) => {
        chunk.visited.push(2);
        return chunk;
    });

    const composed = compose([first, second]);

    composed.on("data", data => {
        expect(data).to.deep.equal(result[i]);
        t.pass();
        i++;
        if (i === 3) {
            t.end();
        }
    });

    const input = [
        { key: "a", visited: [] },
        { key: "b", visited: [] },
        { key: "c", visited: [] },
    ];
    const result = [
        { key: "a", visited: [1, 2] },
        { key: "b", visited: [1, 2] },
        { key: "c", visited: [1, 2] },
    ];

    input.forEach(item => composed.write(item));
});

test.cb("piping compose() maintains correct order", t => {
    t.plan(3);
    interface Chunk {
        visited: number[];
        key: string;
    }
    let i = 0;
    const first = map((chunk: Chunk) => {
        chunk.visited.push(1);
        return chunk;
    });
    const second = map((chunk: Chunk) => {
        chunk.visited.push(2);
        return chunk;
    });

    const composed = compose([first, second]);
    const third = map((chunk: Chunk) => {
        chunk.visited.push(3);
        return chunk;
    });

    composed.pipe(third).on("data", data => {
        expect(data).to.deep.equal(result[i]);
        t.pass();
        i++;
        if (i === 3) {
            t.end();
        }
    });

    composed.on("error", err => {
        t.end(err);
    });

    const input = [
        { key: "a", visited: [] },
        { key: "b", visited: [] },
        { key: "c", visited: [] },
    ];
    const result = [
        { key: "a", visited: [1, 2, 3] },
        { key: "b", visited: [1, 2, 3] },
        { key: "c", visited: [1, 2, 3] },
    ];

    input.forEach(item => composed.write(item));
});

test("compose() writable length should be less than highWaterMark when handling writes", async t => {
    t.plan(2);
    return new Promise(async (resolve, reject) => {
        interface Chunk {
            key: string;
            mapped: number[];
        }
        const first = map(async (chunk: Chunk) => {
            chunk.mapped.push(1);
            return chunk;
        });

        const second = map(async (chunk: Chunk) => {
            chunk.mapped.push(2);
            return chunk;
        });

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark: 2,
            },
        );
        composed.on("error", err => {
            reject();
        });

        composed.on("drain", () => {
            t.pass();
            expect(composed._writableState.length).to.be.equal(0);
        });

        composed.on("data", (chunk: Chunk) => {
            if (chunk.key === "e") {
                resolve();
            }
        });

        const input = [
            { key: "a", mapped: [] },
            { key: "b", mapped: [] },
            { key: "c", mapped: [] },
            { key: "d", mapped: [] },
            { key: "e", mapped: [] },
        ];

        fromArray(input).pipe(composed);
    });
});

test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => {
    t.plan(2);
    const _rate = 100;
    const highWaterMark = 2;
    return new Promise(async (resolve, reject) => {
        interface Chunk {
            key: string;
            mapped: number[];
        }
        const first = map(async (chunk: Chunk) => {
            await sleep(_rate);
            chunk.mapped.push(1);
            return chunk;
        });

        const second = map(async (chunk: Chunk) => {
            chunk.mapped.push(2);
            return chunk;
        });

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark,
            },
        );
        composed.on("error", err => {
            reject();
        });

        composed.on("drain", () => {
            t.pass();
            expect(composed._writableState.length).to.be.equal(0);
        });

        composed.on("data", (chunk: Chunk) => {
            t.deepEqual(chunk.mapped, [1, 2]);
        });

        composed.on("finish", () => resolve());

        const input = [
            { key: "a", mapped: [] },
            { key: "b", mapped: [] },
            { key: "c", mapped: [] },
            { key: "d", mapped: [] },
            { key: "e", mapped: [] },
        ];
        fromArray(input).pipe(composed);
    });
});

test.cb(
    "compose() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5",
    t => {
        t.plan(6);
        const _rate = 100;
        interface Chunk {
            key: string;
            mapped: number[];
        }
        const first = map(async (chunk: Chunk) => {
            await sleep(_rate);
            chunk.mapped.push(1);
            return chunk;
        });

        const second = map(async (chunk: Chunk) => {
            chunk.mapped.push(2);
            return chunk;
        });

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark: 5,
            },
        );

        composed.on("error", err => {
            t.end(err);
        });

        composed.on("drain", () => {
            expect(composed._writableState.length).to.be.equal(0);
            t.pass();
        });

        composed.on("data", (chunk: Chunk) => {
            t.pass();
            if (chunk.key === "e") {
                t.end();
            }
        });

        const input = [
            { key: "a", mapped: [] },
            { key: "b", mapped: [] },
            { key: "c", mapped: [] },
            { key: "d", mapped: [] },
            { key: "e", mapped: [] },
        ];
        input.forEach(item => {
            composed.write(item);
        });
    },
);

test.cb(
    "compose() should emit drain event immediately when second stream is bottleneck",
    t => {
        t.plan(6);
        const _rate = 200;
        interface Chunk {
            key: string;
            mapped: number[];
        }
        const first = map((chunk: Chunk) => {
            chunk.mapped.push(1);
            return chunk;
        });

        const second = map(
            async (chunk: Chunk) => {
                pendingReads--;
                await sleep(_rate);
                expect(second._writableState.length).to.be.equal(1);
                expect(first._readableState.length).to.equal(pendingReads);
                chunk.mapped.push(2);
                return chunk;
            },
            { highWaterMark: 1 },
        );

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark: 5,
            },
        );
        composed.on("error", err => {
            t.end(err);
        });

        composed.on("drain", () => {
            expect(composed._writableState.length).to.be.equal(0);
            expect(performance.now() - start).to.be.lessThan(_rate);
            t.pass();
        });

        composed.on("data", (chunk: Chunk) => {
            expect(composed._writableState.length).to.be.equal(0);
            t.pass();
            if (chunk.key === "e") {
                t.end();
            }
        });

        const input = [
            { key: "a", mapped: [] },
            { key: "b", mapped: [] },
            { key: "c", mapped: [] },
            { key: "d", mapped: [] },
            { key: "e", mapped: [] },
        ];
        let pendingReads = input.length;

        input.forEach(item => {
            composed.write(item);
        });

        const start = performance.now();
    },
);

test.cb(
    "compose() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck",
    t => {
        t.plan(6);
        interface Chunk {
            index: number;
            mapped: string[];
        }
        const first = map(
            async (chunk: Chunk) => {
                expect(first._readableState.length).to.be.at.most(2);
                chunk.mapped.push("first");
                return chunk;
            },
            {
                highWaterMark: 2,
            },
        );

        const second = map(
            async (chunk: Chunk) => {
                expect(second._writableState.length).to.be.equal(1);
                await sleep(100);
                chunk.mapped.push("second");
                return chunk;
            },
            { highWaterMark: 2 },
        );

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark: 5,
            },
        );
        composed.on("error", err => {
            t.end(err);
        });

        composed.on("data", (chunk: Chunk) => {
            expect(chunk.mapped.length).to.equal(2);
            expect(chunk.mapped).to.deep.equal(["first", "second"]);
            t.pass();
            if (chunk.index === 5) {
                t.end();
            }
        });

        composed.on("drain", () => {
            expect(composed._writableState.length).to.be.equal(0);
            t.pass();
        });

        const input = [
            { index: 1, mapped: [] },
            { index: 2, mapped: [] },
            { index: 3, mapped: [] },
            { index: 4, mapped: [] },
            { index: 5, mapped: [] },
        ];

        input.forEach(item => {
            composed.write(item);
        });
    },
);

test.cb(
    "compose() should not emit drain event writing 5 items to compose with a highWaterMark of 6",
    t => {
        t.plan(5);
        const _rate = 100;
        interface Chunk {
            key: string;
            mapped: number[];
        }
        const first = map(async (chunk: Chunk) => {
            await sleep(_rate);
            chunk.mapped.push(1);
            return chunk;
        });

        const second = map(async (chunk: Chunk) => {
            chunk.mapped.push(2);
            return chunk;
        });

        const composed = compose(
            [first, second],
            undefined,
            {
                highWaterMark: 6,
            },
        );

        composed.on("error", err => {
            t.end(err);
        });

        composed.on("drain", () => {
            t.end(new Error("Drain should not be emitted"));
        });

        composed.on("data", (chunk: Chunk) => {
            t.pass();
            if (chunk.key === "e") {
                t.end();
            }
        });

        const input = [
            { key: "a", mapped: [] },
            { key: "b", mapped: [] },
            { key: "c", mapped: [] },
            { key: "d", mapped: [] },
            { key: "e", mapped: [] },
        ];

        input.forEach(item => {
            composed.write(item);
        });
    },
);

test.cb("compose() should be 'destroyable'", t => {
    t.plan(3);
    const _sleep = 100;
    interface Chunk {
        key: string;
        mapped: number[];
    }

    const first = map(async (chunk: Chunk) => {
        await sleep(_sleep);
        chunk.mapped.push(1);
        return chunk;
    });

    const second = map(async (chunk: Chunk) => {
        chunk.mapped.push(2);
        return chunk;
    });

    const composed = compose(
        [first, second],
        (err: any) => {
            t.pass();
        },
    );

    const fakeSource = new Readable({
        objectMode: true,
        read() {
            return;
        },
    });

    const fakeSink = new Writable({
        objectMode: true,
        write(data, enc, cb) {
            const cur = input.shift();
            t.is(cur.key, data.key);
            t.deepEqual(cur.mapped, [1, 2]);
            if (cur.key === "a") {
                composed.destroy();
            }
            cb();
        },
    });

    composed.on("close", t.end);
    fakeSource.pipe(composed).pipe(fakeSink);

    const input = [
        { key: "a", mapped: [] },
        { key: "b", mapped: [] },
        { key: "c", mapped: [] },
        { key: "d", mapped: [] },
        { key: "e", mapped: [] },
    ];
    fakeSource.push(input[0]);
    fakeSource.push(input[1]);
    fakeSource.push(input[2]);
    fakeSource.push(input[3]);
    fakeSource.push(input[4]);
});

test.cb("compose() `finish` and `end` propagate", t => {
    interface Chunk {
        key: string;
        mapped: number[];
    }

    t.plan(8);
    const first = map(async (chunk: Chunk) => {
        chunk.mapped.push(1);
        return chunk;
    });

    const second = map(async (chunk: Chunk) => {
        chunk.mapped.push(2);
        return chunk;
    });

    const composed = compose(
        [first, second],
        undefined,
        {
            highWaterMark: 3,
        },
    );

    const fakeSource = new Readable({
        objectMode: true,
        read() {
            return;
        },
    });
    const sink = map((d: Chunk) => {
        const curr = input.shift();
        t.is(curr.key, d.key);
        t.deepEqual(d.mapped, [1, 2]);
    });

    fakeSource.pipe(composed).pipe(sink);

    fakeSource.on("end", () => {
        t.pass();
    });
    composed.on("finish", () => {
        t.pass();
    });
    composed.on("end", () => {
        t.pass();
        t.end();
    });
    sink.on("finish", () => {
        t.pass();
    });

    const input = [
        { key: "a", mapped: [] },
        { key: "b", mapped: [] },
        { key: "c", mapped: [] },
        { key: "d", mapped: [] },
        { key: "e", mapped: [] },
    ];
    fakeSource.push(input[0]);
    fakeSource.push(input[1]);
    fakeSource.push(null);
});

181 tests/concat.spec.ts Normal file
@@ -0,0 +1,181 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { concat, collect } = mhysa();

test.cb(
    "concat() concatenates multiple readable streams (object, flowing mode)",
    t => {
        t.plan(6);
        const source1 = new Readable({ objectMode: true });
        const source2 = new Readable({ objectMode: true });
        const expectedElements = ["a", "b", "c", "d", "e", "f"];
        let i = 0;
        concat(source1, source2)
            .on("data", (element: string) => {
                expect(element).to.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source1.push("a");
        source2.push("d");
        source1.push("b");
        source2.push("e");
        source1.push("c");
        source2.push("f");
        source2.push(null);
        source1.push(null);
    },
);

test.cb(
    "concat() concatenates multiple readable streams (object, paused mode)",
    t => {
        t.plan(6);
        const source1 = new Readable({ objectMode: true });
        const source2 = new Readable({ objectMode: true });
        const expectedElements = ["a", "b", "c", "d", "e", "f"];
        let i = 0;
        const concatenation = concat(source1, source2)
            .on("readable", () => {
                let element = concatenation.read();
                while (element !== null) {
                    expect(element).to.equal(expectedElements[i]);
                    t.pass();
                    i++;
                    element = concatenation.read();
                }
            })
            .on("error", t.end)
            .on("end", t.end);

        source1.push("a");
        source2.push("d");
        source1.push("b");
        source2.push("e");
        source1.push("c");
        source2.push("f");
        source2.push(null);
        source1.push(null);
    },
);

test.cb(
    "concat() concatenates multiple readable streams (non-object, flowing mode)",
    t => {
        t.plan(6);
        const source1 = new Readable({ objectMode: false });
        const source2 = new Readable({ objectMode: false });
        const expectedElements = ["a", "b", "c", "d", "e", "f"];
        let i = 0;
        concat(source1, source2)
            .on("data", (element: string) => {
                expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source1.push("a");
        source2.push("d");
        source1.push("b");
        source2.push("e");
        source1.push("c");
        source2.push("f");
        source2.push(null);
        source1.push(null);
    },
);

test.cb(
    "concat() concatenates multiple readable streams (non-object, paused mode)",
    t => {
        t.plan(6);
        const source1 = new Readable({ objectMode: false, read: () => ({}) });
        const source2 = new Readable({ objectMode: false, read: () => ({}) });
        const expectedElements = ["a", "b", "c", "d", "e", "f"];
        let i = 0;
        const concatenation = concat(source1, source2)
            .on("readable", () => {
                let element = concatenation.read();
                while (element !== null) {
                    expect(element).to.deep.equal(
                        Buffer.from(expectedElements[i]),
                    );
                    t.pass();
                    i++;
                    element = concatenation.read();
                }
            })
            .on("error", t.end)
            .on("end", t.end);

        source1.push("a");
        setTimeout(() => source2.push("d"), 10);
        setTimeout(() => source1.push("b"), 20);
        setTimeout(() => source2.push("e"), 30);
        setTimeout(() => source1.push("c"), 40);
        setTimeout(() => source2.push("f"), 50);
        setTimeout(() => source2.push(null), 60);
        setTimeout(() => source1.push(null), 70);
    },
);

test.cb("concat() concatenates a single readable stream (object mode)", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "b", "c", "d", "e", "f"];
    let i = 0;
    concat(source)
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb(
    "concat() concatenates a single readable stream (non-object mode)",
    t => {
        t.plan(3);
        const source = new Readable({ objectMode: false });
        const expectedElements = ["a", "b", "c", "d", "e", "f"];
        let i = 0;
        concat(source)
            .on("data", (element: string) => {
                expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("a");
        source.push("b");
        source.push("c");
        source.push(null);
    },
);

test.cb("concat() concatenates empty list of readable streams", t => {
    t.plan(0);
    concat()
        .pipe(collect())
        .on("data", _ => {
            t.fail();
        })
        .on("error", t.end)
        .on("end", t.end);
});

21 tests/defaultOptions.spec.ts Normal file
@@ -0,0 +1,21 @@
import { Readable } from "stream";
import test from "ava";
import mhysa from "../src";

const withDefaultOptions = mhysa({ objectMode: true });
const withoutOptions = mhysa();

test("Mhysa instances can have default options", t => {
    let batch = withDefaultOptions.batch();
    t.true(batch._readableState.objectMode);
    t.true(batch._writableState.objectMode);
    batch = withDefaultOptions.batch(3);
    t.true(batch._readableState.objectMode);
    t.true(batch._writableState.objectMode);
    batch = withDefaultOptions.batch(3, 1);
    t.true(batch._readableState.objectMode);
    t.true(batch._writableState.objectMode);
    batch = withDefaultOptions.batch(3, 1, { objectMode: false });
    t.false(batch._readableState.objectMode);
    t.false(batch._writableState.objectMode);
});

864 tests/demux.spec.ts Normal file
@@ -0,0 +1,864 @@
|
||||
import test from "ava";
|
||||
import { expect } from "chai";
|
||||
import mhysa from "../src";
|
||||
import { Writable, Readable } from "stream";
|
||||
const sinon = require("sinon");
|
||||
const { sleep } = require("../src/helpers");
|
||||
import { performance } from "perf_hooks";
|
||||
const { demux, map, fromArray } = mhysa({ objectMode: true });
|
||||
|
||||
interface Test {
|
||||
key: string;
|
||||
visited: number[];
|
||||
}
|
||||
|
||||
test.cb("demux() constructor should be called once per key", t => {
|
||||
t.plan(1);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
const construct = sinon.spy((destKey: string) => {
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {});
|
||||
|
||||
demuxed.on("finish", () => {
|
||||
expect(construct.withArgs("a").callCount).to.equal(1);
|
||||
expect(construct.withArgs("b").callCount).to.equal(1);
|
||||
expect(construct.withArgs("c").callCount).to.equal(1);
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
});
|
||||
|
||||
test.cb("demux() item written passed in constructor", t => {
|
||||
t.plan(4);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
];
|
||||
const construct = sinon.spy((destKey: string, item: any) => {
|
||||
expect(item).to.deep.equal({ key: destKey, visited: [] });
|
||||
t.pass();
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {});
|
||||
|
||||
demuxed.on("finish", () => {
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
});
|
||||
|
||||
test.cb("demux() should send input through correct pipeline", t => {
|
||||
t.plan(6);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
const pipelineSpies = {};
|
||||
const construct = (destKey: string) => {
|
||||
const mapper = sinon.spy((chunk: Test) => {
|
||||
return { ...chunk, visited: [1] };
|
||||
});
|
||||
const dest = map(mapper);
|
||||
pipelineSpies[destKey] = mapper;
|
||||
|
||||
return dest;
|
||||
};
|
||||
|
||||
const demuxed = demux(construct, "key", {});
|
||||
|
||||
demuxed.on("finish", () => {
|
||||
pipelineSpies["a"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("a");
|
||||
t.pass();
|
||||
});
|
||||
pipelineSpies["b"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("b");
|
||||
t.pass();
|
||||
});
|
||||
pipelineSpies["c"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("c");
|
||||
t.pass();
|
||||
});
|
||||
t.end();
|
||||
});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
});
|
||||
|
||||
test.cb("demux() constructor should be called once per key using keyBy", t => {
|
||||
t.plan(1);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
|
||||
const construct = sinon.spy((destKey: string) => {
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, item => item.key, {});
|
||||
|
||||
demuxed.on("finish", () => {
|
||||
expect(construct.withArgs("a").callCount).to.equal(1);
|
||||
expect(construct.withArgs("b").callCount).to.equal(1);
|
||||
expect(construct.withArgs("c").callCount).to.equal(1);
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
});
|
||||
|
||||
test.cb("demux() should send input through correct pipeline using keyBy", t => {
|
||||
t.plan(6);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
const pipelineSpies = {};
|
||||
const construct = (destKey: string) => {
|
||||
const mapper = sinon.spy((chunk: Test) => {
|
||||
return { ...chunk, visited: [1] };
|
||||
});
|
||||
const dest = map(mapper);
|
||||
pipelineSpies[destKey] = mapper;
|
||||
|
||||
return dest;
|
||||
};
|
||||
|
||||
const demuxed = demux(construct, item => item.key, {});
|
||||
|
||||
demuxed.on("finish", () => {
|
||||
pipelineSpies["a"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("a");
|
||||
t.pass();
|
||||
});
|
||||
pipelineSpies["b"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("b");
|
||||
t.pass();
|
||||
});
|
||||
pipelineSpies["c"].getCalls().forEach(call => {
|
||||
expect(call.args[0].key).to.equal("c");
|
||||
t.pass();
|
||||
});
|
||||
t.end();
|
||||
});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
});
|
||||
|
||||
test("demux() write should return false and emit drain if more than highWaterMark items are buffered", t => {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
t.plan(7);
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const input: Chunk[] = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
];
|
||||
let pendingReads = input.length;
|
||||
const highWaterMark = 5;
|
||||
const slowProcessorSpeed = 25;
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(slowProcessorSpeed);
|
||||
return { ...chunk, mapped: [1] };
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
first.on("data", chunk => {
|
||||
expect(chunk.mapped).to.deep.equal([1]);
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
resolve();
|
||||
}
|
||||
t.pass();
|
||||
});
|
||||
|
||||
return first;
|
||||
};
|
||||
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark,
|
||||
});
|
||||
|
||||
_demux.on("error", err => {
|
||||
reject();
|
||||
});
|
||||
|
||||
for (const item of input) {
|
||||
const res = _demux.write(item);
|
||||
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
|
||||
if (!res) {
|
||||
await new Promise((resolv, rej) => {
|
||||
_demux.once("drain", () => {
|
||||
expect(_demux._writableState.length).to.be.equal(0);
|
||||
t.pass();
|
||||
resolv();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
test("demux() should emit one drain event after slowProcessorSpeed * highWaterMark ms when first stream is bottleneck", t => {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
t.plan(7);
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const input: Chunk[] = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
];
|
||||
let pendingReads = input.length;
|
||||
const highWaterMark = 5;
|
||||
const slowProcessorSpeed = 25;
|
||||
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(slowProcessorSpeed);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
first.on("data", () => {
|
||||
t.pass();
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
return first;
|
||||
};
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark,
|
||||
});
|
||||
_demux.on("error", err => {
|
||||
reject();
|
||||
});
|
||||
|
||||
const start = performance.now();
|
||||
for (const item of input) {
|
||||
const res = _demux.write(item);
|
||||
if (!res) {
|
||||
await new Promise((resolv, rej) => {
|
||||
// This event should be received after all items in demux are processed
|
||||
_demux.once("drain", () => {
|
||||
expect(performance.now() - start).to.be.greaterThan(
|
||||
slowProcessorSpeed * highWaterMark,
|
||||
);
|
||||
t.pass();
|
||||
resolv();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
t.plan(1);
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const highWaterMark = 5;
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
];
|
||||
let pendingReads = input.length;
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(50);
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
first.on("data", () => {
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
return first;
|
||||
};
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark: 5,
|
||||
});
|
||||
|
||||
_demux.on("error", err => {
|
||||
reject();
|
||||
});
|
||||
|
||||
for (const item of input) {
|
||||
const res = _demux.write(item);
|
||||
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
|
||||
if (!res) {
|
||||
await new Promise(_resolve => {
|
||||
_demux.once("drain", () => {
|
||||
_resolve();
|
||||
expect(_demux._writableState.length).to.be.equal(0);
|
||||
t.pass();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
test.cb(
|
||||
"demux() should emit drain event when second stream is bottleneck after (highWaterMark - 2) * slowProcessorSpeed ms",
|
||||
t => {
|
||||
// ie) first two items are pushed directly into first and second streams (highWaterMark - 2 remain in demux)
|
||||
t.plan(8);
|
||||
const slowProcessorSpeed = 100;
|
||||
const highWaterMark = 5;
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const sink = new Writable({
|
||||
objectMode: true,
|
||||
write(chunk, encoding, cb) {
|
||||
expect(chunk.mapped).to.deep.equal([1, 2]);
|
||||
t.pass();
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
t.end();
|
||||
}
|
||||
cb();
|
||||
},
|
||||
});
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
(chunk: Chunk) => {
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(slowProcessorSpeed);
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
first.pipe(second).pipe(sink);
|
||||
return first;
|
||||
};
|
||||
const _demux = demux(construct, () => "a", {
|
||||
highWaterMark,
|
||||
});
|
||||
_demux.on("error", err => {
|
||||
t.end(err);
|
||||
});
|
||||
|
||||
_demux.on("drain", () => {
|
||||
expect(_demux._writableState.length).to.be.equal(0);
|
||||
expect(performance.now() - start).to.be.greaterThan(
|
||||
slowProcessorSpeed * 3,
|
||||
);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
{ key: "f", mapped: [] },
|
||||
{ key: "g", mapped: [] },
|
||||
];
|
||||
let pendingReads = input.length;
|
||||
|
||||
const start = performance.now();
|
||||
fromArray(input).pipe(_demux);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"demux() should emit drain event when third stream is bottleneck",
|
||||
t => {
|
||||
// @TODO investigate why drain is emitted after slowProcessorSpeed
|
||||
t.plan(8);
|
||||
const slowProcessorSpeed = 100;
|
||||
const highWaterMark = 5;
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const sink = new Writable({
|
||||
objectMode: true,
|
||||
write(chunk, encoding, cb) {
|
||||
expect(chunk.mapped).to.deep.equal([1, 2, 3]);
|
||||
t.pass();
|
||||
pendingReads--;
|
||||
if (pendingReads === 0) {
|
||||
t.end();
|
||||
}
|
||||
cb();
|
||||
},
|
||||
});
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
(chunk: Chunk) => {
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
const second = map(
|
||||
(chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
const third = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(slowProcessorSpeed);
|
||||
chunk.mapped.push(3);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
first
|
||||
.pipe(second)
|
||||
.pipe(third)
|
||||
.pipe(sink);
|
||||
return first;
|
||||
};
|
||||
const _demux = demux(construct, () => "a", {
|
||||
highWaterMark,
|
||||
});
|
||||
_demux.on("error", err => {
|
||||
t.end(err);
|
||||
});
|
||||
|
||||
_demux.on("drain", () => {
|
||||
expect(_demux._writableState.length).to.be.equal(0);
|
||||
expect(performance.now() - start).to.be.greaterThan(
|
||||
slowProcessorSpeed,
|
||||
);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
{ key: "f", mapped: [] },
|
||||
{ key: "g", mapped: [] },
|
||||
];
|
||||
let pendingReads = input.length;
|
||||
|
||||
const start = performance.now();
|
||||
fromArray(input).pipe(_demux);
|
||||
},
|
||||
);
|
||||
|
||||
test("demux() should be blocked by slowest pipeline", t => {
|
||||
t.plan(1);
|
||||
const slowProcessorSpeed = 100;
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
return new Promise(async (resolve, reject) => {
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
await sleep(slowProcessorSpeed);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
return first;
|
||||
};
|
||||
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark: 1,
|
||||
});
|
||||
|
||||
_demux.on("error", err => {
|
||||
reject(err);
|
||||
});
|
||||
|
||||
_demux.on("data", async chunk => {
|
||||
pendingReads--;
|
||||
if (chunk.key === "b") {
|
||||
expect(performance.now() - start).to.be.greaterThan(
|
||||
slowProcessorSpeed * totalItems,
|
||||
);
|
||||
t.pass();
|
||||
expect(pendingReads).to.equal(0);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
];
|
||||
|
||||
let pendingReads = input.length;
|
||||
const totalItems = input.length;
|
||||
const start = performance.now();
|
||||
for (const item of input) {
|
||||
if (!_demux.write(item)) {
|
||||
await new Promise(_resolve => {
|
||||
_demux.once("drain", () => {
|
||||
_resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
test.cb("Demux should remux to sink", t => {
|
||||
t.plan(6);
|
||||
let i = 0;
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
const result = [
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "b", visited: ["b"] },
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "c", visited: ["c"] },
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "b", visited: ["b"] },
|
||||
];
|
||||
const construct = (destKey: string) => {
|
||||
const dest = map((chunk: any) => {
|
||||
chunk.visited.push(destKey);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
return dest;
|
||||
};
|
||||
|
||||
const sink = map(d => {
|
||||
t.deepEqual(d, result[i]);
|
||||
i++;
|
||||
if (i === input.length) {
|
||||
t.end();
|
||||
}
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {});
|
||||
|
||||
fromArray(input)
|
||||
.pipe(demuxed)
|
||||
.pipe(sink);
|
||||
});
|
||||
|
||||
test.cb("Demux should send data events", t => {
|
||||
t.plan(6);
|
||||
let i = 0;
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
];
|
||||
const result = [
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "b", visited: ["b"] },
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "c", visited: ["c"] },
|
||||
{ key: "a", visited: ["a"] },
|
||||
{ key: "b", visited: ["b"] },
|
||||
];
|
||||
const construct = (destKey: string) => {
|
||||
const dest = map((chunk: any) => {
|
||||
chunk.visited.push(destKey);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
return dest;
|
||||
};
|
||||
|
||||
const demuxed = demux(construct, "key", {});
|
||||
|
||||
fromArray(input).pipe(demuxed);
|
||||
|
||||
demuxed.on("data", d => {
|
||||
t.deepEqual(d, result[i]);
|
||||
i++;
|
||||
if (i === input.length) {
|
||||
t.end();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
test.cb("demux() `finish` and `end` propagates", t => {
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
|
||||
t.plan(9);
|
||||
|
||||
const construct = (destKey: string) => {
|
||||
const dest = map((chunk: any) => {
|
||||
chunk.mapped.push(destKey);
|
||||
return chunk;
|
||||
});
|
||||
return dest;
|
||||
};
|
||||
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark: 3,
|
||||
});
|
||||
|
||||
const fakeSource = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
return;
|
||||
},
|
||||
});
|
||||
|
||||
const sink = map((d: any) => {
|
||||
const curr = input.shift();
|
||||
t.is(curr.key, d.key);
|
||||
t.deepEqual(d.mapped, [d.key]);
|
||||
});
|
||||
|
||||
fakeSource.pipe(_demux).pipe(sink);
|
||||
|
||||
fakeSource.on("end", () => {
|
||||
t.pass();
|
||||
});
|
||||
_demux.on("finish", () => {
|
||||
t.pass();
|
||||
});
|
||||
_demux.on("unpipe", () => {
|
||||
t.pass();
|
||||
});
|
||||
_demux.on("end", () => {
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
sink.on("finish", () => {
|
||||
t.pass();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
];
|
||||
fakeSource.push(input[0]);
|
||||
fakeSource.push(input[1]);
|
||||
fakeSource.push(null);
|
||||
});
|
||||
|
||||
test.cb("demux() `unpipe` propagates", t => {
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
|
||||
t.plan(7);
|
||||
|
||||
const construct = (destKey: string) => {
|
||||
const dest = map((chunk: any) => {
|
||||
chunk.mapped.push(destKey);
|
||||
return chunk;
|
||||
});
|
||||
return dest;
|
||||
};
|
||||
|
||||
const _demux = demux(construct, "key", {
|
||||
highWaterMark: 3,
|
||||
});
|
||||
|
||||
const fakeSource = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
return;
|
||||
},
|
||||
});
|
||||
|
||||
const sink = map((d: any) => {
|
||||
const curr = input.shift();
|
||||
t.is(curr.key, d.key);
|
||||
t.deepEqual(d.mapped, [d.key]);
|
||||
});
|
||||
|
||||
fakeSource.pipe(_demux).pipe(sink);
|
||||
|
||||
_demux.on("unpipe", () => {
|
||||
t.pass();
|
||||
});
|
||||
|
||||
sink.on("unpipe", () => {
|
||||
t.pass();
|
||||
});
|
||||
|
||||
sink.on("finish", () => {
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
];
|
||||
fakeSource.push(input[0]);
|
||||
fakeSource.push(input[1]);
|
||||
fakeSource.push(null);
|
||||
});
|
||||
|
||||
test.cb("demux() should be 'destroyable'", t => {
|
||||
t.plan(2);
|
||||
const _sleep = 100;
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: string[];
|
||||
}
|
||||
|
||||
const construct = (destKey: string) => {
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
await sleep(_sleep);
|
||||
chunk.mapped.push(destKey);
|
||||
return chunk;
|
||||
});
|
||||
return first;
|
||||
};
|
||||
|
||||
const _demux = demux(construct, "key");
|
||||
|
||||
const fakeSource = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
return;
|
||||
},
|
||||
});
|
||||
|
||||
const fakeSink = new Writable({
|
||||
objectMode: true,
|
||||
write(data, enc, cb) {
|
||||
const cur = input.shift();
|
||||
t.is(cur.key, data.key);
|
||||
t.deepEqual(cur.mapped, ["a"]);
|
||||
if (cur.key === "a") {
|
||||
_demux.destroy();
|
||||
}
|
||||
cb();
|
||||
},
|
||||
});
|
||||
|
||||
_demux.on("close", t.end);
|
||||
fakeSource.pipe(_demux).pipe(fakeSink);
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
];
|
||||
fakeSource.push(input[0]);
|
||||
fakeSource.push(input[1]);
|
||||
fakeSource.push(input[2]);
|
||||
fakeSource.push(input[3]);
|
||||
fakeSource.push(input[4]);
|
||||
});
|
||||
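
The drain tests above all lean on the same standard Node.js backpressure idiom: honour write()'s return value and wait for "drain" before writing more. A self-contained sketch of that idiom (plain Node standard library; not part of the diff):

import { Writable } from "stream";
import { once } from "events";

// write() returning false means the writable's buffer has reached
// highWaterMark; pause until "drain" signals it has emptied.
async function writeAll(sink: Writable, items: unknown[]): Promise<void> {
    for (const item of items) {
        if (!sink.write(item)) {
            await once(sink, "drain");
        }
    }
    sink.end();
}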

29 tests/duplex.spec.ts Normal file
@@ -0,0 +1,29 @@
import * as cp from "child_process";
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { duplex } = mhysa();

test.cb(
    "duplex() combines a writable and readable stream into a ReadWrite stream",
    t => {
        t.plan(1);
        const source = new Readable();
        const catProcess = cp.exec("cat");
        let out = "";
        source
            .pipe(duplex(catProcess.stdin!, catProcess.stdout!))
            .on("data", chunk => (out += chunk))
            .on("error", t.end)
            .on("end", () => {
                expect(out).to.equal("abcdef");
                t.pass();
                t.end();
            });
        source.push("ab");
        source.push("cd");
        source.push("ef");
        source.push(null);
    },
);

117 tests/filter.spec.ts Normal file
@@ -0,0 +1,117 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import mhysa from "../src";
const { filter } = mhysa();

test.cb("filter() filters elements synchronously", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "c"];
    let i = 0;
    source
        .pipe(
            filter((element: string) => element !== "b", {
                readableObjectMode: true,
                writableObjectMode: true,
            }),
        )
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb("filter() filters elements asynchronously", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "c"];
    let i = 0;
    source
        .pipe(
            filter(
                async (element: string) => {
                    await Promise.resolve();
                    return element !== "b";
                },
                { readableObjectMode: true, writableObjectMode: true },
            ),
        )
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb.skip("filter() emits errors during synchronous filtering", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            filter(
                (element: string) => {
                    if (element !== "a") {
                        throw new Error("Failed filtering");
                    }
                    return true;
                },
                { readableObjectMode: true, writableObjectMode: true },
            ),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed filtering");
            t.pass();
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb.skip("filter() emits errors during asynchronous filtering", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            filter(
                async (element: string) => {
                    await Promise.resolve();
                    if (element !== "a") {
                        throw new Error("Failed filtering");
                    }
                    return true;
                },
                { readableObjectMode: true, writableObjectMode: true },
            ),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed filtering");
            t.pass();
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

101 tests/flatMap.spec.ts Normal file
@@ -0,0 +1,101 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { flatMap } = mhysa({ objectMode: true });

test.cb("flatMap() maps elements synchronously", t => {
    t.plan(6);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "A", "b", "B", "c", "C"];
    let i = 0;
    source
        .pipe(flatMap((element: string) => [element, element.toUpperCase()]))
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb("flatMap() maps elements asynchronously", t => {
    t.plan(6);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "A", "b", "B", "c", "C"];
    let i = 0;
    source
        .pipe(
            flatMap(async (element: string) => {
                await Promise.resolve();
                return [element, element.toUpperCase()];
            }),
        )
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb.skip("flatMap() emits errors during synchronous mapping", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            flatMap((element: string) => {
                if (element !== "a") {
                    throw new Error("Failed mapping");
                }
                return [element, element.toUpperCase()];
            }),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed mapping");
            t.pass();
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb.skip("flatMap() emits errors during asynchronous mapping", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            flatMap(async (element: string) => {
                await Promise.resolve();
                if (element !== "a") {
                    throw new Error("Failed mapping");
                }
                return [element, element.toUpperCase()];
            }),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed mapping");
            t.pass();
        })
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

46 tests/fromArray.spec.ts Normal file
@@ -0,0 +1,46 @@
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { fromArray } = mhysa();

test.cb("fromArray() streams array elements in flowing mode", t => {
    t.plan(3);
    const elements = ["a", "b", "c"];
    const stream = fromArray(elements);
    let i = 0;
    stream
        .on("data", (element: string) => {
            expect(element).to.equal(elements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);
});

test.cb("fromArray() ends immediately if there are no array elements", t => {
    t.plan(0);
    fromArray([])
        .on("data", () => t.fail())
        .on("error", t.end)
        .on("end", t.end);
});

test.cb("fromArray() streams array elements in paused mode", t => {
    t.plan(3);
    const elements = ["a", "b", "c"];
    const stream = fromArray(elements);
    let i = 0;
    stream
        .on("readable", () => {
            let element = stream.read();
            while (element !== null) {
                expect(element).to.equal(elements[i]);
                t.pass();
                i++;
                element = stream.read();
            }
        })
        .on("error", t.end)
        .on("end", t.end);
});

57 tests/join.spec.ts Normal file
@@ -0,0 +1,57 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { join } = mhysa();

test.cb("join() joins chunks using the specified separator", t => {
    t.plan(9);
    const source = new Readable({ objectMode: true });
    const expectedParts = ["ab|", "|", "c|d", "|", "|", "|", "e", "|", "|f|"];
    let i = 0;
    source
        .pipe(join("|"))
        .on("data", part => {
            expect(part).to.equal(expectedParts[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("ab|");
    source.push("c|d");
    source.push("|");
    source.push("e");
    source.push("|f|");
    source.push(null);
});

test.cb(
    "join() joins chunks using the specified separator without breaking up multi-byte characters " +
        "spanning multiple chunks",
    t => {
        t.plan(5);
        const source = new Readable({ objectMode: true });
        const expectedParts = ["ø", "|", "ö", "|", "一"];
        let i = 0;
        source
            .pipe(join("|"))
            .on("data", part => {
                expect(part).to.equal(expectedParts[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
        source.push(Buffer.from("ø").slice(1, 2));
        source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
        source.push(Buffer.from("ö").slice(1, 2));
        source.push(Buffer.from("一").slice(0, 1)); // 3-byte character spanning three chunks
        source.push(Buffer.from("一").slice(1, 2));
        source.push(Buffer.from("一").slice(2, 3));
        source.push(null);
    },
);
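
For context on the multi-byte test above: a UTF-8 code point may straddle a chunk boundary, so chunks cannot be decoded independently. A plain-Node illustration (not part of the diff):

// "ø" encodes to two bytes; a lone first byte is not valid UTF-8.
const oSlash = Buffer.from("ø"); // <Buffer c3 b8>
console.log(oSlash.slice(0, 1).toString()); // "�" (replacement character)
console.log(
    Buffer.concat([oSlash.slice(0, 1), oSlash.slice(1, 2)]).toString(),
); // "ø"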

16 tests/last.spec.ts Normal file
@@ -0,0 +1,16 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { last } = mhysa();

test("last() resolves to the last chunk streamed by the given readable stream", async t => {
    const source = new Readable({ objectMode: true });
    const lastPromise = last(source);
    source.push("ab");
    source.push("cd");
    source.push("ef");
    source.push(null);
    const lastChunk = await lastPromise;
    expect(lastChunk).to.equal("ef");
});

57 tests/map.spec.ts Normal file
@@ -0,0 +1,57 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { map } = mhysa();

test.cb("map() maps elements synchronously", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const mapStream = map((element: string) => element.toUpperCase(), {
        objectMode: true,
    });
    const expectedElements = ["A", "B", "C"];
    let i = 0;
    source
        .pipe(mapStream)
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb("map() maps elements asynchronously", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const mapStream = map(
        async (element: string) => {
            await Promise.resolve();
            return element.toUpperCase();
        },
        { objectMode: true },
    );
    const expectedElements = ["A", "B", "C"];
    let i = 0;
    source
        .pipe(mapStream)
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

61 tests/merge.spec.ts Normal file
@@ -0,0 +1,61 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { merge } = mhysa();

test.cb(
    "merge() merges multiple readable streams in chunk arrival order",
    t => {
        t.plan(6);
        const source1 = new Readable({ objectMode: true, read: () => ({}) });
        const source2 = new Readable({ objectMode: true, read: () => ({}) });
        const expectedElements = ["a", "d", "b", "e", "c", "f"];
        let i = 0;
        merge(source1, source2)
            .on("data", (element: string) => {
                expect(element).to.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source1.push("a");
        setTimeout(() => source2.push("d"), 10);
        setTimeout(() => source1.push("b"), 20);
        setTimeout(() => source2.push("e"), 30);
        setTimeout(() => source1.push("c"), 40);
        setTimeout(() => source2.push("f"), 50);
        setTimeout(() => source2.push(null), 60);
        setTimeout(() => source1.push(null), 70);
    },
);

test.cb("merge() merges a readable stream", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true, read: () => ({}) });
    const expectedElements = ["a", "b", "c"];
    let i = 0;
    merge(source)
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

test.cb("merge() merges an empty list of readable streams", t => {
    t.plan(0);
    merge()
        .on("data", () => t.pass())
        .on("error", t.end)
        .on("end", t.end);
});

78 tests/parallelMap.spec.ts Normal file
@@ -0,0 +1,78 @@
import { Readable } from "stream";
import { performance } from "perf_hooks";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
import { sleep } from "../src/helpers";
const { parallelMap } = mhysa({ objectMode: true });

test.cb("parallelMap() parallel mapping", t => {
    t.plan(6);
    const offset = 50;
    const source = new Readable({ objectMode: true });
    const expectedElements = [
        "a_processed",
        "b_processed",
        "c_processed",
        "d_processed",
        "e_processed",
        "f_processed",
    ];
    interface IPerfData {
        start: number;
        output?: string;
        finish?: number;
    }
    const orderedResults: IPerfData[] = [];
    source
        .pipe(
            parallelMap(async (data: any) => {
                const perfData: IPerfData = { start: performance.now() };
                const c = data + "_processed";
                perfData.output = c;
                await sleep(offset);
                perfData.finish = performance.now();
                orderedResults.push(perfData);
                return c;
            }, 2),
        )
        .on("data", (element: string) => {
            t.true(expectedElements.includes(element));
        })
        .on("error", t.end)
        .on("end", async () => {
            expect(orderedResults[0].finish).to.be.lessThan(
                orderedResults[2].start,
            );
            expect(orderedResults[1].finish).to.be.lessThan(
                orderedResults[3].start,
            );
            expect(orderedResults[2].finish).to.be.lessThan(
                orderedResults[4].start,
            );
            expect(orderedResults[3].finish).to.be.lessThan(
                orderedResults[5].start,
            );
            expect(orderedResults[0].start).to.be.lessThan(
                orderedResults[2].start + offset,
            );
            expect(orderedResults[1].start).to.be.lessThan(
                orderedResults[3].start + offset,
            );
            expect(orderedResults[2].start).to.be.lessThan(
                orderedResults[4].start + offset,
            );
            expect(orderedResults[3].start).to.be.lessThan(
                orderedResults[5].start + offset,
            );
            t.end();
        });

    source.push("a");
    source.push("b");
    source.push("c");
    source.push("d");
    source.push("e");
    source.push("f");
    source.push(null);
});
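
The timing assertions above pin down the concurrency contract of parallelMap with parallelism 2: a new item may start only once an earlier one settles. A generic sketch of that windowed-concurrency idea (illustrative TypeScript, not the library's implementation):

// Run fn over items with at most `limit` promises in flight.
async function mapLimit<T, R>(
    items: T[],
    limit: number,
    fn: (item: T) => Promise<R>,
): Promise<R[]> {
    const results: R[] = new Array(items.length);
    let next = 0;
    const worker = async (): Promise<void> => {
        while (next < items.length) {
            const i = next++; // claim the next unprocessed index
            results[i] = await fn(items[i]);
        }
    };
    const workers = Array.from(
        { length: Math.min(limit, items.length) },
        () => worker(),
    );
    await Promise.all(workers);
    return results;
}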

45 tests/parse.spec.ts Normal file
@@ -0,0 +1,45 @@
import { Readable, finished } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { parse } = mhysa();

test.cb("parse() parses the streamed elements as JSON", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["abc", {}, []];
    let i = 0;
    source
        .pipe(parse())
        .on("data", part => {
            expect(part).to.deep.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push('"abc"');
    source.push("{}");
    source.push("[]");
    source.push(null);
});

test.cb("parse() emits errors on invalid JSON", t => {
    t.plan(1);
    const source = new Readable({ objectMode: true });

    source
        .pipe(parse())
        .resume()
        .on("error", (d: any) => {
            t.pass();
            t.end();
        })
        .on("end", t.fail);

    source.push("{}");
    source.push({});
    source.push([]);
    source.push(null);
});

90 tests/rate.spec.ts Normal file
@@ -0,0 +1,90 @@
import { Readable } from "stream";
import { performance } from "perf_hooks";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { rate } = mhysa({ objectMode: true });

test.cb("rate() sends data at a rate of 150", t => {
    t.plan(5);
    const targetRate = 150;
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "b", "c", "d", "e"];
    const start = performance.now();
    let i = 0;

    source
        .pipe(rate(targetRate))
        .on("data", (element: string[]) => {
            const currentRate = (i / (performance.now() - start)) * 1000;
            expect(element).to.deep.equal(expectedElements[i]);
            expect(currentRate).lessThan(targetRate);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push("d");
    source.push("e");
    source.push(null);
});

test.cb("rate() sends data at a rate of 50", t => {
    t.plan(5);
    const targetRate = 50;
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "b", "c", "d", "e"];
    const start = performance.now();
    let i = 0;

    source
        .pipe(rate(targetRate))
        .on("data", (element: string[]) => {
            const currentRate = (i / (performance.now() - start)) * 1000;
            expect(element).to.deep.equal(expectedElements[i]);
            expect(currentRate).lessThan(targetRate);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push("d");
    source.push("e");
    source.push(null);
});

test.cb("rate() sends data at a rate of 1", t => {
    t.plan(5);
    const targetRate = 1;
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "b", "c", "d", "e"];
    const start = performance.now();
    let i = 0;

    source
        .pipe(rate(targetRate))
        .on("data", (element: string[]) => {
            const currentRate = (i / (performance.now() - start)) * 1000;
            expect(element).to.deep.equal(expectedElements[i]);
            expect(currentRate).lessThan(targetRate);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push("d");
    source.push("e");
    source.push(null);
});

99 tests/reduce.spec.ts Normal file
@@ -0,0 +1,99 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { reduce } = mhysa({ objectMode: true });

test.cb("reduce() reduces elements synchronously", t => {
    t.plan(1);
    const source = new Readable({ objectMode: true });
    const expectedValue = 6;
    source
        .pipe(reduce((acc: number, element: string) => acc + element.length, 0))
        .on("data", (element: string) => {
            expect(element).to.equal(expectedValue);
            t.pass();
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("ab");
    source.push("cd");
    source.push("ef");
    source.push(null);
});

test.cb("reduce() reduces elements asynchronously", t => {
    t.plan(1);
    const source = new Readable({ objectMode: true });
    const expectedValue = 6;
    source
        .pipe(
            reduce(async (acc: number, element: string) => {
                await Promise.resolve();
                return acc + element.length;
            }, 0),
        )
        .on("data", (element: string) => {
            expect(element).to.equal(expectedValue);
            t.pass();
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("ab");
    source.push("cd");
    source.push("ef");
    source.push(null);
});

test.cb.skip("reduce() emits errors during synchronous reduce", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            reduce((acc: number, element: string) => {
                if (element !== "ab") {
                    throw new Error("Failed reduce");
                }
                return acc + element.length;
            }, 0),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed reduce");
            t.pass();
        })
        .on("end", t.end);

    source.push("ab");
    source.push("cd");
    source.push("ef");
    source.push(null);
});

test.cb.skip("reduce() emits errors during asynchronous reduce", t => {
    t.plan(2);
    const source = new Readable({ objectMode: true });
    source
        .pipe(
            reduce(async (acc: number, element: string) => {
                await Promise.resolve();
                if (element !== "ab") {
                    throw new Error("Failed mapping");
                }
                return acc + element.length;
            }, 0),
        )
        .resume()
        .on("error", err => {
            expect(err.message).to.equal("Failed mapping");
            t.pass();
        })
        .on("end", t.end);

    source.push("ab");
    source.push("cd");
    source.push("ef");
    source.push(null);
});

81 tests/replace.spec.ts Normal file
@@ -0,0 +1,81 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { replace } = mhysa();

test.cb(
    "replace() replaces occurrences of the given string in the streamed elements with the specified " +
        "replacement string",
    t => {
        t.plan(3);
        const source = new Readable({ objectMode: true });
        const expectedElements = ["abc", "xyf", "ghi"];
        let i = 0;
        source
            .pipe(replace("de", "xy"))
            .on("data", part => {
                expect(part).to.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("abc");
        source.push("def");
        source.push("ghi");
        source.push(null);
    },
);

test.cb(
    "replace() replaces occurrences of the given regular expression in the streamed elements with " +
        "the specified replacement string",
    t => {
        t.plan(3);
        const source = new Readable({ objectMode: true });
        const expectedElements = ["abc", "xyz", "ghi"];
        let i = 0;
        source
            .pipe(replace(/^def$/, "xyz"))
            .on("data", part => {
                expect(part).to.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("abc");
        source.push("def");
        source.push("ghi");
        source.push(null);
    },
);

test.cb(
    "replace() replaces occurrences of the given multi-byte character even if it spans multiple chunks",
    t => {
        t.plan(3);
        const source = new Readable({ objectMode: true });
        const expectedElements = ["ø", "O", "a"];
        let i = 0;
        source
            .pipe(replace("ö", "O"))
            .on("data", part => {
                expect(part).to.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
        source.push(Buffer.from("ø").slice(1, 2));
        source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
        source.push(Buffer.from("ö").slice(1, 2));
        source.push("a");
        source.push(null);
    },
);

99 tests/split.spec.ts Normal file
@@ -0,0 +1,99 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { split } = mhysa();

test.cb("split() splits chunks using the default separator (\\n)", t => {
    t.plan(5);
    const source = new Readable({ objectMode: true });
    const expectedParts = ["ab", "c", "d", "ef", ""];
    let i = 0;
    source
        .pipe(split())
        .on("data", part => {
            expect(part).to.equal(expectedParts[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("ab\n");
    source.push("c");
    source.push("\n");
    source.push("d");
    source.push("\nef\n");
    source.push(null);
});

test.cb("split() splits chunks using the specified separator", t => {
    t.plan(6);
    const source = new Readable({ objectMode: true });
    const expectedParts = ["ab", "c", "d", "e", "f", ""];
    let i = 0;
    source
        .pipe(split("|"))
        .on("data", (part: string) => {
            expect(part).to.equal(expectedParts[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("ab|");
    source.push("c|d");
    source.push("|");
    source.push("e");
    source.push("|f|");
    source.push(null);
});

test.cb(
    "split() splits utf8 encoded buffers using the specified separator",
    t => {
        t.plan(3);
        const expectedElements = ["a", "b", "c"];
        let i = 0;
        const through = split(",");
        const buf = Buffer.from("a,b,c");
        through
            .on("data", element => {
                expect(element).to.equal(expectedElements[i]);
                i++;
                t.pass();
            })
            .on("error", t.end)
            .on("end", t.end);

        for (let j = 0; j < buf.length; ++j) {
            through.write(buf.slice(j, j + 1));
        }
        through.end();
    },
);

test.cb(
    "split() splits utf8 encoded buffers with multi-byte characters using the specified separator",
    t => {
        t.plan(3);
        const expectedElements = ["一", "一", "一"];
        let i = 0;
        const through = split(",");
        const buf = Buffer.from("一,一,一"); // Those are multi-byte utf8 characters (code: 4E00)
        through
            .on("data", element => {
                expect(element).to.equal(expectedElements[i]);
                i++;
                t.pass();
            })
            .on("error", t.end)
            .on("end", t.end);

        for (let j = 0; j < buf.length; ++j) {
            through.write(buf.slice(j, j + 1));
        }
        through.end();
    },
);

62 tests/stringify.spec.ts Normal file
@@ -0,0 +1,62 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { stringify } = mhysa();

test.cb("stringify() stringifies the streamed elements as JSON", t => {
    t.plan(4);
    const source = new Readable({ objectMode: true });
    const expectedElements = [
        '"abc"',
        "0",
        '{"a":"a","b":"b","c":"c"}',
        '["a","b","c"]',
    ];
    let i = 0;
    source
        .pipe(stringify())
        .on("data", part => {
            expect(part).to.deep.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("abc");
    source.push(0);
    source.push({ a: "a", b: "b", c: "c" });
    source.push(["a", "b", "c"]);
    source.push(null);
});

test.cb(
    "stringify() stringifies the streamed elements as pretty-printed JSON",
    t => {
        t.plan(4);
        const source = new Readable({ objectMode: true });
        const expectedElements = [
            '"abc"',
            "0",
            '{\n  "a": "a",\n  "b": "b",\n  "c": "c"\n}',
            '[\n  "a",\n  "b",\n  "c"\n]',
        ];
        let i = 0;
        source
            .pipe(stringify({ pretty: true }))
            .on("data", part => {
                expect(part).to.deep.equal(expectedElements[i]);
                t.pass();
                i++;
            })
            .on("error", t.end)
            .on("end", t.end);

        source.push("abc");
        source.push(0);
        source.push({ a: "a", b: "b", c: "c" });
        source.push(["a", "b", "c"]);
        source.push(null);
    },
);

27 tests/unbatch.spec.ts Normal file
@@ -0,0 +1,27 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { unbatch, batch } = mhysa({ objectMode: true });

test.cb("unbatch() unbatches", t => {
    t.plan(3);
    const source = new Readable({ objectMode: true });
    const expectedElements = ["a", "b", "c"];
    let i = 0;
    source
        .pipe(batch(3))
        .pipe(unbatch())
        .on("data", (element: string) => {
            expect(element).to.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.end)
        .on("end", t.end);

    source.push("a");
    source.push("b");
    source.push("c");
    source.push(null);
});

9 tests/utils/collected.spec.ts Normal file
@@ -0,0 +1,9 @@
import test from "ava";
import { collected } from "../../src/utils";
import mhysa from "../../src";
const { fromArray, collect } = mhysa({ objectMode: true });

test("collected returns a promise for the first data point", async t => {
    const data = collected(fromArray([1, 2, 3, 4]).pipe(collect()));
    t.deepEqual(await data, [1, 2, 3, 4]);
});

tsconfig.json
@@ -3,16 +3,19 @@
         "noImplicitAny": true,
         "strictNullChecks": true,
         "noImplicitReturns": true,
-        "noUnusedLocals": true,
+        "noUnusedLocals": false,
         "noImplicitThis": true,
         "forceConsistentCasingInFileNames": true,
         "suppressImplicitAnyIndexErrors": true,
         "outDir": "./dist",
-        "module": "commonjs",
-        "target": "es5",
-        "lib": ["es2016"],
-        "sourceMap": true,
-        "declaration": true
+        "module": "commonjs"
     },
-    "include": ["src/**/*.ts"]
+    "target": "es5",
+    "lib": [
+        "es2016"
+    ],
+    "sourceMap": true,
+    "declaration": true,
+    "include": ["src/**/*"],
+    "exclude": ["tests", "node_modules"]
 }

tslint.json
@@ -9,6 +9,7 @@
         "no-implicit-dependencies": [true, "dev"],
         "prettier": [true, ".prettierrc"],
         "ordered-imports": false,
-        "interface-name": false
+        "interface-name": false,
+        "object-literal-sort-keys": false
     }
 }