FastDDS性能调优实战 - 延迟优化与吞吐量提升
前言
FastDDS的性能调优是构建高性能分布式系统的关键。本文将深入分析FastDDS的性能瓶颈,并提供具体的调优策略和实现方案,包括延迟优化、吞吐量提升、内存管理等方面。
FastDDS性能分析
性能监控框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| #include <fastdds/dds/domain/DomainParticipant.hpp> #include <fastdds/dds/publisher/Publisher.hpp> #include <fastdds/dds/subscriber/Subscriber.hpp> #include <chrono> #include <thread> #include <vector>
class FastDDSPerformanceMonitor { public: FastDDSPerformanceMonitor() { participant_ = factory_->create_participant( DOMAIN_ID, eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT); setup_performance_monitoring(); } ~FastDDSPerformanceMonitor() { if (participant_) { factory_->delete_participant(participant_); } }
private: void setup_performance_monitoring() { eprosima::fastdds::dds::DomainParticipantQos qos; qos.properties().properties().emplace_back( "dds.domain_participant.rtps.builtin.enable_statistics_collection", "true"); qos.properties().properties().emplace_back( "dds.domain_participant.rtps.builtin.statistics_collection_period", "100"); participant_->set_qos(qos); std::cout << "Performance monitoring enabled" << std::endl; } void collect_statistics() { auto start_time = std::chrono::high_resolution_clock::now(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); auto end_time = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::microseconds>( end_time - start_time).count(); std::cout << "Statistics collection took: " << duration << " microseconds" << std::endl; } static constexpr uint32_t DOMAIN_ID = 0; eprosima::fastdds::dds::DomainParticipantFactory* factory_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance(); eprosima::fastdds::dds::DomainParticipant* participant_; };
|
延迟优化策略
1. 零拷贝优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include <fastdds/dds/publisher/DataWriter.hpp> #include <fastdds/dds/subscriber/DataReader.hpp>
template<typename T> class ZeroCopyFastDDS { public: ZeroCopyFastDDS(eprosima::fastdds::dds::DomainParticipant* participant, const std::string& topic_name) : participant_(participant), topic_name_(topic_name) { setup_zero_copy_optimization(); } void setup_zero_copy_optimization() { eprosima::fastdds::dds::DataWriterQos writer_qos; writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; writer_qos.history().depth = 1; writer_qos.resource_limits().max_samples = 1; writer_qos.resource_limits().max_instances = 1; writer_qos.resource_limits().max_samples_per_instance = 1; writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; writer_qos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; std::cout << "Zero copy QoS configured for topic: " << topic_name_ << std::endl; } bool publish_zero_copy(const T& data) { auto start = std::chrono::high_resolution_clock::now(); bool result = writer_->write(&data) == eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK; auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); if (duration > 1000) { std::cout << "Publish latency: " << duration << " nanoseconds" << std::endl; } return result; }
private: eprosima::fastdds::dds::DomainParticipant* participant_; std::string topic_name_; eprosima::fastdds::dds::DataWriter* writer_; };
|
2. 内存池优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| #include <memory> #include <vector> #include <mutex> #include <queue>
template<typename T>
class FastDDSMemoryPool
{
public:
    /// Pre-allocates @p pool_size default-constructed objects.
    /// `explicit` prevents accidental implicit conversion from an integer.
    explicit FastDDSMemoryPool(size_t pool_size = 100)
        : pool_size_(pool_size)
    {
        for (size_t i = 0; i < pool_size_; ++i)
        {
            available_objects_.push(std::make_shared<T>());
        }
        std::cout << "Memory pool created with " << pool_size_ << " objects" << std::endl;
    }

    /// Hands out a pooled object; falls back to a fresh heap allocation
    /// when the pool is exhausted, so callers never block or fail.
    std::shared_ptr<T> acquire()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_objects_.empty())
        {
            return std::make_shared<T>();
        }
        // Move out of the queue instead of copying the shared_ptr
        // (each copy is an atomic refcount round-trip).
        auto obj = std::move(available_objects_.front());
        available_objects_.pop();
        return obj;
    }

    /// Returns @p obj to the pool after resetting it to a default state.
    /// Objects beyond pool_size_ (overflow allocations made by acquire())
    /// are simply dropped and freed by shared_ptr.
    void release(std::shared_ptr<T> obj)
    {
        if (!obj)
        {
            return;
        }
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_objects_.size() < pool_size_)
        {
            reset_object(obj);
            available_objects_.push(std::move(obj));
        }
    }

    /// @return number of pool slots currently checked out (0..pool_size_).
    size_t get_pool_usage() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return pool_size_ - available_objects_.size();
    }

