Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

快速入门

本章节旨在说明如何基于 ASCO 异步框架编写和构建一个异步程序。

选择一种启动异步程序的方式

异步主函数

通常来说,future/promise 异步模型的异步函数具有传染性,需要一个异步的“主函数”作为程序的起点, 通过链接目标 asco-main (见 cmake目标段落)导入支持异步“主函数”的基本环境。

然后,在你的源代码中导入头文件 <asco/future.h> 后,在全局命名空间下定义一个返回 asco::future<int> 的异步函数 future<int> async_main() , 不能接收任何参数,通过 co_return 返回退出状态码。异步程序将会从此函数开始。

有关如何获取命令行参数和环境变量,见 future<T> 章节。

asco-main 实际上是在 main 函数中调用 async_main阻塞等待协程返回来实现的。

#include <asco/future.h>

using asco::future;

future<int> async_main() {
    co_return 0;
}

同步主函数

在实际的工程中,引入异步主函数会有很多困难,比如:

  • 在引入异步框架前可能已经定好了程序的架构,难以从主函数开始将程序重构为异步程序;
  • 或者可能仅仅需要一部分异步特性,没必要将整个程序都重构为异步程序;
  • 甚至主函数可能实际上是其它第三方库提供的,无法改为异步函数。

因此,asco 同样支持主函数保持不变,从同步上下文调用异步函数。 当然,这样并不能直接获取异步函数的返回值,需要通过阻塞等待异步任务组合来间接处理。

链接目标 asco-base 即可用这种方式将异步环境引入你的程序。

cmake目标

  • asco: ASCO 运行时环境,静态目标。
  • asco-main: 选用异步主函数方式启动需要链接的目标,静态目标。
  • asco-base: 选用同步主函数方式启动需要链接的目标,静态目标。

后二者不能同时链接。

  • asco-shared: 目标 asco 的动态版本。
  • asco-main-shared: 依赖 asco-sharedasco-main ,静态目标。
  • asco-base-shared: 依赖 asco-sharedasco-base ,静态目标。

如需使用动态库,请使用 -shared 后缀的目标。

未来可能会加入更多构建系统的支持。

future<T> 协程下的异步编程

asco::future<T>是C++20 coroutine的一个等待器( awaiter ),它与std::future<T>没有任何联系。

asco::future<T> (后简称 future<T> )作为异步函数的返回值,表示该函数将在未来某个时刻返回一个T类型的值。 调用方可以在异步函数中使用 co_await ,或在同步函数中调用 future<T>::await() 等待异步函数返回并获取返回值。


概念

  • 异步函数:指返回值为 future<T> 或其变体的函数定义。
  • 协程:指 C++20 协程标准所定义的“协程帧”对象。
  • 异步任务:除“协程帧”对象外,还有其相关的元数据和协程基础设施构成的被 asco 异步运行时所调度的实体。 特殊情况下,一个异步任务也可以对应多个协程。

异步主函数

在全局命名空间中、名为 async_main 、没有形参、返回值为 asco::future<int> 的函数是异步主函数:

#include <asco/future.h>

future<int> async_main() {
    ...
    co_return 0;
}

异步主函数返回后运行时立即销毁,无法创建新协程,但是未完成的协程可以正常运行和销毁。

使用 runtime::sys::args() 获取命令行参数, runtime::sys::env() 获取环境变量1:

using asco::runtime::sys;
future<int> async_main() {
    for (auto& arg : sys::args()) {
        std::cout << arg << std::endl;
    }
    for (auto& [key, value] : sys::env()) {
        std::cout << key << " = " << value << std::endl;
    }
    co_return 0;
}

asco_main 使用默认配置1创建异步 asco 运行时并对 async_main 函数的返回值调用 .await()


核心机制

asco 异步函数被调用时,立即将此函数作为一个任务发送给 asco 异步运行时并返回future<T>对象 ,异步任务将不会立即开始执行,而是等待调度器调度。

asco 异步函数中使用 co_await 时,当前任务挂起,等待 co_await 表达式返回结果。任务挂起时,调度器不会调度此任务。

co_await 表达式返回结果时,当前任务恢复,等待调度器调度。

asco 异步函数中使用 co_return 时,将返回值移动2给调用方,当前任务挂起并等待 asco 异步运行时稍后清理任务。


future<T> 的变体

future_inline<T>

future_inline<T> 的功能与 std::future 相同,但是它被创建时不会被发送给 asco 异步运行时,而是直接将协程挂起。 当此对象被 co_await 时,协程在当前上下文中被当场恢复,执行完毕后返回。

此等待器适用于本身十分短小但不得不执行异步代码的函数。

dispatch() 函数

当在非异步环境中调用 future_inline<T> 的异步函数时,不能使用 await() 阻塞等待其返回, 需要先调用 dispatch() 将其转化为非 inline 的异步任务后再使用 await()。也可以利用异步任务组合将此任务向后传递。

在异步函数环境中调用 dispatch() 时,将抛出一个运行时异常。

future_core<T>

future_core<T> 的功能与 std::future 相同,但是它创建核心任务, 核心任务不可以被窃取且优先发送至 calculating worker 工作线程1

此等待器适用于 CPU 密集型任务。

在开启了超线程的 Intel 混合架构处理器(“大小核架构”)的 CPU 上, calculating worker 工作线程将运行在高性能核心(“大核”)上, 高能效核心(“小核”)均为 io worker 工作线程。 在未来,对于ARM big.LITTLE异构架构处理器(“大小核架构”)的安卓设备, calculating worker 工作线程将运行在大核上。


asco::yield

包含头文件 <asco/yield.h> 即可使用。

当你认为当前协程可能长时间占用工作线程或任何可能需要让出工作线程以使其它协程得以执行时,使用 co_await asco::yield{} 以暂时让出工作线程,协程依然处于活动状态,若当前工作线程中没有其它可以调度的活动协程,当前协程依然会被继续调度。


有关协程之间引用的传递

协程的自动储存期变量根据不同情况有不同的储存位置:

  • 变量的所有访问行为没有跨过任何协程暂停点,变量可能会被优化到当前工作线程的线程栈中。
  • 变量的所有访问行为跨过了协程暂停点,变量会存在于协程状态对象中,随着协程的创建和销毁而构造和析构。

