Composed Operations
Learn how to write reusable asynchronous operations using coroutines.
What is a Composed Operation?
A composed operation is an async operation built from other async operations. Examples in Asio include:
-
async_read— Built fromasync_read_some -
async_write— Built fromasync_write_some -
async_connect(with range) — Built fromasync_connectcalls
You can write your own composed operations for reusable async logic.
Coroutine-Based Composition
The simplest approach: return awaitable<T>:
#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::awaitable;
using asio::use_awaitable;
using asio::ip::tcp;
// A composed operation that reads a length-prefixed message
awaitable<std::string> async_read_message(tcp::socket& socket)
{
// Read 4-byte length header
uint32_t length;
co_await asio::async_read(
socket,
asio::buffer(&length, sizeof(length)),
use_awaitable);
length = ntohl(length); // Network to host byte order
// Read the message body
std::string message(length, '\0');
co_await asio::async_read(
socket,
asio::buffer(message),
use_awaitable);
co_return message;
}
// Usage
awaitable<void> client(tcp::socket& socket)
{
std::string msg = co_await async_read_message(socket);
std::cout << "Received: " << msg << "\n";
}
Writing a Message
awaitable<void> async_write_message(tcp::socket& socket, std::string_view message)
{
// Write length header
uint32_t length = htonl(static_cast<uint32_t>(message.size()));
co_await asio::async_write(
socket,
asio::buffer(&length, sizeof(length)),
use_awaitable);
// Write message body
co_await asio::async_write(
socket,
asio::buffer(message),
use_awaitable);
}
Operation with Timeout
A composed operation that races against a timer:
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace asio::experimental::awaitable_operators;
template<typename T>
awaitable<T> with_timeout(
awaitable<T> operation,
std::chrono::steady_clock::duration timeout)
{
auto executor = co_await asio::this_coro::executor;
asio::steady_timer timer(executor, timeout);
auto result = co_await (
std::move(operation) || timer.async_wait(use_awaitable)
);
if (result.index() == 1)
throw std::runtime_error("Operation timed out");
co_return std::get<0>(std::move(result));
}
// Usage
awaitable<void> example(tcp::socket& socket)
{
using namespace std::chrono_literals;
auto msg = co_await with_timeout(
async_read_message(socket),
5s);
}
Retry Pattern
A composed operation that retries on failure:
template<typename Func>
awaitable<std::invoke_result_t<Func>> with_retry(
Func&& func,
int max_attempts,
std::chrono::milliseconds delay)
{
auto executor = co_await asio::this_coro::executor;
for (int attempt = 1; attempt <= max_attempts; ++attempt)
{
try
{
co_return co_await func();
}
catch (const boost::system::system_error& e)
{
if (attempt == max_attempts)
throw;
std::cout << "Attempt " << attempt << " failed: "
<< e.what() << ", retrying...\n";
asio::steady_timer timer(executor, delay);
co_await timer.async_wait(use_awaitable);
}
}
throw std::logic_error("Unreachable");
}
// Usage
awaitable<void> connect_with_retry(tcp::socket& socket, tcp::endpoint ep)
{
co_await with_retry(
[&]() -> awaitable<void> {
co_await socket.async_connect(ep, use_awaitable);
},
3, // max attempts
std::chrono::milliseconds(1000)); // delay between attempts
}
Parallel Operations
Run multiple operations concurrently:
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace asio::experimental::awaitable_operators;
// Wait for both to complete
awaitable<void> parallel_both()
{
auto executor = co_await asio::this_coro::executor;
tcp::socket socket1(executor);
tcp::socket socket2(executor);
// Connect to both servers in parallel
co_await (
socket1.async_connect(endpoint1, use_awaitable)
&& socket2.async_connect(endpoint2, use_awaitable)
);
// Both are now connected
}
// Wait for first to complete
awaitable<void> parallel_race()
{
auto executor = co_await asio::this_coro::executor;
tcp::socket socket1(executor);
tcp::socket socket2(executor);
// Connect to whichever responds first
auto result = co_await (
socket1.async_connect(endpoint1, use_awaitable)
|| socket2.async_connect(endpoint2, use_awaitable)
);
if (result.index() == 0)
std::cout << "socket1 connected first\n";
else
std::cout << "socket2 connected first\n";
}
Generic Stream Operations
Make operations work with any async stream:
template<typename AsyncStream>
awaitable<std::string> async_read_line(AsyncStream& stream)
{
std::string data;
std::size_t n = co_await asio::async_read_until(
stream,
asio::dynamic_buffer(data),
'\n',
use_awaitable);
std::string line = data.substr(0, n - 1); // Remove \n
co_return line;
}
// Works with tcp::socket
awaitable<void> use_with_socket(tcp::socket& socket)
{
auto line = co_await async_read_line(socket);
}
// Works with ssl::stream
awaitable<void> use_with_ssl(asio::ssl::stream<tcp::socket>& stream)
{
auto line = co_await async_read_line(stream);
}
Error Handling in Composed Operations
Propagate errors naturally with exceptions:
awaitable<std::string> safe_read_message(tcp::socket& socket)
{
try
{
co_return co_await async_read_message(socket);
}
catch (const boost::system::system_error& e)
{
if (e.code() == asio::error::eof)
co_return ""; // Return empty on EOF
throw; // Re-throw other errors
}
}
Or use error codes:
awaitable<std::pair<boost::system::error_code, std::string>>
async_read_message_ec(tcp::socket& socket)
{
uint32_t length;
auto [ec1, n1] = co_await asio::async_read(
socket,
asio::buffer(&length, sizeof(length)),
asio::experimental::as_tuple(use_awaitable));
if (ec1)
co_return {ec1, ""};
length = ntohl(length);
std::string message(length, '\0');
auto [ec2, n2] = co_await asio::async_read(
socket,
asio::buffer(message),
asio::experimental::as_tuple(use_awaitable));
co_return {ec2, std::move(message)};
}
Best Practices
-
Return
awaitable<T>— Makes the operation composable with other coroutines -
Accept streams by reference — Don’t take ownership unless necessary
-
Use
use_awaitable— Consistent with the rest of the coroutine -
Handle cancellation — Operations should handle
operation_aborted -
Document lifetimes — Be clear about what must remain valid
Next Steps
-
Initiating Functions — Understand the async model
-
Executors — Control where operations run