ROS1核心概念解析 - 节点、话题、服务的实现原理

ROS1核心概念深度解析 - 节点、话题、服务的实现原理

前言

ROS1作为机器人操作系统的基础框架,其核心概念包括节点(Nodes)、话题(Topics)、服务(Services)等。本文将深入分析这些核心概念的实现原理和设计思想。

ROS1架构概述

整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────────┐
│ ROS Master │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Parameter │ │ Service │ │ Topic │ │
│ │ Server │ │ Registry │ │ Registry │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────┴─────────┐
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ Node A │ │ Node B │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │Publisher │ │ │ │Subscriber │ │
│ └───────────┘ │ │ └───────────┘ │
└───────────────┘ └───────────────┘

节点(Nodes)深度解析

节点的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import threading
import time

class AdvancedNode:
def __init__(self, node_name):
"""节点初始化"""
# 初始化节点
rospy.init_node(node_name, anonymous=True)

# 节点状态
self.is_running = False
self.node_name = rospy.get_name()

# 创建回调函数映射
self.callbacks = {}

# 节点统计信息
self.stats = {
'messages_sent': 0,
'messages_received': 0,
'start_time': rospy.Time.now()
}

rospy.loginfo(f"Node {self.node_name} initialized")

def register_callback(self, topic_name, callback_func):
"""注册回调函数"""
self.callbacks[topic_name] = callback_func
rospy.loginfo(f"Callback registered for topic: {topic_name}")

def start(self):
"""启动节点"""
self.is_running = True
rospy.loginfo(f"Node {self.node_name} started")

# 启动统计线程
stats_thread = threading.Thread(target=self._stats_worker)
stats_thread.daemon = True
stats_thread.start()

def stop(self):
"""停止节点"""
self.is_running = False
rospy.loginfo(f"Node {self.node_name} stopped")

def _stats_worker(self):
"""统计信息工作线程"""
while self.is_running and not rospy.is_shutdown():
time.sleep(10) # 每10秒输出一次统计信息
uptime = (rospy.Time.now() - self.stats['start_time']).to_sec()
rospy.loginfo(f"Node {self.node_name} uptime: {uptime:.2f}s, "
f"Messages sent: {self.stats['messages_sent']}, "
f"Messages received: {self.stats['messages_received']}")

def get_node_info(self):
"""获取节点信息"""
return {
'name': self.node_name,
'namespace': rospy.get_namespace(),
'is_running': self.is_running,
'stats': self.stats.copy()
}

# 使用示例
if __name__ == '__main__':
try:
node = AdvancedNode('advanced_example_node')
node.start()

# 保持节点运行
rospy.spin()

except rospy.ROSInterruptException:
node.stop()
pass

节点间通信机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import String, Int32
from geometry_msgs.msg import Twist
import json

class NodeCommunicationDemo:
def __init__(self):
rospy.init_node('communication_demo')

# 创建多个发布者
self.string_pub = rospy.Publisher('/demo/string', String, queue_size=10)
self.int_pub = rospy.Publisher('/demo/integer', Int32, queue_size=10)
self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)

# 创建订阅者
self.string_sub = rospy.Subscriber('/demo/string', String, self.string_callback)
self.int_sub = rospy.Subscriber('/demo/integer', Int32, self.int_callback)

# 消息计数器
self.message_count = 0

def string_callback(self, msg):
"""字符串消息回调"""
rospy.loginfo(f"Received string message: {msg.data}")

def int_callback(self, msg):
"""整数消息回调"""
rospy.loginfo(f"Received integer message: {msg.data}")

def publish_string(self, text):
"""发布字符串消息"""
msg = String()
msg.data = text
self.string_pub.publish(msg)
self.message_count += 1

def publish_integer(self, value):
"""发布整数消息"""
msg = Int32()
msg.data = value
self.int_pub.publish(msg)
self.message_count += 1

def publish_velocity(self, linear_x, angular_z):
"""发布速度命令"""
msg = Twist()
msg.linear.x = linear_x
msg.angular.z = angular_z
self.cmd_vel_pub.publish(msg)

