libs/capy/include/boost/capy/when_any.hpp

99.2% Lines (131/132) 96.2% Functions (281/292) 100.0% Branches (27/27)
libs/capy/include/boost/capy/when_any.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <ranges>
26 #include <stdexcept>
27 #include <stop_token>
28 #include <tuple>
29 #include <type_traits>
30 #include <utility>
31 #include <variant>
32 #include <vector>
33
34 /*
35 when_any - Race multiple tasks, return first completion
36 ========================================================
37
38 OVERVIEW:
39 ---------
40 when_any launches N tasks concurrently and completes when the FIRST task
41 finishes (success or failure). It then requests stop for all siblings and
42 waits for them to acknowledge before returning.
43
44 ARCHITECTURE:
45 -------------
46 The design mirrors when_all but with inverted completion semantics:
47
48 when_all: complete when remaining_count reaches 0 (all done)
49 when_any: complete when has_winner becomes true (first done)
50 BUT still wait for remaining_count to reach 0 for cleanup
51
52 Key components:
53 - when_any_state: Shared state tracking winner and completion
54 - when_any_runner: Wrapper coroutine for each child task
55 - when_any_launcher: Awaitable that starts all runners concurrently
56
57 CRITICAL INVARIANTS:
58 --------------------
59 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 2. All tasks must complete before parent resumes (cleanup safety)
61 3. Stop is requested immediately when winner is determined
62 4. Only the winner's result/exception is stored
63
64 TYPE DEDUPLICATION:
65 -------------------
66 std::variant requires unique alternative types. Since when_any can race
67 tasks with identical return types (e.g., three task<int>), we must
68 deduplicate types before constructing the variant.
69
70 Example: when_any(task<int>, task<string>, task<int>)
71 - Raw types after void->monostate: int, string, int
72 - Deduplicated variant: std::variant<int, string>
73 - Return: pair<size_t, variant<int, string>>
74
75 The winner_index tells you which task won (0, 1, or 2), while the variant
76 holds the result. Use the index to determine how to interpret the variant.
77
78 VOID HANDLING:
79 --------------
80 void tasks contribute std::monostate to the variant (then deduplicated).
81 All-void tasks result in: pair<size_t, variant<monostate>>
82
83 MEMORY MODEL:
84 -------------
85 Synchronization chain from winner's write to parent's read:
86
87 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
89 3. Last runner (may be winner or non-winner) reaches final_suspend
90 → fetch_sub(acq_rel) on remaining_count_, observing the previous value 1
91 4. Last runner calls caller_ex.dispatch(continuation) from final_suspend's await_suspend
92 5. Parent coroutine resumes and reads result_/winner_exception_
93
94 Synchronization analysis:
95 - All fetch_sub operations on remaining_count_ form a release sequence
96 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 in the modification order of remaining_count_
98 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 modification order, establishing happens-before from winner's writes
100 - Executor dispatch() is expected to provide queue-based synchronization
101 (release-on-post, acquire-on-execute) completing the chain to parent
102 - Even inline executors work (same thread = sequenced-before)
103
104 Alternative considered: Adding winner_ready_ atomic (set with release after
105 storing winner data, acquired before reading) would make synchronization
106 self-contained and not rely on executor implementation details. Current
107 approach is correct but requires careful reasoning about release sequences
108 and executor behavior.
109
110 EXCEPTION SEMANTICS:
111 --------------------
112 Unlike when_all (which captures first exception, discards others), when_any
113 treats exceptions as valid completions. If the winning task threw, that
114 exception is rethrown. Exceptions from non-winners are silently discarded.
115 */
116
117 namespace boost {
118 namespace capy {
119
120 namespace detail {
121
122 /** Convert void to monostate for variant storage.
123
124 std::variant<void, ...> is ill-formed, so void tasks contribute
125 std::monostate to the result variant instead. Non-void types
126 pass through unchanged.
127
Example: void_to_monostate_t<void> is std::monostate, while
void_to_monostate_t<int> is int.

128 @tparam T The type to potentially convert (void becomes monostate).
129 */
130 template<typename T>
131 using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
132
133 // Type deduplication: std::variant requires unique alternative types.
134 // Fold left over the type list, appending each type only if not already present.
// Primary template: declared only. The std::variant partial specialization
// below supplies the definition.
135 template<typename Variant, typename T>
136 struct variant_append_if_unique;
137
// `type` is std::variant<Vs...> unchanged when T is the same type as any of
// Vs; otherwise it is std::variant<Vs..., T> (T appended at the end).
138 template<typename... Vs, typename T>
139 struct variant_append_if_unique<std::variant<Vs...>, T>
140 {
141 using type = std::conditional_t<
142 (std::is_same_v<T, Vs> || ...),
143 std::variant<Vs...>,
144 std::variant<Vs..., T>>;
145 };
146
// Recursive fold over a type list. Accumulated is the variant built so far;
// Remaining are the types that still have to be considered.
147 template<typename Accumulated, typename... Remaining>
148 struct deduplicate_impl;
149
// Base case: no types left, so the accumulated variant is the result.
150 template<typename Accumulated>
151 struct deduplicate_impl<Accumulated>
152 {
153 using type = Accumulated;
154 };
155
// Recursive case: append T to the accumulator only if it is not already an
// alternative, then continue with the remaining types.
156 template<typename Accumulated, typename T, typename... Rest>
157 struct deduplicate_impl<Accumulated, T, Rest...>
158 {
159 using next = typename variant_append_if_unique<Accumulated, T>::type;
160 using type = typename deduplicate_impl<next, Rest...>::type;
161 };
162
163 // Deduplicated variant; void types become monostate before deduplication
// The first type seeds the accumulator and later types are appended at the
// end, so alternatives appear in first-occurrence order.
164 template<typename T0, typename... Ts>
165 using unique_variant_t = typename deduplicate_impl<
166 std::variant<void_to_monostate_t<T0>>,
167 void_to_monostate_t<Ts>...>::type;
168
169 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 // when multiple tasks share the same return type.
// `first` is a std::size_t index; `second` is unique_variant_t<T0, Ts...>.
171 template<typename T0, typename... Ts>
172 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173
174 // Extract result type from any awaitable via await_resume()
// The type is whatever await_resume() returns when called on an lvalue of
// std::decay_t<A>. Only the member await_resume() is inspected here.
175 template<typename A>
176 using awaitable_result_t = decltype(std::declval<std::decay_t<A>&>().await_resume());
177
178 /** Core shared state for when_any operations.
179
180 Contains all members and methods common to both heterogeneous (variadic)
181 and homogeneous (range) when_any implementations. State classes embed
182 this via composition to avoid CRTP destructor ordering issues.
183
184 @par Thread Safety
185 Atomic operations protect winner selection and completion count.
186 */
187 struct when_any_core
188 {
// Number of runners that have not yet reached final_suspend. Initialised
// from the constructor argument; each runner decrements it exactly once.
189 std::atomic<std::size_t> remaining_count_;
// Index passed to the one try_win() call that succeeded (0 until then).
190 std::size_t winner_index_{0};
// Set through set_winner_exception(); null when no exception was recorded.
191 std::exception_ptr winner_exception_;
// Source of the stop token handed to every runner.
192 std::stop_source stop_source_;
193
194 // Bridges parent's stop token to our stop_source
195 struct stop_callback_fn
196 {
197 std::stop_source* source_;
198 6 void operator()() const noexcept { source_->request_stop(); }
199 };
200 using stop_callback_t = std::stop_callback<stop_callback_fn>;
// Engaged by the launchers only when the parent's token can be stopped.
201 std::optional<stop_callback_t> parent_stop_callback_;
202
// Parent coroutine handle and executor. Written by the launchers'
// await_suspend and copied by the runners' final_suspend, where the last
// runner dispatches the parent.
203 coro continuation_;
204 executor_ref caller_ex_;
205
206 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
207 std::atomic<bool> has_winner_{false};
208
// @param count Number of runners that will signal completion.
209 52 explicit when_any_core(std::size_t count) noexcept
210 52 : remaining_count_(count)
211 {
212 52 }
213
214 /** Atomically claim winner status; exactly one task succeeds.

On success the index is recorded in winner_index_ and stop is requested
on stop_source_. On failure nothing is modified.

@param index Position of the calling task among the raced awaitables.
@return true if this call flipped has_winner_ from false to true.
*/
215 145 bool try_win(std::size_t index) noexcept
216 {
217 145 bool expected = false;
218
2/2
✓ Branch 1 taken 52 times.
✓ Branch 2 taken 93 times.
145 if(has_winner_.compare_exchange_strong(
219 expected, true, std::memory_order_acq_rel))
220 {
221 52 winner_index_ = index;
222 52 stop_source_.request_stop();
223 52 return true;
224 }
225 93 return false;
226 }
227
228 /** @pre try_win() returned true. */
// Stores ep without synchronisation of its own; only the winner may call it.
229 8 void set_winner_exception(std::exception_ptr ep) noexcept
230 {
231 8 winner_exception_ = ep;
232 8 }
233
234 // Runners signal completion directly via final_suspend; no member function needed.
235 };
236
237 /** Shared state for heterogeneous when_any operation.
238
239 Coordinates winner selection, result storage, and completion tracking
240 for all child tasks in a when_any operation. Uses composition with
241 when_any_core for shared functionality.
242
243 @par Lifetime
244 Allocated on the parent coroutine's frame, outlives all runners.
245
246 @tparam T0 First task's result type.
247 @tparam Ts Remaining tasks' result types.
248 */
249 template<typename T0, typename... Ts>
250 struct when_any_state
251 {
252 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
253 using variant_type = unique_variant_t<T0, Ts...>;
254
255 when_any_core core_;
// Empty until set_winner_result() or set_winner_void() is called.
256 std::optional<variant_type> result_;
// One slot per task, filled by when_any_launcher::launch_one.
257 std::array<coro, task_count> runner_handles_{};
258
// The completion counter starts at the number of raced tasks.
259 37 when_any_state()
260 37 : core_(task_count)
261 {
262 37 }
263
264 // Runners self-destruct in final_suspend. No destruction needed here.
265
266 /** @pre core_.try_win() returned true.
267 @note Uses in_place_type (not index) because variant is deduplicated.
T must therefore be one of variant_type's alternatives. The value is
taken by copy and moved into the variant.
268 */
269 template<typename T>
270 30 void set_winner_result(T value)
271 noexcept(std::is_nothrow_move_constructible_v<T>)
272 {
273 30 result_.emplace(std::in_place_type<T>, std::move(value));
274 30 }
275
276 /** @pre core_.try_win() returned true. */
// Records a void winner by storing the std::monostate alternative.
277 2 void set_winner_void() noexcept
278 {
279 2 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
280 2 }
281 };
282
283 /** Wrapper coroutine that runs a single child task for when_any.
284
285 Propagates executor/stop_token to the child, attempts to claim winner
286 status on completion, and signals completion for cleanup coordination.
287
The object itself only carries the coroutine handle. It declares no
destructor, so the frame is released solely by the self-destruction in
final_suspend.

288 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
289 */
290 template<typename StateType>
291 struct when_any_runner
292 {
293 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
294 {
// All four fields are assigned by the launcher after the coroutine is
// created and before it is first resumed.
295 StateType* state_ = nullptr;
296 std::size_t index_ = 0;
297 executor_ref ex_;
298 std::stop_token stop_token_;
299
300 145 when_any_runner get_return_object() noexcept
301 {
302 145 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
303 }
304
305 // Starts suspended; launcher sets up state/ex/token then resumes
306 145 std::suspend_always initial_suspend() noexcept
307 {
308 145 return {};
309 }
310
// Always suspends. The awaiter destroys the runner's own frame, decrements
// the shared counter, and dispatches the parent if this was the last
// runner (the previous counter value was 1).
311 145 auto final_suspend() noexcept
312 {
313 struct awaiter
314 {
315 promise_type* p_;
316 bool await_ready() const noexcept { return false; }
317 void await_suspend(coro h) noexcept
318 {
319 // Extract everything needed for signaling before
320 // self-destruction. Inline dispatch may destroy
321 // state, so we can't access members after.
322 auto& core = p_->state_->core_;
323 auto* counter = &core.remaining_count_;
324 auto caller_ex = core.caller_ex_;
325 auto cont = core.continuation_;
326
327 // Self-destruct first - state no longer destroys runners
328 h.destroy();
329
330 // Signal completion. If last, dispatch parent.
331 // Uses only local copies - safe even if state
332 // is destroyed during inline dispatch.
333 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
334 if(remaining == 1)
335 caller_ex.dispatch(cont);
336 }
337 void await_resume() const noexcept {}
338 };
339 145 return awaiter{this};
340 }
341
342 133 void return_void() noexcept {}
343
344 // Exceptions are valid completions in when_any (unlike when_all)
// Only the winner's exception is stored. When try_win() fails, the
// exception is dropped here.
345 12 void unhandled_exception()
346 {
347
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 4 times.
12 if(state_->core_.try_win(index_))
348 8 state_->core_.set_winner_exception(std::current_exception());
349 12 }
350
351 /** Injects executor and stop token into child awaitables.

Holds the awaitable by value (decayed) and forwards await_ready and
await_resume unchanged. await_suspend calls the wrapped awaitable's
three-argument form with the promise's executor and stop token.
*/
352 template<class Awaitable>
353 struct transform_awaiter
354 {
355 std::decay_t<Awaitable> a_;
356 promise_type* p_;
357
358 145 bool await_ready() { return a_.await_ready(); }
359 145 auto await_resume() { return a_.await_resume(); }
360
361 template<class Promise>
362 145 auto await_suspend(std::coroutine_handle<Promise> h)
363 {
364 145 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
365 }
366 };
367
// Wraps every co_await operand that satisfies IoAwaitable. Any other
// operand type is rejected at compile time by the static_assert.
368 template<class Awaitable>
369 145 auto await_transform(Awaitable&& a)
370 {
371 using A = std::decay_t<Awaitable>;
372 if constexpr (IoAwaitable<A>)
373 {
374 return transform_awaiter<Awaitable>{
375 290 std::forward<Awaitable>(a), this};
376 }
377 else
378 {
379 static_assert(sizeof(A) == 0, "requires IoAwaitable");
380 }
381 145 }
382 };
383
384 std::coroutine_handle<promise_type> h_;
385
386 145 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
387 145 : h_(h)
388 {
389 145 }
390
391 // Enable move for all clang versions - some versions need it
392 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
393
394 // Non-copyable
395 when_any_runner(when_any_runner const&) = delete;
396 when_any_runner& operator=(when_any_runner const&) = delete;
397 when_any_runner& operator=(when_any_runner&&) = delete;
398
// Hands the handle to the caller and leaves h_ null.
399 145 auto release() noexcept
400 {
401 145 return std::exchange(h_, nullptr);
402 }
403 };
404
405 /** Wraps a child awaitable, attempts to claim winner on completion.
406
407 Uses requires-expressions to detect state capabilities:
408 - set_winner_void(): for heterogeneous void tasks (stores monostate)
409 - set_winner_result(): for non-void tasks
410 - Neither: for homogeneous void tasks (no result storage)

If awaiting `inner` throws, the exception leaves this body and is
handled by promise_type::unhandled_exception, which makes its own
try_win attempt.

@param inner The awaitable to run. Taken by value and moved into the co_await.
@param state Shared state used for try_win and result storage.
@param index Position of this awaitable, passed to try_win.
411 */
412 template<IoAwaitable Awaitable, typename StateType>
413 when_any_runner<StateType>
414
1/1
✓ Branch 1 taken 145 times.
145 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
415 {
416 using T = awaitable_result_t<Awaitable>;
417 if constexpr (std::is_void_v<T>)
418 {
419 co_await std::move(inner);
420 if(state->core_.try_win(index))
421 {
422 // Heterogeneous void tasks store monostate in the variant
423 if constexpr (requires { state->set_winner_void(); })
424 state->set_winner_void();
425 // Homogeneous void tasks have no result to store
426 }
427 }
428 else
429 {
430 auto result = co_await std::move(inner);
431 if(state->core_.try_win(index))
432 {
433 // Defensive: move should not throw (already moved once), but we
434 // catch just in case since an uncaught exception would be devastating.
// A throwing store is recorded as the winner's exception, because this
// runner has already claimed the win.
435 try
436 {
437 state->set_winner_result(std::move(result));
438 }
439 catch(...)
440 {
441 state->core_.set_winner_exception(std::current_exception());
442 }
443 }
444 }
445 290 }
446
447 /** Launches all runners concurrently; see await_suspend for lifetime concerns.

Holds non-owning pointers to the tuple of awaitables and to the shared
state. Both are locals of the when_any coroutine that awaits this object.
*/
448 template<IoAwaitable... Awaitables>
449 class when_any_launcher
450 {
451 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
452
453 std::tuple<Awaitables...>* tasks_;
454 state_type* state_;
455
456 public:
457 37 when_any_launcher(
458 std::tuple<Awaitables...>* tasks,
459 state_type* state)
460 37 : tasks_(tasks)
461 37 , state_(state)
462 {
463 37 }
464
// Ready only for an empty pack. The public when_any overload always
// supplies at least one awaitable.
465 37 bool await_ready() const noexcept
466 {
467 37 return sizeof...(Awaitables) == 0;
468 }
469
470 /** CRITICAL: If the last task finishes synchronously, parent resumes and
471 destroys this object before await_suspend returns. Must not reference
472 `this` after the final launch_one call.

Records the parent continuation and executor in the shared state, links
the parent's stop token to the shared stop_source, then launches one
runner per awaitable in index order.

@param continuation Handle of the awaiting (parent) coroutine.
@param caller_ex Executor used to dispatch the runners and, later, the parent.
@param parent_token Stop token of the parent; defaults to an empty token.
@return std::noop_coroutine(); the parent is resumed by the last runner instead.
473 */
474 template<Executor Ex>
475 37 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
476 {
477 37 state_->core_.continuation_ = continuation;
478 37 state_->core_.caller_ex_ = caller_ex;
479
480
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 29 times.
37 if(parent_token.stop_possible())
481 {
482 16 state_->core_.parent_stop_callback_.emplace(
483 parent_token,
484 8 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
485
486
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 6 times.
// NOTE(review): std::stop_callback's constructor already runs the callback
// when stop has been requested, so this explicit check looks redundant.
// Confirm before removing.
8 if(parent_token.stop_requested())
487 2 state_->core_.stop_source_.request_stop();
488 }
489
490 37 auto token = state_->core_.stop_source_.get_token();
// Comma fold: launch_one<0>, launch_one<1>, ... are evaluated left to right.
491 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
492 (..., launch_one<Is>(caller_ex, token));
493
1/1
✓ Branch 1 taken 37 times.
37 }(std::index_sequence_for<Awaitables...>{});
494
495 74 return std::noop_coroutine();
496 37 }
497
// The result is read from the shared state by when_any, not returned here.
498 37 void await_resume() const noexcept
499 {
500 37 }
501
502 private:
503 /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak).

Creates the runner for tuple element I, fills in its promise, records the
handle in the state, and dispatches it on caller_ex. The awaitable is
moved out of the tuple.
*/
504 template<std::size_t I, Executor Ex>
505 93 void launch_one(Ex const& caller_ex, std::stop_token token)
506 {
507
1/1
✓ Branch 2 taken 93 times.
93 auto runner = make_when_any_runner(
508 93 std::move(std::get<I>(*tasks_)), state_, I);
509
// Take the handle out of the wrapper. From here on the frame is released
// only by the runner's final_suspend.
510 93 auto h = runner.release();
511 93 h.promise().state_ = state_;
512 93 h.promise().index_ = I;
513 93 h.promise().ex_ = caller_ex;
514 93 h.promise().stop_token_ = token;
515
516 93 coro ch{h};
517 93 state_->runner_handles_[I] = ch;
518
1/1
✓ Branch 1 taken 93 times.
93 caller_ex.dispatch(ch);
519 93 }
520 };
521
522 } // namespace detail
523
524 /** Wait for the first awaitable to complete.
525
526 Races multiple heterogeneous awaitables concurrently and returns when the
527 first one completes. The result includes the winner's index and a
528 deduplicated variant containing the result value.
529
530 @par Suspends
531 The calling coroutine suspends when co_await is invoked. All awaitables
532 are launched concurrently and execute in parallel. The coroutine resumes
533 only after all awaitables have completed, even though the winner is
534 determined by the first to finish.
535
536 @par Completion Conditions
537 @li Winner is determined when the first awaitable completes (success or exception)
538 @li Only one task can claim winner status via atomic compare-exchange
539 @li Once a winner exists, stop is requested for all remaining siblings
540 @li Parent coroutine resumes only after all siblings acknowledge completion
541 @li The winner's result is returned; if the winner threw, the exception is rethrown
542
543 @par Cancellation Semantics
544 Cancellation is supported via stop_token propagated through the
545 IoAwaitable protocol:
546 @li Each child awaitable receives a stop_token derived from a shared stop_source
547 @li When the parent's stop token is activated, the stop is forwarded to all children
548 @li When a winner is determined, stop_source_.request_stop() is called immediately
549 @li Siblings must handle cancellation gracefully and complete before parent resumes
550 @li Stop requests are cooperative; tasks must check and respond to them
551
552 @par Concurrency/Overlap
553 Awaitables are started one at a time in argument order, so an earlier one may already have completed before a later one is started.
554 The launcher iterates through the arguments, starting each task on the
555 caller's executor. Tasks may execute in parallel on multi-threaded
556 executors or interleave on single-threaded executors. There is no
557 guaranteed ordering of task completion.
558
559 @par Notable Error Conditions
560 @li Winner exception: if the winning task threw, that exception is rethrown
561 @li Non-winner exceptions: silently discarded (only winner's result matters)
562 @li Cancellation: tasks may complete via cancellation without throwing
563
564 @par Example
565 @code
566 task<void> example() {
567 auto [index, result] = co_await when_any(
568 fetch_from_primary(), // task<Response>
569 fetch_from_backup() // task<Response>
570 );
571 // index is 0 or 1, result holds the winner's Response
572 auto response = std::get<Response>(result);
573 }
574 @endcode
575
576 @par Example with Heterogeneous Types
577 @code
578 task<void> mixed_types() {
579 auto [index, result] = co_await when_any(
580 fetch_int(), // task<int>
581 fetch_string() // task<std::string>
582 );
583 if (index == 0)
584 std::cout << "Got int: " << std::get<int>(result) << "\n";
585 else
586 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
587 }
588 @endcode
589
590 @tparam A0 First awaitable type (must satisfy IoAwaitable).
591 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
592 @param a0 The first awaitable to race.
593 @param as Additional awaitables to race concurrently.
594 @return A task yielding a pair of (winner_index, result_variant).
595
596 @throws Rethrows the winner's exception if the winning task threw an exception.
597
598 @par Remarks
599 Awaitables are moved into the coroutine frame; original objects become
600 empty after the call. When multiple awaitables share the same return type,
601 the variant is deduplicated to contain only unique types. Use the winner
602 index to determine which awaitable completed first. Void awaitables
603 contribute std::monostate to the variant.
604
605 @see when_all, IoAwaitable
606 */
// Variadic overload. The shared state and the tuple of awaitables are locals
// of this coroutine; the launcher only receives their addresses.
607 template<IoAwaitable A0, IoAwaitable... As>
608
1/1
✓ Branch 1 taken 37 times.
37 [[nodiscard]] auto when_any(A0 a0, As... as)
609 -> task<detail::when_any_result_t<
610 detail::awaitable_result_t<A0>,
611 detail::awaitable_result_t<As>...>>
612 {
613 using result_type = detail::when_any_result_t<
614 detail::awaitable_result_t<A0>,
615 detail::awaitable_result_t<As>...>;
616
617 detail::when_any_state<
618 detail::awaitable_result_t<A0>,
619 detail::awaitable_result_t<As>...> state;
620 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
621
// Resumes only after the last runner has decremented the completion counter.
622 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
623
// Every runner calls try_win on normal completion or on exception, so by now
// the winner has stored either an exception or a result.
624 if(state.core_.winner_exception_)
625 std::rethrow_exception(state.core_.winner_exception_);
626
627 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
628 74 }
629
630 /** Concept for ranges of full I/O awaitables.
631
632 A range satisfies `IoAwaitableRange` if it is a sized input range
633 whose value type satisfies @ref IoAwaitable. This enables when_any
634 to accept any container or view of awaitables, not just std::vector.
635
636 @tparam R The range type.
637
638 @par Requirements
639 @li `R` must satisfy `std::ranges::input_range`
640 @li `R` must satisfy `std::ranges::sized_range`
641 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
642
643 @par Syntactic Requirements
644 Given `r` of type `R`:
645 @li `std::ranges::begin(r)` is valid
646 @li `std::ranges::end(r)` is valid
647 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
648 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
649
650 @par Example
651 @code
652 template<IoAwaitableRange R>
653 task<void> race_all(R&& awaitables) {
654 auto winner = co_await when_any(std::forward<R>(awaitables));
655 // Process winner...
656 }
657 @endcode
658
659 @see when_any, IoAwaitable
660 */
// Satisfied when R is a sized input range whose value type models IoAwaitable.
661 template<typename R>
662 concept IoAwaitableRange =
663 std::ranges::input_range<R> &&
664 std::ranges::sized_range<R> &&
665 IoAwaitable<std::ranges::range_value_t<R>>;
666
667 namespace detail {
668
669 /** Shared state for homogeneous when_any (range overload).
670
671 Uses composition with when_any_core for shared functionality.
672 Simpler than heterogeneous: optional<T> instead of variant, vector
673 instead of array for runner handles.

@tparam T Result type shared by all awaitables in the range.
674 */
675 template<typename T>
676 struct when_any_homogeneous_state
677 {
678 when_any_core core_;
// Empty until set_winner_result() is called.
679 std::optional<T> result_;
// Sized to `count` at construction; slots are filled by the launcher.
680 std::vector<coro> runner_handles_;
681
// @param count Number of awaitables in the range. Used both as the
// completion counter and as the size of runner_handles_.
682 13 explicit when_any_homogeneous_state(std::size_t count)
683 13 : core_(count)
684
1/1
✓ Branch 2 taken 13 times.
26 , runner_handles_(count)
685 {
686 13 }
687
688 // Runners self-destruct in final_suspend. No destruction needed here.
689
690 /** @pre core_.try_win() returned true. */
// Takes the value by copy and moves it into result_.
691 11 void set_winner_result(T value)
692 noexcept(std::is_nothrow_move_constructible_v<T>)
693 {
694 11 result_.emplace(std::move(value));
695 11 }
696 };
697
698 /** Specialization for void tasks (no result storage needed).

It has neither set_winner_result() nor set_winner_void(), so
make_when_any_runner stores nothing for a void winner beyond what
try_win() records.
*/
699 template<>
700 struct when_any_homogeneous_state<void>
701 {
702 when_any_core core_;
// Sized to `count` at construction; slots are filled by the launcher.
703 std::vector<coro> runner_handles_;
704
// @param count Number of awaitables in the range.
705 2 explicit when_any_homogeneous_state(std::size_t count)
706 2 : core_(count)
707
1/1
✓ Branch 1 taken 2 times.
4 , runner_handles_(count)
708 {
709 2 }
710
711 // Runners self-destruct in final_suspend. No destruction needed here.
712
713 // No set_winner_result - void tasks have no result to store
714 };
715
716 /** Launches all runners concurrently; see await_suspend for lifetime concerns.

Holds non-owning pointers to the range of awaitables and to the shared
state.
*/
717 template<IoAwaitableRange Range>
718 class when_any_homogeneous_launcher
719 {
720 using Awaitable = std::ranges::range_value_t<Range>;
721 using T = awaitable_result_t<Awaitable>;
722
723 Range* range_;
724 when_any_homogeneous_state<T>* state_;
725
726 public:
727 15 when_any_homogeneous_launcher(
728 Range* range,
729 when_any_homogeneous_state<T>* state)
730 15 : range_(range)
731 15 , state_(state)
732 {
733 15 }
734
// Ready (no suspension) only when the range is empty.
735 15 bool await_ready() const noexcept
736 {
737 15 return std::ranges::empty(*range_);
738 }
739
740 /** CRITICAL: If the last task finishes synchronously, parent resumes and
741 destroys this object before await_suspend returns. Must not reference
742 `this` after dispatching begins.
743
744 Two-phase approach:
745 1. Create all runners (safe - no dispatch yet)
746 2. Dispatch all runners (any may complete synchronously)

@param continuation Handle of the awaiting (parent) coroutine.
@param caller_ex Executor used to dispatch the runners and, later, the parent.
@param parent_token Stop token of the parent; defaults to an empty token.
@return std::noop_coroutine(); the parent is resumed by the last runner instead.
747 */
748 template<Executor Ex>
749 15 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
750 {
751 15 state_->core_.continuation_ = continuation;
752 15 state_->core_.caller_ex_ = caller_ex;
753
754
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 10 times.
15 if(parent_token.stop_possible())
755 {
756 10 state_->core_.parent_stop_callback_.emplace(
757 parent_token,
758 5 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
759
760
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 3 times.
// NOTE(review): std::stop_callback's constructor already runs the callback
// when stop has been requested, so this explicit check looks redundant.
// Confirm before removing.
5 if(parent_token.stop_requested())
761 2 state_->core_.stop_source_.request_stop();
762 }
763
764 15 auto token = state_->core_.stop_source_.get_token();
765
766 // Phase 1: Create all runners without dispatching.
767 // This iterates over *range_ safely because no runners execute yet.
// runner_handles_ is indexed without a bounds check. This relies on the
// state having been sized with the element count of this same range.
768 15 std::size_t index = 0;
769
2/2
✓ Branch 4 taken 52 times.
✓ Branch 5 taken 15 times.
67 for(auto&& a : *range_)
770 {
771
1/1
✓ Branch 2 taken 52 times.
52 auto runner = make_when_any_runner(
772 52 std::move(a), state_, index);
773
774 52 auto h = runner.release();
775 52 h.promise().state_ = state_;
776 52 h.promise().index_ = index;
777 52 h.promise().ex_ = caller_ex;
778 52 h.promise().stop_token_ = token;
779
780 52 state_->runner_handles_[index] = coro{h};
781 52 ++index;
782 }
783
784 // Phase 2: Dispatch all runners. Any may complete synchronously.
785 // After last dispatch, state_ and this may be destroyed.
786 // Use raw pointer/count captured before dispatching.
787 15 coro* handles = state_->runner_handles_.data();
788 15 std::size_t count = state_->runner_handles_.size();
789
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 15 times.
67 for(std::size_t i = 0; i < count; ++i)
790
1/1
✓ Branch 1 taken 52 times.
52 caller_ex.dispatch(handles[i]);
791
792 30 return std::noop_coroutine();
793 15 }
794
// The result is read from the shared state by when_any, not returned here.
795 15 void await_resume() const noexcept
796 {
797 15 }
798 };
799
800 } // namespace detail
801
802 /** Wait for the first awaitable to complete (range overload).
803
804 Races a range of awaitables with the same result type. Accepts any
805 sized input range of IoAwaitable types, enabling use with arrays,
806 spans, or custom containers.
807
808 @par Suspends
809 The calling coroutine suspends when co_await is invoked. All awaitables
810 in the range are launched concurrently and execute in parallel. The
811 coroutine resumes only after all awaitables have completed, even though
812 the winner is determined by the first to finish.
813
814 @par Completion Conditions
815 @li Winner is determined when the first awaitable completes (success or exception)
816 @li Only one task can claim winner status via atomic compare-exchange
817 @li Once a winner exists, stop is requested for all remaining siblings
818 @li Parent coroutine resumes only after all siblings acknowledge completion
819 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
820
821 @par Cancellation Semantics
822 Cancellation is supported via stop_token propagated through the
823 IoAwaitable protocol:
824 @li Each child awaitable receives a stop_token derived from a shared stop_source
825 @li When the parent's stop token is activated, the stop is forwarded to all children
826 @li When a winner is determined, stop_source_.request_stop() is called immediately
827 @li Siblings must handle cancellation gracefully and complete before parent resumes
828 @li Stop requests are cooperative; tasks must check and respond to them
829
830 @par Concurrency/Overlap
831 All runners are created first and then dispatched one at a time, so an earlier awaitable may already have completed before a later one is dispatched.
832 The launcher iterates through the range, starting each task on the
833 caller's executor. Tasks may execute in parallel on multi-threaded
834 executors or interleave on single-threaded executors. There is no
835 guaranteed ordering of task completion.
836
837 @par Notable Error Conditions
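838 @li Empty range: std::invalid_argument is thrown inside the coroutine body before any awaitable is launched (NOTE(review): whether the caller sees it at the call or at co_await depends on task - confirm)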
839 @li Winner exception: if the winning task threw, that exception is rethrown
840 @li Non-winner exceptions: silently discarded (only winner's result matters)
841 @li Cancellation: tasks may complete via cancellation without throwing
842
843 @par Example
844 @code
845 task<void> example() {
846 std::array<task<Response>, 3> requests = {
847 fetch_from_server(0),
848 fetch_from_server(1),
849 fetch_from_server(2)
850 };
851
852 auto [index, response] = co_await when_any(std::move(requests));
853 }
854 @endcode
855
856 @par Example with Vector
857 @code
858 task<Response> fetch_fastest(std::vector<Server> const& servers) {
859 std::vector<task<Response>> requests;
860 for (auto const& server : servers)
861 requests.push_back(fetch_from(server));
862
863 auto [index, response] = co_await when_any(std::move(requests));
864 co_return response;
865 }
866 @endcode
867
868 @tparam R Range type satisfying IoAwaitableRange.
869 @param awaitables Range of awaitables to race concurrently (must not be empty).
870 @return A task yielding a pair of (winner_index, result).
871
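872 @throws std::invalid_argument if range is empty (raised in the coroutine body before any awaitable is launched; NOTE(review): confirm whether task surfaces it at the call or on co_await).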
873 @throws Rethrows the winner's exception if the winning task threw an exception.
874
875 @par Remarks
876 The range is first moved (rvalue argument) or copied (lvalue argument)
877 into a local owned by the coroutine, and the awaitables are then moved
878 out of that local; an lvalue argument itself is only copied from. Unlike
879 the variadic overload, no variant wrapper is needed since all tasks
880 share the same return type.
881
882 @see when_any, IoAwaitableRange
883 */
// Range overload for awaitables with a non-void result. The requires-clause
// excludes ranges whose awaitables return void.
884 template<IoAwaitableRange R>
885 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
886
1/1
✓ Branch 1 taken 14 times.
14 [[nodiscard]] auto when_any(R&& awaitables)
887 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
888 {
889 using Awaitable = std::ranges::range_value_t<R>;
890 using T = detail::awaitable_result_t<Awaitable>;
891 using result_type = std::pair<std::size_t, T>;
892 using OwnedRange = std::remove_cvref_t<R>;
893
// NOTE(review): this function is a coroutine, so the throw below runs as
// part of the coroutine body. When the caller observes it depends on how
// task starts and reports exceptions - confirm against task.
894 auto count = std::ranges::size(awaitables);
895 if(count == 0)
896 throw std::invalid_argument("when_any requires at least one awaitable");
897
898 // Move/copy range onto coroutine frame to ensure lifetime
// An rvalue argument is moved and an lvalue argument is copied. The
// launcher then moves the elements out of this local copy.
899 OwnedRange owned_awaitables = std::forward<R>(awaitables);
900
901 detail::when_any_homogeneous_state<T> state(count);
902
// Resumes only after the last runner has decremented the completion counter.
903 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
904
905 if(state.core_.winner_exception_)
906 std::rethrow_exception(state.core_.winner_exception_);
907
908 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
909 28 }
910
911 /** Wait for the first awaitable to complete (void range overload).
912
913 Races a range of void-returning awaitables. Since void awaitables have
914 no result value, only the winner's index is returned.
915
916 @par Suspends
917 The calling coroutine suspends when co_await is invoked. All awaitables
918 in the range are launched concurrently and execute in parallel. The
919 coroutine resumes only after all awaitables have completed, even though
920 the winner is determined by the first to finish.
921
922 @par Completion Conditions
923 @li Winner is determined when the first awaitable completes (success or exception)
924 @li Only one task can claim winner status via atomic compare-exchange
925 @li Once a winner exists, stop is requested for all remaining siblings
926 @li Parent coroutine resumes only after all siblings acknowledge completion
927 @li The winner's index is returned; if the winner threw, the exception is rethrown
928
929 @par Cancellation Semantics
930 Cancellation is supported via stop_token propagated through the
931 IoAwaitable protocol:
932 @li Each child awaitable receives a stop_token derived from a shared stop_source
933 @li When the parent's stop token is activated, the stop is forwarded to all children
934 @li When a winner is determined, stop_source_.request_stop() is called immediately
935 @li Siblings must handle cancellation gracefully and complete before parent resumes
936 @li Stop requests are cooperative; tasks must check and respond to them
937
938 @par Concurrency/Overlap
939 All awaitables are launched concurrently before any can complete.
940 The launcher iterates through the range, starting each task on the
941 caller's executor. Tasks may execute in parallel on multi-threaded
942 executors or interleave on single-threaded executors. There is no
943 guaranteed ordering of task completion.
944
945 @par Notable Error Conditions
946 @li Empty range: throws std::invalid_argument immediately (not via co_return)
947 @li Winner exception: if the winning task threw, that exception is rethrown
948 @li Non-winner exceptions: silently discarded (only winner's result matters)
949 @li Cancellation: tasks may complete via cancellation without throwing
950
951 @par Example
952 @code
953 task<void> example() {
954 std::vector<task<void>> tasks;
955 for (int i = 0; i < 5; ++i)
956 tasks.push_back(background_work(i));
957
958 std::size_t winner = co_await when_any(std::move(tasks));
959 // winner is the index of the first task to complete
960 }
961 @endcode
962
963 @par Example with Timeout
964 @code
965 task<void> with_timeout() {
966 std::vector<task<void>> tasks;
967 tasks.push_back(long_running_operation());
968 tasks.push_back(delay(std::chrono::seconds(5)));
969
970 std::size_t winner = co_await when_any(std::move(tasks));
971 if (winner == 1) {
972 // Timeout occurred
973 }
974 }
975 @endcode
976
977 @tparam R Range type satisfying IoAwaitableRange with void result.
978 @param awaitables Range of void awaitables to race concurrently (must not be empty).
979 @return A task yielding the winner's index (zero-based).
980
981 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
982 @throws Rethrows the winner's exception if the winning task threw an exception.
983
984 @par Remarks
985 The range is transferred into a local of the coroutine to ensure
986 lifetime safety: an rvalue range is moved, while an lvalue range is
987 copied and the caller's container is left unmodified. Unlike
988 the non-void overload, no result storage is needed since void tasks
989 produce no value.
990
991 @see when_any, IoAwaitableRange
992 */
993 template<IoAwaitableRange R>
994 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
995
1/1
✓ Branch 1 taken 2 times.
2 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
996 {
997 using OwnedRange = std::remove_cvref_t<R>;
998
999 auto count = std::ranges::size(awaitables);
1000 if(count == 0)
1001 throw std::invalid_argument("when_any requires at least one awaitable");
1002
1003 // Move/copy range onto coroutine frame to ensure lifetime
1004 OwnedRange owned_awaitables = std::forward<R>(awaitables);
1005
1006 detail::when_any_homogeneous_state<void> state(count);
1007
1008 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1009
1010 if(state.core_.winner_exception_)
1011 std::rethrow_exception(state.core_.winner_exception_);
1012
1013 co_return state.core_.winner_index_;
1014 4 }
1015
1016 } // namespace capy
1017 } // namespace boost
1018
1019 #endif
1020