tinyCoroLab Docs
直通tinyCoroLab源码直通tinyCoro源码
latest
latest
  • 🚀开启你的tinyCoroLab之旅!
  • 📮致读者
  • 👊C++协程入门
    • 协程初探
    • 有栈协程VS无栈协程
    • C++协程入门实践
    • 从编译器视角揭秘 C++ 协程
    • 协程调用优化
  • 🌟认识io_uring
  • 📖tinyCoroLab实验介绍
  • 📘Lab1 构建协程任务封装
    • Lab1 构建协程任务封装
    • Lab1 实验解析
  • 📗Lab2 构建任务执行引擎
    • Lab2a 构建任务执行引擎engine
    • Lab2a 实验解析
    • Lab2b 构建任务执行引擎context
    • Lab2b 实验解析
  • 📙Lab3 封装异步I/O执行模块
  • 📕Lab4 构建基础协程同步组件
    • Lab4pre 如何构建协程同步组件
    • Lab4a 构建基础协程同步组件event
    • Lab4a 实验解析
    • Lab4b 构建基础协程同步组件latch
    • Lab4b 实验解析
    • Lab4c 构建基础协程同步组件wait_group
    • Lab4c 实验解析
    • Lab4d 构建基础协程同步组件mutex
    • Lab4d 实验解析
  • 📓Lab5 构建进阶协程同步组件
    • Lab5a(选做) 构建进阶协程同步组件when_all
    • Lab5a 实验解析
    • Lab5b 构建进阶协程同步组件condition_variable
    • Lab5b 实验解析
    • Lab5c 构建进阶协程同步组件channel
    • Lab5c 实验解析
  • ✨tinyCoro Bonus Lab
  • 🎯tinyCoro悬赏令
  • ⚔️面试实战
    • 面试实战
    • tinyCoro面试相关问题
  • 🚩实验总结-终点亦是起点
  • 🪐番外杂谈
    • 从编译器视角揭秘 C++ 协程
    • 协程调用优化
  • 📌更新日志
Powered by GitBook
On this page
  • 前言
  • 实验准备
  • 环境配置
  • 预备代码
  • 协程状态存储
  • 协程内存分配优化
  • 从编译器视角看 promise
  • 从编译器视角看 awaiter
  • awaiter 与线程安全
  • 剖析 C++ 协程大核心——await_suspend
  • void_await_suspend
  • bool_await_suspend
  • 对称转换优化
  • 三种 await_suspend 对比
  • 协程执行权转移分析
  • 实验总结
  • 参考文献
  1. 🪐番外杂谈

从编译器视角揭秘 C++ 协程

前言

在 tinyCoroLab 课程交流群中,有不少读者向我反馈协程晦涩难懂,虽然网上博客关于 C++20 协程的讲解博客很多,但是看完这些博客后总是会给人一种似懂非懂的感觉,对于读者的这些反馈我一点都不感到惊讶,因为 C++20 协程确实复杂,尤其体现在编译器在协程代码中做了很多幕后工作来支持协程机制,而大部分博客都是对协程的用户接口做讲解,鲜少涉及编译器部分,当然也会国外参与设计协程的大佬对协程的运行机制做了非常全面的讲解,从用户接口使用到底层原理剖析,只不过这些博客一是英文撰写二是技术讲解确实晦涩难懂,并不是所有读者都可以静下心来耐心推敲每一处知识点。

如果只是对 C++ 协程尝鲜,那么明白用户接口的使用就好了,可一旦读者想要深入运用协程比如打造自己的协程库,那么读者必须了解编译器在协程背后所做的幕后工作,这对于分析复杂的协程状态转换和执行权转移是非常必要的。因此我决定写下这样一篇博客,综合国外权威大佬的博客(例如lewissbaker)和我的个人理解,用详细的图文以及代码演示带领刚入门协程的读者从编译器视角解开协程的神秘面纱。

实验准备

环境配置

  • 操作系统: 内核较新的 linux 系统(本文采用 wsl2 ubuntu22.04)

  • 编译器: 支持 C++20 版本的 gcc

  • 编辑器: vscode

预备代码

为了便于后续代码演示,首先添加两个头文件。

第一个头文件是coro.hpp,里面定义了大部分协程库采用的一种协程定义task并且做了简化不考虑复杂数据的存储与返回,由于本文面向刚入门协程的读者,所以读者应该保证能大致读懂该文件代码的含义,而不用过分深究:

// file: coro.hpp
#pragma once

#include <coroutine>
#include <exception>
#include <new>
#include <optional>
#include <stdexcept>
#include <utility>

#include "utils.hpp"

template <typename return_type = void> class task;

// promise 基类,定义了协程的初始以及结束时的调度逻辑
struct promise_base {
  friend struct final_awaitable;

  // 用作 final_suspend 返回的 awaiter
  struct final_awaitable {
    auto await_ready() const noexcept -> bool { return false; }
    
    // 确保协程的栈式调用正确返回
    template <typename promise_type>
    auto await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
        -> std::coroutine_handle<> {
      auto &promise = coroutine.promise();
      if (promise.m_continuation != nullptr) {
        return promise.m_continuation;
      } else {
        return std::noop_coroutine();
      }
    }

    auto await_resume() noexcept -> void {}
  };

  promise_base() noexcept = default;
  ~promise_base() = default;

  // 协程创建后立刻处于 suspend 状态
  auto initial_suspend() noexcept { return std::suspend_always{}; }

