Update demux

This commit is contained in:
Jerry Kurian
2019-09-12 09:08:49 -04:00
parent 65c36a8f22
commit 586f618e95
2 changed files with 128 additions and 73 deletions

View File

@@ -24,7 +24,7 @@ const eventsTarget = {
export function demux(
construct: () => NodeJS.WritableStream | NodeJS.ReadWriteStream,
demuxBy: { key?: string; keyBy?: (chunk: any) => string },
demuxBy: string | ((chunk: any) => string),
options?: WritableOptions,
): Writable {
return new Demux(construct, demuxBy, options);
@@ -42,19 +42,17 @@ class Demux extends Writable {
construct: (
destKey?: string,
) => NodeJS.WritableStream | NodeJS.ReadWriteStream,
demuxBy: { key?: string; keyBy?: (chunk: any) => string },
demuxBy: string | ((chunk: any) => string),
options?: WritableOptions,
) {
super(options);
if (demuxBy.keyBy === undefined && demuxBy.key === undefined) {
throw new Error("keyBy or key must be provided in second argument");
}
this.demuxer = demuxBy.keyBy || ((chunk: any) => chunk[demuxBy.key!]);
this.demuxer =
typeof demuxBy === "string" ? chunk => chunk[demuxBy] : demuxBy;
this.construct = construct;
this.streamsByKey = {};
}
public async _write(chunk: any, encoding: any, cb: any) {
public _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) {
this.streamsByKey[destKey] = this.construct(destKey);