由于协程可能会分配到不同的工作线程中执行,前一种自动储存期变量的引用不可以在协程间传递。以 thread_local 关键字声明的变量与这种情况相同, 不可以将它的引用在协程间传递。

以这段代码为例:

condition_variable decl_local(cv);
bool flag = false;
auto t = [&flag] -> future<void> {
    condition_variable coro_local(cv);
    co_await cv.wait([flag]{ return flag; });
    co_return;
} ();
flag = true;
cv.notify_one();
co_await t;

decl_local(cv)coro_local(cv) 的使用是安全的,这两个宏将在后面的段落讲解。

讨论两种情况:

  • 当前协程与 lambda 表达式在同一工作线程中被调度执行:当前协程执行至 co_await t 挂起后,线程栈退出当前栈帧,如果 flag 被优化到栈上, 会失效, lambda 表达式中捕获的 flag 将引用一个已经退出的栈帧中的地址。
  • 当前协程与 lambda 表达式不在同一工作线程中被调度执行:如果 flag 被优化到栈上, lambda表达式中的变量 flag 引用了一个其它线程中的栈上地址,C++标准并未规定线程栈在整个进程中的可访问性,访问这个引用是未定义行为。

错误处理

支持使用 try-catch 捕获异常。

未捕获的异常将传递给调用方,在调用方 co_await.await() 时抛出。

noexcept 会被忽略,以相同的方式传递异常。

本框架不限制其它错误处理方式的使用,也不提供其它错误处理方式的基础设施。

asco::exception

<asco/exception.h>

自带堆栈追踪和异步函数调用链追踪的异常类,若需要在 asco 异步运行时中抛出带有堆栈追踪的异常,请直接使用或派生此类。

此类的构造函数接收一个 std::string 参数作为异常的 what() 信息,派生类无需自己重载 const char *what() noexcept 函数。

若此类没有在 asco 异步运行时中构造,将会抛出 asco::runtime_error

  • 注:异步函数调用链追踪需要关闭优化( -O0 )才能获取正确的地址、函数签名和源代码位置。

asco::runtime_error

<asco/rterror.h>

自带堆栈追踪的异常类,用于运行时内部的异常处理。


协程睡眠

睡眠指定的时间间隔:

future_inline<void> sleep_for(std::chrono::duration<Rep, Period>)

duration 类型包括标准库中任意的时间间隔类型如 nanosecondsmiliseconds 等,以及它们对应的字面值字符串运算符。

睡眠至指定的时间点:

future_inline<void> sleep_until(std::chrono::time_point<Clock, Duration>)

协程本地变量

协程本地变量沿调用链传播。使用基于编译期计算哈希值的类型检查和变量名查找,查找变量名时沿调用链一路向上搜索。

使用宏 decl_local(name, ...)decl_local_array(name, ptr) 声明及初始化协程本地变量

int decl_local(i);
i += 5;
std::string decl_local(str, new std::string("Hello ASCO"));
int *decl_local_array(arr, new int[10]);

使用宏 coro_local(name) 获取协程本地变量

int *coro_local(arr);
std::string coro_local(str);
for (char c : str) {
    std::cout << c << ' ';
}

注意

若变量类型的模板参数中具有自动推导的模板参数,其自动推导无法传递至开头的类型声明处,需要手动指定。

若在变量构造处变量类型的模板参数中具有可以自动推导的匿名 lambda 表达式,需要显式填入模板参数,否则类型验证会失效。


可打断协程

future 调用 .abort() 递归打断这个任务以及这个任务正在挂起等待的子任务,如果任务正在挂起,立即唤醒,被唤醒的协程应正确处理打断。

协程函数需要自己实现被打断时的恢复功能,以将状态恢复到协程开始执行前。

如果你的协程没有实现可打断特性,请谨慎使用于 asco 提供的依赖可打断特性的功能。

asco 内部大多数异步函数都具有可打断支持,如信号量的 .acquire() 函数:

asco::binary_semaphore sem{1};
auto task = sem.acquire();
task.abort();
// acquire() 返回 future_inline<void> 类型,需要手动 co_await 使任务开始执行
try { co_await task; } catch (coroutine_abort &) {}
assert_eq(sem.get_counter(), 1);

恢复任务状态

asco 异步函数中调用 bool this_coro::aborted() ,返回 true 时执行状态恢复逻辑或缓存已得到的结果供下次调用时使用, 然后立即 throw coroutine_abort{} 。此处的代码称为打断判定点。如果没有抛出此异常直接返回,则是未定义行为。 此异常会继续在调用者 co_await 后抛出,若不使用 try-catch 捕获,还可以使用 future<T>::aborted()3 对子任务被打断的情况进行处理。

编写可打断协程最佳实践:在每个协程暂停点4前后设置一个打断判定点,并在 co_return 之后利用 raii 设置一个打断判定点

co_return 后,析构阶段无法抛出异常,但是可以通过 this_coro::throw_coroutine_abort() 直接让协程抛出异常。


  1. asco 异步运行时 ↩2 ↩3

  2. std::move(),模板参数 T 必须实现移动构造函数移动赋值运算符

  3. 任务打断处理,见任务组合

  4. C++20 coroutine 使用的术语,指 co_awaitco_yieldco_return

任务组合

在实际的工程项目中,各种任务通常以多种逻辑关系和时序关系组合在一起,而不是单纯的等待任务完成后再进行下一个任务, ASCO 提供了一些任务组合方式。

另外,除 select<N> 外,大多数组合函数都支持在同步上下文中调用。


非阻塞的串行任务组合 future::then()

future<T>::then() 用于将一个异步操作的结果传递给下一个异步操作,实现异步任务的串行组合。 它允许你在前一个 future 完成后,自动将结果传递给指定的异步函数,并返回一个新的 future,表示整个组合任务的最终结果。

同时它返回另一个 futur ,可以继续对它进行任意的任务组合操作。

用法

future<int> f1 = some_async_func();
auto f2 = std::move(f1).then([](int result) -> future<std::string> {
    // 可以使用 result 进行后续异步操作
    co_return std::to_string(result);
});

then() 接收一个右值引用的 deduced this,通常在异步函数调用后直接调用 then() 即可,这里为了表示返回值类型, 特意先构造一个 future 对象,再将它移动。

