From 2ee04a2d79b12044e684fe0a9df803f823d6fba6 Mon Sep 17 00:00:00 2001 From: Lewis Diamond Date: Fri, 30 Aug 2019 15:24:38 -0400 Subject: [PATCH] unclean --- src/functions/compose.ts | 83 +++++++++++++++++++++++++++++----------- src/functions/index.ts | 10 +---- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/src/functions/compose.ts b/src/functions/compose.ts index f53aa6c..fea69eb 100644 --- a/src/functions/compose.ts +++ b/src/functions/compose.ts @@ -1,4 +1,11 @@ -import { pipeline, Duplex, DuplexOptions } from "stream"; +import { + pipeline, + Duplex, + Transform, + Readable, + Writable, + DuplexOptions, +} from "stream"; /** * Return a Readable stream of readable streams concatenated together @@ -12,7 +19,7 @@ export function compose( NodeJS.ReadableStream | NodeJS.ReadWriteStream | NodeJS.WritableStream >, options?: DuplexOptions, -): Duplex { +): Compose { // Maybe just return a new stream here if (streams.length < 2) { throw new Error("At least two streams are required to compose"); @@ -23,26 +30,41 @@ export function compose( return composed; } -class Compose extends Duplex { - private first: - | NodeJS.ReadableStream - | NodeJS.ReadWriteStream - | NodeJS.WritableStream; - private last: - | NodeJS.ReadableStream - | NodeJS.ReadWriteStream - | NodeJS.WritableStream; - constructor( - streams: Array< - | NodeJS.ReadableStream - | NodeJS.ReadWriteStream - | NodeJS.WritableStream - >, - options?: DuplexOptions, - ) { +enum EventSubscription { + Last = 0, + First, + All, + Self, +} +const eventsTarget = { + close: EventSubscription.Last, + data: EventSubscription.Last, + drain: EventSubscription.First, + end: EventSubscription.Last, + error: EventSubscription.Self, + finish: EventSubscription.Last, + pause: EventSubscription.Last, + pipe: EventSubscription.First, + readable: EventSubscription.Last, + resume: EventSubscription.Last, + unpipe: EventSubscription.First, +}; + +type AllStreams = + | NodeJS.ReadableStream + | NodeJS.ReadWriteStream + | NodeJS.WritableStream; + +export class Compose extends Duplex { + private first: AllStreams; + private last: AllStreams; + private streams: AllStreams[]; + + constructor(streams: AllStreams[], options?: DuplexOptions) { super(options); this.first = streams[0]; this.last = streams[streams.length - 1]; + this.streams = streams; pipeline(streams, (err: any) => { this.emit("error", err); }); @@ -56,11 +78,28 @@ class Compose extends Duplex { return (this.first as NodeJS.WritableStream).write(chunk); } + public bubble(...events: string[]) { + this.streams.forEach(s => { + events.forEach(e => { + s.on(e, (...args) => super.emit(e, ...args)); + }); + }); + } + public on(event: string, cb: any) { - if (event === "error") { - super.on(event, cb); + switch (eventsTarget[event]) { + case EventSubscription.First: + this.first.on(event, cb); + break; + case EventSubscription.Last: + this.last.on(event, cb); + break; + case EventSubscription.All: + this.streams.forEach(s => s.on(event, cb)); + break; + default: + super.on(event, cb); } - this.last.on(event, cb); return this; } } diff --git a/src/functions/index.ts b/src/functions/index.ts index 21ad7f9..2e4b8ad 100644 --- a/src/functions/index.ts +++ b/src/functions/index.ts @@ -288,12 +288,4 @@ export function accumulatorBy( return baseFunctions.accumulatorBy(batchRate, flushStrategy, iteratee); } -export function compose( - streams: Array, - options?: DuplexOptions, -) { - return baseFunctions.compose( - streams, - options, - ); -} +export const compose = baseFunctions.compose;