1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/slice.hpp>
17  
#include <boost/capy/buffers/slice.hpp>
18  
#include <boost/capy/concept/buffer_source.hpp>
18  
#include <boost/capy/concept/buffer_source.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/read_source.hpp>
20  
#include <boost/capy/concept/read_source.hpp>
21  
#include <boost/capy/coro.hpp>
21  
#include <boost/capy/coro.hpp>
22  
#include <boost/capy/error.hpp>
22  
#include <boost/capy/error.hpp>
23  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <boost/capy/ex/executor_ref.hpp>
24  
#include <boost/capy/io_result.hpp>
24  
#include <boost/capy/io_result.hpp>
25  
#include <boost/capy/task.hpp>
25  
#include <boost/capy/task.hpp>
26  

26  

27  
#include <concepts>
27  
#include <concepts>
28  
#include <coroutine>
28  
#include <coroutine>
29  
#include <cstddef>
29  
#include <cstddef>
30  
#include <exception>
30  
#include <exception>
31  
#include <new>
31  
#include <new>
32  
#include <span>
32  
#include <span>
33  
#include <stop_token>
33  
#include <stop_token>
34  
#include <system_error>
34  
#include <system_error>
35  
#include <utility>
35  
#include <utility>
36  

36  

37  
namespace boost {
37  
namespace boost {
38  
namespace capy {
38  
namespace capy {
39  

39  

40  
/** Type-erased wrapper for any BufferSource.
40  
/** Type-erased wrapper for any BufferSource.
41  

41  

42  
    This class provides type erasure for any type satisfying the
42  
    This class provides type erasure for any type satisfying the
43  
    @ref BufferSource concept, enabling runtime polymorphism for
43  
    @ref BufferSource concept, enabling runtime polymorphism for
44  
    buffer pull operations. The wrapper also satisfies @ref ReadSource,
44  
    buffer pull operations. The wrapper also satisfies @ref ReadSource,
45  
    allowing it to be used with code expecting either interface.
45  
    allowing it to be used with code expecting either interface.
46  
    It uses cached awaitable storage to achieve zero steady-state
46  
    It uses cached awaitable storage to achieve zero steady-state
47  
    allocation after construction.
47  
    allocation after construction.
48  

48  

49  
    The wrapper also satisfies @ref ReadSource through the templated
49  
    The wrapper also satisfies @ref ReadSource through the templated
50  
    @ref read method. This method copies data from the source's
50  
    @ref read method. This method copies data from the source's
51  
    internal buffers into the caller's buffers, incurring one extra
51  
    internal buffers into the caller's buffers, incurring one extra
52  
    buffer copy compared to using @ref pull and @ref consume directly.
52  
    buffer copy compared to using @ref pull and @ref consume directly.
53  

53  

54  
    The wrapper supports two construction modes:
54  
    The wrapper supports two construction modes:
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
55  
    - **Owning**: Pass by value to transfer ownership. The wrapper
56  
      allocates storage and owns the source.
56  
      allocates storage and owns the source.
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
57  
    - **Reference**: Pass a pointer to wrap without ownership. The
58  
      pointed-to source must outlive this wrapper.
58  
      pointed-to source must outlive this wrapper.
59  

59  

60  
    @par Awaitable Preallocation
60  
    @par Awaitable Preallocation
61  
    The constructor preallocates storage for the type-erased awaitable.
61  
    The constructor preallocates storage for the type-erased awaitable.
62  
    This reserves all virtual address space at server startup
62  
    This reserves all virtual address space at server startup
63  
    so memory usage can be measured up front, rather than
63  
    so memory usage can be measured up front, rather than
64  
    allocating piecemeal as traffic arrives.
64  
    allocating piecemeal as traffic arrives.
65  

65  

66  
    @par Thread Safety
66  
    @par Thread Safety
67  
    Not thread-safe. Concurrent operations on the same wrapper
67  
    Not thread-safe. Concurrent operations on the same wrapper
68  
    are undefined behavior.
68  
    are undefined behavior.
69  

69  

70  
    @par Example
70  
    @par Example
71  
    @code
71  
    @code
72  
    // Owning - takes ownership of the source
72  
    // Owning - takes ownership of the source
73  
    any_buffer_source abs(some_buffer_source{args...});
73  
    any_buffer_source abs(some_buffer_source{args...});
74  

74  

75  
    // Reference - wraps without ownership
75  
    // Reference - wraps without ownership
76  
    some_buffer_source src;
76  
    some_buffer_source src;
77  
    any_buffer_source abs(&src);
77  
    any_buffer_source abs(&src);
78  

78  

79  
    const_buffer arr[16];
79  
    const_buffer arr[16];
80  
    auto [ec, bufs] = co_await abs.pull(arr);
80  
    auto [ec, bufs] = co_await abs.pull(arr);
81  
    @endcode
81  
    @endcode
82  

82  

83  
    @see any_buffer_sink, BufferSource, ReadSource
83  
    @see any_buffer_sink, BufferSource, ReadSource
84  
*/
84  
*/
85  
class any_buffer_source
85  
class any_buffer_source
86  
{
86  
{
87  
    struct vtable;
87  
    struct vtable;
88  
    struct awaitable_ops;
88  
    struct awaitable_ops;
89  

89  

90  
    template<BufferSource S>
90  
    template<BufferSource S>
91  
    struct vtable_for_impl;
91  
    struct vtable_for_impl;
92  

92  

93  
    void* source_ = nullptr;
93  
    void* source_ = nullptr;
94  
    vtable const* vt_ = nullptr;
94  
    vtable const* vt_ = nullptr;
95  
    void* cached_awaitable_ = nullptr;
95  
    void* cached_awaitable_ = nullptr;
96  
    void* storage_ = nullptr;
96  
    void* storage_ = nullptr;
97  
    awaitable_ops const* active_ops_ = nullptr;
97  
    awaitable_ops const* active_ops_ = nullptr;
98  

98  

99  
public:
99  
public:
100  
    /** Destructor.
100  
    /** Destructor.
101  

101  

102  
        Destroys the owned source (if any) and releases the cached
102  
        Destroys the owned source (if any) and releases the cached
103  
        awaitable storage.
103  
        awaitable storage.
104  
    */
104  
    */
105  
    ~any_buffer_source();
105  
    ~any_buffer_source();
106  

106  

107  
    /** Default constructor.
107  
    /** Default constructor.
108  

108  

109  
        Constructs an empty wrapper. Operations on a default-constructed
109  
        Constructs an empty wrapper. Operations on a default-constructed
110  
        wrapper result in undefined behavior.
110  
        wrapper result in undefined behavior.
111  
    */
111  
    */
112  
    any_buffer_source() = default;
112  
    any_buffer_source() = default;
113  

113  

114  
    /** Non-copyable.
114  
    /** Non-copyable.
115  

115  

116  
        The awaitable cache is per-instance and cannot be shared.
116  
        The awaitable cache is per-instance and cannot be shared.
117  
    */
117  
    */
118  
    any_buffer_source(any_buffer_source const&) = delete;
118  
    any_buffer_source(any_buffer_source const&) = delete;
119  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
119  
    any_buffer_source& operator=(any_buffer_source const&) = delete;
120  

120  

121  
    /** Move constructor.
121  
    /** Move constructor.
122  

122  

123  
        Transfers ownership of the wrapped source (if owned) and
123  
        Transfers ownership of the wrapped source (if owned) and
124  
        cached awaitable storage from `other`. After the move, `other` is
124  
        cached awaitable storage from `other`. After the move, `other` is
125  
        in a default-constructed state.
125  
        in a default-constructed state.
126  

126  

127  
        @param other The wrapper to move from.
127  
        @param other The wrapper to move from.
128  
    */
128  
    */
129  
    any_buffer_source(any_buffer_source&& other) noexcept
129  
    any_buffer_source(any_buffer_source&& other) noexcept
130  
        : source_(std::exchange(other.source_, nullptr))
130  
        : source_(std::exchange(other.source_, nullptr))
131  
        , vt_(std::exchange(other.vt_, nullptr))
131  
        , vt_(std::exchange(other.vt_, nullptr))
132  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133  
        , storage_(std::exchange(other.storage_, nullptr))
133  
        , storage_(std::exchange(other.storage_, nullptr))
134  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
134  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
135  
    {
135  
    {
136  
    }
136  
    }
137  

137  

138  
    /** Move assignment operator.
138  
    /** Move assignment operator.
139  

139  

140  
        Destroys any owned source and releases existing resources,
140  
        Destroys any owned source and releases existing resources,
141  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
145  
    */
145  
    */
146  
    any_buffer_source&
146  
    any_buffer_source&
147  
    operator=(any_buffer_source&& other) noexcept;
147  
    operator=(any_buffer_source&& other) noexcept;
148  

148  

149  
    /** Construct by taking ownership of a BufferSource.
149  
    /** Construct by taking ownership of a BufferSource.
150  

150  

151  
        Allocates storage and moves the source into this wrapper.
151  
        Allocates storage and moves the source into this wrapper.
152  
        The wrapper owns the source and will destroy it.
152  
        The wrapper owns the source and will destroy it.
153  

153  

154  
        @param s The source to take ownership of.
154  
        @param s The source to take ownership of.
155  
    */
155  
    */
156  
    template<BufferSource S>
156  
    template<BufferSource S>
157  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
157  
        requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
158  
    any_buffer_source(S s);
158  
    any_buffer_source(S s);
159  

159  

160  
    /** Construct by wrapping a BufferSource without ownership.
160  
    /** Construct by wrapping a BufferSource without ownership.
161  

161  

162  
        Wraps the given source by pointer. The source must remain
162  
        Wraps the given source by pointer. The source must remain
163  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
164  

164  

165  
        @param s Pointer to the source to wrap.
165  
        @param s Pointer to the source to wrap.
166  
    */
166  
    */
167  
    template<BufferSource S>
167  
    template<BufferSource S>
168  
    any_buffer_source(S* s);
168  
    any_buffer_source(S* s);
169  

169  

170  
    /** Check if the wrapper contains a valid source.
170  
    /** Check if the wrapper contains a valid source.
171  

171  

172  
        @return `true` if wrapping a source, `false` if default-constructed
172  
        @return `true` if wrapping a source, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    bool
175  
    bool
176  
    has_value() const noexcept
176  
    has_value() const noexcept
177  
    {
177  
    {
178  
        return source_ != nullptr;
178  
        return source_ != nullptr;
179  
    }
179  
    }
180  

180  

181  
    /** Check if the wrapper contains a valid source.
181  
    /** Check if the wrapper contains a valid source.
182  

182  

183  
        @return `true` if wrapping a source, `false` if default-constructed
183  
        @return `true` if wrapping a source, `false` if default-constructed
184  
            or moved-from.
184  
            or moved-from.
185  
    */
185  
    */
186  
    explicit
186  
    explicit
187  
    operator bool() const noexcept
187  
    operator bool() const noexcept
188  
    {
188  
    {
189  
        return has_value();
189  
        return has_value();
190  
    }
190  
    }
191  

191  

192  
    /** Consume bytes from the source.
192  
    /** Consume bytes from the source.
193  

193  

194  
        Advances the internal read position of the underlying source
194  
        Advances the internal read position of the underlying source
195  
        by the specified number of bytes. The next call to @ref pull
195  
        by the specified number of bytes. The next call to @ref pull
196  
        returns data starting after the consumed bytes.
196  
        returns data starting after the consumed bytes.
197  

197  

198  
        @param n The number of bytes to consume. Must not exceed the
198  
        @param n The number of bytes to consume. Must not exceed the
199  
        total size of buffers returned by the previous @ref pull.
199  
        total size of buffers returned by the previous @ref pull.
200  

200  

201  
        @par Preconditions
201  
        @par Preconditions
202  
        The wrapper must contain a valid source (`has_value() == true`).
202  
        The wrapper must contain a valid source (`has_value() == true`).
203  
    */
203  
    */
204  
    void
204  
    void
205  
    consume(std::size_t n) noexcept;
205  
    consume(std::size_t n) noexcept;
206  

206  

207  
    /** Pull buffer data from the source.
207  
    /** Pull buffer data from the source.
208  

208  

209  
        Fills the provided span with buffer descriptors from the
209  
        Fills the provided span with buffer descriptors from the
210  
        underlying source. The operation completes when data is
210  
        underlying source. The operation completes when data is
211  
        available, the source is exhausted, or an error occurs.
211  
        available, the source is exhausted, or an error occurs.
212  

212  

213  
        @param dest Span of const_buffer to fill.
213  
        @param dest Span of const_buffer to fill.
214  

214  

215  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
215  
        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
216  
            On success with data, a non-empty span of filled buffers.
216  
            On success with data, a non-empty span of filled buffers.
217  
            On success with empty span, source is exhausted.
217  
            On success with empty span, source is exhausted.
218  

218  

219  
        @par Preconditions
219  
        @par Preconditions
220  
        The wrapper must contain a valid source (`has_value() == true`).
220  
        The wrapper must contain a valid source (`has_value() == true`).
221  
    */
221  
    */
222  
    auto
222  
    auto
223  
    pull(std::span<const_buffer> dest);
223  
    pull(std::span<const_buffer> dest);
224  

224  

225  
    /** Read data into a mutable buffer sequence.
225  
    /** Read data into a mutable buffer sequence.
226  

226  

227  
        Fills the provided buffer sequence by pulling data from the
227  
        Fills the provided buffer sequence by pulling data from the
228  
        underlying source and copying it into the caller's buffers.
228  
        underlying source and copying it into the caller's buffers.
229  
        This satisfies @ref ReadSource but incurs a copy; for zero-copy
229  
        This satisfies @ref ReadSource but incurs a copy; for zero-copy
230  
        access, use @ref pull and @ref consume instead.
230  
        access, use @ref pull and @ref consume instead.
231  

231  

232  
        @note This operation copies data from the source's internal
232  
        @note This operation copies data from the source's internal
233  
        buffers into the caller's buffers. For zero-copy reads,
233  
        buffers into the caller's buffers. For zero-copy reads,
234  
        use @ref pull and @ref consume directly.
234  
        use @ref pull and @ref consume directly.
235  

235  

236  
        @param buffers The buffer sequence to fill.
236  
        @param buffers The buffer sequence to fill.
237  

237  

238  
        @return An awaitable yielding `(error_code,std::size_t)`.
238  
        @return An awaitable yielding `(error_code,std::size_t)`.
239  
            On success, `n == buffer_size(buffers)`.
239  
            On success, `n == buffer_size(buffers)`.
240  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
240  
            On EOF, `ec == error::eof` and `n` is bytes transferred.
241  

241  

242  
        @par Preconditions
242  
        @par Preconditions
243  
        The wrapper must contain a valid source (`has_value() == true`).
243  
        The wrapper must contain a valid source (`has_value() == true`).
244  

244  

245  
        @see pull, consume
245  
        @see pull, consume
246  
    */
246  
    */
247  
    template<MutableBufferSequence MB>
247  
    template<MutableBufferSequence MB>
248  
    task<io_result<std::size_t>>
248  
    task<io_result<std::size_t>>
249  
    read(MB buffers);
249  
    read(MB buffers);
250  

250  

251  
protected:
251  
protected:
252  
    /** Rebind to a new source after move.
252  
    /** Rebind to a new source after move.
253  

253  

254  
        Updates the internal pointer to reference a new source object.
254  
        Updates the internal pointer to reference a new source object.
255  
        Used by owning wrappers after move assignment when the owned
255  
        Used by owning wrappers after move assignment when the owned
256  
        object has moved to a new location.
256  
        object has moved to a new location.
257  

257  

258  
        @param new_source The new source to bind to. Must be the same
258  
        @param new_source The new source to bind to. Must be the same
259  
            type as the original source.
259  
            type as the original source.
260  

260  

261  
        @note Terminates if called with a source of different type
261  
        @note Terminates if called with a source of different type
262  
            than the original.
262  
            than the original.
263  
    */
263  
    */
264  
    template<BufferSource S>
264  
    template<BufferSource S>
265  
    void
265  
    void
266  
    rebind(S& new_source) noexcept
266  
    rebind(S& new_source) noexcept
267  
    {
267  
    {
268  
        if(vt_ != &vtable_for_impl<S>::value)
268  
        if(vt_ != &vtable_for_impl<S>::value)
269  
            std::terminate();
269  
            std::terminate();
270  
        source_ = &new_source;
270  
        source_ = &new_source;
271  
    }
271  
    }
272  
};
272  
};
273  

273  

274  
//----------------------------------------------------------
274  
//----------------------------------------------------------
275  

275  

276  
struct any_buffer_source::awaitable_ops
276  
struct any_buffer_source::awaitable_ops
277  
{
277  
{
278  
    bool (*await_ready)(void*);
278  
    bool (*await_ready)(void*);
279  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
279  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
280  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
280  
    io_result<std::span<const_buffer>> (*await_resume)(void*);
281  
    void (*destroy)(void*) noexcept;
281  
    void (*destroy)(void*) noexcept;
282  
};
282  
};
283  

283  

284  
struct any_buffer_source::vtable
284  
struct any_buffer_source::vtable
285  
{
285  
{
286  
    void (*destroy)(void*) noexcept;
286  
    void (*destroy)(void*) noexcept;
287  
    void (*do_consume)(void* source, std::size_t n) noexcept;
287  
    void (*do_consume)(void* source, std::size_t n) noexcept;
288  
    std::size_t awaitable_size;
288  
    std::size_t awaitable_size;
289  
    std::size_t awaitable_align;
289  
    std::size_t awaitable_align;
290  
    awaitable_ops const* (*construct_awaitable)(
290  
    awaitable_ops const* (*construct_awaitable)(
291  
        void* source,
291  
        void* source,
292  
        void* storage,
292  
        void* storage,
293  
        std::span<const_buffer> dest);
293  
        std::span<const_buffer> dest);
294  
};
294  
};
295  

295  

296  
template<BufferSource S>
296  
template<BufferSource S>
297  
struct any_buffer_source::vtable_for_impl
297  
struct any_buffer_source::vtable_for_impl
298  
{
298  
{
299  
    using Awaitable = decltype(std::declval<S&>().pull(
299  
    using Awaitable = decltype(std::declval<S&>().pull(
300  
        std::declval<std::span<const_buffer>>()));
300  
        std::declval<std::span<const_buffer>>()));
301  

301  

302  
    static void
302  
    static void
303  
    do_destroy_impl(void* source) noexcept
303  
    do_destroy_impl(void* source) noexcept
304  
    {
304  
    {
305  
        static_cast<S*>(source)->~S();
305  
        static_cast<S*>(source)->~S();
306  
    }
306  
    }
307  

307  

308  
    static void
308  
    static void
309  
    do_consume_impl(void* source, std::size_t n) noexcept
309  
    do_consume_impl(void* source, std::size_t n) noexcept
310  
    {
310  
    {
311  
        static_cast<S*>(source)->consume(n);
311  
        static_cast<S*>(source)->consume(n);
312  
    }
312  
    }
313  

313  

314  
    static awaitable_ops const*
314  
    static awaitable_ops const*
315  
    construct_awaitable_impl(
315  
    construct_awaitable_impl(
316  
        void* source,
316  
        void* source,
317  
        void* storage,
317  
        void* storage,
318  
        std::span<const_buffer> dest)
318  
        std::span<const_buffer> dest)
319  
    {
319  
    {
320  
        auto& s = *static_cast<S*>(source);
320  
        auto& s = *static_cast<S*>(source);
321  
        ::new(storage) Awaitable(s.pull(dest));
321  
        ::new(storage) Awaitable(s.pull(dest));
322  

322  

323  
        static constexpr awaitable_ops ops = {
323  
        static constexpr awaitable_ops ops = {
324  
            +[](void* p) {
324  
            +[](void* p) {
325  
                return static_cast<Awaitable*>(p)->await_ready();
325  
                return static_cast<Awaitable*>(p)->await_ready();
326  
            },
326  
            },
327  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
327  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
328  
                return detail::call_await_suspend(
328  
                return detail::call_await_suspend(
329  
                    static_cast<Awaitable*>(p), h, ex, token);
329  
                    static_cast<Awaitable*>(p), h, ex, token);
330  
            },
330  
            },
331  
            +[](void* p) {
331  
            +[](void* p) {
332  
                return static_cast<Awaitable*>(p)->await_resume();
332  
                return static_cast<Awaitable*>(p)->await_resume();
333  
            },
333  
            },
334  
            +[](void* p) noexcept {
334  
            +[](void* p) noexcept {
335  
                static_cast<Awaitable*>(p)->~Awaitable();
335  
                static_cast<Awaitable*>(p)->~Awaitable();
336  
            }
336  
            }
337  
        };
337  
        };
338  
        return &ops;
338  
        return &ops;
339  
    }
339  
    }
340  

340  

341  
    static constexpr vtable value = {
341  
    static constexpr vtable value = {
342  
        &do_destroy_impl,
342  
        &do_destroy_impl,
343  
        &do_consume_impl,
343  
        &do_consume_impl,
344  
        sizeof(Awaitable),
344  
        sizeof(Awaitable),
345  
        alignof(Awaitable),
345  
        alignof(Awaitable),
346  
        &construct_awaitable_impl
346  
        &construct_awaitable_impl
347  
    };
347  
    };
348  
};
348  
};
349  

349  

350  
//----------------------------------------------------------
350  
//----------------------------------------------------------
351  

351  

352  
inline
352  
inline
353  
any_buffer_source::~any_buffer_source()
353  
any_buffer_source::~any_buffer_source()
354  
{
354  
{
355  
    if(storage_)
355  
    if(storage_)
356  
    {
356  
    {
357  
        vt_->destroy(source_);
357  
        vt_->destroy(source_);
358  
        ::operator delete(storage_);
358  
        ::operator delete(storage_);
359  
    }
359  
    }
360  
    if(cached_awaitable_)
360  
    if(cached_awaitable_)
361  
        ::operator delete(cached_awaitable_);
361  
        ::operator delete(cached_awaitable_);
362  
}
362  
}
363  

363  

364  
inline any_buffer_source&
364  
inline any_buffer_source&
365  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
365  
any_buffer_source::operator=(any_buffer_source&& other) noexcept
366  
{
366  
{
367  
    if(this != &other)
367  
    if(this != &other)
368  
    {
368  
    {
369  
        if(storage_)
369  
        if(storage_)
370  
        {
370  
        {
371  
            vt_->destroy(source_);
371  
            vt_->destroy(source_);
372  
            ::operator delete(storage_);
372  
            ::operator delete(storage_);
373  
        }
373  
        }
374  
        if(cached_awaitable_)
374  
        if(cached_awaitable_)
375  
            ::operator delete(cached_awaitable_);
375  
            ::operator delete(cached_awaitable_);
376  
        source_ = std::exchange(other.source_, nullptr);
376  
        source_ = std::exchange(other.source_, nullptr);
377  
        vt_ = std::exchange(other.vt_, nullptr);
377  
        vt_ = std::exchange(other.vt_, nullptr);
378  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
378  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
379  
        storage_ = std::exchange(other.storage_, nullptr);
379  
        storage_ = std::exchange(other.storage_, nullptr);
380  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
380  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
381  
    }
381  
    }
382  
    return *this;
382  
    return *this;
383  
}
383  
}
384  

384  

385  
template<BufferSource S>
385  
template<BufferSource S>
386  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
386  
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
387  
any_buffer_source::any_buffer_source(S s)
387  
any_buffer_source::any_buffer_source(S s)
388  
    : vt_(&vtable_for_impl<S>::value)
388  
    : vt_(&vtable_for_impl<S>::value)
389  
{
389  
{
390  
    struct guard {
390  
    struct guard {
391  
        any_buffer_source* self;
391  
        any_buffer_source* self;
392  
        bool committed = false;
392  
        bool committed = false;
393  
        ~guard() {
393  
        ~guard() {
394  
            if(!committed && self->storage_) {
394  
            if(!committed && self->storage_) {
395  
                self->vt_->destroy(self->source_);
395  
                self->vt_->destroy(self->source_);
396  
                ::operator delete(self->storage_);
396  
                ::operator delete(self->storage_);
397  
                self->storage_ = nullptr;
397  
                self->storage_ = nullptr;
398  
                self->source_ = nullptr;
398  
                self->source_ = nullptr;
399  
            }
399  
            }
400  
        }
400  
        }
401  
    } g{this};
401  
    } g{this};
402  

402  

403  
    storage_ = ::operator new(sizeof(S));
403  
    storage_ = ::operator new(sizeof(S));
404  
    source_ = ::new(storage_) S(std::move(s));
404  
    source_ = ::new(storage_) S(std::move(s));
405  

405  

406  
    // Preallocate the awaitable storage
406  
    // Preallocate the awaitable storage
407  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
407  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
408  

408  

409  
    g.committed = true;
409  
    g.committed = true;
410  
}
410  
}
411  

411  

412  
template<BufferSource S>
412  
template<BufferSource S>
413  
any_buffer_source::any_buffer_source(S* s)
413  
any_buffer_source::any_buffer_source(S* s)
414  
    : source_(s)
414  
    : source_(s)
415  
    , vt_(&vtable_for_impl<S>::value)
415  
    , vt_(&vtable_for_impl<S>::value)
416  
{
416  
{
417  
    // Preallocate the awaitable storage
417  
    // Preallocate the awaitable storage
418  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
418  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
419  
}
419  
}
420  

420  

421  
//----------------------------------------------------------
421  
//----------------------------------------------------------
422  

422  

423  
inline void
423  
inline void
424  
any_buffer_source::consume(std::size_t n) noexcept
424  
any_buffer_source::consume(std::size_t n) noexcept
425  
{
425  
{
426  
    vt_->do_consume(source_, n);
426  
    vt_->do_consume(source_, n);
427  
}
427  
}
428  

428  

429  
inline auto
429  
inline auto
430  
any_buffer_source::pull(std::span<const_buffer> dest)
430  
any_buffer_source::pull(std::span<const_buffer> dest)
431  
{
431  
{
432  
    struct awaitable
432  
    struct awaitable
433  
    {
433  
    {
434  
        any_buffer_source* self_;
434  
        any_buffer_source* self_;
435  
        std::span<const_buffer> dest_;
435  
        std::span<const_buffer> dest_;
436  

436  

437  
        bool
437  
        bool
438  
        await_ready() const noexcept
438  
        await_ready() const noexcept
439  
        {
439  
        {
440  
            return false;
440  
            return false;
441  
        }
441  
        }
442  

442  

443  
        coro
443  
        coro
444  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
444  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
445  
        {
445  
        {
446  
            // Construct the underlying awaitable into cached storage
446  
            // Construct the underlying awaitable into cached storage
447  
            self_->active_ops_ = self_->vt_->construct_awaitable(
447  
            self_->active_ops_ = self_->vt_->construct_awaitable(
448  
                self_->source_,
448  
                self_->source_,
449  
                self_->cached_awaitable_,
449  
                self_->cached_awaitable_,
450  
                dest_);
450  
                dest_);
451  

451  

452  
            // Check if underlying is immediately ready
452  
            // Check if underlying is immediately ready
453  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
453  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
454  
                return h;
454  
                return h;
455  

455  

456  
            // Forward to underlying awaitable
456  
            // Forward to underlying awaitable
457  
            return self_->active_ops_->await_suspend(
457  
            return self_->active_ops_->await_suspend(
458  
                self_->cached_awaitable_, h, ex, token);
458  
                self_->cached_awaitable_, h, ex, token);
459  
        }
459  
        }
460  

460  

461  
        io_result<std::span<const_buffer>>
461  
        io_result<std::span<const_buffer>>
462  
        await_resume()
462  
        await_resume()
463  
        {
463  
        {
464  
            struct guard {
464  
            struct guard {
465  
                any_buffer_source* self;
465  
                any_buffer_source* self;
466  
                ~guard() {
466  
                ~guard() {
467  
                    self->active_ops_->destroy(self->cached_awaitable_);
467  
                    self->active_ops_->destroy(self->cached_awaitable_);
468  
                    self->active_ops_ = nullptr;
468  
                    self->active_ops_ = nullptr;
469  
                }
469  
                }
470  
            } g{self_};
470  
            } g{self_};
471  
            return self_->active_ops_->await_resume(
471  
            return self_->active_ops_->await_resume(
472  
                self_->cached_awaitable_);
472  
                self_->cached_awaitable_);
473  
        }
473  
        }
474  
    };
474  
    };
475  
    return awaitable{this, dest};
475  
    return awaitable{this, dest};
476  
}
476  
}
477  

477  

478  
template<MutableBufferSequence MB>
478  
template<MutableBufferSequence MB>
479  
task<io_result<std::size_t>>
479  
task<io_result<std::size_t>>
480  
any_buffer_source::read(MB buffers)
480  
any_buffer_source::read(MB buffers)
481  
{
481  
{
482  
    std::size_t total = 0;
482  
    std::size_t total = 0;
483  
    auto dest = sans_prefix(buffers, 0);
483  
    auto dest = sans_prefix(buffers, 0);
484  

484  

485  
    while(!buffer_empty(dest))
485  
    while(!buffer_empty(dest))
486  
    {
486  
    {
487  
        const_buffer arr[detail::max_iovec_];
487  
        const_buffer arr[detail::max_iovec_];
488  
        auto [ec, bufs] = co_await pull(arr);
488  
        auto [ec, bufs] = co_await pull(arr);
489  

489  

490  
        if(ec)
490  
        if(ec)
491  
            co_return {ec, total};
491  
            co_return {ec, total};
492  

492  

493  
        if(bufs.empty())
493  
        if(bufs.empty())
494  
            co_return {error::eof, total};
494  
            co_return {error::eof, total};
495  

495  

496  
        auto n = buffer_copy(dest, bufs);
496  
        auto n = buffer_copy(dest, bufs);
497  
        consume(n);
497  
        consume(n);
498  
        total += n;
498  
        total += n;
499  
        dest = sans_prefix(dest, n);
499  
        dest = sans_prefix(dest, n);
500  
    }
500  
    }
501  

501  

502  
    co_return {{}, total};
502  
    co_return {{}, total};
503  
}
503  
}
504  

504  

505  
//----------------------------------------------------------
505  
//----------------------------------------------------------
506  

506  

507  
static_assert(BufferSource<any_buffer_source>);
507  
static_assert(BufferSource<any_buffer_source>);
508  
static_assert(ReadSource<any_buffer_source>);
508  
static_assert(ReadSource<any_buffer_source>);
509  

509  

510  
} // namespace capy
510  
} // namespace capy
511  
} // namespace boost
511  
} // namespace boost
512  

512  

513  
#endif
513  
#endif