This commit is contained in:
Jerry Kurian
2019-08-15 11:54:50 -04:00
parent 3a1fbf44d7
commit a40b1bf38c
38 changed files with 1981 additions and 2616 deletions

View File

@@ -0,0 +1,323 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import { accumulator, accumulatorBy } from ".";
import { FlushStrategy } from "./definitions";
test.cb("accumulator() rolling", t => {
t.plan(3);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const secondFlush = [{ ts: 2, key: "d" }, { ts: 3, key: "e" }];
const thirdFlush = [{ ts: 4, key: "f" }];
const flushes = [firstFlush, secondFlush, thirdFlush];
source
.pipe(accumulator(2, undefined, FlushStrategy.rolling))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() rolling with key", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(accumulator(3, undefined, FlushStrategy.rolling, "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulatorBy() rolling", t => {
t.plan(2);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const firstFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 2, key: "d" },
];
const secondFlush = [{ ts: 3, key: "e" }];
const flushes = [firstFlush, secondFlush];
source
.pipe(
accumulatorBy(
undefined,
FlushStrategy.rolling,
(event: TestObject, bufferChunk: TestObject) => {
return bufferChunk.ts + 3 <= event.ts;
},
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
[...firstFlush, ...secondFlush].forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() sliding", t => {
t.plan(4);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 4, key: "d" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 4, key: "d" },
];
const flushes = [firstFlush, secondFlush, thirdFlush, fourthFlush];
source
.pipe(accumulator(3, undefined, FlushStrategy.sliding))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulator() sliding with key", t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
];
source
.pipe(accumulator(3, undefined, FlushStrategy.sliding, "ts"))
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulatorBy() sliding", t => {
t.plan(6);
let chunkIndex = 0;
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
{ ts: 5, key: "f" },
{ ts: 6, key: "g" },
];
const firstFlush = [{ ts: 0, key: "a" }];
const secondFlush = [{ ts: 0, key: "a" }, { ts: 1, key: "b" }];
const thirdFlush = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
];
const fourthFlush = [
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
const fifthFlush = [{ ts: 3, key: "d" }, { ts: 5, key: "f" }];
const sixthFlush = [{ ts: 5, key: "f" }, { ts: 6, key: "g" }];
const flushes = [
firstFlush,
secondFlush,
thirdFlush,
fourthFlush,
fifthFlush,
sixthFlush,
];
source
.pipe(
accumulatorBy(
undefined,
FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => {
                        return bufferChunk.ts + 3 <= event.ts;
},
),
)
.on("data", (flush: TestObject[]) => {
t.deepEqual(flush, flushes[chunkIndex]);
chunkIndex++;
})
.on("error", (e: any) => {
t.end(e);
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});
test.cb("accumulatorBy() sliding should throw", t => {
t.plan(2);
interface TestObject {
ts: number;
key: string;
}
const source = new Readable({ objectMode: true });
const input = [
{ ts: 0, key: "a" },
{ ts: 1, key: "b" },
{ ts: 2, key: "c" },
{ ts: 3, key: "d" },
];
    const accumulatorStream = accumulatorBy(
undefined,
FlushStrategy.sliding,
(event: TestObject, bufferChunk: TestObject) => {
if (event.key !== "a" && event.key !== "b") {
throw new Error("Failed mapping");
}
            return bufferChunk.ts + 3 <= event.ts;
},
);
source
        .pipe(accumulatorStream)
        .on("error", (err: any) => {
            source.pipe(accumulatorStream);
            accumulatorStream.resume();
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", () => {
t.end();
});
input.forEach(item => {
source.push(item);
});
source.push(null);
});

View File

@@ -0,0 +1,6 @@
export enum FlushStrategy {
rolling = "rolling",
sliding = "sliding",
}
export type AccumulatorByIteratee<T> = (event: T, bufferChunk: T) => boolean;
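// A minimal example of an AccumulatorByIteratee (illustrative only, not part
// of this commit): flush once the incoming event's "ts" is at least 3 ahead
// of the oldest buffered chunk, mirroring the predicates in the tests above.
//
//     const byTimestamp: AccumulatorByIteratee<{ ts: number }> = (
//         event,
//         bufferChunk,
//     ) => bufferChunk.ts + 3 <= event.ts;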

View File

@@ -0,0 +1,169 @@
import { Transform } from "stream";
import { AccumulatorByIteratee, FlushStrategy } from "./definitions";
import { batch } from "../../index";
function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true,
) {
const buffer: T[] = [];
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(data: any, encoding, callback) {
try {
accumulateBy(data, buffer, this);
callback();
} catch (err) {
callback(err);
}
},
flush(callback) {
if (shouldFlush) {
this.push(buffer);
}
callback();
},
});
}
function _sliding<T>(
windowLength: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
let index = 0;
while (
index < buffer.length &&
buffer[index][key] + windowLength <= event[key]
) {
index++;
}
buffer.splice(0, index);
} else if (buffer.length === windowLength) {
buffer.shift();
}
buffer.push(event);
stream.push(buffer);
};
}
function _slidingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
let index = 0;
while (index < buffer.length && iteratee(event, buffer[index])) {
index++;
}
buffer.splice(0, index);
buffer.push(event);
stream.push(buffer);
};
}
function _rollingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (iteratee) {
if (buffer.length > 0 && iteratee(event, buffer[0])) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
}
buffer.push(event);
};
}
function _rolling<T>(
windowLength: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
} else if (
buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key]
) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
} else if (buffer.length === windowLength) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
buffer.push(event);
};
}
export function accumulator(
batchSize: number,
batchRate: number | undefined,
flushStrategy: FlushStrategy,
keyBy?: string,
): Transform {
if (flushStrategy === FlushStrategy.sliding) {
return sliding(batchSize, batchRate, keyBy);
} else if (flushStrategy === FlushStrategy.rolling) {
return rolling(batchSize, batchRate, keyBy);
} else {
return batch(batchSize, batchRate);
}
}
export function accumulatorBy<T, S extends FlushStrategy>(
batchRate: number | undefined,
flushStrategy: S,
iteratee: AccumulatorByIteratee<T>,
): Transform {
if (flushStrategy === FlushStrategy.sliding) {
return slidingBy(batchRate, iteratee);
} else {
return rollingBy(batchRate, iteratee);
}
}
function sliding(
windowLength: number,
rate: number | undefined,
key?: string,
): Transform {
return _accumulator(_sliding(windowLength, rate, key), false);
}
function slidingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_slidingByFunction(rate, iteratee), false);
}
function rolling(
windowLength: number,
rate: number | undefined,
key?: string,
): Transform {
return _accumulator(_rolling(windowLength, rate, key));
}
function rollingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_rollingByFunction(rate, iteratee));
}

View File

@@ -0,0 +1,20 @@
export { accumulator, accumulatorBy } from "./accumulator";
export { batch } from "./batch";
export { child } from "./child";
export { collect } from "./collect";
export { concat } from "./concat";
export { duplex } from "./duplex";
export { filter } from "./filter";
export { flatMap } from "./flatMap";
export { fromArray } from "./fromArray";
export { join } from "./join";
export { last } from "./last";
export { map } from "./map";
export { merge } from "./merge";
export { parallelMap } from "./parallelMap";
export { parse } from "./parse";
export { rate } from "./rate";
export { reduce } from "./reduce";
export { split } from "./split";
export { stringify } from "./stringify";
export { unbatch } from "./unbatch";

View File

@@ -0,0 +1,47 @@
import { Transform } from "stream";
import { TransformOptions } from "../definitions";
/**
 * Stores chunks of data internally in an array and flushes the batch when
 * batchSize is reached or the batch reaches maxBatchAge.
 *
 * @param batchSize Size of the batches
 * @param maxBatchAge Max lifetime of a batch, in milliseconds
*/
export function batch(
batchSize: number = 1000,
maxBatchAge: number = 500,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): Transform {
let buffer: any[] = [];
let timer: NodeJS.Timer | null = null;
const sendChunk = (self: Transform) => {
if (timer) {
clearTimeout(timer);
}
timer = null;
self.push(buffer);
buffer = [];
};
return new Transform({
...options,
transform(chunk, encoding, callback) {
buffer.push(chunk);
if (buffer.length === batchSize) {
sendChunk(this);
} else {
if (timer === null) {
timer = setInterval(() => {
sendChunk(this);
}, maxBatchAge);
}
}
callback();
},
flush(callback) {
sendChunk(this);
callback();
},
});
}
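// Usage sketch (illustrative, not part of this commit): with batchSize 2 the
// first two chunks flush together, and the trailing chunk flushes on end.
//
//     const batcher = batch(2, 100);
//     batcher.on("data", (chunk: string[]) => console.log(chunk));
//     batcher.write("a");
//     batcher.write("b"); // => ["a", "b"]
//     batcher.end("c"); // => ["c"] on flush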

View File

@@ -0,0 +1,12 @@
import { ChildProcess } from "child_process";
import { duplex } from "../duplex";

/**
 * Return a Duplex stream from a child process' stdin and stdout
 * @param childProcess Child process from which to create duplex stream
 */
export function child(childProcess: ChildProcess) {
if (childProcess.stdin === null) {
throw new Error("childProcess.stdin is null");
} else if (childProcess.stdout === null) {
throw new Error("childProcess.stdout is null");
}
return duplex(childProcess.stdin, childProcess.stdout);
}
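// Usage sketch (assumes a "cat"-like command is available): data written to
// the wrapper goes to the child's stdin, and its stdout is read back out.
//
//     import { spawn } from "child_process";
//     const wrapped = child(spawn("cat"));
//     wrapped.on("data", chunk => console.log(chunk.toString()));
//     wrapped.write("hello\n");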

View File

@@ -0,0 +1,26 @@
import { Transform } from "stream";
import { ThroughOptions } from "../definitions";
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
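// Usage sketch: in object mode the collected chunks are emitted as a single
// array once the upstream ends; in the default (non-object) mode they are
// concatenated into one Buffer.
//
//     const sink = collect({ objectMode: true });
//     sink.on("data", (all: string[]) => console.log(all)); // => ["a", "b"]
//     sink.write("a");
//     sink.end("b");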

View File

@@ -0,0 +1,40 @@
import { Readable } from "stream";
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(...streams: Readable[]): Readable {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
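// Usage sketch: chunks from the second stream are only emitted once the
// first stream has ended.
//
//     const first = new Readable({ objectMode: true, read() {} });
//     const second = new Readable({ objectMode: true, read() {} });
//     concat(first, second).on("data", chunk => console.log(chunk)); // a, b
//     first.push("a");
//     first.push(null);
//     second.push("b");
//     second.push(null);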

View File

@@ -0,0 +1,33 @@
import { Duplex, Writable, Readable } from "stream";
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
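// Usage sketch: a PassThrough is trivially a (writable, readable) pair that
// yields what is written to it, so wrapping it round-trips chunks.
//
//     import { PassThrough } from "stream";
//     const pass = new PassThrough({ objectMode: true });
//     const wrapper = duplex(pass, pass);
//     wrapper.on("data", chunk => console.log(chunk)); // => "hello"
//     wrapper.write("hello");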

View File

@@ -0,0 +1,102 @@
import test from "ava";
import { expect } from "chai";
import { Readable } from "stream";
import { filter } from ".";
test.cb("filter() filters elements synchronously", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "c"];
let i = 0;
source
.pipe(filter((element: string) => element !== "b"))
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("filter() filters elements asynchronously", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "c"];
let i = 0;
source
.pipe(
filter(async (element: string) => {
await Promise.resolve();
return element !== "b";
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("filter() emits errors during synchronous filtering", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
filter((element: string) => {
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed filtering");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("filter() emits errors during asynchronous filtering", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
filter(async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed filtering");
}
return true;
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed filtering");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});

View File

@@ -0,0 +1,41 @@
import { Transform } from "stream";
import { ThroughOptions } from "../definitions";
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
 * @param predicate Predicate with which to filter stream chunks
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options: ThroughOptions = {
objectMode: true,
},
) {
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = predicate(chunk, encoding);
isPromise = result instanceof Promise;
if (!!(await result)) {
callback(undefined, chunk);
} else {
callback();
}
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}

View File

@@ -0,0 +1,100 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { flatMap } from ".";
test.cb("flatMap() maps elements synchronously", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "A", "b", "B", "c", "C"];
let i = 0;
source
.pipe(flatMap((element: string) => [element, element.toUpperCase()]))
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("flatMap() maps elements asynchronously", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedElements = ["a", "A", "b", "B", "c", "C"];
let i = 0;
source
.pipe(
flatMap(async (element: string) => {
await Promise.resolve();
return [element, element.toUpperCase()];
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("flatMap() emits errors during synchronous mapping", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
flatMap((element: string) => {
if (element !== "a") {
throw new Error("Failed mapping");
}
return [element, element.toUpperCase()];
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("flatMap() emits errors during asynchronous mapping", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
flatMap(async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed mapping");
}
return [element, element.toUpperCase()];
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});

View File

@@ -0,0 +1,39 @@
import { Transform } from "stream";
import { TransformOptions } from "../definitions";
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}

View File

@@ -0,0 +1,45 @@
import test from "ava";
import { expect } from "chai";
import { fromArray } from ".";
test.cb("fromArray() streams array elements in flowing mode", t => {
t.plan(3);
const elements = ["a", "b", "c"];
const stream = fromArray(elements);
let i = 0;
stream
.on("data", (element: string) => {
expect(element).to.equal(elements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
});
test.cb("fromArray() ends immediately if there are no array elements", t => {
t.plan(0);
fromArray([])
.on("data", () => t.fail())
.on("error", t.end)
.on("end", t.end);
});
test.cb("fromArray() streams array elements in paused mode", t => {
t.plan(3);
const elements = ["a", "b", "c"];
const stream = fromArray(elements);
let i = 0;
stream
.on("readable", () => {
let element = stream.read();
while (element !== null) {
expect(element).to.equal(elements[i]);
t.pass();
i++;
element = stream.read();
}
})
.on("error", t.end)
.on("end", t.end);
});

View File

@@ -0,0 +1,19 @@
import { Readable } from "stream";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}

File diff suppressed because it is too large

View File

@@ -1,765 +0,0 @@
import { Transform, Readable, Writable, Duplex } from "stream";
import { performance } from "perf_hooks";
import { ChildProcess } from "child_process";
import { StringDecoder } from "string_decoder";
import {
TransformOptions,
ThroughOptions,
WithEncoding,
SerializationFormats,
JsonValue,
JsonParseOptions,
FlushStrategy,
AccumulatorByIteratee,
} from "./definitions";
import { sleep } from "../helpers";
/**
* Convert an array into a Readable stream of its elements
* @param array Array of elements to stream
*/
export function fromArray(array: any[]): NodeJS.ReadableStream {
let cursor = 0;
return new Readable({
objectMode: true,
read() {
if (cursor < array.length) {
this.push(array[cursor]);
cursor++;
} else {
this.push(null);
}
},
});
}
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
try {
const mapped = await mapper(chunk, encoding);
this.push(mapped);
callback();
} catch (err) {
callback(err);
}
},
});
}
/**
* Return a ReadWrite stream that flat maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to an array of new chunks (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function flatMap<T, R>(
mapper:
| ((chunk: T, encoding: string) => R[])
| ((chunk: T, encoding: string) => Promise<R[]>),
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): NodeJS.ReadWriteStream {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const mapped = mapper(chunk, encoding);
isPromise = mapped instanceof Promise;
(await mapped).forEach(c => this.push(c));
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
* Return a ReadWrite stream that filters out streamed chunks for which the predicate does not hold
 * @param predicate Predicate with which to filter stream chunks
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function filter<T>(
predicate:
| ((chunk: T, encoding: string) => boolean)
| ((chunk: T, encoding: string) => Promise<boolean>),
options: ThroughOptions = {
objectMode: true,
},
) {
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = predicate(chunk, encoding);
isPromise = result instanceof Promise;
if (!!(await result)) {
callback(undefined, chunk);
} else {
callback();
}
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
});
}
/**
 * Return a ReadWrite stream that reduces streamed chunks down to a single value and yields that
 * value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
* @param format Format of serialized data, only utf8 supported.
*/
export function parse(
format: SerializationFormats = SerializationFormats.utf8,
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(format);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
callback(err);
}
},
});
}
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
/**
* Return a ReadWrite stream that collects streamed chunks into an array or buffer
* @param options
* @param options.objectMode Whether this stream should behave as a stream of objects
*/
export function collect(
options: ThroughOptions = { objectMode: false },
): NodeJS.ReadWriteStream {
const collected: any[] = [];
return new Transform({
readableObjectMode: options.objectMode,
writableObjectMode: options.objectMode,
transform(data, encoding, callback) {
collected.push(data);
callback();
},
flush(callback) {
this.push(
options.objectMode ? collected : Buffer.concat(collected),
);
callback();
},
});
}
/**
* Return a Readable stream of readable streams concatenated together
* @param streams Readable streams to concatenate
*/
export function concat(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let currentStreamIndex = 0;
const startCurrentStream = () => {
if (currentStreamIndex >= streams.length) {
wrapper.push(null);
} else {
streams[currentStreamIndex]
.on("data", chunk => {
if (!wrapper.push(chunk)) {
streams[currentStreamIndex].pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => {
currentStreamIndex++;
startCurrentStream();
});
}
};
const wrapper = new Readable({
objectMode: true,
read() {
if (!isStarted) {
isStarted = true;
startCurrentStream();
}
if (currentStreamIndex < streams.length) {
streams[currentStreamIndex].resume();
}
},
});
return wrapper;
}
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(
...streams: NodeJS.ReadableStream[]
): NodeJS.ReadableStream {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}
/**
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
* cause the given readable stream to yield chunks
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
*/
export function duplex(writable: Writable, readable: Readable) {
const wrapper = new Duplex({
readableObjectMode: true,
writableObjectMode: true,
read() {
readable.resume();
},
write(chunk, encoding, callback) {
return writable.write(chunk, encoding, callback);
},
final(callback) {
writable.end(callback);
},
});
readable
.on("data", chunk => {
if (!wrapper.push(chunk)) {
readable.pause();
}
})
.on("error", err => wrapper.emit("error", err))
.on("end", () => wrapper.push(null));
writable.on("drain", () => wrapper.emit("drain"));
writable.on("error", err => wrapper.emit("error", err));
return wrapper;
}
/**
* Return a Duplex stream from a child process' stdin and stdout
* @param childProcess Child process from which to create duplex stream
*/
export function child(childProcess: ChildProcess) {
return duplex(childProcess.stdin, childProcess.stdout);
}
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, reject) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}
/**
 * Stores chunks of data internally in an array and flushes the batch when
 * batchSize is reached or the batch reaches maxBatchAge.
 *
 * @param batchSize Size of the batches
 * @param maxBatchAge Max lifetime of a batch, in milliseconds
*/
export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
let buffer: any[] = [];
let timer: NodeJS.Timer | null = null;
let sendChunk = (self: Transform) => {
timer && clearTimeout(timer);
timer = null;
self.push(buffer);
buffer = [];
};
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
buffer.push(chunk);
if (buffer.length === batchSize) {
sendChunk(this);
} else {
if (timer === null) {
timer = setInterval(() => {
sendChunk(this);
}, maxBatchAge);
}
}
callback();
},
flush(callback) {
sendChunk(this);
callback();
},
});
}
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch() {
return new Transform({
objectMode: true,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
/**
 * Limits the rate of data transferred through the stream.
 * @param targetRate Desired rate in chunks per second
 * @param period Period to sleep for when rate is above or equal to targetRate
*/
export function rate(targetRate: number = 50, period: number = 2) {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
let total = 0;
const start = performance.now();
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
/**
* Limits number of parallel processes in flight.
 * @param mapper Function to execute on each data chunk
 * @param parallel Max number of parallel processes
 * @param sleepTime Amount of time to pause processing when the max number of parallel processes are executing
*/
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel: number = 10,
sleepTime: number = 5,
) {
let inflight = 0;
return new Transform({
objectMode: true,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(sleepTime);
}
inflight += 1;
callback();
try {
const res = await mapper(data);
this.push(res);
} catch (e) {
                this.emit("error", e);
} finally {
inflight -= 1;
}
},
async flush(callback) {
while (inflight > 0) {
await sleep(sleepTime);
}
callback();
},
});
}
function _accumulator<T>(
accumulateBy: (data: T, buffer: T[], stream: Transform) => void,
shouldFlush: boolean = true,
) {
const buffer: T[] = [];
return new Transform({
objectMode: true,
async transform(data: any, encoding, callback) {
accumulateBy(data, buffer, this);
callback();
},
flush(callback) {
if (shouldFlush) {
this.push(buffer);
}
callback();
},
});
}
function _sliding<T>(
windowLength: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
let index = 0;
while (
index < buffer.length &&
buffer[index][key] + windowLength <= event[key]
) {
index++;
}
buffer.splice(0, index);
} else if (buffer.length === windowLength) {
buffer.shift();
}
buffer.push(event);
stream.push(buffer);
};
}
function _slidingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
let index = 0;
while (index < buffer.length && iteratee(event, buffer[index])) {
index++;
}
buffer.splice(0, index);
buffer.push(event);
stream.push(buffer);
};
}
function _rollingByFunction<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (iteratee) {
if (buffer.length > 0 && iteratee(event, buffer[0])) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
}
buffer.push(event);
};
}
function _rolling<T>(
windowLength: number,
rate: number | undefined,
key?: string,
): (event: T, buffer: T[], stream: Transform) => void {
return (event: T, buffer: T[], stream: Transform) => {
if (key) {
if (event[key] === undefined) {
stream.emit(
"error",
new Error(
`Key is missing in event: (${key}, ${JSON.stringify(
event,
)})`,
),
);
} else if (
buffer.length > 0 &&
buffer[0][key] + windowLength <= event[key]
) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
} else if (buffer.length === windowLength) {
stream.push(buffer.slice(0));
buffer.length = 0;
}
buffer.push(event);
};
}
export function accumulator(
batchSize: number,
batchRate: number | undefined,
flushStrategy: FlushStrategy,
keyBy?: string,
): Transform {
if (flushStrategy === FlushStrategy.sliding) {
return sliding(batchSize, batchRate, keyBy);
} else if (flushStrategy === FlushStrategy.rolling) {
return rolling(batchSize, batchRate, keyBy);
} else {
return batch(batchSize, batchRate);
}
}
export function accumulatorBy<T, S extends FlushStrategy>(
batchRate: number | undefined,
flushStrategy: S,
iteratee: AccumulatorByIteratee<T>,
): Transform {
if (flushStrategy === FlushStrategy.sliding) {
return slidingBy(batchRate, iteratee);
} else {
return rollingBy(batchRate, iteratee);
}
}
export function sliding(
windowLength: number,
rate: number | undefined,
key?: string,
): Transform {
return _accumulator(_sliding(windowLength, rate, key), false);
}
export function slidingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_slidingByFunction(rate, iteratee), false);
}
export function rolling(
windowLength: number,
rate: number | undefined,
key?: string,
): Transform {
return _accumulator(_rolling(windowLength, rate, key));
}
export function rollingBy<T>(
rate: number | undefined,
iteratee: AccumulatorByIteratee<T>,
): Transform {
return _accumulator(_rollingByFunction(rate, iteratee));
}

View File

@@ -1,6 +1,6 @@
-import { Readable, Writable } from "stream";
+import { Readable, Writable, Transform } from "stream";
 import { ChildProcess } from "child_process";
-import * as baseFunctions from "./functions";
+import * as baseFunctions from "./baseFunctions";
 import {
     ThroughOptions,
@@ -29,7 +29,7 @@ export function fromArray(array: any[]): NodeJS.ReadableStream {
 export function map<T, R>(
     mapper: (chunk: T, encoding?: string) => R,
     options?: TransformOptions,
-): NodeJS.ReadWriteStream {
+): Transform {
     return baseFunctions.map(mapper, options);
 }
@@ -207,10 +207,7 @@ export function last<T>(readable: Readable): Promise<T | null> {
  * @param batchSize Size of the batches, defaults to 1000.
  * @param maxBatchAge? Max lifetime of a batch, defaults to 500
  */
-export function batch(
-    batchSize: number,
-    maxBatchAge?: number,
-): NodeJS.ReadWriteStream {
+export function batch(batchSize: number, maxBatchAge?: number): Transform {
     return baseFunctions.batch(batchSize, maxBatchAge);
 }

View File

@@ -0,0 +1,31 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "../definitions";
/**
* Return a ReadWrite stream that joins streamed chunks using the given separator
* @param separator Separator to join with
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function join(
separator: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let isFirstChunk = true;
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
if (!isFirstChunk) {
this.push(separator);
}
this.push(asString);
isFirstChunk = false;
}
callback();
},
});
}

View File

@@ -0,0 +1,56 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { join } from ".";
test.cb("join() joins chunks using the specified separator", t => {
t.plan(9);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab|", "|", "c|d", "|", "|", "|", "e", "|", "|f|"];
let i = 0;
source
.pipe(join("|"))
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab|");
source.push("c|d");
source.push("|");
source.push("e");
source.push("|f|");
source.push(null);
});
test.cb(
"join() joins chunks using the specified separator without breaking up multi-byte characters " +
"spanning multiple chunks",
t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedParts = ["ø", "|", "ö", "|", "一"];
let i = 0;
source
.pipe(join("|"))
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push(Buffer.from("一").slice(0, 1)); // 3-byte character spanning three chunks
source.push(Buffer.from("一").slice(1, 2));
source.push(Buffer.from("一").slice(2, 3));
source.push(null);
},
);

View File

@@ -0,0 +1,14 @@
import { Readable } from "stream";
/**
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
* ended
* @param readable Readable stream to wait on
*/
export function last<T>(readable: Readable): Promise<T | null> {
let lastChunk: T | null = null;
return new Promise((resolve, _) => {
readable
.on("data", chunk => (lastChunk = chunk))
.on("end", () => resolve(lastChunk));
});
}
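// Usage sketch: resolves with the final chunk once the stream ends (or null
// for an empty stream).
//
//     const source = new Readable({ objectMode: true, read() {} });
//     last<string>(source).then(chunk => console.log(chunk)); // => "b"
//     source.push("a");
//     source.push("b");
//     source.push(null);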

View File

@@ -0,0 +1,29 @@
import { Transform } from "stream";
import { TransformOptions } from "../definitions";
/**
* Return a ReadWrite stream that maps streamed chunks
* @param mapper Mapper function, mapping each (chunk, encoding) to a new chunk (or a promise of such)
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function map<T, R>(
mapper: (chunk: T, encoding: string) => R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
): Transform {
return new Transform({
...options,
async transform(chunk: T, encoding, callback) {
try {
const mapped = await mapper(chunk, encoding);
this.push(mapped);
callback();
} catch (err) {
callback(err);
}
},
});
}

View File

@@ -0,0 +1,107 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { map } from ".";
test.cb("map() maps elements synchronously", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["A", "B", "C"];
let i = 0;
source
.pipe(map((element: string) => element.toUpperCase()))
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("map() maps elements asynchronously", t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["A", "B", "C"];
let i = 0;
source
.pipe(
map(async (element: string) => {
await Promise.resolve();
return element.toUpperCase();
}),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test.cb("map() emits errors during synchronous mapping", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
map((element: string) => {
if (element !== "a") {
throw new Error("Failed mapping");
}
return element.toUpperCase();
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
test("map() emits errors during asynchronous mapping", t => {
t.plan(1);
return new Promise((resolve, reject) => {
const source = new Readable({ objectMode: true });
source
.pipe(
map(async (element: string) => {
await Promise.resolve();
if (element !== "a") {
throw new Error("Failed mapping");
}
return element.toUpperCase();
}),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
resolve();
})
.on("end", () => {
t.fail();
});
source.push("a");
source.push("b");
source.push("c");
source.push(null);
});
});

View File

@@ -0,0 +1,36 @@
import { Readable } from "stream";
/**
* Return a Readable stream of readable streams merged together in chunk arrival order
* @param streams Readable streams to merge
*/
export function merge(...streams: Readable[]): Readable {
let isStarted = false;
let streamEndedCount = 0;
return new Readable({
objectMode: true,
read() {
if (streamEndedCount >= streams.length) {
this.push(null);
} else if (!isStarted) {
isStarted = true;
streams.forEach(stream =>
stream
.on("data", chunk => {
if (!this.push(chunk)) {
streams.forEach(s => s.pause());
}
})
.on("error", err => this.emit("error", err))
.on("end", () => {
streamEndedCount++;
if (streamEndedCount === streams.length) {
this.push(null);
}
}),
);
} else {
streams.forEach(s => s.resume());
}
},
});
}
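// Usage sketch: unlike concat(), chunks are interleaved in arrival order
// across all sources.
//
//     const a = new Readable({ objectMode: true, read() {} });
//     const b = new Readable({ objectMode: true, read() {} });
//     merge(a, b).on("data", chunk => console.log(chunk));
//     a.push("1");
//     b.push("2");
//     a.push(null);
//     b.push(null);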

View File

@@ -0,0 +1,44 @@
import { Transform } from "stream";
import { sleep } from "../../helpers";
import { TransformOptions } from "../definitions";
/**
* Limits number of parallel processes in flight.
 * @param mapper Function to execute on each data chunk
 * @param parallel Max number of parallel processes
 * @param sleepTime Amount of time to pause processing when the max number of parallel processes are executing
*/
export function parallelMap<T, R>(
mapper: (data: T) => R,
parallel: number = 10,
sleepTime: number = 5,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let inflight = 0;
return new Transform({
...options,
async transform(data, encoding, callback) {
while (parallel <= inflight) {
await sleep(sleepTime);
}
inflight += 1;
callback();
try {
const res = await mapper(data);
this.push(res);
} catch (e) {
                this.emit("error", e);
} finally {
inflight -= 1;
}
},
async flush(callback) {
while (inflight > 0) {
await sleep(sleepTime);
}
callback();
},
});
}
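// Usage sketch: at most two mapper calls run concurrently; because the
// callback fires before the mapper resolves, output order is not guaranteed
// to match input order.
//
//     const doubler = parallelMap(async (n: number) => {
//         await sleep(10);
//         return n * 2;
//     }, 2);
//     doubler.on("data", result => console.log(result));
//     [1, 2, 3].forEach(n => doubler.write(n));
//     doubler.end();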

View File

@@ -0,0 +1,26 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { SerializationFormats } from "../definitions";
/**
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
* must be a fully defined JSON string.
* @param format Format of serialized data, only utf8 supported.
*/
export function parse(
format: SerializationFormats = SerializationFormats.utf8,
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(format);
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
async transform(chunk: Buffer, encoding, callback) {
try {
const asString = decoder.write(chunk);
// Using await causes parsing errors to be emitted
callback(undefined, await JSON.parse(asString));
} catch (err) {
callback(err);
}
},
});
}
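// Usage sketch: each written chunk must be one complete JSON document;
// malformed input is surfaced as an "error" event.
//
//     const parser = parse();
//     parser.on("data", obj => console.log(obj.name)); // => "example"
//     parser.on("error", err => console.error(err));
//     parser.end(Buffer.from('{"name":"example"}'));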

View File

@@ -0,0 +1,31 @@
import { Transform } from "stream";
import { performance } from "perf_hooks";
import { sleep } from "../../helpers";
import { TransformOptions } from "../definitions";
/**
 * Limits the rate of data transferred through the stream.
 * @param targetRate Desired rate in chunks per second
 * @param period Period to sleep for when rate is above or equal to targetRate
*/
export function rate(
targetRate: number = 50,
period: number = 2,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
let total = 0;
const start = performance.now();
    return new Transform({
        ...options,
        async transform(data, encoding, callback) {
const currentRate = (total / (performance.now() - start)) * 1000;
if (targetRate && currentRate > targetRate) {
await sleep(deltaMS);
}
total += 1;
callback(undefined, data);
},
});
}
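// Usage sketch: cap throughput at roughly 10 chunks per second; chunks
// arriving faster than that are delayed before being passed through.
//
//     const limiter = rate(10);
//     limiter.on("data", chunk => console.log(chunk));
//     for (let i = 0; i < 100; i++) {
//         limiter.write(i);
//     }
//     limiter.end();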

View File

@@ -0,0 +1,57 @@
import { Transform } from "stream";
import { TransformOptions } from "../definitions";
/**
 * Return a ReadWrite stream that reduces streamed chunks down to a single value and yields that
 * value
* @param iteratee Reducer function to apply on each streamed chunk
* @param initialValue Initial value
* @param options
* @param options.readableObjectMode Whether this stream should behave as a readable stream of objects
* @param options.writableObjectMode Whether this stream should behave as a writable stream of objects
*/
export function reduce<T, R>(
iteratee:
| ((previousValue: R, chunk: T, encoding: string) => R)
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
initialValue: R,
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
let value = initialValue;
return new Transform({
readableObjectMode: options.readableObjectMode,
writableObjectMode: options.writableObjectMode,
async transform(chunk: T, encoding, callback) {
let isPromise = false;
try {
const result = iteratee(value, chunk, encoding);
isPromise = result instanceof Promise;
value = await result;
callback();
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback();
} else {
callback(err);
}
}
},
flush(callback) {
// Best effort attempt at yielding the final value (will throw if e.g. yielding an object and
// downstream doesn't expect objects)
try {
callback(undefined, value);
} catch (err) {
try {
this.emit("error", err);
} catch {
// Best effort was made
}
}
},
});
}

View File

@@ -0,0 +1,98 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { reduce } from ".";
test.cb("reduce() reduces elements synchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(reduce((acc: number, element: string) => acc + element.length, 0))
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() reduces elements asynchronously", t => {
t.plan(1);
const source = new Readable({ objectMode: true });
const expectedValue = 6;
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
return acc + element.length;
}, 0),
)
.on("data", (element: string) => {
expect(element).to.equal(expectedValue);
t.pass();
})
.on("error", t.end)
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() emits errors during synchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce((acc: number, element: string) => {
if (element !== "ab") {
throw new Error("Failed reduce");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed reduce");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});
test.cb("reduce() emits errors during asynchronous reduce", t => {
t.plan(2);
const source = new Readable({ objectMode: true });
source
.pipe(
reduce(async (acc: number, element: string) => {
await Promise.resolve();
if (element !== "ab") {
throw new Error("Failed mapping");
}
return acc + element.length;
}, 0),
)
.resume()
.on("error", err => {
expect(err.message).to.equal("Failed mapping");
t.pass();
})
.on("end", t.end);
source.push("ab");
source.push("cd");
source.push("ef");
source.push(null);
});

View File

@@ -0,0 +1,33 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "../definitions";
/**
* Return a ReadWrite stream that replaces occurrences of the given string or regular expression in
* the streamed chunks with the specified replacement string
* @param searchValue Search string to use
* @param replaceValue Replacement string to use
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function replace(
searchValue: string | RegExp,
replaceValue: string,
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
// Take care not to break up multi-byte characters spanning multiple chunks
if (asString !== "" || chunk.length === 0) {
callback(
undefined,
asString.replace(searchValue, replaceValue),
);
} else {
callback();
}
},
});
}

View File

@@ -0,0 +1,80 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { replace } from ".";
test.cb(
"replace() replaces occurrences of the given string in the streamed elements with the specified " +
"replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyf", "ghi"];
let i = 0;
source
.pipe(replace("de", "xy"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb(
"replace() replaces occurrences of the given regular expression in the streamed elements with " +
"the specified replacement string",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["abc", "xyz", "ghi"];
let i = 0;
source
.pipe(replace(/^def$/, "xyz"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("abc");
source.push("def");
source.push("ghi");
source.push(null);
},
);
test.cb(
"replace() replaces occurrences of the given multi-byte character even if it spans multiple chunks",
t => {
t.plan(3);
const source = new Readable({ objectMode: true });
const expectedElements = ["ø", "O", "a"];
let i = 0;
source
.pipe(replace("ö", "O"))
.on("data", part => {
expect(part).to.equal(expectedElements[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push(Buffer.from("ø").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ø").slice(1, 2));
source.push(Buffer.from("ö").slice(0, 1)); // 2-byte character spanning two chunks
source.push(Buffer.from("ö").slice(1, 2));
source.push("a");
source.push(null);
},
);

View File

@@ -0,0 +1,34 @@
import { Transform } from "stream";
import { StringDecoder } from "string_decoder";
import { WithEncoding } from "../definitions";
/**
* Return a ReadWrite stream that splits streamed chunks using the given separator
* @param separator Separator to split by, defaulting to "\n"
* @param options
* @param options.encoding Encoding written chunks are assumed to use
*/
export function split(
separator: string | RegExp = "\n",
options: WithEncoding = { encoding: "utf8" },
): NodeJS.ReadWriteStream {
let buffered = "";
const decoder = new StringDecoder(options.encoding);
return new Transform({
readableObjectMode: true,
transform(chunk: Buffer, encoding, callback) {
const asString = decoder.write(chunk);
const splitted = asString.split(separator);
if (splitted.length > 1) {
splitted[0] = buffered.concat(splitted[0]);
buffered = "";
}
buffered += splitted[splitted.length - 1];
splitted.slice(0, -1).forEach((part: string) => this.push(part));
callback();
},
flush(callback) {
callback(undefined, buffered + decoder.end());
},
});
}

View File

@@ -0,0 +1,98 @@
import { Readable } from "stream";
import test from "ava";
import { expect } from "chai";
import { split } from ".";
test.cb("split() splits chunks using the default separator (\\n)", t => {
t.plan(5);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab", "c", "d", "ef", ""];
let i = 0;
source
.pipe(split())
.on("data", part => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab\n");
source.push("c");
source.push("\n");
source.push("d");
source.push("\nef\n");
source.push(null);
});
test.cb("split() splits chunks using the specified separator", t => {
t.plan(6);
const source = new Readable({ objectMode: true });
const expectedParts = ["ab", "c", "d", "e", "f", ""];
let i = 0;
source
.pipe(split("|"))
.on("data", (part: string) => {
expect(part).to.equal(expectedParts[i]);
t.pass();
i++;
})
.on("error", t.end)
.on("end", t.end);
source.push("ab|");
source.push("c|d");
source.push("|");
source.push("e");
source.push("|f|");
source.push(null);
});
test.cb(
"split() splits utf8 encoded buffers using the specified separator",
t => {
t.plan(3);
const expectedElements = ["a", "b", "c"];
let i = 0;
const through = split(",");
const buf = Buffer.from("a,b,c");
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);
for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);
test.cb(
"split() splits utf8 encoded buffers with multi-byte characters using the specified separator",
t => {
t.plan(3);
const expectedElements = ["一", "一", "一"];
let i = 0;
const through = split(",");
const buf = Buffer.from("一,一,一"); // Those spaces are multi-byte utf8 characters (code: 4E00)
through
.on("data", element => {
expect(element).to.equal(expectedElements[i]);
i++;
t.pass();
})
.on("error", t.end)
.on("end", t.end);
for (let j = 0; j < buf.length; ++j) {
through.write(buf.slice(j, j + 1));
}
through.end();
},
);

View File

@@ -0,0 +1,22 @@
import { Transform } from "stream";
import { JsonValue, JsonParseOptions } from "../definitions";
/**
* Return a ReadWrite stream that stringifies the streamed chunks to JSON
*/
export function stringify(
options: JsonParseOptions = { pretty: false },
): NodeJS.ReadWriteStream {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk: JsonValue, encoding, callback) {
callback(
undefined,
options.pretty
? JSON.stringify(chunk, null, 2)
: JSON.stringify(chunk),
);
},
});
}
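// Usage sketch: each written value is emitted as its JSON string; pass
// { pretty: true } for 2-space indented output.
//
//     const toJson = stringify();
//     toJson.on("data", line => console.log(line)); // => {"a":1}
//     toJson.end({ a: 1 });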

View File

@@ -0,0 +1,21 @@
import { Transform } from "stream";
import { TransformOptions } from "../definitions";
/**
* Unbatches and sends individual chunks of data
*/
export function unbatch(
options: TransformOptions = {
readableObjectMode: true,
writableObjectMode: true,
},
) {
return new Transform({
...options,
transform(data, encoding, callback) {
for (const d of data) {
this.push(d);
}
callback();
},
});
}
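// Usage sketch: re-expand the arrays produced by batch() into individual
// chunks (assumes batch from this library).
//
//     const batcher = batch(2);
//     batcher.pipe(unbatch()).on("data", chunk => console.log(chunk)); // a, b
//     batcher.write("a");
//     batcher.end("b");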