def run_demo(self):
"""运行通信演示"""
rate = rospy.Rate(1) # 1 Hz

while not rospy.is_shutdown():
# 发布不同类型的消息
self.publish_string(f"Hello ROS! Message #{self.message_count}")
self.publish_integer(self.message_count)

# 发布速度命令
if self.message_count % 2 == 0:
self.publish_velocity(0.5, 0.0) # 前进
else:
self.publish_velocity(0.0, 0.5) # 转向

rate.sleep()

if __name__ == '__main__':
try:
demo = NodeCommunicationDemo()
demo.run_demo()
except rospy.ROSInterruptException:
pass

话题(Topics)机制深度分析

发布-订阅模式的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import PoseStamped
import threading
import queue
import time

class TopicMechanismDemo:
def __init__(self):
rospy.init_node('topic_mechanism_demo')

# 创建消息队列
self.message_queue = queue.Queue(maxsize=100)

# 创建发布者
self.laser_pub = rospy.Publisher('/scan', LaserScan, queue_size=10)
self.pose_pub = rospy.Publisher('/pose', PoseStamped, queue_size=10)

# 创建订阅者
self.laser_sub = rospy.Subscriber('/scan', LaserScan, self.laser_callback)
self.pose_sub = rospy.Subscriber('/pose', PoseStamped, self.pose_callback)

# 统计信息
self.stats = {
'laser_messages': 0,
'pose_messages': 0,
'total_bandwidth': 0
}

def laser_callback(self, msg):
"""激光雷达数据回调"""
self.stats['laser_messages'] += 1

# 计算消息大小(字节)
msg_size = len(str(msg).encode('utf-8'))
self.stats['total_bandwidth'] += msg_size

# 处理激光雷达数据
min_range = min(msg.ranges) if msg.ranges else float('inf')
max_range = max(msg.ranges) if msg.ranges else 0.0

rospy.loginfo(f"Laser scan: {len(msg.ranges)} points, "
f"range: [{min_range:.2f}, {max_range:.2f}]")

def pose_callback(self, msg):
"""位姿数据回调"""
self.stats['pose_messages'] += 1

x = msg.pose.position.x
y = msg.pose.position.y
z = msg.pose.position.z

rospy.loginfo(f"Robot pose: x={x:.2f}, y={y:.2f}, z={z:.2f}")

def create_laser_scan(self):
"""创建模拟激光雷达数据"""
msg = LaserScan()
msg.header.stamp = rospy.Time.now()
msg.header.frame_id = 'laser_frame'

# 设置激光雷达参数
msg.angle_min = -3.14159
msg.angle_max = 3.14159
msg.angle_increment = 0.0174533 # 1度
msg.time_increment = 0.0
msg.scan_time = 0.1
msg.range_min = 0.1
msg.range_max = 10.0

# 生成模拟距离数据
msg.ranges = [1.0 + 0.5 * (i % 10) for i in range(360)]

return msg

def create_pose(self, x, y, z=0.0):
"""创建位姿消息"""
msg = PoseStamped()
msg.header.stamp = rospy.Time.now()
msg.header.frame_id = 'map'

msg.pose.position.x = x
msg.pose.position.y = y
msg.pose.position.z = z

msg.pose.orientation.w = 1.0 # 无旋转

return msg

def publisher_thread(self):
"""发布者线程"""
rate = rospy.Rate(10) # 10 Hz
counter = 0

while not rospy.is_shutdown():
# 发布激光雷达数据
laser_msg = self.create_laser_scan()
self.laser_pub.publish(laser_msg)

# 发布位姿数据
pose_msg = self.create_pose(
x=1.0 * counter,
y=0.5 * counter,
z=0.0
)
self.pose_pub.publish(pose_msg)

counter += 1
rate.sleep()

def stats_thread(self):
"""统计信息线程"""
rate = rospy.Rate(1) # 1 Hz

while not rospy.is_shutdown():
total_messages = (self.stats['laser_messages'] +
self.stats['pose_messages'])

