Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

快速入门

本章节旨在说明如何基于 ASCO 异步框架编写和构建一个异步程序。

选择一种启动异步程序的方式

异步主函数

通常来说,future/promise 异步模型的异步函数具有传染性,需要一个异步的“主函数”作为程序的起点, 通过链接目标 asco-main (见 cmake目标段落)导入支持异步“主函数”的基本环境。

然后,在你的源代码中导入头文件 <asco/future.h> 后,在全局命名空间下定义一个返回 asco::future<int> 的异步函数 future<int> async_main() , 不能接收任何参数,通过 co_return 返回退出状态码。异步程序将会从此函数开始。

有关如何获取命令行参数和环境变量,见 future<T> 章节。

asco-main 实际上是在 main 函数中调用 async_main阻塞等待协程返回来实现的。

#include <asco/future.h>

using asco::future;

future<int> async_main() {
    co_return 0;
}

同步主函数

在实际的工程中,引入异步主函数会有很多困难,比如:

  • 在引入异步框架前可能已经定好了程序的架构,难以从主函数开始将程序重构为异步程序;
  • 或者可能仅仅需要一部分异步特性,没必要将整个程序都重构为异步程序;
  • 甚至主函数可能实际上是其它第三方库提供的,无法改为异步函数。

因此,asco 同样支持主函数保持不变,从同步上下文调用异步函数。 当然,这样并不能直接获取异步函数的返回值,需要通过阻塞等待异步任务组合来间接处理。

链接目标 asco-base 即可用这种方式将异步环境引入你的程序。

cmake目标

  • asco: ASCO 运行时环境,静态目标。
  • asco-main: 选用异步主函数方式启动需要链接的目标,静态目标。
  • asco-base: 选用同步主函数方式启动需要链接的目标,静态目标。

后二者不能同时链接。

  • asco-shared: 目标 asco 的动态版本。
  • asco-main-shared: 依赖 asco-sharedasco-main ,静态目标。
  • asco-base-shared: 依赖 asco-sharedasco-base ,静态目标。

如需使用动态库,请使用 -shared 后缀的目标。

未来可能会加入更多构建系统的支持。

future<T> 协程函数下的异步编程

asco::future<T>是C++20 coroutine的一个等待器( awaiter ),它与std::future<T>没有任何联系。

asco::future<T> (后简称 future<T> )作为异步函数的返回值,表示该函数将在未来某个时刻返回一个T类型的值。 调用方可以在异步函数中使用 co_await ,或在同步函数中调用 future<T>::await() 等待异步函数返回并获取返回值。


异步主函数

在全局命名空间中、名为 async_main 、没有形参、返回值为 asco::future<int> 的函数是异步主函数:

#include <asco/future.h>

future<int> async_main() {
    ...
    co_return 0;
}

异步主函数返回后运行时立即销毁,无法创建新协程,但是未完成的协程可以正常运行和销毁。

使用 runtime::sys::args() 获取命令行参数, runtime::sys::env() 获取环境变量1:

using asco::runtime::sys;
future<int> async_main() {
    for (auto& arg : sys::args()) {
        std::cout << arg << std::endl;
    }
    for (auto& [key, value] : sys::env()) {
        std::cout << key << " = " << value << std::endl;
    }
    co_return 0;
}

asco_main 使用默认配置1创建异步 asco 运行时并对 async_main 函数的返回值调用 .await()


核心机制

  • 将任意一个返回future<T>的函数,称为 asco 异步函数

asco 异步函数被调用时,立即将此函数作为一个任务发送给 asco 异步运行时并返回future<T>对象 ,异步任务将不会立即开始执行,而是等待调度器调度。

asco 异步函数中使用 co_await 时,当前任务挂起,等待 co_await 表达式返回结果。任务挂起时,调度器不会调度此任务。

co_await 表达式返回结果时,当前任务恢复,等待调度器调度。

asco 异步函数中使用 co_return 时,将返回值移动2给调用方,当前任务挂起并等待 asco 异步运行时稍后清理任务。


future<T> 的变体

future_inline<T>

future_inline<T> 的功能与 std::future 相同,但是它被创建时不会被发送给 asco 异步运行时,而是直接将协程挂起。 当此对象被 co_await 时,协程在当前上下文中被当场恢复,执行完毕后返回。

此等待器适用于本身十分短小但不得不执行异步代码的函数。

future_core<T>