  auto final_suspend() noexcept { return final_awaitable{}; }

  // 记录当前协程执行结束后应当恢复的协程
  auto continuation(std::coroutine_handle<> continuation) noexcept -> void {
    m_continuation = continuation;
  }

protected:
  std::coroutine_handle<> m_continuation{nullptr};
};

// 忽略复杂类型的存取
template <typename return_type> struct promise final : public promise_base {
public:
  using task_type = task<return_type>;
  using coroutine_handle = std::coroutine_handle<promise<return_type>>;

  promise() noexcept {}
  promise(const promise &) = delete;
  promise(promise &&other) = delete;
  promise &operator=(const promise &) = delete;
  promise &operator=(promise &&other) = delete;
  ~promise() = default;

  auto get_return_object() noexcept -> task_type;

  auto return_value(return_type &value) -> void {
    m_value = std::make_optional<return_type>(value);
  }

  auto return_value(return_type &&value) -> void {
    m_value = std::make_optional<return_type>(std::move(value));
  }

  auto unhandled_exception() noexcept -> void {
    m_except = std::current_exception();
  }

  auto result() -> return_type {
    if (m_value.has_value()) {
      return *m_value;
    } else {
      throw std::runtime_error{
          "The return value was never set, did you execute the coroutine?"};
    }
  }

private:
  std::optional<return_type> m_value;
  std::exception_ptr m_except;
};

template <> struct promise<void> : public promise_base {
  using task_type = task<void>;
  using coroutine_handle = std::coroutine_handle<promise<void>>;

  promise() noexcept = default;
  promise(const promise &) = delete;
  promise(promise &&other) = delete;
  promise &operator=(const promise &) = delete;
  promise &operator=(promise &&other) = delete;
  ~promise() = default;

  auto get_return_object() noexcept -> task_type;

  auto return_void() noexcept -> void {}

  auto unhandled_exception() noexcept -> void {
    m_exception_ptr = std::current_exception();
  }

  auto result() -> void {
    if (m_exception_ptr) {
      std::rethrow_exception(m_exception_ptr);
    }
  }

private:
  std::exception_ptr m_exception_ptr{nullptr};
};

// 面向用户的协程对象
template <typename return_type> class [[nodiscard]] task {
public:
  using task_type = task<return_type>;
  using promise_type = promise<return_type>;
  using coroutine_handle = std::coroutine_handle<promise_type>;

  struct awaitable_base {
    awaitable_base(coroutine_handle coroutine) noexcept
        : m_coroutine(coroutine) {}

    auto await_ready() const noexcept -> bool {
      return !m_coroutine || m_coroutine.done();
    }

    // 连接调用与调用者之间的调用关系
    auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
        -> std::coroutine_handle<> {
      m_coroutine.promise().continuation(awaiting_coroutine);
      return m_coroutine;
    }

    std::coroutine_handle<promise_type> m_coroutine{nullptr};
  };

  task() noexcept : m_coroutine(nullptr) {}

  explicit task(coroutine_handle handle) : m_coroutine(handle) {}
  task(const task &) = delete;
  task(task &&other) noexcept
      : m_coroutine(std::exchange(other.m_coroutine, nullptr)) {}

  ~task() {
    if (m_coroutine != nullptr) {
      m_coroutine.destroy();
    }
  }

  auto operator=(const task &) -> task & = delete;

  auto operator=(task &&other) noexcept -> task & {
    if (std::addressof(other) != this) {
      if (m_coroutine != nullptr) {
        m_coroutine.destroy();
      }

      m_coroutine = std::exchange(other.m_coroutine, nullptr);
    }

    return *this;
  }

  auto is_ready() const noexcept -> bool {
    return m_coroutine == nullptr || m_coroutine.done();
  }

  auto resume() -> bool {
    if (!m_coroutine.done()) {
      m_coroutine.resume();
    }
    return !m_coroutine.done();
  }

  auto destroy() -> bool {
    if (m_coroutine != nullptr) {
      m_coroutine.destroy();
      m_coroutine = nullptr;
      return true;
    }

    return false;
  }

  auto operator co_await() noexcept {
    struct awaitable : public awaitable_base {
      auto await_resume() -> decltype(auto) {
        return this->m_coroutine.promise().result();
      }
    };

    return awaitable{m_coroutine};
  }

  auto handle() -> coroutine_handle { return m_coroutine; }

private:
  coroutine_handle m_coroutine{nullptr};
};

template <typename return_type>
inline auto promise<return_type>::get_return_object() noexcept
    -> task<return_type> {
  return task<return_type>{coroutine_handle::from_promise(*this)};
}

inline auto promise<void>::get_return_object() noexcept -> task<> {
  return task<>{coroutine_handle::from_promise(*this)};
}

coro.hpp引用的utils.hpp即第二个文件,该文件定义了一些工具函数,print_rbp函数用于打印寄存器rbp即栈基指针的值,这里添加了__attribute__((optimize("O0")))可以避免编译器优化导致rbp值打印不正确,然后重载了全局new函数并根据宏HOOK_MEMORY选择性启用,这样便于跟踪程序内存分配,SHOW_ADDRESS宏用来以十六进制格式打印地址数据。

// file: utils.hpp
#pragma once

#include <iostream>

// use optimize("O0") to avoid false rbp value
__attribute__((optimize("O0"))) void print_rbp(const char *func_name) {
  unsigned long long rbp_value;
  asm volatile("mov %%rbp, %0" : "=r"(rbp_value));
  std::cout << func_name << std::hex << std::showbase
            << " rbp value: " << rbp_value << std::dec << std::endl;
}

