Apollo自动驾驶框架深度解析 - Cyber RT通信机制与架构设计

Apollo自动驾驶框架深度解析 - Cyber RT通信机制与架构设计

前言

Apollo是百度开源的自动驾驶平台,其核心通信框架Cyber RT为大规模分布式系统提供了高性能、低延迟的通信解决方案。本文将深入分析Apollo框架的架构设计和Cyber RT的通信机制。

Apollo架构概述

整体架构

Apollo采用模块化设计,主要包含以下核心组件:

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────────────────┐
│ Apollo Platform │
├─────────────────────────────────────────────────────────────┤
│ Perception │ Prediction │ Planning │ Control │ ... │
├─────────────────────────────────────────────────────────────┤
│ Cyber RT Framework │
├─────────────────────────────────────────────────────────────┤
│ Intra-Process │ Shared Memory │ FastDDS │ Protobuf │
├─────────────────────────────────────────────────────────────┤
│ Hardware Layer │
│ Sensors │ Computing Units │ Actuators │ Network │
└─────────────────────────────────────────────────────────────┘

核心特性

  1. 高性能通信:支持微秒级延迟
  2. 分布式架构:支持多进程、多机器部署
  3. 实时性保证:满足自动驾驶的实时性要求
  4. 模块化设计:易于扩展和维护

Cyber RT通信机制

1. 通信层级

Cyber RT采用三层通信架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 进程内通信(Intra-Process)
class IntraProcessCommunication {
public:
// 零拷贝优化
template<typename MessageType>
void Publish(const std::string& channel, const MessageType& message) {
// 直接内存传递,避免拷贝
auto writer = GetWriter<MessageType>(channel);
writer->Write(message);
}
};

// 进程间通信(Inter-Process)
class InterProcessCommunication {
public:
// 共享内存通信
void PublishViaSharedMemory(const std::string& channel, const void* data, size_t size) {
auto shm_writer = GetSharedMemoryWriter(channel);
shm_writer->Write(data, size);
}

// 网络通信
void PublishViaNetwork(const std::string& channel, const void* data, size_t size) {
auto network_writer = GetNetworkWriter(channel);
network_writer->Write(data, size);
}
};

2. 节点和组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include "cyber/cyber.h"
#include "cyber/component/component.h"

// 自定义消息类型
class LaserScanMessage {
public:
uint64_t timestamp;
std::vector<float> ranges;
float angle_min;
float angle_max;
float angle_increment;
};

// 激光雷达处理组件
class LidarProcessor : public apollo::cyber::Component<LaserScanMessage> {
public:
bool Init() override {
// 初始化组件
AINFO << "LidarProcessor initialized";
return true;
}

bool Proc(const std::shared_ptr<LaserScanMessage>& scan_msg) override {
// 处理激光雷达数据
AINFO << "Processing laser scan with " << scan_msg->ranges.size() << " points";

// 障碍物检测逻辑
DetectObstacles(scan_msg);

return true;
}

private:
void DetectObstacles(const std::shared_ptr<LaserScanMessage>& scan_msg) {
// 简化的障碍物检测
for (size_t i = 0; i < scan_msg->ranges.size(); ++i) {
if (scan_msg->ranges[i] < 2.0f) { // 2米内检测到障碍物
float angle = scan_msg->angle_min + i * scan_msg->angle_increment;
AINFO << "Obstacle detected at angle: " << angle
<< ", distance: " << scan_msg->ranges[i];
}
}
}
};

// 组件注册
CYBER_REGISTER_COMPONENT(LidarProcessor)

3. 发布-订阅模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "cyber/cyber.h"

