ROS1消息系统剖析 - 消息定义、序列化与传输机制

ROS1消息系统深度剖析 - 消息定义、序列化与传输机制

前言

ROS1的消息系统是整个通信架构的核心,它定义了节点间交换数据的格式和传输方式。本文将深入分析ROS1消息系统的设计原理、消息定义机制、序列化过程以及传输优化技术。

ROS1消息系统架构

消息系统组件

1
2
3
4
5
6
7
8
9
10
┌─────────────────────────────────────────────────────────────┐
│ ROS Message System │
├─────────────────────────────────────────────────────────────┤
│ Message Definition │ Serialization │ Transport │
│ ┌─────────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │
│ │ .msg Files │ │ │ MD5 Sum │ │ │ TCP/UDP │ │
│ │ .srv Files │ │ │ CRC32 │ │ │ Shared Mem │ │
│ │ .action Files │ │ │ Binary Data │ │ │ In-Memory │ │
│ └─────────────────┘ │ └─────────────┘ │ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

消息定义机制

自定义消息类型

1. 创建消息定义文件

1
2
3
4
# 创建消息包
mkdir -p ~/catkin_ws/src/my_message_package/msg
mkdir -p ~/catkin_ws/src/my_message_package/srv
mkdir -p ~/catkin_ws/src/my_message_package/action

自定义消息类型 (CustomMessage.msg):

1
2
3
4
5
6
7
8
9
# CustomMessage.msg
Header header
string robot_name
int32 robot_id
float64 battery_level
geometry_msgs/Pose current_pose
sensor_msgs/LaserScan laser_data
bool emergency_stop
string[] sensor_status

自定义服务类型 (RobotControl.srv):

1
2
3
4
5
6
7
8
9
10
# RobotControl.srv
# 请求部分
string command
geometry_msgs/Pose target_pose
float64 max_speed
---
# 响应部分
bool success
string message
float64 execution_time

自定义动作类型 (NavigateToGoal.action):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# NavigateToGoal.action
# 目标
geometry_msgs/PoseStamped target_pose
float64 max_speed
---
# 结果
bool success
string error_message
float64 total_distance
duration execution_time
---
# 反馈
geometry_msgs/PoseStamped current_pose
float64 progress_percentage
float64 remaining_distance

2. 消息包配置

package.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?xml version="1.0"?>
<package format="2">
<name>my_message_package</name>
<version>1.0.0</version>
<description>Custom message definitions</description>

<maintainer email="developer@example.com">Developer</maintainer>
<license>MIT</license>

<buildtool_depend>catkin</buildtool_depend>

<build_depend>message_generation</build_depend>
<build_depend>std_msgs</build_depend>
<build_depend>geometry_msgs</build_depend>
<build_depend>sensor_msgs</build_depend>
<build_depend>actionlib_msgs</build_depend>

<exec_depend>message_runtime</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>actionlib</exec_depend>

<export>
</export>
</package>

CMakeLists.txt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
cmake_minimum_required(VERSION 3.0.2)
project(my_message_package)

find_package(catkin REQUIRED COMPONENTS
message_generation
std_msgs
geometry_msgs
sensor_msgs
actionlib_msgs
)

# 生成消息
add_message_files(
FILES
CustomMessage.msg
)

# 生成服务
add_service_files(
FILES
RobotControl.srv
)

# 生成动作
add_action_files(
FILES
NavigateToGoal.action
)

# 生成消息、服务和动作
generate_messages(
DEPENDENCIES
std_msgs
geometry_msgs
sensor_msgs
actionlib_msgs
)

catkin_package(
CATKIN_DEPENDS message_runtime
)

消息序列化机制

序列化过程详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import Header, String, Int32, Float64
from geometry_msgs.msg import Pose, Point, Quaternion
from sensor_msgs.msg import LaserScan
import struct
import hashlib

class MessageSerializationDemo:
def __init__(self):
rospy.init_node('message_serialization_demo')