#define SHOW_ADDRESS(message, address)                                         \
  std::cout << message << ": " << std::hex << std::showbase << address         \
            << std::dec << std::endl

#ifdef HOOK_MEMORY
void *operator new(std::size_t size) {
  auto address = malloc(size);
  std::cout << "Allocating " << size << " bytes in address " << std::hex
            << std::showbase << address << std::dec << std::endl;
  return address;
}
#endif // HOOK_MEMORY

协程状态存储

C++ 协程的基本使用相信读者已经熟悉,从用户的角度看协程的调用与普通函数的调用最大差别就是协程调用需要一些额外的关键字来指明这是协程调用,例如下面一段代码:

// file: coro.cpp
// compile: g++ coro.cpp -std=c++20 -fcoroutines -O3 -o coro
#include "coro.hpp"

task<int> add(int a, int b) { co_return a + b; }

task<> func() {
  auto result = co_await add(1, 2);
  std::cout << "result: " << result << std::endl;
}

int main(int argc, char const *argv[]) {
  auto h = func();
  h.resume();
  return 0;
}

上面一段代码是利用协程函数func调用协程函数add,逻辑很简单就像在使用普通函数一样,我们将上述代码修改一下:

// file: coro.cpp
// compile: g++ coro.cpp -std=c++20 -fcoroutines -O3 -o coro
#include "coro.hpp"

task<> func() {
  std::cout << "begin func" << std::endl;
  co_await std::suspend_always{};
  std::cout << "end func" << std::endl;
}

int main(int argc, char const *argv[]) {
  auto h = func();

  bool can_resume = true;
  do {
    can_resume = h.resume();
    std::cout << "back to main" << std::endl;
  } while (can_resume);
  return 0;
}

编译运行结果如下:

begin func
back to main
end func
back to main

可以看到协程函数loop与main函数出现了交叉执行的情况,即协程函数的调用并不严格嵌套,那么便无法像普通函数一样将协程运行所需的内存全部分配到栈空间。

为了解决这个问题,编译器将协程运行所需要的全部状态划分为两个部分,分别用栈内存和堆内存存储,我们用协程栈帧和协程堆帧代指:

  • 协程堆帧: 协程关联的 promise 以及跨越调度点的局部变量等生命周期较长的对象需要存放在堆帧中

  • 协程栈帧: 对于未跨越调度点的局部变量以及与普通函数调用相同的过程采用栈帧存储

协程调用的过程是建立在普通函数调用基础上的,前者与后者相同的逻辑可以放在协程栈帧中存储,而一些特殊的生命周期较长的变量或内部状态需要用协程堆帧存储,而对于堆帧大小,编译器会自动计算出一个最合理的值来满足协程对堆内存的需求,综上栈帧与堆帧搭配既能保证协程的正确运行,又可以减少动态分配所消耗的内存。

为了更细节的描述这一过程我们再次使用前文协程函数func调用协程函数add的样例,读者可以看下图:

这里需要注意的是在编译器的实现中协程帧通常会由某个寄存器来存储,为了节省篇幅就交由读者自行验证。上图中func调用add时会将协程帧的地址压栈便于后续恢复,图中演示的情况为协程调用,如果是func调用普通函数其调用过程也与上图无太大差异。

协程内存分配优化

上文中提到对于堆帧大小,编译器会自动计算出一个最合理的值来满足协程对堆内存的需求,这句话该怎么理解呢?我们不坊修改一下func调用协程函数add的例子:

// file: coro.cpp
// compile: g++ coro.cpp -std=c++20 -fcoroutines -O3 -DHOOK_MEMORY -o coro
#include "coro.hpp"

// 添加 noop_awaiter 是为了观察 awaiter 的构造地址
struct noop_awaiter : std::suspend_never {
  noop_awaiter() { SHOW_ADDRESS("noop awaiter constructed in", this); }
};

task<int> add(int a, int b) { co_return a + b; }

task<> func() {
  auto result = co_await add(1, 2);
  std::cout << "result: " << result << std::endl;
  co_await noop_awaiter{};
}

int main(int argc, char const *argv[]) {
  auto h = func();
  h.resume();
  return 0;
}

同时在头文件coro.hpp中task的co_await重载运算符函数内做出临时修改:

⚠️临时修改表明在完成当前实验后应当及时删除

auto operator co_await() noexcept {
  struct awaitable : public awaitable_base {
    auto await_resume() -> decltype(auto) {
      SHOW_ADDRESS("task awaitable is constructed in address", this); // 临时添加
      return this->m_coroutine.promise().result();
    }
  };

  return awaitable{m_coroutine};
}

在我本地编译运行coro.cpp结果如下:

Allocating 80 bytes in address 0x5592f8587eb0
Allocating 64 bytes in address 0x5592f8588320
task awaitable is constructed in address: 0x5592f8587ee8
result: 3
noop awaiter constructed in: 0x5592f8587ef8

从上述结果中可以得出两个重要信息:

  • Allocating出现了两次表明发生了两次动态内存调用,分别用于协程函数func和add的内存分配,这也意味着每一个协程函数在其完整的生命周期中需要也仅需要一次动态内存分配

  • func的协程栈帧地址范围为0x5592f8587eb0~0x5592f8587f00,task awaiter和noop_awaiter的内存地址均在该范围内,这说明两个 awaiter 均被构造在了func的协程帧内,换句话说,awaiter的内存被分配在了调用者协程的协程帧内

