Robot Operating System——深度解析单线程执行器(SingleThreadedExecutor)执行逻辑

大纲

  • 创建SingleThreadedExecutor
  • 新增Node
    • add_node
    • trigger_entity_recollect
      • collect_entities
  • 自旋等待
    • get_next_executable
      • wait_for_work
      • get_next_ready_executable
        • Timer
        • Subscription
        • Service
        • Client
        • Waitable
        • AnyExecutable
    • execute_any_executable
  • 参考资料

在ROS2中,我们设置的各种回调都是在执行器(Executor)中执行的,所以它是整个系统非常核心的组件。
在这里插入图片描述
目前,rclcpp 提供了三种 Executor 类型,它们派生自同一个父类Executor。
在这里插入图片描述

本文我们将借用《Robot Operating System——单线程中启动多个Node》中的例子,将单线程执行器在最上层的使用方法总结如下

  // Create an executor that will be responsible for execution of callbacks for a set of nodes.// With this version, all callbacks will be called from within this thread (the main one).rclcpp::executors::SingleThreadedExecutor exec;rclcpp::NodeOptions options;// Add some nodes to the executor which provide work for the executor during its "spin" function.// An example of available work is executing a subscription callback, or a timer callback.auto server = std::make_shared<composition::Server>(options);exec.add_node(server);// spin will block until work comes in, execute work as it becomes available, and keep blocking.// It will only be interrupted by Ctrl-C.exec.spin();

即:

  1. 创建SingleThreadedExecutor ;
  2. 新增Node;
  3. 自旋等待。

创建SingleThreadedExecutor

SingleThreadedExecutor的构造函数基本就是交给基类rclcpp::Executor的构造函数来实现。

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
: rclcpp::Executor(options) {}

在Executor的构造函数中,我们着重关注成员变量collector_。

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),context_(options.context),notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>([this]() {this->entities_need_rebuild_.store(true);})),entities_need_rebuild_(true),collector_(notify_waitable_),wait_set_({}, {}, {}, {}, {}, {}, options.context),current_notify_waitable_(notify_waitable_),impl_(std::make_unique<rclcpp::ExecutorImplementation>())
{shutdown_callback_handle_ = context_->add_on_shutdown_callback([weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {auto strong_gc = weak_gc.lock();if (strong_gc) {strong_gc->trigger();}});notify_waitable_->add_guard_condition(interrupt_guard_condition_);notify_waitable_->add_guard_condition(shutdown_guard_condition_);wait_set_.add_waitable(notify_waitable_);
}

collector_是一个集合,它保存了后续要执行的各个Node指针。

/// Collector used to associate executable entities from nodes and guard conditionsrclcpp::executors::ExecutorEntitiesCollector collector_;

新增Node

add_node

业务逻辑的Node会被添加到上面介绍的成员变量collector_中。

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{this->collector_.add_node(node_ptr);try {this->trigger_entity_recollect(notify);} catch (const rclcpp::exceptions::RCLError & ex) {throw std::runtime_error(std::string("Failed to trigger guard condition on node add: ") + ex.what());}
}

然后会调用trigger_entity_recollect方法。

trigger_entity_recollect

这个方法会做两件事:

  1. 修改std::atomic_bool类型变量entities_need_rebuild_的值为true,进而让collect_entities()被执行。
  2. 如果notify为true,则会通过interrupt_guard_condition_->trigger()唤醒一个处于等待状态的执行器。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::trigger_entity_recollect(bool notify)
{this->entities_need_rebuild_.store(true);if (!spinning.load() && entities_need_rebuild_.exchange(false)) {std::lock_guard<std::mutex> guard(mutex_);this->collect_entities();}if (notify) {interrupt_guard_condition_->trigger();}
}

collect_entities

collect_entities主要做两件事:

  1. 过滤过期的Node以及相关回调函数。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::collect_entities()
{// Updating the entity collection and waitset expires any active resultthis->wait_result_.reset();// Get the current list of available waitables from the collector.rclcpp::executors::ExecutorEntitiesCollection collection;this->collector_.update_collections();auto callback_groups = this->collector_.get_all_callback_groups();rclcpp::executors::build_entities_collection(callback_groups, collection);// We must remove expired entities here, so that we don't continue to use older entities.// See https://github.com/ros2/rclcpp/issues/2180 for more information.current_collection_.remove_expired_entities();
  1. 更新current_collection_和wait_set_。
  // Update each of the groups of entities in the current collection, adding or removing// from the wait set as necessary.current_collection_.timers.update(collection.timers,[this](auto timer) {wait_set_.add_timer(timer);},[this](auto timer) {wait_set_.remove_timer(timer);});current_collection_.subscriptions.update(collection.subscriptions,[this](auto subscription) {wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);},[this](auto subscription) {wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);});current_collection_.clients.update(collection.clients,[this](auto client) {wait_set_.add_client(client);},[this](auto client) {wait_set_.remove_client(client);});current_collection_.services.update(collection.services,[this](auto service) {wait_set_.add_service(service);},[this](auto service) {wait_set_.remove_service(service);});current_collection_.guard_conditions.update(collection.guard_conditions,[this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},[this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});current_collection_.waitables.update(collection.waitables,[this](auto waitable) {wait_set_.add_waitable(waitable);},[this](auto waitable) {wait_set_.remove_waitable(waitable);});// In the case that an entity already has an expired weak pointer// before being removed from the waitset, additionally prune the waitset.this->wait_set_.prune_deleted_entities();
}

自旋等待

spin()内部核心是一个while循环。它会不停使用get_next_executable取出可以运行的Node的回调,然后让execute_any_executable将其执行。

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
void
SingleThreadedExecutor::spin()
{if (spinning.exchange(true)) {throw std::runtime_error("spin() called while already spinning");}RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););// Clear any previous result and rebuild the waitsetthis->wait_result_.reset();this->entities_need_rebuild_ = true;while (rclcpp::ok(this->context_) && spinning.load()) {rclcpp::AnyExecutable any_executable;if (get_next_executable(any_executable)) {execute_any_executable(any_executable);}}
}

