//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/concept/executor.hpp>
14  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/io_launchable_task.hpp>
15  
#include <boost/capy/concept/io_launchable_task.hpp>
16  
#include <boost/capy/coro.hpp>
16  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/frame_allocator.hpp>
18  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/task.hpp>
19  
#include <boost/capy/task.hpp>
20  

20  

21  
#include <array>
21  
#include <array>
22  
#include <atomic>
22  
#include <atomic>
23  
#include <exception>
23  
#include <exception>
24  
#include <optional>
24  
#include <optional>
25  
#include <stop_token>
25  
#include <stop_token>
26  
#include <tuple>
26  
#include <tuple>
27  
#include <type_traits>
27  
#include <type_traits>
28  
#include <utility>
28  
#include <utility>
29  

29  

30  
namespace boost {
30  
namespace boost {
31  
namespace capy {
31  
namespace capy {
32  

32  

33  
namespace detail {
33  
namespace detail {
34  

34  

/** Map a single result type to a tuple fragment.

    `void` maps to the empty tuple and any other `T` maps to
    `std::tuple<T>`. Concatenating the fragments with `std::tuple_cat`
    therefore drops every void entry.
*/
template<typename T>
using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;

/** Type trait to filter void types from a tuple.

    Void-returning tasks do not contribute a value to the result tuple.
    This trait computes the filtered result type.

    Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
*/
template<typename... Ts>
using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
47  

47  

/** Holds the result of a single task within when_all.

    The value lives in a `std::optional`, so a holder can exist before
    its task has produced a result.

    @tparam T The task's result type.
*/
template<typename T>
struct result_holder
{
    std::optional<T> value_;

    /** Store the task's result.

        The value is constructed in place rather than assigned, so `T`
        only has to be move-constructible, not move-assignable.

        @param v The result to store.
    */
    void set(T v)
    {
        value_.emplace(std::move(v));
    }

