c++

第十三章:使用协程进行异步编程

Posted by lili on

目录

上一章实现的生成器 (Generator) 类帮助我们使用协程来构建惰性求值的序列。C++ 协程也可以用于异步编程,方法是让一个协程表示一个异步计算或一个异步任务。尽管异步编程是 C++ 引入协程的最重要驱动力,但标准库中并没有对基于协程的异步任务提供支持。

如果您想使用协程进行异步编程,我建议您查找并使用一个能够补充 C++20 协程的库。我已经推荐了 CppCoro (https://github.com/lewissbaker/cppcoro),它在撰写本书时看来是最有前途的选择。正如您将在本章后面看到的那样,也可以使用成熟的 Boost.Asio 库进行异步协程编程。

本章将展示使用协程进行异步编程是可行的,并且有可用的库来补充 C++20 协程。更具体地说,我们将重点关注:

  • co_await 关键字和 Awaitable 类型
  • 一个基本 Task 类型的实现——一种可以从执行异步工作的协程中返回的类型
  • Boost.Asio,以举例说明使用协程进行的异步编程

在继续之前,还需要说明的是,本章中没有性能相关的主题,也很少介绍指导方针和最佳实践。相反,本章更多地是作为 C++ 中异步协程这一新颖特性的一篇介绍性文章。我们将从探索 Awaitable 类型和 co_await 语句开始这个介绍。

Awaitable 类型再探

我们已经在上一章简单讨论了 Awaitable 类型。但现在我们需要更具体地了解 co_await 的作用以及 Awaitable 类型是什么。关键字 co_await 是一个一元运算符(unary operator),意味着它接受一个单独的参数。我们传递给 co_await 的参数需要满足一些要求,我们将在本节中探讨这些要求。

当我们在代码中使用 co_await 时,我们表达的是我们正在等待某个可能已就绪或未就绪的对象。如果它尚未就绪,co_await 会暂停当前执行的协程,并将控制权返回给其调用者。当异步任务完成时,它应该将控制权转回给最初等待任务完成的协程。从现在起,我通常将等待函数称为延续(continuation)。

现在考虑以下表达式:

co_await X{};

要使这段代码编译,X 需要是一个 Awaitable 类型。到目前为止,我们只使用了平凡(trivial)的 Awaitable 类型:std::suspend_alwaysstd::suspend_never。任何直接实现接下来列出的三个成员函数,或定义 operator co_await() 来生成一个具有这些成员函数的对象的类型,就是一个 Awaitable 类型:

  1. await_ready()

    • 返回值: 返回一个 bool,指示结果是否已就绪(true),或者是否需要暂停当前协程并等待结果就绪。
  2. await_suspend(coroutine_handle)

    • 调用时机: 如果 await_ready() 返回 false,则此函数将被调用,并带有一个执行了 co_await 的协程的句柄(handle)。
    • 作用: 此函数使我们有机会启动异步工作并订阅一个通知,该通知将在任务完成后触发,并随后恢复(resume)该协程。
  3. await_resume()

    • 作用: 负责将结果(或错误)解包回协程。如果由 await_suspend() 启动的工作期间发生了错误,此函数可以重新抛出(rethrow)捕获的错误或返回一个错误码。
    • 结果: 整个 co_await 表达式的结果就是 await_resume() 返回的任何值。

为了演示 operator co_await() 的使用,这里有一个受 C++20 标准中定义时间间隔的 operator co_await 启发而来的代码片段:

using namespace std::chrono;
template <class Rep, class Period>
auto operator co_await(duration<Rep, Period> d) {
    struct Awaitable {
        system_clock::duration d_;
        Awaitable(system_clock::duration d) : d_(d) {}
        
        bool await_ready() const { return d_.count() <= 0; }
        
        void await_suspend(std::coroutine_handle<> h) { /* ... */ }
        
        void await_resume() {}
    };
    return Awaitable{d};
}

有了这个重载,我们现在可以将一个时间间隔传递给 co_await 运算符,如下所示:

std::cout << "just about to go to sleep...\n";
co_await 10ms; // Calls operator co_await()
std::cout << "resumed\n";

该示例并不完整,但为您展示了如何使用一元运算符 co_await。您可能已经注意到,三个 await_*() 函数不是由我们直接调用的;相反,它们是由编译器插入的代码调用的。

另一个示例将阐明编译器所做的转换。假设编译器在我们的代码中遇到了以下语句:

auto result = co_await expr;

那么编译器将(非常)粗略地将代码转换为如下所示:

// Pseudo code (伪代码)
auto&& a = expr;             // Evaluate expr, a is the awaitable (求值 expr,a 是 awaitable)
if (!a.await_ready()) {      // Not ready, wait for result (未就绪,等待结果)
    a.await_suspend(h);      // Handle to current coroutine (当前协程的句柄)
    // Suspend/resume happens here (暂停/恢复发生在此处)
}
auto result = a.await_resume(); // 获取结果

await_ready() 函数首先被调用,以检查是否需要暂停。如果需要,则调用 await_suspend(),并带有一个将要被暂停的协程的句柄(即带有 co_await 语句的协程)。最后,请求 Awaitable 的结果并将其分配给 result 变量。

隐式暂停点

正如您在许多示例中看到的那样,协程通过使用 co_awaitco_yield 定义了显式暂停点。每个协程还具有两个隐式暂停点:

  • 初始暂停点 (The initial suspend point): 发生在协程首次调用时,在协程体执行之前。
  • 最终暂停点 (The final suspend point): 发生在协程体执行之后,在协程被销毁之前。

Promise 类型通过实现 initial_suspend()final_suspend() 来定义这两个点的行为。这两个函数都返回 Awaitable 对象。通常,我们从 initial_suspend() 函数返回 std::suspend_always,以便协程是惰性启动(lazily started)而不是急切启动。

最终暂停点对于异步任务扮演着重要的角色,因为它使我们能够调整 co_await 的行为。通常,一个被 co_await 的协程应该在最终暂停点恢复正在等待的协程。

接下来,让我们更好地理解这三个 Awaitable 函数的预期用途以及它们如何与 co_await 运算符协作。

实现一个基本的任务类型

我们即将实现的 Task 类型是一种可以从表示异步任务的协程中返回的类型。调用者可以使用 co_await 等待该任务完成。目标是能够编写如下所示的异步应用程序代码:

auto image = co_await load("image.jpg");
auto thumbnail = co_await resize(image, 100, 100);
co_await save(thumbnail, "thumbnail.jpg");

标准库已经提供了一种类型,允许函数返回一个对象,调用者可以使用它来等待结果的计算,即 std::future。我们有可能将 std::future 包装成符合 Awaitable 接口的东西。然而,std::future 不支持延续(continuations),这意味着每当我们尝试从 std::future 获取值时,我们都会阻塞当前线程。换句话说,使用 std::future 时,无法在不阻塞的情况下组合异步操作。

另一种选择是使用 std::experimental::future 或来自 Boost 库的支持延续的 future 类型。但这些 future 类型会分配堆内存并包含同步原语,而这些在我们的 Task 用例中是不需要的。因此,我们将创建一个开销最小的新类型,其职责是:

  • 将返回值和异常转发给调用者。
  • 恢复等待结果的调用者。

协程 Task 类型已被提出(参见 P1056R0),该提案为我们提供了我们需要哪些组件的良好提示。接下来的实现基于 Gor Nishanov 提出的工作以及 Lewis Baker 分享的源代码,该代码可在 CppCoro 库中找到。

下面是用于表示异步任务的类模板实现:

template <typename T>
class [[nodiscard]] Task {
struct Promise { /* ... */ }; // 见下文
std::coroutine_handle<Promise> h_;
explicit Task(Promise & p) noexcept
: h_{std::coroutine_handle<Promise>::from_promise(p)} {}
public:
using promise_type = Promise;
Task(Task&& t) noexcept : h_{std::exchange(t.h_, {})} {}
~Task() { if (h_) h_.destroy(); }
// Awaitable interface (Awaitable 接口)
bool await_ready() { return false; }
auto await_suspend(std::coroutine_handle<> c) {
    h_.promise().continuation_ = c;
    return h_;
}
auto await_resume() -> T {
    auto& result = h_.promise().result_;
    if (result.index() == 1) {
        return std::get<1>(std::move(result));
    } else {
        std::rethrow_exception(std::get<2>(std::move(result)));
    }
}
};

以下是 Promise 类型的实现,它使用 std::variant 来存储值或错误,并使用 continuation_ 成员存储等待该任务完成的协程的句柄:

struct Promise {
    // 存储可能的结果:空值、T类型的值或异常指针
    std::variant<std::monostate, T, std::exception_ptr> result_;
    // 等待中的协程句柄
    std::coroutine_handle<> continuation_; 
    
    // 协程工厂函数
    auto get_return_object() noexcept { return Task{*this}; }
    
    // 处理 co_return value
    void return_value(T value) {
        result_.template emplace<1>(std::move(value));
    }
    
    // 处理未捕获的异常
    void unhandled_exception() noexcept {
        result_.template emplace<2>(std::current_exception());
    }
    
    // 初始暂停点:总是暂停,实现惰性启动
    auto initial_suspend() { return std::suspend_always{}; }
    
    // 最终暂停点:在完成时恢复等待的调用者
    auto final_suspend() noexcept {
        struct Awaitable {
            bool await_ready() noexcept { return false; }
            auto await_suspend(std::coroutine_handle<Promise> h) noexcept {
                return h.promise().continuation_;
            }
            void await_resume() noexcept {}
        };
        return Awaitable{};
    }
};

区分我们正在使用的两个协程句柄非常重要:标识当前协程的句柄和标识延续(continuation)的句柄。

请注意,由于 std::variant 的限制,以及我们不能在同一个 Promise 类型上同时拥有 return_value()return_void(),此实现不支持 Task。不支持 Task 是一个遗憾,因为并非所有异步任务都必须返回值。稍后,我们将通过为 Task 提供模板特化来克服此限制。

异常和返回值处理

一个异步任务可以通过返回(一个值或 void)或抛出异常来完成。值和错误都需要交给一直在等待任务完成的调用者。像往常一样,这是 Promise 对象的责任。

Promise 类使用 std::variant 来存储三种可能结果:

  • 无值 (std::monostate): 我们在 variant 中使用它来使其可以默认构造,而无需要求其他两种类型也可以默认构造。
  • 返回值 (T): 类型为 T 的返回值,其中 T 是 Task 的模板参数。
  • 异常指针 (std::exception_ptr): 一个指向先前抛出的异常的句柄。

通过在 Promise::unhandled_exception() 函数内部使用 std::current_exception() 函数捕获异常。通过存储 std::exception_ptr,我们以后可以在另一个上下文中重新抛出此异常。这也是异常在线程之间传递时使用的机制。

限制: 使用 co_return value; 的协程必须有一个实现 return_value() 的 Promise 类型。然而,使用 co_return; 或在没有返回值的情况下运行到函数体结束的协程必须有一个实现 return_void() 的 Promise 类型。实现一个同时包含 return_void()return_value() 的 Promise 类型会生成编译错误。

恢复等待中的协程

当异步任务完成时,它应该将控制权转移回等待任务完成的协程。为了能够恢复这个延续(continuation)协程,Task 对象需要指向该延续协程的 coroutine_handle。这个句柄被传递给了 Task 对象的 await_suspend() 函数,我们方便地确保将该句柄保存到了 Promise 对象中:

class Task {
// ...
auto await_suspend(std::coroutine_handle<> c) {
    h_.promise().continuation_ = c; // 保存句柄
    return h_;
}
// ...

final_suspend() 函数负责在该协程的最终暂停点暂停,并将执行权转移给等待中的协程。为方便起见,下面重现了 Promise 中的相关部分:

auto Promise::final_suspend() noexcept {
    struct Awaitable {
        bool await_ready() noexcept { return false; } // 暂停
        
        auto await_suspend(std::coroutine_handle<Promise> h) noexcept{
            return h.promise().continuation_; // 将控制权转移给
        }                                    // 等待中的协程
        
        void await_resume() noexcept {}
    };
    return Awaitable{};
}

首先,从 await_ready() 返回 false 将使协程在最终暂停点保持暂停状态。我们这样做的原因是,Promise 仍然存活,并且可供延续协程从中取出结果。

接下来,我们来看 await_suspend() 函数。这是我们希望恢复延续协程的地方。我们原本可以直接在 continuation_ 句柄上调用 resume() 并等待其完成,像这样:

// ...
auto await_suspend(std::coroutine_handle<Promise> h) noexcept {
    h.promise().continuation_.resume(); // 不推荐
    return std::coroutine_handle<>{}; // 或者返回 void/bool
}
// ...

然而,这样做可能会在栈上创建一长串嵌套的调用帧,最终可能导致栈溢出(stack overflow)。让我们通过一个使用两个协程 a() 和 b() 的简短示例,看看这种情况是如何发生的:

auto a() -> Task<int> { co_return 42; }

auto b() -> Task<int> { // 延续协程
    auto sum = 0;
    for (auto i = 0; i < 1'000'000; ++i) {
        sum += co_await a(); // b() 每次都在等待 a()
    }
    co_return sum;
}

如果与协程 a() 关联的 Promise 对象直接调用协程 b() 句柄上的 resume(),那么一个恢复 b() 的新调用帧将在栈上位于 a() 的调用帧之上被创建。这个过程将在循环中一遍又一遍地重复,为每次迭代在栈上创建新的嵌套调用帧。这种两个函数相互调用的调用序列是一种递归形式,有时被称为相互递归(mutual recursion):

尽管只为 b() 创建了一个协程帧,但每次恢复协程 b() 的 resume() 调用都会在栈上创建一个新的帧。

解决这个问题的方法称为对称转移(Symmetric Transfer)。即将要完成的协程不是直接恢复延续协程,而是 Task 对象从 await_suspend() 中返回标识延续协程的 coroutine_handle

// ...
auto await_suspend(std::coroutine_handle<Promise> h) noexcept {
    return h.promise().continuation_; // 对称转移
}
// ...

这样,编译器就保证会进行一个称为尾调用优化(tail call optimization)的优化。在我们的例子中,这意味着编译器将能够直接将控制权转移给延续协程,而不会创建新的嵌套调用帧。

我们将不再花费更多时间讨论对称转移和尾调用的细节,但您可以在 Lewis Baker 的文章《C++ 协程:理解对称转移》中找到对这些主题的优秀且更深入的解释。

如前所述,我们的 Task 模板存在无法处理 void 类型模板参数的限制。现在是时候解决这个问题了。

Task 的支持

为了克服前面提到的无法处理不产生任何值的任务的限制,我们需要对 Task 进行模板特化。

template <>
class [[nodiscard]] Task<void> {
struct Promise {
    std::exception_ptr e_; // No std::variant, only exception (只有异常)
    std::coroutine_handle<> continuation_;
    auto get_return_object() noexcept { return Task{*this}; }
    void return_void() {} // Instead of return_value() (替代 return_value())
    void unhandled_exception() noexcept {
        e_ = std::current_exception();
    }
    // ... initial_suspend() and final_suspend() are the same as Task<T>
    auto initial_suspend() { return std::suspend_always{}; }
    auto final_suspend() noexcept {
        struct Awaitable {
            bool await_ready() noexcept { return false; }
            auto await_suspend(std::coroutine_handle<Promise> h) noexcept {
                return h.promise().continuation_;
            }
            void await_resume() noexcept {}
        };
        return Awaitable{};
    }
};
std::coroutine_handle<Promise> h_;
explicit Task(Promise& p) noexcept
    : h_{std::coroutine_handle<Promise>::from_promise(p)} {}
public:
    using promise_type = Promise;
    Task(Task&& t) noexcept : h_{std::exchange(t.h_, {})} {}
    ~Task() { if (h_) h_.destroy(); }
    // Awaitable interface
    bool await_ready() { return false; }
    auto await_suspend(std::coroutine_handle<> c) {
        h_.promise().continuation_ = c;
        return h_;
    }
    void await_resume() { // 注意:返回 void
        if (h_.promise().e_)
            std::rethrow_exception(h_.promise().e_);
    }
};

这个模板特化中的 Promise 类型只保留对潜在未处理异常的引用。并且,该 Promise 包含成员函数 return_void(),而不是定义 return_value()

阻塞式等待任务完成

Task 类型的一个重要方面是,无论是谁调用返回 Task 的协程,都必须对其进行 co_await,因此该调用者本身也必须是一个协程。这创建了一个协程链(延续)。

由于 C++ 标准规定 main() 函数不允许是协程,这个协程链最终必须在某个地方被打破。解决方案是提供至少一个函数,用于同步等待异步链完成。

我们声明 Task 为 [[nodiscard]] 正是为了在返回值被忽略时(如 async_func();)生成编译警告,因为这将导致任务永远不会运行。

实现 sync_wait()

我们的目标是能够编写如下所示的测试程序:

auto some_async_func() -> Task<int> { /* ... */ }
int main() {
    auto result = sync_wait(some_async_func());
    return result;
}

为了实现测试和运行异步任务的目的,我们将继续实现 sync_wait()

template<typename T>
using Result = decltype(std::declval<T&>().await_resume()); // 获取 Task 的返回值类型

template <typename T>
Result<T> sync_wait(T&& task) {
    if constexpr (std::is_void_v<Result<T>>) { // 处理 void 任务
        struct Empty {};
        auto coro = [&]() -> detail::SyncWaitTask<Empty> {
            co_await std::forward<T>(task);
            co_yield Empty{};
            assert(false);
        };
        coro().get();
    } else { // 处理有返回值任务
        auto coro = [&]() -> detail::SyncWaitTask<Result<T>> {
            co_yield co_await std::forward<T>(task);
            // This coroutine will be destroyed before it
            // has a chance to return.
            assert(false);
        };
        return coro().get();
    }
}

【译注:

template <typename T>
using Result = decltype(std::declval<T&>().await_resume());

这是一个 C++ 模板别名(Template Alias),用于元编程(metaprogramming),目的是在编译时确定一个 Awaitable 类型(通常是 Task 类型)在被 co_await 成功等待后,最终会返回的数据类型。

它在 sync_wait 函数的实现中至关重要,因为它需要知道要同步等待的异步任务会产生什么类型的值。

  • template <typename T>: 定义一个接受一个类型参数 T 的模板别名。这里的 T 预期是某个 Awaitable 类型(如 Task<int>Task<void>)。
  • using Result = ...: 为计算出的类型定义一个别名,名为 Result
  • decltype(...): C++ 关键字,用于获取表达式的准确类型。表达式本身不会被执行,decltype 仅在编译时推导出类型。

这是元编程技巧,用于模拟调用 Awaitable 类型的关键方法:

  • std::declval<T&>():
    • 这个模板函数(位于 <utility>)的作用是生成一个 T 类型的左值引用(T&),而无需实际构造 T 类型的对象。
    • 它使得我们可以在编译时假装我们有一个可用的 T 类型实例。
  • .await_resume():
    • 这是 Awaitable 接口的第三个也是最后一个函数。
    • co_await 流程中,await_resume() 是协程被恢复时调用的函数,它负责返回最终结果或重新抛出异常。
    • 无论是 Task<int> 还是 Task<void>,它们都必须实现此方法。

decltype(std::declval<T&>().await_resume()) 结合起来就是在编译时问:“如果我有一个 T 类型的对象,调用它的 await_resume() 成员函数,会返回什么类型?”

示例:

传入类型 T await_resume() 的签名 Result<T> 推导出的类型
Task<int> auto await_resume() -> int int
Task<void> void await_resume() void

因此,这个模板别名提供了一种优雅且类型安全的方式,在不实际运行任何代码的前提下,获取异步任务的最终返回值类型,这对于正确实现 sync_wait 的返回值和内部逻辑至关重要。 】

sync_wait() 内部,我们区分了返回值的任务和返回 void 的任务。

实现 SyncWaitTask

SyncWaitTask 是专门为 sync_wait() 设计的内部辅助类,因此放在 detail 命名空间下。

namespace detail { // Implementation detail (实现细节)
template <typename T>
class SyncWaitTask { // A helper class only used by sync_wait()
    struct Promise { /* ... */ }; // See below (见下文)
    std::coroutine_handle<Promise> h_;
    explicit SyncWaitTask(Promise& p) noexcept
        : h_{std::coroutine_handle<Promise>::from_promise(p)} {}
public:
    using promise_type = Promise;
    SyncWaitTask(SyncWaitTask&& t) noexcept
        : h_{std::exchange(t.h_, {})} {}
    ~SyncWaitTask() { if (h_) h_.destroy();}
    
    // Called from sync_wait(). Will block and retrieve the
    // value or error from the task passed to sync_wait()
    T&& get() {
        auto& p = h_.promise();
        h_.resume();
        p.semaphore_.acquire(); // Block until signal (阻塞直到收到信号)
        if (p.error_)
            std::rethrow_exception(p.error_);
        return static_cast<T&&>(*p.value_);
    }
    // No awaitable interface, this class will not be co_await:ed
};
} // namespace detail

get() 函数中最值得注意的是对 Promise 对象拥有的信号量(semaphore)的阻塞调用 acquire()。这使得该 Task 类型可以同步等待结果就绪。

Promise 结构体拥有一个二元信号量:

struct Promise {
    T* value_{nullptr};
    std::exception_ptr error_;
    std::binary_semaphore semaphore_; // C++20 同步原语
    
    SyncWaitTask get_return_object() noexcept {
        return SyncWaitTask{*this};
    }
    void unhandled_exception() noexcept {
        error_ = std::current_exception();
    }
    
    // Result has arrived (结果已到达)
    auto yield_value(T&& x) noexcept { 
        value_ = std::addressof(x);
        return final_suspend(); // 触发 final_suspend
    }
    
    auto initial_suspend() noexcept {
        return std::suspend_always{};
    }
    
    auto final_suspend() noexcept {
        struct Awaitable {
            bool await_ready() noexcept { return false; }
            void await_suspend(std::coroutine_handle<Promise> h) noexcept {
                h.promise().semaphore_.release(); // Signal! (释放信号量)
            }
            void await_resume() noexcept {}
        };
        return Awaitable{};
    }
    void return_void() noexcept { assert(false); }
};

这里要特别关注 yield_value()final_suspend()

  1. sync_wait() 中的 lambda 协程执行 co_yield co_await task 并产生值时,会调用 yield_value()
  2. yield_value() 返回 final_suspend()
  3. final_suspend() 返回的 Awaitable 在其 await_suspend() 中通过 semaphore_.release() 发送信号。

这个信号使得 SyncWaitTask::get() 中阻塞的 semaphore_.acquire() 调用返回,从而将计算出的值交给调用 sync_wait() 的客户端。

【译注: sync_wait函数调用coro().get()会让当前线程等待信号量并且阻塞,那么执行协程的线程又是哪个呢?

这是一个关于 C++ 协程和异步编程中执行器(Executor)的经典问题。

在您提供的 sync_wait 简化实现中,执行协程的线程就是调用 coro().get() 的当前线程。

这里没有显式的线程池或调度器,这是 C++ 协程的一个基本行为和设计选择。

在没有外部调度器(如 Boost.Asio 或 CppCoro 库中的线程池)介入的情况下,协程的执行遵循以下步骤:

1. 协程创建和首次恢复

  1. 调用 coro(): 在 sync_wait 中,这会执行 lambda 函数,创建一个 detail::SyncWaitTask 对象(包含协程句柄 h_)。
  2. initial_suspend: 协程主体(coro 的 lambda)在第一行代码执行前立即暂停(因为它返回了 std::suspend_always{})。
  3. 调用 coro().get(): get() 函数被调用。
  4. h_.resume(): get() 内部的第一步是调用 h_.resume()

结论: 此时,调用 sync_wait 的当前线程 开始执行协程的主体逻辑。

2. 协程执行与阻塞点

协程执行主体,直到遇到第一个 co_awaitco_yield

  1. 执行 co_await std::forward<T>(task):
    • 这将开始执行您正在等待的原始异步任务 (task)。
    • 关键点: 如果 task 是一个真正的异步操作(如网络 I/O),它通常会将工作提交给某个调度器(例如操作系统内核或 Boost.Asio 的 io_context)并立即暂停自身。
    • 在您这个简化的例子中: 您的 Task 实现的 await_ready() 返回 false,所以它会调用 task.await_suspend() 并暂停自身。由于您的 Task 中没有实际的异步操作,控制权会立即返回到 sync_wait 协程。
  2. co_yield (对于有返回值的情况):
    • 协程执行 co_yield ...,最终调用 SyncWaitTask::Promise::yield_value

3. 当前线程阻塞

  1. yield_value / final_suspend: yield_value 触发 final_suspend,该流程在 await_suspend 中执行 semaphore_.release()
  2. 控制权返回 get(): 协程暂停,控制权返回给 SyncWaitTask::get() 函数。
  3. p.semaphore_.acquire(): get() 函数调用 acquire()

结论: 调用 sync_wait 的当前线程 此时阻塞,等待信号量。

4. 谁执行了异步工作?

在这个简化的,不包含执行器的 Task 模型中,sync_wait 只能处理同步完成的协程,或者依赖于外部库提供的异步工作。

  • 如果 task 内部没有涉及线程池或 I/O: 那么 task 的所有逻辑(在 co_await 之前的部分)都是在调用 sync_wait 的当前线程上同步执行的。
  • 如果 task 是一个真正的异步操作(例如 Boost.Asio 的网络请求): 那么 task.await_suspend() 会将 I/O 请求提交给操作系统。当 I/O 完成时,Boost.Asio 的 io_context 线程(另一个线程或线程池)会负责恢复等待的协程 (continuation_.resume())。
    • 在这个实际场景中: 最终信号量 release() 动作将由异步工作所在的线程来执行,从而唤醒在 sync_wait 中阻塞的初始线程。

关键总结:

在这个简化代码中,协程的初始执行和同步阻塞都发生在调用 sync_wait() 的当前线程。如果 task 内部涉及真正的异步或并发,那么恢复协程和释放信号量的操作将由另一个线程(执行器或 I/O 线程)来完成。

使用 sync_wait() 测试异步任务

最后,可以使用 Task 和 sync_wait() 构建一个小型异步测试程序:

auto height() -> Task<int> { co_return 20; } // Dummy coroutines (虚拟协程)
auto width() -> Task<int> { co_return 30; }

auto area() -> Task<int> {
    co_return co_await height() * co_await width();
}

int main() {
    auto a = area();
    int value = sync_wait(a);
    std::cout << value; // Outputs: 600
}

我们已经实现了使用 C++ 协程进行异步任务的绝对最低基础设施。然而,为了有效地使用协程进行异步编程,还需要更多的基础设施。为了更贴近现实世界,我们将在接下来的部分中探讨使用 Boost.Asio 的一些示例。我们要做的第一件事是尝试将基于回调的 API 包装在一个与 C++ 协程兼容的 API 中。

包装基于回调的 API

存在许多基于回调(callbacks)的异步 API。通常,一个异步函数接受一个由调用者提供的回调函数。异步函数会立即返回,然后最终在计算出值或完成等待某些事情时,调用该回调函数(完成处理器)。

为了向您展示基于异步回调的 API 可能是什么样子,我们将快速了解一个用于异步 I/O 的 Boost 库,名为 Boost.Asio。关于 Boost.Asio 有很多知识,但这里不会全部涵盖;我将只描述最少的 Boost 代码,转而关注与 C++ 协程直接相关的部分。

为了使代码适合书页排版,示例假设每次我们使用 Boost.Asio 中的代码时都定义了以下命名空间别名:

namespace asio = boost::asio;

这是一个使用 Boost.Asio 实现延迟函数调用但不阻塞当前线程的完整示例。这个异步示例运行在单个线程中:

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

using namespace std::chrono;
namespace asio = boost::asio;

int main() {
    auto ctx = asio::io_context{};
    auto timer = asio::system_timer{ctx};
    
    // 设置定时器,1000毫秒后过期
    timer.expires_from_now(1000ms);
    
    // 异步等待,立即返回。当时间到时,执行 lambda 回调
    timer.async_wait([](auto error) { // Callback (回调)
        // 忽略错误..
        std::cout << "Hello from delayed callback\n";
    });
    
    std::cout << "Hello from main\n";
    
    // 运行事件处理循环,直到所有异步任务完成
    ctx.run();
}

编译并运行此程序将生成以下输出(”Hello from main” 在延迟发生前立即输出):

Hello from main
Hello from delayed callback

使用 Boost.Asio 时,我们总是需要创建一个运行事件处理循环(event processing loop)的 io_context 对象。对 async_wait() 的调用是异步的;它立即返回到 main(),并在定时器过期时调用回调(即 lambda 表达式)。

这个定时器示例没有使用协程,而是使用回调 API 来提供异步性。Boost.Asio 也与 C++20 协程兼容,我将在后面演示。但在我们探索 Awaitable 类型的道路上,我们将绕个弯,假设我们需要在 Boost.Asio 基于回调的 API 之上,提供一个返回 Awaitable 类型的基于协程的 API。

这样,我们就可以使用 co_await 表达式来调用并等待异步任务完成(但不阻塞当前线程)。我们希望能够写出类似这样的代码,而不是使用回调:

std::cout << "Hello! ";
co_await async_sleep(ctx, 100ms);
std::cout << "Delayed output\n";

让我们看看如何实现 async_sleep() 函数,使其可以与 co_await 一起使用。我们将遵循的模式是让 async_sleep() 返回一个 Awaitable 对象,该对象将实现所需的三个函数:await_ready()await_suspend()await_resume()

template <typename R, typename P>
auto async_sleep(asio::io_context& ctx,
                 std::chrono::duration<R, P> d) {
    
    struct Awaitable {
        asio::system_timer t_;
        std::chrono::duration<R, P> d_;
        boost::system::error_code ec_{}; // 存储错误码
        
        bool await_ready() { 
            // 如果时间小于等于 0,则立即就绪,无需等待
            return d_.count() <= 0; 
        }
        
        void await_suspend(std::coroutine_handle<> h) {
            t_.expires_from_now(d_);
            
            // 启动异步等待,并传入一个回调函数
            t_.async_wait([this, h](auto ec) mutable {
                this->ec_ = ec; // 保存错误码
                h.resume();     // 恢复被挂起的协程!
            });
        }
        
        void await_resume() {
            // 检查是否有错误,如果有则抛出
            if (ec_) throw boost::system::system_error(ec_);
            // 无返回值
        }
    };
    
    // 返回一个以 system_timer 和 duration 初始化的 Awaitable 对象
    return Awaitable{asio::system_timer{ctx}, d};
}

我们再次创建了一个自定义的 Awaitable 类型来完成所有必要的工作:

  • await_ready():除非定时器已经归零,否则返回 false
  • await_suspend():启动异步操作 (t_.async_wait()),并传入一个回调函数。该回调函数将在定时器过期或产生错误时被调用。回调函数保存错误码(如果有)并恢复被挂起的协程。
  • await_resume():没有结果需要解包,因为我们包装的异步函数 boost::asio::timer::async_wait() 不返回任何值,只返回一个可选的错误码。

在我们能够在独立的程序中实际测试 async_sleep() 之前,我们需要一种方法来启动 io_context 运行循环并打破协程链,就像我们之前测试 Task 类型时所做的那样。我们将以一种不规范(hacky)的方式来实现它,通过实现两个函数 run_task()run_task_impl(),以及一个忽略错误处理并可被调用者丢弃的简单协程返回类型 Detached

// -----------------------------------------------------------------
// 仅为使示例能够运行而编写的代码
struct Detached {
    struct promise_type {
        auto get_return_object() { return Detached{}; }
        auto initial_suspend() { return std::suspend_never{}; } // 立即启动
        auto final_suspend() noexcept { return std::suspend_never{};} // 立即销毁
        void unhandled_exception() { std::terminate(); } // 忽略/终止
        void return_void() {}
    };
};

Detached run_task_impl(asio::io_context& ctx, Task<void>&& t) {
    // 防止 ctx.run() 在 coroutine 完成前返回
    auto wg = asio::executor_work_guard{ctx.get_executor()}; 
    co_await t; // 等待 Task<void> 完成
}

void run_task(asio::io_context& ctx, Task<void>&& t) {
    run_task_impl(ctx, std::move(t));
    ctx.run(); // 阻塞直到所有工作完成
}
// -----------------------------------------------------------------

Detached 类型使协程立即启动,并且与调用者分离运行。executor_work_guard 阻止 run() 调用返回,直到协程 run_task_impl() 完成。

【译注:

auto wg = asio::executor_work_guard{ctx.get_executor()}; 

编译失败:class template argument deduction failed。可以修改为:

using ExecutorType = asio::io_context::executor_type;
auto wg = asio::executor_work_guard<ExecutorType>{ctx.get_executor()};

或者:

auto wg = asio::make_work_guard(ctx.get_executor());

注意: 启动操作并将其分离通常应避免。这类似于分离线程或分配没有引用的内存。然而,此示例的目的是演示 Awaitable 类型的使用以及如何编写异步程序并在单线程中运行它们。

一切就绪;包装器 async_sleep() 返回一个 Task,并且我们有可用于执行任务的函数 run_task()。是时候编写一个小型协程来测试我们实现的新代码了:

auto test_sleep(asio::io_context& ctx) -> Task<void> {
    std::cout << "Hello! ";
    co_await async_sleep(ctx, 100ms);
    std::cout << "Delayed output\n";
}

int main() {
    auto ctx = asio::io_context{};
    auto task = test_sleep(ctx);
    run_task(ctx, std::move(task));
}

执行此程序将生成以下输出:

Hello! Delayed output

您已经看到了如何将基于回调的 API 包装在一个可以被 co_await 使用的函数中,从而允许我们使用协程而不是回调来进行异步编程。此程序还提供了一个关于 Awaitable 类型中函数如何使用的典型示例。然而,正如前面提到的,Boost 的最新版本(从 1.70 开始)已经提供了一个与 C++20 协程兼容的接口。在下一节中,我们将在构建一个微型 TCP 服务器时使用这个新的协程 API。

使用 Boost.Asio 的并发服务器

本节将演示如何编写具有多个执行流但仅使用单个 OS 线程的并发程序。我们将实现一个基本的、并发的、单线程 TCP 服务器,它可以处理多个客户端。C++ 标准库中没有网络功能,但幸运的是 Boost.Asio 为我们提供了处理套接字通信的平台无关接口。

这一次,我将不再包装基于回调的 Boost.Asio API,而是演示如何使用 boost::asio::awaitable 类,以展示一个更真实的、使用协程进行异步应用编程的示例。类模板 boost::asio::awaitable 对应于我们前面创建的 Task 模板;它被用作表示异步计算的协程的返回类型。

实现服务器

这个服务器非常简单:一旦客户端连接,它就开始更新一个数字计数器,并在每次更新时将值写回客户端。这次我们将从上到下地分析代码,从 main() 函数开始:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <iostream> // 需要补充
#include <string>   // 需要补充
#include <chrono>   // 需要补充

using namespace std::chrono;
namespace asio = boost::asio;
using boost::asio::ip::tcp;

// 前向声明,将在后面定义
auto listen(tcp::endpoint endpoint) -> asio::awaitable<void>;

int main() {
    auto server = [] {
        // 定义 TCP 端点:IPv4, 端口 37259
        auto endpoint = tcp::endpoint{tcp::v4(), 37259}; 
        // 启动监听协程
        auto awaitable = listen(endpoint); 
        return awaitable;
    };
    
    auto ctx = asio::io_context{};
    // 启动一个分离的并发流 (协程)
    asio::co_spawn(ctx, server, asio::detached); 
    
    // 从主线程运行事件循环
    ctx.run(); 
}

强制性的 io_context 运行事件处理循环。如果我们希望服务器执行多个 OS 线程,也可以从多个线程调用 run()。在我们的例子中,我们只使用一个线程,但有多个并发流。函数 boost::asio::co_spawn() 启动一个分离的并发流。服务器使用 lambda 实现;它定义了一个 TCP 端点(端口 $37259$)并开始监听传入的客户端连接。

监听协程 (listen)

listen() 协程相当简单,如下所示:

auto serve_client(tcp::socket socket) -> asio::awaitable<void>; // 前向声明

auto listen(tcp::endpoint endpoint) -> asio::awaitable<void> {
    // 1. 获取当前协程的执行器 (executor)
    auto ex = co_await asio::this_coro::executor; 
    
    // 2. 创建一个 TCP 接收器
    auto a = tcp::acceptor{ex, endpoint}; 

    while (true) {
        // 3. 异步等待传入连接,返回一个 socket
        auto socket = co_await a.async_accept(asio::use_awaitable); 
        
        // 4. 为新连接创建一个会话协程
        auto session = [s = std::move(socket)]() mutable {
            auto awaitable = serve_client(std::move(s));
            return awaitable;
        };
        
        // 5. 启动一个分离的协程来处理客户端会话
        asio::co_spawn(ex, std::move(session), asio::detached);
    }
}

执行器(Executor)是实际执行我们的异步函数的对象。执行器可以代表一个线程池或一个单个系统线程。我们很有可能会在 C++ 的未来版本中看到某种形式的执行器,以便为程序员提供更多关于代码执行时间和地点的控制和灵活性(包括 GPU)。

接下来,该协程运行一个无限循环,等待 TCP 客户端连接。第一个 co_await 表达式在新的客户端成功连接到服务器时返回一个 socket。然后,该 socket 对象被移动到协程 serve_client() 中,该协程将服务新连接的客户端,直到客户端断开连接。

客户端服务协程 (serve_client)

服务器的主要应用程序逻辑发生在处理每个客户端的协程中。它看起来像这样:

auto serve_client(tcp::socket socket) -> asio::awaitable<void> {
    std::cout << "New client connected\n";
    
    auto ex = co_await asio::this_coro::executor;
    auto timer = asio::system_timer{ex};
    auto counter = 0;
    
    while (true) {
        try {
            // 构造要发送的字符串
            auto s = std::to_string(counter) + "\n";
            auto buf = asio::buffer(s.data(), s.size());
            
            // 异步写入数据
            auto n = co_await async_write(socket, buf, asio::use_awaitable);
            
            std::cout << "Wrote " << n << " byte(s)\n";
            ++counter;
            
            // 设置 100 毫秒的延迟
            timer.expires_from_now(100ms);
            
            // 异步等待定时器
            co_await timer.async_wait(asio::use_awaitable);
            
        } catch (...) {
            // 捕获错误或客户端断开连接
            break; 
        }
    }
}

每次协程调用都在整个客户端会话期间服务一个独特的客户端;它运行直到客户端从服务器断开连接。该协程以固定的间隔(每 100 ms)更新一个计数器,并使用 async_write() 将该值异步写回客户端。请注意,尽管 serve_client() 调用了两个异步操作:async_write()async_wait(),但我们可以以线性方式编写该函数。

运行和连接到服务器

一旦我们启动了这个服务器,我们就可以在端口 $37259$ 上连接客户端。为了尝试这个,我使用了名为 nc (netcat) 的工具,它可以用于通过 TCP 和 UDP 进行通信。这是一个客户端连接到运行在 localhost 上的服务器的简短会话示例:

[client] $ nc localhost 37259
0
1
2
3

我们可以启动多个客户端,它们都将由一个专门的 serve_client() 协程调用来服务,并拥有各自递增的计数器变量副本。

创建应用程序并发服务多个会话的另一种方法是为连接的每个新客户端创建一个线程。然而,与这种使用协程的模型相比,线程的内存开销会使会话数量的限制大大降低。

此示例中的协程都在同一线程上执行,这使得共享资源无需锁定。想象一下我们有一个由每个会话更新的全局计数器。如果我们使用多线程,访问全局计数器将需要某种同步(使用互斥锁或原子数据类型)。对于在同一线程上执行的协程来说,这是不必要的。换句话说,在同一线程上执行的协程可以在不使用任何锁定原语的情况下共享状态。

服务器的成就与不足

使用 Boost.Asio 的示例应用程序表明协程可用于异步编程。我们不需要使用嵌套回调来实现延续,而是可以使用 co_await 语句以线性方式编写代码。然而,这个示例是最小的,避免了一些异步编程中非常重要的方面,例如:

  • 异步读写操作: 服务器只向客户端写入数据,忽略了同步读写操作的挑战。
  • 取消异步任务和优雅关闭: 服务器在无限循环中运行,完全忽略了干净关闭的挑战。
  • 多重 co_await 语句下的错误处理和异常安全。

这些主题非常重要,但超出了本书的范围。我之前提到过,最好避免分离操作(detached operations)。如示例所示,使用 boost::asio::co_spawn() 创建分离任务应极其谨慎。一种用于避免分离工作的较新的编程范式称为结构化并发(structured concurrency)。它旨在通过将并发封装到通用且可重用的算法中(例如 when_all()stop_when()),来解决多个异步任务的异常安全和取消问题。关键思想是绝不允许任何子任务超过其父任务的生命周期。这使得可以安全、高性能地通过引用将局部变量传递给异步子操作。严格嵌套的并发任务生命周期也使代码更易于推理。

另一个重要方面是,异步任务应该始终是惰性(lazy)的(立即暂停),以便在抛出任何异常之前可以附加延续。这也是如果你想安全地取消任务所必需的要求。

未来几年,很可能会有大量与这一重要主题相关的演讲、库和文章。CppCon 2019 的两次演讲涉及了此主题:

  • A Unifying Abstraction for Async in C++, Eric Neibler and D. S. Hollman
  • Structured Concurrency: Writing Safer Concurrent Code with Coroutines and Algorithms, Lewis Baker

总结

在本章中,您了解了如何使用 C++ 协程编写异步任务。为了能够以 Task 类型和 sync_wait() 函数的形式实现基础设施,您需要完全理解 Awaitable 类型的概念以及如何使用它们来自定义 C++ 中协程的行为。

通过使用 Boost.Asio,我们构建了一个真正最小但功能完整的并发服务器应用程序,它在单个线程上执行,同时处理多个客户端会话。

最后,我简要介绍了结构化并发这一方法论,并指明了您可以找到更多关于此主题信息的地方。

在下一章中,我们将继续探索并行算法,这是一种通过利用多核来加速并发程序的方法。