diff --git a/src/functions/accumulator/index.ts b/src/functions/accumulator/index.ts index 8beaf46..c7b9f51 100644 --- a/src/functions/accumulator/index.ts +++ b/src/functions/accumulator/index.ts @@ -44,6 +44,7 @@ function _sliding( )})`, ), ); + stream.resume(); return; } while ( @@ -107,6 +108,7 @@ function _rolling( )})`, ), ); + stream.resume(); return; } else if ( buffer.length > 0 && diff --git a/src/functions/map/map.spec.ts b/src/functions/map/map.spec.ts index 2812503..5d6a114 100644 --- a/src/functions/map/map.spec.ts +++ b/src/functions/map/map.spec.ts @@ -6,10 +6,11 @@ import { map } from "."; test.cb("map() maps elements synchronously", t => { t.plan(3); const source = new Readable({ objectMode: true }); + const mapStream = map((element: string) => element.toUpperCase()); const expectedElements = ["A", "B", "C"]; let i = 0; source - .pipe(map((element: string) => element.toUpperCase())) + .pipe(mapStream) .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); @@ -27,15 +28,14 @@ test.cb("map() maps elements synchronously", t => { test.cb("map() maps elements asynchronously", t => { t.plan(3); const source = new Readable({ objectMode: true }); + const mapStream = map(async (element: string) => { + await Promise.resolve(); + return element.toUpperCase(); + }); const expectedElements = ["A", "B", "C"]; let i = 0; source - .pipe( - map(async (element: string) => { - await Promise.resolve(); - return element.toUpperCase(); - }), - ) + .pipe(mapStream) .on("data", (element: string) => { expect(element).to.equal(expectedElements[i]); t.pass(); @@ -51,19 +51,23 @@ test.cb("map() maps elements asynchronously", t => { }); test.cb("map() emits errors during synchronous mapping", t => { - t.plan(2); + t.plan(3); const source = new Readable({ objectMode: true }); + const mapStream = map((element: string) => { + if (element !== "b") { + throw new Error("Failed mapping"); + } + return element.toUpperCase(); + }); source - .pipe( - map((element: string) => { - if (element !== "a") { - throw new Error("Failed mapping"); - } - return element.toUpperCase(); - }), - ) - .resume() + .pipe(mapStream) + .on("data", data => { + expect(data).to.equal("B"); + t.pass(); + }) .on("error", err => { + source.pipe(mapStream); + mapStream.resume(); expect(err.message).to.equal("Failed mapping"); t.pass(); }) @@ -77,31 +81,29 @@ test.cb("map() emits errors during synchronous mapping", t => { test("map() emits errors during asynchronous mapping", t => { t.plan(1); - return new Promise((resolve, reject) => { + return new Promise((resolve, _) => { const source = new Readable({ objectMode: true }); + const mapStream = map(async (element: string) => { + await Promise.resolve(); + if (element === "b") { + throw new Error("Failed mapping"); + } + return element.toUpperCase(); + }); source - .pipe( - map(async (element: string) => { - await Promise.resolve(); - if (element !== "a") { - throw new Error("Failed mapping"); - } - return element.toUpperCase(); - }), - ) - .resume() + .pipe(mapStream) .on("error", err => { expect(err.message).to.equal("Failed mapping"); t.pass(); resolve(); }) - .on("end", () => { - t.fail(); - }); + .on("end", () => t.fail); source.push("a"); source.push("b"); source.push("c"); source.push(null); + source.push(null); + source.push(null); }); });