那么这个while循环会不会导致CPU一直空转呢?答案是:不是。我们可以看get_next_executable的实现。

get_next_executable

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{bool success = false;// Check to see if there are any subscriptions or timers needing service// TODO(wjwwood): improve run to run efficiency of this functionsuccess = get_next_ready_executable(any_executable);// If there are noneif (!success) {// Wait for subscriptions or timers to work onwait_for_work(timeout);if (!spinning.load()) {return false;}// Try againsuccess = get_next_ready_executable(any_executable);}return success;
}

它会在底层调用wait_for_work。

wait_for_work

这个方法会一直阻塞到时间超时,或者有回调函数可以被调用。

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());// Clear any previous wait resultthis->wait_result_.reset();{std::lock_guard<std::mutex> guard(mutex_);if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {this->collect_entities();}}this->wait_result_.emplace(wait_set_.wait(timeout));if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {RCUTILS_LOG_WARN_NAMED("rclcpp","empty wait set received in wait(). This should never happen.");} else {if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();if (current_notify_waitable_->is_ready(rcl_wait_set)) {current_notify_waitable_->execute(current_notify_waitable_->take_data());}}}
}

其中主要负责等待的是这句

wait_set_.wait(timeout)

在SingleThreadedExecutor中,由于调用get_next_executable没有传递时间,便采用了默认时间。这样get_next_executable会一直等到有回调可以被执行。这样就避免了CPU空转的问题。

  /// Wait for executable in ready state and populate union structure./*** If an executable is ready, it will return immediately, otherwise* block based on the timeout for work to become ready.** \param[out] any_executable populated union structure of ready executable* \param[in] timeout duration of time to wait for work, a negative value*   (the defualt behavior), will make this function block indefinitely* \return true if an executable was ready and any_executable was populated,*   otherwise false*/RCLCPP_PUBLICboolget_next_executable(AnyExecutable & any_executable,std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

get_next_ready_executable

get_next_ready_executable会按顺序寻找Timer、Subscription、Service 、Client和Waitable中第一个处于可被回调状态的Node。

