Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

无锁连续队列 continuous_queue

导入头文件 <asco/nolock/continuous_queue.h> 使用无锁队列。

该结构提供一对生产者/消费者句柄:sender<T>receiver<T>,通过 create<T>() 一次性创建并绑定到同一个内部缓冲区链表。它是“无锁”的:数据入队/出队全程只使用原子操作与内存序,不持有互斥锁;同时采用按 CPU cache line 对齐的定长帧(frame)作为环形/链式缓冲,尽量减少伪共享与 cache 抖动。

适用场景:

  • 单个 sender 与单个 receiver 的高吞吐、低延迟消息传递(也允许多个 sender/receiver 拷贝同一端句柄并发使用,但每个 sender/receiver 自身不是线程安全对象)。
  • T 的移动/构造开销较低、且异常保证良好的场景。

注意:

  • senderreceiver 都不是线程安全类型,但可以拷贝多个句柄在不同线程上使用(内部通过引用计数和原子指针维护帧链表)。
  • T 需满足 move_secure 概念:移动安全。
  • 原则上不允许 T 的构造函数抛出异常。

核心概念

  • frame<T>: 4KB 对齐、定长数据页,包含三组游标与状态:
    • head: 消费进度(接收方已领取的下标),初始 preset 为 0;
    • tail: 生产进度(发送方已保留的尾下标),满时置为 index_nullopt
    • released: 生产方串行释放游标,保证写入与可见性的顺序;
    • next: 单向链表指向“下一帧”;
    • sender_stopped/receiver_stopped: 双方停止标志,用于优雅关闭;
    • refcount: 对帧的共享引用计数,保障跨句柄与跨帧切换时的生命周期安全;
  • 每帧可容纳 length = (4096 - header_size) / sizeof(T) 个元素,元素区按 alignof(T) 对齐;

接口速览

  • std::tuple<sender<T>, receiver<T>> create<T>():创建一对句柄。
  • sender<T>::push(T|T&&) -> std::optional<T>
    • 入队成功返回 std::nullopt
    • 若队列已关闭或一方停止,则返回“未消费的值”以便调用者自行处理(避免丢失)。
  • receiver<T>::pop() -> std::expected<T, receiver::pop_fail>
    • 成功返回元素;
    • 失败返回 pop_fail::non_object(暂时无元素)或 pop_fail::closed(队列关闭)。
  • sender::stop() / receiver::stop():设置全链路停止标志,释放当前持有的帧引用;
  • is_stopped(): 查询端是否已停止/关闭(含对方停止的传播)。

使用示例

#include <asco/nolock/continuous_queue.h>

namespace cq = asco::continuous_queue;

void producer_consumer_demo() {
    auto [tx, rx] = cq::create<int>();

    // 生产者线程/协程:
    for (int i = 0; i < 1000; ++i) {
        if (auto unconsumed = tx.push(i)) {
            // 队列已关闭或对端停止,处理未消费数据
            break;
        }
    }
    tx.stop(); // 可选:显式结束生产

    // 消费者线程/协程:
    while (true) {
        auto r = rx.pop();
        if (r) {
            int v = *r;
            // 使用 v
        } else if (r.error() == decltype(rx)::pop_fail::non_object) {
            // 当前无元素,可自旋/让出/休眠
            continue;
        } else {
            // closed
            break;
        }
    }
}

行为与内存模型

  • push 路径:
    1. CAS 递增 tail,若满则尝试跳帧(确保 next 存在,不存在则分配新 frame 并链接);
    2. 在计算出的 index 处原位构造元素;
    3. 按序等待 released == index,随后将 released = index + 1,序列化生产可见性;
  • pop 路径:
    1. CAS 递增 head,若 head 达到 length 则尝试跳到 next 帧;
    2. 若暂时无元素,返回 non_object;若检测到停止且无更多元素,返回 closed
    3. 取出并移动/拷贝元素后显式析构存储槽位;
  • 停止传播:stop() 会沿帧链标记 sender_stoppedreceiver_stopped,另一端在检查到标志且无待处理元素时认为关闭;

内存序:

  • 关键原子使用 acquire/release 或 acq_rel,保证跨核可见性;
  • 使用 std::hardware_destructive_interference_size 对齐关键字段,降低伪共享;

复杂度与性能提示

  • push/pop 常数时间,跨帧时有轻微分配/链接成本(有 freelist 缓解);
  • 单生产/单消费路径下最大化局部性;多 sender/receiver 并发下仍是无锁,但存在轻度竞争自旋;
  • 为极低延迟,push 在等待 released 时做短自旋,超过阈值后 cpu_relax();可根据场景考虑在上层增加让出/休眠策略;

正确性与限制

  • T 的析构至少不应在队列析构路径上抛出;
  • 对 T 的对齐要求已被 frame header 设计满足(length>16 推导出 sizeof/alignof 约束);
  • sender/receiver 自身不是线程安全对象;若需多线程使用,请复制句柄,每个句柄独占所在线程;
  • 不提供有界背压信号,仅当帧满时自动扩展到下一帧;如需限流请在上层实现;

与 asco 运行时的关系

该容器完全在用户态工作,不依赖调度器或阻塞系统调用;适合在 asco 协程调度环境中作为跨任务的高效消息通道,亦可用于普通线程环境。

API 参考

  • 命名空间:
    • asco::nolock::continuous_queue:原始实现
    • asco::continuous_queue:re-export 别名
  • 类型:
    • sender<T>:生产端
    • receiver<T>:消费端(pop_fail { non_object, closed }
  • 函数:
    • create<T>() -> tuple<sender<T>, receiver<T>>
    • sender::push(T|T&&) -> optional<T>
    • receiver::pop() -> expected<T, pop_fail>
    • sender::stop() / receiver::stop()
    • sender::is_stopped() / receiver::is_stopped()

调试与故障排查

  • 若频繁返回 non_object,可能是生产/消费速率不匹配;考虑在消费者侧加入自适应让出策略;
  • 若过早返回 closed,检查是否一端调用了 stop() 或已经释放了所有句柄;
  • 大对象 T 可能降低每帧容量与缓存局部性,必要时改为传输指针/句柄而非大对象本体。