总结来说,协程确实依赖动态内存但编译器足够智能,针对每个协程函数仅调用一次动态内存分配即可,从而避免多次动态内存分配造成得到性能损失。另外我想部分读者可能对awaiter的内存分配不太关注甚至说对其中的细节感到含糊不清,那么本次实验就向读者证明了 awaiter的内存被分配在了调用者协程的协程帧内,无需额外的动态内存分配,事实上协程函数体内的局部变量均在协程帧上分配内存。

从编译器视角看 promise

从用户的角度讲 promise 提供了协程的构造及析构时的具体行为并可以在协程运行过程中存储协程return value或yield value,从编译器的角度讲 promise 更像是编译器暴露给用户用来操纵协程帧的对象,且 promise 本身被构造在协程帧上。

下面代码摘自 gcc 编译器提供的 C++ 头文件coroutine并做了简化:

template <> struct coroutine_handle<noop_coroutine_promise> {
  struct __frame {
    static void __dummy_resume_destroy() {}

    void (*__r)() = __dummy_resume_destroy; // 协程句柄 resume 函数地址
    void (*__d)() = __dummy_resume_destroy; // 协程句柄 destroy 函数地址
    struct noop_coroutine_promise __p;
  };
  static __frame _S_fr;
  void *_M_fr_ptr = &_S_fr;
};

从上述代码可以看出协程帧=协程句柄 resume 函数地址 + 协程句柄 destroy 函数地址+promise,读者可能好奇这里存放的 resume 和 destroy 函数的作用,前文讲过协程的局部变量其生命周期有可能跨越调度点,对于这部分变量需要额外处理来保证协程 suspend 时其生命周期不会提前结束,当然协程内局部变量的生命周期管理是较为复杂的,这部分涉及的代码一般由编译器生成汇编函数并填充在协程帧的头部,用户对协程句柄调用resume和destroy即调用该函数。

代码展现的协程帧结构也表明协程句柄其本身指向的地址即协程帧地址,promise 的地址位于两个函数指针偏移量之后,正因如此 promise 和协程句柄可以相互转换。如果读者打算用自定义内存分配器为协程帧分配内存,那么只需要为 promise 定义operator new和operator delete即可。

那么当用户执行协程函数时编译器会利用 promise 做什么呢?读者可以看下面添加了注释的伪代码:

T some_coroutine(P param)
{
  // 为协程帧分配内存,如果 promsie 存在内存分配重载则利用 promise 的内存分配函数
  auto* f = new coroutine_frame(std::forward<P>(param));

  // 返回协程函数关联的面向用户的对象,在 Coro.hpp 中即 task,
  // returnObject 会在协程第一次陷入 suspend 状态后通过语句`return returnObject`被返回,
  auto returnObject = f->promise.get_return_object();

  // 控制协程创建时的调度逻辑
  co_await promise.initial_suspend();
  try
  {
    // 用户在协程函数体内定义的逻辑
    <body-statements>
  }
  catch (...)
  {
    // 处理协程运行中抛出的异常
    promise.unhandled_exception();
  }
FinalSuspend:
  // 控制协程结束时的调度逻辑
  co_await promise.final_suspend();
}

💡调度逻辑指的是协程在该处会被决定是处于 suspend 状态还是 continue 执行

❓returnObject 会在协程第一次陷入 suspend 状态后被返回,那为何要在一开始就获取? 在协程第一次陷入 suspend 状态前可能协程帧就被销毁,此时再获取 returnObject 会出现内存错误,举个例子,initial_suspend和final_suspend均返回suspend_never且<body-statements>也不会让协程陷入 suspend 状态,那么协程会直接执行完全部逻辑并销毁协程帧(编译器生成代码),此时就无法通过协程帧获取 returnObject

上述伪代码展示了用户在调用协程时编译器在背后所做的隐式工作,用户编写的协程体只对应<body-statements>这一部分,实际协程的执行还需要编译器做很多额外工作。

当协程体包含co_return和co_yield时编译器会隐式生成下列代码:

// 用户侧            编译器侧
co_return value <==> promise.return_value(value);
co_return       <==> promise.return_void(value);
co_yield  value <==> co_await promise.yield_value(value); // 注意 co_yield 会产生调度点

此时读者应该能理解协程调用和普通函数调用的差别以及背后的本质原理。

从编译器视角看 awaiter

用户通过co_await awaiter的形式可以控制协程的调度逻辑或者转移执行权,狭义的awaiter定义为内部实现了await_ready()、await_suspend(coroutine_handle<>)和await_resume方法的对象,广义的awaiter还包括可以通过 C++20 标准要求的特定运算符转化为狭义awaiter的 awaitable 对象,因此co_await awaitable同样是被允许的,具体转换逻辑读者应该已经在开源博客中有所学习,这里不再赘述并给出编译器获取awaiter时的伪代码逻辑:

template<typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
  if constexpr (has_any_await_transform_member_v<P>)
    return promise.await_transform(static_cast<T&&>(expr));
  else
    return static_cast<T&&>(expr);
}

template<typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
  if constexpr (has_member_operator_co_await_v<Awaitable>)
    return static_cast<Awaitable&&>(awaitable).operator co_await();
  else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
    return operator co_await(static_cast<Awaitable&&>(awaitable));
  else
    return static_cast<Awaitable&&>(awaitable);
}

