From 505fefeeb50c58c2b7bf9433caba4ff5cb2e6f37 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 15 Aug 2019 17:06:54 -0400 Subject: [PATCH] Save --- src/functions/batch/batch.spec.ts | 58 ++++++ src/functions/child/child.spec.ts | 28 +++ src/functions/child/index.ts | 2 + src/functions/collect/collect.spec.ts | 132 +++++++++++++ src/functions/collect/index.ts | 2 +- src/functions/concat/concat.spec.ts | 180 ++++++++++++++++++ src/functions/concat/index.ts | 2 +- src/functions/duplex/duplex.spec.ts | 28 +++ src/functions/duplex/index.ts | 7 +- src/functions/flatMap/index.ts | 2 +- src/functions/fromArray/index.ts | 2 +- src/functions/index.ts | 52 ++--- src/functions/join/index.ts | 2 +- src/functions/last/index.ts | 3 +- src/functions/last/last.spec.ts | 15 ++ src/functions/merge/merge.spec.ts | 60 ++++++ src/functions/parallelMap/parallelMap.spec.ts | 77 ++++++++ src/functions/parse/index.ts | 2 +- src/functions/parse/parse.spec.ts | 40 ++++ src/functions/rate/index.ts | 3 +- src/functions/rate/rate.spec.ts | 67 +++++++ src/functions/replace/index.ts | 2 +- src/functions/split/index.ts | 2 +- src/functions/stringify/index.ts | 2 +- src/functions/stringify/stringify.spec.ts | 61 ++++++ src/functions/unbatch/unbatch.spec.ts | 26 +++ 26 files changed, 812 insertions(+), 45 deletions(-) create mode 100644 src/functions/batch/batch.spec.ts create mode 100644 src/functions/child/child.spec.ts create mode 100644 src/functions/collect/collect.spec.ts create mode 100644 src/functions/concat/concat.spec.ts create mode 100644 src/functions/duplex/duplex.spec.ts create mode 100644 src/functions/last/last.spec.ts create mode 100644 src/functions/merge/merge.spec.ts create mode 100644 src/functions/parallelMap/parallelMap.spec.ts create mode 100644 src/functions/parse/parse.spec.ts create mode 100644 src/functions/rate/rate.spec.ts create mode 100644 src/functions/stringify/stringify.spec.ts create mode 100644 src/functions/unbatch/unbatch.spec.ts diff --git a/src/functions/batch/batch.spec.ts b/src/functions/batch/batch.spec.ts new file mode 100644 index 0000000..056af0d --- /dev/null +++ b/src/functions/batch/batch.spec.ts @@ -0,0 +1,58 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { batch } from "."; + +test.cb("batch() batches chunks together", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]]; + let i = 0; + source + .pipe(batch(3)) + .on("data", (element: string[]) => { + expect(element).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push("d"); + source.push("e"); + source.push("f"); + source.push("g"); + source.push(null); +}); + +test.cb("batch() yields a batch after the timeout", t => { + t.plan(3); + const source = new Readable({ + objectMode: true, + read(size: number) {}, + }); + const expectedElements = [["a", "b"], ["c"], ["d"]]; + let i = 0; + source + .pipe(batch(3)) + .on("data", (element: string[]) => { + expect(element).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.fail) + .on("end", t.end); + + source.push("a"); + source.push("b"); + setTimeout(() => { + source.push("c"); + }, 600); + setTimeout(() => { + source.push("d"); + source.push(null); + }, 600 * 2); +}); diff --git a/src/functions/child/child.spec.ts b/src/functions/child/child.spec.ts new file mode 100644 index 0000000..fd1ae79 --- /dev/null +++ b/src/functions/child/child.spec.ts @@ -0,0 +1,28 @@ +import * as cp from "child_process"; +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { child } from "."; + +test.cb( + "child() allows easily writing to child process stdin and reading from its stdout", + t => { + t.plan(1); + const source = new Readable(); + const catProcess = cp.exec("cat"); + let out = ""; + source + .pipe(child(catProcess)) + .on("data", chunk => (out += chunk)) + .on("error", t.end) + .on("end", () => { + expect(out).to.equal("abcdef"); + t.pass(); + t.end(); + }); + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); + }, +); diff --git a/src/functions/child/index.ts b/src/functions/child/index.ts index efe4f90..e564eec 100644 --- a/src/functions/child/index.ts +++ b/src/functions/child/index.ts @@ -1,3 +1,5 @@ +import { ChildProcess } from "child_process"; +import { duplex } from "../baseFunctions"; /** * Return a Duplex stream from a child process' stdin and stdout * @param childProcess Child process from which to create duplex stream diff --git a/src/functions/collect/collect.spec.ts b/src/functions/collect/collect.spec.ts new file mode 100644 index 0000000..b585fe9 --- /dev/null +++ b/src/functions/collect/collect.spec.ts @@ -0,0 +1,132 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { collect } from "."; + +test.cb( + "collect() collects streamed elements into an array (object, flowing mode)", + t => { + t.plan(1); + const source = new Readable({ objectMode: true }); + + source + .pipe(collect({ objectMode: true })) + .on("data", collected => { + expect(collected).to.deep.equal(["a", "b", "c"]); + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); + }, +); + +test.cb( + "collect() collects streamed elements into an array (object, paused mode)", + t => { + t.plan(1); + const source = new Readable({ objectMode: true }); + const collector = source.pipe(collect({ objectMode: true })); + + collector + .on("readable", () => { + let collected = collector.read(); + while (collected !== null) { + expect(collected).to.deep.equal(["a", "b", "c"]); + t.pass(); + collected = collector.read(); + } + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); + }, +); + +test.cb( + "collect() collects streamed bytes into a buffer (non-object, flowing mode)", + t => { + t.plan(1); + const source = new Readable({ objectMode: false }); + + source + .pipe(collect()) + .on("data", collected => { + expect(collected).to.deep.equal(Buffer.from("abc")); + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); + }, +); + +test.cb( + "collect() collects streamed bytes into a buffer (non-object, paused mode)", + t => { + t.plan(1); + const source = new Readable({ objectMode: false }); + const collector = source.pipe(collect({ objectMode: false })); + collector + .on("readable", () => { + let collected = collector.read(); + while (collected !== null) { + expect(collected).to.deep.equal(Buffer.from("abc")); + t.pass(); + collected = collector.read(); + } + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); + }, +); + +test.cb( + "collect() emits an empty array if the source was empty (object mode)", + t => { + t.plan(1); + const source = new Readable({ objectMode: true }); + const collector = source.pipe(collect({ objectMode: true })); + collector + .on("data", collected => { + expect(collected).to.deep.equal([]); + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + source.push(null); + }, +); + +test.cb( + "collect() emits nothing if the source was empty (non-object mode)", + t => { + t.plan(0); + const source = new Readable({ objectMode: false }); + const collector = source.pipe(collect({ objectMode: false })); + collector + .on("data", () => t.fail()) + .on("error", t.end) + .on("end", t.end); + + source.push(null); + }, +); diff --git a/src/functions/collect/index.ts b/src/functions/collect/index.ts index fbed881..57dd8e0 100644 --- a/src/functions/collect/index.ts +++ b/src/functions/collect/index.ts @@ -7,7 +7,7 @@ import { ThroughOptions } from "../baseDefinitions"; */ export function collect( options: ThroughOptions = { objectMode: false }, -): NodeJS.ReadWriteStream { +): Transform { const collected: any[] = []; return new Transform({ readableObjectMode: options.objectMode, diff --git a/src/functions/concat/concat.spec.ts b/src/functions/concat/concat.spec.ts new file mode 100644 index 0000000..0750174 --- /dev/null +++ b/src/functions/concat/concat.spec.ts @@ -0,0 +1,180 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { concat, collect } from "../baseFunctions"; + +test.cb( + "concat() concatenates multiple readable streams (object, flowing mode)", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: true }); + const source2 = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + concat(source1, source2) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + source2.push("d"); + source1.push("b"); + source2.push("e"); + source1.push("c"); + source2.push("f"); + source2.push(null); + source1.push(null); + }, +); + +test.cb( + "concat() concatenates multiple readable streams (object, paused mode)", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: true }); + const source2 = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + const concatenation = concat(source1, source2) + .on("readable", () => { + let element = concatenation.read(); + while (element !== null) { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + element = concatenation.read(); + } + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + source2.push("d"); + source1.push("b"); + source2.push("e"); + source1.push("c"); + source2.push("f"); + source2.push(null); + source1.push(null); + }, +); + +test.cb( + "concat() concatenates multiple readable streams (non-object, flowing mode)", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: false }); + const source2 = new Readable({ objectMode: false }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + concat(source1, source2) + .on("data", (element: string) => { + expect(element).to.deep.equal(Buffer.from(expectedElements[i])); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + source2.push("d"); + source1.push("b"); + source2.push("e"); + source1.push("c"); + source2.push("f"); + source2.push(null); + source1.push(null); + }, +); + +test.cb( + "concat() concatenates multiple readable streams (non-object, paused mode)", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: false, read: () => ({}) }); + const source2 = new Readable({ objectMode: false, read: () => ({}) }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + const concatenation = concat(source1, source2) + .on("readable", () => { + let element = concatenation.read(); + while (element !== null) { + expect(element).to.deep.equal( + Buffer.from(expectedElements[i]), + ); + t.pass(); + i++; + element = concatenation.read(); + } + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + setTimeout(() => source2.push("d"), 10); + setTimeout(() => source1.push("b"), 20); + setTimeout(() => source2.push("e"), 30); + setTimeout(() => source1.push("c"), 40); + setTimeout(() => source2.push("f"), 50); + setTimeout(() => source2.push(null), 60); + setTimeout(() => source1.push(null), 70); + }, +); + +test.cb("concat() concatenates a single readable stream (object mode)", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + concat(source) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb( + "concat() concatenates a single readable stream (non-object mode)", + t => { + t.plan(3); + const source = new Readable({ objectMode: false }); + const expectedElements = ["a", "b", "c", "d", "e", "f"]; + let i = 0; + concat(source) + .on("data", (element: string) => { + expect(element).to.deep.equal(Buffer.from(expectedElements[i])); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); + }, +); + +test.cb("concat() concatenates empty list of readable streams", t => { + t.plan(0); + concat() + .pipe(collect()) + .on("data", _ => { + t.fail(); + }) + .on("error", t.end) + .on("end", t.end); +}); diff --git a/src/functions/concat/index.ts b/src/functions/concat/index.ts index 8064b30..af79db9 100644 --- a/src/functions/concat/index.ts +++ b/src/functions/concat/index.ts @@ -3,7 +3,7 @@ import { Readable } from "stream"; * Return a Readable stream of readable streams concatenated together * @param streams Readable streams to concatenate */ -export function concat(...streams: Readable[]): Readable { +export function concat(...streams: NodeJS.ReadableStream[]): Readable { let isStarted = false; let currentStreamIndex = 0; const startCurrentStream = () => { diff --git a/src/functions/duplex/duplex.spec.ts b/src/functions/duplex/duplex.spec.ts new file mode 100644 index 0000000..c1ef28b --- /dev/null +++ b/src/functions/duplex/duplex.spec.ts @@ -0,0 +1,28 @@ +import * as cp from "child_process"; +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { duplex } from "../baseFunctions"; + +test.cb( + "duplex() combines a writable and readable stream into a ReadWrite stream", + t => { + t.plan(1); + const source = new Readable(); + const catProcess = cp.exec("cat"); + let out = ""; + source + .pipe(duplex(catProcess.stdin!, catProcess.stdout!)) + .on("data", chunk => (out += chunk)) + .on("error", t.end) + .on("end", () => { + expect(out).to.equal("abcdef"); + t.pass(); + t.end(); + }); + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); + }, +); diff --git a/src/functions/duplex/index.ts b/src/functions/duplex/index.ts index 2470da1..b1e967a 100644 --- a/src/functions/duplex/index.ts +++ b/src/functions/duplex/index.ts @@ -1,11 +1,14 @@ -import { Duplex, Writable, Readable } from "stream"; +import { Duplex } from "stream"; /** * Return a Duplex stream from a writable stream that is assumed to somehow, when written to, * cause the given readable stream to yield chunks * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to * @param readable Readable stream assumed to yield chunks when the writable stream is written to */ -export function duplex(writable: Writable, readable: Readable) { +export function duplex( + writable: NodeJS.WritableStream, + readable: NodeJS.ReadableStream, +) { const wrapper = new Duplex({ readableObjectMode: true, writableObjectMode: true, diff --git a/src/functions/flatMap/index.ts b/src/functions/flatMap/index.ts index 32244a2..3497ca7 100644 --- a/src/functions/flatMap/index.ts +++ b/src/functions/flatMap/index.ts @@ -15,7 +15,7 @@ export function flatMap( readableObjectMode: true, writableObjectMode: true, }, -): NodeJS.ReadWriteStream { +): Transform { return new Transform({ ...options, async transform(chunk: T, encoding, callback) { diff --git a/src/functions/fromArray/index.ts b/src/functions/fromArray/index.ts index f92654e..54e01a9 100644 --- a/src/functions/fromArray/index.ts +++ b/src/functions/fromArray/index.ts @@ -3,7 +3,7 @@ import { Readable } from "stream"; * Convert an array into a Readable stream of its elements * @param array Array of elements to stream */ -export function fromArray(array: any[]): NodeJS.ReadableStream { +export function fromArray(array: any[]): Readable { let cursor = 0; return new Readable({ objectMode: true, diff --git a/src/functions/index.ts b/src/functions/index.ts index 2100353..247400f 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -1,4 +1,4 @@ -import { Readable, Writable, Transform } from "stream"; +import { Readable, Writable, Transform, Duplex } from "stream"; import { ChildProcess } from "child_process"; import * as baseFunctions from "./baseFunctions"; @@ -7,15 +7,18 @@ import { TransformOptions, WithEncoding, JsonParseOptions, +} from "./baseDefinitions"; + +import { FlushStrategy, AccumulatorByIteratee, -} from "./definitions"; +} from "./accumulator/definitions"; /** * Convert an array into a Readable stream of its elements * @param array Array of elements to stream */ -export function fromArray(array: any[]): NodeJS.ReadableStream { +export function fromArray(array: any[]): Readable { return baseFunctions.fromArray(array); } @@ -45,7 +48,7 @@ export function flatMap( | ((chunk: T, encoding: string) => R[]) | ((chunk: T, encoding: string) => Promise), options?: TransformOptions, -): NodeJS.ReadWriteStream { +): Transform { return baseFunctions.flatMap(mapper, options); } @@ -60,7 +63,7 @@ export function filter( | ((chunk: T, encoding: string) => boolean) | ((chunk: T, encoding: string) => Promise), options?: ThroughOptions, -): NodeJS.ReadWriteStream { +): Transform { return baseFunctions.filter(mapper, options); } @@ -79,7 +82,7 @@ export function reduce( | ((previousValue: R, chunk: T, encoding: string) => Promise), initialValue: R, options?: TransformOptions, -): NodeJS.ReadWriteStream { +): Transform { return baseFunctions.reduce(iteratee, initialValue, options); } @@ -92,7 +95,7 @@ export function reduce( export function split( separator?: string | RegExp, options?: WithEncoding, -): NodeJS.ReadWriteStream { +): Transform { return baseFunctions.split(separator, options); } @@ -102,10 +105,7 @@ export function split( * @param options? Defaults to encoding: utf8 * @param options.encoding? Encoding written chunks are assumed to use */ -export function join( - separator: string, - options?: WithEncoding, -): NodeJS.ReadWriteStream { +export function join(separator: string, options?: WithEncoding): Transform { return baseFunctions.join(separator, options); } @@ -121,7 +121,7 @@ export function replace( searchValue: string | RegExp, replaceValue: string, options?: WithEncoding, -): NodeJS.ReadWriteStream { +): Transform { return baseFunctions.replace(searchValue, replaceValue, options); } @@ -129,7 +129,7 @@ export function replace( * Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk * must be a fully defined JSON string in utf8. */ -export function parse(): NodeJS.ReadWriteStream { +export function parse(): Transform { return baseFunctions.parse(); } @@ -139,7 +139,7 @@ export function parse(): NodeJS.ReadWriteStream { * @param options.pretty If true, whitespace is inserted into the stringified chunks. * */ -export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream { +export function stringify(options?: JsonParseOptions): Transform { return baseFunctions.stringify(options); } @@ -148,7 +148,7 @@ export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream { * @param options? * @param options.objectMode? Whether this stream should behave as a stream of objects */ -export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream { +export function collect(options?: ThroughOptions): Transform { return baseFunctions.collect(options); } @@ -156,9 +156,7 @@ export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream { * Return a Readable stream of readable streams concatenated together * @param streams Readable streams to concatenate */ -export function concat( - ...streams: NodeJS.ReadableStream[] -): NodeJS.ReadableStream { +export function concat(...streams: Readable[]): Readable { return baseFunctions.concat(...streams); } @@ -166,9 +164,7 @@ export function concat( * Return a Readable stream of readable streams concatenated together * @param streams Readable streams to merge */ -export function merge( - ...streams: NodeJS.ReadableStream[] -): NodeJS.ReadableStream { +export function merge(...streams: Readable[]): Readable { return baseFunctions.merge(...streams); } @@ -178,10 +174,7 @@ export function merge( * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to * @param readable Readable stream assumed to yield chunks when the writable stream is written to */ -export function duplex( - writable: Writable, - readable: Readable, -): NodeJS.ReadWriteStream { +export function duplex(writable: Writable, readable: Readable): Duplex { return baseFunctions.duplex(writable, readable); } @@ -189,7 +182,7 @@ export function duplex( * Return a Duplex stream from a child process' stdin and stdout * @param childProcess Child process from which to create duplex stream */ -export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream { +export function child(childProcess: ChildProcess): Duplex { return baseFunctions.child(childProcess); } @@ -214,7 +207,7 @@ export function batch(batchSize: number, maxBatchAge?: number): Transform { /** * Unbatches and sends individual chunks of data */ -export function unbatch(): NodeJS.ReadWriteStream { +export function unbatch(): Transform { return baseFunctions.unbatch(); } @@ -224,10 +217,7 @@ export function unbatch(): NodeJS.ReadWriteStream { * @param targetRate? Desired rate in ms * @param period? Period to sleep for when rate is above or equal to targetRate */ -export function rate( - targetRate?: number, - period?: number, -): NodeJS.ReadWriteStream { +export function rate(targetRate?: number, period?: number): Transform { return baseFunctions.rate(targetRate, period); } diff --git a/src/functions/join/index.ts b/src/functions/join/index.ts index 8c7352b..d3772c0 100644 --- a/src/functions/join/index.ts +++ b/src/functions/join/index.ts @@ -10,7 +10,7 @@ import { WithEncoding } from "../baseDefinitions"; export function join( separator: string, options: WithEncoding = { encoding: "utf8" }, -): NodeJS.ReadWriteStream { +): Transform { let isFirstChunk = true; const decoder = new StringDecoder(options.encoding); return new Transform({ diff --git a/src/functions/last/index.ts b/src/functions/last/index.ts index baf7440..98422a7 100644 --- a/src/functions/last/index.ts +++ b/src/functions/last/index.ts @@ -1,10 +1,9 @@ -import { Readable } from "stream"; /** * Return a Promise resolving to the last streamed chunk of the given readable stream, after it has * ended * @param readable Readable stream to wait on */ -export function last(readable: Readable): Promise { +export function last(readable: NodeJS.ReadableStream): Promise { let lastChunk: T | null = null; return new Promise((resolve, _) => { readable diff --git a/src/functions/last/last.spec.ts b/src/functions/last/last.spec.ts new file mode 100644 index 0000000..5bb0338 --- /dev/null +++ b/src/functions/last/last.spec.ts @@ -0,0 +1,15 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { last } from "../baseFunctions"; + +test("last() resolves to the last chunk streamed by the given readable stream", async t => { + const source = new Readable({ objectMode: true }); + const lastPromise = last(source); + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); + const lastChunk = await lastPromise; + expect(lastChunk).to.equal("ef"); +}); diff --git a/src/functions/merge/merge.spec.ts b/src/functions/merge/merge.spec.ts new file mode 100644 index 0000000..84a8dca --- /dev/null +++ b/src/functions/merge/merge.spec.ts @@ -0,0 +1,60 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { merge } from "../baseFunctions"; + +test.cb( + "merge() merges multiple readable streams in chunk arrival order", + t => { + t.plan(6); + const source1 = new Readable({ objectMode: true, read: () => ({}) }); + const source2 = new Readable({ objectMode: true, read: () => ({}) }); + const expectedElements = ["a", "d", "b", "e", "c", "f"]; + let i = 0; + merge(source1, source2) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source1.push("a"); + setTimeout(() => source2.push("d"), 10); + setTimeout(() => source1.push("b"), 20); + setTimeout(() => source2.push("e"), 30); + setTimeout(() => source1.push("c"), 40); + setTimeout(() => source2.push("f"), 50); + setTimeout(() => source2.push(null), 60); + setTimeout(() => source1.push(null), 70); + }, +); + +test.cb("merge() merges a readable stream", t => { + t.plan(3); + const source = new Readable({ objectMode: true, read: () => ({}) }); + const expectedElements = ["a", "b", "c"]; + let i = 0; + merge(source) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +}); + +test.cb("merge() merges an empty list of readable streams", t => { + t.plan(0); + merge() + .on("data", () => t.pass()) + .on("error", t.end) + .on("end", t.end); +}); diff --git a/src/functions/parallelMap/parallelMap.spec.ts b/src/functions/parallelMap/parallelMap.spec.ts new file mode 100644 index 0000000..9ae4d37 --- /dev/null +++ b/src/functions/parallelMap/parallelMap.spec.ts @@ -0,0 +1,77 @@ +import { Readable } from "stream"; +import { performance } from "perf_hooks"; +import test from "ava"; +import { expect } from "chai"; +import { parallelMap } from "../baseFunctions"; +import { sleep } from "../../helpers"; + +test.cb("parallelMap() parallel mapping", t => { + t.plan(6); + const offset = 50; + const source = new Readable({ objectMode: true }); + const expectedElements = [ + "a_processed", + "b_processed", + "c_processed", + "d_processed", + "e_processed", + "f_processed", + ]; + interface IPerfData { + start: number; + output?: string; + finish?: number; + } + const orderedResults: IPerfData[] = []; + source + .pipe( + parallelMap(async (data: any) => { + const perfData: IPerfData = { start: performance.now() }; + const c = data + "_processed"; + perfData.output = c; + await sleep(offset); + perfData.finish = performance.now(); + orderedResults.push(perfData); + return c; + }, 2), + ) + .on("data", (element: string) => { + t.true(expectedElements.includes(element)); + }) + .on("error", t.end) + .on("end", async () => { + expect(orderedResults[0].finish).to.be.lessThan( + orderedResults[2].start, + ); + expect(orderedResults[1].finish).to.be.lessThan( + orderedResults[3].start, + ); + expect(orderedResults[2].finish).to.be.lessThan( + orderedResults[4].start, + ); + expect(orderedResults[3].finish).to.be.lessThan( + orderedResults[5].start, + ); + expect(orderedResults[0].start).to.be.lessThan( + orderedResults[2].start + offset, + ); + expect(orderedResults[1].start).to.be.lessThan( + orderedResults[3].start + offset, + ); + expect(orderedResults[2].start).to.be.lessThan( + orderedResults[4].start + offset, + ); + expect(orderedResults[3].start).to.be.lessThan( + orderedResults[5].start + offset, + ); + t.end(); + }); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push("d"); + source.push("e"); + source.push("f"); + source.push(null); +}); diff --git a/src/functions/parse/index.ts b/src/functions/parse/index.ts index d6ac299..3bc9d69 100644 --- a/src/functions/parse/index.ts +++ b/src/functions/parse/index.ts @@ -8,7 +8,7 @@ import { SerializationFormats } from "../baseDefinitions"; */ export function parse( format: SerializationFormats = SerializationFormats.utf8, -): NodeJS.ReadWriteStream { +): Transform { const decoder = new StringDecoder(format); return new Transform({ readableObjectMode: true, diff --git a/src/functions/parse/parse.spec.ts b/src/functions/parse/parse.spec.ts new file mode 100644 index 0000000..0f17b53 --- /dev/null +++ b/src/functions/parse/parse.spec.ts @@ -0,0 +1,40 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { parse } from "../baseFunctions"; + +test.cb("parse() parses the streamed elements as JSON", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["abc", {}, []]; + let i = 0; + source + .pipe(parse()) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push('"abc"'); + source.push("{}"); + source.push("[]"); + source.push(null); +}); + +test.cb("parse() emits errors on invalid JSON", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe(parse()) + .resume() + .on("error", () => t.pass()) + .on("end", t.end); + + source.push("{}"); + source.push({}); + source.push([]); + source.push(null); +}); diff --git a/src/functions/rate/index.ts b/src/functions/rate/index.ts index febcc1e..b199efd 100644 --- a/src/functions/rate/index.ts +++ b/src/functions/rate/index.ts @@ -14,11 +14,12 @@ export function rate( readableObjectMode: true, writableObjectMode: true, }, -) { +): Transform { const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period let total = 0; const start = performance.now(); return new Transform({ + ...options, async transform(data, encoding, callback) { const currentRate = (total / (performance.now() - start)) * 1000; if (targetRate && currentRate > targetRate) { diff --git a/src/functions/rate/rate.spec.ts b/src/functions/rate/rate.spec.ts new file mode 100644 index 0000000..a88d179 --- /dev/null +++ b/src/functions/rate/rate.spec.ts @@ -0,0 +1,67 @@ +import { Readable } from "stream"; +import { performance } from "perf_hooks"; +import test from "ava"; +import { expect } from "chai"; +import { rate } from "../baseFunctions"; + +test.cb("rate() sends data at desired rate", t => { + t.plan(9); + const fastRate = 150; + const medRate = 50; + const slowRate = 1; + const sourceFast = new Readable({ objectMode: true }); + const sourceMed = new Readable({ objectMode: true }); + const sourceSlow = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c"]; + const start = performance.now(); + let i = 0; + let j = 0; + let k = 0; + + sourceFast + .pipe(rate(fastRate)) + .on("data", (element: string[]) => { + const currentRate = (i / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[i]); + expect(currentRate).lessThan(fastRate); + t.pass(); + i++; + }) + .on("error", t.end); + + sourceMed + .pipe(rate(medRate)) + .on("data", (element: string[]) => { + const currentRate = (j / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[j]); + expect(currentRate).lessThan(medRate); + t.pass(); + j++; + }) + .on("error", t.end); + + sourceSlow + .pipe(rate(slowRate, 1)) + .on("data", (element: string[]) => { + const currentRate = (k / (performance.now() - start)) * 1000; + expect(element).to.deep.equal(expectedElements[k]); + expect(currentRate).lessThan(slowRate); + t.pass(); + k++; + }) + .on("error", t.end) + .on("end", t.end); + + sourceFast.push("a"); + sourceFast.push("b"); + sourceFast.push("c"); + sourceFast.push(null); + sourceMed.push("a"); + sourceMed.push("b"); + sourceMed.push("c"); + sourceMed.push(null); + sourceSlow.push("a"); + sourceSlow.push("b"); + sourceSlow.push("c"); + sourceSlow.push(null); +}); diff --git a/src/functions/replace/index.ts b/src/functions/replace/index.ts index c31f369..4726a35 100644 --- a/src/functions/replace/index.ts +++ b/src/functions/replace/index.ts @@ -13,7 +13,7 @@ export function replace( searchValue: string | RegExp, replaceValue: string, options: WithEncoding = { encoding: "utf8" }, -): NodeJS.ReadWriteStream { +): Transform { const decoder = new StringDecoder(options.encoding); return new Transform({ readableObjectMode: true, diff --git a/src/functions/split/index.ts b/src/functions/split/index.ts index a031c8c..4ae3e4e 100644 --- a/src/functions/split/index.ts +++ b/src/functions/split/index.ts @@ -10,7 +10,7 @@ import { WithEncoding } from "../baseDefinitions"; export function split( separator: string | RegExp = "\n", options: WithEncoding = { encoding: "utf8" }, -): NodeJS.ReadWriteStream { +): Transform { let buffered = ""; const decoder = new StringDecoder(options.encoding); diff --git a/src/functions/stringify/index.ts b/src/functions/stringify/index.ts index 5b476af..aaa5918 100644 --- a/src/functions/stringify/index.ts +++ b/src/functions/stringify/index.ts @@ -6,7 +6,7 @@ import { JsonValue, JsonParseOptions } from "../baseDefinitions"; */ export function stringify( options: JsonParseOptions = { pretty: false }, -): NodeJS.ReadWriteStream { +): Transform { return new Transform({ readableObjectMode: true, writableObjectMode: true, diff --git a/src/functions/stringify/stringify.spec.ts b/src/functions/stringify/stringify.spec.ts new file mode 100644 index 0000000..1569ec1 --- /dev/null +++ b/src/functions/stringify/stringify.spec.ts @@ -0,0 +1,61 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { stringify } from "../baseFunctions"; + +test.cb("stringify() stringifies the streamed elements as JSON", t => { + t.plan(4); + const source = new Readable({ objectMode: true }); + const expectedElements = [ + '"abc"', + "0", + '{"a":"a","b":"b","c":"c"}', + '["a","b","c"]', + ]; + let i = 0; + source + .pipe(stringify()) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push(0); + source.push({ a: "a", b: "b", c: "c" }); + source.push(["a", "b", "c"]); + source.push(null); +}); + +test.cb( + "stringify() stringifies the streamed elements as pretty-printed JSON", + t => { + t.plan(4); + const source = new Readable({ objectMode: true }); + const expectedElements = [ + '"abc"', + "0", + '{\n "a": "a",\n "b": "b",\n "c": "c"\n}', + '[\n "a",\n "b",\n "c"\n]', + ]; + let i = 0; + source + .pipe(stringify({ pretty: true })) + .on("data", part => { + expect(part).to.deep.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("abc"); + source.push(0); + source.push({ a: "a", b: "b", c: "c" }); + source.push(["a", "b", "c"]); + source.push(null); + }, +); diff --git a/src/functions/unbatch/unbatch.spec.ts b/src/functions/unbatch/unbatch.spec.ts new file mode 100644 index 0000000..6a99a05 --- /dev/null +++ b/src/functions/unbatch/unbatch.spec.ts @@ -0,0 +1,26 @@ +import { Readable } from "stream"; +import test from "ava"; +import { expect } from "chai"; +import { unbatch, batch } from "../baseFunctions"; + +test.cb("unbatch() unbatches", t => { + t.plan(3); + const source = new Readable({ objectMode: true }); + const expectedElements = ["a", "b", "c"]; + let i = 0; + source + .pipe(batch(3)) + .pipe(unbatch()) + .on("data", (element: string) => { + expect(element).to.equal(expectedElements[i]); + t.pass(); + i++; + }) + .on("error", t.end) + .on("end", t.end); + + source.push("a"); + source.push("b"); + source.push("c"); + source.push(null); +});