1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
21  
#include <boost/capy/coro.hpp>
21  
#include <boost/capy/coro.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_result.hpp>
24  
#include <boost/capy/task.hpp>
24  
#include <boost/capy/task.hpp>
25  

25  

26  
#include <concepts>
26  
#include <concepts>
27  
#include <coroutine>
27  
#include <coroutine>
28  
#include <cstddef>
28  
#include <cstddef>
29  
#include <exception>
29  
#include <exception>
30  
#include <new>
30  
#include <new>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any BufferSink.
38  
/** Type-erased wrapper for any BufferSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref BufferSink concept, enabling runtime polymorphism for
41  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    buffer sink operations. It uses cached awaitable storage to achieve
42  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper also satisfies @ref WriteSink through templated
45  
    The wrapper also satisfies @ref WriteSink through templated
46  
    @ref write methods. These methods copy data from the caller's
46  
    @ref write methods. These methods copy data from the caller's
47  
    buffers into the sink's internal storage, incurring one extra
47  
    buffers into the sink's internal storage, incurring one extra
48  
    buffer copy compared to using @ref prepare and @ref commit
48  
    buffer copy compared to using @ref prepare and @ref commit
49  
    directly.
49  
    directly.
50  

50  

51  
    The wrapper supports two construction modes:
51  
    The wrapper supports two construction modes:
52  
    - **Owning**: Pass by value to transfer ownership. The wrapper
52  
    - **Owning**: Pass by value to transfer ownership. The wrapper
53  
      allocates storage and owns the sink.
53  
      allocates storage and owns the sink.
54  
    - **Reference**: Pass a pointer to wrap without ownership. The
54  
    - **Reference**: Pass a pointer to wrap without ownership. The
55  
      pointed-to sink must outlive this wrapper.
55  
      pointed-to sink must outlive this wrapper.
56  

56  

57  
    @par Awaitable Preallocation
57  
    @par Awaitable Preallocation
58  
    The constructor preallocates storage for the type-erased awaitable.
58  
    The constructor preallocates storage for the type-erased awaitable.
59  
    This reserves all virtual address space at server startup
59  
    This reserves all virtual address space at server startup
60  
    so memory usage can be measured up front, rather than
60  
    so memory usage can be measured up front, rather than
61  
    allocating piecemeal as traffic arrives.
61  
    allocating piecemeal as traffic arrives.
62  

62  

63  
    @par Thread Safety
63  
    @par Thread Safety
64  
    Not thread-safe. Concurrent operations on the same wrapper
64  
    Not thread-safe. Concurrent operations on the same wrapper
65  
    are undefined behavior.
65  
    are undefined behavior.
66  

66  

67  
    @par Example
67  
    @par Example
68  
    @code
68  
    @code
69  
    // Owning - takes ownership of the sink
69  
    // Owning - takes ownership of the sink
70  
    any_buffer_sink abs(some_buffer_sink{args...});
70  
    any_buffer_sink abs(some_buffer_sink{args...});
71  

71  

72  
    // Reference - wraps without ownership
72  
    // Reference - wraps without ownership
73  
    some_buffer_sink sink;
73  
    some_buffer_sink sink;
74  
    any_buffer_sink abs(&sink);
74  
    any_buffer_sink abs(&sink);
75  

75  

76  
    mutable_buffer arr[16];
76  
    mutable_buffer arr[16];
77  
    auto bufs = abs.prepare(arr);
77  
    auto bufs = abs.prepare(arr);
78  
    // Write data into bufs[0..bufs.size())
78  
    // Write data into bufs[0..bufs.size())
79  
    auto [ec] = co_await abs.commit(bytes_written);
79  
    auto [ec] = co_await abs.commit(bytes_written);
80  
    auto [ec2] = co_await abs.commit_eof();
80  
    auto [ec2] = co_await abs.commit_eof();
81  
    @endcode
81  
    @endcode
82  

82  

83  
    @see any_buffer_source, BufferSink, WriteSink
83  
    @see any_buffer_source, BufferSink, WriteSink
84  
*/
84  
*/
85  
class any_buffer_sink
85  
class any_buffer_sink
86  
{
86  
{
87  
    struct vtable;
87  
    struct vtable;
88  
    struct awaitable_ops;
88  
    struct awaitable_ops;
89  

89  

90  
    template<BufferSink S>
90  
    template<BufferSink S>
91  
    struct vtable_for_impl;
91  
    struct vtable_for_impl;
92  

92  

93  
    void* sink_ = nullptr;
93  
    void* sink_ = nullptr;
94  
    vtable const* vt_ = nullptr;
94  
    vtable const* vt_ = nullptr;
95  
    void* cached_awaitable_ = nullptr;
95  
    void* cached_awaitable_ = nullptr;
96  
    void* storage_ = nullptr;
96  
    void* storage_ = nullptr;
97  
    awaitable_ops const* active_ops_ = nullptr;
97  
    awaitable_ops const* active_ops_ = nullptr;
98  

98  

99  
public:
99  
public:
100  
    /** Destructor.
100  
    /** Destructor.
101  

101  

102  
        Destroys the owned sink (if any) and releases the cached
102  
        Destroys the owned sink (if any) and releases the cached
103  
        awaitable storage.
103  
        awaitable storage.
104  
    */
104  
    */
105  
    ~any_buffer_sink();
105  
    ~any_buffer_sink();
106  

106  

107  
    /** Default constructor.
107  
    /** Default constructor.
108  

108  

109  
        Constructs an empty wrapper. Operations on a default-constructed
109  
        Constructs an empty wrapper. Operations on a default-constructed
110  
        wrapper result in undefined behavior.
110  
        wrapper result in undefined behavior.
111  
    */
111  
    */
112  
    any_buffer_sink() = default;
112  
    any_buffer_sink() = default;
113  

113  

114  
    /** Non-copyable.
114  
    /** Non-copyable.
115  

115  

116  
        The awaitable cache is per-instance and cannot be shared.
116  
        The awaitable cache is per-instance and cannot be shared.
117  
    */
117  
    */
118  
    any_buffer_sink(any_buffer_sink const&) = delete;
118  
    any_buffer_sink(any_buffer_sink const&) = delete;
119  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
119  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
120  

120  

121  
    /** Move constructor.
121  
    /** Move constructor.
122  

122  

123  
        Transfers ownership of the wrapped sink (if owned) and
123  
        Transfers ownership of the wrapped sink (if owned) and
124  
        cached awaitable storage from `other`. After the move, `other` is
124  
        cached awaitable storage from `other`. After the move, `other` is
125  
        in a default-constructed state.
125  
        in a default-constructed state.
126  

126  

127  
        @param other The wrapper to move from.
127  
        @param other The wrapper to move from.
128  
    */
128  
    */
129  
    any_buffer_sink(any_buffer_sink&& other) noexcept
129  
    any_buffer_sink(any_buffer_sink&& other) noexcept
130  
        : sink_(std::exchange(other.sink_, nullptr))
130  
        : sink_(std::exchange(other.sink_, nullptr))
131  
        , vt_(std::exchange(other.vt_, nullptr))
131  
        , vt_(std::exchange(other.vt_, nullptr))
132  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133  
        , storage_(std::exchange(other.storage_, nullptr))
133  
        , storage_(std::exchange(other.storage_, nullptr))
134  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
134  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
135  
    {
135  
    {
136  
    }
136  
    }
137  

137  

138  
    /** Move assignment operator.
138  
    /** Move assignment operator.
139  

139  

140  
        Destroys any owned sink and releases existing resources,
140  
        Destroys any owned sink and releases existing resources,
141  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
145  
    */
145  
    */
146  
    any_buffer_sink&
146  
    any_buffer_sink&
147  
    operator=(any_buffer_sink&& other) noexcept;
147  
    operator=(any_buffer_sink&& other) noexcept;
148  

148  

149  
    /** Construct by taking ownership of a BufferSink.
149  
    /** Construct by taking ownership of a BufferSink.
150  

150  

151  
        Allocates storage and moves the sink into this wrapper.
151  
        Allocates storage and moves the sink into this wrapper.
152  
        The wrapper owns the sink and will destroy it.
152  
        The wrapper owns the sink and will destroy it.
153  

153  

154  
        @param s The sink to take ownership of.
154  
        @param s The sink to take ownership of.
155  
    */
155  
    */
156  
    template<BufferSink S>
156  
    template<BufferSink S>
157  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
157  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
158  
    any_buffer_sink(S s);
158  
    any_buffer_sink(S s);
159  

159  

160  
    /** Construct by wrapping a BufferSink without ownership.
160  
    /** Construct by wrapping a BufferSink without ownership.
161  

161  

162  
        Wraps the given sink by pointer. The sink must remain
162  
        Wraps the given sink by pointer. The sink must remain
163  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
164  

164  

165  
        @param s Pointer to the sink to wrap.
165  
        @param s Pointer to the sink to wrap.
166  
    */
166  
    */
167  
    template<BufferSink S>
167  
    template<BufferSink S>
168  
    any_buffer_sink(S* s);
168  
    any_buffer_sink(S* s);
169  

169  

170  
    /** Check if the wrapper contains a valid sink.
170  
    /** Check if the wrapper contains a valid sink.
171  

171  

172  
        @return `true` if wrapping a sink, `false` if default-constructed
172  
        @return `true` if wrapping a sink, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    bool
175  
    bool
176  
    has_value() const noexcept
176  
    has_value() const noexcept
177  
    {
177  
    {
178  
        return sink_ != nullptr;
178  
        return sink_ != nullptr;
179  
    }
179  
    }
180  

180  

181  
    /** Check if the wrapper contains a valid sink.
181  
    /** Check if the wrapper contains a valid sink.
182  

182  

183  
        @return `true` if wrapping a sink, `false` if default-constructed
183  
        @return `true` if wrapping a sink, `false` if default-constructed
184  
            or moved-from.
184  
            or moved-from.
185  
    */
185  
    */
186  
    explicit
186  
    explicit
187  
    operator bool() const noexcept
187  
    operator bool() const noexcept
188  
    {
188  
    {
189  
        return has_value();
189  
        return has_value();
190  
    }
190  
    }
191  

191  

192  
    /** Prepare writable buffers.
192  
    /** Prepare writable buffers.
193  

193  

194  
        Fills the provided span with mutable buffer descriptors
194  
        Fills the provided span with mutable buffer descriptors
195  
        pointing to the underlying sink's internal storage. This
195  
        pointing to the underlying sink's internal storage. This
196  
        operation is synchronous.
196  
        operation is synchronous.
197  

197  

198  
        @param dest Span of mutable_buffer to fill.
198  
        @param dest Span of mutable_buffer to fill.
199  

199  

200  
        @return A span of filled buffers.
200  
        @return A span of filled buffers.
201  

201  

202  
        @par Preconditions
202  
        @par Preconditions
203  
        The wrapper must contain a valid sink (`has_value() == true`).
203  
        The wrapper must contain a valid sink (`has_value() == true`).
204  
    */
204  
    */
205  
    std::span<mutable_buffer>
205  
    std::span<mutable_buffer>
206  
    prepare(std::span<mutable_buffer> dest);
206  
    prepare(std::span<mutable_buffer> dest);
207  

207  

208  
    /** Commit bytes written to the prepared buffers.
208  
    /** Commit bytes written to the prepared buffers.
209  

209  

210  
        Commits `n` bytes written to the buffers returned by the
210  
        Commits `n` bytes written to the buffers returned by the
211  
        most recent call to @ref prepare. The operation may trigger
211  
        most recent call to @ref prepare. The operation may trigger
212  
        underlying I/O.
212  
        underlying I/O.
213  

213  

214  
        @param n The number of bytes to commit.
214  
        @param n The number of bytes to commit.
215  

215  

216  
        @return An awaitable yielding `(error_code)`.
216  
        @return An awaitable yielding `(error_code)`.
217  

217  

218  
        @par Preconditions
218  
        @par Preconditions
219  
        The wrapper must contain a valid sink (`has_value() == true`).
219  
        The wrapper must contain a valid sink (`has_value() == true`).
220  
    */
220  
    */
221  
    auto
221  
    auto
222  
    commit(std::size_t n);
222  
    commit(std::size_t n);
223  

223  

224  
    /** Commit bytes written with optional end-of-stream.
224  
    /** Commit bytes written with optional end-of-stream.
225  

225  

226  
        Commits `n` bytes written to the buffers returned by the
226  
        Commits `n` bytes written to the buffers returned by the
227  
        most recent call to @ref prepare. If `eof` is true, also
227  
        most recent call to @ref prepare. If `eof` is true, also
228  
        signals end-of-stream.
228  
        signals end-of-stream.
229  

229  

230  
        @param n The number of bytes to commit.
230  
        @param n The number of bytes to commit.
231  
        @param eof If true, signals end-of-stream after committing.
231  
        @param eof If true, signals end-of-stream after committing.
232  

232  

233  
        @return An awaitable yielding `(error_code)`.
233  
        @return An awaitable yielding `(error_code)`.
234  

234  

235  
        @par Preconditions
235  
        @par Preconditions
236  
        The wrapper must contain a valid sink (`has_value() == true`).
236  
        The wrapper must contain a valid sink (`has_value() == true`).
237  
    */
237  
    */
238  
    auto
238  
    auto
239  
    commit(std::size_t n, bool eof);
239  
    commit(std::size_t n, bool eof);
240  

240  

241  
    /** Signal end-of-stream.
241  
    /** Signal end-of-stream.
242  

242  

243  
        Indicates that no more data will be written to the sink.
243  
        Indicates that no more data will be written to the sink.
244  
        The operation completes when the sink is finalized, or
244  
        The operation completes when the sink is finalized, or
245  
        an error occurs.
245  
        an error occurs.
246  

246  

247  
        @return An awaitable yielding `(error_code)`.
247  
        @return An awaitable yielding `(error_code)`.
248  

248  

249  
        @par Preconditions
249  
        @par Preconditions
250  
        The wrapper must contain a valid sink (`has_value() == true`).
250  
        The wrapper must contain a valid sink (`has_value() == true`).
251  
    */
251  
    */
252  
    auto
252  
    auto
253  
    commit_eof();
253  
    commit_eof();
254  

254  

255  
    /** Write data from a buffer sequence.
255  
    /** Write data from a buffer sequence.
256  

256  

257  
        Writes all data from the buffer sequence to the underlying
257  
        Writes all data from the buffer sequence to the underlying
258  
        sink. This method satisfies the @ref WriteSink concept.
258  
        sink. This method satisfies the @ref WriteSink concept.
259  

259  

260  
        @note This operation copies data from the caller's buffers
260  
        @note This operation copies data from the caller's buffers
261  
        into the sink's internal buffers. For zero-copy writes,
261  
        into the sink's internal buffers. For zero-copy writes,
262  
        use @ref prepare and @ref commit directly.
262  
        use @ref prepare and @ref commit directly.
263  

263  

264  
        @param buffers The buffer sequence to write.
264  
        @param buffers The buffer sequence to write.
265  

265  

266  
        @return An awaitable yielding `(error_code,std::size_t)`.
266  
        @return An awaitable yielding `(error_code,std::size_t)`.
267  

267  

268  
        @par Preconditions
268  
        @par Preconditions
269  
        The wrapper must contain a valid sink (`has_value() == true`).
269  
        The wrapper must contain a valid sink (`has_value() == true`).
270  
    */
270  
    */
271  
    template<ConstBufferSequence CB>
271  
    template<ConstBufferSequence CB>
272  
    task<io_result<std::size_t>>
272  
    task<io_result<std::size_t>>
273  
    write(CB buffers);
273  
    write(CB buffers);
274  

274  

275  
    /** Write data with optional end-of-stream.
275  
    /** Write data with optional end-of-stream.
276  

276  

277  
        Writes all data from the buffer sequence to the underlying
277  
        Writes all data from the buffer sequence to the underlying
278  
        sink, optionally finalizing it afterwards. This method
278  
        sink, optionally finalizing it afterwards. This method
279  
        satisfies the @ref WriteSink concept.
279  
        satisfies the @ref WriteSink concept.
280  

280  

281  
        @note This operation copies data from the caller's buffers
281  
        @note This operation copies data from the caller's buffers
282  
        into the sink's internal buffers. For zero-copy writes,
282  
        into the sink's internal buffers. For zero-copy writes,
283  
        use @ref prepare and @ref commit directly.
283  
        use @ref prepare and @ref commit directly.
284  

284  

285  
        @param buffers The buffer sequence to write.
285  
        @param buffers The buffer sequence to write.
286  
        @param eof If true, finalize the sink after writing.
286  
        @param eof If true, finalize the sink after writing.
287  

287  

288  
        @return An awaitable yielding `(error_code,std::size_t)`.
288  
        @return An awaitable yielding `(error_code,std::size_t)`.
289  

289  

290  
        @par Preconditions
290  
        @par Preconditions
291  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
    */
292  
    */
293  
    template<ConstBufferSequence CB>
293  
    template<ConstBufferSequence CB>
294  
    task<io_result<std::size_t>>
294  
    task<io_result<std::size_t>>
295  
    write(CB buffers, bool eof);
295  
    write(CB buffers, bool eof);
296  

296  

297  
    /** Signal end-of-stream.
297  
    /** Signal end-of-stream.
298  

298  

299  
        Indicates that no more data will be written to the sink.
299  
        Indicates that no more data will be written to the sink.
300  
        This method satisfies the @ref WriteSink concept.
300  
        This method satisfies the @ref WriteSink concept.
301  

301  

302  
        @return An awaitable yielding `(error_code)`.
302  
        @return An awaitable yielding `(error_code)`.
303  

303  

304  
        @par Preconditions
304  
        @par Preconditions
305  
        The wrapper must contain a valid sink (`has_value() == true`).
305  
        The wrapper must contain a valid sink (`has_value() == true`).
306  
    */
306  
    */
307  
    auto
307  
    auto
308  
    write_eof();
308  
    write_eof();
309  

309  

310  
protected:
310  
protected:
311  
    /** Rebind to a new sink after move.
311  
    /** Rebind to a new sink after move.
312  

312  

313  
        Updates the internal pointer to reference a new sink object.
313  
        Updates the internal pointer to reference a new sink object.
314  
        Used by owning wrappers after move assignment when the owned
314  
        Used by owning wrappers after move assignment when the owned
315  
        object has moved to a new location.
315  
        object has moved to a new location.
316  

316  

317  
        @param new_sink The new sink to bind to. Must be the same
317  
        @param new_sink The new sink to bind to. Must be the same
318  
            type as the original sink.
318  
            type as the original sink.
319  

319  

320  
        @note Terminates if called with a sink of different type
320  
        @note Terminates if called with a sink of different type
321  
            than the original.
321  
            than the original.
322  
    */
322  
    */
323  
    template<BufferSink S>
323  
    template<BufferSink S>
324  
    void
324  
    void
325  
    rebind(S& new_sink) noexcept
325  
    rebind(S& new_sink) noexcept
326  
    {
326  
    {
327  
        if(vt_ != &vtable_for_impl<S>::value)
327  
        if(vt_ != &vtable_for_impl<S>::value)
328  
            std::terminate();
328  
            std::terminate();
329  
        sink_ = &new_sink;
329  
        sink_ = &new_sink;
330  
    }
330  
    }
331  
};
331  
};
332  

