0.3.0 Release
This commit is contained in:
29
src/index.spec.ts
Normal file
29
src/index.spec.ts
Normal file
@@ -0,0 +1,29 @@
|
||||
import test from "ava";
|
||||
import { expect } from "chai";
|
||||
import { once, sleep } from "./";
|
||||
import { EventEmitter } from "events";
|
||||
|
||||
const TimingErrorMarginMs = 50;
|
||||
|
||||
test("sleep() resolves after the specified delay in milliseconds", async t => {
|
||||
const before = Date.now();
|
||||
await sleep(200);
|
||||
const after = Date.now();
|
||||
|
||||
expect(after - before).gte(200);
|
||||
expect(after - before).closeTo(200, TimingErrorMarginMs);
|
||||
});
|
||||
|
||||
test("once() resolves only after the specified event is emitted", async t => {
|
||||
const emitter = new EventEmitter();
|
||||
const before = Date.now();
|
||||
emitter.emit("noise", "is ignored");
|
||||
setTimeout(() => emitter.emit("done", "some-result"), 200);
|
||||
|
||||
const result = await once(emitter, "done");
|
||||
const after = Date.now();
|
||||
|
||||
expect(result).to.equal("some-result");
|
||||
expect(after - before).gte(200);
|
||||
expect(after - before).closeTo(200, TimingErrorMarginMs);
|
||||
});
|
||||
27
src/index.ts
Normal file
27
src/index.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
/**
|
||||
* Resolve after the given delay in milliseconds
|
||||
*
|
||||
* @param ms - The number of milliseconds to wait
|
||||
*/
|
||||
export function sleep(ms: number) {
|
||||
return new Promise(resolve => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve once the given event emitter emits the specified event
|
||||
*
|
||||
* @param emitter - The event emitter to watch
|
||||
* @param event - The event to watch
|
||||
*/
|
||||
export function once<T>(
|
||||
emitter: NodeJS.EventEmitter,
|
||||
event: string,
|
||||
): Promise<T> {
|
||||
return new Promise(resolve => {
|
||||
emitter.once(event, result => {
|
||||
resolve(result);
|
||||
});
|
||||
});
|
||||
}
|
||||
350
src/stream.spec.ts
Normal file
350
src/stream.spec.ts
Normal file
@@ -0,0 +1,350 @@
|
||||
import test from "ava";
|
||||
import { expect } from "chai";
|
||||
import { fromArray, collect, concat } from "./stream";
|
||||
import { Readable } from "stream";
|
||||
|
||||
test.cb("fromArray() streams array elements in flowing mode", t => {
|
||||
t.plan(3);
|
||||
const elements = ["a", "b", "c"];
|
||||
const stream = fromArray(elements);
|
||||
let i = 0;
|
||||
stream
|
||||
.on("data", element => {
|
||||
expect(element).to.equal(elements[i]);
|
||||
t.pass();
|
||||
i++;
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
});
|
||||
|
||||
test.cb("fromArray() streams array elements in paused mode", t => {
|
||||
t.plan(3);
|
||||
const elements = ["a", "b", "c"];
|
||||
const stream = fromArray(elements);
|
||||
let i = 0;
|
||||
stream
|
||||
.on("readable", () => {
|
||||
let element = stream.read();
|
||||
while (element !== null) {
|
||||
expect(element).to.equal(elements[i]);
|
||||
t.pass();
|
||||
i++;
|
||||
element = stream.read();
|
||||
}
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
});
|
||||
|
||||
test.cb("fromArray() ends immediately if there are no array elements", t => {
|
||||
t.plan(0);
|
||||
fromArray([])
|
||||
.on("data", () => t.fail())
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
});
|
||||
|
||||
test.cb(
|
||||
"collect() collects streamed elements into an array (object, flowing mode)",
|
||||
t => {
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: true });
|
||||
|
||||
source
|
||||
.pipe(collect({ objectMode: true }))
|
||||
.on("data", collected => {
|
||||
expect(collected).to.deep.equal(["a", "b", "c"]);
|
||||
t.pass();
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"collect() collects streamed elements into an array (object, paused mode)",
|
||||
t => {
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: true });
|
||||
const collector = source.pipe(collect({ objectMode: true }));
|
||||
|
||||
collector
|
||||
.on("readable", () => {
|
||||
let collected = collector.read();
|
||||
while (collected !== null) {
|
||||
expect(collected).to.deep.equal(["a", "b", "c"]);
|
||||
t.pass();
|
||||
collected = collector.read();
|
||||
}
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"collect() collects streamed bytes into a buffer (non-object, flowing mode)",
|
||||
t => {
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: false });
|
||||
|
||||
source
|
||||
.pipe(collect())
|
||||
.on("data", collected => {
|
||||
expect(collected).to.deep.equal(Buffer.from("abc"));
|
||||
t.pass();
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"collect() collects streamed bytes into a buffer (non-object, paused mode)",
|
||||
t => {
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: false });
|
||||
const collector = source.pipe(collect({ objectMode: false }));
|
||||
collector
|
||||
.on("readable", () => {
|
||||
let collected = collector.read();
|
||||
while (collected !== null) {
|
||||
expect(collected).to.deep.equal(Buffer.from("abc"));
|
||||
t.pass();
|
||||
collected = collector.read();
|
||||
}
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"collect() emits an empty array if the source was empty (object mode)",
|
||||
t => {
|
||||
t.plan(1);
|
||||
const source = new Readable({ objectMode: true });
|
||||
const collector = source.pipe(collect({ objectMode: true }));
|
||||
collector
|
||||
.on("data", collected => {
|
||||
expect(collected).to.deep.equal([]);
|
||||
t.pass();
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"collect() emits nothing if the source was empty (non-object mode)",
|
||||
t => {
|
||||
t.plan(0);
|
||||
const source = new Readable({ objectMode: false });
|
||||
const collector = source.pipe(collect({ objectMode: false }));
|
||||
collector
|
||||
.on("data", () => t.fail())
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"concat() concatenates multiple readable streams (object, flowing mode)",
|
||||
t => {
|
||||
t.plan(6);
|
||||
const source1 = new Readable({ objectMode: true });
|
||||
const source2 = new Readable({ objectMode: true });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
concat(source1, source2)
|
||||
.on("data", element => {
|
||||
expect(element).to.equal(expectedElements[i]);
|
||||
t.pass();
|
||||
i++;
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source1.push("a");
|
||||
source2.push("d");
|
||||
source1.push("b");
|
||||
source2.push("e");
|
||||
source1.push("c");
|
||||
source2.push("f");
|
||||
source2.push(null);
|
||||
source1.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"concat() concatenates multiple readable streams (object, paused mode)",
|
||||
t => {
|
||||
t.plan(6);
|
||||
const source1 = new Readable({ objectMode: true });
|
||||
const source2 = new Readable({ objectMode: true });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
const concatenation = concat(source1, source2)
|
||||
.on("readable", () => {
|
||||
let element = concatenation.read();
|
||||
while (element !== null) {
|
||||
expect(element).to.equal(expectedElements[i]);
|
||||
t.pass();
|
||||
i++;
|
||||
element = concatenation.read();
|
||||
}
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source1.push("a");
|
||||
source2.push("d");
|
||||
source1.push("b");
|
||||
source2.push("e");
|
||||
source1.push("c");
|
||||
source2.push("f");
|
||||
source2.push(null);
|
||||
source1.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"concat() concatenates multiple readable streams (non-object, flowing mode)",
|
||||
t => {
|
||||
t.plan(6);
|
||||
const source1 = new Readable({ objectMode: false });
|
||||
const source2 = new Readable({ objectMode: false });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
concat(source1, source2)
|
||||
.on("data", element => {
|
||||
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
|
||||
t.pass();
|
||||
i++;
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source1.push("a");
|
||||
source2.push("d");
|
||||
source1.push("b");
|
||||
source2.push("e");
|
||||
source1.push("c");
|
||||
source2.push("f");
|
||||
source2.push(null);
|
||||
source1.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb(
|
||||
"concat() concatenates multiple readable streams (non-object, paused mode)",
|
||||
t => {
|
||||
t.plan(6);
|
||||
const source1 = new Readable({ objectMode: false });
|
||||
const source2 = new Readable({ objectMode: false });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
const concatenation = concat(source1, source2)
|
||||
.on("readable", () => {
|
||||
let element = concatenation.read();
|
||||
while (element !== null) {
|
||||
expect(element).to.deep.equal(
|
||||
Buffer.from(expectedElements[i]),
|
||||
);
|
||||
t.pass();
|
||||
i++;
|
||||
element = concatenation.read();
|
||||
}
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source1.push("a");
|
||||
source2.push("d");
|
||||
source1.push("b");
|
||||
source2.push("e");
|
||||
source1.push("c");
|
||||
source2.push("f");
|
||||
source2.push(null);
|
||||
source1.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb("concat() concatenates a single readable stream (object mode)", t => {
|
||||
t.plan(3);
|
||||
const source = new Readable({ objectMode: true });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
concat(source)
|
||||
.on("data", element => {
|
||||
expect(element).to.equal(expectedElements[i]);
|
||||
t.pass();
|
||||
i++;
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
});
|
||||
|
||||
test.cb(
|
||||
"concat() concatenates a single readable stream (non-object mode)",
|
||||
t => {
|
||||
t.plan(3);
|
||||
const source = new Readable({ objectMode: false });
|
||||
const expectedElements = ["a", "b", "c", "d", "e", "f"];
|
||||
let i = 0;
|
||||
concat(source)
|
||||
.on("data", element => {
|
||||
expect(element).to.deep.equal(Buffer.from(expectedElements[i]));
|
||||
t.pass();
|
||||
i++;
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
|
||||
source.push("a");
|
||||
source.push("b");
|
||||
source.push("c");
|
||||
source.push(null);
|
||||
},
|
||||
);
|
||||
|
||||
test.cb("concat() concatenates empty list of readable streams", t => {
|
||||
t.plan(0);
|
||||
concat()
|
||||
.pipe(collect())
|
||||
.on("data", _ => {
|
||||
t.fail();
|
||||
})
|
||||
.on("error", t.end)
|
||||
.on("end", t.end);
|
||||
});
|
||||
83
src/stream.ts
Normal file
83
src/stream.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { Transform, Readable } from "stream";
|
||||
|
||||
/**
|
||||
* Convert an array into a readable stream of its elements
|
||||
* @param array The array of elements to stream
|
||||
*/
|
||||
export function fromArray(array: any[]): NodeJS.ReadableStream {
|
||||
let cursor = 0;
|
||||
return new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
if (cursor < array.length) {
|
||||
this.push(array[cursor]);
|
||||
cursor++;
|
||||
} else {
|
||||
this.push(null);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a ReadWrite stream that collects streamed objects or bytes into an array or buffer
|
||||
* @param options
|
||||
* @param options.objectMode Whether this stream should behave as a stream of objects
|
||||
*/
|
||||
export function collect({ objectMode = false } = {}): NodeJS.ReadWriteStream {
|
||||
const collected: any[] = [];
|
||||
return new Transform({
|
||||
readableObjectMode: objectMode,
|
||||
writableObjectMode: objectMode,
|
||||
transform(data, encoding, callback) {
|
||||
collected.push(data);
|
||||
callback();
|
||||
},
|
||||
flush(callback) {
|
||||
this.push(objectMode ? collected : Buffer.concat(collected));
|
||||
callback();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a stream of readable streams concatenated together
|
||||
* @param streams The readable streams to concatenate
|
||||
*/
|
||||
export function concat(
|
||||
...streams: NodeJS.ReadableStream[]
|
||||
): NodeJS.ReadableStream {
|
||||
let isStarted = false;
|
||||
let currentStreamIndex = 0;
|
||||
const startCurrentStream = () => {
|
||||
if (currentStreamIndex >= streams.length) {
|
||||
wrapper.push(null);
|
||||
} else {
|
||||
streams[currentStreamIndex]
|
||||
.on("data", chunk => {
|
||||
if (!wrapper.push(chunk)) {
|
||||
streams[currentStreamIndex].pause();
|
||||
}
|
||||
})
|
||||
.on("error", err => wrapper.emit("error", err))
|
||||
.on("end", () => {
|
||||
currentStreamIndex++;
|
||||
startCurrentStream();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const wrapper = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
if (!isStarted) {
|
||||
isStarted = true;
|
||||
startCurrentStream();
|
||||
}
|
||||
if (currentStreamIndex < streams.length) {
|
||||
streams[currentStreamIndex].resume();
|
||||
}
|
||||
},
|
||||
});
|
||||
return wrapper;
|
||||
}
|
||||
Reference in New Issue
Block a user