无锁连续队列 continuous_queue
导入头文件 <asco/nolock/continuous_queue.h>
使用无锁队列。
该结构提供一对生产者/消费者句柄:sender<T>
与 receiver<T>
,通过 create<T>()
一次性创建并绑定到同一个内部缓冲区链表。它是“无锁”的:数据入队/出队全程只使用原子操作与内存序,不持有互斥锁;同时采用按 CPU cache line 对齐的定长帧(frame)作为环形/链式缓冲,尽量减少伪共享与 cache 抖动。
适用场景:
- 单个 sender 与单个 receiver 的高吞吐、低延迟消息传递(也允许多个 sender/receiver 拷贝同一端句柄并发使用,但每个 sender/receiver 自身不是线程安全对象)。
- T 的移动/构造开销较低、且异常保证良好的场景。
注意:
sender
与receiver
都不是线程安全类型,但可以拷贝多个句柄在不同线程上使用(内部通过引用计数和原子指针维护帧链表)。- T 需满足
move_secure
概念:移动安全。 - 原则上不允许 T 的构造函数抛出异常。
核心概念
frame<T>
: 4KB 对齐、定长数据页,包含三组游标与状态:- head: 消费进度(接收方已领取的下标),初始 preset 为 0;
- tail: 生产进度(发送方已保留的尾下标),满时置为
index_nullopt
; - released: 生产方串行释放游标,保证写入与可见性的顺序;
- next: 单向链表指向“下一帧”;
- sender_stopped/receiver_stopped: 双方停止标志,用于优雅关闭;
- refcount: 对帧的共享引用计数,保障跨句柄与跨帧切换时的生命周期安全;
- 每帧可容纳
length = (4096 - header_size) / sizeof(T)
个元素,元素区按alignof(T)
对齐;
接口速览
std::tuple<sender<T>, receiver<T>> create<T>()
:创建一对句柄。sender<T>::push(T|T&&) -> std::optional<T>
:- 入队成功返回
std::nullopt
; - 若队列已关闭或一方停止,则返回“未消费的值”以便调用者自行处理(避免丢失)。
- 入队成功返回
receiver<T>::pop() -> std::expected<T, receiver::pop_fail>
:- 成功返回元素;
- 失败返回
pop_fail::non_object
(暂时无元素)或pop_fail::closed
(队列关闭)。
sender::stop()
/receiver::stop()
:设置全链路停止标志,释放当前持有的帧引用;is_stopped()
: 查询端是否已停止/关闭(含对方停止的传播)。
使用示例
#include <asco/nolock/continuous_queue.h>
namespace cq = asco::continuous_queue;
void producer_consumer_demo() {
auto [tx, rx] = cq::create<int>();
// 生产者线程/协程:
for (int i = 0; i < 1000; ++i) {
if (auto unconsumed = tx.push(i)) {
// 队列已关闭或对端停止,处理未消费数据
break;
}
}
tx.stop(); // 可选:显式结束生产
// 消费者线程/协程:
while (true) {
auto r = rx.pop();
if (r) {
int v = *r;
// 使用 v
} else if (r.error() == decltype(rx)::pop_fail::non_object) {
// 当前无元素,可自旋/让出/休眠
continue;
} else {
// closed
break;
}
}
}
行为与内存模型
- push 路径:
- CAS 递增 tail,若满则尝试跳帧(确保 next 存在,不存在则分配新 frame 并链接);
- 在计算出的 index 处原位构造元素;
- 按序等待
released == index
,随后将released = index + 1
,序列化生产可见性;
- pop 路径:
- CAS 递增 head,若 head 达到 length 则尝试跳到 next 帧;
- 若暂时无元素,返回
non_object
;若检测到停止且无更多元素,返回closed
; - 取出并移动/拷贝元素后显式析构存储槽位;
- 停止传播:
stop()
会沿帧链标记sender_stopped
或receiver_stopped
,另一端在检查到标志且无待处理元素时认为关闭;
内存序:
- 关键原子使用 acquire/release 或 acq_rel,保证跨核可见性;
- 使用
std::hardware_destructive_interference_size
对齐关键字段,降低伪共享;
复杂度与性能提示
- push/pop 常数时间,跨帧时有轻微分配/链接成本(有 freelist 缓解);
- 单生产/单消费路径下最大化局部性;多 sender/receiver 并发下仍是无锁,但存在轻度竞争自旋;
- 为极低延迟,push 在等待 released 时做短自旋,超过阈值后
cpu_relax()
;可根据场景考虑在上层增加让出/休眠策略;
正确性与限制
- T 的析构至少不应在队列析构路径上抛出;
- 对 T 的对齐要求已被 frame header 设计满足(length>16 推导出 sizeof/alignof 约束);
- sender/receiver 自身不是线程安全对象;若需多线程使用,请复制句柄,每个句柄独占所在线程;
- 不提供有界背压信号,仅当帧满时自动扩展到下一帧;如需限流请在上层实现;
与 asco 运行时的关系
该容器完全在用户态工作,不依赖调度器或阻塞系统调用;适合在 asco 协程调度环境中作为跨任务的高效消息通道,亦可用于普通线程环境。
API 参考
- 命名空间:
asco::nolock::continuous_queue
:原始实现asco::continuous_queue
:re-export 别名
- 类型:
sender<T>
:生产端receiver<T>
:消费端(pop_fail { non_object, closed }
)
- 函数:
create<T>() -> tuple<sender<T>, receiver<T>>
sender::push(T|T&&) -> optional<T>
receiver::pop() -> expected<T, pop_fail>
sender::stop() / receiver::stop()
sender::is_stopped() / receiver::is_stopped()
调试与故障排查
- 若频繁返回
non_object
,可能是生产/消费速率不匹配;考虑在消费者侧加入自适应让出策略; - 若过早返回
closed
,检查是否一端调用了stop()
或已经释放了所有句柄; - 大对象 T 可能降低每帧容量与缓存局部性,必要时改为传输指针/句柄而非大对象本体。