//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_TEST_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_TEST_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/make_buffer.hpp>
15  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/coro.hpp>
16  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/io_result.hpp>
18  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/test/fuse.hpp>
19  
#include <boost/capy/test/fuse.hpp>
20  

20  

21  
#include <algorithm>
21  
#include <algorithm>
22  
#include <span>
22  
#include <span>
23  
#include <stop_token>
23  
#include <stop_token>
24  
#include <string>
24  
#include <string>
25  
#include <string_view>
25  
#include <string_view>
26  

26  

27  
namespace boost {
27  
namespace boost {
28  
namespace capy {
28  
namespace capy {
29  
namespace test {
29  
namespace test {
30  

30  

31  
/** A mock buffer sink for testing callee-owns-buffers write operations.
31  
/** A mock buffer sink for testing callee-owns-buffers write operations.
32  

32  

33  
    Use this to verify code that writes data using the callee-owns-buffers
33  
    Use this to verify code that writes data using the callee-owns-buffers
34  
    pattern without needing real I/O. Call @ref prepare to get writable
34  
    pattern without needing real I/O. Call @ref prepare to get writable
35  
    buffers, write into them, then call @ref commit to finalize. The
35  
    buffers, write into them, then call @ref commit to finalize. The
36  
    associated @ref fuse enables error injection at controlled points.
36  
    associated @ref fuse enables error injection at controlled points.
37  

37  

38  
    This class satisfies the @ref BufferSink concept by providing
38  
    This class satisfies the @ref BufferSink concept by providing
39  
    internal storage that callers write into directly.
39  
    internal storage that callers write into directly.
40  

40  

41  
    @par Thread Safety
41  
    @par Thread Safety
42  
    Not thread-safe.
42  
    Not thread-safe.
43  

43  

44  
    @par Example
44  
    @par Example
45  
    @code
45  
    @code
46  
    fuse f;
46  
    fuse f;
47  
    buffer_sink bs( f );
47  
    buffer_sink bs( f );
48  

48  

49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
50  
        mutable_buffer arr[16];
50  
        mutable_buffer arr[16];
51  
        std::size_t count = bs.prepare( arr, 16 );
51  
        std::size_t count = bs.prepare( arr, 16 );
52  
        if( count == 0 )
52  
        if( count == 0 )
53  
            co_return;
53  
            co_return;
54  

54  

55  
        // Write data into arr[0]
55  
        // Write data into arr[0]
56  
        std::memcpy( arr[0].data(), "Hello", 5 );
56  
        std::memcpy( arr[0].data(), "Hello", 5 );
57  

57  

58  
        auto [ec] = co_await bs.commit( 5 );
58  
        auto [ec] = co_await bs.commit( 5 );
59  
        if( ec )
59  
        if( ec )
60  
            co_return;
60  
            co_return;
61  

61  

62  
        auto [ec2] = co_await bs.commit_eof();
62  
        auto [ec2] = co_await bs.commit_eof();
63  
        // bs.data() returns "Hello"
63  
        // bs.data() returns "Hello"
64  
    } );
64  
    } );
65  
    @endcode
65  
    @endcode
66  

66  

67  
    @see fuse, BufferSink
67  
    @see fuse, BufferSink
68  
*/
68  
*/
69  
class buffer_sink
69  
class buffer_sink
70  
{
70  
{
71  
    fuse* f_;
71  
    fuse* f_;
72  
    std::string data_;
72  
    std::string data_;
73  
    std::string prepare_buf_;
73  
    std::string prepare_buf_;
74  
    std::size_t prepare_size_ = 0;
74  
    std::size_t prepare_size_ = 0;
75  
    std::size_t max_prepare_size_;
75  
    std::size_t max_prepare_size_;
76  
    bool eof_called_ = false;
76  
    bool eof_called_ = false;
77  

77  

78  
public:
78  
public:
79  
    /** Construct a buffer sink.
79  
    /** Construct a buffer sink.
80  

80  

81  
        @param f The fuse used to inject errors during commits.
81  
        @param f The fuse used to inject errors during commits.
82  

82  

83  
        @param max_prepare_size Maximum bytes available per prepare.
83  
        @param max_prepare_size Maximum bytes available per prepare.
84  
        Use to simulate limited buffer space.
84  
        Use to simulate limited buffer space.
85  
    */
85  
    */
86  
    explicit buffer_sink(
86  
    explicit buffer_sink(
87  
        fuse& f,
87  
        fuse& f,
88  
        std::size_t max_prepare_size = 4096) noexcept
88  
        std::size_t max_prepare_size = 4096) noexcept
89  
        : f_(&f)
89  
        : f_(&f)
90  
        , max_prepare_size_(max_prepare_size)
90  
        , max_prepare_size_(max_prepare_size)
91  
    {
91  
    {
92  
        prepare_buf_.resize(max_prepare_size_);
92  
        prepare_buf_.resize(max_prepare_size_);
93  
    }
93  
    }
94  

94  

95  
    /// Return the written data as a string view.
95  
    /// Return the written data as a string view.
96  
    std::string_view
96  
    std::string_view
97  
    data() const noexcept
97  
    data() const noexcept
98  
    {
98  
    {
99  
        return data_;
99  
        return data_;
100  
    }
100  
    }
101  

101  

102  
    /// Return the number of bytes written.
102  
    /// Return the number of bytes written.
103  
    std::size_t
103  
    std::size_t
104  
    size() const noexcept
104  
    size() const noexcept
105  
    {
105  
    {
106  
        return data_.size();
106  
        return data_.size();
107  
    }
107  
    }
108  

108  

109  
    /// Return whether commit_eof has been called.
109  
    /// Return whether commit_eof has been called.
110  
    bool
110  
    bool
111  
    eof_called() const noexcept
111  
    eof_called() const noexcept
112  
    {
112  
    {
113  
        return eof_called_;
113  
        return eof_called_;
114  
    }
114  
    }
115  

115  

116  
    /// Clear all data and reset state.
116  
    /// Clear all data and reset state.
117  
    void
117  
    void
118  
    clear() noexcept
118  
    clear() noexcept
119  
    {
119  
    {
120  
        data_.clear();
120  
        data_.clear();
121  
        prepare_size_ = 0;
121  
        prepare_size_ = 0;
122  
        eof_called_ = false;
122  
        eof_called_ = false;
123  
    }
123  
    }
124  

124  

125  
    /** Prepare writable buffers.
125  
    /** Prepare writable buffers.
126  

126  

127  
        Fills the provided span with mutable buffer descriptors pointing
127  
        Fills the provided span with mutable buffer descriptors pointing
128  
        to internal storage. The caller writes data into these buffers,
128  
        to internal storage. The caller writes data into these buffers,
129  
        then calls @ref commit to finalize.
129  
        then calls @ref commit to finalize.
130  

130  

131  
        @param dest Span of mutable_buffer to fill.
131  
        @param dest Span of mutable_buffer to fill.
132  

132  

133  
        @return A span of filled buffers (empty or 1 buffer in this implementation).
133  
        @return A span of filled buffers (empty or 1 buffer in this implementation).
134  
    */
134  
    */
135  
    std::span<mutable_buffer>
135  
    std::span<mutable_buffer>
136  
    prepare(std::span<mutable_buffer> dest)
136  
    prepare(std::span<mutable_buffer> dest)
137  
    {
137  
    {
138  
        if(dest.empty())
138  
        if(dest.empty())
139  
            return {};
139  
            return {};
140  

140  

141  
        prepare_size_ = max_prepare_size_;
141  
        prepare_size_ = max_prepare_size_;
142  
        dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
142  
        dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
143  
        return dest.first(1);
143  
        return dest.first(1);
144  
    }
144  
    }
145  

145  

146  
    /** Commit bytes written to the prepared buffers.
146  
    /** Commit bytes written to the prepared buffers.
147  

147  

148  
        Transfers `n` bytes from the prepared buffer to the internal
148  
        Transfers `n` bytes from the prepared buffer to the internal
149  
        data buffer. Before committing, the attached @ref fuse is
149  
        data buffer. Before committing, the attached @ref fuse is
150  
        consulted to possibly inject an error for testing fault scenarios.
150  
        consulted to possibly inject an error for testing fault scenarios.
151  

151  

152  
        @param n The number of bytes to commit.
152  
        @param n The number of bytes to commit.
153  

153  

154  
        @return An awaitable yielding `(error_code)`.
154  
        @return An awaitable yielding `(error_code)`.
155  

155  

156  
        @see fuse
156  
        @see fuse
157  
    */
157  
    */
158  
    auto
158  
    auto
159  
    commit(std::size_t n)
159  
    commit(std::size_t n)
160  
    {
160  
    {
161  
        struct awaitable
161  
        struct awaitable
162  
        {
162  
        {
163  
            buffer_sink* self_;
163  
            buffer_sink* self_;
164  
            std::size_t n_;
164  
            std::size_t n_;
165  

165  

166  
            bool await_ready() const noexcept { return true; }
166  
            bool await_ready() const noexcept { return true; }
167  

167  

 
168 +
            // This method is required to satisfy Capy's IoAwaitable concept,
 
169 +
            // but is never called because await_ready() returns true.
 
170 +
            //
 
171 +
            // Capy uses a two-layer awaitable system: the promise's
 
172 +
            // await_transform wraps awaitables in a transform_awaiter whose
 
173 +
            // standard await_suspend(coroutine_handle) calls this custom
 
174 +
            // 3-argument overload, passing the executor and stop_token from
 
175 +
            // the coroutine's context. For synchronous test awaitables like
 
176 +
            // this one, the coroutine never suspends, so this is not invoked.
 
177 +
            // The signature exists to allow the same awaitable type to work
 
178 +
            // with both synchronous (test) and asynchronous (real I/O) code.
168  
            void await_suspend(
179  
            void await_suspend(
169  
                coro,
180  
                coro,
170  
                executor_ref,
181  
                executor_ref,
171  
                std::stop_token) const noexcept
182  
                std::stop_token) const noexcept
172  
            {
183  
            {
173  
            }
184  
            }
174  

185  

175  
            io_result<>
186  
            io_result<>
176  
            await_resume()
187  
            await_resume()
177  
            {
188  
            {
178  
                auto ec = self_->f_->maybe_fail();
189  
                auto ec = self_->f_->maybe_fail();
179  
                if(ec)
190  
                if(ec)
180  
                    return {ec};
191  
                    return {ec};
181  

192  

182  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
193  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
183  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
194  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
184  
                self_->prepare_size_ = 0;
195  
                self_->prepare_size_ = 0;
185  

196  

186  
                return {};
197  
                return {};
187  
            }
198  
            }
188  
        };
199  
        };
189  
        return awaitable{this, n};
200  
        return awaitable{this, n};
190  
    }
201  
    }
191  

202  

192  
    /** Commit bytes written with optional end-of-stream.
203  
    /** Commit bytes written with optional end-of-stream.
193  

204  

194  
        Transfers `n` bytes from the prepared buffer to the internal
205  
        Transfers `n` bytes from the prepared buffer to the internal
195  
        data buffer. If `eof` is true, marks the sink as finalized.
206  
        data buffer. If `eof` is true, marks the sink as finalized.
196  

207  

197  
        @param n The number of bytes to commit.
208  
        @param n The number of bytes to commit.
198  
        @param eof If true, signals end-of-stream after committing.
209  
        @param eof If true, signals end-of-stream after committing.
199  

210  

200  
        @return An awaitable yielding `(error_code)`.
211  
        @return An awaitable yielding `(error_code)`.
201  

212  

202  
        @see fuse
213  
        @see fuse
203  
    */
214  
    */
204  
    auto
215  
    auto
205  
    commit(std::size_t n, bool eof)
216  
    commit(std::size_t n, bool eof)
206  
    {
217  
    {
207  
        struct awaitable
218  
        struct awaitable
208  
        {
219  
        {
209  
            buffer_sink* self_;
220  
            buffer_sink* self_;
210  
            std::size_t n_;
221  
            std::size_t n_;
211  
            bool eof_;
222  
            bool eof_;
212  

223  

213  
            bool await_ready() const noexcept { return true; }
224  
            bool await_ready() const noexcept { return true; }
214  

225  

 
226 +
            // This method is required to satisfy Capy's IoAwaitable concept,
 
227 +
            // but is never called because await_ready() returns true.
 
228 +
            // See the comment on commit(std::size_t) for a detailed explanation.
215  
            void await_suspend(
229  
            void await_suspend(
216  
                coro,
230  
                coro,
217  
                executor_ref,
231  
                executor_ref,
218  
                std::stop_token) const noexcept
232  
                std::stop_token) const noexcept
219  
            {
233  
            {
220  
            }
234  
            }
221  

235  

222  
            io_result<>
236  
            io_result<>
223  
            await_resume()
237  
            await_resume()
224  
            {
238  
            {
225  
                auto ec = self_->f_->maybe_fail();
239  
                auto ec = self_->f_->maybe_fail();
226  
                if(ec)
240  
                if(ec)
227  
                    return {ec};
241  
                    return {ec};
228  

242  

229  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
243  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
230  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
244  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
231  
                self_->prepare_size_ = 0;
245  
                self_->prepare_size_ = 0;
232  

246  

233  
                if(eof_)
247  
                if(eof_)
234  
                    self_->eof_called_ = true;
248  
                    self_->eof_called_ = true;
235  

249  

236  
                return {};
250  
                return {};
237  
            }
251  
            }
238  
        };
252  
        };
239  
        return awaitable{this, n, eof};
253  
        return awaitable{this, n, eof};
240  
    }
254  
    }
241  

255  

242  
    /** Signal end-of-stream.
256  
    /** Signal end-of-stream.
243  

257  

244  
        Marks the sink as finalized, indicating no more data will be
258  
        Marks the sink as finalized, indicating no more data will be
245  
        written. Before signaling, the attached @ref fuse is consulted
259  
        written. Before signaling, the attached @ref fuse is consulted
246  
        to possibly inject an error for testing fault scenarios.
260  
        to possibly inject an error for testing fault scenarios.
247  

261  

248  
        @return An awaitable yielding `(error_code)`.
262  
        @return An awaitable yielding `(error_code)`.
249  

263  

250  
        @see fuse
264  
        @see fuse
251  
    */
265  
    */
252  
    auto
266  
    auto
253  
    commit_eof()
267  
    commit_eof()
254  
    {
268  
    {
255  
        struct awaitable
269  
        struct awaitable
256  
        {
270  
        {
257  
            buffer_sink* self_;
271  
            buffer_sink* self_;
258  

272  

259  
            bool await_ready() const noexcept { return true; }
273  
            bool await_ready() const noexcept { return true; }
260  

274  

 
275 +
            // This method is required to satisfy Capy's IoAwaitable concept,
 
276 +
            // but is never called because await_ready() returns true.
 
277 +
            // See the comment on commit(std::size_t) for a detailed explanation.
261  
            void await_suspend(
278  
            void await_suspend(
262  
                coro,
279  
                coro,
263  
                executor_ref,
280  
                executor_ref,
264  
                std::stop_token) const noexcept
281  
                std::stop_token) const noexcept
265  
            {
282  
            {
266  
            }
283  
            }
267  

284  

268  
            io_result<>
285  
            io_result<>
269  
            await_resume()
286  
            await_resume()
270  
            {
287  
            {
271  
                auto ec = self_->f_->maybe_fail();
288  
                auto ec = self_->f_->maybe_fail();
272  
                if(ec)
289  
                if(ec)
273  
                    return {ec};
290  
                    return {ec};
274  

291  

275  
                self_->eof_called_ = true;
292  
                self_->eof_called_ = true;
276  
                return {};
293  
                return {};
277  
            }
294  
            }
278  
        };
295  
        };
279  
        return awaitable{this};
296  
        return awaitable{this};
280  
    }
297  
    }
281  
};
298  
};
282  

299  

283  
} // test
300  
} // test
284  
} // capy
301  
} // capy
285  
} // boost
302  
} // boost
286  

303  

287  
#endif
304  
#endif