332  

333  
//----------------------------------------------------------
333  
//----------------------------------------------------------
334  

334  

335  
struct any_buffer_sink::awaitable_ops
335  
struct any_buffer_sink::awaitable_ops
336  
{
336  
{
337  
    bool (*await_ready)(void*);
337  
    bool (*await_ready)(void*);
338  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
338  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
339  
    io_result<> (*await_resume)(void*);
339  
    io_result<> (*await_resume)(void*);
340  
    void (*destroy)(void*) noexcept;
340  
    void (*destroy)(void*) noexcept;
341  
};
341  
};
342  

342  

343  
struct any_buffer_sink::vtable
343  
struct any_buffer_sink::vtable
344  
{
344  
{
345  
    void (*destroy)(void*) noexcept;
345  
    void (*destroy)(void*) noexcept;
346  
    std::span<mutable_buffer> (*do_prepare)(
346  
    std::span<mutable_buffer> (*do_prepare)(
347  
        void* sink,
347  
        void* sink,
348  
        std::span<mutable_buffer> dest);
348  
        std::span<mutable_buffer> dest);
349  
    std::size_t awaitable_size;
349  
    std::size_t awaitable_size;
350  
    std::size_t awaitable_align;
350  
    std::size_t awaitable_align;
351  
    awaitable_ops const* (*construct_commit_awaitable)(
351  
    awaitable_ops const* (*construct_commit_awaitable)(
352  
        void* sink,
352  
        void* sink,
353  
        void* storage,
353  
        void* storage,
354  
        std::size_t n,
354  
        std::size_t n,
355  
        bool eof);
355  
        bool eof);
356  
    awaitable_ops const* (*construct_eof_awaitable)(
356  
    awaitable_ops const* (*construct_eof_awaitable)(
357  
        void* sink,
357  
        void* sink,
358  
        void* storage);
358  
        void* storage);
359  
};
359  
};
360  

