Improve `parJoin` with the ability to select a join strategy
The current implementation of `parJoin` with `maxOpen` does not start the remaining inner streams until earlier ones complete. This is a problem when the inner streams are infinite, as in the example below:
Listing A - `parJoin` with 9 infinite streams, `maxOpen = 2`
```scala
import cats.effect.IO
import fs2.Stream

Stream
  .range(1, 10)                       // 9 inner streams, numbered 1 to 9
  .map(c => Stream.repeatEval(IO(c))) // each inner stream is infinite
  .parJoin(maxOpen = 2)
  .evalTap(c => IO(println(s"I am Stream $c")))
  .take(10)
  .compile
  .drain
```
Output of Listing A
```text
I am Stream 1
I am Stream 2
I am Stream 2
I am Stream 1
I am Stream 1
I am Stream 2
I am Stream 2
I am Stream 1
I am Stream 1
I am Stream 2
```
Because streams 1 and 2 never terminate, streams 3 through 9 are never started. This limitation makes `parJoin` unusable with libraries like fs2-kafka when it is desirable to process individual partitions before joining them. The only alternative in that case is `parJoinUnbounded`, which fixes the above problem but offers no way to control how many inner streams are evaluated in parallel. Such control matters when the inner streams are memory intensive and you cannot let all of them run at once.
I would like to propose a new signature for parJoin as follows:
```scala
sealed trait JoinStrategy
object JoinStrategy {
  case object Random extends JoinStrategy
  case object RoundRobin extends JoinStrategy
  case object Sequential extends JoinStrategy
}

def parJoin(maxOpen: Int, joinStrategy: JoinStrategy = JoinStrategy.Sequential)
```
- `Sequential` would keep the current logic and remain the default.
- `RoundRobin` would evaluate each inner stream in turn.
- `Random` would behave like `parJoinUnbounded`, but with the number of inner streams evaluated in parallel capped at `maxOpen`.
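With that shape, the call site from Listing A might look like this (a hypothetical sketch, since neither `JoinStrategy` nor the second parameter exists in fs2 today):

```scala
// Hypothetical call site for the proposed API; this only sketches the
// intended usage, none of it compiles against current fs2.
Stream
  .range(1, 10)
  .map(c => Stream.repeatEval(IO(c)))
  .parJoin(maxOpen = 2, joinStrategy = JoinStrategy.RoundRobin)
  .take(10)
  .compile
  .drain
```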
Surely better names could be chosen, and there may be other useful strategies; these are just the ones I can think of right now.
For your Kafka use case, you basically want to start each inner stream but then only pull up to n elements at a time? You could do this with `parJoinUnbounded` and a channel, like so:
```scala
import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Channel

def greedyJoin[F[_]: Concurrent, O](s: Stream[F, Stream[F, O]], n: Int): Stream[F, O] =
  Stream.eval(Channel.bounded[F, O](n)).flatMap { channel =>
    // All inner streams start immediately; the bounded channel applies
    // backpressure so at most n elements are buffered ahead of the consumer.
    channel.stream.concurrently(s.parJoinUnbounded.through(channel.sendAll))
  }
```
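For example (a sketch reusing the streams from Listing A), all nine inner streams now start immediately, while the bounded channel keeps at most two elements in flight:

```scala
import cats.effect.IO
import fs2.Stream

// Sketch: greedyJoin applied to Listing A's streams. Unlike parJoin(2),
// every inner stream starts right away, so ids other than 1 and 2 can appear.
greedyJoin(Stream.range(1, 10).map(c => Stream.repeatEval(IO(c))), n = 2)
  .evalTap(c => IO(println(s"I am Stream $c")))
  .take(10)
  .compile
  .drain
```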
Alternatively, if you want to prefetch up to n chunks instead of n elements:
```scala
import cats.effect.Concurrent
import fs2.Stream

def greedyJoin[F[_]: Concurrent, O](s: Stream[F, Stream[F, O]], n: Int): Stream[F, O] =
  s.parJoinUnbounded.prefetchN(n)
```
@SystemFw What do you think about allowing configuration of the channel inside `parJoin`? Somehow letting folks optionally specify a bound for the output here, instead of always using a synchronous channel: https://github.com/typelevel/fs2/blob/c945743bf663cecc8e1a67b3b833c5cefe7a3f90/core/shared/src/main/scala/fs2/Stream.scala#L3948
Edit: never mind, this would be equivalent to using `prefetchN` after `parJoin`.
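In other words (a sketch of the noted equivalence; `boundedParJoin` is an invented name):

```scala
import cats.effect.Concurrent
import fs2.Stream

// Sketch only: bounding parJoin's internal output channel to n chunks would
// behave like prefetching up to n chunks after the join.
def boundedParJoin[F[_]: Concurrent, O](
    s: Stream[F, Stream[F, O]],
    maxOpen: Int,
    n: Int
): Stream[F, O] =
  s.parJoin(maxOpen).prefetchN(n)
```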
@mpilquist, thanks for the suggestion; I will try the solution you proposed. I do think it would be worth considering adding the above, or perhaps separate `parJoin` combinators that provide control over how streams are joined. Over in Akka, similar functionality is exposed through the various merge* operators.
I actually write Kafka code on top of fs2-kafka very differently: the interface I expose is `(Chunk[A] => F[Chunk[B]]): F[Unit]`, and both `ConsumerRecords` and `Stream` are hidden, for various reasons.
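A hypothetical shape for that kind of interface (the `ChunkProcessor` and `run` names are invented for illustration; the comment above doesn't spell them out):

```scala
import fs2.Chunk

// Hypothetical sketch: the caller supplies a chunk-level handler and the
// implementation hides ConsumerRecords, Stream, partitioning, and offsets.
trait ChunkProcessor[F[_], A, B] {
  def run(handler: Chunk[A] => F[Chunk[B]]): F[Unit]
}
```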
Anyway, if we were to explore this, I'd be inclined to provide several combinators; the design space is pretty big. For example, I had a case where I wanted n to be dynamic, and then you can think about RoundRobin, etc. I feel like trying to cram it all into one combinator would result in a PubSub-like thing, which hasn't been successful before.
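As one illustration of that design space, a dynamic bound could be sketched as a signature that reads `maxOpen` from a `Signal` (purely hypothetical; no such combinator exists in fs2):

```scala
import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Signal

// Hypothetical signature only: the concurrency bound comes from a Signal and
// may change while the join runs. No implementation is provided here.
def parJoinDynamic[F[_]: Concurrent, O](
    streams: Stream[F, Stream[F, O]],
    maxOpen: Signal[F, Int]
): Stream[F, O] = ???
```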