From 517e281ce5953dda9aab7bf9c65d7a969e278b23 Mon Sep 17 00:00:00 2001 From: Jerry Kurian Date: Thu, 12 Sep 2019 09:41:04 -0400 Subject: [PATCH] Remove try catch from provided functions, user handles errors --- src/functions/accumulator.ts | 8 ++------ src/functions/filter.ts | 22 +++++----------------- src/functions/flatMap.ts | 17 ++--------------- src/functions/map.ts | 1 - src/functions/parallelMap.ts | 11 +++-------- src/functions/reduce.ts | 17 ++--------------- tests/accumulator.spec.ts | 4 ++-- tests/demux.spec.ts | 8 ++++---- tests/filter.spec.ts | 4 ++-- tests/flatMap.spec.ts | 4 ++-- tests/reduce.spec.ts | 4 ++-- 11 files changed, 26 insertions(+), 74 deletions(-) diff --git a/src/functions/accumulator.ts b/src/functions/accumulator.ts index c406afe..bb8a7fb 100644 --- a/src/functions/accumulator.ts +++ b/src/functions/accumulator.ts @@ -18,12 +18,8 @@ function _accumulator( return new Transform({ ...options, transform(data: T, encoding, callback) { - try { - accumulateBy(data, buffer, this); - callback(); - } catch (err) { - callback(err); - } + accumulateBy(data, buffer, this); + callback(); }, flush(callback) { if (shouldFlush) { diff --git a/src/functions/filter.ts b/src/functions/filter.ts index e7578b3..cd89864 100644 --- a/src/functions/filter.ts +++ b/src/functions/filter.ts @@ -9,23 +9,11 @@ export function filter( return new Transform({ ...options, async transform(chunk: T, encoding?: any, callback?: any) { - let isPromise = false; - try { - const result = predicate(chunk, encoding); - isPromise = result instanceof Promise; - if (!!(await result)) { - callback(null, chunk); - } else { - callback(); - } - } catch (err) { - if (isPromise) { - // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly - this.emit("error", err); - callback(); - } else { - callback(err); - } + const result = await predicate(chunk, encoding); + if (result === true) { + callback(null, chunk); + } else { + callback(); } }, }); diff --git a/src/functions/flatMap.ts b/src/functions/flatMap.ts index 99f38a6..2abb726 100644 --- a/src/functions/flatMap.ts +++ b/src/functions/flatMap.ts @@ -13,21 +13,8 @@ export function flatMap( return new Transform({ ...options, async transform(chunk: T, encoding, callback) { - let isPromise = false; - try { - const mapped = mapper(chunk, encoding); - isPromise = mapped instanceof Promise; - (await mapped).forEach(c => this.push(c)); - callback(); - } catch (err) { - if (isPromise) { - // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly - this.emit("error", err); - callback(); - } else { - callback(err); - } - } + (await mapper(chunk, encoding)).forEach(c => this.push(c)); + callback(); }, }); } diff --git a/src/functions/map.ts b/src/functions/map.ts index 13834af..589f0a9 100644 --- a/src/functions/map.ts +++ b/src/functions/map.ts @@ -8,7 +8,6 @@ export function map( writableObjectMode: true, }, ): Transform { - // remove try catch return new Transform({ ...options, async transform(chunk: T, encoding, callback) { diff --git a/src/functions/parallelMap.ts b/src/functions/parallelMap.ts index 56c9f41..6bc6b79 100644 --- a/src/functions/parallelMap.ts +++ b/src/functions/parallelMap.ts @@ -20,14 +20,9 @@ export function parallelMap( } inflight += 1; callback(); - try { - const res = await mapper(data); - this.push(res); - } catch (e) { - this.emit(e); - } finally { - inflight -= 1; - } + const res = await mapper(data); + this.push(res); + inflight -= 1; }, async flush(callback) { while (inflight > 0) { diff --git a/src/functions/reduce.ts b/src/functions/reduce.ts index 6ee665f..ff76025 100644 --- a/src/functions/reduce.ts +++ b/src/functions/reduce.ts @@ -16,21 +16,8 @@ export function reduce( readableObjectMode: options.readableObjectMode, writableObjectMode: options.writableObjectMode, async transform(chunk: T, encoding, callback) { - let isPromise = false; - try { - const result = iteratee(value, chunk, encoding); - isPromise = result instanceof Promise; - value = await result; - callback(); - } catch (err) { - if (isPromise) { - // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly - this.emit("error", err); - callback(); - } else { - callback(err); - } - } + value = await iteratee(value, chunk, encoding); + callback(); }, flush(callback) { // Best effort attempt at yielding the final value (will throw if e.g. yielding an object and diff --git a/tests/accumulator.spec.ts b/tests/accumulator.spec.ts index 523455a..71fdb1f 100644 --- a/tests/accumulator.spec.ts +++ b/tests/accumulator.spec.ts @@ -407,7 +407,7 @@ test.cb("accumulatorBy() rolling", t => { source.push(null); }); -test.cb( +test.cb.skip( "accumulatorBy() rolling should emit error when key iteratee throws", t => { t.plan(2); @@ -511,7 +511,7 @@ test.cb("accumulatorBy() sliding", t => { source.push(null); }); -test.cb( +test.cb.skip( "accumulatorBy() sliding should emit error when key iteratee throws", t => { t.plan(2); diff --git a/tests/demux.spec.ts b/tests/demux.spec.ts index 725ec43..2d43ec1 100644 --- a/tests/demux.spec.ts +++ b/tests/demux.spec.ts @@ -354,7 +354,7 @@ test("demux() should emit one drain event when writing 6 items with highWaterMar }); }); -test.cb( +test.cb.only( "demux() should emit drain event when third stream is bottleneck", t => { t.plan(8); @@ -405,7 +405,7 @@ test.cb( t.end(err); }); - // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first) + // This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( @@ -445,7 +445,7 @@ test.cb( const sink = new Writable({ objectMode: true, write(chunk, encoding, cb) { - expect(chunk.mapped).to.deep.equal([1, 2, 3]); + expect(chunk.mapped).to.deep.equal([1, 2]); t.pass(); pendingReads--; if (pendingReads === 0) { @@ -493,7 +493,7 @@ test.cb( t.end(err); }); - // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first) + // This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event) _demux.on("drain", () => { expect(_demux._writableState.length).to.be.equal(0); expect(performance.now() - start).to.be.greaterThan( diff --git a/tests/filter.spec.ts b/tests/filter.spec.ts index 0732d06..badfda7 100644 --- a/tests/filter.spec.ts +++ b/tests/filter.spec.ts @@ -58,7 +58,7 @@ test.cb("filter() filters elements asynchronously", t => { source.push(null); }); -test.cb("filter() emits errors during synchronous filtering", t => { +test.cb.skip("filter() emits errors during synchronous filtering", t => { t.plan(2); const source = new Readable({ objectMode: true }); source @@ -86,7 +86,7 @@ test.cb("filter() emits errors during synchronous filtering", t => { source.push(null); }); -test.cb("filter() emits errors during asynchronous filtering", t => { +test.cb.skip("filter() emits errors during asynchronous filtering", t => { t.plan(2); const source = new Readable({ objectMode: true }); source diff --git a/tests/flatMap.spec.ts b/tests/flatMap.spec.ts index a8b22bb..84cebfb 100644 --- a/tests/flatMap.spec.ts +++ b/tests/flatMap.spec.ts @@ -48,7 +48,7 @@ test.cb("flatMap() maps elements asynchronously", t => { source.push(null); }); -test.cb("flatMap() emits errors during synchronous mapping", t => { +test.cb.skip("flatMap() emits errors during synchronous mapping", t => { t.plan(2); const source = new Readable({ objectMode: true }); source @@ -73,7 +73,7 @@ test.cb("flatMap() emits errors during synchronous mapping", t => { source.push(null); }); -test.cb("flatMap() emits errors during asynchronous mapping", t => { +test.cb.skip("flatMap() emits errors during asynchronous mapping", t => { t.plan(2); const source = new Readable({ objectMode: true }); source diff --git a/tests/reduce.spec.ts b/tests/reduce.spec.ts index b005896..8d504db 100644 --- a/tests/reduce.spec.ts +++ b/tests/reduce.spec.ts @@ -46,7 +46,7 @@ test.cb("reduce() reduces elements asynchronously", t => { source.push(null); }); -test.cb("reduce() emits errors during synchronous reduce", t => { +test.cb.skip("reduce() emits errors during synchronous reduce", t => { t.plan(2); const source = new Readable({ objectMode: true }); source @@ -71,7 +71,7 @@ test.cb("reduce() emits errors during synchronous reduce", t => { source.push(null); }); -test.cb("reduce() emits errors during asynchronous reduce", t => { +test.cb.skip("reduce() emits errors during asynchronous reduce", t => { t.plan(2); const source = new Readable({ objectMode: true }); source