当用户调用co_await expr时编译器会生成如下伪代码,注意返回std::coroutine_handle的await_suspend暂不考虑:

{
  auto&& value = <expr>;

  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));

  // 如果 await_ready 返回 true 就不需要执行下面的逻辑从而提高效率
  if (!awaiter.await_ready())
  {
    using handle_t = std::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(promise)));

    // 在该处协程陷入 suspend 状态,编译器生成代码来保存协程当前的运行状态
    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(promise));
      <return-to-caller-or-resumer>
    }
    else
    {
      // 这里暂不考虑返回 std::coroutine_handle 的 await_suspend
      static_assert(
         std::is_same_v<await_suspend_result_t, bool>,
         "await_suspend() must return 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(promise)))
      {
        <return-to-caller-or-resumer>
      }
    }
    // 协程陷入 suspend 状态后恢复会从该处开始执行而不是从<suspend-coroutine>处
    <resume-point>
  }

  return awaiter.await_resume();
}

通过上述伪代码读者应该能明白为何 C++ 规定awaiter需要实现特定的三个接口。

如果将当前协程陷入暂停并转移至另一个协程执行的过程称为上下文切换的话,已有的有栈协程库的上下文切换是比较直接的,即保存当前协程状态然后切换至目标协程,而 C++ 实现的无栈协程,正如伪代码所示,会在上下文切换的过程中安插中间代码来控制切换逻辑,这对于执行一些异步操作来说是非常方便的,异步操作可以直接封装在awaiter的内部逻辑中,发起异步操作只需要调用co_await io_awaiter即可。

另外,发起异步操作例如回调式异步 IO 通常需要在堆中分配内存,因为某些数据结构需要保持持久的生命周期,而前文中提到awaiter是被分配在协程帧内,因此该部分数据结构可以直接封装在awaiter中,因为awaiter处于协程帧内,协程帧在协程暂停时也依然有效,所以awaiter天然的成为了保存这些生命周期持久的数据结构的容器,避免了不必要的动态内存分配,并且编译器会智能计算所需的堆内存大小保证一定可以容纳这些数据。

awaiter 与线程安全

协程通常会搭配线程池使用,例如下面一段伪代码:

func() 
{
  // codes part A
  co_await thread_pool.schedule(); // part B 部分交由线程池分配的另一个线程执行
  // codes part B
}

现有的很多协程库都采用了上述伪代码形式来将当前协程的执行转移到另一个线程,转移逻辑通常会在await_suspend函数中实现,具体可以实现为将协程句柄投放到别的线程。

如果读者自行实现的话需要注意,当await_suspend函数将协程句柄交由别的线程执行时可能会出现await_suspend函数还未返回,另一个线程已经把协程执行完毕并且销毁了协程句柄以及 promsie 对象,甚至awaiter也会被析构,此时await_suspend函数内只有访问局部变量才是安全的,因此读者应该对这种跨线程转移协程执行的逻辑保证其线程安全性。

剖析 C++ 协程大核心——await_suspend

通过利用co_await用户可以以同步的代码形式来实现异步执行的逻辑,这是因为co_await处会产生调度逻辑从而使得协程的执行像从该处断开一样,而该处我们定义为调度点。通过实现awaiter的await_suspend函数便能控制协程具体的调度逻辑,因此await_suspend函数对于协程库实现协程调度起着举足轻重的作用。

下图展示了coro.hpp中task通过co_await调用子task时发生的状态转移:

读者不难发现,虽然在用户代码中协程被调用就像普通函数被调用一样立刻得到执行,但实际上coro.hpp实现的调用逻辑是存在一些状态转移过程的,子协程被创建后会先处于暂停状态,由awaiter中的await_suspend函数恢复子协程运行,这一段暂停 + 恢复的过程在外侧看来就像是子协程被直接执行一样。

💡子协程创建后先暂停又立刻被恢复,为何不使其创建后直接运行呢? 在很多协程库的设计中,协程对象被创建有两种情况:

  • 直接创建:例如auto task = func(),func为协程函数,这种情况下协程通常会被交由调度器而不是创建后直接执行

  • 创建后被co_await:例如co_await func(),该情况处于正常的协程运行流程中,调用子协程应该直接运行

coro.hpp中对协程的设计正好符合上述要求。

C++20 标准为awaiter按返回值类型提供了 3 种await_suspend形式,为了简化书写我们定义三种代指名称:

  • void_await_suspend: void await_suspend(std::coroutine_handle)

  • bool_await_suspend: bool await_suspend(std::coroutine_handle)

  • handle_await_suspend: std::coroutine_handle await_suspend(std::coroutine_handle)

coro.hpp主要使用了handle_await_suspend,在部分读者眼里这三种形式似乎只有返回值类型的差别,但实际真是这样吗?接下来我们用实验验证分析,并添加coro.cpp作为用户侧代码:

// file: coro.cpp
// compile: g++ coro.cpp -std=c++20 -fcoroutines -O3 -o coro
#include "coro.hpp"

const int loop_num = 3;

task<> func() {
  print_rbp("func"); // 打印 func 栈指针
  co_return;
}

task<> loop() {
  for (int i = 0; i < loop_num; i++) {
    std::cout << "======= loop " << i << " =======" << std::endl;
    print_rbp("loop"); // 打印 loop 栈指针
    co_await func();
  }
}

int main(int argc, char const *argv[]) {
  auto h = loop();
  h.resume();
  return 0;
}

