ROS2 Performance Optimization in Practice - Latency, Throughput, and Resource Management

Preface

Performance optimization is key to building high-performance robotic systems on ROS2. This article examines common ROS2 performance bottlenecks and presents concrete optimization strategies and implementations, covering latency reduction, throughput improvement, memory management, and CPU optimization.

ROS2 Performance Analysis Framework

Performance Monitoring Tools
```cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <chrono>
#include <limits>
#include <mutex>
#include <string>

class PerformanceMonitor : public rclcpp::Node {
public:
  PerformanceMonitor() : Node("performance_monitor") {
    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/performance/test", 10);

    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "/performance/test", 10,
        std::bind(&PerformanceMonitor::message_callback, this,
                  std::placeholders::_1));

    timer_ = this->create_wall_timer(
        std::chrono::milliseconds(100),
        std::bind(&PerformanceMonitor::publish_message, this));

    stats_timer_ = this->create_wall_timer(
        std::chrono::seconds(5),
        std::bind(&PerformanceMonitor::print_statistics, this));

    RCLCPP_INFO(this->get_logger(), "Performance monitor started");
  }

private:
  void publish_message() {
    // std_msgs::msg::String has no header field, so the send time
    // (ROS time in nanoseconds) is encoded in the payload itself.
    auto msg = std_msgs::msg::String();
    msg.data = std::to_string(this->now().nanoseconds());
    publisher_->publish(msg);
    message_count_++;
  }

  void message_callback(const std_msgs::msg::String::SharedPtr msg) {
    const int64_t send_ns = std::stoll(msg->data);
    const double latency =
        static_cast<double>(this->now().nanoseconds() - send_ns) / 1e6;  // ms

    std::lock_guard<std::mutex> lock(stats_mutex_);
    total_latency_ += latency;
    max_latency_ = std::max(max_latency_, latency);
    min_latency_ = std::min(min_latency_, latency);
    received_messages_++;
  }

  void print_statistics() {
    std::lock_guard<std::mutex> lock(stats_mutex_);
    if (received_messages_ > 0) {
      const double avg_latency = total_latency_ / received_messages_;
      RCLCPP_INFO(this->get_logger(),
                  "Performance Stats - Messages: %zu, "
                  "Avg Latency: %.2f ms, Min: %.2f ms, Max: %.2f ms",
                  received_messages_, avg_latency, min_latency_, max_latency_);
    }
  }

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  rclcpp::TimerBase::SharedPtr timer_;
  rclcpp::TimerBase::SharedPtr stats_timer_;

  std::mutex stats_mutex_;
  size_t message_count_ = 0;
  size_t received_messages_ = 0;
  double total_latency_ = 0.0;
  double max_latency_ = 0.0;
  double min_latency_ = std::numeric_limits<double>::max();
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<PerformanceMonitor>());
  rclcpp::shutdown();
  return 0;
}
```
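Because std_msgs/String carries no header, the monitor above embeds the send time in the payload. An alternative that works for any message type is to read the middleware-provided timestamps from rclcpp::MessageInfo; whether source_timestamp and received_timestamp are actually populated depends on the RMW implementation, so treat this as a sketch rather than a guaranteed technique. The node and topic names below are placeholders.

```cpp
// Hedged sketch: per-message latency from RMW timestamps instead of payload.
// Timestamp support varies by RMW implementation; zero values mean "not set".
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

class RmwLatencyMonitor : public rclcpp::Node {
public:
  RmwLatencyMonitor() : Node("rmw_latency_monitor") {
    sub_ = this->create_subscription<std_msgs::msg::String>(
        "/performance/test", 10,
        [this](std_msgs::msg::String::ConstSharedPtr /*msg*/,
               const rclcpp::MessageInfo & info) {
          const auto & rmw_info = info.get_rmw_message_info();
          if (rmw_info.source_timestamp != 0 && rmw_info.received_timestamp != 0) {
            // Both timestamps are nanoseconds since the epoch.
            const double latency_ms =
                (rmw_info.received_timestamp - rmw_info.source_timestamp) / 1e6;
            RCLCPP_INFO(this->get_logger(), "RMW latency: %.3f ms", latency_ms);
          }
        });
  }

private:
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};
```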
Latency Optimization Strategies

1. Zero-Copy Message Passing

The example below cuts per-publish allocation cost by reusing a ring of pre-allocated message buffers; true zero-copy transport additionally relies on loaned messages, sketched right after the code block.
```cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/laser_scan.hpp>

#include <algorithm>
#include <chrono>
#include <cmath>
#include <memory>
#include <vector>

class ZeroCopyOptimizer : public rclcpp::Node {
public:
  ZeroCopyOptimizer() : Node("zero_copy_optimizer") {
    // A shallow, best-effort queue keeps the middleware from buffering stale
    // scans, which is usually the right trade-off for sensor data.
    auto qos = rclcpp::QoS(rclcpp::KeepLast(1)).best_effort().durability_volatile();

    laser_publisher_ = this->create_publisher<sensor_msgs::msg::LaserScan>(
        "/optimized/laser", qos);

    laser_subscription_ = this->create_subscription<sensor_msgs::msg::LaserScan>(
        "/optimized/laser", qos,
        std::bind(&ZeroCopyOptimizer::laser_callback, this,
                  std::placeholders::_1));

    allocate_message_buffers();

    timer_ = this->create_wall_timer(
        std::chrono::milliseconds(50),
        std::bind(&ZeroCopyOptimizer::publish_laser_data, this));

    RCLCPP_INFO(this->get_logger(), "Zero copy optimizer started");
  }

private:
  void allocate_message_buffers() {
    // Pre-allocate a ring of messages so the hot path never touches the heap.
    // publish(*msg) still copies; see the loaned-message sketch below for
    // true zero-copy transport.
    for (int i = 0; i < BUFFER_SIZE; ++i) {
      auto msg = std::make_shared<sensor_msgs::msg::LaserScan>();
      msg->ranges.resize(360, 0.0);
      msg->intensities.resize(360, 0.0);
      message_buffer_.push_back(msg);
    }
    current_buffer_index_ = 0;
  }

  void publish_laser_data() {
    auto msg = message_buffer_[current_buffer_index_];
    current_buffer_index_ = (current_buffer_index_ + 1) % BUFFER_SIZE;

    msg->header.stamp = this->now();
    msg->header.frame_id = "laser_frame";
    msg->angle_min = -3.14159;
    msg->angle_max = 3.14159;
    msg->angle_increment = 0.0174533;
    msg->time_increment = 0.0;
    msg->scan_time = 0.05;
    msg->range_min = 0.1;
    msg->range_max = 10.0;

    for (size_t i = 0; i < 360; ++i) {
      msg->ranges[i] = 1.0 + 0.5 * std::sin(i * 0.1);
      msg->intensities[i] = 100.0 + 50.0 * std::cos(i * 0.05);
    }

    laser_publisher_->publish(*msg);
  }

  void laser_callback(const sensor_msgs::msg::LaserScan::SharedPtr msg) {
    auto min_range = *std::min_element(msg->ranges.begin(), msg->ranges.end());
    auto max_range = *std::max_element(msg->ranges.begin(), msg->ranges.end());
    RCLCPP_DEBUG(this->get_logger(),
                 "Laser scan: %zu points, range [%.2f, %.2f]",
                 msg->ranges.size(), min_range, max_range);
  }

  static constexpr int BUFFER_SIZE = 10;

  rclcpp::Publisher<sensor_msgs::msg::LaserScan>::SharedPtr laser_publisher_;
  rclcpp::Subscription<sensor_msgs::msg::LaserScan>::SharedPtr laser_subscription_;
  rclcpp::TimerBase::SharedPtr timer_;

  std::vector<std::shared_ptr<sensor_msgs::msg::LaserScan>> message_buffer_;
  int current_buffer_index_ = 0;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<ZeroCopyOptimizer>());
  rclcpp::shutdown();
  return 0;
}
```
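Buffer reuse removes allocations but publish(*msg) still serializes and copies. Genuinely zero-copy transport in ROS2 goes through loaned messages, which require a middleware and transport that support loaning (for example Fast DDS with data sharing, or Cyclone DDS with an iceoryx shared-memory backend); loaning also works best for fixed-size message types, and variable-length messages such as LaserScan usually fall back to a copy. A minimal, hedged sketch of the rclcpp loaning API, with the topic and scan size carried over from the example above:

```cpp
// Hedged sketch: loaned-message publishing for true zero-copy.
// Requires an RMW/transport that supports loaning; otherwise publishing
// silently falls back to a normal allocation and copy.
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/laser_scan.hpp>

void publish_loaned_scan(
    rclcpp::Publisher<sensor_msgs::msg::LaserScan> & publisher,
    const rclcpp::Time & stamp)
{
  if (!publisher.can_loan_messages()) {
    return;  // middleware does not support loaning; use the buffer-reuse path
  }
  auto loaned = publisher.borrow_loaned_message();  // memory owned by the RMW
  auto & msg = loaned.get();
  msg.header.stamp = stamp;
  msg.header.frame_id = "laser_frame";
  msg.ranges.assign(360, 1.0f);
  // Publishing the loaned message hands the buffer back to the middleware
  // without an extra serialization copy on supporting transports.
  publisher.publish(std::move(loaned));
}
```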
2. Memory Pool Optimization
```cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/laser_scan.hpp>

#include <chrono>
#include <cmath>
#include <memory>
#include <mutex>
#include <queue>

template<typename T>
class MessagePool {
public:
  explicit MessagePool(size_t pool_size = 100) : pool_size_(pool_size) {
    for (size_t i = 0; i < pool_size_; ++i) {
      available_messages_.push(std::make_shared<T>());
    }
  }

  std::shared_ptr<T> acquire() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (available_messages_.empty()) {
      // Pool exhausted: fall back to a regular heap allocation.
      return std::make_shared<T>();
    }
    auto msg = available_messages_.front();
    available_messages_.pop();
    return msg;
  }

  void release(std::shared_ptr<T> msg) {
    if (!msg) {
      return;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    if (available_messages_.size() < pool_size_) {
      reset_message(msg);
      available_messages_.push(msg);
    }
  }

private:
  void reset_message(std::shared_ptr<T> msg) {
    // Default-constructing also clears any dynamic arrays inside the message;
    // skip or narrow this reset if keeping those buffers allocated matters.
    *msg = T{};
  }

  size_t pool_size_;
  std::queue<std::shared_ptr<T>> available_messages_;
  std::mutex mutex_;
};

class MemoryPoolOptimizer : public rclcpp::Node {
public:
  MemoryPoolOptimizer() : Node("memory_pool_optimizer") {
    laser_pool_ =
        std::make_unique<MessagePool<sensor_msgs::msg::LaserScan>>(50);

    laser_publisher_ = this->create_publisher<sensor_msgs::msg::LaserScan>(
        "/pool/laser", 10);

    timer_ = this->create_wall_timer(
        std::chrono::milliseconds(20),
        std::bind(&MemoryPoolOptimizer::publish_with_pool, this));

    RCLCPP_INFO(this->get_logger(), "Memory pool optimizer started");
  }

private:
  void publish_with_pool() {
    auto msg = laser_pool_->acquire();

    msg->header.stamp = this->now();
    msg->header.frame_id = "laser_frame";
    msg->angle_min = -3.14159;
    msg->angle_max = 3.14159;
    msg->angle_increment = 0.0174533;
    msg->range_min = 0.1;
    msg->range_max = 10.0;

    if (msg->ranges.size() != 360) {
      msg->ranges.resize(360);
      msg->intensities.resize(360);
    }
    for (size_t i = 0; i < 360; ++i) {
      msg->ranges[i] = 1.0 + 0.3 * std::sin(i * 0.1);
      msg->intensities[i] = 100.0 + 30.0 * std::cos(i * 0.05);
    }

    laser_publisher_->publish(*msg);

    // publish() takes a copy, so the pooled message can be returned
    // to the pool immediately after publishing.
    laser_pool_->release(msg);
  }

  std::unique_ptr<MessagePool<sensor_msgs::msg::LaserScan>> laser_pool_;
  rclcpp::Publisher<sensor_msgs::msg::LaserScan>::SharedPtr laser_publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<MemoryPoolOptimizer>());
  rclcpp::shutdown();
  return 0;
}
```
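A common refinement is to let RAII return messages to the pool automatically rather than calling release() by hand, so an early return or exception can never leak a slot. A minimal sketch under that assumption; the make_pooled helper is an illustrative addition, not part of the code above, and it requires the pool to outlive every handle it produces:

```cpp
// Hedged sketch: wrap a pooled message in a shared_ptr whose deleter
// returns the slot to the pool instead of freeing it.
#include <memory>

template<typename T>
std::shared_ptr<T> make_pooled(MessagePool<T> & pool) {
  std::shared_ptr<T> slot = pool.acquire();
  // The deleter captures the original shared_ptr, keeping the message alive,
  // and hands it back to the pool when the outer handle is destroyed.
  return std::shared_ptr<T>(slot.get(),
                            [&pool, slot](T *) { pool.release(slot); });
}

// Usage inside a publish callback (laser_pool_ and laser_publisher_ as above):
//   auto msg = make_pooled(*laser_pool_);
//   msg->header.stamp = now();
//   laser_publisher_->publish(*msg);
//   // msg going out of scope returns the slot to the pool automatically.
```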
Throughput Optimization

High-Throughput Message Processing
```cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class HighThroughputProcessor : public rclcpp::Node {
public:
  HighThroughputProcessor() : Node("high_throughput_processor") {
    // A deep history with best-effort reliability favors throughput over
    // guaranteed delivery.
    auto qos = rclcpp::QoS(rclcpp::KeepLast(100)).best_effort().durability_volatile();

    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/throughput/test", qos);

    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "/throughput/test", qos,
        std::bind(&HighThroughputProcessor::message_callback, this,
                  std::placeholders::_1));

    start_worker_threads();

    stats_timer_ = this->create_wall_timer(
        std::chrono::seconds(1),
        std::bind(&HighThroughputProcessor::print_throughput_stats, this));

    RCLCPP_INFO(this->get_logger(), "High throughput processor started");
  }

  ~HighThroughputProcessor() override {
    // Wake up and join all worker threads before the node is destroyed.
    stop_ = true;
    queue_cv_.notify_all();
    for (auto & t : publish_threads_) {
      if (t.joinable()) t.join();
    }
    for (auto & t : process_threads_) {
      if (t.joinable()) t.join();
    }
  }

private:
  void start_worker_threads() {
    for (int i = 0; i < PUBLISHER_THREADS; ++i) {
      publish_threads_.emplace_back(
          std::bind(&HighThroughputProcessor::publisher_worker, this, i));
    }
    for (int i = 0; i < PROCESSOR_THREADS; ++i) {
      process_threads_.emplace_back(
          std::bind(&HighThroughputProcessor::processor_worker, this));
    }
  }

  void publisher_worker(int thread_id) {
    rclcpp::Rate rate(1000);  // 1 kHz per publisher thread
    while (rclcpp::ok() && !stop_) {
      auto msg = std_msgs::msg::String();
      msg.data = "Message from thread " + std::to_string(thread_id) +
                 " #" + std::to_string(publish_count_++);
      publisher_->publish(msg);
      rate.sleep();
    }
  }

  void processor_worker() {
    while (rclcpp::ok() && !stop_) {
      std_msgs::msg::String::SharedPtr msg;
      {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        queue_cv_.wait(lock, [this] {
          return !message_queue_.empty() || stop_ || !rclcpp::ok();
        });
        if (!message_queue_.empty()) {
          msg = message_queue_.front();
          message_queue_.pop();
        }
      }
      if (msg) {
        process_count_++;
      }
    }
  }

  void message_callback(const std_msgs::msg::String::SharedPtr msg) {
    {
      std::lock_guard<std::mutex> lock(queue_mutex_);
      // Drop messages instead of letting the queue grow without bound.
      if (message_queue_.size() < MAX_QUEUE_SIZE) {
        message_queue_.push(msg);
      }
    }
    queue_cv_.notify_one();
  }

  void print_throughput_stats() {
    auto current_publish = publish_count_.load();
    auto current_process = process_count_.load();
    size_t queue_size = 0;
    {
      std::lock_guard<std::mutex> lock(queue_mutex_);
      queue_size = message_queue_.size();
    }
    RCLCPP_INFO(this->get_logger(),
                "Throughput - Published: %zu msg/s, Processed: %zu msg/s, Queue: %zu",
                current_publish - last_publish_count_,
                current_process - last_process_count_, queue_size);
    last_publish_count_ = current_publish;
    last_process_count_ = current_process;
  }

  static constexpr int PUBLISHER_THREADS = 4;
  static constexpr int PROCESSOR_THREADS = 8;
  static constexpr size_t MAX_QUEUE_SIZE = 10000;

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  rclcpp::TimerBase::SharedPtr stats_timer_;

  std::vector<std::thread> publish_threads_;
  std::vector<std::thread> process_threads_;
  std::queue<std_msgs::msg::String::SharedPtr> message_queue_;
  std::mutex queue_mutex_;
  std::condition_variable queue_cv_;
  std::atomic<bool> stop_{false};
  std::atomic<size_t> publish_count_{0};
  std::atomic<size_t> process_count_{0};
  size_t last_publish_count_ = 0;
  size_t last_process_count_ = 0;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<HighThroughputProcessor>());
  rclcpp::shutdown();
  return 0;
}
```
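Hand-rolled worker threads give the most control, but for callback-level parallelism rclcpp's MultiThreadedExecutor combined with a Reentrant callback group often achieves a similar effect with far less code. A minimal sketch under that assumption; the node name is a placeholder and the topic reuses the one above:

```cpp
// Hedged sketch: parallel callback processing without hand-rolled threads.
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

int main(int argc, char * argv[]) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<rclcpp::Node>("parallel_consumer");

  // Callbacks in a Reentrant group may run concurrently on executor threads.
  auto group = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
  rclcpp::SubscriptionOptions options;
  options.callback_group = group;

  auto sub = node->create_subscription<std_msgs::msg::String>(
      "/throughput/test", rclcpp::QoS(100).best_effort(),
      [](std_msgs::msg::String::ConstSharedPtr msg) {
        // CPU-heavy work can run here in parallel across messages.
        (void)msg;
      },
      options);

  // Four executor threads service the reentrant callbacks concurrently.
  rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), 4);
  executor.add_node(node);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}
```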
Resource Management Optimization

CPU Affinity and Priority Settings
```cpp
#include <rclcpp/rclcpp.hpp>

#include <sched.h>

#include <chrono>
#include <cmath>

class ResourceOptimizer : public rclcpp::Node {
public:
  ResourceOptimizer() : Node("resource_optimizer") {
    setup_cpu_affinity();
    setup_thread_priority();

    timer_ = this->create_wall_timer(
        std::chrono::milliseconds(10),
        std::bind(&ResourceOptimizer::high_priority_task, this));

    RCLCPP_INFO(this->get_logger(), "Resource optimizer started");
  }

private:
  void setup_cpu_affinity() {
    // Pin the calling thread (pid 0 = self) to CPU 0 to avoid costly
    // migrations between cores.
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(0, &cpuset);
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) == 0) {
      RCLCPP_INFO(this->get_logger(), "CPU affinity set to CPU 0");
    } else {
      RCLCPP_WARN(this->get_logger(), "Failed to set CPU affinity");
    }
  }

  void setup_thread_priority() {
    // SCHED_FIFO requires root privileges or the CAP_SYS_NICE capability;
    // otherwise the call fails and the node keeps the default scheduler.
    struct sched_param param;
    param.sched_priority = 80;
    if (sched_setscheduler(0, SCHED_FIFO, &param) == 0) {
      RCLCPP_INFO(this->get_logger(), "Thread priority set to SCHED_FIFO");
    } else {
      RCLCPP_WARN(this->get_logger(), "Failed to set thread priority");
    }
  }

  void high_priority_task() {
    auto start = std::chrono::high_resolution_clock::now();
    perform_critical_task();
    auto end = std::chrono::high_resolution_clock::now();
    auto duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    if (duration.count() > 1000) {
      RCLCPP_WARN(this->get_logger(), "Task took %ld microseconds",
                  static_cast<long>(duration.count()));
    }
  }

  void perform_critical_task() {
    // Placeholder workload standing in for a real control-loop computation.
    volatile double result = 0.0;
    for (int i = 0; i < 1000; ++i) {
      result += std::sin(i * 0.01) * std::cos(i * 0.02);
    }
  }

  rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<ResourceOptimizer>());
  rclcpp::shutdown();
  return 0;
}
```
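sched_setaffinity(0, ...) pins only the calling thread; threads created later inherit the mask but can be pinned to different cores individually through their native handles. A minimal sketch of pinning one std::thread on Linux; the core index and thread body are placeholders:

```cpp
// Hedged sketch: pin a single worker thread to one core (Linux / pthreads).
#include <pthread.h>
#include <sched.h>

#include <thread>

void pin_thread_to_core(std::thread & t, int core) {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(core, &cpuset);
  // Returns 0 on success; a non-zero value usually means the core index is
  // out of range or the process lacks permission.
  pthread_setaffinity_np(t.native_handle(), sizeof(cpu_set_t), &cpuset);
}

int main() {
  std::thread worker([] { /* latency-critical loop */ });
  pin_thread_to_core(worker, 1);  // keep core 0 free for the executor
  worker.join();
  return 0;
}
```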
Network Optimization

Network Transport Optimization
```cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <chrono>
#include <string>
#include <vector>

class NetworkOptimizer : public rclcpp::Node {
public:
  NetworkOptimizer() : Node("network_optimizer") {
    // The tuned QoS profile must be passed to the publisher; building a
    // profile and never using it has no effect on the transport.
    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/network/optimized", make_network_qos());

    create_batch_messages();

    timer_ = this->create_wall_timer(
        std::chrono::milliseconds(100),
        std::bind(&NetworkOptimizer::publish_batch, this));

    RCLCPP_INFO(this->get_logger(), "Network optimizer started");
  }

private:
  rclcpp::QoS make_network_qos() {
    auto qos = rclcpp::QoS(rclcpp::KeepLast(10)).reliable().durability_volatile();
    // Lower-level rmw fields are reachable through qos.get_rmw_qos_profile();
    // transport-specific tuning (socket buffers, fragment sizes, etc.) is
    // usually done via the DDS vendor's XML profile instead.
    return qos;
  }

  void create_batch_messages() {
    batch_messages_.reserve(BATCH_SIZE);
    for (int i = 0; i < BATCH_SIZE; ++i) {
      auto msg = std_msgs::msg::String();
      msg.data = "Batch message " + std::to_string(i);
      batch_messages_.push_back(msg);
    }
  }

  void publish_batch() {
    // Publishing prepared messages back-to-back amortizes per-cycle overhead.
    for (const auto & msg : batch_messages_) {
      publisher_->publish(msg);
    }
    RCLCPP_DEBUG(this->get_logger(), "Published batch of %zu messages",
                 batch_messages_.size());
  }

  static constexpr int BATCH_SIZE = 10;

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
  std::vector<std_msgs::msg::String> batch_messages_;
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<NetworkOptimizer>());
  rclcpp::shutdown();
  return 0;
}
```
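When publisher and subscriber run in the same process, the network layer can be bypassed entirely: enabling intra-process communication lets rclcpp hand unique_ptr messages over directly instead of serializing them. A minimal sketch under that assumption; the node names are placeholders and the optimization only applies to same-process endpoints:

```cpp
// Hedged sketch: intra-process transport avoids (de)serialization when the
// endpoints share a process.
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <memory>

int main(int argc, char * argv[]) {
  rclcpp::init(argc, argv);
  auto options = rclcpp::NodeOptions().use_intra_process_comms(true);

  auto producer = std::make_shared<rclcpp::Node>("producer", options);
  auto consumer = std::make_shared<rclcpp::Node>("consumer", options);

  auto sub = consumer->create_subscription<std_msgs::msg::String>(
      "/network/optimized", 10,
      [](std_msgs::msg::String::UniquePtr msg) {
        RCLCPP_INFO(rclcpp::get_logger("consumer"), "got: %s", msg->data.c_str());
      });

  auto pub = producer->create_publisher<std_msgs::msg::String>(
      "/network/optimized", 10);
  // Publishing a unique_ptr allows ownership transfer with no copy when
  // there is exactly one intra-process subscriber.
  auto msg = std::make_unique<std_msgs::msg::String>();
  msg->data = "hello";
  pub->publish(std::move(msg));

  rclcpp::executors::SingleThreadedExecutor exec;
  exec.add_node(producer);
  exec.add_node(consumer);
  exec.spin_some();
  rclcpp::shutdown();
  return 0;
}
```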
Performance Testing and Benchmarking

Comprehensive Performance Tests
```cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <numeric>
#include <string>
#include <thread>
#include <vector>

class ROS2Benchmark : public rclcpp::Node {
public:
  ROS2Benchmark() : Node("ros2_benchmark") {
    setup_test_topics();
  }

  // Called from main() after the node is owned by a shared_ptr, because
  // shared_from_this() is not usable inside the constructor.
  void run_benchmarks() {
    benchmark_latency();
    benchmark_throughput();
    benchmark_memory_usage();
  }

private:
  void setup_test_topics() {
    publisher_ = this->create_publisher<std_msgs::msg::String>(
        "/benchmark/test", 10);
    subscription_ = this->create_subscription<std_msgs::msg::String>(
        "/benchmark/test", 10,
        std::bind(&ROS2Benchmark::benchmark_callback, this,
                  std::placeholders::_1));
  }

  void benchmark_latency() {
    const int num_messages = 1000;
    std::vector<double> latencies;
    latencies.reserve(num_messages);

    // Give the local publisher/subscription a moment to match before timing.
    rclcpp::sleep_for(std::chrono::milliseconds(100));

    for (int i = 0; i < num_messages; ++i) {
      auto start = std::chrono::high_resolution_clock::now();

      auto msg = std_msgs::msg::String();
      msg.data = "Latency test " + std::to_string(i);
      publisher_->publish(msg);

      // Spin until the echo of this message has been received.
      while (received_count_ <= i) {
        rclcpp::spin_some(this->shared_from_this());
        std::this_thread::sleep_for(std::chrono::microseconds(10));
      }

      auto end = std::chrono::high_resolution_clock::now();
      latencies.push_back(
          std::chrono::duration<double, std::milli>(end - start).count());
    }

    double avg_latency =
        std::accumulate(latencies.begin(), latencies.end(), 0.0) /
        latencies.size();
    double max_latency = *std::max_element(latencies.begin(), latencies.end());
    double min_latency = *std::min_element(latencies.begin(), latencies.end());

    RCLCPP_INFO(this->get_logger(),
                "Latency Benchmark - Avg: %.2f ms, Min: %.2f ms, Max: %.2f ms",
                avg_latency, min_latency, max_latency);
  }

  void benchmark_throughput() {
    const int num_messages = 10000;
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < num_messages; ++i) {
      auto msg = std_msgs::msg::String();
      msg.data = "Throughput test " + std::to_string(i);
      publisher_->publish(msg);
    }
    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration<double>(end - start).count();
    double throughput = num_messages / duration;
    RCLCPP_INFO(this->get_logger(),
                "Throughput Benchmark - %d messages in %.2f s, %.0f msg/s",
                num_messages, duration, throughput);
  }

  void benchmark_memory_usage() {
    const int num_allocations = 1000;
    auto start = std::chrono::high_resolution_clock::now();
    std::vector<std_msgs::msg::String::SharedPtr> messages;
    messages.reserve(num_allocations);
    for (int i = 0; i < num_allocations; ++i) {
      auto msg = std::make_shared<std_msgs::msg::String>();
      msg->data = "Memory test " + std::to_string(i);
      messages.push_back(msg);
    }
    auto end = std::chrono::high_resolution_clock::now();
    auto duration =
        std::chrono::duration<double, std::milli>(end - start).count();
    RCLCPP_INFO(this->get_logger(),
                "Memory Benchmark - %d allocations in %.2f ms, %.2f allocs/ms",
                num_allocations, duration, num_allocations / duration);
  }

  void benchmark_callback(const std_msgs::msg::String::SharedPtr /*msg*/) {
    received_count_++;
  }

  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
  std::atomic<int> received_count_{0};
};

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<ROS2Benchmark>();
  node->run_benchmarks();
  rclcpp::shutdown();
  return 0;
}
```
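For repeatable microbenchmarks outside a running ROS graph, the Google Benchmark library handles warm-up, iteration counts, and statistics automatically. A minimal sketch, assuming the target links against benchmark::benchmark; the benchmark name and payload sizes are illustrative:

```cpp
// Hedged sketch: Google Benchmark microbenchmark for message allocation cost.
// Build with: target_link_libraries(<target> benchmark::benchmark)
#include <benchmark/benchmark.h>
#include <std_msgs/msg/string.hpp>

#include <memory>
#include <string>

static void BM_StringMessageAllocation(benchmark::State & state) {
  const std::string payload(state.range(0), 'x');
  for (auto _ : state) {
    auto msg = std::make_shared<std_msgs::msg::String>();
    msg->data = payload;
    benchmark::DoNotOptimize(msg);  // keep the allocation from being elided
  }
  state.SetItemsProcessed(state.iterations());
}
// Sweep payload sizes from 64 B to 64 KiB.
BENCHMARK(BM_StringMessageAllocation)->Range(64, 64 << 10);

BENCHMARK_MAIN();
```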
Summary

Key strategies for ROS2 performance optimization:

- Latency optimization: zero-copy message passing, memory pools, QoS configuration
- Throughput optimization: multi-threaded processing, batch operations, queue management
- Resource management: CPU affinity, thread priorities, memory optimization
- Network optimization: transport parameter tuning, batched transmission
- Monitoring and testing: performance monitoring, benchmarking

Applying these strategies can significantly improve the performance of ROS2 applications and help them meet the requirements of real-time robotic systems.