def demonstrate_basic_serialization(self):
"""演示基本消息序列化"""
rospy.loginfo("=== Basic Message Serialization ===")

# 创建基本消息
string_msg = String()
string_msg.data = "Hello ROS!"

int_msg = Int32()
int_msg.data = 42

float_msg = Float64()
float_msg.data = 3.14159

# 序列化消息
string_serialized = string_msg.serialize()
int_serialized = int_msg.serialize()
float_serialized = float_msg.serialize()

rospy.loginfo(f"String message size: {len(string_serialized)} bytes")
rospy.loginfo(f"Int32 message size: {len(int_serialized)} bytes")
rospy.loginfo(f"Float64 message size: {len(float_serialized)} bytes")

# 反序列化消息
string_deserialized = String()
string_deserialized.deserialize(string_serialized)

int_deserialized = Int32()
int_deserialized.deserialize(int_serialized)

float_deserialized = Float64()
float_deserialized.deserialize(float_serialized)

rospy.loginfo(f"Deserialized string: {string_deserialized.data}")
rospy.loginfo(f"Deserialized int: {int_deserialized.data}")
rospy.loginfo(f"Deserialized float: {float_deserialized.data}")

def demonstrate_complex_serialization(self):
"""演示复杂消息序列化"""
rospy.loginfo("=== Complex Message Serialization ===")

# 创建复杂消息
pose_msg = Pose()
pose_msg.position = Point(1.0, 2.0, 3.0)
pose_msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0)

# 序列化
pose_serialized = pose_msg.serialize()
rospy.loginfo(f"Pose message size: {len(pose_serialized)} bytes")

# 反序列化
pose_deserialized = Pose()
pose_deserialized.deserialize(pose_serialized)

rospy.loginfo(f"Deserialized pose position: "
f"x={pose_deserialized.position.x}, "
f"y={pose_deserialized.position.y}, "
f"z={pose_deserialized.position.z}")

def demonstrate_laser_scan_serialization(self):
"""演示激光雷达消息序列化"""
rospy.loginfo("=== LaserScan Message Serialization ===")

# 创建激光雷达消息
laser_msg = LaserScan()
laser_msg.header.stamp = rospy.Time.now()
laser_msg.header.frame_id = "laser_frame"

laser_msg.angle_min = -3.14159
laser_msg.angle_max = 3.14159
laser_msg.angle_increment = 0.0174533
laser_msg.time_increment = 0.0
laser_msg.scan_time = 0.1
laser_msg.range_min = 0.1
laser_msg.range_max = 10.0

# 生成360个距离数据点
laser_msg.ranges = [1.0 + 0.5 * (i % 10) for i in range(360)]
laser_msg.intensities = [100.0 + 50.0 * (i % 5) for i in range(360)]

# 序列化
laser_serialized = laser_msg.serialize()
rospy.loginfo(f"LaserScan message size: {len(laser_serialized)} bytes")
rospy.loginfo(f"Number of range points: {len(laser_msg.ranges)}")

# 计算数据压缩比
raw_data_size = len(laser_msg.ranges) * 4 + len(laser_msg.intensities) * 4
compression_ratio = len(laser_serialized) / raw_data_size
rospy.loginfo(f"Compression ratio: {compression_ratio:.2f}")

def demonstrate_message_integrity(self):
"""演示消息完整性检查"""
rospy.loginfo("=== Message Integrity Check ===")

# 创建消息
msg = String()
msg.data = "Important message data"

# 序列化
serialized = msg.serialize()

# 计算MD5哈希
md5_hash = hashlib.md5(serialized).hexdigest()
rospy.loginfo(f"Message MD5 hash: {md5_hash}")

# 模拟传输损坏
corrupted = bytearray(serialized)
if len(corrupted) > 0:
corrupted[0] = (corrupted[0] + 1) % 256

