libs/capy/include/boost/capy/io/any_write_stream.hpp

79.7% Lines (63/79) 84.0% Functions (21/25) 55.0% Branches (11/20)
libs/capy/include/boost/capy/io/any_write_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any WriteStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref WriteStream concept, enabling runtime polymorphism for
39 write operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_write_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_write_stream stream(&sock);
66
67 const_buffer buf(data, size);
68 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_read_stream, any_stream, WriteStream
72 */
73 class any_write_stream
74 {
75 struct vtable;
76 struct awaitable_ops;
77
78 template<WriteStream S>
79 struct vtable_for_impl;
80
81 void* stream_ = nullptr;
82 vtable const* vt_ = nullptr;
83 void* cached_awaitable_ = nullptr;
84 void* storage_ = nullptr;
85 awaitable_ops const* active_ops_ = nullptr;
86
87 public:
88 /** Destructor.
89
90 Destroys the owned stream (if any) and releases the cached
91 awaitable storage.
92 */
93 ~any_write_stream();
94
95 /** Default constructor.
96
97 Constructs an empty wrapper. Operations on a default-constructed
98 wrapper result in undefined behavior.
99 */
100 1 any_write_stream() = default;
101
102 /** Non-copyable.
103
104 The awaitable cache is per-instance and cannot be shared.
105 */
106 any_write_stream(any_write_stream const&) = delete;
107 any_write_stream& operator=(any_write_stream const&) = delete;
108
109 /** Move constructor.
110
111 Transfers ownership of the wrapped stream (if owned) and
112 cached awaitable storage from `other`. After the move, `other` is
113 in a default-constructed state.
114
115 @param other The wrapper to move from.
116 */
117 2 any_write_stream(any_write_stream&& other) noexcept
118 2 : stream_(std::exchange(other.stream_, nullptr))
119 2 , vt_(std::exchange(other.vt_, nullptr))
120 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121 2 , storage_(std::exchange(other.storage_, nullptr))
122 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
123 {
124 2 }
125
126 /** Move assignment operator.
127
128 Destroys any owned stream and releases existing resources,
129 then transfers ownership from `other`.
130
131 @param other The wrapper to move from.
132 @return Reference to this wrapper.
133 */
134 any_write_stream&
135 operator=(any_write_stream&& other) noexcept;
136
137 /** Construct by taking ownership of a WriteStream.
138
139 Allocates storage and moves the stream into this wrapper.
140 The wrapper owns the stream and will destroy it.
141
142 @param s The stream to take ownership of.
143 */
144 template<WriteStream S>
145 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
146 any_write_stream(S s);
147
148 /** Construct by wrapping a WriteStream without ownership.
149
150 Wraps the given stream by pointer. The stream must remain
151 valid for the lifetime of this wrapper.
152
153 @param s Pointer to the stream to wrap.
154 */
155 template<WriteStream S>
156 any_write_stream(S* s);
157
158 /** Check if the wrapper contains a valid stream.
159
160 @return `true` if wrapping a stream, `false` if default-constructed
161 or moved-from.
162 */
163 bool
164 15 has_value() const noexcept
165 {
166 15 return stream_ != nullptr;
167 }
168
169 /** Check if the wrapper contains a valid stream.
170
171 @return `true` if wrapping a stream, `false` if default-constructed
172 or moved-from.
173 */
174 explicit
175 2 operator bool() const noexcept
176 {
177 2 return has_value();
178 }
179
180 /** Initiate an asynchronous write operation.
181
182 Writes data from the provided buffer sequence. The operation
183 completes when at least one byte has been written, or an error
184 occurs.
185
186 @param buffers The buffer sequence containing data to write.
187 Passed by value to ensure the sequence lives in the
188 coroutine frame across suspension points.
189
190 @return An awaitable yielding `(error_code,std::size_t)`.
191
192 @par Preconditions
193 The wrapper must contain a valid stream (`has_value() == true`).
194 */
195 template<ConstBufferSequence CB>
196 auto
197 write_some(CB buffers);
198
199 protected:
200 /** Rebind to a new stream after move.
201
202 Updates the internal pointer to reference a new stream object.
203 Used by owning wrappers after move assignment when the owned
204 object has moved to a new location.
205
206 @param new_stream The new stream to bind to. Must be the same
207 type as the original stream.
208
209 @note Terminates if called with a stream of different type
210 than the original.
211 */
212 template<WriteStream S>
213 void
214 rebind(S& new_stream) noexcept
215 {
216 if(vt_ != &vtable_for_impl<S>::value)
217 std::terminate();
218 stream_ = &new_stream;
219 }
220 };
221
222 //----------------------------------------------------------
223
224 struct any_write_stream::awaitable_ops
225 {
226 bool (*await_ready)(void*);
227 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228 io_result<std::size_t> (*await_resume)(void*);
229 void (*destroy)(void*) noexcept;
230 };
231
232 struct any_write_stream::vtable
233 {
234 void (*destroy)(void*) noexcept;
235 std::size_t awaitable_size;
236 std::size_t awaitable_align;
237 awaitable_ops const* (*construct_awaitable)(
238 void* stream,
239 void* storage,
240 std::span<const_buffer const> buffers);
241 };
242
243 template<WriteStream S>
244 struct any_write_stream::vtable_for_impl
245 {
246 using Awaitable = decltype(std::declval<S&>().write_some(
247 std::span<const_buffer const>{}));
248
249 static void
250 do_destroy_impl(void* stream) noexcept
251 {
252 static_cast<S*>(stream)->~S();
253 }
254
255 static awaitable_ops const*
256 60 construct_awaitable_impl(
257 void* stream,
258 void* storage,
259 std::span<const_buffer const> buffers)
260 {
261 60 auto& s = *static_cast<S*>(stream);
262 60 ::new(storage) Awaitable(s.write_some(buffers));
263
264 static constexpr awaitable_ops ops = {
265 60 +[](void* p) {
266 60 return static_cast<Awaitable*>(p)->await_ready();
267 },
268 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269 return detail::call_await_suspend(
270 static_cast<Awaitable*>(p), h, ex, token);
271 },
272 60 +[](void* p) {
273 60 return static_cast<Awaitable*>(p)->await_resume();
274 },
275 60 +[](void* p) noexcept {
276 60 static_cast<Awaitable*>(p)->~Awaitable();
277 }
278 };
279 60 return &ops;
280 }
281
282 static constexpr vtable value = {
283 &do_destroy_impl,
284 sizeof(Awaitable),
285 alignof(Awaitable),
286 &construct_awaitable_impl
287 };
288 };
289
290 //----------------------------------------------------------
291
292 inline
293 72 any_write_stream::~any_write_stream()
294 {
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
72 if(storage_)
296 {
297 vt_->destroy(stream_);
298 ::operator delete(storage_);
299 }
300
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 7 times.
72 if(cached_awaitable_)
301 {
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 65 times.
65 if(active_ops_)
303 active_ops_->destroy(cached_awaitable_);
304 65 ::operator delete(cached_awaitable_);
305 }
306 72 }
307
308 inline any_write_stream&
309 3 any_write_stream::operator=(any_write_stream&& other) noexcept
310 {
311
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
312 {
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
314 {
315 vt_->destroy(stream_);
316 ::operator delete(storage_);
317 }
318
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
319 {
320 if(active_ops_)
321 active_ops_->destroy(cached_awaitable_);
322 ::operator delete(cached_awaitable_);
323 }
324 3 stream_ = std::exchange(other.stream_, nullptr);
325 3 vt_ = std::exchange(other.vt_, nullptr);
326 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
327 3 storage_ = std::exchange(other.storage_, nullptr);
328 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
329 }
330 3 return *this;
331 }
332
333 template<WriteStream S>
334 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
335 any_write_stream::any_write_stream(S s)
336 : vt_(&vtable_for_impl<S>::value)
337 {
338 struct guard {
339 any_write_stream* self;
340 bool committed = false;
341 ~guard() {
342 if(!committed && self->storage_) {
343 self->vt_->destroy(self->stream_);
344 ::operator delete(self->storage_);
345 self->storage_ = nullptr;
346 self->stream_ = nullptr;
347 }
348 }
349 } g{this};
350
351 storage_ = ::operator new(sizeof(S));
352 stream_ = ::new(storage_) S(std::move(s));
353
354 // Preallocate the awaitable storage
355 cached_awaitable_ = ::operator new(vt_->awaitable_size);
356
357 g.committed = true;
358 }
359
360 template<WriteStream S>
361 65 any_write_stream::any_write_stream(S* s)
362 65 : stream_(s)
363 65 , vt_(&vtable_for_impl<S>::value)
364 {
365 // Preallocate the awaitable storage
366 65 cached_awaitable_ = ::operator new(vt_->awaitable_size);
367 65 }
368
369 //----------------------------------------------------------
370
371 template<ConstBufferSequence CB>
372 auto
373 60 any_write_stream::write_some(CB buffers)
374 {
375 struct awaitable
376 {
377 any_write_stream* self_;
378 buffer_param<CB> bp_;
379
380 bool
381 60 await_ready() const noexcept
382 {
383 60 return false;
384 }
385
386 coro
387 60 await_suspend(coro h, executor_ref ex, std::stop_token token)
388 {
389 // Construct the underlying awaitable into cached storage
390 60 self_->active_ops_ = self_->vt_->construct_awaitable(
391 60 self_->stream_,
392
1/1
✓ Branch 1 taken 10 times.
60 self_->cached_awaitable_,
393
1/1
✓ Branch 1 taken 10 times.
60 bp_.data());
394
395 // Check if underlying is immediately ready
396
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
60 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
397 60 return h;
398
399 // Forward to underlying awaitable
400 return self_->active_ops_->await_suspend(
401 self_->cached_awaitable_, h, ex, token);
402 }
403
404 io_result<std::size_t>
405 60 await_resume()
406 {
407 struct guard {
408 any_write_stream* self;
409 60 ~guard() {
410 60 self->active_ops_->destroy(self->cached_awaitable_);
411 60 self->active_ops_ = nullptr;
412 60 }
413 60 } g{self_};
414 60 return self_->active_ops_->await_resume(
415
1/1
✓ Branch 1 taken 7 times.
103 self_->cached_awaitable_);
416 60 }
417 };
418 60 return awaitable{this, buffer_param<CB>(buffers)};
419 }
420
421 } // namespace capy
422 } // namespace boost
423
424 #endif
425