Add last() method

This commit is contained in:
Sami Turcotte 2018-12-02 21:53:10 -05:00
parent cd6fccd925
commit f630379c24
2 changed files with 25 additions and 0 deletions

View File

@ -18,6 +18,7 @@ import {
duplex, duplex,
child, child,
reduce, reduce,
last,
} from "."; } from ".";
test.cb("fromArray() streams array elements in flowing mode", t => { test.cb("fromArray() streams array elements in flowing mode", t => {
@ -1065,3 +1066,14 @@ test.cb(
source.push(null); source.push(null);
}, },
); );
test("last() yields the last chunk streamed by the given readable stream", async t => {
const source = new Readable({ objectMode: true });
const lastPromise = last(source);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
const lastChunk = await lastPromise;
expect(lastChunk).to.equal("ef");
});

View File

@ -427,3 +427,16 @@ export function duplex(writable: Writable, readable: Readable) {
export function child(childProcess: ChildProcess) { export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout); return duplex(childProcess.stdin, childProcess.stdout);
} }
/**
* Resolve the last streamed chunk of the given readable stream, after it has ended
* @param readable The readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, reject) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}