AsyncExtensions icon indicating copy to clipboard operation
AsyncExtensions copied to clipboard

[BUG] attempt to await next() on more than one task

Open CyonAlexRDX opened this issue 3 years ago • 8 comments

Describe the bug When using .multicast I got unexpected error: attempt to await next() on more than one task. It is my - maybe incorrect? - understanding that I could use several tasks consuming an Async Multicasted sequence?

To Reproduce I have these variables:

actor Connection {
    private let asyncStream: AsyncThrowingStream<MySendableStruct, Swift.Error>
    private let asyncContinuation: AsyncThrowingStream<MySendableStruct, Swift.Error>.Continuation
    private let multicastSubject = AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>()

    ...
    // using this static func: https://github.com/pointfreeco/swift-composable-architecture/blob/53ddc5904c065190d05c035ca0e4589cb6d45d61/Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift#L222-L228
    (asyncStream, asyncContinuation) = AsyncThrowingStream.streamWithContinuation()
    ...

func autoconnectingMulticastedAsync() -> AsyncMulticastSequence<AsyncThrowingStream<MySendableStruct, Swift.Error>, AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>>  {
        asyncStream
            .multicast(multicastSubject)
            .autoconnect()
    }
}

and callsite

...
func connect() async throws {
    connectTask?.cancel()
    connectTask = Task {
    for try await value in autoconnectingMulticastedAsync() {
        guard !Task.isCancelled else { return }
        // do stuff with `value` that in some case might trigger a new nested `Task`
       ...
    }
}
..

Provide code snippets, or links to gist or repos if possible.

Expected behavior I would expect to be able to await values in multiple tasks from a multicasted autoconnected Async Seqyuence.

Screenshots If applicable, add screenshots to help explain your problem.

Environment: Mac Studio (M1), running macOS Ventura: 13.0.1 (22A400) Xcode Version 14.1 (14B47b) AsyncExtension version 0.5.1

Additional context Add any other context about the problem here.

CyonAlexRDX avatar Jan 03 '23 10:01 CyonAlexRDX

Hi,

Perhaps it as to do with the fact that an AsyncStream, once it is cancelled, cannot be iterated again. Perhaps try to replace it with an AsyncThrowingBufferedChannel?

twittemb avatar Jan 03 '23 10:01 twittemb

Ultimately you are right … the multicast operator should support several tasks no matter the upstream sequence … I’ll look into it

twittemb avatar Jan 03 '23 10:01 twittemb

@twittemb Im sorry I did not give you a reproducible example, I have a quite large and complex code base and Im novice at Structured Concurrency so it is a bit hard for me to give a small repro code snippet.

Thanks for an amazing project and looking forward to your Broadcast PR in AsyncAlgorithm repo to be merged!

Maybe I should use share instead of multicast..? Hmm

CyonAlexRDX avatar Jan 03 '23 11:01 CyonAlexRDX

Thanks a lot.

Share is just a shortcut for multicast + AsyncPassthroughSubject. You’ll get the same result.

I’ll take a look as soon as possible to your usecase

twittemb avatar Jan 03 '23 11:01 twittemb

Could you test the new version 0.5.2, it should have fixed that?

thanks.

twittemb avatar Jan 03 '23 14:01 twittemb

Thanks! Testing it now, will report back results soon!

CyonAlexRDX avatar Jan 04 '23 11:01 CyonAlexRDX

I did not manage to reproduce the bug with neither 0.5.1 nor 0.5.2 now when using AyncBufferedChannel.

CyonAlexRDX avatar Jan 05 '23 06:01 CyonAlexRDX

Did you try with keeping an AsyncStream ?

twittemb avatar Jan 05 '23 09:01 twittemb