传入的参数如果是 lambda 表达式,其参数不可以使用 auto 自动推导。

行为说明

  • then() 会等待当前 future 完成,将其结果作为参数传递给 f
  • then() 返回的 future 表示整个链式异步操作的最终结果。
  • 支持异常传递,若前一个 future 抛出异常,则异常会传递到 then 返回的 future。

注意事项

  • then() 只支持异步函数(返回 future)的链式组合。
  • 若在 future_inline 上调用 then,会自动将任务迁移到当前 worker 上执行。

异步的异常捕获 future::exceptionally()

future<T>::exceptionally() 用于为异步任务链提供异常处理能力。当前一个 future 抛出指定的异常时, exceptionally 可以捕获并执行用户自定义的异常处理逻辑,如果抛出的不是所指定的异常,则继续重抛至 exceptionally 自己的 future。

用法

future<int> f = some_async_func();
auto f2 = std::move(f).exceptionally([](const std::runtime_error& e) {
    std::cout << "caught: " << e.what() << std::endl;
});

exceptionally() 接收一个右值引用的 deduced this,通常在异步函数调用后直接调用即可。 传入的参数可以是 lambda 表达式或其它可调用对象,参数类型需为异常类型或异常类型的引用或 std::exception_ptr

行为说明

  • exceptionally() 会等待当前 future 完成 。
  • 如果当前 future 正常返回,则 exceptionally 返回的 future 也正常返回原值。
  • 如果当前 future 抛出异常,且异常类型与处理函数参数类型匹配,则调用处理函数,并返回 std::unexpected 包装的错误类型。
  • 如果异常类型不匹配,则异常会继续向上传递。
  • 返回类型为 future<std::expected<T, E>>,其中 E 由处理函数的返回类型自动推断。

注意事项

  • 处理函数参数类型必须能匹配 future 抛出的异常类型,否则异常不会被捕获。
  • 处理函数可以为 lambda、函数指针、std::function 等。
  • 处理函数返回类型可以为 void,此时异常类型为 std::monostate,否则为实际返回类型。
  • 若需捕获所有异常,可使用 std::exceptionstd::exception_ptr 作为参数类型。

协程打断异常

当协程被打断(如通过 future.abort()select<N> 选择逻辑打断其它分支)时,应抛出 coroutine_abort 异常。 你可以通过以下两种方式处理协程打断:

  1. 手动 try-catch 捕获

    在协程体内用 try { ... } catch (const coroutine_abort&) { ... } 捕获打断异常,实现自定义清理或日志逻辑。

  2. 使用 future.aborted() 统一处理

    通过 future<T>::aborted() 方法,可以为打断异常单独指定处理逻辑。例如:

auto f = some_async_func().aborted([] {
    std::cout << "协程被打断" << std::endl;
});

当协程被打断时,aborted() 传入的处理函数会被调用,并返回 std::nullopt,否则正常返回结果。


竞态任务组合(选择逻辑) select<N>

选择最先返回的协程继续运行,打断其它未返回或后返回的协程。

asco::interval in1s{1s};
asco::interval in500ms{500ms};
for (int i{0}; i < 6; i++) {
    switch (co_await asco::select<2>{}) {
    case 0: {
        co_await in1s.tick();
        std::cout << "1s\n";
        break;
    }
    case 1: {
        co_await in500ms.tick();
        std::cout << "500ms\n";
        break;
    }
    }
}

选择器将当前协程克隆出 N 个协程并同时唤醒运行。对构造的 select<N> 对象 co_await 后按协程被克隆的顺序返回 size_t 类型的值。

选择器仅对 select<N> 对象返回后的第一个异步任务有效。

最先返回的异步任务会将其它任务打断,因此即使后来的协程的已经返回,也会根据前文规定的可打断特性将其影响的状态恢复。

被打断的协程会将其调用者一并销毁;如果正确使用了 select<N> ,其调用者总是被克隆的 N 个协程中的 N-1 个。

异步编程基本工具

本节介绍了异步编程的基本工具,包括异步编程的基本工具函数、管道、定时器等。

异步生成器 generator<T>

generator<T> 是 asco 提供的“按需拉取”的异步序列工具。它让你在一个协程中逐个 co_yield 元素,消费端以异步方式一项项地拉取并处理,适合流式处理、分页读取、或逐步产生结果的场景。

核心类型与别名:

  • asco::generator<T>:标准生成器
  • asco::generator_core<T>:核心生成器

生成器的本体是一个协程函数:在生产端用 co_yield 逐项产出 T,完成时 co_return;;消费端用 while (g) 搭配 co_await g() 逐项拉取。

快速示例

#include <asco/generator.h>
#include <asco/future.h>

using asco::generator;
using asco::future;
using asco::future_inline;

// 生产端:生成 1..n
generator<int> gen_count(int n) {
    for (int i = 1; i <= n; ++i) {
        co_yield i;
    }
    co_return; // 可省略
}

// 消费端:累加求和
future_inline<int> consume_sum(generator<int>& g) {
    int sum = 0;
    while (g) {                // g 仍可产出
        int v = co_await g();  // 拉取下一项
        sum += v;
    }
    co_return sum;
}

future<int> async_main() {
    auto g = gen_count(1000);
    auto sum = co_await consume_sum(g);
    // sum == 1000 * 1001 / 2
    co_return 0;
}

API 与语义

  • 定义:template<class T> generator<T> f(...);

    • 在函数体内可多次 co_yield T 产出元素
    • 函数正常结束或 co_return; 代表序列结束
  • 消费:

    • bool(g):判断生成器是否仍可继续产出(未关闭)
    • co_await g():异步拉取下一项,返回 T;若生成器已结束,会抛出 asco::runtime_error(见下一节)

异常与结束

  • 生产端抛出异常:

    • 消费端在消费完所有已经产生的值后再调用 co_await g() 时会重新抛出该异常;随后生成器关闭,bool(g) 返回 false
  • 正常结束:

  • IO 分块读取

#include <asco/io/file.h>
#include <asco/io/buffer.h>

using asco::io::file;
using asco::io::buffer;

generator<buffer<>> read_chunks(file& f, size_t chunk_size) {
    while (true) {
        auto r = co_await f.read(chunk_size);
        if (!r.has_value())
            co_return; // EOF
        co_yield std::move(*r);
    }
}

