快速入门
Future 类型系统
ASCO 提供了三种核心的 Future 类型,用于不同的协程执行模式:future<T>、future_spawn<T> 和 future_core<T>。本文档将详细介绍这些类型的用法、生命周期与最佳实践。
类型概览
future<T>
最基础的协程返回类型,用于同步执行模式。
template<concepts::move_secure T>
using future = base::future_base<T, false, false>;
- 特点
- 同步执行:协程在被
co_await时继承调用者的工作线程 - 直接控制流:不会自动调度到其他线程
- 适用于需要线程亲和性的场景
- 同步执行:协程在被
future_spawn<T>
用于异步执行的协程返回类型。
template<concepts::move_secure T>
using future_spawn = base::future_base<T, true, false>;
- 特点
- 异步执行:协程会被调度到工作线程池中执行
- 自动负载均衡:runtime 会选择合适的工作线程
- 支持同步等待(
.await())和异步等待(co_await)
future_core<T>
核心任务的协程返回类型,用于运行时关键任务。当前行为与 future_spawn<T> 相同,但在未来的任务窃取(work stealing)机制中将获得特殊处理。
template<concepts::move_secure T>
using future_core = base::future_base<T, true, true>;
- 特点
- 异步执行:与
future_spawn<T>类似,在工作线程池中执行 - 核心标记:为即将引入的任务窃取机制预留
- 规划特性:未来将不可被其他工作线程窃取,保证在固定线程执行
- 异步执行:与
通用功能
所有 Future 类型都支持以下基本特性:
移动语义
- Future 只支持移动,禁止拷贝
- 确保任务的所有权清晰,避免资源泄漏
future<int> f1 = foo(); // OK:移动构造
auto f2 = std::move(f1); // OK:移动赋值
future<int> f3 = f1; // 错误:禁止拷贝
异常传播
- 自动捕获并传播协程中的异常
- 在
co_await或.await()时重新抛出
future<void> may_throw() {
if (error_condition)
throw my_error{};
co_return;
}
try {
co_await may_throw(); // 异常会在这里被重新抛出
} catch (const my_error& e) {
// 处理异常
}
co_invoke
auto task = co_invoke([] -> future_spawn<void> {
// 耗时任务
co_return;
});
co_await task; // lambda 表达式随协程一起销毁,此处安全
co_invoke 会将以右值(包括纯右值和将亡值)传递的可调用对象移动到协程内部,确保其生命周期覆盖整个协程执行期间。
- 常用于延长 lambda 表达式的生命周期,避免 use-after-free
值类型约束
- 要求 T 满足
move_secure概念(可移动) - 支持 void 类型(
future<void>)
执行模型
同步执行(future<T>)
future<int> compute() {
co_return 42;
}
// 在调用者线程中同步执行
future<void> caller() {
int value = co_await compute(); // 不会发生异步调度
}
异步执行(future_spawn<T>)
future_spawn<int> async_compute() {
co_return 42;
}
// 两种等待方式
future_spawn<void> async_caller() {
// 1. 异步等待(推荐)
int value = co_await async_compute();
// 2. 同步等待(仅在非工作线程中使用)
// int value = async_compute().await();
}
进阶功能
任务转换
future<T> 可以转换为 future_spawn<T> 或 future_core<T>:
future<int> normal() {
co_return 42;
}
future_spawn<int> to_spawn(future<int> f) {
return f.spawn(); // 转换为异步任务
}
future_core<int> to_core(future<int> f) {
return f.spawn_core(); // 转换为核心任务
}
transport():在 worker 之间迁移 future<T>
transport() 用于不改变 Future 类型(仍是 future<T>)的前提下,将一个 future<T> 所属的底层任务从“原 worker 的挂起任务集合”迁移到当前 worker,从而允许你在另一个 worker 上安全地 co_await 它。
这在以下场景非常常见:
- 你把一个
future<T>作为值传递/移动到了另一个以future_spawn<T>运行的协程里(例如select内部为每个分支启动的任务)。 - 该
future<T>可能已经在某个 worker 上注册并进入挂起状态;此时直接在另一个 worker 上等待它,会导致任务仍挂在旧 worker 的内部容器里,从而出现“在错误的 worker 上管理挂起/唤醒”的问题。
transport() 的核心效果:
- 若任务已被调度并处于某个 worker 管理之下,会先从原 worker 的挂起任务集合中移出,再迁入到当前 worker。
- 随后返回一个新的
future<T>,其行为等价于等待原 future:返回值与异常传播规则完全一致。
约束与注意事项:
- 仅适用于
future<T>(非 spawn 模式的 future)。 - 需要在运行时 worker 线程上下文中调用(因为它依赖
core::worker::this_worker()取得“当前 worker”)。 - 它不会把任务变成异步调度任务;如果你的目的是让任务进入线程池异步执行,请使用
.spawn()/.spawn_core()。
示例:在 future_spawn 中等待一个来自外部的 future:
future<int> make_sync_work();
future_spawn<int> async_main() {
auto f = make_sync_work();
// 假设这里之后的执行发生在某个 worker 上,且 f 可能来自/挂起于其它 worker
int v = co_await std::move(f).transport();
co_return v;
}
异常处理与忽略
ignore() 用于忽略对应 future 对象所代表的协程的返回值,并吞掉其抛出的异常(可选提供回调用于观测)。
- 返回值:
ignore()返回一个future<void>(spawn 模式),当底层协程完成时该 future 也完成。 - 行为:调用
ignore()会丢弃原协程的返回值;若协程抛出异常,默认不向上抛出;若提供了回调,会在捕获到异常时调用该回调并传入std::exception_ptr。
常见用法:在后台启动任务但不关心返回值与错误,或者只想在异常发生时记录/观测但不传播它们。
// 最常见:fire-and-forget(不等待,不传播异常)
background_task().ignore();
// 提供回调以记录异常(回调接受 std::exception_ptr)
cleanup_task().ignore([](std::exception_ptr e) {
try {
std::rethrow_exception(e);
} catch (const std::exception &ex) {
std::cerr << "Cleanup failed: " << ex.what() << '\n';
}
});
// 如果需要等待完成但仍丢弃返回值与异常,可以 co_await
co_await some_future().ignore();
最佳实践
选择合适的 Future 类型
-
默认使用
future<T>- 适用于大多数异步操作
- 需要低异步调度开销
- 无并发
-
使用
future_spawn<T>当:- 需要并发
- 自动负载均衡
- 良好的并发性能
-
使用
future_core<T>当:- 自动负载均衡
- 需要线程亲和、保持不可窃取
异步编程指南
- 注意 lambda 表达式的生命周期
// 错误示例:lambda 生命周期短于异步任务生命周期
auto task1 = []() -> future<void> {
// 任务逻辑
co_return;
}();
co_await task1; // 任务启动,但是前面的 lambda 表达式已经销毁,产生 use-after-free
auto task2 = []() -> future_spawn<void> {
// 耗时任务逻辑
co_return;
}();
co_await task2; // 任务在刚启动时行为正常,但是很快 lambda 表达式就将被销毁,产生 use-after-free
// 正确示例:使用 co_invoke 延长 lambda 表达式的生命周期
auto task = co_invoke([]() -> future<void> {
// 任务逻辑
co_return;
});
co_await task; // 任务安全执行
- 避免在工作线程中使用
.await()
// 错误:在工作线程中同步等待
void wrong() {
auto value = async_task().await(); // 会 panic
}
// 正确:使用 co_await
future_spawn<void> correct() {
auto value = co_await async_task();
}
- 正确处理异常
future_spawn<void> robust() {
try {
co_await risky_operation();
} catch (const std::exception& e) {
// 处理异常
co_return;
}
// 继续执行
}
- 合理使用
ignore()
// 适用于确实可以忽略返回值和异常的场景
background_task().ignore();
// 需要记录异常时提供回调
cleanup_task().ignore([](auto e) {
log_error("Cleanup failed", e);
});
性能考虑
-
避免不必要的任务转换
spawn()和spawn_core()会创建新的协程对象- 如果最终要异步执行,直接返回对应类型
-
合理使用同步/异步模式
- 短小操作使用
future<T>避免调度开销 - IO 密集型操作使用
future_spawn<T>提高并发度
- 短小操作使用
调试技巧
异常追踪
- ASCO 会自动记录协程创建和异常发生的调用栈
- 异常会保留原始抛出点的上下文
任务状态检查
- 可通过 task_id 在运行时中查找任务
- 支持检查任务是否已完成、是否发生异常
注意事项
- 使用
ignore()时要谨慎,确保返回值和异常可以被安全忽略
协程生成器 generator<T>
generator<T> 是 ASCO 中基于 C++ 协程实现的懒序列。它继承自 future_base,因此可以像 future 一样被调度,但每次 co_yield 会把一个元素推入内部队列,供消费端按需提取。
设计概览
- 逐项产出:生成器协程通过
co_yield把值送入continuous_queue,调用方每次等待一个元素。 - 基于
future:generator<T>依旧是异步任务,operator()返回另一个future<std::optional<T>>,因此可以在任何co_await环境里消费。 - 跨线程安全:内部通过信号量与无锁队列保证并发安全,能够在不同工作线程间生成与消费。
- 结束语义:当协程
co_return或co_yield通道关闭时,operator()会返回std::nullopt,并且operator bool()变为false。
基本流程
#include <asco/generator.h>
#include <asco/future.h>
using asco::future;
using asco::generator;
// 生产 1..n 的序列
generator<int> gen_count(int n) {
for (int i = 1; i <= n; ++i) {
co_yield i;
}
co_return;
}
future<void> consume(generator<int>& g) {
int sum = 0;
while (auto i = co_await g()) { // 每次等待一个元素
sum += *i;
}
// generator 被耗尽或停止时跳出循环
co_return;
}
调用 g() 返回一个可等待的 future,获取到 std::optional<T>。当生成器结束时返回 std::nullopt,表明序列已经耗尽。
组合消费 (operator|)
generator 支持通过 operator| 与异步函数拼接,形成更简洁的管道式写法。consumer 需要是一个返回 future 的函数,并接受生成器作为参数。
future<int> consume_sum(generator<int>& g) {
int sum = 0;
while (auto item = co_await g()) {
sum += *item;
}
co_return sum;
}
future<void> async_main() {
auto g = gen_count(5);
// 传统写法
auto sum1 = co_await consume_sum(g);
// 管道写法,会自动调用 co_invoke(consumer, generator)
auto g2 = gen_count(5);
auto sum2 = co_await (g2 | consume_sum);
co_return;
}
该操作符同样支持右值生成器,内部会移动到消费函数中:
auto total = co_await (gen_count(5) | [](auto gen) -> future<int> {
int sum = 0;
while (auto item = co_await gen()) {
sum += *item;
}
co_return sum;
});
// 消费函数还可以返回新的 generator/generator_core,实现链式加工
auto doubled = co_await (gen_count(3) | [](auto gen) -> generator<int> {
while (auto item = co_await gen()) {
co_yield *item * 2;
}
co_return;
});
异常与停止
- 生成器内部抛出的异常会在下一次
co_await generator()时重新抛出,调用方可以像处理普通异步任务一样捕获。 - 调用生成器的
operator bool()可以检测它是否仍然可继续产出值。 - 一旦协程结束或调用方显式停止(例如销毁生成器),内部队列会关闭,后续获取将立即返回
std::nullopt。
与运行时的关系
generator<T> 的调度由 ASCO 运行时处理。和 future 一样,需要在 async_main 或其他运行时任务中 co_await,以确保协程被调度执行。若要在核心 runtime 上下文内使用,可以改用 generator_core<T>,它会绑定到核心执行器。
同步原语
本章节介绍 ASCO 提供的基础同步原语,包括互斥锁(Mutex)、读写锁(RWLock)、信号量(Semaphore)与通道(Channel)。它们均为多线程/多协程安全的构件,可与 future<T>/co_await 无缝配合。
何时使用
- Mutex:需要在多个协程/线程之间串行化访问临界区或受保护对象时使用,支持
with (auto guard = co_await ...)的 RAII 写法。 - RWLock:读多写独的场景,可允许大量读者并发访问,写者持锁期间阻塞全部读者与其它写者。
- Semaphore:需要对共享资源进行计数式访问控制,或实现简单的事件/通知机制时使用。适合“一端释放(release),另一端等待(acquire)”的场景。
- Channel:需要在任务之间传递数据(消息)时使用。基于无锁 MPMC 队列封装并配合信号量实现“就绪通知”,保证 FIFO 顺序,可多生产者/多消费者。
公共特性
- 线程安全:在多线程/多协程环境下可安全调用。
- 与协程兼容:阻塞等待均以
co_await的方式暴露,不会阻塞线程。 - 异常安全:在常见使用路径中不抛出异常(若违反约束,如在通道关闭后继续发送,将触发 panic)。
目录
- Mutex:纯互斥锁及携带数据的互斥锁,推荐用法与注意事项。
- RWLock:协程友好的读写锁,支持封装对象的读写访问。
- Semaphore:计数信号量、二值信号量与不限量信号量,API 与使用示例。
- Channel:
sender<T>/receiver<T>、send/recv/try_recv/stop语义与注意事项。
典型组合用法
- 使用 Channel 进行任务间消息传递;在消费者端如果需要批处理或节流,可结合计时器
interval或sleep实现节奏控制。 - 使用 Semaphore 表达“完成/可用”事件;多个生产者可安全地
release(n)增加配额,多个消费者以协程方式co_await acquire()。
更多协程与运行时的信息,请先阅读《future<T>下的异步编程》。
Mutex(协程互斥锁)
asco::sync::mutex<> 与 asco::sync::mutex<T> 提供协程友好的互斥保护能力。它们以 future 形式返回守卫对象,利用 RAII 在离开作用域后自动释放锁,避免线程级别阻塞。
- 头文件:
#include <asco/sync/mutex.h> - 命名空间:在全局命名空间下别名为
asco::mutex
类型概览
mutex<>:纯互斥锁,不携带值,仅提供排他访问。template<typename T> mutex<T>:在互斥锁内部封装一个T实例,锁定后可通过守卫直接访问T。
两种互斥锁的 lock() 都返回 future<guard>,需使用 co_await 获取守卫。
接口
mutex<>
future<guard> lock():等待获得互斥锁。协程在竞争失败时会自旋退避,必要时挂起在内部wait_queue上。class guardguard::operator bool() const noexcept:指示守卫是否仍持有锁,被移动后的守卫返回false。- 析构时自动释放锁并唤醒等待者。
mutex<T>
future<guard> lock():同上,但守卫允许直接操作封装的T。class guardT &operator*()/T *operator->():访问被保护的对象。- 移动构造后,原守卫失效,新守卫继续持有锁。
T &&get():将内部对象移动出互斥锁,并在后续lock()时触发 panic,适用于需要一次性夺取所有权的场景。
推荐用法:with 宏
提供 with 宏(在 asco/utils/defines.h 中定义为 if):
#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
using namespace asco;
sync::mutex<> mtx;
future<void> do_work() {
with (auto guard = co_await mtx.lock()) {
// 成功获取锁后执行,作用域结束自动解锁
}
co_return;
}
with 在语义上等同于 if (auto guard = ...; guard) { ... },保证代码结构清晰且不会遗漏解锁。
示例
1. 保护共享计数器
#include <asco/future.h>
#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
#include <atomic>
using namespace asco;
sync::mutex<> counter_mutex;
int counter = 0;
std::atomic<int> active{0};
std::atomic<int> violations{0};
future_spawn<void> worker() {
with (auto guard = co_await counter_mutex.lock()) {
auto prev = active.fetch_add(1, std::memory_order_acq_rel);
if (prev != 0) {
violations.fetch_add(1, std::memory_order_acq_rel);
}
++counter;
active.fetch_sub(1, std::memory_order_acq_rel);
}
co_return;
}
2. 保护对象并原地修改
#include <asco/future.h>
#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
#include <string>
using namespace asco;
sync::mutex<std::string> name{"guest"};
future<void> rename(std::string new_name) {
with (auto guard = co_await name.lock()) {
*guard = std::move(new_name);
}
co_return;
}
future<std::string> snapshot() {
with (auto guard = co_await name.lock()) {
co_return *guard; // 复制守卫中的字符串
}
}
3. 彻底移交对象所有权
#include <asco/future.h>
#include <asco/sync/mutex.h>
using namespace asco;
sync::mutex<std::vector<int>> data;
future<std::vector<int>> take_all() {
with (auto guard = co_await data.lock()) {
co_return data.get(); // 移动出内部向量
}
}
future<void> reuse() {
co_await take_all();
// 再次锁定将触发 panic,提醒调用者对象已被移出
// co_await take_all(); // panic
co_return;
}
注意事项
- 守卫是可移动但不可复制的;请勿在移动后继续使用旧守卫。
- 如果
mutex<T>::get()被调用,内部会标记对象已被移动;继续lock()会触发 panic,用于显式暴露错误路径。 - 互斥锁内部使用自旋退避与
wait_queue协程挂起组合:在低争用场景中开销仅为原子操作,在高争用场景能避免忙等。 mutex<>只保障互斥,不提供递归锁语义;请避免在持锁期间再次调用lock()。
RWLock(协程读写锁)
asco::sync::rwlock<> 与 asco::sync::rwlock<T> 提供读多写独的协程同步原语。读操作可以并行获取共享锁,写操作以独占方式访问,被等待的协程会自动通过内部 wait_queue 挂起,不会阻塞线程。
- 头文件:
#include <asco/sync/rwlock.h> - 命名空间:在全局命名空间下别名为
asco::rwlock
类型概览
rwlock<>:纯读写锁,不附带存储,仅管理并发访问。template<typename T> rwlock<T>:内部封装一个T实例,读锁以常量视图访问,写锁以可变引用操作对象。
两种读写锁都分别提供 read() 与 write(),返回 future<read_guard> 与 future<write_guard>,需配合 co_await 使用。
接口
rwlock<>
future<read_guard> read():获取共享读锁。写者占用或排队时会自旋退避,随后挂起到读者等待队列rq。future<write_guard> write():获取独占写锁。尝试设置写意图位,等待所有读者释放并挂起到写者等待队列wq。class read_guardoperator bool() const noexcept:指示守卫是否仍有效;移动后原守卫失效。- 析构时减少读者计数,当最后一个读者离开时唤醒写者。
class write_guardoperator bool() const noexcept:移动后原守卫失效。- 析构时清除写标志,并唤醒排队写者与阻塞读者。
rwlock<T>
future<read_guard> read():取得常量读守卫,可直接访问const T。future<write_guard> write():取得可写守卫,支持修改内部对象。T &&get():在确认没有其他持有者时将内部值移动出来,后续任何read()/write()都会触发 panic,适合一次性转移所有权的场景。class read_guardconst T &operator*() const/const T *operator->() const:常量访问封装对象。
class write_guardT &operator*()/T *operator->():可变访问封装对象。
行为特性
- 读写公平:写协程在进入等待队列时会设置写意图,后续读者会挂起到
rq等待写者释放,避免写者无限饥饿。 - 协程友好:所有等待通过
co_await表达,不会阻塞线程。内部结合指数退避与wait_queue,在低争用时保持轻量,在高争用时自动挂起。 - 守卫语义:守卫可移动但不可复制;移动后原对象变为无效状态(
operator bool()返回false)。
示例
1. 在多个协程间共享配置快照
#include <asco/future.h>
#include <asco/sync/rwlock.h>
#include <asco/utils/defines.h>
#include <string>
using namespace asco;
rwlock<std::string> config{"v1"};
future<void> update_config(std::string next) {
with (auto guard = co_await config.write()) {
*guard = std::move(next);
}
co_return;
}
future<std::string> read_config() {
with (auto guard = co_await config.read()) {
co_return *guard; // 复制快照
}
}
2. 读者并发、写者独占
#include <asco/future.h>
#include <asco/invoke.h>
#include <asco/sync/rwlock.h>
#include <asco/utils/defines.h>
#include <vector>
using namespace asco;
rwlock<> resource_lock;
future_spawn<void> reader_task() {
with (auto guard = co_await resource_lock.read()) {
// 多个读者可以同时进入
}
co_return;
}
future_spawn<void> writer_task() {
with (auto guard = co_await resource_lock.write()) {
// 唯一写者,读者与其他写者都会等待
}
co_return;
}
注意事项
- 写守卫析构后会先唤醒其它写者,再唤醒读者;若需要严格保证写者优先,可在业务层面自行排队调度。
rwlock<T>::get()会永久标记内部对象已被移走,后续任意访问都会触发 panic,用于显式暴露误用。rwlock<>不提供递归锁语义;请避免在持有写锁期间再次请求读锁或写锁。- 推荐结合
with宏使用,避免手动检查守卫有效性并确保作用域结束即释放锁。
Semaphore(信号量)
asco::binary_semaphore、asco::counting_semaphore<N> 与 asco::unlimited_semaphore 提供了计数型同步原语,可用于限流、互斥(简化版)与事件通知。
- 头文件:
#include <asco/sync/semaphore.h> - 命名空间:
asco(类型别名定义在全局asco命名空间下)
类型与别名
binary_semaphore:上限为 1 的二值信号量。template<size_t N> counting_semaphore:上限为N的计数信号量。unlimited_semaphore:上限为size_t的最大值,可视为“无上限”。
底层实现均基于 semaphore_base<CountMax>。
接口
bool try_acquire() noexcept- 若计数大于 0,则原子地减 1 并返回
true;否则返回false。
- 若计数大于 0,则原子地减 1 并返回
future<void> acquire()- 若计数为 0,则当前协程挂起,直到被
release()唤醒;恢复后原子地减 1。
- 若计数为 0,则当前协程挂起,直到被
future<bool> acquire_for(const duration_type auto &timeout)- 尝试在指定超时时间内获取许可,超时则返回
false。
- 尝试在指定超时时间内获取许可,超时则返回
future<bool> acquire_until(const time_point_type auto &expire_time)- 尝试在指定时间点前获取许可,超时则返回
false。
- 尝试在指定时间点前获取许可,超时则返回
void release(size_t update = 1)- 将计数增加
min(update, CountMax - old_count),并唤醒相应数量的等待者。
- 将计数增加
特性:
acquire()以协程方式挂起,不阻塞线程。release()支持一次性增加多个许可;对上限做饱和值裁剪。- 公平性:不保证严格公平,但能够唤醒同等数量等待者。
使用示例
1. 事件通知(先通知,后等待)
#include <asco/future.h>
#include <asco/sync/semaphore.h>
using namespace asco;
future<int> async_main() {
binary_semaphore sem{0};
// 先通知
sem.release();
// 后等待:不会永久挂起
co_await sem.acquire();
co_return 0;
}
2. 等待后再通知(跨任务)
#include <asco/future.h>
#include <asco/sync/semaphore.h>
#include <atomic>
using namespace asco;
future_spawn<void> worker(binary_semaphore &sem, std::atomic<bool> &done) {
co_await sem.acquire();
done.store(true, std::memory_order::release);
co_return;
}
future<int> async_main() {
binary_semaphore sem{0};
std::atomic<bool> done{false};
auto w = worker(sem, done);
// 使等待中的任务恢复
sem.release();
co_await w;
co_return done.load(std::memory_order::acquire) ? 0 : 1;
}
3. 高并发场景(多次 release / acquire)
#include <asco/future.h>
#include <asco/sync/semaphore.h>
#include <atomic>
using namespace asco;
future_spawn<void> consumer(counting_semaphore<1000> &sem, std::atomic<size_t> &cnt) {
for (size_t i = 0; i < 1000; ++i) {
co_await sem.acquire();
cnt.fetch_add(1, std::memory_order::relaxed);
}
co_return;
}
future<int> async_main() {
counting_semaphore<1000> sem{0};
std::atomic<size_t> cnt{0};
auto c = consumer(sem, cnt);
for (size_t i = 0; i < 1000; ++i) sem.release();
co_await c;
co_return cnt.load(std::memory_order::relaxed) == 1000 ? 0 : 1;
}
注意事项
release()的update值会根据上限裁剪,避免溢出。
Notify(轻量通知器)
asco::notify 提供一个极轻量的事件通知原语,支持多个协程等待并由通知唤醒。它基于内部的 wait_queue,适合在不需要复杂状态同步、只需“唤醒即可”的场景中使用。
- 头文件:
#include <asco/sync/notify.h> - 命名空间:
asco
接口概览
yield<> wait()- 以协程方式挂起当前任务,等待
notify_one()或notify_all()唤醒。
- 以协程方式挂起当前任务,等待
void notify_one()- 唤醒至多一个等待中的协程。若当前没有等待者,通知会被丢弃(不计数、不排队)。
void notify_all()- 唤醒所有等待中的协程,同样不会记录通知历史。
使用示例
1. 简单的等待-通知
#include <asco/future.h>
#include <asco/sync/notify.h>
using namespace asco;
future_spawn<void> worker(notify &n, std::atomic<bool> &flag) {
co_await n.wait();
flag.store(true, std::memory_order_release);
co_return;
}
future<int> async_main() {
notify n;
std::atomic<bool> flag{false};
auto w = worker(n, flag);
// 执行若干工作…
n.notify_one();
co_await w;
co_return flag.load(std::memory_order_acquire) ? 0 : 1;
}
2. 广播唤醒多个等待者
#include <asco/future.h>
#include <asco/sync/notify.h>
#include <vector>
using namespace asco;
future_spawn<void> waiter(notify &n, std::atomic<int> &counter) {
co_await n.wait();
counter.fetch_add(1, std::memory_order_acq_rel);
co_return;
}
future<int> async_main() {
notify n;
std::atomic<int> counter{0};
std::vector<future_spawn<void>> waiters;
for (int i = 0; i < 3; ++i) {
waiters.push_back(waiter(n, counter));
}
n.notify_all();
for (auto &w : waiters) co_await w;
co_return counter.load(std::memory_order_acquire) == 3 ? 0 : 1;
}
注意事项
notify不会记录历史通知;在没有等待者时调用notify_one()/notify_all()会被直接丢弃。- 若需要具备通知计数或超时语义,可考虑使用
semaphore或其他同步原语。
Channel(通道)
ASCO 的 Channel 为多生产者/多消费者(MPMC)数据通道:
-
结合
unlimited_semaphore完成“有数据即通知”的等待/唤醒。 -
头文件:
#include <asco/sync/channel.h> -
命名空间:
- 类型:
asco::sender<T>,asco::receiver<T> - 工厂:
asco::channel<T>()
- 类型:
构造与类型
auto [tx, rx] = channel<T>();- 返回
sender<T>与receiver<T>的二元组,二者共享同一个内部信号量。
- 返回
- 也可传入自定义队列工厂:
channel<T>(Creator),其中Creator需满足queue::creator概念。
接口语义
sender
future<std::optional<std::tuple<T, queue::push_fail>>> send(T value)- 调用方需
co_await该future,以便在底层使用有界队列满载时自动等待配额。 - 成功:返回
std::nullopt,并唤醒一个等待的接收者。 - 失败:返回包含原值与失败原因的
std::tuple<T, queue::push_fail>:push_fail::closed:队列已关闭或停止。push_fail::full:自定义队列已满且不可再写入。
- 调用方需
void stop() noexcept- 标记底层队列为“发送端停止”。调用后不得再调用
send();否则底层会触发 panic(调试保护)。
- 标记底层队列为“发送端停止”。调用后不得再调用
bool is_stopped() const noexcept- 若发送端或接收端已停止,或未绑定到队列,返回
true。
- 若发送端或接收端已停止,或未绑定到队列,返回
receiver
std::expected<T, pop_fail> try_recv()- 若无可用数据:返回
std::unexpected(pop_fail::non_object)。 - 若通道已完全关闭且无可读对象:可能返回
std::unexpected(pop_fail::closed)。 - 若成功:返回
T。
- 若无可用数据:返回
future<std::optional<T>> recv()- 若当前无数据:协程方式挂起直至有“数据就绪”的通知;
- 唤醒后尝试读取,若成功返回
T,若关闭导致无对象可读返回std::nullopt。当底层队列为有界队列时,成功读取还会释放一个配额令发送端继续写入。
bool is_stopped() const noexcept- 当发送端或接收端停止,且当前帧已读尽时,返回
true。
- 当发送端或接收端停止,且当前帧已读尽时,返回
有序性与并发性
- Channel 保持 FIFO 顺序。
- 支持 MPMC:多个 sender/receiver 并发安全。
典型用法
1. 单生产者-单消费者
#include <asco/future.h>
#include <asco/sync/channel.h>
#include <print>
using namespace asco;
future<int> async_main() {
auto [tx, rx] = channel<int>();
// 发送
for (int i = 0; i < 5; ++i) {
if (auto r = co_await tx.send(i); r.has_value()) {
auto &[value, reason] = *r;
std::println("send failed: {} (reason = {})", value, static_cast<int>(reason));
co_return 1;
}
}
// 接收(等待式)
for (int i = 0; i < 5; ++i) {
auto v = co_await rx.recv();
if (!v || *v != i) {
std::println("recv mismatch: {}", v ? *v : -1);
co_return 1;
}
}
co_return 0;
}
2. 停止与排干
#include <asco/future.h>
#include <asco/sync/channel.h>
using namespace asco;
future<int> async_main() {
auto [tx, rx] = channel<int>();
(void)co_await tx.send(7);
(void)co_await tx.send(8);
// 停止发送端:之后不得再调用 send()
tx.stop();
// 排干剩余元素
auto a = co_await rx.recv();
auto b = co_await rx.recv();
if (!a || !b || *a != 7 || *b != 8) co_return 1;
if (!tx.is_stopped() || !rx.is_stopped()) co_return 1;
// 现在应无更多数据
auto t = rx.try_recv();
if (t.has_value()) co_return 1;
co_return 0;
}
3. 非阻塞读取与等待混合
#include <asco/future.h>
#include <asco/sync/channel.h>
#include <asco/time/interval.h>
#include <print>
using namespace asco;
using namespace std::chrono_literals;
future<int> async_main() {
auto [tx, rx] = channel<int>();
// 生产者:异步发送
auto producer = [] (sender<int> tx) -> future_spawn<void> {
for (int i = 0; i < 3; ++i) (void)co_await tx.send(i);
tx.stop();
co_return;
}(tx);
interval tick{100ms};
while (!rx.is_stopped()) {
// 先尝试非阻塞
if (auto r = rx.try_recv(); r.has_value()) {
std::println("got {}", *r);
continue;
}
// 无数据则小憩一会儿(避免空转)
co_await tick.tick();
}
co_await producer;
co_return 0;
}
注意事项
- 始终
co_await sender::send()以正确处理带界队列的背压;忽略返回future可能导致任务提前退出或绕过容量控制。 sender::send()返回值含queue::push_fail,常见原因是通道已关闭(push_fail::closed);若自定义队列有容量限制,可额外关注push_fail::full。- 调用
sender::stop()后不可再调用send();这是通道关闭的显式信号,违背会触发 panic(调试保护)。 recv()被唤醒后理论上应能读到元素;若底层已关闭并无对象可读将得到std::nullopt。- 使用
try_recv()判空时,请正确处理pop_fail::non_object与pop_fail::closed。
进阶(Advance)导读
本章面向已经熟悉 ASCO 基本概念(例如 future、协程、以及同步原语)的读者,介绍更贴近运行时实现与系统集成的高级主题。目标是帮助你:
- 理解框架内部的守护线程与运行时组件如何协作;
- 学会扩展和定制守护/后台服务;
- 了解不可恢复错误(panic)与断言(assert)的处理与集成方案;
- 获取与生产环境相关的实践建议(生命周期、停止策略、日志与崩溃处理)。
当前本章包含的主题(将持续扩展):
- Panic — ASCO 的 panic 机制,支持协程栈回溯与自定义回调,适用于不可恢复的致命错误处理。
- 带有堆栈回溯的断言 (asco_assert) — 基于 panic 框架实现的断言宏,支持表达式定位与失败处理集成。
- 守护线程基类
daemon— 一个用于实现后台服务/周期性任务的轻量基类,封装了线程生命周期、初始化同步与唤醒等待逻辑。
交互与实践建议:
- 若你希望把守护线程与任务队列、channel 或 worker pool 集成,本章后续会给出示例;欢迎在仓库 Issue 中提出常见场景。
- 本章不直接覆盖
future或协程语义的基础内容;若需要可在“快速入门”或“future”章节补充链接。
Panic 模块(asco::panic/panic.h)
本文档介绍 ASCO 框架中的 panic 机制及其头文件 panic/panic.h 的接口与用法。
概述
Panic 用于处理不可恢复的致命错误,如断言失败、严重逻辑漏洞等。调用 panic 后,程序会立即终止,并可输出详细的错误信息与栈回溯,便于定位问题。
主要接口
1. panic(std::string msg) noexcept
- 触发 panic,输出指定消息,终止程序。
[[noreturn]]:不会返回。- 推荐用于普通同步场景。
2. co_panic(std::string msg) noexcept
- 协程环境下触发 panic,输出消息并终止。
- 用法与
panic类似。
3. 格式化接口
- 支持 C++20 std::format 风格:
panic("错误码: {},原因: {}", code, reason);
co_panic("协程异常: {}", info);
- 自动格式化参数并输出。
4. register_callback(std::function<void(cpptrace::stacktrace &, std::string_view)> cb)
- 注册 panic 回调,可自定义 panic 行为(如日志落地、核心转储等)。
- 回调参数:
cpptrace::stacktrace &:当前栈(包括异步调用链,当使用 co_panic 时)回溯信息。std::string_view:panic 消息。
- 适合集成自定义监控、调试工具。
使用示例
#include <asco/panic/panic.h>
void fatal_error() {
asco::panic::panic("致命错误,无法恢复");
}
void format_example(int code) {
asco::panic::panic("错误码: {}", code);
}
// 注册自定义回调
asco::panic::register_callback([](cpptrace::stacktrace &st, std::string_view msg) {
// 可在此保存日志、输出栈(异步调用链)信息等
});
设计要点
- panic 只用于绝不应继续执行的场景。
- 格式化接口便于输出结构化错误信息。
- 回调机制支持扩展和集成第三方工具。
- 协程专用接口保证异步场景下的正确终止。
与断言的关系
asco_assert断言失败时会调用 panic,统一致命错误处理路径。- panic 可作为所有“不可恢复”错误的终极出口。
带有 stacktrace 的动态断言 (asco_assert)
本文档介绍 ASCO 的断言宏 asco_assert 以及其底层实现、使用方式与最佳实践。
概述
asco_assert(expr, [hint]) 用于在运行期验证永远应该为真的条件。若条件失败,它会调用内部的 asco::assert_failed,并最终触发 panic::panic,以不可恢复的方式终止程序([[noreturn]])。
与普通错误处理不同,断言面向开发阶段的逻辑不变量:
- 发现程序员假设被破坏的最早时机。
- 在失败点提供清晰的表达式字符串和可选提示信息。
- 将控制权交给 panic 框架,统一异常终止路径(着色输出、栈展开/回溯等)。
接口说明
宏:asco_assert
#define asco_assert(expr, ...) \
do { \
if (!(expr)) { \
asco::assert_failed(#expr, ##__VA_ARGS__); \
} \
} while (0)
要点:
expr只会被求值一次(包在if (!(expr))中),避免副作用重复执行。- 失败时字符串化表达式:
#expr,便于定位逻辑。 - 可选参数(提示
hint)通过 GNU 扩展##__VA_ARGS__消除空参数的逗号。 - 展开后调用对应该签名的
asco::assert_failed。
函数:asco::assert_failed
[[noreturn]] void assert_failed(std::string_view expr);
[[noreturn]] void assert_failed(std::string_view expr, std::string_view hint);
实现(assert.cpp)内部直接委托给:
panic::co_panic("Assertion failed on {}", expr);
panic::co_panic("Assertion failed on {}: {}", expr, hint);
因此:
- 所有断言失败统一走 panic 终止路径;
- 输出格式固定,便于日志检索;
- 由于采用
std::string_view,传入的hint应为字符串字面量或生命周期足够长的缓冲(避免悬垂)。
使用示例
基本用法
int idx = compute_index();
asco_assert(idx >= 0, "索引必须非负");
失败输出示例:
Assertion failed on idx >= 0: 索引必须非负
无提示信息
asco_assert(ptr != nullptr);
输出:
Assertion failed on ptr != nullptr
结合内部不变量
struct range { int l; int r; };
void normalize(range& rg) {
asco_assert(rg.l <= rg.r, "range 左值不能大于右值");
// ... normalize logic
}
避免副作用重复
虽然表达式只执行一次,仍建议保持无副作用:
// 不推荐:含副作用的断言表达式会降低可读性
asco_assert(vec.pop_back() > 0, "不应为空");
与 Panic 框架的集成
断言失败直接调用 panic::co_panic:
- 继承 panic 框架的彩色/结构化输出(若已实现)。
- 可以统一在 co_panic 处理路径中做栈回溯、日志落地、核心转储等。
- 所有断言失败为“致命”级别,不返回调用者。
这使断言只负责“检测 + 定位”,而终止策略(如是否生成 dump)完全由 panic 系统集中配置,降低分散的错误处理复杂度。
性能与开销
当前实现无条件启用:
- 每个断言在成功时的开销 ≈ 一次条件分支 + 未触发时的宏展开(极低)。
- 失败路径较重(格式化 + panic 逻辑)。
在性能关键路径依旧可以使用断言,但需确保:
- 断言表达式本身是 O(1) 且无昂贵副作用;
- 不使用复杂的临时格式化(
hint仅接受一个std::string_view,避免构造代价)。
最佳实践
-
只断言“绝不该失败”的内部不变量;可预期失败的输入应做显式校验与返回错误。
-
保持表达式短小、可读:逻辑复杂时先拆成局部变量再断言。
-
hint写明为何“不变量应成立”,而非简单重复表达式。例:"range 必须标准化后 l <= r"。 -
不在断言中做资源释放等副作用动作,断言失败后不会继续执行。
-
若需要更丰富提示(多变量格式化),建议在调用前自行构造稳定的字符串缓冲再传入(目前接口只接受一个
std::string_view)。
与标准库 / 其他框架对比
| 项目 | 行为 | 自定义提示 | 终止统一性 |
|---|---|---|---|
assert(expr) (C) | 条件失败 abort() | 需修改源码 | 分散 |
std::assert (宏) | 同上 | 需修改源码 | 分散 |
asco_assert | 进入 panic 流程(可扩展统一终止) | 有 | 统一 |
常见误用与规避
| 误用 | 风险说明 | 推荐做法 |
|---|---|---|
| 在表达式中包含副作用 | 语义不清 | 将副作用前置,断言纯条件 |
| 用断言替代输入合法性校验 | 用户可触发崩溃 | 使用返回值/错误码/异常 |
| 传入临时构造的短生命周期缓冲 | 悬垂引用 | 使用字符串字面量或静态缓存 |
asco::core::daemon(守护线程基类)
本文档说明如何继承和使用 asco::core::daemon 类型来实现后台守护线程。
概述
asco::core::daemon 是一个轻量的守护线程基类,封装了线程生命周期、初始化同步、以及唤醒/睡眠的常用逻辑。典型用法是继承该类并覆盖 init()、run_once(std::stop_token &) 和 shutdown() 三个虚方法来实现特定行为。
核心特性:
- 使用
std::jthread管理后台线程,支持基于std::stop_token的干净停止请求。 - 提供
awake()/sleep_until_awake*()系列方法用于线程间唤醒与有时限的等待。 - 构造/启动/初始化通过
start()+ 返回的 RAIIinit_waiter实现线程初始化同步,确保启动方能等待后台线程完成init()。 - 析构函数会请求停止并等待后台线程安全退出。
公有 API(概要)
daemon(std::string name):构造器,name在 Linux 上用于线程命名(可读性和调试)。~daemon():析构函数,会请求线程停止、唤醒线程(以解除阻塞),并join()线程。void awake():释放内部信号量,唤醒调用sleep_until_awake()或其变体而阻塞的守护线程。
受保护的辅助接口(给子类使用):
init_waiter start():启动后台线程并返回一个init_waiter。init_waiter的析构函数会等待后台线程释放init_sem,因此可通过 RAII 在启动代码中等待初始化完成。sleep_until_awake()/sleep_until_awake_for(...)/sleep_until_awake_before(...):基于内部std::binary_semaphore的阻塞/有时限等待,用于在run_once中等待事件或超时。
虚方法(子类通常需要覆盖)
virtual bool init():线程刚启动时调用一次。默认返回true。返回false表示初始化失败,线程将调用shutdown()并退出。virtual bool run_once(std::stop_token &st):守护主循环中每次执行的工作。返回true表示继续循环,返回false或在stop_requested()为真时退出循环。默认实现会sleep_until_awake()并返回true。virtual void shutdown():线程退出前的清理逻辑。
生命周期与典型启动模式
- 在子类构造函数或初始化函数中调用
start()(受保护,因此通常在子类内部调用),它会创建后台jthread并立即返回一个init_waiter。 - 当
init_waiter离开作用域时,其析构函数会阻塞直到后台线程完成init()并释放内部信号量,这样启动方就可以等待初始化完成。 - 后台线程在循环中调用
run_once(st),直到stop_requested()被置位或run_once返回false。 - 析构函数会发起
request_stop(),并调用awake()以解除阻塞,然后join()线程,保证线程安全退出。
示例(最小子类实现):
class my_daemon : public asco::core::daemon {
public:
my_daemon() : daemon("my_daemon") {
auto waiter = start(); // 启动线程并在 waiter 析构时等待 init 完成
}
protected:
bool init() override {
// 初始化资源
return true; // 返回 false 可终止线程
}
bool run_once(std::stop_token &st) override {
// 等待唤醒或定时任务
sleep_until_awake_for(std::chrono::milliseconds(500));
if (st.stop_requested()) return false;
// 执行一次工作
do_work();
return true; // 继续循环
}
void shutdown() override {
// 清理资源
}
};
在外部触发工作或唤醒守护线程:
my_daemon d;
// 触发守护线程做一次立即处理
d.awake();
设计细节与注意事项
start()是protected的:意在由子类控制何时启动线程(例如在子类构造流程中)。init_waiter的析构会acquire内部信号量,确保调用方等待完成;不要将其返回到会较晚析构的上下文,否则可能阻塞过久。run_once接受std::stop_token &:在实现中应检查st.stop_requested(),并在请求停止时尽快返回false。- 析构过程中
daemon会调用awake()以确保如果线程正阻塞在sleep_until_awake()上,则能被唤醒并退出。 - Linux 平台会把后台线程命名为构造时提供的
name(通过pthread_setname_np),便于调试与崩溃分析。
推荐实践
- 将耗时或阻塞的 I/O 放在
run_once中,并确保响应stop_requested()。 - 避免在
init()中执行可能长时间阻塞的操作(或在init()内使用超时机制),因为调用start()的上下文会等待init_waiter完成。 awake()语义是“通知有新工作”,而不是强制中断正在执行的工作;若需要立即中断复杂任务,结合stop_token使用。- 若需要周期性工作,结合
sleep_until_awake_for()或sleep_until_awake_before()实现合理的等待策略。
常见问题
-
Q: 我可以在
run_once中抛异常吗?- A: 当前基类未显式捕获异常,抛出异常会导致线程异常终止。建议在
run_once中自行捕获并在shutdown()中做清理,或通过panic报告致命错误。
- A: 当前基类未显式捕获异常,抛出异常会导致线程异常终止。建议在
-
Q:
start()返回的init_waiter需要手动保存吗?- A: 不需要长时间保存。通常以局部变量持有,确保在期望等待初始化完成的作用域结束时析构即可。
Context(协程上下文)
asco::context(底层定义在 asco::contexts::context)提供了一个轻量的协程取消原语,可用于在多个协程之间传播“停止/超时”信号。它与 notify 结合使用,支持显式取消与超时自动取消。
- 头文件:
#include <asco/context.h> - 命名空间:
asco(通过别名导出)
创建方式
static std::shared_ptr<context> with_cancel()- 创建一个手动可取消的上下文,初始状态为 未取消。
static std::shared_ptr<context> with_timeout(const duration_type auto &dur)- 创建一个上下文并在
dur后自动取消。内部会启动一个协程调用cancel()。
- 创建一个上下文并在
两个工厂函数都返回 std::shared_ptr<context>。使用共享指针便于在多个协程中传播同一取消源。
取消与状态查询
future<void> cancel()- 将上下文标记为已取消,并唤醒所有等待该上下文的协程,然后执行取消回调。该协程可以被并发调用;每次调用都会执行已注册的回调。
future<void> set_cancel_callback(std::function<void()>)- 注册一个在取消发生时调用的回调。若上下文已取消,请先重新检查状态;回调仅在随后执行的
cancel()协程中触发。 - 回调实现必须是可重入的,并且要做好幂等处理,以便在并发取消时安全地再次触发取消或操作上下文。
- 注册一个在取消发生时调用的回调。若上下文已取消,请先重新检查状态;回调仅在随后执行的
bool is_cancelled() const noexcept- 查询当前是否处于已取消状态。
取消操作是幂等的;重复调用 cancel() 不会带来额外副作用。
等待取消
context 定义了成员 operator co_await(),同时也为 std::shared_ptr<context> 提供了自由函数 operator co_await()。因此既可以在上下文对象本身上 co_await ctx_ref;,也可以直接 co_await ctx_ptr; 来等待取消事件:
- 若上下文尚未取消,当前协程将挂起,直到
cancel()或超时触发。 - 若上下文已取消,则立即恢复,且
co_await不返回任何额外数据:它仅表示“取消已经发生”。
示例:
#include <asco/context.h>
#include <asco/future.h>
#include <asco/time/sleep.h>
using namespace asco;
future_spawn<void> worker(context &ctx_ref, std::atomic<bool> &flag) {
co_await ctx_ref; // 等待取消信号
flag.store(true, std::memory_order_release);
co_return;
}
future<int> async_main() {
auto ctx = context::with_cancel();
std::atomic<bool> flag{false};
auto w = worker(*ctx, flag); // 也可以 co_await ctx(shared_ptr)
// 进行其他操作…
co_await sleep_for(10ms);
co_await ctx->cancel(); // 通知所有等待方并等待回调执行
co_await w;
return flag.load(std::memory_order_acquire) ? 0 : 1;
}
与超时结合
with_timeout() 会在后台调用 sleep_for() 后自动取消:
future<int> async_main() {
auto ctx = context::with_timeout(50ms);
co_await ctx; // 最多等待 50ms
// 此时 ctx 已经处于取消状态
if (!ctx->is_cancelled()) {
co_return 1;
}
co_return 0;
}
注意事项
context仅负责取消信号的传播,不携带附加信息。若需要携带错误码或取消原因,请在业务代码中自行维护。- 上下文内部使用
notify唤醒等待者;在没有协程等待时调用cancel()也会正确记录状态,随后等待者会立即返回。 - 取消回调必须是可重入的,且需要自行保证并发调用时的幂等性。
Join Set 聚合器
asco:::join_set<T = void> 提供一个轻量的“任务收集器”,用于在多个异步计算完成后,以生成器形式逐个取回结果。它内部使用 channel<T> 管道按完成顺序输送值,适合将若干协程并发运行、再统一遍历其结果。
- 头文件:
#include <asco/join_set.h> - 依赖组件:
future、generator、channel
基础用法
典型流程:
- 创建
join_set<T>实例。 - 调用
spawn(fn)注册若干异步任务。fn必须结果类型统一为T。 - 调用
join()得到generator<T>,随后通过co_await按完成顺序消费结果。
示例:
#include <asco/future.h>
#include <asco/join_set.h>
#include <asco/time/sleep.h>
using namespace asco;
future<int> async_main() {
base::join_set<int> set;
for (int i = 0; i < 3; ++i) {
set.spawn([i]() -> future<int> {
co_await sleep_for(std::chrono::milliseconds{10 * (i + 1)});
co_return i;
})
.ignore();
}
auto results = set.join();
while (auto value = co_await results()) {
std::println("result = {}", *value);
}
co_return 0;
}
接口概览
explicit join_set(Creator ctor):允许自定义底层channel的创建器;默认使用continuous_queue。future_spawn<void> spawn(Fn &&fn):启动一个异步任务,将其结果推入内部channel。Fn必须返回future<T>或future_spawn<T>,否则编译失败。generator<T> join():停止接受新任务(内部关闭sender),并返回一个生成器。生成器被迭代完毕后会自动退出。
行为与保证
- 结果顺序等于任务实际完成顺序;如果多个任务同时完成,顺序取决于
channel的排队次序。 - 任务抛出异常时会沿
future传播;可以在调用处使用.ignore(on_exception)或额外捕获。
使用建议
- 若任务很多,建议结合
std::for_each/ranges批量调用spawn,并尽早.ignore()避免遗漏。 - 如果需要自定义任务队列特性(例如环形缓冲区),可以提供自定义
Creator参数构造join_set。
Select 选择器
select 用于“竞速”多个异步分支:同时启动(或等待)若干个可取消分支,返回第一个完成的分支结果,并触发其余分支尽快结束。
典型用途:
- 同时等待多个异步操作,谁先完成就取谁;
- 把“超时 / 取消信号 / 数据到达”等事件与主任务一起竞速;
- 用一个统一的
std::variant承载“本次到底是哪条分支赢了”。
基本用法
select 必须在 worker 线程(也就是协程运行时)里构造。
#include <asco/future.h>
#include <asco/select.h>
using namespace asco;
future<int> async_main() {
auto sele = asco::select{}
.along_with([](std::shared_ptr<context>) -> future<int> { co_return 42; })
.along_with([](std::shared_ptr<context>) -> future_spawn<float> { co_return 3.14f; });
auto v = co_await sele;
std::visit(
[](auto&& br) {
using B = std::decay_t<decltype(br)>;
if constexpr (B::branch_index == 0) {
int x = *br;
(void)x;
} else if constexpr (B::branch_index == 1) {
float y = *br;
(void)y;
}
},
v);
co_return 0;
}
分支类型(along_with 重载)
select 支持两类分支:
1) cancellable_function 分支(会注入 select 内部 ctx)
形如:
Fn(std::shared_ptr<context> ctx, Args...)
ctx由select内部创建并注入;当某个分支胜出后,select会取消这个内部ctx,用于通知其它分支尽快退出。Fn可以返回future<T>或future_spawn<T>(以及其它满足框架约束的 future 类型)。
2) cancellable_waitable 分支(等待一个“外部对象”完成)
形如:
sele.along_with(waitable);
其中 waitable 满足:
waitable->operator co_await()返回yield<notify*>(即它本质是个“可被 notify 唤醒的等待点”);waitable->get_notify()返回notify&;deliver_type必须是void。
最常见的例子是 std::shared_ptr<context>:co_await ctx; 会等待它被取消或被内部 notify 对象唤醒。
重要:把
std::shared_ptr<context>作为 waitable 传给.along_with(ctx)时,它是“一个等待分支”。
branch_index 规则(索引语义)
每次调用一次 .along_with(...),都会在 select 中追加一个分支,并赋予该分支一个稳定的索引:
- 第一个追加的分支索引为
0 - 第二个为
1 - 以此类推
在返回值的 std::variant 中,可以通过 T::branch_index 判断是哪条分支胜出。
返回值类型
co_await select 的返回值是一个 std::variant:每个备选项都是 branch<I, T>。
I是分支索引(见上节)。T是该分支的返回类型;若分支返回void,则会被替换为std::monostate。branch提供operator*/operator->,用于访问返回值。
直观理解:如果你按顺序追加了 N 个分支,它的返回类型等价于:
future<std::variant<
branch<0, monostate_if_void<T0>>,
branch<1, monostate_if_void<T1>>,
...,
branch<N-1, monostate_if_void<TN_1>>
>>
其中 Tk 是第 k 个分支的 deliver type。
取消与唤醒语义
当某个分支胜出后,select 会做两件事来让其它分支尽快结束:
-
取消“内部 ctx”
- 这只影响 cancellable_function 分支中收到的那个
ctx; - 你的分支可以
co_await ctx;或检查ctx->is_cancelled()来退出。
- 这只影响 cancellable_function 分支中收到的那个
-
对所有 waitable 分支的
notify调用notify_all()- 这会把 waitable 从等待中唤醒,让它们有机会结束。
注意:对外部 waitable(例如你传入的
std::shared_ptr<context>)来说,notify_all()只是“唤醒等待”,并不会调用cancel()改变其 cancelled 状态。
异常语义
- 如果胜出的分支抛出异常,
select会在取消/唤醒其它分支后,把异常重新抛出给co_await select的调用方。