From 1d0e15890cf346e36abf999362629f89aa43f7f8 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 22 Aug 2019 14:52:39 -0400 Subject: [PATCH] Tests --- src/functions/baseFunctions.ts | 2 +- src/functions/compose.ts | 19 +++----- src/functions/index.ts | 6 --- src/index.ts | 1 - tests/compose.spec.ts | 88 ++++++++++++++++++++-------------- 5 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/functions/baseFunctions.ts b/src/functions/baseFunctions.ts index 6bf75a6..005aa3a 100644 --- a/src/functions/baseFunctions.ts +++ b/src/functions/baseFunctions.ts @@ -19,4 +19,4 @@ export { replace } from "./replace"; export { split } from "./split"; export { stringify } from "./stringify"; export { unbatch } from "./unbatch"; -export { compose, composeDuplex } from "./compose"; +export { compose } from "./compose"; diff --git a/src/functions/compose.ts b/src/functions/compose.ts index 8d21283..96008ea 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -2,7 +2,6 @@ import { pipeline, Transform, Writable, - Pipe, WritableOptions, Readable, Duplex, @@ -12,7 +11,6 @@ import { * Return a Readable stream of readable streams concatenated together * @param streams Readable streams to concatenate */ - // First Readable --> Readable // First Transform | Duplex, Last Writable --> Writable // @@ -22,26 +20,23 @@ export function compose( ): Duplex { // Maybe just return a new stream here if (streams.length < 2) { - throw new Error("Not enough"); + throw new Error("At least two streams are required to compose"); } + const first = streams[0] as Writable; + const last = streams[streams.length - 1] as Readable; const duplex = new Duplex({ - objectMode: true, + ...options, write(chunk, enc, cb) { - const first = streams[0] as Writable; if (!first.write(chunk)) { - first.on("drain", cb); + first.once("drain", cb); } else { cb(); } }, read(size) { - let chunk; - while ( - null !== - (chunk = (streams[streams.length - 1] as Readable).read()) - ) { - this.push(chunk); + if (last.readable) { + this.push(last.read(size)); } }, }); diff --git a/src/functions/index.ts b/src/functions/index.ts index 9d08eba..ceba200 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -297,9 +297,3 @@ export function compose( options, ); } -export function composeDuplex( - streams: Array, - options?: WritableOptions, -) { - return baseFunctions.composeDuplex(streams, options); -} diff --git a/src/index.ts b/src/index.ts index d6b784c..98b0a45 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,5 +22,4 @@ export { accumulator, accumulatorBy, compose, - composeDuplex, } from "./functions"; diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index 9fa8d82..ed2304b 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -1,54 +1,68 @@ const test = require("ava"); +const { expect } = require("chai"); const { compose, composeDuplex, map } = require("../src"); -test.cb("compose()", t => { - const first = map((chunk: number) => chunk + "x"); - const second = map((chunk: number) => chunk + "y"); +test.cb("compose() chains two streams together in the correct order", t => { + t.plan(3); + let i = 0; + const first = map((chunk: number) => chunk + 1); + const second = map((chunk: number) => chunk * 2); const composed = compose( [first, second], { objectMode: true }, ); - const third = map((chunk: number) => chunk + "z"); - composed - .pipe(third) - .on("data", data => console.log("Piped composed: ", data)); composed.on("data", data => { - console.log("data on composed", data); + expect(data).to.equal(result[i]); + t.pass(); + i++; + if (i === 3) { + t.end(); + } + }); + composed.on("error", err => { + t.end(err); + }); + composed.on("end", () => { t.end(); }); - composed.on("error", data => { - console.log("ERROR", data); - }); - composed.on("end", data => { - console.log("end", data); - }); - composed.write(1); - composed.write(2); + const input = [1, 2, 3]; + const result = [4, 6, 8]; + + input.forEach(item => composed.write(item)); }); -test.cb.only("composeDuplex()", t => { - const first = map((chunk: number) => chunk + "x"); - const second = map((chunk: number) => chunk + "y"); - const composed = composeDuplex([first, second], { objectMode: true }); - const third = map((chunk: number) => chunk + "z"); - // composed - // .pipe(third) - // .on("data", data => console.log("Piped composed: ", data)); +test.cb( + "compose() followed by pipe chains streams together in the correct order", + t => { + t.plan(3); + let i = 0; + const first = map((chunk: number) => chunk + 1); + const second = map((chunk: number) => chunk * 2); - composed.on("data", data => { - console.log("data on composed", data); - t.end(); - }); - composed.on("error", data => { - console.log("ERROR", data); - }); - composed.on("end", data => { - console.log("end", data); - }); + const composed = compose( + [first, second], + { objectMode: true }, + ); + const third = map((chunk: number) => chunk + 1); + composed.pipe(third).on("data", data => { + expect(data).to.equal(result[i]); + t.pass(); + i++; + if (i === 3) { + t.end(); + } + }); - composed.write(1); - composed.write(2); -}); + composed.on("error", err => { + t.end(err); + }); + + const input = [1, 2, 3]; + const result = [5, 7, 9]; + + input.forEach(item => composed.write(item)); + }, +);