stream icon indicating copy to clipboard operation
stream copied to clipboard

`broadcastDynamic` broken

Open mikearnaldi opened this issue 2 years ago • 4 comments

The following doesn't return:

import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fail("fail"),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)

the following propagates the failure:

import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fail("fail"),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zip(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)

mikearnaldi avatar Feb 16 '23 09:02 mikearnaldi

import * as Duration from "@effect/data/Duration"
import * as Effect from "@effect/io/Effect"
import * as Stream from "@effect/stream/Stream"
import { pipe } from "@fp-ts/core/Function"

pipe(
  Stream.fromEffect(Effect.delay(Duration.seconds(5))(Effect.fail("fail"))),
  Stream.broadcastDynamic(1),
  Effect.flatMap((_) => Effect.zipPar(Stream.runDrain(_), Stream.runDrain(_))),
  Effect.scoped,
  Effect.catchAllCause(Effect.logErrorCause),
  Effect.runFork
)

This correctly prints out two errors, seems we are not dealing with a new subscriber subscribing to a failed stream that should automatically fail

mikearnaldi avatar Feb 16 '23 09:02 mikearnaldi

@mikearnaldi - have you tested this with ZIO yet to see if it repros there?

IMax153 avatar Feb 16 '23 15:02 IMax153

@mikearnaldi - have you tested this with ZIO yet to see if it repros there?

no not yet, would be good to make a zio project with a nix setup to repro this stuff quickly

mikearnaldi avatar Feb 16 '23 16:02 mikearnaldi

@mikearnaldi - I know it's been a while on this issue, but I've run the following two programs in the scala-playground repo using the latest ZIO via scala-cli and they both hang (i.e. neither terminates).

//> using dep dev.zio::zio:2.0.15
//> using dep dev.zio::zio-streams:2.0.15

package com.effect.playground

import zio._
import zio.stream._

object Main extends ZIOAppDefault {

  val program1 = ZIO
    .scoped {
      ZStream
        .fail("fail")
        .broadcastDynamic(1)
        .flatMap((stream) => stream.runDrain.zipPar(stream.runDrain))
    }
    .catchAllCause(ZIO.logErrorCause(_))

  val program2 = ZIO
    .scoped {
      ZStream
        .fail("fail")
        .broadcastDynamic(1)
        .flatMap((stream) => stream.runDrain.zip(stream.runDrain))
    }
    .catchAllCause(ZIO.logErrorCause(_))

  val run = program1

}

IMax153 avatar Jun 26 '23 13:06 IMax153