Add batching timeout

This commit is contained in:
Lewis Diamond 2019-08-06 16:50:58 -04:00
parent 86020a50ad
commit bad58a27fe
3 changed files with 716 additions and 586 deletions

View File

@ -141,8 +141,9 @@ test.cb("map() emits errors during synchronous mapping", t => {
source.push(null); source.push(null);
}); });
test.cb("map() emits errors during asynchronous mapping", t => { test("map() emits errors during asynchronous mapping", t => {
t.plan(2); t.plan(1);
return new Promise((resolve, reject) => {
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
source source
.pipe( .pipe(
@ -158,14 +159,18 @@ test.cb("map() emits errors during asynchronous mapping", t => {
.on("error", err => { .on("error", err => {
expect(err.message).to.equal("Failed mapping"); expect(err.message).to.equal("Failed mapping");
t.pass(); t.pass();
resolve();
}) })
.on("end", t.end); .on("end", () => {
t.fail();
});
source.push("a"); source.push("a");
source.push("b"); source.push("b");
source.push("c"); source.push("c");
source.push(null); source.push(null);
}); });
});
test.cb("flatMap() maps elements synchronously", t => { test.cb("flatMap() maps elements synchronously", t => {
t.plan(6); t.plan(6);
@ -1212,6 +1217,36 @@ test.cb("batch() batches chunks together", t => {
source.push(null); source.push(null);
}); });
test.cb("batch() yields a batch after the timeout", t => {
    t.plan(3);
    // Push-driven source: read() is a no-op because we push chunks manually
    // below, on a schedule that exercises the maxBatchAge timer.
    const source = new Readable({
        objectMode: true,
        read(size: number) {},
    });
    // batch(3) never fills up from 2 pushes, so "a"/"b" must be flushed by the
    // age timer (default maxBatchAge = 500ms); "c" and "d" each arrive after a
    // further interval and are therefore flushed as singleton batches.
    const expectedElements = [["a", "b"], ["c"], ["d"]];
    let i = 0;
    source
        .pipe(batch(3))
        .on("data", (element: string[]) => {
            expect(element).to.deep.equal(expectedElements[i]);
            t.pass();
            i++;
        })
        .on("error", t.fail)
        .on("end", t.end);
    source.push("a");
    source.push("b");
    setTimeout(() => {
        source.push("c");
    }, 600);
    setTimeout(() => {
        source.push("d");
        source.push(null);
    }, 600 * 2);
});
test.cb("unbatch() unbatches", t => { test.cb("unbatch() unbatches", t => {
t.plan(3); t.plan(3);
const source = new Readable({ objectMode: true }); const source = new Readable({ objectMode: true });
@ -1344,16 +1379,16 @@ test.cb("parallel() parallel mapping", t => {
orderedResults[5].start, orderedResults[5].start,
); );
expect(orderedResults[0].start).to.be.lessThan( expect(orderedResults[0].start).to.be.lessThan(
orderedResults[2].start + offset orderedResults[2].start + offset,
); );
expect(orderedResults[1].start).to.be.lessThan( expect(orderedResults[1].start).to.be.lessThan(
orderedResults[3].start + offset orderedResults[3].start + offset,
); );
expect(orderedResults[2].start).to.be.lessThan( expect(orderedResults[2].start).to.be.lessThan(
orderedResults[4].start + offset orderedResults[4].start + offset,
); );
expect(orderedResults[3].start).to.be.lessThan( expect(orderedResults[3].start).to.be.lessThan(
orderedResults[5].start + offset orderedResults[5].start + offset,
); );
t.end(); t.end();
}); });

View File

@ -49,20 +49,13 @@ export function map<T, R>(
return new Transform({ return new Transform({
...options, ...options,
async transform(chunk: T, encoding, callback) { async transform(chunk: T, encoding, callback) {
let isPromise = false;
try { try {
const mapped = mapper(chunk, encoding); const mapped = await mapper(chunk, encoding);
isPromise = mapped instanceof Promise; this.push(mapped);
callback(undefined, await mapped);
} catch (err) {
if (isPromise) {
// Calling the callback asynchronously with an error wouldn't emit the error, so emit directly
this.emit("error", err);
callback(); callback();
} else { } catch (err) {
callback(err); callback(err);
} }
}
}, },
}); });
} }
@ -504,28 +497,33 @@ export function last<T>(readable: Readable): Promise<T | null> {
* @param maxBatchAge Max lifetime of a batch * @param maxBatchAge Max lifetime of a batch
*/ */
export function batch(batchSize: number = 1000, maxBatchAge: number = 500) { export function batch(batchSize: number = 1000, maxBatchAge: number = 500) {
const buffer: any[] = []; let buffer: any[] = [];
let startTime: number | null = null; let timer: NodeJS.Timer | null = null;
let sendChunk = (self: Transform) => {
timer && clearTimeout(timer);
timer = null;
self.push(buffer);
buffer = [];
};
return new Transform({ return new Transform({
objectMode: true, objectMode: true,
transform(chunk, encoding, callback) { transform(chunk, encoding, callback) {
if (
buffer.length === batchSize - 1 ||
(startTime !== null &&
startTime - performance.now() >= maxBatchAge)
) {
buffer.push(chunk); buffer.push(chunk);
callback(undefined, buffer.splice(0)); if (buffer.length === batchSize) {
sendChunk(this);
} else { } else {
if (startTime === null) { if (timer === null) {
startTime = performance.now(); timer = setInterval(() => {
sendChunk(this);
}, maxBatchAge);
}
} }
buffer.push(chunk);
callback(); callback();
}
}, },
flush(callback) { flush(callback) {
callback(undefined, buffer.splice(0)); console.error("flushing");
sendChunk(this);
callback();
}, },
}); });
} }

1165
yarn.lock

File diff suppressed because it is too large Load Diff