Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

future<T> 与异步函数

ASCO 将“异步函数调用”和“异步任务”明确区分为两类对象:future<T>join_handle<T>

  • future<T>:异步函数的返回类型,表示一次异步调用的结果;co_await 该对象会执行对应协程并得到 T
  • join_handle<T>:由运行时调度的任务句柄;可 co_await 以等待任务完成,或 detach() 以放弃 join。

1. future<T>:异步函数调用的惰性返回对象

返回 future<T> 的可调用对象满足 asco::async_function 概念,可视为 ASCO 语义下的异步函数。

#include <asco/future.h>
using namespace asco;

future<int> add_one(int x) {
    co_return x + 1;
}

1.1 惰性执行:调用不执行,co_await 才执行

future<T> 是惰性对象。因此:

  • 调用 add_one(41) 仅构造 future<int>(协程句柄处于初始挂起态)。
  • 对该对象执行 co_await 时,协程才开始运行。
future<int> example() {
    auto f = add_one(41);  // 仅构造 future
    int v = co_await f;    // 协程在此处开始执行
    co_return v;
}

1.2 等待语义:内联执行(非并发)

co_await future<T> 会在当前执行流中推进被等待的协程执行,并得到 T。它表达的是“等待一次异步调用的结果”,而不是“并发启动一个任务”。

推论:

  • future 不构成并发;连续 co_await 表示顺序执行。
  • 若协程内部不主动让出(例如 co_await this_task::yield() 或等待某些会挂起的同步原语),可能导致其它任务长时间得不到运行机会。
  • 异常会在 co_await 处重新抛出。

1.3 运行时约束:必须在 runtime 内等待

future<T> 必须在 runtime 上下文中 co_await

  • 通过 core::runtime::block_on(...) 进入运行时;或
  • spawn(...) 启动的任务内部等待。

在运行时之外直接 co_await future 会触发断言失败或 panic

1.4 [[nodiscard]]:避免未执行导致的资源泄露

future<T> 标记为 [[nodiscard]]。若创建 future 但从不 co_await,该异步调用不会执行完成,通常会导致资源泄露或逻辑缺失。


2. join_handle<T>:运行时任务句柄

当需要并发执行异步逻辑时,使用 spawn(...) 将异步函数提交给 runtime,并获得 join_handle<T>

2.1 spawn:将异步函数提交为任务

core::runtime::spawn(async_function) 接收一个返回 future<T> 的可调用对象,启动一个并发任务并返回 join_handle<T>

#include <asco/core/runtime.h>
#include <asco/future.h>
#include <asco/yield.h>

using namespace asco;

future<int> work() {
    co_await this_task::yield();
    co_return 42;
}

future<void> parent() {
    auto h = spawn([]() -> future<int> { co_return co_await work(); });
    int v = co_await h;  // join
    (void)v;
    co_return;
}

2.2 co_await join_handle:挂起当前协程,等待任务完成

join_handle<T> 的等待模型与 future 不同:co_await join_handle<T> 会挂起当前协程,直到该任务完成。

  • 正常完成:返回 T(或 void)。
  • 任务抛异常:在 co_await 处重新抛出。

2.3 detach():放弃 join

detach() 表示调用方不再等待结果,任务将独立执行直至完成/失败。

2.4 spawn_blocking:启动“blocking 环境”的任务

当你需要在 runtime 内部执行少量同步逻辑(例如调用一个必须同步等待的第三方 API)时,可以使用 spawn_blocking(fn) 提交一个同步函数作为任务执行。

语义要点:

  • spawn_blocking(fn) 不会把工作自动转移到“后台线程池”;它仍然运行在 ASCO runtime 的 worker 线程上。
  • 该任务会被标记为“blocking 环境”(asco::this_task::is_blocking_env() == true),因此在该任务内部允许调用一些同步阻塞接口(例如 runtime::block_on(...)blocking_lock()blocking_acquire())。
  • 由于它会阻塞 worker 线程,应避免长时间阻塞操作;若 runtime 线程数过少(尤其是单线程 runtime),在 blocking 任务中再调用 block_on() 这类同步等待接口可能导致死锁。

3. join_set<T>:批量管理并发任务

join_set<T> 用于管理一组任务:

  • spawn(...):提交任务到 set。
  • co_await set:按完成顺序收集结果。
  • join_all():停止收集并返回已到达的结果。

更完整的语义与注意事项见:join_set<T>:批量任务收集

示例(非 void 输出):

#include <asco/core/runtime.h>
#include <asco/join_set.h>
#include <asco/yield.h>

using namespace asco;

future<int> job(int x) {
    co_await this_task::yield();
    co_return x * 2;
}

int main() {
    core::runtime rt;
    return rt.block_on([&]() -> future<int> {
        join_set<int> set{rt};
        for (int i = 0; i < 10; ++i) {
            set.spawn([i]() -> future<int> { co_return co_await job(i); });
        }

        int sum = 0;
        while (auto v = co_await set) {
            sum += *v;
        }

        co_await set.join_all();
        co_return sum;
    });
}