Timer
bool valid_executable = false;if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {return false;}if (!valid_executable) {size_t current_timer_index = 0;while (true) {auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);if (nullptr == timer) {break;}current_timer_index = timer_index;auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());if (entity_iter != current_collection_.timers.end()) {auto callback_group = entity_iter->second.callback_group.lock();if (!callback_group || !callback_group->can_be_taken_from()) {current_timer_index++;continue;}// At this point the timer is either ready for execution or was perhaps// it was canceled, based on the result of call(), but either way it// should not be checked again from peek_next_ready_timer(), so clear// it from the wait result.wait_result_->clear_timer_with_index(current_timer_index);// Check that the timer should be called still, i.e. it wasn't canceled.any_executable.data = timer->call();if (!any_executable.data) {current_timer_index++;continue;}any_executable.timer = timer;any_executable.callback_group = callback_group;valid_executable = true;break;}current_timer_index++;}}
Subscription
  if (!valid_executable) {while (auto subscription = wait_result_->next_ready_subscription()) {auto entity_iter = current_collection_.subscriptions.find(subscription->get_subscription_handle().get());if (entity_iter != current_collection_.subscriptions.end()) {auto callback_group = entity_iter->second.callback_group.lock();if (!callback_group || !callback_group->can_be_taken_from()) {continue;}any_executable.subscription = subscription;any_executable.callback_group = callback_group;valid_executable = true;break;}}}
Service
  if (!valid_executable) {while (auto service = wait_result_->next_ready_service()) {auto entity_iter = current_collection_.services.find(service->get_service_handle().get());if (entity_iter != current_collection_.services.end()) {auto callback_group = entity_iter->second.callback_group.lock();if (!callback_group || !callback_group->can_be_taken_from()) {continue;}any_executable.service = service;any_executable.callback_group = callback_group;valid_executable = true;break;}}}
Client
  if (!valid_executable) {while (auto client = wait_result_->next_ready_client()) {auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());if (entity_iter != current_collection_.clients.end()) {auto callback_group = entity_iter->second.callback_group.lock();if (!callback_group || !callback_group->can_be_taken_from()) {continue;}any_executable.client = client;any_executable.callback_group = callback_group;valid_executable = true;break;}}}
Waitable
  if (!valid_executable) {while (auto waitable = wait_result_->next_ready_waitable()) {auto entity_iter = current_collection_.waitables.find(waitable.get());if (entity_iter != current_collection_.waitables.end()) {auto callback_group = entity_iter->second.callback_group.lock();if (!callback_group || !callback_group->can_be_taken_from()) {continue;}any_executable.waitable = waitable;any_executable.callback_group = callback_group;any_executable.data = waitable->take_data();valid_executable = true;break;}}}
AnyExecutable
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/include/rclcpp/any_executable.hpp
struct AnyExecutable
{RCLCPP_PUBLICAnyExecutable();RCLCPP_PUBLICvirtual ~AnyExecutable();// Only one of the following pointers will be set.rclcpp::SubscriptionBase::SharedPtr subscription;rclcpp::TimerBase::SharedPtr timer;rclcpp::ServiceBase::SharedPtr service;rclcpp::ClientBase::SharedPtr client;rclcpp::Waitable::SharedPtr waitable;// These are used to keep the scope on the containing itemsrclcpp::CallbackGroup::SharedPtr callback_group {nullptr};rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};std::shared_ptr<void> data {nullptr};
};

execute_any_executable

找到可以执行的Node后,便可以调用execute_any_executable让其执行。

在execute_any_executable内部,我们看到它也是区分Timer、Subscription、Service 、Client和Waitable类型来执行的。

// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::execute_any_executable(AnyExecutable & any_exec)
{if (!spinning.load()) {return;}assert((void("cannot execute an AnyExecutable without a valid callback group"),any_exec.callback_group));if (any_exec.timer) {TRACETOOLS_TRACEPOINT(rclcpp_executor_execute,static_cast<const void *>(any_exec.timer->get_timer_handle().get()));execute_timer(any_exec.timer, any_exec.data);}if (any_exec.subscription) {TRACETOOLS_TRACEPOINT(rclcpp_executor_execute,static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));execute_subscription(any_exec.subscription);}if (any_exec.service) {execute_service(any_exec.service);}if (any_exec.client) {execute_client(any_exec.client);}if (any_exec.waitable) {const std::shared_ptr<void> & const_data = any_exec.data;any_exec.waitable->execute(const_data);}// Reset the callback_group, regardless of typeany_exec.callback_group->can_be_taken_from().store(true);
}

