prefetch hangs on a canceled stream
fs2 version 3.10.2
This code does not terminate:
Stream.eval(IO.canceled).prefetch.compile.drain
Expected behavior: prefetch should not change execution semantics, or at least not to the point of preventing termination.
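For anyone who wants to check this locally, here is a minimal reproduction sketch. It assumes scala-cli (the `//> using dep` line) and fs2 3.10.2; `PrefetchHangRepro`, `describe`, `outcomeWithin`, and the 2-second limit are names and values made up for this example. Each stream runs on its own fiber and the program reports the fiber's outcome, or None if it has not finished in time. Per the report above, the prefetch variant would be the one that times out.

```scala
//> using dep co.fs2::fs2-core:3.10.2

import cats.effect.{IO, IOApp, Outcome}
import fs2.Stream
import scala.concurrent.duration._

object PrefetchHangRepro extends IOApp.Simple {

  // Human-readable name for a fiber outcome.
  def describe[A](oc: Outcome[IO, Throwable, A]): String =
    oc.fold("canceled", e => s"errored: ${e.getMessage}", _ => "succeeded")

  // Runs `io` on its own fiber and returns its outcome, or None if the fiber
  // has not finished within `limit`. A fiber that never finishes is simply
  // left behind; that is acceptable for a throwaway reproduction.
  def outcomeWithin[A](io: IO[A], limit: FiniteDuration): IO[Option[String]] =
    io.start.flatMap { fiber =>
      fiber.join
        .map(oc => Option(describe(oc)))
        .timeoutTo(limit, IO.pure(Option.empty[String]))
    }

  def run: IO[Unit] =
    for {
      plain      <- outcomeWithin(Stream.eval(IO.canceled).compile.drain, 2.seconds)
      _          <- IO.println(s"without prefetch: $plain")
      prefetched <- outcomeWithin(Stream.eval(IO.canceled).prefetch.compile.drain, 2.seconds)
      _          <- IO.println(s"with prefetch:    $prefetched")
    } yield ()
}
```

Observing the outcome through `join` rather than running the stream directly keeps the self-cancellation from tearing down the program itself.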
Since concurrently does not guarantee that the foreground stream is closed when the background stream terminates, I think the channel should be closed once this stream exits.
 def prefetchN[F2[x] >: F[x]: Concurrent](
     n: Int
 ): Stream[F2, O] =
   Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
     chan.stream.unchunks.concurrently {
-      chunks.through(chan.sendAll)
+      chunks.through(chan.sendAll).onFinalize(chan.close.void)
     }
   }
Here.
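To try the idea without patching fs2, the same change can be sketched as a user-land pipe. `PrefetchWorkaround`, `prefetchClosing`, and `demo` are hypothetical names, not part of fs2, and I have only reasoned about the normal (non-canceled) path here. Whether it fully resolves the hang depends on how concurrently shuts down a canceled background stream.

```scala
//> using dep co.fs2::fs2-core:3.10.2

import cats.effect.{Concurrent, IO}
import cats.syntax.all._
import fs2.{Chunk, Pipe, Stream}
import fs2.concurrent.Channel

object PrefetchWorkaround {

  // Hypothetical user-land variant of prefetchN: identical to the library
  // version except that the channel is also closed from a finalizer, so the
  // consumer side can finish even if the producer side is canceled.
  def prefetchClosing[F[_]: Concurrent, O](n: Int): Pipe[F, O, O] =
    in =>
      Stream.eval(Channel.bounded[F, Chunk[O]](n)).flatMap { chan =>
        chan.stream.unchunks.concurrently {
          // close returns F[Either[Channel.Closed, Unit]]; closing an already
          // closed channel just yields Left(Closed), which is discarded here.
          in.chunks.through(chan.sendAll).onFinalize(chan.close.void)
        }
      }

  // Usage on an ordinary stream: elements pass through unchanged.
  val demo: IO[List[Int]] =
    Stream(1, 2, 3).covary[IO].through(prefetchClosing[IO, Int](1)).compile.toList
}
```

On normal completion sendAll already closes the channel, so the extra close in the finalizer only matters when the producer ends some other way.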
A further note: with this change the foreground stream exits with a success outcome, which might not be correct. I think Canceled would be the right outcome in that case.
 def prefetchN[F2[x] >: F[x]: Concurrent](
     n: Int
 ): Stream[F2, O] =
-  Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
-    chan.stream.unchunks.concurrently {
-      chunks.through(chan.sendAll)
-    }
-  }
+  Stream.eval(Channel.bounded[F2, Either[ExitCase, Chunk[O]]](n)).flatMap { chan =>
+    chan.stream.concurrently {
+      chunks.map(_.asRight[ExitCase]).through(chan.sendAll)
+        .onFinalizeCase(ec => chan.send(Left(ec)) >> chan.close.void)
+    }
+  }.flatMap {
+    case Right(chunk) => Stream.chunk(chunk)
+    case Left(ec)     => Stream.exec(ec.toOutcome[F2].embed(Concurrent[F2].canceled))
+  }
Maybe prefetch can be expressed via one of the other combinators that run two streams concurrently and that also guarantee that cancellation and errors are propagated when either of the concurrent streams exits (merge? Though it doesn't look like it handles cancellation properly).