LCOV - code coverage report
Current view: top level - capy/io - any_read_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.7 % 79 63
Test Date: 2026-02-04 19:49:07 Functions: 90.0 % 40 36

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_param.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/read_stream.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : 
      23              : #include <concepts>
      24              : #include <coroutine>
      25              : #include <cstddef>
      26              : #include <new>
      27              : #include <span>
      28              : #include <stop_token>
      29              : #include <system_error>
      30              : #include <utility>
      31              : 
      32              : namespace boost {
      33              : namespace capy {
      34              : 
      35              : /** Type-erased wrapper for any ReadStream.
      36              : 
      37              :     This class provides type erasure for any type satisfying the
      38              :     @ref ReadStream concept, enabling runtime polymorphism for
      39              :     read operations. It uses cached awaitable storage to achieve
      40              :     zero steady-state allocation after construction.
      41              : 
      42              :     The wrapper supports two construction modes:
      43              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      44              :       allocates storage and owns the stream.
      45              :     - **Reference**: Pass a pointer to wrap without ownership. The
      46              :       pointed-to stream must outlive this wrapper.
      47              : 
      48              :     @par Awaitable Preallocation
      49              :     The constructor preallocates storage for the type-erased awaitable.
      50              :     This reserves all virtual address space at server startup
      51              :     so memory usage can be measured up front, rather than
      52              :     allocating piecemeal as traffic arrives.
      53              : 
      54              :     @par Thread Safety
      55              :     Not thread-safe. Concurrent operations on the same wrapper
      56              :     are undefined behavior.
      57              : 
      58              :     @par Example
      59              :     @code
      60              :     // Owning - takes ownership of the stream
      61              :     any_read_stream stream(socket{ioc});
      62              : 
      63              :     // Reference - wraps without ownership
      64              :     socket sock(ioc);
      65              :     any_read_stream stream(&sock);
      66              : 
      67              :     mutable_buffer buf(data, size);
      68              :     auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
      69              :     @endcode
      70              : 
      71              :     @see any_write_stream, any_stream, ReadStream
      72              : */
      73              : class any_read_stream
      74              : {
      75              :     struct vtable;
      76              :     struct awaitable_ops;
      77              : 
      78              :     template<ReadStream S>
      79              :     struct vtable_for_impl;
      80              : 
      81              :     void* stream_ = nullptr;
      82              :     vtable const* vt_ = nullptr;
      83              :     void* cached_awaitable_ = nullptr;
      84              :     void* storage_ = nullptr;
      85              :     awaitable_ops const* active_ops_ = nullptr;
      86              : 
      87              : public:
      88              :     /** Destructor.
      89              : 
      90              :         Destroys the owned stream (if any) and releases the cached
      91              :         awaitable storage.
      92              :     */
      93              :     ~any_read_stream();
      94              : 
      95              :     /** Default constructor.
      96              : 
      97              :         Constructs an empty wrapper. Operations on a default-constructed
      98              :         wrapper result in undefined behavior.
      99              :     */
     100            1 :     any_read_stream() = default;
     101              : 
     102              :     /** Non-copyable.
     103              : 
     104              :         The awaitable cache is per-instance and cannot be shared.
     105              :     */
     106              :     any_read_stream(any_read_stream const&) = delete;
     107              :     any_read_stream& operator=(any_read_stream const&) = delete;
     108              : 
     109              :     /** Move constructor.
     110              : 
     111              :         Transfers ownership of the wrapped stream (if owned) and
     112              :         cached awaitable storage from `other`. After the move, `other` is
     113              :         in a default-constructed state.
     114              : 
     115              :         @param other The wrapper to move from.
     116              :     */
     117            2 :     any_read_stream(any_read_stream&& other) noexcept
     118            2 :         : stream_(std::exchange(other.stream_, nullptr))
     119            2 :         , vt_(std::exchange(other.vt_, nullptr))
     120            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     121            2 :         , storage_(std::exchange(other.storage_, nullptr))
     122            2 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     123              :     {
     124            2 :     }
     125              : 
     126              :     /** Move assignment operator.
     127              : 
     128              :         Destroys any owned stream and releases existing resources,
     129              :         then transfers ownership from `other`.
     130              : 
     131              :         @param other The wrapper to move from.
     132              :         @return Reference to this wrapper.
     133              :     */
     134              :     any_read_stream&
     135              :     operator=(any_read_stream&& other) noexcept;
     136              : 
     137              :     /** Construct by taking ownership of a ReadStream.
     138              : 
     139              :         Allocates storage and moves the stream into this wrapper.
     140              :         The wrapper owns the stream and will destroy it.
     141              : 
     142              :         @param s The stream to take ownership of.
     143              :     */
     144              :     template<ReadStream S>
     145              :         requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     146              :     any_read_stream(S s);
     147              : 
     148              :     /** Construct by wrapping a ReadStream without ownership.
     149              : 
     150              :         Wraps the given stream by pointer. The stream must remain
     151              :         valid for the lifetime of this wrapper.
     152              : 
     153              :         @param s Pointer to the stream to wrap.
     154              :     */
     155              :     template<ReadStream S>
     156              :     any_read_stream(S* s);
     157              : 
     158              :     /** Check if the wrapper contains a valid stream.
     159              : 
     160              :         @return `true` if wrapping a stream, `false` if default-constructed
     161              :             or moved-from.
     162              :     */
     163              :     bool
     164           19 :     has_value() const noexcept
     165              :     {
     166           19 :         return stream_ != nullptr;
     167              :     }
     168              : 
     169              :     /** Check if the wrapper contains a valid stream.
     170              : 
     171              :         @return `true` if wrapping a stream, `false` if default-constructed
     172              :             or moved-from.
     173              :     */
     174              :     explicit
     175            2 :     operator bool() const noexcept
     176              :     {
     177            2 :         return has_value();
     178              :     }
     179              : 
     180              :     /** Initiate an asynchronous read operation.
     181              : 
     182              :         Reads data into the provided buffer sequence. The operation
     183              :         completes when at least one byte has been read, or an error
     184              :         occurs.
     185              : 
     186              :         @param buffers The buffer sequence to read into. Passed by
     187              :             value to ensure the sequence lives in the coroutine frame
     188              :             across suspension points.
     189              : 
     190              :         @return An awaitable yielding `(error_code,std::size_t)`.
     191              : 
     192              :         @par Preconditions
     193              :         The wrapper must contain a valid stream (`has_value() == true`).
     194              :     */
     195              :     template<MutableBufferSequence MB>
     196              :     auto
     197              :     read_some(MB buffers);
     198              : 
     199              : protected:
     200              :     /** Rebind to a new stream after move.
     201              : 
     202              :         Updates the internal pointer to reference a new stream object.
     203              :         Used by owning wrappers after move assignment when the owned
     204              :         object has moved to a new location.
     205              : 
     206              :         @param new_stream The new stream to bind to. Must be the same
     207              :             type as the original stream.
     208              : 
     209              :         @note Terminates if called with a stream of different type
     210              :             than the original.
     211              :     */
     212              :     template<ReadStream S>
     213              :     void
     214              :     rebind(S& new_stream) noexcept
     215              :     {
     216              :         if(vt_ != &vtable_for_impl<S>::value)
     217              :             std::terminate();
     218              :         stream_ = &new_stream;
     219              :     }
     220              : };
     221              : 
     222              : //----------------------------------------------------------
     223              : 
     224              : struct any_read_stream::awaitable_ops
     225              : {
     226              :     bool (*await_ready)(void*);
     227              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     228              :     io_result<std::size_t> (*await_resume)(void*);
     229              :     void (*destroy)(void*) noexcept;
     230              : };
     231              : 
     232              : struct any_read_stream::vtable
     233              : {
     234              :     void (*destroy)(void*) noexcept;
     235              :     std::size_t awaitable_size;
     236              :     std::size_t awaitable_align;
     237              :     awaitable_ops const* (*construct_awaitable)(
     238              :         void* stream,
     239              :         void* storage,
     240              :         std::span<mutable_buffer const> buffers);
     241              : };
     242              : 
     243              : template<ReadStream S>
     244              : struct any_read_stream::vtable_for_impl
     245              : {
     246              :     using Awaitable = decltype(std::declval<S&>().read_some(
     247              :         std::span<mutable_buffer const>{}));
     248              : 
     249              :     static void
     250            0 :     do_destroy_impl(void* stream) noexcept
     251              :     {
     252            0 :         static_cast<S*>(stream)->~S();
     253            0 :     }
     254              : 
     255              :     static awaitable_ops const*
     256           70 :     construct_awaitable_impl(
     257              :         void* stream,
     258              :         void* storage,
     259              :         std::span<mutable_buffer const> buffers)
     260              :     {
     261           70 :         auto& s = *static_cast<S*>(stream);
     262           70 :         ::new(storage) Awaitable(s.read_some(buffers));
     263              : 
     264              :         static constexpr awaitable_ops ops = {
     265           70 :             +[](void* p) {
     266           70 :                 return static_cast<Awaitable*>(p)->await_ready();
     267              :             },
     268            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     269            0 :                 return detail::call_await_suspend(
     270            0 :                     static_cast<Awaitable*>(p), h, ex, token);
     271              :             },
     272           70 :             +[](void* p) {
     273           70 :                 return static_cast<Awaitable*>(p)->await_resume();
     274              :             },
     275           70 :             +[](void* p) noexcept {
     276           70 :                 static_cast<Awaitable*>(p)->~Awaitable();
     277              :             }
     278              :         };
     279           70 :         return &ops;
     280              :     }
     281              : 
     282              :     static constexpr vtable value = {
     283              :         &do_destroy_impl,
     284              :         sizeof(Awaitable),
     285              :         alignof(Awaitable),
     286              :         &construct_awaitable_impl
     287              :     };
     288              : };
     289              : 
     290              : //----------------------------------------------------------
     291              : 
     292              : inline
     293           78 : any_read_stream::~any_read_stream()
     294              : {
     295           78 :     if(storage_)
     296              :     {
     297            0 :         vt_->destroy(stream_);
     298            0 :         ::operator delete(storage_);
     299              :     }
     300           78 :     if(cached_awaitable_)
     301              :     {
     302           71 :         if(active_ops_)
     303            0 :             active_ops_->destroy(cached_awaitable_);
     304           71 :         ::operator delete(cached_awaitable_);
     305              :     }
     306           78 : }
     307              : 
     308              : inline any_read_stream&
     309            3 : any_read_stream::operator=(any_read_stream&& other) noexcept
     310              : {
     311            3 :     if(this != &other)
     312              :     {
     313            3 :         if(storage_)
     314              :         {
     315            0 :             vt_->destroy(stream_);
     316            0 :             ::operator delete(storage_);
     317              :         }
     318            3 :         if(cached_awaitable_)
     319              :         {
     320            0 :             if(active_ops_)
     321            0 :                 active_ops_->destroy(cached_awaitable_);
     322            0 :             ::operator delete(cached_awaitable_);
     323              :         }
     324            3 :         stream_ = std::exchange(other.stream_, nullptr);
     325            3 :         vt_ = std::exchange(other.vt_, nullptr);
     326            3 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     327            3 :         storage_ = std::exchange(other.storage_, nullptr);
     328            3 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     329              :     }
     330            3 :     return *this;
     331              : }
     332              : 
     333              : template<ReadStream S>
     334              :     requires (!std::same_as<std::decay_t<S>, any_read_stream>)
     335              : any_read_stream::any_read_stream(S s)
     336              :     : vt_(&vtable_for_impl<S>::value)
     337              : {
     338              :     struct guard {
     339              :         any_read_stream* self;
     340              :         bool committed = false;
     341              :         ~guard() {
     342              :             if(!committed && self->storage_) {
     343              :                 self->vt_->destroy(self->stream_);
     344              :                 ::operator delete(self->storage_);
     345              :                 self->storage_ = nullptr;
     346              :                 self->stream_ = nullptr;
     347              :             }
     348              :         }
     349              :     } g{this};
     350              : 
     351              :     storage_ = ::operator new(sizeof(S));
     352              :     stream_ = ::new(storage_) S(std::move(s));
     353              : 
     354              :     // Preallocate the awaitable storage
     355              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     356              : 
     357              :     g.committed = true;
     358              : }
     359              : 
     360              : template<ReadStream S>
     361           71 : any_read_stream::any_read_stream(S* s)
     362           71 :     : stream_(s)
     363           71 :     , vt_(&vtable_for_impl<S>::value)
     364              : {
     365              :     // Preallocate the awaitable storage
     366           71 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     367           71 : }
     368              : 
     369              : //----------------------------------------------------------
     370              : 
     371              : template<MutableBufferSequence MB>
     372              : auto
     373           70 : any_read_stream::read_some(MB buffers)
     374              : {
     375              :     struct awaitable
     376              :     {
     377              :         any_read_stream* self_;
     378              :         buffer_param<MB> bp_;
     379              : 
     380              :         bool
     381           70 :         await_ready() const noexcept
     382              :         {
     383           70 :             return false;
     384              :         }
     385              : 
     386              :         coro
     387           70 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     388              :         {
     389              :             // Construct the underlying awaitable into cached storage
     390           70 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     391           70 :                 self_->stream_,
     392           70 :                 self_->cached_awaitable_,
     393           70 :                 bp_.data());
     394              : 
     395              :             // Check if underlying is immediately ready
     396           70 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     397           70 :                 return h;
     398              : 
     399              :             // Forward to underlying awaitable
     400            0 :             return self_->active_ops_->await_suspend(
     401            0 :                 self_->cached_awaitable_, h, ex, token);
     402              :         }
     403              : 
     404              :         io_result<std::size_t>
     405           70 :         await_resume()
     406              :         {
     407              :             struct guard {
     408              :                 any_read_stream* self;
     409           70 :                 ~guard() {
     410           70 :                     self->active_ops_->destroy(self->cached_awaitable_);
     411           70 :                     self->active_ops_ = nullptr;
     412           70 :                 }
     413           70 :             } g{self_};
     414           70 :             return self_->active_ops_->await_resume(
     415          120 :                 self_->cached_awaitable_);
     416           70 :         }
     417              :     };
     418           70 :     return awaitable{this, buffer_param<MB>(buffers)};
     419              : }
     420              : 
     421              : } // namespace capy
     422              : } // namespace boost
     423              : 
     424              : #endif
        

Generated by: LCOV version 2.3