89 Commits

Author SHA1 Message Date
Jerry Kurian
12efbec698 Update tests 2020-01-28 09:48:29 -05:00
Jerry Kurian
ce2bb55b24 Emit correct event 2020-01-27 16:10:00 -05:00
Jerry Kurian
2bbc5c9e0f types 2020-01-27 13:11:51 -05:00
Jerry Kurian
8856cb8d3b Update demux 2020-01-27 13:07:37 -05:00
Jerry Kurian
2b1308a605 use fromArray 2020-01-27 09:29:59 -05:00
Jerry Kurian
11ed6f81e7 remove console log 2020-01-26 10:43:33 -05:00
Jerry Kurian
cf719b25cf Fix broken test 2020-01-26 10:37:59 -05:00
Jerry Kurian
9c09957775 Add test for remux 2020-01-26 10:26:54 -05:00
Jerry Kurian
bff4e0d6ed Add remux options 2020-01-26 09:55:35 -05:00
Jerry Kurian
bd178ce2f0 Revert to old 2020-01-25 12:36:38 -05:00
Jerry Kurian
1227ce7095 Allow demux to be piped (mux) 2020-01-25 12:33:09 -05:00
Lewis Diamond
179d526c6c 2.0.0-alpha.1 2019-12-06 17:14:39 -05:00
Lewis Diamond
4b806c4d4e Fix a few thing with compose 2019-12-06 16:38:52 -05:00
Lewis Diamond
ff2b652ddf Merge pull request #3 from Jogogoplay/feature/ObjectModeByConfig
DefaultOptions implemented as module factory
2019-12-02 16:27:01 -05:00
Lewis Diamond
107bc17bd4 Remove useless generic on accumulator 2019-12-02 16:23:09 -05:00
Lewis Diamond
1b3be03db3 Add vim merge files to .gitignore 2019-12-02 16:21:12 -05:00
Lewis Diamond
fddaa03300 Join should use UTF-8 Encoding 2019-12-02 16:18:24 -05:00
Lewis Diamond
57645c68da Removed console log 2019-12-02 16:17:27 -05:00
Lewis Diamond
a45a144854 Removed useless interface in split 2019-12-02 16:16:43 -05:00
Lewis Diamond
7ab8541cf6 Removed useless params in unbatch 2019-12-02 16:14:32 -05:00
Lewis Diamond
a85054fd82 DefaultOptions implemented as module factory 2019-12-02 16:05:27 -05:00
Jerry Kurian
c690185ab7 Update demux 2019-11-27 16:55:13 -05:00
Jerry Kurian
9b31479406 10% error for compose 2019-11-01 10:10:46 -04:00
Jerry Kurian
298a8b328d Clone buffer before push 2019-11-01 10:09:13 -04:00
Jerry Kurian
1794910b64 Package version 2019-10-08 11:11:46 -04:00
Lewis Diamond
e08558ca88 Merge pull request #2 from Jogogoplay/feature/accumulator
batch,  unbatch, rate, parallel, accumulator with rolling / sliding windows, demultiplexer
2019-10-08 09:45:26 -04:00
Jerry Kurian
ac21fb7ea6 Update package json 2019-09-26 14:31:23 -04:00
Jerry Kurian
9e14d8c044 Update package json 2019-09-26 14:11:17 -04:00
Jerry Kurian
4f80d44ed8 Improve test 2019-09-26 10:36:36 -04:00
Jerry Kurian
b8bd69eb01 Split rate tests 2019-09-26 09:53:15 -04:00
Jerry Kurian
f6e3a03eb7 Add TODO 2019-09-26 09:44:42 -04:00
Jerry Kurian
f177f95f52 Remove logs 2019-09-26 09:24:58 -04:00
Jerry Kurian
a11aa10d16 Clean up 2019-09-26 09:23:09 -04:00
Jerry Kurian
70edee51c4 Update interface 2019-09-13 08:57:19 -04:00
Jerry Kurian
158475183a Cleanup console logs 2019-09-12 15:35:59 -04:00
Jerry Kurian
48a231d61c Remote rate from accumulator 2019-09-12 15:34:42 -04:00
Jerry Kurian
4c7e9ceb7e Improve interface for accumulator 2019-09-12 14:40:47 -04:00
Jerry Kurian
517e281ce5 Remove try catch from provided functions, user handles errors 2019-09-12 09:41:04 -04:00
Jerry Kurian
586f618e95 Update demux 2019-09-12 09:08:49 -04:00
Jerry Kurian
65c36a8f22 Update tests 2019-09-11 16:33:02 -04:00
Jerry Kurian
ce19c5e987 Add test for drain events 2019-09-11 15:09:51 -04:00
Jerry Kurian
f06cb1c33e Remove console log 2019-09-11 14:31:06 -04:00
Jerry Kurian
dcfd6fe4c2 Update tests 2019-09-11 14:29:20 -04:00
Jerry Kurian
9d280b1662 Wait for drain when write returns false in demux 2019-09-10 18:13:13 -04:00
Jerry Kurian
ee3d9b9ded Add spies to ensure demux handles keys correctly 2019-09-10 12:09:26 -04:00
Jerry Kurian
83ef6e9734 remove duplicate descriptions 2019-09-09 15:58:35 -04:00
Jerry Kurian
7aeea4815a Add descriptions for demux and compose 2019-09-09 15:54:29 -04:00
Jerry Kurian
d33d8dcad3 Add generic type 2019-09-09 15:15:40 -04:00
Jerry Kurian
eed36a4fe9 Lots of stuff 2019-09-09 14:43:18 -04:00
Jerry Kurian
ea2ffdb38c Add test for unwritable streams in demux 2019-09-09 13:47:38 -04:00
Jerry Kurian
0067ba6a7c Add tests for demux 2019-09-09 11:53:21 -04:00
Jerry Kurian
599ba16d48 Add more tests for compose 2019-09-09 08:58:04 -04:00
Jerry Kurian
2cbeae38e7 Test readable length in first 2019-09-07 17:14:08 -04:00
Jerry Kurian
ae7c9d6b09 Add test for highwatermark 2019-09-07 14:27:55 -04:00
Jerry Kurian
cd10649d44 WIP Add some backpressure tests for compose 2019-09-07 11:04:33 -04:00
Lewis Diamond
d5f3fd8bd8 Merge branch 'feature/accumulator' of github.com:Jogogoplay/Mhysa into feature/accumulator 2019-08-30 15:26:43 -04:00
Lewis Diamond
2ee04a2d79 unclean 2019-08-30 15:24:38 -04:00
Jerry Kurian
fe0e53147c Handle backpressure 2019-08-30 09:33:29 -04:00
Jerry Kurian
2524d51aa7 Allow CB to be called by construction streams 2019-08-29 14:39:08 -04:00
Jerry Kurian
9765e6cb49 Update tests to write to sink 2019-08-29 08:50:11 -04:00
Jerry Kurian
685215bee6 Add test for keyBy 2019-08-28 17:04:31 -04:00
Jerry Kurian
9b09a3f949 Add demux 2019-08-28 17:01:51 -04:00
Jerry Kurian
c7903376e9 DuplexOptions 2019-08-22 16:47:43 -04:00
Jerry Kurian
f35f025dbc Use class 2019-08-22 15:35:36 -04:00
Jerry Kurian
1d0e15890c Tests 2019-08-22 14:52:39 -04:00
Jerry Kurian
d097fa6aa5 Save 2019-08-22 12:07:30 -04:00
Jerry Kurian
1e7fad2403 Remove composed.spec 2019-08-21 15:40:34 -04:00
Jerry Kurian
6581e1d745 Save 2019-08-21 15:40:19 -04:00
Jerry Kurian
7394b6ef84 Skip full period in rate 2019-08-16 10:06:23 -04:00
Jerry Kurian
50f6886b4b Cleanup 2019-08-16 10:01:55 -04:00
Jerry Kurian
4e80e48fa4 Remove paths 2019-08-16 09:27:17 -04:00
Jerry Kurian
047ff66ee1 Remove unused lib 2019-08-16 09:04:59 -04:00
Jerry Kurian
faac6134af Refactoring 2019-08-16 09:02:54 -04:00
Jerry Kurian
505fefeeb5 Save 2019-08-15 17:06:54 -04:00
Jerry Kurian
d6d974ee0d baseDefinitions 2019-08-15 15:54:53 -04:00
Jerry Kurian
5a9fcc94a6 Refactor 2019-08-15 15:42:54 -04:00
Jerry Kurian
27b4b2427b Tests 2019-08-15 14:27:51 -04:00
Jerry Kurian
6a9f6ff919 Export replace 2019-08-15 11:56:43 -04:00
Jerry Kurian
a40b1bf38c Save 2019-08-15 11:54:50 -04:00
Jerry Kurian
3a1fbf44d7 More tests 2019-08-12 14:42:54 -04:00
Jerry Kurian
c72ecaf219 Add FlushStrategy as enum 2019-08-12 12:08:42 -04:00
Jerry Kurian
e8d672d903 Clean up types 2019-08-12 11:59:27 -04:00
Jerry Kurian
5112ee9540 Types 2019-08-12 11:08:53 -04:00
Jerry Kurian
e932adde67 Update tests 2019-08-12 11:07:39 -04:00
Jerry Kurian
fdcc5bafc6 Add sliding, rolling functions with tests 2019-08-09 17:13:48 -04:00
Jerry Kurian
c1ef5fec4b Export accumulator and map enum 2019-08-09 09:58:14 -04:00
Jerry Kurian
a60b23496b Add tests 2019-08-08 10:58:56 -04:00
Jerry Kurian
d918d8ca10 Save 2019-08-07 18:46:33 -04:00
Jerry Kurian
af9293ab52 Save 2019-08-07 17:18:51 -04:00
58 changed files with 5873 additions and 3996 deletions

.gitignore

@@ -4,3 +4,14 @@ dist
sample_output
yarn-error.log
TODO.md
#VIM
## Swap
[._]*.s[a-v][a-z]
!*.svg # comment out if you don't need vector files
[._]*.sw[a-p]
[._]s[a-rt-v][a-z]
[._]ss[a-gi-z]
[._]sw[a-p]
*.orig


@@ -1,6 +1,6 @@
{
"name": "mhysa",
"version": "1.0.2",
"name": "@jogogo/mhysa",
"version": "2.0.0-alpha.1",
"description": "Streams and event emitter utils for Node.js",
"keywords": [
"promise",
@@ -11,40 +11,54 @@
"author": {
"name": "Wenzil"
},
"contributors": [
{
"name": "jerry",
"email": "jerry@jogogo.co"
},
{
"name": "lewis",
"email": "lewis@jogogo.co"
}
],
"license": "MIT",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist"
],
"publishConfig": {
"registry": "https://npm.dev.jogogo.co/"
},
"repository": {
"url": "git@github.com:Wenzil/Mhysa.git",
"url": "git@github.com:Jogogoplay/mhysa.git",
"type": "git"
},
"scripts": {
"test": "ava",
"lint": "tslint -p tsconfig.json",
"validate:tslint": "tslint-config-prettier-check ./tslint.json",
"prepublishOnly": "yarn lint && yarn test && yarn tsc"
"prepublishOnly": "yarn lint && yarn test && yarn tsc -d"
},
"dependencies": {},
"devDependencies": {
"@types/chai": "^4.1.7",
"@types/node": "^10.12.10",
"@types/typescript": "^2.0.0",
"ava": "^1.0.0-rc.2",
"@types/node": "^12.7.2",
"@types/sinon": "^7.0.13",
"ava": "^2.4.0",
"chai": "^4.2.0",
"mhysa": "./",
"prettier": "^1.14.3",
"ts-node": "^7.0.1",
"sinon": "^7.4.2",
"ts-node": "^8.3.0",
"tslint": "^5.11.0",
"tslint-config-prettier": "^1.16.0",
"tslint-plugin-prettier": "^2.0.1",
"typescript": "^3.1.6"
"typescript": "^3.5.3"
},
"ava": {
"files": [
"src/**/*.spec.ts"
"tests/*.spec.ts"
],
"sources": [
"src/**/*.ts"


@@ -0,0 +1,180 @@
import { Transform, TransformOptions } from "stream";
export enum FlushStrategy {
rolling = "rolling",
sliding = "sliding",
}
export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;
function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true,
options: TransformOptions = {},
) {
const buffer: T[] = [];
return new Transform({
...options,
transform(data: T, encoding, callback) {
accumulateBy(data, buffer, this);
callback();
},
flush(callback) {
if (shouldFlush) {
this.push(buffer);
}
callback();
},
});
}
function _sliding<T>(
windowLength: number,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
let index = 0;
if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
stream.resume();
return;
}
while (
index < buffer.length &&
buffer[index][key] + windowLength <= event[key]
) {
index++;
}
buffer.splice(0, index);
} else if (buffer.length === windowLength) {
buffer.shift();
}
buffer.push(event);
stream.push([...buffer]);
};
}
function _slidingByFunction<T>(
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
let index = 0;
while (index < buffer.length && iteratee(event, buffer[index])) {
index++;
}
buffer.splice(0, index);
buffer.push(event);
stream.push([...buffer]);
};
}
function _rollingByFunction<T>(
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (iteratee) {
if (buffer.length > 0 && iteratee(event, buffer[0])) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
}
buffer.push(event);
};
}
function _rolling<T>(
windowLength: number,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
stream.resume();
return;
} else if (
buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key]
) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
} else if (buffer.length === windowLength) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
buffer.push(event);
};
}
export function accumulator(
flushStrategy: FlushStrategy,
batchSize: number,
keyBy?: string,
options?: TransformOptions,
): Transform {
switch (flushStrategy) {
case FlushStrategy.sliding:
return sliding(batchSize, keyBy, options);
case FlushStrategy.rolling:
return rolling(batchSize, keyBy, options);
}
}
export function accumulatorBy<T>(
flushStrategy: FlushStrategy,
iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions,
): Transform {
switch (flushStrategy) {
case FlushStrategy.sliding:
return slidingBy(iteratee, options);
case FlushStrategy.rolling:
return rollingBy(iteratee, options);
}
}
function sliding(
windowLength: number,
key?: string,
options?: TransformOptions,
): Transform {
return _accumulator(_sliding(windowLength, key), false, options);
}
function slidingBy<T>(
iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions,
): Transform {
return _accumulator(_slidingByFunction(iteratee), false, options);
}
function rolling(
windowLength: number,
key?: string,
options?: TransformOptions,
): Transform {
return _accumulator(_rolling(windowLength, key), true, options);
}
function rollingBy<T>(
iteratee: AccumulatorByIteratee<T>,
options?: TransformOptions,
): Transform {
return _accumulator(_rollingByFunction(iteratee), true, options);
}
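
A minimal usage sketch for the new accumulator (an assumption for illustration: events carry a numeric ts field used as the window key, and the module is imported directly from src/functions):

import { fromArray } from "./fromArray";
import { accumulator, FlushStrategy } from "./accumulator";

// Hypothetical timestamped events; "ts" is the key the window is measured on.
const events = [{ ts: 0 }, { ts: 1000 }, { ts: 2999 }, { ts: 3000 }, { ts: 5000 }];

fromArray(events)
    .pipe(accumulator(FlushStrategy.rolling, 3000, "ts", { objectMode: true }))
    .on("data", (window: Array<{ ts: number }>) => {
        // Emits [{ts:0},{ts:1000},{ts:2999}] once ts 3000 arrives, then
        // [{ts:3000},{ts:5000}] on flush.
        console.log(window);
    });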


@@ -1,12 +1,3 @@
export interface ThroughOptions {
objectMode?: boolean;
}
export interface TransformOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export interface WithEncoding {
encoding: string;
}
@@ -21,4 +12,3 @@ export type JsonValue = JsonPrimitive | JsonPrimitive[];
export interface JsonParseOptions {
pretty: boolean;
}

src/functions/batch.ts

@@ -0,0 +1,38 @@
import { Transform, TransformOptions } from "stream";
export function batch(
batchSize: number = 1000,
maxBatchAge: number = 500,
options: TransformOptions = {},
): Transform {
let buffer: any[] = [];
let timer: NodeJS.Timer | null = null;
const sendChunk = (self: Transform) => {
if (timer) {
clearTimeout(timer);
}
timer = null;
self.push(buffer);
buffer = [];
};
return new Transform({
...options,
transform(chunk, encoding, callback) {
buffer.push(chunk);
if (buffer.length === batchSize) {
sendChunk(this);
} else {
if (timer === null) {
timer = setInterval(() => {
sendChunk(this);
}, maxBatchAge);
}
}
callback();
},
flush(callback) {
sendChunk(this);
callback();
},
});
}
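
A short sketch of how batch behaves (assuming object-mode piping from the fromArray helper below): full groups are emitted immediately, and a partial group is flushed after maxBatchAge or at stream end.

import { fromArray } from "./fromArray";
import { batch } from "./batch";

fromArray([1, 2, 3, 4, 5])
    .pipe(batch(2, 100, { objectMode: true }))
    // Emits [1, 2] and [3, 4] as soon as they fill, then [5] on flush.
    .on("data", (group: number[]) => console.log(group));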

src/functions/child.ts

@@ -0,0 +1,11 @@
import { ChildProcess } from "child_process";
import { duplex } from "./duplex";
export function child(childProcess: ChildProcess) {
if (childProcess.stdin === null) {
throw new Error("childProcess.stdin is null");
} else if (childProcess.stdout === null) {
throw new Error("childProcess.stdout is null");
}
return duplex(childProcess.stdin, childProcess.stdout);
}
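
A usage sketch (assuming a Unix-like environment where grep is available): the child process is wrapped as a single duplex stream, so writes go to its stdin and its stdout is read back out.

import { spawn } from "child_process";
import { child } from "./child";
import { fromArray } from "./fromArray";

const grep = child(spawn("grep", ["foo"]));

fromArray(["foo bar\n", "baz\n", "another foo\n"])
    .pipe(grep)
    // Prints only the lines containing "foo" once grep's stdin is closed.
    .on("data", chunk => process.stdout.write(chunk));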

src/functions/collect.ts

@@ -0,0 +1,18 @@
import { Transform, TransformOptions } from "stream";
export function collect(options: TransformOptions = {}): Transform {
const collected: any[] = [];
return new Transform({
...options,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}

src/functions/compose.ts

@@ -0,0 +1,88 @@
import { pipeline, TransformOptions, Transform } from "stream";
import { AllStreams, isReadable } from "../helpers";
export function compose(
streams: Array<
NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
>,
errorCallback?: (err: any) => void,
options?: TransformOptions,
): Compose {
if (streams.length < 2) {
throw new Error("At least two streams are required to compose");
}
return new Compose(streams, errorCallback, options);
}
enum EventSubscription {
Last = 0,
First,
All,
Self,
}
export class Compose extends Transform {
private first: AllStreams;
private last: AllStreams;
private streams: AllStreams[];
private inputStream: ReadableStream;
constructor(
streams: AllStreams[],
errorCallback?: (err: any) => void,
options?: TransformOptions,
) {
super(options);
this.first = streams[0];
this.last = streams[streams.length - 1];
this.streams = streams;
pipeline(
streams,
errorCallback ||
((error: any) => {
if (error) {
this.emit("error", error);
}
}),
);
if (isReadable(this.last)) {
(this.last as NodeJS.ReadWriteStream).pipe(
new Transform({
...options,
transform: (d: any, encoding, cb) => {
this.push(d);
cb();
},
}),
);
}
}
public _transform(chunk: any, encoding: string, cb: any) {
(this.first as NodeJS.WritableStream).write(chunk, encoding, cb);
}
public _flush(cb: any) {
if (isReadable(this.first)) {
(this.first as any).push(null);
}
this.last.once("end", () => {
cb();
});
}
public _destroy(error: any, cb: (error?: any) => void) {
this.streams.forEach(s => (s as any).destroy());
cb(error);
}
public bubble(...events: string[]) {
this.streams.forEach(s => {
events.forEach(e => {
s.on(e, (...args) => super.emit(e, ...args));
});
});
}
}
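
A sketch of the intended usage (assuming the filter and map helpers from this same directory): compose returns one Transform that writes into the first stream and reads from the last, so the pair can be piped as a single stage.

import { compose } from "./compose";
import { filter } from "./filter";
import { map } from "./map";
import { fromArray } from "./fromArray";

const evenDoubler = compose(
    [
        filter((n: number) => n % 2 === 0, { objectMode: true }),
        map((n: number) => n * 2, { objectMode: true }),
    ],
    err => console.error(err), // errors from any inner stream surface here
    { objectMode: true },
);

fromArray([1, 2, 3, 4])
    .pipe(evenDoubler)
    .on("data", d => console.log(d)); // 4, then 8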

src/functions/concat.ts

@@ -0,0 +1,37 @@
import { Readable } from "stream";
export function concat(...streams: NodeJS.ReadableStream[]): Readable {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
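
A brief sketch: the second source is only read after the first has ended, so chunks come out strictly in source order.

import { concat } from "./concat";
import { fromArray } from "./fromArray";

concat(fromArray(["a", "b"]), fromArray(["c", "d"]))
    .on("data", chunk => console.log(chunk)); // a, b, c, d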

src/functions/demux.ts

@@ -0,0 +1,103 @@
import { DuplexOptions, Duplex, Transform, Writable } from "stream";
import { isReadable } from "../helpers";
enum EventSubscription {
Last = 0,
First,
All,
Self,
Unhandled,
}
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
interface DemuxOptions extends DuplexOptions {
remultiplex?: boolean;
}
export function demux(
construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string),
options?: DemuxOptions,
): Duplex {
return new Demux(construct, demuxBy, options);
}
class Demux extends Duplex {
private streamsByKey: {
[key: string]: DemuxStreams;
};
private demuxer: (chunk: any) => string;
private construct: (destKey?: string) => DemuxStreams;
private remultiplex: boolean;
private transform: Transform;
constructor(
construct: (destKey?: string) => DemuxStreams,
demuxBy: string | ((chunk: any) => string),
options: DemuxOptions = {},
) {
super(options);
this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.construct = construct;
this.remultiplex =
options.remultiplex === undefined ? true : options.remultiplex;
this.streamsByKey = {};
this.transform = new Transform({
...options,
transform: (d, _, cb) => {
this.push(d);
cb(null);
},
});
this.on("unpipe", () => this._flush());
}
public _read(size: number) {}
public async _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) {
const newPipeline = await this.construct(destKey);
this.streamsByKey[destKey] = newPipeline;
if (this.remultiplex && isReadable(newPipeline)) {
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
} else if (this.remultiplex) {
console.error(
`Pipeline construct for ${destKey} does not implement readable interface`,
);
}
}
if (!this.streamsByKey[destKey].write(chunk, encoding)) {
this.streamsByKey[destKey].once("drain", () => {
cb();
});
} else {
cb();
}
}
public _flush() {
const pipelines = Object.values(this.streamsByKey);
let totalEnded = 0;
pipelines.forEach(pipeline => {
pipeline.once("end", () => {
totalEnded++;
if (pipelines.length === totalEnded) {
this.push(null);
this.emit("end");
}
});
});
pipelines.forEach(pipeline => pipeline.end());
}
public _destroy(error: any, cb: (error?: any) => void) {
const pipelines = Object.values(this.streamsByKey);
pipelines.forEach(p => (p as any).destroy());
cb(error);
}
}
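
A usage sketch (the handledBy field and the per-key map pipeline are illustrative assumptions): one pipeline is constructed lazily per key, and because remultiplex defaults to true the per-key output is merged back into the demux stream itself.

import { demux } from "./demux";
import { map } from "./map";
import { fromArray } from "./fromArray";

const byUser = demux(
    userId => map((event: any) => ({ ...event, handledBy: userId }), { objectMode: true }),
    "userId",
    { objectMode: true },
);

fromArray([
    { userId: "a", n: 1 },
    { userId: "b", n: 2 },
    { userId: "a", n: 3 },
])
    .pipe(byUser)
    // Chunks for "a" and "b" flow through their own map() instances and are
    // remultiplexed onto this stream's readable side.
    .on("data", d => console.log(d));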

src/functions/duplex.ts

@@ -0,0 +1,31 @@
import { Duplex } from "stream";
export function duplex(
writable: NodeJS.WritableStream,
readable: NodeJS.ReadableStream,
) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}

src/functions/filter.ts

@@ -0,0 +1,20 @@
import { Transform, TransformOptions } from "stream";
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options?: TransformOptions,
) {
return new Transform({
...options,
async transform(chunk: T, encoding?: any, callback?: any) {
const result = await predicate(chunk, encoding);
if (result === true) {
callback(null, chunk);
} else {
callback();
}
},
});
}

src/functions/flatMap.ts

@@ -0,0 +1,16 @@
import { Transform, TransformOptions } from "stream";
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options?: TransformOptions,
): Transform {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
(await mapper(chunk, encoding)).forEach(c => this.push(c));
callback();
},
});
}
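
A small sketch: each incoming chunk fans out into zero or more pushed chunks.

import { flatMap } from "./flatMap";
import { fromArray } from "./fromArray";

fromArray(["hello world", "flat map"])
    .pipe(flatMap((line: string) => line.split(" "), { objectMode: true }))
    .on("data", word => console.log(word)); // hello, world, flat, map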


@@ -0,0 +1,16 @@
import { Readable } from "stream";
export function fromArray(array: any[]): Readable {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}

File diff suppressed because it is too large.


@@ -1,604 +0,0 @@
import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
import {
TransformOptions,
ThroughOptions,
WithEncoding,
SerializationFormats,
JsonValue,
JsonParseOptions,
} from "./definitions";
import { sleep } from "../helpers";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
try {
const mapped = await mapper(chunk, encoding);
this.push(mapped);
callback();
} catch (err) {
callback(err);
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter stream chunks
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options: ThroughOptions = {
objectMode: true,
},
) {
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = predicate(chunk, encoding);
isPromise = result instanceof Promise;
if (!!(await result)) {
callback(undefined, chunk);
} else {
callback();
}
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
* @param format Format of serialized data, only utf8 supported.
*/
export function parse(
format: SerializationFormats = SerializationFormats.utf8,
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(format);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
callback(err);
}
},
});
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout);
}
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, reject) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}
/**
* Stores chunks of data internally in array and batches when batchSize is reached.
*
* @param batchSize Size of the batches
* @param maxBatchAge Max lifetime of a batch
*/
export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
let buffer: any[] = [];
let timer: NodeJS.Timer | null = null;
let sendChunk = (self: Transform) => {
timer && clearTimeout(timer);
timer = null;
self.push(buffer);
buffer = [];
};
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
buffer.push(chunk);
if (buffer.length === batchSize) {
sendChunk(this);
} else {
if (timer === null) {
timer = setInterval(() => {
sendChunk(this);
}, maxBatchAge);
}
}
callback();
},
flush(callback) {
console.error("flushing");
sendChunk(this);
callback();
},
});
}
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch() {
return new Transform({
objectMode: true,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
/**
* Limits rate of data transferred into stream.
* @param targetRate Desired rate in ms
* @param period Period to sleep for when rate is above or equal to targetRate
*/
export function rate(targetRate: number = 50, period: number = 2) {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
let total = 0;
const start = performance.now();
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
/**
* Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel: number = 10,
sleepTime: number = 5,
) {
let inflight = 0;
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(sleepTime);
}
inflight += 1;
callback();
try {
const res = await mapper(data);
this.push(res);
} catch (e) {
this.emit(e);
} finally {
inflight -= 1;
}
},
async flush(callback) {
while (inflight > 0) {
await sleep(sleepTime);
}
callback();
},
});
}


@@ -1,241 +1,267 @@
import { Readable, Writable } from "stream";
import { ChildProcess } from "child_process";
import * as baseFunctions from "./functions";
import {
ThroughOptions,
Transform,
TransformOptions,
WithEncoding,
JsonParseOptions,
} from "./definitions";
WritableOptions,
ReadableOptions,
} from "stream";
import { accumulator, accumulatorBy } from "./accumulator";
import { batch } from "./batch";
import { child } from "./child";
import { collect } from "./collect";
import { concat } from "./concat";
import { duplex } from "./duplex";
import { filter } from "./filter";
import { flatMap } from "./flatMap";
import { fromArray } from "./fromArray";
import { join } from "./join";
import { last } from "./last";
import { map } from "./map";
import { merge } from "./merge";
import { parallelMap } from "./parallelMap";
import { parse } from "./parse";
import { rate } from "./rate";
import { reduce } from "./reduce";
import { replace } from "./replace";
import { split } from "./split";
import { stringify } from "./stringify";
import { unbatch } from "./unbatch";
import { compose } from "./compose";
import { demux } from "./demux";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
return baseFunctions.fromArray(array);
}
export default function mhysa(defaultOptions?: TransformOptions) {
function withDefaultOptions<T extends any[], R>(
n: number,
fn: (...args: T) => R,
): (...args: T) => R {
return (...args) => {
const options = {
...defaultOptions,
...((args[n] || {}) as TransformOptions | {}),
};
const provided = args.slice(0, n);
const nextArgs = [
...provided,
...Array(n - provided.length).fill(undefined),
options,
] as T;
return fn(...nextArgs) as R;
};
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding?: string) => R,
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.map(mapper, options);
}
return {
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
fromArray,
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.flatMap(mapper, options);
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
map: withDefaultOptions(1, map),
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter stream chunks
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects.
*/
export function filter<T>(
mapper:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options?: ThroughOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.filter(mapper, options);
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
flatMap: withDefaultOptions(1, flatMap),
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.reduce(iteratee, initialValue, options);
}
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter stream chunks
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects.
*/
filter: withDefaultOptions(1, filter),
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator? Separator to split by, defaulting to "\n"
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
export function split(
separator?: string | RegExp,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.split(separator, options);
}
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options?
* @param options.readableObjectMode? Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode? Whether this stream should behave as a writable stream of objects
*/
reduce: withDefaultOptions(2, reduce),
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.join(separator, options);
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator? Separator to split by, defaulting to "\n"
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
split,
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options? Defaults to encoding: utf8
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.replace(searchValue, replaceValue, options);
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options? Defaults to encoding: utf8
* @param options.encoding? Encoding written chunks are assumed to use
*/
join: withDefaultOptions(1, join),
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string in utf8.
*/
export function parse(): NodeJS.ReadWriteStream {
return baseFunctions.parse();
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options? Defaults to encoding: utf8
* @param options.encoding Encoding written chunks are assumed to use
*/
replace,
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
* @param options?
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
*
*/
export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
return baseFunctions.stringify(options);
}
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string in utf8.
* @param format: @type SerializationFormats defaults SerializationFormats.utf8
* @param emitError: @type boolean Whether or not to emit an error when
* failing to parse. An error will automatically close the stream.
* Defaults to true.
*/
parse,
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
return baseFunctions.collect(options);
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
* @param options?
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
*
*/
stringify,
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
return baseFunctions.concat(...streams);
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
collect: withDefaultOptions(0, collect),
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
return baseFunctions.merge(...streams);
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
concat,
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(
writable: Writable,
readable: Readable,
): NodeJS.ReadWriteStream {
return baseFunctions.duplex(writable, readable);
}
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
merge,
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
return baseFunctions.child(childProcess);
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
duplex,
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
return baseFunctions.last(readable);
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
child,
/**
* Stores chunks of data internally in array and batches when batchSize is reached.
* @param batchSize Size of the batches, defaults to 1000.
* @param maxBatchAge? Max lifetime of a batch, defaults to 500
*/
export function batch(batchSize: number, maxBatchAge?: number): NodeJS.ReadWriteStream {
return baseFunctions.batch(batchSize, maxBatchAge);
}
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
last,
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch(): NodeJS.ReadWriteStream {
return baseFunctions.unbatch();
}
/**
* Stores chunks of data internally in array and batches when batchSize is reached.
* @param batchSize Size of the batches, defaults to 1000.
* @param maxBatchAge? Max lifetime of a batch, defaults to 500
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
batch: withDefaultOptions(2, batch),
/**
* Limits rate of data transferred into stream.
* @param options?
* @param targetRate? Desired rate in ms
* @param period? Period to sleep for when rate is above or equal to targetRate
*/
export function rate(targetRate?: number, period?: number): NodeJS.ReadWriteStream {
return baseFunctions.rate(targetRate, period);
}
/**
* Unbatches and sends individual chunks of data.
* @param options?
* @param options.objectMode? Whether this stream should behave as a stream of objects
*/
unbatch: withDefaultOptions(0, unbatch),
/**
* Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/
export function parallelMap<T, R>(
mapper: (chunk: T) => R,
parallel?: number,
sleepTime?: number,
) {
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
/**
* Limits rate of data transferred into stream.
* @param targetRate? Desired rate in ms.
* @param period? Period to sleep for when rate is above or equal to targetRate.
* @param options?
*/
rate: withDefaultOptions(2, rate),
/**
* Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk.
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/
parallelMap: withDefaultOptions(3, parallelMap),
/**
* Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
* 1. Sliding
* - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain
* the batchSize. When no key is provided, the batchSize is effectively the buffer length. When
* a key is provided, the batchSize is based on the value at that key. For example, given a key
* of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be
* within 3000 timestamp units from the first element. This means that with a key, multiple elements
* may be spliced off the front of the buffer. The buffer is then pushed into the stream.
* 2. Rolling
* - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream.
* When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
* is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
* each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
* @param flushStrategy Buffering strategy to use.
* @param batchSize Size of the batch (in units of buffer length or value at key).
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
* @param options Transform stream options
*/
accumulator: withDefaultOptions(3, accumulator),
/**
* Accumulates and sends batches of data. Each chunk that flows into the stream is checked against items
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
* 1. Sliding
* - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The
* item is pushed into the buffer and buffer is pushed into stream.
* 2. Rolling
* - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
* then pushed into the buffer.
* @param flushStrategy Buffering strategy to use.
* @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
* or items need to be cleared from buffer.
* @param options Transform stream options
*/
accumulatorBy: withDefaultOptions(2, accumulatorBy),
/**
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
* @param streams Array of streams to compose. Minimum of two.
* @param errorCallback a function that handles any error coming out of the pipeline
* @param options Transform stream options
*/
compose: withDefaultOptions(2, compose),
/**
* Demultiplexes a stream into multiple output pipelines based on a key or key function applied to each chunk. A new pipeline is constructed on demand for each distinct key.
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
* @param demuxBy
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
* @param options Writable stream options
*/
demux: withDefaultOptions(2, demux),
};
}
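
A consumer-side sketch of the module-factory pattern introduced above (assuming the package's default export is this factory, as the imports in the diff suggest): options passed to mhysa() become defaults merged into every helper, so objectMode does not need to be repeated per call.

import mhysa from "@jogogo/mhysa";

const { fromArray, filter, map, collect } = mhysa({ objectMode: true });

fromArray([1, 2, 3, 4])
    .pipe(filter((n: number) => n % 2 === 0))
    .pipe(map((n: number) => n * 10))
    .pipe(collect())
    .on("data", result => console.log(result)); // [20, 40]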

src/functions/join.ts

@@ -0,0 +1,26 @@
import { Transform, TransformOptions } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";
export function join(
separator: string,
options: WithEncoding & TransformOptions = { encoding: "utf8" },
): Transform {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}

src/functions/last.ts

@@ -0,0 +1,8 @@
export function last<T>(readable: NodeJS.ReadableStream): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, _) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}

src/functions/map.ts

@@ -0,0 +1,13 @@
import { Transform, TransformOptions } from "stream";
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = { objectMode: true },
): Transform {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
callback(null, await mapper(chunk, encoding));
},
});
}

src/functions/merge.ts

@@ -0,0 +1,33 @@
import { Readable } from "stream";
export function merge(...streams: Readable[]): Readable {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}


@@ -0,0 +1,30 @@
import { Transform, TransformOptions } from "stream";
import { sleep } from "../helpers";
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel: number = 10,
sleepTime: number = 1,
options?: TransformOptions,
) {
let inflight = 0;
return new Transform({
...options,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(sleepTime);
}
inflight += 1;
callback();
const res = await mapper(data);
this.push(res);
inflight -= 1;
},
async flush(callback) {
while (inflight > 0) {
await sleep(sleepTime);
}
callback();
},
});
}
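Usage sketch (editorial, not from the diff; assumes parallelMap is exposed on the factory): each write is acknowledged immediately and up to `parallel` mapper calls run at once, so output order is not guaranteed; flush() waits for in-flight work.
import mhysa from "../src";
const { fromArray, parallelMap } = mhysa({ objectMode: true });

fromArray([1, 2, 3, 4])
    .pipe(
        parallelMap(async (n: number) => {
            await new Promise(resolve => setTimeout(resolve, 10));
            return n * 10;
        }, 2), // at most 2 mappers in flight
    )
    .on("data", mapped => console.log(mapped));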

27
src/functions/parse.ts Normal file
View File

@@ -0,0 +1,27 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { SerializationFormats } from "./baseDefinitions";
export function parse(
format: SerializationFormats = SerializationFormats.utf8,
emitError: boolean = true,
): Transform {
const decoder = new StringDecoder(format);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(null, await JSON.parse(asString));
} catch (err) {
if (emitError) {
callback(err);
} else {
callback();
}
}
},
});
}
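Usage sketch (editorial, not from the diff; assumes parse is exposed on the factory): each chunk is JSON-parsed, and with emitError left at its default of true, malformed input surfaces as an "error" event instead of being dropped.
import mhysa from "../src";
const { fromArray, parse } = mhysa();

fromArray(['{"ok":true}', "not json"])
    .pipe(parse())
    .on("data", obj => console.log(obj)) // { ok: true }
    .on("error", err => console.error(err.message)); // SyntaxError from the second chunk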

24
src/functions/rate.ts Normal file
View File

@@ -0,0 +1,24 @@
import { Transform, TransformOptions } from "stream";
import { performance } from "perf_hooks";
import { sleep } from "../helpers";
export function rate(
targetRate: number = 50,
period: number = 1,
options?: TransformOptions,
): Transform {
const deltaMS = ((1 / targetRate) * 1000) / period; // ms between chunks at the target rate, scaled by the period
let total = 0;
const start = performance.now();
return new Transform({
...options,
async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
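Usage sketch (editorial, not from the diff; assumes rate is exposed on the factory): chunks pass through unchanged, but the stream sleeps whenever the observed throughput exceeds targetRate chunks per second.
import mhysa from "../src";
const { fromArray, rate } = mhysa({ objectMode: true });

fromArray(Array.from({ length: 10 }, (_, i) => i))
    .pipe(rate(2)) // throttle to roughly 2 chunks per second
    .on("data", n => console.log(new Date().toISOString(), n));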

31
src/functions/reduce.ts Normal file
View File

@@ -0,0 +1,31 @@
import { Transform, TransformOptions } from "stream";
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options?: TransformOptions,
) {
let value = initialValue;
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
value = await iteratee(value, chunk, encoding);
callback();
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}
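Usage sketch (editorial, not from the diff; assumes reduce is exposed on the factory): every chunk is folded into the accumulator and the single result is emitted from flush() when the source ends.
import mhysa from "../src";
const { fromArray, reduce } = mhysa({ objectMode: true });

fromArray([1, 2, 3, 4])
    .pipe(reduce((sum: number, n: number) => sum + n, 0))
    .on("data", total => console.log(total)); // 10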

25
src/functions/replace.ts Normal file
View File

@@ -0,0 +1,25 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): Transform {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
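Usage sketch (editorial, not from the diff; assumes replace is exposed on the factory): each decoded chunk goes through String.prototype.replace, so a global RegExp is needed to substitute more than the first match in a chunk, and a match cannot span two chunks.
import mhysa from "../src";
const { fromArray, replace } = mhysa();

fromArray(["foo bar foo"])
    .pipe(replace(/foo/g, "baz"))
    .on("data", out => console.log(out)); // "baz bar baz"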

29
src/functions/split.ts Normal file
View File

@@ -0,0 +1,29 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "./baseDefinitions";
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): Transform {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}
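Usage sketch (editorial, not from the diff; assumes split is exposed on the factory): the trailing partial piece of each chunk is buffered, so delimiters and multi-byte characters may span chunk boundaries, and one string is emitted per delimited part.
import mhysa from "../src";
const { fromArray, split } = mhysa();

fromArray(["line1\nli", "ne2\nline3"])
    .pipe(split("\n"))
    .on("data", line => console.log(line)); // "line1", "line2", "line3"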

19
src/functions/stringify.ts Normal file
View File

@@ -0,0 +1,19 @@
import { Transform } from "stream";
import { JsonValue, JsonParseOptions } from "./baseDefinitions";
export function stringify(
options: JsonParseOptions = { pretty: false },
): Transform {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
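Usage sketch (editorial, not from the diff; assumes stringify is exposed on the factory): the inverse of parse(); passing { pretty: true } switches to two-space indented output.
import mhysa from "../src";
const { fromArray, stringify } = mhysa({ objectMode: true });

fromArray([{ id: 1 }, { id: 2 }])
    .pipe(stringify())
    .on("data", json => console.log(json)); // '{"id":1}', '{"id":2}'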

13
src/functions/unbatch.ts Normal file
View File

@@ -0,0 +1,13 @@
import { Transform, TransformOptions } from "stream";
export function unbatch(options?: TransformOptions) {
return new Transform({
...options,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
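Usage sketch (editorial, not from the diff; assumes batch and unbatch are exposed on the factory as in the tests below): unbatch() flattens array chunks back into individual chunks, undoing batch().
import mhysa from "../src";
const { fromArray, batch, unbatch } = mhysa({ objectMode: true });

fromArray(["a", "b", "c", "d"])
    .pipe(batch(2)) // ["a", "b"], ["c", "d"]
    .pipe(unbatch()) // "a", "b", "c", "d"
    .on("data", chunk => console.log(chunk));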

src/helpers.ts
View File

@@ -1,3 +1,17 @@
export async function sleep(time: number): Promise<{} | null> {
return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
}
export type AllStreams =
| NodeJS.ReadableStream
| NodeJS.ReadWriteStream
| NodeJS.WritableStream;
export function isReadable(
stream: AllStreams,
): stream is NodeJS.ReadableStream {
return (
(stream as NodeJS.ReadableStream).pipe !== undefined &&
(stream as any).readable === true
);
}
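A short sketch (editorial, not from the diff) of the new type guard; the relative import path assumes the same layout the tests use.
import { AllStreams, isReadable } from "../src/helpers";

function pipeIfReadable(stream: AllStreams, sink: NodeJS.WritableStream) {
    if (isReadable(stream)) {
        stream.pipe(sink); // narrowed to NodeJS.ReadableStream, so pipe() is available
    }
}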

src/index.ts
View File

@@ -1,22 +1,2 @@
export {
fromArray,
map,
flatMap,
filter,
reduce,
split,
join,
replace,
parse,
stringify,
collect,
concat,
merge,
duplex,
child,
last,
batch,
unbatch,
rate,
parallelMap,
} from "./functions";
import mhysa from "./functions";
export default mhysa;

557
tests/accumulator.spec.ts Normal file
View File

@@ -0,0 +1,557 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import mhysa from "../src";
import { FlushStrategy } from "../src/functions/accumulator";
import { performance } from "perf_hooks";
const { accumulator, accumulatorBy } = mhysa({ objectMode: true });
test.cb("accumulator() rolling", t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [{ ts: 2, key: "d" }, { ts: 3, key: "e" }];
const thirdFlush = [{ ts: 4, key: "f" }];
const flushes = [firstFlush, secondFlush, thirdFlush];
source
.pipe(
accumulator(FlushStrategy.rolling, 2, undefined, {
objectMode: true,
}),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
[...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() rolling with key", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulator(FlushStrategy.rolling, 3, "ts", { objectMode: true }))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb(
"accumulator() rolling should emit error and ignore chunk when its missing key",
t => {
t.plan(2);
let index = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(
FlushStrategy.rolling,
3,
"nonExistingKey",
{ objectMode: true },
);
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
// No valid data output
expect(flush).to.deep.equal([]);
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (nonExistingKey, ${JSON.stringify(
input[index],
)})`,
);
index++;
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb(
"accumulator() rolling should emit error, ignore chunk when key is missing and continue processing chunks correctly",
t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(FlushStrategy.rolling, 3, "ts", {
objectMode: true,
});
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ key: "d" },
{ ts: 3, key: "e" },
];
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (ts, ${JSON.stringify(
input[3],
)})`,
);
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulator() sliding", t => {
t.plan(4);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 4, key: "d" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 4, key: "d" },
];
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source
.pipe(
accumulator(FlushStrategy.sliding, 3, undefined, {
objectMode: true,
}),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() sliding with key", t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
];
source
.pipe(accumulator(FlushStrategy.sliding, 3, "ts", { objectMode: true }))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb(
"accumulator() sliding should emit error and ignore chunk when key is missing",
t => {
t.plan(2);
let index = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(
FlushStrategy.sliding,
3,
"nonExistingKey",
{ objectMode: true },
);
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
expect(flush).to.deep.equal([]);
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (nonExistingKey, ${JSON.stringify(
input[index],
)})`,
);
index++;
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb(
"accumulator() sliding should emit error, ignore chunk when key is missing and continue processing chunks correctly",
t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(FlushStrategy.sliding, 3, "ts", {
objectMode: true,
});
const input = [
{ ts: 0, key: "a" },
{ key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 2, key: "c" }];
const thirdFlush = [{ ts: 2, key: "c" }, { ts: 3, key: "d" }];
const fourthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const fifthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (ts, ${JSON.stringify(
input[1],
)})`,
);
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulatorBy() rolling", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(
accumulatorBy(
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts;
},
{ objectMode: true },
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb.skip(
"accumulatorBy() rolling should emit error when key iteratee throws",
t => {
t.plan(2);
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const accumulatorStream = accumulatorBy(
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a") {
throw new Error("Failed mapping");
}
return bufferChunk.ts + 3 <= event.ts;
},
{ objectMode: true },
);
source
.pipe(accumulatorStream)
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulatorBy() sliding", t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
];
source
.pipe(
accumulatorBy(
FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts;
},
{ objectMode: true },
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb.skip(
"accumulatorBy() sliding should emit error when key iteratee throws",
t => {
t.plan(2);
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const accumulatorStream = accumulatorBy(
FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a") {
throw new Error("Failed mapping");
}
return bufferChunk.ts + 3 <= event.ts;
},
{ objectMode: true },
);
source
.pipe(accumulatorStream)
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);

59
tests/batch.spec.ts Normal file
View File

@@ -0,0 +1,59 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { batch } = mhysa({ objectMode: true });
test.cb("batch() batches chunks together", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]];
let i = 0;
source
.pipe(batch(3))
.on("data", (element: string[]) => {
t.deepEqual(element, expectedElements[i]);
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push("f");
source.push("g");
source.push(null);
});
test.cb("batch() yields a batch after the timeout", t => {
t.plan(3);
const source = new Readable({
objectMode: true,
read(size: number) {
return;
},
});
const expectedElements = [["a", "b"], ["c"], ["d"]];
let i = 0;
source
.pipe(batch(3))
.on("data", (element: string[]) => {
t.deepEqual(element, expectedElements[i]);
i++;
})
.on("error", t.fail)
.on("end", t.end);
source.push("a");
source.push("b");
setTimeout(() => {
source.push("c");
}, 600);
setTimeout(() => {
source.push("d");
source.push(null);
}, 600 * 2);
});

29
tests/child.spec.ts Normal file
View File

@@ -0,0 +1,29 @@
import * as cp from "child_process";
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { child } = mhysa();
test.cb(
"child() allows easily writing to child process stdin and reading from its stdout",
t => {
t.plan(1);
const source = new Readable();
const catProcess = cp.exec("cat");
let out = "";
source
.pipe(child(catProcess))
.on("data", chunk => (out += chunk))
.on("error", t.end)
.on("end", () => {
expect(out).to.equal("abcdef");
t.pass();
t.end();
});
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
},
);

133
tests/collect.spec.ts Normal file
View File

@@ -0,0 +1,133 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { collect } = mhysa();
test.cb(
"collect() collects streamed elements into an array (object, flowing mode)",
t => {
t.plan(1);
const source = new Readable({ objectMode: true });
source
.pipe(collect({ objectMode: true }))
.on("data", collected => {
expect(collected).to.deep.equal(["a", "b", "c"]);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
},
);
test.cb(
"collect() collects streamed elements into an array (object, paused mode)",
t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const collector = source.pipe(collect({ objectMode: true }));
collector
.on("readable", () => {
let collected = collector.read();
while (collected !== null) {
expect(collected).to.deep.equal(["a", "b", "c"]);
t.pass();
collected = collector.read();
}
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
},
);
test.cb(
"collect() collects streamed bytes into a buffer (non-object, flowing mode)",
t => {
t.plan(1);
const source = new Readable({ objectMode: false });
source
.pipe(collect())
.on("data", collected => {
expect(collected).to.deep.equal(Buffer.from("abc"));
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
},
);
test.cb(
"collect() collects streamed bytes into a buffer (non-object, paused mode)",
t => {
t.plan(1);
const source = new Readable({ objectMode: false });
const collector = source.pipe(collect({ objectMode: false }));
collector
.on("readable", () => {
let collected = collector.read();
while (collected !== null) {
expect(collected).to.deep.equal(Buffer.from("abc"));
t.pass();
collected = collector.read();
}
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
},
);
test.cb(
"collect() emits an empty array if the source was empty (object mode)",
t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const collector = source.pipe(collect({ objectMode: true }));
collector
.on("data", collected => {
expect(collected).to.deep.equal([]);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push(null);
},
);
test.cb(
"collect() emits nothing if the source was empty (non-object mode)",
t => {
t.plan(0);
const source = new Readable({ objectMode: false });
const collector = source.pipe(collect({ objectMode: false }));
collector
.on("data", () => t.fail())
.on("error", t.end)
.on("end", t.end);
source.push(null);
},
);

609
tests/compose.spec.ts Normal file
View File

@@ -0,0 +1,609 @@
import * as test from "ava";
import { expect } from "chai";
import { sleep } from "../src/helpers";
import { Readable, Writable } from "stream";
import mhysa from "../src";
import { performance } from "perf_hooks";
const { compose, map } = mhysa({ objectMode: true });
test.cb("compose() chains two streams together in the correct order", t => {
t.plan(3);
interface Chunk {
visited: number[];
key: string;
}
let i = 0;
const first = map((chunk: Chunk) => {
chunk.visited.push(1);
return chunk;
});
const second = map((chunk: Chunk) => {
chunk.visited.push(2);
return chunk;
});
const composed = compose([first, second]);
composed.on("data", data => {
expect(data).to.deep.equal(result[i]);
t.pass();
i++;
if (i === 3) {
t.end();
}
});
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const result = [
{ key: "a", visited: [1, 2] },
{ key: "b", visited: [1, 2] },
{ key: "c", visited: [1, 2] },
];
input.forEach(item => composed.write(item));
});
test.cb("piping compose() maintains correct order", t => {
t.plan(3);
interface Chunk {
visited: number[];
key: string;
}
let i = 0;
const first = map((chunk: Chunk) => {
chunk.visited.push(1);
return chunk;
});
const second = map((chunk: Chunk) => {
chunk.visited.push(2);
return chunk;
});
const composed = compose([first, second]);
const third = map((chunk: Chunk) => {
chunk.visited.push(3);
return chunk;
});
composed.pipe(third).on("data", data => {
expect(data).to.deep.equal(result[i]);
t.pass();
i++;
if (i === 3) {
t.end();
}
});
composed.on("error", err => {
t.end(err);
});
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const result = [
{ key: "a", visited: [1, 2, 3] },
{ key: "b", visited: [1, 2, 3] },
{ key: "c", visited: [1, 2, 3] },
];
input.forEach(item => composed.write(item));
});
test("compose() writable length should be less than highWaterMark when handing writes", async t => {
t.plan(7);
return new Promise(async (resolve, reject) => {
interface Chunk {
key: string;
mapped: number[];
}
const first = map(async (chunk: Chunk) => {
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 2,
},
);
composed.on("error", err => {
reject();
});
composed.on("drain", () => {
t.pass();
expect(composed._writableState.length).to.be.equal(0);
});
composed.on("data", (chunk: Chunk) => {
if (chunk.key === "e") {
resolve();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
for (const item of input) {
const res = composed.write(item);
expect(composed._writableState.length).to.be.at.most(2);
t.pass();
if (!res) {
await sleep(10);
}
}
});
});
test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => {
t.plan(7);
const _rate = 100;
const highWaterMark = 2;
return new Promise(async (resolve, reject) => {
interface Chunk {
key: string;
mapped: number[];
}
const first = map(async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
undefined,
{
highWaterMark,
},
);
composed.on("error", err => {
reject();
});
composed.on("drain", () => {
t.pass();
expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.closeTo(
_rate * highWaterMark,
40,
);
});
composed.on("data", (chunk: Chunk) => {
pendingReads--;
if (pendingReads === 0) {
resolve();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
let start = performance.now();
let pendingReads = input.length;
start = performance.now();
for (const item of input) {
const res = composed.write(item);
expect(composed._writableState.length).to.be.at.most(highWaterMark);
t.pass();
if (!res) {
await sleep(_rate * highWaterMark * 2);
start = performance.now();
}
}
});
});
test.cb(
"compose() should emit drain event after 500 ms when writing 5 items that take 100ms to process with a highWaterMark of 5 ",
t => {
t.plan(6);
const _rate = 100;
interface Chunk {
key: string;
mapped: number[];
}
const first = map(async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 5,
},
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.closeTo(
_rate * input.length,
50,
);
t.pass();
});
composed.on("data", (chunk: Chunk) => {
t.pass();
if (chunk.key === "e") {
t.end();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
input.forEach(item => {
composed.write(item);
});
const start = performance.now();
},
);
test.cb(
"compose() should emit drain event immediately when second stream is bottleneck",
t => {
t.plan(6);
const _rate = 200;
interface Chunk {
key: string;
mapped: number[];
}
const first = map((chunk: Chunk) => {
chunk.mapped.push(1);
return chunk;
});
const second = map(
async (chunk: Chunk) => {
pendingReads--;
await sleep(_rate);
expect(second._writableState.length).to.be.equal(1);
expect(first._readableState.length).to.equal(pendingReads);
chunk.mapped.push(2);
return chunk;
},
{ highWaterMark: 1 },
);
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 5,
},
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.lessThan(_rate);
t.pass();
});
composed.on("data", (chunk: Chunk) => {
expect(composed._writableState.length).to.be.equal(0);
t.pass();
if (chunk.key === "e") {
t.end();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
let pendingReads = input.length;
input.forEach(item => {
composed.write(item);
});
const start = performance.now();
},
);
test.cb(
"compose() should emit drain event and first should contain up to highWaterMark items in readable state when second is bottleneck",
t => {
t.plan(6);
interface Chunk {
index: number;
mapped: string[];
}
const first = map(
async (chunk: Chunk) => {
expect(first._readableState.length).to.be.at.most(2);
chunk.mapped.push("first");
return chunk;
},
{
highWaterMark: 2,
},
);
const second = map(
async (chunk: Chunk) => {
expect(second._writableState.length).to.be.equal(1);
await sleep(100);
chunk.mapped.push("second");
return chunk;
},
{ highWaterMark: 2 },
);
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 5,
},
);
composed.on("error", err => {
t.end(err);
});
composed.on("data", (chunk: Chunk) => {
expect(chunk.mapped.length).to.equal(2);
expect(chunk.mapped).to.deep.equal(["first", "second"]);
t.pass();
if (chunk.index === 5) {
t.end();
}
});
composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0);
t.pass();
});
const input = [
{ index: 1, mapped: [] },
{ index: 2, mapped: [] },
{ index: 3, mapped: [] },
{ index: 4, mapped: [] },
{ index: 5, mapped: [] },
];
input.forEach(item => {
composed.write(item);
});
},
);
test.cb(
"compose() should not emit drain event writing 5 items to compose with a highWaterMark of 6",
t => {
t.plan(5);
const _rate = 100;
interface Chunk {
key: string;
mapped: number[];
}
const first = map(async (chunk: Chunk) => {
await sleep(_rate);
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 6,
},
);
composed.on("error", err => {
t.end(err);
});
composed.on("drain", () => {
t.end(new Error("Drain should not be emitted"));
});
composed.on("data", (chunk: Chunk) => {
t.pass();
if (chunk.key === "e") {
t.end();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
input.forEach(item => {
composed.write(item);
});
},
);
test.cb("compose() should be 'destroyable'", t => {
t.plan(3);
const _sleep = 100;
interface Chunk {
key: string;
mapped: number[];
}
const first = map(async (chunk: Chunk) => {
await sleep(_sleep);
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
(err: any) => {
t.pass();
},
);
const fakeSource = new Readable({
objectMode: true,
read() {
return;
},
});
const fakeSink = new Writable({
objectMode: true,
write(data, enc, cb) {
const cur = input.shift();
t.is(cur.key, data.key);
t.deepEqual(cur.mapped, [1, 2]);
if (cur.key === "a") {
composed.destroy();
}
cb();
},
});
composed.on("close", t.end);
fakeSource.pipe(composed).pipe(fakeSink);
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
fakeSource.push(input[0]);
fakeSource.push(input[1]);
fakeSource.push(input[2]);
fakeSource.push(input[3]);
fakeSource.push(input[4]);
});
test.cb("compose() `finish` and `end` propagates", t => {
interface Chunk {
key: string;
mapped: number[];
}
t.plan(8);
const first = map(async (chunk: Chunk) => {
chunk.mapped.push(1);
return chunk;
});
const second = map(async (chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
});
const composed = compose(
[first, second],
undefined,
{
highWaterMark: 3,
},
);
const fakeSource = new Readable({
objectMode: true,
read() {
return;
},
});
const sink = map((d: Chunk) => {
const curr = input.shift();
t.is(curr.key, d.key);
t.deepEqual(d.mapped, [1, 2]);
});
fakeSource.pipe(composed).pipe(sink);
fakeSource.on("end", () => {
t.pass();
});
composed.on("finish", () => {
t.pass();
});
composed.on("end", () => {
t.pass();
t.end();
});
sink.on("finish", () => {
t.pass();
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
fakeSource.push(input[0]);
fakeSource.push(input[1]);
fakeSource.push(null);
});

181
tests/concat.spec.ts Normal file
View File

@@ -0,0 +1,181 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { concat, collect } = mhysa();
test.cb(
"concat() concatenates multiple readable streams (object, flowing mode)",
t => {
t.plan(6);
const source1 = new Readable({ objectMode: true });
const source2 = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
concat(source1, source2)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source1.push("a");
source2.push("d");
source1.push("b");
source2.push("e");
source1.push("c");
source2.push("f");
source2.push(null);
source1.push(null);
},
);
test.cb(
"concat() concatenates multiple readable streams (object, paused mode)",
t => {
t.plan(6);
const source1 = new Readable({ objectMode: true });
const source2 = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
const concatenation = concat(source1, source2)
.on("readable", () => {
let element = concatenation.read();
while (element !== null) {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
element = concatenation.read();
}
})
.on("error", t.end)
.on("end", t.end);
source1.push("a");
source2.push("d");
source1.push("b");
source2.push("e");
source1.push("c");
source2.push("f");
source2.push(null);
source1.push(null);
},
);
test.cb(
"concat() concatenates multiple readable streams (non-object, flowing mode)",
t => {
t.plan(6);
const source1 = new Readable({ objectMode: false });
const source2 = new Readable({ objectMode: false });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
concat(source1, source2)
.on("data", (element: string) => {
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source1.push("a");
source2.push("d");
source1.push("b");
source2.push("e");
source1.push("c");
source2.push("f");
source2.push(null);
source1.push(null);
},
);
test.cb(
"concat() concatenates multiple readable streams (non-object, paused mode)",
t => {
t.plan(6);
const source1 = new Readable({ objectMode: false, read: () => ({}) });
const source2 = new Readable({ objectMode: false, read: () => ({}) });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
const concatenation = concat(source1, source2)
.on("readable", () => {
let element = concatenation.read();
while (element !== null) {
expect(element).to.deep.equal(
Buffer.from(expectedElements[i]),
);
t.pass();
i++;
element = concatenation.read();
}
})
.on("error", t.end)
.on("end", t.end);
source1.push("a");
setTimeout(() => source2.push("d"), 10);
setTimeout(() => source1.push("b"), 20);
setTimeout(() => source2.push("e"), 30);
setTimeout(() => source1.push("c"), 40);
setTimeout(() => source2.push("f"), 50);
setTimeout(() => source2.push(null), 60);
setTimeout(() => source1.push(null), 70);
},
);
test.cb("concat() concatenates a single readable stream (object mode)", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
concat(source)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb(
"concat() concatenates a single readable stream (non-object mode)",
t => {
t.plan(3);
const source = new Readable({ objectMode: false });
const expectedElements = ["a", "b", "c", "d", "e", "f"];
let i = 0;
concat(source)
.on("data", (element: string) => {
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
},
);
test.cb("concat() concatenates empty list of readable streams", t => {
t.plan(0);
concat()
.pipe(collect())
.on("data", _ => {
t.fail();
})
.on("error", t.end)
.on("end", t.end);
});

View File

@@ -0,0 +1,21 @@
import { Readable } from "stream";
import test from "ava";
import mhysa from "../src";
const withDefaultOptions = mhysa({ objectMode: true });
const withoutOptions = mhysa();
test("Mhysa instances can have default options", t => {
let batch = withDefaultOptions.batch();
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3);
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3, 1);
t.true(batch._readableState.objectMode);
t.true(batch._writableState.objectMode);
batch = withDefaultOptions.batch(3, 1, { objectMode: false });
t.false(batch._readableState.objectMode);
t.false(batch._writableState.objectMode);
});

836
tests/demux.spec.ts Normal file
View File

@@ -0,0 +1,836 @@
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
import { Writable, Readable } from "stream";
const sinon = require("sinon");
const { sleep } = require("../src/helpers");
import { performance } from "perf_hooks";
const { demux, map, fromArray } = mhysa({ objectMode: true });
interface Test {
key: string;
visited: number[];
}
test.cb("demux() constructor should be called once per key", t => {
t.plan(1);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const construct = sinon.spy((destKey: string) => {
const dest = map((chunk: Test) => {
chunk.visited.push(1);
return chunk;
});
return dest;
});
const demuxed = demux(construct, "key", {});
demuxed.on("finish", () => {
expect(construct.withArgs("a").callCount).to.equal(1);
expect(construct.withArgs("b").callCount).to.equal(1);
expect(construct.withArgs("c").callCount).to.equal(1);
t.pass();
t.end();
});
fromArray(input).pipe(demuxed);
});
test.cb("demux() should send input through correct pipeline", t => {
t.plan(6);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const pipelineSpies = {};
const construct = (destKey: string) => {
const mapper = sinon.spy((chunk: Test) => {
return { ...chunk, visited: [1] };
});
const dest = map(mapper);
pipelineSpies[destKey] = mapper;
return dest;
};
const demuxed = demux(construct, "key", {});
demuxed.on("finish", () => {
pipelineSpies["a"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("a");
t.pass();
});
pipelineSpies["b"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("b");
t.pass();
});
pipelineSpies["c"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("c");
t.pass();
});
t.end();
});
fromArray(input).pipe(demuxed);
});
test.cb("demux() constructor should be called once per key using keyBy", t => {
t.plan(1);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const construct = sinon.spy((destKey: string) => {
const dest = map((chunk: Test) => {
chunk.visited.push(1);
return chunk;
});
return dest;
});
const demuxed = demux(construct, item => item.key, {});
demuxed.on("finish", () => {
expect(construct.withArgs("a").callCount).to.equal(1);
expect(construct.withArgs("b").callCount).to.equal(1);
expect(construct.withArgs("c").callCount).to.equal(1);
t.pass();
t.end();
});
fromArray(input).pipe(demuxed);
});
test.cb("demux() should send input through correct pipeline using keyBy", t => {
t.plan(6);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const pipelineSpies = {};
const construct = (destKey: string) => {
const mapper = sinon.spy((chunk: Test) => {
return { ...chunk, visited: [1] };
});
const dest = map(mapper);
pipelineSpies[destKey] = mapper;
return dest;
};
const demuxed = demux(construct, item => item.key, {});
demuxed.on("finish", () => {
pipelineSpies["a"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("a");
t.pass();
});
pipelineSpies["b"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("b");
t.pass();
});
pipelineSpies["c"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("c");
t.pass();
});
t.end();
});
fromArray(input).pipe(demuxed);
});
test("demux() write should return false and emit drain if more than highWaterMark items are buffered", t => {
return new Promise(async (resolve, reject) => {
t.plan(7);
interface Chunk {
key: string;
mapped: number[];
}
const input: Chunk[] = [
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
];
let pendingReads = input.length;
const highWaterMark = 5;
const slowProcessorSpeed = 25;
const construct = (destKey: string) => {
const first = map(
async (chunk: Chunk) => {
await sleep(slowProcessorSpeed);
return { ...chunk, mapped: [1] };
},
{ highWaterMark: 1 },
);
first.on("data", chunk => {
expect(chunk.mapped).to.deep.equal([1]);
pendingReads--;
if (pendingReads === 0) {
resolve();
}
t.pass();
});
return first;
};
const _demux = demux(construct, "key", {
highWaterMark,
});
_demux.on("error", err => {
reject();
});
for (const item of input) {
const res = _demux.write(item);
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
if (!res) {
await new Promise((resolv, rej) => {
_demux.once("drain", () => {
expect(_demux._writableState.length).to.be.equal(0);
t.pass();
resolv();
});
});
}
}
});
});
test("demux() should emit one drain event after slowProcessorSpeed * highWaterMark ms when first stream is bottleneck", t => {
return new Promise(async (resolve, reject) => {
t.plan(7);
interface Chunk {
key: string;
mapped: number[];
}
const input: Chunk[] = [
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
];
let pendingReads = input.length;
const highWaterMark = 5;
const slowProcessorSpeed = 25;
const construct = (destKey: string) => {
const first = map(
async (chunk: Chunk) => {
await sleep(slowProcessorSpeed);
chunk.mapped.push(1);
return chunk;
},
{ highWaterMark: 1 },
);
first.on("data", () => {
t.pass();
pendingReads--;
if (pendingReads === 0) {
resolve();
}
});
return first;
};
const _demux = demux(construct, "key", {
highWaterMark,
});
_demux.on("error", err => {
reject();
});
const start = performance.now();
for (const item of input) {
const res = _demux.write(item);
if (!res) {
await new Promise((resolv, rej) => {
// This event should be received after all items in demux are processed
_demux.once("drain", () => {
expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * highWaterMark,
);
t.pass();
resolv();
});
});
}
}
});
});
test("demux() should emit one drain event when writing 6 items with highWaterMark of 5", t => {
return new Promise(async (resolve, reject) => {
t.plan(1);
interface Chunk {
key: string;
mapped: number[];
}
const highWaterMark = 5;
const input = [
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
];
let pendingReads = input.length;
const construct = (destKey: string) => {
const first = map(
async (chunk: Chunk) => {
await sleep(50);
chunk.mapped.push(2);
return chunk;
},
{ highWaterMark: 1 },
);
first.on("data", () => {
pendingReads--;
if (pendingReads === 0) {
resolve();
}
});
return first;
};
const _demux = demux(construct, "key", {
highWaterMark: 5,
});
_demux.on("error", err => {
reject();
});
for (const item of input) {
const res = _demux.write(item);
expect(_demux._writableState.length).to.be.at.most(highWaterMark);
if (!res) {
await new Promise(_resolve => {
_demux.once("drain", () => {
_resolve();
expect(_demux._writableState.length).to.be.equal(0);
t.pass();
});
});
}
}
});
});
test.cb(
"demux() should emit drain event when second stream is bottleneck after (highWaterMark - 2) * slowProcessorSpeed ms",
t => {
// i.e. the first two items are pushed directly into the first and second streams (highWaterMark - 2 remain in demux)
t.plan(8);
const slowProcessorSpeed = 100;
const highWaterMark = 5;
interface Chunk {
key: string;
mapped: number[];
}
const sink = new Writable({
objectMode: true,
write(chunk, encoding, cb) {
expect(chunk.mapped).to.deep.equal([1, 2]);
t.pass();
pendingReads--;
if (pendingReads === 0) {
t.end();
}
cb();
},
});
const construct = (destKey: string) => {
const first = map(
(chunk: Chunk) => {
chunk.mapped.push(1);
return chunk;
},
{ highWaterMark: 1 },
);
const second = map(
async (chunk: Chunk) => {
await sleep(slowProcessorSpeed);
chunk.mapped.push(2);
return chunk;
},
{ highWaterMark: 1 },
);
first.pipe(second).pipe(sink);
return first;
};
const _demux = demux(construct, () => "a", {
highWaterMark,
});
_demux.on("error", err => {
t.end(err);
});
_demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * 3,
);
t.pass();
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
{ key: "f", mapped: [] },
{ key: "g", mapped: [] },
];
let pendingReads = input.length;
const start = performance.now();
fromArray(input).pipe(_demux);
},
);
test.cb(
"demux() should emit drain event when third stream is bottleneck",
t => {
// @TODO investigate why drain is emitted after slowProcessorSpeed
t.plan(8);
const slowProcessorSpeed = 100;
const highWaterMark = 5;
interface Chunk {
key: string;
mapped: number[];
}
const sink = new Writable({
objectMode: true,
write(chunk, encoding, cb) {
expect(chunk.mapped).to.deep.equal([1, 2, 3]);
t.pass();
pendingReads--;
if (pendingReads === 0) {
t.end();
}
cb();
},
});
const construct = (destKey: string) => {
const first = map(
(chunk: Chunk) => {
chunk.mapped.push(1);
return chunk;
},
{ highWaterMark: 1 },
);
const second = map(
(chunk: Chunk) => {
chunk.mapped.push(2);
return chunk;
},
{ highWaterMark: 1 },
);
const third = map(
async (chunk: Chunk) => {
await sleep(slowProcessorSpeed);
chunk.mapped.push(3);
return chunk;
},
{ highWaterMark: 1 },
);
first
.pipe(second)
.pipe(third)
.pipe(sink);
return first;
};
const _demux = demux(construct, () => "a", {
highWaterMark,
});
_demux.on("error", err => {
t.end(err);
});
_demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed,
);
t.pass();
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
{ key: "f", mapped: [] },
{ key: "g", mapped: [] },
];
let pendingReads = input.length;
const start = performance.now();
fromArray(input).pipe(_demux);
},
);
test("demux() should be blocked by slowest pipeline", t => {
t.plan(1);
const slowProcessorSpeed = 100;
interface Chunk {
key: string;
mapped: number[];
}
return new Promise(async (resolve, reject) => {
const construct = (destKey: string) => {
const first = map(
async (chunk: Chunk) => {
await sleep(slowProcessorSpeed);
chunk.mapped.push(1);
return chunk;
},
{ highWaterMark: 1 },
);
return first;
};
const _demux = demux(construct, "key", {
highWaterMark: 1,
});
_demux.on("error", err => {
reject(err);
});
_demux.on("data", async chunk => {
pendingReads--;
if (chunk.key === "b") {
expect(performance.now() - start).to.be.greaterThan(
slowProcessorSpeed * totalItems,
);
t.pass();
expect(pendingReads).to.equal(0);
resolve();
}
});
const input = [
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "c", mapped: [] },
{ key: "c", mapped: [] },
{ key: "b", mapped: [] },
];
let pendingReads = input.length;
const totalItems = input.length;
const start = performance.now();
for (const item of input) {
if (!_demux.write(item)) {
await new Promise(_resolve => {
_demux.once("drain", () => {
_resolve();
});
});
}
}
});
});
test.cb("Demux should remux to sink", t => {
t.plan(6);
let i = 0;
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const result = [
{ key: "a", visited: ["a"] },
{ key: "b", visited: ["b"] },
{ key: "a", visited: ["a"] },
{ key: "c", visited: ["c"] },
{ key: "a", visited: ["a"] },
{ key: "b", visited: ["b"] },
];
const construct = (destKey: string) => {
const dest = map((chunk: any) => {
chunk.visited.push(destKey);
return chunk;
});
return dest;
};
const sink = map(d => {
t.deepEqual(d, result[i]);
i++;
if (i === input.length) {
t.end();
}
});
const demuxed = demux(construct, "key", {});
fromArray(input)
.pipe(demuxed)
.pipe(sink);
});
test.cb("Demux should send data events", t => {
t.plan(6);
let i = 0;
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const result = [
{ key: "a", visited: ["a"] },
{ key: "b", visited: ["b"] },
{ key: "a", visited: ["a"] },
{ key: "c", visited: ["c"] },
{ key: "a", visited: ["a"] },
{ key: "b", visited: ["b"] },
];
const construct = (destKey: string) => {
const dest = map((chunk: any) => {
chunk.visited.push(destKey);
return chunk;
});
return dest;
};
const demuxed = demux(construct, "key", {});
fromArray(input).pipe(demuxed);
demuxed.on("data", d => {
t.deepEqual(d, result[i]);
i++;
if (i === input.length) {
t.end();
}
});
});
test.cb("demux() `finish` and `end` propagates", t => {
interface Chunk {
key: string;
mapped: number[];
}
t.plan(9);
const construct = (destKey: string) => {
const dest = map((chunk: any) => {
chunk.mapped.push(destKey);
return chunk;
});
return dest;
};
const _demux = demux(construct, "key", {
highWaterMark: 3,
});
const fakeSource = new Readable({
objectMode: true,
read() {
return;
},
});
const sink = map((d: any) => {
const curr = input.shift();
t.is(curr.key, d.key);
t.deepEqual(d.mapped, [d.key]);
});
fakeSource.pipe(_demux).pipe(sink);
fakeSource.on("end", () => {
t.pass();
});
_demux.on("finish", () => {
t.pass();
});
_demux.on("unpipe", () => {
t.pass();
});
_demux.on("end", () => {
t.pass();
t.end();
});
sink.on("finish", () => {
t.pass();
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
];
fakeSource.push(input[0]);
fakeSource.push(input[1]);
fakeSource.push(null);
});
test.cb("demux() `unpipe` propagates", t => {
interface Chunk {
key: string;
mapped: number[];
}
t.plan(7);
const construct = (destKey: string) => {
const dest = map((chunk: any) => {
chunk.mapped.push(destKey);
return chunk;
});
return dest;
};
const _demux = demux(construct, "key", {
highWaterMark: 3,
});
const fakeSource = new Readable({
objectMode: true,
read() {
return;
},
});
const sink = map((d: any) => {
const curr = input.shift();
t.is(curr.key, d.key);
t.deepEqual(d.mapped, [d.key]);
});
fakeSource.pipe(_demux).pipe(sink);
_demux.on("unpipe", () => {
t.pass();
});
sink.on("unpipe", () => {
t.pass();
});
sink.on("finish", () => {
t.pass();
t.end();
});
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "a", mapped: [] },
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
];
fakeSource.push(input[0]);
fakeSource.push(input[1]);
fakeSource.push(null);
});
test.cb("demux() should be 'destroyable'", t => {
t.plan(2);
const _sleep = 100;
interface Chunk {
key: string;
mapped: string[];
}
const construct = (destKey: string) => {
const first = map(async (chunk: Chunk) => {
await sleep(_sleep);
chunk.mapped.push(destKey);
return chunk;
});
return first;
};
const _demux = demux(construct, "key");
const fakeSource = new Readable({
objectMode: true,
read() {
return;
},
});
const fakeSink = new Writable({
objectMode: true,
write(data, enc, cb) {
const cur = input.shift();
t.is(cur.key, data.key);
t.deepEqual(cur.mapped, ["a"]);
if (cur.key === "a") {
_demux.destroy();
}
cb();
},
});
_demux.on("close", t.end);
fakeSource.pipe(_demux).pipe(fakeSink);
const input = [
{ key: "a", mapped: [] },
{ key: "b", mapped: [] },
{ key: "c", mapped: [] },
{ key: "d", mapped: [] },
{ key: "e", mapped: [] },
];
fakeSource.push(input[0]);
fakeSource.push(input[1]);
fakeSource.push(input[2]);
fakeSource.push(input[3]);
fakeSource.push(input[4]);
});

29
tests/duplex.spec.ts Normal file
View File

@@ -0,0 +1,29 @@
import * as cp from "child_process";
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { duplex } = mhysa();
test.cb(
"duplex() combines a writable and readable stream into a ReadWrite stream",
t => {
t.plan(1);
const source = new Readable();
const catProcess = cp.exec("cat");
let out = "";
source
.pipe(duplex(catProcess.stdin!, catProcess.stdout!))
.on("data", chunk => (out += chunk))
.on("error", t.end)
.on("end", () => {
expect(out).to.equal("abcdef");
t.pass();
t.end();
});
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
},
);

117
tests/filter.spec.ts Normal file
View File

@@ -0,0 +1,117 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import mhysa from "../src";
const { filter } = mhysa();
test.cb("filter() filters elements synchronously", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "c"];
let i = 0;
source
.pipe(
filter((element: string) => element !== "b", {
readableObjectMode: true,
writableObjectMode: true,
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("filter() filters elements asynchronously", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "c"];
let i = 0;
source
.pipe(
filter(
async (element: string) => {
await Promise.resolve();
return element !== "b";
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb.skip("filter() emits errors during synchronous filtering", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
filter(
(element: string) => {
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed filtering");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb.skip("filter() emits errors during asynchronous filtering", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
filter(
async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
},
{ readableObjectMode: true, writableObjectMode: true },
),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed filtering");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});

101
tests/flatMap.spec.ts Normal file
View File

@@ -0,0 +1,101 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { flatMap } = mhysa({ objectMode: true });
test.cb("flatMap() maps elements synchronously", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "A", "b", "B", "c", "C"];
let i = 0;
source
.pipe(flatMap((element: string) => [element, element.toUpperCase()]))
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("flatMap() maps elements asynchronously", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "A", "b", "B", "c", "C"];
let i = 0;
source
.pipe(
flatMap(async (element: string) => {
await Promise.resolve();
return [element, element.toUpperCase()];
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb.skip("flatMap() emits errors during synchronous mapping", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
flatMap((element: string) => {
if (element !== "a") {
throw new Error("Failed mapping");
}
return [element, element.toUpperCase()];
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb.skip("flatMap() emits errors during asynchronous mapping", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
flatMap(async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed mapping");
}
return [element, element.toUpperCase()];
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
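A minimal sketch of flatMap() outside the test harness, assuming the objectMode factory configuration used above:
import mhysa from "../src";
const { fromArray, flatMap } = mhysa({ objectMode: true });
// Each input element fans out into itself plus its uppercase form.
fromArray(["a", "b"])
.pipe(flatMap((element: string) => [element, element.toUpperCase()]))
.on("data", (element: string) => console.log(element)); // a, A, b, B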

46
tests/fromArray.spec.ts Normal file

@@ -0,0 +1,46 @@
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { fromArray } = mhysa();
test.cb("fromArray() streams array elements in flowing mode", t => {
t.plan(3);
const elements = ["a", "b", "c"];
const stream = fromArray(elements);
let i = 0;
stream
.on("data", (element: string) => {
expect(element).to.equal(elements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
});
test.cb("fromArray() ends immediately if there are no array elements", t => {
t.plan(0);
fromArray([])
.on("data", () => t.fail())
.on("error", t.end)
.on("end", t.end);
});
test.cb("fromArray() streams array elements in paused mode", t => {
t.plan(3);
const elements = ["a", "b", "c"];
const stream = fromArray(elements);
let i = 0;
stream
.on("readable", () => {
let element = stream.read();
while (element !== null) {
expect(element).to.equal(elements[i]);
t.pass();
i++;
element = stream.read();
}
})
.on("error", t.end)
.on("end", t.end);
});
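The same helper in plain code; a minimal sketch assuming fromArray() ends the stream after the last element, as verified above:
import mhysa from "../src";
const { fromArray } = mhysa();
fromArray(["a", "b", "c"])
.on("data", element => console.log(element))
.on("end", () => console.log("done"));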

57
tests/join.spec.ts Normal file

@@ -0,0 +1,57 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { join } = mhysa();
test.cb("join() joins chunks using the specified separator", t => {
t.plan(9);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab|", "|", "c|d", "|", "|", "|", "e", "|", "|f|"];
let i = 0;
source
.pipe(join("|"))
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab|");
source.push("c|d");
source.push("|");
source.push("e");
source.push("|f|");
source.push(null);
});
test.cb(
"join() joins chunks using the specified separator without breaking up multi-byte characters " +
"spanning multiple chunks",
t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedParts = ["ø", "|", "ö", "|", "一"];
let i = 0;
source
.pipe(join("|"))
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push(Buffer.from("一").slice(0, 1)); // 3-byte character spanning three chunks
source.push(Buffer.from("一").slice(1, 2));
source.push(Buffer.from("一").slice(2, 3));
source.push(null);
},
);
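A minimal sketch of join() outside the harness; per the first spec above, the separator is emitted as its own chunk between incoming chunks:
import mhysa from "../src";
const { fromArray, join } = mhysa();
fromArray(["ab", "cd", "ef"])
.pipe(join("|"))
.on("data", part => process.stdout.write(part)); // prints "ab|cd|ef" overall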

16
tests/last.spec.ts Normal file

@@ -0,0 +1,16 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { last } = mhysa();
test("last() resolves to the last chunk streamed by the given readable stream", async t => {
const source = new Readable({ objectMode: true });
const lastPromise = last(source);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
const lastChunk = await lastPromise;
expect(lastChunk).to.equal("ef");
});
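A minimal sketch of last() used with async/await, assuming it resolves once the source ends, as in the spec above:
import mhysa from "../src";
const { fromArray, last } = mhysa();
async function main() {
const lastChunk = await last(fromArray(["ab", "cd", "ef"]));
console.log(lastChunk); // "ef"
}
main();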

57
tests/map.spec.ts Normal file

@@ -0,0 +1,57 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { map } = mhysa();
test.cb("map() maps elements synchronously", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const mapStream = map((element: string) => element.toUpperCase(), {
objectMode: true,
});
const expectedElements = ["A", "B", "C"];
let i = 0;
source
.pipe(mapStream)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("map() maps elements asynchronously", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const mapStream = map(
async (element: string) => {
await Promise.resolve();
return element.toUpperCase();
},
{ objectMode: true },
);
const expectedElements = ["A", "B", "C"];
let i = 0;
source
.pipe(mapStream)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
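The equivalent usage outside the harness; a minimal sketch assuming the { objectMode: true } option shown above:
import mhysa from "../src";
const { fromArray, map } = mhysa();
fromArray(["a", "b", "c"])
.pipe(map((element: string) => element.toUpperCase(), { objectMode: true }))
.on("data", (element: string) => console.log(element)); // A, B, C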

61
tests/merge.spec.ts Normal file

@@ -0,0 +1,61 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { merge } = mhysa();
test.cb(
"merge() merges multiple readable streams in chunk arrival order",
t => {
t.plan(6);
const source1 = new Readable({ objectMode: true, read: () => ({}) });
const source2 = new Readable({ objectMode: true, read: () => ({}) });
const expectedElements = ["a", "d", "b", "e", "c", "f"];
let i = 0;
merge(source1, source2)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source1.push("a");
setTimeout(() => source2.push("d"), 10);
setTimeout(() => source1.push("b"), 20);
setTimeout(() => source2.push("e"), 30);
setTimeout(() => source1.push("c"), 40);
setTimeout(() => source2.push("f"), 50);
setTimeout(() => source2.push(null), 60);
setTimeout(() => source1.push(null), 70);
},
);
test.cb("merge() merges a readable stream", t => {
t.plan(3);
const source = new Readable({ objectMode: true, read: () => ({}) });
const expectedElements = ["a", "b", "c"];
let i = 0;
merge(source)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("merge() merges an empty list of readable streams", t => {
t.plan(0);
merge()
.on("data", () => t.pass())
.on("error", t.end)
.on("end", t.end);
});
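A minimal sketch of merge() combining two sources into one stream in chunk arrival order, as exercised above:
import mhysa from "../src";
const { fromArray, merge } = mhysa();
merge(fromArray(["a", "b"]), fromArray(["c", "d"]))
.on("data", element => console.log(element))
.on("end", () => console.log("both sources finished"));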

78
tests/parallelMap.spec.ts Normal file

@@ -0,0 +1,78 @@
import { Readable } from "stream";
import { performance } from "perf_hooks";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
import { sleep } from "../src/helpers";
const { parallelMap } = mhysa({ objectMode: true });
test.cb("parallelMap() parallel mapping", t => {
t.plan(6);
const offset = 50;
const source = new Readable({ objectMode: true });
const expectedElements = [
"a_processed",
"b_processed",
"c_processed",
"d_processed",
"e_processed",
"f_processed",
];
interface IPerfData {
start: number;
output?: string;
finish?: number;
}
const orderedResults: IPerfData[] = [];
source
.pipe(
parallelMap(async (data: any) => {
const perfData: IPerfData = { start: performance.now() };
const c = data + "_processed";
perfData.output = c;
await sleep(offset);
perfData.finish = performance.now();
orderedResults.push(perfData);
return c;
}, 2),
)
.on("data", (element: string) => {
t.true(expectedElements.includes(element));
})
.on("error", t.end)
.on("end", async () => {
expect(orderedResults[0].finish).to.be.lessThan(
orderedResults[2].start,
);
expect(orderedResults[1].finish).to.be.lessThan(
orderedResults[3].start,
);
expect(orderedResults[2].finish).to.be.lessThan(
orderedResults[4].start,
);
expect(orderedResults[3].finish).to.be.lessThan(
orderedResults[5].start,
);
expect(orderedResults[0].start).to.be.lessThan(
orderedResults[2].start + offset,
);
expect(orderedResults[1].start).to.be.lessThan(
orderedResults[3].start + offset,
);
expect(orderedResults[2].start).to.be.lessThan(
orderedResults[4].start + offset,
);
expect(orderedResults[3].start).to.be.lessThan(
orderedResults[5].start + offset,
);
t.end();
});
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push("f");
source.push(null);
});
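A minimal sketch of parallelMap() with a concurrency of 2, mirroring the mapper signature used above (the second argument caps in-flight mappings):
import mhysa from "../src";
const { fromArray, parallelMap } = mhysa({ objectMode: true });
fromArray(["a", "b", "c", "d"])
.pipe(
parallelMap(async (data: string) => {
// At most two of these mappings run concurrently.
return data + "_processed";
}, 2),
)
.on("data", (element: string) => console.log(element));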

45
tests/parse.spec.ts Normal file

@@ -0,0 +1,45 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { parse } = mhysa();
test.cb("parse() parses the streamed elements as JSON", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", {}, []];
let i = 0;
source
.pipe(parse())
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push('"abc"');
source.push("{}");
source.push("[]");
source.push(null);
});
test.cb("parse() emits errors on invalid JSON", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
source
.pipe(parse())
.resume()
.on("error", (d: any) => {
t.pass();
t.end();
})
.on("end", t.fail);
source.push("{}");
source.push({});
source.push([]);
source.push(null);
});
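A minimal sketch of parse() on well-formed input, with an error handler for the invalid-JSON case covered above:
import mhysa from "../src";
const { fromArray, parse } = mhysa();
fromArray(['"abc"', "{}", "[]"])
.pipe(parse())
.on("data", value => console.log(value)) // "abc", {}, []
.on("error", err => console.error("invalid JSON:", err.message));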

90
tests/rate.spec.ts Normal file

@@ -0,0 +1,90 @@
import { Readable } from "stream";
import { performance } from "perf_hooks";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { rate } = mhysa({ objectMode: true });
test.cb("rate() sends data at a rate of 150", t => {
t.plan(5);
const targetRate = 150;
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e"];
const start = performance.now();
let i = 0;
source
.pipe(rate(targetRate))
.on("data", (element: string[]) => {
const currentRate = (i / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[i]);
expect(currentRate).lessThan(targetRate);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push(null);
});
test.cb("rate() sends data at a rate of 50", t => {
t.plan(5);
const targetRate = 50;
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e"];
const start = performance.now();
let i = 0;
source
.pipe(rate(targetRate))
.on("data", (element: string[]) => {
const currentRate = (i / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[i]);
expect(currentRate).lessThan(targetRate);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push(null);
});
test.cb("rate() sends data at a rate of 1", t => {
t.plan(5);
const targetRate = 1;
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c", "d", "e"];
const start = performance.now();
let i = 0;
source
.pipe(rate(targetRate))
.on("data", (element: string[]) => {
const currentRate = (i / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[i]);
expect(currentRate).lessThan(targetRate);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push("d");
source.push("e");
source.push(null);
});
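A minimal sketch of rate() used as a throttle; the argument is the target throughput in elements per second, which the specs above assert is never exceeded:
import mhysa from "../src";
const { fromArray, rate } = mhysa({ objectMode: true });
fromArray(["a", "b", "c", "d", "e"])
.pipe(rate(50)) // keep the observed rate under ~50 elements/second
.on("data", element => console.log(element));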

99
tests/reduce.spec.ts Normal file

@@ -0,0 +1,99 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { reduce } = mhysa({ objectMode: true });
test.cb("reduce() reduces elements synchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(reduce((acc: number, element: string) => acc + element.length, 0))
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() reduces elements asynchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
return acc + element.length;
}, 0),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb.skip("reduce() emits errors during synchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce((acc: number, element: string) => {
if (element !== "ab") {
throw new Error("Failed reduce");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed reduce");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb.skip("reduce() emits errors during asynchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
if (element !== "ab") {
throw new Error("Failed mapping");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
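A minimal sketch of reduce(); it emits a single chunk, the final accumulator, once the source ends:
import mhysa from "../src";
const { fromArray, reduce } = mhysa({ objectMode: true });
fromArray(["ab", "cd", "ef"])
.pipe(reduce((acc: number, element: string) => acc + element.length, 0))
.on("data", total => console.log(total)); // 6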

81
tests/replace.spec.ts Normal file

@@ -0,0 +1,81 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { replace } = mhysa();
test.cb(
"replace() replaces occurrences of the given string in the streamed elements with the specified " +
"replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyf", "ghi"];
let i = 0;
source
.pipe(replace("de", "xy"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb(
"replace() replaces occurrences of the given regular expression in the streamed elements with " +
"the specified replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyz", "ghi"];
let i = 0;
source
.pipe(replace(/^def$/, "xyz"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb(
"replace() replaces occurrences of the given multi-byte character even if it spans multiple chunks",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["ø", "O", "a"];
let i = 0;
source
.pipe(replace("ö", "O"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push("a");
source.push(null);
},
);
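A minimal sketch of replace() with a regular expression, matching the second spec above:
import mhysa from "../src";
const { fromArray, replace } = mhysa();
fromArray(["abc", "def", "ghi"])
.pipe(replace(/^def$/, "xyz"))
.on("data", part => console.log(part)); // "abc", "xyz", "ghi"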

99
tests/split.spec.ts Normal file

@@ -0,0 +1,99 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { split } = mhysa();
test.cb("split() splits chunks using the default separator (\\n)", t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab", "c", "d", "ef", ""];
let i = 0;
source
.pipe(split())
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab\n");
source.push("c");
source.push("\n");
source.push("d");
source.push("\nef\n");
source.push(null);
});
test.cb("split() splits chunks using the specified separator", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab", "c", "d", "e", "f", ""];
let i = 0;
source
.pipe(split("|"))
.on("data", (part: string) => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab|");
source.push("c|d");
source.push("|");
source.push("e");
source.push("|f|");
source.push(null);
});
test.cb(
"split() splits utf8 encoded buffers using the specified separator",
t => {
t.plan(3);
const expectedElements = ["a", "b", "c"];
let i = 0;
const through = split(",");
const buf = Buffer.from("a,b,c");
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);
for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);
test.cb(
"split() splits utf8 encoded buffers with multi-byte characters using the specified separator",
t => {
t.plan(3);
const expectedElements = ["一", "一", "一"];
let i = 0;
const through = split(",");
const buf = Buffer.from("一,一,一"); // "一" is a multi-byte utf8 character (U+4E00)
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);
for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);
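A minimal sketch of split() with the default newline separator, as in the first spec above:
import mhysa from "../src";
const { fromArray, split } = mhysa();
fromArray(["ab\n", "c\nd"])
.pipe(split())
.on("data", line => console.log(line)); // "ab", "c", "d"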

62
tests/stringify.spec.ts Normal file

@@ -0,0 +1,62 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { stringify } = mhysa();
test.cb("stringify() stringifies the streamed elements as JSON", t => {
t.plan(4);
const source = new Readable({ objectMode: true });
const expectedElements = [
'"abc"',
"0",
'{"a":"a","b":"b","c":"c"}',
'["a","b","c"]',
];
let i = 0;
source
.pipe(stringify())
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push(0);
source.push({ a: "a", b: "b", c: "c" });
source.push(["a", "b", "c"]);
source.push(null);
});
test.cb(
"stringify() stringifies the streamed elements as pretty-printed JSON",
t => {
t.plan(4);
const source = new Readable({ objectMode: true });
const expectedElements = [
'"abc"',
"0",
'{\n "a": "a",\n "b": "b",\n "c": "c"\n}',
'[\n "a",\n "b",\n "c"\n]',
];
let i = 0;
source
.pipe(stringify({ pretty: true }))
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push(0);
source.push({ a: "a", b: "b", c: "c" });
source.push(["a", "b", "c"]);
source.push(null);
},
);
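A minimal sketch of stringify() on object-mode input, mirroring the compact output of the first spec above:
import mhysa from "../src";
const { fromArray, stringify } = mhysa();
fromArray([{ a: "a" }, ["b", "c"]])
.pipe(stringify())
.on("data", json => console.log(json)); // '{"a":"a"}' then '["b","c"]'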

27
tests/unbatch.spec.ts Normal file

@@ -0,0 +1,27 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import mhysa from "../src";
const { unbatch, batch } = mhysa({ objectMode: true });
test.cb("unbatch() unbatches", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "b", "c"];
let i = 0;
source
.pipe(batch(3))
.pipe(unbatch())
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
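A minimal sketch of batch() and unbatch() round-tripping elements, as in the spec above:
import mhysa from "../src";
const { fromArray, batch, unbatch } = mhysa({ objectMode: true });
fromArray(["a", "b", "c", "d"])
.pipe(batch(2)) // groups: ["a","b"], ["c","d"]
.pipe(unbatch()) // back to: "a", "b", "c", "d"
.on("data", element => console.log(element));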

tsconfig.json

@@ -1,18 +1,21 @@
 {
-"compilerOptions": {
-"noImplicitAny": true,
-"strictNullChecks": true,
-"noImplicitReturns": true,
-"noUnusedLocals": true,
-"noImplicitThis": true,
-"forceConsistentCasingInFileNames": true,
-"suppressImplicitAnyIndexErrors": true,
-"outDir": "./dist",
-"module": "commonjs",
-"target": "es5",
-"lib": ["es2016"],
-"sourceMap": true,
-"declaration": true
-},
-"include": ["src/**/*.ts"]
+"compilerOptions": {
+"noImplicitAny": true,
+"strictNullChecks": true,
+"noImplicitReturns": true,
+"noUnusedLocals": false,
+"noImplicitThis": true,
+"forceConsistentCasingInFileNames": true,
+"suppressImplicitAnyIndexErrors": true,
+"outDir": "./dist",
+"module": "commonjs"
+},
+"target": "es5",
+"lib": [
+"es2016"
+],
+"sourceMap": true,
+"declaration": true,
+"include": ["src/**/*"],
+"exclude": ["tests", "node_modules"]
 }

tslint.json

@@ -9,6 +9,7 @@
"no-implicit-dependencies": [true, "dev"],
"prettier": [true, ".prettierrc"],
"ordered-imports": false,
"interface-name": false
"interface-name": false,
"object-literal-sort-keys": false
}
}

2941
yarn.lock

File diff suppressed because it is too large