4. co_invoke:延长临时可调用对象的生命周期

co_invoke 的目的不是简化书写,而是保证“临时可调用对象 + 协程执行”的组合是内存安全的。

4.1 风险来源:临时可调用对象生命周期不足

如果把一个临时可调用对象(rvalue,例如临时协程 lambda)用于创建 future/join_handle,而该任务又会在之后继续执行,那么该临时对象在表达式结束后就会被销毁。

当任务后续执行仍需要访问该可调用对象的捕获状态时,就会触发未定义行为(常见表现为偶发崩溃或数据损坏)。

4.2 典型反例:临时协程 lambda

future<int> bad() {
    auto f = ([p = std::make_unique<int>(123)]() -> future<int> {
        co_await this_task::yield();
        co_return *p;
    })();

    // 临时 lambda 已销毁;协程恢复后访问 p 可能 UB。
    co_return co_await f;
}

该问题通常呈现为偶发崩溃、错误读写或内存破坏,且与根因位置不一致。

4.3 co_invoke 的机制(对应 asco/invoke.h

co_invoke 的语义是:当你用“临时可调用对象”(尤其是临时协程 lambda)来创建 future/join_handle 时,框架会确保该临时对象的生命周期足够长,从而避免捕获状态悬垂。

4.4 不使用 co_invoke 的后果

若在框架/工具函数中对 Fn&& 进行天真转发并直接调用:

template<class Fn>
auto naive_invoke(Fn&& fn) {
    return std::invoke(std::forward<Fn>(fn));
}

则对临时协程可调用对象的调用可能在后续执行中访问已销毁的捕获状态,从而触发未定义行为。该风险通常不会被编译器诊断。

4.5 适用范围

  • 框架/库作者:只要需要“接受 Fn&& 并调用”,且允许传入临时可调用对象,应使用 co_invoke 或等价的“将可调用对象绑定到任务以延长生命周期”的方案。
  • 一般使用者:通常无需直接调用 co_invokespawn(...)join_set::spawn(...) 等常用入口已经覆盖了该类生命周期风险。

5. this_task::yield():协作式调度点

co_await this_task::yield() 提供一个协作式调度点:让出当前执行权,使其它可运行任务有机会执行。它常用于:

  • 避免忙等;
  • 提升公平性。

6. 典型入口:runtime::block_on(async_main)

常见工程结构为:

  • 用户实现 future<int> async_main()
  • main() 创建 core::runtime 并调用 block_on(async_main)

补充说明:

  • block_on(...) 是同步阻塞接口,要求当前处于“blocking 环境”(asco::this_task::is_blocking_env() == true)。
  • 因此它最常见的用法是在 runtime 之外(例如 main())作为入口;在 runtime 内也只有在 spawn_blocking(...) 任务里才允许使用。

链接目标 asco::main 已提供默认 main();使用者仅需定义 async_main()


7. 选型摘要

  • 在协程中等待一次异步调用:co_await future<T>
  • 提交并发任务并等待其完成:spawn(...) -> join_handle<T>,随后 co_await
  • 管理多个并发任务:join_set<T>

join_set<T>:按完成顺序收集并发任务

join_set<T> 用于批量启动任务,并按“任务完成顺序”持续产出结果。

它适合:

  • 你要同时跑很多个任务;
  • 想边完成边消费结果(streaming),而不是固定按提交顺序等待。

对应头文件:asco/join_set.h


1. 快速上手

#include <asco/core/runtime.h>
#include <asco/join_set.h>
#include <asco/yield.h>

using namespace asco;

future<int> job(int x) {
    co_await this_task::yield();
    co_return x * 2;
}

int main() {
    core::runtime rt;
    return rt.block_on([&]() -> future<int> {
        join_set<int> set{rt};

        for (int i = 0; i < 10; ++i) {
            set.spawn([i]() -> future<int> { co_return co_await job(i); });
        }

        int sum = 0;
        while (auto v = co_await set) {
            // v 的到达顺序 = 完成顺序(不保证与 i 相同)
            sum += *v;
        }

        co_return sum;
    });
}

要点:

  • set.spawn(...) 提交任务;join_set 不提供单个任务的 join_handle
  • co_await set 每次取回一个结果;当 set 中已提交任务全部产出结果后,返回 std::nullopt

2. API 语义

2.1 构造

  • join_set():使用 core::runtime::current()
  • join_set(core::runtime&):显式绑定 runtime

2.2 spawn(async_fn):提交异步任务

void spawn(async_function<> auto &&fn);

语义:

  • fn() 必须返回 future<T>(即“异步函数”)。
  • spawn(...) 会启动一个并发任务。
  • 当该任务完成并产生结果后,结果会出现在 co_await set 的返回序列中(按完成顺序)。

注意:

  • join_set 不暴露单个任务的 join_handle,因此无法对单个任务进行 cancel() 或单独 co_await
  • 传入的任务应当在完成路径上总能产出一个结果;如果任务提前退出且没有产出结果,join_set 将永远等不到该条结果。

2.3 co_await set:取回一个结果

future<std::optional<T>> operator co_await();

T 为非 void 类型时:

  • 每次 co_await set 返回一个 std::optional<T>
    • has_value() == true:拿到一个结果
    • std::nullopt:当前 join_set 中已提交任务的结果都已收齐

Tvoid 时:

  • 每次 co_await set 返回一个 bool
    • true:拿到一个结果
    • false:当前 join_set 中已提交任务的结果都已收齐

2.4 join_all():停止产生新任务并收集已产生的结果

join_all() 会停止新的 spawn,并返回一个 std::vector<T>(包含已收集到的结果)。


3. 常见问题与建议

3.1 任务异常如何处理?

join_set 不会把“任务失败”作为结果返回给你。为了保证 join_set 能持续产出结果:

  • 不要让异常从任务中逃逸;
  • 推荐把错误编码进返回值(例如 std::expected<T, E>),并始终返回一个值。

工程建议:

  • 让任务返回 std::expected<T, E> 或类似结果类型,把错误显式作为值传回;
  • 或在任务内部 try/catch,把错误信息转成 T 的某种错误表示。

3.2 spawn_blocking 并不把工作移到“后台线程”

spawn_blocking(...) 不会把工作自动转移到其它线程。

因此:

  • 避免在其中执行长时间阻塞操作(例如长 IO、长时间 sleep)。

3.3 什么时候该用 join_handle 而不是 join_set

  • 需要对某个任务单独 cancel() / detach() / co_await:用 spawn(...) -> join_handle<T>
  • 需要批量启动、按完成顺序消费结果:用 join_set<T>

同步原语

本章介绍 ASCO 提供的同步原语。它们用于在多个并发执行流之间建立互斥与协调关系。

本章只描述行为与语义:你能做什么、它会怎样表现、以及常见用法。


目录


选型建议

何时使用 channel

channel 适合在并发执行流之间传递值,并通过等待来表达背压(缓冲满时发送等待,缓冲空时接收等待)。

何时使用 spinlock

spinlock 适合保护非常短的临界区:

  • 修改一个小的共享状态(例如计数器、指针、短容器操作);
  • 临界区内不进行可能长时间运行的操作。

注意:

  • 不要在持有锁期间跨越 co_await

何时使用 semaphore

semaphore 适合表达“许可”的语义:

  • 并发限流(最多允许同时进行 N 个工作);
  • 生产者/消费者式的资源计数(有资源才能继续)。

acquire() 是可等待操作:当许可不足时会等待直到有许可被释放。


常见约定

  • try_* 系列 API 表达“不等待的快速路径”;失败时走其它分支。
  • 需要等待时,优先用明确的同步原语(例如 semaphore::acquire()),避免忙等。

sync::channel<T>:通道

sync::channel<T> 用于在并发执行流之间传递值。

  • 它是有界的:当缓冲区满时,发送会等待。
  • 当缓冲区空时,接收会等待。
  • 通道被关闭后,发送会失败;接收在“通道已关闭且缓冲已空”时结束。

头文件:asco/sync/channel.h


1. 创建通道

#include <asco/sync/channel.h>

auto [tx, rx] = asco::sync::channel<int>();

语义:

  • channel<T>() 返回一对端点:sender<T>(发送端)与 receiver<T>(接收端)。
  • sender<T>/receiver<T> 可拷贝;拷贝后的对象与原对象共享同一个通道。

2. 发送:sender<T>::send(...)

2.1 Tvoid

#include <asco/sync/channel.h>
#include <asco/future.h>
#include <expected>

using namespace asco;

future<void> producer(sync::sender<int> tx) {
    auto r = co_await tx.send(42);
    if (!r) {
        // 发送失败:通道已关闭,42 没有被发送
        int unsent = std::move(r.error());
        (void)unsent;
    }
    co_return;
}

语义:

  • send(value) 返回 future<std::expected<std::monostate, T>>
  • 当通道可写时:把 value 发送到通道中,并返回“成功”。
  • 当缓冲区满时:等待直到通道可写或通道关闭。
  • 当通道已关闭且本次发送未发生时:返回“失败”,并在 error() 中返回未发送的 value

2.2 T == void

Tvoid 时,发送表示发送一个“事件”。

#include <asco/sync/channel.h>
#include <asco/future.h>

using namespace asco;

future<void> producer(sync::sender<void> tx) {
    bool ok = co_await tx.send();
    if (!ok) {
        // 通道已关闭,本次事件没有被发送
    }
    co_return;
}

语义:

  • send() 返回 future<bool>
  • 返回 true 表示发送成功;返回 false 表示通道已关闭且本次发送未发生。

3. 接收:receiver<T>::recv()

#include <asco/sync/channel.h>
#include <asco/future.h>
#include <optional>

using namespace asco;

future<void> consumer(sync::receiver<int> rx) {
    while (auto v = co_await rx.recv()) {
        int x = *v;
        (void)x;
    }
    // 循环结束:通道已关闭且缓冲已空
    co_return;
}

语义(Tvoid):

  • recv() 返回 future<std::optional<T>>
  • 当通道中存在值时:返回 T
  • 当通道暂时为空且未关闭时:等待直到有值可读或通道关闭。
  • 当通道已关闭且缓冲已空时:返回 std::nullopt

4. 关闭通道:stop()

发送端与接收端都提供 stop()

tx.stop();
rx.stop();

语义:

  • 调用 stop() 会关闭通道。
  • 关闭后:新的发送不会发生;接收在读完缓冲中的剩余值后结束(recv() 返回 std::nullopt)。

5. 使用建议

  • 当需要把数据从生产者传给消费者,并让双方通过等待来表达背压时,使用 channel<T>
  • 若你只需要“完成通知/事件”,可以用 channel<void> 表达事件流。
  • 当某一侧确定不再使用通道时,调用 stop() 让另一侧尽快结束等待并退出。

sync::mutex:互斥锁

sync::mutex 用于在多个并发执行流之间建立互斥:同一时刻最多只有一个执行流进入临界区。

它提供两种形态:

  • sync::mutex<>:只提供互斥,不绑定数据。
  • sync::mutex<T>:把一个值 T 与互斥锁绑定在一起,通过 guard 直接访问该值。

头文件:asco/sync/mutex.h


1. mutex<>(无数据)

1.1 lock():异步获取

#include <asco/sync/mutex.h>
#include <asco/future.h>

using namespace asco;

sync::mutex<> m;

future<void> f() {
    auto g = co_await m.lock();
    // 临界区
    co_return;
} // g 析构后自动解锁

语义:

  • lock() 返回 future<guard>;需要在 runtime 上下文中 co_await
  • 当锁空闲时:立即获取并返回 guard。
  • 当锁被占用时:等待直到锁被释放。
  • guard 不可拷贝、可移动;guard 析构时自动解锁。

1.2 try_lock():立即尝试获取

if (auto g = m.try_lock()) {
    // 获取成功
} else {
    // 获取失败(不等待)
}

语义:

  • 若锁空闲:返回有效 guard。
  • 若锁被占用:返回空 guard。

1.3 blocking_lock():同步阻塞获取

auto g = m.blocking_lock();

语义:

  • 同步阻塞直到获得锁,并返回 guard。
  • 仅允许在“blocking 环境”中调用;否则会触发 panic
    • runtime 之外:允许(此时会阻塞当前线程)。
    • runtime 之内:仅当当前任务由 spawn_blocking(...) 启动(即 asco::this_task::is_blocking_env() == true)时允许。

2. mutex<T>(保护一个值)

mutex<T> 将一个值 T 与互斥锁绑定。

#include <asco/sync/mutex.h>
#include <asco/future.h>

using namespace asco;

sync::mutex<int> counter{0};

future<void> inc() {
    auto g = co_await counter.lock();
    ++(*g);
    co_return;
}

语义:

  • lock() 返回 future<mutex<T>::guard>
  • 通过 *g / g-> 访问被保护的 T
  • 若 guard 为空(例如 try_lock() 失败返回的 guard),对其解引用会触发 panic

3. guard 的移动语义

  • guard 不可拷贝,但可以移动。
  • 被移动后的 guard 变为空(if (g) 为假)。

4. 使用建议

  • try_lock() 适合实现“不等待的快速路径”;失败时走其它分支。
  • 互斥锁的临界区尽量保持短小,避免把长时间运行的工作放在持锁期间。
  • 在普通异步任务中不要使用 blocking_lock();请使用 co_await lock()
  • 只有当你确实处于“blocking 环境”(例如同步代码、或 spawn_blocking 任务内)时,才考虑 blocking_lock()

sync::spinlock:自旋锁

sync::spinlock 提供线程间互斥:同一时刻最多只有一个执行流持有锁。

它适用于:

  • 临界区很短、竞争不激烈的场景。

头文件:asco/sync/spinlock.h


1. spinlock<>(无数据)

#include <asco/sync/spinlock.h>

asco::sync::spinlock<> lock;

{
    auto g = lock.lock();
    // 临界区
}
// g 析构后自动解锁

语义:

  • lock.lock() 会获取锁,并返回一个守卫对象(guard)。
  • guard 的生命周期内锁处于持有状态;guard 析构时解锁。

2. spinlock<T>(保护一个值)

spinlock<T> 用于把某个值 T 与一把锁绑定在一起。

#include <asco/sync/spinlock.h>
#include <vector>

asco::sync::spinlock<std::vector<int>> xs;

{
    auto g = xs.lock();
    g->push_back(1);
    g->push_back(2);
}

语义:

  • xs.lock() 返回一个 guard。
  • 通过 *g / g-> 访问被保护的 T

3. 使用建议

  • 只在临界区很短时使用自旋锁;临界区越长,对其它执行流的影响越大。
  • 不要在持有自旋锁期间执行可能长时间运行的操作(例如长循环、阻塞调用)。
  • 不要在持有自旋锁期间跨越 co_await;建议在 co_await 前释放锁,恢复后再重新获取。

sync::semaphore<N>:信号量

sync::semaphore<N> 表示一个“最多拥有 N 个许可(permit)”的计数信号量。

  • 当许可数大于 0 时,acquire()/try_acquire() 会消耗 1 个许可并成功返回。
  • 当许可数为 0 时,acquire() 会等待,直到有许可被释放。

常用别名:

  • sync::binary_semaphoresync::semaphore<1>
  • sync::unlimited_semaphore:许可上限非常大

头文件:asco/sync/semaphore.h


1. 构造与计数

sync::semaphore<3> sem{2};

语义:

  • 初始许可数为 min(N, count)
  • get_count() 返回当前许可数的一个瞬时值(可用于观测与调试;并发访问下不保证与后续操作之间的时序关系)。

2. 获取许可

2.1 try_acquire():非等待获取

bool ok = sem.try_acquire();

语义:

  • 若当前许可数大于 0:消耗 1 个许可并返回 true
  • 若当前许可数为 0:不等待,直接返回 false

2.2 acquire():等待获取

co_await sem.acquire();

语义:

  • 若当前许可数大于 0:消耗 1 个许可并返回。
  • 若当前许可数为 0:挂起等待,直到有其它执行流释放许可。
  • 支持任务取消。

acquire() 返回 future<void>,需要在 ASCO runtime 上下文中 co_await

2.3 blocking_acquire():同步等待获取

sem.blocking_acquire();

语义:

  • 同步地等待并获取 1 个许可。
  • 仅允许在“blocking 环境”中调用;否则会触发 panic
    • runtime 之外:允许(此时会阻塞当前线程)。
    • runtime 之内:仅当当前任务由 spawn_blocking(...) 启动(即 asco::this_task::is_blocking_env() == true)时允许。

3. 释放许可

3.1 release(n = 1)

std::size_t released = sem.release(5);

语义:

  • 向信号量增加最多 n 个许可。
  • 许可数不会超过上限 N
  • 返回值为“本次实际增加的许可数”。

当存在等待 acquire() 的执行流时,释放许可会让其中最多 released 个等待方继续执行;被唤醒的具体等待方不作保证。


4. 典型用法

4.1 binary_semaphore:一次只允许一个进入

#include <asco/sync/semaphore.h>
#include <asco/future.h>

using namespace asco;

future<void> f() {
    sync::binary_semaphore sem{1};
    co_await sem.acquire();
    // 临界区
    sem.release();
    co_return;
}

4.2 限流:最多并发 N 个任务

#include <asco/sync/semaphore.h>
#include <asco/core/runtime.h>
#include <asco/future.h>

using namespace asco;

future<void> limited(sync::semaphore<8> &sem) {
    co_await sem.acquire();
    // ... 执行受限工作
    sem.release();
    co_return;
}

5. 使用建议

  • 使用 try_acquire() 实现“尽力而为”的快速路径;失败时走其它分支。
  • 使用 acquire() 表达“必须获得许可才能继续”。
  • 不要在未获得许可时调用 release() 来“抵消”;这会破坏许可语义。

进阶

本章收录一些偏工程实践与进阶语义的主题:

任务取消机制

ASCO 的“任务取消”主要面向由 runtime 调度的任务(join_handle<T>)。它的目标是:

  • 由外部请求取消一个正在运行/挂起的任务;
  • 任务内可以通过 token 观察取消请求,并注册回调执行清理/通知;
  • 取消发生并被处理后,该任务会被终止,不再继续执行。

术语:本文的“任务”指 spawn(...) 产生的 runtime 任务(join_handle)。


1. 取消的组成

相关 API 位于:

  • asco/core/cancellation.h
  • asco/this_task.h
  • asco/join_handle.h

核心类型:

  • asco::core::cancel_source:取消信号的来源
    • request_cancel():请求取消(仅发出 stop 请求)
    • invoke_callbacks():执行已注册的取消回调(LIFO 顺序)
  • asco::core::cancel_token:取消信号的观察者
    • cancel_requested():查询是否已请求取消
    • close_cancellation() / cancellation_closed():关闭/查询“取消关闭”状态
  • asco::core::cancel_callback
    • 基于 token 注册一个回调;对象析构时自动注销

任务内入口(当前正在运行的任务):

  • asco::this_task::get_cancel_token():获取当前任务的 cancel_token
  • asco::this_task::close_cancellation():关闭当前任务的取消(见第 4 节)

2. 如何取消一个任务:join_handle::cancel()

外部取消的标准方式是:对 join_handle<T> 调用并等待 cancel()

#include <asco/future.h>
#include <asco/yield.h>

using namespace asco;

future<void> example_cancel(join_handle<void> &h) {
    h.cancel();

    bool cancelled = false;
    try {
        co_await h;  // join
    } catch (core::coroutine_cancelled &) {
        cancelled = true;
    }

    // cancelled == true
    (void)cancelled;
}

行为要点:

  • cancel() 会给该任务的 cancel_source 发出 stop 请求。
  • 取消请求被处理后,会执行已注册的取消回调(cancel_callback),并终止该任务的后续执行。
  • 被取消的 join_handleco_await 时会抛出 asco::core::coroutine_cancelled

3. 任务内如何响应取消

3.1 首选:注册取消回调(cancel_callback

如果你的代码需要在取消发生时执行某些动作(比如设置 flag、通知别的协程/线程),可以注册回调:

#include <atomic>
#include <asco/cancellation.h>
#include <asco/this_task.h>

using namespace asco;

future<void> with_cancel_callback(std::atomic_bool &flag) {
    auto &token = this_task::get_cancel_token();
    core::cancel_callback cb{token, [&]() { flag.store(true, std::memory_order::release); }};

    while (!flag.load(std::memory_order::acquire)) {
        co_await this_task::yield();
    }

    co_return;
}

语义:

  • 回调在取消请求被处理时执行。
  • 回调的常见用途是:触发一次“通知/标记/释放资源/恢复状态”的动作。

回调顺序与生命周期:

  • 取消回调以 LIFO 顺序执行。

3.2 补充:查询取消请求(cancel_requested()

如果你希望在协程的“安全点”主动结束逻辑,可以轮询当前任务的取消状态:

#include <asco/this_task.h>

using namespace asco;

future<void> worker_loop() {
    while (true) {
        if (this_task::get_cancel_token().cancel_requested()) {
            co_return;
        }
        co_await this_task::yield();
    }
}

语义:

  • cancel_requested()true 时,表示任务已被请求取消。
  • 该写法适合:你需要在循环/阶段边界按自己的方式收尾并退出。

4. 关闭取消:this_task::close_cancellation()

某些关键区段你可能希望“禁止外部取消”,以避免破坏内部一致性。ASCO 提供:

#include <asco/this_task.h>

using namespace asco;

future<void> critical_section() {
    this_task::close_cancellation();

    // ... 做一些你不希望被外部打断的工作

    co_return;
}

行为语义:

  • 调用 this_task::close_cancellation() 后,该任务会进入“取消已关闭”状态。
  • 若之后仍对该任务发出取消请求,会触发 panic
  • 取消关闭是单向的:关闭后无法再打开。

因此建议:

  • 只在你能证明“此后不会再被外部取消”的场景使用它;
  • 更常见的做法是:不关闭取消,而是让任务在安全点自行检查 token 并退出。

5. 常见坑与建议

  • this_task::get_cancel_token() / close_cancellation() 只能在 runtime 中调用;不在 runtime 会 panic
  • 取消回调适合做“通知/打点/设置标志/清理资源/恢复状态”,不要在回调里做复杂阻塞操作。
  • 如果你需要让自定义 awaiter 支持取消:在 await_suspend 时用 cancel_callback 监听 token,并在回调里安排唤醒/中断(可参考仓库内 tests/cancellation.cpp 的 awaiter 写法)。

任务本地存储(Task-local storage)

Task-local storage(下文简称 TLS)用于为“单个任务(task)”保存一份上下文数据,并在该任务的整个协程调用链中随时访问。

它和 thread_local 的核心区别是:

  • thread_local 绑定线程;task_local 绑定任务(由 ASCO runtime 调度执行)。
  • task_local 在同一任务内跨 co_await / yield() 保持一致与可见。

典型用途:请求/追踪上下文、每任务统计、逻辑上的“隐式参数”等。

快速开始

1)定义 TLS 类型

asco 的 TLS 以“类型”为键:一个任务里只存放一份 TLS 对象。 如果你需要多份数据,把它们聚合到一个结构体里即可。

struct my_tls {
    int request_id{};
    std::string trace_id;
};

2)在 spawn() / block_on() 时提供初始值

创建任务时把 TLS 实例作为第二个参数传入:

#include <asco/core/runtime.h>

auto h = asco::spawn(
    []() -> asco::future<void> {
        // ...
        co_return;
    },
    my_tls{.request_id = 42, .trace_id = "abc"});

如果不需要 TLS,直接使用 spawn(fn) 即可。

3)在任务内部访问与修改