future_core<T> 的功能与 std::future 相同,但是它创建核心任务, 核心任务不可以被窃取且优先发送至 calculating worker 工作线程1

此等待器适用于 CPU 密集型任务。

在开启了超线程的 Intel 混合架构处理器(“大小核架构”)的 CPU 上, calculating worker 工作线程将运行在高性能核心(“大核”)上, 高能效核心(“小核”)均为 io worker 工作线程。 在未来,对于ARM big.LITTLE异构架构处理器(“大小核架构”)的安卓设备, calculating worker 工作线程将运行在大核上。


有关协程之间引用的传递

协程的自动储存期变量根据不同情况有不同的储存位置:

  • 变量的所有访问行为没有跨过任何协程暂停点,变量会被分配到当前工作线程的线程栈中。
  • 变量的所有访问行为跨过了协程暂停点,变量会存在于协程状态对象中,随着协程的创建和销毁而构造和析构。

由于协程可能会分配到不同的工作线程中执行,前一种自动储存期变量的引用不可以在协程间传递。以 thread_local 关键字声明的变量与这种情况相同, 不可以将它的引用在协程间传递。

以这段代码为例:

condition_variable decl_local(cv);
bool flag = false;
auto t = [&flag] -> future_void {
    condition_variable coro_local(cv);
    co_await cv.wait([flag]{ return flag; });
    co_return {};
} ();
flag = true;
cv.notify_one();
co_await t;

变量 flag 没有跨过任何协程暂停点,因此它将在当前工作线程的线程栈中被分配。

讨论两种情况:

  • 当前协程与 lambda 表达式在同一工作线程中被调度执行,当前协程执行至 co_await t 挂起后,线程栈退出当前栈帧, flag 变量失效, lambda 表达式中捕获的 flag 将引用一个有效但不合法的地址,是未定义行为。
  • 当前协程与 lambda 表达式不在同一工作线程中被调度执行, lambda表达式中的变量 flag 引用了一个其它线程中的地址, 通常这个地址是无效的,触发段错误,如果这个地址在当前工作线程中恰好是有效的,则是未定义行为。

错误处理

支持使用 try-catch 捕获异常。

未捕获的异常将传递给调用方,在调用方 co_await.await() 时抛出。

noexcept 会被忽略,以相同的方式传递异常。

本框架不限制其它错误处理方式的使用,也不提供其它错误处理方式的基础设施。

asco::exception

<asco/exception.h>

自带堆栈追踪和异步函数调用链追踪的异常类,若需要在 asco 异步运行时中抛出带有堆栈追踪的异常,请直接使用或派生此类。

此类的构造函数接收一个 std::string 参数作为异常的 what() 信息,派生类无需自己重载 const char *what() noexcept 函数。

若此类没有在 asco 异步运行时中构造,将会抛出 asco::runtime_error

  • 注:异步函数调用链追踪需要关闭优化( -O0 )才能获取正确的地址、函数签名和源代码位置。

asco::runtime_error

<asco/rterror.h>

自带堆栈追踪的异常类,用于运行时内部的异常处理。


协程睡眠

睡眠指定的时间间隔:

future_void_inline sleep_for(std::chrono::duration<Rep, Period>)

duration 类型包括标准库中任意的时间间隔类型如 nanosecondsmiliseconds 等,以及它们对应的字面值字符串运算符。

睡眠至指定的时间点:

future_void_inline sleep_until(std::chrono::time_point<Clock, Duration>)

协程本地变量

协程本地变量沿调用链传播。使用基于编译期计算哈希值的类型检查和变量名查找,查找变量名时沿调用链一路向上搜索。

使用宏 decl_local(name, ...)decl_local_array(name, ptr) 声明及初始化协程本地变量

int decl_local(i);
i += 5;
std::string decl_local(str, new std::string("Hello ASCO"));
int *decl_local_array(arr, new int[10]);

使用宏 coro_local(name) 获取协程本地变量

int *coro_local(arr);
std::string coro_local(str);
for (char c : str) {
    std::cout << c << ' ';
}

注意

若变量类型的模板参数中具有自动推导的模板参数,其自动推导无法传递至开头的类型声明处,需要手动指定。

若在变量构造处变量类型的模板参数中具有可以自动推导的匿名 lambda 表达式,需要显式填入模板参数,否则类型验证会失效。


可打断协程

