Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

快速入门

Future 类型系统

ASCO 提供了三种核心的 Future 类型,用于不同的协程执行模式:future<T>future_spawn<T>future_core<T>。本文档将详细介绍这些类型的用法、生命周期与最佳实践。

类型概览

future<T>

最基础的协程返回类型,用于同步执行模式。

template<concepts::move_secure T>
using future = base::future_base<T, false, false>;
  • 特点
    • 同步执行:协程在被 co_await 时继承调用者的工作线程
    • 直接控制流:不会自动调度到其他线程
    • 适用于需要线程亲和性的场景

future_spawn<T>

用于异步执行的协程返回类型。

template<concepts::move_secure T>
using future_spawn = base::future_base<T, true, false>;
  • 特点
    • 异步执行:协程会被调度到工作线程池中执行
    • 自动负载均衡:runtime 会选择合适的工作线程
    • 支持同步等待(.await())和异步等待(co_await

future_core<T>

核心任务的协程返回类型,用于运行时关键任务。当前行为与 future_spawn<T> 相同,但在未来的任务窃取(work stealing)机制中将获得特殊处理。

template<concepts::move_secure T>
using future_core = base::future_base<T, true, true>;
  • 特点
    • 异步执行:与 future_spawn<T> 类似,在工作线程池中执行
    • 核心标记:为即将引入的任务窃取机制预留
    • 规划特性:未来将不可被其他工作线程窃取,保证在固定线程执行

通用功能

所有 Future 类型都支持以下基本特性:

移动语义

  • Future 只支持移动,禁止拷贝
  • 确保任务的所有权清晰,避免资源泄漏
future<int> f1 = foo();     // OK:移动构造
auto f2 = std::move(f1);    // OK:移动赋值
future<int> f3 = f1;        // 错误:禁止拷贝

异常传播

  • 自动捕获并传播协程中的异常
  • co_await.await() 时重新抛出
future<void> may_throw() {
    if (error_condition)
        throw my_error{};
    co_return;
}

try {
    co_await may_throw();  // 异常会在这里被重新抛出
} catch (const my_error& e) {
    // 处理异常
}

co_invoke

auto task = co_invoke([] -> future_spawn<void> {
    // 耗时任务
    co_return;
});
co_await task;  // lambda 表达式随协程一起销毁,此处安全

co_invoke 会将以右值(包括纯右值和将亡值)传递的可调用对象移动到协程内部,确保其生命周期覆盖整个协程执行期间。

  • 常用于延长 lambda 表达式的生命周期,避免 use-after-free

值类型约束

  • 要求 T 满足 move_secure 概念(可移动)
  • 支持 void 类型(future<void>

执行模型

同步执行(future<T>

future<int> compute() {
    co_return 42;
}

// 在调用者线程中同步执行
future<void> caller() {
    int value = co_await compute();  // 不会发生异步调度
}

异步执行(future_spawn<T>

future_spawn<int> async_compute() {
    co_return 42;
}

// 两种等待方式
future_spawn<void> async_caller() {
    // 1. 异步等待(推荐)
    int value = co_await async_compute();
    
    // 2. 同步等待(仅在非工作线程中使用)
    // int value = async_compute().await();
}

进阶功能

任务转换

future<T> 可以转换为 future_spawn<T>future_core<T>

future<int> normal() {
    co_return 42;
}

future_spawn<int> to_spawn(future<int> f) {
    return f.spawn();  // 转换为异步任务
}

future_core<int> to_core(future<int> f) {
    return f.spawn_core();  // 转换为核心任务
}

transport():在 worker 之间迁移 future<T>

transport() 用于不改变 Future 类型(仍是 future<T>的前提下,将一个 future<T> 所属的底层任务从“原 worker 的挂起任务集合”迁移到当前 worker,从而允许你在另一个 worker 上安全地 co_await 它。

这在以下场景非常常见:

  • 你把一个 future<T> 作为值传递/移动到了另一个以 future_spawn<T> 运行的协程里(例如 select 内部为每个分支启动的任务)。
  • future<T> 可能已经在某个 worker 上注册并进入挂起状态;此时直接在另一个 worker 上等待它,会导致任务仍挂在旧 worker 的内部容器里,从而出现“在错误的 worker 上管理挂起/唤醒”的问题。

transport() 的核心效果:

  • 若任务已被调度并处于某个 worker 管理之下,会先从原 worker 的挂起任务集合中移出,再迁入到当前 worker。
  • 随后返回一个新的 future<T>,其行为等价于等待原 future:返回值与异常传播规则完全一致。

约束与注意事项:

  • 仅适用于 future<T>(非 spawn 模式的 future)。
  • 需要在运行时 worker 线程上下文中调用(因为它依赖 core::worker::this_worker() 取得“当前 worker”)。
  • 不会把任务变成异步调度任务;如果你的目的是让任务进入线程池异步执行,请使用 .spawn()/.spawn_core()

示例:在 future_spawn 中等待一个来自外部的 future

future<int> make_sync_work();

future_spawn<int> async_main() {
    auto f = make_sync_work();

    // 假设这里之后的执行发生在某个 worker 上,且 f 可能来自/挂起于其它 worker
    int v = co_await std::move(f).transport();
    co_return v;
}

异常处理与忽略

ignore() 用于忽略对应 future 对象所代表的协程的返回值,并吞掉其抛出的异常(可选提供回调用于观测)。

  • 返回值:ignore() 返回一个 future<void>(spawn 模式),当底层协程完成时该 future 也完成。
  • 行为:调用 ignore() 会丢弃原协程的返回值;若协程抛出异常,默认不向上抛出;若提供了回调,会在捕获到异常时调用该回调并传入 std::exception_ptr

常见用法:在后台启动任务但不关心返回值与错误,或者只想在异常发生时记录/观测但不传播它们。

// 最常见:fire-and-forget(不等待,不传播异常)
background_task().ignore();

// 提供回调以记录异常(回调接受 std::exception_ptr)
cleanup_task().ignore([](std::exception_ptr e) {
    try {
        std::rethrow_exception(e);
    } catch (const std::exception &ex) {
        std::cerr << "Cleanup failed: " << ex.what() << '\n';
    }
});

// 如果需要等待完成但仍丢弃返回值与异常,可以 co_await
co_await some_future().ignore();

最佳实践

选择合适的 Future 类型

  1. 默认使用 future<T>

    • 适用于大多数异步操作
    • 需要低异步调度开销
    • 无并发
  2. 使用 future_spawn<T> 当:

    • 需要并发
    • 自动负载均衡
    • 良好的并发性能
  3. 使用 future_core<T> 当:

    • 自动负载均衡
    • 需要线程亲和、保持不可窃取

异步编程指南

  • 注意 lambda 表达式的生命周期
// 错误示例:lambda 生命周期短于异步任务生命周期

auto task1 = []() -> future<void> {
    // 任务逻辑
    co_return;
}();
co_await task1; // 任务启动,但是前面的 lambda 表达式已经销毁,产生 use-after-free

auto task2 = []() -> future_spawn<void> {
    // 耗时任务逻辑
    co_return;
}();
co_await task2; // 任务在刚启动时行为正常,但是很快 lambda 表达式就将被销毁,产生 use-after-free

// 正确示例:使用 co_invoke 延长 lambda 表达式的生命周期

auto task = co_invoke([]() -> future<void> {
    // 任务逻辑
    co_return;
});
co_await task; // 任务安全执行
  • 避免在工作线程中使用 .await()
// 错误:在工作线程中同步等待
void wrong() {
    auto value = async_task().await();  // 会 panic
}

// 正确:使用 co_await
future_spawn<void> correct() {
    auto value = co_await async_task();
}
  • 正确处理异常
future_spawn<void> robust() {
    try {
        co_await risky_operation();
    } catch (const std::exception& e) {
        // 处理异常
        co_return;
    }
    // 继续执行
}
  • 合理使用 ignore()
// 适用于确实可以忽略返回值和异常的场景
background_task().ignore();

// 需要记录异常时提供回调
cleanup_task().ignore([](auto e) {
    log_error("Cleanup failed", e);
});

性能考虑

  1. 避免不必要的任务转换

    • spawn()spawn_core() 会创建新的协程对象
    • 如果最终要异步执行,直接返回对应类型
  2. 合理使用同步/异步模式

    • 短小操作使用 future<T> 避免调度开销
    • IO 密集型操作使用 future_spawn<T> 提高并发度

调试技巧

异常追踪

  • ASCO 会自动记录协程创建和异常发生的调用栈
  • 异常会保留原始抛出点的上下文

任务状态检查

  • 可通过 task_id 在运行时中查找任务
  • 支持检查任务是否已完成、是否发生异常

注意事项

  • 使用 ignore() 时要谨慎,确保返回值和异常可以被安全忽略

协程生成器 generator<T>

generator<T> 是 ASCO 中基于 C++ 协程实现的懒序列。它继承自 future_base,因此可以像 future 一样被调度,但每次 co_yield 会把一个元素推入内部队列,供消费端按需提取。

设计概览

  • 逐项产出:生成器协程通过 co_yield 把值送入 continuous_queue,调用方每次等待一个元素。
  • 基于 futuregenerator<T> 依旧是异步任务,operator() 返回另一个 future<std::optional<T>>,因此可以在任何 co_await 环境里消费。
  • 跨线程安全:内部通过信号量与无锁队列保证并发安全,能够在不同工作线程间生成与消费。
  • 结束语义:当协程 co_returnco_yield 通道关闭时,operator() 会返回 std::nullopt,并且 operator bool() 变为 false

基本流程

#include <asco/generator.h>
#include <asco/future.h>

using asco::future;
using asco::generator;

// 生产 1..n 的序列
generator<int> gen_count(int n) {
    for (int i = 1; i <= n; ++i) {
        co_yield i;
    }
    co_return;
}

future<void> consume(generator<int>& g) {
    int sum = 0;
    while (auto i = co_await g()) { // 每次等待一个元素
        sum += *i;
    }
    // generator 被耗尽或停止时跳出循环
    co_return;
}

调用 g() 返回一个可等待的 future,获取到 std::optional<T>。当生成器结束时返回 std::nullopt,表明序列已经耗尽。

组合消费 (operator|)

generator 支持通过 operator| 与异步函数拼接,形成更简洁的管道式写法。consumer 需要是一个返回 future 的函数,并接受生成器作为参数。

future<int> consume_sum(generator<int>& g) {
    int sum = 0;
    while (auto item = co_await g()) {
        sum += *item;
    }
    co_return sum;
}

future<void> async_main() {
    auto g = gen_count(5);

    // 传统写法
    auto sum1 = co_await consume_sum(g);

    // 管道写法,会自动调用 co_invoke(consumer, generator)
    auto g2 = gen_count(5);
    auto sum2 = co_await (g2 | consume_sum);

    co_return;
}

该操作符同样支持右值生成器,内部会移动到消费函数中:

auto total = co_await (gen_count(5) | [](auto gen) -> future<int> {
    int sum = 0;
    while (auto item = co_await gen()) {
        sum += *item;
    }
    co_return sum;
});

// 消费函数还可以返回新的 generator/generator_core,实现链式加工
auto doubled = co_await (gen_count(3) | [](auto gen) -> generator<int> {
    while (auto item = co_await gen()) {
        co_yield *item * 2;
    }
    co_return;
});

异常与停止

  • 生成器内部抛出的异常会在下一次 co_await generator() 时重新抛出,调用方可以像处理普通异步任务一样捕获。
  • 调用生成器的 operator bool() 可以检测它是否仍然可继续产出值。
  • 一旦协程结束或调用方显式停止(例如销毁生成器),内部队列会关闭,后续获取将立即返回 std::nullopt

与运行时的关系

generator<T> 的调度由 ASCO 运行时处理。和 future 一样,需要在 async_main 或其他运行时任务中 co_await,以确保协程被调度执行。若要在核心 runtime 上下文内使用,可以改用 generator_core<T>,它会绑定到核心执行器。

同步原语

本章节介绍 ASCO 提供的基础同步原语,包括互斥锁(Mutex)、读写锁(RWLock)、信号量(Semaphore)与通道(Channel)。它们均为多线程/多协程安全的构件,可与 future<T>/co_await 无缝配合。

何时使用

  • Mutex:需要在多个协程/线程之间串行化访问临界区或受保护对象时使用,支持 with (auto guard = co_await ...) 的 RAII 写法。
  • RWLock:读多写独的场景,可允许大量读者并发访问,写者持锁期间阻塞全部读者与其它写者。
  • Semaphore:需要对共享资源进行计数式访问控制,或实现简单的事件/通知机制时使用。适合“一端释放(release),另一端等待(acquire)”的场景。
  • Channel:需要在任务之间传递数据(消息)时使用。基于无锁 MPMC 队列封装并配合信号量实现“就绪通知”,保证 FIFO 顺序,可多生产者/多消费者。

公共特性

  • 线程安全:在多线程/多协程环境下可安全调用。
  • 与协程兼容:阻塞等待均以 co_await 的方式暴露,不会阻塞线程。
  • 异常安全:在常见使用路径中不抛出异常(若违反约束,如在通道关闭后继续发送,将触发 panic)。

目录

  • Mutex:纯互斥锁及携带数据的互斥锁,推荐用法与注意事项。
  • RWLock:协程友好的读写锁,支持封装对象的读写访问。
  • Semaphore:计数信号量、二值信号量与不限量信号量,API 与使用示例。
  • Channelsender<T>/receiver<T>send/recv/try_recv/stop 语义与注意事项。

典型组合用法

  • 使用 Channel 进行任务间消息传递;在消费者端如果需要批处理或节流,可结合计时器 intervalsleep 实现节奏控制。
  • 使用 Semaphore 表达“完成/可用”事件;多个生产者可安全地 release(n) 增加配额,多个消费者以协程方式 co_await acquire()

更多协程与运行时的信息,请先阅读《future<T>下的异步编程》。

Mutex(协程互斥锁)

asco::sync::mutex<>asco::sync::mutex<T> 提供协程友好的互斥保护能力。它们以 future 形式返回守卫对象,利用 RAII 在离开作用域后自动释放锁,避免线程级别阻塞。

  • 头文件:#include <asco/sync/mutex.h>
  • 命名空间:在全局命名空间下别名为 asco::mutex

类型概览

  • mutex<>:纯互斥锁,不携带值,仅提供排他访问。
  • template<typename T> mutex<T>:在互斥锁内部封装一个 T 实例,锁定后可通过守卫直接访问 T

两种互斥锁的 lock() 都返回 future<guard>,需使用 co_await 获取守卫。

接口

mutex<>

  • future<guard> lock():等待获得互斥锁。协程在竞争失败时会自旋退避,必要时挂起在内部 wait_queue 上。
  • class guard
    • guard::operator bool() const noexcept:指示守卫是否仍持有锁,被移动后的守卫返回 false
    • 析构时自动释放锁并唤醒等待者。

mutex<T>

  • future<guard> lock():同上,但守卫允许直接操作封装的 T
  • class guard
    • T &operator*() / T *operator->():访问被保护的对象。
    • 移动构造后,原守卫失效,新守卫继续持有锁。
  • T &&get():将内部对象移动出互斥锁,并在后续 lock() 时触发 panic,适用于需要一次性夺取所有权的场景。

推荐用法:with

提供 with 宏(在 asco/utils/defines.h 中定义为 if):

#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
using namespace asco;

sync::mutex<> mtx;

future<void> do_work() {
    with (auto guard = co_await mtx.lock()) {
        // 成功获取锁后执行,作用域结束自动解锁
    }
    co_return;
}

with 在语义上等同于 if (auto guard = ...; guard) { ... },保证代码结构清晰且不会遗漏解锁。

示例

1. 保护共享计数器

#include <asco/future.h>
#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
#include <atomic>
using namespace asco;

sync::mutex<> counter_mutex;
int counter = 0;
std::atomic<int> active{0};
std::atomic<int> violations{0};

future_spawn<void> worker() {
    with (auto guard = co_await counter_mutex.lock()) {
        auto prev = active.fetch_add(1, std::memory_order_acq_rel);
        if (prev != 0) {
            violations.fetch_add(1, std::memory_order_acq_rel);
        }
        ++counter;
        active.fetch_sub(1, std::memory_order_acq_rel);
    }
    co_return;
}

2. 保护对象并原地修改

#include <asco/future.h>
#include <asco/sync/mutex.h>
#include <asco/utils/defines.h>
#include <string>
using namespace asco;

sync::mutex<std::string> name{"guest"};

future<void> rename(std::string new_name) {
    with (auto guard = co_await name.lock()) {
        *guard = std::move(new_name);
    }
    co_return;
}

future<std::string> snapshot() {
    with (auto guard = co_await name.lock()) {
        co_return *guard;  // 复制守卫中的字符串
    }
}

3. 彻底移交对象所有权

#include <asco/future.h>
#include <asco/sync/mutex.h>
using namespace asco;

sync::mutex<std::vector<int>> data;

future<std::vector<int>> take_all() {
    with (auto guard = co_await data.lock()) {
        co_return data.get();  // 移动出内部向量
    }
}

future<void> reuse() {
    co_await take_all();
    // 再次锁定将触发 panic,提醒调用者对象已被移出
    // co_await take_all(); // panic
    co_return;
}

注意事项

  • 守卫是可移动但不可复制的;请勿在移动后继续使用旧守卫。
  • 如果 mutex<T>::get() 被调用,内部会标记对象已被移动;继续 lock() 会触发 panic,用于显式暴露错误路径。
  • 互斥锁内部使用自旋退避与 wait_queue 协程挂起组合:在低争用场景中开销仅为原子操作,在高争用场景能避免忙等。
  • mutex<> 只保障互斥,不提供递归锁语义;请避免在持锁期间再次调用 lock()

RWLock(协程读写锁)

asco::sync::rwlock<>asco::sync::rwlock<T> 提供读多写独的协程同步原语。读操作可以并行获取共享锁,写操作以独占方式访问,被等待的协程会自动通过内部 wait_queue 挂起,不会阻塞线程。

  • 头文件:#include <asco/sync/rwlock.h>
  • 命名空间:在全局命名空间下别名为 asco::rwlock

类型概览

  • rwlock<>:纯读写锁,不附带存储,仅管理并发访问。
  • template<typename T> rwlock<T>:内部封装一个 T 实例,读锁以常量视图访问,写锁以可变引用操作对象。

两种读写锁都分别提供 read()write(),返回 future<read_guard>future<write_guard>,需配合 co_await 使用。

接口

rwlock<>

  • future<read_guard> read():获取共享读锁。写者占用或排队时会自旋退避,随后挂起到读者等待队列 rq
  • future<write_guard> write():获取独占写锁。尝试设置写意图位,等待所有读者释放并挂起到写者等待队列 wq
  • class read_guard
    • operator bool() const noexcept:指示守卫是否仍有效;移动后原守卫失效。
    • 析构时减少读者计数,当最后一个读者离开时唤醒写者。
  • class write_guard
    • operator bool() const noexcept:移动后原守卫失效。
    • 析构时清除写标志,并唤醒排队写者与阻塞读者。

rwlock<T>

  • future<read_guard> read():取得常量读守卫,可直接访问 const T
  • future<write_guard> write():取得可写守卫,支持修改内部对象。
  • T &&get():在确认没有其他持有者时将内部值移动出来,后续任何 read() / write() 都会触发 panic,适合一次性转移所有权的场景。
  • class read_guard
    • const T &operator*() const / const T *operator->() const:常量访问封装对象。
  • class write_guard
    • T &operator*() / T *operator->():可变访问封装对象。

行为特性

  • 读写公平:写协程在进入等待队列时会设置写意图,后续读者会挂起到 rq 等待写者释放,避免写者无限饥饿。
  • 协程友好:所有等待通过 co_await 表达,不会阻塞线程。内部结合指数退避与 wait_queue,在低争用时保持轻量,在高争用时自动挂起。
  • 守卫语义:守卫可移动但不可复制;移动后原对象变为无效状态(operator bool() 返回 false)。

示例

1. 在多个协程间共享配置快照

#include <asco/future.h>
#include <asco/sync/rwlock.h>
#include <asco/utils/defines.h>
#include <string>
using namespace asco;

rwlock<std::string> config{"v1"};

future<void> update_config(std::string next) {
    with (auto guard = co_await config.write()) {
        *guard = std::move(next);
    }
    co_return;
}

future<std::string> read_config() {
    with (auto guard = co_await config.read()) {
        co_return *guard;  // 复制快照
    }
}

2. 读者并发、写者独占

#include <asco/future.h>
#include <asco/invoke.h>
#include <asco/sync/rwlock.h>
#include <asco/utils/defines.h>
#include <vector>
using namespace asco;

rwlock<> resource_lock;

future_spawn<void> reader_task() {
    with (auto guard = co_await resource_lock.read()) {
        // 多个读者可以同时进入
    }
    co_return;
}

future_spawn<void> writer_task() {
    with (auto guard = co_await resource_lock.write()) {
        // 唯一写者,读者与其他写者都会等待
    }
    co_return;
}

注意事项

  • 写守卫析构后会先唤醒其它写者,再唤醒读者;若需要严格保证写者优先,可在业务层面自行排队调度。
  • rwlock<T>::get() 会永久标记内部对象已被移走,后续任意访问都会触发 panic,用于显式暴露误用。
  • rwlock<> 不提供递归锁语义;请避免在持有写锁期间再次请求读锁或写锁。
  • 推荐结合 with 宏使用,避免手动检查守卫有效性并确保作用域结束即释放锁。

Semaphore(信号量)

asco::binary_semaphoreasco::counting_semaphore<N>asco::unlimited_semaphore 提供了计数型同步原语,可用于限流、互斥(简化版)与事件通知。

  • 头文件:#include <asco/sync/semaphore.h>
  • 命名空间:asco(类型别名定义在全局 asco 命名空间下)

类型与别名

  • binary_semaphore:上限为 1 的二值信号量。
  • template<size_t N> counting_semaphore:上限为 N 的计数信号量。
  • unlimited_semaphore:上限为 size_t 的最大值,可视为“无上限”。

底层实现均基于 semaphore_base<CountMax>

接口

  • bool try_acquire() noexcept
    • 若计数大于 0,则原子地减 1 并返回 true;否则返回 false
  • future<void> acquire()
    • 若计数为 0,则当前协程挂起,直到被 release() 唤醒;恢复后原子地减 1。
  • future<bool> acquire_for(const duration_type auto &timeout)
    • 尝试在指定超时时间内获取许可,超时则返回 false
  • future<bool> acquire_until(const time_point_type auto &expire_time)
    • 尝试在指定时间点前获取许可,超时则返回 false
  • void release(size_t update = 1)
    • 将计数增加 min(update, CountMax - old_count),并唤醒相应数量的等待者。

特性:

  • acquire() 以协程方式挂起,不阻塞线程。
  • release() 支持一次性增加多个许可;对上限做饱和值裁剪。
  • 公平性:不保证严格公平,但能够唤醒同等数量等待者。

使用示例

1. 事件通知(先通知,后等待)

#include <asco/future.h>
#include <asco/sync/semaphore.h>
using namespace asco;

future<int> async_main() {
    binary_semaphore sem{0};

    // 先通知
    sem.release();

    // 后等待:不会永久挂起
    co_await sem.acquire();
    co_return 0;
}

2. 等待后再通知(跨任务)

#include <asco/future.h>
#include <asco/sync/semaphore.h>
#include <atomic>
using namespace asco;

future_spawn<void> worker(binary_semaphore &sem, std::atomic<bool> &done) {
    co_await sem.acquire();
    done.store(true, std::memory_order::release);
    co_return;
}

future<int> async_main() {
    binary_semaphore sem{0};
    std::atomic<bool> done{false};

    auto w = worker(sem, done);

    // 使等待中的任务恢复
    sem.release();
    co_await w;

    co_return done.load(std::memory_order::acquire) ? 0 : 1;
}

3. 高并发场景(多次 release / acquire)

#include <asco/future.h>
#include <asco/sync/semaphore.h>
#include <atomic>
using namespace asco;

future_spawn<void> consumer(counting_semaphore<1000> &sem, std::atomic<size_t> &cnt) {
    for (size_t i = 0; i < 1000; ++i) {
        co_await sem.acquire();
        cnt.fetch_add(1, std::memory_order::relaxed);
    }
    co_return;
}

future<int> async_main() {
    counting_semaphore<1000> sem{0};
    std::atomic<size_t> cnt{0};

    auto c = consumer(sem, cnt);

    for (size_t i = 0; i < 1000; ++i) sem.release();

    co_await c;
    co_return cnt.load(std::memory_order::relaxed) == 1000 ? 0 : 1;
}

注意事项

  • release()update 值会根据上限裁剪,避免溢出。

Notify(轻量通知器)

asco::notify 提供一个极轻量的事件通知原语,支持多个协程等待并由通知唤醒。它基于内部的 wait_queue,适合在不需要复杂状态同步、只需“唤醒即可”的场景中使用。

  • 头文件:#include <asco/sync/notify.h>
  • 命名空间:asco

接口概览

  • yield<> wait()
    • 以协程方式挂起当前任务,等待 notify_one()notify_all() 唤醒。
  • void notify_one()
    • 唤醒至多一个等待中的协程。若当前没有等待者,通知会被丢弃(不计数、不排队)。
  • void notify_all()
    • 唤醒所有等待中的协程,同样不会记录通知历史。

使用示例

1. 简单的等待-通知

#include <asco/future.h>
#include <asco/sync/notify.h>
using namespace asco;

future_spawn<void> worker(notify &n, std::atomic<bool> &flag) {
    co_await n.wait();
    flag.store(true, std::memory_order_release);
    co_return;
}

future<int> async_main() {
    notify n;
    std::atomic<bool> flag{false};

    auto w = worker(n, flag);

    // 执行若干工作…
    n.notify_one();

    co_await w;
    co_return flag.load(std::memory_order_acquire) ? 0 : 1;
}

2. 广播唤醒多个等待者

#include <asco/future.h>
#include <asco/sync/notify.h>
#include <vector>
using namespace asco;

future_spawn<void> waiter(notify &n, std::atomic<int> &counter) {
    co_await n.wait();
    counter.fetch_add(1, std::memory_order_acq_rel);
    co_return;
}

future<int> async_main() {
    notify n;
    std::atomic<int> counter{0};
    std::vector<future_spawn<void>> waiters;

    for (int i = 0; i < 3; ++i) {
        waiters.push_back(waiter(n, counter));
    }

    n.notify_all();
    for (auto &w : waiters) co_await w;

    co_return counter.load(std::memory_order_acquire) == 3 ? 0 : 1;
}

注意事项

  • notify 不会记录历史通知;在没有等待者时调用 notify_one() / notify_all() 会被直接丢弃。
  • 若需要具备通知计数或超时语义,可考虑使用 semaphore 或其他同步原语。

Channel(通道)

ASCO 的 Channel 为多生产者/多消费者(MPMC)数据通道:

  • 结合 unlimited_semaphore 完成“有数据即通知”的等待/唤醒。

  • 头文件:#include <asco/sync/channel.h>

  • 命名空间:

    • 类型:asco::sender<T>, asco::receiver<T>
    • 工厂:asco::channel<T>()

构造与类型

  • auto [tx, rx] = channel<T>();
    • 返回 sender<T>receiver<T> 的二元组,二者共享同一个内部信号量。
  • 也可传入自定义队列工厂:channel<T>(Creator),其中 Creator 需满足 queue::creator 概念。

接口语义

sender

  • future<std::optional<std::tuple<T, queue::push_fail>>> send(T value)
    • 调用方需 co_awaitfuture,以便在底层使用有界队列满载时自动等待配额。
    • 成功:返回 std::nullopt,并唤醒一个等待的接收者。
    • 失败:返回包含原值与失败原因的 std::tuple<T, queue::push_fail>
      • push_fail::closed:队列已关闭或停止。
      • push_fail::full:自定义队列已满且不可再写入。
  • void stop() noexcept
    • 标记底层队列为“发送端停止”。调用后不得再调用 send();否则底层会触发 panic(调试保护)。
  • bool is_stopped() const noexcept
    • 若发送端或接收端已停止,或未绑定到队列,返回 true

receiver

  • std::expected<T, pop_fail> try_recv()
    • 若无可用数据:返回 std::unexpected(pop_fail::non_object)
    • 若通道已完全关闭且无可读对象:可能返回 std::unexpected(pop_fail::closed)
    • 若成功:返回 T
  • future<std::optional<T>> recv()
    • 若当前无数据:协程方式挂起直至有“数据就绪”的通知;
    • 唤醒后尝试读取,若成功返回 T,若关闭导致无对象可读返回 std::nullopt。当底层队列为有界队列时,成功读取还会释放一个配额令发送端继续写入。
  • bool is_stopped() const noexcept
    • 当发送端或接收端停止,且当前帧已读尽时,返回 true

有序性与并发性

  • Channel 保持 FIFO 顺序。
  • 支持 MPMC:多个 sender/receiver 并发安全。

典型用法

1. 单生产者-单消费者

#include <asco/future.h>
#include <asco/sync/channel.h>
#include <print>
using namespace asco;

future<int> async_main() {
    auto [tx, rx] = channel<int>();

    // 发送
    for (int i = 0; i < 5; ++i) {
      if (auto r = co_await tx.send(i); r.has_value()) {
        auto &[value, reason] = *r;
        std::println("send failed: {} (reason = {})", value, static_cast<int>(reason));
            co_return 1;
        }
    }

    // 接收(等待式)
    for (int i = 0; i < 5; ++i) {
        auto v = co_await rx.recv();
        if (!v || *v != i) {
            std::println("recv mismatch: {}", v ? *v : -1);
            co_return 1;
        }
    }

    co_return 0;
}

2. 停止与排干

#include <asco/future.h>
#include <asco/sync/channel.h>
using namespace asco;

future<int> async_main() {
    auto [tx, rx] = channel<int>();

    (void)co_await tx.send(7);
    (void)co_await tx.send(8);

    // 停止发送端:之后不得再调用 send()
    tx.stop();

    // 排干剩余元素
    auto a = co_await rx.recv();
    auto b = co_await rx.recv();

    if (!a || !b || *a != 7 || *b != 8) co_return 1;
    if (!tx.is_stopped() || !rx.is_stopped()) co_return 1;

    // 现在应无更多数据
    auto t = rx.try_recv();
    if (t.has_value()) co_return 1;

    co_return 0;
}

3. 非阻塞读取与等待混合

#include <asco/future.h>
#include <asco/sync/channel.h>
#include <asco/time/interval.h>
#include <print>
using namespace asco;
using namespace std::chrono_literals;

future<int> async_main() {
    auto [tx, rx] = channel<int>();

    // 生产者:异步发送
    auto producer = [] (sender<int> tx) -> future_spawn<void> {
        for (int i = 0; i < 3; ++i) (void)co_await tx.send(i);
        tx.stop();
        co_return;
    }(tx);

    interval tick{100ms};
    while (!rx.is_stopped()) {
        // 先尝试非阻塞
        if (auto r = rx.try_recv(); r.has_value()) {
            std::println("got {}", *r);
            continue;
        }
        // 无数据则小憩一会儿(避免空转)
        co_await tick.tick();
    }

    co_await producer;
    co_return 0;
}

注意事项

  • 始终 co_await sender::send() 以正确处理带界队列的背压;忽略返回 future 可能导致任务提前退出或绕过容量控制。
  • sender::send() 返回值含 queue::push_fail,常见原因是通道已关闭(push_fail::closed);若自定义队列有容量限制,可额外关注 push_fail::full
  • 调用 sender::stop() 后不可再调用 send();这是通道关闭的显式信号,违背会触发 panic(调试保护)。
  • recv() 被唤醒后理论上应能读到元素;若底层已关闭并无对象可读将得到 std::nullopt
  • 使用 try_recv() 判空时,请正确处理 pop_fail::non_objectpop_fail::closed

进阶(Advance)导读

本章面向已经熟悉 ASCO 基本概念(例如 future、协程、以及同步原语)的读者,介绍更贴近运行时实现与系统集成的高级主题。目标是帮助你:

  • 理解框架内部的守护线程与运行时组件如何协作;
  • 学会扩展和定制守护/后台服务;
  • 了解不可恢复错误(panic)与断言(assert)的处理与集成方案;
  • 获取与生产环境相关的实践建议(生命周期、停止策略、日志与崩溃处理)。

当前本章包含的主题(将持续扩展):

  • Panic — ASCO 的 panic 机制,支持协程栈回溯与自定义回调,适用于不可恢复的致命错误处理。
  • 带有堆栈回溯的断言 (asco_assert) — 基于 panic 框架实现的断言宏,支持表达式定位与失败处理集成。
  • 守护线程基类 daemon — 一个用于实现后台服务/周期性任务的轻量基类,封装了线程生命周期、初始化同步与唤醒等待逻辑。

交互与实践建议:

  • 若你希望把守护线程与任务队列、channel 或 worker pool 集成,本章后续会给出示例;欢迎在仓库 Issue 中提出常见场景。
  • 本章不直接覆盖 future 或协程语义的基础内容;若需要可在“快速入门”或“future”章节补充链接。

Panic 模块(asco::panic/panic.h)

本文档介绍 ASCO 框架中的 panic 机制及其头文件 panic/panic.h 的接口与用法。

概述

Panic 用于处理不可恢复的致命错误,如断言失败、严重逻辑漏洞等。调用 panic 后,程序会立即终止,并可输出详细的错误信息与栈回溯,便于定位问题。

主要接口

1. panic(std::string msg) noexcept

  • 触发 panic,输出指定消息,终止程序。
  • [[noreturn]]:不会返回。
  • 推荐用于普通同步场景。

2. co_panic(std::string msg) noexcept

  • 协程环境下触发 panic,输出消息并终止。
  • 用法与 panic 类似。

3. 格式化接口

  • 支持 C++20 std::format 风格:
panic("错误码: {},原因: {}", code, reason);
co_panic("协程异常: {}", info);
  • 自动格式化参数并输出。

4. register_callback(std::function<void(cpptrace::stacktrace &, std::string_view)> cb)

  • 注册 panic 回调,可自定义 panic 行为(如日志落地、核心转储等)。
  • 回调参数:
    • cpptrace::stacktrace &:当前栈(包括异步调用链,当使用 co_panic 时)回溯信息。
    • std::string_view:panic 消息。
  • 适合集成自定义监控、调试工具。

使用示例

#include <asco/panic/panic.h>

void fatal_error() {
    asco::panic::panic("致命错误,无法恢复");
}

void format_example(int code) {
    asco::panic::panic("错误码: {}", code);
}

// 注册自定义回调
asco::panic::register_callback([](cpptrace::stacktrace &st, std::string_view msg) {
    // 可在此保存日志、输出栈(异步调用链)信息等
});

设计要点

  • panic 只用于绝不应继续执行的场景。
  • 格式化接口便于输出结构化错误信息。
  • 回调机制支持扩展和集成第三方工具。
  • 协程专用接口保证异步场景下的正确终止。

与断言的关系

  • asco_assert 断言失败时会调用 panic,统一致命错误处理路径。
  • panic 可作为所有“不可恢复”错误的终极出口。

带有 stacktrace 的动态断言 (asco_assert)

本文档介绍 ASCO 的断言宏 asco_assert 以及其底层实现、使用方式与最佳实践。

概述

asco_assert(expr, [hint]) 用于在运行期验证永远应该为真的条件。若条件失败,它会调用内部的 asco::assert_failed,并最终触发 panic::panic,以不可恢复的方式终止程序([[noreturn]])。

与普通错误处理不同,断言面向开发阶段的逻辑不变量

  • 发现程序员假设被破坏的最早时机。
  • 在失败点提供清晰的表达式字符串和可选提示信息。
  • 将控制权交给 panic 框架,统一异常终止路径(着色输出、栈展开/回溯等)。

接口说明

宏:asco_assert

#define asco_assert(expr, ...) \
    do {                       \
        if (!(expr)) {         \
            asco::assert_failed(#expr, ##__VA_ARGS__); \
        }                      \
    } while (0)

要点:

  1. expr 只会被求值一次(包在 if (!(expr)) 中),避免副作用重复执行。
  2. 失败时字符串化表达式:#expr,便于定位逻辑。
  3. 可选参数(提示 hint)通过 GNU 扩展 ##__VA_ARGS__ 消除空参数的逗号。
  4. 展开后调用对应该签名的 asco::assert_failed

函数:asco::assert_failed

[[noreturn]] void assert_failed(std::string_view expr);
[[noreturn]] void assert_failed(std::string_view expr, std::string_view hint);

实现(assert.cpp)内部直接委托给:

panic::co_panic("Assertion failed on {}", expr);
panic::co_panic("Assertion failed on {}: {}", expr, hint);

因此:

  • 所有断言失败统一走 panic 终止路径;
  • 输出格式固定,便于日志检索;
  • 由于采用 std::string_view,传入的 hint 应为字符串字面量或生命周期足够长的缓冲(避免悬垂)。

使用示例

基本用法

int idx = compute_index();
asco_assert(idx >= 0, "索引必须非负");

失败输出示例:

Assertion failed on idx >= 0: 索引必须非负

无提示信息

asco_assert(ptr != nullptr);

输出:

Assertion failed on ptr != nullptr

结合内部不变量

struct range { int l; int r; };
void normalize(range& rg) {
    asco_assert(rg.l <= rg.r, "range 左值不能大于右值");
    // ... normalize logic
}

避免副作用重复

虽然表达式只执行一次,仍建议保持无副作用:

// 不推荐:含副作用的断言表达式会降低可读性
asco_assert(vec.pop_back() > 0, "不应为空");

与 Panic 框架的集成

断言失败直接调用 panic::co_panic

  • 继承 panic 框架的彩色/结构化输出(若已实现)。
  • 可以统一在 co_panic 处理路径中做栈回溯、日志落地、核心转储等。
  • 所有断言失败为“致命”级别,不返回调用者。

这使断言只负责“检测 + 定位”,而终止策略(如是否生成 dump)完全由 panic 系统集中配置,降低分散的错误处理复杂度。

性能与开销

当前实现无条件启用

  • 每个断言在成功时的开销 ≈ 一次条件分支 + 未触发时的宏展开(极低)。
  • 失败路径较重(格式化 + panic 逻辑)。

在性能关键路径依旧可以使用断言,但需确保:

  1. 断言表达式本身是 O(1) 且无昂贵副作用;
  2. 不使用复杂的临时格式化(hint 仅接受一个 std::string_view,避免构造代价)。

最佳实践

  1. 只断言“绝不该失败”的内部不变量;可预期失败的输入应做显式校验与返回错误。

  2. 保持表达式短小、可读:逻辑复杂时先拆成局部变量再断言。

  3. hint 写明为何“不变量应成立”,而非简单重复表达式。例:"range 必须标准化后 l <= r"

  4. 不在断言中做资源释放等副作用动作,断言失败后不会继续执行。

  5. 若需要更丰富提示(多变量格式化),建议在调用前自行构造稳定的字符串缓冲再传入(目前接口只接受一个 std::string_view)。

与标准库 / 其他框架对比

项目行为自定义提示终止统一性
assert(expr) (C)条件失败 abort()需修改源码分散
std::assert (宏)同上需修改源码分散
asco_assert进入 panic 流程(可扩展统一终止)统一

常见误用与规避

误用风险说明推荐做法
在表达式中包含副作用语义不清将副作用前置,断言纯条件
用断言替代输入合法性校验用户可触发崩溃使用返回值/错误码/异常
传入临时构造的短生命周期缓冲悬垂引用使用字符串字面量或静态缓存

asco::core::daemon(守护线程基类)

本文档说明如何继承和使用 asco::core::daemon 类型来实现后台守护线程。

概述

asco::core::daemon 是一个轻量的守护线程基类,封装了线程生命周期、初始化同步、以及唤醒/睡眠的常用逻辑。典型用法是继承该类并覆盖 init()run_once(std::stop_token &)shutdown() 三个虚方法来实现特定行为。

核心特性:

  • 使用 std::jthread 管理后台线程,支持基于 std::stop_token 的干净停止请求。
  • 提供 awake() / sleep_until_awake*() 系列方法用于线程间唤醒与有时限的等待。
  • 构造/启动/初始化通过 start() + 返回的 RAII init_waiter 实现线程初始化同步,确保启动方能等待后台线程完成 init()
  • 析构函数会请求停止并等待后台线程安全退出。

公有 API(概要)

  • daemon(std::string name):构造器,name 在 Linux 上用于线程命名(可读性和调试)。
  • ~daemon():析构函数,会请求线程停止、唤醒线程(以解除阻塞),并 join() 线程。
  • void awake():释放内部信号量,唤醒调用 sleep_until_awake() 或其变体而阻塞的守护线程。

受保护的辅助接口(给子类使用):

  • init_waiter start():启动后台线程并返回一个 init_waiterinit_waiter 的析构函数会等待后台线程释放 init_sem,因此可通过 RAII 在启动代码中等待初始化完成。
  • sleep_until_awake() / sleep_until_awake_for(...) / sleep_until_awake_before(...):基于内部 std::binary_semaphore 的阻塞/有时限等待,用于在 run_once 中等待事件或超时。

虚方法(子类通常需要覆盖)

  • virtual bool init():线程刚启动时调用一次。默认返回 true。返回 false 表示初始化失败,线程将调用 shutdown() 并退出。
  • virtual bool run_once(std::stop_token &st):守护主循环中每次执行的工作。返回 true 表示继续循环,返回 false 或在 stop_requested() 为真时退出循环。默认实现会 sleep_until_awake() 并返回 true
  • virtual void shutdown():线程退出前的清理逻辑。

生命周期与典型启动模式

  1. 在子类构造函数或初始化函数中调用 start()(受保护,因此通常在子类内部调用),它会创建后台 jthread 并立即返回一个 init_waiter
  2. init_waiter 离开作用域时,其析构函数会阻塞直到后台线程完成 init() 并释放内部信号量,这样启动方就可以等待初始化完成。
  3. 后台线程在循环中调用 run_once(st),直到 stop_requested() 被置位或 run_once 返回 false
  4. 析构函数会发起 request_stop(),并调用 awake() 以解除阻塞,然后 join() 线程,保证线程安全退出。

示例(最小子类实现):

class my_daemon : public asco::core::daemon {
public:
    my_daemon() : daemon("my_daemon") {
        auto waiter = start(); // 启动线程并在 waiter 析构时等待 init 完成
    }

protected:
    bool init() override {
        // 初始化资源
        return true; // 返回 false 可终止线程
    }

    bool run_once(std::stop_token &st) override {
        // 等待唤醒或定时任务
        sleep_until_awake_for(std::chrono::milliseconds(500));

        if (st.stop_requested()) return false;

        // 执行一次工作
        do_work();
        return true; // 继续循环
    }

    void shutdown() override {
        // 清理资源
    }
};

在外部触发工作或唤醒守护线程:

my_daemon d;
// 触发守护线程做一次立即处理
d.awake();

设计细节与注意事项

  • start()protected 的:意在由子类控制何时启动线程(例如在子类构造流程中)。
  • init_waiter 的析构会 acquire 内部信号量,确保调用方等待完成;不要将其返回到会较晚析构的上下文,否则可能阻塞过久。
  • run_once 接受 std::stop_token &:在实现中应检查 st.stop_requested(),并在请求停止时尽快返回 false
  • 析构过程中 daemon 会调用 awake() 以确保如果线程正阻塞在 sleep_until_awake() 上,则能被唤醒并退出。
  • Linux 平台会把后台线程命名为构造时提供的 name(通过 pthread_setname_np),便于调试与崩溃分析。

推荐实践

  1. 将耗时或阻塞的 I/O 放在 run_once 中,并确保响应 stop_requested()
  2. 避免在 init() 中执行可能长时间阻塞的操作(或在 init() 内使用超时机制),因为调用 start() 的上下文会等待 init_waiter 完成。
  3. awake() 语义是“通知有新工作”,而不是强制中断正在执行的工作;若需要立即中断复杂任务,结合 stop_token 使用。
  4. 若需要周期性工作,结合 sleep_until_awake_for()sleep_until_awake_before() 实现合理的等待策略。

常见问题

  • Q: 我可以在 run_once 中抛异常吗?

    • A: 当前基类未显式捕获异常,抛出异常会导致线程异常终止。建议在 run_once 中自行捕获并在 shutdown() 中做清理,或通过 panic 报告致命错误。
  • Q: start() 返回的 init_waiter 需要手动保存吗?

    • A: 不需要长时间保存。通常以局部变量持有,确保在期望等待初始化完成的作用域结束时析构即可。

Context(协程上下文)

asco::context(底层定义在 asco::contexts::context)提供了一个轻量的协程取消原语,可用于在多个协程之间传播“停止/超时”信号。它与 notify 结合使用,支持显式取消与超时自动取消。

  • 头文件:#include <asco/context.h>
  • 命名空间:asco(通过别名导出)

创建方式

  • static std::shared_ptr<context> with_cancel()
    • 创建一个手动可取消的上下文,初始状态为 未取消
  • static std::shared_ptr<context> with_timeout(const duration_type auto &dur)
    • 创建一个上下文并在 dur 后自动取消。内部会启动一个协程调用 cancel()

两个工厂函数都返回 std::shared_ptr<context>。使用共享指针便于在多个协程中传播同一取消源。

取消与状态查询

  • future<void> cancel()
    • 将上下文标记为已取消,并唤醒所有等待该上下文的协程,然后执行取消回调。该协程可以被并发调用;每次调用都会执行已注册的回调。
  • future<void> set_cancel_callback(std::function<void()>)
    • 注册一个在取消发生时调用的回调。若上下文已取消,请先重新检查状态;回调仅在随后执行的 cancel() 协程中触发。
    • 回调实现必须是可重入的,并且要做好幂等处理,以便在并发取消时安全地再次触发取消或操作上下文。
  • bool is_cancelled() const noexcept
    • 查询当前是否处于已取消状态。

取消操作是幂等的;重复调用 cancel() 不会带来额外副作用。

等待取消

context 定义了成员 operator co_await(),同时也为 std::shared_ptr<context> 提供了自由函数 operator co_await()。因此既可以在上下文对象本身上 co_await ctx_ref;,也可以直接 co_await ctx_ptr; 来等待取消事件:

  • 若上下文尚未取消,当前协程将挂起,直到 cancel() 或超时触发。
  • 若上下文已取消,则立即恢复,且 co_await 不返回任何额外数据:它仅表示“取消已经发生”。

示例:

#include <asco/context.h>
#include <asco/future.h>
#include <asco/time/sleep.h>
using namespace asco;

future_spawn<void> worker(context &ctx_ref, std::atomic<bool> &flag) {
    co_await ctx_ref;            // 等待取消信号
    flag.store(true, std::memory_order_release);
    co_return;
}

future<int> async_main() {
    auto ctx = context::with_cancel();
    std::atomic<bool> flag{false};

    auto w = worker(*ctx, flag);  // 也可以 co_await ctx(shared_ptr)

    // 进行其他操作…
    co_await sleep_for(10ms);
    co_await ctx->cancel();      // 通知所有等待方并等待回调执行

    co_await w;
    return flag.load(std::memory_order_acquire) ? 0 : 1;
}

与超时结合

with_timeout() 会在后台调用 sleep_for() 后自动取消:

future<int> async_main() {
    auto ctx = context::with_timeout(50ms);

    co_await ctx;  // 最多等待 50ms

    // 此时 ctx 已经处于取消状态
    if (!ctx->is_cancelled()) {
        co_return 1;
    }
    co_return 0;
}

注意事项

  • context 仅负责取消信号的传播,不携带附加信息。若需要携带错误码或取消原因,请在业务代码中自行维护。
  • 上下文内部使用 notify 唤醒等待者;在没有协程等待时调用 cancel() 也会正确记录状态,随后等待者会立即返回。
  • 取消回调必须是可重入的,且需要自行保证并发调用时的幂等性。

Join Set 聚合器

asco:::join_set<T = void> 提供一个轻量的“任务收集器”,用于在多个异步计算完成后,以生成器形式逐个取回结果。它内部使用 channel<T> 管道按完成顺序输送值,适合将若干协程并发运行、再统一遍历其结果。

  • 头文件:#include <asco/join_set.h>
  • 依赖组件:futuregeneratorchannel

基础用法

典型流程:

  1. 创建 join_set<T> 实例。
  2. 调用 spawn(fn) 注册若干异步任务。fn 必须结果类型统一为 T
  3. 调用 join() 得到 generator<T>,随后通过 co_await 按完成顺序消费结果。

示例:

#include <asco/future.h>
#include <asco/join_set.h>
#include <asco/time/sleep.h>
using namespace asco;

future<int> async_main() {
    base::join_set<int> set;

    for (int i = 0; i < 3; ++i) {
        set.spawn([i]() -> future<int> {
               co_await sleep_for(std::chrono::milliseconds{10 * (i + 1)});
               co_return i;
           })
            .ignore();
    }

    auto results = set.join();
    while (auto value = co_await results()) {
        std::println("result = {}", *value);
    }
    co_return 0;
}

接口概览

  • explicit join_set(Creator ctor):允许自定义底层 channel 的创建器;默认使用 continuous_queue
  • future_spawn<void> spawn(Fn &&fn):启动一个异步任务,将其结果推入内部 channelFn 必须返回 future<T>future_spawn<T>,否则编译失败。
  • generator<T> join():停止接受新任务(内部关闭 sender),并返回一个生成器。生成器被迭代完毕后会自动退出。

行为与保证

  • 结果顺序等于任务实际完成顺序;如果多个任务同时完成,顺序取决于 channel 的排队次序。
  • 任务抛出异常时会沿 future 传播;可以在调用处使用 .ignore(on_exception) 或额外捕获。

使用建议

  • 若任务很多,建议结合 std::for_each / ranges 批量调用 spawn,并尽早 .ignore() 避免遗漏。
  • 如果需要自定义任务队列特性(例如环形缓冲区),可以提供自定义 Creator 参数构造 join_set

Select 选择器

select 用于“竞速”多个异步分支:同时启动(或等待)若干个可取消分支,返回第一个完成的分支结果,并触发其余分支尽快结束。

典型用途:

  • 同时等待多个异步操作,谁先完成就取谁;
  • 把“超时 / 取消信号 / 数据到达”等事件与主任务一起竞速;
  • 用一个统一的 std::variant 承载“本次到底是哪条分支赢了”。

基本用法

select 必须在 worker 线程(也就是协程运行时)里构造。

#include <asco/future.h>
#include <asco/select.h>

using namespace asco;

future<int> async_main() {
    auto sele = asco::select{}
        .along_with([](std::shared_ptr<context>) -> future<int> { co_return 42; })
        .along_with([](std::shared_ptr<context>) -> future_spawn<float> { co_return 3.14f; });

    auto v = co_await sele;

    std::visit(
        [](auto&& br) {
            using B = std::decay_t<decltype(br)>;
            if constexpr (B::branch_index == 0) {
                int x = *br;
                (void)x;
            } else if constexpr (B::branch_index == 1) {
                float y = *br;
                (void)y;
            }
        },
        v);

    co_return 0;
}

分支类型(along_with 重载)

select 支持两类分支:

1) cancellable_function 分支(会注入 select 内部 ctx)

形如:

Fn(std::shared_ptr<context> ctx, Args...)
  • ctxselect 内部创建并注入;当某个分支胜出后,select 会取消这个内部 ctx,用于通知其它分支尽快退出。
  • Fn 可以返回 future<T>future_spawn<T>(以及其它满足框架约束的 future 类型)。

2) cancellable_waitable 分支(等待一个“外部对象”完成)

形如:

sele.along_with(waitable);

其中 waitable 满足:

  • waitable->operator co_await() 返回 yield<notify*>(即它本质是个“可被 notify 唤醒的等待点”);
  • waitable->get_notify() 返回 notify&
  • deliver_type 必须是 void

最常见的例子是 std::shared_ptr<context>co_await ctx; 会等待它被取消或被内部 notify 对象唤醒。

重要:把 std::shared_ptr<context> 作为 waitable 传给 .along_with(ctx) 时,它是“一个等待分支”。

branch_index 规则(索引语义)

每次调用一次 .along_with(...),都会在 select 中追加一个分支,并赋予该分支一个稳定的索引:

  • 第一个追加的分支索引为 0
  • 第二个为 1
  • 以此类推

在返回值的 std::variant 中,可以通过 T::branch_index 判断是哪条分支胜出。

返回值类型

co_await select 的返回值是一个 std::variant:每个备选项都是 branch<I, T>

  • I 是分支索引(见上节)。
  • T 是该分支的返回类型;若分支返回 void,则会被替换为 std::monostate
  • branch 提供 operator* / operator->,用于访问返回值。

直观理解:如果你按顺序追加了 N 个分支,它的返回类型等价于:

future<std::variant<
    branch<0, monostate_if_void<T0>>,
    branch<1, monostate_if_void<T1>>,
    ...,
    branch<N-1, monostate_if_void<TN_1>>
>>

其中 Tk 是第 k 个分支的 deliver type。

取消与唤醒语义

当某个分支胜出后,select 会做两件事来让其它分支尽快结束:

  1. 取消“内部 ctx”

    • 这只影响 cancellable_function 分支中收到的那个 ctx
    • 你的分支可以 co_await ctx; 或检查 ctx->is_cancelled() 来退出。
  2. 对所有 waitable 分支的 notify 调用 notify_all()

    • 这会把 waitable 从等待中唤醒,让它们有机会结束。

注意:对外部 waitable(例如你传入的 std::shared_ptr<context>)来说,notify_all() 只是“唤醒等待”,并不会调用 cancel() 改变其 cancelled 状态。

异常语义

  • 如果胜出的分支抛出异常select 会在取消/唤醒其它分支后,把异常重新抛出给 co_await select 的调用方。