fs2
fs2 copied to clipboard
'.pauseWhen' does not respect pause signal if there is was no data available
The code below should just produce timeout, but it also prints 1 despite the fact that value was produced after pause:
Stream.eval(SignallingRef[IO].of(false)).flatMap { pause =>
Stream.sleep[IO](1.second).map(_ => 1).pauseWhen(pause).concurrently {
Stream.sleep[IO](500.millis).evalMap(_ => pause.set(true))
}.foreach(IO.println)
}.compile.drain.timeout(2.seconds)
FS2: 3.2.7 & 3.2.8 Scatie link: https://scastie.scala-lang.org/ZxOocFhHTK6RjuvlQ57ulw
I think, it could be fixed with either a double-checking pause state in chunks.flatMap(...) or a custom pull stage that races between next chunk pull and pause signal.
lately I am seeing some rare concurrent and timeout related issues every once in a while like this in tests as well as running on prob, probably related.