# 验证完整性
try:
msg_deserialized = String()
msg_deserialized.deserialize(bytes(corrupted))
rospy.loginfo("Message deserialization successful")
except Exception as e:
rospy.logwarn(f"Message deserialization failed: {e}")

def run_demo(self):
"""运行序列化演示"""
self.demonstrate_basic_serialization()
self.demonstrate_complex_serialization()
self.demonstrate_laser_scan_serialization()
self.demonstrate_message_integrity()

if __name__ == '__main__':
try:
demo = MessageSerializationDemo()
demo.run_demo()
except rospy.ROSInterruptException:
pass

消息传输优化

高效消息传输机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from sensor_msgs.msg import LaserScan, Image
from geometry_msgs.msg import Twist
import threading
import time
import numpy as np

class MessageTransportOptimization:
def __init__(self):
rospy.init_node('message_transport_optimization')

# 创建发布者
self.laser_pub = rospy.Publisher('/optimized/scan', LaserScan, queue_size=1)
self.cmd_vel_pub = rospy.Publisher('/optimized/cmd_vel', Twist, queue_size=1)

# 创建订阅者
self.laser_sub = rospy.Subscriber('/optimized/scan', LaserScan, self.laser_callback)

# 传输统计
self.stats = {
'messages_sent': 0,
'messages_received': 0,
'total_bytes': 0,
'start_time': time.time()
}

def laser_callback(self, msg):
"""激光雷达回调"""
self.stats['messages_received'] += 1

# 计算传输延迟
current_time = time.time()
message_time = msg.header.stamp.to_sec()
latency = current_time - message_time

rospy.loginfo(f"Received laser scan, latency: {latency*1000:.2f}ms")

def create_optimized_laser_scan(self):
"""创建优化的激光雷达数据"""
msg = LaserScan()
msg.header.stamp = rospy.Time.now()
msg.header.frame_id = 'laser_frame'

# 优化的激光雷达参数
msg.angle_min = -3.14159
msg.angle_max = 3.14159
msg.angle_increment = 0.0349066 # 2度,减少数据量
msg.time_increment = 0.0
msg.scan_time = 0.05 # 20Hz
msg.range_min = 0.1
msg.range_max = 8.0 # 减少最大距离

# 生成优化的距离数据(减少点数)
num_points = 180 # 从360减少到180
msg.ranges = [1.0 + 0.3 * np.sin(i * 0.1) for i in range(num_points)]

# 不包含强度数据以节省带宽
msg.intensities = []

return msg

def publisher_thread(self):
"""发布者线程"""
rate = rospy.Rate(20) # 20Hz

while not rospy.is_shutdown():
# 发布优化的激光雷达数据
laser_msg = self.create_optimized_laser_scan()
self.laser_pub.publish(laser_msg)

# 计算消息大小
msg_size = len(laser_msg.serialize())
self.stats['messages_sent'] += 1
self.stats['total_bytes'] += msg_size

rate.sleep()

def stats_thread(self):
"""统计线程"""
rate = rospy.Rate(1) # 1Hz

while not rospy.is_shutdown():
elapsed_time = time.time() - self.stats['start_time']

if elapsed_time > 0:
msg_rate = self.stats['messages_sent'] / elapsed_time
byte_rate = self.stats['total_bytes'] / elapsed_time / 1024 # KB/s

rospy.loginfo(f"Transport Stats: "
f"Rate: {msg_rate:.1f} msg/s, "
f"Bandwidth: {byte_rate:.1f} KB/s, "
f"Total messages: {self.stats['messages_sent']}")

rate.sleep()

def run_demo(self):
"""运行传输优化演示"""
rospy.loginfo("Starting message transport optimization demo...")

# 启动发布者线程
pub_thread = threading.Thread(target=self.publisher_thread)
pub_thread.daemon = True
pub_thread.start()

# 启动统计线程
stats_thread = threading.Thread(target=self.stats_thread)
stats_thread.daemon = True
stats_thread.start()

# 主线程处理回调
rospy.spin()

