Add chunk to constructor (#5)

* Add chunk to constructor

* Add test

* Bump version
This commit is contained in:
Jerry Kurian 2020-04-27 12:25:36 -04:00 committed by GitHub
parent f661f9be6b
commit 71b03678ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 13 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@jogogo/mhysa", "name": "@jogogo/mhysa",
"version": "2.0.0-alpha.2", "version": "2.0.0-alpha.3",
"description": "Streams and event emitter utils for Node.js", "description": "Streams and event emitter utils for Node.js",
"keywords": [ "keywords": [
"promise", "promise",

View File

@ -1,15 +1,7 @@
import { DuplexOptions, Duplex, Transform, Writable } from "stream"; import { DuplexOptions, Duplex, Transform } from "stream";
import { isReadable } from "../helpers"; import { isReadable } from "../helpers";
enum EventSubscription {
Last = 0,
First,
All,
Self,
Unhandled,
}
type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream; type DemuxStreams = NodeJS.WritableStream | NodeJS.ReadWriteStream;
export interface DemuxOptions extends DuplexOptions { export interface DemuxOptions extends DuplexOptions {
@ -17,7 +9,7 @@ export interface DemuxOptions extends DuplexOptions {
} }
export function demux( export function demux(
construct: (destKey?: string) => DemuxStreams, construct: (destKey?: string, chunk?: any) => DemuxStreams,
demuxBy: string | ((chunk: any) => string), demuxBy: string | ((chunk: any) => string),
options?: DemuxOptions, options?: DemuxOptions,
): Duplex { ): Duplex {
@ -29,7 +21,7 @@ class Demux extends Duplex {
[key: string]: DemuxStreams; [key: string]: DemuxStreams;
}; };
private demuxer: (chunk: any) => string; private demuxer: (chunk: any) => string;
private construct: (destKey?: string) => DemuxStreams; private construct: (destKey?: string, chunk?: any) => DemuxStreams;
private remultiplex: boolean; private remultiplex: boolean;
private transform: Transform; private transform: Transform;
constructor( constructor(
@ -61,7 +53,7 @@ class Demux extends Duplex {
public async _write(chunk: any, encoding: any, cb: any) { public async _write(chunk: any, encoding: any, cb: any) {
const destKey = this.demuxer(chunk); const destKey = this.demuxer(chunk);
if (this.streamsByKey[destKey] === undefined) { if (this.streamsByKey[destKey] === undefined) {
const newPipeline = await this.construct(destKey); const newPipeline = await this.construct(destKey, chunk);
this.streamsByKey[destKey] = newPipeline; this.streamsByKey[destKey] = newPipeline;
if (this.remultiplex && isReadable(newPipeline)) { if (this.remultiplex && isReadable(newPipeline)) {
(newPipeline as NodeJS.ReadWriteStream).pipe(this.transform); (newPipeline as NodeJS.ReadWriteStream).pipe(this.transform);

View File

@ -44,6 +44,34 @@ test.cb("demux() constructor should be called once per key", t => {
fromArray(input).pipe(demuxed); fromArray(input).pipe(demuxed);
}); });
test.cb("demux() item written passed in constructor", t => {
t.plan(4);
const input = [
{ key: "a", visited: [] },
{ key: "b", visited: [] },
{ key: "c", visited: [] },
];
const construct = sinon.spy((destKey: string, item: any) => {
expect(item).to.deep.equal({ key: destKey, visited: [] });
t.pass();
const dest = map((chunk: Test) => {
chunk.visited.push(1);
return chunk;
});
return dest;
});
const demuxed = demux(construct, "key", {});
demuxed.on("finish", () => {
t.pass();
t.end();
});
fromArray(input).pipe(demuxed);
});
test.cb("demux() should send input through correct pipeline", t => { test.cb("demux() should send input through correct pipeline", t => {
t.plan(6); t.plan(6);
const input = [ const input = [