rospy.loginfo(f"Topic Statistics: "
f"Laser: {self.stats['laser_messages']}, "
f"Pose: {self.stats['pose_messages']}, "
f"Total: {total_messages}, "
f"Bandwidth: {self.stats['total_bandwidth']} bytes")

rate.sleep()

def run_demo(self):
"""运行演示"""
rospy.loginfo("Starting topic mechanism demo...")

# 启动发布者线程
pub_thread = threading.Thread(target=self.publisher_thread)
pub_thread.daemon = True
pub_thread.start()

# 启动统计线程
stats_thread = threading.Thread(target=self.stats_thread)
stats_thread.daemon = True
stats_thread.start()

# 主线程处理回调
rospy.spin()

if __name__ == '__main__':
try:
demo = TopicMechanismDemo()
demo.run_demo()
except rospy.ROSInterruptException:
pass

服务(Services)机制详解

服务调用的实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_srvs.srv import AddTwoInts, AddTwoIntsResponse, SetBool, SetBoolResponse
from geometry_msgs.srv import GetMap
import threading
import time

class ServiceMechanismDemo:
def __init__(self):
rospy.init_node('service_mechanism_demo')

# 创建服务服务器
self.add_service = rospy.Service('add_two_ints', AddTwoInts, self.add_callback)
self.toggle_service = rospy.Service('toggle_service', SetBool, self.toggle_callback)

# 服务状态
self.service_enabled = True
self.call_count = 0

rospy.loginfo("Service mechanism demo initialized")

def add_callback(self, req):
"""加法服务回调"""
if not self.service_enabled:
rospy.logwarn("Service is disabled")
return AddTwoIntsResponse()

self.call_count += 1
result = req.a + req.b

rospy.loginfo(f"AddTwoInts service called: {req.a} + {req.b} = {result}")

# 模拟处理时间
time.sleep(0.1)

return AddTwoIntsResponse(result)

def toggle_callback(self, req):
"""切换服务回调"""
self.service_enabled = req.data
message = "enabled" if self.service_enabled else "disabled"

rospy.loginfo(f"Service {message}")

return SetBoolResponse(True, f"Service {message}")

def client_demo(self):
"""客户端演示"""
rospy.loginfo("Waiting for services...")

# 等待服务可用
rospy.wait_for_service('add_two_ints')
rospy.wait_for_service('toggle_service')

# 创建服务客户端
add_client = rospy.ServiceProxy('add_two_ints', AddTwoInts)
toggle_client = rospy.ServiceProxy('toggle_service', SetBool)

# 测试服务调用
for i in range(5):
try:
# 调用加法服务
response = add_client(i, i * 2)
rospy.loginfo(f"Client received: {response.sum}")

# 每3次调用后切换服务状态
if i % 3 == 2:
toggle_client(not self.service_enabled)

except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")

rospy.sleep(1)

def run_demo(self):
"""运行演示"""
# 启动客户端线程
client_thread = threading.Thread(target=self.client_demo)
client_thread.daemon = True
client_thread.start()

# 主线程处理服务请求
rospy.spin()

if __name__ == '__main__':
try:
demo = ServiceMechanismDemo()
demo.run_demo()
except rospy.ROSInterruptException:
pass

参数服务器(Parameter Server)

参数管理机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import yaml
import json

class ParameterServerDemo:
def __init__(self):
rospy.init_node('parameter_server_demo')

# 设置各种类型的参数
self.setup_parameters()

# 创建参数监听器
self.param_sub = rospy.Subscriber('/parameter_updates',
rospy.AnyMsg,
self.param_update_callback)

def setup_parameters(self):
"""设置参数"""
# 字符串参数
rospy.set_param('/robot/name', 'TurtleBot')
rospy.set_param('/robot/model', 'Kobuki')

# 数值参数
rospy.set_param('/robot/max_speed', 1.0)
rospy.set_param('/robot/max_angular_speed', 2.0)

