From f630379c249bd77825085dd9d762463eac93f578 Mon Sep 17 00:00:00 2001 From: Sami Turcotte Date: Sun, 2 Dec 2018 21:53:10 -0500 Subject: [PATCH] Add last() method --- src/index.spec.ts | 12 ++++++++++++ src/index.ts | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/index.spec.ts b/src/index.spec.ts index 6c2ae77..da24e1e 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -18,6 +18,7 @@ import { duplex, child, reduce, + last, } from "."; test.cb("fromArray() streams array elements in flowing mode", t => { @@ -1065,3 +1066,14 @@ test.cb( source.push(null); }, ); + +test("last() yields the last chunk streamed by the given readable stream", async t => { + const source = new Readable({ objectMode: true }); + const lastPromise = last(source); + source.push("ab"); + source.push("cd"); + source.push("ef"); + source.push(null); + const lastChunk = await lastPromise; + expect(lastChunk).to.equal("ef"); +}); diff --git a/src/index.ts b/src/index.ts index cde63f1..9890ca3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -427,3 +427,16 @@ export function duplex(writable: Writable, readable: Readable) { export function child(childProcess: ChildProcess) { return duplex(childProcess.stdin, childProcess.stdout); } + +/** + * Resolve the last streamed chunk of the given readable stream, after it has ended + * @param readable The readable stream to wait on + */ +export function last(readable: Readable): Promise { + let lastChunk: T | null = null; + return new Promise((resolve, reject) => { + readable + .on("data", chunk => (lastChunk = chunk)) + .on("end", () => resolve(lastChunk)); + }); +}