2019-08-28 21:01:51 +00:00
import test from "ava" ;
import { expect } from "chai" ;
2019-09-09 15:53:21 +00:00
const { demux , map } = require ( "../src" ) ;
2019-08-29 12:50:11 +00:00
import { Writable } from "stream" ;
2019-09-09 15:53:21 +00:00
const sinon = require ( "sinon" ) ;
const { sleep } = require ( "../src/helpers" ) ;
import { performance } from "perf_hooks" ;
2019-08-28 21:01:51 +00:00
interface Test {
key : string ;
2019-09-09 15:53:21 +00:00
visited : number [ ] ;
2019-08-28 21:01:51 +00:00
}
2019-09-09 18:43:18 +00:00
2019-09-09 15:53:21 +00:00
test . cb ( "demux() constructor should be called once per key" , t = > {
t . plan ( 1 ) ;
2019-08-28 21:01:51 +00:00
const input = [
2019-09-09 15:53:21 +00:00
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "c" , visited : [ ] } ,
2019-09-09 17:47:38 +00:00
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
2019-08-28 21:01:51 +00:00
] ;
2019-09-09 15:53:21 +00:00
const construct = sinon . spy ( ( destKey : string ) = > {
2019-08-29 12:50:11 +00:00
const dest = map ( ( chunk : Test ) = > {
2019-09-09 15:53:21 +00:00
chunk . visited . push ( 1 ) ;
return chunk ;
2019-08-29 12:50:11 +00:00
} ) ;
2019-08-28 21:01:51 +00:00
return dest ;
2019-09-09 15:53:21 +00:00
} ) ;
2019-08-28 21:01:51 +00:00
2019-09-12 13:08:49 +00:00
const demuxed = demux ( construct , "key" , { objectMode : true } ) ;
2019-09-09 15:53:21 +00:00
2019-08-28 21:01:51 +00:00
demuxed . on ( "finish" , ( ) = > {
2019-09-09 15:53:21 +00:00
expect ( construct . withArgs ( "a" ) . callCount ) . to . equal ( 1 ) ;
expect ( construct . withArgs ( "b" ) . callCount ) . to . equal ( 1 ) ;
expect ( construct . withArgs ( "c" ) . callCount ) . to . equal ( 1 ) ;
2019-08-28 21:01:51 +00:00
t . pass ( ) ;
2019-08-29 18:39:08 +00:00
t . end ( ) ;
2019-08-28 21:01:51 +00:00
} ) ;
input . forEach ( event = > demuxed . write ( event ) ) ;
demuxed . end ( ) ;
} ) ;
2019-08-28 21:04:31 +00:00
2019-09-10 16:09:26 +00:00
test . cb ( "demux() should send input through correct pipeline" , t = > {
t . plan ( 6 ) ;
const input = [
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "c" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
] ;
const pipelineSpies = { } ;
const construct = ( destKey : string ) = > {
const mapper = sinon . spy ( ( chunk : Test ) = > {
return { . . . chunk , visited : [ 1 ] } ;
} ) ;
const dest = map ( mapper ) ;
pipelineSpies [ destKey ] = mapper ;
return dest ;
} ;
2019-09-12 13:08:49 +00:00
const demuxed = demux ( construct , "key" , { objectMode : true } ) ;
2019-09-10 16:09:26 +00:00
demuxed . on ( "finish" , ( ) = > {
pipelineSpies [ "a" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "a" ) ;
t . pass ( ) ;
} ) ;
pipelineSpies [ "b" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "b" ) ;
t . pass ( ) ;
} ) ;
pipelineSpies [ "c" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "c" ) ;
t . pass ( ) ;
} ) ;
t . end ( ) ;
} ) ;
input . forEach ( event = > demuxed . write ( event ) ) ;
demuxed . end ( ) ;
} ) ;
2019-09-09 15:53:21 +00:00
test . cb ( "demux() constructor should be called once per key using keyBy" , t = > {
t . plan ( 1 ) ;
2019-08-28 21:04:31 +00:00
const input = [
2019-09-09 15:53:21 +00:00
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "c" , visited : [ ] } ,
2019-09-09 17:47:38 +00:00
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
2019-08-28 21:04:31 +00:00
] ;
2019-09-09 15:53:21 +00:00
const construct = sinon . spy ( ( destKey : string ) = > {
2019-08-29 12:50:11 +00:00
const dest = map ( ( chunk : Test ) = > {
2019-09-09 15:53:21 +00:00
chunk . visited . push ( 1 ) ;
return chunk ;
2019-08-29 12:50:11 +00:00
} ) ;
2019-08-28 21:04:31 +00:00
return dest ;
2019-09-09 15:53:21 +00:00
} ) ;
2019-08-28 21:04:31 +00:00
2019-09-12 13:08:49 +00:00
const demuxed = demux ( construct , item = > item . key , { objectMode : true } ) ;
2019-09-09 15:53:21 +00:00
2019-08-28 21:04:31 +00:00
demuxed . on ( "finish" , ( ) = > {
2019-09-09 15:53:21 +00:00
expect ( construct . withArgs ( "a" ) . callCount ) . to . equal ( 1 ) ;
expect ( construct . withArgs ( "b" ) . callCount ) . to . equal ( 1 ) ;
expect ( construct . withArgs ( "c" ) . callCount ) . to . equal ( 1 ) ;
2019-08-28 21:04:31 +00:00
t . pass ( ) ;
2019-08-29 18:39:08 +00:00
t . end ( ) ;
2019-08-28 21:04:31 +00:00
} ) ;
input . forEach ( event = > demuxed . write ( event ) ) ;
demuxed . end ( ) ;
} ) ;
2019-08-30 13:33:29 +00:00
2019-09-10 16:09:26 +00:00
test . cb ( "demux() should send input through correct pipeline using keyBy" , t = > {
t . plan ( 6 ) ;
const input = [
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "c" , visited : [ ] } ,
{ key : "a" , visited : [ ] } ,
{ key : "b" , visited : [ ] } ,
] ;
const pipelineSpies = { } ;
const construct = ( destKey : string ) = > {
const mapper = sinon . spy ( ( chunk : Test ) = > {
return { . . . chunk , visited : [ 1 ] } ;
} ) ;
const dest = map ( mapper ) ;
pipelineSpies [ destKey ] = mapper ;
return dest ;
} ;
2019-09-12 13:08:49 +00:00
const demuxed = demux ( construct , item = > item . key , { objectMode : true } ) ;
2019-09-10 16:09:26 +00:00
demuxed . on ( "finish" , ( ) = > {
pipelineSpies [ "a" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "a" ) ;
t . pass ( ) ;
} ) ;
pipelineSpies [ "b" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "b" ) ;
t . pass ( ) ;
} ) ;
pipelineSpies [ "c" ] . getCalls ( ) . forEach ( call = > {
expect ( call . args [ 0 ] . key ) . to . equal ( "c" ) ;
t . pass ( ) ;
} ) ;
t . end ( ) ;
} ) ;
input . forEach ( event = > demuxed . write ( event ) ) ;
demuxed . end ( ) ;
} ) ;
2019-09-11 20:33:02 +00:00
test ( "demux() write should return false after if it has >= highWaterMark items buffered and drain should be emitted" , t = > {
2019-09-11 19:09:51 +00:00
return new Promise ( async ( resolve , reject ) = > {
t . plan ( 7 ) ;
interface Chunk {
key : string ;
mapped : number [ ] ;
}
const input : Chunk [ ] = [
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
] ;
let pendingReads = input . length ;
const highWaterMark = 5 ;
const slowProcessorSpeed = 25 ;
const construct = ( destKey : string ) = > {
const first = map (
async ( chunk : Chunk ) = > {
await sleep ( slowProcessorSpeed ) ;
return { . . . chunk , mapped : [ 1 ] } ;
} ,
{ highWaterMark : 1 , objectMode : true } ,
) ;
first . on ( "data" , chunk = > {
expect ( chunk . mapped ) . to . deep . equal ( [ 1 ] ) ;
pendingReads -- ;
if ( pendingReads === 0 ) {
resolve ( ) ;
}
t . pass ( ) ;
} ) ;
return first ;
} ;
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , "key" , {
objectMode : true ,
highWaterMark ,
} ) ;
2019-09-11 19:09:51 +00:00
_demux . on ( "error" , err = > {
reject ( ) ;
} ) ;
for ( const item of input ) {
const res = _demux . write ( item ) ;
expect ( _demux . _writableState . length ) . to . be . at . most ( highWaterMark ) ;
if ( ! res ) {
await new Promise ( ( resolv , rej ) = > {
_demux . once ( "drain" , ( ) = > {
expect ( _demux . _writableState . length ) . to . be . equal ( 0 ) ;
t . pass ( ) ;
resolv ( ) ;
} ) ;
} ) ;
}
}
} ) ;
} ) ;
2019-09-11 20:33:02 +00:00
test ( "demux() should emit one drain event after slowProcessorSpeed * highWaterMark ms" , t = > {
2019-09-09 15:53:21 +00:00
return new Promise ( async ( resolve , reject ) = > {
2019-09-10 22:13:13 +00:00
t . plan ( 7 ) ;
interface Chunk {
key : string ;
mapped : number [ ] ;
}
const input : Chunk [ ] = [
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
] ;
let pendingReads = input . length ;
const highWaterMark = 5 ;
const slowProcessorSpeed = 25 ;
2019-09-11 20:33:02 +00:00
2019-09-09 15:53:21 +00:00
const construct = ( destKey : string ) = > {
2019-09-11 18:29:20 +00:00
const first = map (
async ( chunk : Chunk ) = > {
await sleep ( slowProcessorSpeed ) ;
chunk . mapped . push ( 1 ) ;
return chunk ;
} ,
{ highWaterMark : 1 , objectMode : true } ,
) ;
2019-09-09 15:53:21 +00:00
2019-09-11 20:33:02 +00:00
first . on ( "data" , ( ) = > {
t . pass ( ) ;
pendingReads -- ;
if ( pendingReads === 0 ) {
resolve ( ) ;
}
} ) ;
2019-09-09 15:53:21 +00:00
return first ;
} ;
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , "key" , {
objectMode : true ,
highWaterMark ,
} ) ;
2019-09-09 15:53:21 +00:00
_demux . on ( "error" , err = > {
reject ( ) ;
} ) ;
2019-09-11 20:33:02 +00:00
const start = performance . now ( ) ;
2019-09-09 15:53:21 +00:00
for ( const item of input ) {
const res = _demux . write ( item ) ;
if ( ! res ) {
2019-09-10 22:13:13 +00:00
await new Promise ( ( resolv , rej ) = > {
2019-09-11 20:33:02 +00:00
// This event should be received after all items in demux are processed
2019-09-10 22:13:13 +00:00
_demux . once ( "drain" , ( ) = > {
2019-09-11 18:29:20 +00:00
expect ( performance . now ( ) - start ) . to . be . greaterThan (
slowProcessorSpeed * highWaterMark ,
) ;
t . pass ( ) ;
2019-09-10 22:13:13 +00:00
resolv ( ) ;
} ) ;
} ) ;
2019-09-09 15:53:21 +00:00
}
}
} ) ;
} ) ;
test ( "demux() should emit one drain event when writing 6 items with highWaterMark of 5" , t = > {
return new Promise ( async ( resolve , reject ) = > {
2019-09-10 22:13:13 +00:00
t . plan ( 7 ) ;
2019-09-09 15:53:21 +00:00
interface Chunk {
key : string ;
mapped : number [ ] ;
}
2019-09-10 22:13:13 +00:00
const highWaterMark = 5 ;
const input = [
2019-09-11 20:33:02 +00:00
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
2019-09-10 22:13:13 +00:00
] ;
let pendingReads = input . length ;
2019-09-09 15:53:21 +00:00
const construct = ( destKey : string ) = > {
2019-09-11 20:33:02 +00:00
const first = map (
2019-09-11 18:29:20 +00:00
async ( chunk : Chunk ) = > {
await sleep ( 50 ) ;
chunk . mapped . push ( 2 ) ;
return chunk ;
} ,
{ highWaterMark : 1 , objectMode : true } ,
) ;
2019-09-09 15:53:21 +00:00
2019-09-11 20:33:02 +00:00
first . on ( "data" , ( ) = > {
pendingReads -- ;
t . pass ( ) ;
if ( pendingReads === 0 ) {
resolve ( ) ;
}
} ) ;
return first ;
2019-09-09 15:53:21 +00:00
} ;
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , "key" , {
objectMode : true ,
highWaterMark : 5 ,
} ) ;
2019-09-11 18:29:20 +00:00
2019-09-09 15:53:21 +00:00
_demux . on ( "error" , err = > {
reject ( ) ;
} ) ;
for ( const item of input ) {
const res = _demux . write ( item ) ;
expect ( _demux . _writableState . length ) . to . be . at . most ( highWaterMark ) ;
if ( ! res ) {
2019-09-11 18:29:20 +00:00
await new Promise ( _resolve = > {
2019-09-10 22:13:13 +00:00
_demux . once ( "drain" , ( ) = > {
_resolve ( ) ;
2019-09-11 18:29:20 +00:00
expect ( _demux . _writableState . length ) . to . be . equal ( 0 ) ;
t . pass ( ) ;
2019-09-10 22:13:13 +00:00
} ) ;
} ) ;
2019-09-09 15:53:21 +00:00
}
}
} ) ;
} ) ;
2019-09-12 13:41:04 +00:00
test . cb . only (
2019-09-12 13:08:49 +00:00
"demux() should emit drain event when third stream is bottleneck" ,
2019-09-09 15:53:21 +00:00
t = > {
2019-09-12 13:08:49 +00:00
t . plan ( 8 ) ;
2019-09-11 18:29:20 +00:00
const slowProcessorSpeed = 100 ;
2019-09-11 20:33:02 +00:00
const highWaterMark = 5 ;
2019-09-09 15:53:21 +00:00
interface Chunk {
key : string ;
mapped : number [ ] ;
}
const sink = new Writable ( {
objectMode : true ,
write ( chunk , encoding , cb ) {
2019-09-11 18:29:20 +00:00
expect ( chunk . mapped ) . to . deep . equal ( [ 1 , 2 ] ) ;
2019-09-09 15:53:21 +00:00
t . pass ( ) ;
2019-09-11 18:29:20 +00:00
pendingReads -- ;
2019-09-09 15:53:21 +00:00
if ( pendingReads === 0 ) {
t . end ( ) ;
}
2019-09-11 18:29:20 +00:00
cb ( ) ;
2019-09-09 15:53:21 +00:00
} ,
} ) ;
const construct = ( destKey : string ) = > {
const first = map (
( chunk : Chunk ) = > {
chunk . mapped . push ( 1 ) ;
return chunk ;
} ,
2019-09-11 18:29:20 +00:00
{ objectMode : true , highWaterMark : 1 } ,
2019-09-09 15:53:21 +00:00
) ;
const second = map (
async ( chunk : Chunk ) = > {
2019-09-10 22:13:13 +00:00
await sleep ( slowProcessorSpeed ) ;
2019-09-09 15:53:21 +00:00
chunk . mapped . push ( 2 ) ;
return chunk ;
} ,
{ objectMode : true , highWaterMark : 1 } ,
) ;
first . pipe ( second ) . pipe ( sink ) ;
return first ;
} ;
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , ( ) = > "a" , {
objectMode : true ,
highWaterMark ,
} ) ;
2019-09-09 15:53:21 +00:00
_demux . on ( "error" , err = > {
t . end ( err ) ;
} ) ;
2019-09-12 13:41:04 +00:00
// This event should be received after at least 5 * slowProcessorSpeed (two are read immediately by first and second, 5 remaining in demux before drain event)
2019-09-09 15:53:21 +00:00
_demux . on ( "drain" , ( ) = > {
expect ( _demux . _writableState . length ) . to . be . equal ( 0 ) ;
2019-09-11 18:29:20 +00:00
expect ( performance . now ( ) - start ) . to . be . greaterThan (
2019-09-11 20:33:02 +00:00
slowProcessorSpeed * ( input . length - 2 ) ,
2019-09-10 22:13:13 +00:00
) ;
2019-09-09 15:53:21 +00:00
t . pass ( ) ;
} ) ;
const input = [
{ key : "a" , mapped : [ ] } ,
2019-09-12 13:08:49 +00:00
{ key : "b" , mapped : [ ] } ,
{ key : "c" , mapped : [ ] } ,
{ key : "d" , mapped : [ ] } ,
{ key : "e" , mapped : [ ] } ,
{ key : "f" , mapped : [ ] } ,
{ key : "g" , mapped : [ ] } ,
] ;
let pendingReads = input . length ;
const start = performance . now ( ) ;
input . forEach ( item = > {
2019-09-26 13:24:58 +00:00
_demux . write ( item ) ;
2019-09-12 13:08:49 +00:00
} ) ;
} ,
) ;
test . cb (
"demux() should emit drain event when second stream is bottleneck" ,
t = > {
t . plan ( 8 ) ;
const slowProcessorSpeed = 100 ;
const highWaterMark = 5 ;
interface Chunk {
key : string ;
mapped : number [ ] ;
}
const sink = new Writable ( {
objectMode : true ,
write ( chunk , encoding , cb ) {
2019-09-12 13:41:04 +00:00
expect ( chunk . mapped ) . to . deep . equal ( [ 1 , 2 ] ) ;
2019-09-12 13:08:49 +00:00
t . pass ( ) ;
pendingReads -- ;
if ( pendingReads === 0 ) {
t . end ( ) ;
}
cb ( ) ;
} ,
} ) ;
const construct = ( destKey : string ) = > {
const first = map (
( chunk : Chunk ) = > {
chunk . mapped . push ( 1 ) ;
return chunk ;
} ,
{ objectMode : true , highWaterMark : 1 } ,
) ;
const second = map (
( chunk : Chunk ) = > {
chunk . mapped . push ( 2 ) ;
return chunk ;
} ,
{ objectMode : true , highWaterMark : 1 } ,
) ;
const third = map (
async ( chunk : Chunk ) = > {
await sleep ( slowProcessorSpeed ) ;
chunk . mapped . push ( 3 ) ;
return chunk ;
} ,
{ objectMode : true , highWaterMark : 1 } ,
) ;
first
. pipe ( second )
. pipe ( third )
. pipe ( sink ) ;
return first ;
} ;
const _demux = demux ( construct , ( ) = > "a" , {
objectMode : true ,
highWaterMark ,
} ) ;
_demux . on ( "error" , err = > {
t . end ( err ) ;
} ) ;
2019-09-12 13:41:04 +00:00
// This event should be received after at least 3 * slowProcessorSpeed (two are read immediately by first and second, 3 remaining in demux before drain event)
2019-09-12 13:08:49 +00:00
_demux . on ( "drain" , ( ) = > {
expect ( _demux . _writableState . length ) . to . be . equal ( 0 ) ;
expect ( performance . now ( ) - start ) . to . be . greaterThan (
slowProcessorSpeed * ( input . length - 4 ) ,
) ;
t . pass ( ) ;
} ) ;
const input = [
2019-09-09 15:53:21 +00:00
{ key : "a" , mapped : [ ] } ,
2019-09-12 13:08:49 +00:00
{ key : "b" , mapped : [ ] } ,
{ key : "c" , mapped : [ ] } ,
{ key : "d" , mapped : [ ] } ,
{ key : "e" , mapped : [ ] } ,
{ key : "f" , mapped : [ ] } ,
{ key : "g" , mapped : [ ] } ,
2019-09-09 15:53:21 +00:00
] ;
let pendingReads = input . length ;
2019-09-11 18:29:20 +00:00
const start = performance . now ( ) ;
2019-09-09 15:53:21 +00:00
input . forEach ( item = > {
2019-09-26 13:24:58 +00:00
_demux . write ( item ) ;
2019-09-09 15:53:21 +00:00
} ) ;
} ,
) ;
2019-09-11 18:29:20 +00:00
test ( "demux() should be blocked by slowest pipeline" , t = > {
2019-09-09 17:47:38 +00:00
t . plan ( 1 ) ;
2019-09-11 18:29:20 +00:00
const slowProcessorSpeed = 100 ;
2019-09-09 17:47:38 +00:00
interface Chunk {
key : string ;
mapped : number [ ] ;
}
return new Promise ( async ( resolve , reject ) = > {
const construct = ( destKey : string ) = > {
const first = map (
async ( chunk : Chunk ) = > {
2019-09-11 18:29:20 +00:00
await sleep ( slowProcessorSpeed ) ;
chunk . mapped . push ( 1 ) ;
2019-09-09 17:47:38 +00:00
return chunk ;
} ,
{ objectMode : true , highWaterMark : 1 } ,
) ;
2019-09-11 20:33:02 +00:00
first . on ( "data" , chunk = > {
pendingReads -- ;
if ( chunk . key === "b" ) {
expect ( performance . now ( ) - start ) . to . be . greaterThan (
slowProcessorSpeed * totalItems ,
) ;
t . pass ( ) ;
expect ( pendingReads ) . to . equal ( 0 ) ;
resolve ( ) ;
}
} ) ;
2019-09-09 17:47:38 +00:00
return first ;
} ;
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , "key" , {
objectMode : true ,
highWaterMark : 1 ,
} ) ;
2019-09-09 17:47:38 +00:00
_demux . on ( "error" , err = > {
2019-09-10 22:13:13 +00:00
reject ( err ) ;
2019-09-09 17:47:38 +00:00
} ) ;
const input = [
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "c" , mapped : [ ] } ,
{ key : "c" , mapped : [ ] } ,
2019-09-11 18:29:20 +00:00
{ key : "c" , mapped : [ ] } ,
{ key : "b" , mapped : [ ] } ,
2019-09-09 17:47:38 +00:00
] ;
let pendingReads = input . length ;
2019-09-11 18:29:20 +00:00
const totalItems = input . length ;
2019-09-09 18:43:18 +00:00
const start = performance . now ( ) ;
2019-09-09 17:47:38 +00:00
for ( const item of input ) {
2019-09-11 18:29:20 +00:00
if ( ! _demux . write ( item ) ) {
await new Promise ( _resolve = > {
_demux . once ( "drain" , ( ) = > {
_resolve ( ) ;
} ) ;
} ) ;
}
2019-09-09 17:47:38 +00:00
}
} ) ;
} ) ;
2019-09-09 18:43:18 +00:00
2019-09-11 20:33:02 +00:00
test ( "demux() should emit drain event when second stream in pipeline is bottleneck" , t = > {
t . plan ( 5 ) ;
const highWaterMark = 3 ;
2019-09-09 15:53:21 +00:00
return new Promise ( async ( resolve , reject ) = > {
interface Chunk {
key : string ;
mapped : number [ ] ;
}
const sink = new Writable ( {
objectMode : true ,
write ( chunk , encoding , cb ) {
2019-09-11 18:29:20 +00:00
expect ( chunk . mapped ) . to . deep . equal ( [ 1 , 2 ] ) ;
2019-09-09 15:53:21 +00:00
t . pass ( ) ;
cb ( ) ;
if ( pendingReads === 0 ) {
resolve ( ) ;
}
} ,
} ) ;
2019-09-11 20:33:02 +00:00
2019-09-09 15:53:21 +00:00
const construct = ( destKey : string ) = > {
const first = map (
( chunk : Chunk ) = > {
expect ( first . _readableState . length ) . to . be . at . most ( 2 ) ;
chunk . mapped . push ( 1 ) ;
return chunk ;
} ,
2019-09-11 20:33:02 +00:00
{ objectMode : true , highWaterMark : 2 } ,
2019-09-09 15:53:21 +00:00
) ;
const second = map (
async ( chunk : Chunk ) = > {
2019-09-11 20:33:02 +00:00
await sleep ( 100 ) ;
2019-09-09 15:53:21 +00:00
chunk . mapped . push ( 2 ) ;
expect ( second . _writableState . length ) . to . be . equal ( 1 ) ;
pendingReads -- ;
return chunk ;
} ,
2019-09-11 18:29:20 +00:00
{ objectMode : true , highWaterMark : 1 } ,
2019-09-09 15:53:21 +00:00
) ;
first . pipe ( second ) . pipe ( sink ) ;
return first ;
} ;
2019-09-11 20:33:02 +00:00
2019-09-12 13:08:49 +00:00
const _demux = demux ( construct , "key" , {
objectMode : true ,
highWaterMark ,
} ) ;
2019-09-09 15:53:21 +00:00
_demux . on ( "error" , err = > {
reject ( ) ;
} ) ;
_demux . on ( "drain" , ( ) = > {
expect ( _demux . _writableState . length ) . to . be . equal ( 0 ) ;
t . pass ( ) ;
} ) ;
const input = [
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
{ key : "a" , mapped : [ ] } ,
] ;
let pendingReads = input . length ;
input . forEach ( item = > {
_demux . write ( item ) ;
} ) ;
} ) ;
} ) ;