上述代码主要逻辑是loop协程循环调用func协程并在协程体内添加了打印栈指针的逻辑,具体为何需要打印栈指针这里先卖个关子,执行coro.cpp得到下列结果:

======= loop 0 =======
loop rbp value: 0x7fffc65ff240
func rbp value: 0x7fffc65ff260
======= loop 1 =======
loop rbp value: 0x7fffc65ff240
func rbp value: 0x7fffc65ff260
======= loop 2 =======
loop rbp value: 0x7fffc65ff240
func rbp value: 0x7fffc65ff260

可以看到在多次循环中各个协程函数执行时的栈指针不变,就像普通函数调用一样,这是很正常的结果,接下来我们会保持coro.cpp文件以及编译选项不变,通过修改coro.hpp来进行实验。

⚠️后续每小节针对coro.hpp的修改均为临时修改,请在小节结束后移除

void_await_suspend

为了区分出void_await_suspend和handle_await_suspend的差别,本节我们修改coro.hpp中关于协程的调度逻辑,不使用handle_await_suspend转而替换为void_await_suspend,具体的修改如下所示:

// 将 promise_base::final_awaitable::await_suspend 修改为如下:
template <typename promise_type>
auto await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
    -> void {
  auto &promise = coroutine.promise();
  if (promise.m_continuation != nullptr) {
    promise.m_continuation.resume(); // 子协程运行结束,恢复父协程运行
  }
}

// 将 task::awaitable_base::await_suspend 修改为如下:
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
    -> void {
  m_coroutine.promise().continuation(awaiting_coroutine);
  m_coroutine.resume(); // 恢复子协程运行
}

此时运行coro.cpp,结果如下:

======= loop 0 =======
loop rbp value: 0x7fff6d93daa0
func rbp value: 0x7fff6d93da90
======= loop 1 =======
loop rbp value: 0x7fff6d93da60
func rbp value: 0x7fff6d93da50
======= loop 2 =======
loop rbp value: 0x7fff6d93da20
func rbp value: 0x7fff6d93da10

从结果可以看出代码正确运行了,这表明我们对coro.hpp修改的逻辑正确,但是每一行打印的rpb值都在降低,我们知道栈空间是往低地址方向增加的,即loop函数每调用一次func其调用栈便会加深,这看似普通的循环调用实际却在执行递归调用的逻辑,当循环次数足够多时便会产生栈溢出错误,读者可以尝试将loop_num修改为 100w 来验证是否会有栈溢出。

造成栈溢出的原因很简单,可以先看下图:

问题就出在final_suspend这一过程中,由于子协程运行完毕需要恢复父协程,所以在void_await_suspend中对父协程句柄调用resume函数,这就导致在子协程运行的末尾,即子协程调用栈弹出之前以调用函数(handle.resume)的形式恢复了父协程,这样每完成一次子协程调用的过程,其父协程函数的调用栈便会加深一层,因此在多次循环调用场景下产生栈溢出问题,具体更详细的执行细节读者可以添加更多日志来观察。

上文描述的void_await_suspend产生的栈溢出问题并不难理解,但读者应该会感到疑惑,为何使用了handle_await_suspend就不会产生这种问题?我们继续实验。

bool_await_suspend

既然使用void_await_suspend会产生栈溢出问题,那么使用bool_await_suspend呢?其实按照coro.hpp对协程的设计思路读者很容易就能猜出bool_await_suspend也会产生栈溢出问题,因为终究是要在final_suspend中对父协程句柄调用resume。

但 lewissbaker 在其博客中从另一个角度解决bool_await_suspend的栈溢出问题,其给出的伪代码如下:

class task::promise_type {
  ...

  std::coroutine_handle<> continuation;
  std::atomic<bool> ready = false; // true 表明父协程可以恢复运行
};

bool task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  promise_type& promise = coro_.promise();
  promise.continuation = continuation;
  coro_.resume(); // 恢复子协程运行,子协程因为被转移至别的线程池执行所以会提前返回

  return !promise.ready.exchange(true, std::memory_order_acq_rel);
}

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  promise_type& promise = h.promise();
  if (promise.ready.exchange(true, std::memory_order_acq_rel)) {
    promise.continuation.resume();
  }
}

上述代码能正确运行且不会产生栈溢出的前提条件是子协程被调度到了另一个线程执行,即子协程为如下形式:

task coro()
{
  // codes... partA
  // 该部分会在父协程所在线程执行
  co_await tp.schedule();
  // codes... partB
  // 该部分切换至其他线程执行
}

这样task::awaiter::await_suspend的coro_.resume()便可以在子协程执行结束前返回,也因此task::awaiter::await_suspend与task::promise_type::final_awaiter::await_suspend会存在多线程并行执行的关系,原子变量ready的作用便是保证父协程要么是在原线程恢复,要么是在子协程所在线程恢复,读者可以自行推演不同并发顺序恢复父协程的逻辑。

当父协程在原线程恢复,即task::awaiter::await_suspend返回false,那么父协程继续运行,子协程运行结束,不会产生嵌套调用栈,自然也不会有栈溢出。

当父协程在子协程所在协程恢复,即子协程在task::promise_type::final_awaiter::await_suspend对父协程句柄调用了resume,此时读者会有疑惑,这样不就又产生嵌套调用栈吗?下面我将为读者通过证明的形式解答疑惑。

💡证明:上述bool_await_suspend可以解决栈溢出问题 条件:父子协程跨线程执行

