`broadcastDynamic` broken
The following doesn't return:
import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"
pipe(
Stream.fail("fail"),
Stream.broadcastDynamic(1),
Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
Effect.scoped,
Effect.catchAllCause(Effect.logErrorCause),
Effect.runFork
)
the following propagates the failure:
import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"
pipe(
Stream.fail("fail"),
Stream.broadcastDynamic(1),
Effect.flatMap((_) => Effect.zip(Stream.runDrain(_), Stream.runDrain(_))),
Effect.scoped,
Effect.catchAllCause(Effect.logErrorCause),
Effect.runFork
)
import * as Duration from "@effect/data/Duration"
import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"
pipe(
Stream.fromEffect(Effect.delay(Duration.seconds(5))(Effect.fail("fail"))),
Stream.broadcastDynamic(1),
Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
Effect.scoped,
Effect.catchAllCause(Effect.logErrorCause),
Effect.runFork
)
This correctly prints out two errors, seems we are not dealing with a new subscriber subscribing to a failed stream that should automatically fail
@mikearnaldi - have you tested this with ZIO yet to see if it repros there?
@mikearnaldi - have you tested this with ZIO yet to see if it repros there?
no not yet, would be good to make a zio project with a nix setup to repro this stuff quickly
@mikearnaldi - I know it's been a while on this issue, but I've run the following two programs in the scala-playground repo using the latest ZIO via scala-cli and they both hang (i.e. neither terminates).
//> using dep dev.zio::zio:2.0.15
//> using dep dev.zio::zio-streams:2.0.15
package com.effect.playground
import zio._
import zio.stream._
object Main extends ZIOAppDefault {
val program1 = ZIO
.scoped {
ZStream
.fail("fail")
.broadcastDynamic(1)
.flatMap((stream) => stream.runDrain.zipPar(stream.runDrain))
}
.catchAllCause(ZIO.logErrorCause(_))
val program2 = ZIO
.scoped {
ZStream
.fail("fail")
.broadcastDynamic(1)
.flatMap((stream) => stream.runDrain.zip(stream.runDrain))
}
.catchAllCause(ZIO.logErrorCause(_))
val run = program1
}