Stream Pipeline

This example demonstrates data transformation through a pipeline of buffer sources and sinks.

What You Will Learn

  • Building processing pipelines

  • Using BufferSource and BufferSink concepts

  • Chaining transformations

Prerequisites

Source Code

//
// Stream Pipeline Example
//
// This example demonstrates chaining buffer sources to create a data
// processing pipeline. Data flows through transform stages:
//
//   input -> uppercase_transform -> line_numbering_transform -> output
//
// Each transform is a BufferSource that wraps an upstream any_buffer_source,
// enabling type-erased composition of arbitrary transform chains.
//

#include <boost/capy.hpp>
#include <boost/capy/io/any_buffer_source.hpp>
#include <boost/capy/io/any_write_sink.hpp>
#include <boost/capy/test/buffer_source.hpp>
#include <boost/capy/test/run_blocking.hpp>
#include <boost/capy/test/write_sink.hpp>

#include <algorithm>
#include <cctype>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <span>
#include <string>
#include <system_error>
#include <vector>

using namespace boost::capy;

// A transform stage that converts to uppercase
// A transform stage that converts the byte stream to uppercase.
//
// Wraps an upstream any_buffer_source and itself models BufferSource,
// so it can in turn be wrapped in any_buffer_source — this is what
// enables type-erased composition of arbitrary transform chains.
class uppercase_transform
{
    any_buffer_source* source_;  // upstream stage (not owned)
    std::vector<char> buffer_;   // transformed bytes awaiting delivery
    std::size_t consumed_ = 0;   // bytes of buffer_ the caller consumed
    bool exhausted_ = false;     // upstream reported end-of-stream

public:
    // source must outlive this transform (pointer is stored, not copied).
    explicit uppercase_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    // BufferSource::consume - advance past processed bytes.
    // Once the entire buffer has been consumed, reset it so the next
    // pull() fetches fresh data from upstream.
    void consume(std::size_t n) noexcept
    {
        consumed_ += n;
        if (consumed_ >= buffer_.size())
        {
            buffer_.clear();
            consumed_ = 0;
        }
    }

    // BufferSource::pull - returns task<> to enable co_await on the
    // upstream pull. Fills dest[0] with a view of the transformed
    // bytes; an empty result span with no error signals end-of-stream.
    task<io_result<std::span<const_buffer>>>
    pull(std::span<const_buffer> dest)
    {
        // Serve unconsumed data from a previous pull first.
        if (consumed_ < buffer_.size())
        {
            if (dest.empty())
                co_return {std::error_code{}, std::span<const_buffer>{}};

            dest[0] = const_buffer(
                buffer_.data() + consumed_,
                buffer_.size() - consumed_);
            co_return {std::error_code{}, dest.first(1)};
        }

        // Upstream already reported end-of-stream?
        if (exhausted_)
            co_return {std::error_code{}, std::span<const_buffer>{}};

        // Pull a fresh batch from upstream
        buffer_.clear();
        consumed_ = 0;

        const_buffer upstream[8];
        auto [ec, bufs] = co_await source_->pull(upstream);

        if (ec)
            co_return {ec, std::span<const_buffer>{}};

        if (bufs.empty())
        {
            exhausted_ = true;
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }

        // Transform: uppercase each byte. Reserve once up front so the
        // appends below do not trigger repeated reallocation.
        buffer_.reserve(buffer_size(bufs));
        for (auto const& buf : bufs)
        {
            auto const* data = static_cast<char const*>(buf.data());
            std::transform(
                data, data + buf.size(), std::back_inserter(buffer_),
                [](char c) {
                    // unsigned char cast avoids UB for negative chars
                    return static_cast<char>(
                        std::toupper(static_cast<unsigned char>(c)));
                });
        }

        // We hold a private copy now, so release the upstream bytes.
        source_->consume(buffer_size(bufs));

        // Return transformed data, if the caller gave us room for it.
        if (dest.empty() || buffer_.empty())
            co_return {std::error_code{}, std::span<const_buffer>{}};

        dest[0] = const_buffer(buffer_.data(), buffer_.size());
        co_return {std::error_code{}, dest.first(1)};
    }
};