在任务(协程)内部,通过 asco::this_task::task_local<T>() 获取 TLS 的引用

#include <asco/this_task.h>

auto &tls = asco::this_task::task_local<my_tls>();
tls.request_id += 1;

同一任务内多次调用 task_local<T>() 会返回同一对象的引用;写入会在后续 co_await / yield() 后仍然可见。

语义说明

同一任务内:引用稳定、跨挂起保持

在同一个任务中:

  • task_local<T>() 返回的引用在该任务生命周期内保持稳定(同一对象)。
  • 对 TLS 的修改在 co_await / this_task::yield() 之后仍然能读到。

不同任务之间:相互隔离、不会继承

TLS 不会从父任务自动继承到子任务

也就是说:父任务 spawn() 了一个子任务,子任务要想使用 TLS,必须在创建子任务时显式传入它自己的 TLS 初始值;子任务对 TLS 的修改不会影响父任务。

如果你希望“父子任务共享状态”,应显式使用共享对象(例如 std::shared_ptr<state>)并把它作为 TLS 的成员或直接作为任务参数传递。

类型必须匹配

TLS 是按类型存取的:

  • 你在 spawn(fn, tls_value) 里传入的 TLS 类型是 T,那么任务内部必须用 task_local<T>() 访问。
  • 如果使用了不同的类型访问,会触发断言失败(类型安全检查)。

