//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/write_sink.hpp>
18  
#include <boost/capy/concept/write_sink.hpp>
19  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/task.hpp>
22  
#include <boost/capy/task.hpp>
23  

23  

24  
#include <concepts>
24  
#include <concepts>
25  
#include <coroutine>
25  
#include <coroutine>
26  
#include <cstddef>
26  
#include <cstddef>
27  
#include <exception>
27  
#include <exception>
28  
#include <new>
28  
#include <new>
29  
#include <span>
29  
#include <span>
30  
#include <stop_token>
30  
#include <stop_token>
31  
#include <system_error>
31  
#include <system_error>
32  
#include <utility>
32  
#include <utility>
33  

33  

34  
namespace boost {
34  
namespace boost {
35  
namespace capy {
35  
namespace capy {
36  

36  

37  
/** Type-erased wrapper for any WriteSink.
37  
/** Type-erased wrapper for any WriteSink.
38  

38  

39  
    This class provides type erasure for any type satisfying the
39  
    This class provides type erasure for any type satisfying the
40  
    @ref WriteSink concept, enabling runtime polymorphism for
40  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    sink write operations. It uses cached awaitable storage to achieve
41  
    sink write operations. It uses cached awaitable storage to achieve
42  
    zero steady-state allocation after construction.
42  
    zero steady-state allocation after construction.
43  

43  

44  
    The wrapper supports two construction modes:
44  
    The wrapper supports two construction modes:
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
      allocates storage and owns the sink.
46  
      allocates storage and owns the sink.
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
      pointed-to sink must outlive this wrapper.
48  
      pointed-to sink must outlive this wrapper.
49  

49  

50  
    @par Awaitable Preallocation
50  
    @par Awaitable Preallocation
51  
    The constructor preallocates storage for the type-erased awaitable.
51  
    The constructor preallocates storage for the type-erased awaitable.
52  
    This reserves all virtual address space at server startup
52  
    This reserves all virtual address space at server startup
53  
    so memory usage can be measured up front, rather than
53  
    so memory usage can be measured up front, rather than
54  
    allocating piecemeal as traffic arrives.
54  
    allocating piecemeal as traffic arrives.
55  

55  

56  
    @par Thread Safety
56  
    @par Thread Safety
57  
    Not thread-safe. Concurrent operations on the same wrapper
57  
    Not thread-safe. Concurrent operations on the same wrapper
58  
    are undefined behavior.
58  
    are undefined behavior.
59  

59  

60  
    @par Example
60  
    @par Example
61  
    @code
61  
    @code
62  
    // Owning - takes ownership of the sink
62  
    // Owning - takes ownership of the sink
63  
    any_write_sink ws(some_sink{args...});
63  
    any_write_sink ws(some_sink{args...});
64  

64  

65  
    // Reference - wraps without ownership
65  
    // Reference - wraps without ownership
66  
    some_sink sink;
66  
    some_sink sink;
67  
    any_write_sink ws(&sink);
67  
    any_write_sink ws(&sink);
68  

68  

69  
    const_buffer buf(data, size);
69  
    const_buffer buf(data, size);
70  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
70  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
71  
    auto [ec2] = co_await ws.write_eof();
71  
    auto [ec2] = co_await ws.write_eof();
72  
    @endcode
72  
    @endcode
73  

73  

74  
    @see any_write_stream, WriteSink
74  
    @see any_write_stream, WriteSink
75  
*/
75  
*/
76  
class any_write_sink
76  
class any_write_sink
77  
{
77  
{
78  
    struct vtable;
78  
    struct vtable;
79  
    struct write_awaitable_ops;
79  
    struct write_awaitable_ops;
80  
    struct eof_awaitable_ops;
80  
    struct eof_awaitable_ops;
81  

81  

82  
    template<WriteSink S>
82  
    template<WriteSink S>
83  
    struct vtable_for_impl;
83  
    struct vtable_for_impl;
84  

84  

85  
    void* sink_ = nullptr;
85  
    void* sink_ = nullptr;
86  
    vtable const* vt_ = nullptr;
86  
    vtable const* vt_ = nullptr;
87  
    void* cached_awaitable_ = nullptr;
87  
    void* cached_awaitable_ = nullptr;
88  
    void* storage_ = nullptr;
88  
    void* storage_ = nullptr;
89  
    write_awaitable_ops const* active_write_ops_ = nullptr;
89  
    write_awaitable_ops const* active_write_ops_ = nullptr;
90  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
90  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
91  

91  

92  
public:
92  
public:
93  
    /** Destructor.
93  
    /** Destructor.
94  

94  

95  
        Destroys the owned sink (if any) and releases the cached
95  
        Destroys the owned sink (if any) and releases the cached
96  
        awaitable storage.
96  
        awaitable storage.
97  
    */
97  
    */
98  
    ~any_write_sink();
98  
    ~any_write_sink();
99  

99  

100  
    /** Default constructor.
100  
    /** Default constructor.
101  

101  

102  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        wrapper result in undefined behavior.
103  
        wrapper result in undefined behavior.
104  
    */
104  
    */
105  
    any_write_sink() = default;
105  
    any_write_sink() = default;
106  

106  

107  
    /** Non-copyable.
107  
    /** Non-copyable.
108  

108  

109  
        The awaitable cache is per-instance and cannot be shared.
109  
        The awaitable cache is per-instance and cannot be shared.
110  
    */
110  
    */
111  
    any_write_sink(any_write_sink const&) = delete;
111  
    any_write_sink(any_write_sink const&) = delete;
112  
    any_write_sink& operator=(any_write_sink const&) = delete;
112  
    any_write_sink& operator=(any_write_sink const&) = delete;
113  

113  

114  
    /** Move constructor.
114  
    /** Move constructor.
115  

115  

116  
        Transfers ownership of the wrapped sink (if owned) and
116  
        Transfers ownership of the wrapped sink (if owned) and
117  
        cached awaitable storage from `other`. After the move, `other` is
117  
        cached awaitable storage from `other`. After the move, `other` is
118  
        in a default-constructed state.
118  
        in a default-constructed state.
119  

119  

120  
        @param other The wrapper to move from.
120  
        @param other The wrapper to move from.
121  
    */
121  
    */
122  
    any_write_sink(any_write_sink&& other) noexcept
122  
    any_write_sink(any_write_sink&& other) noexcept
123  
        : sink_(std::exchange(other.sink_, nullptr))
123  
        : sink_(std::exchange(other.sink_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
127  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
128  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
128  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
129  
    {
129  
    {
130  
    }
130  
    }
131  

131  

132  
    /** Move assignment operator.
132  
    /** Move assignment operator.
133  

133  

134  
        Destroys any owned sink and releases existing resources,
134  
        Destroys any owned sink and releases existing resources,
135  
        then transfers ownership from `other`.
135  
        then transfers ownership from `other`.
136  

136  

137  
        @param other The wrapper to move from.
137  
        @param other The wrapper to move from.
138  
        @return Reference to this wrapper.
138  
        @return Reference to this wrapper.
139  
    */
139  
    */
140  
    any_write_sink&
140  
    any_write_sink&
141  
    operator=(any_write_sink&& other) noexcept;
141  
    operator=(any_write_sink&& other) noexcept;
142  

142  

143  
    /** Construct by taking ownership of a WriteSink.
143  
    /** Construct by taking ownership of a WriteSink.
144  

144  

145  
        Allocates storage and moves the sink into this wrapper.
145  
        Allocates storage and moves the sink into this wrapper.
146  
        The wrapper owns the sink and will destroy it.
146  
        The wrapper owns the sink and will destroy it.
147  

147  

148  
        @param s The sink to take ownership of.
148  
        @param s The sink to take ownership of.
149  
    */
149  
    */
150  
    template<WriteSink S>
150  
    template<WriteSink S>
151  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
151  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
152  
    any_write_sink(S s);
152  
    any_write_sink(S s);
153  

153  

154  
    /** Construct by wrapping a WriteSink without ownership.
154  
    /** Construct by wrapping a WriteSink without ownership.
155  

155  

156  
        Wraps the given sink by pointer. The sink must remain
156  
        Wraps the given sink by pointer. The sink must remain
157  
        valid for the lifetime of this wrapper.
157  
        valid for the lifetime of this wrapper.
158  

158  

159  
        @param s Pointer to the sink to wrap.
159  
        @param s Pointer to the sink to wrap.
160  
    */
160  
    */
161  
    template<WriteSink S>
161  
    template<WriteSink S>
162  
    any_write_sink(S* s);
162  
    any_write_sink(S* s);
163  

163  

164  
    /** Check if the wrapper contains a valid sink.
164  
    /** Check if the wrapper contains a valid sink.
165  

165  

166  
        @return `true` if wrapping a sink, `false` if default-constructed
166  
        @return `true` if wrapping a sink, `false` if default-constructed
167  
            or moved-from.
167  
            or moved-from.
168  
    */
168  
    */
169  
    bool
169  
    bool
170  
    has_value() const noexcept
170  
    has_value() const noexcept
171  
    {
171  
    {
172  
        return sink_ != nullptr;
172  
        return sink_ != nullptr;
173  
    }
173  
    }
174  

174  

175  
    /** Check if the wrapper contains a valid sink.
175  
    /** Check if the wrapper contains a valid sink.
176  

176  

177  
        @return `true` if wrapping a sink, `false` if default-constructed
177  
        @return `true` if wrapping a sink, `false` if default-constructed
178  
            or moved-from.
178  
            or moved-from.
179  
    */
179  
    */
180  
    explicit
180  
    explicit
181  
    operator bool() const noexcept
181  
    operator bool() const noexcept
182  
    {
182  
    {
183  
        return has_value();
183  
        return has_value();
184  
    }
184  
    }
185  

185  

186  
    /** Initiate an asynchronous write operation.
186  
    /** Initiate an asynchronous write operation.
187  

187  

188  
        Writes data from the provided buffer sequence. The operation
188  
        Writes data from the provided buffer sequence. The operation
189  
        completes when all bytes have been consumed, or an error
189  
        completes when all bytes have been consumed, or an error
190  
        occurs.
190  
        occurs.
191  

191  

192  
        @param buffers The buffer sequence containing data to write.
192  
        @param buffers The buffer sequence containing data to write.
193  
            Passed by value to ensure the sequence lives in the
193  
            Passed by value to ensure the sequence lives in the
194  
            coroutine frame across suspension points.
194  
            coroutine frame across suspension points.
195  

195  

196  
        @return An awaitable yielding `(error_code,std::size_t)`.
196  
        @return An awaitable yielding `(error_code,std::size_t)`.
197  

197  

198  
        @par Preconditions
198  
        @par Preconditions
199  
        The wrapper must contain a valid sink (`has_value() == true`).
199  
        The wrapper must contain a valid sink (`has_value() == true`).
200  
    */
200  
    */
201  
    template<ConstBufferSequence CB>
201  
    template<ConstBufferSequence CB>
202  
    task<io_result<std::size_t>>
202  
    task<io_result<std::size_t>>
203  
    write(CB buffers);
203  
    write(CB buffers);
204  

204  

205  
    /** Initiate an asynchronous write operation with optional EOF.
205  
    /** Initiate an asynchronous write operation with optional EOF.
206  

206  

207  
        Writes data from the provided buffer sequence, optionally
207  
        Writes data from the provided buffer sequence, optionally
208  
        finalizing the sink afterwards. The operation completes when
208  
        finalizing the sink afterwards. The operation completes when
209  
        all bytes have been consumed and (if eof is true) the sink
209  
        all bytes have been consumed and (if eof is true) the sink
210  
        is finalized, or an error occurs.
210  
        is finalized, or an error occurs.
211  

211  

212  
        @param buffers The buffer sequence containing data to write.
212  
        @param buffers The buffer sequence containing data to write.
213  
            Passed by value to ensure the sequence lives in the
213  
            Passed by value to ensure the sequence lives in the
214  
            coroutine frame across suspension points.
214  
            coroutine frame across suspension points.
215  

215  

216  
        @param eof If `true`, the sink is finalized after writing
216  
        @param eof If `true`, the sink is finalized after writing
217  
            the data.
217  
            the data.
218  

218  

219  
        @return An awaitable yielding `(error_code,std::size_t)`.
219  
        @return An awaitable yielding `(error_code,std::size_t)`.
220  

220  

221  
        @par Preconditions
221  
        @par Preconditions
222  
        The wrapper must contain a valid sink (`has_value() == true`).
222  
        The wrapper must contain a valid sink (`has_value() == true`).
223  
    */
223  
    */
224  
    template<ConstBufferSequence CB>
224  
    template<ConstBufferSequence CB>
225  
    task<io_result<std::size_t>>
225  
    task<io_result<std::size_t>>
226  
    write(CB buffers, bool eof);
226  
    write(CB buffers, bool eof);
227  

227  

228  
    /** Signal end of data.
228  
    /** Signal end of data.
229  

229  

230  
        Indicates that no more data will be written to the sink.
230  
        Indicates that no more data will be written to the sink.
231  
        The operation completes when the sink is finalized, or
231  
        The operation completes when the sink is finalized, or
232  
        an error occurs.
232  
        an error occurs.
233  

233  

234  
        @return An awaitable yielding `(error_code)`.
234  
        @return An awaitable yielding `(error_code)`.
235  

235  

236  
        @par Preconditions
236  
        @par Preconditions
237  
        The wrapper must contain a valid sink (`has_value() == true`).
237  
        The wrapper must contain a valid sink (`has_value() == true`).
238  
    */
238  
    */
239  
    auto
239  
    auto
240  
    write_eof();
240  
    write_eof();
241  

241  

242  
protected:
242  
protected:
243  
    /** Rebind to a new sink after move.
243  
    /** Rebind to a new sink after move.
244  

244  

245  
        Updates the internal pointer to reference a new sink object.
245  
        Updates the internal pointer to reference a new sink object.
246  
        Used by owning wrappers after move assignment when the owned
246  
        Used by owning wrappers after move assignment when the owned
247  
        object has moved to a new location.
247  
        object has moved to a new location.
248  

248  

249  
        @param new_sink The new sink to bind to. Must be the same
249  
        @param new_sink The new sink to bind to. Must be the same
250  
            type as the original sink.
250  
            type as the original sink.
251  

251  

252  
        @note Terminates if called with a sink of different type
252  
        @note Terminates if called with a sink of different type
253  
            than the original.
253  
            than the original.
254  
    */
254  
    */
255  
    template<WriteSink S>
255  
    template<WriteSink S>
256  
    void
256  
    void
257  
    rebind(S& new_sink) noexcept
257  
    rebind(S& new_sink) noexcept
258  
    {
258  
    {
259  
        if(vt_ != &vtable_for_impl<S>::value)
259  
        if(vt_ != &vtable_for_impl<S>::value)
260  
            std::terminate();
260  
            std::terminate();
261  
        sink_ = &new_sink;
261  
        sink_ = &new_sink;
262  
    }
262  
    }
263  

263  

264  
private:
264  
private:
265  
    auto
265  
    auto
266  
    write_some_(std::span<const_buffer const> buffers, bool eof);
266  
    write_some_(std::span<const_buffer const> buffers, bool eof);
267  
};
267  
};
268  

268  

269  
//----------------------------------------------------------
269  
//----------------------------------------------------------
270  

270  

271  
struct any_write_sink::write_awaitable_ops
271  
struct any_write_sink::write_awaitable_ops
272  
{
272  
{
273  
    bool (*await_ready)(void*);
273  
    bool (*await_ready)(void*);
274  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
274  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
275  
    io_result<std::size_t> (*await_resume)(void*);
275  
    io_result<std::size_t> (*await_resume)(void*);
276  
    void (*destroy)(void*) noexcept;
276  
    void (*destroy)(void*) noexcept;
277  
};
277  
};
278  

278  

279  
struct any_write_sink::eof_awaitable_ops
279  
struct any_write_sink::eof_awaitable_ops
280  
{
280  
{
281  
    bool (*await_ready)(void*);
281  
    bool (*await_ready)(void*);
282  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
282  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
283  
    io_result<> (*await_resume)(void*);
283  
    io_result<> (*await_resume)(void*);
284  
    void (*destroy)(void*) noexcept;
284  
    void (*destroy)(void*) noexcept;
285  
};
285  
};
286  

286  

287  
struct any_write_sink::vtable
287  
struct any_write_sink::vtable
288  
{
288  
{
289  
    void (*destroy)(void*) noexcept;
289  
    void (*destroy)(void*) noexcept;
290  
    std::size_t awaitable_size;
290  
    std::size_t awaitable_size;
291  
    std::size_t awaitable_align;
291  
    std::size_t awaitable_align;
292  
    write_awaitable_ops const* (*construct_write_awaitable)(
292  
    write_awaitable_ops const* (*construct_write_awaitable)(
293  
        void* sink,
293  
        void* sink,
294  
        void* storage,
294  
        void* storage,
295  
        std::span<const_buffer const> buffers,
295  
        std::span<const_buffer const> buffers,
296  
        bool eof);
296  
        bool eof);
297  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
297  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
298  
        void* sink,
298  
        void* sink,
299  
        void* storage);
299  
        void* storage);
300  
};
300  
};
301  

301  

302  
template<WriteSink S>
302  
template<WriteSink S>
303  
struct any_write_sink::vtable_for_impl
303  
struct any_write_sink::vtable_for_impl
304  
{
304  
{
305  
    using WriteAwaitable = decltype(std::declval<S&>().write(
305  
    using WriteAwaitable = decltype(std::declval<S&>().write(
306  
        std::span<const_buffer const>{}, false));
306  
        std::span<const_buffer const>{}, false));
307  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
307  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
308  

308  

309  
    static void
309  
    static void
310  
    do_destroy_impl(void* sink) noexcept
310  
    do_destroy_impl(void* sink) noexcept
311  
    {
311  
    {
312  
        static_cast<S*>(sink)->~S();
312  
        static_cast<S*>(sink)->~S();
313  
    }
313  
    }
314  

314  

315  
    static write_awaitable_ops const*
315  
    static write_awaitable_ops const*
316  
    construct_write_awaitable_impl(
316  
    construct_write_awaitable_impl(
317  
        void* sink,
317  
        void* sink,
318  
        void* storage,
318  
        void* storage,
319  
        std::span<const_buffer const> buffers,
319  
        std::span<const_buffer const> buffers,
320  
        bool eof)
320  
        bool eof)
321  
    {
321  
    {
322  
        auto& s = *static_cast<S*>(sink);
322  
        auto& s = *static_cast<S*>(sink);
323  
        ::new(storage) WriteAwaitable(s.write(buffers, eof));
323  
        ::new(storage) WriteAwaitable(s.write(buffers, eof));
324  

324  

325  
        static constexpr write_awaitable_ops ops = {
325  
        static constexpr write_awaitable_ops ops = {
326  
            +[](void* p) {
326  
            +[](void* p) {
327  
                return static_cast<WriteAwaitable*>(p)->await_ready();
327  
                return static_cast<WriteAwaitable*>(p)->await_ready();
328  
            },
328  
            },
329  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
329  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
330  
                return detail::call_await_suspend(
330  
                return detail::call_await_suspend(
331  
                    static_cast<WriteAwaitable*>(p), h, ex, token);
331  
                    static_cast<WriteAwaitable*>(p), h, ex, token);
332  
            },
332  
            },
333  
            +[](void* p) {
333  
            +[](void* p) {
334  
                return static_cast<WriteAwaitable*>(p)->await_resume();
334  
                return static_cast<WriteAwaitable*>(p)->await_resume();
335  
            },
335  
            },
336  
            +[](void* p) noexcept {
336  
            +[](void* p) noexcept {
337  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
337  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
338  
            }
338  
            }
339  
        };
339  
        };
340  
        return &ops;
340  
        return &ops;
341  
    }
341  
    }
342  

342  

343  
    static eof_awaitable_ops const*
343  
    static eof_awaitable_ops const*
344  
    construct_eof_awaitable_impl(
344  
    construct_eof_awaitable_impl(
345  
        void* sink,
345  
        void* sink,
346  
        void* storage)
346  
        void* storage)
347  
    {
347  
    {
348  
        auto& s = *static_cast<S*>(sink);
348  
        auto& s = *static_cast<S*>(sink);
349  
        ::new(storage) EofAwaitable(s.write_eof());
349  
        ::new(storage) EofAwaitable(s.write_eof());
350  

350  

351  
        static constexpr eof_awaitable_ops ops = {
351  
        static constexpr eof_awaitable_ops ops = {
352  
            +[](void* p) {
352  
            +[](void* p) {
353  
                return static_cast<EofAwaitable*>(p)->await_ready();
353  
                return static_cast<EofAwaitable*>(p)->await_ready();
354  
            },
354  
            },
355  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
355  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
356  
                return detail::call_await_suspend(
356  
                return detail::call_await_suspend(
357  
                    static_cast<EofAwaitable*>(p), h, ex, token);
357  
                    static_cast<EofAwaitable*>(p), h, ex, token);
358  
            },
358  
            },
359  
            +[](void* p) {
359  
            +[](void* p) {
360  
                return static_cast<EofAwaitable*>(p)->await_resume();
360  
                return static_cast<EofAwaitable*>(p)->await_resume();
361  
            },
361  
            },
362  
            +[](void* p) noexcept {
362  
            +[](void* p) noexcept {
363  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
363  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
364  
            }
364  
            }
365  
        };
365  
        };
366  
        return &ops;
366  
        return &ops;
367  
    }
367  
    }
368  

368  

369  
    static constexpr std::size_t max_awaitable_size =
369  
    static constexpr std::size_t max_awaitable_size =
370  
        sizeof(WriteAwaitable) > sizeof(EofAwaitable)
370  
        sizeof(WriteAwaitable) > sizeof(EofAwaitable)
371  
            ? sizeof(WriteAwaitable)
371  
            ? sizeof(WriteAwaitable)
372  
            : sizeof(EofAwaitable);
372  
            : sizeof(EofAwaitable);
373  

373  

374  
    static constexpr std::size_t max_awaitable_align =
374  
    static constexpr std::size_t max_awaitable_align =
375  
        alignof(WriteAwaitable) > alignof(EofAwaitable)
375  
        alignof(WriteAwaitable) > alignof(EofAwaitable)
376  
            ? alignof(WriteAwaitable)
376  
            ? alignof(WriteAwaitable)
377  
            : alignof(EofAwaitable);
377  
            : alignof(EofAwaitable);
378  

378  

379  
    static constexpr vtable value = {
379  
    static constexpr vtable value = {
380  
        &do_destroy_impl,
380  
        &do_destroy_impl,
381  
        max_awaitable_size,
381  
        max_awaitable_size,
382  
        max_awaitable_align,
382  
        max_awaitable_align,
383  
        &construct_write_awaitable_impl,
383  
        &construct_write_awaitable_impl,
384  
        &construct_eof_awaitable_impl
384  
        &construct_eof_awaitable_impl
385  
    };
385  
    };
386  
};
386  
};
387  

