ROS开发实战指南 - 从入门到项目实践

ROS开发实战指南 - 从入门到项目实践

前言

ROS(Robot Operating System)是机器人开发领域最流行的中间件框架。本文将介绍ROS的核心概念、开发实践,以及如何构建一个完整的ROS项目。

ROS核心概念

1. 节点(Nodes)

节点是ROS中的基本执行单元,每个节点负责特定的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import String

class TalkerNode:
def __init__(self):
rospy.init_node('talker_node', anonymous=True)
self.publisher = rospy.Publisher('chatter', String, queue_size=10)
self.rate = rospy.Rate(1) # 1 Hz

def run(self):
count = 0
while not rospy.is_shutdown():
message = f"Hello ROS {count}"
rospy.loginfo(f"Publishing: {message}")
self.publisher.publish(message)
count += 1
self.rate.sleep()

if __name__ == '__main__':
try:
talker = TalkerNode()
talker.run()
except rospy.ROSInterruptException:
pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import String

class ListenerNode:
def __init__(self):
rospy.init_node('listener_node', anonymous=True)
self.subscriber = rospy.Subscriber('chatter', String, self.callback)

def callback(self, data):
rospy.loginfo(f"Received: {data.data}")

def run(self):
rospy.spin()

if __name__ == '__main__':
try:
listener = ListenerNode()
listener.run()
except rospy.ROSInterruptException:
pass

2. 话题(Topics)

话题是节点间异步通信的机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan

class RobotController:
def __init__(self):
rospy.init_node('robot_controller')

# 发布者:控制机器人运动
self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)

# 订阅者:接收激光雷达数据
self.laser_sub = rospy.Subscriber('/scan', LaserScan, self.laser_callback)

# 安全参数
self.min_distance = 0.5 # 最小安全距离

def laser_callback(self, msg):
"""处理激光雷达数据"""
# 获取前方距离(简化处理)
front_distance = min(msg.ranges[0:30] + msg.ranges[-30:])

if front_distance < self.min_distance:
# 检测到障碍物,停止运动
self.stop_robot()
rospy.logwarn(f"Obstacle detected at distance: {front_distance:.2f}m")
else:
# 继续前进
self.move_forward()

def move_forward(self):
"""向前移动"""
twist = Twist()
twist.linear.x = 0.2 # 线速度
twist.angular.z = 0.0 # 角速度
self.cmd_vel_pub.publish(twist)

def stop_robot(self):
"""停止机器人"""
twist = Twist()
twist.linear.x = 0.0
twist.angular.z = 0.0
self.cmd_vel_pub.publish(twist)

def run(self):
rospy.loginfo("Robot controller started")
rospy.spin()

if __name__ == '__main__':
try:
controller = RobotController()
controller.run()
except rospy.ROSInterruptException:
pass

3. 服务(Services)

服务提供同步的请求-响应通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_srvs.srv import AddTwoInts, AddTwoIntsResponse

class MathServiceServer:
def __init__(self):
rospy.init_node('math_service_server')

# 创建服务
self.service = rospy.Service('add_two_ints', AddTwoInts, self.add_callback)
rospy.loginfo("Math service server ready")

def add_callback(self, req):
"""处理加法请求"""
result = req.a + req.b
rospy.loginfo(f"Adding {req.a} + {req.b} = {result}")
return AddTwoIntsResponse(result)

def run(self):
rospy.spin()

if __name__ == '__main__':
try:
server = MathServiceServer()
server.run()
except rospy.ROSInterruptException:
pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_srvs.srv import AddTwoInts

class MathServiceClient:
def __init__(self):
rospy.init_node('math_service_client')

# 等待服务可用
rospy.wait_for_service('add_two_ints')
self.add_service = rospy.ServiceProxy('add_two_ints', AddTwoInts)

def call_service(self, a, b):
"""调用服务"""
try:
response = self.add_service(a, b)
rospy.loginfo(f"Service call successful: {a} + {b} = {response.sum}")
return response.sum
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return None

def run(self):
# 测试服务调用
self.call_service(5, 3)
self.call_service(10, 20)
self.call_service(-5, 8)

if __name__ == '__main__':
try:
client = MathServiceClient()
client.run()
except rospy.ROSInterruptException:
pass

4. 参数服务器(Parameter Server)

参数服务器用于存储和检索配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import String

class ParameterExample:
def __init__(self):
rospy.init_node('parameter_example')

# 设置参数
rospy.set_param('/robot_name', 'TurtleBot')
rospy.set_param('/robot_speed', 0.5)
rospy.set_param('/sensor_enabled', True)

