Strange behaviour of a forked stream

Open kharandziuk opened this issue 8 years ago • 1 comments

Check the code below:

const H = require('highland')

const IntervalStream = function(time, value) {
  return H(function(push, next) {
    push(null, value)
    setTimeout(
      next,
      time
    )
  })
}

const stream = IntervalStream(100, 1).scan1(H.add)
  //.tap(x => x)// HACK: doesn't work without tap <-----------------------------

stream.fork().filter(i => i % 5).map(() => '.').pipe(process.stdout)
stream.fork().reject(i => i % 5).map(() => '!').pipe(process.stderr)

If I run it:

>node generator.js
................^C

But if I uncomment the line with .tap, I see:

>node generator.js
....!....!...

I believe this is a bug.

"dependencies": {
  "highland": "^2.10.5"
}

kharandziuk, Apr 06 '17 08:04

The issue here is that you're forking the stream after you've already consumed from one of its forks (i.e., when you called pipe). You generally want to create all of your forks first and only then consume from them. This way, you get a chance to set up the correct backpressure behavior before any stream consumption happens.

Something like this should work without needing tap.

const forkA = stream.fork()
const forkB = stream.fork()

forkA.filter(i => i % 5).map(() => '.').pipe(process.stdout)
forkB.reject(i => i % 5).map(() => '!').pipe(process.stderr)

That said, this is definitely a bug, since it should be possible to dynamically add forks in this way. I don't have a lot of time to look at this right now though (and backpressure logic in 2.x is something of a Rube Goldberg machine), so this bug will probably remain open for a while.
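
As a sanity check on what the output should look like once both forks receive every value, here is a small sketch in plain JavaScript. It does not use Highland at all, so it says nothing about forking or backpressure; it only mirrors the two predicates. It assumes the source emits the running sums 1, 2, 3, ... (as scan1(H.add) over a constant 1 would), and the helper name expectedOutput is made up for illustration.

```javascript
// Plain-JavaScript sketch: no Highland, no timers, no backpressure.
// Assumption: the source emits the running sums 1, 2, 3, ...
// `expectedOutput` is a hypothetical helper, not part of any library.
function expectedOutput(count) {
  const values = Array.from({ length: count }, (_, i) => i + 1)
  // filter(i => i % 5) keeps values where i % 5 is truthy (not a multiple of 5),
  // reject(i => i % 5) keeps the remaining values (multiples of 5).
  return values.map(i => (i % 5 ? '.' : '!')).join('')
}

console.log(expectedOutput(13)) // prints "....!....!..."
```

Every fifth value is a multiple of 5, so it goes to the reject branch and shows up as '!'. In the real program the two characters go to stdout and stderr respectively and only appear interleaved because both are attached to the same terminal.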

vqvu, Apr 07 '17 10:04