Cleanup
This commit is contained in:
parent
4e80e48fa4
commit
50f6886b4b
@ -4,7 +4,7 @@ import { TransformOptions } from "./baseDefinitions";
|
|||||||
* Stores chunks of data internally in array and batches when batchSize is reached.
|
* Stores chunks of data internally in array and batches when batchSize is reached.
|
||||||
*
|
*
|
||||||
* @param batchSize Size of the batches
|
* @param batchSize Size of the batches
|
||||||
* @param maxBatchAge Max lifetime of a batch
|
* @param maxBatchAge Max lifetime of a batch in seconds
|
||||||
*/
|
*/
|
||||||
export function batch(
|
export function batch(
|
||||||
batchSize: number = 1000,
|
batchSize: number = 1000,
|
||||||
|
@ -202,17 +202,17 @@ export function batch(batchSize: number, maxBatchAge?: number): Transform {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unbatches and sends individual chunks of data
|
* Unbatches and sends individual chunks of data.
|
||||||
*/
|
*/
|
||||||
export function unbatch(): Transform {
|
export function unbatch(): Transform {
|
||||||
return baseFunctions.unbatch();
|
return baseFunctions.unbatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Limits date of data transferred into stream.
|
* Limits rate of data transferred into stream.
|
||||||
* @param options?
|
* @param options?
|
||||||
* @param targetRate? Desired rate in ms
|
* @param targetRate? Desired rate in ms.
|
||||||
* @param period? Period to sleep for when rate is above or equal to targetRate
|
* @param period? Period to sleep for when rate is above or equal to targetRate.
|
||||||
*/
|
*/
|
||||||
export function rate(targetRate?: number, period?: number): Transform {
|
export function rate(targetRate?: number, period?: number): Transform {
|
||||||
return baseFunctions.rate(targetRate, period);
|
return baseFunctions.rate(targetRate, period);
|
||||||
@ -221,7 +221,7 @@ export function rate(targetRate?: number, period?: number): Transform {
|
|||||||
/**
|
/**
|
||||||
* Limits number of parallel processes in flight.
|
* Limits number of parallel processes in flight.
|
||||||
* @param parallel Max number of parallel processes.
|
* @param parallel Max number of parallel processes.
|
||||||
* @param func Function to execute on each data chunk
|
* @param func Function to execute on each data chunk.
|
||||||
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
|
* @param pause Amount of time to pause processing when max number of parallel processes are executing.
|
||||||
*/
|
*/
|
||||||
export function parallelMap<T, R>(
|
export function parallelMap<T, R>(
|
||||||
@ -232,6 +232,26 @@ export function parallelMap<T, R>(
|
|||||||
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
|
return baseFunctions.parallelMap(mapper, parallel, sleepTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
||||||
|
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
|
||||||
|
* 1. Sliding
|
||||||
|
* - If the buffer is larger than the batchSize, the front of the buffer is popped to maintain
|
||||||
|
* the batchSize. When no key is provided, the batchSize is effectively the buffer length. When
|
||||||
|
* a key is provided, the batchSize is based on the value at that key. For example, given a key
|
||||||
|
* of `timestamp` and a batchSize of 3000, each item in the buffer will be guaranteed to be
|
||||||
|
* within 3000 timestamp units from the first element. This means that with a key, multiple elements
|
||||||
|
* may be spliced off the front of the buffer. The buffer is then pushed into the stream.
|
||||||
|
* 2. Rolling
|
||||||
|
* - If the buffer is larger than the batchSize, the buffer is cleared and pushed into the stream.
|
||||||
|
* When no key is provided, the batchSize is the buffer length. When a key is provided, the batchSize
|
||||||
|
* is based on the value at that key. For example, given a key of `timestamp` and a batchSize of 3000,
|
||||||
|
* each item in the buffer will be guaranteed to be within 3000 timestamp units from the first element.
|
||||||
|
* @param batchSize Size of the batch (in units of buffer length or value at key).
|
||||||
|
* @param batchRate Desired rate of data transfer to next stream.
|
||||||
|
* @param flushStrategy Buffering strategy to use.
|
||||||
|
* @param keyBy Key to determine if element fits into buffer or items need to be cleared from buffer.
|
||||||
|
*/
|
||||||
export function accumulator(
|
export function accumulator(
|
||||||
batchSize: number,
|
batchSize: number,
|
||||||
batchRate: number | undefined,
|
batchRate: number | undefined,
|
||||||
@ -246,6 +266,20 @@ export function accumulator(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accummulates and sends batches of data. Each chunk that flows into the stream is checked against items
|
||||||
|
* in the buffer. How the buffer is mutated is based on 1 of 2 possible buffering strategies:
|
||||||
|
* 1. Sliding
|
||||||
|
* - If the iteratee returns false, the front of the buffer is popped until iteratee returns true. The
|
||||||
|
* item is pushed into the buffer and buffer is pushed into stream.
|
||||||
|
* 2. Rolling
|
||||||
|
* - If the iteratee returns false, the buffer is cleared and pushed into stream. The item is
|
||||||
|
* then pushed into the buffer.
|
||||||
|
* @param batchRate Desired rate of data transfer to next stream.
|
||||||
|
* @param flushStrategy Buffering strategy to use.
|
||||||
|
* @param iteratee Function applied to buffer when a chunk of data enters stream to determine if element fits into
|
||||||
|
* or items need to be cleared from buffer.
|
||||||
|
*/
|
||||||
export function accumulatorBy<T, S extends FlushStrategy>(
|
export function accumulatorBy<T, S extends FlushStrategy>(
|
||||||
batchRate: number | undefined,
|
batchRate: number | undefined,
|
||||||
flushStrategy: S,
|
flushStrategy: S,
|
||||||
|
@ -26,9 +26,7 @@ test.cb("accumulator() rolling", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
[...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
|
[...firstFlush, ...secondFlush, ...thirdFlush].forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -61,9 +59,7 @@ test.cb("accumulator() rolling with key", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
[...firstFlush, ...secondFlush].forEach(item => {
|
[...firstFlush, ...secondFlush].forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -105,9 +101,7 @@ test.cb(
|
|||||||
index++;
|
index++;
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -162,9 +156,7 @@ test.cb(
|
|||||||
);
|
);
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -209,9 +201,7 @@ test.cb("accumulator() sliding", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -266,9 +256,7 @@ test.cb("accumulator() sliding with key", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -309,9 +297,7 @@ test.cb(
|
|||||||
index++;
|
index++;
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -372,9 +358,7 @@ test.cb(
|
|||||||
);
|
);
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -416,9 +400,7 @@ test.cb("accumulatorBy() rolling", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
[...firstFlush, ...secondFlush].forEach(item => {
|
[...firstFlush, ...secondFlush].forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -457,9 +439,7 @@ test.cb(
|
|||||||
expect(err.message).to.equal("Failed mapping");
|
expect(err.message).to.equal("Failed mapping");
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
@ -524,9 +504,7 @@ test.cb("accumulatorBy() sliding", t => {
|
|||||||
.on("error", (e: any) => {
|
.on("error", (e: any) => {
|
||||||
t.end(e);
|
t.end(e);
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
});
|
});
|
||||||
@ -565,9 +543,7 @@ test.cb(
|
|||||||
expect(err.message).to.equal("Failed mapping");
|
expect(err.message).to.equal("Failed mapping");
|
||||||
t.pass();
|
t.pass();
|
||||||
})
|
})
|
||||||
.on("end", () => {
|
.on("end", t.end);
|
||||||
t.end();
|
|
||||||
});
|
|
||||||
|
|
||||||
input.forEach(item => {
|
input.forEach(item => {
|
||||||
source.push(item);
|
source.push(item);
|
||||||
|
@ -11,8 +11,7 @@ test.cb("batch() batches chunks together", t => {
|
|||||||
source
|
source
|
||||||
.pipe(batch(3))
|
.pipe(batch(3))
|
||||||
.on("data", (element: string[]) => {
|
.on("data", (element: string[]) => {
|
||||||
expect(element).to.deep.equal(expectedElements[i]);
|
t.deepEqual(element, expectedElements[i]);
|
||||||
t.pass();
|
|
||||||
i++;
|
i++;
|
||||||
})
|
})
|
||||||
.on("error", t.end)
|
.on("error", t.end)
|
||||||
@ -39,8 +38,7 @@ test.cb("batch() yields a batch after the timeout", t => {
|
|||||||
source
|
source
|
||||||
.pipe(batch(3))
|
.pipe(batch(3))
|
||||||
.on("data", (element: string[]) => {
|
.on("data", (element: string[]) => {
|
||||||
expect(element).to.deep.equal(expectedElements[i]);
|
t.deepEqual(element, expectedElements[i]);
|
||||||
t.pass();
|
|
||||||
i++;
|
i++;
|
||||||
})
|
})
|
||||||
.on("error", t.fail)
|
.on("error", t.fail)
|
||||||
|
@ -9,15 +9,11 @@
|
|||||||
"suppressImplicitAnyIndexErrors": true,
|
"suppressImplicitAnyIndexErrors": true,
|
||||||
"outDir": "./dist",
|
"outDir": "./dist",
|
||||||
"module": "commonjs"
|
"module": "commonjs"
|
||||||
},
|
|
||||||
"target": "es5",
|
|
||||||
"lib": [
|
|
||||||
"es2016"
|
|
||||||
],
|
|
||||||
"sourceMap": true,
|
|
||||||
"declaration": true
|
|
||||||
},
|
},
|
||||||
"include": [
|
"target": "es5",
|
||||||
"src/**/*.ts"
|
"lib": [
|
||||||
]
|
"es2016"
|
||||||
|
],
|
||||||
|
"sourceMap": true,
|
||||||
|
"declaration": true
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user