From 9765e6cb49fee9636ba0cdf1c1343206836a8cc7 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 29 Aug 2019 08:50:11 -0400 Subject: [PATCH] Update tests to write to sink --- tests/demux.spec.ts | 107 +++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index baf29aa..dcd8ad0 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -1,6 +1,7 @@ import test from "ava"; import { expect } from "chai"; import { demux, map } from "../src"; +import { Writable } from "stream"; interface Test { key: string; @@ -10,45 +11,48 @@ test.cb("should spread per key", t => { t.plan(5); const input = [ { key: "a", val: 1 }, - { key: "a", val: 2 }, - { key: "b", val: 3 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, { key: "c", val: 4 }, ]; const results = [ { key: "a", val: 2 }, - { key: "a", val: 3 }, - { key: "b", val: 4 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, { key: "c", val: 5 }, ]; - const destKeys = []; - const dests = []; + const destinationStreamKeys = []; let i = 0; - + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + i++; + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + if (i === 4) { + t.end(); + } + }, + }); const construct = (destKey: string) => { - destKeys.push(destKey); - const dest = map((chunk: Test) => ({ - ...chunk, - val: chunk.val + 1, - })) - .on("data", (d: Test) => { - expect(results).to.deep.include(d); - t.pass(); - }) - .on("end", () => { - i++; - if (i === dests.length) { - t.end(); - } - }); - dests.push(dest); + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); return dest; }; const demuxed = demux(construct, { key: "key" }, { objectMode: true }); demuxed.on("finish", () => { - expect(destKeys).to.deep.equal(["a", "b", "c"]); + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); - dests.forEach(dest => dest.end()); }); input.forEach(event => demuxed.write(event)); @@ -59,37 +63,41 @@ test.cb("should spread per key using keyBy", t => { t.plan(5); const input = [ { key: "a", val: 1 }, - { key: "a", val: 2 }, - { key: "b", val: 3 }, + { key: "b", val: 2 }, + { key: "a", val: 3 }, { key: "c", val: 4 }, ]; const results = [ { key: "a", val: 2 }, - { key: "a", val: 3 }, - { key: "b", val: 4 }, + { key: "b", val: 3 }, + { key: "a", val: 4 }, { key: "c", val: 5 }, ]; - const destKeys = []; - const dests = []; + const destinationStreamKeys = []; let i = 0; - + const sink = new Writable({ + objectMode: true, + write(chunk, enc, cb) { + i++; + expect(results).to.deep.include(chunk); + expect(input).to.not.deep.include(chunk); + t.pass(); + cb(); + if (i === 4) { + t.end(); + } + }, + }); const construct = (destKey: string) => { - destKeys.push(destKey); - const dest = map((chunk: Test) => ({ - ...chunk, - val: chunk.val + 1, - })) - .on("data", (d: Test) => { - expect(results).to.deep.include(d); - t.pass(); - }) - .on("end", () => { - i++; - if (i === dests.length) { - t.end(); - } - }); - dests.push(dest); + destinationStreamKeys.push(destKey); + const dest = map((chunk: Test) => { + return { + ...chunk, + val: chunk.val + 1, + }; + }); + + dest.pipe(sink); return dest; }; @@ -99,9 +107,8 @@ test.cb("should spread per key using keyBy", t => { { objectMode: true }, ); demuxed.on("finish", () => { - expect(destKeys).to.deep.equal(["a", "b", "c"]); + expect(destinationStreamKeys).to.deep.equal(["a", "b", "c"]); t.pass(); - dests.forEach(dest => dest.end()); }); input.forEach(event => demuxed.write(event));