Producer-Consumer

Two tasks communicating via an async event, with strand serialization.

What You Will Learn

  • Using async_event for coroutine synchronization

  • Running multiple concurrent tasks with when_all

  • Using strand to serialize access to shared state

  • Task-to-task communication patterns

Prerequisites

  • Completed Hello Task

  • Understanding of basic task creation and launching

Source Code

#include <boost/capy.hpp>
#include <boost/capy/ex/strand.hpp>
#include <exception>
#include <iostream>
#include <latch>

using namespace boost::capy;

int main()
{
    thread_pool pool;               // worker threads that run the tasks
    strand s{pool.get_executor()};  // serializes execution on the pool's executor
    std::latch done(1);             // lets main() wait for completion

    auto on_complete = [&done](auto&&...) { done.count_down(); };        // completion handler
    auto on_error = [&done](std::exception_ptr) { done.count_down(); };  // error handler

    async_event data_ready;  // signaled once the data is ready
    int shared_value = 0;    // written by the producer, read by the consumer

    auto producer = [&]() -> task<> {
        std::cout << "Producer: preparing data...\n";
        shared_value = 42;
        std::cout << "Producer: data ready, signaling\n";
        data_ready.set();
        co_return;
    };

    auto consumer = [&]() -> task<> {
        std::cout << "Consumer: waiting for data...\n";
        co_await data_ready.wait();
        std::cout << "Consumer: received value " << shared_value << "\n";
        co_return;
    };

    // Run both tasks concurrently using when_all, through a strand.
    // The strand serializes execution, ensuring thread-safe access
    // to the shared async_event and shared_value.
    auto run_both = [&]() -> task<> {
        co_await when_all(producer(), consumer());
    };

    run_async(s, on_complete, on_error)(run_both());

    done.wait();  // Block until tasks complete
    return 0;
}

Build

add_executable(producer_consumer producer_consumer.cpp)
target_link_libraries(producer_consumer PRIVATE capy)

Walkthrough

The Strand

strand s{pool.get_executor()};  // strand - serializes execution

A strand is an executor adaptor that serializes execution: coroutines dispatched through the same strand never run concurrently, so they can access shared state without explicit locking. This matters here because async_event is not thread-safe; running both tasks through the strand makes access to it safe.
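To make the serialization guarantee concrete, here is a minimal standard-library sketch of the idea behind a strand. It is not Capy's implementation: mini_strand and demo_mini_strand are hypothetical names, and unlike a real strand this version runs handlers on whichever thread happens to be posting rather than on a thread pool. Handlers posted from four threads update a plain int that has no lock of its own; the queue ensures only one handler runs at a time.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration of strand-style serialization; not Capy's strand.
class mini_strand
{
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
    bool draining_ = false;  // true while some thread is running handlers

public:
    // Queue a handler. Handlers never run concurrently with each other.
    void post(std::function<void()> handler)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(handler));
            if (draining_)
                return;        // the current drainer will pick it up
            draining_ = true;  // this thread becomes the drainer
        }
        for (;;)
        {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty())
                {
                    draining_ = false;
                    return;
                }
                next = std::move(queue_.front());
                queue_.pop_front();
            }
            next();  // runs outside the lock, one handler at a time
        }
    }
};

// Posts 4 x 1000 increments from four threads and returns the final count.
int demo_mini_strand()
{
    mini_strand s;
    int counter = 0;    // shared state with no lock of its own
    bool busy = false;  // detects overlapping handlers

    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t)
    {
        threads.emplace_back([&] {
            for (int i = 0; i < 1000; ++i)
                s.post([&] {
                    assert(!busy);  // no other handler is mid-flight
                    busy = true;
                    ++counter;
                    busy = false;
                });
        });
    }
    for (auto& th : threads)
        th.join();
    return counter;
}
```

Calling demo_mini_strand() returns 4000: every increment runs, and none overlaps with another, even though counter itself is unsynchronized.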

The Event

async_event data_ready;  // signaled once the data is ready

async_event is a one-shot signaling mechanism. One task can set() it; other tasks can wait() for it. When set, all waiting tasks resume.

Producer

auto producer = [&]() -> task<> {
    std::cout << "Producer: preparing data...\n";
    shared_value = 42;
    std::cout << "Producer: data ready, signaling\n";
    data_ready.set();
    co_return;
};

The producer prepares data and signals completion by calling set().

Consumer

auto consumer = [&]() -> task<> {
    std::cout << "Consumer: waiting for data...\n";
    co_await data_ready.wait();
    std::cout << "Consumer: received value " << shared_value << "\n";
    co_return;
};

The consumer suspends at co_await data_ready.wait() until set() has been called, then reads shared_value.

Running Both

// Run both tasks concurrently using when_all, through a strand.
// The strand serializes execution, ensuring thread-safe access
// to the shared async_event and shared_value.
auto run_both = [&]() -> task<> {
    co_await when_all(producer(), consumer());
};

run_async(s, on_complete, on_error)(run_both());

when_all runs both tasks concurrently within the same parent coroutine context, but the strand ensures they don’t run at the same time on different threads. The producer signals data_ready when the value is set, and the consumer waits for the signal before reading.

Completion Synchronization

std::latch done(1);  // std::latch - wait for completion
auto on_complete = [&done](auto&&...) { done.count_down(); };
auto on_error = [&done](std::exception_ptr) { done.count_down(); };
// ...
done.wait();  // Block until tasks complete

The std::latch ensures main() waits for the tasks to complete before returning.

Output

Producer: preparing data...
Producer: data ready, signaling
Consumer: waiting for data...
Consumer: received value 42

(Output order may vary with how the tasks are scheduled, but the consumer always prints the received value after the producer signals.)

Exercises

  1. Add multiple consumers that all wait for the same event

  2. Create a producer that sets the event multiple times (use a loop with a new event each iteration)

  3. Add error handling: what happens if the producer throws?

Next Steps