概念定义:定义父协程所在的线程为线程 A,子协程被转移执行后的线程为线程 B

证明过程:

  • 因为栈溢出问题可以微缩为父子协程调用问题,所以将问题重定义,即证明使用bool_await_suspend,线程 A 和线程 B 均不会产生栈溢出问题。

  • 无论父协程在哪个线程恢复,线程 A 均不会产生栈溢出问题,命名为定理 1

  • 父协程在线程 A 恢复时,线程 B 不会产生栈溢出问题

  • 父协程在线程 B 恢复时,看起线程 B 发生了栈嵌套,但基于父子协程跨线程执行这一条件,此时线程 B 角色转换为线程 A,按照定理 1,该情况下的线程 B 不会产生栈溢出问题。

  • 证毕。

  • 补充说明:首先读者应该理解,无论哪种情况,父协程所在线程一定不会产生栈溢出,唯一有疑惑的是第二种情况下的子协程所在线程可能会产生栈溢出,但是上述代码成立的条件是子协程一定会被跨线程调度,因此此时的子协程恢复了父协程后即使再次发起子协程调用,那么当前线程又变成了父协程所在线程,按照前述推演关系,子协程所在线程也不会产生栈溢出问题。

上述证明过程读者可能不太理解,不过没关系,这种做法一是需要保证父子协程跨线程执行,二是使用了原子变量这种昂贵的操作,三是父协程恢复的随机性会为程序执行带来不确定性,所以也不太会应用到实际中,综上bool_await_suspend也无法很好解决栈溢出问题。

对称转换优化

其实void_await_suspend和bool_await_suspend只是用来控制当前协程是否暂停,但我们想要使其拥有手动转移执行权到其他协程的能力,所以栈溢出问题产生了,简单而言就是我们对函数使用了错误的用法。

在最初的 C++ 设计提案中只存在void_await_suspend和bool_await_suspend,随后官方也发现了利用这两种情况转移协程执行权时会存在栈溢出问题,因此后来的提案中又添加了handle_await_suspend,这不是一个简单的函数,添加该函数主要是为了编译器实现对称转换优化来彻底解决栈溢出问题。

💡对称转换中的对称如何理解? 这里的对称可以理解为对称协程中的对称,只使用void_await_suspend和bool_await_suspend那么 C++ 协程在执行权只会回到其调用者,不能手动指定,因此 C++ 协程属于非对称协程,但利用对称优化,C++ 协程便可以实现对称协程的行为

什么是对称转换?当一个协程通过协程句柄的resume函数恢复另一个协程时,当前协程的栈空间会一直处于激活态,子协程运行完毕或陷入suspend状态后resume函数返回,这个过程就像是普通的函数调用一样,存在明显的调用者与被调用者关系,这种情况称为非对称转换。

那么假设协程调用子协程的过程发生了变化,当前协程会先让自身陷入suspend状态,弹出自身的栈帧,随后构造子协程的栈帧,恢复子协程运行,此时子协程运行完毕或陷入suspend状态后不会回到父协程,而是回到父协程的调用者,这个过程中看似是父协程调用了子协程,但实际是不存在显示调用与被调用关系的,这种情况被称为对称转换。

下图展示了对称转换与非对称转换的对比示意图:

读者此时应该会感到疑惑:对称转换如何应用呢?我们首先引入尾部调用这个概念,尾部调用的官方解释是当前函数栈帧会在实际发起子函数调用时被弹出,子函数返回时会直接返回到当前函数的调用方,就像上图一样,其大多数情况下发生在函数调用后紧跟return语句的情况。尾部调用作为一种优化手段,需要符合一些特定条件才可以执行该优化:

  • 调用方与被调用方遵循相同的调用规则,并且调用规则支持尾调用

  • 调用方与被调用方返回类型相同

  • 在返回到调用方之前,不需要在调用之后运行重要的析构函数

  • 调用不出在 try/catch 块中

具体每个条件的含义读者可以查看文末的参考博客,这里读者只需要理解对称转换优化与尾部调用含义相似,通过引入handle_await_suspend,编译器便可以对协程的执行进行对称转换优化,此时对协程句柄调用resume会被编译器优化为弹出当前栈帧并用jmp指令代替call指令来恢复协程。

我们再来回看下最初版的coro.hh使用handle_await_suspend的地方:

template <typename promise_type>
auto promise_base::final_awaitable::await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
    -> std::coroutine_handle<> {
  auto &promise = coroutine.promise();
  if (promise.m_continuation != nullptr) {
    // 返回父协程句柄,这里执行了对称转换优化
    return promise.m_continuation;
  } else {
    return std::noop_coroutine();
  }
}

auto task::awaitable_base::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
    -> std::coroutine_handle<> {
  m_coroutine.promise().continuation(awaiting_coroutine);
  // 返回子协程句柄,这里执行了对称转换优化
  return m_coroutine;
}

handle_await_suspend通过返回协程句柄来使编译器完成对称转换优化。不过需要注意,对称转换优化是一种优化手段,对于 GCC 编译器需要至少开启-O2才可以利用该手段,否则无论哪种await_suspend形式都无法解决栈溢出问题。

❓按照上文描述task::awaitable_base::await_suspend采用了对称转换优化那运行coro.cpp结果中func和loop的rpb为何不一样?loop栈帧不是被弹出了吗?我试着把task::awaitable_base::await_suspend改为void_await_suspend的形式,为何又产生了栈溢出问题? 编译器实际是针对协程的整体进行优化的,读者只需要记住在需要手动明确将协程执行权转移到某个特定协程时尽量统一使用handle_await_suspend