    /** Move the stored result out of the holder.

        The optional is dereferenced without a check, so `set` must
        have been called beforehand.

        @return The stored value, moved out.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};

/** Specialization for void tasks - no value storage needed.
*/
template<>
struct result_holder<void>
{
};
72  

72  

73  
/** Shared state for when_all operation.
73  
/** Shared state for when_all operation.
74  

74  

75  
    @tparam Ts The result types of the tasks.
75  
    @tparam Ts The result types of the tasks.
76  
*/
76  
*/
77  
template<typename... Ts>
77  
template<typename... Ts>
78  
struct when_all_state
78  
struct when_all_state
79  
{
79  
{
80  
    static constexpr std::size_t task_count = sizeof...(Ts);
80  
    static constexpr std::size_t task_count = sizeof...(Ts);
81  

81  

82  
    // Completion tracking - when_all waits for all children
82  
    // Completion tracking - when_all waits for all children
83  
    std::atomic<std::size_t> remaining_count_;
83  
    std::atomic<std::size_t> remaining_count_;
84  

84  

85  
    // Result storage in input order
85  
    // Result storage in input order
86  
    std::tuple<result_holder<Ts>...> results_;
86  
    std::tuple<result_holder<Ts>...> results_;
87  

87  

88  
    // Runner handles - destroyed in await_resume while allocator is valid
88  
    // Runner handles - destroyed in await_resume while allocator is valid
89  
    std::array<coro, task_count> runner_handles_{};
89  
    std::array<coro, task_count> runner_handles_{};
90  

90  

91  
    // Exception storage - first error wins, others discarded
91  
    // Exception storage - first error wins, others discarded
92  
    std::atomic<bool> has_exception_{false};
92  
    std::atomic<bool> has_exception_{false};
93  
    std::exception_ptr first_exception_;
93  
    std::exception_ptr first_exception_;
94  

94  

95  
    // Stop propagation - on error, request stop for siblings
95  
    // Stop propagation - on error, request stop for siblings
96  
    std::stop_source stop_source_;
96  
    std::stop_source stop_source_;
97  

97  

98  
    // Connects parent's stop_token to our stop_source
98  
    // Connects parent's stop_token to our stop_source
99  
    struct stop_callback_fn
99  
    struct stop_callback_fn
100  
    {
100  
    {
101  
        std::stop_source* source_;
101  
        std::stop_source* source_;
102  
        void operator()() const { source_->request_stop(); }
102  
        void operator()() const { source_->request_stop(); }
103  
    };
103  
    };
104  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
104  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
105  
    std::optional<stop_callback_t> parent_stop_callback_;
105  
    std::optional<stop_callback_t> parent_stop_callback_;
106  

106  

107  
    // Parent resumption
107  
    // Parent resumption
108  
    coro continuation_;
108  
    coro continuation_;
109  
    executor_ref caller_ex_;
109  
    executor_ref caller_ex_;
110  

110  

111  
    when_all_state()
111  
    when_all_state()
112  
        : remaining_count_(task_count)
112  
        : remaining_count_(task_count)
113  
    {
113  
    {
114  
    }
114  
    }
115  

115  

116  
    // Runners self-destruct in final_suspend. No destruction needed here.
116  
    // Runners self-destruct in final_suspend. No destruction needed here.
117  

117  

118  
    /** Capture an exception (first one wins).
118  
    /** Capture an exception (first one wins).
119  
    */
119  
    */
120  
    void capture_exception(std::exception_ptr ep)
120  
    void capture_exception(std::exception_ptr ep)
121  
    {
121  
    {
122  
        bool expected = false;
122  
        bool expected = false;
123  
        if(has_exception_.compare_exchange_strong(
123  
        if(has_exception_.compare_exchange_strong(
124  
            expected, true, std::memory_order_relaxed))
124  
            expected, true, std::memory_order_relaxed))
125  
            first_exception_ = ep;
125  
            first_exception_ = ep;
126  
    }
126  
    }
127  

127  

128  
};
128  
};
129  

129  

130  
/** Wrapper coroutine that intercepts task completion.
130  
/** Wrapper coroutine that intercepts task completion.
131  

131  

132  
    This runner awaits its assigned task and stores the result in
132  
    This runner awaits its assigned task and stores the result in
133  
    the shared state, or captures the exception and requests stop.
133  
    the shared state, or captures the exception and requests stop.
134  
*/
134  
*/
135  
template<typename T, typename... Ts>
135  
template<typename T, typename... Ts>
136  
struct when_all_runner
136  
struct when_all_runner
137  
{
137  
{
138  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
138  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
139  
    {
139  
    {
140  
        when_all_state<Ts...>* state_ = nullptr;
140  
        when_all_state<Ts...>* state_ = nullptr;
141  
        executor_ref ex_;
141  
        executor_ref ex_;
142  
        std::stop_token stop_token_;
142  
        std::stop_token stop_token_;
143  

143  

144  
        when_all_runner get_return_object()
144  
        when_all_runner get_return_object()
145  
        {
145  
        {
146  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
146  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
147  
        }
147  
        }
148  

148  

149  
        std::suspend_always initial_suspend() noexcept
149  
        std::suspend_always initial_suspend() noexcept
150  
        {
150  
        {
151  
            return {};
151  
            return {};
152  
        }
152  
        }
153  

153  

154  
        auto final_suspend() noexcept
154  
        auto final_suspend() noexcept
155  
        {
155  
        {
156  
            struct awaiter
156  
            struct awaiter
157  
            {
157  
            {
158  
                promise_type* p_;
158  
                promise_type* p_;
159  

159  

160  
                bool await_ready() const noexcept
160  
                bool await_ready() const noexcept
161  
                {
161  
                {
162  
                    return false;
162  
                    return false;
163  
                }
163  
                }
164  

164  

165  
                void await_suspend(coro h) noexcept
165  
                void await_suspend(coro h) noexcept
166  
                {
166  
                {
167  
                    // Extract everything needed for signaling before
167  
                    // Extract everything needed for signaling before
168  
                    // self-destruction. Inline dispatch may destroy
168  
                    // self-destruction. Inline dispatch may destroy
169  
                    // when_all_state, so we can't access members after.
169  
                    // when_all_state, so we can't access members after.
170  
                    auto* state = p_->state_;
170  
                    auto* state = p_->state_;
171  
                    auto* counter = &state->remaining_count_;
171  
                    auto* counter = &state->remaining_count_;
172  
                    auto caller_ex = state->caller_ex_;
172  
                    auto caller_ex = state->caller_ex_;
173  
                    auto cont = state->continuation_;
173  
                    auto cont = state->continuation_;
174  

174  

175  
                    // Self-destruct first - state no longer destroys runners
175  
                    // Self-destruct first - state no longer destroys runners
176  
                    h.destroy();
176  
                    h.destroy();
177  

177  

178  
                    // Signal completion. If last, dispatch parent.
178  
                    // Signal completion. If last, dispatch parent.
179  
                    // Uses only local copies - safe even if state
179  
                    // Uses only local copies - safe even if state
180  
                    // is destroyed during inline dispatch.
180  
                    // is destroyed during inline dispatch.
181  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
181  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
182  
                    if(remaining == 1)
182  
                    if(remaining == 1)
183  
                        caller_ex.dispatch(cont);
183  
                        caller_ex.dispatch(cont);
184  
                }
184  
                }
185  

185  

186  
                void await_resume() const noexcept
186  
                void await_resume() const noexcept
187  
                {
187  
                {
188  
                }
188  
                }
189  
            };
189  
            };
190  
            return awaiter{this};
190  
            return awaiter{this};
191  
        }
191  
        }
192  

192  

193  
        void return_void()
193  
        void return_void()
194  
        {
194  
        {
195  
        }
195  
        }
196  

196  

197  
        void unhandled_exception()
197  
        void unhandled_exception()
198  
        {
198  
        {
199  
            state_->capture_exception(std::current_exception());
199  
            state_->capture_exception(std::current_exception());
200  
            // Request stop for sibling tasks
200  
            // Request stop for sibling tasks
201  
            state_->stop_source_.request_stop();
201  
            state_->stop_source_.request_stop();
202  
        }
202  
        }
203  

203  

204  
        template<class Awaitable>
204  
        template<class Awaitable>
205  
        struct transform_awaiter
205  
        struct transform_awaiter
206  
        {
206  
        {
207  
            std::decay_t<Awaitable> a_;
207  
            std::decay_t<Awaitable> a_;
208  
            promise_type* p_;
208  
            promise_type* p_;
209  

209  

210  
            bool await_ready()
210  
            bool await_ready()
211  
            {
211  
            {
212  
                return a_.await_ready();
212  
                return a_.await_ready();
213  
            }
213  
            }
214  

214  

215  
            decltype(auto) await_resume()
215  
            decltype(auto) await_resume()
216  
            {
216  
            {
217  
                return a_.await_resume();
217  
                return a_.await_resume();
218  
            }
218  
            }
219  

219  

220  
            template<class Promise>
220  
            template<class Promise>
221  
            auto await_suspend(std::coroutine_handle<Promise> h)
221  
            auto await_suspend(std::coroutine_handle<Promise> h)
222  
            {
222  
            {
223  
                return a_.await_suspend(h, p_->ex_, p_->stop_token_);
223  
                return a_.await_suspend(h, p_->ex_, p_->stop_token_);
224  
            }
224  
            }
225  
        };
225  
        };
226  

226  

227  
        template<class Awaitable>
227  
        template<class Awaitable>
228  
        auto await_transform(Awaitable&& a)
228  
        auto await_transform(Awaitable&& a)
229  
        {
229  
        {
230  
            using A = std::decay_t<Awaitable>;
230  
            using A = std::decay_t<Awaitable>;
231  
            if constexpr (IoAwaitable<A>)
231  
            if constexpr (IoAwaitable<A>)
232  
            {
232  
            {
233  
                return transform_awaiter<Awaitable>{
233  
                return transform_awaiter<Awaitable>{
234  
                    std::forward<Awaitable>(a), this};
234  
                    std::forward<Awaitable>(a), this};
235  
            }
235  
            }
236  
            else
236  
            else
237  
            {
237  
            {
238  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
238  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
239  
            }
239  
            }
240  
        }
240  
        }
241  
    };
241  
    };
242  

242  

243  
    std::coroutine_handle<promise_type> h_;
243  
    std::coroutine_handle<promise_type> h_;
244  

244  

245  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
245  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
246  
        : h_(h)
246  
        : h_(h)
247  
    {
247  
    {
248  
    }
248  
    }
249  

249  

250  
    // Enable move for all clang versions - some versions need it
250  
    // Enable move for all clang versions - some versions need it
251  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
251  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
252  

252  

253  
    // Non-copyable
253  
    // Non-copyable
254  
    when_all_runner(when_all_runner const&) = delete;
254  
    when_all_runner(when_all_runner const&) = delete;
255  
    when_all_runner& operator=(when_all_runner const&) = delete;
255  
    when_all_runner& operator=(when_all_runner const&) = delete;
256  
    when_all_runner& operator=(when_all_runner&&) = delete;
256  
    when_all_runner& operator=(when_all_runner&&) = delete;
257  

257  

258  
    auto release() noexcept
258  
    auto release() noexcept
259  
    {
259  
    {
260  
        return std::exchange(h_, nullptr);
260  
        return std::exchange(h_, nullptr);
261  
    }
261  
    }
262  
};
262  
};
263  

263  

264  
/** Create a runner coroutine for a single task.
264  
/** Create a runner coroutine for a single task.
265  

265  

266  
    Task is passed directly to ensure proper coroutine frame storage.
266  
    Task is passed directly to ensure proper coroutine frame storage.
267  
*/
267  
*/
268  
template<std::size_t Index, typename T, typename... Ts>
268  
template<std::size_t Index, typename T, typename... Ts>
269  
when_all_runner<T, Ts...>
269  
when_all_runner<T, Ts...>
270  
make_when_all_runner(task<T> inner, when_all_state<Ts...>* state)
270  
make_when_all_runner(task<T> inner, when_all_state<Ts...>* state)
271  
{
271  
{
272  
    if constexpr (std::is_void_v<T>)
272  
    if constexpr (std::is_void_v<T>)
273  
    {
273  
    {
274  
        co_await std::move(inner);
274  
        co_await std::move(inner);
275  
    }
275  
    }
276  
    else
276  
    else
277  
    {
277  
    {
278  
        std::get<Index>(state->results_).set(co_await std::move(inner));
278  
        std::get<Index>(state->results_).set(co_await std::move(inner));
279  
    }
279  
    }
280  
}
280  
}
281  

281  

282  
/** Internal awaitable that launches all runner coroutines and waits.
282  
/** Internal awaitable that launches all runner coroutines and waits.
283  

283  

284  
    This awaitable is used inside the when_all coroutine to handle
284  
    This awaitable is used inside the when_all coroutine to handle
285  
    the concurrent execution of child tasks.
285  
    the concurrent execution of child tasks.
286  
*/
286  
*/
287  
template<typename... Ts>
287  
template<typename... Ts>
288  
class when_all_launcher
288  
class when_all_launcher
289  
{
289  
{
290  
    std::tuple<task<Ts>...>* tasks_;
290  
    std::tuple<task<Ts>...>* tasks_;
291  
    when_all_state<Ts...>* state_;
291  
    when_all_state<Ts...>* state_;
292  

292  

293  
public:
293  
public:
294  
    when_all_launcher(
294  
    when_all_launcher(
295  
        std::tuple<task<Ts>...>* tasks,
295  
        std::tuple<task<Ts>...>* tasks,
296  
        when_all_state<Ts...>* state)
296  
        when_all_state<Ts...>* state)
297  
        : tasks_(tasks)
297  
        : tasks_(tasks)
298  
        , state_(state)
298  
        , state_(state)
299  
    {
299  
    {
300  
    }
300  
    }
301  

301  

302  
    bool await_ready() const noexcept
302  
    bool await_ready() const noexcept
303  
    {
303  
    {
304  
        return sizeof...(Ts) == 0;
304  
        return sizeof...(Ts) == 0;
305  
    }
305  
    }
306  

306  

307  
    coro await_suspend(coro continuation, executor_ref caller_ex, std::stop_token parent_token = {})
307  
    coro await_suspend(coro continuation, executor_ref caller_ex, std::stop_token parent_token = {})
308  
    {
308  
    {
309  
        state_->continuation_ = continuation;
309  
        state_->continuation_ = continuation;
310  
        state_->caller_ex_ = caller_ex;
310  
        state_->caller_ex_ = caller_ex;
311  

311  

312  
        // Forward parent's stop requests to children
312  
        // Forward parent's stop requests to children
313  
        if(parent_token.stop_possible())
313  
        if(parent_token.stop_possible())
314  
        {
314  
        {
315  
            state_->parent_stop_callback_.emplace(
315  
            state_->parent_stop_callback_.emplace(
316  
                parent_token,
316  
                parent_token,
317  
                typename when_all_state<Ts...>::stop_callback_fn{&state_->stop_source_});
317  
                typename when_all_state<Ts...>::stop_callback_fn{&state_->stop_source_});
318  

318  

319  
            if(parent_token.stop_requested())
319  
            if(parent_token.stop_requested())
320  
                state_->stop_source_.request_stop();
320  
                state_->stop_source_.request_stop();
321  
        }
321  
        }
322  

322  

323  
        // CRITICAL: If the last task finishes synchronously then the parent
323  
        // CRITICAL: If the last task finishes synchronously then the parent
324  
        // coroutine resumes, destroying its frame, and destroying this object
324  
        // coroutine resumes, destroying its frame, and destroying this object
325  
        // prior to the completion of await_suspend. Therefore, await_suspend
325  
        // prior to the completion of await_suspend. Therefore, await_suspend
326  
        // must ensure `this` cannot be referenced after calling `launch_one`
326  
        // must ensure `this` cannot be referenced after calling `launch_one`
327  
        // for the last time.
327  
        // for the last time.
328  
        auto token = state_->stop_source_.get_token();
328  
        auto token = state_->stop_source_.get_token();
329  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
329  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
330  
            (..., launch_one<Is>(caller_ex, token));
330  
            (..., launch_one<Is>(caller_ex, token));
331  
        }(std::index_sequence_for<Ts...>{});
331  
        }(std::index_sequence_for<Ts...>{});
332  

332  

333  
        // Let signal_completion() handle resumption
333  
        // Let signal_completion() handle resumption
334  
        return std::noop_coroutine();
334  
        return std::noop_coroutine();
335  
    }
335  
    }
336  

336  

337  
    void await_resume() const noexcept
337  
    void await_resume() const noexcept
338  
    {
338  
    {
339  
        // Results are extracted by the when_all coroutine from state
339  
        // Results are extracted by the when_all coroutine from state
340  
    }
340  
    }
341  

341  

342  
private:
342  
private:
343  
    template<std::size_t I>
343  
    template<std::size_t I>
344  
    void launch_one(executor_ref caller_ex, std::stop_token token)
344  
    void launch_one(executor_ref caller_ex, std::stop_token token)
345  
    {
345  
    {
346  
        auto runner = make_when_all_runner<I>(
346  
        auto runner = make_when_all_runner<I>(
347  
            std::move(std::get<I>(*tasks_)), state_);
347  
            std::move(std::get<I>(*tasks_)), state_);
348  

348  

349  
        auto h = runner.release();
349  
        auto h = runner.release();
350  
        h.promise().state_ = state_;
350  
        h.promise().state_ = state_;
351  
        h.promise().ex_ = caller_ex;
351  
        h.promise().ex_ = caller_ex;
352  
        h.promise().stop_token_ = token;
352  
        h.promise().stop_token_ = token;
353  

353  

354  
        coro ch{h};
354  
        coro ch{h};
355  
        state_->runner_handles_[I] = ch;
355  
        state_->runner_handles_[I] = ch;
356  
        state_->caller_ex_.dispatch(ch);
356  
        state_->caller_ex_.dispatch(ch);
357  
    }
357  
    }
358  
};
358  
};
359  

359  

360  
/** Compute the result type for when_all.
360  
/** Compute the result type for when_all.
361  

361  

362  
    Returns void when all tasks are void (P2300 aligned),
362  
    Returns void when all tasks are void (P2300 aligned),
363  
    otherwise returns a tuple with void types filtered out.
363  
    otherwise returns a tuple with void types filtered out.
364  
*/
364  
*/
365  
template<typename... Ts>
365  
template<typename... Ts>
366  
using when_all_result_t = std::conditional_t<
366  
using when_all_result_t = std::conditional_t<
367  
    std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
367  
    std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
368  
    void,
368  
    void,
369  
    filter_void_tuple_t<Ts...>>;
369  
    filter_void_tuple_t<Ts...>>;
370  

370  

371  
/** Helper to extract a single result, returning empty tuple for void.
371  
/** Helper to extract a single result, returning empty tuple for void.
372  
    This is a separate function to work around a GCC-11 ICE that occurs
372  
    This is a separate function to work around a GCC-11 ICE that occurs
373  
    when using nested immediately-invoked lambdas with pack expansion.
373  
    when using nested immediately-invoked lambdas with pack expansion.
374  
*/
374  
*/
375  
template<std::size_t I, typename... Ts>
375  
template<std::size_t I, typename... Ts>
376  
auto extract_single_result(when_all_state<Ts...>& state)
376  
auto extract_single_result(when_all_state<Ts...>& state)
377  
{
377  
{
378  
    using T = std::tuple_element_t<I, std::tuple<Ts...>>;
378  
    using T = std::tuple_element_t<I, std::tuple<Ts...>>;
379  
    if constexpr (std::is_void_v<T>)
379  
    if constexpr (std::is_void_v<T>)
380  
        return std::tuple<>();
380  
        return std::tuple<>();
381  
    else
381  
    else
382  
        return std::make_tuple(std::move(std::get<I>(state.results_)).get());
382  
        return std::make_tuple(std::move(std::get<I>(state.results_)).get());
383  
}
383  
}
384  

384  

385  
/** Extract results from state, filtering void types.
385  
/** Extract results from state, filtering void types.
386  
*/
386  
*/
387  
template<typename... Ts>
387  
template<typename... Ts>
388  
auto extract_results(when_all_state<Ts...>& state)
388  
auto extract_results(when_all_state<Ts...>& state)
389  
{
389  
{
390  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
390  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
391  
        return std::tuple_cat(extract_single_result<Is>(state)...);
391  
        return std::tuple_cat(extract_single_result<Is>(state)...);
392  
    }(std::index_sequence_for<Ts...>{});
392  
    }(std::index_sequence_for<Ts...>{});
393  
}
393  
}
394  

394  

395  
} // namespace detail
395  
} // namespace detail
396  

396  

397  
/** Execute multiple tasks concurrently and collect their results.
397  
/** Execute multiple tasks concurrently and collect their results.
398  

398  

399  
    Launches all tasks simultaneously and waits for all to complete
399  
    Launches all tasks simultaneously and waits for all to complete
400  
    before returning. Results are collected in input order. If any
400  
    before returning. Results are collected in input order. If any
401  
    task throws, cancellation is requested for siblings and the first
401  
    task throws, cancellation is requested for siblings and the first
402  
    exception is rethrown after all tasks complete.
402  
    exception is rethrown after all tasks complete.
403  

403  

404  
    @li All child tasks run concurrently on the caller's executor
404  
    @li All child tasks run concurrently on the caller's executor
405  
    @li Results are returned as a tuple in input order
405  
    @li Results are returned as a tuple in input order
406  
    @li Void-returning tasks do not contribute to the result tuple
406  
    @li Void-returning tasks do not contribute to the result tuple
407  
    @li If all tasks return void, `when_all` returns `task<void>`
407  
    @li If all tasks return void, `when_all` returns `task<void>`
408  
    @li First exception wins; subsequent exceptions are discarded
408  
    @li First exception wins; subsequent exceptions are discarded
409  
    @li Stop is requested for siblings on first error
409  
    @li Stop is requested for siblings on first error
410  
    @li Completes only after all children have finished
410  
    @li Completes only after all children have finished
411  

411  

412  
    @par Thread Safety
412  
    @par Thread Safety
413  
    The returned task must be awaited from a single execution context.
413  
    The returned task must be awaited from a single execution context.
414  
    Child tasks execute concurrently but complete through the caller's
414  
    Child tasks execute concurrently but complete through the caller's
415  
    executor.
415  
    executor.
416  

416  

417  
    @param tasks The tasks to execute concurrently. Each task is
417  
    @param tasks The tasks to execute concurrently. Each task is
418  
        consumed (moved-from) when `when_all` is awaited.
418  
        consumed (moved-from) when `when_all` is awaited.
419  

419  

420  
    @return A task yielding a tuple of non-void results. Returns
420  
    @return A task yielding a tuple of non-void results. Returns
421  
        `task<void>` when all input tasks return void.
421  
        `task<void>` when all input tasks return void.
422  

422  

423  
    @par Example
423  
    @par Example
424  

424  

425  
    @code
425  
    @code
426  
    task<> example()
426  
    task<> example()
427  
    {
427  
    {
428  
        // Concurrent fetch, results collected in order
428  
        // Concurrent fetch, results collected in order
429  
        auto [user, posts] = co_await when_all(
429  
        auto [user, posts] = co_await when_all(
430  
            fetch_user( id ),      // task<User>
430  
            fetch_user( id ),      // task<User>
431  
            fetch_posts( id )      // task<std::vector<Post>>
431  
            fetch_posts( id )      // task<std::vector<Post>>
432  
        );
432  
        );
433  

433  

434  
        // Void tasks don't contribute to result
434  
        // Void tasks don't contribute to result
435  
        co_await when_all(
435  
        co_await when_all(
436  
            log_event( "start" ),  // task<void>
436  
            log_event( "start" ),  // task<void>
437  
            notify_user( id )      // task<void>
437  
            notify_user( id )      // task<void>
438  
        );
438  
        );
439  
        // Returns task<void>, no result tuple
439  
        // Returns task<void>, no result tuple
440  
    }
440  
    }
441  
    @endcode
441  
    @endcode
442  

442  

443  
    @see task
443  
    @see task
444  
*/
444  
*/
445  
template<typename... Ts>
445  
template<typename... Ts>
446  
[[nodiscard]] task<detail::when_all_result_t<Ts...>>
446  
[[nodiscard]] task<detail::when_all_result_t<Ts...>>
447  
when_all(task<Ts>... tasks)
447  
when_all(task<Ts>... tasks)
448  
{
448  
{
449  
    using result_type = detail::when_all_result_t<Ts...>;
449  
    using result_type = detail::when_all_result_t<Ts...>;
450  

450  

451  
    // State is stored in the coroutine frame, using the frame allocator
451  
    // State is stored in the coroutine frame, using the frame allocator
452  
    detail::when_all_state<Ts...> state;
452  
    detail::when_all_state<Ts...> state;
453  

453  

454  
    // Store tasks in the frame
454  
    // Store tasks in the frame
455  
    std::tuple<task<Ts>...> task_tuple(std::move(tasks)...);
455  
    std::tuple<task<Ts>...> task_tuple(std::move(tasks)...);
456  

456  

457  
    // Launch all tasks and wait for completion
457  
    // Launch all tasks and wait for completion
458  
    co_await detail::when_all_launcher<Ts...>(&task_tuple, &state);
458  
    co_await detail::when_all_launcher<Ts...>(&task_tuple, &state);
459  

459  

460  
    // Propagate first exception if any.
460  
    // Propagate first exception if any.
461  
    // Safe without explicit acquire: capture_exception() is sequenced-before
461  
    // Safe without explicit acquire: capture_exception() is sequenced-before
462  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
462  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
463  
    // last task's decrement that resumes this coroutine.
463  
    // last task's decrement that resumes this coroutine.
464  
    if(state.first_exception_)
464  
    if(state.first_exception_)
465  
        std::rethrow_exception(state.first_exception_);
465  
        std::rethrow_exception(state.first_exception_);
466  

466  

467  
    // Extract and return results
467  
    // Extract and return results
468  
    if constexpr (std::is_void_v<result_type>)
468  
    if constexpr (std::is_void_v<result_type>)
469  
        co_return;
469  
        co_return;
470  
    else
470  
    else
471  
        co_return detail::extract_results(state);
471  
        co_return detail::extract_results(state);
472  
}
472  
}
473  

473  

474  
/// Compute the result type of `when_all` for the given task types.
474  
/// Compute the result type of `when_all` for the given task types.
475  
template<typename... Ts>
475  
template<typename... Ts>
476  
using when_all_result_type = detail::when_all_result_t<Ts...>;
476  
using when_all_result_type = detail::when_all_result_t<Ts...>;
477  

477  

478  
} // namespace capy
478  
} // namespace capy
479  
} // namespace boost
479  
} // namespace boost
480  

480  

481  
#endif
481  
#endif