diff --git a/package.json b/package.json index a5c8e2e..53eb1bf 100644 --- a/package.json +++ b/package.json @@ -22,11 +22,12 @@ "type": "git" }, "scripts": { - "test": "ava", - "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.js", - "lint": "tslint -p tsconfig.json", - "validate:tslint": "tslint-config-prettier-check ./tslint.json", - "prepublishOnly": "yarn lint && yarn test && yarn tsc" + "test": "NODE_PATH=src node node_modules/.bin/ava 'src/**/**/*.spec.ts' -e", + "test:debug": "NODE_PATH=src node inspect node_modules/ava/profile.ts", + "test:all": "NODE_PATH=src node node_modules/.bin/ava", + "lint": "tslint -p tsconfig.json", + "validate:tslint": "tslint-config-prettier-check ./tslint.json", + "prepublishOnly": "yarn lint && yarn test && yarn tsc" }, "dependencies": {}, "devDependencies": { diff --git a/src/functions/accumulator/accumulator.spec.ts b/src/functions/accumulator/accumulator.spec.ts index c22b2fe..a64c921 100644 --- a/src/functions/accumulator/accumulator.spec.ts +++ b/src/functions/accumulator/accumulator.spec.ts @@ -70,48 +70,107 @@ test.cb("accumulator() rolling with key", t => { source.push(null); }); -test.cb("accumulatorBy() rolling", t => { - t.plan(2); - let chunkIndex = 0; - interface TestObject { - ts: number; - key: string; - } - const source = new Readable({ objectMode: true }); - const firstFlush = [ - { ts: 0, key: "a" }, - { ts: 1, key: "b" }, - { ts: 2, key: "c" }, - { ts: 2, key: "d" }, - ]; - const secondFlush = [{ ts: 3, key: "e" }]; - const flushes = [firstFlush, secondFlush]; +test.cb( + "accumulator() rolling should emit error and ignore chunk when its missing key", + t => { + t.plan(2); + let index = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const accumulatorStream = accumulator( + 3, + undefined, + FlushStrategy.rolling, + "nonExistingKey", + ); + const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; - source - .pipe( - accumulatorBy( - undefined, - FlushStrategy.rolling, - (event: TestObject, bufferChunk: TestObject) => { - return bufferChunk.ts + 3 <= event.ts; - }, - ), - ) - .on("data", (flush: TestObject[]) => { - t.deepEqual(flush, flushes[chunkIndex]); - chunkIndex++; - }) - .on("error", (e: any) => { - t.end(e); - }) - .on("end", () => { - t.end(); + source + .pipe(accumulatorStream) + .on("data", (flush: TestObject[]) => { + // No valid data output + expect(flush).to.deep.equal([]); + }) + .on("error", (err: any) => { + source.pipe(accumulatorStream); + accumulatorStream.resume(); + expect(err.message).to.equal( + `Key is missing in event: (nonExistingKey, ${JSON.stringify( + input[index], + )})`, + ); + index++; + t.pass(); + }) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); }); - [...firstFlush, ...secondFlush].forEach(item => { - source.push(item); - }); - source.push(null); -}); + source.push(null); + }, +); + +test.cb( + "accumulator() rolling should emit error, ignore chunk when key is missing and continue processing chunks correctly", + t => { + t.plan(3); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const accumulatorStream = accumulator( + 3, + undefined, + FlushStrategy.rolling, + "ts", + ); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { key: "d" }, + { ts: 3, key: "e" }, + ]; + const firstFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const secondFlush = [{ ts: 3, key: "e" }]; + const flushes = [firstFlush, secondFlush]; + + source + .pipe(accumulatorStream) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (err: any) => { + source.pipe(accumulatorStream); + accumulatorStream.resume(); + expect(err.message).to.equal( + `Key is missing in event: (ts, ${JSON.stringify( + input[3], + )})`, + ); + t.pass(); + }) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); + }); + source.push(null); + }, +); test.cb("accumulator() sliding", t => { t.plan(4); @@ -216,6 +275,199 @@ test.cb("accumulator() sliding with key", t => { source.push(null); }); +test.cb( + "accumulator() sliding should emit error and ignore chunk when key is missing", + t => { + t.plan(2); + let index = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const accumulatorStream = accumulator( + 3, + undefined, + FlushStrategy.sliding, + "nonExistingKey", + ); + const input = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }]; + + source + .pipe(accumulatorStream) + .on("data", (flush: TestObject[]) => { + expect(flush).to.deep.equal([]); + }) + .on("error", (err: any) => { + source.pipe(accumulatorStream); + accumulatorStream.resume(); + expect(err.message).to.equal( + `Key is missing in event: (nonExistingKey, ${JSON.stringify( + input[index], + )})`, + ); + index++; + t.pass(); + }) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); + }); + source.push(null); + }, +); + +test.cb( + "accumulator() sliding should emit error, ignore chunk when key is missing and continue processing chunks correctly", + t => { + t.plan(6); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const accumulatorStream = accumulator( + 3, + undefined, + FlushStrategy.sliding, + "ts", + ); + const input = [ + { ts: 0, key: "a" }, + { key: "b" }, + { ts: 2, key: "c" }, + { ts: 3, key: "d" }, + { ts: 5, key: "f" }, + { ts: 6, key: "g" }, + ]; + const firstFlush = [{ ts: 0, key: "a" }]; + const secondFlush = [{ ts: 0, key: "a" }, { ts: 2, key: "c" }]; + const thirdFlush = [{ ts: 2, key: "c" }, { ts: 3, key: "d" }]; + const fourthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }]; + const fifthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }]; + + const flushes = [ + firstFlush, + secondFlush, + thirdFlush, + fourthFlush, + fifthFlush, + ]; + source + .pipe(accumulatorStream) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (err: any) => { + source.pipe(accumulatorStream); + accumulatorStream.resume(); + expect(err.message).to.equal( + `Key is missing in event: (ts, ${JSON.stringify( + input[1], + )})`, + ); + t.pass(); + }) + .on("end", () => { + t.end(); + }); + input.forEach(item => { + source.push(item); + }); + source.push(null); + }, +); + +test.cb("accumulatorBy() rolling", t => { + t.plan(2); + let chunkIndex = 0; + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const firstFlush = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + { ts: 2, key: "d" }, + ]; + const secondFlush = [{ ts: 3, key: "e" }]; + const flushes = [firstFlush, secondFlush]; + + source + .pipe( + accumulatorBy( + undefined, + FlushStrategy.rolling, + (event: TestObject, bufferChunk: TestObject) => { + return bufferChunk.ts + 3 <= event.ts; + }, + ), + ) + .on("data", (flush: TestObject[]) => { + t.deepEqual(flush, flushes[chunkIndex]); + chunkIndex++; + }) + .on("error", (e: any) => { + t.end(e); + }) + .on("end", () => { + t.end(); + }); + [...firstFlush, ...secondFlush].forEach(item => { + source.push(item); + }); + source.push(null); +}); + +test.cb( + "accumulatorBy() rolling should emit error when key iteratee throws", + t => { + t.plan(2); + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const accumulaterStream = accumulatorBy( + undefined, + FlushStrategy.rolling, + (event: TestObject, bufferChunk: TestObject) => { + if (event.key !== "a") { + throw new Error("Failed mapping"); + } + return bufferChunk.ts + 3 <= event.ts; + }, + ); + source + .pipe(accumulaterStream) + .on("error", (err: any) => { + source.pipe(accumulaterStream); + accumulaterStream.resume(); + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", () => { + t.end(); + }); + + input.forEach(item => { + source.push(item); + }); + source.push(null); + }, +); + test.cb("accumulatorBy() sliding", t => { t.plan(6); let chunkIndex = 0; @@ -281,43 +533,45 @@ test.cb("accumulatorBy() sliding", t => { source.push(null); }); -test.cb.only("accumulatorBy() sliding should throw", t => { - t.plan(2); - interface TestObject { - ts: number; - key: string; - } - const source = new Readable({ objectMode: true }); - const input = [ - { ts: 0, key: "a" }, - { ts: 1, key: "b" }, - { ts: 2, key: "c" }, - { ts: 3, key: "d" }, - ]; - const accumulaterStream = accumulatorBy( - undefined, - FlushStrategy.sliding, - (event: TestObject, bufferChunk: TestObject) => { - if (event.key !== "a" && event.key !== "b") { - throw new Error("Failed mapping"); - } - return bufferChunk.ts + 3 <= event.ts ? true : false; - }, - ); - source - .pipe(accumulaterStream) - .on("error", (err: any) => { - source.pipe(accumulaterStream); - accumulaterStream.resume(); - expect(err.message).to.equal("Failed mapping"); - t.pass(); - }) - .on("end", () => { - t.end(); - }); +test.cb( + "accumulatorBy() sliding should emit error when key iteratee throws", + t => { + t.plan(2); + interface TestObject { + ts: number; + key: string; + } + const source = new Readable({ objectMode: true }); + const input = [ + { ts: 0, key: "a" }, + { ts: 1, key: "b" }, + { ts: 2, key: "c" }, + ]; + const accumulaterStream = accumulatorBy( + undefined, + FlushStrategy.sliding, + (event: TestObject, bufferChunk: TestObject) => { + if (event.key !== "a") { + throw new Error("Failed mapping"); + } + return bufferChunk.ts + 3 <= event.ts ? true : false; + }, + ); + source + .pipe(accumulaterStream) + .on("error", (err: any) => { + source.pipe(accumulaterStream); + accumulaterStream.resume(); + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", () => { + t.end(); + }); - input.forEach(item => { - source.push(item); - }); - source.push(null); -}); + input.forEach(item => { + source.push(item); + }); + source.push(null); + }, +); diff --git a/src/functions/accumulator/index.ts b/src/functions/accumulator/index.ts index a801dad..8beaf46 100644 --- a/src/functions/accumulator/index.ts +++ b/src/functions/accumulator/index.ts @@ -35,6 +35,17 @@ function _sliding( return (event: T, buffer: T[], stream: Transform) => { if (key) { let index = 0; + if (event[key] === undefined) { + stream.emit( + "error", + new Error( + `Key is missing in event: (${key}, ${JSON.stringify( + event, + )})`, + ), + ); + return; + } while ( index < buffer.length && buffer[index][key] + windowLength <= event[key] @@ -96,6 +107,7 @@ function _rolling( )})`, ), ); + return; } else if ( buffer.length > 0 && buffer[0][key] + windowLength <= event[key]