Compare commits
4 Commits
feature/de
...
feature/de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c75ef88b4 | ||
|
|
7113400cb1 | ||
|
|
a42560edfc | ||
|
|
58a95a91d0 |
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@jogogo/mhysa",
|
"name": "@jogogo/mhysa",
|
||||||
"version": "2.0.0-alpha.3",
|
"version": "2.0.0-alpha.4",
|
||||||
"description": "Streams and event emitter utils for Node.js",
|
"description": "Streams and event emitter utils for Node.js",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"promise",
|
"promise",
|
||||||
|
|||||||
@@ -6,36 +6,45 @@ type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
|
|||||||
|
|
||||||
export interface DemuxOptions extends DuplexOptions {
|
export interface DemuxOptions extends DuplexOptions {
|
||||||
remultiplex?: boolean;
|
remultiplex?: boolean;
|
||||||
purgeIdleInterval?: number;
|
|
||||||
maxIdleTime?: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function demux(
|
export function demux(
|
||||||
construct: (destKey?: string, chunk?: any) => DemuxStreams,
|
pipelineConstructor: (
|
||||||
|
destKey?: string,
|
||||||
|
chunk?: any,
|
||||||
|
) => DemuxStreams | DemuxStreams[],
|
||||||
demuxBy: string | ((chunk: any) => string),
|
demuxBy: string | ((chunk: any) => string),
|
||||||
options?: DemuxOptions,
|
options?: DemuxOptions,
|
||||||
): Duplex {
|
): Duplex {
|
||||||
return new Demux(construct, demuxBy, options);
|
return new Demux(pipelineConstructor, demuxBy, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
class Demux extends Duplex {
|
class Demux extends Duplex {
|
||||||
private streamsByKey: {
|
private streamsByKey: {
|
||||||
[key: string]: { stream: DemuxStreams; lastWrite: number };
|
[key: string]: DemuxStreams[];
|
||||||
};
|
};
|
||||||
private demuxer: (chunk: any) => string;
|
private demuxer: (chunk: any) => string;
|
||||||
private construct: (destKey?: string, chunk?: any) => DemuxStreams;
|
private pipelineConstructor: (
|
||||||
|
destKey?: string,
|
||||||
|
chunk?: any,
|
||||||
|
) => DemuxStreams[];
|
||||||
private remultiplex: boolean;
|
private remultiplex: boolean;
|
||||||
private transform: Transform;
|
private transform: Transform;
|
||||||
private maxIdleTime: number;
|
|
||||||
constructor(
|
constructor(
|
||||||
construct: (destKey?: string) => DemuxStreams,
|
pipelineConstructor: (
|
||||||
|
destKey?: string,
|
||||||
|
chunk?: any,
|
||||||
|
) => DemuxStreams | DemuxStreams[],
|
||||||
demuxBy: string | ((chunk: any) => string),
|
demuxBy: string | ((chunk: any) => string),
|
||||||
options: DemuxOptions = {},
|
options: DemuxOptions = {},
|
||||||
) {
|
) {
|
||||||
super(options);
|
super(options);
|
||||||
this.demuxer =
|
this.demuxer =
|
||||||
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
|
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
|
||||||
this.construct = construct;
|
this.pipelineConstructor = (destKey: string, chunk?: any) => {
|
||||||
|
const pipeline = pipelineConstructor(destKey, chunk);
|
||||||
|
return Array.isArray(pipeline) ? pipeline : [pipeline];
|
||||||
|
};
|
||||||
this.remultiplex =
|
this.remultiplex =
|
||||||
options.remultiplex === undefined ? true : options.remultiplex;
|
options.remultiplex === undefined ? true : options.remultiplex;
|
||||||
this.streamsByKey = {};
|
this.streamsByKey = {};
|
||||||
@@ -46,74 +55,77 @@ class Demux extends Duplex {
|
|||||||
cb(null);
|
cb(null);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
this.maxIdleTime = options.maxIdleTime || 600000;
|
|
||||||
const purgeIdleInterval = options.purgeIdleInterval || 600000;
|
|
||||||
setInterval(() => {
|
|
||||||
this._destroyIdle();
|
|
||||||
}, purgeIdleInterval);
|
|
||||||
|
|
||||||
this.on("unpipe", () => this._flush());
|
this.on("unpipe", () => this._flush());
|
||||||
}
|
}
|
||||||
|
|
||||||
private _destroyIdle() {
|
|
||||||
for (let key in this.streamsByKey) {
|
|
||||||
const curTime = Date.now();
|
|
||||||
const pipeline = this.streamsByKey[key];
|
|
||||||
|
|
||||||
if (curTime - pipeline.lastWrite > this.maxIdleTime) {
|
|
||||||
delete this.streamsByKey[key];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// tslint:disable-next-line
|
// tslint:disable-next-line
|
||||||
public _read(size: number) {}
|
public _read(size: number) {}
|
||||||
|
|
||||||
public async _write(chunk: any, encoding: any, cb: any) {
|
public async _write(chunk: any, encoding: any, cb: any) {
|
||||||
const destKey = this.demuxer(chunk);
|
const destKey = this.demuxer(chunk);
|
||||||
if (this.streamsByKey[destKey] === undefined) {
|
if (this.streamsByKey[destKey] === undefined) {
|
||||||
const newPipeline = await this.construct(destKey, chunk);
|
const newPipelines = this.pipelineConstructor(destKey, chunk);
|
||||||
this.streamsByKey[destKey] = {
|
this.streamsByKey[destKey] = newPipelines;
|
||||||
stream: newPipeline,
|
|
||||||
lastWrite: Date.now(),
|
newPipelines.forEach(newPipeline => {
|
||||||
};
|
if (this.remultiplex && isReadable(newPipeline)) {
|
||||||
if (this.remultiplex && isReadable(newPipeline)) {
|
(newPipeline as NodeJS.ReadWriteStream).pipe(
|
||||||
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);
|
this.transform,
|
||||||
} else if (this.remultiplex) {
|
);
|
||||||
console.error(
|
} else if (this.remultiplex) {
|
||||||
`Pipeline construct for ${destKey} does not implement readable interface`,
|
console.error(
|
||||||
|
`Pipeline construct for ${destKey} does not implement readable interface`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const pipelines = this.streamsByKey[destKey];
|
||||||
|
const pendingDrains: Array<Promise<any>> = [];
|
||||||
|
|
||||||
|
pipelines.forEach(pipeline => {
|
||||||
|
if (!pipeline.write(chunk, encoding)) {
|
||||||
|
pendingDrains.push(
|
||||||
|
new Promise(resolve => {
|
||||||
|
pipeline.once("drain", () => {
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
});
|
||||||
this.streamsByKey[destKey].lastWrite = Date.now();
|
await Promise.all(pendingDrains);
|
||||||
}
|
cb();
|
||||||
|
|
||||||
if (!this.streamsByKey[destKey].stream.write(chunk, encoding)) {
|
|
||||||
this.streamsByKey[destKey].stream.once("drain", () => {
|
|
||||||
cb();
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
cb();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public _flush() {
|
public _flush() {
|
||||||
const pipelines = Object.values(this.streamsByKey);
|
const pipelines: DemuxStreams[] = [].concat.apply(
|
||||||
let totalEnded = 0;
|
[],
|
||||||
pipelines.forEach(({ stream }) => {
|
Object.values(this.streamsByKey),
|
||||||
stream.once("end", () => {
|
);
|
||||||
totalEnded++;
|
const flushPromises: Array<Promise<void>> = [];
|
||||||
if (pipelines.length === totalEnded) {
|
pipelines.forEach(pipeline => {
|
||||||
this.push(null);
|
flushPromises.push(
|
||||||
this.emit("end");
|
new Promise(resolve => {
|
||||||
}
|
pipeline.once("end", () => {
|
||||||
});
|
resolve();
|
||||||
|
});
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
pipelines.forEach(pipeline => pipeline.end());
|
||||||
|
Promise.all(flushPromises).then(() => {
|
||||||
|
this.push(null);
|
||||||
|
this.emit("end");
|
||||||
});
|
});
|
||||||
pipelines.forEach(({ stream }) => stream.end());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public _destroy(error: any, cb: (error?: any) => void) {
|
public _destroy(error: any, cb: (error?: any) => void) {
|
||||||
const pipelines = Object.values(this.streamsByKey);
|
const pipelines: DemuxStreams[] = [].concat.apply(
|
||||||
pipelines.forEach(({ stream }) => (stream as any).destroy());
|
[],
|
||||||
|
Object.values(this.streamsByKey),
|
||||||
|
);
|
||||||
|
pipelines.forEach(p => (p as any).destroy());
|
||||||
cb(error);
|
cb(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -256,14 +256,11 @@ export default function mhysa(defaultOptions?: TransformOptions) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
* Composes multiple streams together. Writing occurs on first stream, piping occurs from last stream.
|
||||||
* @param {Function} construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
* @param construct Constructor for new output source. Should return a Writable or ReadWrite stream.
|
||||||
* @param {String | Function} demuxBy
|
* @param demuxBy
|
||||||
* @param {string} demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
* @param demuxBy.key? Key to fetch value from source chunks to demultiplex source.
|
||||||
* @param {Function} demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
* @param demuxBy.keyBy? Function to fetch value from source chunks to demultiplex source.
|
||||||
* @param {Object} options Demux stream options
|
* @param options Writable stream options
|
||||||
* @param {boolean} options.remultiplex? If demux should be remultiplexed into a single destination
|
|
||||||
* @param {number} options.purgeIdleInterval? Interval at which a purge for idle pipelines will occur
|
|
||||||
* @param {number} options.maxIdleTime? Min time a demuxed pipeline must be idle for to be purged
|
|
||||||
*/
|
*/
|
||||||
demux: withDefaultOptions(2, demux),
|
demux: withDefaultOptions(2, demux),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -862,39 +862,3 @@ test.cb("demux() should be 'destroyable'", t => {
|
|||||||
fakeSource.push(input[3]);
|
fakeSource.push(input[3]);
|
||||||
fakeSource.push(input[4]);
|
fakeSource.push(input[4]);
|
||||||
});
|
});
|
||||||
|
|
||||||
test.cb("Should delete idle pipelines", t => {
|
|
||||||
t.plan(5);
|
|
||||||
const input = [
|
|
||||||
{ key: "a", visited: [] },
|
|
||||||
{ key: "b", visited: [] },
|
|
||||||
{ key: "b", visited: [] },
|
|
||||||
{ key: "a", visited: [] },
|
|
||||||
{ key: "b", visited: [] },
|
|
||||||
{ key: "c", visited: [] },
|
|
||||||
];
|
|
||||||
const construct = sinon.spy((destKey: string) => {
|
|
||||||
const dest = map((chunk: Test) => {
|
|
||||||
chunk.visited.push(1);
|
|
||||||
return chunk;
|
|
||||||
});
|
|
||||||
t.pass();
|
|
||||||
|
|
||||||
return dest;
|
|
||||||
});
|
|
||||||
|
|
||||||
const demuxed = demux(construct, "key", {
|
|
||||||
maxIdleTime: 110,
|
|
||||||
purgeIdleInterval: 110,
|
|
||||||
});
|
|
||||||
|
|
||||||
demuxed.on("data", data => {
|
|
||||||
if (data.key === "c") t.end();
|
|
||||||
});
|
|
||||||
|
|
||||||
for (let i = 0; i < input.length; i++) {
|
|
||||||
setTimeout(() => {
|
|
||||||
demuxed.write(input[i]);
|
|
||||||
}, i * 100);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|||||||
Reference in New Issue
Block a user