建议的消费模式:

while (g) {
    int v = co_await g();
    // 处理 v
}

如需区分“异常结束”与“正常结束”,可在循环外包一层 try/catch:

try {
    while (g) {
        int v = co_await g();
        // 处理 v
    }
} catch (const std::exception& e) {
    // 处理异常路径
}

并发与调度

  • co_yield 默认是“无背压”的快速产出点,不会在每次 co_yield 处挂起。
  • 消费端通过一次 co_await g() 拉取一个元素。
  • 若需要更强的背压(例如严格一产一消),可在上层协议中控制消费节奏,或引入限速逻辑。

典型用法

  • IO 分块读取
generator<buffer> read_pages(file& f, size_t page, size_t count) {
    for (size_t i = 0; i < count; ++i) {
        auto buf = co_await f.read(page + i);
        assert(buf);
        co_yield std::move(*buf);
    }
}

future<T> 的关系

  • future<T> 符合 <asco/utils/concepts.h> 中的概念 async_function ,但是生成器不符合。
  • 生成器本身仍然是一个协程任务,遵循 asco 的调度模型。
  • 生成器没有最终的 T 返回值,其“结果”是一串通过 co_yield 逐步产出的元素。

错误与最佳实践

  • 在消费端使用 while (g) 做循环条件,避免最后一次 co_await g() 在生成器结束后抛错影响逻辑。
  • 如果你确实需要显式检查结束,可用 try/catch 捕获异常并在 catch 中处理收尾。
  • 生成器可移动,不可拷贝。移动后使用新对象继续消费,旧对象不再可用。

定时器

导入 <asco/time/interval.h> 头文件使用。

定时器每超过固定时间产生一次 tick

定时器无法保证完全精确,其精度受操作系统调度的影响。

调用方法 asco::interval::tick()co_await ,协程挂起至定时器到达唤醒点,或当时间过短直接忙等待至唤醒点。

使用例:

#include <asco/future.h>
#include <asco/time/interval.h>

using asco::future;
using asco::interval;
using namespace std::chrono_literals;

future<void> foo() {
    interval in(1s);
    for (int i = 0; i < 10; i++) {
        co_await in.tick();
        std::cout << "tick foo" << std::endl;
    }
    co_return;
}

future<int> async_main() {
    auto task = foo();
    interval in(500ms);
    for (int i = 0; i < 10; i++) {
        co_await in.tick();
        std::cout << "tick async_main" << std::endl;
    }
    co_return 0;
}

timeout(Ti, F)

导入 <asco/time/timeout.h> 头文件使用。

timeout 函数返回一个 future_inline<std::optional<T>> ,此类型为内联协程,必须 co_await 使其开始执行。

类型 T 从第二个参数类型推导。要求第二个参数必须是无形参的异步函数。使用其实际返回值类型作为 T

若异步函数超时,返回 std::nullopt ,否则返回异步函数的返回值。

传入的异步函数需要实现可打断特性1

此函数不是可打断协程。

#include <iostream>

#include <asco/future.h>
#include <asco/time/timeout.h>

using asco::future;
using asco::timeout, asco::interval;

using namespace std::chrono_literals;

future<int> async_main() {
    auto res = co_await timeout(1s, [] -> future_inline<void> {
        interval in{2s};
        std::cout << "interval start\n";
        co_await in.tick();
        if (asco::this_coro::aborted()) {
            std::cout << "timeout aborted\n";
        } else {
            std::cout << "interval 2s\n";
        }
        co_return;
    });
    if (!res)
        std::cout << "timeout\n";
    else
        std::cout << "not timeout\n";
    co_return 0;
}

  1. 可打断特性

异步同步原语

在并发编程中,通常需要同步原语来保证被并发访问的资源的同步,同步原语本身是并发安全的,同时被它控制的资源也是并发安全的。 这和多线程编程语境中的线程安全是相同的。

asco 提供了一系列在 asco 异步运行时中适用的异步同步原语类型。

推荐优先使用 asco 异步同步原语,而不是使用操作系统提供的系统调用或 C++ 标准库提供的同步原语。 asco 异步同步原语会在异步等待时挂起当前协程,由 asco 调度器调度你的程序中的其它协程,而不是阻塞当前进程让操作系统内核调度其它进程。 这样只有当你的程序中没有活动的协程,或当前进程已经耗尽当前的 CPU 时间片时,其它进程才会开始占用 CPU ,会大幅提高运行效率。 这实际上是协程相比多线程效率更高的关键。

互斥锁

导入 <asco/sync/mutex.h> 头文件使用互斥锁。

锁的获取和释放使用 raii 类型封装,调用 .lock() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

lock 协程返回类型为 future_inlie<mutex<T>::guard> ,需要 co_await 使其开始执行。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

future<void> foo() {
    mutex<int> coro_local(muti);
    auto g = co_await muti.lock();
    *g = 5;
    std::cout << "foo *g: " << *g << std::endl;
    co_return;
}

future<int> async_main() {
    mutex<int> decl_local(muti);
    future<void> t;
    {
        auto g = co_await muti.lock();
        t = foo();
        std::cout << "*g: " << *g << std::endl;
    }
    co_await t;
    co_return 0;
}

构造函数

  • mutex(const T&):将 T 值拷贝进锁中。
  • mutex(T&&):将 T 值移动进锁中。
  • template<typename... Args> mutex(Args &&...args):用完美转发在锁内部构造 T 值。

读写锁

导入 <asco/sync/rwlock.h> 头文件使用读写锁。

锁的获取和释放使用 raii 类型封装,调用 .read().write() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

readwrite 协程返回类型为 future_inlie<rwlock<T>::read_guard>future_inline<rwlock<T>::write_guard> ,需要 co_await 使其开始执行。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

future<int> async_main() {
    rwlock<int> lk{10};
    {
        auto g1 = co_await lk.read();
        auto g2 = co_await lk.read();
        std::cout << *g1 << std::endl;
        std::cout << *g2 << std::endl;
        // auto g3 = co_await lk.write();  // cannot get it
    }
    {
        auto g = co_await lk.write();
        *g = 20;
        std::cout << *g << std::endl;
        // auto g1 = co_await lk.write();  // cannot get it
        // auto g1 = co_await lk.read();  // cannot get it
    }
    co_return 0;
}

