ROS1节点生命周期管理 - 启动、运行、关闭与异常处理

ROS1节点生命周期管理 - 启动、运行、关闭与异常处理

前言

在ROS1系统中,节点的生命周期管理是确保系统稳定运行的关键。本文将深入分析ROS1节点的启动、运行、关闭过程,以及异常处理和资源管理的最佳实践。

节点生命周期概述

生命周期阶段

1
2
3
4
5
6
7
8
9
10
11
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 初始化 │───▶│ 运行 │───▶│ 关闭 │───▶│ 清理 │
│ Initialization │ │ Running │ │ Shutdown │ │ Cleanup │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 参数设置 │ │ 消息处理 │ │ 资源释放 │ │ 内存清理 │
│ 订阅发布 │ │ 回调执行 │ │ 连接断开 │ │ 线程结束 │
│ 服务注册 │ │ 状态更新 │ │ 信号处理 │ │ 日志记录 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘

节点启动阶段

完整的节点启动实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import threading
import time
import signal
import sys
from std_msgs.msg import String, Int32
from geometry_msgs.msg import Twist

class RobustNode:
def __init__(self, node_name):
"""节点初始化"""
self.node_name = node_name
self.is_initialized = False
self.is_running = False
self.is_shutting_down = False

# 线程管理
self.worker_threads = []
self.shutdown_event = threading.Event()

# 统计信息
self.stats = {
'start_time': None,
'messages_processed': 0,
'errors_count': 0,
'uptime': 0
}

# 注册信号处理器
self._setup_signal_handlers()

# 初始化ROS节点
self._initialize_ros_node()

def _setup_signal_handlers(self):
"""设置信号处理器"""
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)

def _signal_handler(self, signum, frame):
"""信号处理器"""
rospy.logwarn(f"Received signal {signum}, initiating shutdown...")
self.shutdown()

def _initialize_ros_node(self):
"""初始化ROS节点"""
try:
rospy.init_node(self.node_name, anonymous=False)

# 获取参数
self.robot_name = rospy.get_param('~robot_name', 'default_robot')
self.publish_rate = rospy.get_param('~publish_rate', 10.0)
self.enable_debug = rospy.get_param('~debug', False)

rospy.loginfo(f"Node {self.node_name} initialized successfully")
rospy.loginfo(f"Robot name: {self.robot_name}")
rospy.loginfo(f"Publish rate: {self.publish_rate} Hz")

self.is_initialized = True

except Exception as e:
rospy.logerr(f"Failed to initialize ROS node: {e}")
raise

def setup_publishers(self):
"""设置发布者"""
try:
self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
self.status_pub = rospy.Publisher('/status', String, queue_size=10)
self.counter_pub = rospy.Publisher('/counter', Int32, queue_size=10)

rospy.loginfo("Publishers initialized")

except Exception as e:
rospy.logerr(f"Failed to setup publishers: {e}")
raise

def setup_subscribers(self):
"""设置订阅者"""
try:
self.status_sub = rospy.Subscriber('/status', String, self.status_callback)
self.emergency_sub = rospy.Subscriber('/emergency_stop', String, self.emergency_callback)

rospy.loginfo("Subscribers initialized")

except Exception as e:
rospy.logerr(f"Failed to setup subscribers: {e}")
raise

def status_callback(self, msg):
"""状态消息回调"""
try:
self.stats['messages_processed'] += 1
rospy.loginfo(f"Received status: {msg.data}")

except Exception as e:
self.stats['errors_count'] += 1
rospy.logerr(f"Error in status callback: {e}")

def emergency_callback(self, msg):
"""紧急停止回调"""
rospy.logwarn(f"Emergency stop received: {msg.data}")
self.shutdown()

def start(self):
"""启动节点"""
if not self.is_initialized:
raise RuntimeError("Node not initialized")

try:
# 设置发布者和订阅者
self.setup_publishers()
self.setup_subscribers()

# 启动工作线程
self._start_worker_threads()

self.is_running = True
self.stats['start_time'] = time.time()