因此,一个任务只会有一份 TLS:想存多种值,请把它们放进同一个结构体。

生命周期与析构时机

构造

TLS 对象由 spawn() / block_on() 的第二个参数初始化(以转发/移动方式构造)。

可访问范围

task_local<T>() 仅在“当前正在执行的任务”内部可用。

  • 若当前没有正在运行的任务,task_local<T>() 会触发 panic(提示“当前没有正在运行的任务”)。
  • 不要在 runtime 之外调用它。

析构

TLS 会在任务执行期间一直存在,并且至少存活到任务结束。

析构时机不做精确保证,但可以依赖以下语义:

  • 若你在任务结束后仍持有对应的 join_handle,TLS 可能继续存活,从而延迟资源回收。
  • 若你提前销毁 join_handle,TLS 也不会因此在任务尚未结束时提前析构。

建议:不要把 TLS 析构发生的“确切时刻”当作业务语义;若需要确定性的清理,应在任务函数内部显式管理资源生命周期。

常见模式与建议

  • 把 TLS 当作“每任务上下文”,不要用它做跨任务共享。
  • 需要传播上下文时,显式把上下文作为 spawn() 的第二个参数传给新任务。

最小示例

下面示例展示:初始值来自 spawn(),修改跨 yield() 保持,且同一任务内引用稳定。

