CyberRT共享内存通信原理详解
CyberRT 共享内存通信原理详解目录概述整体架构核心数据结构内存管理同步机制通知机制通信流程详解高级特性Arena 消息代码示例概述CyberRT 是 Apollo 自动驾驶平台的通信框架共享内存Shared Memory简称 SHM是其实现高吞吐量、低延迟进程间通信IPC的核心机制之一。为什么使用共享内存相比其他 IPC 方式如 Socket、管道共享内存具有以下优势零拷贝或极少拷贝多个进程直接访问同一块物理内存低延迟避免了内核态与用户态之间的频繁切换高吞吐量适合高频大数据传输场景整体架构核心组件┌─────────────────────────────────────────────────────────────────┐ │ 应用层Writer/Reader │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ ShmTransmitter / ShmReceiver │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ ShmDispatcher分发器 │ │ ┌──────────────────────┐ ┌──────────────────────┐ │ │ │ Notifier通知器 │◄───────►│ Segment段 │ │ │ └──────────────────────┘ └──────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 共享内存区域物理内存 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ State │ │ Blocks[] │ │ Arena │ │ Block │ │ │ │ │ │ │ │ Blocks[] │ │ Buffers │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────────┘关键组件说明ShmTransmitter: 发送端负责将消息写入共享内存ShmReceiver: 接收端负责从共享内存读取消息ShmDispatcher: 单例分发器管理所有共享内存段和通知Notifier: 通知器用于进程间消息通知Segment: 共享内存段每个 Channel 对应一个 SegmentBlock: 内存块用于存储单个消息核心数据结构1. State共享内存状态State位于共享内存的开头用于管理整个共享内存段的元数据。位置:cyber/transport/shm/state.hclassState{private:std::atomicboolneed_remap_{false};// 是否需要重新映射std::atomicuint32_tseq_{0};// 普通消息序列号std::atomicuint32_tarena_seq_{0};// Arena消息序列号std::atomicuint32_treference_count_{0};// 引用计数std::atomicuint64_tceiling_msg_size_;// 最大消息大小};关键成员解释seq_轮询分配 Block 时使用的序列号通过FetchAddSeq(1)递增reference_count_记录使用该共享内存段的进程数用于安全销毁need_remap_当共享内存需要重建时标记2. Block内存块Block管理单个消息块的锁和元数据。位置:cyber/transport/shm/block.hclassBlock{private:std::atomicint32_tlock_num_{0};// 读写锁计数器uint64_tmsg_size_;// 消息大小uint64_tmsg_info_size_;// 消息信息大小};读写锁机制kRWLockFree 0空闲状态kWriteExclusive -1写者独占 0读者数量允许多个读者同时读3. WritableBlock / ReadableBlock用于操作 Block 的结构体。位置:cyber/transport/shm/segment.hstructWritableBlock{uint32_tindex0;// Block 在数组中的索引Block*blocknullptr;// 指向 Block 的指针uint8_t*bufnullptr;// 指向实际数据缓冲区的指针};usingReadableBlockWritableBlock;4. ReadableInfo可读信息用于在进程间通知哪个 Block 有新消息。位置:cyber/transport/shm/readable_info.hclassReadableInfo{private:uint64_thost_id_;// 主机IDint32_tblock_index_;// 普通Block索引int32_tarena_block_index_;// Arena Block索引uint64_tchannel_id_;// Channel ID};内存管理1. 共享内存段布局每个 Channel 对应一个共享内存段布局如下┌───────────────────────────────────────────────────────────────┐ │ Offset 0 │ State │ ├───────────────────┼────────────────────────────────────────────┤ │ Offset sizeof(State) │ Block[0], Block[1], ..., Block[N-1] │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ ArenaBlock[0], ..., ArenaBlock[M-1] │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ BlockBuf[0], BlockBuf[1], ... │ │ │ (每个BlockBuf对应一个Block) │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ ArenaBlockBuf[0], ... │ └───────────────────────────────────────────────────────────────┘2. ShmConf共享内存配置ShmConf根据消息大小动态计算所需的内存配置。位置:cyber/transport/shm/shm_conf.h消息大小档位消息范围天花板大小Block 数量0 - 10K16K51210K - 100K128K128100K - 1M1M641M - 6M8M326M - 10M16M1610M32M83. Segment 类Segment是共享内存段的抽象基类有两种实现PosixSegment使用 POSIX 共享内存shm_openXsiSegment使用 System V 共享内存shmget关键方法// 写入流程boolAcquireBlockToWrite(size_t msg_size,WritableBlock*writable_block);voidReleaseWrittenBlock(constWritableBlockwritable_block);// 读取流程boolAcquireBlockToRead(ReadableBlock*readable_block);voidReleaseReadBlock(constReadableBlockreadable_block);4. 内存初始化流程OpenOrCreate以PosixSegment::OpenOrCreate()为例cyber/transport/shm/posix_segment.cc:39创建共享内存文件intfdshm_open(shm_name_.c_str(),O_RDWR|O_CREAT|O_EXCL,0644);设置大小ftruncate(fd,conf_.managed_shm_size());映射到进程地址空间managed_shm_mmap(nullptr,conf_.managed_shm_size(),PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);在共享内存上构造对象state_new(managed_shm_)State(conf_.ceiling_msg_size());blocks_new(managed_shm_sizeof(State))Block[conf_.block_num()];这里使用了Placement New技术在已分配的内存上构造对象。同步机制1. Block 的读写锁关键这是最不容易理解的部分让我们详细分析。位置:cyber/transport/shm/block.cc写锁获取TryLockForWriteboolBlock::TryLockForWrite(){int32_trw_lock_freekRWLockFree;// 0if(!lock_num_.compare_exchange_weak(rw_lock_free,kWriteExclusive,// 从 0 变为 -1std::memory_order_acq_rel,std::memory_order_relaxed)){returnfalse;}returntrue;}原理使用原子 CAS 操作只有当lock_num_为 0空闲时才成功将其设置为 -1写独占。读锁获取TryLockForReadboolBlock::TryLockForRead(){int32_tlock_numlock_num_.load();if(lock_numkRWLockFree){// 如果正在被写returnfalse;}int32_ttry_times0;while(!lock_num_.compare_exchange_weak(lock_num,lock_num1,// 读者数 1std::memory_order_acq_rel,std::memory_order_relaxed)){try_times;if(try_timeskMaxTryLockTimes){returnfalse;}lock_numlock_num_.load();if(lock_numkRWLockFree){returnfalse;}}returntrue;}原理检查当前是否正在被写lock_num 0使用 CAS 原子递增读者计数重试最多 5 次锁释放voidBlock::ReleaseWriteLock(){lock_num_.fetch_add(1);// -1 1 0恢复空闲}voidBlock::ReleaseReadLock(){lock_num_.fetch_sub(1);// 读者数 -1}2. Block 分配策略轮询位置:cyber/transport/shm/segment.cc:295uint32_tSegment::GetNextWritableBlockIndex(){constautoblock_numconf_.block_num();while(1){uint32_ttry_idxstate_-FetchAddSeq(1)%block_num;if(blocks_[try_idx].TryLockForWrite()){returntry_idx;}}}原理序列号seq_原子递增对 Block 数量取模得到尝试索引尝试获取该 Block 的写锁如果失败继续下一个循环直到成功这是一个无锁的轮询分配策略保证了公平性。通知机制当发送者写完消息后需要通知接收者。CyberRT 提供了两种通知方式1. ConditionNotifier条件变量通知使用共享内存 信号量实现适合同一主机内的进程。位置:cyber/transport/shm/condition_notifier.h内部结构structIndicator{std::atomicuint64_tnext_seq{0};ReadableInfo infos[kBufLength];// 4096 个槽位uint64_tseqs[kBufLength]{0};};通知流程发送者Notify(info)→ 将 info 写入 ring buffer更新 seq接收者Listen(timeout, info)→ 轮询等待 seq 变化读取 info2. MulticastNotifier组播通知使用 UDP 组播适合跨主机通信。3. NotifierFactory工厂类根据配置创建对应的 Notifier。通信流程详解完整的发送流程让我们跟随一条消息从 Writer 到 Reader 的完整旅程。步骤 1初始化 ShmTransmitter位置:cyber/transport/transmitter/shm_transmitter.h:160voidShmTransmitter::Enable(){// 创建 Segmentsegment_SegmentFactory::CreateSegment(channel_id_);// 创建 Notifiernotifier_NotifierFactory::CreateNotifier();}步骤 2发送消息Transmit位置:cyber/transport/transmitter/shm_transmitter.h:2011. AcquireBlockToWrite(msg_size, wb) └─ 获取一个可写的 Block会阻塞直到成功 2. message::SerializeToArray(msg, wb.buf, msg_size) └─ 将 Protobuf 消息序列化到 Block 的缓冲区 3. wb.block-set_msg_size(msg_size) └─ 设置消息大小 4. msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize) └─ 在消息后面追加 MessageInfo包含 seq, timestamp 等 5. ReleaseWrittenBlock(wb) └─ 释放写锁此时 Block 变为可读 6. notifier_-Notify(readable_info) └─ 通知接收者有新消息完整的接收流程步骤 1初始化 ShmReceiver 和 ShmDispatcher位置:cyber/transport/receiver/shm_receiver.h:62voidShmReceiver::Enable(){// 向 Dispatcher 添加监听器dispatcher_-AddListenerM(this-attr_,listener);}ShmDispatcher是单例内部有一个独立线程监听通知位置:cyber/transport/dispatcher/shm_dispatcher.ccvoidShmDispatcher::ThreadFunc(){while(!is_shutdown_.load()){ReadableInfo info;if(notifier_-Listen(100,info)){// 等待通知ReadMessage(info.channel_id(),info.block_index());ReadArenaMessage(info.channel_id(),info.arena_block_index());}}}步骤 2读取消息位置:cyber/transport/dispatcher/shm_dispatcher.cc1. AcquireBlockToRead(rb) └─ 获取读锁 2. message::ParseFromArray(rb-buf, msg_size, msg) └─ 反序列化消息 3. listener(msg, msg_info) └─ 调用用户注册的回调函数 4. ReleaseReadBlock(rb) └─ 释放读锁高级特性Arena 消息这是另一个不容易理解的部分Arena 消息是为了进一步减少内存拷贝而设计的优化。为什么需要 Arena 消息普通消息流程用户消息 → 序列化到共享内存 → 接收者从共享内存反序列化 两次拷贝/序列化Arena 消息流程用户直接在共享内存上构造消息 → 接收者直接使用 零拷贝Protobuf ArenaArena 是 Protobuf 提供的内存分配器可以在预分配的内存块上构造消息对象。位置:cyber/transport/shm/protobuf_arena_manager.hArena 消息的通信流程发送端AcquireMessage(msg)从 Arena 获取一个消息对象用户直接填充这个消息对象Transmit(msg)发送时只需要发送一个 wrapper包含指向 arena 中消息的指针接收端从 wrapper 中恢复出消息指针直接使用该消息无需反序列化消息析构时自动归还 arena 内存代码示例1. 基本使用示例Writer#includecyber/cyber.h#includecyber/proto/unit_test.pb.husingapollo::cyber::proto::Chatter;intmain(intargc,char**argv){apollo::cyber::Init(argv[0]);autonodeapollo::cyber::CreateNode(shm_writer);// 创建 Writer默认使用 SHMautowriternode-CreateWriterChatter(channel/chatter);automsgstd::make_sharedChatter();msg-set_timestamp(apollo::cyber::Time::Now().ToNanosecond());msg-set_seq(0);msg-set_content(Hello, SHM!);// 发送消息writer-Write(msg);apollo::cyber::WaitForShutdown();return0;}2. 基本使用示例Reader#includecyber/cyber.h#includecyber/proto/unit_test.pb.husingapollo::cyber::proto::Chatter;voidMessageCallback(conststd::shared_ptrChattermsg,constapollo::cyber::transport::MessageInfomsg_info){AINFOReceived message: seqmsg-seq(), contentmsg-content();}intmain(intargc,char**argv){apollo::cyber::Init(argv[0]);autonodeapollo::cyber::CreateNode(shm_reader);// 创建 Readerautoreadernode-CreateReaderChatter(channel/chatter,MessageCallback);apollo::cyber::WaitForShutdown();return0;}总结关键要点回顾每个 Channel 一个 SegmentChannel ID 作为共享内存名称Segment 包含多个 BlockBlock 是消息存储的基本单位原子读写锁使用std::atomic实现无锁读写同步轮询分配序列号 取模实现公平的 Block 分配双重通知Notifier 负责进程间的消息到达通知Arena 优化进一步减少序列化开销难以理解的点总结Placement New 的使用在共享内存上构造对象原子读写锁的实现lock_num_的三种状态转换轮询分配策略为什么seq % block_num能工作Arena 消息的零拷贝原理Protobuf Arena 与共享内存的结合通过这份文档你应该对 CyberRT 中共享内存通信的原理有了完整的理解