Refactor lib structure

This commit is contained in:
Jerry Kurian 2019-06-04 10:21:15 -04:00
parent 81bc9e6bc5
commit eaeed0dde6
6 changed files with 803 additions and 609 deletions

View File

@ -0,0 +1,24 @@
export interface ThroughOptions {
objectMode?: boolean;
}
export interface TransformOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export interface WithEncoding {
encoding: string;
}
export enum SerializationFormats {
utf8 = "utf8",
}
type JsonPrimitive = string | number | object;
export type JsonValue = JsonPrimitive | JsonPrimitive[];
export interface JsonParseOptions {
pretty: boolean;
}

View File

@ -25,6 +25,7 @@ import {
rate,
parallelMap,
} from ".";
import { SerializationFormats } from "./definitions";
test.cb("fromArray() streams array elements in flowing mode", t => {
t.plan(3);
@ -682,7 +683,7 @@ test.cb("parse() parses the streamed elements as JSON", t => {
const expectedElements = ["abc", {}, []];
let i = 0;
source
.pipe(parse())
.pipe(parse(SerializationFormats.utf8))
.on("data", part => {
expect(part).to.deep.equal(expectedElements[i]);
t.pass();
@ -701,7 +702,7 @@ test.cb("parse() emits errors on invalid JSON", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(parse())
.pipe(parse(SerializationFormats.utf8))
.resume()
.on("error", () => t.pass())
.on("end", t.end);
@ -1235,7 +1236,7 @@ test.cb("unbatch() unbatches", t => {
test.cb("rate() sends data at desired rate", t => {
t.plan(9);
const fastRate = 500;
const fastRate = 150;
const medRate = 50;
const slowRate = 1;
const sourceFast = new Readable({ objectMode: true });
@ -1270,7 +1271,7 @@ test.cb("rate() sends data at desired rate", t => {
.on("error", t.end);
sourceSlow
.pipe(rate(slowRate))
.pipe(rate(slowRate, 1))
.on("data", (element: string[]) => {
const currentRate = (k / (performance.now() - start)) * 1000;
expect(element).to.deep.equal(expectedElements[k]);
@ -1306,19 +1307,20 @@ test.cb("parallel() parallel mapping", t => {
"e_processed",
];
const orderedResults: string[] = [];
// Record start / end times of each process and then compare to figure out # of processes ocurring and order
source
.pipe(parallelMap(2, data => data + "_processed"))
.pipe(parallelMap(data => data + "_processed"))
.on("data", (element: string) => {
t.true(expectedElements.includes(element));
orderedResults.push(element);
})
.on("error", t.end)
.on("end", () => {
expect(orderedResults[0]).to.equal("a_processed")
expect(orderedResults[1]).to.equal("b_processed")
expect(orderedResults[2]).to.equal("d_processed")
expect(orderedResults[3]).to.equal("c_processed")
expect(orderedResults[4]).to.equal("e_processed")
expect(orderedResults[0]).to.equal("a_processed");
expect(orderedResults[1]).to.equal("b_processed");
expect(orderedResults[2]).to.equal("d_processed");
expect(orderedResults[3]).to.equal("c_processed");
expect(orderedResults[4]).to.equal("e_processed");
t.end();
});

605
src/functions/functions.ts Normal file
View File

@ -0,0 +1,605 @@
import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
import {
TransformOptions,
ThroughOptions,
WithEncoding,
SerializationFormats,
JsonValue,
JsonParseOptions,
} from "./definitions";
import { sleep } from "../helpers";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
callback(undefined, await mapped);
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter scream chunks
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options: ThroughOptions = {
objectMode: true,
},
) {
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = predicate(chunk, encoding);
isPromise = result instanceof Promise;
if (!!(await result)) {
callback(undefined, chunk);
} else {
callback();
}
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
* @param format Format of serialized data, only utf8 supported.
*/
export function parse(
format: SerializationFormats = SerializationFormats.utf8,
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(format);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
callback(err);
}
},
});
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout);
}
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, reject) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}
/**
* Stores chunks of data internally in array and batches when batchSize is reached.
*
* @param batchSize Size of the batches
* @param maxBatchAge Max lifetime of a batch
*/
export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
const buffer: any[] = [];
let startTime: number | null = null;
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
if (
buffer.length === batchSize - 1 ||
(startTime !== null &&
startTime - performance.now() >= maxBatchAge)
) {
buffer.push(chunk);
callback(undefined, buffer.splice(0));
} else {
if (startTime === null) {
startTime = performance.now();
}
buffer.push(chunk);
callback();
}
},
flush(callback) {
callback(undefined, buffer.splice(0));
},
});
}
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch() {
return new Transform({
objectMode: true,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
/**
* Limits date of data transferred into stream.
* @param rate Desired rate in ms
*/
export function rate(targetRate: number = 50, period: number = 2) {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
let total = 0;
const start = performance.now();
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
/**
* Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
*/
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel: number = 10,
sleepTime: number = 5,
) {
let inflight = 0;
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(sleepTime);
}
inflight += 1;
callback();
try {
const res = await mapper(data);
this.push(res);
} catch (e) {
this.emit(e);
} finally {
inflight -= 1;
}
},
async flush(callback) {
while (inflight > 0) {
await sleep(sleepTime);
}
callback();
},
});
}

137
src/functions/index.ts Normal file
View File

@ -0,0 +1,137 @@
import { Readable, Writable } from "stream";
import { ChildProcess } from "child_process";
import * as baseFunctions from "./functions";
import {
ThroughOptions,
TransformOptions,
WithEncoding,
SerializationFormats,
JsonParseOptions,
} from "./definitions";
export function fromArray(array: any[]): NodeJS.ReadableStream {
return baseFunctions.fromArray(array);
}
export function map<T, R>(
mapper: (chunk: T, encoding?: string) => R,
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.map(mapper, options);
}
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.flatMap(mapper, options);
}
export function filter<T>(
mapper:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options?: ThroughOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.filter(mapper, options);
}
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options?: TransformOptions,
): NodeJS.ReadWriteStream {
return baseFunctions.reduce(iteratee, initialValue, options);
}
export function split(
separator?: string | RegExp,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.split(separator, options);
}
export function join(
separator: string,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.join(separator, options);
}
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options?: WithEncoding,
): NodeJS.ReadWriteStream {
return baseFunctions.replace(searchValue, replaceValue, options);
}
export function parse(format: SerializationFormats): NodeJS.ReadWriteStream {
return baseFunctions.parse(format);
}
export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
return baseFunctions.stringify(options);
}
export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
return baseFunctions.collect(options);
}
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
return baseFunctions.concat(...streams);
}
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
return baseFunctions.merge(...streams);
}
export function duplex(
writable: Writable,
readable: Readable,
): NodeJS.ReadWriteStream {
return baseFunctions.duplex(writable, readable);
}
export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
return baseFunctions.child(childProcess);
}
export function last<T>(readable: Readable): Promise<T | null> {
return baseFunctions.last(readable);
}
export function batch(
batchSize?: number,
maxBatchAge?: number,
): NodeJS.ReadWriteStream {
return baseFunctions.batch(batchSize, maxBatchAge);
}
export function unbatch(): NodeJS.ReadWriteStream {
return baseFunctions.unbatch();
}
export function rate(
targetRate?: number,
period?: number,
): NodeJS.ReadWriteStream {
return baseFunctions.rate(targetRate, period);
}
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel?: number,
sleepTime?: number,
) {
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
}