struct tls_int { int value; };

auto h = asco::spawn(
    []() -> asco::future<int> {
        auto &tls = asco::this_task::task_local<tls_int>();
        tls.value += 1;
        co_await asco::this_task::yield();
        co_return asco::this_task::task_local<tls_int>().value;
    },
    tls_int{41});

int result = co_await h; // result == 42

asco::core::daemon:后台守护线程基类

asco::core::daemon 用于把一个“循环执行的后台工作”封装为一个对象。

  • daemon 在内部启动一个后台线程。
  • 线程按固定生命周期运行:init() → 重复调用 run_once(...)shutdown()
  • 对象析构时会请求线程停止,并等待线程退出。

头文件:asco/core/daemon.h


1. 生命周期

1.1 启动:start()

start() 启动后台线程,并返回一个 init_waiter

语义:

  • 后台线程开始后会先调用 init()
  • init_waiter 析构时会等待 init() 完成。

典型用法(在派生类构造函数中启动,并等待初始化完成):

#include <asco/core/daemon.h>

struct my_daemon : asco::core::daemon {
    my_daemon() : daemon("my-daemon") {
        auto _ = start();
        // 这里开始可以认为 init() 已经完成
    }

    bool init() override;
    bool run_once(std::stop_token &st) override;
    void shutdown() override;
};