360  

361  
template<BufferSink S>
361  
template<BufferSink S>
362  
struct any_buffer_sink::vtable_for_impl
362  
struct any_buffer_sink::vtable_for_impl
363  
{
363  
{
364  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
364  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
365  
        std::size_t{}, false));
365  
        std::size_t{}, false));
366  
    using EofAwaitable = decltype(std::declval<S&>().commit_eof());
366  
    using EofAwaitable = decltype(std::declval<S&>().commit_eof());
367  

367  

368  
    static void
368  
    static void
369  
    do_destroy_impl(void* sink) noexcept
369  
    do_destroy_impl(void* sink) noexcept
370  
    {
370  
    {
371  
        static_cast<S*>(sink)->~S();
371  
        static_cast<S*>(sink)->~S();
372  
    }
372  
    }
373  

373  

374  
    static std::span<mutable_buffer>
374  
    static std::span<mutable_buffer>
375  
    do_prepare_impl(
375  
    do_prepare_impl(
376  
        void* sink,
376  
        void* sink,
377  
        std::span<mutable_buffer> dest)
377  
        std::span<mutable_buffer> dest)
378  
    {
378  
    {
379  
        auto& s = *static_cast<S*>(sink);
379  
        auto& s = *static_cast<S*>(sink);
380  
        return s.prepare(dest);
380  
        return s.prepare(dest);
381  
    }
381  
    }
