1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import rospy import yaml import json
class ParameterServerDemo: def __init__(self): rospy.init_node('parameter_server_demo') self.setup_parameters() self.param_sub = rospy.Subscriber('/parameter_updates', rospy.AnyMsg, self.param_update_callback) def setup_parameters(self): """设置参数""" rospy.set_param('/robot/name', 'TurtleBot') rospy.set_param('/robot/model', 'Kobuki') rospy.set_param('/robot/max_speed', 1.0) rospy.set_param('/robot/max_angular_speed', 2.0) rospy.set_param('/robot/safety_enabled', True) rospy.set_param('/robot/autonomous_mode', False) rospy.set_param('/robot/sensors', ['laser', 'camera', 'imu']) config = { 'laser': {'frequency': 10, 'range': 10.0}, 'camera': {'width': 640, 'height': 480, 'fps': 30} } rospy.set_param('/robot/sensor_config', config) rospy.loginfo("Parameters set successfully") def param_update_callback(self, msg): """参数更新回调""" rospy.loginfo("Parameter update received") def demonstrate_parameter_operations(self): """演示参数操作""" rospy.loginfo("=== Parameter Operations Demo ===") robot_name = rospy.get_param('/robot/name', 'Unknown') max_speed = rospy.get_param('/robot/max_speed', 0.0) sensors = rospy.get_param('/robot/sensors', []) rospy.loginfo(f"Robot name: {robot_name}") rospy.loginfo(f"Max speed: {max_speed}") rospy.loginfo(f"Sensors: {sensors}") if rospy.has_param('/robot/safety_enabled'): safety_enabled = rospy.get_param('/robot/safety_enabled') rospy.loginfo(f"Safety enabled: {safety_enabled}") param_names = rospy.get_param_names() rospy.loginfo(f"Total parameters: {len(param_names)}") robot_params = [name for name in param_names if name.startswith('/robot/')] rospy.loginfo(f"Robot parameters: {robot_params}") if rospy.has_param('/robot/temp_param'): rospy.delete_param('/robot/temp_param') rospy.loginfo("Temporary parameter deleted") def load_parameters_from_file(self, filename): """从文件加载参数""" try: with open(filename, 'r') as f: params = yaml.safe_load(f) self._set_params_recursive('', params) rospy.loginfo(f"Parameters loaded from {filename}") except Exception as e: rospy.logerr(f"Failed to load parameters: {e}") def _set_params_recursive(self, prefix, params): """递归设置参数""" for key, value in params.items(): param_name = f"{prefix}/{key}" if prefix else f"/{key}" if isinstance(value, dict): self._set_params_recursive(param_name, value) else: rospy.set_param(param_name, value) def save_parameters_to_file(self, filename): """保存参数到文件""" try: params = {} param_names = rospy.get_param_names() for param_name in param_names: value = rospy.get_param(param_name) self._set_dict_value(params, param_name, value) with open(filename, 'w') as f: yaml.dump(params, f, default_flow_style=False) rospy.loginfo(f"Parameters saved to {filename}") except Exception as e: rospy.logerr(f"Failed to save parameters: {e}") def _set_dict_value(self, d, key_path, value): """设置字典值""" keys = key_path.strip('/').split('/') current = d for key in keys[:-1]: if key not in current: current[key] = {} current = current[key] current[keys[-1]] = value def run_demo(self): """运行演示""" self.demonstrate_parameter_operations() self.save_parameters_to_file('/tmp/robot_params.yaml') rospy.spin()
if __name__ == '__main__': try: demo = ParameterServerDemo() demo.run_demo() except rospy.ROSInterruptException: pass
|