参考资料

  • https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Executors.html#executors

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392540.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python 爬虫入门实战——爬取维基百科“百科全书”词条页面内链

1. 简述 本次爬取维基百科“百科全书”词条页面内链&#xff0c;仅发送一次请求&#xff0c;获取一个 html 页面&#xff0c;同时不包含应对反爬虫的知识&#xff0c;仅包含最基础的网页爬取、数据清洗、存储为 csv 文件。 爬取网址 url 为 “https://zh.wikipedia.org/wiki/…

数据结构:基于顺序表实现通讯录系统(含源码)

目录 一、前言 二、各个功能的实现 2.1 初始化通讯录 2.2 添加通讯录数据 2.3 查找通讯录数据 2.4 删除通讯录数据 2.5 修改通讯录数据 2.6 展示通讯录数据​编辑 2.7 销毁通讯录数据 三、添加菜单和测试 四、完整源码 sxb.h sxb.c contact.h contact.c test.c 一、前…

【隐私计算篇】混淆电路之深入浅出

入门隐私计算的阶段&#xff0c;一般都会涉及对于混淆电路的学习&#xff0c;这是因为混淆电路是多方安全计算中的基础密码原语&#xff0c;也是隐私保护中重要的技术。为了帮助更好地理解混淆电路的原理&#xff0c;今天对其进行原理以及相关优化手段进行解析和分享。 1. 混淆…

不同角色路由权限配置(六)