构造函数

  • rwlock(const T&):将 T 值拷贝进锁中。
  • rwlock(T&&):将 T 值移动进锁中。
  • template<typename... Args> rwlock(Args &&...args):用完美转发在锁内部构造 T 值。

自旋锁

导入 <asco/sync/spin.h> 头文件使用自旋锁。

锁的获取和释放使用 raii 类型封装,调用 .lock() 函数获取锁的保卫对象,保卫对象退出作用域后自动释放锁。

锁保卫对象与迭代器类似,通过重载的 * 运算符和重载的 -> 运算符访问锁内部的对象。

信号量

导入 <asco/sync/semaphore.h> 头文件使用信号量。

二值信号量( asco::binary_semaphore

二值信号量仅有两种状态,表示此信号量控制的资源存在(计数为1)或不存在(计数为0)。

构造

必须为二值信号量指定一个初始值:

binary_semaphore sem{0};

获取信号量

future_inline<void> acquire()

此函数是一个内联协程,需要通过 co_await 唤起执行。

acquire 函数行为:

当信号量计数为1时,将信号量计数设置为0,并返回。

当信号量计数为0时,挂起当前协程,将当前协程加入信号量的等待队列,并挂起。

此协程是可打断协程,在函数返回前被打断时,不会将信号量计数设置为0

尝试获取信号量

bool try_acquire()

当信号量计数为1时,将信号量计数设置为0,并返回true

当信号量计数为0时,返回false

释放信号量

void release()

release 函数行为:

当信号量计数为0时,将信号量计数设置为1,从等待队列中唤醒一个协程,并返回。

当信号量计数为1时,什么都不做,直接返回。

计数信号量( asco::semaphore<N>

表示此信号量控制的资源有 N 个。

依然使用 acquire 获取信号量,使用 release 释放信号量。

release 有一个参数,表示资源的释放数量,这个参数具有默认值1,或传入指定的值,不可传入非正整数:

sem.release(2);

无限信号量( asco::unlimited_semaphore

定义为:

using unlimited_semaphore = semaphore_base<std::numeric_limits<size_t>::max()>;

条件变量

导入 <asco/sync/condition_variable.h> 头文件使用条件变量。

使用普通函数 notify_one() 唤醒一个正在等待的协程,使用普通函数 notify_all() 唤醒所有正在等待的协程。

使用 wait() 函数等待条件变量,协程 wait() 的返回类型为 future_inline<void> ,需要 co_await 才能开始执行。

wait() 函数与标准库的 std::condition_variable::wait() 不同,仅接收一个条件判断函数,条件判断的原子性需要调用者自行保证, 因此条件变量的原子性在某些情况下可以避免使用锁而是使用原子变量保证。 而协程进入等待队列挂起当前协程两个操作的原子性由此类本身保证。

future<int> async_main() {
    condition_variable decl_local(cv);
    atomic_bool decl_local(flag, new atomic_bool{false});
    auto task = []() -> future<void> {
        condition_variable coro_local(cv);
        atomic_bool coro_local(flag);
        co_await cv.wait([&flag]{ return flag.load(); });
        std::cout << "cv notified" << std::endl;
        co_return;
    }();
    co_await this_coro::sleep_for(1s);
    flag.notify_one();
    co_await task;
    co_return 0;
}

屏障

导入 <asco/sync/barrier.h> 头文件使用屏障。

调用 arrive() 以获取一个等待 token,调用 token 的成员协程 wait() 等待其它协程到达屏障。

token::wait() 返回 future_inline<void> ,需要 co_await 才能开始执行。

constexpr size_t NUM_THREADS = 5;

future<void> worker(asco::sync::barrier<NUM_THREADS> &bar, size_t id) {
    co_await bar.arrive().wait();
    co_return;
}

future<int> async_main() {
    asco::sync::barrier<NUM_THREADS> bar;

    for (size_t i = 0; i < NUM_THREADS; ++i) { worker(bar, i + 1); }

    co_await bar.all_arrived();

    co_return 0;
}

无锁数据结构与并行算法

本模块提供面向高吞吐、低延迟场景的无锁(lock-free)数据结构与并行算法基元,既可在 asco 协程运行时中使用,也可在普通线程环境中独立使用。

设计重点:

  • 尽量避免阻塞与系统调用,减小上下文切换开销。
  • 借助原子操作与合适的内存序(acquire/release/acq_rel)保证跨核可见性与顺序性。
  • 关注缓存友好与伪共享隔离,使用 cache line 对齐降低抖动。
  • 在异常与资源管理上提供可预期的语义,避免泄漏与 ABA 问题。

当前组件

  • 无锁连续队列 continuous_queue:面向 SPSC 理想路径、但允许多 sender/receiver 句柄并发的消息队列。

无锁连续队列 continuous_queue

导入头文件 <asco/nolock/continuous_queue.h> 使用无锁队列。

该结构提供一对生产者/消费者句柄:sender<T>receiver<T>,通过 create<T>() 一次性创建并绑定到同一个内部缓冲区链表。它是“无锁”的:数据入队/出队全程只使用原子操作与内存序,不持有互斥锁;同时采用按 CPU cache line 对齐的定长帧(frame)作为环形/链式缓冲,尽量减少伪共享与 cache 抖动。

适用场景:

  • 单个 sender 与单个 receiver 的高吞吐、低延迟消息传递(也允许多个 sender/receiver 拷贝同一端句柄并发使用,但每个 sender/receiver 自身不是线程安全对象)。
  • T 的移动/构造开销较低、且异常保证良好的场景。

注意:

  • senderreceiver 都不是线程安全类型,但可以拷贝多个句柄在不同线程上使用(内部通过引用计数和原子指针维护帧链表)。
  • T 需满足 move_secure 概念:移动安全。
  • 原则上不允许 T 的构造函数抛出异常。

核心概念

  • frame<T>: 4KB 对齐、定长数据页,包含三组游标与状态:
    • head: 消费进度(接收方已领取的下标),初始 preset 为 0;
    • tail: 生产进度(发送方已保留的尾下标),满时置为 index_nullopt
    • released: 生产方串行释放游标,保证写入与可见性的顺序;
    • next: 单向链表指向“下一帧”;
    • sender_stopped/receiver_stopped: 双方停止标志,用于优雅关闭;
    • refcount: 对帧的共享引用计数,保障跨句柄与跨帧切换时的生命周期安全;
  • 每帧可容纳 length = (4096 - header_size) / sizeof(T) 个元素,元素区按 alignof(T) 对齐;

接口速览

  • std::tuple<sender<T>, receiver<T>> create<T>():创建一对句柄。
  • sender<T>::push(T|T&&) -> std::optional<T>
    • 入队成功返回 std::nullopt
    • 若队列已关闭或一方停止,则返回“未消费的值”以便调用者自行处理(避免丢失)。
  • receiver<T>::pop() -> std::expected<T, receiver::pop_fail>
    • 成功返回元素;
    • 失败返回 pop_fail::non_object(暂时无元素)或 pop_fail::closed(队列关闭)。
  • sender::stop() / receiver::stop():设置全链路停止标志,释放当前持有的帧引用;
  • is_stopped(): 查询端是否已停止/关闭(含对方停止的传播)。

使用示例

#include <asco/nolock/continuous_queue.h>

namespace cq = asco::continuous_queue;

void producer_consumer_demo() {
    auto [tx, rx] = cq::create<int>();

    // 生产者线程/协程:
    for (int i = 0; i < 1000; ++i) {
        if (auto unconsumed = tx.push(i)) {
            // 队列已关闭或对端停止,处理未消费数据
            break;
        }
    }
    tx.stop(); // 可选:显式结束生产

    // 消费者线程/协程:
    while (true) {
        auto r = rx.pop();
        if (r) {
            int v = *r;
            // 使用 v
        } else if (r.error() == decltype(rx)::pop_fail::non_object) {
            // 当前无元素,可自旋/让出/休眠
            continue;
        } else {
            // closed
            break;
        }
    }
}

行为与内存模型

  • push 路径:
    1. CAS 递增 tail,若满则尝试跳帧(确保 next 存在,不存在则分配新 frame 并链接);
    2. 在计算出的 index 处原位构造元素;
    3. 按序等待 released == index,随后将 released = index + 1,序列化生产可见性;
  • pop 路径:
    1. CAS 递增 head,若 head 达到 length 则尝试跳到 next 帧;
    2. 若暂时无元素,返回 non_object;若检测到停止且无更多元素,返回 closed
    3. 取出并移动/拷贝元素后显式析构存储槽位;
  • 停止传播:stop() 会沿帧链标记 sender_stoppedreceiver_stopped,另一端在检查到标志且无待处理元素时认为关闭;

内存序:

  • 关键原子使用 acquire/release 或 acq_rel,保证跨核可见性;
  • 使用 std::hardware_destructive_interference_size 对齐关键字段,降低伪共享;

复杂度与性能提示

  • push/pop 常数时间,跨帧时有轻微分配/链接成本(有 freelist 缓解);
  • 单生产/单消费路径下最大化局部性;多 sender/receiver 并发下仍是无锁,但存在轻度竞争自旋;
  • 为极低延迟,push 在等待 released 时做短自旋,超过阈值后 cpu_relax();可根据场景考虑在上层增加让出/休眠策略;

正确性与限制

  • T 的析构至少不应在队列析构路径上抛出;
  • 对 T 的对齐要求已被 frame header 设计满足(length>16 推导出 sizeof/alignof 约束);
  • sender/receiver 自身不是线程安全对象;若需多线程使用,请复制句柄,每个句柄独占所在线程;
  • 不提供有界背压信号,仅当帧满时自动扩展到下一帧;如需限流请在上层实现;

与 asco 运行时的关系

该容器完全在用户态工作,不依赖调度器或阻塞系统调用;适合在 asco 协程调度环境中作为跨任务的高效消息通道,亦可用于普通线程环境。

API 参考

  • 命名空间:
    • asco::nolock::continuous_queue:原始实现
    • asco::continuous_queue:re-export 别名
  • 类型:
    • sender<T>:生产端
    • receiver<T>:消费端(pop_fail { non_object, closed }
  • 函数:
    • create<T>() -> tuple<sender<T>, receiver<T>>
    • sender::push(T|T&&) -> optional<T>
    • receiver::pop() -> expected<T, pop_fail>
    • sender::stop() / receiver::stop()
    • sender::is_stopped() / receiver::is_stopped()

调试与故障排查

  • 若频繁返回 non_object,可能是生产/消费速率不匹配;考虑在消费者侧加入自适应让出策略;
  • 若过早返回 closed,检查是否一端调用了 stop() 或已经释放了所有句柄;
  • 大对象 T 可能降低每帧容量与缓存局部性,必要时改为传输指针/句柄而非大对象本体。

IO

本节介绍了异步 IO 的类和相关的工具。

零拷贝缓冲区

简介

asco::io::buffer<CharT> 是一个面向高性能 I/O 的零拷贝缓冲区,支持将多段数据以链式方式拼接、切分与转移,默认字符类型为 char,也可使用 std::byte 作为纯字节缓冲区。

适用场景:

  • 网络/文件 I/O 的分段读写与聚合
  • 日志、协议报文、消息的高效拼装
  • 需要避免多次内存拷贝的高吞吐场景

核心特点:

  • 多种来源零拷贝拼接(原始内存、std::string、std::string_view)
  • 消费型与非消费型操作区分清晰(如 to_string/split 为消费型,clone/rawbuffers 为非消费)
  • 与标准库良好协作,接口直观

快速上手

using asco::io::buffer;
using namespace std::literals;

// 追加与合并输出(to_string 会消耗自身)
buffer<> b;
b.push('A');
b.push(std::string("SCO"));
b.push("._TEST"sv);
std::string out = std::move(b).to_string(); // "ASCO._TEST"

// 拼接另一个 buffer(源被清空)
buffer<> x, y;
x.push("Hello"sv);
y.push("World"sv);
x.push(std::move(y));                       // y 清空
std::string s = std::move(x).to_string();   // "HelloWorld"

// 分割
buffer<> z("HelloWorld"s);
auto [left, right] = std::move(z).split(5); // "Hello" | "World"

// 克隆(非消费式,适合重复读取)
buffer<> a("ASCO"s);
auto c = a.clone();
std::string s1 = std::move(c).to_string();         // "ASCO"
std::string s2 = std::move(a.clone()).to_string(); // 仍可再读

// 覆盖修改(从 start 起以新内容覆盖,可能截断原尾部)
buffer<> m("HelloWorld"s);
buffer<> ins("ASCO"s);
m.modify(5, std::move(ins)); // 结果 "HelloASCOd"
auto rs = std::move(m).to_string();

// 追加零字节(如为定长结构留空位)
buffer<> zf;
zf.push("A"sv);
zf.fill_zero(3);
zf.push("B"sv);
auto zs = std::move(zf).to_string(); // "A\0\0\0B"

// 遍历底层原始块(零拷贝发送)
buffer<> r("AA"s);
auto it = r.rawbuffers();
while (!it.is_end()) {
    auto rb = *it;      // rb.buffer / rb.size / rb.seq
                        // 直接用于系统调用或网络发送
    ++it;
}

常用接口(按用途)

构造与赋值

  • buffer()buffer(CharT)buffer(std::basic_string<CharT>&&)
  • buffer(const std::basic_string_view<CharT>&)
  • 支持移动构造/赋值,禁止拷贝

写入与组合

  • void push(CharT)
  • void push(std::basic_string<CharT>&&)
  • void push(const std::basic_string_view<CharT>&)
  • void push(buffer&&)(拼接并清空源)
  • void fill_zero(size_t n) 追加 n 个值为 0 的字节/字符
  • void push_raw_array_buffer(CharT* buf, size_t capacity, size_t size, buffer_destroyer* destroyer = nullptr) 直接挂载用户内存

读出与切分

  • std::basic_string<CharT> to_string(this buffer&&) 合并输出并清空自身(消费型)
  • std::tuple<buffer, buffer> split(this buffer&&, size_t pos) 分割为左右两个 buffer, 并清空自身(消费型),对先前调用 clone() 的结果没有副作用

复制与修改

  • buffer clone() const 非消费式克隆(共享底层数据,适合多次读取)
  • void modify(this buffer&, size_t start, buffer<CharT> buf)start 起覆盖写入,对先前调用 clone() 的结果没有副作用

状态与工具

  • size_t size() constbool empty() constsize_t buffer_count() const
  • void clear()void swap(buffer& rhs): 对先前调用 clone() 的结果没有副作用
  • rawbuffer_iterator rawbuffers() const 遍历底层连续内存块

行为与语义

  • 消费型操作:to_string()split() 会清空被调用对象。
  • 非消费型操作:clone()rawbuffers() 不改变内容。
  • push(buffer&&) 会清空源对象(源的内容被整体转移)。
  • 传入的 std::string_view 不持有所有权;需保证其底层数据在使用期间有效。
  • 非线程安全。

错误与边界

  • split(pos)modify(start, ...) 若位置越界将抛出运行时错误。
  • to_string()/split() 后原对象被清空,再次读取将得到空结果。
  • rawbuffers() 暴露底层块指针,仅在对应帧有效期内可用;请在对象生命周期内使用。

原始文件

asco::io::file 提供直接面向操作系统的文件读写接口(不进行用户态缓冲),基于协程的异步 API,适合需要精确控制 I/O 的场景。

要点

  • 无用户态缓冲:请求直接提交到内核
  • 异步接口:open/close/reopen/write/read 均为协程;read 可被打断并恢复,open/close/reopen/write 为不可打断的 inline future
  • 独立读写指针:支持 seekg/seekp 与 tellg/tellp
  • 可取得原生句柄以与其他系统 API 协作
  • 与零拷贝缓冲区 buffer<> 协同良好

主要类型

  • file:已打开文件的句柄
  • file::options:打开选项(位标志)
  • opener:链式配置的打开器(builder)
  • open_file:底层打开实现(返回 future<std::expected<file,int>>
  • read_result:读取结果状态(eof/interrupted/again
  • file_state:文件状态视图(如 size()

打开与关闭

创建打开器

  • file::at(path):基于路径创建 opener
  • file::at():从现有 file 的上下文创建 opener(便于 reopen()

打开/关闭方法

  • future_inline<void> open(std::string path, flags<file::options> opts, uint64_t perm = 0)
  • future<void> close()
  • future_inline<void> reopen(opener&&)

注意

  • 已打开对象再次 open() 将抛出异常
  • 使用 create 选项时应提供 perm(如 0644)
  • 析构时若仍处于打开状态将自动 close()

示例

using asco::io::file;
using asco::flags;

auto r = co_await file::at("/tmp/data.log")
             .write().append().create().exclusive()
             .mode(0644)
             .open();
if (!r) co_return;                // r.error() 为 errno
file f = std::move(*r);

// 或:file f; co_await f.open("/tmp/data.log", flags{file::options::write}, 0644);
co_await f.close();

读写

写入

  • 签名:future<std::optional<buffer<>>> write(buffer<> buf)(不可中止)
  • 语义:尽量写出 buf,未全部写完则返回剩余数据;全部写尽返回 std::nullopt
  • 需以 options::write 打开

读取

  • 签名:future<std::expected<buffer<>, read_result>> read(size_t nbytes)
  • 语义:读取至多 nbytes,成功返回数据,否则返回 eof/interrupted/again
  • 需以 options::read 打开(可被中止)

示例:写到完成

asco::io::buffer<> buf("hello");
while (true) {
    auto rest = co_await f.write(std::move(buf));
    if (!rest) break;
    buf = std::move(*rest);
}

示例:读到 EOF

std::string out;
for (;;) {
    auto r = co_await f.read(4096);
    if (!r) {
        if (r.error() == asco::io::read_result::eof) break;
        if (r.error() == asco::io::read_result::interrupted) continue;
        if (r.error() == asco::io::read_result::again) { co_await std::suspend_always{}; continue; }
    } else {
        out += std::move(*r).to_string();
    }
}

指针与状态

  • size_t seekg(ssize_t offset, seekpos whence = seekpos::current)
  • size_t seekp(ssize_t offset, seekpos whence = seekpos::current)
  • size_t tellg() const
  • size_t tellp() const
  • file_state state() const(Linux:基于 fstat()

说明

  • seekpos::beginoffset 必须非负
  • seekpos::endoffset 必须非正
  • seekpos::current:允许正负偏移,但不得越界
  • 越界将抛出运行时错误

示例

f.seekp(0, asco::io::seekpos::end); // 追加写
auto pos = f.tellp();
f.seekg(0, asco::io::seekpos::begin);

原生句柄

  • raw_handle():返回底层原生句柄(Linux 为 int),可与系统 API 协作

块式读写器(block_read_writer)

block_read_writer<T> 为“块式 I/O”提供一个轻量的用户态缓冲与读写指针管理器。 它在调用方与底层原始 I/O 对象(如 file)之间维护一段活动缓冲区与独立的读/写位置, 使顺序与随机读写更易用,同时保留“无用户态缓冲的原始 I/O”的可控性: 未 flush() 前的数据仅对当前对象可见,flush() 后才落到下层 I/O。

要点

  • 读写统一接口:顺序/随机读写、独立 seekg/seekptellg/tellp
  • 显式持久化:flush() 前写入仅缓存在内存,flush() 后写入到底层 I/O
  • 读可见性:未 flush() 的写入,对同一 block_read_writer 的后续读取是可见的
  • 析构自动刷写:对象析构时会调用 flush(),尝试将缓冲写入到底层 I/O
  • 协程语义:read() 可被打断;write()flush() 不可打断
  • 模板约束:T 需满足 block_read_write 概念(例如 asco::io::file

快速上手

using asco::io::file;
using asco::io::buffer;
using asco::io::seekpos;
using asco::block_read_writer;

auto r = co_await file::at("/tmp/demo.bin")
        .read()
        .write()
        .create()
        .truncate()
        .mode(0644)
        .open();
if (!r) co_return;
block_read_writer<file> brw(std::move(*r));

// 写入两段数据(此时仅缓存在 brw 中)
co_await brw.write(buffer("Hello"));
co_await brw.write(buffer("World"));
assert(brw.tellp() == 10);

// 读取(未 flush 的写入对同一 brw 可见)
brw.seekg(0, seekpos::begin);
auto b = co_await brw.read(1024);
std::string s = b ? std::move(*b).to_string() : "";

// 显式持久化到底层 I/O (推荐在作用域结束前显式调用)
co_await brw.flush();

API 概览

模板

  • template<block_read_write T> class block_read_writer;
    • 示例:block_read_writer<asco::io::file>

构造/析构

  • block_read_writer(T&& ioo)
  • block_read_writer(const T& ioo)
  • ~block_read_writer():析构时调用 flush() 尝试刷写缓冲

读写与缓冲

  • future<std::optional<buffer<>>> read(size_t nbytes)
    • 成功时返回数据;到达 EOF 返回 std::nullopt
    • 可被打断
  • future<void> write(buffer<> buf)
    • 仅更新内部活动缓冲区与写指针;不可打断
  • future<void> flush()
    • 将内部活动缓冲区自 buffer_start 起写入到底层 I/O;不可打断

定位与查询

  • size_t seekg(ssize_t offset, seekpos whence = seekpos::current)
  • size_t seekp(ssize_t offset, seekpos whence = seekpos::current)
  • size_t tellg() const noexcept
  • size_t tellp() const noexcept

行为与语义

  • 写入可见性
    • flush() 的写入对同一 block_read_writer 的后续 read() 可见
    • 调用 flush() 后方才写入到底层 I/O 对象
  • 读写指针
    • tellg/tellp 分别为读取进度与写入进度
    • seekg/seekp 相互独立,互不影响
  • 打断语义
    • read() 支持在等待过程中被打断;write()flush() 不支持打断
  • 析构行为
    • 析构函数会调用 flush() 刷写缓冲。

示例:覆盖写与稀疏写

// 初始内容
co_await brw.write(buffer("HelloWorld"));
co_await brw.flush();

// 覆盖中间 4 字节为 "ASCO"(总长度不变)
brw.seekp(6, seekpos::begin);
co_await brw.write(buffer("ASCO"));

// 形成 10 字节洞并在其后写 'X'
brw.seekp(20, seekpos::begin);
co_await brw.write(buffer("X"));

// 未 flush 时,通过 brw 读取可见:
brw.seekg(0, seekpos::begin);
std::string all;
while (auto part = co_await brw.read(4096)) all += std::move(*part).to_string();

co_await brw.flush(); // 持久化到底层

依赖与概念

  • 需包含:<asco/io/bufio.h><asco/io/buffer.h>
  • T 必须满足 block_read_write 概念(如 asco::io::file

注意事项

  • 非线程安全
  • flush() 前,其他对象(或进程)无法从底层 I/O 中看到你的写入

进阶

本节更详细和全面的介绍了在前文没有介绍的 ASCO 提供给使用者的所有功能以及很多实现细节。

asco 异步运行时

工作线程

工作线程分为 io workercalculating worker ,运行时初始化时将合适的工作线程绑定到某个 CPU 执行。

asco 内部,“阻塞“任务被定义为一个任务会完全占用当前工作线程而无法被其它工作线程窃取的任务,它会优先分配给 calculating worker 执行。 而不是通常说的为了等待 IO 而暂停运行的含义。 然而,对于仅基于本框架开发应用程序的用户,为了减少在正常使用过程中的失误,我们避免使用 blocking 这个词汇,而是使用 core 表示。

运行时对象配置

asco::runtime 构造函数仅有一个参数,用于设置工作线程数,参数默认值为0,为0时使用 std::thread::hardware_concurrency() 作为工作线程数。

未来会加入更多配置选项。

性能统计

开启 ASCO_PERF_RECORD 选项,以启用性能统计功能。

在需要计时的异步函数开头添加 coro_pref(); 宏,或在普通函数开头添加 func_perf(); 宏以对函数计时,普通函数暂时只能统计总运行时间。

程序运行结束后,会打印性能统计结果,如:

[ASCO] Flag 'ASCO_PERF_RECORD' is enabled, please disable it if you are building your program for releasing.

active  total   counter name
1ms     2ms     205     future_inline<void> asco::sync::semaphore_base<1>::acquire() [CounterMax = 1, R = asco::core::runtime]
0ms     0ms     2       future_inline<guard> asco::sync::mutex<int>::lock() [T = int]