asyncplusplus

[feature] iterative when_all

Open: kikaxa opened this issue 2 years ago • 1 comment

Many code paths generate tasks dynamically or iteratively, and those tasks then need to be waited on. Currently one has to create temporary storage for all of them and then build a when_all composition from it. The proposed API simply separates the when_all initialization and addition steps, so it can be used easily with loops, generators, fold expressions, etc.

Please let me know whether the feature is OK with you, along with any comments on the proposed code below; I can make a PR then. On a side note, while it should be possible to make an analogue for when_any, I don't imagine many use cases for it right now, so I am not suggesting adding it.

/// Returns a when_all state that accepts tasks one at a time (e.g. from a generator), without intermediate storage.
/// Call operator()(task) once per task to add it;
/// readyTask is the composite task that completes once all total_count tasks have finished.
template<typename Ttask = async::task<void> >
auto when_all_iterative(size_t total_count)
{
    typedef typename std::decay<Ttask>::type task_type;
    typedef std::vector<task_type> result_type;

    struct when_all_iterative_state
    {
        typedef async::detail::when_all_state<result_type> state_type;

        void operator() (task_type&& source_task)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");

            // Add a continuation to each task to add its result to the shared state
            // Last task sets the event result
            LIBASYNC_TRY {
                source_task.then(async::inline_scheduler(), async::detail::when_all_func_range<task_type, result_type>(next_index++, async::detail::ref_count_ptr<async::detail::when_all_state<result_type>>(state)));
            } LIBASYNC_CATCH(...) {
                // Make sure we don't leak memory if then() throws
                addFailed(1);
                LIBASYNC_RETHROW();
            }
        }

        void setTotal(size_t total_count)
        {
            LIBASYNC_ASSERT(!state, std::logic_error, "Initializing state twice");

            state = new state_type(total_count);
            state->result.resize(total_count);
            readyTask = state->event.get_task();
        }

        void addFailed(size_t failed = 1)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");
            state->remove_ref(failed);
            next_index += failed;
        }

    protected:
        state_type* state = nullptr;
        size_t next_index = 0;
    public:
        async::task<result_type> readyTask = async::make_task<result_type>({});
    } state;

    if (total_count)
        state.setTotal(total_count);

    return state;
}
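For anyone who wants to try the idea without async++ internals, here is a minimal standard-library-only sketch of the same split between initialization and addition. It is not the proposed code and not part of the async++ API: `incremental_when_all`, `add()`, `ready` and `collect_squares` are hypothetical names, and the result type is fixed to `int` to keep it short. The expected count is declared up front, each `add()` claims the next result slot, and the last completion fulfils the combined future.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the proposed state object, built on std::promise
// rather than async++ internals. Results are plain ints for brevity.
class incremental_when_all {
    struct shared_state {
        explicit shared_state(std::size_t n) : results(n), remaining(n) {}
        std::vector<int> results;            // one slot per expected task
        std::atomic<std::size_t> remaining;  // completions still outstanding
        std::promise<std::vector<int>> done; // fulfilled by the last completion
    };

    std::shared_ptr<shared_state> state_;
    std::size_t next_index_ = 0;

public:
    // Combined result; plays the role of readyTask in the proposal.
    std::future<std::vector<int>> ready;

    explicit incremental_when_all(std::size_t total)
        : state_(std::make_shared<shared_state>(total)),
          ready(state_->done.get_future())
    {
        // With nothing to wait for, the combined result is ready immediately.
        if (total == 0)
            state_->done.set_value(std::vector<int>{});
    }

    // Claims the next slot and returns a callback that stores one result.
    // The callback may be invoked from any thread, exactly once.
    std::function<void(int)> add()
    {
        if (next_index_ >= state_->results.size())
            throw std::logic_error("more tasks added than declared");

        const std::size_t index = next_index_++;
        std::shared_ptr<shared_state> state = state_;
        return [state, index](int value) {
            state->results[index] = value;
            // fetch_sub returns the previous value, so 1 means this was the last one.
            if (state->remaining.fetch_sub(1) == 1)
                state->done.set_value(std::move(state->results));
        };
    }
};

// Usage: producers are attached inside the loop, with no container of pending tasks.
std::vector<int> collect_squares(std::size_t count)
{
    incremental_when_all all(count);
    for (std::size_t i = 0; i < count; ++i) {
        std::function<void(int)> complete = all.add();
        // Detached on purpose: the callback keeps the shared state alive.
        std::thread([complete, i] { complete(static_cast<int>(i * i)); }).detach();
    }
    return all.ready.get(); // blocks until every slot has been filled
}
```

Results land in the order the slots were claimed, not the order the work finished, which matches how the proposal indexes results with `next_index`.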

kikaxa, Jun 22 '23 09:06

https://github.com/kikaxa/asyncplusplus/commit/034821b989d64a3f915dafa74aa18c387dc4808b

kikaxa, Feb 15 '24 08:02