一、启用方式 配置开启config/config.ts。同时需要 src/access.ts 提供权限配置 export default {access: {},// access 插件依赖 initial State 所以需要同时开启initialState: {}, };这里以扩展的路由配置为例&#xff0c;配置只有admin权限才能查看的页面 1、在src/acces…

前端web开发HTML+CSS3+移动web(0基础,超详细)——第3天

目录 一&#xff0c;列表-无序和有序的定义列表 二&#xff0c;表格-基本使用与表格结构标签 三&#xff0c;合并单元格 四&#xff0c;表单-input标签 五&#xff0c;表单-下拉菜单 六&#xff0c;表单-文本域 七&#xff0c;表单-label标签 八&#xff0c;表单-按钮 …

【已解决】页面操作系统功能,诡异报错500nginx错误

【已解决】页面操作系统功能&#xff0c;诡异报错500nginx错误&#xff0c;后台没有任何报错信息 不知道啥原因 清理了浏览器缓存 也没有效果 还有一个表现情况&#xff0c;同样的操作&#xff0c;有时可以又是不行 因为报错ng的代理问题&#xff0c;检查了ng配置 后续经过同…

【C/C++】C语言和C++实现Stack(栈)对比

我们初步了解了C&#xff0c;也用C语言实现过栈&#xff0c;就我们当前所更新过的有关C学习内容以栈为例子&#xff0c;来简单对比一下C语言和C。 1.C中栈的实现 栈的C语言实现在【数据结构】栈的概念、结构和实现详解-CSDN博客 &#xff0c;下面是C实现的栈&#xff0c; 在St…

OD C卷 - 多线段数据压缩

多段 线 数据压缩 &#xff08;200&#xff09; 如图中每个方格为一个像素&#xff08;i&#xff0c;j&#xff09;&#xff0c;线的走向只能水平、垂直、倾斜45度&#xff1b;图中线段表示为(2, 8)、&#xff08;3,7&#xff09;、&#xff08;3, 6&#xff09;、&#xff08…

学习STM32(2)--STM32单片机GPIO应用

目录 1 引 言 2 实验目的 3 实验内容 3.1掌握STM32F103的GPIO控制 3.1.1 GPIO的分组 3.1.2 GPIO的常用功能 3.1.3 STM32单片机GPIO位结构 3.1.4 STM32单片机GPIO工作模式 3.1.5 STM32的GPIO 输出-点亮LED编程要点 使用GPIO时&#xff0c;按下面步骤进行&#xff1…

部署服务器项目及发布

当技术总监直接丢给我一个服务器账号密码时&#xff0c;我该怎么完成映射本机&#xff1b;配置网关&#xff1b;配置代理和发布项目呢&#xff1f; 我使用的是putty远程登录到服务器 输入ip后&#xff0c;点open 输入账号密码 登录的账号如果不是root&#xff1b;使用sudo su…

sqllab靶场练习第1~15关

1、第一关 代码解析 if(isset($_GET[id]))//判断获取的id字段是否为空 { $id$_GET[id]; //logging the connection parameters to a file for analysis. $fpfopen(result.txt,a);//打开这个文件&#xff0c;记录操作的日志 fwrite($fp,ID:.$id."\n"); fclose($fp);…

【C++高阶】深入理解C++异常处理机制:从try到catch的全面解析

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;Lambda表达式 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀C异常 &#x1f4d2;1. C异常概念…

WPF学习(3)- WrapPanel控件(瀑布流布局)+DockPanel控件(停靠布局)

WrapPanel控件&#xff08;瀑布流布局&#xff09; WrapPanel控件表示将其子控件从左到右的顺序排列&#xff0c;如果第一行显示不了&#xff0c;则自动换至第二行&#xff0c;继续显示剩余的子控件。我们来看看它的结构定义&#xff1a; public class WrapPanel : Panel {pub…

【前端】(仅思路)如何在前端实现一个fc手柄,将手机作为游戏手柄设备。

文章目录 背景界面demo原型图&#xff08;没错&#xff0c;就是它&#xff0c;童年回忆&#xff09; 遇到的问题最终后端demo(甚至比前端逻辑更简单) 背景 突发奇想&#xff0c;想要在前端实现一个fc游戏手柄&#xff0c;然后控制电脑的nes模拟器玩玩魂斗罗。 思路很简单&…

【编程笔记】解决移动硬盘无法访问文件或目录损坏且无法读取

解决移动硬盘无法访问文件或目录损坏且无法读取 只解决&#xff1a;移动硬盘无法访问文件或目录损坏且无法读取 问题 由于频繁下载数据&#xff0c;多次安装虚拟机导致磁盘无法被系统识别。磁盘本身是好的&#xff0c;只是不能被识别&#xff0c;如果将磁盘格式化&#xff0c…

Chainlit快速实现AI对话应用1 分钟内实现聊天数据的持久化保存

概述 默认情况下&#xff0c;Chainlit 应用不会保留其生成的聊天和元素。即网页一刷新&#xff0c;所有的聊天记录&#xff0c;页面上的所有聊天记录都会消失。但是&#xff0c;存储和利用这些数据的能力可能是您的项目或组织的重要组成部分。 一旦启用&#xff0c;数据持久性…

Unlikely argument type for equals(): int seems to be unrelated to Long

代码审查不规范&#xff1a; Unlikely argument type for equals(): int seems to be unrelated to Long check package code_check;public class Obj {public Obj(){}private Long mail;public Long getMail(){return mail;}public void setMail(Long mail){this.mail mail;…

【零基础实战】基于物联网的人工淡水湖养殖系统设计

文章目录 一、前言1.1 项目介绍1.1.1 开发背景1.1.2 项目实现的功能1.1.3 项目硬件模块组成1.1.4 ESP8266工作模式配置 1.2 系统设计方案1.2.1 关键技术与创新点1.2.2 功能需求分析1.2.3 现有技术与市场分析1.2.4 硬件架构设计1.2.5 软件架构设计1.2.6 上位机开发思路 1.3 系统…

Java | Leetcode Java题解之第324题摆动排序II

题目&#xff1a; 题解&#xff1a; class Solution {Random random new Random();public void wiggleSort(int[] nums) {int n nums.length;int x (n 1) / 2;int mid x - 1;int target findKthLargest(nums, n - mid);for (int k 0, i 0, j n - 1; k < j; k) {if…

DAMA学习笔记(十)-数据仓库与商务智能

1.引言 数据仓库&#xff08;Data Warehouse&#xff0c;DW&#xff09;的概念始于20世纪80年代。该技术赋能组织将不同来源的数据整合到公共的数据模型中去&#xff0c;整合后的数据能为业务运营提供洞察&#xff0c;为企业决策支持和创造组织价值开辟新的可能性。与商务智能&…