tinyCoroLab Docs
直通tinyCoroLab源码直通tinyCoro源码
latest
latest
  • 🚀开启你的tinyCoroLab之旅!
  • 📮致读者
  • 👊C++协程入门
    • 协程初探
    • 有栈协程VS无栈协程
    • C++协程入门实践
    • 从编译器视角揭秘 C++ 协程
    • 协程调用优化
  • 🌟认识io_uring
  • 📖tinyCoroLab实验介绍
  • 📘Lab1 构建协程任务封装
    • Lab1 构建协程任务封装
    • Lab1 实验解析
  • 📗Lab2 构建任务执行引擎
    • Lab2a 构建任务执行引擎engine
    • Lab2a 实验解析
    • Lab2b 构建任务执行引擎context
    • Lab2b 实验解析
  • 📙Lab3 封装异步I/O执行模块
  • 📕Lab4 构建基础协程同步组件
    • Lab4pre 如何构建协程同步组件
    • Lab4a 构建基础协程同步组件event
    • Lab4a 实验解析
    • Lab4b 构建基础协程同步组件latch
    • Lab4b 实验解析
    • Lab4c 构建基础协程同步组件wait_group
    • Lab4c 实验解析
    • Lab4d 构建基础协程同步组件mutex
    • Lab4d 实验解析
  • 📓Lab5 构建进阶协程同步组件
    • Lab5a(选做) 构建进阶协程同步组件when_all
    • Lab5a 实验解析
    • Lab5b 构建进阶协程同步组件condition_variable
    • Lab5b 实验解析
    • Lab5c 构建进阶协程同步组件channel
    • Lab5c 实验解析
  • ✨tinyCoro Bonus Lab
  • 🎯tinyCoro悬赏令
  • ⚔️面试实战
    • 面试实战
    • tinyCoro面试相关问题
  • 🚩实验总结-终点亦是起点
  • 🪐番外杂谈
    • 从编译器视角揭秘 C++ 协程
    • 协程调用优化
  • 📌更新日志
Powered by GitBook
On this page
  • tinyCoroLab3 实验简介
  • 📖lab3 任务书
  • 实验前置讲解
  • 🔖测试

📙Lab3 封装异步I/O执行模块

tinyCoroLab3 实验简介

本节我们将正式开始 tinyCoroLab3,即封装异步 I/O 执行模块。在前面的实验中我们完成了执行引擎的构建,这表明 tinyCoro 此时已经正式可以对外提供服务了,但是库终究是要给用户使用的,我们要简化用户发起异步 I/O 的流程,这也是本次实验的核心任务所在。实验者应当了解 liburing 的设计目标之一是良好的拓展性,而本节实验正是要让实验者见证liburing 强大的可拓展性如何简化库开发者的开发流程。

预备知识

⚠️预备知识即在实验开始前你应该已经掌握的知识,且在知识铺垫章节中均有涉及

  • io_uring 的概念以及 liburing 的使用

  • C++ 协程 awaiter 的概念

📖lab3 任务书

实验前置讲解

💡本节演示的代码以及链接均采用tinyCoro v1.1分支,最新版v1.2已对IO部分进行重构,但核心逻辑无变化,读者可根据需要切换至v1.1分支,在最终测试时切换回最新版本即可

不同于前面的实验,lab3 不需要实验者实现任何代码,所有代码都已经预先实现好,但代码的正确运行依赖于实验者在前置实验中对 tinyCoro 的正确实现,所以 lab3 仍然有功能测试,如果测试不通过实验者需要自行检查代码逻辑。

下面我们正式开始 lab3 的实验前置讲解,所涉及的核心代码均在文件夹include/coro/net和src/net中,实验者需要预先打开文件浏览大致代码结构,下面针对该文件内容进行讲解。

首先是include/coro/net/io_info.hpp定义了执行 IO 所需要的一些基本类型定义,核心是io_info结构体,其定义以及各字段含义如下:

struct io_info
{
    coroutine_handle<> handle; // IO 绑定的协程句柄
    int32_t            result; // IO 执行完的结果
    io_type            type; // IO 类型
    uintptr_t          data; // IO 绑定的内存区域
    cb_type            cb; // IO 绑定的回调函数
};

然后转到include/coro/net/base_awaiter.hpp,用户发起的异步 IO 均是通过在 C++ 协程的 awaiter 中发起的,base_io_awaiter为所有 IO 相关的 awaiter 提供了一个基类并实现了 awaiter 的全部调度逻辑。当base_io_awaiter被构造时会自动从当前上下文绑定的 engine 获取一个 sqe,当其被 co_await 时base_io_awaiter会在await_suspend中记录调用协程的句柄并使该协程陷入 suspend 状态。而在 IO 完成后会通过await_resume返回 IO 的执行结果。

