diff --git a/src/functions/compose.ts b/src/functions/compose.ts index 7a4fdb4..195598e 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -40,7 +40,6 @@ type AllStreams = | NodeJS.WritableStream; export class Compose extends Duplex { - public writable: boolean; private first: AllStreams; private last: AllStreams; private streams: AllStreams[]; diff --git a/src/functions/map.ts b/src/functions/map.ts index 5848ca5..c088c72 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -16,7 +16,6 @@ export function map( const mapped = await mapper(chunk, encoding); callback(null, mapped); } catch (err) { - console.log("caught error", err.message); callback(err); } }, diff --git a/tests/compose.spec.ts b/tests/compose.spec.ts index acc6e2e..de7603b 100644 --- a/tests/compose.spec.ts +++ b/tests/compose.spec.ts @@ -272,6 +272,7 @@ test.cb( [first, second], { objectMode: true, highWaterMark: 5 }, ); + composed.on("error", err => { t.end(err); }); @@ -309,12 +310,13 @@ test.cb( "compose() should emit drain event immediately when second stream is bottleneck", t => { t.plan(6); + const _rate = 200; interface Chunk { key: string; mapped: number[]; } const first = map( - async (chunk: Chunk) => { + (chunk: Chunk) => { chunk.mapped.push(1); return chunk; }, @@ -326,10 +328,11 @@ test.cb( const second = map( async (chunk: Chunk) => { pendingReads--; - await sleep(200); + await sleep(_rate); expect(second._writableState.length).to.be.equal(1); expect(first._readableState.length).to.equal(pendingReads); chunk.mapped.push(2); + console.log("returning chunk from second map", chunk); return chunk; }, { objectMode: true, highWaterMark: 1 }, @@ -340,15 +343,17 @@ test.cb( { objectMode: true, highWaterMark: 5 }, ); composed.on("error", err => { + console.log("ending tests and got error", err); t.end(err); }); composed.on("drain", () => { expect(composed._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan(50); + expect(performance.now() - start).to.be.lessThan(_rate); t.pass(); }); + // Check if this is causing double cb composed.on("data", (chunk: Chunk) => { // Since second is bottleneck, composed will write into first immediately. Buffer should be empty. expect(composed._writableState.length).to.be.equal(0); @@ -370,6 +375,7 @@ test.cb( input.forEach(item => { composed.write(item); }); + const start = performance.now(); }, ); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 15d89c3..06cec0e 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -44,6 +44,49 @@ test.cb("demux() constructor should be called once per key", t => { demuxed.end(); }); +test.cb("demux() should send input through correct pipeline", t => { + t.plan(6); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const pipelineSpies = {}; + const construct = (destKey: string) => { + const mapper = sinon.spy((chunk: Test) => { + return { ...chunk, visited: [1] }; + }); + const dest = map(mapper); + pipelineSpies[destKey] = mapper; + + return dest; + }; + + const demuxed = demux(construct, { key: "key" }, { objectMode: true }); + + demuxed.on("finish", () => { + pipelineSpies["a"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("a"); + t.pass(); + }); + pipelineSpies["b"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("b"); + t.pass(); + }); + pipelineSpies["c"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("c"); + t.pass(); + }); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + test.cb("demux() constructor should be called once per key using keyBy", t => { t.plan(1); const input = [ @@ -82,6 +125,53 @@ test.cb("demux() constructor should be called once per key using keyBy", t => { demuxed.end(); }); +test.cb("demux() should send input through correct pipeline using keyBy", t => { + t.plan(6); + const input = [ + { key: "a", visited: [] }, + { key: "b", visited: [] }, + { key: "a", visited: [] }, + { key: "c", visited: [] }, + { key: "a", visited: [] }, + { key: "b", visited: [] }, + ]; + const pipelineSpies = {}; + const construct = (destKey: string) => { + const mapper = sinon.spy((chunk: Test) => { + return { ...chunk, visited: [1] }; + }); + const dest = map(mapper); + pipelineSpies[destKey] = mapper; + + return dest; + }; + + const demuxed = demux( + construct, + { keyBy: item => item.key }, + { objectMode: true }, + ); + + demuxed.on("finish", () => { + pipelineSpies["a"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("a"); + t.pass(); + }); + pipelineSpies["b"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("b"); + t.pass(); + }); + pipelineSpies["c"].getCalls().forEach(call => { + expect(call.args[0].key).to.equal("c"); + t.pass(); + }); + t.end(); + }); + + input.forEach(event => demuxed.write(event)); + demuxed.end(); +}); + test.cb("should emit errors", t => { t.plan(2); let index = 0; @@ -367,6 +457,7 @@ test.cb( t => { t.plan(6); const highWaterMark = 5; + const _rate = 200; interface Chunk { key: string; mapped: number[]; @@ -393,7 +484,7 @@ test.cb( const second = map( async (chunk: Chunk) => { pendingReads--; - await sleep(200); + await sleep(_rate); chunk.mapped.push(2); expect(second._writableState.length).to.be.equal(1); expect(first._readableState.length).to.equal(pendingReads); @@ -419,7 +510,7 @@ test.cb( _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); - expect(performance.now() - start).to.be.lessThan(50); + expect(performance.now() - start).to.be.lessThan(_rate); t.pass(); });