// A transform that adds line numbers
// A transform stage that prefixes every line with "N: ", counting
// lines from 1. Wraps an upstream any_buffer_source and models
// BufferSource itself, allowing further type-erased chaining.
class line_numbering_transform
{
    any_buffer_source* source_;  // upstream stage (not owned)
    std::string buffer_;         // numbered output awaiting delivery
    std::size_t consumed_ = 0;   // bytes of buffer_ already consumed
    std::size_t line_num_ = 1;   // next line number to emit
    bool at_line_start_ = true;  // next byte begins a new line
    bool exhausted_ = false;     // upstream reported end-of-stream

public:
    // source must outlive this transform.
    explicit line_numbering_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    // BufferSource::consume - advance past processed bytes; reset the
    // buffer once everything in it has been consumed.
    void consume(std::size_t n) noexcept
    {
        consumed_ += n;
        if (consumed_ < buffer_.size())
            return;
        buffer_.clear();
        consumed_ = 0;
    }

    // BufferSource::pull - awaits the upstream pull, numbers each line
    // of the incoming bytes, and exposes the result via dest[0].
    task<io_result<std::span<const_buffer>>>
    pull(std::span<const_buffer> dest)
    {
        // Leftover output from a previous pull takes priority.
        if (consumed_ < buffer_.size())
        {
            if (!dest.empty())
            {
                dest[0] = const_buffer(
                    buffer_.data() + consumed_,
                    buffer_.size() - consumed_);
                co_return {std::error_code{}, dest.first(1)};
            }
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }

        // End-of-stream already seen: nothing more will arrive.
        if (exhausted_)
            co_return {std::error_code{}, std::span<const_buffer>{}};

        buffer_.clear();
        consumed_ = 0;

        const_buffer upstream[8];
        auto [ec, bufs] = co_await source_->pull(upstream);

        if (ec)
            co_return {ec, std::span<const_buffer>{}};

        if (bufs.empty())
        {
            exhausted_ = true;
            co_return {std::error_code{}, std::span<const_buffer>{}};
        }

        // Walk the incoming bytes, inserting "N: " at each line start.
        for (auto const& buf : bufs)
        {
            auto const* bytes = static_cast<char const*>(buf.data());
            auto const* end = bytes + buf.size();

            while (bytes != end)
            {
                char const c = *bytes++;
                if (at_line_start_)
                {
                    buffer_ += std::to_string(line_num_++);
                    buffer_ += ": ";
                    at_line_start_ = false;
                }
                buffer_ += c;
                if (c == '\n')
                    at_line_start_ = true;
            }
        }

        // Copied everything we need — release the upstream bytes.
        source_->consume(buffer_size(bufs));

        if (dest.empty() || buffer_.empty())
            co_return {std::error_code{}, std::span<const_buffer>{}};

        dest[0] = const_buffer(buffer_.data(), buffer_.size());
        co_return {std::error_code{}, dest.first(1)};
    }
};

// Transfer from source to sink
task<std::size_t> transfer(any_buffer_source& source, any_write_sink& sink)
{
    std::size_t total = 0;  // std::size_t
    const_buffer bufs[8];   // const_buffer[8]

    for (;;)
    {
        // ec: std::error_code, spans: std::span<const_buffer>
        auto [ec, spans] = co_await source.pull(bufs);

        if (ec)
            throw std::system_error(ec);

        if (spans.empty())
            break;

        for (auto const& buf : spans)  // const_buffer const&
        {
            // wec: std::error_code, n: std::size_t
            auto [wec, n] = co_await sink.write(buf);
            if (wec)
                throw std::system_error(wec);
            total += n;
        }

        source.consume(buffer_size(spans));
    }

    io_result<> eof_result = co_await sink.write_eof();
    if (eof_result.ec)
        throw std::system_error(eof_result.ec);

    co_return total;
}

