'.pauseWhen' does not respect the pause signal if no data was available

Open seigert opened this issue 3 years ago • 1 comments

The code below should only time out, but it also prints 1, even though the value was produced after the pause signal was set:

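import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.duration._

// The source emits 1 after 1 second; the pause signal is set to true after 500 ms.
Stream.eval(SignallingRef[IO].of(false)).flatMap { pause =>
   Stream.sleep[IO](1.second).map(_ => 1).pauseWhen(pause).concurrently {
      Stream.sleep[IO](500.millis).evalMap(_ => pause.set(true))
   }.foreach(IO.println)
}.compile.drain.timeout(2.seconds)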

FS2: 3.2.7 & 3.2.8. Scastie link: https://scastie.scala-lang.org/ZxOocFhHTK6RjuvlQ57ulw

I think it could be fixed either by double-checking the pause state in chunks.flatMap(...) or by a custom pull stage that races the next chunk pull against the pause signal.
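
A minimal sketch of the first option (re-checking the pause state once a chunk has arrived, before emitting it) might look like the following. The helper name `pauseWhenChecked` is hypothetical and not part of fs2, and this is not the library's actual implementation. It only narrows the window: the signal can still flip between the check and the emission, which is what the racing pull stage would address.

```scala
import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Signal

// Hypothetical helper, not fs2 API: holds each chunk back while `pause` is true.
def pauseWhenChecked[F[_]: Concurrent, A](
    source: Stream[F, A],
    pause: Signal[F, Boolean]
): Stream[F, A] = {
  // `discrete` emits the current value first, then every update;
  // this completes as soon as the signal is observed to be false.
  val waitUntilResumed: Stream[F, Nothing] =
    Stream.exec(pause.discrete.dropWhile(paused => paused).take(1).compile.drain)

  // The check runs after the chunk has been pulled, so a pause that was
  // set while the source had no data yet is still observed.
  source.chunks.flatMap { chunk =>
    waitUntilResumed ++ Stream.chunk(chunk)
  }
}
```

With the reproduction above, `pauseWhenChecked(Stream.sleep[IO](1.second).map(_ => 1), pause)` in place of `.pauseWhen(pause)` would be expected to print nothing before the timeout.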

seigert avatar Jun 20 '22 15:06 seigert

Lately I have been seeing rare concurrency- and timeout-related issues like this every once in a while, both in tests and when running in prod. They are probably related.

Swoorup avatar Jul 28 '22 14:07 Swoorup