382  

382  

383  
    static awaitable_ops const*
383  
    static awaitable_ops const*
384  
    construct_commit_awaitable_impl(
384  
    construct_commit_awaitable_impl(
385  
        void* sink,
385  
        void* sink,
386  
        void* storage,
386  
        void* storage,
387  
        std::size_t n,
387  
        std::size_t n,
388  
        bool eof)
388  
        bool eof)
389  
    {
389  
    {
390  
        auto& s = *static_cast<S*>(sink);
390  
        auto& s = *static_cast<S*>(sink);
391  
        ::new(storage) CommitAwaitable(s.commit(n, eof));
391  
        ::new(storage) CommitAwaitable(s.commit(n, eof));
392  

392  

393  
        static constexpr awaitable_ops ops = {
393  
        static constexpr awaitable_ops ops = {
394  
            +[](void* p) {
394  
            +[](void* p) {
395  
                return static_cast<CommitAwaitable*>(p)->await_ready();
395  
                return static_cast<CommitAwaitable*>(p)->await_ready();
396  
            },
396  
            },
397  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
397  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
398  
                return detail::call_await_suspend(
398  
                return detail::call_await_suspend(
399  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
399  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
400  
            },
400  
            },
401  
            +[](void* p) {
401  
            +[](void* p) {
402  
                return static_cast<CommitAwaitable*>(p)->await_resume();
402  
                return static_cast<CommitAwaitable*>(p)->await_resume();
403  
            },
403  
            },
404  
            +[](void* p) noexcept {
404  
            +[](void* p) noexcept {
405  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
405  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
406  
            }
406  
            }
407  
        };
407  
        };
408  
        return &ops;
408  
        return &ops;
409  
    }
409  
    }