💡base_io_awaiter 的 await_suspend 返回 void,那岂不是不论发起什么样的 IO,协程均会陷入 suspend 状态? 是的,即使协程发起了一个轻量级的 IO 操作,但毕竟涉及到系统调用并不推荐以同步的方式等待完成。由于协程会陷入 suspend 状态所以针对某个协程可能其全部执行耗时比同步方式更长(因为执行引擎转移了执行权到其他协程),但在大量 IO 请求的情况下总体耗时会更短。

在定义好base_io_awaiter后我们就可以开始批量生产 IO 操作相关的 awaiter 了!请实验者查看文件include/coro/net/io_awaiter.hpp和src/net/io_awaiter.cpp中的代码,我们以接受 tcp 连接举例,其定义如下:

class tcp_accept_awaiter : public detail::base_io_awaiter
{
public:
    tcp_accept_awaiter(int listenfd, int flags) noexcept;

    static auto callback(io_info* data, int res) noexcept -> void;

private:
    inline static socklen_t len = sizeof(sockaddr_in);
};

首先与 IO 绑定的 awaiter 其构造函数应该包含发起 IO 需要的数据,比如tcp_accept_awaiter构造函数就包括监听文件描述符和标志位,然后所有的 awaiter 都必须添加一个回调函数且形式全部统一,该回调函数入参是io_info和 IO 操作返回值。

在构造函数中我们应该将数据填充到 sqe 中,具体步骤以及注释如下:

tcp_accept_awaiter::tcp_accept_awaiter(int listenfd, int flags) noexcept
{
    m_info.type = io_type::tcp_accept; // 绑定 io 类型
    m_info.cb   = &tcp_accept_awaiter::callback; // 绑定回调函数

    io_uring_prep_accept(m_urs, listenfd, nullptr, &len, flags); // 为 sqe 填充数据
    io_uring_sqe_set_data(m_urs, &m_info);  // 将 m_info 绑定到 sqe 中
    local_engine().add_io_submit(); // 告诉执行引擎当前存在一个 IO 待提交
}

需要注意的是io_uring_prep_accept是 liburing 提供的用于发起接受 tcp 连接操作的 API,而 liburing 本身支持的 IO 操作种类非常多,且均以io_uring_prep_XXX的形式命名,因此要想扩展 tinyCoro 对 IO 的支持,只需要查阅 liburing 手册查找相关 IO 对应的 API,然后仿照tcp_accept_awaiter的写法就可以了。

io_uring_sqe_set_data(m_urs, &m_info)用于将 io_info 绑定到 sqe 中,这样在完成 IO 获得 cqe 后就可以根据 io_info 恢复处于 suspend 状态的协程了。

另外是回调函数,代码及注释如下:

auto tcp_accept_awaiter::callback(io_info* data, int res) noexcept -> void
{
    data->result = res; // 在 io_info 中设置返回值,之后在 io awaiter 的 await_resume 函数中返回

    // 向当前上下文绑定的 context 提交协程句柄来恢复等待 io 的协程,
    // 在长期运行模式下也可以调用 submit_to_scheduler
    submit_to_context(data->handle);
}

那么该回调函数怎么发挥作用呢?在 lab2a 中我们提到实验者只需要对从 io_uring 取出的 cqe 调用预先实现的handle_cqe_entry即可,而该函数的代码以及注释如下所示:

auto engine::handle_cqe_entry(urcptr cqe) noexcept -> void
{
    // 取出绑定的 io_info
    auto data = reinterpret_cast<net::detail::io_info*>(io_uring_cqe_get_data(cqe));
    // 调用 io_info 绑定的回调函数
    data->cb(data, cqe->res);
}

综上,我们就正式完成了 tinyCoro 对于接受 tcp 连接的支持,而include/coro/net/io_awaiter.hpp中定义的其他 IO 比如向 tcp 发送和接收数据相关的 awaiter 也是同理,那么此时构建 tcp 服务器和客户端只差一步之遥了!

此时实验者请打开include/coro/net/tcp.hpp和src/net/tcp.cpp,其中定义了 tinyCoro 对于 tcp 的简易支持,在 tcpclient 和 tcpserver 构造函数内是常见的 socket 编程,实验者应该并不陌生,但注意tcp_server::accpet、tcp_client::connect等 IO 相关的函数返回的是与 IO 关联的 awaiter,通过co_await awaiter的方式就可以完成 IO 的发起了,而有了该文件定义的 tcp 相关的类我们就可以正式用 tinyCoro 搭建 tcp 程序了。

