ROS1消息系统深度剖析 - 消息定义、序列化与传输机制
前言
ROS1的消息系统是整个通信架构的核心,它定义了节点间交换数据的格式和传输方式。本文将深入分析ROS1消息系统的设计原理、消息定义机制、序列化过程以及传输优化技术。
ROS1消息系统架构
消息系统组件
1 2 3 4 5 6 7 8 9 10
| ┌─────────────────────────────────────────────────────────────┐ │ ROS Message System │ ├─────────────────────────────────────────────────────────────┤ │ Message Definition │ Serialization │ Transport │ │ ┌─────────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ │ │ .msg Files │ │ │ MD5 Sum │ │ │ TCP/UDP │ │ │ │ .srv Files │ │ │ CRC32 │ │ │ Shared Mem │ │ │ │ .action Files │ │ │ Binary Data │ │ │ In-Memory │ │ │ └─────────────────┘ │ └─────────────┘ │ └─────────────┘ │ └─────────────────────────────────────────────────────────────┘
|
消息定义机制
自定义消息类型
1. 创建消息定义文件
1 2 3 4
| mkdir -p ~/catkin_ws/src/my_message_package/msg mkdir -p ~/catkin_ws/src/my_message_package/srv mkdir -p ~/catkin_ws/src/my_message_package/action
|
自定义消息类型 (CustomMessage.msg):
1 2 3 4 5 6 7 8 9
| # CustomMessage.msg Header header string robot_name int32 robot_id float64 battery_level geometry_msgs/Pose current_pose sensor_msgs/LaserScan laser_data bool emergency_stop string[] sensor_status
|
自定义服务类型 (RobotControl.srv):
1 2 3 4 5 6 7 8 9 10
| # RobotControl.srv # 请求部分 string command geometry_msgs/Pose target_pose float64 max_speed --- # 响应部分 bool success string message float64 execution_time
|
自定义动作类型 (NavigateToGoal.action):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # NavigateToGoal.action # 目标 geometry_msgs/PoseStamped target_pose float64 max_speed --- # 结果 bool success string error_message float64 total_distance duration execution_time --- # 反馈 geometry_msgs/PoseStamped current_pose float64 progress_percentage float64 remaining_distance
|
2. 消息包配置
package.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <?xml version="1.0"?> <package format="2"> <name>my_message_package</name> <version>1.0.0</version> <description>Custom message definitions</description> <maintainer email="developer@example.com">Developer</maintainer> <license>MIT</license> <buildtool_depend>catkin</buildtool_depend> <build_depend>message_generation</build_depend> <build_depend>std_msgs</build_depend> <build_depend>geometry_msgs</build_depend> <build_depend>sensor_msgs</build_depend> <build_depend>actionlib_msgs</build_depend> <exec_depend>message_runtime</exec_depend> <exec_depend>std_msgs</exec_depend> <exec_depend>geometry_msgs</exec_depend> <exec_depend>sensor_msgs</exec_depend> <exec_depend>actionlib</exec_depend> <export> </export> </package>
|
CMakeLists.txt:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| cmake_minimum_required(VERSION 3.0.2) project(my_message_package)
find_package(catkin REQUIRED COMPONENTS message_generation std_msgs geometry_msgs sensor_msgs actionlib_msgs )
add_message_files( FILES CustomMessage.msg )
add_service_files( FILES RobotControl.srv )
add_action_files( FILES NavigateToGoal.action )
generate_messages( DEPENDENCIES std_msgs geometry_msgs sensor_msgs actionlib_msgs )
catkin_package( CATKIN_DEPENDS message_runtime )
|
消息序列化机制
序列化过程详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import rospy from std_msgs.msg import Header, String, Int32, Float64 from geometry_msgs.msg import Pose, Point, Quaternion from sensor_msgs.msg import LaserScan import struct import hashlib
class MessageSerializationDemo: def __init__(self): rospy.init_node('message_serialization_demo') def demonstrate_basic_serialization(self): """演示基本消息序列化""" rospy.loginfo("=== Basic Message Serialization ===") string_msg = String() string_msg.data = "Hello ROS!" int_msg = Int32() int_msg.data = 42 float_msg = Float64() float_msg.data = 3.14159 string_serialized = string_msg.serialize() int_serialized = int_msg.serialize() float_serialized = float_msg.serialize() rospy.loginfo(f"String message size: {len(string_serialized)} bytes") rospy.loginfo(f"Int32 message size: {len(int_serialized)} bytes") rospy.loginfo(f"Float64 message size: {len(float_serialized)} bytes") string_deserialized = String() string_deserialized.deserialize(string_serialized) int_deserialized = Int32() int_deserialized.deserialize(int_serialized) float_deserialized = Float64() float_deserialized.deserialize(float_serialized) rospy.loginfo(f"Deserialized string: {string_deserialized.data}") rospy.loginfo(f"Deserialized int: {int_deserialized.data}") rospy.loginfo(f"Deserialized float: {float_deserialized.data}") def demonstrate_complex_serialization(self): """演示复杂消息序列化""" rospy.loginfo("=== Complex Message Serialization ===") pose_msg = Pose() pose_msg.position = Point(1.0, 2.0, 3.0) pose_msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0) pose_serialized = pose_msg.serialize() rospy.loginfo(f"Pose message size: {len(pose_serialized)} bytes") pose_deserialized = Pose() pose_deserialized.deserialize(pose_serialized) rospy.loginfo(f"Deserialized pose position: " f"x={pose_deserialized.position.x}, " f"y={pose_deserialized.position.y}, " f"z={pose_deserialized.position.z}") def demonstrate_laser_scan_serialization(self): """演示激光雷达消息序列化""" rospy.loginfo("=== LaserScan Message Serialization ===") laser_msg = LaserScan() laser_msg.header.stamp = rospy.Time.now() laser_msg.header.frame_id = "laser_frame" laser_msg.angle_min = -3.14159 laser_msg.angle_max = 3.14159 laser_msg.angle_increment = 0.0174533 laser_msg.time_increment = 0.0 laser_msg.scan_time = 0.1 laser_msg.range_min = 0.1 laser_msg.range_max = 10.0 laser_msg.ranges = [1.0 + 0.5 * (i % 10) for i in range(360)] laser_msg.intensities = [100.0 + 50.0 * (i % 5) for i in range(360)] laser_serialized = laser_msg.serialize() rospy.loginfo(f"LaserScan message size: {len(laser_serialized)} bytes") rospy.loginfo(f"Number of range points: {len(laser_msg.ranges)}") raw_data_size = len(laser_msg.ranges) * 4 + len(laser_msg.intensities) * 4 compression_ratio = len(laser_serialized) / raw_data_size rospy.loginfo(f"Compression ratio: {compression_ratio:.2f}") def demonstrate_message_integrity(self): """演示消息完整性检查""" rospy.loginfo("=== Message Integrity Check ===") msg = String() msg.data = "Important message data" serialized = msg.serialize() md5_hash = hashlib.md5(serialized).hexdigest() rospy.loginfo(f"Message MD5 hash: {md5_hash}") corrupted = bytearray(serialized) if len(corrupted) > 0: corrupted[0] = (corrupted[0] + 1) % 256 try: msg_deserialized = String() msg_deserialized.deserialize(bytes(corrupted)) rospy.loginfo("Message deserialization successful") except Exception as e: rospy.logwarn(f"Message deserialization failed: {e}") def run_demo(self): """运行序列化演示""" self.demonstrate_basic_serialization() self.demonstrate_complex_serialization() self.demonstrate_laser_scan_serialization() self.demonstrate_message_integrity()
if __name__ == '__main__': try: demo = MessageSerializationDemo() demo.run_demo() except rospy.ROSInterruptException: pass
|
消息传输优化
高效消息传输机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
import rospy from sensor_msgs.msg import LaserScan, Image from geometry_msgs.msg import Twist import threading import time import numpy as np
class MessageTransportOptimization: def __init__(self): rospy.init_node('message_transport_optimization') self.laser_pub = rospy.Publisher('/optimized/scan', LaserScan, queue_size=1) self.cmd_vel_pub = rospy.Publisher('/optimized/cmd_vel', Twist, queue_size=1) self.laser_sub = rospy.Subscriber('/optimized/scan', LaserScan, self.laser_callback) self.stats = { 'messages_sent': 0, 'messages_received': 0, 'total_bytes': 0, 'start_time': time.time() } def laser_callback(self, msg): """激光雷达回调""" self.stats['messages_received'] += 1 current_time = time.time() message_time = msg.header.stamp.to_sec() latency = current_time - message_time rospy.loginfo(f"Received laser scan, latency: {latency*1000:.2f}ms") def create_optimized_laser_scan(self): """创建优化的激光雷达数据""" msg = LaserScan() msg.header.stamp = rospy.Time.now() msg.header.frame_id = 'laser_frame' msg.angle_min = -3.14159 msg.angle_max = 3.14159 msg.angle_increment = 0.0349066 msg.time_increment = 0.0 msg.scan_time = 0.05 msg.range_min = 0.1 msg.range_max = 8.0 num_points = 180 msg.ranges = [1.0 + 0.3 * np.sin(i * 0.1) for i in range(num_points)] msg.intensities = [] return msg def publisher_thread(self): """发布者线程""" rate = rospy.Rate(20) while not rospy.is_shutdown(): laser_msg = self.create_optimized_laser_scan() self.laser_pub.publish(laser_msg) msg_size = len(laser_msg.serialize()) self.stats['messages_sent'] += 1 self.stats['total_bytes'] += msg_size rate.sleep() def stats_thread(self): """统计线程""" rate = rospy.Rate(1) while not rospy.is_shutdown(): elapsed_time = time.time() - self.stats['start_time'] if elapsed_time > 0: msg_rate = self.stats['messages_sent'] / elapsed_time byte_rate = self.stats['total_bytes'] / elapsed_time / 1024 rospy.loginfo(f"Transport Stats: " f"Rate: {msg_rate:.1f} msg/s, " f"Bandwidth: {byte_rate:.1f} KB/s, " f"Total messages: {self.stats['messages_sent']}") rate.sleep() def run_demo(self): """运行传输优化演示""" rospy.loginfo("Starting message transport optimization demo...") pub_thread = threading.Thread(target=self.publisher_thread) pub_thread.daemon = True pub_thread.start() stats_thread = threading.Thread(target=self.stats_thread) stats_thread.daemon = True stats_thread.start() rospy.spin()
if __name__ == '__main__': try: demo = MessageTransportOptimization() demo.run_demo() except rospy.ROSInterruptException: pass
|
消息类型系统
消息类型注册和管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
import rospy from std_msgs.msg import * from geometry_msgs.msg import * from sensor_msgs.msg import * import rospkg
class MessageTypeSystem: def __init__(self): rospy.init_node('message_type_system') self.message_types = {} self.register_builtin_types() def register_builtin_types(self): """注册内置消息类型""" self.message_types['std_msgs/String'] = String self.message_types['std_msgs/Int32'] = Int32 self.message_types['std_msgs/Float64'] = Float64 self.message_types['std_msgs/Bool'] = Bool self.message_types['std_msgs/Header'] = Header self.message_types['geometry_msgs/Point'] = Point self.message_types['geometry_msgs/Pose'] = Pose self.message_types['geometry_msgs/Twist'] = Twist self.message_types['sensor_msgs/LaserScan'] = LaserScan self.message_types['sensor_msgs/Image'] = Image rospy.loginfo(f"Registered {len(self.message_types)} message types") def create_message_instance(self, message_type): """创建消息实例""" if message_type in self.message_types: msg_class = self.message_types[message_type] return msg_class() else: rospy.logerr(f"Unknown message type: {message_type}") return None def demonstrate_message_creation(self): """演示消息创建""" rospy.loginfo("=== Message Creation Demo ===") message_examples = [ 'std_msgs/String', 'std_msgs/Int32', 'geometry_msgs/Point', 'geometry_msgs/Pose' ] for msg_type in message_examples: msg = self.create_message_instance(msg_type) if msg: rospy.loginfo(f"Created message of type: {msg_type}") if isinstance(msg, String): msg.data = "Example string message" elif isinstance(msg, Int32): msg.data = 42 elif isinstance(msg, Point): msg.x = 1.0 msg.y = 2.0 msg.z = 3.0 elif isinstance(msg, Pose): msg.position = Point(1.0, 2.0, 3.0) msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0) serialized = msg.serialize() rospy.loginfo(f"Message size: {len(serialized)} bytes") def analyze_message_structure(self, msg): """分析消息结构""" rospy.loginfo(f"=== Message Structure Analysis ===") rospy.loginfo(f"Message type: {type(msg).__name__}") rospy.loginfo(f"Message fields: {dir(msg)}") fields = [attr for attr in dir(msg) if not attr.startswith('_')] rospy.loginfo(f"Public fields: {fields}") serialized = msg.serialize() rospy.loginfo(f"Serialized size: {len(serialized)} bytes") def run_demo(self): """运行消息类型系统演示""" self.demonstrate_message_creation() pose_msg = Pose() pose_msg.position = Point(1.0, 2.0, 3.0) pose_msg.orientation = Quaternion(0.0, 0.0, 0.0, 1.0) self.analyze_message_structure(pose_msg)
if __name__ == '__main__': try: demo = MessageTypeSystem() demo.run_demo() except rospy.ROSInterruptException: pass
|
总结
ROS1消息系统的设计体现了以下特点:
- 类型安全:强类型消息定义确保数据完整性
- 序列化优化:高效的二进制序列化减少传输开销
- 模块化设计:支持自定义消息类型和扩展
- 传输优化:多种传输机制适应不同场景需求
理解ROS1消息系统的实现原理,有助于:
- 设计高效的消息类型
- 优化数据传输性能
- 调试通信问题
- 扩展消息系统功能
这些知识对于开发高性能的ROS1应用程序至关重要。