387  

388  
//----------------------------------------------------------
388  
//----------------------------------------------------------
389  

389  

390  
inline
390  
inline
391  
any_write_sink::~any_write_sink()
391  
any_write_sink::~any_write_sink()
392  
{
392  
{
393  
    if(storage_)
393  
    if(storage_)
394  
    {
394  
    {
395  
        vt_->destroy(sink_);
395  
        vt_->destroy(sink_);
396  
        ::operator delete(storage_);
396  
        ::operator delete(storage_);
397  
    }
397  
    }
398  
    if(cached_awaitable_)
398  
    if(cached_awaitable_)
399  
        ::operator delete(cached_awaitable_);
399  
        ::operator delete(cached_awaitable_);
400  
}
400  
}
401  

401  

402  
/*  Move assignment.

    Releases whatever this wrapper currently holds (owned sink and
    awaitable cache), then takes over every field of `other`, leaving
    `other` with all-null members. Self-assignment is a no-op.
*/
inline any_write_sink&
any_write_sink::operator=(any_write_sink&& other) noexcept
{
    if(this != &other)
    {
        // Release current resources first; storage_ is non-null only
        // when the sink is owned.
        if(storage_)
        {
            vt_->destroy(sink_);
            ::operator delete(storage_);
        }
        if(cached_awaitable_)
            ::operator delete(cached_awaitable_);

        // Steal other's state and null it out so its destructor
        // releases nothing.
        sink_ = std::exchange(other.sink_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
    }
    return *this;
}
423  

423  

424  
template<WriteSink S>
424  
template<WriteSink S>
425  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
425  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
426  
any_write_sink::any_write_sink(S s)
426  
any_write_sink::any_write_sink(S s)
427  
    : vt_(&vtable_for_impl<S>::value)
427  
    : vt_(&vtable_for_impl<S>::value)
428  
{
428  
{
429  
    struct guard {
429  
    struct guard {
430  
        any_write_sink* self;
430  
        any_write_sink* self;
431  
        bool committed = false;
431  
        bool committed = false;
432  
        ~guard() {
432  
        ~guard() {
433  
            if(!committed && self->storage_) {
433  
            if(!committed && self->storage_) {
434  
                self->vt_->destroy(self->sink_);
434  
                self->vt_->destroy(self->sink_);
435  
                ::operator delete(self->storage_);
435  
                ::operator delete(self->storage_);
436  
                self->storage_ = nullptr;
436  
                self->storage_ = nullptr;
437  
                self->sink_ = nullptr;
437  
                self->sink_ = nullptr;
438  
            }
438  
            }
439  
        }
439  
        }
440  
    } g{this};
440  
    } g{this};
441  

441  

442  
    storage_ = ::operator new(sizeof(S));
442  
    storage_ = ::operator new(sizeof(S));
443  
    sink_ = ::new(storage_) S(std::move(s));
443  
    sink_ = ::new(storage_) S(std::move(s));
444  

444  

445  
    // Preallocate the awaitable storage (sized for max of write/eof)
445  
    // Preallocate the awaitable storage (sized for max of write/eof)
446  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
446  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
447  

447  

448  
    g.committed = true;
448  
    g.committed = true;
449  
}
449  
}
450  

450  

451  
template<WriteSink S>
451  
template<WriteSink S>
452  
any_write_sink::any_write_sink(S* s)
452  
any_write_sink::any_write_sink(S* s)
453  
    : sink_(s)
453  
    : sink_(s)
454  
    , vt_(&vtable_for_impl<S>::value)
454  
    , vt_(&vtable_for_impl<S>::value)
455  
{
455  
{
456  
    // Preallocate the awaitable storage (sized for max of write/eof)
456  
    // Preallocate the awaitable storage (sized for max of write/eof)
457  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
457  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
458  
}
458  
}
459  

459  

460  
//----------------------------------------------------------
460  
//----------------------------------------------------------
461  

461  

462  
inline auto
462  
inline auto
463  
any_write_sink::write_some_(
463  
any_write_sink::write_some_(
464  
    std::span<const_buffer const> buffers,
464  
    std::span<const_buffer const> buffers,
465  
    bool eof)
465  
    bool eof)
466  
{
466  
{
467  
    struct awaitable
467  
    struct awaitable
468  
    {
468  
    {
469  
        any_write_sink* self_;
469  
        any_write_sink* self_;
470  
        std::span<const_buffer const> buffers_;
470  
        std::span<const_buffer const> buffers_;
471  
        bool eof_;
471  
        bool eof_;
472  

472  

473  
        bool
473  
        bool
474  
        await_ready() const noexcept
474  
        await_ready() const noexcept
475  
        {
475  
        {
476  
            return false;
476  
            return false;
477  
        }
477  
        }
478  

478  

479  
        coro
479  
        coro
480  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
480  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
481  
        {
481  
        {
482  
            // Construct the underlying awaitable into cached storage
482  
            // Construct the underlying awaitable into cached storage
483  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
483  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
484  
                self_->sink_,
484  
                self_->sink_,
485  
                self_->cached_awaitable_,
485  
                self_->cached_awaitable_,
486  
                buffers_,
486  
                buffers_,
487  
                eof_);
487  
                eof_);
488  

488  

489  
            // Check if underlying is immediately ready
489  
            // Check if underlying is immediately ready
490  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
490  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
491  
                return h;
491  
                return h;
492  

492  

493  
            // Forward to underlying awaitable
493  
            // Forward to underlying awaitable
494  
            return self_->active_write_ops_->await_suspend(
494  
            return self_->active_write_ops_->await_suspend(
495  
                self_->cached_awaitable_, h, ex, token);
495  
                self_->cached_awaitable_, h, ex, token);
496  
        }
496  
        }
497  

497  

498  
        io_result<std::size_t>
498  
        io_result<std::size_t>
499  
        await_resume()
499  
        await_resume()
500  
        {
500  
        {
501  
            struct guard {
501  
            struct guard {
502  
                any_write_sink* self;
502  
                any_write_sink* self;
503  
                ~guard() {
503  
                ~guard() {
504  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
504  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
505  
                    self->active_write_ops_ = nullptr;
505  
                    self->active_write_ops_ = nullptr;
506  
                }
506  
                }
507  
            } g{self_};
507  
            } g{self_};
508  
            return self_->active_write_ops_->await_resume(
508  
            return self_->active_write_ops_->await_resume(
509  
                self_->cached_awaitable_);
509  
                self_->cached_awaitable_);
510  
        }
510  
        }
511  
    };
511  
    };
512  
    return awaitable{this, buffers, eof};
512  
    return awaitable{this, buffers, eof};
513  
}
513  
}
514  