future 调用 .abort() 递归打断这个任务以及这个任务正在挂起等待的子任务,如果任务正在挂起,立即唤醒,被唤醒的协程应正确处理打断。

协程函数需要自己实现被打断时的恢复功能,以将状态恢复到协程开始执行前。

如果你的协程没有实现可打断特性,请谨慎使用于 asco 提供的依赖可打断特性的功能。

asco 内部大多数异步函数都具有可打断支持,如信号量的 .acquire() 函数:

asco::binary_semaphore sem{1};
auto task = sem.acquire();
task.abort();
// acquire() 返回 future_void_inline 类型,需要手动 co_await 使任务开始执行
try { co_await task; } catch (coroutine_abort &) {}
assert_eq(sem.get_counter(), 1);

恢复任务状态

asco 异步函数中调用 bool this_coro::aborted() ,返回 true 时执行状态恢复逻辑或缓存已得到的结果供下次调用时使用, 然后立即 throw coroutine_abort{} 。此处的代码称为打断判定点。如果没有抛出此异常直接返回,则是未定义行为。 此异常会继续在调用者 co_await 后抛出,若不使用 try-catch 捕获,还可以使用 future<T>::aborted()3 对子任务被打断的情况进行处理。

编写可打断协程最佳实践:在每个协程暂停点4前后设置一个打断判定点,并在 co_return 之后利用 raii 设置一个打断判定点

co_return 后,析构阶段无法抛出异常,但是可以通过 this_coro::throw_coroutin_abort() 直接让协程抛出异常。

以本项目的 channel::reveiver<T>::recv() 为例:

可以看到,每个打断判定点外的 co_return 前都有一个打断判定点。

协程的自动储存期变量(通常所谓的本地变量,这里与协程本地变量作区分使用此名称)会在 co_return 后按照初始化相反的顺序析构。 因此,变量 restorer 的存在使得协程在返回后依然有机会判断是否被打断。

在每个 co_returnthrow coroutine_abort{} 前,都设置 restorer.state 的值,因此, restorer 的析构函数可以在不同的 co_returnthrow coroutine_abort{} 后执行不同的恢复操作。

在此期间,可以使用 T &&this_coro::move_back_return_value<future<T>, T>() 将返回值移动回当前上下文以避免其被丢弃。

[[nodiscard("[ASCO] receiver::recv(): You must deal with the case of channel closed.")]]
future_inline<std::optional<T>> recv() {
    struct re {
        receiver *self;
        int state{0};

        ~re() {
            if (!this_coro::aborted())
                return;

            this_coro::throw_coroutine_abort<future_inline<std::optional<T>>>();

            switch (state) {
            case 2:
                self->buffer.push_back(
                    this_coro::move_back_return_value<future_inline<std::optional<T>>, std::optional<T>>());
            case 1:
                self->frame->sem.release();
                break;
            default:
                break;
            }
        }
    } restorer{this};

    if (none)
        throw asco::runtime_error(
            "[ASCO] receiver::recv(): Cannot do any action on a NONE receiver object.");
    if (moved)
        throw asco::runtime_error("[ASCO] receiver::recv(): Cannot do any action after receiver moved.");

    if (this_coro::aborted()) {
        restorer.state = 0;
        throw coroutine_abort{}
    }

    if (!buffer.empty()) {
        std::optional<T> res{std::move(buffer[0])};
        buffer.erase(buffer.begin());
        restorer.state = 2;
        co_return std::move(res);
    }

    co_await frame->sem.acquire();

    if (this_coro::aborted()) {
        frame->sem.release();
        restorer.state = 0;
        throw coroutine_abort{}
    }

    if (frame->sender.has_value()) {
        if (is_stopped()) {
            restorer.state = 1;
            co_return std::nullopt;
        }

        if (*frame->sender == *frame->receiver)
            throw asco::runtime_error(
                "[ASCO] receiver::recv(): Sender gave a new object, but sender index equals to receiver index.");

    } else if (*frame->receiver == FrameSize) {
        // go to next frame.
        auto *f = frame;
        if (!f->next)
            throw asco::runtime_error(
                "[ASCO] receiver::recv(): Sender went to next frame, but next frame is nullptr.");
        frame = f->next;
        delete f;
        frame->receiver = 0;

        co_await frame->sem.acquire();

        if (this_coro::aborted()) {
            frame->sem.release();
            restorer.state = 0;
            throw coroutine_abort{}
        }

        if (is_stopped()) {
            restorer.state = 1;
            co_return std::nullopt;
        }

        if (frame->sender && *frame->sender == *frame->receiver)
            throw asco::runtime_error(
                "[ASCO] receiver::recv(): Sender gave a new object, but sender index equals to receiver index.");
    }

    restorer.state = 2;
    co_return std::move(((T *)frame->buffer)[(*frame->receiver)++]);
}

  1. asco 异步运行时 ↩2 ↩3

  2. std::move(),模板参数 T 必须实现移动构造函数移动赋值运算符

  3. 任务打断处理,见任务组合

  4. C++20 coroutine 使用的术语,指 co_awaitco_yieldco_return

