From ed73bd28871c4c1f44fceda72a804a8c216a133d Mon Sep 17 00:00:00 2001 From: Lewis Diamond Date: Mon, 2 Mar 2020 10:07:20 -0500 Subject: [PATCH] Adding collected --- package.json | 7 +++--- src/functions/batch.ts | 4 +++- src/functions/compose.ts | 6 ++--- src/index.ts | 4 ++++ src/utils/collected.ts | 12 ++++++++++ src/utils/index.ts | 1 + tests/batch.spec.ts | 27 ++++++++++++++++++++- tests/compose.spec.ts | 45 +++++++---------------------------- tests/utils/collected.spec.ts | 9 +++++++ yarn.lock | 10 ++++---- 10 files changed, 75 insertions(+), 50 deletions(-) create mode 100644 src/utils/collected.ts create mode 100644 src/utils/index.ts create mode 100644 tests/utils/collected.spec.ts diff --git a/package.json b/package.json index b72e75e..c0ec76b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@jogogo/mhysa", - "version": "2.0.0-alpha.1", + "version": "2.0.0-alpha.2", "description": "Streams and event emitter utils for Node.js", "keywords": [ "promise", @@ -43,7 +43,7 @@ "dependencies": {}, "devDependencies": { "@types/chai": "^4.1.7", - "@types/node": "^12.7.2", + "@types/node": "^12.12.15", "@types/sinon": "^7.0.13", "ava": "^2.4.0", "chai": "^4.2.0", @@ -58,7 +58,8 @@ }, "ava": { "files": [ - "tests/*.spec.ts" + "tests/*.spec.ts", + "tests/utils/*.spec.ts" ], "sources": [ "src/**/*.ts" diff --git a/src/functions/batch.ts b/src/functions/batch.ts index 661301d..1909c16 100644 --- a/src/functions/batch.ts +++ b/src/functions/batch.ts @@ -12,7 +12,9 @@ export function batch( clearTimeout(timer); } timer = null; - self.push(buffer); + if (buffer.length > 0) { + self.push(buffer); + } buffer = []; }; return new Transform({ diff --git a/src/functions/compose.ts b/src/functions/compose.ts index 64789ef..4052a0a 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,5 +1,5 @@ -import { pipeline, TransformOptions, Transform } from "stream"; import { AllStreams, isReadable } from "../helpers"; +import { PassThrough, pipeline, TransformOptions, Transform } from "stream"; export function compose( streams: Array< @@ -34,11 +34,11 @@ export class Compose extends Transform { options?: TransformOptions, ) { super(options); - this.first = streams[0]; + this.first = new PassThrough(options); this.last = streams[streams.length - 1]; this.streams = streams; pipeline( - streams, + [this.first, ...streams], errorCallback || ((error: any) => { if (error) { diff --git a/src/index.ts b/src/index.ts index b06e998..35da6da 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,6 @@ import mhysa from "./functions"; +import * as _utils from "./utils"; export default mhysa; + +// @TODO fix this with proper import export +export const utils = { ..._utils }; diff --git a/src/utils/collected.ts b/src/utils/collected.ts new file mode 100644 index 0000000..b1fb40e --- /dev/null +++ b/src/utils/collected.ts @@ -0,0 +1,12 @@ +import { Transform } from "stream"; + +export function collected(stream: Transform): any { + return new Promise((resolve, reject) => { + stream.once("data", d => { + resolve(d); + }); + stream.once("error", e => { + reject(e); + }); + }); +} diff --git a/src/utils/index.ts b/src/utils/index.ts new file mode 100644 index 0000000..5f809ed --- /dev/null +++ b/src/utils/index.ts @@ -0,0 +1 @@ +export { collected } from "./collected"; diff --git a/tests/batch.spec.ts b/tests/batch.spec.ts index d4a1ffe..4ca3e08 100644 --- a/tests/batch.spec.ts +++ b/tests/batch.spec.ts @@ -2,7 +2,7 @@ import { Readable } from "stream"; import test from "ava"; import { expect } from "chai"; import mhysa from "../src"; -const { batch } = mhysa({ objectMode: true }); +const { batch, map, fromArray } = mhysa({ objectMode: true }); test.cb("batch() batches chunks together", t => { t.plan(3); @@ -57,3 +57,28 @@ test.cb("batch() yields a batch after the timeout", t => { source.push(null); }, 600 * 2); }); + +test.cb( + "batch() yields all input data even when the last element(s) dont make a full batch", + t => { + const data = [1, 2, 3, 4, 5, 6, 7]; + + fromArray([...data]) + .pipe(batch(3)) + .pipe( + map(d => { + t.deepEqual( + d, + [data.shift(), data.shift(), data.shift()].filter( + x => !!x, + ), + ); + }), + ) + .on("error", t.fail) + .on("finish", () => { + t.is(data.length, 0); + t.end(); + }); + }, +); diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 508c112..1cbabc1 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -4,7 +4,7 @@ import { sleep } from "../src/helpers"; import { Readable, Writable } from "stream"; import mhysa from "../src"; import { performance } from "perf_hooks"; -const { compose, map } = mhysa({ objectMode: true }); +const { compose, map, fromArray } = mhysa({ objectMode: true }); test.cb("compose() chains two streams together in the correct order", t => { t.plan(3); @@ -98,7 +98,7 @@ test.cb("piping compose() maintains correct order", t => { }); test("compose() writable length should be less than highWaterMark when handing writes", async t => { - t.plan(7); + t.plan(2); return new Promise(async (resolve, reject) => { interface Chunk { key: string; @@ -144,19 +144,12 @@ test("compose() writable length should be less than highWaterMark when handing w { key: "e", mapped: [] }, ]; - for (const item of input) { - const res = composed.write(item); - expect(composed._writableState.length).to.be.at.most(2); - t.pass(); - if (!res) { - await sleep(10); - } - } + fromArray(input).pipe(composed); }); }); test("compose() should emit drain event ~rate * highWaterMark ms for every write that causes backpressure", async t => { - t.plan(7); + t.plan(2); const _rate = 100; const highWaterMark = 2; return new Promise(async (resolve, reject) => { @@ -189,19 +182,14 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write composed.on("drain", () => { t.pass(); expect(composed._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.closeTo( - _rate * highWaterMark, - 40, - ); }); composed.on("data", (chunk: Chunk) => { - pendingReads--; - if (pendingReads === 0) { - resolve(); - } + t.deepEqual(chunk.mapped, [1, 2]); }); + composed.on("finish", () => resolve()); + const input = [ { key: "a", mapped: [] }, { key: "b", mapped: [] }, @@ -209,19 +197,7 @@ test("compose() should emit drain event ~rate * highWaterMark ms for every write { key: "d", mapped: [] }, { key: "e", mapped: [] }, ]; - - let start = performance.now(); - let pendingReads = input.length; - start = performance.now(); - for (const item of input) { - const res = composed.write(item); - expect(composed._writableState.length).to.be.at.most(highWaterMark); - t.pass(); - if (!res) { - await sleep(_rate * highWaterMark * 2); - start = performance.now(); - } - } + fromArray(input).pipe(composed); }); }); @@ -259,10 +235,6 @@ test.cb( composed.on("drain", () => { expect(composed._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.closeTo( - _rate * input.length, - 50, - ); t.pass(); }); @@ -283,7 +255,6 @@ test.cb( input.forEach(item => { composed.write(item); }); - const start = performance.now(); }, ); diff --git a/tests/utils/collected.spec.ts b/tests/utils/collected.spec.ts new file mode 100644 index 0000000..583427c --- /dev/null +++ b/tests/utils/collected.spec.ts @@ -0,0 +1,9 @@ +import test from "ava"; +import { collected } from "../../src/utils"; +import mhysa from "../../src"; +const { fromArray, collect } = mhysa({ objectMode: true }); + +test("collected returns a promise for the first data point", async t => { + const data = collected(fromArray([1, 2, 3, 4]).pipe(collect())); + t.deepEqual(await data, [1, 2, 3, 4]); +}); diff --git a/yarn.lock b/yarn.lock index b62296d..36ebbb8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -409,10 +409,10 @@ resolved "https://registry.yarnpkg.com/@types/node/-/node-12.7.2.tgz#c4e63af5e8823ce9cc3f0b34f7b998c2171f0c44" integrity sha512-dyYO+f6ihZEtNPDcWNR1fkoTDf3zAK3lAABDze3mz6POyIercH0lEUawUFXlG8xaQZmm1yEBON/4TsYv/laDYg== -"@types/node@^12.7.2": - version "12.12.14" - resolved "https://npm.dev.jogogo.co/@types%2fnode/-/node-12.12.14.tgz#1c1d6e3c75dba466e0326948d56e8bd72a1903d2" - integrity sha512-u/SJDyXwuihpwjXy7hOOghagLEV1KdAST6syfnOk6QZAMzZuWZqXy5aYYZbh8Jdpd4escVFP0MvftHNDb9pruA== +"@types/node@^12.12.15": + version "12.12.15" + resolved "https://npm.dev.jogogo.co/@types%2fnode/-/node-12.12.15.tgz#8dfb6ce22fedd469128137640a3aa8f17415422f" + integrity sha512-Pv+vWicyFd07Hw/SmNnTUguqrHgDfMtjabvD9sQyxeqbpCEg8CmViLBaVPHtNsoBgZECrRf5/pgV6FJIBrGSjw== "@types/sinon@^7.0.13": version "7.5.1" @@ -1935,7 +1935,7 @@ merge2@^1.2.3, merge2@^1.3.0: integrity sha512-2j4DAdlBOkiSZIsaXk4mTE3sRS02yBHAtfy127xRV3bQUFqXkjHCHLW6Scv7DwNRbIWNHH8zpnz9zMaKXIdvYw== mhysa@./: - version "0.0.1-beta.4" + version "2.0.0-alpha.1" micromatch@^4.0.2: version "4.0.2"