Save
This commit is contained in:
parent
d6d974ee0d
commit
505fefeeb5
58
src/functions/batch/batch.spec.ts
Normal file
58
src/functions/batch/batch.spec.ts
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { batch } from ".";
|
||||||
|
|
||||||
|
test.cb("batch() batches chunks together", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = [["a", "b", "c"], ["d", "e", "f"], ["g"]];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(batch(3))
|
||||||
|
.on("data", (element: string[]) => {
|
||||||
|
expect(element).to.deep.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push("d");
|
||||||
|
source.push("e");
|
||||||
|
source.push("f");
|
||||||
|
source.push("g");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("batch() yields a batch after the timeout", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({
|
||||||
|
objectMode: true,
|
||||||
|
read(size: number) {},
|
||||||
|
});
|
||||||
|
const expectedElements = [["a", "b"], ["c"], ["d"]];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(batch(3))
|
||||||
|
.on("data", (element: string[]) => {
|
||||||
|
expect(element).to.deep.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.fail)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
setTimeout(() => {
|
||||||
|
source.push("c");
|
||||||
|
}, 600);
|
||||||
|
setTimeout(() => {
|
||||||
|
source.push("d");
|
||||||
|
source.push(null);
|
||||||
|
}, 600 * 2);
|
||||||
|
});
|
28
src/functions/child/child.spec.ts
Normal file
28
src/functions/child/child.spec.ts
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import * as cp from "child_process";
|
||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { child } from ".";
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"child() allows easily writing to child process stdin and reading from its stdout",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable();
|
||||||
|
const catProcess = cp.exec("cat");
|
||||||
|
let out = "";
|
||||||
|
source
|
||||||
|
.pipe(child(catProcess))
|
||||||
|
.on("data", chunk => (out += chunk))
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", () => {
|
||||||
|
expect(out).to.equal("abcdef");
|
||||||
|
t.pass();
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
source.push("ab");
|
||||||
|
source.push("cd");
|
||||||
|
source.push("ef");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
@ -1,3 +1,5 @@
|
|||||||
|
import { ChildProcess } from "child_process";
|
||||||
|
import { duplex } from "../baseFunctions";
|
||||||
/**
|
/**
|
||||||
* Return a Duplex stream from a child process' stdin and stdout
|
* Return a Duplex stream from a child process' stdin and stdout
|
||||||
* @param childProcess Child process from which to create duplex stream
|
* @param childProcess Child process from which to create duplex stream
|
||||||
|
132
src/functions/collect/collect.spec.ts
Normal file
132
src/functions/collect/collect.spec.ts
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { collect } from ".";
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() collects streamed elements into an array (object, flowing mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
|
||||||
|
source
|
||||||
|
.pipe(collect({ objectMode: true }))
|
||||||
|
.on("data", collected => {
|
||||||
|
expect(collected).to.deep.equal(["a", "b", "c"]);
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() collects streamed elements into an array (object, paused mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const collector = source.pipe(collect({ objectMode: true }));
|
||||||
|
|
||||||
|
collector
|
||||||
|
.on("readable", () => {
|
||||||
|
let collected = collector.read();
|
||||||
|
while (collected !== null) {
|
||||||
|
expect(collected).to.deep.equal(["a", "b", "c"]);
|
||||||
|
t.pass();
|
||||||
|
collected = collector.read();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() collects streamed bytes into a buffer (non-object, flowing mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable({ objectMode: false });
|
||||||
|
|
||||||
|
source
|
||||||
|
.pipe(collect())
|
||||||
|
.on("data", collected => {
|
||||||
|
expect(collected).to.deep.equal(Buffer.from("abc"));
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() collects streamed bytes into a buffer (non-object, paused mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable({ objectMode: false });
|
||||||
|
const collector = source.pipe(collect({ objectMode: false }));
|
||||||
|
collector
|
||||||
|
.on("readable", () => {
|
||||||
|
let collected = collector.read();
|
||||||
|
while (collected !== null) {
|
||||||
|
expect(collected).to.deep.equal(Buffer.from("abc"));
|
||||||
|
t.pass();
|
||||||
|
collected = collector.read();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() emits an empty array if the source was empty (object mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const collector = source.pipe(collect({ objectMode: true }));
|
||||||
|
collector
|
||||||
|
.on("data", collected => {
|
||||||
|
expect(collected).to.deep.equal([]);
|
||||||
|
t.pass();
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"collect() emits nothing if the source was empty (non-object mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(0);
|
||||||
|
const source = new Readable({ objectMode: false });
|
||||||
|
const collector = source.pipe(collect({ objectMode: false }));
|
||||||
|
collector
|
||||||
|
.on("data", () => t.fail())
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
@ -7,7 +7,7 @@ import { ThroughOptions } from "../baseDefinitions";
|
|||||||
*/
|
*/
|
||||||
export function collect(
|
export function collect(
|
||||||
options: ThroughOptions = { objectMode: false },
|
options: ThroughOptions = { objectMode: false },
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
const collected: any[] = [];
|
const collected: any[] = [];
|
||||||
return new Transform({
|
return new Transform({
|
||||||
readableObjectMode: options.objectMode,
|
readableObjectMode: options.objectMode,
|
||||||
|
180
src/functions/concat/concat.spec.ts
Normal file
180
src/functions/concat/concat.spec.ts
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { concat, collect } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"concat() concatenates multiple readable streams (object, flowing mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source1 = new Readable({ objectMode: true });
|
||||||
|
const source2 = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
concat(source1, source2)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source1.push("a");
|
||||||
|
source2.push("d");
|
||||||
|
source1.push("b");
|
||||||
|
source2.push("e");
|
||||||
|
source1.push("c");
|
||||||
|
source2.push("f");
|
||||||
|
source2.push(null);
|
||||||
|
source1.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"concat() concatenates multiple readable streams (object, paused mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source1 = new Readable({ objectMode: true });
|
||||||
|
const source2 = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
const concatenation = concat(source1, source2)
|
||||||
|
.on("readable", () => {
|
||||||
|
let element = concatenation.read();
|
||||||
|
while (element !== null) {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
element = concatenation.read();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source1.push("a");
|
||||||
|
source2.push("d");
|
||||||
|
source1.push("b");
|
||||||
|
source2.push("e");
|
||||||
|
source1.push("c");
|
||||||
|
source2.push("f");
|
||||||
|
source2.push(null);
|
||||||
|
source1.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"concat() concatenates multiple readable streams (non-object, flowing mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source1 = new Readable({ objectMode: false });
|
||||||
|
const source2 = new Readable({ objectMode: false });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
concat(source1, source2)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source1.push("a");
|
||||||
|
source2.push("d");
|
||||||
|
source1.push("b");
|
||||||
|
source2.push("e");
|
||||||
|
source1.push("c");
|
||||||
|
source2.push("f");
|
||||||
|
source2.push(null);
|
||||||
|
source1.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"concat() concatenates multiple readable streams (non-object, paused mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source1 = new Readable({ objectMode: false, read: () => ({}) });
|
||||||
|
const source2 = new Readable({ objectMode: false, read: () => ({}) });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
const concatenation = concat(source1, source2)
|
||||||
|
.on("readable", () => {
|
||||||
|
let element = concatenation.read();
|
||||||
|
while (element !== null) {
|
||||||
|
expect(element).to.deep.equal(
|
||||||
|
Buffer.from(expectedElements[i]),
|
||||||
|
);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
element = concatenation.read();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source1.push("a");
|
||||||
|
setTimeout(() => source2.push("d"), 10);
|
||||||
|
setTimeout(() => source1.push("b"), 20);
|
||||||
|
setTimeout(() => source2.push("e"), 30);
|
||||||
|
setTimeout(() => source1.push("c"), 40);
|
||||||
|
setTimeout(() => source2.push("f"), 50);
|
||||||
|
setTimeout(() => source2.push(null), 60);
|
||||||
|
setTimeout(() => source1.push(null), 70);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb("concat() concatenates a single readable stream (object mode)", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
concat(source)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"concat() concatenates a single readable stream (non-object mode)",
|
||||||
|
t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: false });
|
||||||
|
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||||
|
let i = 0;
|
||||||
|
concat(source)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb("concat() concatenates empty list of readable streams", t => {
|
||||||
|
t.plan(0);
|
||||||
|
concat()
|
||||||
|
.pipe(collect())
|
||||||
|
.on("data", _ => {
|
||||||
|
t.fail();
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
});
|
@ -3,7 +3,7 @@ import { Readable } from "stream";
|
|||||||
* Return a Readable stream of readable streams concatenated together
|
* Return a Readable stream of readable streams concatenated together
|
||||||
* @param streams Readable streams to concatenate
|
* @param streams Readable streams to concatenate
|
||||||
*/
|
*/
|
||||||
export function concat(...streams: Readable[]): Readable {
|
export function concat(...streams: NodeJS.ReadableStream[]): Readable {
|
||||||
let isStarted = false;
|
let isStarted = false;
|
||||||
let currentStreamIndex = 0;
|
let currentStreamIndex = 0;
|
||||||
const startCurrentStream = () => {
|
const startCurrentStream = () => {
|
||||||
|
28
src/functions/duplex/duplex.spec.ts
Normal file
28
src/functions/duplex/duplex.spec.ts
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import * as cp from "child_process";
|
||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { duplex } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"duplex() combines a writable and readable stream into a ReadWrite stream",
|
||||||
|
t => {
|
||||||
|
t.plan(1);
|
||||||
|
const source = new Readable();
|
||||||
|
const catProcess = cp.exec("cat");
|
||||||
|
let out = "";
|
||||||
|
source
|
||||||
|
.pipe(duplex(catProcess.stdin!, catProcess.stdout!))
|
||||||
|
.on("data", chunk => (out += chunk))
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", () => {
|
||||||
|
expect(out).to.equal("abcdef");
|
||||||
|
t.pass();
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
source.push("ab");
|
||||||
|
source.push("cd");
|
||||||
|
source.push("ef");
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
@ -1,11 +1,14 @@
|
|||||||
import { Duplex, Writable, Readable } from "stream";
|
import { Duplex } from "stream";
|
||||||
/**
|
/**
|
||||||
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
|
* Return a Duplex stream from a writable stream that is assumed to somehow, when written to,
|
||||||
* cause the given readable stream to yield chunks
|
* cause the given readable stream to yield chunks
|
||||||
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
|
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
|
||||||
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
|
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
|
||||||
*/
|
*/
|
||||||
export function duplex(writable: Writable, readable: Readable) {
|
export function duplex(
|
||||||
|
writable: NodeJS.WritableStream,
|
||||||
|
readable: NodeJS.ReadableStream,
|
||||||
|
) {
|
||||||
const wrapper = new Duplex({
|
const wrapper = new Duplex({
|
||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
writableObjectMode: true,
|
writableObjectMode: true,
|
||||||
|
@ -15,7 +15,7 @@ export function flatMap<T, R>(
|
|||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
writableObjectMode: true,
|
writableObjectMode: true,
|
||||||
},
|
},
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return new Transform({
|
return new Transform({
|
||||||
...options,
|
...options,
|
||||||
async transform(chunk: T, encoding, callback) {
|
async transform(chunk: T, encoding, callback) {
|
||||||
|
@ -3,7 +3,7 @@ import { Readable } from "stream";
|
|||||||
* Convert an array into a Readable stream of its elements
|
* Convert an array into a Readable stream of its elements
|
||||||
* @param array Array of elements to stream
|
* @param array Array of elements to stream
|
||||||
*/
|
*/
|
||||||
export function fromArray(array: any[]): NodeJS.ReadableStream {
|
export function fromArray(array: any[]): Readable {
|
||||||
let cursor = 0;
|
let cursor = 0;
|
||||||
return new Readable({
|
return new Readable({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import { Readable, Writable, Transform } from "stream";
|
import { Readable, Writable, Transform, Duplex } from "stream";
|
||||||
import { ChildProcess } from "child_process";
|
import { ChildProcess } from "child_process";
|
||||||
import * as baseFunctions from "./baseFunctions";
|
import * as baseFunctions from "./baseFunctions";
|
||||||
|
|
||||||
@ -7,15 +7,18 @@ import {
|
|||||||
TransformOptions,
|
TransformOptions,
|
||||||
WithEncoding,
|
WithEncoding,
|
||||||
JsonParseOptions,
|
JsonParseOptions,
|
||||||
|
} from "./baseDefinitions";
|
||||||
|
|
||||||
|
import {
|
||||||
FlushStrategy,
|
FlushStrategy,
|
||||||
AccumulatorByIteratee,
|
AccumulatorByIteratee,
|
||||||
} from "./definitions";
|
} from "./accumulator/definitions";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert an array into a Readable stream of its elements
|
* Convert an array into a Readable stream of its elements
|
||||||
* @param array Array of elements to stream
|
* @param array Array of elements to stream
|
||||||
*/
|
*/
|
||||||
export function fromArray(array: any[]): NodeJS.ReadableStream {
|
export function fromArray(array: any[]): Readable {
|
||||||
return baseFunctions.fromArray(array);
|
return baseFunctions.fromArray(array);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,7 +48,7 @@ export function flatMap<T, R>(
|
|||||||
| ((chunk: T, encoding: string) => R[])
|
| ((chunk: T, encoding: string) => R[])
|
||||||
| ((chunk: T, encoding: string) => Promise<R[]>),
|
| ((chunk: T, encoding: string) => Promise<R[]>),
|
||||||
options?: TransformOptions,
|
options?: TransformOptions,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return baseFunctions.flatMap(mapper, options);
|
return baseFunctions.flatMap(mapper, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,7 +63,7 @@ export function filter<T>(
|
|||||||
| ((chunk: T, encoding: string) => boolean)
|
| ((chunk: T, encoding: string) => boolean)
|
||||||
| ((chunk: T, encoding: string) => Promise<boolean>),
|
| ((chunk: T, encoding: string) => Promise<boolean>),
|
||||||
options?: ThroughOptions,
|
options?: ThroughOptions,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return baseFunctions.filter(mapper, options);
|
return baseFunctions.filter(mapper, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +82,7 @@ export function reduce<T, R>(
|
|||||||
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
|
| ((previousValue: R, chunk: T, encoding: string) => Promise<R>),
|
||||||
initialValue: R,
|
initialValue: R,
|
||||||
options?: TransformOptions,
|
options?: TransformOptions,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return baseFunctions.reduce(iteratee, initialValue, options);
|
return baseFunctions.reduce(iteratee, initialValue, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,7 +95,7 @@ export function reduce<T, R>(
|
|||||||
export function split(
|
export function split(
|
||||||
separator?: string | RegExp,
|
separator?: string | RegExp,
|
||||||
options?: WithEncoding,
|
options?: WithEncoding,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return baseFunctions.split(separator, options);
|
return baseFunctions.split(separator, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,10 +105,7 @@ export function split(
|
|||||||
* @param options? Defaults to encoding: utf8
|
* @param options? Defaults to encoding: utf8
|
||||||
* @param options.encoding? Encoding written chunks are assumed to use
|
* @param options.encoding? Encoding written chunks are assumed to use
|
||||||
*/
|
*/
|
||||||
export function join(
|
export function join(separator: string, options?: WithEncoding): Transform {
|
||||||
separator: string,
|
|
||||||
options?: WithEncoding,
|
|
||||||
): NodeJS.ReadWriteStream {
|
|
||||||
return baseFunctions.join(separator, options);
|
return baseFunctions.join(separator, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +121,7 @@ export function replace(
|
|||||||
searchValue: string | RegExp,
|
searchValue: string | RegExp,
|
||||||
replaceValue: string,
|
replaceValue: string,
|
||||||
options?: WithEncoding,
|
options?: WithEncoding,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return baseFunctions.replace(searchValue, replaceValue, options);
|
return baseFunctions.replace(searchValue, replaceValue, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,7 +129,7 @@ export function replace(
|
|||||||
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
|
* Return a ReadWrite stream that parses the streamed chunks as JSON. Each streamed chunk
|
||||||
* must be a fully defined JSON string in utf8.
|
* must be a fully defined JSON string in utf8.
|
||||||
*/
|
*/
|
||||||
export function parse(): NodeJS.ReadWriteStream {
|
export function parse(): Transform {
|
||||||
return baseFunctions.parse();
|
return baseFunctions.parse();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,7 +139,7 @@ export function parse(): NodeJS.ReadWriteStream {
|
|||||||
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
|
* @param options.pretty If true, whitespace is inserted into the stringified chunks.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
|
export function stringify(options?: JsonParseOptions): Transform {
|
||||||
return baseFunctions.stringify(options);
|
return baseFunctions.stringify(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,7 +148,7 @@ export function stringify(options?: JsonParseOptions): NodeJS.ReadWriteStream {
|
|||||||
* @param options?
|
* @param options?
|
||||||
* @param options.objectMode? Whether this stream should behave as a stream of objects
|
* @param options.objectMode? Whether this stream should behave as a stream of objects
|
||||||
*/
|
*/
|
||||||
export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
|
export function collect(options?: ThroughOptions): Transform {
|
||||||
return baseFunctions.collect(options);
|
return baseFunctions.collect(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,9 +156,7 @@ export function collect(options?: ThroughOptions): NodeJS.ReadWriteStream {
|
|||||||
* Return a Readable stream of readable streams concatenated together
|
* Return a Readable stream of readable streams concatenated together
|
||||||
* @param streams Readable streams to concatenate
|
* @param streams Readable streams to concatenate
|
||||||
*/
|
*/
|
||||||
export function concat(
|
export function concat(...streams: Readable[]): Readable {
|
||||||
...streams: NodeJS.ReadableStream[]
|
|
||||||
): NodeJS.ReadableStream {
|
|
||||||
return baseFunctions.concat(...streams);
|
return baseFunctions.concat(...streams);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,9 +164,7 @@ export function concat(
|
|||||||
* Return a Readable stream of readable streams concatenated together
|
* Return a Readable stream of readable streams concatenated together
|
||||||
* @param streams Readable streams to merge
|
* @param streams Readable streams to merge
|
||||||
*/
|
*/
|
||||||
export function merge(
|
export function merge(...streams: Readable[]): Readable {
|
||||||
...streams: NodeJS.ReadableStream[]
|
|
||||||
): NodeJS.ReadableStream {
|
|
||||||
return baseFunctions.merge(...streams);
|
return baseFunctions.merge(...streams);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,10 +174,7 @@ export function merge(
|
|||||||
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
|
* @param writable Writable stream assumed to cause the readable stream to yield chunks when written to
|
||||||
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
|
* @param readable Readable stream assumed to yield chunks when the writable stream is written to
|
||||||
*/
|
*/
|
||||||
export function duplex(
|
export function duplex(writable: Writable, readable: Readable): Duplex {
|
||||||
writable: Writable,
|
|
||||||
readable: Readable,
|
|
||||||
): NodeJS.ReadWriteStream {
|
|
||||||
return baseFunctions.duplex(writable, readable);
|
return baseFunctions.duplex(writable, readable);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,7 +182,7 @@ export function duplex(
|
|||||||
* Return a Duplex stream from a child process' stdin and stdout
|
* Return a Duplex stream from a child process' stdin and stdout
|
||||||
* @param childProcess Child process from which to create duplex stream
|
* @param childProcess Child process from which to create duplex stream
|
||||||
*/
|
*/
|
||||||
export function child(childProcess: ChildProcess): NodeJS.ReadWriteStream {
|
export function child(childProcess: ChildProcess): Duplex {
|
||||||
return baseFunctions.child(childProcess);
|
return baseFunctions.child(childProcess);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +207,7 @@ export function batch(batchSize: number, maxBatchAge?: number): Transform {
|
|||||||
/**
|
/**
|
||||||
* Unbatches and sends individual chunks of data
|
* Unbatches and sends individual chunks of data
|
||||||
*/
|
*/
|
||||||
export function unbatch(): NodeJS.ReadWriteStream {
|
export function unbatch(): Transform {
|
||||||
return baseFunctions.unbatch();
|
return baseFunctions.unbatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,10 +217,7 @@ export function unbatch(): NodeJS.ReadWriteStream {
|
|||||||
* @param targetRate? Desired rate in ms
|
* @param targetRate? Desired rate in ms
|
||||||
* @param period? Period to sleep for when rate is above or equal to targetRate
|
* @param period? Period to sleep for when rate is above or equal to targetRate
|
||||||
*/
|
*/
|
||||||
export function rate(
|
export function rate(targetRate?: number, period?: number): Transform {
|
||||||
targetRate?: number,
|
|
||||||
period?: number,
|
|
||||||
): NodeJS.ReadWriteStream {
|
|
||||||
return baseFunctions.rate(targetRate, period);
|
return baseFunctions.rate(targetRate, period);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import { WithEncoding } from "../baseDefinitions";
|
|||||||
export function join(
|
export function join(
|
||||||
separator: string,
|
separator: string,
|
||||||
options: WithEncoding = { encoding: "utf8" },
|
options: WithEncoding = { encoding: "utf8" },
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
let isFirstChunk = true;
|
let isFirstChunk = true;
|
||||||
const decoder = new StringDecoder(options.encoding);
|
const decoder = new StringDecoder(options.encoding);
|
||||||
return new Transform({
|
return new Transform({
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
import { Readable } from "stream";
|
|
||||||
/**
|
/**
|
||||||
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
|
* Return a Promise resolving to the last streamed chunk of the given readable stream, after it has
|
||||||
* ended
|
* ended
|
||||||
* @param readable Readable stream to wait on
|
* @param readable Readable stream to wait on
|
||||||
*/
|
*/
|
||||||
export function last<T>(readable: Readable): Promise<T | null> {
|
export function last<T>(readable: NodeJS.ReadableStream): Promise<T | null> {
|
||||||
let lastChunk: T | null = null;
|
let lastChunk: T | null = null;
|
||||||
return new Promise((resolve, _) => {
|
return new Promise((resolve, _) => {
|
||||||
readable
|
readable
|
||||||
|
15
src/functions/last/last.spec.ts
Normal file
15
src/functions/last/last.spec.ts
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { last } from "../baseFunctions";
|
||||||
|
|
||||||
|
test("last() resolves to the last chunk streamed by the given readable stream", async t => {
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const lastPromise = last(source);
|
||||||
|
source.push("ab");
|
||||||
|
source.push("cd");
|
||||||
|
source.push("ef");
|
||||||
|
source.push(null);
|
||||||
|
const lastChunk = await lastPromise;
|
||||||
|
expect(lastChunk).to.equal("ef");
|
||||||
|
});
|
60
src/functions/merge/merge.spec.ts
Normal file
60
src/functions/merge/merge.spec.ts
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { merge } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"merge() merges multiple readable streams in chunk arrival order",
|
||||||
|
t => {
|
||||||
|
t.plan(6);
|
||||||
|
const source1 = new Readable({ objectMode: true, read: () => ({}) });
|
||||||
|
const source2 = new Readable({ objectMode: true, read: () => ({}) });
|
||||||
|
const expectedElements = ["a", "d", "b", "e", "c", "f"];
|
||||||
|
let i = 0;
|
||||||
|
merge(source1, source2)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source1.push("a");
|
||||||
|
setTimeout(() => source2.push("d"), 10);
|
||||||
|
setTimeout(() => source1.push("b"), 20);
|
||||||
|
setTimeout(() => source2.push("e"), 30);
|
||||||
|
setTimeout(() => source1.push("c"), 40);
|
||||||
|
setTimeout(() => source2.push("f"), 50);
|
||||||
|
setTimeout(() => source2.push(null), 60);
|
||||||
|
setTimeout(() => source1.push(null), 70);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.cb("merge() merges a readable stream", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true, read: () => ({}) });
|
||||||
|
const expectedElements = ["a", "b", "c"];
|
||||||
|
let i = 0;
|
||||||
|
merge(source)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("merge() merges an empty list of readable streams", t => {
|
||||||
|
t.plan(0);
|
||||||
|
merge()
|
||||||
|
.on("data", () => t.pass())
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
});
|
77
src/functions/parallelMap/parallelMap.spec.ts
Normal file
77
src/functions/parallelMap/parallelMap.spec.ts
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import { performance } from "perf_hooks";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { parallelMap } from "../baseFunctions";
|
||||||
|
import { sleep } from "../../helpers";
|
||||||
|
|
||||||
|
test.cb("parallelMap() parallel mapping", t => {
|
||||||
|
t.plan(6);
|
||||||
|
const offset = 50;
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = [
|
||||||
|
"a_processed",
|
||||||
|
"b_processed",
|
||||||
|
"c_processed",
|
||||||
|
"d_processed",
|
||||||
|
"e_processed",
|
||||||
|
"f_processed",
|
||||||
|
];
|
||||||
|
interface IPerfData {
|
||||||
|
start: number;
|
||||||
|
output?: string;
|
||||||
|
finish?: number;
|
||||||
|
}
|
||||||
|
const orderedResults: IPerfData[] = [];
|
||||||
|
source
|
||||||
|
.pipe(
|
||||||
|
parallelMap(async (data: any) => {
|
||||||
|
const perfData: IPerfData = { start: performance.now() };
|
||||||
|
const c = data + "_processed";
|
||||||
|
perfData.output = c;
|
||||||
|
await sleep(offset);
|
||||||
|
perfData.finish = performance.now();
|
||||||
|
orderedResults.push(perfData);
|
||||||
|
return c;
|
||||||
|
}, 2),
|
||||||
|
)
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
t.true(expectedElements.includes(element));
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", async () => {
|
||||||
|
expect(orderedResults[0].finish).to.be.lessThan(
|
||||||
|
orderedResults[2].start,
|
||||||
|
);
|
||||||
|
expect(orderedResults[1].finish).to.be.lessThan(
|
||||||
|
orderedResults[3].start,
|
||||||
|
);
|
||||||
|
expect(orderedResults[2].finish).to.be.lessThan(
|
||||||
|
orderedResults[4].start,
|
||||||
|
);
|
||||||
|
expect(orderedResults[3].finish).to.be.lessThan(
|
||||||
|
orderedResults[5].start,
|
||||||
|
);
|
||||||
|
expect(orderedResults[0].start).to.be.lessThan(
|
||||||
|
orderedResults[2].start + offset,
|
||||||
|
);
|
||||||
|
expect(orderedResults[1].start).to.be.lessThan(
|
||||||
|
orderedResults[3].start + offset,
|
||||||
|
);
|
||||||
|
expect(orderedResults[2].start).to.be.lessThan(
|
||||||
|
orderedResults[4].start + offset,
|
||||||
|
);
|
||||||
|
expect(orderedResults[3].start).to.be.lessThan(
|
||||||
|
orderedResults[5].start + offset,
|
||||||
|
);
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push("d");
|
||||||
|
source.push("e");
|
||||||
|
source.push("f");
|
||||||
|
source.push(null);
|
||||||
|
});
|
@ -8,7 +8,7 @@ import { SerializationFormats } from "../baseDefinitions";
|
|||||||
*/
|
*/
|
||||||
export function parse(
|
export function parse(
|
||||||
format: SerializationFormats = SerializationFormats.utf8,
|
format: SerializationFormats = SerializationFormats.utf8,
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
const decoder = new StringDecoder(format);
|
const decoder = new StringDecoder(format);
|
||||||
return new Transform({
|
return new Transform({
|
||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
|
40
src/functions/parse/parse.spec.ts
Normal file
40
src/functions/parse/parse.spec.ts
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { parse } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb("parse() parses the streamed elements as JSON", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["abc", {}, []];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(parse())
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.deep.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push('"abc"');
|
||||||
|
source.push("{}");
|
||||||
|
source.push("[]");
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb("parse() emits errors on invalid JSON", t => {
|
||||||
|
t.plan(2);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
source
|
||||||
|
.pipe(parse())
|
||||||
|
.resume()
|
||||||
|
.on("error", () => t.pass())
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("{}");
|
||||||
|
source.push({});
|
||||||
|
source.push([]);
|
||||||
|
source.push(null);
|
||||||
|
});
|
@ -14,11 +14,12 @@ export function rate(
|
|||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
writableObjectMode: true,
|
writableObjectMode: true,
|
||||||
},
|
},
|
||||||
) {
|
): Transform {
|
||||||
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
|
const deltaMS = ((1 / targetRate) * 1000) / period; // Skip half a period
|
||||||
let total = 0;
|
let total = 0;
|
||||||
const start = performance.now();
|
const start = performance.now();
|
||||||
return new Transform({
|
return new Transform({
|
||||||
|
...options,
|
||||||
async transform(data, encoding, callback) {
|
async transform(data, encoding, callback) {
|
||||||
const currentRate = (total / (performance.now() - start)) * 1000;
|
const currentRate = (total / (performance.now() - start)) * 1000;
|
||||||
if (targetRate && currentRate > targetRate) {
|
if (targetRate && currentRate > targetRate) {
|
||||||
|
67
src/functions/rate/rate.spec.ts
Normal file
67
src/functions/rate/rate.spec.ts
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import { performance } from "perf_hooks";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { rate } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb("rate() sends data at desired rate", t => {
|
||||||
|
t.plan(9);
|
||||||
|
const fastRate = 150;
|
||||||
|
const medRate = 50;
|
||||||
|
const slowRate = 1;
|
||||||
|
const sourceFast = new Readable({ objectMode: true });
|
||||||
|
const sourceMed = new Readable({ objectMode: true });
|
||||||
|
const sourceSlow = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "b", "c"];
|
||||||
|
const start = performance.now();
|
||||||
|
let i = 0;
|
||||||
|
let j = 0;
|
||||||
|
let k = 0;
|
||||||
|
|
||||||
|
sourceFast
|
||||||
|
.pipe(rate(fastRate))
|
||||||
|
.on("data", (element: string[]) => {
|
||||||
|
const currentRate = (i / (performance.now() - start)) * 1000;
|
||||||
|
expect(element).to.deep.equal(expectedElements[i]);
|
||||||
|
expect(currentRate).lessThan(fastRate);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end);
|
||||||
|
|
||||||
|
sourceMed
|
||||||
|
.pipe(rate(medRate))
|
||||||
|
.on("data", (element: string[]) => {
|
||||||
|
const currentRate = (j / (performance.now() - start)) * 1000;
|
||||||
|
expect(element).to.deep.equal(expectedElements[j]);
|
||||||
|
expect(currentRate).lessThan(medRate);
|
||||||
|
t.pass();
|
||||||
|
j++;
|
||||||
|
})
|
||||||
|
.on("error", t.end);
|
||||||
|
|
||||||
|
sourceSlow
|
||||||
|
.pipe(rate(slowRate, 1))
|
||||||
|
.on("data", (element: string[]) => {
|
||||||
|
const currentRate = (k / (performance.now() - start)) * 1000;
|
||||||
|
expect(element).to.deep.equal(expectedElements[k]);
|
||||||
|
expect(currentRate).lessThan(slowRate);
|
||||||
|
t.pass();
|
||||||
|
k++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
sourceFast.push("a");
|
||||||
|
sourceFast.push("b");
|
||||||
|
sourceFast.push("c");
|
||||||
|
sourceFast.push(null);
|
||||||
|
sourceMed.push("a");
|
||||||
|
sourceMed.push("b");
|
||||||
|
sourceMed.push("c");
|
||||||
|
sourceMed.push(null);
|
||||||
|
sourceSlow.push("a");
|
||||||
|
sourceSlow.push("b");
|
||||||
|
sourceSlow.push("c");
|
||||||
|
sourceSlow.push(null);
|
||||||
|
});
|
@ -13,7 +13,7 @@ export function replace(
|
|||||||
searchValue: string | RegExp,
|
searchValue: string | RegExp,
|
||||||
replaceValue: string,
|
replaceValue: string,
|
||||||
options: WithEncoding = { encoding: "utf8" },
|
options: WithEncoding = { encoding: "utf8" },
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
const decoder = new StringDecoder(options.encoding);
|
const decoder = new StringDecoder(options.encoding);
|
||||||
return new Transform({
|
return new Transform({
|
||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
|
@ -10,7 +10,7 @@ import { WithEncoding } from "../baseDefinitions";
|
|||||||
export function split(
|
export function split(
|
||||||
separator: string | RegExp = "\n",
|
separator: string | RegExp = "\n",
|
||||||
options: WithEncoding = { encoding: "utf8" },
|
options: WithEncoding = { encoding: "utf8" },
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
let buffered = "";
|
let buffered = "";
|
||||||
const decoder = new StringDecoder(options.encoding);
|
const decoder = new StringDecoder(options.encoding);
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ import { JsonValue, JsonParseOptions } from "../baseDefinitions";
|
|||||||
*/
|
*/
|
||||||
export function stringify(
|
export function stringify(
|
||||||
options: JsonParseOptions = { pretty: false },
|
options: JsonParseOptions = { pretty: false },
|
||||||
): NodeJS.ReadWriteStream {
|
): Transform {
|
||||||
return new Transform({
|
return new Transform({
|
||||||
readableObjectMode: true,
|
readableObjectMode: true,
|
||||||
writableObjectMode: true,
|
writableObjectMode: true,
|
||||||
|
61
src/functions/stringify/stringify.spec.ts
Normal file
61
src/functions/stringify/stringify.spec.ts
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { stringify } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb("stringify() stringifies the streamed elements as JSON", t => {
|
||||||
|
t.plan(4);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = [
|
||||||
|
'"abc"',
|
||||||
|
"0",
|
||||||
|
'{"a":"a","b":"b","c":"c"}',
|
||||||
|
'["a","b","c"]',
|
||||||
|
];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(stringify())
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.deep.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("abc");
|
||||||
|
source.push(0);
|
||||||
|
source.push({ a: "a", b: "b", c: "c" });
|
||||||
|
source.push(["a", "b", "c"]);
|
||||||
|
source.push(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.cb(
|
||||||
|
"stringify() stringifies the streamed elements as pretty-printed JSON",
|
||||||
|
t => {
|
||||||
|
t.plan(4);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = [
|
||||||
|
'"abc"',
|
||||||
|
"0",
|
||||||
|
'{\n "a": "a",\n "b": "b",\n "c": "c"\n}',
|
||||||
|
'[\n "a",\n "b",\n "c"\n]',
|
||||||
|
];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(stringify({ pretty: true }))
|
||||||
|
.on("data", part => {
|
||||||
|
expect(part).to.deep.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("abc");
|
||||||
|
source.push(0);
|
||||||
|
source.push({ a: "a", b: "b", c: "c" });
|
||||||
|
source.push(["a", "b", "c"]);
|
||||||
|
source.push(null);
|
||||||
|
},
|
||||||
|
);
|
26
src/functions/unbatch/unbatch.spec.ts
Normal file
26
src/functions/unbatch/unbatch.spec.ts
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
import { Readable } from "stream";
|
||||||
|
import test from "ava";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { unbatch, batch } from "../baseFunctions";
|
||||||
|
|
||||||
|
test.cb("unbatch() unbatches", t => {
|
||||||
|
t.plan(3);
|
||||||
|
const source = new Readable({ objectMode: true });
|
||||||
|
const expectedElements = ["a", "b", "c"];
|
||||||
|
let i = 0;
|
||||||
|
source
|
||||||
|
.pipe(batch(3))
|
||||||
|
.pipe(unbatch())
|
||||||
|
.on("data", (element: string) => {
|
||||||
|
expect(element).to.equal(expectedElements[i]);
|
||||||
|
t.pass();
|
||||||
|
i++;
|
||||||
|
})
|
||||||
|
.on("error", t.end)
|
||||||
|
.on("end", t.end);
|
||||||
|
|
||||||
|
source.push("a");
|
||||||
|
source.push("b");
|
||||||
|
source.push("c");
|
||||||
|
source.push(null);
|
||||||
|
});
|
Loading…
Reference in New Issue
Block a user