任务组合

在实际的工程项目中,各种任务通常以多种逻辑关系和时序关系组合在一起,而不是单纯的等待任务完成后再进行下一个任务, ASCO 提供了一些任务组合方式。

另外,除 select<N> 外,大多数组合函数都支持在同步上下文中调用。


非阻塞的串行任务组合 future::then()

future<T>::then() 用于将一个异步操作的结果传递给下一个异步操作,实现异步任务的串行组合。 它允许你在前一个 future 完成后,自动将结果传递给指定的异步函数,并返回一个新的 future,表示整个组合任务的最终结果。

同时它返回另一个 futur ,可以继续对它进行任意的任务组合操作。

用法

future<int> f1 = some_async_func();
auto f2 = std::move(f1).then([](int result) -> future<std::string> {
    // 可以使用 result 进行后续异步操作
    co_return std::to_string(result);
});

then() 接收一个右值引用的 deduced this,通常在异步函数调用后直接调用 then() 即可,这里为了表示返回值类型, 特意先构造一个 future 对象,再将它移动。

传入的参数如果是 lambda 表达式,其参数不可以使用 auto 自动推导。

行为说明

  • then() 会等待当前 future 完成,将其结果作为参数传递给 f
  • then() 返回的 future 表示整个链式异步操作的最终结果。
  • 支持异常传递,若前一个 future 抛出异常,则异常会传递到 then 返回的 future。

注意事项

  • then() 只支持异步函数(返回 future)的链式组合。
  • 若在 future_inline 上调用 then,会自动将任务迁移到当前 worker 上执行。

异步的异常捕获 future::exceptionally()

future<T>::exceptionally() 用于为异步任务链提供异常处理能力。当前一个 future 抛出指定的异常时, exceptionally 可以捕获并执行用户自定义的异常处理逻辑,如果抛出的不是所指定的异常,则继续重抛至 exceptionally 自己的 future。

用法

future<int> f = some_async_func();
auto f2 = std::move(f).exceptionally([](const std::runtime_error& e) {
    std::cout << "caught: " << e.what() << std::endl;
});

exceptionally() 接收一个右值引用的 deduced this,通常在异步函数调用后直接调用即可。 传入的参数可以是 lambda 表达式或其它可调用对象,参数类型需为异常类型或异常类型的引用或 std::exception_ptr

行为说明

  • exceptionally() 会等待当前 future 完成 。
  • 如果当前 future 正常返回,则 exceptionally 返回的 future 也正常返回原值。
  • 如果当前 future 抛出异常,且异常类型与处理函数参数类型匹配,则调用处理函数,并返回 std::unexpected 包装的错误类型。
  • 如果异常类型不匹配,则异常会继续向上传递。
  • 返回类型为 future<std::expected<T, E>>,其中 E 由处理函数的返回类型自动推断。

注意事项

  • 处理函数参数类型必须能匹配 future 抛出的异常类型,否则异常不会被捕获。
  • 处理函数可以为 lambda、函数指针、std::function 等。
  • 处理函数返回类型可以为 void,此时异常类型为 std::monostate,否则为实际返回类型。
  • 若需捕获所有异常,可使用 std::exceptionstd::exception_ptr 作为参数类型。

协程打断异常

当协程被打断(如通过 future.abort()select<N> 选择逻辑打断其它分支)时,应抛出 coroutine_abort 异常。 你可以通过以下两种方式处理协程打断:

  1. 手动 try-catch 捕获

    在协程体内用 try { ... } catch (const coroutine_abort&) { ... } 捕获打断异常,实现自定义清理或日志逻辑。

  2. 使用 future.aborted() 统一处理

    通过 future<T>::aborted() 方法,可以为打断异常单独指定处理逻辑。例如:

