Add duplex() and child() methods

This commit is contained in:
Sami Turcotte 2018-12-02 01:18:28 -05:00
parent 6201d7812f
commit c99b946e6b
4 changed files with 102 additions and 19 deletions

View File

@ -30,7 +30,6 @@
"dependencies": {},
"devDependencies": {
"@types/chai": "^4.1.7",
"@types/event-stream": "^3.3.34",
"@types/node": "^10.12.10",
"ava": "^1.0.0-rc.2",
"chai": "^4.2.0",

View File

@ -1,3 +1,4 @@
import * as cp from "child_process";
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
@ -14,6 +15,8 @@ import {
collect,
concat,
merge,
duplex,
child,
} from ".";
test.cb("fromArray() streams array elements in flowing mode", t => {
@ -921,3 +924,49 @@ test.cb("merge() merges an empty list of readable streams", t => {
.on("error", t.end)
.on("end", t.end);
});
test.cb(
"duplex() combines a writable and readable stream into a ReadWrite stream",
t => {
t.plan(1);
const source = new Readable();
const catProcess = cp.exec("cat");
let out = "";
source
.pipe(duplex(catProcess.stdin, catProcess.stdout))
.on("data", chunk => (out += chunk))
.on("error", t.end)
.on("end", () => {
expect(out).to.equal("abcdef");
t.pass();
t.end();
});
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
},
);
test.cb(
"child() allows easily writing to child process stdin and reading from its stdout",
t => {
t.plan(1);
const source = new Readable();
const catProcess = cp.exec("cat");
let out = "";
source
.pipe(child(catProcess))
.on("data", chunk => (out += chunk))
.on("error", t.end)
.on("end", () => {
expect(out).to.equal("abcdef");
t.pass();
t.end();
});
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
},
);

View File

@ -1,5 +1,6 @@
import { Transform, Readable } from "stream";
import { Transform, Readable, Writable, Duplex } from "stream";
import * as _utils from "./utils";
import { ChildProcess } from "child_process";
export const utils = _utils;
export interface ThroughOptions {
@ -12,7 +13,7 @@ export interface TransformOptions {
/**
* Convert an array into a readable stream of its elements
* @param array The array of elements to stream
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
@ -31,7 +32,7 @@ export function fromArray(array: any[]): NodeJS.ReadableStream {
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
@ -66,7 +67,7 @@ export function map<T, R>(
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
@ -138,7 +139,7 @@ export function filter<T>(
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator The separator to split by, defaulting to "\n"
* @param separator Separator to split by, defaulting to "\n"
*/
export function split(
separator: string | RegExp = "\n",
@ -165,7 +166,7 @@ export function split(
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator The separator to join with
* @param separator Separator to join with
*/
export function join(separator: string): NodeJS.ReadWriteStream {
let isFirstChunk = true;
@ -186,8 +187,8 @@ export function join(separator: string): NodeJS.ReadWriteStream {
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue The search string to use
* @param replaceValue The replacement string to use
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
*/
export function replace(
searchValue: string | RegExp,
@ -271,7 +272,7 @@ export function collect(
/**
* Return a stream of readable streams concatenated together
* @param streams The readable streams to concatenate
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
@ -313,7 +314,7 @@ export function concat(
/**
* Return a stream of readable streams merged together in chunk arrival order
* @param streams The readable streams to merge
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
@ -348,3 +349,44 @@ export function merge(
},
});
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout);
}

View File

@ -330,14 +330,7 @@
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.1.7.tgz#1b8e33b61a8c09cbe1f85133071baa0dbf9fa71a"
integrity sha512-2Y8uPt0/jwjhQ6EiluT0XCri1Dbplr0ZxfFXUz+ye13gaqE8u5gL5ppao1JrUYr9cIip5S6MvQzBS7Kke7U9VA==
"@types/event-stream@^3.3.34":
version "3.3.34"
resolved "https://registry.yarnpkg.com/@types/event-stream/-/event-stream-3.3.34.tgz#104bcedd5c61f90917b734bde04e61c6e64f03e1"
integrity sha512-LLiivgWKii4JeMzFy3trrxqkRrVSdue8WmbXyHuSJLwNrhIQU5MTrc65jhxEPwMyh5HR1xevSdD+k2nnSRKw9g==
dependencies:
"@types/node" "*"
"@types/node@*", "@types/node@^10.12.10":
"@types/node@^10.12.10":
version "10.12.10"
resolved "https://registry.yarnpkg.com/@types/node/-/node-10.12.10.tgz#4fa76e6598b7de3f0cb6ec3abacc4f59e5b3a2ce"
integrity sha512-8xZEYckCbUVgK8Eg7lf5Iy4COKJ5uXlnIOnePN0WUwSQggy9tolM+tDJf7wMOnT/JT/W9xDYIaYggt3mRV2O5w==