This commit is contained in:
Jerry Kurian 2019-09-26 09:44:42 -04:00
parent f177f95f52
commit f6e3a03eb7

View File

@ -16,32 +16,31 @@ const eventsTarget = {
error: EventSubscription.Self, error: EventSubscription.Self,
finish: EventSubscription.Self, finish: EventSubscription.Self,
pause: EventSubscription.Self, pause: EventSubscription.Self,
pipe: EventSubscription.Unhandled, pipe: EventSubscription.Self,
readable: EventSubscription.Self, readable: EventSubscription.Self,
resume: EventSubscription.Self, resume: EventSubscription.Self,
unpipe: EventSubscription.Unhandled, unpipe: EventSubscription.Self,
}; };
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
export function demux( export function demux(
construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream, construct: () => DemuxStreams,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: WritableOptions, options?: WritableOptions,
): Writable { ): Writable {
return new Demux(construct, demuxBy, options); return new Demux(construct, demuxBy, options);
} }
// @TODO handle pipe event ie) Multiplex
class Demux extends Writable { class Demux extends Writable {
private streamsByKey: { private streamsByKey: {
[key: string]: NodeJS.WritableStream | NodeJS.ReadWriteStream; [key: string]: DemuxStreams;
}; };
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private construct: ( private construct: (destKey?: string) => DemuxStreams;
destKey?: string,
) => NodeJS.WritableStream | NodeJS.ReadWriteStream;
constructor( constructor(
construct: ( construct: (destKey?: string) => DemuxStreams,
destKey?: string,
) => NodeJS.WritableStream | NodeJS.ReadWriteStream,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options: WritableOptions = {}, options: WritableOptions = {},
) { ) {
@ -76,10 +75,6 @@ class Demux extends Writable {
this.streamsByKey[key].on(event, cb), this.streamsByKey[key].on(event, cb),
); );
break; break;
case EventSubscription.Unhandled:
throw new Error(
"Stream must be multiplexed before handling this event",
);
default: default:
super.on(event, cb); super.on(event, cb);
} }
@ -96,10 +91,6 @@ class Demux extends Writable {
this.streamsByKey[key].once(event, cb), this.streamsByKey[key].once(event, cb),
); );
break; break;
case EventSubscription.Unhandled:
throw new Error(
"Stream must be multiplexed before handling this event",
);
default: default:
super.once(event, cb); super.once(event, cb);
} }