auto f = some_async_func().aborted([] {
    std::cout << "协程被打断" << std::endl;
});

当协程被打断时,aborted() 传入的处理函数会被调用,并返回 std::nullopt,否则正常返回结果。


竞态任务组合(选择逻辑) select<N>

选择最先返回的协程继续运行,打断其它未返回或后返回的协程。

asco::interval in1s{1s};
asco::interval in500ms{500ms};
for (int i{0}; i < 6; i++) {
    switch (co_await asco::select<2>{}) {
    case 0: {
        co_await in1s.tick();
        std::cout << "1s\n";
        break;
    }
    case 1: {
        co_await in500ms.tick();
        std::cout << "500ms\n";
        break;
    }
    }
}

选择器将当前协程克隆出 N 个协程并同时唤醒运行。对构造的 select<N> 对象 co_await 后按协程被克隆的顺序返回 size_t 类型的值。

选择器仅对 select<N> 对象返回后的第一个异步任务有效。

最先返回的异步任务会将其它任务打断,因此即使后来的协程的已经返回,也会根据前文规定的可打断特性将其影响的状态恢复。

被打断的协程会将其调用者一并销毁;如果正确使用了 select<N> ,其调用者总是被克隆的 N 个协程中的 N-1 个。

异步编程基本工具

本节介绍了异步编程的基本工具,包括异步编程的基本工具函数、管道、定时器等。

定时器

导入 <asco/time/interval.h> 头文件使用。

定时器每超过固定时间产生一次 tick

定时器无法保证完全精确,其精度受操作系统调度的影响。

调用方法 asco::interval::tick()co_await ,协程挂起至定时器到达唤醒点,或当时间过短直接忙等待至唤醒点。

使用例:

#include <asco/future.h>
#include <asco/time/interval.h>

using asco::future, asco::future_void;
using asco::interval;
using namespace std::chrono_literals;

future_void foo() {
    interval in(1s);
    for (int i = 0; i < 10; i++) {
        co_await in.tick();
        std::cout << "tick foo" << std::endl;
    }
    co_return {};
}

future<int> async_main() {
    auto task = foo();
    interval in(500ms);
    for (int i = 0; i < 10; i++) {
        co_await in.tick();
        std::cout << "tick async_main" << std::endl;
    }
    co_return 0;
}

通道

导入 <asco/channel.h> 头文件使用。

通道与管道类似,但是通道完全由用户态代码实现,支持在 asco 运行时中阻塞读取。

通道分为三部分:管道本体、接收端和发送端。管道本体是无法被访问的。用户只需使用接收端和发送端即可。

