Strange behaviour of a forked stream
Check the code below:
const H = require('highland')
const IntervalStream = function(time, value) {
return H(function(push, next) {
push(null, value)
setTimeout(
next,
time
)
})
}
const stream = IntervalStream(100, 1).scan1(H.add)
//.tap(x => x)// HACK: doesn't work without tap <-----------------------------
stream.fork().filter(i => i % 5).map(() => '.').pipe(process.stdout)
stream.fork().reject(i => i % 5).map(() => '!').pipe(process.stderr)
If run it:
>node generator.js
................^C
but if I uncoment the line with .tap I see:
>node generator.js
....!....!...
I believe it's a kind of a bug.
"dependencies": {
"highland": "^2.10.5"
}
The issue here is that you're forking a stream after you've consumed from one of its fork (i.e., when you called pipe). You generally want to fork all of your streams first, and then consume from them after. This way, you get a chance to set up the correct backpressure behavior before any stream consumption happens.
Something like this should work without needing tap.
const forkA = stream.fork()
const forkB = stream.fork()
forkA.filter(i => i % 5).map(() => '.').pipe(process.stdout)
forkB.reject(i => i % 5).map(() => '!').pipe(process.stderr)
That said, this is definitely a bug, since it should be possible to dynamically add forks in this way. I don't have a lot of time to look at this right now though (and backpressure logic in 2.x is something of a Rube Goldberg machine), so this bug will probably remain open for a while.