start() 是受保护成员函数,只能在 daemon 的派生类内部调用。

1.2 停止:析构函数

daemon 对象析构时:

  • 会请求后台线程停止(通过 std::stop_token 传递 stop 请求)。
  • 会调用一次 awake(),用于唤醒正在等待的后台线程。
  • 会等待后台线程退出。

2. 线程执行逻辑:init() / run_once() / shutdown()

2.1 init()

virtual bool init();

语义:

  • 在后台线程开始工作前调用一次。
  • 返回 true 表示初始化成功;返回 false 表示启动失败。
  • init() 返回 false 时,后台线程不会进入 run_once() 循环,并会调用 shutdown()

2.2 run_once(std::stop_token &st)

virtual bool run_once(std::stop_token &st);

语义:

  • 在后台线程中反复调用。
  • st.stop_requested()true 时,调用方应尽快结束当前轮次并返回。
  • 返回 true 表示继续下一轮;返回 false 表示结束循环。

2.3 shutdown()

virtual void shutdown();

语义:

  • 在线程退出前调用一次,用于资源释放与收尾。
  • 无论 init() 失败还是正常退出循环,都会调用 shutdown()

3. 唤醒与等待:awake()sleep_until_awake*

3.1 awake()

void awake();

语义:

  • 使一次等待中的 sleep_until_awake* 结束等待。
  • 可由任何线程调用。