# 获取参数
robot_name = rospy.get_param('/robot_name', 'DefaultRobot')
robot_speed = rospy.get_param('/robot_speed', 0.2)
sensor_enabled = rospy.get_param('/sensor_enabled', False)

rospy.loginfo(f"Robot Name: {robot_name}")
rospy.loginfo(f"Robot Speed: {robot_speed}")
rospy.loginfo(f"Sensor Enabled: {sensor_enabled}")

def run(self):
rospy.spin()

if __name__ == '__main__':
try:
example = ParameterExample()
example.run()
except rospy.ROSInterruptException:
pass

ROS包开发

创建ROS包

1
2
3
4
5
6
7
8
9
10
11
# 创建工作空间
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/src

# 创建包
catkin_create_pkg my_robot_package rospy std_msgs geometry_msgs sensor_msgs

# 编译工作空间
cd ~/catkin_ws
catkin_make
source devel/setup.bash

包结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
my_robot_package/
├── CMakeLists.txt
├── package.xml
├── scripts/
│ ├── talker.py
│ └── listener.py
├── src/
│ └── my_robot_package/
│ ├── __init__.py
│ └── robot_controller.py
├── launch/
│ └── robot.launch
└── config/
└── robot_params.yaml

Launch文件

1
2
3
4
5
6
7
8
9
10
11
12
<!-- robot.launch -->
<launch>
<!-- 参数文件 -->
<rosparam file="$(find my_robot_package)/config/robot_params.yaml" command="load" />

<!-- 启动节点 -->
<node name="robot_controller" pkg="my_robot_package" type="robot_controller.py" output="screen" />
<node name="sensor_node" pkg="my_robot_package" type="sensor_node.py" output="screen" />

<!-- 条件启动 -->
<node name="debug_node" pkg="my_robot_package" type="debug_node.py" output="screen" if="$(arg debug)" />
</launch>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# robot_params.yaml
robot:
name: "MyRobot"
max_speed: 1.0
min_speed: 0.1

sensors:
laser:
enabled: true
frequency: 10
range_min: 0.1
range_max: 10.0

camera:
enabled: true
width: 640
height: 480
fps: 30

高级特性

1. 动态重配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from dynamic_reconfigure.server import Server
from my_robot_package.cfg import RobotConfig

class DynamicConfigServer:
def __init__(self):
rospy.init_node('dynamic_config_server')

# 创建动态重配置服务器
self.srv = Server(RobotConfig, self.reconfigure_callback)
rospy.loginfo("Dynamic reconfigure server started")

def reconfigure_callback(self, config, level):
"""处理重配置请求"""
rospy.loginfo(f"Reconfigure Request: speed={config.speed}, enabled={config.enabled}")

# 应用新配置
self.apply_config(config)

return config

def apply_config(self, config):
"""应用配置参数"""
# 更新机器人速度
rospy.set_param('/robot_speed', config.speed)

# 启用/禁用传感器
rospy.set_param('/sensor_enabled', config.enabled)

def run(self):
rospy.spin()

if __name__ == '__main__':
try:
server = DynamicConfigServer()
server.run()
except rospy.ROSInterruptException:
pass

2. 动作服务器(Action Server)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import actionlib
from my_robot_package.msg import MoveAction, MoveGoal, MoveResult, MoveFeedback

class MoveActionServer:
def __init__(self):
rospy.init_node('move_action_server')

# 创建动作服务器
self.server = actionlib.SimpleActionServer('move', MoveAction, self.execute_callback, False)
self.server.start()
rospy.loginfo("Move action server started")

def execute_callback(self, goal):
"""执行移动动作"""
rospy.loginfo(f"Moving to position: x={goal.x}, y={goal.y}")

# 模拟移动过程
for i in range(10):
if self.server.is_preempt_requested():
self.server.set_preempted()
return

# 发布反馈
feedback = MoveFeedback()
feedback.progress = (i + 1) * 10
self.server.publish_feedback(feedback)

rospy.sleep(0.1)

# 返回结果
result = MoveResult()
result.success = True
result.message = "Move completed successfully"
self.server.set_succeeded(result)

def run(self):
rospy.spin()

if __name__ == '__main__':
try:
server = MoveActionServer()
server.run()
except rospy.ROSInterruptException:
pass

总结

ROS提供了强大的机器人开发框架,包括:

  1. 节点通信:话题、服务、参数服务器
  2. 包管理:模块化开发,便于代码复用
  3. 工具链:丰富的调试和可视化工具
  4. 生态系统:大量的开源包和算法

掌握ROS开发对于机器人工程师来说是必不可少的技能。通过实际项目练习,可以更好地理解ROS的设计理念和最佳实践。