rospy.loginfo(f"Node {self.node_name} started successfully")

except Exception as e:
rospy.logerr(f"Failed to start node: {e}")
self.cleanup()
raise

def _start_worker_threads(self):
"""启动工作线程"""
# 状态发布线程
status_thread = threading.Thread(target=self._status_worker, daemon=True)
status_thread.start()
self.worker_threads.append(status_thread)

# 计数器线程
counter_thread = threading.Thread(target=self._counter_worker, daemon=True)
counter_thread.start()
self.worker_threads.append(counter_thread)

# 统计线程
stats_thread = threading.Thread(target=self._stats_worker, daemon=True)
stats_thread.start()
self.worker_threads.append(stats_thread)

rospy.loginfo(f"Started {len(self.worker_threads)} worker threads")

def _status_worker(self):
"""状态发布工作线程"""
rate = rospy.Rate(self.publish_rate)

while not self.shutdown_event.is_set() and not rospy.is_shutdown():
try:
# 发布状态信息
status_msg = String()
status_msg.data = f"Node {self.node_name} running, uptime: {self._get_uptime():.1f}s"
self.status_pub.publish(status_msg)

rate.sleep()

except Exception as e:
self.stats['errors_count'] += 1
rospy.logerr(f"Error in status worker: {e}")

def _counter_worker(self):
"""计数器工作线程"""
rate = rospy.Rate(1.0) # 1 Hz
counter = 0

while not self.shutdown_event.is_set() and not rospy.is_shutdown():
try:
# 发布计数器
counter_msg = Int32()
counter_msg.data = counter
self.counter_pub.publish(counter_msg)

counter += 1
rate.sleep()

except Exception as e:
self.stats['errors_count'] += 1
rospy.logerr(f"Error in counter worker: {e}")

def _stats_worker(self):
"""统计信息工作线程"""
rate = rospy.Rate(0.5) # 0.5 Hz

while not self.shutdown_event.is_set() and not rospy.is_shutdown():
try:
uptime = self._get_uptime()
self.stats['uptime'] = uptime

if self.enable_debug:
rospy.loginfo(f"Node Stats - Uptime: {uptime:.1f}s, "
f"Messages: {self.stats['messages_processed']}, "
f"Errors: {self.stats['errors_count']}")

rate.sleep()

except Exception as e:
rospy.logerr(f"Error in stats worker: {e}")

def _get_uptime(self):
"""获取运行时间"""
if self.stats['start_time']:
return time.time() - self.stats['start_time']
return 0.0

def run(self):
"""运行节点主循环"""
if not self.is_running:
raise RuntimeError("Node not started")

try:
rospy.loginfo(f"Node {self.node_name} entering main loop")
rospy.spin()

except rospy.ROSInterruptException:
rospy.loginfo("Received ROS interrupt")
except Exception as e:
rospy.logerr(f"Error in main loop: {e}")
self.stats['errors_count'] += 1
finally:
self.shutdown()

def shutdown(self):
"""关闭节点"""
if self.is_shutting_down:
return

self.is_shutting_down = True
rospy.loginfo(f"Shutting down node {self.node_name}...")

try:
# 设置关闭事件
self.shutdown_event.set()

# 等待工作线程结束
for thread in self.worker_threads:
if thread.is_alive():
thread.join(timeout=2.0)

# 发布最终状态
final_status = String()
final_status.data = f"Node {self.node_name} shutting down"
try:
self.status_pub.publish(final_status)
except:
pass # 忽略发布错误

# 清理资源
self.cleanup()

self.is_running = False
rospy.loginfo(f"Node {self.node_name} shutdown complete")

except Exception as e:
rospy.logerr(f"Error during shutdown: {e}")

def cleanup(self):
"""清理资源"""
try:
# 关闭发布者和订阅者
if hasattr(self, 'cmd_vel_pub'):
self.cmd_vel_pub.unregister()
if hasattr(self, 'status_pub'):
self.status_pub.unregister()
if hasattr(self, 'counter_pub'):
self.counter_pub.unregister()