class PerceptionModule {
public:
PerceptionModule() {
// 初始化Cyber框架
apollo::cyber::Init("perception_module");

// 创建节点
node_ = apollo::cyber::CreateNode("perception_node");

// 创建发布者
obstacle_pub_ = node_->CreateWriter<ObstacleMessage>("/perception/obstacles");

// 创建订阅者
lidar_sub_ = node_->CreateReader<LaserScanMessage>(
"/sensors/lidar",
std::bind(&PerceptionModule::OnLidarData, this, std::placeholders::_1)
);
}

void OnLidarData(const std::shared_ptr<LaserScanMessage>& scan_msg) {
// 处理激光雷达数据
auto obstacles = ProcessLidarData(scan_msg);

// 发布障碍物信息
for (const auto& obstacle : obstacles) {
auto obstacle_msg = std::make_shared<ObstacleMessage>();
obstacle_msg->timestamp = scan_msg->timestamp;
obstacle_msg->position = obstacle.position;
obstacle_msg->velocity = obstacle.velocity;
obstacle_msg->confidence = obstacle.confidence;

obstacle_pub_->Write(obstacle_msg);
}
}

private:
std::shared_ptr<apollo::cyber::Node> node_;
std::shared_ptr<apollo::cyber::Writer<ObstacleMessage>> obstacle_pub_;
std::shared_ptr<apollo::cyber::Reader<LaserScanMessage>> lidar_sub_;

std::vector<Obstacle> ProcessLidarData(const std::shared_ptr<LaserScanMessage>& scan_msg) {
// 障碍物检测和跟踪算法
std::vector<Obstacle> obstacles;
// ... 实现障碍物检测逻辑
return obstacles;
}
};

4. 服务调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include "cyber/cyber.h"

// 服务请求和响应消息
class PlanningRequest {
public:
std::vector<ObstacleMessage> obstacles;
PoseMessage current_pose;
PoseMessage target_pose;
};

class PlanningResponse {
public:
std::vector<PathPoint> path;
bool success;
std::string error_message;
};

// 规划服务客户端
class PlanningClient {
public:
PlanningClient() {
node_ = apollo::cyber::CreateNode("planning_client");
client_ = node_->CreateClient<PlanningRequest, PlanningResponse>("/planning/service");
}

std::shared_ptr<PlanningResponse> RequestPath(const PlanningRequest& request) {
auto response = client_->SendRequest(request);
if (response) {
AINFO << "Planning service response received";
return response;
} else {
AERROR << "Failed to get planning service response";
return nullptr;
}
}

private:
std::shared_ptr<apollo::cyber::Node> node_;
std::shared_ptr<apollo::cyber::Client<PlanningRequest, PlanningResponse>> client_;
};

// 规划服务服务器
class PlanningServer {
public:
PlanningServer() {
node_ = apollo::cyber::CreateNode("planning_server");
server_ = node_->CreateService<PlanningRequest, PlanningResponse>(
"/planning/service",
std::bind(&PlanningServer::HandleRequest, this,
std::placeholders::_1, std::placeholders::_2)
);
}

void HandleRequest(const std::shared_ptr<PlanningRequest>& request,
std::shared_ptr<PlanningResponse>& response) {
AINFO << "Received planning request";

// 路径规划算法
auto path = PlanPath(request->current_pose, request->target_pose, request->obstacles);

response = std::make_shared<PlanningResponse>();
response->path = path;
response->success = !path.empty();

if (!response->success) {
response->error_message = "Path planning failed";
}
}

private:
std::shared_ptr<apollo::cyber::Node> node_;
std::shared_ptr<apollo::cyber::Service<PlanningRequest, PlanningResponse>> server_;

std::vector<PathPoint> PlanPath(const PoseMessage& start,
const PoseMessage& goal,
const std::vector<ObstacleMessage>& obstacles) {
// A*算法或其他路径规划算法
std::vector<PathPoint> path;
// ... 实现路径规划逻辑
return path;
}
};

性能优化技术

1. 零拷贝通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ZeroCopyWriter {
public:
template<typename MessageType>
bool Write(const MessageType& message) {
// 获取预分配的内存缓冲区
auto buffer = GetBuffer<MessageType>();

// 直接在缓冲区中构造消息,避免拷贝
new (buffer->data()) MessageType(message);

// 通知订阅者
NotifySubscribers(buffer);

return true;
}

private:
template<typename MessageType>
BufferPtr GetBuffer() {
// 从内存池获取缓冲区
return memory_pool_.GetBuffer<MessageType>();
}

MemoryPool memory_pool_;
};

