Producer-Consumer
Two tasks communicating via an async event, with strand serialization.
What You Will Learn
- Using async_event for coroutine synchronization
- Running multiple concurrent tasks with when_all
- Using strand to serialize access to shared state
- Task-to-task communication patterns
Prerequisites
- Completed Hello Task
- Understanding of basic task creation and launching
Source Code
#include <boost/capy.hpp>
#include <boost/capy/ex/strand.hpp>
#include <iostream>
#include <latch>
using namespace boost::capy;
int main()
{
    thread_pool pool;                          // execution context with worker threads
    strand s{pool.get_executor()};             // strand - serializes execution
    std::latch done(1);                        // blocks main() until the work finishes
    auto on_complete = [&done](auto&&...) { done.count_down(); };        // completion handler
    auto on_error = [&done](std::exception_ptr) { done.count_down(); };  // error handler
    async_event data_ready;                    // one-shot signal from producer to consumer
    int shared_value = 0;                      // data shared between the two tasks

    auto producer = [&]() -> task<> {
        std::cout << "Producer: preparing data...\n";
        shared_value = 42;
        std::cout << "Producer: data ready, signaling\n";
        data_ready.set();
        co_return;
    };

    auto consumer = [&]() -> task<> {
        std::cout << "Consumer: waiting for data...\n";
        co_await data_ready.wait();
        std::cout << "Consumer: received value " << shared_value << "\n";
        co_return;
    };

    // Run both tasks concurrently using when_all, through a strand.
    // The strand serializes execution, ensuring thread-safe access
    // to the shared async_event and shared_value.
    auto run_both = [&]() -> task<> {
        co_await when_all(producer(), consumer());
    };

    run_async(s, on_complete, on_error)(run_both());

    done.wait(); // Block until tasks complete
    return 0;
}
Build
add_executable(producer_consumer producer_consumer.cpp)
target_link_libraries(producer_consumer PRIVATE capy)
Walkthrough
The Strand
strand s{pool.get_executor()}; // strand - serializes execution
A strand is an executor adaptor that serializes execution. All coroutines dispatched through a strand are guaranteed not to run concurrently, making it safe to access shared state without explicit locking. Note that async_event is not thread-safe, so using a strand ensures safe access.
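As a minimal sketch of that guarantee (assuming run_async can be invoked more than once against the same strand; the counter and handler names are illustrative, not taken from the library):
// Sketch: two tasks increment the same counter with no mutex. Because both
// are launched through the same strand, their resumptions never overlap,
// so the increments cannot race.
thread_pool pool;
strand s{pool.get_executor()};
std::latch done(2);                 // one count per task
int counter = 0;

auto bump = [&]() -> task<> {
    for (int i = 0; i < 1000; ++i)
        ++counter;                  // safe: serialized by the strand
    co_return;
};

auto on_done = [&](auto&&...) { done.count_down(); };
auto on_err  = [&](std::exception_ptr) { done.count_down(); };

run_async(s, on_done, on_err)(bump());
run_async(s, on_done, on_err)(bump());
done.wait();                        // counter == 2000 when this returns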
The Event
async_event data_ready; // one-shot signal from producer to consumer
async_event is a one-shot signaling mechanism. One task can set() it; other tasks can wait() for it. When set, all waiting tasks resume.
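For example, here is a hedged sketch of that fan-out behavior; it assumes when_all accepts more than two tasks (otherwise nest the calls), and the names are illustrative:
// Sketch: several tasks block on the same event; a single set() releases them all.
async_event go;

auto waiter = [&](int id) -> task<> {
    co_await go.wait();                         // suspends until go.set()
    std::cout << "waiter " << id << " released\n";
    co_return;
};

auto starter = [&]() -> task<> {
    go.set();                                   // wakes every waiting task
    co_return;
};

auto run = [&]() -> task<> {
    co_await when_all(waiter(1), waiter(2), starter());
};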
Producer
auto producer = [&]() -> task<> {
    std::cout << "Producer: preparing data...\n";
    shared_value = 42;
    std::cout << "Producer: data ready, signaling\n";
    data_ready.set();
    co_return;
};
The producer prepares data and signals completion by calling set().
Consumer
auto consumer = [&]() -> task<> {
    std::cout << "Consumer: waiting for data...\n";
    co_await data_ready.wait();
    std::cout << "Consumer: received value " << shared_value << "\n";
    co_return;
};
The consumer waits until the event is set. The co_await data_ready.wait() expression suspends the coroutine until set() is called; if the event has already been set, the wait completes immediately and the consumer continues without suspending.
Running Both
// Run both tasks concurrently using when_all, through a strand.
// The strand serializes execution, ensuring thread-safe access
// to the shared async_event and shared_value.
auto run_both = [&]() -> task<> {
    co_await when_all(producer(), consumer());
};

run_async(s, on_complete, on_error)(run_both());
when_all runs both tasks concurrently within the same parent coroutine context, but the strand ensures they don’t run at the same time on different threads. The producer signals data_ready when the value is set, and the consumer waits for the signal before reading.
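The ordering guarantee is easy to see if you extend run_both; assuming when_all completes only once both child tasks have finished, anything written after the co_await runs last:
auto run_both = [&]() -> task<> {
    co_await when_all(producer(), consumer());
    std::cout << "both tasks finished\n";       // always printed after both outputs above
};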
Completion Synchronization
std::latch done(1); // blocks main() until the work finishes
auto on_complete = [&done](auto&&...) { done.count_down(); };
auto on_error = [&done](std::exception_ptr) { done.count_down(); };
// ...
done.wait(); // Block until tasks complete
The std::latch ensures main() waits for the tasks to complete before returning.
Output
Producer: preparing data...
Producer: data ready, signaling
Consumer: waiting for data...
Consumer: received value 42
(Output order may vary due to concurrent execution)
Exercises
- Add multiple consumers that all wait for the same event
- Create a producer that sets the event multiple times (use a loop with a new event each iteration); a sketch of this appears below the list
- Add error handling: what happens if the producer throws?
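One possible shape for the second exercise (a sketch only; it assumes async_event can be stored in a std::array, which the library may or may not support, and it stores one value per item so nothing is overwritten regardless of scheduling):
#include <array>

constexpr int n = 3;
std::array<async_event, n> ready;   // one one-shot event per item
std::array<int, n> values{};        // one value slot per item

auto producer = [&]() -> task<> {
    for (int i = 0; i < n; ++i) {
        values[i] = i * 10;         // produce item i
        ready[i].set();             // signal that item i is available
    }
    co_return;
};

auto consumer = [&]() -> task<> {
    for (int i = 0; i < n; ++i) {
        co_await ready[i].wait();   // wait for item i
        std::cout << "Consumer: received " << values[i] << "\n";
    }
    co_return;
};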
Next Steps
- Buffer Composition — Zero-allocation buffer composition