diff --git a/package.json b/package.json
index ff226d1..66b174f 100644
--- a/package.json
+++ b/package.json
@@ -30,7 +30,6 @@
   "dependencies": {},
   "devDependencies": {
     "@types/chai": "^4.1.7",
-    "@types/event-stream": "^3.3.34",
     "@types/node": "^10.12.10",
     "ava": "^1.0.0-rc.2",
     "chai": "^4.2.0",
diff --git a/src/index.spec.ts b/src/index.spec.ts
index 0a0bd2c..0d13a67 100644
--- a/src/index.spec.ts
+++ b/src/index.spec.ts
@@ -1,3 +1,4 @@
+import * as cp from "child_process";
 import test from "ava";
 import { expect } from "chai";
 import { Readable } from "stream";
@@ -14,6 +15,8 @@ import {
     collect,
     concat,
     merge,
+    duplex,
+    child,
 } from ".";
 
 test.cb("fromArray() streams array elements in flowing mode", t => {
@@ -921,3 +924,49 @@ test.cb("merge() merges an empty list of readable streams", t => {
         .on("error", t.end)
         .on("end", t.end);
 });
+
+test.cb(
+    "duplex() combines a writable and readable stream into a ReadWrite stream",
+    t => {
+        t.plan(1);
+        const source = new Readable();
+        const catProcess = cp.exec("cat");
+        let out = "";
+        source
+            .pipe(duplex(catProcess.stdin, catProcess.stdout))
+            .on("data", chunk => (out += chunk))
+            .on("error", t.end)
+            .on("end", () => {
+                expect(out).to.equal("abcdef");
+                t.pass();
+                t.end();
+            });
+        source.push("ab");
+        source.push("cd");
+        source.push("ef");
+        source.push(null);
+    },
+);
+
+test.cb(
+    "child() allows easily writing to child process stdin and reading from its stdout",
+    t => {
+        t.plan(1);
+        const source = new Readable();
+        const catProcess = cp.exec("cat");
+        let out = "";
+        source
+            .pipe(child(catProcess))
+            .on("data", chunk => (out += chunk))
+            .on("error", t.end)
+            .on("end", () => {
+                expect(out).to.equal("abcdef");
+                t.pass();
+                t.end();
+            });
+        source.push("ab");
+        source.push("cd");
+        source.push("ef");
+        source.push(null);
+    },
+);
diff --git a/src/index.ts b/src/index.ts
index 1691a75..9690665 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,5 +1,6 @@
-import { Transform, Readable } from "stream";
+import { Transform, Readable, Writable, Duplex } from "stream";
 import * as _utils from "./utils";
+import { ChildProcess } from "child_process";
 export const utils = _utils;
 
 export interface ThroughOptions {
@@ -12,7 +13,7 @@ export interface TransformOptions {
 
 /**
  * Convert an array into a readable stream of its elements
- * @param array The array of elements to stream
+ * @param array Array of elements to stream
  */
 export function fromArray(array: any[]): NodeJS.ReadableStream {
     let cursor = 0;
@@ -31,7 +32,7 @@ export function fromArray(array: any[]): NodeJS.ReadableStream {
 
 /**
  * Return a ReadWrite stream that maps streamed chunks
- * @param mapper The mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
+ * @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
  * @param options
  * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
@@ -66,7 +67,7 @@ export function map(
 
 /**
  * Return a ReadWrite stream that flat maps streamed chunks
- * @param mapper The mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
+ * @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
  * @param options
  * @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
  * @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
@@ -138,7 +139,7 @@ export function filter(
 
 /**
  * Return a ReadWrite stream that splits streamed chunks using the given separator
- * @param separator The separator to split by, defaulting to "\n"
+ * @param separator Separator to split by, defaulting to "\n"
  */
 export function split(
     separator: string | RegExp = "\n",
@@ -165,7 +166,7 @@ export function split(
 
 /**
  * Return a ReadWrite stream that joins streamed chunks using the given separator
- * @param separator The separator to join with
+ * @param separator Separator to join with
  */
 export function join(separator: string): NodeJS.ReadWriteStream {
     let isFirstChunk = true;
@@ -186,8 +187,8 @@ export function join(separator: string): NodeJS.ReadWriteStream {
 /**
  * Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
  * the streamed chunks with the specified replacement string
- * @param searchValue The search string to use
- * @param replaceValue The replacement string to use
+ * @param searchValue Search string to use
+ * @param replaceValue Replacement string to use
  */
 export function replace(
     searchValue: string | RegExp,
@@ -271,7 +272,7 @@ export function collect(
 
 /**
  * Return a stream of readable streams concatenated together
- * @param streams The readable streams to concatenate
+ * @param streams Readable streams to concatenate
  */
 export function concat(
     ...streams: NodeJS.ReadableStream[]
@@ -313,7 +314,7 @@ export function concat(
 
 /**
  * Return a stream of readable streams merged together in chunk arrival order
- * @param streams The readable streams to merge
+ * @param streams Readable streams to merge
  */
 export function merge(
     ...streams: NodeJS.ReadableStream[]
@@ -348,3 +349,44 @@ export function merge(
         },
     });
 }
+
+/**
+ * Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
+ * cause the given readable stream to yield chunks
+ * @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
+ * @param readable Readable stream assumed to yield chunks when the writable stream is written to
+ */
+export function duplex(writable: Writable, readable: Readable) {
+    const wrapper = new Duplex({
+        readableObjectMode: true,
+        writableObjectMode: true,
+        read() {
+            readable.resume();
+        },
+        write(chunk, encoding, callback) {
+            return writable.write(chunk, encoding, callback);
+        },
+        final(callback) {
+            writable.end(callback);
+        },
+    });
+    readable
+        .on("data", chunk => {
+            if (!wrapper.push(chunk)) {
+                readable.pause();
+            }
+        })
+        .on("error", err => wrapper.emit("error", err))
+        .on("end", () => wrapper.push(null));
+    writable.on("drain", () => wrapper.emit("drain"));
+    writable.on("error", err => wrapper.emit("error", err));
+    return wrapper;
+}
+
+/**
+ * Return a Duplex stream from a child process' stdin and stdout
+ * @param childProcess Child process from which to create duplex stream
+ */
+export function child(childProcess: ChildProcess) {
+    return duplex(childProcess.stdin, childProcess.stdout);
+}
diff --git a/yarn.lock b/yarn.lock
index b88ff0e..bbc7fa2 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -330,14 +330,7 @@
   resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.1.7.tgz#1b8e33b61a8c09cbe1f85133071baa0dbf9fa71a"
   integrity sha512-2Y8uPt0/jwjhQ6EiluT0XCri1Dbplr0ZxfFXUz+ye13gaqE8u5gL5ppao1JrUYr9cIip5S6MvQzBS7Kke7U9VA==
 
-"@types/event-stream@^3.3.34":
-  version "3.3.34"
-  resolved "https://registry.yarnpkg.com/@types/event-stream/-/event-stream-3.3.34.tgz#104bcedd5c61f90917b734bde04e61c6e64f03e1"
-  integrity sha512-LLiivgWKii4JeMzFy3trrxqkRrVSdue8WmbXyHuSJLwNrhIQU5MTrc65jhxEPwMyh5HR1xevSdD+k2nnSRKw9g==
-  dependencies:
-    "@types/node" "*"
-
-"@types/node@*", "@types/node@^10.12.10":
+"@types/node@^10.12.10":
   version "10.12.10"
   resolved "https://registry.yarnpkg.com/@types/node/-/node-10.12.10.tgz#4fa76e6598b7de3f0cb6ec3abacc4f59e5b3a2ce"
   integrity sha512-8xZEYckCbUVgK8Eg7lf5Iy4COKJ5uXlnIOnePN0WUwSQggy9tolM+tDJf7wMOnT/JT/W9xDYIaYggt3mRV2O5w==