Compare commits
3 Commits
feature/de
...
feature/de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46d118d4a5 | ||
|
|
5dd856deed | ||
|
|
d0a9d35fe7 |
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@jogogo/mhysa",
|
"name": "@jogogo/mhysa",
|
||||||
"version": "2.0.0-alpha.4",
|
"version": "2.0.0-alpha.3",
|
||||||
"description": "Streams and event emitter utils for Node.js",
|
"description": "Streams and event emitter utils for Node.js",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"promise",
|
"promise",
|
||||||
|
|||||||
@@ -6,45 +6,36 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
|
|||||||
|
|
||||||
export interface DemuxOptions extends DuplexOptions {
|
export interface DemuxOptions extends DuplexOptions {
|
||||||
remultiplex?: boolean;
|
remultiplex?: boolean;
|
||||||
|
purgeIdleInterval?: number;
|
||||||
|
maxIdleTime?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function demux(
|
export function demux(
|
||||||
pipelineConstructor: (
|
construct: (destKey?: string, chunk?: any) => DemuxStreams,
|
||||||
destKey?: string,
|
|
||||||
chunk?: any,
|
|
||||||
) => DemuxStreams | DemuxStreams[],
|
|
||||||
demuxBy: string | ((chunk: any) => string),
|
demuxBy: string | ((chunk: any) => string),
|
||||||
options?: DemuxOptions,
|
options?: DemuxOptions,
|
||||||
): Duplex {
|
): Duplex {
|
||||||
return new Demux(pipelineConstructor, demuxBy, options);
|
return new Demux(construct, demuxBy, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
class Demux extends Duplex {
|
class Demux extends Duplex {
|
||||||
private streamsByKey: {
|
private streamsByKey: {
|
||||||
[key: string]: DemuxStreams[];
|
[key: string]: { stream: DemuxStreams; lastWrite: number };
|
||||||
};
|
};
|
||||||
private demuxer: (chunk: any) => string;
|
private demuxer: (chunk: any) => string;
|
||||||
private pipelineConstructor: (
|
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
||||||
destKey?: string,
|
|
||||||
chunk?: any,
|
|
||||||
) => DemuxStreams[];
|
|
||||||
private remultiplex: boolean;
|
private remultiplex: boolean;
|
||||||
private transform: Transform;
|
private transform: Transform;
|
||||||
|
private maxIdleTime: number;
|
||||||
constructor(
|
constructor(
|
||||||
pipelineConstructor: (
|
construct: (destKey?: string) => DemuxStreams,
|
||||||
destKey?: string,
|
|
||||||
chunk?: any,
|
|
||||||
) => DemuxStreams | DemuxStreams[],
|
|
||||||
demuxBy: string | ((chunk: any) => string),
|
demuxBy: string | ((chunk: any) => string),
|
||||||
options: DemuxOptions = {},
|
options: DemuxOptions = {},
|
||||||
) {
|
) {
|
||||||
super(options);
|
super(options);
|
||||||
this.demuxer =
|
this.demuxer =
|
||||||
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
|
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
|
||||||
this.pipelineConstructor = (destKey: string, chunk?: any) => {
|
this.construct = construct;
|
||||||
const pipeline = pipelineConstructor(destKey, chunk);
|
|
||||||
return Array.isArray(pipeline) ? pipeline : [pipeline];
|
|
||||||
};
|
|
||||||
this.remultiplex =
|
this.remultiplex =
|
||||||
options.remultiplex === undefined ? true : options.remultiplex;
|
options.remultiplex === undefined ? true : options.remultiplex;
|
||||||
this.streamsByKey = {};
|
this.streamsByKey = {};
|
||||||
@@ -55,77 +46,74 @@ class Demux extends Duplex {
|
|||||||
cb(null);
|
cb(null);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
this.maxIdleTime = options.maxIdleTime || 600000;
|
||||||
|
const purgeIdleInterval = options.purgeIdleInterval || 600000;
|
||||||
|
setInterval(() => {
|
||||||
|
this._destroyIdle();
|
||||||
|
}, purgeIdleInterval);
|
||||||
|
|
||||||
this.on("unpipe", () => this._flush());
|
this.on("unpipe", () => this._flush());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _destroyIdle() {
|
||||||
|
for (let key in this.streamsByKey) {
|
||||||
|
const curTime = Date.now();
|
||||||
|
const pipeline = this.streamsByKey[key];
|
||||||
|
|
||||||
|
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
|
||||||
|
delete this.streamsByKey[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// tslint:disable-next-line
|
// tslint:disable-next-line
|
||||||
public _read(size: number) {}
|
public _read(size: number) {}
|
||||||
|
|
||||||
public async _write(chunk: any, encoding: any, cb: any) {
|
public async _write(chunk: any, encoding: any, cb: any) {
|
||||||
const destKey = this.demuxer(chunk);
|
const destKey = this.demuxer(chunk);
|
||||||
if (this.streamsByKey[destKey] === undefined) {
|
if (this.streamsByKey[destKey] === undefined) {
|
||||||
const newPipelines = this.pipelineConstructor(destKey, chunk);
|
const newPipeline = await this.construct(destKey, chunk);
|
||||||
this.streamsByKey[destKey] = newPipelines;
|
this.streamsByKey[destKey] = {
|
||||||
|
stream: newPipeline,
|
||||||
newPipelines.forEach(newPipeline => {
|
lastWrite: Date.now(),
|
||||||
if (this.remultiplex && isReadable(newPipeline)) {
|
};
|
||||||
(newPipeline as NodeJS.ReadWriteStream).pipe(
|
if (this.remultiplex && isReadable(newPipeline)) {
|
||||||
this.transform,
|
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
||||||
);
|
} else if (this.remultiplex) {
|
||||||
} else if (this.remultiplex) {
|
console.error(
|
||||||
console.error(
|
`Pipeline construct for ${destKey} does not implement readable interface`,
|
||||||
`Pipeline construct for ${destKey} does not implement readable interface`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
const pipelines = this.streamsByKey[destKey];
|
|
||||||
const pendingDrains: Array<Promise<any>> = [];
|
|
||||||
|
|
||||||
pipelines.forEach(pipeline => {
|
|
||||||
if (!pipeline.write(chunk, encoding)) {
|
|
||||||
pendingDrains.push(
|
|
||||||
new Promise(resolve => {
|
|
||||||
pipeline.once("drain", () => {
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
}),
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
});
|
} else {
|
||||||
await Promise.all(pendingDrains);
|
this.streamsByKey[destKey].lastWrite = Date.now();
|
||||||
cb();
|
}
|
||||||
|
|
||||||
|
if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
|
||||||
|
this.streamsByKey[destKey].stream.once("drain", () => {
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public _flush() {
|
public _flush() {
|
||||||
const pipelines: DemuxStreams[] = [].concat.apply(
|
const pipelines = Object.values(this.streamsByKey);
|
||||||
[],
|
let totalEnded = 0;
|
||||||
Object.values(this.streamsByKey),
|
pipelines.forEach(({ stream }) => {
|
||||||
);
|
stream.once("end", () => {
|
||||||
const flushPromises: Array<Promise<void>> = [];
|
totalEnded++;
|
||||||
pipelines.forEach(pipeline => {
|
if (pipelines.length === totalEnded) {
|
||||||
flushPromises.push(
|
this.push(null);
|
||||||
new Promise(resolve => {
|
this.emit("end");
|
||||||
pipeline.once("end", () => {
|
}
|
||||||
resolve();
|
});
|
||||||
});
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
pipelines.forEach(pipeline => pipeline.end());
|
|
||||||
Promise.all(flushPromises).then(() => {
|
|
||||||
this.push(null);
|
|
||||||
this.emit("end");
|
|
||||||
});
|
});
|
||||||
|
pipelines.forEach(({ stream }) => stream.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
public _destroy(error: any, cb: (error?: any) => void) {
|
public _destroy(error: any, cb: (error?: any) => void) {
|
||||||
const pipelines: DemuxStreams[] = [].concat.apply(
|
const pipelines = Object.values(this.streamsByKey);
|
||||||
[],
|
pipelines.forEach(({ stream }) => (stream as any).destroy());
|
||||||
Object.values(this.streamsByKey),
|
|
||||||
);
|
|
||||||
pipelines.forEach(p => (p as any).destroy());
|
|
||||||
cb(error);
|
cb(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -256,11 +256,14 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||||
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
* @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||||
* @param demuxBy
|
* @param {String | Function} demuxBy
|
||||||
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
* @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||||
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
* @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||||
* @param options Writable stream options
|
* @param {Object} options Demux stream options
|
||||||
|
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
|
||||||
|
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
|
||||||
|
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
|
||||||
*/
|
*/
|
||||||
demux: withDefaultOptions(2, demux),
|
demux: withDefaultOptions(2, demux),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -862,3 +862,39 @@ test.cb("demux() should be 'destroyable'", t => {
|
|||||||
fakeSource.push(input[3]);
|
fakeSource.push(input[3]);
|
||||||
fakeSource.push(input[4]);
|
fakeSource.push(input[4]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test.cb("Should delete idle pipelines", t => {
|
||||||
|
t.plan(5);
|
||||||
|
const input = [
|
||||||
|
{ key: "a", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "a", visited: [] },
|
||||||
|
{ key: "b", visited: [] },
|
||||||
|
{ key: "c", visited: [] },
|
||||||
|
];
|
||||||
|
const construct = sinon.spy((destKey: string) => {
|
||||||
|
const dest = map((chunk: Test) => {
|
||||||
|
chunk.visited.push(1);
|
||||||
|
return chunk;
|
||||||
|
});
|
||||||
|
t.pass();
|
||||||
|
|
||||||
|
return dest;
|
||||||
|
});
|
||||||
|
|
||||||
|
const demuxed = demux(construct, "key", {
|
||||||
|
maxIdleTime: 110,
|
||||||
|
purgeIdleInterval: 110,
|
||||||
|
});
|
||||||
|
|
||||||
|
demuxed.on("data", data => {
|
||||||
|
if (data.key === "c") t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
for (let i = 0; i < input.length; i++) {
|
||||||
|
setTimeout(() => {
|
||||||
|
demuxed.write(input[i]);
|
||||||
|
}, i * 100);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user