3.2 等待:sleep_until_awake...

daemon 提供一组等待函数,供 run_once() 在“无事可做时”进入等待:

  • sleep_until_awake():等待直到被 awake() 唤醒。
  • sleep_until_awake_for(duration):等待直到被唤醒或超时。
  • sleep_until_awake_before(time_point):等待直到被唤醒或到达指定时间点。

语义:

  • 这些函数在当前线程内等待。
  • 发生唤醒或超时后返回。
  • 这些函数不返回“唤醒原因”;如需区分原因,请在 run_once() 中自行检查条件。

4. 使用建议

  • run_once() 中优先使用 sleep_until_awake* 进入等待;这样析构时的停止请求能更快生效。
  • run_once() 应避免无限阻塞:若必须等待外部事件,建议设置超时并周期性检查 st.stop_requested()
  • awake() 表示“有新工作/状态变化”;调用 awake() 前后如何更新共享状态由派生类自行约定。

测试框架

ASCO 自带一个轻量级测试框架,用于为协程/异步代码编写测试用例。

它并不只用于本项目自身:只要你的程序/库是基于 ASCO 构建的(能链接到 asco::test,通常还需要 asco::core),就可以直接复用这套测试框架来编写与运行测试。

它的核心特点是:

  • 测试用例本身是 future<test_result>,可以在测试里直接 co_await
  • 同一个测试可执行文件中可注册多个用例,运行时会执行全部用例并汇总结果。
  • 与 CMake/CTest 集成:每个测试目标是一个可执行文件,通过 add_test() 接入。