410  

410  

411  
    static awaitable_ops const*
411  
    static awaitable_ops const*
412  
    construct_eof_awaitable_impl(
412  
    construct_eof_awaitable_impl(
413  
        void* sink,
413  
        void* sink,
414  
        void* storage)
414  
        void* storage)
415  
    {
415  
    {
416  
        auto& s = *static_cast<S*>(sink);
416  
        auto& s = *static_cast<S*>(sink);
417  
        ::new(storage) EofAwaitable(s.commit_eof());
417  
        ::new(storage) EofAwaitable(s.commit_eof());
418  

418  

419  
        static constexpr awaitable_ops ops = {
419  
        static constexpr awaitable_ops ops = {
420  
            +[](void* p) {
420  
            +[](void* p) {
421  
                return static_cast<EofAwaitable*>(p)->await_ready();
421  
                return static_cast<EofAwaitable*>(p)->await_ready();
422  
            },
422  
            },
423  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
423  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
424  
                return detail::call_await_suspend(
424  
                return detail::call_await_suspend(
425  
                    static_cast<EofAwaitable*>(p), h, ex, token);
425  
                    static_cast<EofAwaitable*>(p), h, ex, token);
426  
            },
426  
            },
427  
            +[](void* p) {
427  
            +[](void* p) {
428  
                return static_cast<EofAwaitable*>(p)->await_resume();
428  
                return static_cast<EofAwaitable*>(p)->await_resume();
429  
            },
429  
            },
430  
            +[](void* p) noexcept {
430  
            +[](void* p) noexcept {
431  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
431  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
432  
            }
432  
            }
433  
        };
433  
        };