# 记录最终统计信息
uptime = self._get_uptime()
rospy.loginfo(f"Final stats - Uptime: {uptime:.1f}s, "
f"Messages: {self.stats['messages_processed']}, "
f"Errors: {self.stats['errors_count']}")

except Exception as e:
rospy.logerr(f"Error during cleanup: {e}")

def get_node_info(self):
"""获取节点信息"""
return {
'name': self.node_name,
'is_initialized': self.is_initialized,
'is_running': self.is_running,
'is_shutting_down': self.is_shutting_down,
'uptime': self._get_uptime(),
'stats': self.stats.copy()
}

# 使用示例
if __name__ == '__main__':
try:
# 创建节点
node = RobustNode('robust_example_node')

# 启动节点
node.start()

# 运行节点
node.run()

except KeyboardInterrupt:
rospy.loginfo("Keyboard interrupt received")
except Exception as e:
rospy.logerr(f"Node error: {e}")
finally:
if 'node' in locals():
node.shutdown()

异常处理机制

全面的异常处理框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import traceback
import logging
from std_msgs.msg import String
import functools
import time

class ExceptionHandler:
"""异常处理器"""

def __init__(self, node_name):
self.node_name = node_name
self.error_count = 0
self.error_history = []

def handle_exception(self, e, context=""):
"""处理异常"""
self.error_count += 1
error_info = {
'timestamp': time.time(),
'exception': str(e),
'context': context,
'traceback': traceback.format_exc()
}
self.error_history.append(error_info)

rospy.logerr(f"Exception in {self.node_name}: {e}")
if rospy.get_param('~debug', False):
rospy.logerr(f"Traceback: {traceback.format_exc()}")

def get_error_stats(self):
"""获取错误统计"""
return {
'total_errors': self.error_count,
'recent_errors': len(self.error_history[-10:]) # 最近10个错误
}

def exception_handler(handler):
"""异常处理装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
handler.handle_exception(e, func.__name__)
return None
return wrapper
return decorator

class ResilientNode:
"""具有异常恢复能力的节点"""

def __init__(self, node_name):
rospy.init_node(node_name)

# 异常处理器
self.exception_handler = ExceptionHandler(node_name)

# 节点状态
self.is_healthy = True
self.health_check_interval = 5.0 # 5秒健康检查
self.last_health_check = time.time()

# 创建发布者
self.status_pub = rospy.Publisher('/node_health', String, queue_size=10)

# 设置定时器进行健康检查
self.health_timer = rospy.Timer(
rospy.Duration(self.health_check_interval),
self.health_check_callback
)

@exception_handler
def safe_callback(self, msg):
"""安全的消息回调"""
rospy.loginfo(f"Processing message: {msg.data}")

# 模拟可能的异常
if "error" in msg.data.lower():
raise ValueError("Simulated error in message processing")

@exception_handler
def health_check_callback(self, event):
"""健康检查回调"""
current_time = time.time()

# 检查错误率
error_stats = self.exception_handler.get_error_stats()
if error_stats['total_errors'] > 10:
self.is_healthy = False
rospy.logwarn("Node marked as unhealthy due to high error rate")

# 发布健康状态
health_msg = String()
health_msg.data = f"Healthy: {self.is_healthy}, Errors: {error_stats['total_errors']}"
self.status_pub.publish(health_msg)

self.last_health_check = current_time

def run_with_recovery(self):
"""带恢复机制运行"""
while not rospy.is_shutdown():
try:
# 主要工作循环
self.main_work_loop()

except Exception as e:
self.exception_handler.handle_exception(e, "main_work_loop")

# 尝试恢复
if self.attempt_recovery():
rospy.loginfo("Recovery successful, continuing...")
else:
rospy.logerr("Recovery failed, shutting down...")
break

rospy.sleep(0.1)

def main_work_loop(self):
"""主要工作循环"""
# 模拟主要工作
rospy.sleep(1.0)

def attempt_recovery(self):
"""尝试恢复"""
try:
# 重置状态
self.is_healthy = True
rospy.loginfo("Attempting node recovery...")

# 执行恢复操作
time.sleep(1.0) # 模拟恢复时间

rospy.loginfo("Node recovery completed")
return True

except Exception as e:
self.exception_handler.handle_exception(e, "attempt_recovery")
return False

# 使用示例
if __name__ == '__main__':
try:
node = ResilientNode('resilient_example_node')

# 创建订阅者
rospy.Subscriber('/test_topic', String, node.safe_callback)

# 运行节点
node.run_with_recovery()

except rospy.ROSInterruptException:
pass

资源管理

内存和连接管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import gc
import psutil
import threading
import time
from std_msgs.msg import String

class ResourceManager:
"""资源管理器"""

def __init__(self):
self.process = psutil.Process()
self.monitoring = False
self.monitor_thread = None

def start_monitoring(self):
"""开始资源监控"""
self.monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_resources, daemon=True)
self.monitor_thread.start()

def stop_monitoring(self):
"""停止资源监控"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()