3
src/helpers.ts Normal file
View File

@ -0,0 +1,3 @@
export async function sleep(time: number): Promise<{} | null> {
return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
}

View File

@ -1,599 +1,22 @@
import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
export interface ThroughOptions {
objectMode?: boolean;
}
export interface TransformOptions {
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export interface WithEncoding {
encoding: string;
}
async function sleep(time: number) {
return time > 0 ? new Promise(resolve => setTimeout(resolve, time)) : null;
}
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
callback(undefined, await mapped);
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
* @param predicate Predicate with which to filter scream chunks
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options: ThroughOptions = {
objectMode: true,
},
) {
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = predicate(chunk, encoding);
isPromise = result instanceof Promise;
if (!!(await result)) {
callback(undefined, chunk);
} else {
callback();
}
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that reduces streamed chunks down to a single value and yield that
* value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
*/
export function parse(): NodeJS.ReadWriteStream {
const decoder = new StringDecoder("utf8"); // JSON must be utf8
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
callback(err);
}
},
});
}
type JsonPrimitive = string | number | object;
type JsonValue = JsonPrimitive | JsonPrimitive[];
interface JsonParseOptions {
pretty: boolean;
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout);
}
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, reject) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}
/**
* Stores chunks of data internally in array and batches when batchSize is reached.
*
* @param batchSize Size of the batches
*/
export function batch(batchSize: number) {
const buffer: any[] = [];
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
if (buffer.length === batchSize - 1) {
buffer.push(chunk);
callback(undefined, buffer.splice(0));
} else {
buffer.push(chunk);
callback();
}
},
flush(callback) {
callback(undefined, buffer.splice(0));
},
});
}
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch() {
return new Transform({
objectMode: true,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
/**
* Limits date of data transferred into stream.
* @param rate Desired rate in ms
*/
export function rate(targetRate: number) {
const deltaMS = (1 / targetRate) * 1000;
let total = 0;
const start = performance.now();
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
/**
* Limits number of parallel processes in flight.
* @param parallel Max number of parallel processes.
* @param func Function to execute on each data chunk
*/
export function parallelMap<T, R>(parallel: number, func: (data: T) => R) {
let inflight = 0;
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(5);
}
inflight += 1;
callback();
try {
const res = await func(data);
this.push(res);
} catch (e) {
this.emit(e);
} finally {
inflight -= 1;
}
},
async flush(callback) {
while (inflight > 0) {
await sleep(5);
}
callback();
},
});
}
export {
fromArray,
map,
flatMap,
filter,
reduce,
split,
join,
replace,
parse,
stringify,
collect,
concat,
merge,
duplex,
child,
last,
batch,
unbatch,
rate,
parallelMap,
} from "./functions";