Apollo自动驾驶框架深度解析 - Cyber RT通信机制与架构设计
前言
Apollo是百度开源的自动驾驶平台,其核心通信框架Cyber RT为大规模分布式系统提供了高性能、低延迟的通信解决方案。本文将深入分析Apollo框架的架构设计和Cyber RT的通信机制。
Apollo架构概述
整体架构
Apollo采用模块化设计,主要包含以下核心组件:
1 2 3 4 5 6 7 8 9 10 11 12
| ┌─────────────────────────────────────────────────────────────┐ │ Apollo Platform │ ├─────────────────────────────────────────────────────────────┤ │ Perception │ Prediction │ Planning │ Control │ ... │ ├─────────────────────────────────────────────────────────────┤ │ Cyber RT Framework │ ├─────────────────────────────────────────────────────────────┤ │ Intra-Process │ Shared Memory │ FastDDS │ Protobuf │ ├─────────────────────────────────────────────────────────────┤ │ Hardware Layer │ │ Sensors │ Computing Units │ Actuators │ Network │ └─────────────────────────────────────────────────────────────┘
|
核心特性
- 高性能通信:支持微秒级延迟
- 分布式架构:支持多进程、多机器部署
- 实时性保证:满足自动驾驶的实时性要求
- 模块化设计:易于扩展和维护
Cyber RT通信机制
1. 通信层级
Cyber RT采用三层通信架构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class IntraProcessCommunication { public: template<typename MessageType> void Publish(const std::string& channel, const MessageType& message) { auto writer = GetWriter<MessageType>(channel); writer->Write(message); } };
class InterProcessCommunication { public: void PublishViaSharedMemory(const std::string& channel, const void* data, size_t size) { auto shm_writer = GetSharedMemoryWriter(channel); shm_writer->Write(data, size); } void PublishViaNetwork(const std::string& channel, const void* data, size_t size) { auto network_writer = GetNetworkWriter(channel); network_writer->Write(data, size); } };
|
2. 节点和组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #include "cyber/cyber.h" #include "cyber/component/component.h"
class LaserScanMessage { public: uint64_t timestamp; std::vector<float> ranges; float angle_min; float angle_max; float angle_increment; };
class LidarProcessor : public apollo::cyber::Component<LaserScanMessage> { public: bool Init() override { AINFO << "LidarProcessor initialized"; return true; } bool Proc(const std::shared_ptr<LaserScanMessage>& scan_msg) override { AINFO << "Processing laser scan with " << scan_msg->ranges.size() << " points"; DetectObstacles(scan_msg); return true; } private: void DetectObstacles(const std::shared_ptr<LaserScanMessage>& scan_msg) { for (size_t i = 0; i < scan_msg->ranges.size(); ++i) { if (scan_msg->ranges[i] < 2.0f) { float angle = scan_msg->angle_min + i * scan_msg->angle_increment; AINFO << "Obstacle detected at angle: " << angle << ", distance: " << scan_msg->ranges[i]; } } } };
CYBER_REGISTER_COMPONENT(LidarProcessor)
|
3. 发布-订阅模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include "cyber/cyber.h"
class PerceptionModule { public: PerceptionModule() { apollo::cyber::Init("perception_module"); node_ = apollo::cyber::CreateNode("perception_node"); obstacle_pub_ = node_->CreateWriter<ObstacleMessage>("/perception/obstacles"); lidar_sub_ = node_->CreateReader<LaserScanMessage>( "/sensors/lidar", std::bind(&PerceptionModule::OnLidarData, this, std::placeholders::_1) ); } void OnLidarData(const std::shared_ptr<LaserScanMessage>& scan_msg) { auto obstacles = ProcessLidarData(scan_msg); for (const auto& obstacle : obstacles) { auto obstacle_msg = std::make_shared<ObstacleMessage>(); obstacle_msg->timestamp = scan_msg->timestamp; obstacle_msg->position = obstacle.position; obstacle_msg->velocity = obstacle.velocity; obstacle_msg->confidence = obstacle.confidence; obstacle_pub_->Write(obstacle_msg); } } private: std::shared_ptr<apollo::cyber::Node> node_; std::shared_ptr<apollo::cyber::Writer<ObstacleMessage>> obstacle_pub_; std::shared_ptr<apollo::cyber::Reader<LaserScanMessage>> lidar_sub_; std::vector<Obstacle> ProcessLidarData(const std::shared_ptr<LaserScanMessage>& scan_msg) { std::vector<Obstacle> obstacles; return obstacles; } };
|
4. 服务调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| #include "cyber/cyber.h"
class PlanningRequest { public: std::vector<ObstacleMessage> obstacles; PoseMessage current_pose; PoseMessage target_pose; };
class PlanningResponse { public: std::vector<PathPoint> path; bool success; std::string error_message; };
class PlanningClient { public: PlanningClient() { node_ = apollo::cyber::CreateNode("planning_client"); client_ = node_->CreateClient<PlanningRequest, PlanningResponse>("/planning/service"); } std::shared_ptr<PlanningResponse> RequestPath(const PlanningRequest& request) { auto response = client_->SendRequest(request); if (response) { AINFO << "Planning service response received"; return response; } else { AERROR << "Failed to get planning service response"; return nullptr; } } private: std::shared_ptr<apollo::cyber::Node> node_; std::shared_ptr<apollo::cyber::Client<PlanningRequest, PlanningResponse>> client_; };
class PlanningServer { public: PlanningServer() { node_ = apollo::cyber::CreateNode("planning_server"); server_ = node_->CreateService<PlanningRequest, PlanningResponse>( "/planning/service", std::bind(&PlanningServer::HandleRequest, this, std::placeholders::_1, std::placeholders::_2) ); } void HandleRequest(const std::shared_ptr<PlanningRequest>& request, std::shared_ptr<PlanningResponse>& response) { AINFO << "Received planning request"; auto path = PlanPath(request->current_pose, request->target_pose, request->obstacles); response = std::make_shared<PlanningResponse>(); response->path = path; response->success = !path.empty(); if (!response->success) { response->error_message = "Path planning failed"; } } private: std::shared_ptr<apollo::cyber::Node> node_; std::shared_ptr<apollo::cyber::Service<PlanningRequest, PlanningResponse>> server_; std::vector<PathPoint> PlanPath(const PoseMessage& start, const PoseMessage& goal, const std::vector<ObstacleMessage>& obstacles) { std::vector<PathPoint> path; return path; } };
|
性能优化技术
1. 零拷贝通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class ZeroCopyWriter { public: template<typename MessageType> bool Write(const MessageType& message) { auto buffer = GetBuffer<MessageType>(); new (buffer->data()) MessageType(message); NotifySubscribers(buffer); return true; } private: template<typename MessageType> BufferPtr GetBuffer() { return memory_pool_.GetBuffer<MessageType>(); } MemoryPool memory_pool_; };
|
2. 内存池管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| template<typename MessageType> class MessageMemoryPool { public: MessageMemoryPool(size_t pool_size = 1000) : pool_size_(pool_size) { for (size_t i = 0; i < pool_size_; ++i) { auto buffer = std::make_shared<MessageBuffer<MessageType>>(); free_buffers_.push(buffer); } } std::shared_ptr<MessageBuffer<MessageType>> GetBuffer() { std::lock_guard<std::mutex> lock(mutex_); if (free_buffers_.empty()) { return std::make_shared<MessageBuffer<MessageType>>(); } auto buffer = free_buffers_.front(); free_buffers_.pop(); return buffer; } void ReturnBuffer(std::shared_ptr<MessageBuffer<MessageType>> buffer) { std::lock_guard<std::mutex> lock(mutex_); if (free_buffers_.size() < pool_size_) { buffer->Reset(); free_buffers_.push(buffer); } } private: size_t pool_size_; std::queue<std::shared_ptr<MessageBuffer<MessageType>>> free_buffers_; std::mutex mutex_; };
|
3. 无锁队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| template<typename T> class LockFreeQueue { public: LockFreeQueue(size_t capacity) : capacity_(capacity) { buffer_ = new T[capacity_]; head_ = 0; tail_ = 0; } ~LockFreeQueue() { delete[] buffer_; } bool Enqueue(const T& item) { size_t current_tail = tail_.load(std::memory_order_relaxed); size_t next_tail = (current_tail + 1) % capacity_; if (next_tail == head_.load(std::memory_order_acquire)) { return false; } buffer_[current_tail] = item; tail_.store(next_tail, std::memory_order_release); return true; } bool Dequeue(T& item) { size_t current_head = head_.load(std::memory_order_relaxed); if (current_head == tail_.load(std::memory_order_acquire)) { return false; } item = buffer_[current_head]; head_.store((current_head + 1) % capacity_, std::memory_order_release); return true; } private: size_t capacity_; T* buffer_; std::atomic<size_t> head_; std::atomic<size_t> tail_; };
|
部署和配置
Launch文件配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| <launch> <module name="perception" config="./config/perception.pb.txt"> <component name="lidar_processor" class="LidarProcessor" config="./config/lidar_processor.pb.txt" /> <component name="camera_processor" class="CameraProcessor" config="./config/camera_processor.pb.txt" /> </module> <module name="prediction" config="./config/prediction.pb.txt"> <component name="motion_predictor" class="MotionPredictor" config="./config/motion_predictor.pb.txt" /> </module> <module name="planning" config="./config/planning.pb.txt"> <component name="path_planner" class="PathPlanner" config="./config/path_planner.pb.txt" /> </module> <module name="control" config="./config/control.pb.txt"> <component name="pid_controller" class="PIDController" config="./config/pid_controller.pb.txt" /> </module> </launch>
|
配置文件示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| modules { name: "perception" components { name: "lidar_processor" class_name: "LidarProcessor" config_file_path: "./config/lidar_processor.pb.txt" readers { channel: "/sensors/lidar" message_type: "LaserScanMessage" } writers { channel: "/perception/obstacles" message_type: "ObstacleMessage" } } }
|
总结
Apollo框架通过Cyber RT提供了:
- 高性能通信:零拷贝、内存池、无锁队列
- 分布式架构:支持多进程、多机器部署
- 实时性保证:微秒级延迟,满足自动驾驶需求
- 模块化设计:易于扩展和维护
Cyber RT的设计理念和实现技术为大规模分布式实时系统提供了重要参考,其性能优化技术可以应用到其他需要高性能通信的场景中。