def _monitor_resources(self):
"""监控资源使用"""
while self.monitoring and not rospy.is_shutdown():
try:
# 内存使用
memory_info = self.process.memory_info()
memory_percent = self.process.memory_percent()

# CPU使用
cpu_percent = self.process.cpu_percent()

# 文件描述符
num_fds = self.process.num_fds() if hasattr(self.process, 'num_fds') else 0

rospy.loginfo(f"Resources - Memory: {memory_info.rss/1024/1024:.1f}MB "
f"({memory_percent:.1f}%), CPU: {cpu_percent:.1f}%, "
f"FDs: {num_fds}")

# 内存使用过高时触发垃圾回收
if memory_percent > 80.0:
rospy.logwarn("High memory usage detected, triggering garbage collection")
gc.collect()

time.sleep(10) # 每10秒检查一次

except Exception as e:
rospy.logerr(f"Error monitoring resources: {e}")

def cleanup_resources(self):
"""清理资源"""
rospy.loginfo("Cleaning up resources...")

# 停止监控
self.stop_monitoring()

# 强制垃圾回收
gc.collect()

# 记录最终资源使用
memory_info = self.process.memory_info()
rospy.loginfo(f"Final memory usage: {memory_info.rss/1024/1024:.1f}MB")

class ResourceAwareNode:
"""具有资源感知能力的节点"""

def __init__(self, node_name):
rospy.init_node(node_name)

# 资源管理器
self.resource_manager = ResourceManager()
self.resource_manager.start_monitoring()

# 创建发布者
self.status_pub = rospy.Publisher('/resource_status', String, queue_size=10)

# 消息缓存(限制大小)
self.message_cache = []
self.max_cache_size = 100

def add_to_cache(self, message):
"""添加到缓存(带大小限制)"""
self.message_cache.append(message)

# 限制缓存大小
if len(self.message_cache) > self.max_cache_size:
self.message_cache.pop(0) # 移除最旧的消息

def cleanup(self):
"""清理节点资源"""
rospy.loginfo("Cleaning up node resources...")

# 清理消息缓存
self.message_cache.clear()

# 清理资源管理器
self.resource_manager.cleanup_resources()

def run(self):
"""运行节点"""
try:
rospy.spin()
finally:
self.cleanup()

# 使用示例
if __name__ == '__main__':
try:
node = ResourceAwareNode('resource_aware_node')
node.run()
except rospy.ROSInterruptException:
pass

总结

ROS1节点生命周期管理的关键要点:

  1. 初始化阶段:参数设置、发布者/订阅者创建、信号处理
  2. 运行阶段:多线程管理、异常处理、资源监控
  3. 关闭阶段:优雅关闭、资源清理、统计信息记录
  4. 异常处理:全面的异常捕获和恢复机制
  5. 资源管理:内存监控、连接管理、垃圾回收

通过实施这些最佳实践,可以创建稳定、可靠的ROS1节点,确保系统在高负载和异常情况下仍能正常运行。