在文件夹examples中的tcp_echo_server.cpp、tcp_echo_client.cpp和stdin_client.cpp为实验者提供了一些实例,我们用最为有趣的 stdin_client 举例,该程序实现了一个 tcp 客户端并且可以支持用户终端输入发送到 tcp 服务端,其代码如下:

#include "coro/coro.hpp"

using namespace coro;

#define BUFFLEN 10240

task<> echo(int sockfd)
{
    char buf[BUFFLEN] = {0};
    int  ret          = 0;
    auto conn         = net::tcp_connector(sockfd);

    while (true)
    {
        ret = co_await net::stdin_awaiter(buf, BUFFLEN, 0);
        log::info("receive data from stdin: {}", buf);
        ret = co_await conn.write(buf, ret);
    }
}

task<> client(const char* addr, int port)
{
    auto client = net::tcp_client(addr, port);
    int  ret    = 0;
    int  sockfd = 0;
    sockfd      = co_await client.connect();
    assert(sockfd > 0 && "connect error");

    submit_to_scheduler(echo(sockfd));

    char buf[BUFFLEN] = {0};
    auto conn         = net::tcp_connector(sockfd);
    while ((ret = co_await conn.read(buf, BUFFLEN)) > 0)
    {
        log::info("receive data from net: {}", buf);
    }

    ret = co_await conn.close();
    assert(ret == 0);
}

int main(int argc, char const* argv[])
{
    /* code */
    scheduler::init();
    submit_to_scheduler(client("localhost", 8000));

    scheduler::loop();
    return 0;
}

我们首先开启一个终端运行下列指令来开启一个监听 8000 端口的 tcp 服务器:

nc -lk 8000

然后编译构建 tinyCoro,开启一个终端运行 stdin_client:

mkdir build
cd build
cmake ..
make
./bin/stdin_client

此时实验者可以在 tcp 服务端输入字符并回车发送,stdin_client 会打印出接收的字符,然后实验者在 stdin_client 运行的终端中输入字符并回车发送,同样也会看到 tcp 服务端输出接收的字符。

最后让我们用一张图来总结 io awaiter 是如何支持 tinyCoro 实现 IO 操作的吧!

🔖测试

实验前置环境

  • 安装 rust 的构建工具 cargo

  • 安装 python3.7 及以上版本并确保命令行输入python或python3是可以正确启动的

功能测试

本节实验测试略微复杂,对于 tinyCoro IO 的功能测试,我们采用rust_echo_bench,这是一种可以对 tcp 服务器测试 qps 的压测工具,而 tinyCoroLab 自身在 third_party 里就包含了该工具,首先在项目构建目录下执行下列指令对压测工具进行构建(实验者需要预先安装 rust 的构建工具 cargo):

make build-benchtools

构建 lab3 测试程序:

make build-lab3

测试要求是使用 rust_echo_bench 在 100 个并发连接、负载为 1kbyte 且持续时长为 30s 的情况下对由 tinyCoro 搭建的tcp_echo_server进行压测,只要保证 rust_echo_bench 顺利输出结果就行。

一轮完整的测试流程如下所示:

  • step1.启动 tcp_echo_server,等待 2s

  • step2.检查 tcp_echo_server 是否仍在运行,如果停止运行则测试不通过,退出测试

  • step3.启动 rust_echo_bench,等待 2s

  • step4.检查 rust_echo_bench 是否仍在运行,如果停止运行则测试不通过,退出测试

  • step5.等待 40s

  • step6.检查 tcp_echo_server 是否仍在运行,如果停止运行则测试不通过,退出测试

  • step7.检查 rust_echo_bench 是否仍在运行,如果正在运行则测试不通过,退出测试

  • step8.测试通过,打印测试结果

测试主程序为tests/lab3_test.py,需要注意的是在测试文件开头由这样一行:

# test cases: [(<threadnum>, <port>) ...]
paras = [(1, 8000), (0, 8001)]

列表长度代表测试轮数,(1, 8000)表示 tcp_echo_server 只开启一个 context 并监听 8000 端口,(0, 8001)表示 tcp_echo_server 开启与本地机器 CPU 逻辑核心数相同的 context 数量并监听 8001 端口,实验者可以更改该列表中指定的端口号,但不能修改其他逻辑。

最后,在构建目录下通过下列指令来运行 lab3 的测试:

make test-lab3

测试程序会打印出详细结果,测试成功会提示 pass。

PreviousLab2b 实验解析Next📕Lab4 构建基础协程同步组件

Last updated 15 days ago

ch08_lab3_p1