使用方式:

  • 用例用 ASCO_TEST(name) 声明。
  • 断言用 ASCO_CHECK(...)
  • 用例成功结束用 ASCO_SUCCESS()
  • 测试可执行文件需要链接 asco::test

编写测试用例

1) 基本结构

每个用例通过 ASCO_TEST(name) 声明,其函数体是一个协程(返回 future<test_result>):

#include <asco/test/test.h>

using namespace asco;

ASCO_TEST(my_first_test) {
    ASCO_CHECK(1 + 1 == 2, "math broken: {}", 1 + 1);
    ASCO_SUCCESS();
}
  • name 会作为测试名显示在输出中(例如 my_first_test)。
  • 每个测试必须ASCO_SUCCESS() 结束(或在失败处提前 co_return std::unexpected{...})。

2) 断言:ASCO_CHECK

ASCO_CHECK(expr, fmt, ...) 用于断言:

  • expr 为假时,测试立即失败并 co_return std::unexpected{...}
  • 失败信息包含:
    • 你提供的格式化消息(std::format(fmt, ...)
      • 当前源码位置(文件名/行/列)

建议:

  • fmt 清晰描述期望条件与实际情况
  • 对于会等待的异步条件,配合 co_await this_task::yield() 自旋等待(见下文示例)

3) 验证 panic 行为

  • 在测试框架下,asco::panic(...) 会抛出 asco::panicked,从而允许你在测试中捕获它,用于验证“应该 panic 的错误行为”。
  • asco::panicked 不能通过 std::exception & 捕获,请按 asco::panicked & 捕获。

示例:

#include <asco/panic.h>

ASCO_TEST(expect_panic) {
    bool caught = false;
    try {
        asco::panic("boom");
    } catch (asco::panicked &e) {
        (void)e; // 如需信息可用 e.to_string()
        caught = true;
    }

    ASCO_CHECK(caught, "panic should throw asco::panicked under ASCO_TESTING");
    ASCO_SUCCESS();
}

4) 异步/并发测试写法

测试是协程,因此可以自然地写异步逻辑,例如:

  • co_await sem.acquire() / co_await join_handle
  • spawn([&] -> future<void> { ... }) 启动并发任务
  • co_await this_task::yield() 让出执行权

一个常用的小工具模式是“等待条件成立”:

#include <functional>
#include <asco/this_task.h>

template<class Pred>
asco::future<bool> wait_until(Pred &&pred, std::size_t max_spins = 4096) {
    for (std::size_t i = 0; i < max_spins; i++) {
        if (std::invoke(pred)) co_return true;
        co_await asco::this_task::yield();
    }
    co_return false;
}

再用 ASCO_CHECK(co_await wait_until(...), "...") 来避免无限等待。

新增一个测试目标(接入 CTest)

新增一个测试目标的最小步骤:

  1. 新建一个测试源文件(例如 tests/channel.cpp),写入若干 ASCO_TEST(...) 用例。
  2. tests/CMakeLists.txt 中添加可执行文件,并用 CTest 注册:
add_executable(test_channel channel.cpp)
target_link_libraries(test_channel PRIVATE asco::core asco::test)

add_test(channel test_channel)

说明:

  • 链接 asco::test 后,不需要再为测试可执行文件提供 main()
  • 如果测试用到了 runtime/同步原语等功能,需要同时链接 asco::core

运行测试

方式 A:使用 CTest

在已生成构建目录(例如 build/)的情况下:

  • 运行全部测试:
ctest --test-dir build --output-on-failure
  • 只跑某一个测试(按 CTest 名称过滤,例如 semaphore):
ctest --test-dir build -R semaphore --output-on-failure

方式 B:直接运行测试可执行文件

你也可以直接运行构建产物,例如:

./build/tests/test_semaphore

当某个用例失败时,程序会以非 0 退出码结束,便于 CI/脚本判定。

命名与建议

  • 测试用例名:建议使用 模块_场景_期望 的风格(例如 semaphore_acquire_blocks_and_release_wakes)。
  • 一个测试文件可以包含多个 ASCO_TEST;按功能聚合(例如 semaphore.cpp 放信号量相关)。
  • 尽量避免依赖睡眠/真实时间;优先用 yield + 条件等待来保证测试稳定性。