Add reduce() method

This commit is contained in:
Sami Turcotte 2018-12-02 01:38:19 -05:00
parent c99b946e6b
commit cd6fccd925
2 changed files with 132 additions and 0 deletions

View File

@ -17,6 +17,7 @@ import {
merge, merge,
duplex, duplex,
child, child,
reduce,
} from "."; } from ".";
test.cb("fromArray() streams array elements in flowing mode", t => { test.cb("fromArray() streams array elements in flowing mode", t => {
@ -353,6 +354,100 @@ test.cb("filter() emits errors during asynchronous filtering", t => {
source.push(null); source.push(null);
}); });
test.cb("reduce() reduces elements synchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(reduce((acc: number, element: string) => acc + element.length, 0))
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() reduces elements asynchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
return acc + element.length;
}, 0),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() emits errors during synchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce((acc: number, element: string) => {
if (element !== "ab") {
throw new Error("Failed reduce");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed reduce");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() emits errors during asynchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
if (element !== "ab") {
throw new Error("Failed mapping");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("split() splits chunks using the default separator (\\n)", t => { test.cb("split() splits chunks using the default separator (\\n)", t => {
t.plan(5); t.plan(5);
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });

View File

@ -137,6 +137,43 @@ export function filter<T>(
}); });
} }
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
callback(undefined, value);
},
});
}
/** /**
* Return a ReadWrite stream that splits streamed chunks using the given separator * Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n" * @param separator Separator to split by, defaulting to "\n"