# 布尔参数
rospy.set_param('/robot/safety_enabled', True)
rospy.set_param('/robot/autonomous_mode', False)

# 列表参数
rospy.set_param('/robot/sensors', ['laser', 'camera', 'imu'])

# 字典参数
config = {
'laser': {'frequency': 10, 'range': 10.0},
'camera': {'width': 640, 'height': 480, 'fps': 30}
}
rospy.set_param('/robot/sensor_config', config)

rospy.loginfo("Parameters set successfully")

def param_update_callback(self, msg):
"""参数更新回调"""
rospy.loginfo("Parameter update received")

def demonstrate_parameter_operations(self):
"""演示参数操作"""
rospy.loginfo("=== Parameter Operations Demo ===")

# 读取参数
robot_name = rospy.get_param('/robot/name', 'Unknown')
max_speed = rospy.get_param('/robot/max_speed', 0.0)
sensors = rospy.get_param('/robot/sensors', [])

rospy.loginfo(f"Robot name: {robot_name}")
rospy.loginfo(f"Max speed: {max_speed}")
rospy.loginfo(f"Sensors: {sensors}")

# 检查参数是否存在
if rospy.has_param('/robot/safety_enabled'):
safety_enabled = rospy.get_param('/robot/safety_enabled')
rospy.loginfo(f"Safety enabled: {safety_enabled}")

# 获取参数名称列表
param_names = rospy.get_param_names()
rospy.loginfo(f"Total parameters: {len(param_names)}")

# 搜索参数
robot_params = [name for name in param_names if name.startswith('/robot/')]
rospy.loginfo(f"Robot parameters: {robot_params}")

# 删除参数
if rospy.has_param('/robot/temp_param'):
rospy.delete_param('/robot/temp_param')
rospy.loginfo("Temporary parameter deleted")

def load_parameters_from_file(self, filename):
"""从文件加载参数"""
try:
with open(filename, 'r') as f:
params = yaml.safe_load(f)

# 递归设置参数
self._set_params_recursive('', params)
rospy.loginfo(f"Parameters loaded from {filename}")

except Exception as e:
rospy.logerr(f"Failed to load parameters: {e}")

def _set_params_recursive(self, prefix, params):
"""递归设置参数"""
for key, value in params.items():
param_name = f"{prefix}/{key}" if prefix else f"/{key}"

if isinstance(value, dict):
self._set_params_recursive(param_name, value)
else:
rospy.set_param(param_name, value)

def save_parameters_to_file(self, filename):
"""保存参数到文件"""
try:
params = {}
param_names = rospy.get_param_names()

# 构建参数字典
for param_name in param_names:
value = rospy.get_param(param_name)
self._set_dict_value(params, param_name, value)

# 保存到文件
with open(filename, 'w') as f:
yaml.dump(params, f, default_flow_style=False)

rospy.loginfo(f"Parameters saved to {filename}")

except Exception as e:
rospy.logerr(f"Failed to save parameters: {e}")

def _set_dict_value(self, d, key_path, value):
"""设置字典值"""
keys = key_path.strip('/').split('/')
current = d

for key in keys[:-1]:
if key not in current:
current[key] = {}
current = current[key]

current[keys[-1]] = value

def run_demo(self):
"""运行演示"""
# 演示参数操作
self.demonstrate_parameter_operations()

# 保存参数到文件
self.save_parameters_to_file('/tmp/robot_params.yaml')

# 保持节点运行
rospy.spin()

if __name__ == '__main__':
try:
demo = ParameterServerDemo()
demo.run_demo()
except rospy.ROSInterruptException:
pass

总结

ROS1的核心概念体现了分布式系统的设计思想:

  1. 节点(Nodes):提供模块化的功能单元
  2. 话题(Topics):实现异步的发布-订阅通信
  3. 服务(Services):提供同步的请求-响应通信
  4. 参数服务器:集中管理配置参数

这些机制共同构成了ROS1的通信基础设施,为机器人系统的模块化开发提供了强大的支持。理解这些核心概念的实现原理,有助于更好地设计和优化ROS1应用程序。