LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 99.3 % 145 144
Test Date: 2026-02-04 19:49:07 Functions: 93.2 % 338 315

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Michael Vandeberg
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11              : #define BOOST_CAPY_WHEN_ANY_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/concept/executor.hpp>
      15              : #include <boost/capy/concept/io_awaitable.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <boost/capy/ex/frame_allocator.hpp>
      19              : #include <boost/capy/task.hpp>
      20              : 
      21              : #include <array>
      22              : #include <atomic>
      23              : #include <exception>
      24              : #include <optional>
      25              : #include <ranges>
      26              : #include <stdexcept>
      27              : #include <stop_token>
      28              : #include <tuple>
      29              : #include <type_traits>
      30              : #include <utility>
      31              : #include <variant>
      32              : #include <vector>
      33              : 
      34              : /*
      35              :    when_any - Race multiple tasks, return first completion
      36              :    ========================================================
      37              : 
      38              :    OVERVIEW:
      39              :    ---------
      40              :    when_any launches N tasks concurrently and completes when the FIRST task
      41              :    finishes (success or failure). It then requests stop for all siblings and
      42              :    waits for them to acknowledge before returning.
      43              : 
      44              :    ARCHITECTURE:
      45              :    -------------
      46              :    The design mirrors when_all but with inverted completion semantics:
      47              : 
      48              :      when_all:  complete when remaining_count reaches 0 (all done)
      49              :      when_any:  complete when has_winner becomes true (first done)
      50              :                 BUT still wait for remaining_count to reach 0 for cleanup
      51              : 
      52              :    Key components:
      53              :      - when_any_state:    Shared state tracking winner and completion
      54              :      - when_any_runner:   Wrapper coroutine for each child task
      55              :      - when_any_launcher: Awaitable that starts all runners concurrently
      56              : 
      57              :    CRITICAL INVARIANTS:
      58              :    --------------------
      59              :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      60              :    2. All tasks must complete before parent resumes (cleanup safety)
      61              :    3. Stop is requested immediately when winner is determined
      62              :    4. Only the winner's result/exception is stored
      63              : 
      64              :    TYPE DEDUPLICATION:
      65              :    -------------------
      66              :    std::variant requires unique alternative types. Since when_any can race
      67              :    tasks with identical return types (e.g., three task<int>), we must
      68              :    deduplicate types before constructing the variant.
      69              : 
      70              :    Example: when_any(task<int>, task<string>, task<int>)
      71              :      - Raw types after void->monostate: int, string, int
      72              :      - Deduplicated variant: std::variant<int, string>
      73              :      - Return: pair<size_t, variant<int, string>>
      74              : 
      75              :    The winner_index tells you which task won (0, 1, or 2), while the variant
      76              :    holds the result. Use the index to determine how to interpret the variant.
      77              : 
      78              :    VOID HANDLING:
      79              :    --------------
      80              :    void tasks contribute std::monostate to the variant (then deduplicated).
      81              :    All-void tasks result in: pair<size_t, variant<monostate>>
      82              : 
      83              :    MEMORY MODEL:
      84              :    -------------
      85              :    Synchronization chain from winner's write to parent's read:
      86              : 
      87              :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      88              :    2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
      89              :    3. Last task thread (may be winner or non-winner) reaches final_suspend
      90              :       → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      91              :    4. Last task calls dispatch(continuation_) on caller_ex_ (both copied to locals first)
      92              :    5. Parent coroutine resumes and reads result_/winner_exception_
      93              : 
      94              :    Synchronization analysis:
      95              :    - All fetch_sub operations on remaining_count_ form a release sequence
      96              :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      97              :      in the modification order of remaining_count_
      98              :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      99              :      modification order, establishing happens-before from winner's writes
     100              :    - Executor dispatch() is expected to provide queue-based synchronization
     101              :      (release-on-post, acquire-on-execute) completing the chain to parent
     102              :    - Even inline executors work (same thread = sequenced-before)
     103              : 
     104              :    Alternative considered: Adding winner_ready_ atomic (set with release after
     105              :    storing winner data, acquired before reading) would make synchronization
     106              :    self-contained and not rely on executor implementation details. Current
     107              :    approach is correct but requires careful reasoning about release sequences
     108              :    and executor behavior.
     109              : 
     110              :    EXCEPTION SEMANTICS:
     111              :    --------------------
     112              :    Unlike when_all (which captures first exception, discards others), when_any
     113              :    treats exceptions as valid completions. If the winning task threw, that
     114              :    exception is rethrown. Exceptions from non-winners are silently discarded.
     115              : */
     116              : 
     117              : namespace boost {
     118              : namespace capy {
     119              : 
     120              : namespace detail {
     121              : 
     122              : /** Convert void to monostate for variant storage.
     123              : 
     124              :     std::variant<void, ...> is ill-formed, so void tasks contribute
     125              :     std::monostate to the result variant instead. Non-void types
     126              :     pass through unchanged.
     127              : 
     128              :     @tparam T The type to potentially convert (void becomes monostate).
     129              : */
     130              : template<typename T>
     131              : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     132              : 
     133              : // Type deduplication: std::variant requires unique alternative types.
     134              : // Fold left over the type list, appending each type only if not already present.
     135              : template<typename Variant, typename T>
     136              : struct variant_append_if_unique;
     137              : 
     138              : template<typename... Vs, typename T>
     139              : struct variant_append_if_unique<std::variant<Vs...>, T>
     140              : {
     141              :     using type = std::conditional_t<
     142              :         (std::is_same_v<T, Vs> || ...),
     143              :         std::variant<Vs...>,
     144              :         std::variant<Vs..., T>>;
     145              : };
     146              : 
     147              : template<typename Accumulated, typename... Remaining>
     148              : struct deduplicate_impl;
     149              : 
     150              : template<typename Accumulated>
     151              : struct deduplicate_impl<Accumulated>
     152              : {
     153              :     using type = Accumulated;
     154              : };
     155              : 
     156              : template<typename Accumulated, typename T, typename... Rest>
     157              : struct deduplicate_impl<Accumulated, T, Rest...>
     158              : {
     159              :     using next = typename variant_append_if_unique<Accumulated, T>::type;
     160              :     using type = typename deduplicate_impl<next, Rest...>::type;
     161              : };
     162              : 
     163              : // Deduplicated variant; void types become monostate before deduplication
     164              : template<typename T0, typename... Ts>
     165              : using unique_variant_t = typename deduplicate_impl<
     166              :     std::variant<void_to_monostate_t<T0>>,
     167              :     void_to_monostate_t<Ts>...>::type;
     168              : 
     169              : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
     170              : // when multiple tasks share the same return type.
     171              : template<typename T0, typename... Ts>
     172              : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
     173              : 
     174              : // Extract result type from any awaitable via await_resume()
     175              : template<typename A>
     176              : using awaitable_result_t = decltype(std::declval<std::decay_t<A>&>().await_resume());
     177              : 
     178              : /** Core shared state for when_any operations.
     179              : 
     180              :     Contains all members and methods common to both heterogeneous (variadic)
     181              :     and homogeneous (range) when_any implementations. State classes embed
     182              :     this via composition to avoid CRTP destructor ordering issues.
     183              : 
     184              :     @par Thread Safety
     185              :     Atomic operations protect winner selection and completion count.
     186              : */
     187              : struct when_any_core
     188              : {
     189              :     std::atomic<std::size_t> remaining_count_;
     190              :     std::size_t winner_index_{0};
     191              :     std::exception_ptr winner_exception_;
     192              :     std::stop_source stop_source_;
     193              : 
     194              :     // Bridges parent's stop token to our stop_source
     195              :     struct stop_callback_fn
     196              :     {
     197              :         std::stop_source* source_;
     198            6 :         void operator()() const noexcept { source_->request_stop(); }
     199              :     };
     200              :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     201              :     std::optional<stop_callback_t> parent_stop_callback_;
     202              : 
     203              :     coro continuation_;
     204              :     executor_ref caller_ex_;
     205              : 
     206              :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     207              :     std::atomic<bool> has_winner_{false};
     208              : 
     209           52 :     explicit when_any_core(std::size_t count) noexcept
     210           52 :         : remaining_count_(count)
     211              :     {
     212           52 :     }
     213              : 
     214              :     /** Atomically claim winner status; exactly one task succeeds. */
     215          145 :     bool try_win(std::size_t index) noexcept
     216              :     {
     217          145 :         bool expected = false;
     218          145 :         if(has_winner_.compare_exchange_strong(
     219              :             expected, true, std::memory_order_acq_rel))
     220              :         {
     221           52 :             winner_index_ = index;
     222           52 :             stop_source_.request_stop();
     223           52 :             return true;
     224              :         }
     225           93 :         return false;
     226              :     }
     227              : 
     228              :     /** @pre try_win() returned true. */
     229            8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     230              :     {
     231            8 :         winner_exception_ = ep;
     232            8 :     }
     233              : 
     234              :     // Runners signal completion directly via final_suspend; no member function needed.
     235              : };
     236              : 
     237              : /** Shared state for heterogeneous when_any operation.
     238              : 
     239              :     Coordinates winner selection, result storage, and completion tracking
     240              :     for all child tasks in a when_any operation. Uses composition with
     241              :     when_any_core for shared functionality.
     242              : 
     243              :     @par Lifetime
     244              :     Allocated on the parent coroutine's frame, outlives all runners.
     245              : 
     246              :     @tparam T0 First task's result type.
     247              :     @tparam Ts Remaining tasks' result types.
     248              : */
     249              : template<typename T0, typename... Ts>
     250              : struct when_any_state
     251              : {
     252              :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     253              :     using variant_type = unique_variant_t<T0, Ts...>;
     254              : 
     255              :     when_any_core core_;
     256              :     std::optional<variant_type> result_;
     257              :     std::array<coro, task_count> runner_handles_{};
     258              : 
     259           37 :     when_any_state()
     260           37 :         : core_(task_count)
     261              :     {
     262           37 :     }
     263              : 
     264              :     // Runners self-destruct in final_suspend. No destruction needed here.
     265              : 
     266              :     /** @pre core_.try_win() returned true.
     267              :         @note Uses in_place_type (not index) because variant is deduplicated.
     268              :     */
     269              :     template<typename T>
     270           30 :     void set_winner_result(T value)
     271              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     272              :     {
     273           30 :         result_.emplace(std::in_place_type<T>, std::move(value));
     274           30 :     }
     275              : 
     276              :     /** @pre core_.try_win() returned true. */
     277            2 :     void set_winner_void() noexcept
     278              :     {
     279            2 :         result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
     280            2 :     }
     281              : };
     282              : 
     283              : /** Wrapper coroutine that runs a single child task for when_any.
     284              : 
     285              :     Propagates executor/stop_token to the child, attempts to claim winner
     286              :     status on completion, and signals completion for cleanup coordination.
     287              : 
     288              :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     289              : */
     290              : template<typename StateType>
     291              : struct when_any_runner
     292              : {
     293              :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     294              :     {
     295              :         StateType* state_ = nullptr;
     296              :         std::size_t index_ = 0;
     297              :         executor_ref ex_;
     298              :         std::stop_token stop_token_;
     299              : 
     300          145 :         when_any_runner get_return_object() noexcept
     301              :         {
     302          145 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     303              :         }
     304              : 
     305              :         // Starts suspended; launcher sets up state/ex/token then resumes
     306          145 :         std::suspend_always initial_suspend() noexcept
     307              :         {
     308          145 :             return {};
     309              :         }
     310              : 
     311          145 :         auto final_suspend() noexcept
     312              :         {
     313              :             struct awaiter
     314              :             {
     315              :                 promise_type* p_;
     316          145 :                 bool await_ready() const noexcept { return false; }
     317          145 :                 void await_suspend(coro h) noexcept
     318              :                 {
     319              :                     // Extract everything needed for signaling before
     320              :                     // self-destruction. Inline dispatch may destroy
     321              :                     // state, so we can't access members after.
     322          145 :                     auto& core = p_->state_->core_;
     323          145 :                     auto* counter = &core.remaining_count_;
     324          145 :                     auto caller_ex = core.caller_ex_;
     325          145 :                     auto cont = core.continuation_;
     326              : 
     327              :                     // Self-destruct first - state no longer destroys runners
     328          145 :                     h.destroy();
     329              : 
     330              :                     // Signal completion. If last, dispatch parent.
     331              :                     // Uses only local copies - safe even if state
     332              :                     // is destroyed during inline dispatch.
     333          145 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     334          145 :                     if(remaining == 1)
     335           52 :                         caller_ex.dispatch(cont);
     336          145 :                 }
     337            0 :                 void await_resume() const noexcept {}
     338              :             };
     339          145 :             return awaiter{this};
     340              :         }
     341              : 
     342          133 :         void return_void() noexcept {}
     343              : 
     344              :         // Exceptions are valid completions in when_any (unlike when_all)
     345           12 :         void unhandled_exception()
     346              :         {
     347           12 :             if(state_->core_.try_win(index_))
     348            8 :                 state_->core_.set_winner_exception(std::current_exception());
     349           12 :         }
     350              : 
     351              :         /** Injects executor and stop token into child awaitables. */
     352              :         template<class Awaitable>
     353              :         struct transform_awaiter
     354              :         {
     355              :             std::decay_t<Awaitable> a_;
     356              :             promise_type* p_;
     357              : 
     358          145 :             bool await_ready() { return a_.await_ready(); }
     359          145 :             auto await_resume() { return a_.await_resume(); }
     360              : 
     361              :             template<class Promise>
     362          145 :             auto await_suspend(std::coroutine_handle<Promise> h)
     363              :             {
     364          145 :                 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
     365              :             }
     366              :         };
     367              : 
     368              :         template<class Awaitable>
     369          145 :         auto await_transform(Awaitable&& a)
     370              :         {
     371              :             using A = std::decay_t<Awaitable>;
     372              :             if constexpr (IoAwaitable<A>)
     373              :             {
     374              :                 return transform_awaiter<Awaitable>{
     375          290 :                     std::forward<Awaitable>(a), this};
     376              :             }
     377              :             else
     378              :             {
     379              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     380              :             }
     381          145 :         }
     382              :     };
     383              : 
     384              :     std::coroutine_handle<promise_type> h_;
     385              : 
     386          145 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     387          145 :         : h_(h)
     388              :     {
     389          145 :     }
     390              : 
     391              :     // Enable move for all clang versions - some versions need it
     392              :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     393              : 
     394              :     // Non-copyable
     395              :     when_any_runner(when_any_runner const&) = delete;
     396              :     when_any_runner& operator=(when_any_runner const&) = delete;
     397              :     when_any_runner& operator=(when_any_runner&&) = delete;
     398              : 
     399          145 :     auto release() noexcept
     400              :     {
     401          145 :         return std::exchange(h_, nullptr);
     402              :     }
     403              : };
     404              : 
     405              : /** Wraps a child awaitable, attempts to claim winner on completion.
     406              : 
     407              :     Result storage by case (set_winner_void() is detected with a requires-expression):
     408              :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     409              :     - set_winner_result(): for non-void tasks
     410              :     - Neither: for homogeneous void tasks (no result storage)
     411              : */
     412              : template<IoAwaitable Awaitable, typename StateType>
     413              : when_any_runner<StateType>
     414          145 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     415              : {
     416              :     using T = awaitable_result_t<Awaitable>;
     417              :     if constexpr (std::is_void_v<T>)
     418              :     {
     419              :         co_await std::move(inner);
     420              :         if(state->core_.try_win(index))
     421              :         {
     422              :             // Heterogeneous void tasks store monostate in the variant
     423              :             if constexpr (requires { state->set_winner_void(); })
     424              :                 state->set_winner_void();
     425              :             // Homogeneous void tasks have no result to store
     426              :         }
     427              :     }
     428              :     else
     429              :     {
     430              :         auto result = co_await std::move(inner);
     431              :         if(state->core_.try_win(index))
     432              :         {
     433              :             // Defensive: the move is not expected to throw, but catch anyway. We already
     434              :             // won, so unhandled_exception() would fail try_win() and drop the exception.
     435              :             try
     436              :             {
     437              :                 state->set_winner_result(std::move(result));
     438              :             }
     439              :             catch(...)
     440              :             {
     441              :                 state->core_.set_winner_exception(std::current_exception());
     442              :             }
     443              :         }
     444              :     }
     445          290 : }
     446              : 
     447              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     448              : template<IoAwaitable... Awaitables>
     449              : class when_any_launcher
     450              : {
     451              :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     452              : 
     453              :     std::tuple<Awaitables...>* tasks_;
     454              :     state_type* state_;
     455              : 
     456              : public:
     457           37 :     when_any_launcher(
     458              :         std::tuple<Awaitables...>* tasks,
     459              :         state_type* state)
     460           37 :         : tasks_(tasks)
     461           37 :         , state_(state)
     462              :     {
     463           37 :     }
     464              : 
     465           37 :     bool await_ready() const noexcept
     466              :     {
     467           37 :         return sizeof...(Awaitables) == 0;
     468              :     }
     469              : 
     470              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     471              :         destroys this object before await_suspend returns. Must not reference
     472              :         `this` after the final launch_one call.
     473              :     */
     474              :     template<Executor Ex>
     475           37 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     476              :     {
     477           37 :         state_->core_.continuation_ = continuation;
     478           37 :         state_->core_.caller_ex_ = caller_ex;
     479              : 
     480           37 :         if(parent_token.stop_possible())
     481              :         {
     482           16 :             state_->core_.parent_stop_callback_.emplace(
     483              :                 parent_token,
     484            8 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     485              : 
     486            8 :             if(parent_token.stop_requested())
     487            2 :                 state_->core_.stop_source_.request_stop();
     488              :         }
     489              : 
     490           37 :         auto token = state_->core_.stop_source_.get_token();
     491           74 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     492           37 :             (..., launch_one<Is>(caller_ex, token));
     493           37 :         }(std::index_sequence_for<Awaitables...>{});
     494              : 
     495           74 :         return std::noop_coroutine();
     496           37 :     }
     497              : 
     498           37 :     void await_resume() const noexcept
     499              :     {
     500           37 :     }
     501              : 
     502              : private:
     503              :     /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
     504              :     template<std::size_t I, Executor Ex>
     505           93 :     void launch_one(Ex const& caller_ex, std::stop_token token)
     506              :     {
     507           93 :         auto runner = make_when_any_runner(
     508           93 :             std::move(std::get<I>(*tasks_)), state_, I);
     509              : 
     510           93 :         auto h = runner.release();
     511           93 :         h.promise().state_ = state_;
     512           93 :         h.promise().index_ = I;
     513           93 :         h.promise().ex_ = caller_ex;
     514           93 :         h.promise().stop_token_ = token;
     515              : 
     516           93 :         coro ch{h};
     517           93 :         state_->runner_handles_[I] = ch;
     518           93 :         caller_ex.dispatch(ch);
     519           93 :     }
     520              : };
     521              : 
     522              : } // namespace detail
     523              : 
     524              : /** Wait for the first awaitable to complete.
     525              : 
     526              :     Races multiple heterogeneous awaitables concurrently and yields the result of
     527              :     the first one to complete. The result includes the winner's index and a
     528              :     deduplicated variant containing the result value.
     529              : 
     530              :     @par Suspends
     531              :     The calling coroutine suspends when co_await is invoked. All awaitables
     532              :     are launched concurrently (parallel or interleaved, depending on the executor). The coroutine resumes
     533              :     only after all awaitables have completed, even though the winner is
     534              :     determined by the first to finish.
     535              : 
     536              :     @par Completion Conditions
     537              :     @li Winner is determined when the first awaitable completes (success or exception)
     538              :     @li Only one task can claim winner status via atomic compare-exchange
     539              :     @li Once a winner exists, stop is requested for all remaining siblings
     540              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     541              :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     542              : 
     543              :     @par Cancellation Semantics
     544              :     Cancellation is supported via stop_token propagated through the
     545              :     IoAwaitable protocol:
     546              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     547              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     548              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     549              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     550              :     @li Stop requests are cooperative; tasks must check and respond to them
     551              : 
     552              :     @par Concurrency/Overlap
     553              :     All awaitables are launched concurrently before any can complete.
     554              :     The launcher iterates through the arguments, starting each task on the
     555              :     caller's executor. Tasks may execute in parallel on multi-threaded
     556              :     executors or interleave on single-threaded executors. There is no
     557              :     guaranteed ordering of task completion.
     558              : 
     559              :     @par Notable Error Conditions
     560              :     @li Winner exception: if the winning task threw, that exception is rethrown
     561              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     562              :     @li Cancellation: tasks may complete via cancellation without throwing
     563              : 
     564              :     @par Example
     565              :     @code
     566              :     task<void> example() {
     567              :         auto [index, result] = co_await when_any(
     568              :             fetch_from_primary(),   // task<Response>
     569              :             fetch_from_backup()     // task<Response>
     570              :         );
     571              :         // index is 0 or 1, result holds the winner's Response
     572              :         auto response = std::get<Response>(result);
     573              :     }
     574              :     @endcode
     575              : 
     576              :     @par Example with Heterogeneous Types
     577              :     @code
     578              :     task<void> mixed_types() {
     579              :         auto [index, result] = co_await when_any(
     580              :             fetch_int(),      // task<int>
     581              :             fetch_string()    // task<std::string>
     582              :         );
     583              :         if (index == 0)
     584              :             std::cout << "Got int: " << std::get<int>(result) << "\n";
     585              :         else
     586              :             std::cout << "Got string: " << std::get<std::string>(result) << "\n";
     587              :     }
     588              :     @endcode
     589              : 
     590              :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     591              :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     592              :     @param a0 The first awaitable to race.
     593              :     @param as Additional awaitables to race concurrently.
     594              :     @return A task yielding a pair of (winner_index, result_variant).
     595              : 
     596              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     597              : 
     598              :     @par Remarks
     599              :     Awaitables are taken by value and moved into a tuple local to the coroutine;
     600              :     arguments passed as rvalues are left moved-from. When multiple awaitables share the same return type,
     601              :     the variant is deduplicated to contain only unique types. Use the winner
     602              :     index to determine which awaitable completed first. Void awaitables
     603              :     contribute std::monostate to the variant.
     604              : 
     605              :     @see when_all, IoAwaitable
     606              : */
     607              : template<IoAwaitable A0, IoAwaitable... As>
     608           37 : [[nodiscard]] auto when_any(A0 a0, As... as)
     609              :     -> task<detail::when_any_result_t<
     610              :         detail::awaitable_result_t<A0>,
     611              :         detail::awaitable_result_t<As>...>>
     612              : {
     613              :     using result_type = detail::when_any_result_t<
     614              :         detail::awaitable_result_t<A0>,
     615              :         detail::awaitable_result_t<As>...>;
     616              : 
     617              :     detail::when_any_state<
     618              :         detail::awaitable_result_t<A0>,
     619              :         detail::awaitable_result_t<As>...> state;
     620              :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...); // by-value parameters are moved into one local tuple
     621              : 
     622              :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state); // launcher gets pointers only; tuple and state remain locals of this coroutine
     623              : 
     624              :     if(state.core_.winner_exception_) // the winner failed: propagate its exception instead of a value
     625              :         std::rethrow_exception(state.core_.winner_exception_);
     626              : 
     627              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; // result_ is dereferenced unchecked: assumes a non-throwing winner always stored a value — TODO confirm
     628           74 : }
     629              : 
     630              : /** Concept for ranges of full I/O awaitables.
     631              : 
     632              :     A range satisfies `IoAwaitableRange` if it is a sized input range
     633              :     whose value type satisfies @ref IoAwaitable. This enables when_any
     634              :     to accept any container or view of awaitables, not just std::vector.
     635              : 
     636              :     @tparam R The range type.
     637              : 
     638              :     @par Requirements
     639              :     @li `R` must satisfy `std::ranges::input_range`
     640              :     @li `R` must satisfy `std::ranges::sized_range`
     641              :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     642              : 
     643              :     @par Syntactic Requirements
     644              :     Given `r` of type `R`:
     645              :     @li `std::ranges::begin(r)` is valid
     646              :     @li `std::ranges::end(r)` is valid
     647              :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     648              :     @li `std::ranges::range_value_t<R>` (the element value type, not the reference type of `*begin(r)`) satisfies @ref IoAwaitable
     649              : 
     650              :     @par Example
     651              :     @code
     652              :     template<IoAwaitableRange R>
     653              :     task<void> race_all(R&& awaitables) {
     654              :         auto winner = co_await when_any(std::forward<R>(awaitables));
     655              :         // Process winner...
     656              :     }
     657              :     @endcode
     658              : 
     659              :     @see when_any, IoAwaitable
     660              : */
     661              : template<typename R>
     662              : concept IoAwaitableRange =
     663              :     std::ranges::input_range<R> && // can be iterated with begin()/end()
     664              :     std::ranges::sized_range<R> && // std::ranges::size(r) is available; the range overloads use it to size their state
     665              :     IoAwaitable<std::ranges::range_value_t<R>>; // checked on the value type, so reference/cv qualifiers of the element do not matter
     666              : 
     667              : namespace detail {
     668              : 
     669              : /** Shared state for homogeneous when_any (range overload).
     670              : 
     671              :     Uses composition with when_any_core for shared functionality.
     672              :     Simpler than heterogeneous: optional<T> instead of variant, vector
     673              :     instead of array for runner handles.
     674              : */
     675              : template<typename T>
     676              : struct when_any_homogeneous_state
     677              : {
     678              :     when_any_core core_; // race bookkeeping used by the launcher and when_any: continuation, executor, stop source, winner index/exception
     679              :     std::optional<T> result_; // empty until set_winner_result() stores the winner's value
     680              :     std::vector<coro> runner_handles_; // one handle slot per awaitable; filled by the launcher
     681              : 
     682           13 :     explicit when_any_homogeneous_state(std::size_t count)
     683           13 :         : core_(count)
     684           26 :         , runner_handles_(count) // pre-sized so the launcher can assign handles by index
     685              :     {
     686           13 :     }
     687              : 
     688              :     // Runners self-destruct in final_suspend. No destruction needed here.
     689              : 
     690              :     /** Store the winning value. @pre core_.try_win() returned true. */
     691           11 :     void set_winner_result(T value)
     692              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     693              :     {
     694           11 :         result_.emplace(std::move(value)); // move-constructs in place, hence the conditional noexcept above
     695           11 :     }
     696              : };
     697              : 
     698              : /** Specialization for void awaitables: same layout minus the result slot. */
     699              : template<>
     700              : struct when_any_homogeneous_state<void>
     701              : {
     702              :     when_any_core core_; // race bookkeeping used by the launcher and when_any: continuation, executor, stop source, winner index/exception
     703              :     std::vector<coro> runner_handles_; // one handle slot per awaitable; filled by the launcher
     704              : 
     705            2 :     explicit when_any_homogeneous_state(std::size_t count)
     706            2 :         : core_(count)
     707            4 :         , runner_handles_(count) // pre-sized so the launcher can assign handles by index
     708              :     {
     709            2 :     }
     710              : 
     711              :     // Runners self-destruct in final_suspend. No destruction needed here.
     712              : 
     713              :     // No set_winner_result - void tasks have no result to store
     714              : };
     715              : 
     716              : /** Awaiter that creates one runner per range element and dispatches them; see await_suspend for lifetime concerns. */
     717              : template<IoAwaitableRange Range>
     718              : class when_any_homogeneous_launcher
     719              : {
     720              :     using Awaitable = std::ranges::range_value_t<Range>;
     721              :     using T = awaitable_result_t<Awaitable>;
     722              : 
     723              :     Range* range_; // non-owning; the range lives in the awaiting when_any coroutine
     724              :     when_any_homogeneous_state<T>* state_; // non-owning; same owner as range_
     725              : 
     726              : public:
     727           15 :     when_any_homogeneous_launcher(
     728              :         Range* range,
     729              :         when_any_homogeneous_state<T>* state)
     730           15 :         : range_(range)
     731           15 :         , state_(state)
     732              :     {
     733           15 :     }
     734              : 
     735           15 :     bool await_ready() const noexcept
     736              :     {
     737           15 :         return std::ranges::empty(*range_); // empty range: nothing to launch, so do not suspend
     738              :     }
     739              : 
     740              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     741              :         destroys this object before await_suspend returns. Must not reference
     742              :         `this` after dispatching begins.
     743              : 
     744              :         Two-phase approach:
     745              :         1. Create all runners (safe - no dispatch yet)
     746              :         2. Dispatch all runners (any may complete synchronously)
     747              :     */
     748              :     template<Executor Ex>
     749           15 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     750              :     {
     751           15 :         state_->core_.continuation_ = continuation;
     752           15 :         state_->core_.caller_ex_ = caller_ex;
     753              : 
     754           15 :         if(parent_token.stop_possible()) // only hook the parent token when it can ever signal
     755              :         {
     756           10 :             state_->core_.parent_stop_callback_.emplace(
     757              :                 parent_token,
     758            5 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     759              : 
     760            5 :             if(parent_token.stop_requested()) // NOTE(review): if parent_stop_callback_ wraps std::stop_callback, construction above already ran the callback and this is a harmless repeat — confirm
     761            2 :                 state_->core_.stop_source_.request_stop();
     762              :         }
     763              : 
     764           15 :         auto token = state_->core_.stop_source_.get_token(); // every runner below receives this same token
     765              : 
     766              :         // Phase 1: Create all runners without dispatching.
     767              :         // This iterates over *range_ safely because no runners execute yet.
     768           15 :         std::size_t index = 0;
     769           67 :         for(auto&& a : *range_)
     770              :         {
     771           52 :             auto runner = make_when_any_runner(
     772           52 :                 std::move(a), state_, index); // the element is moved out of the range
     773              : 
     774           52 :             auto h = runner.release(); // handle is tracked through runner_handles_ from here on
     775           52 :             h.promise().state_ = state_;
     776           52 :             h.promise().index_ = index;
     777           52 :             h.promise().ex_ = caller_ex;
     778           52 :             h.promise().stop_token_ = token;
     779              : 
     780           52 :             state_->runner_handles_[index] = coro{h}; // slot exists because the state pre-sized the vector
     781           52 :             ++index;
     782              :         }
     783              : 
     784              :         // Phase 2: Dispatch all runners. Any may complete synchronously.
     785              :         // After last dispatch, state_ and this may be destroyed.
     786              :         // Use raw pointer/count captured before dispatching.
     787           15 :         coro* handles = state_->runner_handles_.data();
     788           15 :         std::size_t count = state_->runner_handles_.size();
     789           67 :         for(std::size_t i = 0; i < count; ++i)
     790           52 :             caller_ex.dispatch(handles[i]); // handles[i] is read before its own dispatch; nothing is touched after the last one
     791              : 
     792           30 :         return std::noop_coroutine(); // nothing to resume inline; runners were handed to the executor above
     793           15 :     }
     794              : 
     795           15 :     void await_resume() const noexcept // no value: callers read the outcome from the shared state
     796              :     {
     797           15 :     }
     798              : };
     799              : 
     800              : } // namespace detail
     801              : 
     802              : /** Wait for the first awaitable to complete (range overload).
     803              : 
     804              :     Races a range of awaitables with the same result type. Accepts any
     805              :     sized input range of IoAwaitable types, enabling use with arrays,
     806              :     spans, or custom containers.
     807              : 
     808              :     @par Suspends
     809              :     The calling coroutine suspends when co_await is invoked. All awaitables
     810              :     in the range are launched concurrently (parallel or interleaved, depending on the executor). The
     811              :     coroutine resumes only after all awaitables have completed, even though
     812              :     the winner is determined by the first to finish.
     813              : 
     814              :     @par Completion Conditions
     815              :     @li Winner is determined when the first awaitable completes (success or exception)
     816              :     @li Only one task can claim winner status via atomic compare-exchange
     817              :     @li Once a winner exists, stop is requested for all remaining siblings
     818              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     819              :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     820              : 
     821              :     @par Cancellation Semantics
     822              :     Cancellation is supported via stop_token propagated through the
     823              :     IoAwaitable protocol:
     824              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     825              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     826              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     827              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     828              :     @li Stop requests are cooperative; tasks must check and respond to them
     829              : 
     830              :     @par Concurrency/Overlap
     831              :     All runners are created before any is dispatched; once dispatching starts, an earlier awaitable may complete before later ones begin.
     832              :     The launcher iterates through the range, starting each task on the
     833              :     caller's executor. Tasks may execute in parallel on multi-threaded
     834              :     executors or interleave on single-threaded executors. There is no
     835              :     guaranteed ordering of task completion.
     836              : 
     837              :     @par Notable Error Conditions
     838              :     @li Empty range: std::invalid_argument is thrown inside the coroutine body, so it presumably surfaces through the returned task rather than from the call itself
     839              :     @li Winner exception: if the winning task threw, that exception is rethrown
     840              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     841              :     @li Cancellation: tasks may complete via cancellation without throwing
     842              : 
     843              :     @par Example
     844              :     @code
     845              :     task<void> example() {
     846              :         std::array<task<Response>, 3> requests = {
     847              :             fetch_from_server(0),
     848              :             fetch_from_server(1),
     849              :             fetch_from_server(2)
     850              :         };
     851              : 
     852              :         auto [index, response] = co_await when_any(std::move(requests));
     853              :     }
     854              :     @endcode
     855              : 
     856              :     @par Example with Vector
     857              :     @code
     858              :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     859              :         std::vector<task<Response>> requests;
     860              :         for (auto const& server : servers)
     861              :             requests.push_back(fetch_from(server));
     862              : 
     863              :         auto [index, response] = co_await when_any(std::move(requests));
     864              :         co_return response;
     865              :     }
     866              :     @endcode
     867              : 
     868              :     @tparam R Range type satisfying IoAwaitableRange.
     869              :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     870              :     @return A task yielding a pair of (winner_index, result).
     871              : 
     872              :     @throws std::invalid_argument if range is empty (raised in the coroutine body before any awaitable is launched).
     873              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     874              : 
     875              :     @par Remarks
     876              :     The range is first transferred into a local of type `std::remove_cvref_t<R>`:
     877              :     rvalue ranges are moved, lvalue ranges are copied (so the range must then be
     878              :     copyable, and the caller's container is not modified). Elements are then moved out of that local. Unlike
     879              :     the variadic overload, no variant wrapper is needed since all tasks
     880              :     share the same return type.
     881              : 
     882              :     @see when_any, IoAwaitableRange
     883              : */
     884              : template<IoAwaitableRange R>
     885              :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
     886           14 : [[nodiscard]] auto when_any(R&& awaitables)
     887              :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     888              : {
     889              :     using Awaitable = std::ranges::range_value_t<R>;
     890              :     using T = detail::awaitable_result_t<Awaitable>;
     891              :     using result_type = std::pair<std::size_t, T>;
     892              :     using OwnedRange = std::remove_cvref_t<R>;
     893              : 
     894              :     auto count = std::ranges::size(awaitables); // NOTE(review): reads the reference parameter inside the coroutine body; if task starts lazily the argument must outlive the first resume — confirm
     895              :     if(count == 0)
     896              :         throw std::invalid_argument("when_any requires at least one awaitable");
     897              : 
     898              :     // Rvalue ranges are moved, lvalue ranges copied, into a local owned by this coroutine
     899              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     900              : 
     901              :     detail::when_any_homogeneous_state<T> state(count);
     902              : 
     903              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher gets pointers only; range and state remain locals of this coroutine
     904              : 
     905              :     if(state.core_.winner_exception_) // the winner failed: propagate its exception instead of a value
     906              :         std::rethrow_exception(state.core_.winner_exception_);
     907              : 
     908              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; // result_ is dereferenced unchecked: assumes a non-throwing winner always stored a value — TODO confirm
     909           28 : }
     910              : 
     911              : /** Wait for the first awaitable to complete (void range overload).
     912              : 
     913              :     Races a range of void-returning awaitables. Since void awaitables have
     914              :     no result value, only the winner's index is returned.
     915              : 
     916              :     @par Suspends
     917              :     The calling coroutine suspends when co_await is invoked. All awaitables
     918              :     in the range are launched concurrently (parallel or interleaved, depending on the executor). The
     919              :     coroutine resumes only after all awaitables have completed, even though
     920              :     the winner is determined by the first to finish.
     921              : 
     922              :     @par Completion Conditions
     923              :     @li Winner is determined when the first awaitable completes (success or exception)
     924              :     @li Only one task can claim winner status via atomic compare-exchange
     925              :     @li Once a winner exists, stop is requested for all remaining siblings
     926              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     927              :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     928              : 
     929              :     @par Cancellation Semantics
     930              :     Cancellation is supported via stop_token propagated through the
     931              :     IoAwaitable protocol:
     932              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     933              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     934              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     935              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     936              :     @li Stop requests are cooperative; tasks must check and respond to them
     937              : 
     938              :     @par Concurrency/Overlap
     939              :     All runners are created before any is dispatched; once dispatching starts, an earlier awaitable may complete before later ones begin.
     940              :     The launcher iterates through the range, starting each task on the
     941              :     caller's executor. Tasks may execute in parallel on multi-threaded
     942              :     executors or interleave on single-threaded executors. There is no
     943              :     guaranteed ordering of task completion.
     944              : 
     945              :     @par Notable Error Conditions
     946              :     @li Empty range: std::invalid_argument is thrown inside the coroutine body, so it presumably surfaces through the returned task rather than from the call itself
     947              :     @li Winner exception: if the winning task threw, that exception is rethrown
     948              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     949              :     @li Cancellation: tasks may complete via cancellation without throwing
     950              : 
     951              :     @par Example
     952              :     @code
     953              :     task<void> example() {
     954              :         std::vector<task<void>> tasks;
     955              :         for (int i = 0; i < 5; ++i)
     956              :             tasks.push_back(background_work(i));
     957              : 
     958              :         std::size_t winner = co_await when_any(std::move(tasks));
     959              :         // winner is the index of the first task to complete
     960              :     }
     961              :     @endcode
     962              : 
     963              :     @par Example with Timeout
     964              :     @code
     965              :     task<void> with_timeout() {
     966              :         std::vector<task<void>> tasks;
     967              :         tasks.push_back(long_running_operation());
     968              :         tasks.push_back(delay(std::chrono::seconds(5)));
     969              : 
     970              :         std::size_t winner = co_await when_any(std::move(tasks));
     971              :         if (winner == 1) {
     972              :             // Timeout occurred
     973              :         }
     974              :     }
     975              :     @endcode
     976              : 
     977              :     @tparam R Range type satisfying IoAwaitableRange with void result.
     978              :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     979              :     @return A task yielding the winner's index (zero-based).
     980              : 
     981              :     @throws std::invalid_argument if range is empty (raised in the coroutine body before any awaitable is launched).
     982              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     983              : 
     984              :     @par Remarks
     985              :     The range is first transferred into a local of type `std::remove_cvref_t<R>`:
     986              :     rvalue ranges are moved, lvalue ranges are copied (so the range must then be
     987              :     copyable, and the caller's container is not modified). Elements are then moved out of that local. Unlike
     988              :     the non-void overload, no result storage is needed since void tasks
     989              :     produce no value.
     990              : 
     991              :     @see when_any, IoAwaitableRange
     992              : */
     993              : template<IoAwaitableRange R>
     994              :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
     995            2 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     996              : {
     997              :     using OwnedRange = std::remove_cvref_t<R>;
     998              : 
     999              :     auto count = std::ranges::size(awaitables); // NOTE(review): reads the reference parameter inside the coroutine body; if task starts lazily the argument must outlive the first resume — confirm
    1000              :     if(count == 0)
    1001              :         throw std::invalid_argument("when_any requires at least one awaitable");
    1002              : 
    1003              :     // Rvalue ranges are moved, lvalue ranges copied, into a local owned by this coroutine
    1004              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
    1005              : 
    1006              :     detail::when_any_homogeneous_state<void> state(count);
    1007              : 
    1008              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher gets pointers only; range and state remain locals of this coroutine
    1009              : 
    1010              :     if(state.core_.winner_exception_) // the winner failed: propagate its exception instead of an index
    1011              :         std::rethrow_exception(state.core_.winner_exception_);
    1012              : 
    1013              :     co_return state.core_.winner_index_; // void awaitables carry no value, so the index is the whole result
    1014            4 : }
    1015              : 
    1016              : } // namespace capy
    1017              : } // namespace boost
    1018              : 
    1019              : #endif
        

Generated by: LCOV version 2.3