private:
    /// Resets a recycled object so that no stale data leaks between users.
    /// Takes the pointer by const reference — the original passed the
    /// shared_ptr by value, paying a needless refcount increment/decrement.
    void reset_object(const std::shared_ptr<T>& obj)
    {
        *obj = T{};
    }

    size_t pool_size_;                                // fixed capacity of the pool
    std::queue<std::shared_ptr<T>> available_objects_; // idle objects, FIFO
    mutable std::mutex mutex_;                        // guards the queue
};
class MemoryPoolOptimizedPublisher { public: MemoryPoolOptimizedPublisher(eprosima::fastdds::dds::DomainParticipant* participant) : participant_(participant) { memory_pool_ = std::make_unique<FastDDSMemoryPool<std_msgs::msg::String>>(50); setup_optimized_publisher(); } void publish_with_pool() { auto msg = memory_pool_->acquire(); msg->data = "Pool optimized message " + std::to_string(message_count_++); bool success = writer_->write(msg.get()); if (success) { auto release_timer = std::make_shared<std::thread>([this, msg]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); memory_pool_->release(msg); }); release_timer->detach(); } if (message_count_ % 1000 == 0) { std::cout << "Pool usage: " << memory_pool_->get_pool_usage() << "/50 objects" << std::endl; } }
private: void setup_optimized_publisher() { type_support_ = new eprosima::fastdds::dds::TypeSupport( new std_msgs::msg::StringPubSubType()); participant_->register_type(type_support_); topic_ = participant_->create_topic( "MemoryPoolTopic", type_support_->get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); publisher_ = participant_->create_publisher( eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); eprosima::fastdds::dds::DataWriterQos writer_qos; writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; writer_qos.history().depth = 1; writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; writer_ = publisher_->create_datawriter(topic_, writer_qos); } eprosima::fastdds::dds::DomainParticipant* participant_; std::unique_ptr<FastDDSMemoryPool<std_msgs::msg::String>> memory_pool_; eprosima::fastdds::dds::TypeSupport* type_support_; eprosima::fastdds::dds::Topic* topic_; eprosima::fastdds::dds::Publisher* publisher_; eprosima::fastdds::dds::DataWriter* writer_; int message_count_ = 0; };
|
吞吐量优化
高吞吐量配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| class HighThroughputFastDDS { public: HighThroughputFastDDS(eprosima::fastdds::dds::DomainParticipant* participant) : participant_(participant) { setup_high_throughput_configuration(); } void setup_high_throughput_configuration() { eprosima::fastdds::dds::DataWriterQos writer_qos; writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; writer_qos.history().depth = 100; writer_qos.resource_limits().max_samples = 1000; writer_qos.resource_limits().max_instances = 100; writer_qos.resource_limits().max_samples_per_instance = 100; writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; writer_qos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; writer_qos.transport_priority().value = 100; std::cout << "High throughput QoS configured" << std::endl; } void setup_batch_publishing() { eprosima::fastdds::dds::DataWriterQos batch_qos; batch_qos.batch().enable = true; batch_qos.batch().max_data_bytes = 65536; batch_qos.batch().max_samples = 100; std::cout << "Batch publishing configured" << std::endl; } void publish_batch_messages(const std::vector<std::string>& messages) { auto start = std::chrono::high_resolution_clock::now(); for (const auto& msg_data : messages) { std_msgs::msg::String msg; msg.data = msg_data; writer_->write(&msg); } auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); double throughput = (messages.size() * 1000000.0) / duration; std::cout << "Batch published " << messages.size() << " messages in " << duration << " microseconds, throughput: " << throughput << " msg/s" << std::endl; }
private: eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::DataWriter* writer_; };
|
多线程优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| #include <thread> #include <atomic> #include <vector>
class MultiThreadedFastDDS { public: MultiThreadedFastDDS(eprosima::fastdds::dds::DomainParticipant* participant) : participant_(participant), running_(true) { setup_multithreaded_publishing(); } ~MultiThreadedFastDDS() { stop_publishing(); } void setup_multithreaded_publishing() { for (int i = 0; i < PUBLISHER_THREADS; ++i) { publisher_threads_.emplace_back( std::bind(&MultiThreadedFastDDS::publisher_worker, this, i)); } stats_thread_ = std::thread( std::bind(&MultiThreadedFastDDS::statistics_worker, this)); std::cout << "Started " << PUBLISHER_THREADS << " publisher threads" << std::endl; } void publisher_worker(int thread_id) { std::cout << "Publisher thread " << thread_id << " started" << std::endl; int message_count = 0; auto last_time = std::chrono::high_resolution_clock::now(); while (running_) { std_msgs::msg::String msg; msg.data = "Thread " + std::to_string(thread_id) + " message " + std::to_string(message_count++); writer_->write(&msg); total_messages_++; auto current_time = std::chrono::high_resolution_clock::now(); auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( current_time - last_time).count(); if (elapsed >= 1) { thread_stats_[thread_id] = message_count; last_time = current_time; } std::this_thread::sleep_for(std::chrono::microseconds(100)); } std::cout << "Publisher thread " << thread_id << " stopped" << std::endl; } void statistics_worker() { std::cout << "Statistics thread started" << std::endl; while (running_) { std::this_thread::sleep_for(std::chrono::seconds(5)); auto current_total = total_messages_.load(); auto current_time = std::chrono::high_resolution_clock::now(); if (last_stats_time_ != std::chrono::high_resolution_clock::time_point{}) { auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( current_time - last_stats_time_).count(); if (elapsed > 0) { double throughput = (current_total - last_total_messages_) / elapsed; std::cout << "Throughput: " << throughput << " msg/s, " << "Total: " << current_total << " 
messages" << std::endl; for (int i = 0; i < PUBLISHER_THREADS; ++i) { std::cout << " Thread " << i << ": " << thread_stats_[i] << " msg/s" << std::endl; } } } last_total_messages_ = current_total; last_stats_time_ = current_time; } std::cout << "Statistics thread stopped" << std::endl; } void stop_publishing() { running_ = false; for (auto& thread : publisher_threads_) { if (thread.joinable()) { thread.join(); } } if (stats_thread_.joinable()) { stats_thread_.join(); } std::cout << "All publisher threads stopped" << std::endl; }
private: static constexpr int PUBLISHER_THREADS = 4; eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::DataWriter* writer_; std::atomic<bool> running_; std::atomic<uint64_t> total_messages_{0}; std::vector<std::thread> publisher_threads_; std::thread stats_thread_; std::array<int, PUBLISHER_THREADS> thread_stats_{}; uint64_t last_total_messages_ = 0; std::chrono::high_resolution_clock::time_point last_stats_time_; };
|
传输层优化
UDP传输优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| #include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
class TransportOptimizer { public: TransportOptimizer() { setup_udp_optimization(); setup_shared_memory_optimization(); } void setup_udp_optimization() { auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>(); udp_transport->sendBufferSize = 65536; udp_transport->receiveBufferSize = 65536; udp_transport->multicast_outbound_interface = "0.0.0.0"; udp_transport->interfaceWhiteList.push_back("127.0.0.1"); udp_transport->interfaceWhiteList.push_back("192.168.1.0/24"); std::cout << "UDP transport optimized" << std::endl; } void setup_shared_memory_optimization() { auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>(); shm_transport->segment_size(1024 * 1024); shm_transport->port_queue_capacity(512); shm_transport->segment_cleaning_delay_ms(100); std::cout << "Shared memory transport optimized" << std::endl; } eprosima::fastdds::dds::DomainParticipantQos create_optimized_participant_qos() { eprosima::fastdds::dds::DomainParticipantQos qos; auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>(); udp_transport->sendBufferSize = 65536; udp_transport->receiveBufferSize = 65536; qos.transport().user_transports.push_back(udp_transport); qos.wire_protocol().builtin.discovery_config.discoveryProtocol = eprosima::fastdds::rtps::DiscoveryProtocol::SIMPLE; qos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::Duration_t(30, 0); qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(3, 0); std::cout << "Optimized participant QoS created" << std::endl; return qos; } };
|
性能基准测试
综合性能测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
| #include <benchmark/benchmark.h>
class FastDDSPerformanceBenchmark { public: FastDDSPerformanceBenchmark() { setup_benchmark_environment(); } void setup_benchmark_environment() { auto qos = transport_optimizer_.create_optimized_participant_qos(); participant_ = factory_->create_participant(DOMAIN_ID, qos); setup_benchmark_topics(); } void setup_benchmark_topics() { setup_latency_benchmark(); setup_throughput_benchmark(); setup_memory_benchmark(); } void benchmark_latency() { const int num_samples = 10000; std::vector<double> latencies; latencies.reserve(num_samples); for (int i = 0; i < num_samples; ++i) { auto start = std::chrono::high_resolution_clock::now(); std_msgs::msg::String msg; msg.data = "Latency test " + std::to_string(i); latency_writer_->write(&msg); while (latency_received_count_ <= i) { std::this_thread::sleep_for(std::chrono::microseconds(1)); } auto end = std::chrono::high_resolution_clock::now(); auto latency = std::chrono::duration<double, std::micro>(end - start).count(); latencies.push_back(latency); } std::sort(latencies.begin(), latencies.end()); double min_latency = latencies.front(); double max_latency = latencies.back(); double median_latency = latencies[latencies.size() / 2]; double p95_latency = latencies[static_cast<size_t>(latencies.size() * 0.95)]; double p99_latency = latencies[static_cast<size_t>(latencies.size() * 0.99)]; std::cout << "Latency Benchmark Results:" << std::endl; std::cout << " Min: " << min_latency << " μs" << std::endl; std::cout << " Max: " << max_latency << " μs" << std::endl; std::cout << " Median: " << median_latency << " μs" << std::endl; std::cout << " P95: " << p95_latency << " μs" << std::endl; std::cout << " P99: " << p99_latency << " μs" << std::endl; } void benchmark_throughput() { const int num_messages = 100000; const auto test_duration = std::chrono::seconds(10); auto start_time = std::chrono::high_resolution_clock::now(); auto end_time = start_time + test_duration; int message_count = 0; while 
(std::chrono::high_resolution_clock::now() < end_time) { std_msgs::msg::String msg; msg.data = "Throughput test " + std::to_string(message_count++); throughput_writer_->write(&msg); } auto actual_end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration<double>(actual_end - start_time).count(); double throughput = message_count / duration; std::cout << "Throughput Benchmark Results:" << std::endl; std::cout << " Messages: " << message_count << std::endl; std::cout << " Duration: " << duration << " seconds" << std::endl; std::cout << " Throughput: " << throughput << " msg/s" << std::endl; } void benchmark_memory_usage() { const int num_allocations = 10000; auto start = std::chrono::high_resolution_clock::now(); std::vector<std_msgs::msg::String> messages; messages.reserve(num_allocations); for (int i = 0; i < num_allocations; ++i) { std_msgs::msg::String msg; msg.data = "Memory test " + std::to_string(i); messages.push_back(msg); } auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration<double, std::milli>(end - start).count(); std::cout << "Memory Benchmark Results:" << std::endl; std::cout << " Allocations: " << num_allocations << std::endl; std::cout << " Duration: " << duration << " ms" << std::endl; std::cout << " Rate: " << num_allocations / duration << " allocs/ms" << std::endl; } void run_all_benchmarks() { std::cout << "Starting FastDDS Performance Benchmarks..." << std::endl; benchmark_latency(); benchmark_throughput(); benchmark_memory_usage(); std::cout << "All benchmarks completed!" << std::endl; }
private: void setup_latency_benchmark() { type_support_ = new eprosima::fastdds::dds::TypeSupport( new std_msgs::msg::StringPubSubType()); participant_->register_type(type_support_); topic_ = participant_->create_topic( "LatencyTopic", type_support_->get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); publisher_ = participant_->create_publisher( eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); eprosima::fastdds::dds::DataWriterQos writer_qos; writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; writer_qos.history().depth = 1; latency_writer_ = publisher_->create_datawriter(topic_, writer_qos); } void setup_throughput_benchmark() { eprosima::fastdds::dds::DataWriterQos throughput_qos; throughput_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; throughput_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; throughput_qos.history().depth = 100; throughput_writer_ = publisher_->create_datawriter(topic_, throughput_qos); } void setup_memory_benchmark() { } static constexpr uint32_t DOMAIN_ID = 1; TransportOptimizer transport_optimizer_; eprosima::fastdds::dds::DomainParticipantFactory* factory_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance(); eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::TypeSupport* type_support_; eprosima::fastdds::dds::Topic* topic_; eprosima::fastdds::dds::Publisher* publisher_; eprosima::fastdds::dds::DataWriter* latency_writer_; eprosima::fastdds::dds::DataWriter* throughput_writer_; std::atomic<int> latency_received_count_{0}; };
int main() { FastDDSPerformanceBenchmark benchmark; benchmark.run_all_benchmarks(); return 0; }
|
总结
FastDDS性能调优的关键策略:
- 延迟优化:零拷贝消息传递、内存池、最佳QoS配置
- 吞吐量优化:批量发布、多线程、高吞吐量QoS
- 传输优化:UDP参数调优、共享内存、网络接口配置
- 内存管理:内存池、对象复用、资源限制
- 监控和测试:性能监控、基准测试、统计分析
通过实施这些优化策略,可以显著提升FastDDS应用程序的性能,满足高实时性和高吞吐量的应用需求。