Fork without end

Open kharandziuk opened this issue 9 years ago • 1 comments

Check a sample below

const H = require('highland')

const stream = H([1, 2, 3, 4, 5]).map(String)

const l = stream.fork()
const r = stream.fork()

r.take(2).pipe(process.stdout)
l.pipe(process.stdout)

As a result I see:

>node test.js
1122

Why do I get only two elements from the l stream? How can I make it emit all 5?

I know that it's possible with .observe, but in that case I lose back pressure.

kharandziuk, Apr 07 '17 18:04

This is a known issue with fork. In 2.x, there's no concept of "unsubscribing", so even though your r fork no longer needs data after the first two values, it can't signal to the source that it no longer wishes to exert backpressure. The source therefore keeps waiting for r, and l stalls along with it.
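To make the stall concrete, here is a toy model of lock-step forking. It is only a sketch and not Highland's actual implementation: `runForks`, its `limits` argument, and the `canUnsubscribe` flag are hypothetical names invented for illustration. The assumption is that the source hands out a value only when every fork that is still attached is ready for more.

```javascript
// Toy model of forks sharing backpressure. NOT Highland's real code;
// runForks, limits and canUnsubscribe are hypothetical illustration names.
function runForks(values, limits, canUnsubscribe) {
  // One record per fork: how many values it wants and what it has received.
  const forks = limits.map(limit => ({ limit, received: [], subscribed: true }));

  for (const value of values) {
    const active = forks.filter(f => f.subscribed);
    // The source emits only if every attached fork still wants data.
    const allReady = active.every(f => f.received.length < f.limit);
    if (!allReady) break; // a finished fork that cannot detach stalls everyone

    for (const f of active) {
      f.received.push(value);
      // With unsubscribing, a fork that has had enough detaches itself.
      if (canUnsubscribe && f.received.length >= f.limit) f.subscribed = false;
    }
  }
  return forks.map(f => f.received.join(''));
}

const input = ['1', '2', '3', '4', '5'];

// l wants everything, r wants two values (like take(2)).
console.log(runForks(input, [Infinity, 2], false)); // [ '12', '12' ]
console.log(runForks(input, [Infinity, 2], true));  // [ '12345', '12' ]
```

The first call mirrors the behaviour described for 2.x: once r has its two values it stops asking for more but stays attached, so l is stuck at two values as well. The second call shows what an unsubscribe mechanism would change in this model.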

Your choices are:

  1. Use the 3.0.0 beta: npm install highland@next. This is beta software, so standard disclaimers apply, but it should be generally bug-free.
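  2. Use filter rather than take. The r fork then keeps consuming values and discards everything after the first n, so it never stalls l.
      // Returns a transform that lets only the first n values through.
      function filterAfter(n) {
        let i = 0
        return stream => stream.filter(x => i++ < n)
      }
      // Apply the transform with through; take expects a number, not a function.
      r.through(filterAfter(2)).pipe(process.stdout)
      l.pipe(process.stdout)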
    

vqvu, Apr 07 '17 20:04