ROS2通信之参数与动作
- 引言
- 1 参数通信
- 1.1 参数常用指令
- 2 参数之RCLPY实现
- 2.1 创建功能包和节点
- 2.2 编辑脚本文件parameters_basic.py
- 2.3 编译测试
- 3 动作(Action)通信
- 3.1 动作常用指令
- 3.2 自定义通信接口
- 4 动作之RCLPY实现
引言
笔者跟着鱼香ROS的ROS2学习之旅
学习参考:
【ROS2机器人入门到实战】
笔者的学习目录
- MOMO的鱼香ROS2(一)ROS2入门篇——从Ubuntu操作系统开启
- MOMO的鱼香ROS2(二)ROS2入门篇——ROS2初体验
- MOMO的鱼香ROS2(三)ROS2入门篇——ROS2第一个节点
- MOMO的鱼香ROS2(四)ROS2入门篇——ROS2节点通信之话题与服务
- MOMO的鱼香ROS2(五)ROS2入门篇——ROS2接口与自定义
1 参数通信
参数是节点的一个配置值,你可以认为参数是一个节点的设置
ROS2支持的参数值的类型
bool 和bool[],布尔类型用来表示开关,比如我们可以控制雷达控制节点,开始扫描和停止扫描。
int64 和int64[],整形表示一个数字,含义可以自己来定义,这里我们可以用来表示李四节点写小说的周期值
float64 和float64[],浮点型,可以表示小数类型的参数值
string 和string[],字符串,可以用来表示雷达控制节点中真实雷达的ip地址
byte[],字节数组,这个可以用来表示图片,点云数据等信息
1.1 参数常用指令
打开终端,运行小乌龟节点
# 终端1
ros2 run turtlesim turtlesim_node
# 终端2
ros2 run turtlesim turtle_teleop_key
查看参数信息
# 列举所有的参数
ros2 param list
# 参数的详细信息
ros2 param describe <node_name> <param_name>
# 举例
ros2 param describe /turtlesim background_b
# 查看参数值
ros2 param get /turtlesim background_b
设置参数
ros2 param set <node_name> <parameter_name> <value>
# 举例
ros2 param set /turtlesim background_r 44
保存参数
ros2 param dump <node_name>
# 举例
ros2 param dump /turtlesim
文件被保存成了yaml格式,用cat指令看一看
cat ./turtlesim.yaml
恢复参数值
ros2 param load /turtlesim ./turtlesim.yaml
启动节点时加载参数快照
ros2 run <package_name> <executable_name> --ros-args --params-file <file_name>
# 举例
ros2 run turtlesim turtlesim_node --ros-args --params-file ./turtlesim.yaml
2 参数之RCLPY实现
2.1 创建功能包和节点
cd alian_ws/
ros2 pkg create alian_parameters_rclpy --build-type ament_python --dependencies rclpy --destination-directory src --node-name parameters_basic --maintainer-name "alian"
2.2 编辑脚本文件parameters_basic.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Nodeclass ParametersBasicNode(Node):"""创建一个ParametersBasicNode节点,并在初始化时输出一个话"""def __init__(self, name):super().__init__(name)self.get_logger().info(f"节点已启动:{name}!")# 声明参数self.declare_parameter('alian_level', 0)# 获取参数log_level = self.get_parameter("alian_level").value# 设置参数self.get_logger().set_level(log_level)# 定时修改self.timer = self.create_timer(0.5, self.timer_callback)def timer_callback(self):"""定时器回调函数"""# 获取参数log_level = self.get_parameter("alian_level").value# 设置参数self.get_logger().set_level(log_level)print(f"========================{log_level}=============================")self.get_logger().debug("我是DEBUG级别的日志,我被打印出来了!")self.get_logger().info("我是INFO级别的日志,我被打印出来了!")self.get_logger().warn("我是WARN级别的日志,我被打印出来了!")self.get_logger().error("我是ERROR级别的日志,我被打印出来了!")self.get_logger().fatal("我是FATAL级别的日志,我被打印出来了!") def main(args=None):rclpy.init(args=args) # 初始化rclpynode = ParametersBasicNode("parameters_basic") # 新建一个节点rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)rclpy.shutdown() # 关闭rclpy
2.3 编译测试
colcon build --packages-select alian_parameters_rclpy
source install/setup.bash
ros2 run alian_parameters_rclpy parameters_basic
1. 指定参数值测试
终端1:
ros2 run alian_parameters_rclpy parameters_basic --ros-args -p alian_level:=10
2. 动态设置参数测试
在终端1运行的前提下,打开终端2:
ros2 param list
ros2 param set /parameters_basic alian_level 40
3 动作(Action)通信
参数是由服务构建出来了,而Action是由话题和服务共同构建出来的(一个Action = 三个服务+两个话题)
三个服务分别是: 1.目标传递服务 2.结果传递服务 3.取消执行服务
两个话题:1.反馈话题(服务发布,客户端订阅)2.状态话题(服务端发布,客户端订阅)
3.1 动作常用指令
列举目前存在的动作
ros2 action list -t
获取接口的信息
ros2 interface show turtlesim/action/RotateAbsolute
查看动作信息
ros2 action info /turtle1/rotate_absolute
ros2 action send_goal /turtle1/rotate_absolute turtlesim/action/RotateAbsolute "{theta: 1.5}" --feedback
3.2 自定义通信接口
1. 创建接口功能包和接口文件
cd alian_ws/
ros2 pkg create robot_control_interfaces --build-type ament_cmake --destination-directory src --maintainer-name "alian"
mkdir -p src/robot_control_interfaces/action
touch src/robot_control_interfaces/action/MoveRobot.action
2. 修改packages.xml
<depend>rosidl_default_generators</depend><member_of_group>rosidl_interface_packages</member_of_group>
3. 修改CMakeLists.txt
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)rosidl_generate_interfaces(${PROJECT_NAME}"action/MoveRobot.action"
)
4. 编写MoveRobot.action接口
# Goal: 要移动的距离
float32 distance
---
# Result: 最终的位置
float32 pose
---
# Feedback: 中间反馈的位置和状态
float32 pose
uint32 status
uint32 STATUS_MOVEING = 3
uint32 STATUS_STOP = 4
5. 编译生成接口
colcon build --packages-select robot_control_interfaces
4 动作之RCLPY实现
1. 创建功能包和节点
cd alian_ws/
ros2 pkg create alian_action_rclpy --build-type ament_python --dependencies rclpy robot_control_interfaces --destination-directory src --node-name action_robot --maintainer-name "alian"
# 手动再创建action_control_02节点文件
touch src/alian_action_rclpy/alian_action_rclpy/action_control.py
#手动创建机器人类robot.py
touch src/alian_action_rclpy/alian_action_rclpy/robot.py
2. 节点文件1:robot.py 机器人类
from robot_control_interfaces.action import MoveRobot
import math
class Robot():"""机器人类,模拟一个机器人"""def __init__(self) -> None:self.current_pose_ = 0.0self.target_pose_ = 0.0self.move_distance_ = 0.0self.status_ = MoveRobot.Feedbackdef get_status(self):"""获取状态"""return self.status_def get_current_pose(self):"""获取当前位置"""return self.current_pose_def close_goal(self):"""接近目标"""return math.fabs(self.target_pose_ - self.current_pose_) < 0.01def stop_move(self):"""停止移动"""self.status_ = MoveRobot.Feedback.STATUS_STOPdef move_step(self):"""移动一小步"""direct = self.move_distance_ / math.fabs(self.move_distance_)step = direct * math.fabs(self.target_pose_ - self.current_pose_) * 0.1self.current_pose_ += step # 移动一步print(f"移动了:{step}当前位置:{self.current_pose_}")return self.current_pose_def set_goal(self, distance):"""设置目标"""self.move_distance_ = distanceself.target_pose_ += distance # 更新目标位置if self.close_goal():self.stop_move()return Falseself.status_ = MoveRobot.Feedback.STATUS_MOVEING # 更新状态为移动return True
3. 节点文件2:action_robot.py 机器人节点
#!/usr/bin/env python3
import time
# 导入rclpy相关库
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle
# 导入接口
from robot_control_interfaces.action import MoveRobot
# 导入机器人类
from alian_action_rclpy.robot import Robot
#from rclpy.executors import MultiThreadedExecutor
#from rclpy.callback_groups import MutuallyExclusiveCallbackGroupclass ActionRobot(Node):"""机器人端Action服务"""def __init__(self,name):super().__init__(name)self.get_logger().info(f"节点已启动:{name}!")self.robot_ = Robot()self.action_server_ = ActionServer(self, MoveRobot, 'move_robot', self.execute_callback# ,callback_group=MutuallyExclusiveCallbackGroup())def execute_callback(self, goal_handle: ServerGoalHandle):"""执行回调函数,若采用默认handle_goal函数则会自动调用"""self.get_logger().info('执行移动机器人')feedback_msg = MoveRobot.Feedback()self.robot_.set_goal(goal_handle.request.distance)# rate = self.create_rate(2)while rclpy.ok() and not self.robot_.close_goal():# moveself.robot_.move_step()# feedbackfeedback_msg.pose = self.robot_.get_current_pose()feedback_msg.status = self.robot_.get_status()goal_handle.publish_feedback(feedback_msg)# cancel checkif goal_handle.is_cancel_requested:result = MoveRobot.Result()result.pose = self.robot_.get_current_pose()return result# rate.sleep() # Rate会造成死锁,单线程执行器时不能使用time.sleep(0.5)goal_handle.succeed()result = MoveRobot.Result()result.pose = self.robot_.get_current_pose()return resultdef main(args=None):"""主函数"""rclpy.init(args=args)action_robot = ActionRobot("action_robot")# 采用多线程执行器解决rate死锁问题# executor = MultiThreadedExecutor()# executor.add_node(action_robot_02)# executor.spin()rclpy.spin(action_robot)rclpy.shutdown()
4. 节点文件3:action_control.py 控制节点
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
# 导入Action接口
from robot_control_interfaces.action import MoveRobotclass ActionControl(Node):"""Action客户端"""def __init__(self, name):super().__init__(name)self.get_logger().info(f"节点已启动:{name}!")self.action_client_ = ActionClient(self, MoveRobot, 'move_robot')self.send_goal_timer_ = self.create_timer(1, self.send_goal)def send_goal(self):"""发送目标"""self.send_goal_timer_.cancel()goal_msg = MoveRobot.Goal()goal_msg.distance = 5.0self.action_client_.wait_for_server()self._send_goal_future = self.action_client_.send_goal_async(goal_msg,feedback_callback=self.feedback_callback)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):"""收到目标处理结果"""goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def get_result_callback(self, future):"""获取结果反馈"""result = future.result().resultself.get_logger().info(f'Result: {result.pose}')def feedback_callback(self, feedback_msg):"""获取回调反馈"""feedback = feedback_msg.feedbackself.get_logger().info(f'Received feedback: {feedback.pose}')def main(args=None):"""主函数"""rclpy.init(args=args)action_robot = ActionControl("action_control")rclpy.spin(action_robot)rclpy.shutdown()
5. 修改setup.py
'console_scripts': ['robot = alian_action_rclpy.robot:main','action_robot = alian_action_rclpy.action_robot:main','action_control = alian_action_rclpy.action_control:main'
6. 编译测试
cd alian_ws/
colcon build --packages-up-to alian_action_rclpy
# 运行机器人节点
source install/setup.bash
ros2 run alian_action_rclpy action_robot
# 新终端
source install/setup.bash
ros2 run alian_action_rclpy action_control