514  

515  
inline auto
515  
inline auto
516  
any_write_sink::write_eof()
516  
any_write_sink::write_eof()
517  
{
517  
{
518  
    struct awaitable
518  
    struct awaitable
519  
    {
519  
    {
520  
        any_write_sink* self_;
520  
        any_write_sink* self_;
521  

521  

522  
        bool
522  
        bool
523  
        await_ready() const noexcept
523  
        await_ready() const noexcept
524  
        {
524  
        {
525  
            return false;
525  
            return false;
526  
        }
526  
        }
527  

527  

528  
        coro
528  
        coro
529  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
529  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
530  
        {
530  
        {
531  
            // Construct the underlying awaitable into cached storage
531  
            // Construct the underlying awaitable into cached storage
532  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
532  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
533  
                self_->sink_,
533  
                self_->sink_,
534  
                self_->cached_awaitable_);
534  
                self_->cached_awaitable_);
535  

535  

536  
            // Check if underlying is immediately ready
536  
            // Check if underlying is immediately ready
537  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
537  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
538  
                return h;
538  
                return h;
539  

539  

540  
            // Forward to underlying awaitable
540  
            // Forward to underlying awaitable
541  
            return self_->active_eof_ops_->await_suspend(
541  
            return self_->active_eof_ops_->await_suspend(
542  
                self_->cached_awaitable_, h, ex, token);
542  
                self_->cached_awaitable_, h, ex, token);
543  
        }
543  
        }
544  

544  

545  
        io_result<>
545  
        io_result<>
546  
        await_resume()
546  
        await_resume()
547  
        {
547  
        {
548  
            struct guard {
548  
            struct guard {
549  
                any_write_sink* self;
549  
                any_write_sink* self;
550  
                ~guard() {
550  
                ~guard() {
551  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
551  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
552  
                    self->active_eof_ops_ = nullptr;
552  
                    self->active_eof_ops_ = nullptr;
553  
                }
553  
                }
554  
            } g{self_};
554  
            } g{self_};
555  
            return self_->active_eof_ops_->await_resume(
555  
            return self_->active_eof_ops_->await_resume(
556  
                self_->cached_awaitable_);
556  
                self_->cached_awaitable_);
557  
        }
557  
        }
558  
    };
558  
    };
