1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_source.hpp>
18  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/task.hpp>
22  
#include <boost/capy/task.hpp>
23  

23  

24  
#include <concepts>
24  
#include <concepts>
25  
#include <coroutine>
25  
#include <coroutine>
26  
#include <cstddef>
26  
#include <cstddef>
27  
#include <new>
27  
#include <new>
28  
#include <span>
28  
#include <span>
29  
#include <stop_token>
29  
#include <stop_token>
30  
#include <system_error>
30  
#include <system_error>
31  
#include <utility>
31  
#include <utility>
32  

32  

33  
namespace boost {
33  
namespace boost {
34  
namespace capy {
34  
namespace capy {
35  

35  

36  
/** Type-erased wrapper for any ReadSource.
36  
/** Type-erased wrapper for any ReadSource.
37  

37  

38  
    This class provides type erasure for any type satisfying the
38  
    This class provides type erasure for any type satisfying the
39  
    @ref ReadSource concept, enabling runtime polymorphism for
39  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    source read operations. It uses cached awaitable storage to achieve
40  
    source read operations. It uses cached awaitable storage to achieve
41  
    zero steady-state allocation after construction.
41  
    zero steady-state allocation after construction.
42  

42  

43  
    The wrapper supports two construction modes:
43  
    The wrapper supports two construction modes:
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
      allocates storage and owns the source.
45  
      allocates storage and owns the source.
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
      pointed-to source must outlive this wrapper.
47  
      pointed-to source must outlive this wrapper.
48  

48  

49  
    @par Awaitable Preallocation
49  
    @par Awaitable Preallocation
50  
    The constructor preallocates storage for the type-erased awaitable.
50  
    The constructor preallocates storage for the type-erased awaitable.
51  
    This reserves all virtual address space at server startup
51  
    This reserves all virtual address space at server startup
52  
    so memory usage can be measured up front, rather than
52  
    so memory usage can be measured up front, rather than
53  
    allocating piecemeal as traffic arrives.
53  
    allocating piecemeal as traffic arrives.
54  

54  

55  
    @par Thread Safety
55  
    @par Thread Safety
56  
    Not thread-safe. Concurrent operations on the same wrapper
56  
    Not thread-safe. Concurrent operations on the same wrapper
57  
    are undefined behavior.
57  
    are undefined behavior.
58  

58  

59  
    @par Example
59  
    @par Example
60  
    @code
60  
    @code
61  
    // Owning - takes ownership of the source
61  
    // Owning - takes ownership of the source
62  
    any_read_source rs(some_source{args...});
62  
    any_read_source rs(some_source{args...});
63  

63  

64  
    // Reference - wraps without ownership
64  
    // Reference - wraps without ownership
65  
    some_source source;
65  
    some_source source;
66  
    any_read_source rs(&source);
66  
    any_read_source rs(&source);
67  

67  

68  
    mutable_buffer buf(data, size);
68  
    mutable_buffer buf(data, size);
69  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
69  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70  
    @endcode
70  
    @endcode
71  

71  

72  
    @see any_read_stream, ReadSource
72  
    @see any_read_stream, ReadSource
73  
*/
73  
*/
74  
class any_read_source
74  
class any_read_source
75  
{
75  
{
76  
    struct vtable;
76  
    struct vtable;
77  
    struct awaitable_ops;
77  
    struct awaitable_ops;
78  

78  

79  
    template<ReadSource S>
79  
    template<ReadSource S>
80  
    struct vtable_for_impl;
80  
    struct vtable_for_impl;
81  

81  

82  
    void* source_ = nullptr;
82  
    void* source_ = nullptr;
83  
    vtable const* vt_ = nullptr;
83  
    vtable const* vt_ = nullptr;
84  
    void* cached_awaitable_ = nullptr;
84  
    void* cached_awaitable_ = nullptr;
85  
    void* storage_ = nullptr;
85  
    void* storage_ = nullptr;
86  
    awaitable_ops const* active_ops_ = nullptr;
86  
    awaitable_ops const* active_ops_ = nullptr;
87  

87  

88  
public:
88  
public:
89  
    /** Destructor.
89  
    /** Destructor.
90  

90  

91  
        Destroys the owned source (if any) and releases the cached
91  
        Destroys the owned source (if any) and releases the cached
92  
        awaitable storage.
92  
        awaitable storage.
93  
    */
93  
    */
94  
    ~any_read_source();
94  
    ~any_read_source();
95  

95  

96  
    /** Default constructor.
96  
    /** Default constructor.
97  

97  

98  
        Constructs an empty wrapper. Operations on a default-constructed
98  
        Constructs an empty wrapper. Operations on a default-constructed
99  
        wrapper result in undefined behavior.
99  
        wrapper result in undefined behavior.
100  
    */
100  
    */
101  
    any_read_source() = default;
101  
    any_read_source() = default;
102  

102  

103  
    /** Non-copyable.
103  
    /** Non-copyable.
104  

104  

105  
        The awaitable cache is per-instance and cannot be shared.
105  
        The awaitable cache is per-instance and cannot be shared.
106  
    */
106  
    */
107  
    any_read_source(any_read_source const&) = delete;
107  
    any_read_source(any_read_source const&) = delete;
108  
    any_read_source& operator=(any_read_source const&) = delete;
108  
    any_read_source& operator=(any_read_source const&) = delete;
109  

109  

110  
    /** Move constructor.
110  
    /** Move constructor.
111  

111  

112  
        Transfers ownership of the wrapped source (if owned) and
112  
        Transfers ownership of the wrapped source (if owned) and
113  
        cached awaitable storage from `other`. After the move, `other` is
113  
        cached awaitable storage from `other`. After the move, `other` is
114  
        in a default-constructed state.
114  
        in a default-constructed state.
115  

115  

116  
        @param other The wrapper to move from.
116  
        @param other The wrapper to move from.
117  
    */
117  
    */
118  
    any_read_source(any_read_source&& other) noexcept
118  
    any_read_source(any_read_source&& other) noexcept
119  
        : source_(std::exchange(other.source_, nullptr))
119  
        : source_(std::exchange(other.source_, nullptr))
120  
        , vt_(std::exchange(other.vt_, nullptr))
120  
        , vt_(std::exchange(other.vt_, nullptr))
121  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122  
        , storage_(std::exchange(other.storage_, nullptr))
122  
        , storage_(std::exchange(other.storage_, nullptr))
123  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
123  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
124  
    {
124  
    {
125  
    }
125  
    }
126  

126  

127  
    /** Move assignment operator.
127  
    /** Move assignment operator.
128  

128  

129  
        Destroys any owned source and releases existing resources,
129  
        Destroys any owned source and releases existing resources,
130  
        then transfers ownership from `other`.
130  
        then transfers ownership from `other`.
131  

131  

132  
        @param other The wrapper to move from.
132  
        @param other The wrapper to move from.
133  
        @return Reference to this wrapper.
133  
        @return Reference to this wrapper.
134  
    */
134  
    */
135  
    any_read_source&
135  
    any_read_source&
136  
    operator=(any_read_source&& other) noexcept;
136  
    operator=(any_read_source&& other) noexcept;
137  

137  

138  
    /** Construct by taking ownership of a ReadSource.
138  
    /** Construct by taking ownership of a ReadSource.
139  

139  

140  
        Allocates storage and moves the source into this wrapper.
140  
        Allocates storage and moves the source into this wrapper.
141  
        The wrapper owns the source and will destroy it.
141  
        The wrapper owns the source and will destroy it.
142  

142  

143  
        @param s The source to take ownership of.
143  
        @param s The source to take ownership of.
144  
    */
144  
    */
145  
    template<ReadSource S>
145  
    template<ReadSource S>
146  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
146  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
147  
    any_read_source(S s);
147  
    any_read_source(S s);
148  

148  

149  
    /** Construct by wrapping a ReadSource without ownership.
149  
    /** Construct by wrapping a ReadSource without ownership.
150  

150  

151  
        Wraps the given source by pointer. The source must remain
151  
        Wraps the given source by pointer. The source must remain
152  
        valid for the lifetime of this wrapper.
152  
        valid for the lifetime of this wrapper.
153  

153  

154  
        @param s Pointer to the source to wrap.
154  
        @param s Pointer to the source to wrap.
155  
    */
155  
    */
156  
    template<ReadSource S>
156  
    template<ReadSource S>
157  
    any_read_source(S* s);
157  
    any_read_source(S* s);
158  

158  

159  
    /** Check if the wrapper contains a valid source.
159  
    /** Check if the wrapper contains a valid source.
160  

160  

161  
        @return `true` if wrapping a source, `false` if default-constructed
161  
        @return `true` if wrapping a source, `false` if default-constructed
162  
            or moved-from.
162  
            or moved-from.
163  
    */
163  
    */
164  
    bool
164  
    bool
165  
    has_value() const noexcept
165  
    has_value() const noexcept
166  
    {
166  
    {
167  
        return source_ != nullptr;
167  
        return source_ != nullptr;
168  
    }
168  
    }
169  

169  

170  
    /** Check if the wrapper contains a valid source.
170  
    /** Check if the wrapper contains a valid source.
171  

171  

172  
        @return `true` if wrapping a source, `false` if default-constructed
172  
        @return `true` if wrapping a source, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    explicit
175  
    explicit
176  
    operator bool() const noexcept
176  
    operator bool() const noexcept
177  
    {
177  
    {
178  
        return has_value();
178  
        return has_value();
179  
    }
179  
    }
180  

180  

181  
    /** Initiate an asynchronous read operation.
181  
    /** Initiate an asynchronous read operation.
182  

182  

183  
        Reads data into the provided buffer sequence. The operation
183  
        Reads data into the provided buffer sequence. The operation
184  
        completes when the entire buffer sequence is filled, end-of-file
184  
        completes when the entire buffer sequence is filled, end-of-file
185  
        is reached, or an error occurs.
185  
        is reached, or an error occurs.
186  

186  

187  
        @param buffers The buffer sequence to read into. Passed by
187  
        @param buffers The buffer sequence to read into. Passed by
188  
            value to ensure the sequence lives in the coroutine frame
188  
            value to ensure the sequence lives in the coroutine frame
189  
            across suspension points.
189  
            across suspension points.
190  

190  

191  
        @return An awaitable yielding `(error_code,std::size_t)`.
191  
        @return An awaitable yielding `(error_code,std::size_t)`.
192  

192  

193  
        @par Postconditions
193  
        @par Postconditions
194  
        Exactly one of the following is true on return:
194  
        Exactly one of the following is true on return:
195  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
195  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
196  
            The entire buffer was filled.
196  
            The entire buffer was filled.
197  
        @li **End-of-stream or Error**: `ec` and `n` indicates
197  
        @li **End-of-stream or Error**: `ec` and `n` indicates
198  
            the number of bytes transferred before the failure.
198  
            the number of bytes transferred before the failure.
199  

199  

200  
        @par Preconditions
200  
        @par Preconditions
201  
        The wrapper must contain a valid source (`has_value() == true`).
201  
        The wrapper must contain a valid source (`has_value() == true`).
202  
    */
202  
    */
203  
    template<MutableBufferSequence MB>
203  
    template<MutableBufferSequence MB>
204  
    task<io_result<std::size_t>>
204  
    task<io_result<std::size_t>>
205  
    read(MB buffers);
205  
    read(MB buffers);
206  

206  

207  
protected:
207  
protected:
208  
    /** Rebind to a new source after move.
208  
    /** Rebind to a new source after move.
209  

209  

210  
        Updates the internal pointer to reference a new source object.
210  
        Updates the internal pointer to reference a new source object.
211  
        Used by owning wrappers after move assignment when the owned
211  
        Used by owning wrappers after move assignment when the owned
212  
        object has moved to a new location.
212  
        object has moved to a new location.
213  

213  

214  
        @param new_source The new source to bind to. Must be the same
214  
        @param new_source The new source to bind to. Must be the same
215  
            type as the original source.
215  
            type as the original source.
216  

216  

217  
        @note Terminates if called with a source of different type
217  
        @note Terminates if called with a source of different type
218  
            than the original.
218  
            than the original.
219  
    */
219  
    */
220  
    template<ReadSource S>
220  
    template<ReadSource S>
221  
    void
221  
    void
222  
    rebind(S& new_source) noexcept
222  
    rebind(S& new_source) noexcept
223  
    {
223  
    {
224  
        if(vt_ != &vtable_for_impl<S>::value)
224  
        if(vt_ != &vtable_for_impl<S>::value)
225  
            std::terminate();
225  
            std::terminate();
226  
        source_ = &new_source;
226  
        source_ = &new_source;
227  
    }
227  
    }
228  

228  

229  
private:
229  
private:
230  
    auto
230  
    auto
231  
    read_some_(std::span<mutable_buffer const> buffers);
231  
    read_some_(std::span<mutable_buffer const> buffers);
232  
};
232  
};
233  

233  

234  
//----------------------------------------------------------
234  
//----------------------------------------------------------
235  

235  

236  
struct any_read_source::awaitable_ops
236  
struct any_read_source::awaitable_ops
237  
{
237  
{
238  
    bool (*await_ready)(void*);
238  
    bool (*await_ready)(void*);
239  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
239  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
240  
    io_result<std::size_t> (*await_resume)(void*);
240  
    io_result<std::size_t> (*await_resume)(void*);
241  
    void (*destroy)(void*) noexcept;
241  
    void (*destroy)(void*) noexcept;
242  
};
242  
};
243  

243  

244  
struct any_read_source::vtable
244  
struct any_read_source::vtable
245  
{
245  
{
246  
    void (*destroy)(void*) noexcept;
246  
    void (*destroy)(void*) noexcept;
247  
    std::size_t awaitable_size;
247  
    std::size_t awaitable_size;
248  
    std::size_t awaitable_align;
248  
    std::size_t awaitable_align;
249  
    awaitable_ops const* (*construct_awaitable)(
249  
    awaitable_ops const* (*construct_awaitable)(
250  
        void* source,
250  
        void* source,
251  
        void* storage,
251  
        void* storage,
252  
        std::span<mutable_buffer const> buffers);
252  
        std::span<mutable_buffer const> buffers);
253  
};
253  
};
254  

254  

255  
template<ReadSource S>
255  
template<ReadSource S>
256  
struct any_read_source::vtable_for_impl
256  
struct any_read_source::vtable_for_impl
257  
{
257  
{
258  
    using Awaitable = decltype(std::declval<S&>().read(
258  
    using Awaitable = decltype(std::declval<S&>().read(
259  
        std::span<mutable_buffer const>{}));
259  
        std::span<mutable_buffer const>{}));
260  

260  

261  
    static void
261  
    static void
262  
    do_destroy_impl(void* source) noexcept
262  
    do_destroy_impl(void* source) noexcept
263  
    {
263  
    {
264  
        static_cast<S*>(source)->~S();
264  
        static_cast<S*>(source)->~S();
265  
    }
265  
    }
266  

266  

267  
    static awaitable_ops const*
267  
    static awaitable_ops const*
268  
    construct_awaitable_impl(
268  
    construct_awaitable_impl(
269  
        void* source,
269  
        void* source,
270  
        void* storage,
270  
        void* storage,
271  
        std::span<mutable_buffer const> buffers)
271  
        std::span<mutable_buffer const> buffers)
272  
    {
272  
    {
273  
        auto& s = *static_cast<S*>(source);
273  
        auto& s = *static_cast<S*>(source);
274  
        ::new(storage) Awaitable(s.read(buffers));
274  
        ::new(storage) Awaitable(s.read(buffers));
275  

275  

276  
        static constexpr awaitable_ops ops = {
276  
        static constexpr awaitable_ops ops = {
277  
            +[](void* p) {
277  
            +[](void* p) {
278  
                return static_cast<Awaitable*>(p)->await_ready();
278  
                return static_cast<Awaitable*>(p)->await_ready();
279  
            },
279  
            },
280  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
280  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
281  
                return detail::call_await_suspend(
281  
                return detail::call_await_suspend(
282  
                    static_cast<Awaitable*>(p), h, ex, token);
282  
                    static_cast<Awaitable*>(p), h, ex, token);
283  
            },
283  
            },
284  
            +[](void* p) {
284  
            +[](void* p) {
285  
                return static_cast<Awaitable*>(p)->await_resume();
285  
                return static_cast<Awaitable*>(p)->await_resume();
286  
            },
286  
            },
287  
            +[](void* p) noexcept {
287  
            +[](void* p) noexcept {
288  
                static_cast<Awaitable*>(p)->~Awaitable();
288  
                static_cast<Awaitable*>(p)->~Awaitable();
289  
            }
289  
            }
290  
        };
290  
        };
291  
        return &ops;
291  
        return &ops;
292  
    }
292  
    }
293  

293  

294  
    static constexpr vtable value = {
294  
    static constexpr vtable value = {
295  
        &do_destroy_impl,
295  
        &do_destroy_impl,
296  
        sizeof(Awaitable),
296  
        sizeof(Awaitable),
297  
        alignof(Awaitable),
297  
        alignof(Awaitable),
298  
        &construct_awaitable_impl
298  
        &construct_awaitable_impl
299  
    };
299  
    };
300  
};
300  
};
301  

301  

302  
//----------------------------------------------------------
302  
//----------------------------------------------------------
303  

303  

304  
inline
304  
inline
305  
any_read_source::~any_read_source()
305  
any_read_source::~any_read_source()
306  
{
306  
{
307  
    if(storage_)
307  
    if(storage_)
308  
    {
308  
    {
309  
        vt_->destroy(source_);
309  
        vt_->destroy(source_);
310  
        ::operator delete(storage_);
310  
        ::operator delete(storage_);
311  
    }
311  
    }
312  
    if(cached_awaitable_)
312  
    if(cached_awaitable_)
313  
        ::operator delete(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
314  
}
314  
}
315  

315  

316  
inline any_read_source&
316  
inline any_read_source&
317  
any_read_source::operator=(any_read_source&& other) noexcept
317  
any_read_source::operator=(any_read_source&& other) noexcept
318  
{
318  
{
319  
    if(this != &other)
319  
    if(this != &other)
320  
    {
320  
    {
321  
        if(storage_)
321  
        if(storage_)
322  
        {
322  
        {
323  
            vt_->destroy(source_);
323  
            vt_->destroy(source_);
324  
            ::operator delete(storage_);
324  
            ::operator delete(storage_);
325  
        }
325  
        }
326  
        if(cached_awaitable_)
326  
        if(cached_awaitable_)
327  
            ::operator delete(cached_awaitable_);
327  
            ::operator delete(cached_awaitable_);
328  
        source_ = std::exchange(other.source_, nullptr);
328  
        source_ = std::exchange(other.source_, nullptr);
329  
        vt_ = std::exchange(other.vt_, nullptr);
329  
        vt_ = std::exchange(other.vt_, nullptr);
330  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
330  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
331  
        storage_ = std::exchange(other.storage_, nullptr);
331  
        storage_ = std::exchange(other.storage_, nullptr);
332  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
332  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
333  
    }
333  
    }
334  
    return *this;
334  
    return *this;
335  
}
335  
}
336  

336  

337  
template<ReadSource S>
337  
template<ReadSource S>
338  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
338  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
339  
any_read_source::any_read_source(S s)
339  
any_read_source::any_read_source(S s)
340  
    : vt_(&vtable_for_impl<S>::value)
340  
    : vt_(&vtable_for_impl<S>::value)
341  
{
341  
{
342  
    struct guard {
342  
    struct guard {
343  
        any_read_source* self;
343  
        any_read_source* self;
344  
        bool committed = false;
344  
        bool committed = false;
345  
        ~guard() {
345  
        ~guard() {
346  
            if(!committed && self->storage_) {
346  
            if(!committed && self->storage_) {
347  
                self->vt_->destroy(self->source_);
347  
                self->vt_->destroy(self->source_);
348  
                ::operator delete(self->storage_);
348  
                ::operator delete(self->storage_);
349  
                self->storage_ = nullptr;
349  
                self->storage_ = nullptr;
350  
                self->source_ = nullptr;
350  
                self->source_ = nullptr;
351  
            }
351  
            }
352  
        }
352  
        }
353  
    } g{this};
353  
    } g{this};
354  

354  

355  
    storage_ = ::operator new(sizeof(S));
355  
    storage_ = ::operator new(sizeof(S));
356  
    source_ = ::new(storage_) S(std::move(s));
356  
    source_ = ::new(storage_) S(std::move(s));
357  

357  

358  
    // Preallocate the awaitable storage
358  
    // Preallocate the awaitable storage
359  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
359  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
360  

360  

361  
    g.committed = true;
361  
    g.committed = true;
362  
}
362  
}
363  

363  

364  
template<ReadSource S>
364  
template<ReadSource S>
365  
any_read_source::any_read_source(S* s)
365  
any_read_source::any_read_source(S* s)
366  
    : source_(s)
366  
    : source_(s)
367  
    , vt_(&vtable_for_impl<S>::value)
367  
    , vt_(&vtable_for_impl<S>::value)
368  
{
368  
{
369  
    // Preallocate the awaitable storage
369  
    // Preallocate the awaitable storage
370  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
370  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
371  
}
371  
}
372  

372  

373  
//----------------------------------------------------------
373  
//----------------------------------------------------------
374  

374  

375  
inline auto
375  
inline auto
376  
any_read_source::read_some_(std::span<mutable_buffer const> buffers)
376  
any_read_source::read_some_(std::span<mutable_buffer const> buffers)
377  
{
377  
{
378  
    struct awaitable
378  
    struct awaitable
379  
    {
379  
    {
380  
        any_read_source* self_;
380  
        any_read_source* self_;
381  
        std::span<mutable_buffer const> buffers_;
381  
        std::span<mutable_buffer const> buffers_;
382  

382  

383  
        bool
383  
        bool
384  
        await_ready() const noexcept
384  
        await_ready() const noexcept
385  
        {
385  
        {
386  
            return false;
386  
            return false;
387  
        }
387  
        }
388  

388  

389  
        coro
389  
        coro
390  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
390  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
391  
        {
391  
        {
392  
            // Construct the underlying awaitable into cached storage
392  
            // Construct the underlying awaitable into cached storage
393  
            self_->active_ops_ = self_->vt_->construct_awaitable(
393  
            self_->active_ops_ = self_->vt_->construct_awaitable(
394  
                self_->source_,
394  
                self_->source_,
395  
                self_->cached_awaitable_,
395  
                self_->cached_awaitable_,
396  
                buffers_);
396  
                buffers_);
397  

397  

398  
            // Check if underlying is immediately ready
398  
            // Check if underlying is immediately ready
399  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
399  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
400  
                return h;
400  
                return h;
401  

401  

402  
            // Forward to underlying awaitable
402  
            // Forward to underlying awaitable
403  
            return self_->active_ops_->await_suspend(
403  
            return self_->active_ops_->await_suspend(
404  
                self_->cached_awaitable_, h, ex, token);
404  
                self_->cached_awaitable_, h, ex, token);
405  
        }
405  
        }
406  

406  

407  
        io_result<std::size_t>
407  
        io_result<std::size_t>
408  
        await_resume()
408  
        await_resume()
409  
        {
409  
        {
410  
            struct guard {
410  
            struct guard {
411  
                any_read_source* self;
411  
                any_read_source* self;
412  
                ~guard() {
412  
                ~guard() {
413  
                    self->active_ops_->destroy(self->cached_awaitable_);
413  
                    self->active_ops_->destroy(self->cached_awaitable_);
414  
                    self->active_ops_ = nullptr;
414  
                    self->active_ops_ = nullptr;
415  
                }
415  
                }
416  
            } g{self_};
416  
            } g{self_};
417  
            return self_->active_ops_->await_resume(
417  
            return self_->active_ops_->await_resume(
418  
                self_->cached_awaitable_);
418  
                self_->cached_awaitable_);
419  
        }
419  
        }
420  
    };
420  
    };
421  
    return awaitable{this, buffers};
421  
    return awaitable{this, buffers};
422  
}
422  
}
423  

423  

424  
template<MutableBufferSequence MB>
424  
template<MutableBufferSequence MB>
425  
task<io_result<std::size_t>>
425  
task<io_result<std::size_t>>
426  
any_read_source::read(MB buffers)
426  
any_read_source::read(MB buffers)
427  
{
427  
{
428  
    buffer_param<MB> bp(std::move(buffers));
428  
    buffer_param<MB> bp(std::move(buffers));
429  
    std::size_t total = 0;
429  
    std::size_t total = 0;
430  

430  

431  
    for(;;)
431  
    for(;;)
432  
    {
432  
    {
433  
        auto bufs = bp.data();
433  
        auto bufs = bp.data();
434  
        if(bufs.empty())
434  
        if(bufs.empty())
435  
            break;
435  
            break;
436  

436  

437  
        auto [ec, n] = co_await read_some_(bufs);
437  
        auto [ec, n] = co_await read_some_(bufs);
438  
        total += n;
438  
        total += n;
439  
        if(ec)
439  
        if(ec)
440  
            co_return {ec, total};
440  
            co_return {ec, total};
441  
        bp.consume(n);
441  
        bp.consume(n);
442  
    }
442  
    }
443  

443  

444  
    co_return {{}, total};
444  
    co_return {{}, total};
445  
}
445  
}
446  

446  

447  
} // namespace capy
447  
} // namespace capy
448  
} // namespace boost
448  
} // namespace boost
449  

449  

450  
#endif
450  
#endif