三种 await_suspend 对比

handle_await_suspend其实是可以替换void_await_suspend和bool_await_suspend的,即:

// void_await_suspend
void await_suspend(std::coroutine_handle handle) {
  // codes...
  return
}
// 替换后
std::coroutine_handle await_suspend(std::coroutine_handle handle) {
  // codes...
  return std::noop_coroutine();
}
// bool_await_suspend 返回 false
bool await_suspend(std::coroutine_handle handle) {
  // codes...
  return false;
}
// 替换后
std::coroutine_handle await_suspend(std::coroutine_handle handle) {
  // codes...
  return handle;
}
// bool_await_suspend 返回 true
void await_suspend(std::coroutine_handle handle) {
  // codes...
  return
}
// 替换后
std::coroutine_handle await_suspend(std::coroutine_handle handle) {
  // codes...
  return std::noop_coroutine();
} 

通过引入std::noop_coroutine,handle_await_suspend便能代理其余两种await_suspend,那么既然handle_await_suspend功能这么强大,为何还需要使用void_await_suspend和bool_await_suspend呢?

请读者注意,我们前文讨论的对称转换优化是用来解决主动转移执行权至某个协程会遇到的栈溢出问题,void_await_suspend和bool_await_suspend主要控制当前协程执行是继续还是暂停,可以看做是一种被动转移执行权,因为这两种方式下当前协程暂停了执行权肯定转移到调用者,非常明确,我们无法手动转移,否则会产生栈溢出问题。

因此如果不需要主动转移执行权至某个协程这种逻辑,只需要使用void_await_suspend和bool_await_suspend就好了,在编译器看来二者的返回值情况较为简单,即利于编译器做分支预测等优化手段,而handle_await_suspend返回的协程句柄带有很大不确定性,总结起来void_await_suspend和bool_await_suspend相较handle_await_suspend功能更弱,但利于编译器优化,性能更好。

协程执行权转移分析

有了对称转换优化这一概念后,我们来实战分析一下协程运行中的执行权转移,协程的栈式调用是很多初学者感到迷惑的地方,因此用协程的栈式调用来分析协程执行权转移分析再适合不过了,下面为一段读者反馈的样例程序:

// file: coro.cpp
// compile: g++ coro.cpp -std=c++20 -fcoroutines -O3 -o coro
#include "coro.hpp"

task<> func(int i) {
  std::cout << "func " << i << " begin" << std::endl;
  if (i < 3) {
    co_await func(i + 1);
  } else {
    co_await std::suspend_always();
  }
  std::cout << "func " << i << " end" << std::endl;
  co_return;
}

int main(int argc, char const *argv[]) {
  auto h = func(0);

  bool can_resume = true;
  do {
    can_resume = h.resume();
  } while (can_resume);
  return 0;
}

这位读者想通过在main函数里循环调用resume来驱动全部函数执行完毕,程序输出如下:

func 0 begin
func 1 begin
func 2 begin
func 3 begin
func 0 end

很遗憾,调用链下层的函数都没有得到正确恢复,这是很多初学者会误犯的错误,原因很简单,fun(3)会执行co_await std::suspend_always(),请问读者执行权会返回到哪里?答案是main函数,此时main对调用栈最上层的协程调用resume其只会从co_await处继续往下执行,不会恢复调用链下层协程执行。

那问题又来了,为何调用链下层协程在执行co_await std::suspend_always()后执行权是直接回到了main?因为此时调用链的其他协程均处于suspend状态,对于协程当其co_await awaiter时下列三种情况会使其陷入 suspend 状态:

  • awaiter实现了void_await_suspend

  • awaiter实现了bool_await_suspend且返回 true

  • awaiter实现了handle_await_suspend且返回了非当前协程句柄

读者只需要记住一点,在协程的嵌套调用逻辑中,协程调用返回后其执行权会向调用链上层转移,直到遇见普通函数或者非suspend状态的协程函数,否则会一直向上转移。 coro.hpp实现的协程逻辑对应情况 3,所以每次发生调用时调用者协程均会陷入suspend状态,这也解释了上述程序为何没有输出预期结果。

那上述程序怎么修改保证让链式调用完整执行呢?这需要修改coro.hpp中的promise,如下图所示:

即让promise存储链式调用关系,main对调用链上层协程调用resume时恢复的其实是current指向的协程,这样就能保证调用链从上次中断点继续运行了,具体代码读者可以查看co_stack.cpp。

当然上述的调用链是通过在main函数里驱动调用链顶层来保证完整执行的,还有一种更简单的方法,即tinyCoro采用的方法,示意图如下:

只需要存储调用者与被调用者关系就好了,在特定的数据结构中保存的是调用栈中正在运行的协程句柄,这样恢复该句柄就可以顺序恢复整个调用链。

实验总结

通过从编译器视角看 promise 和 awaiter,以及对协程对称转换优化和协程执行权的分析,希望读者可以对协程有更深刻的认知。

参考文献

  • lewissbaker's blog

  • gcc-patches

  • cpp.std.coroutines.draft

  • luncliff/coroutine

  • Stack Frames Not Cleared After co_await

Previous🪐番外杂谈Next协程调用优化

Last updated 1 month ago

coroutine_compiler_view_p1
coro_call
void_await_suspend
symmetric_transfer
coro_stack_structure
work_thread