559  
    return awaitable{this};
559  
    return awaitable{this};
560  
}
560  
}
561  

561  

562  
template<ConstBufferSequence CB>
562  
template<ConstBufferSequence CB>
563  
task<io_result<std::size_t>>
563  
task<io_result<std::size_t>>
564  
any_write_sink::write(CB buffers)
564  
any_write_sink::write(CB buffers)
565  
{
565  
{
566  
    return write(buffers, false);
566  
    return write(buffers, false);
567  
}
567  
}
568  

568  

569  
template<ConstBufferSequence CB>
569  
template<ConstBufferSequence CB>
570  
task<io_result<std::size_t>>
570  
task<io_result<std::size_t>>
571  
any_write_sink::write(CB buffers, bool eof)
571  
any_write_sink::write(CB buffers, bool eof)
572  
{
572  
{
573  
    buffer_param<CB> bp(buffers);
573  
    buffer_param<CB> bp(buffers);
574  
    std::size_t total = 0;
574  
    std::size_t total = 0;
575  

575  

576  
    for(;;)
576  
    for(;;)
577  
    {
577  
    {
578  
        auto bufs = bp.data();
578  
        auto bufs = bp.data();
579  
        if(bufs.empty())
579  
        if(bufs.empty())
580  
            break;
580  
            break;
581  

581  

582  
        auto [ec, n] = co_await write_some_(bufs, false);
582  
        auto [ec, n] = co_await write_some_(bufs, false);
583  
        if(ec)
583  
        if(ec)
584  
            co_return {ec, total + n};
584  
            co_return {ec, total + n};
585  
        bp.consume(n);
585  
        bp.consume(n);
586  
        total += n;
586  
        total += n;
587  
    }
587  
    }
588  

588  

589  
    if(eof)
589  
    if(eof)
590  
    {
590  
    {
591  
        auto [ec] = co_await write_eof();
591  
        auto [ec] = co_await write_eof();
592  
        if(ec)
592  
        if(ec)
593  
            co_return {ec, total};
593  
            co_return {ec, total};
594  
    }
594  
    }
595  

595  

596  
    co_return {{}, total};
596  
    co_return {{}, total};
597  
}
597  
}
598  

598  

599  
} // namespace capy
599  
} // namespace capy
600  
} // namespace boost
600  
} // namespace boost
601  

601  

602  
#endif
602  
#endif