发送端( asco::sender<T>

  • 构造函数: 仅有移动构造函数
  • 赋值运算符: 仅有移动赋值运算符
  • std::optional<T> send(T &&):移动发送,若接收端关闭,将传入的值返回,否则返回 std::nullopt
  • std::optional<T> send(T &):拷贝发送
  • void stop():关闭发送端,接收端将无法继续接收数据,析构时自动调用此函数

接收端( asco::receiver<T>

  • 构造函数: 仅有移动构造函数
  • 赋值运算符: 仅有移动赋值运算符
  • future_inline<std::optional<T>> recv()可打断异步接收,若发送端关闭且通道中没有更多对象,返回 std::nullopt
  • std::expected<T, receive_fail> try_recv(): 尝试接收一个对象,通道中没有新对象返回 std::unexpected(receive_fail::non_object) ,通道关闭返回 std::unexpected(receive_fail::closed)
  • bool is_stopped():判断通道是否关闭
  • void stop():关闭接收端,发送端将无法继续发送数据,析构时自动调用此函数

单生产者单消费者通道( asco::ss::channel

单生产者单消费者通道使用工具函数 std::pair<sender<T>, receiver<T>> ss::channel<T>() 创建。

此通道仅支持一个协程访问发送端,一个协程访问接收端,两个端没有并发安全特性。

#include <asco/channel.h>

future_void sending(asco::sender<int> tx) {
    for (int i = 0; i < 30000; i++) {
        auto r = tx.send(i);
        if (r.has_value())
            break;
    }
    co_return {};
}

auto [tx, rx] = asco::ss::channel<int>();
sending(std::move(tx));
while (true) if (auto r = co_await rx.recv(); r) {
    std::cout << *r << std::endl;
} else break;

tx 是发送端, rx 是接收端。

timeout(Ti, F)

导入 <asco/time/timeout.h> 头文件使用。

timeout 函数返回一个 future_inline<std::optional<T>> ,此类型为内联协程,必须 co_await 使其开始执行。

类型 T 从第二个参数类型推导。要求第二个参数必须是无形参的异步函数。使用其实际返回值类型作为 T

若异步函数超时,返回 std::nullopt ,否则返回异步函数的返回值。

传入的异步函数需要实现可打断特性1

此函数不是可打断协程。

#include <iostream>

#include <asco/future.h>
#include <asco/time/timeout.h>

using asco::future, asco::future_void_inline;
using asco::timeout, asco::interval;

using namespace std::chrono_literals;

future<int> async_main() {
    auto res = co_await timeout(1s, [] -> future_void_inline {
        interval in{2s};
        std::cout << "interval start\n";
        co_await in.tick();
        if (asco::this_coro::aborted()) {
            std::cout << "timeout aborted\n";
        } else {
            std::cout << "interval 2s\n";
        }
        co_return {};
    });
    if (!res)
        std::cout << "timeout\n";
    else
        std::cout << "not timeout\n";
    co_return 0;
}

  1. 可打断特性

异步同步原语

在并发编程中,通常需要同步原语来保证被并发访问的资源的同步,同步原语本身是并发安全的,同时被它控制的资源也是并发安全的。 这和多线程编程语境中的线程安全是相同的。

asco 提供了一系列在 asco 异步运行时中适用的异步同步原语类型。

推荐优先使用 asco 异步同步原语,而不是使用操作系统提供的系统调用或 C++ 标准库提供的同步原语。 asco 异步同步原语会在异步等待时挂起当前协程,由 asco 调度器调度你的程序中的其它协程,而不是阻塞当前进程让操作系统内核调度其它进程。 这样只有当你的程序中没有活动的协程,或当前进程已经耗尽当前的 CPU 时间片时,其它进程才会开始占用 CPU ,会大幅提高运行效率。 这实际上是协程相比多线程效率更高的关键。

互斥锁

导入 <asco/sync/mutex.h> 头文件使用互斥锁。

锁的获取和释放使用 raii 类型封装,调用 .lock() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

lock 协程返回类型为 future_inlie<mutex<T>::guard> ,需要 co_await 使其开始执行。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

future_void foo() {
    mutex<int> coro_local(muti);
    auto g = co_await muti.lock();
    *g = 5;
    std::cout << "foo *g: " << *g << std::endl;
    co_return {};
}

future<int> async_main() {
    mutex<int> decl_local(muti);
    future_void t;
    {
        auto g = co_await muti.lock();
        t = foo();
        std::cout << "*g: " << *g << std::endl;
    }
    co_await t;
    co_return 0;
}

构造函数

  • mutex(const T&):将 T 值拷贝进锁中。
  • mutex(T&&):将 T 值移动进锁中。
  • template<typename... Args> mutex(Args &&...args):用完美转发在锁内部构造 T 值。

互斥锁

导入 <asco/sync/rwlock.h> 头文件使用读写锁。

锁的获取和释放使用 raii 类型封装,调用 .read().write() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

readwrite 协程返回类型为 future_inlie<rwlock<T>::read_guard>future_inline<rwlock<T>::write_guard> ,需要 co_await 使其开始执行。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

future<int> async_main() {
    rwlock<int> lk{10};
    {
        auto g1 = co_await lk.read();
        auto g2 = co_await lk.read();
        std::cout << *g1 << std::endl;
        std::cout << *g2 << std::endl;
        // auto g3 = co_await lk.write();  // cannot get it
    }
    {
        auto g = co_await lk.write();
        *g = 20;
        std::cout << *g << std::endl;
        // auto g1 = co_await lk.write();  // cannot get it
        // auto g1 = co_await lk.read();  // cannot get it
    }
    co_return 0;
}

构造函数

  • rwlock(const T&):将 T 值拷贝进锁中。
  • rwlock(T&&):将 T 值移动进锁中。
  • template<typename... Args> rwlock(Args &&...args):用完美转发在锁内部构造 T 值。

互斥锁

导入 <asco/sync/spin.h> 头文件使用自旋锁锁。

锁的获取和释放使用 raii 类型封装,调用 .lock() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

信号量

导入 <asco/sync/semaphore.h> 头文件使用信号量。

二值信号量( asco::binary_semaphore

二值信号量仅有两种状态,表示此信号量控制的资源存在(计数为1)或不存在(计数为0)。

构造

必须为二值信号量指定一个初始值:

binary_semaphore sem{0};

获取信号量

future_void_inline acquire()

此函数是一个内联协程,需要通过 co_await 唤起执行。

acquire 函数行为:

当信号量计数为1时,将信号量计数设置为0,并返回。

当信号量计数为0时,挂起当前协程,将当前协程加入信号量的等待队列,并挂起。

此协程是可打断协程,在函数返回前被打断时,不会将信号量计数设置为0

尝试获取信号量

bool try_acquire()

当信号量计数为1时,将信号量计数设置为0,并返回true

当信号量计数为0时,返回false

释放信号量

void release()

release 函数行为:

当信号量计数为0时,将信号量计数设置为1,从等待队列中唤醒一个协程,并返回。

当信号量计数为1时,什么都不做,直接返回。

计数信号量( asco::semaphore<N>

表示此信号量控制的资源有 N 个。

依然使用 acquire 获取信号量,使用 release 释放信号量。

release 有一个参数,表示资源的释放数量,这个参数具有默认值1,或传入指定的值,不可传入非正整数:

sem.release(2);

条件变量

导入 <asco/sync/condition_variable.h> 头文件使用条件变量。

使用普通函数 notify_one() 唤醒一个正在等待的协程,使用普通函数 notify_all() 唤醒所有正在等待的协程。

使用 wait() 函数等待条件变量,协程 wait() 的返回类型为 future_void_inline ,需要 co_await 才能开始执行。

wait() 函数与标准库的 std::condition_variable::wait() 不同,仅接收一个条件判断函数,条件判断的原子性需要调用者自行保证, 因此条件变量的原子性在某些情况下可以避免使用锁而是使用原子变量保证。 而协程进入等待队列挂起当前协程两个操作的原子性由此类本身保证。

future<int> async_main() {
    condition_variable decl_local(cv);
    atomic_bool decl_local(flag, new atomic_bool{false});
    auto task = []() -> future_void {
        condition_variable coro_local(cv);
        atomic_bool coro_local(flag);
        co_await cv.wait([&flag]{ return flag.load(); });
        std::cout << "cv notified" << std::endl;
        co_return {};
    }();
    co_await this_coro::sleep_for(1s);
    flag.notify_one();
    co_await task;
    co_return 0;
}

屏障

导入 <asco/sync/barrier.h> 头文件使用屏障。

调用 arrive() 以获取一个等待 token,调用 token 的成员协程 wait() 等待其它协程到达屏障。

token::wait() 返回 future_void_inline ,需要 co_await 才能开始执行。

constexpr size_t NUM_THREADS = 5;

future_void worker(asco::sync::barrier<NUM_THREADS> &bar, size_t id) {
    co_await bar.arrive().wait();
    co_return {};
}

future<int> async_main() {
    asco::sync::barrier<NUM_THREADS> bar;

    for (size_t i = 0; i < NUM_THREADS; ++i) { worker(bar, i + 1); }

    co_await bar.all_arrived();

    co_return 0;
}

IO

本节介绍了异步 IO 的类和相关的工具。

零拷贝缓冲区

简介

asco::io::buffer 是一个支持零拷贝的数据缓冲区,适用于高性能 IO 场景。 它通过链式结构管理多个缓冲帧(frame),每个缓冲帧可以是原始数组、字符串或字符串视图, 实现数据的高效拼接与转移,避免不必要的内存拷贝。

具有一个默认为 char 的模板参数 CharT,可容纳多种等长的字符集。也可传入 std::byte 将其作为纯字节缓冲区。

主要特性

  • 零拷贝设计:通过 std::variant 管理不同类型的数据块,减少数据拷贝次数。
  • 链式缓冲帧:每个缓冲帧独立管理,支持动态扩展和拼接。
  • 多种数据类型支持:支持 charstd::byte 及其字符串类型。
  • 高效拼接与转移:支持将其他 buffer 对象高效拼接进当前缓冲区。

主要接口

构造与赋值

  • buffer():创建一个空缓冲区。
  • buffer(CharT value):以单个字符初始化缓冲区。
  • buffer(std::basic_string<CharT> &&str):以字符串右值初始化缓冲区。
  • buffer(const std::basic_string_view<CharT> &str):以字符串视图初始化缓冲区。
  • 禁止拷贝构造,支持移动构造。

数据写入

  • void push(CharT value):向缓冲区追加单个字符。
  • void push(std::basic_string<CharT> &&str):追加字符串(右值)。
  • void push(const std::basic_string_view<CharT> &str):追加字符串视图。
  • void push(buffer &&buf):拼接另一个 buffer,将其内容高效转移到当前缓冲区,原 buffer 被清空。

数据读取

  • std::basic_string<CharT> to_string():将缓冲区所有内容合并为一个字符串,并清空缓冲区。

其他

  • void swap(buffer &rhs):与另一个 buffer 交换内容。
  • void clear():清空缓冲区内容。
  • size_t size() const:返回缓冲区内数据总字节数/字符数。
  • std::tuple<buffer, buffer> split(buffer &&self, size_t pos):将当前 buffer 按指定位置分割为两个 buffer, 第一个包含前 pos 个元素,第二个包含剩余部分。分割后原 buffer 为空。若 pos 超出范围会抛出异常。

split 用法示例

asco::io::buffer buf;
buf.push('A');
buf.push(std::string("Hello"));
buf.push(std::string_view("World"));
// buf: "AHelloWorld"

auto [first, second] = std::move(buf).split(6);
// first: "AHello"
// second: "World"
// buf 现在为空

注意事项

  • 禁止拷贝构造,避免误用导致多重释放。
  • 线程不安全,需自行加锁。

进阶

本节更详细和全面的介绍了在前文没有介绍的 ASCO 提供给使用者的所有功能以及很多实现细节。

asco 异步运行时

工作线程

工作线程分为 io workercalculating worker ,运行时初始化时将合适的工作线程绑定到某个 CPU 执行。

asco 内部,“阻塞“任务被定义为一个任务会完全占用当前工作线程而无法被其它工作线程窃取的任务,它会优先分配给 calculating worker 执行。 而不是通常说的为了等待 IO 而暂停运行的含义。 然而,对于仅基于本框架开发应用程序的用户,为了减少在正常使用过程中的失误,我们避免使用 blocking 这个词汇,而是使用 core 表示。

运行时对象配置

asco::runtime 构造函数仅有一个参数,用于设置工作线程数,参数默认值为0,为0时使用 std::thread::hardware_concurrency() 作为工作线程数。

未来会加入更多配置选项。

自定义运行时对象配置

asco 命名空间下提供了 runtime_initializer_t类型, 全局命名空间中声明一个 inline runtime_initializer_t runtime_initializer 对象以使用自定义的配置:

inline asco::runtime_initializer_t runtime_initializer = [] () {
    std::cout << "custom runtime initializer" << std::endl;
    return new asco::runtime(4); // 创建一个只有4个工作线程的 `runtime` 对象
};

如需使用更自由灵活的自定义配置,或需要在 main 函数中编写其它代码, 可以不链接 CMake 工程提供的 asco-main 目标,自己编写 main 函数,并手动创建 asco::runtime 对象。

但是 asco::runtime::sys 中的功能是由 asco-main 目标提供的,因此若不链接 asco-main 目标, 需要参考 asco-main 目标的 main 函数手动实现或不使用 asco::runtime::sys 中的功能。

自定义运行时对象类

asco 提供了 is_runtime<R> 概念,自定义的运行时类必须符合该概念。 创建了自定义的运行时对象类后,使用如下代码为 asco 指定运行时对象类:

#include <asco/rutime.h>

#define SET_RUNTIME
set_runtime(MyRuntime);

#include <asco/future.h>

头文件的导入顺序非常重要,<asco/runtime.h> 只能在 SET_RUNTIME 宏定义之前导入,以使用 set_runtime 宏设置运行时类, 其余所有的头文件都必须在其后导入。

性能统计

开启 ASCO_PERF_RECORD 选项,以启用性能统计功能。

在需要计时的异步函数开头添加 coro_pref(); 宏,或在普通函数开头添加 func_perf(); 宏以对函数计时,普通函数暂时只能统计总运行时间。

程序运行结束后,会打印性能统计结果,如:

[ASCO] Flag 'ASCO_PERF_RECORD' is enabled, please disable it if you are building your program for releasing.

active  total   counter name
1ms     2ms     205     future_void_inline asco::sync::semaphore_base<1>::acquire() [CounterMax = 1, R = asco::core::runtime]
0ms     0ms     2       future_inline<guard> asco::sync::mutex<int>::lock() [T = int]