diff --git a/src/index.spec.ts b/src/index.spec.ts index 0d13a67..6c2ae77 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -17,6 +17,7 @@ import { merge, duplex, child, + reduce, } from "."; test.cb("fromArray() streams array elements in flowing mode", t => { @@ -353,6 +354,100 @@ test.cb("filter() emits errors during asynchronous filtering", t => { source.push(null); }); +test.cb("reduce() reduces elements synchronously", t => { + t.plan(1); + const source = new Readable({ objectMode: true }); + const expectedValue = 6; + source + .pipe(reduce((acc: number, element: string) => acc + element.length, 0)) + .on("data", (element: string) => { + expect(element).to.equal(expectedValue); + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); +}); + +test.cb("reduce() reduces elements asynchronously", t => { + t.plan(1); + const source = new Readable({ objectMode: true }); + const expectedValue = 6; + source + .pipe( + reduce(async (acc: number, element: string) => { + await Promise.resolve(); + return acc + element.length; + }, 0), + ) + .on("data", (element: string) => { + expect(element).to.equal(expectedValue); + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); +}); + +test.cb("reduce() emits errors during synchronous reduce", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + reduce((acc: number, element: string) => { + if (element !== "ab") { + throw new Error("Failed reduce"); + } + return acc + element.length; + }, 0), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed reduce"); + t.pass(); + }) + .on("end", t.end); + + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); +}); + +test.cb("reduce() emits errors during asynchronous reduce", t => { + t.plan(2); + const source = new Readable({ objectMode: true }); + source + .pipe( + reduce(async (acc: number, element: string) => { + await Promise.resolve(); + if (element !== "ab") { + throw new Error("Failed mapping"); + } + return acc + element.length; + }, 0), + ) + .resume() + .on("error", err => { + expect(err.message).to.equal("Failed mapping"); + t.pass(); + }) + .on("end", t.end); + + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); +}); + test.cb("split() splits chunks using the default separator (\\n)", t => { t.plan(5); const source = new Readable({ objectMode: true }); diff --git a/src/index.ts b/src/index.ts index 9690665..cde63f1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -137,6 +137,43 @@ export function filter( }); } +export function reduce( + iteratee: + | ((previousValue: R, chunk: T, encoding: string) => R) + | ((previousValue: R, chunk: T, encoding: string) => Promise), + initialValue: R, + options: TransformOptions = { + readableObjectMode: true, + writableObjectMode: true, + }, +) { + let value = initialValue; + return new Transform({ + readableObjectMode: options.readableObjectMode, + writableObjectMode: options.writableObjectMode, + async transform(chunk: T, encoding, callback) { + let isPromise = false; + try { + const result = iteratee(value, chunk, encoding); + isPromise = result instanceof Promise; + value = await result; + callback(); + } catch (err) { + if (isPromise) { + // Calling the callback asynchronously with an error wouldn't emit the error, so emit directly + this.emit("error", err); + callback(); + } else { + callback(err); + } + } + }, + flush(callback) { + callback(undefined, value); + }, + }); +} + /** * Return a ReadWrite stream that splits streamed chunks using the given separator * @param separator Separator to split by, defaulting to "\n"