This commit is contained in:
Lewis Diamond 2019-08-30 15:24:38 -04:00
parent c7903376e9
commit 2ee04a2d79
2 changed files with 62 additions and 31 deletions

View File

@ -1,4 +1,11 @@
import { pipeline, Duplex, DuplexOptions } from "stream"; import {
pipeline,
Duplex,
Transform,
Readable,
Writable,
DuplexOptions,
} from "stream";
/** /**
* Return a Readable stream of readable streams concatenated together * Return a Readable stream of readable streams concatenated together
@ -12,7 +19,7 @@ export function compose(
NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream
>, >,
options?: DuplexOptions, options?: DuplexOptions,
): Duplex { ): Compose {
// Maybe just return a new stream here // Maybe just return a new stream here
if (streams.length < 2) { if (streams.length < 2) {
throw new Error("At least two streams are required to compose"); throw new Error("At least two streams are required to compose");
@ -23,26 +30,41 @@ export function compose(
return composed; return composed;
} }
class Compose extends Duplex { enum EventSubscription {
private first: Last = 0,
| NodeJS.ReadableStream First,
| NodeJS.ReadWriteStream All,
| NodeJS.WritableStream; Self,
private last: }
| NodeJS.ReadableStream const eventsTarget = {
| NodeJS.ReadWriteStream close: EventSubscription.Last,
| NodeJS.WritableStream; data: EventSubscription.Last,
constructor( drain: EventSubscription.First,
streams: Array< end: EventSubscription.Last,
| NodeJS.ReadableStream error: EventSubscription.Self,
| NodeJS.ReadWriteStream finish: EventSubscription.Last,
| NodeJS.WritableStream pause: EventSubscription.Last,
>, pipe: EventSubscription.First,
options?: DuplexOptions, readable: EventSubscription.Last,
) { resume: EventSubscription.Last,
unpipe: EventSubscription.First,
};
type AllStreams =
| NodeJS.ReadableStream
| NodeJS.ReadWriteStream
| NodeJS.WritableStream;
export class Compose extends Duplex {
private first: AllStreams;
private last: AllStreams;
private streams: AllStreams[];
constructor(streams: AllStreams[], options?: DuplexOptions) {
super(options); super(options);
this.first = streams[0]; this.first = streams[0];
this.last = streams[streams.length - 1]; this.last = streams[streams.length - 1];
this.streams = streams;
pipeline(streams, (err: any) => { pipeline(streams, (err: any) => {
this.emit("error", err); this.emit("error", err);
}); });
@ -56,11 +78,28 @@ class Compose extends Duplex {
return (this.first as NodeJS.WritableStream).write(chunk); return (this.first as NodeJS.WritableStream).write(chunk);
} }
public bubble(...events: string[]) {
this.streams.forEach(s => {
events.forEach(e => {
s.on(e, (...args) => super.emit(e, ...args));
});
});
}
public on(event: string, cb: any) { public on(event: string, cb: any) {
if (event === "error") { switch (eventsTarget[event]) {
super.on(event, cb); case EventSubscription.First:
this.first.on(event, cb);
break;
case EventSubscription.Last:
this.last.on(event, cb);
break;
case EventSubscription.All:
this.streams.forEach(s => s.on(event, cb));
break;
default:
super.on(event, cb);
} }
this.last.on(event, cb);
return this; return this;
} }
} }

View File

@ -288,12 +288,4 @@ export function accumulatorBy<T, S extends FlushStrategy>(
return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee); return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee);
} }
export function compose( export const compose = baseFunctions.compose;
streams: Array<Writable | Transform>,
options?: DuplexOptions,
) {
return baseFunctions.compose(
streams,
options,
);
}