目录
0 Introduction
What
Why
Advantage
1 Example
2 Concept
3 Flow Chart
4 Module
4.1 Transport
4.1.1.1 Segment
4.1.1.1.1 State
4.1.1.1.2 Block
4.1.1.1.3 Common
4.1.1.2 Notifier
4.1.1.2.1 ConditionNotifier
4.1.1.2.2 MulticastNotifier
4.1.2 Transmitter
4.1.2.1 IntraTransmitter
4.1.2.2 ShmTransmitter
4.1.2.3 RtpsTransmitter
4.1.2.4 HybridTransmitter
4.1.3 Receiver
4.1.3.1 IntraReceiver
4.1.3.2 ShmReceiver
4.1.3.3 RtpsReceiver
4.1.3.4 HybridReceiver
4.1.4 Summary
4.2 Service Discovery
4.2.1 Partcipant
4.2.2 Container
4.2.2.1 SingleValueWarehouse
4.2.2.2 MultiValueWarehouse
4.2.2.3 Graph
4.2.3 NodeManager
4.2.4 ServiceManager
4.2.5 ChannelManager
4.2.6 Summary
4.3 Scheduler
4.3.1 CRoutine
4.3.1.1 State
4.3.1.2 Context
4.3.1.3 ContextPool
4.3.1.4 Atomic_flag
4.3.1.5 Function
4.3.2 Processor
4.3.3 SchedulerConf
4.3.4 Classic
4.3.4.1 ClassicContext
4.3.4.2 SchedulerClassic
4.3.5 Choreography
4.3.5.1 ChoreographyContext
4.3.5.2 SchedulerChoreography
4.3.6 CRoutine Pool
4.3.6.1 TaskManager
4.3.6.2 Task
4.3.7 Summary
4.4 Data
4.4.1 CacheBuffer
4.4.2 ChannelBuffer
4.4.3 DataNotifier
4.4.4 DataDispatcher
4.4.5 DataVisitor
4.4.6 Summary
4.5 Node
4.5.1 Writer
4.5.2 Reader
4.5.2.1 Blocker
4.5.2.2 ReceiverManager
4.5.3 Client
4.5.4 Service
4.5.5 Summary
5 Others
5.1 ClassLoaderModule
5.1.1 ClassLoaderManager
5.1.2 ClassLoader
5.1.4 Utility
5.2 Mainboard
5.2.1 ModuleArgument
5.2.2 ModuleController
5.2.3 Main
5.3 Base
5.3.1 Signal
5.3.1.1 Slot
5.3.1.2 Signal
5.3.1.3 Connection
5.3.2 BoundedQueue
5.4 Timer
5.4.1 TimerTask
5.4.2 TimerBucket
5.4.3 TimingWheel
References
0 Introduction
What
cyber 是⼀个专⻔为⾃动驾驶场景设计的运⾏时框架。 它基于集中式计算模型 ,在性能、延迟和数据吞吐量⽅⾯进⾏了⾼度优化。 [1]
以下简称框架
Why
-
为什么要写这个⽂档
-
对于熟悉cyber的⼈ ,可以通过⽂档查缺补漏
-
对于不熟悉cyber的⼈ ,希望可以通过⽂档学习框架整体视图与详细逻辑 ,是⼊⻔⼿册,⽆需到处搜索⽂档看
-
作为组内分享提纲使⽤
-
为什么会有 cyber [1]
在⾃动驾驶场景中,需要的是⼀个有效的集中式计算模型,对⾼性能有要求,包括⾼并发、低延迟和⾼吞吐量
已经从开发转向产品化,随着在现实世界中的⼤规模部署,我们看到了对最⾼鲁棒性和⾼性能的需求。这就是为什么我们花了数年时间构建框架,以满⾜⾃动驾驶解决⽅案的要求
Advantage
使⽤ cyber 的主要好处:[1]
加速开发
具有数据融合功能的定义明确的任务接⼝
⼀系列开发⼯具
⼤量传感器驱动程序
简化部署
⾼效⾃适应的消息通信
具有资源意识的可配置⽤⼾级调度程序
可移植,依赖更少
赋能⾃动驾驶
默认的运⾏时框架
为⾃动驾驶搭建专⽤模块
1 Example
就拿推荐的 talker、listener ⼊⼿,在 apollo 的编译产出⽬录中分别启动以上两个⼯具
终端 1
source cyber/setup.bash
export GLOG_alsologtostderr=1
./bazel-bin/cyber/examples/talker
终端 2
source cyber/setup.bash
export GLOG_alsologtostderr=1
./bazel-bin/cyber/examples/listener
⽇志
Talker
I0831 12:09:54.647799 2441916 talker.cc:43] [talker]talker sent a message! No. 0
I0831 12:09:55.647987 2441916 talker.cc:43] [talker]talker sent a message! No. 1
I0831 12:09:56.647899 2441916 talker.cc:43] [talker]talker sent a message! No. 2
I0831 12:09:57.647931 2441916 talker.cc:43] [talker]talker sent a message! No. 3
I0831 12:09:58.647884 2441916 talker.cc:43] [talker]talker sent a message! No. 4
I0831 12:09:59.647895 2441916 talker.cc:43] [talker]talker sent a message! No. 5
I0831 12:10:00.647893 2441916 talker.cc:43] [talker]talker sent a message! No. 6
I0831 12:10:01.647935 2441916 talker.cc:43] [talker]talker sent a message! No. 7
I0831 12:10:02.647927 2441916 talker.cc:43] [talker]talker sent a message! No. 8
I0831 12:10:03.647889 2441916 talker.cc:43] [talker]talker sent a message! No. 9
I0831 12:10:04.647902 2441916 talker.cc:43] [talker]talker sent a message! No. 10
I0831 12:10:05.647905 2441916 talker.cc:43] [talker]talker sent a message! No. 11
Listener
I0831 12:09:55.648399 2441894 listener.cc:23] [listener]Received message seq-> 1
I0831 12:09:55.648418 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:09:56.647986 2441894 listener.cc:23] [listener]Received message seq-> 2
I0831 12:09:56.648017 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:09:57.648029 2441894 listener.cc:23] [listener]Received message seq-> 3
I0831 12:09:57.648074 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:09:58.648011 2441894 listener.cc:23] [listener]Received message seq-> 4
I0831 12:09:58.648043 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:09:59.648080 2441894 listener.cc:23] [listener]Received message seq-> 5
I0831 12:09:59.648118 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:10:00.647995 2441894 listener.cc:23] [listener]Received message seq-> 6
I0831 12:10:00.648036 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:10:01.648051 2441894 listener.cc:23] [listener]Received message seq-> 7
I0831 12:10:01.648074 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:10:02.648077 2441894 listener.cc:23] [listener]Received message seq-> 8
I0831 12:10:02.648123 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
I0831 12:10:03.647994 2441894 listener.cc:23] [listener]Received message seq-> 9
I0831 12:10:03.648032 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
出现以上⽇志即代表 demo 运⾏成功,两个模块可以进⾏实时通信。看起来很简单的功能,其实 cyber 内部的流程相当复杂。流程后⾯会详述。
源码
-
Talker: https://github.com/ApolloAuto/apollo/blob/master/cyber/examples/talker.cc
-
Listener: https://github.com/ApolloAuto/apollo/blob/master/cyber/examples/listener.cc
现在你的视⻆可能是这个样⼦,只是⼀个宏观
2 Concept
cyber 中有许多的概念,要想弄懂 框架,就要先弄懂这些概念,下⾯列举⼀部分常⽤的基本概念
-
Component
-
基于 cyber 开发的模块,⽐如感知、预测、规划、控制、监控均以 Component 的形式存在, Component 之间使⽤ Channel 或者 Service 进⾏通信
-
两个基于框架开发组件的⼩例子
-
Component
-
TimerComponent
-
-
-
Channel
-
Component 之间进⾏发布/订阅之间的通道,类似于 kafka 中的 topic 的概念
-
Writer
-
负责向 Channel ⾥⾯发布数据
-
-
Reader
-
负责从 Channel ⾥⾯订阅数据
-
-
-
-
Service
-
Component 之间进⾏双向通信的载体,就是传统的 C/S 模式, 类似于传统 RPC 中的概念
-
Server
-
创建 Service 的地⽅即为 Server, 负责处理 Client 的请求
-
-
Client
-
⽤户对服务发出请求,并得到响应
-
-
-
-
Node
-
是 Writer/Reader 和 Server/Client 的容器,在 Componet 中利⽤ Node 来实现不同的通信形式
-
视⻆变化
-
-
-
Topology
-
在⼀个由众多模块组成系统中,每个模块中的服务发现模块都会保存当前系统其他模块的信息,⽤有向图来表示。Writer、Reader、Server、Client 都是有向图中的顶点,Channel 是 Writer 指向 Reader 的边,Service是 Server 指向 Client 的边。因此我们称 Writer 和 Server为上游,Reader 和 Client 为下游。
-
⼀个随意的假设系统⽰意图如下
3 Flow Chart
⼀般性来讲 ,⼀个框架内部会有若⼲个流程 ,⾄少会有两个流程 ,⼀个初始化 ,⼀个数据流。 对于cyber 来讲 , 内部也会有相关流程
-
初始化流程
-
Writer/Reader 模型的发布订阅流程
-
Client/Server 模型的交互流程
-
调度流程
-
服务发现流程
4 Module
框架 内部就是三个⼤模块 ,分别为
-
通信
-
服务发现
-
调度
三个⼤模块依赖或多或少依赖其他⼩模块来做的。
4.1 Transport
-
代码⽬录
-
cyber/transport
-
-
配置协议
-
cyber/proto/transport_conf.proto
-
-
模块作⽤
-
消息通过不同⽅式进⾏发送与接收
-
Intra
-
同进程内部通信
-
-
SHM
-
同机器不同进程通过共享内存的⽅式通信
-
-
Rtps
-
不同机器不同进程通过 DDS ⽅式通信
-
-
Hybrid
-
⾃适应 Intra、SHM、 Rtps 功能进⾏通信
-
-
-
-
模块流程
-
提交消息
-
接收消息
-
4.1.1 Share Memory
框架内部的共享内存实现的基本思路是将消息的本⾝存在可读写的 Segment 中,然后将消息的 Meta 通知到监听器中。
参考 Linux 共享内存描述 [2]
System V 共享内存是较旧的共享内存 API。POSIX 共享内存提供了⼀个更简单、更好地可设
计的接⼝。
另⼀⽅⾯,相⽐于 System V 共享内存⽅式,POSIX 共享存储器不太⼴泛(尤其是在较旧的系
统上)
4.1.1.1 Segment
-
XsiSegment
-
System V shared memory
-
ShmXXX
-
不可写回⽂件系统
-
-
-
PosixSegment
-
POSIX shared memory
-
mmap
-
可写回⽂件系统
-
-
一个共享内存的例子[3]
每个 Channel 都独占一个 Segment,每个 Segment 在内存中的示意图如下 (v8.0)
v10.0排布发生了变化
State --> Block --> ArenaBlock --> Buffer_x --> ArenaBuffer_x
新增一个ArenaSegment,结构图如下
4.1.1.1.1 State
状态类中就有四个原子变量以及相应的 Get、Set、Add 操作。
-
need_remap_: 标记当前的 Segment 是否要重新初始化
-
当消息的长度大于 Block 中 buf 的长度时候,设置这个标记为 true
-
-
seq_: 当前的 block 索引,也可以理解为RingBuffer的索引,只会⾃增,外层会做取余操作
-
reference_count_:当前 Segment 的引用计数,为0时会真正析构
-
ceiling_msg_size_:消息长度上限
4.1.1.1.2 Block
Block 内部很简单,就三个成员变量
-
lock_num_:原子变量,用于带次数的读自旋,写入互斥,简易的读写锁
-
msg_size_: 消息的长度
-
msg_info_size_: 消息 Meta 的长度
内部实际的消息 buffer地址是在 Segment 类中持有
4.1.1.1.3 Common
无论是基于哪种方式实现的共享内存,实际的 buffer 的获取去读写的流程是一样的。
获取一个可写 buffer 逻辑
-
判断当前参数合法性(WritableBlock)
-
判断当前 Segment 是否初始化,如果没有初始化需要去初始化(OpenOrCreate)
-
根据消息的长度判断是否需要重新初始化
-
不断的找到下一个可以写的块,找到为止
-
原子索引State::seq(uint32_t)不断增大,并与块的数量取余,实现了一个 RingBuffer
-
-
填充 WritableBlock,并返回成功
获取一个可读 buffer 逻辑
-
判断当前参数合法性(ReadableBlock)
-
判断当前 Segment 是否初始化,如果没有初始化需要去初始化(OpenOnly)
-
判断当前可读块的索引是否合法
-
根据 state 的标志位是否需要重新初始化
-
根据对应的可读索引获取相关的读锁
-
获取相关的 buffer
4.1.1.2 Notifier
被共享的数据在写到 Segment 中后,应该有某种方式去通知监听者,框架内部实现了两种方式
4.1.1.2.1 ConditionNotifier
通过 System V 共享内存的方式,每个机器上开辟一块内存。
读者通过不断检查内存中数据是否变化,其内部数据结构也一个固定大小(4096)的 RingBuffer,buffer 中的每个元素是可读信息的 Meta(ReadableBlock)。
4.1.1.2.2 MulticastNotifier
无非是利用 UDP 的方式进行组播通知,不再赘述。
-
通知方发送消息到指定的 fd
-
sendto
-
-
通过监听指定的 fd 是否读就绪,就绪后再读取消息
-
poll
-
recvfrom
-
4.1.2 Transmitter
通信模块利用数据提交器(Transmitter)来发送数据
注意:Transmitter 与 Channel 是一一对应的关系,即一个 Transmitter 只能对一个 Channel 进行提交数据
4.1.2.1 IntraTransmitter
同进程内部通信相比较跨进程方式非常简单,省去的中间的媒介,直接调用 IntraDispatcher 的回调就可以。但是使用场景也受限制,因为很少有同进程的 Writer/Reader 的需求,一般都是进程级别的通信。
4.1.2.2 ShmTransmitter
共享内存方式提交数据流程 (普通方式)
-
检查共享内存提交器是否被启用
-
根据待提交消息的长度去申请相应大小的可写块
-
将待提交消息和 Meta 序列化到可写块中
-
根据可写块、Channel、Host 创建一个可读信息(ReadableBlock)
-
将可读信息通知给监听器
零拷贝提交数据流程
-
调用Writer申请一个Message指针
-
调用链
-
Writer --> HybirdTransmitter --> ShmTransmitter --> ProtobufArenaManager::AcquireArenaMessage
-
-
申请逻辑
-
对ArenaBuffer上写锁
-
基于ArenaBuffer创建Arena
-
基于Arena创建一个自定义析构的Message指针
-
析构中判断解写锁
-
-
注意: ArenaBuffer并没有解写锁
-
-
-
用户填充Message
-
调用Writer的Write接口将填充好的Message写出去
-
申请一个ArenaBlock
-
创建一个MessageWrapper
-
把消息序列化到MessageWrapper中,并取得新消息的指针
-
将MessageWrapper中的数据拷贝(memcpy)到ArenaBuffer (Segment)中, 1K
-
ExtendedStruct数据
-
addr_offset
-
channel_id
-
-
-
序列化MessageInfo到ArenaBuffer末尾
-
根据可写块、Channel、Host 创建一个可读信息(ReadableBlock)
-
将可读信息通知给监听器
-
4.1.2.3 RtpsTransmitter
流程
-
检查 Rtps 提交器是否被启用
-
序列化消息成字符串到 UnderlayMessage 信息里面去
-
注意: Rtps 方式传递的消息是 UnderlayMessage 结构
-
-
根据消息 Meta 来构建 rtps 的写参数
-
调用 rtps 的 publisher 写消息
-
eprosima::fastrtps 库实现 Write 函数天然支持通知,所以无需使用者关心
-
4.1.2.4 HybridTransmitter
HybridTransmitter 可以理解为根据拓扑变化动态开启不同方式通信的 Transmitter。
内部在创建&&初始化的时候,初始化了三种通信方式的实例,只不过没有 enable。
目前框架中的 Writer 默认创建的是 HybridTransmitter,在 Writer 加入拓扑前,在 ChannelManager 注册了一个回调,当有 Channel 级别的拓扑变化消息来临时,会执行这个回调,回调内部会根据拓扑信息(是否同 Channel,是否同机器,是否同进程)进行判断,如果属于当前 Writer 的 Channel,会动态的调用对应 Transmitter 的 enable 以此来做真正的开启对应 Transmitter 的功能。
这也就是为什么 talker/listener 模型中 listener 即使比 talker 先启动却还是收不到第一个(序号为 0)包的原因,那就是 talker 内部的 Writer 写第一个消息时拓扑中还没有 listener 的 Reader,所以 Writer 在写消息前应该判断一下是否有对应的 Reader 在读。当然,即使没有提前判断,实际上也没有真正的写入消息,只不过从语义上更加完整而已。
issue链接: https://github.com/ApolloAuto/apollo/issues/14989
4.1.3 Receiver
通信模块利用数据接收器(Receiver)来接收数据,内部的Dispatcher暂时忽略即可。
4.1.3.1 IntraReceiver
同进程内部的 Receiver 无需启动线程进行监听,只需要初始化好处理不同 Channel 的 handler 即可。IntraTransmitter 写入即触发。
Why: 只不过 IntraDispatcher 内部有封装了一层名为 ChannelChain 的结构来做实际存储回调&&通知的地方,不同于一般的映射,它多了一层 message_type 的判断
4.1.3.2 ShmReceiver
共享内存监听方启动了一个单独的线程,这个线程还可以单独设置与 cpu 亲和性配置
流程
-
notifier 监听到有可读信息
-
判断当前可读信息是否合法
-
是否是本机器信息
-
当前 channel 是否有可读段
-
-
从当前 channel 的可读段中申请可读块
-
执行回调(非用户)
-
从可读块中解析成 pb
-
将消息写到指定的 ChannelBuffer 里面并通知对应协程工作
-
v10.0版本注意读锁解锁位置,存疑?
-
-
从当前 channel 的可读段中释放可读块
4.1.3.3 RtpsReceiver
Rtps 接收器借助 eprosima::fastrtps::Subscriber 实现非常简单,创建 subscriber 的时候注册一个 listener 即可,消息监听机制在 rtps 内部就已经做好了。
4.1.3.4 HybridReceiver
对应 HybridTransmitter,HybridReceiver 也会根据拓扑变化来动态监听来自不同通信方式的消息。
框架内部的 Reader 默认创建的是 HybridReceiver,内部包含 Intra、Shm、Rtps receiver。Reader 初始化时也在 ChannelManager 中注册了一个回调,会根据拓扑变化来动态的开启和关闭对应的 Receiver。
4.1.4 Summary
普通共享内存的过程v8.0
零拷贝共享内存的过程v10.0
学习完通信模块,现在的视角应该是这个样子。
4.2 Service Discovery
在传统互联网大型后端微服务架构中,服务与服务之间经常以 RPC(Remote Procedure Call)形式进行调用,那么就要知道对方的服务在哪里,有多少,是否健康,服务发现需求就应运而生。以 BNS(Baidu Naming Service)为例,Client 只需要知道下游服务的名字(可以理解为一个字符串,实际上是一个遵守某种规则的字符串),就可以通过 Baidu-RPC 进行访问,非常简单。BNS 负责提供服务注册、服务获取和健康检查的功能,Baidu-RPC 基于 BNS 给的 IP 列表进行做负载均衡。BNS 就可以理解为一种服务发现的实现,只不过是中心化的,所有的服务都必须要注册才可以。
而 cyber 框架不同于互联网大型后端架构,是运行在车载 os(终端)上,所以就不可能引入一个中心化的组件来做这件事,事实上也不需要,因为在算力有限制的情况下,服务数量也是有限制的,因此需要借助广播等形式进行去中心化的服务发现机制,实际上 DDS 协议非常适合此场景,框架层也采用 eProsima 实现的 Fast-DDS 来实现服务发现功能。
框架层的服务发现实现位于 cyber/service_discovery 目录中,可以看到,一个节点(进程)有一个完整的拓扑管理单例 TopologyManager,内部由 NodeManager、ChannelManager 和 ServiceManager 三部分组成。
4.2.1 Partcipant
一个 TopologyManager 里面还包含一个 DDS 中 Participant 的实例,名字是由 hostname+pid 格式组成。
Participant 在被创建时注册了一个回调,用来监听实体离开拓扑的消息并通知所有的 Manager。
三个子 Manager 在初始化时各自基于共同的 Partcipant 上创建不同 Topic 的发布者和订阅者。
4.2.2 Container
每一个 Manager 都需要特殊的数据结构来存储拓扑信息,下面逐一介绍
4.2.2.1 SingleValueWarehouse
std::unordered_map的一个 Wrapper,并提供了相应的 Add、Remove、Search、GetAllRoles 接口。
4.2.2.2 MultiValueWarehouse
std::unordered_multimap的一个 Wrapper,并提供了相应的 Add、Remove、Search、GetAllRoles 接口。
4.2.2.3 Graph
Graph 用来存储有向图, 和课本上的邻接矩阵和邻接表不同,一般学习的时候,顶点的编号一般是从 0 到 n-1,所以可以采用一个二维向量或者数组+链表描述。实际上使用的时候,编号不可能这么整齐或者都是数字。框架中使用 NodeName 作为顶点的名字,使用两层无序映射容器来实现顶点之间的对应关系。
std::unordered_map<std::string, std::unordered_map<std::string, Vertice>>
描述为第⼀层 key 是有向边的尾,第⼆层 value 为有向边的头,边(第⼆层key)⼀般指的是
Channel(channel_name+dst.name)
事实上这个已经⾜以描述⼀个 有向图了
另外实现了一个数据结构来描述边的集合,这个比较特殊,理论上一个有向边只能有两个顶点,可是在框架里面实际上一个 Channel 可以有多个读者和写者,所以描述时采用同 Channel 的边放在一个集合里面去。在图中做插入和删除时是以边为单位的。只有一个完整的边才会真正的放在顶点与顶点之间的关系中去,也就是说,边存在中间状态和完全状态。
struct RelatedVertices {
std::unordered_map<std::string, Vertice> src; // key: node_name
std::unordered_map<std::string, Vertice> dst;
};
std::unordered_map<std::string, RelatedVertices>; // key: channel_name
4.2.3 NodeManager
通过 NodeManager 可以找到这个拓扑中的所有的 Node,所以 NodeManager 只能管理 node。
其在 Participant 上注册的 Topic 为 node_change_broadcast。
内部需要借助 SingleValueWarehouse 来保存拓扑信息,每一个 node_id 对应一个 Node Meta
值得注意的是,如果拓扑中有两个名字相同的 Node,那么后创建的 Node 所在的进程(通常是新启动的进程)会退出。
通过调试Monitor知道,如果让被监控组件暴力退出(意味着没有析构的机会),那么NodeManager将收不到对方离开拓扑的消息,只能依赖Partcipant自动探活机制来更新拓扑。
4.2.4 ServiceManager
通过 ServiceManager 可以找到所有的 Server 和 Client。
其在 Participant 上注册 Topic 为 service_change_broadcast。
和传统不同,框架中的创建 Service 的地方就被称为 Server,有且只有一个,而 Client 和传统一样,可以有多个。所以 ServiceManager 采用 SingleValueWarehouse 来存储 Server,用 MultiValueWarehouse 来存储 Client,key 统一为 ServiceID,从结构储存上也可以看出来一个 ServiceID 对应一个 Server,一个 ServiceID 对应多个 Client。
4.2.5 ChannelManager
通过 ChannelManager 可以找到所有的 Writer 和 Reader。
其在 Participant 注册 Topic 为 channel_change_broadcast。
ChannelManager 提供了三种维度的数据存储和查询
-
利用 Graph 来存储 Node 之间的关系
-
利用 MultiValueWarehouse 来存储以下四种数据
-
channel_writer
-
Key: Channel ID
-
-
channel_reader
-
Key: Channel ID
-
-
node_writer
-
Key: Node ID
-
-
node_reader
-
Key: Node ID
-
-
通过以上的数据结构可以实现查询 Node 与 Node 之间的关系,可以查询对应的 Key(node_id or channel_id)上有没有 Writer 或者 Reader。
一个使用场景,我们在创建一个 Writer 并准备些数据前,可以通过 ChannelManager 实例来判断一下当前的 channel 是否有 reader,如果没有,我们可以先不写数据。
4.2.6 Summary
现在的视角可能是这样
4.3 Scheduler
所谓的调度,一定是系统资源和运行任务的矛盾,如果系统资源足够多,那么就不需要调度了,也没有调度的必要。调度的作用就是在资源有限的情况下,合理利用系统资源,使系统的效率最高。[6]
借用教科书上的一句话,进程是系统资源分配的单位,线程是 CPU 调度的单位。
那么在框架里,协程就是调度器调度的单位。
4.3.1 CRoutine
一些知名的协程库,比如腾讯的 libco[7],百度的 bthread[8](也被称为 M:N 线程库),还有 Go 语言原生自带的 Goroutine[9]等都是开放给用户在业务模块中使用。框架内部实现的协程库不仅仅在框架内部使用,通过 Reader、Component 等封装在给用户使用,也可以封装成Task提供给用户使用。
一点不同的是,框架内部的生命周期是跟随创建他的主体(Reader、Component 等)的,除非主体不存在,协程才会真正的从调度器中移除。
从它的默认栈大小也能看出来这个库是否是轻量级别的。
CRoutine 默认栈大小 2MB, bthread 默认(最小)栈大小是两个内存页大小(通常是 8KB), libco 默认的栈大小是 128KB,Goroutine 最小的栈大小是 2KB。当然,协程栈大小是可以动态增长的,通常也会有一个上限,这里不详细论述。
4.3.1.1 State
一共五种状态
-
READY
-
协程就绪
-
-
FINISHED
-
协程结束
-
-
SLEEP
-
休眠
-
目前 Sleep 状态框架内几乎不会用,业务层使用也会保证休眠时间控制
-
-
IO_WAIT
-
等待 IO
-
目前只有 io 模块在用(io 模块目前框架内没有启用,提供给业务层用)
-
-
DATA_WAIT
-
等待数据
-
READY 状态可以和 SLEEP、IO_WAIT、DATA_WAIT 在特定的条件下相互转化。在协程生命周期结束时(Reader 或 Component 析构)状态切换成 FINISHED 并退出。
4.3.1.2 Context
协程实例中持有协程上下文的指针,而栈空间是在上下文中保管的,框架内部使用汇编语言实现了协程栈在寄存器上的上下文交换来实现协程中断和恢复。
4.3.1.3 ContextPool
框架中也规定了要有协程上下文的上限,同时要节省创建&&销毁上下文的开销,所以采用一个池化的概念,借助 base::CCObjectPool(一个 lock-free 的单链表)来实现,上下文池大小由进程内的 Component 数量和配置中的 routine_num 的最大值决定。当然,真的到最大值后,框架也没有做硬限,而是单独的创建新的上下文来使用,直至等到池子里面有空闲的上下文。
一个 shared_ptr 带 Deleter 的用法:在上下文池创建实例时,自己塞了一个 Deleter 给了实例的 shared_ptr,这样再协程退出时能做到自动释放池中上下文
4.3.1.4 Atomic_flag
协程内部有两个 atomic_flag,都是利用 test_and_set 机制来实现相关功能。
lock_用来做协程的获取和释放的。
updated_用来做数据通知的,如果有数据来临,updated_会被 clear,如果当前协程正处于 DATA_WAIT 或者 IO_WAIT 状态,就把状态置为 READY。
4.3.1.5 Function
有两种,第一种是针对框架内部的使用,比如Reader
直接用只有一个消息的RoutineFactory举例子,多参数类似。意思就是不断的利用DataVisitor取数据,取到了执行用户的回调(参见4.5.2 Reader),执行完继续让出执行权,等待下一次调度。
template <typename M0, typename F>
RoutineFactory CreateRoutineFactory(F&& f, const std::shared_ptr<data::DataVisitor<M0>>& dv) {RoutineFactory factory;factory.SetDataVisitor(dv);factory.create_routine = = {return = {std::shared_ptr<M0> msg;for (;;) {CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);if (dv->TryFetch(msg)) {f(msg);CRoutine::Yield(RoutineState::READY);} else {CRoutine::Yield();}}};};return factory;
}
第二种是针对业务模块开放的协程池,不断的从BoundedQueue中取任务执行
auto func = this {while (!stop_) {std::function<void()> task;if (!task_queue_->Dequeue(&task)) {auto routine = croutine::CRoutine::GetCurrentRoutine();routine->HangUp();continue;}task();}
};
4.3.2 Processor
对于协程来说,Processor 相当于逻辑上的 CPU,每个 Processor 内部会有一个线程,这个线程会根据配置和指定的 CPU 设置亲和性,保证线程的局部调度,提升 CPU 缓存的命中率。
处理器内部的线程就干一件事,不断地从 ProcessContext 中获取下一个就绪的协程,获取后对协程进行恢复(Resume)。获取不到数据就绪协程时候,会调用上下文的 Wait 函数进行等待。
只不过根据 ProcessContext 内部的获取就绪协程策略逻辑不同,分为了 ClassicContext 和 ChoreographyContext。
真正创建和保存处理器的地方就是调度器。而调度器根据派发任务和通知机制又被分成两种策略,也正好对应了两种 Context。
4.3.3 SchedulerConf
调度器配置协议: cyber/proto/scheduler_conf.proto,因为调度器实现与配置强相关,且目前没有一份针对框架调度器配置详细的说明文档,业务层想要配置自己的调度策略难上加难,基本都用的默认的 SchedulerClassic 策略,参数还都是默认的,并不能发挥出框架的全部优势。
SchedulerConf 是在 cyber_conf.proto 中的第一个元素,我们只讨论他的自身。
针对字段逐一解释,可以对比着 cyber/conf/example_sched_classic.conf 或 cyber/conf/example_sched_choreography.conf 看,可以更加直观形象。
SchedulerConf
import "classic_conf.proto";
import "choreography_conf.proto";message InnerThread {optional string name = 1;optional string cpuset = 2;optional string policy = 3;optional uint32 prio = 4 [default = 1];
}message SchedulerConf {optional string policy = 1;optional uint32 routine_num = 2;optional uint32 default_proc_num = 3;optional string process_level_cpuset = 4;repeated InnerThread threads = 5;optional ClassicConf classic_conf = 6;optional ChoreographyConf choreography_conf = 7;
}
字段解释
-
Schedulerconf
-
policy
-
指定当前进程采用何种调度策略
-
取值范围
-
classic
-
choreography
-
-
-
routine_num
-
当前进程协程数量,4.3.1.3 中协程上下文池的大小参考值之一
-
-
default_proc_num
-
逻辑处理器的数量,可以理解为 4.3.2 中 Processor 的数量
-
-
process_level_cpuset
-
当前进程的主线程可以使用的 cpuset,并和这些 cpu 设置亲和性,关于亲和性,参考引用 10
-
格式
-
0-7,16-23 : 代表使用 0~7 号和 16~23 号 cpu
-
0,2,4,6,8: 代表使用第 0、2、4、6、8 号 cpu
-
-
疑问
-
调度器被初始化应该是在框架的 Init 函数中,Init 函数在 main 函数中调用,那为什么把主线程和这些 cpu 设置亲和性?主线程需要参与调度?
-
-
-
threads
-
框架内部线程的配置,比如 async_log、shm 等线程
-
name
-
内部线程的名字,比如上述的 async_log、shm
-
-
cpuset
-
内部线程可使用的 cpuset,格式参考 process_level_cpuset
-
-
policy
-
Linux 系统 POSIX 线程调度的策略,参考引用 11
-
取值范围, 含义参考引用 12
-
SCHED_FIFO
-
SCHED_RR
-
SCHED_OTHER
-
采取 Linux 系统默认时分调度,框架只会设置线程的优先级。参考引用 13
-
-
-
-
prio
-
Processes scheduled under one of the real-time policies (SCHED_FIFO, SCHED_RR) have a sched_priority value in the range 1(low) to 99 (high)
-
如果policy设置成SCHED_FIFO或SCHED_RR,那么此线程就是系统的实时线程。这就对应Linux 系统线程调度的优先级, sched_param 中的 sched_priority 参数,和 nice value 不同,这个数字越高优先级越高
-
取值范围⼀般是 1 到 99,参考引⽤ 12
-
-
-
ClassicConf
message ClassicTask {optional string name = 1;optional uint32 prio = 2 [default = 1];optional string group_name = 3;
}message SchedGroup {required string name = 1 [default = "default_grp"];optional uint32 processor_num = 2;optional string affinity = 3;optional string cpuset = 4;optional string processor_policy = 5;optional int32 processor_prio = 6 [default = 0];repeated ClassicTask tasks = 7;
}message ClassicConf {repeated SchedGroup groups = 1;
}
字段解释
-
name
-
调度策略组名字
-
-
processor_num
-
调度策略组的逻辑 CPU 数量(Processor 数量),同时也是 task_pool_size_的值
-
注意,这个没有默认值,需要填有意义的数字
-
-
affinity
-
亲和性的类型
-
取值
-
1to1
-
一个线程对应一个 cpu
-
-
range
-
一个线程对应一组 cpu
-
-
-
-
cpuset
-
与 InnerThread::cpuset 一致,不再赘述
-
-
processor_policy
-
与 InnerThread::policy 一致,不再赘述
-
-
processor_prio
-
与 InnerThread::prio 一致,不再赘述
-
-
ClassicTask
-
name
-
协程的名字
-
-
prio
-
协程优先级
-
数字越大,优先级越高
-
范围
-
0 ~ 19
-
-
-
group_name
-
调度策略组名字
-
-
ChoreographyConf
message ChoreographyTask {optional string name = 1;optional int32 processor = 2;optional uint32 prio = 3 [default = 1];
}message ChoreographyConf {optional uint32 choreography_processor_num = 1;optional string choreography_affinity = 2;optional string choreography_processor_policy = 3;optional int32 choreography_processor_prio = 4;optional string choreography_cpuset = 5;optional uint32 pool_processor_num = 6;optional string pool_affinity = 7;optional string pool_processor_policy = 8;optional int32 pool_processor_prio = 9;optional string pool_cpuset = 10;repeated ChoreographyTask tasks = 11;
}
字段解释
-
choreography_processor_num
-
可编排逻辑 CPU 数量
-
-
choreography_affinity
-
可编排 Processor 的亲和性
-
取值
-
range
-
1to1
-
-
-
choreography_processor_policy
-
可编排 Processor 的线程调度策略
-
与 InnerThread::policy 作用&&取值一致
-
-
-
choreography_processor_prio
-
可编排 Processor 的线程调度优先级
-
与 InnerThread::prio 作用&&取值一致
-
-
-
choreography_cpuset
-
可编排 Processor 的可使用 cpu 集合
-
与 InnerThread::cpuset 作用&&取值一致
-
-
-
pool_processor_num
-
任务池 ClassicProcessor 数量, 也是协程池的大小
-
-
pool_affinity
-
任务池 ClassicProcessor 亲和性
-
-
pool_processor_policy
-
任务池 ClassicProcessor 调度策略
-
-
pool_cpuset
-
任务池 ClassicProcessor 使用的 cpu 集合
-
-
ChoreographyTask
-
name
-
协程名字
-
-
processor
-
处理器 ID
-
-
prio
-
协程优先级
-
数字越大,优先级越高
-
-
4.3.4 Classic
Classic 调度模型有点类似于 Golang 语言中最早的 GMP 调度模型,多个逻辑 CPU 共同竞争一个全局队列,为了阻止队列的 race-condition,所以要加互斥锁,影响了一部分性能。
框架中的 Classic 调度模型由 ClassicContext、Processor、SchedulerClassic 三个类来实现。
4.3.4.1 ClassicContext
ClassicContext 是真正持有全局队列的地方,根据不同的 GroupName 划分成不同的全局队列,全局队列与全局队列之间相互独立,互不影响。ClassicContext 被创建时候就会指定相应的组名字,如果没有指定,会使用默认的全局队列。
一个 ClassicContext 对象持有一个全局队列的指针,一个 Processor 对应一个 ClassicContext,也就是说,多个 Processor 虽然对应多个 ClassicContext,但实际的队列只有一个。
获取下一个协程逻辑比较简单,每次都从最大优先级的队列开始遍历,找到一个状态就绪的协程进行返回。这个函数也是真正被多个 Processor 竞争的。这种调度方式从理论上讲是有可能造成低优先级的协程饿死。
4.3.4.2 SchedulerClassic
初始化逻辑
-
读指定的配置文件
-
配置文件名字一般是进程启动-p 参数指定的名字,后面加上conf 后缀,且在 conf 目录下的
-
-
把配置文件内容序列化成 pb,如果序列化成功
-
保存内部线程配置
-
保存 cpuset 信息,并设置和主线程的亲和性
-
保存自定义协程调度配置信息
-
-
检查配置协程组长度,如果没有配置则生成一个默认的
-
默认逻辑 CPU 数量为 2
-
默认协程组就一个(全局队列就一个)
-
-
创建每个协程组的的逻辑 CPU(Processor),对于每一个 Processor
-
创建 ClassicContext 并绑定到 Processor 上
-
根据配置设置 Processor 内部线程的亲和性
-
派发任务逻辑
-
获取指定协程的互斥锁
-
按照协程 ID 保存协程的映射
-
按照协程的配置来设置协程组和优先级信息,如果没有配置则进入默认协程组
-
按照协程组和优先级来将写成推入 ClassicContext 中的队列
-
通过 ClassicContext 通知指定协程组有新的就绪任务需要处理
通知 Processor 任务逻辑(被绑定到 DataVisitor 的回调中)
-
检查调度器是否停止(unlikely)
-
根据协程 ID 找到对应协程实例
-
如果协程处于 DATA_WAIT 或者 IO_WAIT 状态,就设置更新标志(4.3.1.4 中的 update_标志位)
-
通过 ClassicContext 通知指定协程组有新的就绪任务需要处理
下图为亲和性为 1to1 时 SchedulerClassic 简易模型图
4.3.5 Choreography
Choreography 调度模型有点类似于 Golang 语言中现在的 GMP 调度模型,通过本地队列+全局队列的方式来减少锁竞争。只不过没有实现 work-stealing 机制,事实上在终端上也用不上这个机制,因为协程数量有限制。
框架中的 Choreography 调度模型由 ChoreographyContext、Processor、SchedulerChoreography 三个类实现。
4.3.5.1 ChoreographyContext
ChoreographyContext 中的队列由 std::multimap 来实现,Key 为优先级,Value 是协程,并将 std::greater 作为 Compare 类,意味着遍历的时候会根据 Key 从大到小来进行,以此来实现从优先级高到优先级低的顺序进行调度。
一个 ChoreographyContext 里面一个队列,一个 Processor 绑定一个 ChoreographyContext,也就是说,Processor 不必与其他 Processor 去竞争,这就是所谓的本地队列。
获取下一个协程的逻辑就是遍历 std::multimap,找到第一个就绪的协程进行返回,理论上还是存在低优先级协程饿死问题。
4.3.5.2 SchedulerChoreography
初始化逻辑
-
读指定的配置文件
-
配置文件名字一般是进程启动-p 参数指定的名字,后面加上。conf 后缀,且在 conf 目录下的
-
-
把配置文件内容序列化成 pb,如果序列化成功
-
保存内部线程配置
-
保存 cpuset 信息,并设置和主线程的亲和性
-
获取 Choreography 相关配置
-
获取 Classic 相关配置
-
保存自定义协程调度配置信息
-
-
检查 Processor 数量,如果没有设置则设置成默认的数量,为 2
-
根据配置创建 Processor,分为两部分
-
Choreography
-
创建 ChoreographyContext,并绑定到 Processor 上
-
根据配置设置 Processor 内部线程与 cpu 的亲和性
-
-
Classic
-
创建 ClassicContext,并绑定到 Processor 上
-
根据配置设置 Processor 内部线程与 cpu 的亲和性
-
-
任务派发逻辑
-
获取指定协程的互斥锁
-
按照协程 ID 保存协程的映射
-
根据配置来设置相关协程信息
-
Processor
-
Priority
-
-
如果当前协程的 ProcessorID 小于 Choreography Processor 的数量,把协程入本地队列,否则,入全局队列
通知 Processor 任务逻辑(被绑定到 DataVisitor 的回调中)
-
检查调度器是否停止(unlikely)
-
根据协程 ID 找到对应协程实例
-
如果协程处于 DATA_WAIT 或者 IO_WAIT 状态,就设置更新标志(4.3.1.4 中的 update_标志位)
-
如果当前协程是本地队列的任务,则通过指定 Processor 的 ChoreographyContext 去通知,否则通过 ClassicContext 通知指定协程组有新的就绪任务需要处理。
4.3.6 CRoutine Pool
协程对于用户是透明的,但是用户还是想用怎么办?
框架提供了协程池,协程池的大小取决于4.3.3中ChoreographyConf::pool_processor_num。
4.3.6.1 TaskManager
内部持有一个大小为1000的BoundedQueue用于做生产者消费者模型的队列。
TaskManager在初始化时,会通过无DataVisitor的RoutineFactory来创建协程池中的若干协程。
每个协程一旦从队列中取数据,就可以执行相关的任务。参见4.3.1.5中第二种方式。
4.3.6.2 Task
入口代码: cyber/task/task.h的Async函数。
这个函数判断是否是现实模式,是的话向TaskManager推任务,反之则用std::async进行模拟。
在这个文件中,还封装了协程或者线程的Yield、Sleep等操作,算是可以间接的操作协程。
4.3.7 Summary
4.4 Data
数据模块主要是针对框架内部的数据流转做一个派发、存储、通知等机制。
我们按照存储、通知、派发的顺序进行介绍。
4.4.1 CacheBuffer
就是一个线程安全的RingBuffer,用来真正存储消息指针的地方。
内部buffer由std::vector实现,由两个uint64的头尾指针进行控制。
4.4.2 ChannelBuffer
ChannelBuffer就是CacheBuffer的一个Wrapper,内部额外存储一个uint64值当做channel_id。
这也就意味着一个ChannelBuffer只能处理一个channel的数据。
4.4.3 DataNotifier
DataNotifier是一个单例类,内部持有channel_id对应一系列callback的映射。
结构如图所示
按照channel_id的维度进行通知,理论上可能会执行多个回调。
4.4.4 DataDispatcher
DataDispatcher是只有一个泛型参数的单例类,内部管理若干个ChannelBuffer的映射。
另外有一个存有DataNotifier的单例指针来做数据通知。
对外提供两个对外接口
-
AddBuffer
-
将ChannelBuffer和channel_id做映射
-
-
Dispatch
-
将对应channel的数据写入到若干个ChannelBuffer里面去,并通知对应的协程干活(更新协程update_状态,参见4.4.5)
-
4.4.5 DataVisitor
DataVisitor是一个1~4个泛型参数的模板类,泛型的数量决定内部持有的ChannelBuffer的数量。
拿最常用的1个泛型的数据访问器举例。
内部持有一个ChannelBuffer来存储对应channel的数据,这个buffer交由DataDispatcher来管理。
用一个Notifier来做数据通知,这个Notifier交由DataNotifier管理。
对外就提供一个TryFetch函数供获取数据,这个函数是在协程中执行。
在创建协程时,DataVisitor被传入了一个回调放在了Notifier里面。
if (visitor != nullptr) {visitor->RegisterNotifyCallback([this, task_id]{if (cyber_unlikely(stop_.load())) {return;}// NotifyProcessor里面实际更改了协程的update_标志,剩下的交由调度器进行调度this->NotifyProcessor(task_id);});
}
4.4.6 Summary
4.5 Node
Node是cyber的基础构件,每个模块都有Node且都通过Node进行通信。
一个模块可以有不同的通信类型通过Node定义reader/writer 或者 server/client。
也可以说,Node相当于Writer、Reader、Client和Server的容器。
内部含有两个实现类
-
NodeChannelImpl
-
用于创建Writer和Reader,在实例化时会调用NodeManager的Join函数来加入到当前拓扑中。
-
-
NodeServiceImpl
-
用于创建Service和Client
-
4.5.1 Writer
真正提供给用户使用的类,创建Writer后,用户可以通过Write函数来向Channel里面写数据。
Writer初始化流程
-
判断是否初始化,初始化过直接返回
-
创建Transmitter
-
在ChannelManager中注册回调函数,以此来监听拓扑变化
-
通过ChannelManager获取Channel上所有Reader
-
对Channel上所有Reader开启Transmitter的写入功能
-
调用ChannelManager的Join函数以Writer的身份加入当前拓扑中
拓扑监听回调流程
-
判断当前ChangeMsg的role是否是Reader,不是则过滤
-
判断当前Reader的Channel是否和当前Channel相等,不相等则过滤
-
根据Reader的操作(加入 or 离开)来打开或者关闭transmitter的功能。
另外,Writer通过封装ChannelManager提供GetReaders函数来获取对应的Reader集合,不必非要直接调用ChannelManager
4.5.2 Reader
Writer写入的数据,必须由Reader来读。同样的,Reader也是提供给用户使用的类。
Reader有两种使用方式,一种是读缓存,另一种是在创建时传入回调,在有数据时自动执行。
方式一: 回调
auto reader = node->CreateReader<Chatter>(channel_name, [&](const std::shared_ptr<Chatter>& msg) {// your logic
});
方式二:
auto reader = node->CreateReader<Chatter>(channel_name);// 1.获取最新的数据,单条
auto message = reader->GetLatestObserved();// 2.获取最新的数据,多条
reader->SetHistoryDepth(100);
reader->Observe();for (auto it = reader->Begin(); it != reader->End(); ++it) {// your logic for *it}
回调的方式很好理解,数据来的时候顺便执行一下。
同步读的方式,其实是读Reader内部的缓存,缓存由Blocker实现。
Reader的初始化流程
-
判断是否初始化过
-
对用户传的回调函数进行封装,如果用户没有传回调,则使用默认调用缓存入队函数
-
最后的回调落入协程之中执行,参见4.3.1.5代码 f(msg)
-
-
创建DataVisitor
-
根据封装好的回调函数和DataVisitor创建RoutineFactory
-
用scheduler实例来创建新的协程任务
-
通过ReceiverManager来创建receiver
-
获取ChannelManager实例并注册拓扑变化监听函数
-
通过ChannelManager实例获取Channel上的Writer,并打开receiver的对应功能
-
调用ChannelManager的Join函数将当前的Reader加入到全局拓扑中
Reader的拓扑监听函数逻辑
-
判断当前ChangeMsg的role是否是Writer,不是则过滤
-
判断当前Writer的Channel是否和当前Channel相等,不相等则过滤
-
根据Writer的操作(加入 or 离开)来打开或者关闭receiver的功能
4.5.2.1 Blocker
代码位于cyber/blocker/blocker.h
内部含有两个用std::list实现的容量有限制的队列
-
published_msg_queue_
-
用于记录已经发布的数据,理论上应该比CacheBuffer的数据落后一点点
-
-
observed_msg_queue_
-
用于Reader的临时观察,使用时会对published_msg_queue_进行浅拷贝
-
Reader的SetHistoryDepth函数就是用来设置这两个队列的长度的
4.5.2.2 ReceiverManager
ReceiverManager代码位于cyber/node/reader_base.h中,我们只关心的其中的回调函数的逻辑。
[](const std::shared_ptr<MessageT>& msg,const transport::MessageInfo& msg_info,const proto::RoleAttributes& reader_attr) {(void)msg_info;(void)reader_attr;PerfEventCache::Instance()->AddTransportEvent(TransPerf::DISPATCH, reader_attr.channel_id(),msg_info.seq_num());data::DataDispatcher<MessageT>::Instance()->Dispatch(reader_attr.channel_id(), msg);PerfEventCache::Instance()->AddTransportEvent(TransPerf::NOTIFY, reader_attr.channel_id(),msg_info.seq_num());
}
只有一行代码有用,就是调用了DataDispatcher的Dispatch函数(作用参见4.4.4)
这个回调应该是框架中最复杂的回调,复杂的原因并不是内部逻辑,而是他经过了层层的Wrapper,中间还有一次动态的注册。Transport::CreateReceiver默认创建的HybridReceiver,所以这个回调先是散落在三种Receiver中,在根据拓扑变化动态注册到对应的XXXDispatcher中。
注意: Transport中的XXXDispatcher和data::DataDispatcher不要搞混
4.5.3 Client
Client内部实际上用两个Channel和C++异步编程(Promise&&shared_future)实现的。
内部通过4.1.2.3描述的RtpsTransmitter写请求的Channel,并通过future进行超时等待。
通过4.1.3.3描述的RtpsReceiver来接收响应的Channel,在回调处set_value即可。
Client初始化函数就是创建RtpsTransmitter和RtpsReceiver。
通过一个std::unordered_map存储pending request。
4.5.4 Service
同Client一样,内部还是有两个Channel和RtpsTransmitter&&RtpsReceiver。
Service在内部使用std::list作为队列来实现生产者消费者模型。
RtpsReceiver在被创建时塞入的回调是将带有处理客户端请求函数的lambda表达式入队列,
Service有启动时有单独的线程消费这个队列,处理请求(执行用户回调)后通过RtpsTransmitter写入到响应Channel里面。
4.5.5 Summary
那么现在的视角应该是这个样子
5 Others
除去主要模块 ,框架层还包含其他辅助模块来实现不同的功能 ,同⼤模块⼀样重要。
5.1 ClassLoaderModule
在cyber中 ,基于框架开发的应⽤模块全是以动态链接库的形式存在 ,所以就需要有辅助类将这些动 态链接库中的类加载出来并能够根据类的名字创建相应的对象 ,这些类都是Component或者
TimerComponent的派⽣类。 从调⽤栈顺序上⼀⼀介绍
5.1.1 ClassLoaderManager
内部保存着动态链接库路径和对应加载后ClassLoader的实例的映射(std::map) ,对外提供 LoadLibrary函数 ,函数内部简单判重 ,不存在即创建后返回。
根据类名字创建类对象
-
获取当前所有的ClassLoader实例
-
遍历所有的ClassLoader ,如果ClassLoader⾥有这个类
-
调⽤ClassLoader的创建Object接⼝并返回创建好对象的shared_ptr
5.1.2 ClassLoader
ClassLoader类内部并没有持有动态链接库的句柄 ,⽽是将句柄放到了Utility的static变量中。 内部只保存当前动态链接库的路径和对应的引⽤计数(加载库 和类对象)。
各种判断类函数也是依靠 Utility中的函数 ,⽐如ClassLoader构造函数内调⽤ utility::LoadLibrary来加 载指定路径下的动态链接库,可以说ClassLoader就是utility中函数的OOP的Wrapper。
判断类是否在这个ClassLoader内
-
调⽤ utility::GetValidClassNames获取这个ClassLoader所有的类名字
-
通过std::find来寻找指定的类名字 根据名字创建指定类对象
-
判断当前路径的库是否被加载 ,如果没有 ,⽴刻加载
-
通过utility::CreateClassObj来创建类的对象
-
创建成功对象的引⽤计数+1
-
创建shared_ptr管理创建成功的对象 ,并传⼊Deleter在指针析构时⾃动引⽤计数-1和删除对象指 针
-
返回这个shared_ptr
5.1.3 SharedLibrary
SharedLibrary是动态链接库的实体 ,⼀个SharedLibrary对应⼀个动态链接库 ,在构造时通过dlopen 打开动态链接库并保存相应的handle, 通过dlsym函数和handle可知道当前库是否有对应的符号。
构造时可能会抛异常 ,需要外层函数进⾏捕捉。
5.1.4 Utility
这⾥⾯是若⼲个C风格函数 ,有⼏个全局的数据结构来保存当前状态。
-
ClassClassFactoryMap
-
std::map<std::string, std::map<std::string, AbstractClassFactoryBase*>>
-
第⼀层key: 基类名字
-
第⼆层key: 派⽣类名字
-
-
-
LibPathSharedLibVector
-
std::vector<std::pair<std::string, std::shared_ptr<SharedLibrary>>>
-
-
ClassFactoryVector
-
std::vector<AbstractClassFactoryBase*>
-
主要介绍⼏个主要的函数
加载动态链接库(LoadLibrary)
-
判断这个动态链接库是否被加载过 ,如果被加载过 ,则将当前的ClassLoader和所有有关系的 ClassFactory做关联
-
获取当前动态链接库所有的ClassFactory并且每⼀个ClassFactory做映射操作
-
加递归锁
-
设置当前活跃的ClassLoader
-
设置当前加载的库名字
-
创建SharedLibrary的shared_ptr
-
设置当前加载的库名字为空
-
设置当前活跃的ClassLoader为空
-
解锁
-
如果SharedLibrary对象为空 ,返回失败
-
将库名字和SharedLibrary对象的Pair添加到LibPathSharedLibVector⾥⾯去
根据类的名字创建对象
-
获取递归锁
-
根据基类名字获取派⽣类Map (ClassClassFactoryMap第⼆层)
-
在派⽣类Map中找到对应派⽣类的ClassFactory
-
释放递归锁
-
如果ClassFactory不为空且包含当前的ClassLoader 。 调⽤ClassFactory的CreateObj函数创建对象
-
-
返回创建好的对象
注册类函数
-
根据class_name和base_class_name创建utility::ClassFactory的对象cf
-
通过cf来设置ClassLoader和其关联的库名字
-
疑问:为什么获取到的ClassLoader和库名字不为空???
-
回答: RegisterClass函数可以理解为是在shared_library = SharedLibraryPtr(new SharedLibrary(library_path));这个语句后执⾏的 ,所以取到的数据不为空。深层原因是动态链 接库⼀旦被打开 ,优先执⾏宏命令!
-
-
获取ClassClassFactoryMap互斥锁并加锁
-
将cf按照映射关系放到ClassClassFactoryMap中
-
获取ClassClassFactoryMap互斥锁并解锁
5.2 Mainboard
这个模块会被编译成⼀个可执⾏的⼆进制⽂件mainboard ,所有基于cyber开发的模块 会被编译成动态链接库 ,通过mainboard进⾏启动。
5.2.1 ModuleArgument
就是把主函数的argc和argv翻译成std::string的过程 ,通过getopt_long来实现 ,总体过程就是常规的 解析 ,没什么意思。
选项 | 含义 | C++数据 | 备注 |
-d | dag⽂件 | std::vector<std::string> | -d 后⾯可以指定多个dag⽂件 |
-p | 进程名字 | std::string | 框架配置⽂件名字根据这个选项决定 |
-h | 帮助信息 | - | - |
5.2.2 ModuleController
负责创建所有的dag⽂件中的Component和TimerComponent对象并缓存起来。 加载过程
-
根据配置初始化log
-
将所有的dag⽂件路径全部转为绝对路径
-
对于每⼀个dag⽂件
-
将dag⽂件内的格式化信息转为proto buf
-
对于每⼀个动态链接库
-
将动态链接库的路径转为绝对路径并检查路径是否存在 ,不在则终⽌加载流程
-
利⽤ClassLoaderManager加动态链接库加载进来
-
-
对于每⼀个Component
-
利⽤ClassLoaderManager创建对象并初始化
-
缓存初始化好的对象
-
-
对于每⼀个TimerComponent
-
利⽤ClassLoaderManager创建对象并初始化
-
缓存初始化好的对象
-
-
5.2.3 Main
这⾥是main函数所在的地⽅ ,可以理解为框架的⼊⼝。
主要阐述⼀下流程
-
⽤ModuleArgument进⾏解析命令⾏参数
-
初始化框架
-
初始化ModuleController
-
调⽤框架层的WaitForShutdown不停地等待结束
-
清空ModuleController
5.3 Base
base模块都是封装好的基础数据结构 ,供其他模块进⾏使⽤。
5.3.1 Signal
框架中通信模块&&服务发现模块均使⽤Signal来存储&&触发回调。 ⼀共由三个类来实现
-
Signal
-
Connection
-
Slot
三个类均是接受变⻓泛型参数的模板类
回调定义: using Callback = std::function<void(Args...)> 即返回值为空的不定⻓参数的std::function
5.3.1.1 Slot
Slot内部就⼀个Callback和⼀个标志位。 标志位⽤来标志当前连接是否可⽤。
重载操作符()来实现执⾏回调。
5.3.1.2 Signal
内部⽤std::list来存储Slot的shared_ptr ,对外提供Connect和Disconnect函数来注册和删除Slot。 重载操作符()实现批量通知 ,需要学习的是 ,他是先浅拷贝信号集⾥⾯的所有Slot ,然后串⾏执⾏。
5.3.1.3 Connection
Signal注册回调成功会给⽤⼾返回⼀个Connection对象 ,⽤⼾可以通过这个对象进⾏断连。 构造时被传⼊Slot和Signal指针 ,通过Slot可以知道连接状态 ,通过Signal可以删除Slot。
5.3.2 BoundedQueue
有界队列是通过池化技术+原⼦头尾指针控制+不同的等待策略来实现的。
内部池⼦在初始化时已经申请好内存 ,包括头尾指针的内存 ,所以池⼦⼤⼩=指定⼤⼩+2( head and tail)。
⼊队时候利⽤原⼦变量(tail)的CAS机制来获取⼀个元素进⾏写⼊。 出队时也利⽤原⼦变量(head)的CAS机制来获取头部元素进⾏返回。
同样提供带等待的⼊队&&出队接⼝ ,是利⽤WaitStrategy实现 ,默认是SleepWaitStrategy。
CAS机制均⽤的weak版本 ,weak版本据官⽅介绍说有偶发性的的⽐较成功也会返回失败的情 况 ,但是性能⽐strong版本的⾼ ,具体参考引⽤14
5.4 Timer
如果有⼀个任务需要周期性来执⾏ ,那么就要使⽤定时器。
框架层中的TimerComponent通过把Proc注册到定时器中进⾏实现。
从设计角度上看 ,⼀个任务应该对应⼀个Timer对象 ,所以Timer对象中持有对应任务的指针⽤于插⼊ 和释放。
整体上定时器内部是实现了时间轮模型的变种 ,有着O(1)时间复杂度去维护多个定时器 ,感兴趣的同 学可以直接参考原始论⽂[15]。
定时器提供了三个配置参数
-
Period : 执⾏的间隔时间 ,单位是ms
-
Callback: 需要执⾏的回调
-
Oneshot
-
True: 执⾏⼀次
-
False: 周期性执⾏
-
启动函数
-
判断当前是否为真实模型 ,不是的直接返回
-
判断当前定时器是否启动过 ,如果没有启动过
-
初始化定时器任务 ,如果初始化成功 ,将当前定时器任务插⼊到时间轮中
-
停⽌函数
-
判断是否停⽌过&&定时器任务是否为空 ,如果是第⼀次停⽌ ,那么将任务重置为空
值得⼀提的是初始化周期性运⾏的定时器任务时 ,会把⽤户的任务(callback)封装⼀层 ,内部会根据预
期触发间隔&&实际触发间隔&&任务单次执⾏时间来计算出相关时间补偿参数 ,并调⽤时间轮的 AddTask接⼝将任务添加到下次执⾏的Bucket中。
5.4.1 TimerTask
TimerTask是⼀个全部成员都开放的结构体 ,内部包含⼀个函数包装器和若⼲个uint64_t变量。
• timer_id_: Timer的id ,由⼀个全局uint64_t原⼦变量累加形成。
• callback: ⽤户回调的Wrapper
• interval_ms: ⽤户指定的执⾏的时间间隔
• remainder_interval_ms:
• next_fire_duration_ms:
• accumulated_error_ns:
• last_execute_time_ns: 上次执⾏的开始时间
5.4.2 TimerBucket
是⼀个std::list ,每⼀个元素是TimerTask的weak_ptr ,并对外提供线程安全的AddTask接⼝。
5.4.3 TimingWheel
时间轮是⼀个单例,意味着整个进程就⼀份 ,和定时器是1:N的关系。
这个和以往的单时间轮+round模型不同 ,框架层实现了个⼆级的时间轮。 ⼀级时间轮为512个bucket ,⼆级时间轮有64个bucket ,精度为2ms。
时间轮单独启动⼀个线程 ,指针2ms从⼀级时间轮⾛⼀个bucket ,如果bucket不为空 ,会把其上⾯的
任务全部都推⼊框架层中的协程池(线程池) 中执⾏。 ⾛完⼀圈之后指针为0时⼆级指针⾛⼀个 bucket ,并把对应⼆级bucket的task添加到⼀级时间轮上。
-
时间精度:2ms一个tick,工作轮处理1024ms内的任务
-
溢出处理:超过1024ms的任务进入辅助轮,按1024ms/槽分级
-
级联机制:工作轮转完一圈(512 ticks)后,辅助轮推进一槽,并将该槽任务重新分配到工作轮
-
任务执行:每次tick处理当前工作轮槽内的所有任务,异步执行回调
References
-
https://github.com/ApolloAuto/apollo/blob/master/cyber/doxy-docs/source/CyberRT_FAQs.md
-
https://man7.org/linux/man-pages/man7/shm_overview.7.html
-
https://users.cs.cf.ac.uk/Dave.Marshall/C/node27.html
-
https://github.com/eProsima/Fast-DDS/blob/master/README.md
-
RTI Connext DDS
-
https://github.com/daohu527/dig-into-apollo/tree/main/cyber/source
-
https://github.com/Tencent/libco/blob/master/co_routine.h
-
https://github.com/apache/brpc/tree/master/src/bthread
-
https://github.com/golang/go
-
https://man7.org/linux/man-pages/man3/pthread_setaffinity_np.3.html
-
https://man7.org/linux/man-pages/man3/pthread_setschedparam.3.html
-
https://man7.org/linux/man-pages/man7/sched.7.html
-
https://linux.die.net/man/2/setpriority
-
https://zh.cppreference.com/w/cpp/atomic/atomic/compare_exchange
-
http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf