ROS2与DDS架构深度解析 - 从ROS1到ROS2的通信机制演进

ROS2与DDS架构深度解析 - 从ROS1到ROS2的通信机制演进

前言

ROS2采用了DDS(Data Distribution Service)作为底层通信中间件,这是相对于ROS1的重大架构改进。本文将深入分析ROS2的DDS架构设计、通信机制以及相比ROS1的改进之处。

ROS1 vs ROS2 架构对比

通信架构演进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ROS1 架构:
┌─────────────────────────────────────────────────────────────┐
│ ROS Master │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Parameter │ │ Service │ │ Topic │ │
│ │ Server │ │ Registry │ │ Registry │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────┴─────────┐
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ Node A │ │ Node B │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │Publisher │ │ │ │Subscriber │ │
│ └───────────┘ │ │ └───────────┘ │
└───────────────┘ └───────────────┘

ROS2 架构:
┌─────────────────────────────────────────────────────────────┐
│ DDS Middleware │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Discovery │ │ Transport │ │ Serialize │ │
│ │ Service │ │ Layer │ │ Layer │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────┴─────────┐
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ Node A │ │ Node B │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │Publisher │ │ │ │Subscriber │ │
│ └───────────┘ │ │ └───────────┘ │
└───────────────┘ └───────────────┘

DDS核心概念

DDS标准架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <geometry_msgs/msg/twist.hpp>

class DDSArchitectureDemo : public rclcpp::Node {
public:
DDSArchitectureDemo() : Node("dds_architecture_demo") {
// 创建发布者
string_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/demo/string", 10);

twist_publisher_ = this->create_publisher<geometry_msgs::msg::Twist>(
"/cmd_vel", 10);

// 创建订阅者
string_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/demo/string", 10,
std::bind(&DDSArchitectureDemo::string_callback, this, std::placeholders::_1));

twist_subscription_ = this->create_subscription<geometry_msgs::msg::Twist>(
"/cmd_vel", 10,
std::bind(&DDSArchitectureDemo::twist_callback, this, std::placeholders::_1));

// 创建定时器
timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&DDSArchitectureDemo::timer_callback, this));

RCLCPP_INFO(this->get_logger(), "DDS Architecture Demo node started");
}

private:
void string_callback(const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Received string: '%s'", msg->data.c_str());
}

void twist_callback(const geometry_msgs::msg::Twist::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Received twist: linear.x=%.2f, angular.z=%.2f",
msg->linear.x, msg->angular.z);
}

void timer_callback() {
// 发布字符串消息
auto string_msg = std_msgs::msg::String();
string_msg.data = "Hello ROS2 DDS!";
string_publisher_->publish(string_msg);

// 发布Twist消息
auto twist_msg = geometry_msgs::msg::Twist();
twist_msg.linear.x = 0.5;
twist_msg.angular.z = 0.2;
twist_publisher_->publish(twist_msg);
}

// 发布者
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr string_publisher_;
rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr twist_publisher_;

// 订阅者
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr string_subscription_;
rclcpp::Subscription<geometry_msgs::msg::Twist>::SharedPtr twist_subscription_;

// 定时器
rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<DDSArchitectureDemo>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

ROS2服务质量(QoS)机制

QoS配置详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <sensor_msgs/msg/laser_scan.hpp>

class QoSDemo : public rclcpp::Node {
public:
QoSDemo() : Node("qos_demo") {
// 创建不同的QoS配置
setup_qos_profiles();

// 创建发布者和订阅者
create_publishers();
create_subscribers();

// 创建定时器
timer_ = this->create_wall_timer(
std::chrono::milliseconds(100),
std::bind(&QoSDemo::timer_callback, this));

RCLCPP_INFO(this->get_logger(), "QoS Demo node started");
}

private:
void setup_qos_profiles() {
// 可靠性配置
reliable_qos_ = rclcpp::QoS(10)
.reliability(RMW_QOS_RELIABILITY_RELIABLE)
.durability(RMW_QOS_DURABILITY_TRANSIENT_LOCAL);

best_effort_qos_ = rclcpp::QoS(10)
.reliability(RMW_QOS_RELIABILITY_BEST_EFFORT)
.durability(RMW_QOS_DURABILITY_VOLATILE);

// 实时性配置
realtime_qos_ = rclcpp::QoS(1)
.reliability(RMW_QOS_RELIABILITY_RELIABLE)
.durability(RMW_QOS_DURABILITY_VOLATILE)
.deadline(std::chrono::milliseconds(50))
.liveliness_lease_duration(std::chrono::milliseconds(100));

// 传感器数据配置
sensor_qos_ = rclcpp::QoS(5)
.reliability(RMW_QOS_RELIABILITY_BEST_EFFORT)
.durability(RMW_QOS_DURABILITY_VOLATILE)
.history(RMW_QOS_HISTORY_KEEP_LAST);
}

void create_publishers() {
// 可靠发布者
reliable_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/qos/reliable", reliable_qos_);

// 尽力而为发布者
best_effort_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/qos/best_effort", best_effort_qos_);

// 实时发布者
realtime_publisher_ = this->create_publisher<std_msgs::msg::String>(
"/qos/realtime", realtime_qos_);

// 传感器数据发布者
sensor_publisher_ = this->create_publisher<sensor_msgs::msg::LaserScan>(
"/qos/sensor", sensor_qos_);
}

void create_subscribers() {
// 可靠订阅者
reliable_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/qos/reliable", reliable_qos_,
std::bind(&QoSDemo::reliable_callback, this, std::placeholders::_1));

// 尽力而为订阅者
best_effort_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/qos/best_effort", best_effort_qos_,
std::bind(&QoSDemo::best_effort_callback, this, std::placeholders::_1));

// 实时订阅者
realtime_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/qos/realtime", realtime_qos_,
std::bind(&QoSDemo::realtime_callback, this, std::placeholders::_1));

// 传感器数据订阅者
sensor_subscription_ = this->create_subscription<sensor_msgs::msg::LaserScan>(
"/qos/sensor", sensor_qos_,
std::bind(&QoSDemo::sensor_callback, this, std::placeholders::_1));
}

void reliable_callback(const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Reliable: %s", msg->data.c_str());
}

void best_effort_callback(const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Best Effort: %s", msg->data.c_str());
}

void realtime_callback(const std_msgs::msg::String::SharedPtr msg) {
auto now = this->now();
auto msg_time = msg->header.stamp;
auto latency = (now - msg_time).nanoseconds() / 1000000.0; // ms

RCLCPP_INFO(this->get_logger(), "Realtime: %s (latency: %.2f ms)",
msg->data.c_str(), latency);
}

void sensor_callback(const sensor_msgs::msg::LaserScan::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Sensor: %zu ranges", msg->ranges.size());
}

void timer_callback() {
auto now = this->now();

// 发布可靠消息
auto reliable_msg = std_msgs::msg::String();
reliable_msg.data = "Reliable message " + std::to_string(counter_);
reliable_msg.header.stamp = now;
reliable_publisher_->publish(reliable_msg);

// 发布尽力而为消息
auto best_effort_msg = std_msgs::msg::String();
best_effort_msg.data = "Best effort message " + std::to_string(counter_);
best_effort_msg.header.stamp = now;
best_effort_publisher_->publish(best_effort_msg);

// 发布实时消息
auto realtime_msg = std_msgs::msg::String();
realtime_msg.data = "Realtime message " + std::to_string(counter_);
realtime_msg.header.stamp = now;
realtime_publisher_->publish(realtime_msg);

// 发布传感器数据
auto sensor_msg = sensor_msgs::msg::LaserScan();
sensor_msg.header.stamp = now;
sensor_msg.header.frame_id = "laser_frame";
sensor_msg.angle_min = -3.14159;
sensor_msg.angle_max = 3.14159;
sensor_msg.angle_increment = 0.0174533;
sensor_msg.range_min = 0.1;
sensor_msg.range_max = 10.0;
sensor_msg.ranges.resize(360, 1.0);
sensor_publisher_->publish(sensor_msg);

counter_++;
}

// QoS配置
rclcpp::QoS reliable_qos_;
rclcpp::QoS best_effort_qos_;
rclcpp::QoS realtime_qos_;
rclcpp::QoS sensor_qos_;

// 发布者
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr reliable_publisher_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr best_effort_publisher_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr realtime_publisher_;
rclcpp::Publisher<sensor_msgs::msg::LaserScan>::SharedPtr sensor_publisher_;

// 订阅者
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr reliable_subscription_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr best_effort_subscription_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr realtime_subscription_;
rclcpp::Subscription<sensor_msgs::msg::LaserScan>::SharedPtr sensor_subscription_;

// 定时器
rclcpp::TimerBase::SharedPtr timer_;

// 计数器
int counter_ = 0;
};

int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<QoSDemo>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

ROS2生命周期管理

生命周期节点实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_lifecycle/lifecycle_node.hpp>
#include <std_msgs/msg/string.hpp>

class LifecycleDemo : public rclcpp_lifecycle::LifecycleNode {
public:
LifecycleDemo() : LifecycleNode("lifecycle_demo") {
// 创建发布者
publisher_ = this->create_publisher<std_msgs::msg::String>("/lifecycle/demo", 10);

// 创建定时器
timer_ = this->create_wall_timer(
std::chrono::milliseconds(1000),
std::bind(&LifecycleDemo::timer_callback, this));

RCLCPP_INFO(this->get_logger(), "Lifecycle node created");
}

// 配置状态回调
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
on_configure(const rclcpp_lifecycle::State &) override {
RCLCPP_INFO(this->get_logger(), "Configuring lifecycle node");

// 初始化资源
counter_ = 0;
is_configured_ = true;

return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
}

// 激活状态回调
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
on_activate(const rclcpp_lifecycle::State &) override {
RCLCPP_INFO(this->get_logger(), "Activating lifecycle node");

// 激活发布者
publisher_->on_activate();

// 启动工作
is_active_ = true;

return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
}

// 停用状态回调
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
on_deactivate(const rclcpp_lifecycle::State &) override {
RCLCPP_INFO(this->get_logger(), "Deactivating lifecycle node");

// 停用发布者
publisher_->on_deactivate();

// 停止工作
is_active_ = false;

return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
}

// 清理状态回调
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
on_cleanup(const rclcpp_lifecycle::State &) override {
RCLCPP_INFO(this->get_logger(), "Cleaning up lifecycle node");

// 清理资源
is_configured_ = false;

return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
}

// 关闭状态回调
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
on_shutdown(const rclcpp_lifecycle::State &) override {
RCLCPP_INFO(this->get_logger(), "Shutting down lifecycle node");

// 关闭资源
is_active_ = false;
is_configured_ = false;

return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
}

private:
void timer_callback() {
if (is_active_) {
auto msg = std_msgs::msg::String();
msg.data = "Lifecycle message " + std::to_string(counter_++);
publisher_->publish(msg);

RCLCPP_INFO(this->get_logger(), "Published: %s", msg.data.c_str());
}
}

rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;

bool is_configured_ = false;
bool is_active_ = false;
int counter_ = 0;
};

int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);

auto node = std::make_shared<LifecycleDemo>();

// 启动生命周期节点
node->configure();
node->activate();

rclcpp::spin(node->get_node_base_interface());

rclcpp::shutdown();
return 0;
}

ROS2组件系统

组件化节点实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_components/register_node_macro.hpp>
#include <std_msgs/msg/string.hpp>

class ComponentDemo : public rclcpp::Node {
public:
ComponentDemo(const rclcpp::NodeOptions &options)
: Node("component_demo", options) {

// 声明参数
this->declare_parameter("publish_rate", 1.0);
this->declare_parameter("message_prefix", "Component");

// 获取参数
double publish_rate = this->get_parameter("publish_rate").as_double();
std::string message_prefix = this->get_parameter("message_prefix").as_string();

// 创建发布者
publisher_ = this->create_publisher<std_msgs::msg::String>(
"/component/demo", 10);

// 创建订阅者
subscription_ = this->create_subscription<std_msgs::msg::String>(
"/component/demo", 10,
std::bind(&ComponentDemo::callback, this, std::placeholders::_1));

// 创建定时器
timer_ = this->create_wall_timer(
std::chrono::milliseconds(static_cast<int>(1000.0 / publish_rate)),
std::bind(&ComponentDemo::timer_callback, this));

message_prefix_ = message_prefix;

RCLCPP_INFO(this->get_logger(), "Component demo started with rate: %.1f Hz", publish_rate);
}

private:
void callback(const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Received: %s", msg->data.c_str());
}

void timer_callback() {
auto msg = std_msgs::msg::String();
msg.data = message_prefix_ + " message " + std::to_string(counter_++);
publisher_->publish(msg);
}

rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
rclcpp::TimerBase::SharedPtr timer_;

std::string message_prefix_;
int counter_ = 0;
};

// 注册组件
RCLCPP_COMPONENTS_REGISTER_NODE(ComponentDemo)

组件启动文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!-- component_launch.xml -->
<launch>
<!-- 启动组件 -->
<node pkg="component_package"
exec="component_demo_container"
name="component_container"
output="screen">
<param name="publish_rate" value="2.0"/>
<param name="message_prefix" value="Container"/>
</node>

<!-- 使用组件管理器 -->
<node pkg="rclcpp_components"
exec="component_container"
name="component_manager"
output="screen">
<param name="plugin_name" value="ComponentDemo"/>
<param name="plugin_namespace" value=""/>
</node>
</launch>

ROS2多进程通信

分布式节点通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <geometry_msgs/msg/twist.hpp>

class DistributedNode : public rclcpp::Node {
public:
DistributedNode(const std::string &node_name) : Node(node_name) {
// 创建发布者
cmd_vel_publisher_ = this->create_publisher<geometry_msgs::msg::Twist>(
"/cmd_vel", 10);

// 创建订阅者
status_subscription_ = this->create_subscription<std_msgs::msg::String>(
"/robot/status", 10,
std::bind(&DistributedNode::status_callback, this, std::placeholders::_1));

// 创建服务客户端
service_client_ = this->create_client<std_srvs::srv::SetBool>("/robot/emergency_stop");

// 创建定时器
timer_ = this->create_wall_timer(
std::chrono::milliseconds(100),
std::bind(&DistributedNode::control_loop, this));

RCLCPP_INFO(this->get_logger(), "Distributed node %s started", node_name.c_str());
}

private:
void status_callback(const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Robot status: %s", msg->data.c_str());
}

void control_loop() {
// 发布控制命令
auto twist_msg = geometry_msgs::msg::Twist();
twist_msg.linear.x = 0.5;
twist_msg.angular.z = 0.2;
cmd_vel_publisher_->publish(twist_msg);

// 定期检查服务可用性
if (service_client_->service_is_ready()) {
auto request = std::make_shared<std_srvs::srv::SetBool::Request>();
request->data = false;

auto future = service_client_->async_send_request(request);
// 注意:实际应用中应该使用适当的等待机制
}
}

rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr cmd_vel_publisher_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr status_subscription_;
rclcpp::Client<std_srvs::srv::SetBool>::SharedPtr service_client_;
rclcpp::TimerBase::SharedPtr timer_;
};

// 机器人控制节点
class RobotController : public DistributedNode {
public:
RobotController() : DistributedNode("robot_controller") {
// 机器人特定的初始化
}
};

// 导航节点
class NavigationNode : public DistributedNode {
public:
NavigationNode() : DistributedNode("navigation_node") {
// 导航特定的初始化
}
};

int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);

// 根据参数决定创建哪种节点
if (argc > 1 && std::string(argv[1]) == "controller") {
auto node = std::make_shared<RobotController>();
rclcpp::spin(node);
} else if (argc > 1 && std::string(argv[1]) == "navigation") {
auto node = std::make_shared<NavigationNode>();
rclcpp::spin(node);
} else {
auto node = std::make_shared<DistributedNode>("default_node");
rclcpp::spin(node);
}

rclcpp::shutdown();
return 0;
}

总结

ROS2相比ROS1的主要改进:

  1. DDS中间件:提供更强大的分布式通信能力
  2. QoS机制:支持多种服务质量策略
  3. 生命周期管理:更精细的节点状态控制
  4. 组件系统:支持模块化和可插拔架构
  5. 多进程支持:原生支持分布式部署
  6. 实时性:更好的实时性能保证

这些改进使得ROS2更适合工业应用和实时系统,为机器人系统的开发提供了更强大的基础设施。