Composed Operations

Learn how to write reusable asynchronous operations using coroutines.

What is a Composed Operation?

A composed operation is an async operation built from other async operations. Examples in Asio include:

  • async_read — Built from async_read_some

  • async_write — Built from async_write_some

  • async_connect (the free function taking a range of endpoints) — Built from a series of socket.async_connect calls, one per endpoint, until one succeeds

You can write your own composed operations for reusable async logic.

Coroutine-Based Composition

The simplest approach is to write the operation as a coroutine that returns awaitable<T>:

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
#include <variant>

#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::awaitable;
using asio::use_awaitable;
using asio::ip::tcp;

// A composed operation that reads one length-prefixed message.
//
// Wire format: a 4-byte unsigned length in network byte order, followed by
// exactly that many payload bytes.
//
// socket      Stream to read from; held by reference, so it must stay alive
//             until the returned awaitable has completed.
// max_length  Largest payload accepted. The default is the largest value the
//             4-byte header can express, i.e. no additional limit. Pass a
//             smaller value when the peer is untrusted: the payload buffer is
//             allocated from the length the peer sends.
//
// Returns the payload (empty for a zero-length message).
// Throws boost::system::system_error if either read fails, or with
// asio::error::message_size if the announced length exceeds max_length.
awaitable<std::string> async_read_message(
    tcp::socket& socket,
    std::size_t max_length = std::numeric_limits<std::uint32_t>::max())
{
    // Read 4-byte length header
    std::uint32_t length = 0;
    co_await asio::async_read(
        socket,
        asio::buffer(&length, sizeof(length)),
        use_awaitable);

    length = ntohl(length);  // Network to host byte order

    // Refuse to allocate more than the caller is prepared to receive.
    if (length > max_length)
    {
        const boost::system::error_code too_large = asio::error::message_size;
        throw boost::system::system_error(too_large);
    }

    // Read the message body
    std::string message(length, '\0');
    co_await asio::async_read(
        socket,
        asio::buffer(message),
        use_awaitable);

    co_return message;
}

// Usage: await one framed message on `socket` and print it to stdout.
// Any exception raised by the read propagates to whoever awaits client().
awaitable<void> client(tcp::socket& socket)
{
    const std::string received = co_await async_read_message(socket);

    std::cout << "Received: " << received << "\n";
}

Writing a Message

// Writes `message` as one length-prefixed frame: a 4-byte unsigned length in
// network byte order followed by the payload bytes.
//
// socket   Stream to write to; held by reference for the whole operation.
// message  Payload view. Only the view is copied into the coroutine; the
//          bytes it refers to must stay valid until the awaitable completes.
//
// Throws boost::system::system_error if the write fails, or with
// asio::error::message_size if the payload does not fit in the 4-byte header.
awaitable<void> async_write_message(tcp::socket& socket, std::string_view message)
{
    // A payload larger than the header can express would otherwise be
    // announced with a silently truncated length.
    if (message.size() > std::numeric_limits<std::uint32_t>::max())
    {
        const boost::system::error_code too_large = asio::error::message_size;
        throw boost::system::system_error(too_large);
    }

    const std::uint32_t length = htonl(static_cast<std::uint32_t>(message.size()));

    // Send header and body as a single gather write so the frame goes out in
    // one composed operation instead of two back-to-back small writes.
    const std::array<asio::const_buffer, 2> frame{
        asio::buffer(&length, sizeof(length)),
        asio::buffer(message)};

    co_await asio::async_write(socket, frame, use_awaitable);
}

Operation with Timeout

A composed operation that races against a timer:

#include <boost/asio/experimental/awaitable_operators.hpp>

using namespace asio::experimental::awaitable_operators;

// Runs `operation` but gives up once `timeout` has elapsed.
//
// operation  The awaitable to run; it is consumed (moved from).
// timeout    Maximum time to wait, measured on the steady clock.
//
// Returns whatever `operation` produces (nothing when T is void).
// Throws std::runtime_error("Operation timed out") if the timer finishes
// first; exceptions thrown by `operation` itself propagate unchanged.
template<typename T>
awaitable<T> with_timeout(
    awaitable<T> operation,
    std::chrono::steady_clock::duration timeout)
{
    auto executor = co_await asio::this_coro::executor;
    asio::steady_timer timer(executor, timeout);

    // operator|| completes as soon as either side finishes; the result is a
    // variant whose index says which one did.
    auto result = co_await (
        std::move(operation) || timer.async_wait(use_awaitable)
    );

    if (result.index() == 1)
        throw std::runtime_error("Operation timed out");

    // For awaitable<void> alternative 0 is only a placeholder, so there is
    // no value to hand back.
    if constexpr (std::is_void_v<T>)
        co_return;
    else
        co_return std::get<0>(std::move(result));
}

// Usage: read one framed message, allowing at most five seconds for it.
awaitable<void> example(tcp::socket& socket)
{
    constexpr std::chrono::seconds deadline{5};

    auto msg = co_await with_timeout(async_read_message(socket), deadline);
}

Retry Pattern

A composed operation that retries on failure:

template<typename Func>
awaitable<std::invoke_result_t<Func>> with_retry(
    Func&& func,
    int max_attempts,
    std::chrono::milliseconds delay)
{
    auto executor = co_await asio::this_coro::executor;

    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        try
        {
            co_return co_await func();
        }
        catch (const boost::system::system_error& e)
        {
            if (attempt == max_attempts)
                throw;

            std::cout << "Attempt " << attempt << " failed: "
                      << e.what() << ", retrying...\n";

            asio::steady_timer timer(executor, delay);
            co_await timer.async_wait(use_awaitable);
        }
    }

    throw std::logic_error("Unreachable");
}

// Usage: connect `socket` to `ep` through with_retry, passing 3 as the
// maximum number of attempts and 1000 ms as the delay between attempts.
awaitable<void> connect_with_retry(tcp::socket& socket, tcp::endpoint ep)
{
    constexpr int max_attempts = 3;
    constexpr std::chrono::milliseconds retry_delay{1000};

    // One connection attempt. The lambda refers to `socket` and `ep` by
    // reference; both outlive it because they live in this coroutine.
    auto try_connect = [&socket, &ep]() -> awaitable<void> {
        co_await socket.async_connect(ep, use_awaitable);
    };

    co_await with_retry(try_connect, max_attempts, retry_delay);
}

Parallel Operations

Run multiple operations concurrently:

#include <boost/asio/experimental/awaitable_operators.hpp>

using namespace asio::experimental::awaitable_operators;

// Wait for both to complete.
// `endpoint1` and `endpoint2` are not defined in this snippet; they are
// expected to be tcp::endpoint values supplied by the surrounding code.
awaitable<void> parallel_both()
{
    const auto ex = co_await asio::this_coro::executor;

    tcp::socket socket1(ex);
    tcp::socket socket2(ex);

    // Describe the two connects first, then await them together: operator&&
    // resumes this coroutine only once both have finished.
    auto connect1 = socket1.async_connect(endpoint1, use_awaitable);
    auto connect2 = socket2.async_connect(endpoint2, use_awaitable);

    co_await (std::move(connect1) && std::move(connect2));

    // Both are now connected
}

// Wait for first to complete.
// `endpoint1` and `endpoint2` are not defined in this snippet; they are
// expected to be tcp::endpoint values supplied by the surrounding code.
awaitable<void> parallel_race()
{
    const auto ex = co_await asio::this_coro::executor;

    tcp::socket socket1(ex);
    tcp::socket socket2(ex);

    // Race the two connects: operator|| yields a variant whose index tells
    // which side finished first.
    auto connect1 = socket1.async_connect(endpoint1, use_awaitable);
    auto connect2 = socket2.async_connect(endpoint2, use_awaitable);

    const auto winner = co_await (std::move(connect1) || std::move(connect2));

    std::cout << (winner.index() == 0
                      ? "socket1 connected first\n"
                      : "socket2 connected first\n");
}

Generic Stream Operations

Make operations work with any async stream:

// Reads from `stream` up to and including the next '\n' and returns the text
// before it (the '\n' itself is stripped; a preceding '\r' is kept).
//
// stream  Any type accepted by asio::async_read_until; held by reference.
//
// NOTE(review): async_read_until may pull bytes beyond the delimiter into the
// buffer. The buffer here is local, so any such bytes are dropped when the
// function returns - confirm this is acceptable before calling it repeatedly
// on the same stream.
template<typename AsyncStream>
awaitable<std::string> async_read_line(AsyncStream& stream)
{
    std::string line;

    // `consumed` counts the bytes up to and including the delimiter.
    const std::size_t consumed = co_await asio::async_read_until(
        stream, asio::dynamic_buffer(line), '\n', use_awaitable);

    line.resize(consumed - 1);  // Keep only the text before \n
    co_return line;
}

// Works with tcp::socket: the stream type is deduced from the argument.
awaitable<void> use_with_socket(tcp::socket& socket)
{
    const std::string line = co_await async_read_line(socket);
}

// Works with ssl::stream: the same template is instantiated for the TLS
// stream type.
// NOTE: asio::ssl::stream is declared in <boost/asio/ssl.hpp>, which is not
// pulled in by <boost/asio.hpp>.
awaitable<void> use_with_ssl(asio::ssl::stream<tcp::socket>& stream)
{
    const std::string line = co_await async_read_line(stream);
}

Error Handling in Composed Operations

Propagate errors naturally with exceptions:

// Reads one message via async_read_message, mapping end-of-file to an empty
// string. Every other boost::system::system_error is rethrown unchanged, and
// exceptions of other types pass through untouched.
//
// NOTE(review): an empty result does not distinguish EOF from a genuinely
// empty message - confirm callers do not need to tell them apart.
awaitable<std::string> safe_read_message(tcp::socket& socket)
{
    try
    {
        std::string message = co_await async_read_message(socket);
        co_return message;
    }
    catch (const boost::system::system_error& error)
    {
        // Anything other than EOF is a real failure: re-throw it.
        if (error.code() != asio::error::eof)
            throw;
    }

    co_return "";  // Return empty on EOF
}

Or use error codes:

// Error-code flavour of the length-prefixed read: I/O failures are reported
// through the returned pair rather than thrown.
//
// Returns {error, message}:
//   - header read failed: {that error, ""}
//   - otherwise:          {result of the body read, the body buffer}
// The body buffer always has the announced length; when the body read fails,
// only part of it has been filled in and the rest is still '\0'.
awaitable<std::pair<boost::system::error_code, std::string>>
async_read_message_ec(tcp::socket& socket)
{
    using result_type = std::pair<boost::system::error_code, std::string>;

    // as_tuple turns the completion arguments into (error_code, bytes)
    // instead of throwing on failure.
    const auto token = asio::experimental::as_tuple(use_awaitable);

    uint32_t header;
    auto [header_ec, header_bytes] = co_await asio::async_read(
        socket, asio::buffer(&header, sizeof(header)), token);

    if (header_ec)
        co_return result_type{header_ec, ""};

    // Network to host byte order
    const uint32_t body_size = ntohl(header);
    std::string message(body_size, '\0');

    auto [body_ec, body_bytes] = co_await asio::async_read(
        socket, asio::buffer(message), token);

    co_return result_type{body_ec, std::move(message)};
}

Best Practices

  1. Return awaitable<T> — Makes the operation composable with other coroutines

  2. Accept streams by reference — Don’t take ownership unless necessary

  3. Use use_awaitable as the completion token — Inner operations can then be awaited directly with co_await, consistent with the rest of the coroutine

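  4. Handle cancellation — A cancelled operation completes with asio::error::operation_aborted (thrown as boost::system::system_error under use_awaitable); let it propagate or handle it explicitly instead of treating it as an ordinary, retryable failure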

  5. Document lifetimes — Be clear about what must remain valid

Next Steps