//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
11  
#define BOOST_CAPY_TEST_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/cond.hpp>
17  
#include <boost/capy/cond.hpp>
18  
#include <boost/capy/coro.hpp>
18  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <stop_token>
23  
#include <stop_token>
24  
#include <string>
24  
#include <string>
25  
#include <string_view>
25  
#include <string_view>
26  

26  

27  
namespace boost {
27  
namespace boost {
28  
namespace capy {
28  
namespace capy {
29  
namespace test {
29  
namespace test {
30  

30  

31  
/** A mock stream for testing read operations.
31  
/** A mock stream for testing read operations.
32  

32  

33  
    Use this to verify code that performs reads without needing
33  
    Use this to verify code that performs reads without needing
34  
    real I/O. Call @ref provide to supply data, then @ref read_some
34  
    real I/O. Call @ref provide to supply data, then @ref read_some
35  
    to consume it. The associated @ref fuse enables error injection
35  
    to consume it. The associated @ref fuse enables error injection
36  
    at controlled points. An optional `max_read_size` constructor
36  
    at controlled points. An optional `max_read_size` constructor
37  
    parameter limits bytes per read to simulate chunked delivery.
37  
    parameter limits bytes per read to simulate chunked delivery.
38  

38  

39  
    @par Thread Safety
39  
    @par Thread Safety
40  
    Not thread-safe.
40  
    Not thread-safe.
41  

41  

42  
    @par Example
42  
    @par Example
43  
    @code
43  
    @code
44  
    fuse f;
44  
    fuse f;
45  
    read_stream rs( f );
45  
    read_stream rs( f );
46  
    rs.provide( "Hello, " );
46  
    rs.provide( "Hello, " );
47  
    rs.provide( "World!" );
47  
    rs.provide( "World!" );
48  

48  

49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
50  
        char buf[32];
50  
        char buf[32];
51  
        auto [ec, n] = co_await rs.read_some(
51  
        auto [ec, n] = co_await rs.read_some(
52  
            mutable_buffer( buf, sizeof( buf ) ) );
52  
            mutable_buffer( buf, sizeof( buf ) ) );
53  
        if( ec )
53  
        if( ec )
54  
            co_return;
54  
            co_return;
55  
        // buf contains "Hello, World!"
55  
        // buf contains "Hello, World!"
56  
    } );
56  
    } );
57  
    @endcode
57  
    @endcode
58  

58  

59  
    @see fuse
59  
    @see fuse
60  
*/
60  
*/
61  
class read_stream
61  
class read_stream
62  
{
62  
{
63  
    fuse* f_;
63  
    fuse* f_;
64  
    std::string data_;
64  
    std::string data_;
65  
    std::size_t pos_ = 0;
65  
    std::size_t pos_ = 0;
66  
    std::size_t max_read_size_;
66  
    std::size_t max_read_size_;
67  

67  

68  
public:
68  
public:
69  
    /** Construct a read stream.
69  
    /** Construct a read stream.
70  

70  

71  
        @param f The fuse used to inject errors during reads.
71  
        @param f The fuse used to inject errors during reads.
72  

72  

73  
        @param max_read_size Maximum bytes returned per read.
73  
        @param max_read_size Maximum bytes returned per read.
74  
        Use to simulate chunked network delivery.
74  
        Use to simulate chunked network delivery.
75  
    */
75  
    */
76  
    explicit read_stream(
76  
    explicit read_stream(
77  
        fuse& f,
77  
        fuse& f,
78  
        std::size_t max_read_size = std::size_t(-1)) noexcept
78  
        std::size_t max_read_size = std::size_t(-1)) noexcept
79  
        : f_(&f)
79  
        : f_(&f)
80  
        , max_read_size_(max_read_size)
80  
        , max_read_size_(max_read_size)
81  
    {
81  
    {
82  
    }
82  
    }
83  

83  

84  
    /** Append data to be returned by subsequent reads.
84  
    /** Append data to be returned by subsequent reads.
85  

85  

86  
        Multiple calls accumulate data that @ref read_some returns.
86  
        Multiple calls accumulate data that @ref read_some returns.
87  

87  

88  
        @param sv The data to append.
88  
        @param sv The data to append.
89  
    */
89  
    */
90  
    void
90  
    void
91  
    provide(std::string_view sv)
91  
    provide(std::string_view sv)
92  
    {
92  
    {
93  
        data_.append(sv);
93  
        data_.append(sv);
94  
    }
94  
    }
95  

95  

96  
    /// Clear all data and reset the read position.
96  
    /// Clear all data and reset the read position.
97  
    void
97  
    void
98  
    clear() noexcept
98  
    clear() noexcept
99  
    {
99  
    {
100  
        data_.clear();
100  
        data_.clear();
101  
        pos_ = 0;
101  
        pos_ = 0;
102  
    }
102  
    }
103  

103  

104  
    /// Return the number of bytes available for reading.
104  
    /// Return the number of bytes available for reading.
105  
    std::size_t
105  
    std::size_t
106  
    available() const noexcept
106  
    available() const noexcept
107  
    {
107  
    {
108  
        return data_.size() - pos_;
108  
        return data_.size() - pos_;
109  
    }
109  
    }
110  

110  

111  
    /** Asynchronously read data from the stream.
111  
    /** Asynchronously read data from the stream.
112  

112  

113  
        Transfers up to `buffer_size( buffers )` bytes from the internal
113  
        Transfers up to `buffer_size( buffers )` bytes from the internal
114  
        buffer to the provided mutable buffer sequence. If no data remains,
114  
        buffer to the provided mutable buffer sequence. If no data remains,
115  
        returns `error::eof`. Before every read, the attached @ref fuse is
115  
        returns `error::eof`. Before every read, the attached @ref fuse is
116  
        consulted to possibly inject an error for testing fault scenarios.
116  
        consulted to possibly inject an error for testing fault scenarios.
117  
        The returned `std::size_t` is the number of bytes transferred.
117  
        The returned `std::size_t` is the number of bytes transferred.
118  

118  

119  
        @par Effects
119  
        @par Effects
120  
        On success, advances the internal read position by the number of
120  
        On success, advances the internal read position by the number of
121  
        bytes copied. If an error is injected by the fuse, the read position
121  
        bytes copied. If an error is injected by the fuse, the read position
122  
        remains unchanged.
122  
        remains unchanged.
123  

123  

124  
        @par Exception Safety
124  
        @par Exception Safety
125  
        No-throw guarantee.
125  
        No-throw guarantee.
126  

126  

127  
        @param buffers The mutable buffer sequence to receive data.
127  
        @param buffers The mutable buffer sequence to receive data.
128  

128  

129  
        @return An awaitable yielding `(error_code,std::size_t)`.
129  
        @return An awaitable yielding `(error_code,std::size_t)`.
130  

130  

131  
        @see fuse
131  
        @see fuse
132  
    */
132  
    */
133  
    template<MutableBufferSequence MB>
133  
    template<MutableBufferSequence MB>
134  
    auto
134  
    auto
135  
    read_some(MB buffers)
135  
    read_some(MB buffers)
136  
    {
136  
    {
137  
        struct awaitable
137  
        struct awaitable
138  
        {
138  
        {
139  
            read_stream* self_;
139  
            read_stream* self_;
140  
            MB buffers_;
140  
            MB buffers_;
141  

141  

142  
            bool await_ready() const noexcept { return true; }
142  
            bool await_ready() const noexcept { return true; }
143  

143  

 
144 +
            // This method is required to satisfy Capy's IoAwaitable concept,
 
145 +
            // but is never called because await_ready() returns true.
 
146 +
            //
 
147 +
            // Capy uses a two-layer awaitable system: the promise's
 
148 +
            // await_transform wraps awaitables in a transform_awaiter whose
 
149 +
            // standard await_suspend(coroutine_handle) calls this custom
 
150 +
            // 3-argument overload, passing the executor and stop_token from
 
151 +
            // the coroutine's context. For synchronous test awaitables like
 
152 +
            // this one, the coroutine never suspends, so this is not invoked.
 
153 +
            // The signature exists to allow the same awaitable type to work
 
154 +
            // with both synchronous (test) and asynchronous (real I/O) code.
144  
            void await_suspend(
155  
            void await_suspend(
145  
                coro,
156  
                coro,
146  
                executor_ref,
157  
                executor_ref,
147  
                std::stop_token) const noexcept
158  
                std::stop_token) const noexcept
148  
            {
159  
            {
149  
            }
160  
            }
150  

161  

151  
            io_result<std::size_t>
162  
            io_result<std::size_t>
152  
            await_resume()
163  
            await_resume()
153  
            {
164  
            {
154  
                auto ec = self_->f_->maybe_fail();
165  
                auto ec = self_->f_->maybe_fail();
155  
                if(ec)
166  
                if(ec)
156  
                    return {ec, 0};
167  
                    return {ec, 0};
157  

168  

158  
                if(self_->pos_ >= self_->data_.size())
169  
                if(self_->pos_ >= self_->data_.size())
159  
                    return {error::eof, 0};
170  
                    return {error::eof, 0};
160  

171  

161  
                std::size_t avail = self_->data_.size() - self_->pos_;
172  
                std::size_t avail = self_->data_.size() - self_->pos_;
162  
                if(avail > self_->max_read_size_)
173  
                if(avail > self_->max_read_size_)
163  
                    avail = self_->max_read_size_;
174  
                    avail = self_->max_read_size_;
164  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
175  
                auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
165  
                std::size_t const n = buffer_copy(buffers_, src);
176  
                std::size_t const n = buffer_copy(buffers_, src);
166  
                self_->pos_ += n;
177  
                self_->pos_ += n;
167  
                return {{}, n};
178  
                return {{}, n};
168  
            }
179  
            }
169  
        };
180  
        };
170  
        return awaitable{this, buffers};
181  
        return awaitable{this, buffers};
171  
    }
182  
    }
172  
};
183  
};
173  

184  

174  
} // test
185  
} // test
175  
} // capy
186  
} // capy
176  
} // boost
187  
} // boost
177  

188  

178  
#endif
189  
#endif