This commit is contained in:
Jerry Kurian 2019-08-15 14:27:51 -04:00
parent 6a9f6ff919
commit 27b4b2427b
3 changed files with 351 additions and 84 deletions

View File

@ -22,11 +22,12 @@
"type": "git" "type": "git"
}, },
"scripts": { "scripts": {
"test": "ava", "test": "NODE_PATH=src node node_modules/.bin/ava 'src/**/**/*.spec.ts' -e",
"test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js", "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.ts",
"lint": "tslint -p tsconfig.json", "test:all": "NODE_PATH=src node node_modules/.bin/ava",
"validate:tslint": "tslint-config-prettier-check ./tslint.json", "lint": "tslint -p tsconfig.json",
"prepublishOnly": "yarn lint && yarn test && yarn tsc" "validate:tslint": "tslint-config-prettier-check ./tslint.json",
"prepublishOnly": "yarn lint && yarn test && yarn tsc"
}, },
"dependencies": {}, "dependencies": {},
"devDependencies": { "devDependencies": {

View File

@ -70,48 +70,107 @@ test.cb("accumulator() rolling with key", t => {
source.push(null); source.push(null);
}); });
test.cb("accumulatorBy() rolling", t => { test.cb(
t.plan(2); "accumulator() rolling should emit error and ignore chunk when its missing key",
let chunkIndex = 0; t => {
interface TestObject { t.plan(2);
ts: number; let index = 0;
key: string; interface TestObject {
} ts: number;
const source = new Readable({ objectMode: true }); key: string;
const firstFlush = [ }
{ ts: 0, key: "a" }, const source = new Readable({ objectMode: true });
{ ts: 1, key: "b" }, const accumulatorStream = accumulator(
{ ts: 2, key: "c" }, 3,
{ ts: 2, key: "d" }, undefined,
]; FlushStrategy.rolling,
const secondFlush = [{ ts: 3, key: "e" }]; "nonExistingKey",
const flushes = [firstFlush, secondFlush]; );
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
source source
.pipe( .pipe(accumulatorStream)
accumulatorBy( .on("data", (flush: TestObject[]) => {
undefined, // No valid data output
FlushStrategy.rolling, expect(flush).to.deep.equal([]);
(event: TestObject, bufferChunk: TestObject) => { })
return bufferChunk.ts + 3 <= event.ts; .on("error", (err: any) => {
}, source.pipe(accumulatorStream);
), accumulatorStream.resume();
) expect(err.message).to.equal(
.on("data", (flush: TestObject[]) => { `Key is missing in event: (nonExistingKey, ${JSON.stringify(
t.deepEqual(flush, flushes[chunkIndex]); input[index],
chunkIndex++; )})`,
}) );
.on("error", (e: any) => { index++;
t.end(e); t.pass();
}) })
.on("end", () => { .on("end", () => {
t.end(); t.end();
});
input.forEach(item => {
source.push(item);
}); });
[...firstFlush, ...secondFlush].forEach(item => { source.push(null);
source.push(item); },
}); );
source.push(null);
}); test.cb(
"accumulator() rolling should emit error, ignore chunk when key is missing and continue processing chunks correctly",
t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(
3,
undefined,
FlushStrategy.rolling,
"ts",
);
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ key: "d" },
{ ts: 3, key: "e" },
];
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (ts, ${JSON.stringify(
input[3],
)})`,
);
t.pass();
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulator() sliding", t => { test.cb("accumulator() sliding", t => {
t.plan(4); t.plan(4);
@ -216,6 +275,199 @@ test.cb("accumulator() sliding with key", t => {
source.push(null); source.push(null);
}); });
test.cb(
"accumulator() sliding should emit error and ignore chunk when key is missing",
t => {
t.plan(2);
let index = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(
3,
undefined,
FlushStrategy.sliding,
"nonExistingKey",
);
const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
expect(flush).to.deep.equal([]);
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (nonExistingKey, ${JSON.stringify(
input[index],
)})`,
);
index++;
t.pass();
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb(
"accumulator() sliding should emit error, ignore chunk when key is missing and continue processing chunks correctly",
t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const accumulatorStream = accumulator(
3,
undefined,
FlushStrategy.sliding,
"ts",
);
const input = [
{ ts: 0, key: "a" },
{ key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 2, key: "c" }];
const thirdFlush = [{ ts: 2, key: "c" }, { ts: 3, key: "d" }];
const fourthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const fifthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
];
source
.pipe(accumulatorStream)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (err: any) => {
source.pipe(accumulatorStream);
accumulatorStream.resume();
expect(err.message).to.equal(
`Key is missing in event: (ts, ${JSON.stringify(
input[1],
)})`,
);
t.pass();
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulatorBy() rolling", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(
accumulatorBy(
undefined,
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts;
},
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb(
"accumulatorBy() rolling should emit error when key iteratee throws",
t => {
t.plan(2);
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const accumulaterStream = accumulatorBy(
undefined,
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a") {
throw new Error("Failed mapping");
}
return bufferChunk.ts + 3 <= event.ts;
},
);
source
.pipe(accumulaterStream)
.on("error", (err: any) => {
source.pipe(accumulaterStream);
accumulaterStream.resume();
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
},
);
test.cb("accumulatorBy() sliding", t => { test.cb("accumulatorBy() sliding", t => {
t.plan(6); t.plan(6);
let chunkIndex = 0; let chunkIndex = 0;
@ -281,43 +533,45 @@ test.cb("accumulatorBy() sliding", t => {
source.push(null); source.push(null);
}); });
test.cb.only("accumulatorBy() sliding should throw", t => { test.cb(
t.plan(2); "accumulatorBy() sliding should emit error when key iteratee throws",
interface TestObject { t => {
ts: number; t.plan(2);
key: string; interface TestObject {
} ts: number;
const source = new Readable({ objectMode: true }); key: string;
const input = [ }
{ ts: 0, key: "a" }, const source = new Readable({ objectMode: true });
{ ts: 1, key: "b" }, const input = [
{ ts: 2, key: "c" }, { ts: 0, key: "a" },
{ ts: 3, key: "d" }, { ts: 1, key: "b" },
]; { ts: 2, key: "c" },
const accumulaterStream = accumulatorBy( ];
undefined, const accumulaterStream = accumulatorBy(
FlushStrategy.sliding, undefined,
(event: TestObject, bufferChunk: TestObject) => { FlushStrategy.sliding,
if (event.key !== "a" && event.key !== "b") { (event: TestObject, bufferChunk: TestObject) => {
throw new Error("Failed mapping"); if (event.key !== "a") {
} throw new Error("Failed mapping");
return bufferChunk.ts + 3 <= event.ts ? true : false; }
}, return bufferChunk.ts + 3 <= event.ts ? true : false;
); },
source );
.pipe(accumulaterStream) source
.on("error", (err: any) => { .pipe(accumulaterStream)
source.pipe(accumulaterStream); .on("error", (err: any) => {
accumulaterStream.resume(); source.pipe(accumulaterStream);
expect(err.message).to.equal("Failed mapping"); accumulaterStream.resume();
t.pass(); expect(err.message).to.equal("Failed mapping");
}) t.pass();
.on("end", () => { })
t.end(); .on("end", () => {
}); t.end();
});
input.forEach(item => { input.forEach(item => {
source.push(item); source.push(item);
}); });
source.push(null); source.push(null);
}); },
);

View File

@ -35,6 +35,17 @@ function _sliding<T>(
return (event: T, buffer: T[], stream: Transform) => { return (event: T, buffer: T[], stream: Transform) => {
if (key) { if (key) {
let index = 0; let index = 0;
if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
return;
}
while ( while (
index < buffer.length && index < buffer.length &&
buffer[index][key] + windowLength <= event[key] buffer[index][key] + windowLength <= event[key]
@ -96,6 +107,7 @@ function _rolling<T>(
)})`, )})`,
), ),
); );
return;
} else if ( } else if (
buffer.length > 0 && buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key] buffer[0][key] + windowLength <= event[key]