2. 内存池管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
template<typename MessageType>
class MessageMemoryPool {
public:
MessageMemoryPool(size_t pool_size = 1000) : pool_size_(pool_size) {
// 预分配内存块
for (size_t i = 0; i < pool_size_; ++i) {
auto buffer = std::make_shared<MessageBuffer<MessageType>>();
free_buffers_.push(buffer);
}
}

std::shared_ptr<MessageBuffer<MessageType>> GetBuffer() {
std::lock_guard<std::mutex> lock(mutex_);

if (free_buffers_.empty()) {
// 池中没有可用缓冲区,创建新的
return std::make_shared<MessageBuffer<MessageType>>();
}

auto buffer = free_buffers_.front();
free_buffers_.pop();
return buffer;
}

void ReturnBuffer(std::shared_ptr<MessageBuffer<MessageType>> buffer) {
std::lock_guard<std::mutex> lock(mutex_);

if (free_buffers_.size() < pool_size_) {
buffer->Reset(); // 重置缓冲区状态
free_buffers_.push(buffer);
}
// 如果池已满,则丢弃缓冲区
}

private:
size_t pool_size_;
std::queue<std::shared_ptr<MessageBuffer<MessageType>>> free_buffers_;
std::mutex mutex_;
};

3. 无锁队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
template<typename T>
class LockFreeQueue {
public:
LockFreeQueue(size_t capacity) : capacity_(capacity) {
buffer_ = new T[capacity_];
head_ = 0;
tail_ = 0;
}

~LockFreeQueue() {
delete[] buffer_;
}

bool Enqueue(const T& item) {
size_t current_tail = tail_.load(std::memory_order_relaxed);
size_t next_tail = (current_tail + 1) % capacity_;

if (next_tail == head_.load(std::memory_order_acquire)) {
return false; // 队列已满
}

buffer_[current_tail] = item;
tail_.store(next_tail, std::memory_order_release);

return true;
}

bool Dequeue(T& item) {
size_t current_head = head_.load(std::memory_order_relaxed);

if (current_head == tail_.load(std::memory_order_acquire)) {
return false; // 队列为空
}

item = buffer_[current_head];
head_.store((current_head + 1) % capacity_, std::memory_order_release);

return true;
}

private:
size_t capacity_;
T* buffer_;
std::atomic<size_t> head_;
std::atomic<size_t> tail_;
};

部署和配置

Launch文件配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!-- apollo_launch.xml -->
<launch>
<!-- 感知模块 -->
<module name="perception" config="./config/perception.pb.txt">
<component name="lidar_processor"
class="LidarProcessor"
config="./config/lidar_processor.pb.txt" />
<component name="camera_processor"
class="CameraProcessor"
config="./config/camera_processor.pb.txt" />
</module>

<!-- 预测模块 -->
<module name="prediction" config="./config/prediction.pb.txt">
<component name="motion_predictor"
class="MotionPredictor"
config="./config/motion_predictor.pb.txt" />
</module>

<!-- 规划模块 -->
<module name="planning" config="./config/planning.pb.txt">
<component name="path_planner"
class="PathPlanner"
config="./config/path_planner.pb.txt" />
</module>

<!-- 控制模块 -->
<module name="control" config="./config/control.pb.txt">
<component name="pid_controller"
class="PIDController"
config="./config/pid_controller.pb.txt" />
</module>
</launch>

配置文件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// perception.pb.txt
modules {
name: "perception"
components {
name: "lidar_processor"
class_name: "LidarProcessor"
config_file_path: "./config/lidar_processor.pb.txt"
readers {
channel: "/sensors/lidar"
message_type: "LaserScanMessage"
}
writers {
channel: "/perception/obstacles"
message_type: "ObstacleMessage"
}
}
}

总结

Apollo框架通过Cyber RT提供了:

  1. 高性能通信:零拷贝、内存池、无锁队列
  2. 分布式架构:支持多进程、多机器部署
  3. 实时性保证:微秒级延迟,满足自动驾驶需求
  4. 模块化设计:易于扩展和维护

Cyber RT的设计理念和实现技术为大规模分布式实时系统提供了重要参考,其性能优化技术可以应用到其他需要高性能通信的场景中。