libs/capy/include/boost/capy/io/any_read_stream.hpp

79.7% Lines (63/79) 84.0% Functions (21/25) 55.0% Branches (11/20)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_read_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_read_stream stream(&sock);
66
67 mutable_buffer buf(data, size);
68 auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_write_stream, any_stream, ReadStream
72 */
73 class any_read_stream
74 {
75 struct vtable;
76 struct awaitable_ops;
77
78 template<ReadStream S>
79 struct vtable_for_impl;
80
81 void* stream_ = nullptr;
82 vtable const* vt_ = nullptr;
83 void* cached_awaitable_ = nullptr;
84 void* storage_ = nullptr;
85 awaitable_ops const* active_ops_ = nullptr;
86
87 public:
88 /** Destructor.
89
90 Destroys the owned stream (if any) and releases the cached
91 awaitable storage.
92 */
93 ~any_read_stream();
94
95 /** Default constructor.
96
97 Constructs an empty wrapper. Operations on a default-constructed
98 wrapper result in undefined behavior.
99 */
100 1 any_read_stream() = default;
101
102 /** Non-copyable.
103
104 The awaitable cache is per-instance and cannot be shared.
105 */
106 any_read_stream(any_read_stream const&) = delete;
107 any_read_stream& operator=(any_read_stream const&) = delete;
108
109 /** Move constructor.
110
111 Transfers ownership of the wrapped stream (if owned) and
112 cached awaitable storage from `other`. After the move, `other` is
113 in a default-constructed state.
114
115 @param other The wrapper to move from.
116 */
117 2 any_read_stream(any_read_stream&& other) noexcept
118 2 : stream_(std::exchange(other.stream_, nullptr))
119 2 , vt_(std::exchange(other.vt_, nullptr))
120 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121 2 , storage_(std::exchange(other.storage_, nullptr))
122 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
123 {
124 2 }
125
126 /** Move assignment operator.
127
128 Destroys any owned stream and releases existing resources,
129 then transfers ownership from `other`.
130
131 @param other The wrapper to move from.
132 @return Reference to this wrapper.
133 */
134 any_read_stream&
135 operator=(any_read_stream&& other) noexcept;
136
137 /** Construct by taking ownership of a ReadStream.
138
139 Allocates storage and moves the stream into this wrapper.
140 The wrapper owns the stream and will destroy it.
141
142 @param s The stream to take ownership of.
143 */
144 template<ReadStream S>
145 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
146 any_read_stream(S s);
147
148 /** Construct by wrapping a ReadStream without ownership.
149
150 Wraps the given stream by pointer. The stream must remain
151 valid for the lifetime of this wrapper.
152
153 @param s Pointer to the stream to wrap.
154 */
155 template<ReadStream S>
156 any_read_stream(S* s);
157
158 /** Check if the wrapper contains a valid stream.
159
160 @return `true` if wrapping a stream, `false` if default-constructed
161 or moved-from.
162 */
163 bool
164 19 has_value() const noexcept
165 {
166 19 return stream_ != nullptr;
167 }
168
169 /** Check if the wrapper contains a valid stream.
170
171 @return `true` if wrapping a stream, `false` if default-constructed
172 or moved-from.
173 */
174 explicit
175 2 operator bool() const noexcept
176 {
177 2 return has_value();
178 }
179
180 /** Initiate an asynchronous read operation.
181
182 Reads data into the provided buffer sequence. The operation
183 completes when at least one byte has been read, or an error
184 occurs.
185
186 @param buffers The buffer sequence to read into. Passed by
187 value to ensure the sequence lives in the coroutine frame
188 across suspension points.
189
190 @return An awaitable yielding `(error_code,std::size_t)`.
191
192 @par Preconditions
193 The wrapper must contain a valid stream (`has_value() == true`).
194 */
195 template<MutableBufferSequence MB>
196 auto
197 read_some(MB buffers);
198
199 protected:
200 /** Rebind to a new stream after move.
201
202 Updates the internal pointer to reference a new stream object.
203 Used by owning wrappers after move assignment when the owned
204 object has moved to a new location.
205
206 @param new_stream The new stream to bind to. Must be the same
207 type as the original stream.
208
209 @note Terminates if called with a stream of different type
210 than the original.
211 */
212 template<ReadStream S>
213 void
214 rebind(S& new_stream) noexcept
215 {
216 if(vt_ != &vtable_for_impl<S>::value)
217 std::terminate();
218 stream_ = &new_stream;
219 }
220 };
221
222 //----------------------------------------------------------
223
224 struct any_read_stream::awaitable_ops
225 {
226 bool (*await_ready)(void*);
227 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228 io_result<std::size_t> (*await_resume)(void*);
229 void (*destroy)(void*) noexcept;
230 };
231
232 struct any_read_stream::vtable
233 {
234 void (*destroy)(void*) noexcept;
235 std::size_t awaitable_size;
236 std::size_t awaitable_align;
237 awaitable_ops const* (*construct_awaitable)(
238 void* stream,
239 void* storage,
240 std::span<mutable_buffer const> buffers);
241 };
242
243 template<ReadStream S>
244 struct any_read_stream::vtable_for_impl
245 {
246 using Awaitable = decltype(std::declval<S&>().read_some(
247 std::span<mutable_buffer const>{}));
248
249 static void
250 do_destroy_impl(void* stream) noexcept
251 {
252 static_cast<S*>(stream)->~S();
253 }
254
255 static awaitable_ops const*
256 70 construct_awaitable_impl(
257 void* stream,
258 void* storage,
259 std::span<mutable_buffer const> buffers)
260 {
261 70 auto& s = *static_cast<S*>(stream);
262 70 ::new(storage) Awaitable(s.read_some(buffers));
263
264 static constexpr awaitable_ops ops = {
265 70 +[](void* p) {
266 70 return static_cast<Awaitable*>(p)->await_ready();
267 },
268 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269 return detail::call_await_suspend(
270 static_cast<Awaitable*>(p), h, ex, token);
271 },
272 70 +[](void* p) {
273 70 return static_cast<Awaitable*>(p)->await_resume();
274 },
275 70 +[](void* p) noexcept {
276 70 static_cast<Awaitable*>(p)->~Awaitable();
277 }
278 };
279 70 return &ops;
280 }
281
282 static constexpr vtable value = {
283 &do_destroy_impl,
284 sizeof(Awaitable),
285 alignof(Awaitable),
286 &construct_awaitable_impl
287 };
288 };
289
290 //----------------------------------------------------------
291
292 inline
293 78 any_read_stream::~any_read_stream()
294 {
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if(storage_)
296 {
297 vt_->destroy(stream_);
298 ::operator delete(storage_);
299 }
300
2/2
✓ Branch 0 taken 71 times.
✓ Branch 1 taken 7 times.
78 if(cached_awaitable_)
301 {
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
71 if(active_ops_)
303 active_ops_->destroy(cached_awaitable_);
304 71 ::operator delete(cached_awaitable_);
305 }
306 78 }
307
308 inline any_read_stream&
309 3 any_read_stream::operator=(any_read_stream&& other) noexcept
310 {
311
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
312 {
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
314 {
315 vt_->destroy(stream_);
316 ::operator delete(storage_);
317 }
318
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
319 {
320 if(active_ops_)
321 active_ops_->destroy(cached_awaitable_);
322 ::operator delete(cached_awaitable_);
323 }
324 3 stream_ = std::exchange(other.stream_, nullptr);
325 3 vt_ = std::exchange(other.vt_, nullptr);
326 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
327 3 storage_ = std::exchange(other.storage_, nullptr);
328 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
329 }
330 3 return *this;
331 }
332
333 template<ReadStream S>
334 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
335 any_read_stream::any_read_stream(S s)
336 : vt_(&vtable_for_impl<S>::value)
337 {
338 struct guard {
339 any_read_stream* self;
340 bool committed = false;
341 ~guard() {
342 if(!committed && self->storage_) {
343 self->vt_->destroy(self->stream_);
344 ::operator delete(self->storage_);
345 self->storage_ = nullptr;
346 self->stream_ = nullptr;
347 }
348 }
349 } g{this};
350
351 storage_ = ::operator new(sizeof(S));
352 stream_ = ::new(storage_) S(std::move(s));
353
354 // Preallocate the awaitable storage
355 cached_awaitable_ = ::operator new(vt_->awaitable_size);
356
357 g.committed = true;
358 }
359
360 template<ReadStream S>
361 71 any_read_stream::any_read_stream(S* s)
362 71 : stream_(s)
363 71 , vt_(&vtable_for_impl<S>::value)
364 {
365 // Preallocate the awaitable storage
366 71 cached_awaitable_ = ::operator new(vt_->awaitable_size);
367 71 }
368
369 //----------------------------------------------------------
370
371 template<MutableBufferSequence MB>
372 auto
373 70 any_read_stream::read_some(MB buffers)
374 {
375 struct awaitable
376 {
377 any_read_stream* self_;
378 buffer_param<MB> bp_;
379
380 bool
381 70 await_ready() const noexcept
382 {
383 70 return false;
384 }
385
386 coro
387 70 await_suspend(coro h, executor_ref ex, std::stop_token token)
388 {
389 // Construct the underlying awaitable into cached storage
390 70 self_->active_ops_ = self_->vt_->construct_awaitable(
391 70 self_->stream_,
392
1/1
✓ Branch 1 taken 14 times.
70 self_->cached_awaitable_,
393
1/1
✓ Branch 1 taken 14 times.
70 bp_.data());
394
395 // Check if underlying is immediately ready
396
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
70 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
397 70 return h;
398
399 // Forward to underlying awaitable
400 return self_->active_ops_->await_suspend(
401 self_->cached_awaitable_, h, ex, token);
402 }
403
404 io_result<std::size_t>
405 70 await_resume()
406 {
407 struct guard {
408 any_read_stream* self;
409 70 ~guard() {
410 70 self->active_ops_->destroy(self->cached_awaitable_);
411 70 self->active_ops_ = nullptr;
412 70 }
413 70 } g{self_};
414 70 return self_->active_ops_->await_resume(
415
1/1
✓ Branch 1 taken 10 times.
120 self_->cached_awaitable_);
416 70 }
417 };
418 70 return awaitable{this, buffer_param<MB>(buffers)};
419 }
420
421 } // namespace capy
422 } // namespace boost
423
424 #endif
425