highland
highland copied to clipboard
Fork without end
Check a sample below
const H = require('highland')
const stream = H([1, 2, 3, 4, 5]).map(String)
const l = stream.fork()
const r = stream.fork()
r.take(2).pipe(process.stdout)
l.pipe(process.stdout)
As a result I see:
>node test.js
1122
Why do I have only two elements from l-stream? How can I achieve behavior when It emits all 5?
I know that it's possible with .observe but in a case I loose back pressure
This is a known issue with fork. In 2.x, there's no concept of "unsubscribing", so even though your r fork no long needs data after the first two values, it can't signal to the source that it no longer wishes to exert backpressure.
Your choices are:
- Use the 3.0.0 beta:
npm install highland@next. This is beta software, so standard disclaimers apply, but it should be generally bug-free. - Use
filterrather thantake.function filterAfter(n) { let i = 0 return stream => stream.filter(x => i++ < n); } r.take(filterAfter(2)).pipe(process.stdout) l.pipe(process.stdout)