void demo_pipeline()
{
    std::cout << "=== Stream Pipeline Demo ===\n\n";

    // Input data
    std::string input = "hello world\nthis is a test\nof the pipeline\n";
    std::cout << "Input:\n" << input << "\n";

    // Create mock source with input data
    test::fuse f;  // test::fuse
    test::buffer_source source(f);  // test::buffer_source
    source.provide(input);

    // Build the pipeline using type-erased buffer sources.
    // Using pointer construction (&source) for reference semantics -
    // the wrapper does not take ownership, so source must outlive src.
    any_buffer_source src{&source};  // any_buffer_source

    uppercase_transform upper{src};  // uppercase_transform
    any_buffer_source upper_src{&upper};  // any_buffer_source

    line_numbering_transform numbered{upper_src};  // line_numbering_transform
    any_buffer_source numbered_src{&numbered};  // any_buffer_source

    // Create sink - pointer construction ensures sink outlives dst
    test::write_sink sink(f);  // test::write_sink
    any_write_sink dst{&sink};  // any_write_sink

    // Run pipeline
    std::size_t bytes = 0;  // std::size_t
    test::run_blocking([&](std::size_t n) { bytes = n; })(
        transfer(numbered_src, dst));

    std::cout << "Output (" << bytes << " bytes):\n";
    std::cout << sink.data() << "\n";
}

// Entry point: run the demo, mapping pipeline failures to exit code 1.
int main()
{
    try
    {
        demo_pipeline();
        return 0;
    }
    catch (std::system_error const& err)
    {
        std::cerr << "Pipeline error: " << err.what() << "\n";
        return 1;
    }
}

Build

add_executable(stream_pipeline stream_pipeline.cpp)
target_link_libraries(stream_pipeline PRIVATE capy)

Walkthrough

Pipeline Structure

Source → Uppercase → LineNumbering → Sink

Data flows through the pipeline:

  1. Source provides raw input

  2. Uppercase transforms to uppercase

  3. LineNumbering adds line numbers

  4. Sink collects output

BufferSource Implementation

task<io_result<std::span<const_buffer>>>
pull(std::span<const_buffer> dest)
{
    // Pull from upstream
    // ec: std::error_code, bufs: std::span<const_buffer>
    auto [ec, bufs] = co_await source_->pull(upstream);

    // Transform data...

    // Consume from upstream
    source_->consume(buffer_size(bufs));

    // Return transformed buffer
    dest[0] = const_buffer(buffer_.data(), buffer_.size());
    co_return {std::error_code{}, dest.first(1)};
}

Each stage:

  1. Pulls buffers from upstream using co_await

  2. Transforms the data

  3. Calls consume() on upstream to indicate bytes processed

  4. Returns transformed buffers

Type Erasure with Pointer Construction

// Using pointer construction (&source) for reference semantics
any_buffer_source src{&source};  // any_buffer_source

uppercase_transform upper{src};  // uppercase_transform
any_buffer_source upper_src{&upper};  // any_buffer_source

any_buffer_source wraps each stage using pointer construction, allowing uniform composition while preserving the lifetime of the underlying objects.

Output

=== Stream Pipeline Demo ===

Input:
hello world
this is a test
of the pipeline

Output (52 bytes):
1: HELLO WORLD
2: THIS IS A TEST
3: OF THE PIPELINE

Exercises

  1. Add a compression/decompression stage

  2. Implement a ROT13 transform

  3. Create a filtering stage that drops lines matching a pattern

Summary

This example catalog demonstrated:

  • Basic task creation and launching

  • Coroutine synchronization with events

  • Buffer composition for scatter/gather I/O

  • Unit testing with mock streams

  • Compilation firewalls with type erasure

  • Cooperative cancellation with stop tokens

  • Concurrent execution with when_all

  • Custom buffer implementations

  • Real network I/O with Corosio

  • Data transformation pipelines

These patterns form the foundation for building robust, efficient I/O applications with Capy.