FastDDS Performance Tuning in Practice - Latency Optimization and Throughput Improvement

Introduction

Performance tuning of Fast DDS is key to building high-performance distributed systems. This article analyzes the typical performance bottlenecks of Fast DDS and presents concrete tuning strategies and implementation approaches, covering latency optimization, throughput improvement, and memory management.

FastDDS Performance Analysis

Performance Monitoring Framework

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

class FastDDSPerformanceMonitor {
public:
    FastDDSPerformanceMonitor() {
        // Build the monitoring QoS first: statistics-related properties must be
        // passed in at participant creation time; setting them afterwards
        // generally has no effect.
        auto qos = build_monitoring_qos();

        // Create the domain participant
        participant_ = factory_->create_participant(DOMAIN_ID, qos);
    }

    ~FastDDSPerformanceMonitor() {
        if (participant_) {
            factory_->delete_participant(participant_);
        }
    }

private:
    eprosima::fastdds::dds::DomainParticipantQos build_monitoring_qos() {
        // Configure performance-monitoring parameters
        eprosima::fastdds::dds::DomainParticipantQos qos =
            eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT;

        // Enable the statistics DataWriters through the "fastdds.statistics" property
        // (requires Fast DDS built with FASTDDS_STATISTICS enabled)
        qos.properties().properties().emplace_back(
            "fastdds.statistics",
            "HISTORY_LATENCY_TOPIC;PUBLICATION_THROUGHPUT_TOPIC;SUBSCRIPTION_THROUGHPUT_TOPIC");

        std::cout << "Performance monitoring enabled" << std::endl;
        return qos;
    }

    void collect_statistics() {
        // Measure how long one statistics collection pass takes
        auto start_time = std::chrono::high_resolution_clock::now();

        // Simulated data collection
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        auto end_time = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
            end_time - start_time).count();

        std::cout << "Statistics collection took: " << duration << " microseconds" << std::endl;
    }

    static constexpr uint32_t DOMAIN_ID = 0;

    eprosima::fastdds::dds::DomainParticipantFactory* factory_ =
        eprosima::fastdds::dds::DomainParticipantFactory::get_instance();
    eprosima::fastdds::dds::DomainParticipant* participant_ = nullptr;
};
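
For finer-grained control, recent Fast DDS 2.x releases also expose the statistics module programmatically. The following is a minimal sketch, assuming the library was built with FASTDDS_STATISTICS enabled; the exact headers and topic-name constants may differ between versions:

#include <fastdds/statistics/dds/domain/DomainParticipant.hpp>
#include <fastdds/statistics/topic_names.hpp>

// Narrow the regular participant to the statistics extension and enable a
// specific statistics DataWriter (here: history-to-history latency).
void enable_latency_statistics(eprosima::fastdds::dds::DomainParticipant* participant)
{
    auto* stats_participant =
        eprosima::fastdds::statistics::dds::DomainParticipant::narrow(participant);
    if (stats_participant != nullptr) {
        stats_participant->enable_statistics_datawriter(
            eprosima::fastdds::statistics::HISTORY_LATENCY_TOPIC,
            eprosima::fastdds::statistics::dds::STATISTICS_DATAWRITER_QOS);
    }
}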

Latency Optimization Strategies

1. Zero-Copy Optimization

#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <chrono>
#include <iostream>
#include <string>

template<typename T>
class ZeroCopyFastDDS {
public:
    ZeroCopyFastDDS(eprosima::fastdds::dds::DomainParticipant* participant,
                    const std::string& topic_name)
        : participant_(participant), topic_name_(topic_name) {

        setup_zero_copy_optimization();
    }

    void setup_zero_copy_optimization() {
        // Configure a low-latency QoS profile. Note: true zero-copy in Fast DDS
        // additionally requires data sharing plus loan_sample() (see the sketch below).
        eprosima::fastdds::dds::DataWriterQos writer_qos;

        // KEEP_LAST history with depth 1 to minimize memory usage
        writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
        writer_qos.history().depth = 1;

        // Tight resource limits
        writer_qos.resource_limits().max_samples = 1;
        writer_qos.resource_limits().max_instances = 1;
        writer_qos.resource_limits().max_samples_per_instance = 1;

        // BEST_EFFORT reliability to avoid retransmission latency
        writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS;

        // VOLATILE durability
        writer_qos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS;

        // The DataWriter itself must be created from this QoS by the caller
        // (a Topic and a Publisher are required; omitted here).
        writer_qos_ = writer_qos;

        std::cout << "Zero copy QoS configured for topic: " << topic_name_ << std::endl;
    }

    bool publish_zero_copy(T& data) { // write() takes a non-const pointer
        auto start = std::chrono::high_resolution_clock::now();

        // Write the sample directly, avoiding extra copies
        bool result = writer_->write(&data) ==
            eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK;

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();

        if (duration > 1000) { // longer than 1 microsecond
            std::cout << "Publish latency: " << duration << " nanoseconds" << std::endl;
        }

        return result;
    }

private:
    eprosima::fastdds::dds::DomainParticipant* participant_;
    std::string topic_name_;
    eprosima::fastdds::dds::DataWriterQos writer_qos_;
    eprosima::fastdds::dds::DataWriter* writer_ = nullptr;
};
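
The QoS above mainly removes queuing and retransmission latency. True zero-copy publication in Fast DDS relies on shared-memory data sharing combined with sample loans. Below is a minimal sketch, assuming a plain, fixed-size IDL type (here called MyPlainType, a hypothetical name) and a Fast DDS version with data-sharing support:

// Minimal sketch: loan-based zero-copy publication with data sharing.
// MyPlainType is a hypothetical fixed-size type generated from IDL.
void publish_with_loan(eprosima::fastdds::dds::DataWriter* writer)
{
    void* sample = nullptr;
    // Borrow a sample that lives directly in the writer's (shared-memory) history
    if (writer->loan_sample(sample) ==
            eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK) {
        auto* typed_sample = static_cast<MyPlainType*>(sample);
        typed_sample->index(42);   // fill the sample in place
        writer->write(sample);     // publish without copying the sample again
    }
}

// The DataWriter passed in above should be created with data sharing enabled, e.g.:
//   eprosima::fastdds::dds::DataWriterQos qos;
//   qos.data_sharing().automatic();  // allow shared-memory data sharing
//   auto* writer = publisher->create_datawriter(topic, qos);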

2. Memory Pool Optimization

#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

template<typename T>
class FastDDSMemoryPool {
public:
    FastDDSMemoryPool(size_t pool_size = 100) : pool_size_(pool_size) {
        // Pre-allocate the pool
        for (size_t i = 0; i < pool_size_; ++i) {
            available_objects_.push(std::make_shared<T>());
        }

        std::cout << "Memory pool created with " << pool_size_ << " objects" << std::endl;
    }

    std::shared_ptr<T> acquire() {
        std::lock_guard<std::mutex> lock(mutex_);

        if (available_objects_.empty()) {
            // Pool exhausted: fall back to a fresh allocation
            return std::make_shared<T>();
        }

        auto obj = available_objects_.front();
        available_objects_.pop();
        return obj;
    }

    void release(std::shared_ptr<T> obj) {
        if (!obj) return;

        std::lock_guard<std::mutex> lock(mutex_);

        if (available_objects_.size() < pool_size_) {
            // Reset the object before returning it to the pool
            reset_object(obj);
            available_objects_.push(obj);
        }
        // If the pool is already full, simply drop the object
    }

    size_t get_pool_usage() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return pool_size_ - available_objects_.size();
    }

private:
    void reset_object(std::shared_ptr<T> obj) {
        // Reset the object to its default state
        *obj = T{};
    }

    size_t pool_size_;
    std::queue<std::shared_ptr<T>> available_objects_;
    mutable std::mutex mutex_;
};
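
A quick standalone usage example of the pool (independent of DDS; DemoMessage is a hypothetical type used only for illustration):

#include <string>

// Hypothetical message type, only for demonstration
struct DemoMessage {
    std::string data;
};

int main() {
    FastDDSMemoryPool<DemoMessage> pool(8);

    auto msg = pool.acquire();   // take an object from the pool (allocates if empty)
    msg->data = "hello";
    pool.release(msg);           // return it; the object is reset and reused later

    std::cout << "Objects in use: " << pool.get_pool_usage() << std::endl;
    return 0;
}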

class MemoryPoolOptimizedPublisher {
public:
    MemoryPoolOptimizedPublisher(eprosima::fastdds::dds::DomainParticipant* participant)
        : participant_(participant) {

        // Create the memory pool
        memory_pool_ = std::make_unique<FastDDSMemoryPool<std_msgs::msg::String>>(50);

        setup_optimized_publisher();
    }

    void publish_with_pool() {
        // Take a message object from the pool
        auto msg = memory_pool_->acquire();

        // Fill in the payload
        msg->data = "Pool optimized message " + std::to_string(message_count_++);

        // Publish the message (write() returns ReturnCode_t, compare against RETCODE_OK)
        bool success = (writer_->write(msg.get()) ==
            eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK);

        if (success) {
            // Return the object to the pool after a delay. Spawning one thread per
            // message is only for demonstration; in production use a listener
            // callback or a timer instead.
            auto release_timer = std::make_shared<std::thread>([this, msg]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                memory_pool_->release(msg);
            });
            release_timer->detach();
        }

        // Periodically report pool usage
        if (message_count_ % 1000 == 0) {
            std::cout << "Pool usage: " << memory_pool_->get_pool_usage()
                      << "/50 objects" << std::endl;
        }
    }

private:
    void setup_optimized_publisher() {
        // Register the type (std_msgs::msg::String and its PubSubType are assumed
        // to have been generated from IDL)
        type_support_ = eprosima::fastdds::dds::TypeSupport(
            new std_msgs::msg::StringPubSubType());
        type_support_.register_type(participant_);

        // Create the topic
        topic_ = participant_->create_topic(
            "MemoryPoolTopic",
            type_support_.get_type_name(),
            eprosima::fastdds::dds::TOPIC_QOS_DEFAULT);

        // Create the publisher
        publisher_ = participant_->create_publisher(
            eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT);

        // Create the optimized DataWriter
        eprosima::fastdds::dds::DataWriterQos writer_qos;
        writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
        writer_qos.history().depth = 1;
        writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS;

        writer_ = publisher_->create_datawriter(topic_, writer_qos);
    }

    eprosima::fastdds::dds::DomainParticipant* participant_;
    std::unique_ptr<FastDDSMemoryPool<std_msgs::msg::String>> memory_pool_;

    eprosima::fastdds::dds::TypeSupport type_support_;
    eprosima::fastdds::dds::Topic* topic_ = nullptr;
    eprosima::fastdds::dds::Publisher* publisher_ = nullptr;
    eprosima::fastdds::dds::DataWriter* writer_ = nullptr;

    int message_count_ = 0;
};

Throughput Optimization

High-Throughput Configuration

class HighThroughputFastDDS {
public:
    HighThroughputFastDDS(eprosima::fastdds::dds::DomainParticipant* participant)
        : participant_(participant) {

        setup_high_throughput_configuration();
    }

    void setup_high_throughput_configuration() {
        // Configure a high-throughput QoS profile
        eprosima::fastdds::dds::DataWriterQos writer_qos;

        // KEEP_LAST history with a larger depth
        writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
        writer_qos.history().depth = 100;

        // Resource limits
        writer_qos.resource_limits().max_samples = 1000;
        writer_qos.resource_limits().max_instances = 100;
        writer_qos.resource_limits().max_samples_per_instance = 100;

        // BEST_EFFORT reliability to maximize throughput
        writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS;

        // VOLATILE durability
        writer_qos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS;

        // Transport priority hint (send/receive buffer sizes are configured on the
        // transport descriptors, not here)
        writer_qos.transport_priority().value = 100;

        std::cout << "High throughput QoS configured" << std::endl;
    }

    void setup_batch_publishing() {
        // Fast DDS's DataWriterQos has no batch() policy (that API belongs to other
        // DDS implementations). Throughput shaping in Fast DDS is typically done with
        // asynchronous publishing plus a flow controller (see the sketch below).
        eprosima::fastdds::dds::DataWriterQos batch_qos;
        batch_qos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;

        std::cout << "Asynchronous publishing configured" << std::endl;
    }

    void publish_batch_messages(const std::vector<std::string>& messages) {
        auto start = std::chrono::high_resolution_clock::now();

        for (const auto& msg_data : messages) {
            std_msgs::msg::String msg;
            msg.data = msg_data;
            writer_->write(&msg);
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();

        double throughput = (messages.size() * 1000000.0) / duration; // messages per second

        std::cout << "Batch published " << messages.size() << " messages in "
                  << duration << " microseconds, throughput: "
                  << throughput << " msg/s" << std::endl;
    }

private:
    eprosima::fastdds::dds::DomainParticipant* participant_;
    // writer_ must be created from the QoS above (Topic and Publisher required; omitted here)
    eprosima::fastdds::dds::DataWriter* writer_ = nullptr;
};
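
As mentioned in setup_batch_publishing(), throughput shaping in Fast DDS is usually done with a flow controller rather than a batch QoS. The sketch below assumes Fast DDS 2.4 or newer; the controller name and the numeric limits are illustrative only:

#include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>
#include <memory>

// Register a named flow controller on the participant QoS and bind a writer to it
void configure_flow_controlled_writer(
    eprosima::fastdds::dds::DomainParticipantQos& participant_qos,
    eprosima::fastdds::dds::DataWriterQos& writer_qos)
{
    auto flow_controller =
        std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
    flow_controller->name = "throughput_controller";     // arbitrary name
    flow_controller->scheduler = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO;
    flow_controller->max_bytes_per_period = 512 * 1024;  // at most 512 KB per period
    flow_controller->period_ms = 10;                     // 10 ms period
    participant_qos.flow_controllers().push_back(flow_controller);

    // The writer must publish asynchronously through the named controller
    writer_qos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
    writer_qos.publish_mode().flow_controller_name = "throughput_controller";
}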

Multi-Threaded Optimization

#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

class MultiThreadedFastDDS {
public:
    MultiThreadedFastDDS(eprosima::fastdds::dds::DomainParticipant* participant)
        : participant_(participant), running_(true) {

        setup_multithreaded_publishing();
    }

    ~MultiThreadedFastDDS() {
        stop_publishing();
    }

    void setup_multithreaded_publishing() {
        // Start the publisher threads
        for (int i = 0; i < PUBLISHER_THREADS; ++i) {
            publisher_threads_.emplace_back(
                std::bind(&MultiThreadedFastDDS::publisher_worker, this, i));
        }

        // Start the statistics thread
        stats_thread_ = std::thread(
            std::bind(&MultiThreadedFastDDS::statistics_worker, this));

        std::cout << "Started " << PUBLISHER_THREADS << " publisher threads" << std::endl;
    }

    void publisher_worker(int thread_id) {
        std::cout << "Publisher thread " << thread_id << " started" << std::endl;

        int message_count = 0;
        auto last_time = std::chrono::high_resolution_clock::now();

        while (running_) {
            // Publish a message
            std_msgs::msg::String msg;
            msg.data = "Thread " + std::to_string(thread_id) +
                " message " + std::to_string(message_count++);

            writer_->write(&msg);

            // Update the global counter
            total_messages_++;

            // Periodically refresh this thread's counter
            auto current_time = std::chrono::high_resolution_clock::now();
            auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                current_time - last_time).count();

            if (elapsed >= 1) { // refresh once per second
                thread_stats_[thread_id] = message_count;
                last_time = current_time;
            }

            // Throttle the publishing rate
            std::this_thread::sleep_for(std::chrono::microseconds(100)); // ~10 kHz
        }

        std::cout << "Publisher thread " << thread_id << " stopped" << std::endl;
    }

    void statistics_worker() {
        std::cout << "Statistics thread started" << std::endl;

        while (running_) {
            std::this_thread::sleep_for(std::chrono::seconds(5));

            auto current_total = total_messages_.load();
            auto current_time = std::chrono::high_resolution_clock::now();

            if (last_stats_time_ != std::chrono::high_resolution_clock::time_point{}) {
                auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                    current_time - last_stats_time_).count();

                if (elapsed > 0) {
                    double throughput =
                        static_cast<double>(current_total - last_total_messages_) / elapsed;

                    std::cout << "Throughput: " << throughput << " msg/s, "
                              << "Total: " << current_total << " messages" << std::endl;

                    // Per-thread cumulative counters
                    for (int i = 0; i < PUBLISHER_THREADS; ++i) {
                        std::cout << "  Thread " << i << ": " << thread_stats_[i] << " messages" << std::endl;
                    }
                }
            }

            last_total_messages_ = current_total;
            last_stats_time_ = current_time;
        }

        std::cout << "Statistics thread stopped" << std::endl;
    }

    void stop_publishing() {
        running_ = false;

        // Wait for all threads to finish
        for (auto& thread : publisher_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        if (stats_thread_.joinable()) {
            stats_thread_.join();
        }

        std::cout << "All publisher threads stopped" << std::endl;
    }

private:
    static constexpr int PUBLISHER_THREADS = 4;

    eprosima::fastdds::dds::DomainParticipant* participant_;
    // writer_ must be created externally with a suitable QoS before publishing starts
    eprosima::fastdds::dds::DataWriter* writer_ = nullptr;

    std::atomic<bool> running_;
    std::atomic<uint64_t> total_messages_{0};

    std::vector<std::thread> publisher_threads_;
    std::thread stats_thread_;

    // Atomic counters avoid a data race between publisher threads and the statistics thread
    std::array<std::atomic<int>, PUBLISHER_THREADS> thread_stats_{};

    uint64_t last_total_messages_ = 0;
    std::chrono::high_resolution_clock::time_point last_stats_time_;
};

Transport Layer Optimization

UDP Transport Optimization

#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
#include <iostream>
#include <memory>

class TransportOptimizer {
public:
    TransportOptimizer() {
        setup_udp_optimization();
        setup_shared_memory_optimization();
    }

    void setup_udp_optimization() {
        // Create a UDP transport descriptor. To take effect it must be added to the
        // participant's user_transports (see create_optimized_participant_qos below).
        auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();

        // Tune UDP parameters
        udp_transport->sendBufferSize = 65536;    // 64 KB send buffer
        udp_transport->receiveBufferSize = 65536; // 64 KB receive buffer

        // Limit multicast TTL so discovery traffic stays on the local segment
        udp_transport->TTL = 1;

        // Interface whitelist: entries are local interface IPs, not subnets
        udp_transport->interfaceWhiteList.push_back("127.0.0.1");
        udp_transport->interfaceWhiteList.push_back("192.168.1.10"); // example: local NIC IP

        std::cout << "UDP transport optimized" << std::endl;
    }

    void setup_shared_memory_optimization() {
        // Create a shared-memory transport descriptor
        auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();

        // Shared-memory segment size
        shm_transport->segment_size(1024 * 1024); // 1 MB

        // Port queue capacity
        shm_transport->port_queue_capacity(512);

        // Health-check timeout (the available tuning knobs vary slightly between versions)
        shm_transport->healthy_check_timeout_ms(250);

        std::cout << "Shared memory transport optimized" << std::endl;
    }

    eprosima::fastdds::dds::DomainParticipantQos create_optimized_participant_qos() {
        eprosima::fastdds::dds::DomainParticipantQos qos;

        // Configure the transport layer
        auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
        udp_transport->sendBufferSize = 65536;
        udp_transport->receiveBufferSize = 65536;

        qos.transport().user_transports.push_back(udp_transport);
        // Disable the builtin transports so only the explicit ones above are used
        qos.transport().use_builtin_transports = false;

        // RTPS discovery settings
        qos.wire_protocol().builtin.discovery_config.discoveryProtocol =
            eprosima::fastdds::rtps::DiscoveryProtocol::SIMPLE;

        // Tune discovery timing
        qos.wire_protocol().builtin.discovery_config.leaseDuration =
            eprosima::fastrtps::Duration_t(30, 0); // 30 seconds

        qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod =
            eprosima::fastrtps::Duration_t(3, 0); // 3 seconds

        std::cout << "Optimized participant QoS created" << std::endl;

        return qos;
    }
};
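
Using the optimized QoS when creating a participant then looks roughly like this:

int main() {
    TransportOptimizer optimizer;

    // Create a domain participant with the optimized QoS (sketch)
    auto* factory = eprosima::fastdds::dds::DomainParticipantFactory::get_instance();
    auto* participant = factory->create_participant(
        0, optimizer.create_optimized_participant_qos());

    if (participant == nullptr) {
        std::cerr << "Failed to create participant" << std::endl;
        return 1;
    }

    // ... create topics, publishers and writers as in the previous sections ...

    factory->delete_participant(participant);
    return 0;
}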

Performance Benchmarking

Comprehensive Performance Tests

#include <algorithm>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

class FastDDSPerformanceBenchmark {
public:
    FastDDSPerformanceBenchmark() {
        setup_benchmark_environment();
    }

    void setup_benchmark_environment() {
        // Create an optimized domain participant
        auto qos = transport_optimizer_.create_optimized_participant_qos();

        participant_ = factory_->create_participant(DOMAIN_ID, qos);

        setup_benchmark_topics();
    }

    void setup_benchmark_topics() {
        // Latency benchmark topic
        setup_latency_benchmark();

        // Throughput benchmark topic
        setup_throughput_benchmark();

        // Memory benchmark topic
        setup_memory_benchmark();
    }

    void benchmark_latency() {
        const int num_samples = 10000;
        std::vector<double> latencies;
        latencies.reserve(num_samples);

        for (int i = 0; i < num_samples; ++i) {
            auto start = std::chrono::high_resolution_clock::now();

            std_msgs::msg::String msg;
            msg.data = "Latency test " + std::to_string(i);

            latency_writer_->write(&msg);

            // Wait until the sample is received. A matching DataReader whose listener
            // increments latency_received_count_ is assumed (omitted here).
            while (latency_received_count_ <= i) {
                std::this_thread::sleep_for(std::chrono::microseconds(1));
            }

            auto end = std::chrono::high_resolution_clock::now();
            auto latency = std::chrono::duration<double, std::micro>(end - start).count();
            latencies.push_back(latency);
        }

        // Compute summary statistics
        std::sort(latencies.begin(), latencies.end());
        double min_latency = latencies.front();
        double max_latency = latencies.back();
        double median_latency = latencies[latencies.size() / 2];
        double p95_latency = latencies[static_cast<size_t>(latencies.size() * 0.95)];
        double p99_latency = latencies[static_cast<size_t>(latencies.size() * 0.99)];

        std::cout << "Latency Benchmark Results:" << std::endl;
        std::cout << "  Min: " << min_latency << " μs" << std::endl;
        std::cout << "  Max: " << max_latency << " μs" << std::endl;
        std::cout << "  Median: " << median_latency << " μs" << std::endl;
        std::cout << "  P95: " << p95_latency << " μs" << std::endl;
        std::cout << "  P99: " << p99_latency << " μs" << std::endl;
    }

    void benchmark_throughput() {
        const auto test_duration = std::chrono::seconds(10);

        auto start_time = std::chrono::high_resolution_clock::now();
        auto end_time = start_time + test_duration;

        int message_count = 0;

        while (std::chrono::high_resolution_clock::now() < end_time) {
            std_msgs::msg::String msg;
            msg.data = "Throughput test " + std::to_string(message_count++);

            throughput_writer_->write(&msg);
        }

        auto actual_end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration<double>(actual_end - start_time).count();

        double throughput = message_count / duration;

        std::cout << "Throughput Benchmark Results:" << std::endl;
        std::cout << "  Messages: " << message_count << std::endl;
        std::cout << "  Duration: " << duration << " seconds" << std::endl;
        std::cout << "  Throughput: " << throughput << " msg/s" << std::endl;
    }

    void benchmark_memory_usage() {
        const int num_allocations = 10000;

        auto start = std::chrono::high_resolution_clock::now();

        std::vector<std_msgs::msg::String> messages;
        messages.reserve(num_allocations);

        for (int i = 0; i < num_allocations; ++i) {
            std_msgs::msg::String msg;
            msg.data = "Memory test " + std::to_string(i);
            messages.push_back(msg);
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration<double, std::milli>(end - start).count();

        std::cout << "Memory Benchmark Results:" << std::endl;
        std::cout << "  Allocations: " << num_allocations << std::endl;
        std::cout << "  Duration: " << duration << " ms" << std::endl;
        std::cout << "  Rate: " << num_allocations / duration << " allocs/ms" << std::endl;
    }

    void run_all_benchmarks() {
        std::cout << "Starting FastDDS Performance Benchmarks..." << std::endl;

        benchmark_latency();
        benchmark_throughput();
        benchmark_memory_usage();

        std::cout << "All benchmarks completed!" << std::endl;
    }

private:
    void setup_latency_benchmark() {
        // Register the type (std_msgs::msg::String and its PubSubType are assumed
        // to have been generated from IDL)
        type_support_ = eprosima::fastdds::dds::TypeSupport(
            new std_msgs::msg::StringPubSubType());
        type_support_.register_type(participant_);

        topic_ = participant_->create_topic(
            "LatencyTopic",
            type_support_.get_type_name(),
            eprosima::fastdds::dds::TOPIC_QOS_DEFAULT);

        publisher_ = participant_->create_publisher(
            eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT);

        // Minimum-latency QoS
        eprosima::fastdds::dds::DataWriterQos writer_qos;
        writer_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS;
        writer_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
        writer_qos.history().depth = 1;

        latency_writer_ = publisher_->create_datawriter(topic_, writer_qos);
    }

    void setup_throughput_benchmark() {
        // Throughput QoS
        eprosima::fastdds::dds::DataWriterQos throughput_qos;
        throughput_qos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS;
        throughput_qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
        throughput_qos.history().depth = 100;

        throughput_writer_ = publisher_->create_datawriter(topic_, throughput_qos);
    }

    void setup_memory_benchmark() {
        // The memory benchmark reuses the writers created above
    }

    static constexpr uint32_t DOMAIN_ID = 1;

    TransportOptimizer transport_optimizer_;
    eprosima::fastdds::dds::DomainParticipantFactory* factory_ =
        eprosima::fastdds::dds::DomainParticipantFactory::get_instance();
    eprosima::fastdds::dds::DomainParticipant* participant_ = nullptr;

    eprosima::fastdds::dds::TypeSupport type_support_;
    eprosima::fastdds::dds::Topic* topic_ = nullptr;
    eprosima::fastdds::dds::Publisher* publisher_ = nullptr;
    eprosima::fastdds::dds::DataWriter* latency_writer_ = nullptr;
    eprosima::fastdds::dds::DataWriter* throughput_writer_ = nullptr;

    std::atomic<int> latency_received_count_{0};
};

int main() {
    FastDDSPerformanceBenchmark benchmark;
    benchmark.run_all_benchmarks();
    return 0;
}

Summary

Key strategies for FastDDS performance tuning:

  1. Latency optimization: zero-copy message passing, memory pools, appropriate QoS configuration
  2. Throughput optimization: asynchronous/batched publishing, multi-threading, high-throughput QoS
  3. Transport optimization: UDP parameter tuning, shared memory, network interface configuration
  4. Memory management: memory pools, object reuse, resource limits
  5. Monitoring and testing: performance monitoring, benchmarking, statistical analysis

By applying these optimization strategies, the performance of FastDDS applications can be improved significantly, meeting the requirements of latency-sensitive, high-throughput systems.