434  
        return &ops;
434  
        return &ops;
435  
    }
435  
    }
436  

436  

437  
    static constexpr std::size_t max_awaitable_size =
437  
    static constexpr std::size_t max_awaitable_size =
438  
        sizeof(CommitAwaitable) > sizeof(EofAwaitable)
438  
        sizeof(CommitAwaitable) > sizeof(EofAwaitable)
439  
            ? sizeof(CommitAwaitable)
439  
            ? sizeof(CommitAwaitable)
440  
            : sizeof(EofAwaitable);
440  
            : sizeof(EofAwaitable);
441  

441  

442  
    static constexpr std::size_t max_awaitable_align =
442  
    static constexpr std::size_t max_awaitable_align =
443  
        alignof(CommitAwaitable) > alignof(EofAwaitable)
443  
        alignof(CommitAwaitable) > alignof(EofAwaitable)
444  
            ? alignof(CommitAwaitable)
444  
            ? alignof(CommitAwaitable)
445  
            : alignof(EofAwaitable);
445  
            : alignof(EofAwaitable);
446  

446  

447  
    static constexpr vtable value = {
447  
    static constexpr vtable value = {
448  
        &do_destroy_impl,
448  
        &do_destroy_impl,
449  
        &do_prepare_impl,
449  
        &do_prepare_impl,
450  
        max_awaitable_size,
450  
        max_awaitable_size,
451  
        max_awaitable_align,
451  
        max_awaitable_align,
452  
        &construct_commit_awaitable_impl,
452  
        &construct_commit_awaitable_impl,
453  
        &construct_eof_awaitable_impl
453  
        &construct_eof_awaitable_impl
454  
    };
454  
    };
455  
};
455  
};
456  

456  

457  
//----------------------------------------------------------
457  
//----------------------------------------------------------
458  

458  

459  
inline
459  
inline
460  
any_buffer_sink::~any_buffer_sink()
460  
any_buffer_sink::~any_buffer_sink()
461  
{
461  
{
462  
    if(storage_)
462  
    if(storage_)
463  
    {
463  
    {
464  
        vt_->destroy(sink_);
464  
        vt_->destroy(sink_);
465  
        ::operator delete(storage_);
465  
        ::operator delete(storage_);
466  
    }
466  
    }
467  
    if(cached_awaitable_)
467  
    if(cached_awaitable_)
468  
        ::operator delete(cached_awaitable_);
468  
        ::operator delete(cached_awaitable_);
469  
}
469  
}
470  

470  

471  
inline any_buffer_sink&
471  
inline any_buffer_sink&
472  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
472  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
473  
{
473  
{
474  
    if(this != &other)
474  
    if(this != &other)
475  
    {
475  
    {
476  
        if(storage_)
476  
        if(storage_)
477  
        {
477  
        {
478  
            vt_->destroy(sink_);
478  
            vt_->destroy(sink_);
479  
            ::operator delete(storage_);
479  
            ::operator delete(storage_);
480  
        }
480  
        }
481  
        if(cached_awaitable_)
481  
        if(cached_awaitable_)
482  
            ::operator delete(cached_awaitable_);
482  
            ::operator delete(cached_awaitable_);
483  
        sink_ = std::exchange(other.sink_, nullptr);
483  
        sink_ = std::exchange(other.sink_, nullptr);
484  
        vt_ = std::exchange(other.vt_, nullptr);
484  
        vt_ = std::exchange(other.vt_, nullptr);
485  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
485  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
486  
        storage_ = std::exchange(other.storage_, nullptr);
486  
        storage_ = std::exchange(other.storage_, nullptr);
487  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
487  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
488  
    }
488  
    }
489  
    return *this;
489  
    return *this;
490  
}
490  
}
491  

491  

492  
template<BufferSink S>
492  
template<BufferSink S>
493  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
493  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
494  
any_buffer_sink::any_buffer_sink(S s)
494  
any_buffer_sink::any_buffer_sink(S s)
495  
    : vt_(&vtable_for_impl<S>::value)
495  
    : vt_(&vtable_for_impl<S>::value)
496  
{
496  
{
497  
    struct guard {
497  
    struct guard {
498  
        any_buffer_sink* self;
498  
        any_buffer_sink* self;
499  
        bool committed = false;
499  
        bool committed = false;
500  
        ~guard() {
500  
        ~guard() {
501  
            if(!committed && self->storage_) {
501  
            if(!committed && self->storage_) {
502  
                self->vt_->destroy(self->sink_);
502  
                self->vt_->destroy(self->sink_);
503  
                ::operator delete(self->storage_);
503  
                ::operator delete(self->storage_);
504  
                self->storage_ = nullptr;
504  
                self->storage_ = nullptr;
505  
                self->sink_ = nullptr;
505  
                self->sink_ = nullptr;
506  
            }
506  
            }
507  
        }
507  
        }
508  
    } g{this};
508  
    } g{this};
509  

509  

510  
    storage_ = ::operator new(sizeof(S));
510  
    storage_ = ::operator new(sizeof(S));
511  
    sink_ = ::new(storage_) S(std::move(s));
511  
    sink_ = ::new(storage_) S(std::move(s));
512  

512  

513  
    // Preallocate the awaitable storage (sized for max of commit/eof)
513  
    // Preallocate the awaitable storage (sized for max of commit/eof)
514  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
514  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
515  

515  

516  
    g.committed = true;
516  
    g.committed = true;
517  
}
517  
}
518  

