Add spies to ensure demux handles keys correctly

This commit is contained in:
Jerry Kurian 2019-09-10 12:09:26 -04:00
parent 83ef6e9734
commit ee3d9b9ded
4 changed files with 102 additions and 7 deletions

View File

@ -40,7 +40,6 @@ type AllStreams =
| NodeJS.WritableStream; | NodeJS.WritableStream;
export class Compose extends Duplex { export class Compose extends Duplex {
public writable: boolean;
private first: AllStreams; private first: AllStreams;
private last: AllStreams; private last: AllStreams;
private streams: AllStreams[]; private streams: AllStreams[];

View File

@ -16,7 +16,6 @@ export function map<T, R>(
const mapped = await mapper(chunk, encoding); const mapped = await mapper(chunk, encoding);
callback(null, mapped); callback(null, mapped);
} catch (err) { } catch (err) {
console.log("caught error", err.message);
callback(err); callback(err);
} }
}, },

View File

@ -272,6 +272,7 @@ test.cb(
[first, second], [first, second],
{ objectMode: true, highWaterMark: 5 }, { objectMode: true, highWaterMark: 5 },
); );
composed.on("error", err => { composed.on("error", err => {
t.end(err); t.end(err);
}); });
@ -309,12 +310,13 @@ test.cb(
"compose() should emit drain event immediately when second stream is bottleneck", "compose() should emit drain event immediately when second stream is bottleneck",
t => { t => {
t.plan(6); t.plan(6);
const _rate = 200;
interface Chunk { interface Chunk {
key: string; key: string;
mapped: number[]; mapped: number[];
} }
const first = map( const first = map(
async (chunk: Chunk) => { (chunk: Chunk) => {
chunk.mapped.push(1); chunk.mapped.push(1);
return chunk; return chunk;
}, },
@ -326,10 +328,11 @@ test.cb(
const second = map( const second = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
pendingReads--; pendingReads--;
await sleep(200); await sleep(_rate);
expect(second._writableState.length).to.be.equal(1); expect(second._writableState.length).to.be.equal(1);
expect(first._readableState.length).to.equal(pendingReads); expect(first._readableState.length).to.equal(pendingReads);
chunk.mapped.push(2); chunk.mapped.push(2);
console.log("returning chunk from second map", chunk);
return chunk; return chunk;
}, },
{ objectMode: true, highWaterMark: 1 }, { objectMode: true, highWaterMark: 1 },
@ -340,15 +343,17 @@ test.cb(
{ objectMode: true, highWaterMark: 5 }, { objectMode: true, highWaterMark: 5 },
); );
composed.on("error", err => { composed.on("error", err => {
console.log("ending tests and got error", err);
t.end(err); t.end(err);
}); });
composed.on("drain", () => { composed.on("drain", () => {
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.lessThan(50); expect(performance.now() - start).to.be.lessThan(_rate);
t.pass(); t.pass();
}); });
// Check if this is causing double cb
composed.on("data", (chunk: Chunk) => { composed.on("data", (chunk: Chunk) => {
// Since second is bottleneck, composed will write into first immediately. Buffer should be empty. // Since second is bottleneck, composed will write into first immediately. Buffer should be empty.
expect(composed._writableState.length).to.be.equal(0); expect(composed._writableState.length).to.be.equal(0);
@ -370,6 +375,7 @@ test.cb(
input.forEach(item => { input.forEach(item => {
composed.write(item); composed.write(item);
}); });
const start = performance.now(); const start = performance.now();
}, },
); );

View File

@ -44,6 +44,49 @@ test.cb("demux() constructor should be called once per key", t => {
demuxed.end(); demuxed.end();
}); });
test.cb("demux() should send input through correct pipeline", t => {
t.plan(6);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const pipelineSpies = {};
const construct = (destKey: string) => {
const mapper = sinon.spy((chunk: Test) => {
return { ...chunk, visited: [1] };
});
const dest = map(mapper);
pipelineSpies[destKey] = mapper;
return dest;
};
const demuxed = demux(construct, { key: "key" }, { objectMode: true });
demuxed.on("finish", () => {
pipelineSpies["a"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("a");
t.pass();
});
pipelineSpies["b"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("b");
t.pass();
});
pipelineSpies["c"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("c");
t.pass();
});
t.end();
});
input.forEach(event => demuxed.write(event));
demuxed.end();
});
test.cb("demux() constructor should be called once per key using keyBy", t => { test.cb("demux() constructor should be called once per key using keyBy", t => {
t.plan(1); t.plan(1);
const input = [ const input = [
@ -82,6 +125,53 @@ test.cb("demux() constructor should be called once per key using keyBy", t => {
demuxed.end(); demuxed.end();
}); });
test.cb("demux() should send input through correct pipeline using keyBy", t => {
t.plan(6);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "a", visited: [] },
{ key: "c", visited: [] },
{ key: "a", visited: [] },
{ key: "b", visited: [] },
];
const pipelineSpies = {};
const construct = (destKey: string) => {
const mapper = sinon.spy((chunk: Test) => {
return { ...chunk, visited: [1] };
});
const dest = map(mapper);
pipelineSpies[destKey] = mapper;
return dest;
};
const demuxed = demux(
construct,
{ keyBy: item => item.key },
{ objectMode: true },
);
demuxed.on("finish", () => {
pipelineSpies["a"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("a");
t.pass();
});
pipelineSpies["b"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("b");
t.pass();
});
pipelineSpies["c"].getCalls().forEach(call => {
expect(call.args[0].key).to.equal("c");
t.pass();
});
t.end();
});
input.forEach(event => demuxed.write(event));
demuxed.end();
});
test.cb("should emit errors", t => { test.cb("should emit errors", t => {
t.plan(2); t.plan(2);
let index = 0; let index = 0;
@ -367,6 +457,7 @@ test.cb(
t => { t => {
t.plan(6); t.plan(6);
const highWaterMark = 5; const highWaterMark = 5;
const _rate = 200;
interface Chunk { interface Chunk {
key: string; key: string;
mapped: number[]; mapped: number[];
@ -393,7 +484,7 @@ test.cb(
const second = map( const second = map(
async (chunk: Chunk) => { async (chunk: Chunk) => {
pendingReads--; pendingReads--;
await sleep(200); await sleep(_rate);
chunk.mapped.push(2); chunk.mapped.push(2);
expect(second._writableState.length).to.be.equal(1); expect(second._writableState.length).to.be.equal(1);
expect(first._readableState.length).to.equal(pendingReads); expect(first._readableState.length).to.equal(pendingReads);
@ -419,7 +510,7 @@ test.cb(
_demux.on("drain", () => { _demux.on("drain", () => {
expect(_demux._writableState.length).to.be.equal(0); expect(_demux._writableState.length).to.be.equal(0);
expect(performance.now() - start).to.be.lessThan(50); expect(performance.now() - start).to.be.lessThan(_rate);
t.pass(); t.pass();
}); });