Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/task.hpp>
20 :
21 : #include <array>
22 : #include <atomic>
23 : #include <exception>
24 : #include <optional>
25 : #include <ranges>
26 : #include <stdexcept>
27 : #include <stop_token>
28 : #include <tuple>
29 : #include <type_traits>
30 : #include <utility>
31 : #include <variant>
32 : #include <vector>
33 :
34 : /*
35 : when_any - Race multiple tasks, return first completion
36 : ========================================================
37 :
38 : OVERVIEW:
39 : ---------
40 : when_any launches N tasks concurrently and completes when the FIRST task
41 : finishes (success or failure). It then requests stop for all siblings and
42 : waits for them to acknowledge before returning.
43 :
44 : ARCHITECTURE:
45 : -------------
46 : The design mirrors when_all but with inverted completion semantics:
47 :
48 : when_all: complete when remaining_count reaches 0 (all done)
49 : when_any: complete when has_winner becomes true (first done)
50 : BUT still wait for remaining_count to reach 0 for cleanup
51 :
52 : Key components:
53 : - when_any_state: Shared state tracking winner and completion
54 : - when_any_runner: Wrapper coroutine for each child task
55 : - when_any_launcher: Awaitable that starts all runners concurrently
56 :
57 : CRITICAL INVARIANTS:
58 : --------------------
59 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 : 2. All tasks must complete before parent resumes (cleanup safety)
61 : 3. Stop is requested immediately when winner is determined
62 : 4. Only the winner's result/exception is stored
63 :
64 : TYPE DEDUPLICATION:
65 : -------------------
66 : std::variant requires unique alternative types. Since when_any can race
67 : tasks with identical return types (e.g., three task<int>), we must
68 : deduplicate types before constructing the variant.
69 :
70 : Example: when_any(task<int>, task<string>, task<int>)
71 : - Raw types after void->monostate: int, string, int
72 : - Deduplicated variant: std::variant<int, string>
73 : - Return: pair<size_t, variant<int, string>>
74 :
75 : The winner_index tells you which task won (0, 1, or 2), while the variant
76 : holds the result. Use the index to determine how to interpret the variant.
77 :
78 : VOID HANDLING:
79 : --------------
80 : void tasks contribute std::monostate to the variant (then deduplicated).
81 : All-void tasks result in: pair<size_t, variant<monostate>>
82 :
83 : MEMORY MODEL:
84 : -------------
85 : Synchronization chain from winner's write to parent's read:
86 :
87 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 : 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
89 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
90 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
91 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
92 : 5. Parent coroutine resumes and reads result_/winner_exception_
93 :
94 : Synchronization analysis:
95 : - All fetch_sub operations on remaining_count_ form a release sequence
96 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 : in the modification order of remaining_count_
98 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 : modification order, establishing happens-before from winner's writes
100 : - Executor dispatch() is expected to provide queue-based synchronization
101 : (release-on-post, acquire-on-execute) completing the chain to parent
102 : - Even inline executors work (same thread = sequenced-before)
103 :
104 : Alternative considered: Adding winner_ready_ atomic (set with release after
105 : storing winner data, acquired before reading) would make synchronization
106 : self-contained and not rely on executor implementation details. Current
107 : approach is correct but requires careful reasoning about release sequences
108 : and executor behavior.
109 :
110 : EXCEPTION SEMANTICS:
111 : --------------------
112 : Unlike when_all (which captures first exception, discards others), when_any
113 : treats exceptions as valid completions. If the winning task threw, that
114 : exception is rethrown. Exceptions from non-winners are silently discarded.
115 : */
116 :
117 : namespace boost {
118 : namespace capy {
119 :
120 : namespace detail {
121 :
122 : /** Convert void to monostate for variant storage.
123 :
124 : std::variant<void, ...> is ill-formed, so void tasks contribute
125 : std::monostate to the result variant instead. Non-void types
126 : pass through unchanged.
127 :
128 : @tparam T The type to potentially convert (void becomes monostate).
129 : */
130 : template<typename T>
131 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
132 :
133 : // Type deduplication: std::variant requires unique alternative types.
134 : // Fold left over the type list, appending each type only if not already present.
135 : template<typename Variant, typename T>
136 : struct variant_append_if_unique;
137 :
138 : template<typename... Vs, typename T>
139 : struct variant_append_if_unique<std::variant<Vs...>, T>
140 : {
141 : using type = std::conditional_t<
142 : (std::is_same_v<T, Vs> || ...),
143 : std::variant<Vs...>,
144 : std::variant<Vs..., T>>;
145 : };
146 :
147 : template<typename Accumulated, typename... Remaining>
148 : struct deduplicate_impl;
149 :
150 : template<typename Accumulated>
151 : struct deduplicate_impl<Accumulated>
152 : {
153 : using type = Accumulated;
154 : };
155 :
156 : template<typename Accumulated, typename T, typename... Rest>
157 : struct deduplicate_impl<Accumulated, T, Rest...>
158 : {
159 : using next = typename variant_append_if_unique<Accumulated, T>::type;
160 : using type = typename deduplicate_impl<next, Rest...>::type;
161 : };
162 :
163 : // Deduplicated variant; void types become monostate before deduplication
164 : template<typename T0, typename... Ts>
165 : using unique_variant_t = typename deduplicate_impl<
166 : std::variant<void_to_monostate_t<T0>>,
167 : void_to_monostate_t<Ts>...>::type;
168 :
169 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 : // when multiple tasks share the same return type.
171 : template<typename T0, typename... Ts>
172 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173 :
174 : // Extract result type from any awaitable via await_resume()
175 : template<typename A>
176 : using awaitable_result_t = decltype(std::declval<std::decay_t<A>&>().await_resume());
177 :
178 : /** Core shared state for when_any operations.
179 :
180 : Contains all members and methods common to both heterogeneous (variadic)
181 : and homogeneous (range) when_any implementations. State classes embed
182 : this via composition to avoid CRTP destructor ordering issues.
183 :
184 : @par Thread Safety
185 : Atomic operations protect winner selection and completion count.
186 : */
187 : struct when_any_core
188 : {
189 : std::atomic<std::size_t> remaining_count_;
190 : std::size_t winner_index_{0};
191 : std::exception_ptr winner_exception_;
192 : std::stop_source stop_source_;
193 :
194 : // Bridges parent's stop token to our stop_source
195 : struct stop_callback_fn
196 : {
197 : std::stop_source* source_;
198 6 : void operator()() const noexcept { source_->request_stop(); }
199 : };
200 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
201 : std::optional<stop_callback_t> parent_stop_callback_;
202 :
203 : coro continuation_;
204 : executor_ref caller_ex_;
205 :
206 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
207 : std::atomic<bool> has_winner_{false};
208 :
209 52 : explicit when_any_core(std::size_t count) noexcept
210 52 : : remaining_count_(count)
211 : {
212 52 : }
213 :
214 : /** Atomically claim winner status; exactly one task succeeds. */
215 145 : bool try_win(std::size_t index) noexcept
216 : {
217 145 : bool expected = false;
218 145 : if(has_winner_.compare_exchange_strong(
219 : expected, true, std::memory_order_acq_rel))
220 : {
221 52 : winner_index_ = index;
222 52 : stop_source_.request_stop();
223 52 : return true;
224 : }
225 93 : return false;
226 : }
227 :
228 : /** @pre try_win() returned true. */
229 8 : void set_winner_exception(std::exception_ptr ep) noexcept
230 : {
231 8 : winner_exception_ = ep;
232 8 : }
233 :
234 : // Runners signal completion directly via final_suspend; no member function needed.
235 : };
236 :
237 : /** Shared state for heterogeneous when_any operation.
238 :
239 : Coordinates winner selection, result storage, and completion tracking
240 : for all child tasks in a when_any operation. Uses composition with
241 : when_any_core for shared functionality.
242 :
243 : @par Lifetime
244 : Allocated on the parent coroutine's frame, outlives all runners.
245 :
246 : @tparam T0 First task's result type.
247 : @tparam Ts Remaining tasks' result types.
248 : */
249 : template<typename T0, typename... Ts>
250 : struct when_any_state
251 : {
252 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
253 : using variant_type = unique_variant_t<T0, Ts...>;
254 :
255 : when_any_core core_;
256 : std::optional<variant_type> result_;
257 : std::array<coro, task_count> runner_handles_{};
258 :
259 37 : when_any_state()
260 37 : : core_(task_count)
261 : {
262 37 : }
263 :
264 : // Runners self-destruct in final_suspend. No destruction needed here.
265 :
266 : /** @pre core_.try_win() returned true.
267 : @note Uses in_place_type (not index) because variant is deduplicated.
268 : */
269 : template<typename T>
270 30 : void set_winner_result(T value)
271 : noexcept(std::is_nothrow_move_constructible_v<T>)
272 : {
273 30 : result_.emplace(std::in_place_type<T>, std::move(value));
274 30 : }
275 :
276 : /** @pre core_.try_win() returned true. */
277 2 : void set_winner_void() noexcept
278 : {
279 2 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
280 2 : }
281 : };
282 :
283 : /** Wrapper coroutine that runs a single child task for when_any.
284 :
285 : Propagates executor/stop_token to the child, attempts to claim winner
286 : status on completion, and signals completion for cleanup coordination.
287 :
288 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
289 : */
290 : template<typename StateType>
291 : struct when_any_runner
292 : {
293 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
294 : {
295 : StateType* state_ = nullptr;
296 : std::size_t index_ = 0;
297 : executor_ref ex_;
298 : std::stop_token stop_token_;
299 :
300 145 : when_any_runner get_return_object() noexcept
301 : {
302 145 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
303 : }
304 :
305 : // Starts suspended; launcher sets up state/ex/token then resumes
306 145 : std::suspend_always initial_suspend() noexcept
307 : {
308 145 : return {};
309 : }
310 :
311 145 : auto final_suspend() noexcept
312 : {
313 : struct awaiter
314 : {
315 : promise_type* p_;
316 145 : bool await_ready() const noexcept { return false; }
317 145 : void await_suspend(coro h) noexcept
318 : {
319 : // Extract everything needed for signaling before
320 : // self-destruction. Inline dispatch may destroy
321 : // state, so we can't access members after.
322 145 : auto& core = p_->state_->core_;
323 145 : auto* counter = &core.remaining_count_;
324 145 : auto caller_ex = core.caller_ex_;
325 145 : auto cont = core.continuation_;
326 :
327 : // Self-destruct first - state no longer destroys runners
328 145 : h.destroy();
329 :
330 : // Signal completion. If last, dispatch parent.
331 : // Uses only local copies - safe even if state
332 : // is destroyed during inline dispatch.
333 145 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
334 145 : if(remaining == 1)
335 52 : caller_ex.dispatch(cont);
336 145 : }
337 0 : void await_resume() const noexcept {}
338 : };
339 145 : return awaiter{this};
340 : }
341 :
342 133 : void return_void() noexcept {}
343 :
344 : // Exceptions are valid completions in when_any (unlike when_all)
345 12 : void unhandled_exception()
346 : {
347 12 : if(state_->core_.try_win(index_))
348 8 : state_->core_.set_winner_exception(std::current_exception());
349 12 : }
350 :
351 : /** Injects executor and stop token into child awaitables. */
352 : template<class Awaitable>
353 : struct transform_awaiter
354 : {
355 : std::decay_t<Awaitable> a_;
356 : promise_type* p_;
357 :
358 145 : bool await_ready() { return a_.await_ready(); }
359 145 : auto await_resume() { return a_.await_resume(); }
360 :
361 : template<class Promise>
362 145 : auto await_suspend(std::coroutine_handle<Promise> h)
363 : {
364 145 : return a_.await_suspend(h, p_->ex_, p_->stop_token_);
365 : }
366 : };
367 :
368 : template<class Awaitable>
369 145 : auto await_transform(Awaitable&& a)
370 : {
371 : using A = std::decay_t<Awaitable>;
372 : if constexpr (IoAwaitable<A>)
373 : {
374 : return transform_awaiter<Awaitable>{
375 290 : std::forward<Awaitable>(a), this};
376 : }
377 : else
378 : {
379 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
380 : }
381 145 : }
382 : };
383 :
384 : std::coroutine_handle<promise_type> h_;
385 :
386 145 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
387 145 : : h_(h)
388 : {
389 145 : }
390 :
391 : // Enable move for all clang versions - some versions need it
392 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
393 :
394 : // Non-copyable
395 : when_any_runner(when_any_runner const&) = delete;
396 : when_any_runner& operator=(when_any_runner const&) = delete;
397 : when_any_runner& operator=(when_any_runner&&) = delete;
398 :
399 145 : auto release() noexcept
400 : {
401 145 : return std::exchange(h_, nullptr);
402 : }
403 : };
404 :
405 : /** Wraps a child awaitable, attempts to claim winner on completion.
406 :
407 : Uses requires-expressions to detect state capabilities:
408 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
409 : - set_winner_result(): for non-void tasks
410 : - Neither: for homogeneous void tasks (no result storage)
411 : */
412 : template<IoAwaitable Awaitable, typename StateType>
413 : when_any_runner<StateType>
414 145 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
415 : {
416 : using T = awaitable_result_t<Awaitable>;
417 : if constexpr (std::is_void_v<T>)
418 : {
419 : co_await std::move(inner);
420 : if(state->core_.try_win(index))
421 : {
422 : // Heterogeneous void tasks store monostate in the variant
423 : if constexpr (requires { state->set_winner_void(); })
424 : state->set_winner_void();
425 : // Homogeneous void tasks have no result to store
426 : }
427 : }
428 : else
429 : {
430 : auto result = co_await std::move(inner);
431 : if(state->core_.try_win(index))
432 : {
433 : // Defensive: move should not throw (already moved once), but we
434 : // catch just in case since an uncaught exception would be devastating.
435 : try
436 : {
437 : state->set_winner_result(std::move(result));
438 : }
439 : catch(...)
440 : {
441 : state->core_.set_winner_exception(std::current_exception());
442 : }
443 : }
444 : }
445 290 : }
446 :
447 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
448 : template<IoAwaitable... Awaitables>
449 : class when_any_launcher
450 : {
451 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
452 :
453 : std::tuple<Awaitables...>* tasks_;
454 : state_type* state_;
455 :
456 : public:
457 37 : when_any_launcher(
458 : std::tuple<Awaitables...>* tasks,
459 : state_type* state)
460 37 : : tasks_(tasks)
461 37 : , state_(state)
462 : {
463 37 : }
464 :
465 37 : bool await_ready() const noexcept
466 : {
467 37 : return sizeof...(Awaitables) == 0;
468 : }
469 :
470 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
471 : destroys this object before await_suspend returns. Must not reference
472 : `this` after the final launch_one call.
473 : */
474 : template<Executor Ex>
475 37 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
476 : {
477 37 : state_->core_.continuation_ = continuation;
478 37 : state_->core_.caller_ex_ = caller_ex;
479 :
480 37 : if(parent_token.stop_possible())
481 : {
482 16 : state_->core_.parent_stop_callback_.emplace(
483 : parent_token,
484 8 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
485 :
486 8 : if(parent_token.stop_requested())
487 2 : state_->core_.stop_source_.request_stop();
488 : }
489 :
490 37 : auto token = state_->core_.stop_source_.get_token();
491 74 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
492 37 : (..., launch_one<Is>(caller_ex, token));
493 37 : }(std::index_sequence_for<Awaitables...>{});
494 :
495 74 : return std::noop_coroutine();
496 37 : }
497 :
498 37 : void await_resume() const noexcept
499 : {
500 37 : }
501 :
502 : private:
503 : /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
504 : template<std::size_t I, Executor Ex>
505 93 : void launch_one(Ex const& caller_ex, std::stop_token token)
506 : {
507 93 : auto runner = make_when_any_runner(
508 93 : std::move(std::get<I>(*tasks_)), state_, I);
509 :
510 93 : auto h = runner.release();
511 93 : h.promise().state_ = state_;
512 93 : h.promise().index_ = I;
513 93 : h.promise().ex_ = caller_ex;
514 93 : h.promise().stop_token_ = token;
515 :
516 93 : coro ch{h};
517 93 : state_->runner_handles_[I] = ch;
518 93 : caller_ex.dispatch(ch);
519 93 : }
520 : };
521 :
522 : } // namespace detail
523 :
524 : /** Wait for the first awaitable to complete.
525 :
526 : Races multiple heterogeneous awaitables concurrently and returns when the
527 : first one completes. The result includes the winner's index and a
528 : deduplicated variant containing the result value.
529 :
530 : @par Suspends
531 : The calling coroutine suspends when co_await is invoked. All awaitables
532 : are launched concurrently and execute in parallel. The coroutine resumes
533 : only after all awaitables have completed, even though the winner is
534 : determined by the first to finish.
535 :
536 : @par Completion Conditions
537 : @li Winner is determined when the first awaitable completes (success or exception)
538 : @li Only one task can claim winner status via atomic compare-exchange
539 : @li Once a winner exists, stop is requested for all remaining siblings
540 : @li Parent coroutine resumes only after all siblings acknowledge completion
541 : @li The winner's result is returned; if the winner threw, the exception is rethrown
542 :
543 : @par Cancellation Semantics
544 : Cancellation is supported via stop_token propagated through the
545 : IoAwaitable protocol:
546 : @li Each child awaitable receives a stop_token derived from a shared stop_source
547 : @li When the parent's stop token is activated, the stop is forwarded to all children
548 : @li When a winner is determined, stop_source_.request_stop() is called immediately
549 : @li Siblings must handle cancellation gracefully and complete before parent resumes
550 : @li Stop requests are cooperative; tasks must check and respond to them
551 :
552 : @par Concurrency/Overlap
553 : All awaitables are launched concurrently before any can complete.
554 : The launcher iterates through the arguments, starting each task on the
555 : caller's executor. Tasks may execute in parallel on multi-threaded
556 : executors or interleave on single-threaded executors. There is no
557 : guaranteed ordering of task completion.
558 :
559 : @par Notable Error Conditions
560 : @li Winner exception: if the winning task threw, that exception is rethrown
561 : @li Non-winner exceptions: silently discarded (only winner's result matters)
562 : @li Cancellation: tasks may complete via cancellation without throwing
563 :
564 : @par Example
565 : @code
566 : task<void> example() {
567 : auto [index, result] = co_await when_any(
568 : fetch_from_primary(), // task<Response>
569 : fetch_from_backup() // task<Response>
570 : );
571 : // index is 0 or 1, result holds the winner's Response
572 : auto response = std::get<Response>(result);
573 : }
574 : @endcode
575 :
576 : @par Example with Heterogeneous Types
577 : @code
578 : task<void> mixed_types() {
579 : auto [index, result] = co_await when_any(
580 : fetch_int(), // task<int>
581 : fetch_string() // task<std::string>
582 : );
583 : if (index == 0)
584 : std::cout << "Got int: " << std::get<int>(result) << "\n";
585 : else
586 : std::cout << "Got string: " << std::get<std::string>(result) << "\n";
587 : }
588 : @endcode
589 :
590 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
591 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
592 : @param a0 The first awaitable to race.
593 : @param as Additional awaitables to race concurrently.
594 : @return A task yielding a pair of (winner_index, result_variant).
595 :
596 : @throws Rethrows the winner's exception if the winning task threw an exception.
597 :
598 : @par Remarks
599 : Awaitables are moved into the coroutine frame; original objects become
600 : empty after the call. When multiple awaitables share the same return type,
601 : the variant is deduplicated to contain only unique types. Use the winner
602 : index to determine which awaitable completed first. Void awaitables
603 : contribute std::monostate to the variant.
604 :
605 : @see when_all, IoAwaitable
606 : */
607 : template<IoAwaitable A0, IoAwaitable... As>
608 37 : [[nodiscard]] auto when_any(A0 a0, As... as)
609 : -> task<detail::when_any_result_t<
610 : detail::awaitable_result_t<A0>,
611 : detail::awaitable_result_t<As>...>>
612 : {
613 : using result_type = detail::when_any_result_t<
614 : detail::awaitable_result_t<A0>,
615 : detail::awaitable_result_t<As>...>;
616 :
617 : detail::when_any_state<
618 : detail::awaitable_result_t<A0>,
619 : detail::awaitable_result_t<As>...> state;
620 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
621 :
622 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
623 :
624 : if(state.core_.winner_exception_)
625 : std::rethrow_exception(state.core_.winner_exception_);
626 :
627 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
628 74 : }
629 :
630 : /** Concept for ranges of full I/O awaitables.
631 :
632 : A range satisfies `IoAwaitableRange` if it is a sized input range
633 : whose value type satisfies @ref IoAwaitable. This enables when_any
634 : to accept any container or view of awaitables, not just std::vector.
635 :
636 : @tparam R The range type.
637 :
638 : @par Requirements
639 : @li `R` must satisfy `std::ranges::input_range`
640 : @li `R` must satisfy `std::ranges::sized_range`
641 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
642 :
643 : @par Syntactic Requirements
644 : Given `r` of type `R`:
645 : @li `std::ranges::begin(r)` is valid
646 : @li `std::ranges::end(r)` is valid
647 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
648 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
649 :
650 : @par Example
651 : @code
652 : template<IoAwaitableRange R>
653 : task<void> race_all(R&& awaitables) {
654 : auto winner = co_await when_any(std::forward<R>(awaitables));
655 : // Process winner...
656 : }
657 : @endcode
658 :
659 : @see when_any, IoAwaitable
660 : */
661 : template<typename R>
662 : concept IoAwaitableRange =
663 : std::ranges::input_range<R> &&
664 : std::ranges::sized_range<R> &&
665 : IoAwaitable<std::ranges::range_value_t<R>>;
666 :
667 : namespace detail {
668 :
669 : /** Shared state for homogeneous when_any (range overload).
670 :
671 : Uses composition with when_any_core for shared functionality.
672 : Simpler than heterogeneous: optional<T> instead of variant, vector
673 : instead of array for runner handles.
674 : */
675 : template<typename T>
676 : struct when_any_homogeneous_state
677 : {
678 : when_any_core core_;
679 : std::optional<T> result_;
680 : std::vector<coro> runner_handles_;
681 :
682 13 : explicit when_any_homogeneous_state(std::size_t count)
683 13 : : core_(count)
684 26 : , runner_handles_(count)
685 : {
686 13 : }
687 :
688 : // Runners self-destruct in final_suspend. No destruction needed here.
689 :
690 : /** @pre core_.try_win() returned true. */
691 11 : void set_winner_result(T value)
692 : noexcept(std::is_nothrow_move_constructible_v<T>)
693 : {
694 11 : result_.emplace(std::move(value));
695 11 : }
696 : };
697 :
698 : /** Specialization for void tasks (no result storage needed). */
699 : template<>
700 : struct when_any_homogeneous_state<void>
701 : {
702 : when_any_core core_;
703 : std::vector<coro> runner_handles_;
704 :
705 2 : explicit when_any_homogeneous_state(std::size_t count)
706 2 : : core_(count)
707 4 : , runner_handles_(count)
708 : {
709 2 : }
710 :
711 : // Runners self-destruct in final_suspend. No destruction needed here.
712 :
713 : // No set_winner_result - void tasks have no result to store
714 : };
715 :
716 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
717 : template<IoAwaitableRange Range>
718 : class when_any_homogeneous_launcher
719 : {
720 : using Awaitable = std::ranges::range_value_t<Range>;
721 : using T = awaitable_result_t<Awaitable>;
722 :
723 : Range* range_;
724 : when_any_homogeneous_state<T>* state_;
725 :
726 : public:
727 15 : when_any_homogeneous_launcher(
728 : Range* range,
729 : when_any_homogeneous_state<T>* state)
730 15 : : range_(range)
731 15 : , state_(state)
732 : {
733 15 : }
734 :
735 15 : bool await_ready() const noexcept
736 : {
737 15 : return std::ranges::empty(*range_);
738 : }
739 :
740 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
741 : destroys this object before await_suspend returns. Must not reference
742 : `this` after dispatching begins.
743 :
744 : Two-phase approach:
745 : 1. Create all runners (safe - no dispatch yet)
746 : 2. Dispatch all runners (any may complete synchronously)
747 : */
748 : template<Executor Ex>
749 15 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
750 : {
751 15 : state_->core_.continuation_ = continuation;
752 15 : state_->core_.caller_ex_ = caller_ex;
753 :
754 15 : if(parent_token.stop_possible())
755 : {
756 10 : state_->core_.parent_stop_callback_.emplace(
757 : parent_token,
758 5 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
759 :
760 5 : if(parent_token.stop_requested())
761 2 : state_->core_.stop_source_.request_stop();
762 : }
763 :
764 15 : auto token = state_->core_.stop_source_.get_token();
765 :
766 : // Phase 1: Create all runners without dispatching.
767 : // This iterates over *range_ safely because no runners execute yet.
768 15 : std::size_t index = 0;
769 67 : for(auto&& a : *range_)
770 : {
771 52 : auto runner = make_when_any_runner(
772 52 : std::move(a), state_, index);
773 :
774 52 : auto h = runner.release();
775 52 : h.promise().state_ = state_;
776 52 : h.promise().index_ = index;
777 52 : h.promise().ex_ = caller_ex;
778 52 : h.promise().stop_token_ = token;
779 :
780 52 : state_->runner_handles_[index] = coro{h};
781 52 : ++index;
782 : }
783 :
784 : // Phase 2: Dispatch all runners. Any may complete synchronously.
785 : // After last dispatch, state_ and this may be destroyed.
786 : // Use raw pointer/count captured before dispatching.
787 15 : coro* handles = state_->runner_handles_.data();
788 15 : std::size_t count = state_->runner_handles_.size();
789 67 : for(std::size_t i = 0; i < count; ++i)
790 52 : caller_ex.dispatch(handles[i]);
791 :
792 30 : return std::noop_coroutine();
793 15 : }
794 :
795 15 : void await_resume() const noexcept
796 : {
797 15 : }
798 : };
799 :
800 : } // namespace detail
801 :
802 : /** Wait for the first awaitable to complete (range overload).
803 :
804 : Races a range of awaitables with the same result type. Accepts any
805 : sized input range of IoAwaitable types, enabling use with arrays,
806 : spans, or custom containers.
807 :
808 : @par Suspends
809 : The calling coroutine suspends when co_await is invoked. All awaitables
810 : in the range are launched concurrently and execute in parallel. The
811 : coroutine resumes only after all awaitables have completed, even though
812 : the winner is determined by the first to finish.
813 :
814 : @par Completion Conditions
815 : @li Winner is determined when the first awaitable completes (success or exception)
816 : @li Only one task can claim winner status via atomic compare-exchange
817 : @li Once a winner exists, stop is requested for all remaining siblings
818 : @li Parent coroutine resumes only after all siblings acknowledge completion
819 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
820 :
821 : @par Cancellation Semantics
822 : Cancellation is supported via stop_token propagated through the
823 : IoAwaitable protocol:
824 : @li Each child awaitable receives a stop_token derived from a shared stop_source
825 : @li When the parent's stop token is activated, the stop is forwarded to all children
826 : @li When a winner is determined, stop_source_.request_stop() is called immediately
827 : @li Siblings must handle cancellation gracefully and complete before parent resumes
828 : @li Stop requests are cooperative; tasks must check and respond to them
829 :
830 : @par Concurrency/Overlap
831 : All awaitables are launched concurrently before any can complete.
832 : The launcher iterates through the range, starting each task on the
833 : caller's executor. Tasks may execute in parallel on multi-threaded
834 : executors or interleave on single-threaded executors. There is no
835 : guaranteed ordering of task completion.
836 :
837 : @par Notable Error Conditions
838 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
839 : @li Winner exception: if the winning task threw, that exception is rethrown
840 : @li Non-winner exceptions: silently discarded (only winner's result matters)
841 : @li Cancellation: tasks may complete via cancellation without throwing
842 :
843 : @par Example
844 : @code
845 : task<void> example() {
846 : std::array<task<Response>, 3> requests = {
847 : fetch_from_server(0),
848 : fetch_from_server(1),
849 : fetch_from_server(2)
850 : };
851 :
852 : auto [index, response] = co_await when_any(std::move(requests));
853 : }
854 : @endcode
855 :
856 : @par Example with Vector
857 : @code
858 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
859 : std::vector<task<Response>> requests;
860 : for (auto const& server : servers)
861 : requests.push_back(fetch_from(server));
862 :
863 : auto [index, response] = co_await when_any(std::move(requests));
864 : co_return response;
865 : }
866 : @endcode
867 :
868 : @tparam R Range type satisfying IoAwaitableRange.
869 : @param awaitables Range of awaitables to race concurrently (must not be empty).
870 : @return A task yielding a pair of (winner_index, result).
871 :
872 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
873 : @throws Rethrows the winner's exception if the winning task threw an exception.
874 :
875 : @par Remarks
876 : Elements are moved from the range; for lvalue ranges, the original
877 : container will have moved-from elements after this call. The range
878 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
879 : the variadic overload, no variant wrapper is needed since all tasks
880 : share the same return type.
881 :
882 : @see when_any, IoAwaitableRange
883 : */
884 : template<IoAwaitableRange R>
885 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
886 14 : [[nodiscard]] auto when_any(R&& awaitables)
887 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
888 : {
889 : using Awaitable = std::ranges::range_value_t<R>;
890 : using T = detail::awaitable_result_t<Awaitable>;
891 : using result_type = std::pair<std::size_t, T>;
892 : using OwnedRange = std::remove_cvref_t<R>;
893 :
894 : auto count = std::ranges::size(awaitables);
895 : if(count == 0)
896 : throw std::invalid_argument("when_any requires at least one awaitable");
897 :
898 : // Move/copy range onto coroutine frame to ensure lifetime
899 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
900 :
901 : detail::when_any_homogeneous_state<T> state(count);
902 :
903 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
904 :
905 : if(state.core_.winner_exception_)
906 : std::rethrow_exception(state.core_.winner_exception_);
907 :
908 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
909 28 : }
910 :
911 : /** Wait for the first awaitable to complete (void range overload).
912 :
913 : Races a range of void-returning awaitables. Since void awaitables have
914 : no result value, only the winner's index is returned.
915 :
916 : @par Suspends
917 : The calling coroutine suspends when co_await is invoked. All awaitables
918 : in the range are launched concurrently and execute in parallel. The
919 : coroutine resumes only after all awaitables have completed, even though
920 : the winner is determined by the first to finish.
921 :
922 : @par Completion Conditions
923 : @li Winner is determined when the first awaitable completes (success or exception)
924 : @li Only one task can claim winner status via atomic compare-exchange
925 : @li Once a winner exists, stop is requested for all remaining siblings
926 : @li Parent coroutine resumes only after all siblings acknowledge completion
927 : @li The winner's index is returned; if the winner threw, the exception is rethrown
928 :
929 : @par Cancellation Semantics
930 : Cancellation is supported via stop_token propagated through the
931 : IoAwaitable protocol:
932 : @li Each child awaitable receives a stop_token derived from a shared stop_source
933 : @li When the parent's stop token is activated, the stop is forwarded to all children
934 : @li When a winner is determined, stop_source_.request_stop() is called immediately
935 : @li Siblings must handle cancellation gracefully and complete before parent resumes
936 : @li Stop requests are cooperative; tasks must check and respond to them
937 :
938 : @par Concurrency/Overlap
939 : All awaitables are launched concurrently before any can complete.
940 : The launcher iterates through the range, starting each task on the
941 : caller's executor. Tasks may execute in parallel on multi-threaded
942 : executors or interleave on single-threaded executors. There is no
943 : guaranteed ordering of task completion.
944 :
945 : @par Notable Error Conditions
946 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
947 : @li Winner exception: if the winning task threw, that exception is rethrown
948 : @li Non-winner exceptions: silently discarded (only winner's result matters)
949 : @li Cancellation: tasks may complete via cancellation without throwing
950 :
951 : @par Example
952 : @code
953 : task<void> example() {
954 : std::vector<task<void>> tasks;
955 : for (int i = 0; i < 5; ++i)
956 : tasks.push_back(background_work(i));
957 :
958 : std::size_t winner = co_await when_any(std::move(tasks));
959 : // winner is the index of the first task to complete
960 : }
961 : @endcode
962 :
963 : @par Example with Timeout
964 : @code
965 : task<void> with_timeout() {
966 : std::vector<task<void>> tasks;
967 : tasks.push_back(long_running_operation());
968 : tasks.push_back(delay(std::chrono::seconds(5)));
969 :
970 : std::size_t winner = co_await when_any(std::move(tasks));
971 : if (winner == 1) {
972 : // Timeout occurred
973 : }
974 : }
975 : @endcode
976 :
977 : @tparam R Range type satisfying IoAwaitableRange with void result.
978 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
979 : @return A task yielding the winner's index (zero-based).
980 :
981 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
982 : @throws Rethrows the winner's exception if the winning task threw an exception.
983 :
984 : @par Remarks
985 : Elements are moved from the range; for lvalue ranges, the original
986 : container will have moved-from elements after this call. The range
987 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
988 : the non-void overload, no result storage is needed since void tasks
989 : produce no value.
990 :
991 : @see when_any, IoAwaitableRange
992 : */
993 : template<IoAwaitableRange R>
994 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
995 2 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
996 : {
997 : using OwnedRange = std::remove_cvref_t<R>;
998 :
999 : auto count = std::ranges::size(awaitables);
1000 : if(count == 0)
1001 : throw std::invalid_argument("when_any requires at least one awaitable");
1002 :
1003 : // Move/copy range onto coroutine frame to ensure lifetime
1004 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
1005 :
1006 : detail::when_any_homogeneous_state<void> state(count);
1007 :
1008 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1009 :
1010 : if(state.core_.winner_exception_)
1011 : std::rethrow_exception(state.core_.winner_exception_);
1012 :
1013 : co_return state.core_.winner_index_;
1014 4 : }
1015 :
1016 : } // namespace capy
1017 : } // namespace boost
1018 :
1019 : #endif
|