Fix a few thing with compose
This commit is contained in:
parent
ff2b652ddf
commit
4b806c4d4e
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@jogogo/mhysa",
|
||||
"version": "0.0.1-beta.4",
|
||||
"version": "0.1.0-alpha.1",
|
||||
"description": "Streams and event emitter utils for Node.js",
|
||||
"keywords": [
|
||||
"promise",
|
||||
@ -35,7 +35,7 @@
|
||||
"type": "git"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "NODE_PATH=src node node_modules/.bin/ava 'tests/*.spec.ts' -e",
|
||||
"test": "NODE_PATH=src node node_modules/.bin/ava tests/*.spec.ts -e",
|
||||
"test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js",
|
||||
"test:all": "NODE_PATH=src node node_modules/.bin/ava",
|
||||
"lint": "tslint -p tsconfig.json",
|
||||
@ -47,7 +47,7 @@
|
||||
"@types/chai": "^4.1.7",
|
||||
"@types/node": "^12.7.2",
|
||||
"@types/sinon": "^7.0.13",
|
||||
"ava": "^1.0.0-rc.2",
|
||||
"ava": "^2.4.0",
|
||||
"chai": "^4.2.0",
|
||||
"mhysa": "./",
|
||||
"prettier": "^1.14.3",
|
||||
|
@ -1,16 +1,17 @@
|
||||
import { pipeline, Duplex, DuplexOptions } from "stream";
|
||||
import { pipeline, TransformOptions, Transform } from "stream";
|
||||
|
||||
export function compose(
|
||||
streams: Array<
|
||||
NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
|
||||
>,
|
||||
options?: DuplexOptions,
|
||||
errorCallback?: (err: any) => void,
|
||||
options?: TransformOptions,
|
||||
): Compose {
|
||||
if (streams.length < 2) {
|
||||
throw new Error("At least two streams are required to compose");
|
||||
}
|
||||
|
||||
return new Compose(streams, options);
|
||||
return new Compose(streams, errorCallback, options);
|
||||
}
|
||||
|
||||
enum EventSubscription {
|
||||
@ -20,46 +21,72 @@ enum EventSubscription {
|
||||
Self,
|
||||
}
|
||||
|
||||
const eventsTarget = {
|
||||
close: EventSubscription.Last,
|
||||
data: EventSubscription.Last,
|
||||
drain: EventSubscription.Self,
|
||||
end: EventSubscription.Last,
|
||||
error: EventSubscription.Self,
|
||||
finish: EventSubscription.Last,
|
||||
pause: EventSubscription.Last,
|
||||
pipe: EventSubscription.First,
|
||||
readable: EventSubscription.Last,
|
||||
resume: EventSubscription.Last,
|
||||
unpipe: EventSubscription.First,
|
||||
};
|
||||
|
||||
type AllStreams =
|
||||
| NodeJS.ReadableStream
|
||||
| NodeJS.ReadWriteStream
|
||||
| NodeJS.WritableStream;
|
||||
|
||||
export class Compose extends Duplex {
|
||||
function isReadable(stream: AllStreams): stream is NodeJS.WritableStream {
|
||||
return (
|
||||
(stream as NodeJS.ReadableStream).pipe !== undefined &&
|
||||
(stream as any).readable === true
|
||||
);
|
||||
}
|
||||
|
||||
export class Compose extends Transform {
|
||||
private first: AllStreams;
|
||||
private last: AllStreams;
|
||||
private streams: AllStreams[];
|
||||
private inputStream: ReadableStream;
|
||||
|
||||
constructor(streams: AllStreams[], options?: DuplexOptions) {
|
||||
constructor(
|
||||
streams: AllStreams[],
|
||||
errorCallback?: (err: any) => void,
|
||||
options?: TransformOptions,
|
||||
) {
|
||||
super(options);
|
||||
this.first = streams[0];
|
||||
this.last = streams[streams.length - 1];
|
||||
this.streams = streams;
|
||||
pipeline(streams, (err: any) => {
|
||||
this.emit("error", err);
|
||||
pipeline(
|
||||
streams,
|
||||
errorCallback ||
|
||||
((error: any) => {
|
||||
if (error) {
|
||||
this.emit("error", error);
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
if (isReadable(this.last)) {
|
||||
(this.last as NodeJS.ReadWriteStream).pipe(
|
||||
new Transform({
|
||||
...options,
|
||||
transform: (d: any, encoding, cb) => {
|
||||
this.push(d);
|
||||
cb();
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public _transform(chunk: any, encoding: string, cb: any) {
|
||||
(this.first as NodeJS.WritableStream).write(chunk, encoding, cb);
|
||||
}
|
||||
|
||||
public _flush(cb: any) {
|
||||
if (isReadable(this.first)) {
|
||||
(this.first as any).push(null);
|
||||
}
|
||||
this.last.once("end", () => {
|
||||
cb();
|
||||
});
|
||||
}
|
||||
|
||||
public pipe<T extends NodeJS.WritableStream>(dest: T) {
|
||||
return (this.last as NodeJS.ReadableStream).pipe(dest);
|
||||
}
|
||||
|
||||
public _write(chunk: any, encoding: string, cb: any) {
|
||||
(this.first as NodeJS.WritableStream).write(chunk, encoding, cb);
|
||||
public _destroy(error: any, cb: (error?: any) => void) {
|
||||
this.streams.forEach(s => (s as any).destroy());
|
||||
cb(error);
|
||||
}
|
||||
|
||||
public bubble(...events: string[]) {
|
||||
@ -69,38 +96,4 @@ export class Compose extends Duplex {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public on(event: string, cb: any) {
|
||||
switch (eventsTarget[event]) {
|
||||
case EventSubscription.First:
|
||||
this.first.on(event, cb);
|
||||
break;
|
||||
case EventSubscription.Last:
|
||||
this.last.on(event, cb);
|
||||
break;
|
||||
case EventSubscription.All:
|
||||
this.streams.forEach(s => s.on(event, cb));
|
||||
break;
|
||||
default:
|
||||
super.on(event, cb);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public once(event: string, cb: any) {
|
||||
switch (eventsTarget[event]) {
|
||||
case EventSubscription.First:
|
||||
this.first.once(event, cb);
|
||||
break;
|
||||
case EventSubscription.Last:
|
||||
this.last.once(event, cb);
|
||||
break;
|
||||
case EventSubscription.All:
|
||||
this.streams.forEach(s => s.once(event, cb));
|
||||
break;
|
||||
default:
|
||||
super.once(event, cb);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -121,6 +121,10 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
||||
/**
|
||||
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
|
||||
* must be a fully defined JSON string in utf8.
|
||||
* @param format: @type SerializationFormats defaults SerializationFormats.utf8
|
||||
* @param emitError: @type boolean Whether or not to emit an error when
|
||||
* failing to parse. An error will automatically close the stream.
|
||||
* Defaults to true.
|
||||
*/
|
||||
parse,
|
||||
|
||||
@ -245,9 +249,10 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
||||
/**
|
||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||
* @param streams Array of streams to compose. Minimum of two.
|
||||
* @param errorCallback a function that handles any error coming out of the pipeline
|
||||
* @param options Transform stream options
|
||||
*/
|
||||
compose: withDefaultOptions(1, compose),
|
||||
compose: withDefaultOptions(2, compose),
|
||||
|
||||
/**
|
||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||
|
@ -4,6 +4,7 @@ import { SerializationFormats } from "./baseDefinitions";
|
||||
|
||||
export function parse(
|
||||
format: SerializationFormats = SerializationFormats.utf8,
|
||||
emitError: boolean = true,
|
||||
): Transform {
|
||||
const decoder = new StringDecoder(format);
|
||||
return new Transform({
|
||||
@ -13,9 +14,13 @@ export function parse(
|
||||
try {
|
||||
const asString = decoder.write(chunk);
|
||||
// Using await causes parsing errors to be emitted
|
||||
callback(undefined, await JSON.parse(asString));
|
||||
callback(null, await JSON.parse(asString));
|
||||
} catch (err) {
|
||||
if (emitError) {
|
||||
callback(err);
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
|
@ -1,6 +1,7 @@
|
||||
const test = require("ava");
|
||||
const { expect } = require("chai");
|
||||
const { sleep } = require("../src/helpers");
|
||||
import * as test from "ava";
|
||||
import { expect } from "chai";
|
||||
import { sleep } from "../src/helpers";
|
||||
import { Readable, Writable } from "stream";
|
||||
import mhysa from "../src";
|
||||
import { performance } from "perf_hooks";
|
||||
const { compose, map } = mhysa({ objectMode: true });
|
||||
@ -22,10 +23,7 @@ test.cb("compose() chains two streams together in the correct order", t => {
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true },
|
||||
);
|
||||
const composed = compose([first, second]);
|
||||
|
||||
composed.on("data", data => {
|
||||
expect(data).to.deep.equal(result[i]);
|
||||
@ -35,12 +33,6 @@ test.cb("compose() chains two streams together in the correct order", t => {
|
||||
t.end();
|
||||
}
|
||||
});
|
||||
composed.on("error", err => {
|
||||
t.end(err);
|
||||
});
|
||||
composed.on("end", () => {
|
||||
t.end();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
@ -72,10 +64,7 @@ test.cb("piping compose() maintains correct order", t => {
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true },
|
||||
);
|
||||
const composed = compose([first, second]);
|
||||
const third = map((chunk: Chunk) => {
|
||||
chunk.visited.push(3);
|
||||
return chunk;
|
||||
@ -115,28 +104,19 @@ test("compose() writable length should be less than highWaterMark when handing w
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true },
|
||||
);
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark: 2 },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 2,
|
||||
});
|
||||
composed.on("error", err => {
|
||||
reject();
|
||||
});
|
||||
@ -180,29 +160,20 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
await sleep(_rate);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true },
|
||||
);
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark,
|
||||
});
|
||||
composed.on("error", err => {
|
||||
reject();
|
||||
});
|
||||
@ -255,29 +226,20 @@ test.cb(
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
await sleep(_rate);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true },
|
||||
);
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark: 5 },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 5,
|
||||
});
|
||||
|
||||
composed.on("error", err => {
|
||||
t.end(err);
|
||||
@ -322,15 +284,10 @@ test.cb(
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const first = map(
|
||||
(chunk: Chunk) => {
|
||||
const first = map((chunk: Chunk) => {
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
@ -341,13 +298,12 @@ test.cb(
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true, highWaterMark: 1 },
|
||||
{ highWaterMark: 1 },
|
||||
);
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark: 5 },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 5,
|
||||
});
|
||||
composed.on("error", err => {
|
||||
t.end(err);
|
||||
});
|
||||
@ -398,7 +354,6 @@ test.cb(
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
highWaterMark: 2,
|
||||
},
|
||||
);
|
||||
@ -410,13 +365,12 @@ test.cb(
|
||||
chunk.mapped.push("second");
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true, highWaterMark: 2 },
|
||||
{ highWaterMark: 2 },
|
||||
);
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark: 5 },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 5,
|
||||
});
|
||||
composed.on("error", err => {
|
||||
t.end(err);
|
||||
});
|
||||
@ -458,29 +412,20 @@ test.cb(
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
const first = map(
|
||||
async (chunk: Chunk) => {
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
await sleep(_rate);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
},
|
||||
{
|
||||
objectMode: true,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const second = map(
|
||||
async (chunk: Chunk) => {
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
},
|
||||
{ objectMode: true },
|
||||
);
|
||||
});
|
||||
|
||||
const composed = compose(
|
||||
[first, second],
|
||||
{ objectMode: true, highWaterMark: 6 },
|
||||
);
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 6,
|
||||
});
|
||||
|
||||
composed.on("error", err => {
|
||||
t.end(err);
|
||||
@ -510,3 +455,124 @@ test.cb(
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
test.cb("compose() should be 'destroyable'", t => {
|
||||
t.plan(3);
|
||||
const _sleep = 100;
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
await sleep(_sleep);
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const composed = compose([first, second], (err: any) => {
|
||||
t.pass();
|
||||
});
|
||||
|
||||
const fakeSource = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
return;
|
||||
},
|
||||
});
|
||||
|
||||
const fakeSink = new Writable({
|
||||
objectMode: true,
|
||||
write(data, enc, cb) {
|
||||
const cur = input.shift();
|
||||
t.is(cur.key, data.key);
|
||||
t.deepEqual(cur.mapped, [1, 2]);
|
||||
if (cur.key === "a") {
|
||||
composed.destroy();
|
||||
}
|
||||
cb();
|
||||
},
|
||||
});
|
||||
|
||||
composed.on("close", t.end);
|
||||
fakeSource.pipe(composed).pipe(fakeSink);
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
];
|
||||
fakeSource.push(input[0]);
|
||||
fakeSource.push(input[1]);
|
||||
fakeSource.push(input[2]);
|
||||
fakeSource.push(input[3]);
|
||||
fakeSource.push(input[4]);
|
||||
});
|
||||
|
||||
test.cb("compose() `finish` and `end` propagates", t => {
|
||||
interface Chunk {
|
||||
key: string;
|
||||
mapped: number[];
|
||||
}
|
||||
|
||||
t.plan(8);
|
||||
const first = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(1);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const second = map(async (chunk: Chunk) => {
|
||||
chunk.mapped.push(2);
|
||||
return chunk;
|
||||
});
|
||||
|
||||
const composed = compose([first, second], undefined, {
|
||||
highWaterMark: 3,
|
||||
});
|
||||
|
||||
const fakeSource = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
return;
|
||||
},
|
||||
});
|
||||
const sink = map((d: Chunk) => {
|
||||
const curr = input.shift();
|
||||
t.is(curr.key, d.key);
|
||||
t.deepEqual(d.mapped, [1, 2]);
|
||||
});
|
||||
|
||||
fakeSource.pipe(composed).pipe(sink);
|
||||
|
||||
fakeSource.on("end", () => {
|
||||
t.pass();
|
||||
});
|
||||
composed.on("finish", () => {
|
||||
t.pass();
|
||||
});
|
||||
composed.on("end", () => {
|
||||
t.pass();
|
||||
t.end();
|
||||
});
|
||||
sink.on("finish", () => {
|
||||
t.pass();
|
||||
});
|
||||
|
||||
const input = [
|
||||
{ key: "a", mapped: [] },
|
||||
{ key: "b", mapped: [] },
|
||||
{ key: "c", mapped: [] },
|
||||
{ key: "d", mapped: [] },
|
||||
{ key: "e", mapped: [] },
|
||||
];
|
||||
fakeSource.push(input[0]);
|
||||
fakeSource.push(input[1]);
|
||||
fakeSource.push(null);
|
||||
});
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { Readable } from "stream";
|
||||
import { Readable, finished } from "stream";
|
||||
import test from "ava";
|
||||
import { expect } from "chai";
|
||||
import mhysa from "../src";
|
||||
@ -26,13 +26,17 @@ test.cb("parse() parses the streamed elements as JSON", t => {
|
||||
});
|
||||
|
||||
test.cb("parse() emits errors on invalid JSON", t => {
|
||||
t.plan(2);
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: true });
|
||||
|
||||
source
|
||||
.pipe(parse())
|
||||
.resume()
|
||||
.on("error", () => t.pass())
|
||||
.on("end", t.end);
|
||||
.on("error", (d: any) => {
|
||||
t.pass();
|
||||
t.end();
|
||||
})
|
||||
.on("end", t.fail);
|
||||
|
||||
source.push("{}");
|
||||
source.push({});
|
||||
|
Loading…
Reference in New Issue
Block a user