Add tests for demux

This commit is contained in:
Jerry Kurian
2019-09-09 11:53:21 -04:00
parent 599ba16d48
commit 0067ba6a7c
6 changed files with 576 additions and 105 deletions

View File

@@ -29,6 +29,7 @@ enum EventSubscription {
All,
Self,
}
const eventsTarget = {
close: EventSubscription.Last,
data: EventSubscription.Last,

View File

@@ -1,5 +1,27 @@
import { WritableOptions, Writable } from "stream";
enum EventSubscription {
Last = 0,
First,
All,
Self,
Unhandled,
}
const eventsTarget = {
close: EventSubscription.Self,
data: EventSubscription.All,
drain: EventSubscription.Self,
end: EventSubscription.Self,
error: EventSubscription.Self,
finish: EventSubscription.Self,
pause: EventSubscription.Self,
pipe: EventSubscription.Unhandled,
readable: EventSubscription.Self,
resume: EventSubscription.Self,
unpipe: EventSubscription.Unhandled,
};
/**
* Return a Duplex stream that is pushed data from multiple sources
* @param streams Source streams to multiplex
@@ -76,4 +98,24 @@ class Demux extends Writable {
});
}
}
public on(event: string, cb: any) {
switch (eventsTarget[event]) {
case EventSubscription.Self:
super.on(event, cb);
break;
case EventSubscription.All:
Object.keys(this.streamsByKey).forEach(key =>
this.streamsByKey[key].stream.on(event, cb),
);
break;
case EventSubscription.Unhandled:
throw new Error(
"Stream must be multiplexed before handling this event",
);
default:
super.on(event, cb);
}
return this;
}
}