518  

519  
template<BufferSink S>
519  
template<BufferSink S>
520  
any_buffer_sink::any_buffer_sink(S* s)
520  
any_buffer_sink::any_buffer_sink(S* s)
521  
    : sink_(s)
521  
    : sink_(s)
522  
    , vt_(&vtable_for_impl<S>::value)
522  
    , vt_(&vtable_for_impl<S>::value)
523  
{
523  
{
524  
    // Preallocate the awaitable storage (sized for max of commit/eof)
524  
    // Preallocate the awaitable storage (sized for max of commit/eof)
525  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
525  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
526  
}
526  
}
527  

527  

528  
//----------------------------------------------------------
528  
//----------------------------------------------------------
529  

529  

530  
inline std::span<mutable_buffer>
530  
inline std::span<mutable_buffer>
531  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
531  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
532  
{
532  
{
533  
    return vt_->do_prepare(sink_, dest);
533  
    return vt_->do_prepare(sink_, dest);
534  
}
534  
}
535  

535  

536  
inline auto
536  
inline auto
537  
any_buffer_sink::commit(std::size_t n, bool eof)
537  
any_buffer_sink::commit(std::size_t n, bool eof)
538  
{
538  
{
539  
    struct awaitable
539  
    struct awaitable
540  
    {
540  
    {
541  
        any_buffer_sink* self_;
541  
        any_buffer_sink* self_;
542  
        std::size_t n_;
542  
        std::size_t n_;
543  
        bool eof_;
543  
        bool eof_;
544  

544  

545  
        bool
545  
        bool
546  
        await_ready() const noexcept
546  
        await_ready() const noexcept
547  
        {
547  
        {
548  
            return false;
548  
            return false;
549  
        }
549  
        }
550  

550  

551  
        coro
551  
        coro
552  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
552  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
553  
        {
553  
        {
554  
            // Construct the underlying awaitable into cached storage
554  
            // Construct the underlying awaitable into cached storage
555  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
555  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
556  
                self_->sink_,
556  
                self_->sink_,
557  
                self_->cached_awaitable_,
557  
                self_->cached_awaitable_,
558  
                n_,
558  
                n_,
559  
                eof_);
559  
                eof_);
560  

560  

561  
            // Check if underlying is immediately ready
561  
            // Check if underlying is immediately ready
562  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
562  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
563  
                return h;
563  
                return h;
564  

564  

565  
            // Forward to underlying awaitable
565  
            // Forward to underlying awaitable
566  
            return self_->active_ops_->await_suspend(
566  
            return self_->active_ops_->await_suspend(
567  
                self_->cached_awaitable_, h, ex, token);
567  
                self_->cached_awaitable_, h, ex, token);
568  
        }
568  
        }
569  

569  

570  
        io_result<>
570  
        io_result<>
571  
        await_resume()
571  
        await_resume()
572  
        {
572  
        {
573  
            struct guard {
573  
            struct guard {
574  
                any_buffer_sink* self;
574  
                any_buffer_sink* self;
575  
                ~guard() {
575  
                ~guard() {
576  
                    self->active_ops_->destroy(self->cached_awaitable_);
576  
                    self->active_ops_->destroy(self->cached_awaitable_);
577  
                    self->active_ops_ = nullptr;
577  
                    self->active_ops_ = nullptr;
578  
                }
578  
                }
579  
            } g{self_};
579  
            } g{self_};
580  
            return self_->active_ops_->await_resume(
580  
            return self_->active_ops_->await_resume(
581  
                self_->cached_awaitable_);
581  
                self_->cached_awaitable_);
582  
        }
582  
        }
583  
    };
583  
    };
584  
    return awaitable{this, n, eof};
584  
    return awaitable{this, n, eof};
585  
}
585  
}
586  

586  

587  
inline auto
587  
inline auto
588  
any_buffer_sink::commit(std::size_t n)
588  
any_buffer_sink::commit(std::size_t n)
589  
{
589  
{
590  
    return commit(n, false);
590  
    return commit(n, false);
591  
}
591  
}
592  

592  

