[BUG] attempt to await next() on more than one task
Describe the bug
When using .multicast I got unexpected error: attempt to await next() on more than one task. It is my - maybe incorrect? - understanding that I could use several tasks consuming an Async Multicasted sequence?
To Reproduce I have these variables:
actor Connection {
private let asyncStream: AsyncThrowingStream<MySendableStruct, Swift.Error>
private let asyncContinuation: AsyncThrowingStream<MySendableStruct, Swift.Error>.Continuation
private let multicastSubject = AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>()
...
// using this static func: https://github.com/pointfreeco/swift-composable-architecture/blob/53ddc5904c065190d05c035ca0e4589cb6d45d61/Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift#L222-L228
(asyncStream, asyncContinuation) = AsyncThrowingStream.streamWithContinuation()
...
func autoconnectingMulticastedAsync() -> AsyncMulticastSequence<AsyncThrowingStream<MySendableStruct, Swift.Error>, AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>> {
asyncStream
.multicast(multicastSubject)
.autoconnect()
}
}
and callsite
...
func connect() async throws {
connectTask?.cancel()
connectTask = Task {
for try await value in autoconnectingMulticastedAsync() {
guard !Task.isCancelled else { return }
// do stuff with `value` that in some case might trigger a new nested `Task`
...
}
}
..
Provide code snippets, or links to gist or repos if possible.
Expected behavior I would expect to be able to await values in multiple tasks from a multicasted autoconnected Async Seqyuence.
Screenshots If applicable, add screenshots to help explain your problem.
Environment: Mac Studio (M1), running macOS Ventura: 13.0.1 (22A400) Xcode Version 14.1 (14B47b) AsyncExtension version 0.5.1
Additional context Add any other context about the problem here.
Hi,
Perhaps it as to do with the fact that an AsyncStream, once it is cancelled, cannot be iterated again. Perhaps try to replace it with an AsyncThrowingBufferedChannel?
Ultimately you are right … the multicast operator should support several tasks no matter the upstream sequence … I’ll look into it
@twittemb Im sorry I did not give you a reproducible example, I have a quite large and complex code base and Im novice at Structured Concurrency so it is a bit hard for me to give a small repro code snippet.
Thanks for an amazing project and looking forward to your Broadcast PR in AsyncAlgorithm repo to be merged!
Maybe I should use share instead of multicast..? Hmm
Thanks a lot.
Share is just a shortcut for multicast + AsyncPassthroughSubject. You’ll get the same result.
I’ll take a look as soon as possible to your usecase
Could you test the new version 0.5.2, it should have fixed that?
thanks.
Thanks! Testing it now, will report back results soon!
I did not manage to reproduce the bug with neither 0.5.1 nor 0.5.2 now when using AyncBufferedChannel.
Did you try with keeping an AsyncStream ?