if __name__ == '__main__':
try:
demo = MessageTransportOptimization()
demo.run_demo()
except rospy.ROSInterruptException:
pass

消息类型系统

消息类型注册和管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
from std_msgs.msg import *
from geometry_msgs.msg import *
from sensor_msgs.msg import *
import rospkg

class MessageTypeSystem:
def __init__(self):
rospy.init_node('message_type_system')

# 消息类型注册表
self.message_types = {}
self.register_builtin_types()

def register_builtin_types(self):
"""注册内置消息类型"""
# 基本类型
self.message_types['std_msgs/String'] = String
self.message_types['std_msgs/Int32'] = Int32
self.message_types['std_msgs/Float64'] = Float64
self.message_types['std_msgs/Bool'] = Bool
self.message_types['std_msgs/Header'] = Header

# 几何类型
self.message_types['geometry_msgs/Point'] = Point
self.message_types['geometry_msgs/Pose'] = Pose
self.message_types['geometry_msgs/Twist'] = Twist

# 传感器类型
self.message_types['sensor_msgs/LaserScan'] = LaserScan
self.message_types['sensor_msgs/Image'] = Image

rospy.loginfo(f"Registered {len(self.message_types)} message types")

def create_message_instance(self, message_type):
"""创建消息实例"""
if message_type in self.message_types:
msg_class = self.message_types[message_type]
return msg_class()
else:
rospy.logerr(f"Unknown message type: {message_type}")
return None

def demonstrate_message_creation(self):
"""演示消息创建"""
rospy.loginfo("=== Message Creation Demo ===")

# 创建不同类型的消息
message_examples = [
'std_msgs/String',
'std_msgs/Int32',
'geometry_msgs/Point',
'geometry_msgs/Pose'
]

for msg_type in message_examples:
msg = self.create_message_instance(msg_type)
if msg:
rospy.loginfo(f"Created message of type: {msg_type}")

# 设置示例数据
if isinstance(msg, String):
msg.data = "Example string message"
elif isinstance(msg, Int32):
msg.data = 42
elif isinstance(msg, Point):
msg.x = 1.0
msg.y = 2.0
msg.z = 3.0
elif isinstance(msg, Pose):
msg.position = Point(1.0, 2.0, 3.0)
msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0)

# 序列化并显示大小
serialized = msg.serialize()
rospy.loginfo(f"Message size: {len(serialized)} bytes")

def analyze_message_structure(self, msg):
"""分析消息结构"""
rospy.loginfo(f"=== Message Structure Analysis ===")
rospy.loginfo(f"Message type: {type(msg).__name__}")
rospy.loginfo(f"Message fields: {dir(msg)}")

# 分析字段
fields = [attr for attr in dir(msg) if not attr.startswith('_')]
rospy.loginfo(f"Public fields: {fields}")

# 序列化分析
serialized = msg.serialize()
rospy.loginfo(f"Serialized size: {len(serialized)} bytes")

def run_demo(self):
"""运行消息类型系统演示"""
self.demonstrate_message_creation()

# 分析特定消息结构
pose_msg = Pose()
pose_msg.position = Point(1.0, 2.0, 3.0)
pose_msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0)

self.analyze_message_structure(pose_msg)

if __name__ == '__main__':
try:
demo = MessageTypeSystem()
demo.run_demo()
except rospy.ROSInterruptException:
pass

总结

ROS1消息系统的设计体现了以下特点:

  1. 类型安全:强类型消息定义确保数据完整性
  2. 序列化优化:高效的二进制序列化减少传输开销
  3. 模块化设计:支持自定义消息类型和扩展
  4. 传输优化:多种传输机制适应不同场景需求

理解ROS1消息系统的实现原理,有助于:

  • 设计高效的消息类型
  • 优化数据传输性能
  • 调试通信问题
  • 扩展消息系统功能

这些知识对于开发高性能的ROS1应用程序至关重要。