ROS开发实战指南 - 从入门到项目实践
前言
ROS(Robot Operating System)是机器人开发领域最流行的中间件框架。本文将介绍ROS的核心概念、开发实践,以及如何构建一个完整的ROS项目。
ROS核心概念
1. 节点(Nodes)
节点是ROS中的基本执行单元,每个节点负责特定的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
import rospy from std_msgs.msg import String
class TalkerNode: def __init__(self): rospy.init_node('talker_node', anonymous=True) self.publisher = rospy.Publisher('chatter', String, queue_size=10) self.rate = rospy.Rate(1) def run(self): count = 0 while not rospy.is_shutdown(): message = f"Hello ROS {count}" rospy.loginfo(f"Publishing: {message}") self.publisher.publish(message) count += 1 self.rate.sleep()
if __name__ == '__main__': try: talker = TalkerNode() talker.run() except rospy.ROSInterruptException: pass
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
import rospy from std_msgs.msg import String
class ListenerNode: def __init__(self): rospy.init_node('listener_node', anonymous=True) self.subscriber = rospy.Subscriber('chatter', String, self.callback) def callback(self, data): rospy.loginfo(f"Received: {data.data}") def run(self): rospy.spin()
if __name__ == '__main__': try: listener = ListenerNode() listener.run() except rospy.ROSInterruptException: pass
|
2. 话题(Topics)
话题是节点间异步通信的机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
import rospy from geometry_msgs.msg import Twist from sensor_msgs.msg import LaserScan
class RobotController: def __init__(self): rospy.init_node('robot_controller') self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1) self.laser_sub = rospy.Subscriber('/scan', LaserScan, self.laser_callback) self.min_distance = 0.5 def laser_callback(self, msg): """处理激光雷达数据""" front_distance = min(msg.ranges[0:30] + msg.ranges[-30:]) if front_distance < self.min_distance: self.stop_robot() rospy.logwarn(f"Obstacle detected at distance: {front_distance:.2f}m") else: self.move_forward() def move_forward(self): """向前移动""" twist = Twist() twist.linear.x = 0.2 twist.angular.z = 0.0 self.cmd_vel_pub.publish(twist) def stop_robot(self): """停止机器人""" twist = Twist() twist.linear.x = 0.0 twist.angular.z = 0.0 self.cmd_vel_pub.publish(twist) def run(self): rospy.loginfo("Robot controller started") rospy.spin()
if __name__ == '__main__': try: controller = RobotController() controller.run() except rospy.ROSInterruptException: pass
|
3. 服务(Services)
服务提供同步的请求-响应通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
import rospy from std_srvs.srv import AddTwoInts, AddTwoIntsResponse
class MathServiceServer: def __init__(self): rospy.init_node('math_service_server') self.service = rospy.Service('add_two_ints', AddTwoInts, self.add_callback) rospy.loginfo("Math service server ready") def add_callback(self, req): """处理加法请求""" result = req.a + req.b rospy.loginfo(f"Adding {req.a} + {req.b} = {result}") return AddTwoIntsResponse(result) def run(self): rospy.spin()
if __name__ == '__main__': try: server = MathServiceServer() server.run() except rospy.ROSInterruptException: pass
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
import rospy from std_srvs.srv import AddTwoInts
class MathServiceClient: def __init__(self): rospy.init_node('math_service_client') rospy.wait_for_service('add_two_ints') self.add_service = rospy.ServiceProxy('add_two_ints', AddTwoInts) def call_service(self, a, b): """调用服务""" try: response = self.add_service(a, b) rospy.loginfo(f"Service call successful: {a} + {b} = {response.sum}") return response.sum except rospy.ServiceException as e: rospy.logerr(f"Service call failed: {e}") return None def run(self): self.call_service(5, 3) self.call_service(10, 20) self.call_service(-5, 8)
if __name__ == '__main__': try: client = MathServiceClient() client.run() except rospy.ROSInterruptException: pass
|
4. 参数服务器(Parameter Server)
参数服务器用于存储和检索配置参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
import rospy from std_msgs.msg import String
class ParameterExample: def __init__(self): rospy.init_node('parameter_example') rospy.set_param('/robot_name', 'TurtleBot') rospy.set_param('/robot_speed', 0.5) rospy.set_param('/sensor_enabled', True) robot_name = rospy.get_param('/robot_name', 'DefaultRobot') robot_speed = rospy.get_param('/robot_speed', 0.2) sensor_enabled = rospy.get_param('/sensor_enabled', False) rospy.loginfo(f"Robot Name: {robot_name}") rospy.loginfo(f"Robot Speed: {robot_speed}") rospy.loginfo(f"Sensor Enabled: {sensor_enabled}") def run(self): rospy.spin()
if __name__ == '__main__': try: example = ParameterExample() example.run() except rospy.ROSInterruptException: pass
|
ROS包开发
创建ROS包
1 2 3 4 5 6 7 8 9 10 11
| mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src
catkin_create_pkg my_robot_package rospy std_msgs geometry_msgs sensor_msgs
cd ~/catkin_ws catkin_make source devel/setup.bash
|
包结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| my_robot_package/ ├── CMakeLists.txt ├── package.xml ├── scripts/ │ ├── talker.py │ └── listener.py ├── src/ │ └── my_robot_package/ │ ├── __init__.py │ └── robot_controller.py ├── launch/ │ └── robot.launch └── config/ └── robot_params.yaml
|
Launch文件
1 2 3 4 5 6 7 8 9 10 11 12
| <launch> <rosparam file="$(find my_robot_package)/config/robot_params.yaml" command="load" /> <node name="robot_controller" pkg="my_robot_package" type="robot_controller.py" output="screen" /> <node name="sensor_node" pkg="my_robot_package" type="sensor_node.py" output="screen" /> <node name="debug_node" pkg="my_robot_package" type="debug_node.py" output="screen" if="$(arg debug)" /> </launch>
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| robot: name: "MyRobot" max_speed: 1.0 min_speed: 0.1
sensors: laser: enabled: true frequency: 10 range_min: 0.1 range_max: 10.0 camera: enabled: true width: 640 height: 480 fps: 30
|
高级特性
1. 动态重配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
import rospy from dynamic_reconfigure.server import Server from my_robot_package.cfg import RobotConfig
class DynamicConfigServer: def __init__(self): rospy.init_node('dynamic_config_server') self.srv = Server(RobotConfig, self.reconfigure_callback) rospy.loginfo("Dynamic reconfigure server started") def reconfigure_callback(self, config, level): """处理重配置请求""" rospy.loginfo(f"Reconfigure Request: speed={config.speed}, enabled={config.enabled}") self.apply_config(config) return config def apply_config(self, config): """应用配置参数""" rospy.set_param('/robot_speed', config.speed) rospy.set_param('/sensor_enabled', config.enabled) def run(self): rospy.spin()
if __name__ == '__main__': try: server = DynamicConfigServer() server.run() except rospy.ROSInterruptException: pass
|
2. 动作服务器(Action Server)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
import rospy import actionlib from my_robot_package.msg import MoveAction, MoveGoal, MoveResult, MoveFeedback
class MoveActionServer: def __init__(self): rospy.init_node('move_action_server') self.server = actionlib.SimpleActionServer('move', MoveAction, self.execute_callback, False) self.server.start() rospy.loginfo("Move action server started") def execute_callback(self, goal): """执行移动动作""" rospy.loginfo(f"Moving to position: x={goal.x}, y={goal.y}") for i in range(10): if self.server.is_preempt_requested(): self.server.set_preempted() return feedback = MoveFeedback() feedback.progress = (i + 1) * 10 self.server.publish_feedback(feedback) rospy.sleep(0.1) result = MoveResult() result.success = True result.message = "Move completed successfully" self.server.set_succeeded(result) def run(self): rospy.spin()
if __name__ == '__main__': try: server = MoveActionServer() server.run() except rospy.ROSInterruptException: pass
|
总结
ROS提供了强大的机器人开发框架,包括:
- 节点通信:话题、服务、参数服务器
- 包管理:模块化开发,便于代码复用
- 工具链:丰富的调试和可视化工具
- 生态系统:大量的开源包和算法
掌握ROS开发对于机器人工程师来说是必不可少的技能。通过实际项目练习,可以更好地理解ROS的设计理念和最佳实践。