593  
inline auto
593  
inline auto
594  
any_buffer_sink::commit_eof()
594  
any_buffer_sink::commit_eof()
595  
{
595  
{
596  
    struct awaitable
596  
    struct awaitable
597  
    {
597  
    {
598  
        any_buffer_sink* self_;
598  
        any_buffer_sink* self_;
599  

599  

600  
        bool
600  
        bool
601  
        await_ready() const noexcept
601  
        await_ready() const noexcept
602  
        {
602  
        {
603  
            return false;
603  
            return false;
604  
        }
604  
        }
605  

605  

606  
        coro
606  
        coro
607  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
607  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
608  
        {
608  
        {
609  
            // Construct the underlying awaitable into cached storage
609  
            // Construct the underlying awaitable into cached storage
610  
            self_->active_ops_ = self_->vt_->construct_eof_awaitable(
610  
            self_->active_ops_ = self_->vt_->construct_eof_awaitable(
611  
                self_->sink_,
611  
                self_->sink_,
612  
                self_->cached_awaitable_);
612  
                self_->cached_awaitable_);
613  

613  

614  
            // Check if underlying is immediately ready
614  
            // Check if underlying is immediately ready
615  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
615  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
616  
                return h;
616  
                return h;
617  

617  

618  
            // Forward to underlying awaitable
618  
            // Forward to underlying awaitable
619  
            return self_->active_ops_->await_suspend(
619  
            return self_->active_ops_->await_suspend(
620  
                self_->cached_awaitable_, h, ex, token);
620  
                self_->cached_awaitable_, h, ex, token);
621  
        }
621  
        }
622  

622  

623  
        io_result<>
623  
        io_result<>
624  
        await_resume()
624  
        await_resume()
625  
        {
625  
        {
626  
            struct guard {
626  
            struct guard {
627  
                any_buffer_sink* self;
627  
                any_buffer_sink* self;
628  
                ~guard() {
628  
                ~guard() {
629  
                    self->active_ops_->destroy(self->cached_awaitable_);
629  
                    self->active_ops_->destroy(self->cached_awaitable_);
630  
                    self->active_ops_ = nullptr;
630  
                    self->active_ops_ = nullptr;
631  
                }
631  
                }
632  
            } g{self_};
632  
            } g{self_};
633  
            return self_->active_ops_->await_resume(
633  
            return self_->active_ops_->await_resume(
634  
                self_->cached_awaitable_);
634  
                self_->cached_awaitable_);
635  
        }
635  
        }
636  
    };
636  
    };
637  
    return awaitable{this};
637  
    return awaitable{this};
638  
}
638  
}
639  

639  

640  
//----------------------------------------------------------
640  
//----------------------------------------------------------
641  

641  

642  
template<ConstBufferSequence CB>
642  
template<ConstBufferSequence CB>
643  
task<io_result<std::size_t>>
643  
task<io_result<std::size_t>>
644  
any_buffer_sink::write(CB buffers)
644  
any_buffer_sink::write(CB buffers)
645  
{
645  
{
646  
    return write(buffers, false);
646  
    return write(buffers, false);
647  
}
647  
}
648  

648  

649  
template<ConstBufferSequence CB>
649  
template<ConstBufferSequence CB>
650  
task<io_result<std::size_t>>
650  
task<io_result<std::size_t>>
651  
any_buffer_sink::write(CB buffers, bool eof)
651  
any_buffer_sink::write(CB buffers, bool eof)
652  
{
652  
{
653  
    buffer_param<CB> bp(buffers);
653  
    buffer_param<CB> bp(buffers);
654  
    std::size_t total = 0;
654  
    std::size_t total = 0;
655  

655  

656  
    for(;;)
656  
    for(;;)
657  
    {
657  
    {
658  
        auto src = bp.data();
658  
        auto src = bp.data();
659  
        if(src.empty())
659  
        if(src.empty())
660  
            break;
660  
            break;
661  

661  

662  
        mutable_buffer arr[detail::max_iovec_];
662  
        mutable_buffer arr[detail::max_iovec_];
663  
        auto dst_bufs = prepare(arr);
663  
        auto dst_bufs = prepare(arr);
664  
        if(dst_bufs.empty())
664  
        if(dst_bufs.empty())
665  
        {
665  
        {
666  
            auto [ec] = co_await commit(0);
666  
            auto [ec] = co_await commit(0);
667  
            if(ec)
667  
            if(ec)
668  
                co_return {ec, total};
668  
                co_return {ec, total};
669  
            continue;
669  
            continue;
670  
        }
670  
        }
671  

671  

672  
        auto n = buffer_copy(dst_bufs, src);
672  
        auto n = buffer_copy(dst_bufs, src);
673  
        auto [ec] = co_await commit(n);
673  
        auto [ec] = co_await commit(n);
674  
        if(ec)
674  
        if(ec)
675  
            co_return {ec, total};
675  
            co_return {ec, total};
676  
        bp.consume(n);
676  
        bp.consume(n);
677  
        total += n;
677  
        total += n;
678  
    }
678  
    }
679  

679  

680  
    if(eof)
680  
    if(eof)
681  
    {
681  
    {
682  
        auto [ec] = co_await commit_eof();
682  
        auto [ec] = co_await commit_eof();
683  
        if(ec)
683  
        if(ec)
684  
            co_return {ec, total};
684  
            co_return {ec, total};
685  
    }
685  
    }
686  

686  

687  
    co_return {{}, total};
687  
    co_return {{}, total};
688  
}
688  
}
689  

689  

690  
inline auto
690  
inline auto
691  
any_buffer_sink::write_eof()
691  
any_buffer_sink::write_eof()
692  
{
692  
{
693  
    return commit_eof();
693  
    return commit_eof();
694  
}
694  
}
695  

695  

696  
//----------------------------------------------------------
696  
//----------------------------------------------------------
697  

697  

698  
static_assert(WriteSink<any_buffer_sink>);
698  
static_assert(WriteSink<any_buffer_sink>);
699  

699  

700  
} // namespace capy
700  
} // namespace capy
701  
} // namespace boost
701  
} // namespace boost
702  

702  

703  
#endif
703  
#endif