fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

prefetch hangs on a canceled stream

Open enlait opened this issue 1 year ago • 2 comments

fs2 version 3.10.2

this code does not terminate:

Stream.eval(IO.canceled).prefetch.compile.drain

expected behavior: prefetch should not change execution semantics, at least not so much as to prevent termination

enlait avatar Oct 01 '24 15:10 enlait

Since concurrently doesn't guarantee that the foreground stream is closed when the background reaches termination, then I think the channel should be closed once the this stream exits.

  def prefetchN[F2[x] >: F[x]: Concurrent](
      n: Int
  ): Stream[F2, O] =
    Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
      chan.stream.unchunks.concurrently {
-        chunks.through(chan.sendAll)
+        chunks.through(chan.sendAll).onFinalize(chan.close)
      }
    }

Here.

ValdemarGr avatar Oct 01 '24 19:10 ValdemarGr

Further note, this will cause the foreground stream to exit with Success which might not be correct. I think Canceled should be the right outcome in that case.

  def prefetchN[F2[x] >: F[x]: Concurrent](
      n: Int
  ): Stream[F2, O] =
-   Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
+   Stream.eval(Channel.bounded[F2, Either[ExitCase, Chunk[O]]](n)).flatMap { chan =>
      chan.stream.unchunks.concurrently {
-        chunks.through(chan.sendAll)
+        chunks.map(_.asRight[ExitCase]).through(chan.sendAll).onFinalizeCase(chan.send(_) >> chan.close)
      }
    }.flatMap{
      case Right(x) => Stream.chunk(x)
      case Left(ec) => Stream.exec(ec.toOutcome[F].embed(Concurrent[F].canceled))
    }

Maybe prefetch can be expressed via one of the other combinators that run two streams concurrently which also guarantees communication of cancellation and errors if any of the concurrent streams exits (merge? but it doesn't look like it handles cancellation properly).

ValdemarGr avatar Oct 02 '24 10:10 ValdemarGr