
Improve `parJoin` with the ability to select a join strategy

Open rehanone opened this issue 4 years ago • 4 comments

The current implementation of parJoin with maxOpen does not start the remaining streams until the earlier ones complete. This is a problem when the inner streams are infinite, as in the example below:

Listing A - parJoin with 9 infinite streams, maxOpen = 2

    import cats.effect.IO
    import fs2.Stream

    Stream.range(1, 10)                   // nine inner streams: 1 through 9
      .map(c => Stream.repeatEval(IO(c))) // each one infinite
      .parJoin(maxOpen = 2)               // at most two evaluated at a time
      .evalTap(c => IO(println(s"I am Stream $c")))
      .take(10)
      .compile
      .drain

Output of Listing A

I am Stream 1
I am Stream 2
I am Stream 2
I am Stream 1
I am Stream 1
I am Stream 2
I am Stream 2
I am Stream 1
I am Stream 1
I am Stream 2

This limitation makes parJoin unusable with libraries like fs2-kafka when it is desirable to process individual partitions before joining them. The only alternative in that case is parJoinUnbounded, which fixes the problem above but offers no way of controlling how many inner streams are evaluated in parallel. Such control is desirable when the inner streams are memory-intensive and you cannot let all of them run at once.
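
For comparison, here is a sketch of Listing A with parJoinUnbounded substituted in (take(20) is an arbitrary bound added for the sketch): all nine inner streams start immediately, so every stream appears in the output, but nothing caps how many run in parallel.

    import cats.effect.IO
    import fs2.Stream

    Stream.range(1, 10)
      .map(c => Stream.repeatEval(IO(c)))
      .parJoinUnbounded // all nine inner streams start at once
      .evalTap(c => IO(println(s"I am Stream $c")))
      .take(20)
      .compile
      .drain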

I would like to propose a new signature for parJoin as follows:

sealed trait JoinStrategy

object JoinStrategy {
  case object Random extends JoinStrategy
  case object RoundRobin extends JoinStrategy
  case object Sequential extends JoinStrategy
}

def parJoin(maxOpen: Int, joinStrategy: JoinStrategy = JoinStrategy.Sequential)

- Sequential would be the current logic and the default.
- RoundRobin would evaluate each inner stream in turn.
- Random would behave the way parJoinUnbounded works today, but with the number of parallel evaluations limited to maxOpen.
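
As a usage sketch (a hypothetical overload, not part of fs2), Listing A could then interleave all nine streams while still capping parallelism:

    Stream.range(1, 10)
      .map(c => Stream.repeatEval(IO(c)))
      .parJoin(maxOpen = 2, joinStrategy = JoinStrategy.RoundRobin) // hypothetical
      .take(10)
      .compile
      .drain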

Surely there are better names for these, and there could be other strategies, but this is what I can think of right now.

rehanone avatar Oct 09 '21 19:10 rehanone

For your Kafka use case, you basically want to start each inner stream but then only pull up to n elements at a time? You could do this with parJoinUnbounded and a channel, like so:

import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Channel

def greedyJoin[F[_]: Concurrent, O](s: Stream[F, Stream[F, O]], n: Int): Stream[F, O] =
  Stream.eval(Channel.bounded[F, O](n)).flatMap { channel =>
    channel.stream.concurrently(s.parJoinUnbounded.through(channel.sendAll))
  }
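
Here all inner streams run eagerly via parJoinUnbounded, and the bounded channel backpressures them so at most n elements sit buffered ahead of the consumer. Applied to the streams from Listing A (a sketch; the val name inner is mine), every stream contributes output:

    import cats.effect.IO

    val inner: Stream[IO, Stream[IO, Int]] =
      Stream.range(1, 10).map(c => Stream.repeatEval(IO(c)))

    greedyJoin(inner, n = 2)
      .evalTap(c => IO(println(s"I am Stream $c")))
      .take(10)
      .compile
      .drain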

Alternatively, if you want to prefetch up to n chunks instead of n elements:

def greedyJoin[F[_]: Concurrent, O](s: Stream[F, Stream[F, O]], n: Int): Stream[F, O] =
  s.parJoinUnbounded.prefetchN(n)
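
Note the difference between the two versions: the channel bounds the number of individual elements buffered ahead of the consumer, while prefetchN bounds the number of chunks, so with large chunks the second version can hold many more elements in memory.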

mpilquist avatar Oct 11 '21 12:10 mpilquist

@SystemFw What do you think about allowing configuration of the channel inside parJoin? Somehow letting folks optionally specify a bound for the output here, instead of always using a synchronous channel: https://github.com/typelevel/fs2/blob/c945743bf663cecc8e1a67b3b833c5cefe7a3f90/core/shared/src/main/scala/fs2/Stream.scala#L3948

Edit: never mind, this would be equivalent to using prefetchN after parJoin.
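
That is (a sketch; inner, maxOpen, and bound are placeholder names):

    inner.parJoin(maxOpen).prefetchN(bound)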

mpilquist avatar Oct 12 '21 11:10 mpilquist

@mpilquist, thanks for the suggestion; I will try the solution you proposed. I do think it would be worth considering adding the above, or perhaps separate parJoin combinators that provide control over how streams are joined. Over in Akka, similar functionality is exposed through many merge* functions.

rehanone avatar Oct 12 '21 11:10 rehanone

I actually write Kafka code on top of fs2-kafka very differently: the interface I expose is (Chunk[A] => F[Chunk[B]]): F[Unit], and both ConsumerRecords and Stream are hidden, for various reasons.
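
A rough sketch of what such an interface might look like (the names are hypothetical; only the shape, a method taking Chunk[A] => F[Chunk[B]] and returning F[Unit], comes from the comment above):

    import fs2.Chunk

    trait PartitionProcessor[F[_], A, B] {
      // The caller supplies a chunk-at-a-time handler; consuming,
      // committing, and stream management stay hidden inside.
      def run(handler: Chunk[A] => F[Chunk[B]]): F[Unit]
    }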

Anyway, if we were to explore this, I'd be inclined to provide several combinators; the design space is pretty big. For example, I had a case where I wanted n to be dynamic, and then you can think about RoundRobin, etc. I feel like trying to cram it all into one combinator would result in a PubSub-like thing, which hasn't been successful before.

SystemFw avatar Oct 12 '21 12:10 SystemFw