Stream Pipeline
Data transformation through a pipeline of sources and sinks.
What You Will Learn
-
Building processing pipelines
-
Using
BufferSource and BufferSink concepts -
Chaining transformations
Prerequisites
-
Completed Echo Server with Corosio
-
Understanding of buffer sources/sinks from Buffer Concepts
Source Code
//
// Stream Pipeline Example
//
// This example demonstrates chaining buffer sources to create a data
// processing pipeline. Data flows through transform stages:
//
// input -> uppercase_transform -> line_numbering_transform -> output
//
// Each transform is a BufferSource that wraps an upstream any_buffer_source,
// enabling type-erased composition of arbitrary transform chains.
//
#include <boost/capy.hpp>
#include <boost/capy/test/run_blocking.hpp>
#include <boost/capy/test/buffer_source.hpp>
#include <boost/capy/test/write_sink.hpp>
#include <boost/capy/io/any_buffer_source.hpp>
#include <boost/capy/io/any_write_sink.hpp>
#include <algorithm>
#include <cctype>
#include <exception>
#include <iostream>
#include <system_error>
using namespace boost::capy;
// A transform stage that converts to uppercase
// Pipeline stage that rewrites every byte pulled from upstream as uppercase.
//
// Models the BufferSource concept: pull() hands out a view into an internal
// staging buffer and consume() retires bytes the downstream has processed.
// The wrapped upstream source is borrowed (non-owning) and must outlive
// this object.
class uppercase_transform
{
    any_buffer_source* upstream_;   // borrowed upstream source
    std::vector<char> staged_;      // transformed bytes awaiting consumption
    std::size_t offset_ = 0;        // bytes of staged_ already consumed
    bool done_ = false;             // set once upstream reports EOF
public:
    explicit uppercase_transform(any_buffer_source& source)
        : upstream_(&source)
    {
    }
    // BufferSource::consume - retire n processed bytes of staged output;
    // once everything staged is consumed, reset the staging buffer.
    void consume(std::size_t n) noexcept
    {
        offset_ += n;
        if (offset_ >= staged_.size())
        {
            staged_.clear();
            offset_ = 0;
        }
    }
    // BufferSource::pull - returns a task<> so we can co_await the upstream.
    // Fills dest[0] with a view of the transformed data (or an empty span
    // on EOF / empty dest); errors from upstream are propagated unchanged.
    task<io_result<std::span<const_buffer>>>
    pull(std::span<const_buffer> dest)
    {
        // Serve bytes still staged from a previous pull first.
        if (offset_ < staged_.size())
        {
            if (!dest.empty())
            {
                dest[0] = const_buffer(
                    staged_.data() + offset_,
                    staged_.size() - offset_);
                co_return {std::error_code{}, dest.first(1)};
            }
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }
        // Nothing staged and upstream already finished: report EOF.
        if (done_)
            co_return {std::error_code{}, std::span<const_buffer>{}};
        // Refill the staging buffer from upstream.
        staged_.clear();
        offset_ = 0;
        const_buffer scratch[8];
        auto [ec, views] = co_await upstream_->pull(scratch);
        if (ec)
            co_return {ec, std::span<const_buffer>{}};
        if (views.empty())
        {
            done_ = true;
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }
        // Uppercase every byte into the staging buffer. The unsigned char
        // cast keeps std::toupper well-defined for negative char values.
        for (const_buffer const& view : views)
        {
            auto const* p = static_cast<char const*>(view.data());
            auto const* end = p + view.size();
            for (; p != end; ++p)
            {
                staged_.push_back(static_cast<char>(
                    std::toupper(static_cast<unsigned char>(*p))));
            }
        }
        // Tell upstream we are finished with everything it handed us.
        upstream_->consume(buffer_size(views));
        // Hand the transformed bytes downstream.
        if (dest.empty() || staged_.empty())
            co_return {std::error_code{}, std::span<const_buffer>{}};
        dest[0] = const_buffer(staged_.data(), staged_.size());
        co_return {std::error_code{}, dest.first(1)};
    }
};
// A transform that adds line numbers
// Pipeline stage that prefixes each line pulled from upstream with "N: ".
//
// Models the BufferSource concept like uppercase_transform: pull() stages
// numbered text internally, consume() retires processed bytes. The upstream
// source is borrowed and must outlive this object. Line state persists
// across pulls so a line split over two upstream reads is numbered once.
class line_numbering_transform
{
    any_buffer_source* upstream_;   // borrowed upstream source
    std::string staged_;            // numbered text awaiting consumption
    std::size_t offset_ = 0;        // bytes of staged_ already consumed
    std::size_t next_line_ = 1;     // number for the next line prefix
    bool pending_prefix_ = true;    // next byte begins a new line
    bool done_ = false;             // set once upstream reports EOF
public:
    explicit line_numbering_transform(any_buffer_source& source)
        : upstream_(&source)
    {
    }
    // Retire n processed bytes; reset staging once fully consumed.
    void consume(std::size_t n) noexcept
    {
        offset_ += n;
        if (offset_ >= staged_.size())
        {
            staged_.clear();
            offset_ = 0;
        }
    }
    // Pull numbered text into dest[0]; empty span signals EOF (or an
    // empty dest). Upstream errors are propagated unchanged.
    task<io_result<std::span<const_buffer>>>
    pull(std::span<const_buffer> dest)
    {
        // Serve bytes still staged from a previous pull first.
        if (offset_ < staged_.size())
        {
            if (!dest.empty())
            {
                dest[0] = const_buffer(
                    staged_.data() + offset_,
                    staged_.size() - offset_);
                co_return {std::error_code{}, dest.first(1)};
            }
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }
        if (done_)
            co_return {std::error_code{}, std::span<const_buffer>{}};
        // Refill from upstream.
        staged_.clear();
        offset_ = 0;
        const_buffer scratch[8];
        auto [ec, views] = co_await upstream_->pull(scratch);
        if (ec)
            co_return {ec, std::span<const_buffer>{}};
        if (views.empty())
        {
            done_ = true;
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }
        // Copy bytes through, inserting a "N: " prefix at each line start.
        for (const_buffer const& view : views)
        {
            auto const* bytes = static_cast<char const*>(view.data());
            std::size_t const count = view.size();
            for (std::size_t i = 0; i < count; ++i)
            {
                if (pending_prefix_)
                {
                    staged_ += std::to_string(next_line_++);
                    staged_ += ": ";
                    pending_prefix_ = false;
                }
                char const c = bytes[i];
                staged_ += c;
                if (c == '\n')
                    pending_prefix_ = true;
            }
        }
        upstream_->consume(buffer_size(views));
        if (dest.empty() || staged_.empty())
            co_return {std::error_code{}, std::span<const_buffer>{}};
        dest[0] = const_buffer(staged_.data(), staged_.size());
        co_return {std::error_code{}, dest.first(1)};
    }
};
// Transfer from source to sink
// Drain `source` into `sink`, then signal end-of-stream on the sink.
//
// Returns the total number of bytes the sink reported written. Throws
// std::system_error on any pull or write failure.
// NOTE(review): each buffer is passed to sink.write() once — this assumes
// write() accepts the full buffer (no partial-write loop); confirm against
// the any_write_sink contract.
task<std::size_t> transfer(any_buffer_source& source, any_write_sink& sink)
{
    std::size_t written = 0;
    const_buffer scratch[8];
    while (true)
    {
        auto [pull_ec, views] = co_await source.pull(scratch);
        if (pull_ec)
            throw std::system_error(pull_ec);
        if (views.empty())
            break; // source reported EOF
        for (const_buffer const& view : views)
        {
            auto [write_ec, n] = co_await sink.write(view);
            if (write_ec)
                throw std::system_error(write_ec);
            written += n;
        }
        // Release everything we pulled back to the source.
        source.consume(buffer_size(views));
    }
    // Propagate end-of-stream to the sink before reporting the total.
    io_result<> eof = co_await sink.write_eof();
    if (eof.ec)
        throw std::system_error(eof.ec);
    co_return written;
}
void demo_pipeline()
{
std::cout << "=== Stream Pipeline Demo ===\n\n";
// Input data
std::string input = "hello world\nthis is a test\nof the pipeline\n";
std::cout << "Input:\n" << input << "\n";
// Create mock source with input data
test::fuse f; // test::fuse
test::buffer_source source(f); // test::buffer_source
source.provide(input);
// Build the pipeline using type-erased buffer sources.
// Using pointer construction (&source) for reference semantics -
// the wrapper does not take ownership, so source must outlive src.
any_buffer_source src{&source}; // any_buffer_source
uppercase_transform upper{src}; // uppercase_transform
any_buffer_source upper_src{&upper}; // any_buffer_source
line_numbering_transform numbered{upper_src}; // line_numbering_transform
any_buffer_source numbered_src{&numbered}; // any_buffer_source
// Create sink - pointer construction ensures sink outlives dst
test::write_sink sink(f); // test::write_sink
any_write_sink dst{&sink}; // any_write_sink
// Run pipeline
std::size_t bytes = 0; // std::size_t
test::run_blocking([&](std::size_t n) { bytes = n; })(
transfer(numbered_src, dst));
std::cout << "Output (" << bytes << " bytes):\n";
std::cout << sink.data() << "\n";
}
// Entry point: run the demo and map failures to a nonzero exit code.
//
// Returns 0 on success, 1 on any reported failure. The original version
// caught only std::system_error, so any other exception (e.g. std::bad_alloc
// or std::length_error thrown while the transforms grow their buffers)
// escaped main and terminated the process via std::terminate with no
// diagnostic. A std::exception handler now backstops those cases.
int main()
{
    try
    {
        demo_pipeline();
    }
    catch (std::system_error const& e)
    {
        // I/O failure surfaced by transfer().
        std::cerr << "Pipeline error: " << e.what() << "\n";
        return 1;
    }
    catch (std::exception const& e)
    {
        // Any other standard exception: report instead of terminating.
        std::cerr << "Unexpected error: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
Build
add_executable(stream_pipeline stream_pipeline.cpp)
target_link_libraries(stream_pipeline PRIVATE capy)
Walkthrough
Pipeline Structure
Source → Uppercase → LineNumbering → Sink
Data flows through the pipeline:
-
Source provides raw input
-
Uppercase transforms to uppercase
-
LineNumbering adds line numbers
-
Sink collects output
BufferSource Implementation
task<io_result<std::span<const_buffer>>>
pull(std::span<const_buffer> dest)
{
// Pull from upstream
// ec: std::error_code, bufs: std::span<const_buffer>
auto [ec, bufs] = co_await source_->pull(upstream);
// Transform data...
// Consume from upstream
source_->consume(buffer_size(bufs));
// Return transformed buffer
dest[0] = const_buffer(buffer_.data(), buffer_.size());
co_return {std::error_code{}, dest.first(1)};
}
Each stage:
-
Pulls buffers from upstream using
co_await -
Transforms the data
-
Calls
consume() on upstream to indicate bytes processed -
Returns transformed buffers
Type Erasure with Pointer Construction
// Using pointer construction (&source) for reference semantics
any_buffer_source src{&source}; // any_buffer_source
uppercase_transform upper{src}; // uppercase_transform
any_buffer_source upper_src{&upper}; // any_buffer_source
any_buffer_source wraps each stage using pointer construction, allowing uniform composition while preserving the lifetime of the underlying objects.
Output
=== Stream Pipeline Demo ===
Input:
hello world
this is a test
of the pipeline
Output (52 bytes):
1: HELLO WORLD
2: THIS IS A TEST
3: OF THE PIPELINE
Exercises
-
Add a compression/decompression stage
-
Implement a ROT13 transform
-
Create a filtering stage that drops lines matching a pattern
Summary
This example catalog demonstrated:
-
Basic task creation and launching
-
Coroutine synchronization with events
-
Buffer composition for scatter/gather I/O
-
Unit testing with mock streams
-
Compilation firewalls with type erasure
-
Cooperative cancellation with stop tokens
-
Concurrent execution with
when_all -
Custom buffer implementations
-
Real network I/O with Corosio
-
Data transformation pipelines
These patterns form the foundation for building robust, efficient I/O applications with Capy.