Compare commits
4 Commits
feature/de
...
feature/de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c75ef88b4 | ||
|
|
7113400cb1 | ||
|
|
a42560edfc | ||
|
|
58a95a91d0 |
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@jogogo/mhysa",
|
||||
"version": "2.0.0-alpha.3",
|
||||
"version": "2.0.0-alpha.4",
|
||||
"description": "Streams and event emitter utils for Node.js",
|
||||
"keywords": [
|
||||
"promise",
|
||||
|
||||
@@ -6,36 +6,45 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
|
||||
|
||||
export interface DemuxOptions extends DuplexOptions {
|
||||
remultiplex?: boolean;
|
||||
purgeIdleInterval?: number;
|
||||
maxIdleTime?: number;
|
||||
}
|
||||
|
||||
export function demux(
|
||||
construct: (destKey?: string, chunk?: any) => DemuxStreams,
|
||||
pipelineConstructor: (
|
||||
destKey?: string,
|
||||
chunk?: any,
|
||||
) => DemuxStreams | DemuxStreams[],
|
||||
demuxBy: string | ((chunk: any) => string),
|
||||
options?: DemuxOptions,
|
||||
): Duplex {
|
||||
return new Demux(construct, demuxBy, options);
|
||||
return new Demux(pipelineConstructor, demuxBy, options);
|
||||
}
|
||||
|
||||
class Demux extends Duplex {
|
||||
private streamsByKey: {
|
||||
[key: string]: { stream: DemuxStreams; lastWrite: number };
|
||||
[key: string]: DemuxStreams[];
|
||||
};
|
||||
private demuxer: (chunk: any) => string;
|
||||
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
||||
private pipelineConstructor: (
|
||||
destKey?: string,
|
||||
chunk?: any,
|
||||
) => DemuxStreams[];
|
||||
private remultiplex: boolean;
|
||||
private transform: Transform;
|
||||
private maxIdleTime: number;
|
||||
constructor(
|
||||
construct: (destKey?: string) => DemuxStreams,
|
||||
pipelineConstructor: (
|
||||
destKey?: string,
|
||||
chunk?: any,
|
||||
) => DemuxStreams | DemuxStreams[],
|
||||
demuxBy: string | ((chunk: any) => string),
|
||||
options: DemuxOptions = {},
|
||||
) {
|
||||
super(options);
|
||||
this.demuxer =
|
||||
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
|
||||
this.construct = construct;
|
||||
this.pipelineConstructor = (destKey: string, chunk?: any) => {
|
||||
const pipeline = pipelineConstructor(destKey, chunk);
|
||||
return Array.isArray(pipeline) ? pipeline : [pipeline];
|
||||
};
|
||||
this.remultiplex =
|
||||
options.remultiplex === undefined ? true : options.remultiplex;
|
||||
this.streamsByKey = {};
|
||||
@@ -46,74 +55,77 @@ class Demux extends Duplex {
|
||||
cb(null);
|
||||
},
|
||||
});
|
||||
this.maxIdleTime = options.maxIdleTime || 600000;
|
||||
const purgeIdleInterval = options.purgeIdleInterval || 600000;
|
||||
setInterval(() => {
|
||||
this._destroyIdle();
|
||||
}, purgeIdleInterval);
|
||||
|
||||
this.on("unpipe", () => this._flush());
|
||||
}
|
||||
|
||||
private _destroyIdle() {
|
||||
for (let key in this.streamsByKey) {
|
||||
const curTime = Date.now();
|
||||
const pipeline = this.streamsByKey[key];
|
||||
|
||||
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
|
||||
delete this.streamsByKey[key];
|
||||
}
|
||||
}
|
||||
}
|
||||
// tslint:disable-next-line
|
||||
public _read(size: number) {}
|
||||
|
||||
public async _write(chunk: any, encoding: any, cb: any) {
|
||||
const destKey = this.demuxer(chunk);
|
||||
if (this.streamsByKey[destKey] === undefined) {
|
||||
const newPipeline = await this.construct(destKey, chunk);
|
||||
this.streamsByKey[destKey] = {
|
||||
stream: newPipeline,
|
||||
lastWrite: Date.now(),
|
||||
};
|
||||
const newPipelines = this.pipelineConstructor(destKey, chunk);
|
||||
this.streamsByKey[destKey] = newPipelines;
|
||||
|
||||
newPipelines.forEach(newPipeline => {
|
||||
if (this.remultiplex && isReadable(newPipeline)) {
|
||||
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
||||
(newPipeline as NodeJS.ReadWriteStream).pipe(
|
||||
this.transform,
|
||||
);
|
||||
} else if (this.remultiplex) {
|
||||
console.error(
|
||||
`Pipeline construct for ${destKey} does not implement readable interface`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
this.streamsByKey[destKey].lastWrite = Date.now();
|
||||
}
|
||||
|
||||
if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
|
||||
this.streamsByKey[destKey].stream.once("drain", () => {
|
||||
cb();
|
||||
});
|
||||
} else {
|
||||
cb();
|
||||
}
|
||||
const pipelines = this.streamsByKey[destKey];
|
||||
const pendingDrains: Array<Promise<any>> = [];
|
||||
|
||||
pipelines.forEach(pipeline => {
|
||||
if (!pipeline.write(chunk, encoding)) {
|
||||
pendingDrains.push(
|
||||
new Promise(resolve => {
|
||||
pipeline.once("drain", () => {
|
||||
resolve();
|
||||
});
|
||||
}),
|
||||
);
|
||||
}
|
||||
});
|
||||
await Promise.all(pendingDrains);
|
||||
cb();
|
||||
}
|
||||
|
||||
public _flush() {
|
||||
const pipelines = Object.values(this.streamsByKey);
|
||||
let totalEnded = 0;
|
||||
pipelines.forEach(({ stream }) => {
|
||||
stream.once("end", () => {
|
||||
totalEnded++;
|
||||
if (pipelines.length === totalEnded) {
|
||||
const pipelines: DemuxStreams[] = [].concat.apply(
|
||||
[],
|
||||
Object.values(this.streamsByKey),
|
||||
);
|
||||
const flushPromises: Array<Promise<void>> = [];
|
||||
pipelines.forEach(pipeline => {
|
||||
flushPromises.push(
|
||||
new Promise(resolve => {
|
||||
pipeline.once("end", () => {
|
||||
resolve();
|
||||
});
|
||||
}),
|
||||
);
|
||||
});
|
||||
pipelines.forEach(pipeline => pipeline.end());
|
||||
Promise.all(flushPromises).then(() => {
|
||||
this.push(null);
|
||||
this.emit("end");
|
||||
}
|
||||
});
|
||||
});
|
||||
pipelines.forEach(({ stream }) => stream.end());
|
||||
}
|
||||
|
||||
public _destroy(error: any, cb: (error?: any) => void) {
|
||||
const pipelines = Object.values(this.streamsByKey);
|
||||
pipelines.forEach(({ stream }) => (stream as any).destroy());
|
||||
const pipelines: DemuxStreams[] = [].concat.apply(
|
||||
[],
|
||||
Object.values(this.streamsByKey),
|
||||
);
|
||||
pipelines.forEach(p => (p as any).destroy());
|
||||
cb(error);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,14 +256,11 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
||||
|
||||
/**
|
||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||
* @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||
* @param {String | Function} demuxBy
|
||||
* @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||
* @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||
* @param {Object} options Demux stream options
|
||||
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
|
||||
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
|
||||
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
|
||||
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||
* @param demuxBy
|
||||
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||
* @param options Writable stream options
|
||||
*/
|
||||
demux: withDefaultOptions(2, demux),
|
||||
};
|
||||
|
||||
@@ -862,39 +862,3 @@ test.cb("demux() should be 'destroyable'", t => {
|
||||
fakeSource.push(input[3]);
|
||||
fakeSource.push(input[4]);
|
||||
});
|
||||
|
||||
test.cb("Should delete idle pipelines", t => {
|
||||
t.plan(5);
|
||||
const input = [
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "a", visited: [] },
|
||||
{ key: "b", visited: [] },
|
||||
{ key: "c", visited: [] },
|
||||
];
|
||||
const construct = sinon.spy((destKey: string) => {
|
||||
const dest = map((chunk: Test) => {
|
||||
chunk.visited.push(1);
|
||||
return chunk;
|
||||
});
|
||||
t.pass();
|
||||
|
||||
return dest;
|
||||
});
|
||||
|
||||
const demuxed = demux(construct, "key", {
|
||||
maxIdleTime: 110,
|
||||
purgeIdleInterval: 110,
|
||||
});
|
||||
|
||||
demuxed.on("data", data => {
|
||||
if (data.key === "c") t.end();
|
||||
});
|
||||
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
setTimeout(() => {
|
||||
demuxed.write(input[i]);
|
||||
}, i * 100);
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user