test1.py 43 KB


  1. import requests
  2. import json
  3. import ipaddress
  4. import re
  5. import aiohttp # type: ignore
  6. import asyncio
  7. # 固件信息类
  8. class Firmware:
  9. def __init__(self, model: str, sn: str, fpga: str, core: str, aux: str):
  10. self.model = model
  11. self.sn = sn
  12. self.fpga = fpga
  13. self.core = core
  14. self.aux = aux
  15. # 系统监控数据类
  16. class Monitor:
  17. def __init__(self, load_average: float, men_useage: float, uptime: float):
  18. self.load_average = load_average
  19. self.men_useage = men_useage
  20. self.uptime = uptime
  21. # 网络信息类
  22. class NetWork:
  23. def __init__(self, carrier: bool, duplex: str, ethaddr: str, hostname: str, ipv4_dhcp: bool, ipv4_addr: str, ipv4_override: str, speed: int):
  24. self.carrier = carrier
  25. self.duplex = duplex
  26. self.ethaddr = ethaddr
  27. self.hostname = hostname
  28. self.ipv4_dhcp = ipv4_dhcp
  29. self.ipv4_addr = ipv4_addr
  30. self.ipv4_override = ipv4_override
  31. self.speed = speed
  32. # 服务器数据类
  33. class OverView:
  34. def __init__(self, scanfreq: int, motor_rpm: int, laser_enable: bool, resolution: float, scan_range_start: int, scan_range_stop: int, filter_level: int, window: int, filter_min_angle: int,
  35. filter_max_angle: int, filter_neighbors: int, host_ip: str, host_port: int):
  36. self.scanfreq = scanfreq
  37. self.motor_rpm = motor_rpm
  38. self.laser_enable = laser_enable
  39. self.resolution = resolution
  40. self.scan_range_start = scan_range_start
  41. self.scan_range_stop = scan_range_stop
  42. self.filter_level = filter_level
  43. self.window = window
  44. self.filter_min_angle = filter_min_angle
  45. self.filter_max_angle = filter_max_angle
  46. self.filter_neighbors = filter_neighbors
  47. self.host_ip = host_ip
  48. self.host_port = host_port
  49. # 雷达角度扫面范围封装类
  50. class ScanRange:
  51. def __init__(self, start: int, stop: int):
  52. self.start = start
  53. self.stop = stop
  54. # 滤波器参数类
  55. class Filter:
  56. def __init__(self, level: int, window: int, min_angle: int, max_angle: int, neighbors: int):
  57. self.level = level
  58. self.window = window
  59. self.min_angle = min_angle
  60. self.max_angle = max_angle
  61. self.neighbors = neighbors
  62. # 主机配置
  63. class Host:
  64. def __init__(self, ip: str, port: int):
  65. self.ip = ip
  66. self.port = port
  67. class LakiBeamHTTP:
  68. HOST_IP_DEFAULT = "192.168.8.1"
  69. HTTP_IP_DEFAULT = "192.168.8.2"
  70. def __init__(self, local_ip: str, local_port: str, web_ip: str, web_port: str):
  71. self.local_ip = local_ip
  72. self.local_port = local_port
  73. self.web_ip = web_ip
  74. self.web_port = web_port
  75. self.base_url = "http://{}:{}".format(web_ip, web_port)
  76. self.check_ipconfig()
  77. # 获取固件信息并填充到firemware
  78. async def get_firemware(self, firemware: Firmware):
  79. goon = True
  80. try:
  81. cmdback = await self.get_async_http('/api/v1/system/firmware')
  82. document = json.loads(cmdback)
  83. if not isinstance(document, dict):
  84. return False
  85. for field in ['model', 'sn', 'fpga', 'core', 'aux']:
  86. if field in document:
  87. setattr(firemware, field, document[field])
  88. else:
  89. goon = False
  90. break
  91. return True
  92. except (requests.RequestException, json.JSONDecodeError) as e:
  93. print(f"Error fetching data: {e}")
  94. return False
  95. # 获取监控信息并填充到monitor
  96. async def get_monitor(self, monitor: Monitor):
  97. try:
  98. cmdback = await self.get_async_http('/api/v1/system/monitor')
  99. document = json.loads(cmdback)
  100. if not isinstance(document, dict):
  101. print("Error: Response is not a dictionary")
  102. return False
  103. required_fields = ['load_average', 'mem_useage', 'uptime']
  104. for field in required_fields:
  105. if field not in document:
  106. print(f"Error: Missing required field {field}")
  107. return False
  108. monitor.load_average = document['load_average']
  109. monitor.men_useage = document['mem_useage']
  110. monitor.uptime = document['uptime']
  111. return True
  112. except json.JSONDecodeError:
  113. print("Error: Failed to decode JSON")
  114. return False
  115. except aiohttp.ClientError as e:
  116. print(f"Network error: {e}")
  117. return False
  118. except Exception as e:
  119. print(f"Unexpected error: {e}")
  120. return False
  121. #从JSON数据中解析网络配置信息
  122. async def get_network(self, network: NetWork):
  123. goon = True
  124. try:
  125. cmdback = await self.get_async_http('/api/v1/system/network')
  126. document = json.loads(cmdback)
  127. if not isinstance(document, dict):
  128. return False
  129. if 'carrier' in document:
  130. network.carrier = document['carrier']
  131. else:
  132. goon = False
  133. if 'duplex' in document:
  134. network.duplex = document['duplex']
  135. else:
  136. goon = False
  137. if 'ethaddr' in document:
  138. network.ethaddr = document['ethaddr']
  139. else:
  140. goon = False
  141. if 'hostname' in document:
  142. network.hostname = document['hostname']
  143. else:
  144. goon = False
  145. if 'ipv4' in document:
  146. ipv4_info = document['ipv4']
  147. if isinstance(ipv4_info, dict):
  148. network.ipv4_dhcp = ipv4_info.get('dhcp', False)
  149. network.ipv4_addr = ipv4_info.get('addr', "")
  150. network.ipv4_override = ipv4_info.get('override', "")
  151. else:
  152. goon = False
  153. if 'speed' in document:
  154. network.speed = document['speed']
  155. return goon
  156. except json.JSONDecodeError:
  157. print("Error decoding JSON response")
  158. return False
  159. except requests.RequestException as e:
  160. print(f"Request error: {e}")
  161. return False
  162. except Exception as e:
  163. print(f"Unexpected error in get_network: {e}")
  164. return False
  165. # 验证IP地址格式合法性、检查IP地址是否为IPv4地址、发生HTTP请求来设置IP地址
  166. def put_override(self, override):
  167. goon = True
  168. result = ""
  169. try:
  170. if not re.match(r'^[0-9.]+$', override):
  171. goon = False
  172. result = "put_override: not ipv4 format"
  173. return goon, result
  174. if "/" not in override:
  175. goon = False
  176. result = "put_override: format is wrong"
  177. return goon, result
  178. ip_part = override.split('/')[0]
  179. try:
  180. ip = ipaddress.ip_address(ip_part)
  181. if not isinstance(ip, ipaddress.IPv4Address):
  182. goon = False
  183. result = "put_override: wrong ip format\r\n"
  184. return goon, result
  185. except ValueError:
  186. goon = False
  187. result = "put_override: wrong ip format\r\n"
  188. return goon, result
  189. if goon:
  190. result = ""
  191. cmdback = self.put_async_http('/api/v1/system/network/override', override)
  192. if 'HTTP/1.1 200 OK' in cmdback:
  193. result = 'HTTP/1.1 200 OK'
  194. else:
  195. goon = False
  196. result = "put_override: operation failed"
  197. except Exception as e:
  198. goon = False
  199. result = f"put_override: error - {str(e)}"
  200. return goon, result
  201. # 删除静态模式配置
  202. def delete_override(self):
  203. goon = True
  204. result = ""
  205. try:
  206. response = requests.delete('/api/v1/system/network/override')
  207. cmdback = response.text
  208. if 'HTTP/1.1 200 OK' in cmdback:
  209. result = 'HTTP/1.1 200 OK'
  210. else:
  211. goon = False
  212. result = "delete_override: operation failed"
  213. except requests.RequestException as e:
  214. goon = False
  215. result = f"delete_override: request error - {str(e)}"
  216. return goon, result
  217. # 执行系统重置
  218. def put_reset(self):
  219. result = ""
  220. goon = True
  221. try:
  222. response = requests.put('/api/v1/system/reset', json={'reset': True})
  223. cmdback = response.text
  224. if 'HTTP/1.1 200 OK' in cmdback:
  225. result = 'HTTP/1.1 200 OK'
  226. else:
  227. goon = False
  228. result = "put_reset: operation failed"
  229. except requests.RequestException as e:
  230. goon = False
  231. result = f"put_reset: request error - {str(e)}"
  232. return goon, result
  233. # 获取传感器的概述信息
  234. async def get_overview(self, overview: OverView):
  235. goon = True
  236. OVERVIEW_FIELDS = {
  237. 'scanfreq': ('scanfreq', int),
  238. 'motor_rpm': ('motor_rpm', int),
  239. 'laser_enable': ('laser_enable', bool),
  240. 'resolution': ('resolution', float),
  241. 'scan_range': ('scan_range', {
  242. 'start': ('start', int),
  243. 'stop': ('stop', int)
  244. }),
  245. 'filter': ('filter', {
  246. 'level': ('level', int),
  247. 'min_angle': ('min_angle', int),
  248. 'max_angle': ('max_angle', int),
  249. 'neighbors': ('neighbors', int),
  250. 'window': ('window', int)
  251. }),
  252. 'host': ('host', {
  253. 'ip': ('ip', str),
  254. 'port': ('port', int)
  255. })
  256. }
  257. try:
  258. cmdback = await self.get_async_http('/api/v1/sensor/overview')
  259. document = json.loads(cmdback)
  260. if not isinstance(document, dict):
  261. return False
  262. for field_name, (json_key, value_type) in OVERVIEW_FIELDS.items():
  263. if isinstance(value_type, dict):
  264. if json_key in document:
  265. nested_data = document[json_key]
  266. for nested_field, (nested_key, nested_type) in value_type.items():
  267. if nested_key in nested_data:
  268. setattr(overview, field_name + '_' + nested_field, nested_type(nested_data[nested_key]))
  269. else:
  270. print(f"Missing nested key: {nested_key}")
  271. goon = False
  272. else:
  273. print(f"Missing JSON key: {json_key}")
  274. goon = False
  275. else:
  276. if json_key in document:
  277. setattr(overview, field_name, value_type(document[json_key]))
  278. else:
  279. goon = False
  280. except json.JSONDecodeError as e:
  281. print(f"JSON Decode Error: {e}")
  282. goon = False
  283. except Exception as e:
  284. print(f"Unexpected error: {e}")
  285. goon = False
  286. return goon
  287. # 设置扫描频率
  288. def put_scanfreq(self, freq):
  289. goon = False
  290. try:
  291. freq_tep = int(freq)
  292. except ValueError:
  293. goon = False
  294. return False, "put_scanfreq: frequency must be an integer"
  295. if freq_tep not in [10, 20, 25, 30]:
  296. goon = False
  297. return False, "put_scanfreq: wrong freq"
  298. if goon:
  299. freq_str = str(freq_tep)
  300. try:
  301. response = requests.put('/api/v1/sensor/scanfreq', json={'freq': freq_str})
  302. cmdback = response.text
  303. if 'HTTP/1.1 200 OK' not in cmdback:
  304. goon = False
  305. return False, "put_scanfreq: operate failed"
  306. except requests.RequestException as e:
  307. goon = False
  308. return False, f"put_scanfreq: request error - {str(e)}"
  309. return goon, ""
  310. # 获取扫描频率
  311. async def get_scanfreq(self):
  312. result = ""
  313. goon = True
  314. try:
  315. cmdback = await self.get_async_http('/api/v1/sensor/scanfreq')
  316. document = json.loads(cmdback)
  317. if not isinstance(document, dict):
  318. goon = False
  319. else:
  320. if 'freq' in document:
  321. output_freq = document['freq']
  322. result = str(output_freq)
  323. else:
  324. goon = False
  325. except requests.RequestException:
  326. goon = False
  327. except json.JSONDecodeError:
  328. goon = False
  329. return goon, result
    # Get the motor speed (RPM).
  331. def get_motor_rpm(self):
  332. motor_rpm = ""
  333. goon = True
  334. try:
  335. cmdback = self.get_async_http('/api/v1/sensor/motor_rpm')
  336. try:
  337. document = json.loads(cmdback)
  338. except json.JSONDecodeError:
  339. return False, ""
  340. if not isinstance(document, dict):
  341. return False, ""
  342. if 'rpm' in document:
  343. try:
  344. output_rpm = int(document['rpm'])
  345. motor_rpm = str(output_rpm)
  346. except (ValueError, TypeError):
  347. goon = False
  348. else:
  349. goon = False
  350. except requests.RequestException:
  351. goon = False
  352. return goon, motor_rpm
    # Get the laser enable state.
  354. async def get_laser_enable(self):
  355. laser_state = ""
  356. goon = True
  357. try:
  358. cmdback = await self.get_async_http('/api/v1/sensor/laser_enable')
  359. try:
  360. document = json.loads(cmdback)
  361. except json.JSONDecodeError:
  362. return False, "false"
  363. if not isinstance(document, dict):
  364. return False, "false"
  365. if 'HTTP/1.1 200 OK' in document:
  366. goon = 'HTTP/1.1 200 OK'
  367. else:
  368. goon = False
  369. except requests.RequestException:
  370. goon = False
  371. laser_state = "true" if goon else "false"
  372. return goon, laser_state
    # Set the laser enable state.
  374. def put_laser_enable(self, config_state):
  375. goon = True
  376. if config_state == "true":
  377. config_state = "true"
  378. elif config_state == "false":
  379. config_state = "false"
  380. else:
  381. goon = False
  382. config_state = "put_laser_enable: wrong parameter"
  383. if goon:
  384. try:
  385. cmdback = self.put_async_http('/api/v1/sensor/laser_enable', config_state)
  386. if not cmdback.startswith('HTTP/1.1 200 OK'):
  387. goon = False
  388. config_state = "put_laser_enable: operation failed"
  389. else:
  390. config_state = "true"
  391. except requests.RequestException:
  392. goon = False
  393. config_state = "put_laser_enable: request error"
  394. return goon, config_state
  395. # 获取激光分辨率
  396. async def get_resolution(self, overview: OverView):
  397. resolution = ""
  398. goon = True
  399. try:
  400. cmdback = await self.get_async_http('/api/v1/sensor/resolution')
  401. try:
  402. document = json.loads(cmdback)
  403. except json.JSONDecodeError:
  404. return False, ""
  405. if not isinstance(document, dict):
  406. goon = False
  407. else:
  408. if 'resolution' in document:
  409. output_resolution = document['resolution']
  410. overview.resolution = str(output_resolution)
  411. else:
  412. goon = False
  413. except requests.RequestException:
  414. goon = False
  415. return goon, overview.resolution
    # Get the laser scan range (start and stop angles).
  417. async def get_scan_range(self, scan_range: ScanRange):
  418. goon = True
  419. try:
  420. cmdback = await self.get_async_http('/api/v1/sensor/scan_range')
  421. try:
  422. document = json.loads(cmdback)
  423. except json.JSONDecodeError:
  424. return False, scan_range
  425. if not isinstance(document, dict):
  426. goon = False
  427. else:
  428. if 'scan_range' in document:
  429. value = document['scan_range']
  430. scan_range.start = value['start']
  431. scan_range.stop = value['stop']
  432. else:
  433. goon = False
  434. except requests.RequestException:
  435. goon = False
  436. return goon, scan_range
  437. # 获取激光扫描起始角度
  438. async def get_laser_start(self, scan_range: ScanRange):
  439. goon = True
  440. try:
  441. cmdback = await self.get_async_http('/api/v1/sensor/scan_range/start')
  442. try:
  443. document = json.loads(cmdback)
  444. if isinstance(document, dict) and 'start' in document:
  445. output_start = document['start']
  446. scan_range.start = output_start
  447. else:
  448. goon = False
  449. except json.JSONDecodeError as e:
  450. return False, scan_range
  451. except Exception as e:
  452. goon = False
  453. return goon, scan_range
  454. # 获取激光扫描停止角度
  455. async def get_laser_stop(self, scan_range: ScanRange):
  456. goon = True
  457. try:
  458. cmdback = await self.get_async_http('/api/v1/sensor/scan_range/stop')
  459. try:
  460. document = json.loads(cmdback)
  461. if isinstance(document, dict) and 'stop' in document:
  462. output_stop = document['stop']
  463. scan_range.stop = output_stop
  464. else:
  465. goon = False
  466. except json.JSONDecodeError as e:
  467. return False, scan_range
  468. except Exception as e:
  469. goon = False
  470. return goon, scan_range
  471. # 设置激光扫描开始角度
  472. def put_laser_start(self, scan_range: ScanRange):
  473. goon = True
  474. error_message = ""
  475. start_freq = scan_range.start
  476. if start_freq > 315 or start_freq < 45:
  477. goon = False
  478. error_message = "put_laser_start: start must < 315 and > 45"
  479. return goon, scan_range, error_message
  480. if goon:
  481. temp_scan_range = ScanRange()
  482. stop_success, temp_scan_range = self.get_laser_stop(temp_scan_range)
  483. if not stop_success:
  484. goon = False
  485. error_message = "Failed to retrieve current stop position"
  486. return goon, scan_range, error_message
  487. if start_freq >= temp_scan_range.stop:
  488. goon = False
  489. error_message = "put_laser_start: start must > stop"
  490. return goon, scan_range, error_message
  491. if goon:
  492. try:
  493. start_str = str(start_freq)
  494. cmdback = self.put_async_http('/api/v1/sensor/scan_range/start', start_str)
  495. if not cmdback.startswith('HTTP/1.1 200 OK'):
  496. goon = False
  497. error_message = "put_laser_start: operation failed"
  498. return goon, scan_range, error_message
  499. except Exception as e:
  500. goon = False
  501. error_message = f"put_laser_start: operation failed - {str(e)}"
  502. return goon, scan_range, error_message
  503. return goon, scan_range, error_message
  504. # 设置激光扫描结束角度
  505. def put_laser_stop(self, scan_range: ScanRange):
  506. goon = True
  507. error_message = ""
  508. stop_freq = scan_range.stop
  509. if stop_freq > 315 or stop_freq < 45:
  510. goon = False
  511. error_message = "put_laser_stop: stop must < 315 and > 45"
  512. return goon, scan_range, error_message
  513. if goon:
  514. try:
  515. cmdback = self.get_async_http('/api/v1/sensor/scan_range/start')
  516. if not cmdback.startswith('HTTP/1.1 200 OK'):
  517. goon = False
  518. error_message = "Failed to retrieve start position"
  519. return goon, scan_range, error_message
  520. try:
  521. document = json.loads(cmdback)
  522. except json.JSONDecodeError:
  523. goon = False
  524. error_message = "Failed to parse start position data"
  525. return goon, scan_range, error_message
  526. if 'start' in document:
  527. start_freq = document['start']
  528. else:
  529. goon = False
  530. error_message = "Start position not found"
  531. return goon, scan_range, error_message
  532. if start_freq >= stop_freq:
  533. goon = False
  534. error_message = "put_laser_stop: stop must > start"
  535. return goon, scan_range, error_message
  536. except requests.RequestException as e:
  537. goon = False
  538. error_message = f"Failed to retrieve start position: {str(e)}"
  539. return goon, scan_range, error_message
  540. if goon:
  541. try:
  542. stop_str = str(stop_freq)
  543. cmdback = self.put_async_http('/api/v1/sensor/scan_range/stop', stop_str)
  544. # 验证响应
  545. if not cmdback.startswith('HTTP/1.1 200 OK'):
  546. goon = False
  547. error_message = "put_laser_stop: operation failed"
  548. return goon, scan_range, error_message
  549. except Exception as e:
  550. goon = False
  551. error_message = f"put_laser_stop: operation failed - {str(e)}"
  552. return goon, scan_range, error_message
  553. return goon, scan_range, error_message
  554. # 获取滤波器的设置
  555. async def get_filter_level(self, filter: Filter):
  556. goon = True
  557. try:
  558. cmdback = await self.get_async_http('/api/v1/sensor/filter')
  559. document = json.loads(cmdback)
  560. if not isinstance(document, dict):
  561. goon = False
  562. else:
  563. if 'filter' in document:
  564. value = document['filter']
  565. filter.level = value['level']
  566. filter.window = value['window']
  567. filter.min_angle = value['min_angle']
  568. filter.max_angle = value['max_angle']
  569. filter.neighbors = value['neighbors']
  570. else:
  571. goon = False
  572. except (requests.RequestException, json.JSONDecodeError) as e:
  573. goon = False
  574. print(f"Error occurred: {str(e)}")
  575. return goon
  576. # 设置过滤级别
  577. def put_filter_level(self, filter: Filter):
  578. try:
  579. level = filter.level
  580. if level not in [0, 1, 2, 3]:
  581. raise ValueError("put_filter_level: wrong level parameter")
  582. level_str = str(level)
  583. cmdback = self.put_async_http('/api/v1/sensor/filter/level', level_str)
  584. if cmdback.startswith('HTTP/1.1 200 OK'):
  585. return True
  586. else:
  587. raise ValueError("put_filter_level: operation failed")
  588. except Exception as e:
  589. print(str(e))
  590. return False
  591. # 获取过滤窗口设置
  592. def get_filter_window(self, filter: Filter):
  593. try:
  594. cmdback = self.get_async_http('/api/v1/sensor/filter/window')
  595. document = json.loads(cmdback)
  596. if 'window' in document:
  597. temp = document['window']
  598. filter.window = temp
  599. return True
  600. else:
  601. return False
  602. except (json.JSONDecodeError, Exception) as e:
  603. print(f"Error in get_filter_window: {e}")
  604. return False
  605. # 设置过滤窗口
  606. def put_filter_window(self, filter: Filter):
  607. try:
  608. window_tep = filter.window
  609. if window_tep < 0:
  610. raise ValueError("put_filter_window: window must > 0")
  611. if window_tep > 10:
  612. raise ValueError("put_filter_window: window must <= 10")
  613. window_str = str(window_tep)
  614. cmdback = self.put_async_http('/api/v1/sensor/filter/window', window_str)
  615. if cmdback.startswith('HTTP/1.1 200 OK'):
  616. return True
  617. else:
  618. raise ValueError("put_filter_window: operation failed")
  619. except Exception as e:
  620. print(str(e))
  621. return False
  622. # 获取过滤最大角度设置并更新Filter对象
  623. def get_filter_max_angle(self, filter: Filter):
  624. try:
  625. cmdback = self.get_async_http('/api/v1/sensor/filter/max_angle')
  626. document = json.loads(cmdback)
  627. # 检查是否为有效的JSON对象
  628. if not isinstance(document, dict):
  629. print("Invalid JSON response: not an object")
  630. return False
  631. if 'max_angle' in document:
  632. temp = document['max_angle']
  633. filter.max_angle = int(temp)
  634. return True
  635. else:
  636. print(f"No key found in response")
  637. return False
  638. except json.JSONDecodeError:
  639. print("Invalid JSON response: unable to parse")
  640. return False
  641. except Exception as e:
  642. print(f"Error in get_filter_max_angle: {e}")
  643. return False
  644. # 设置过滤最大角度
  645. def put_filter_max_angle(self, filter: Filter):
  646. try:
  647. max_angle_tep = filter.max_angle
  648. # 更严格的参数校验
  649. if not isinstance(max_angle_tep, int):
  650. raise TypeError("Max angle must be an integer")
  651. # 检查最大角度值的有效范围(根据实际需求调整)
  652. if max_angle_tep < 0:
  653. raise ValueError("Max angle must be >= 0")
  654. if max_angle_tep > 180: # 假设角度范围在0-180之间
  655. raise ValueError("Max angle must be <= 180")
  656. # 将最大角度值转换为字符串
  657. max_angle_str = str(max_angle_tep)
  658. # 发送异步HTTP请求
  659. cmdback = self.put_async_http('/api/v1/sensor/filter/max_angle', max_angle_str)
  660. # 检查响应是否包含预期的确认信息
  661. if cmdback.startswith('HTTP/1.1 200 OK'):
  662. return True
  663. else:
  664. raise ValueError("Put filter max angle operation failed")
  665. except (ValueError, TypeError) as e:
  666. print(f"Validation error: {e}")
  667. return False
  668. except Exception as e:
  669. print(f"Unexpected error in put_filter_max_angle: {e}")
  670. return False
  671. def get_filter_min_angle(self, filter: Filter):
  672. try:
  673. # 发送HTTP GET请求
  674. cmdback = self.get_async_http('/api/v1/sensor/filter/min_angle')
  675. document = json.loads(cmdback)
  676. # 检查返回的JSON对象是否包含特定的键
  677. if 'min_angle' in document:
  678. temp = document['min_angle']
  679. filter.min_angle = int(temp) # 更新Filter对象的min_angle属性
  680. return True
  681. else:
  682. print("No 'min_angle' key found in response")
  683. return False
  684. except json.JSONDecodeError:
  685. print("Invalid JSON response")
  686. return False
  687. except Exception as e:
  688. print(f"Error in get_filter_min_angle: {e}")
  689. return False
  690. # 设置过滤最小角度
  691. def put_filter_min_angle(self, filter: Filter):
  692. try:
  693. # 获取最小角度值
  694. min_angle_tep = filter.min_angle
  695. if min_angle_tep < 0:
  696. print("put_filter_min_angle: min_angle must >= 0")
  697. return False
  698. if min_angle_tep > 359:
  699. print("put_filter_min_angle: min_angle must <= 359")
  700. return False
  701. max_angle_str = ""
  702. if self.get_filter_max_angle(max_angle_str):
  703. max_angle_tep = int(max_angle_str)
  704. if min_angle_tep > max_angle_tep:
  705. print("put_filter_min_angle: max_angle must >= min_angle")
  706. return False
  707. else:
  708. print("Failed to get max angle for comparison")
  709. return False
  710. min_angle_str = str(min_angle_tep)
  711. cmdback = self.put_async_http('/api/v1/sensor/filter/min_angle', min_angle_str)
  712. if cmdback.startswith('HTTP/1.1 200 OK'):
  713. return True
  714. else:
  715. print("put_filter_min_angle: operation failed")
  716. return False
  717. except Exception as e:
  718. print(f"Unexpected error in put_filter_min_angle: {e}")
  719. return False
  720. # 获取过滤邻居设置并更新Filter对象
  721. def get_filter_neighbors(self, filter: Filter):
  722. try:
  723. cmdback = self.get_async_http('/api/v1/sensor/filter/neighbors')
  724. document = json.loads(cmdback)
  725. if not isinstance(document, dict):
  726. print("Invalid JSON response: not an object")
  727. return False
  728. # 检查返回的JSON对象是否包含特定的键
  729. if 'neighbors' in document:
  730. # 获取邻居数量并更新Filter对象的neighbors属性
  731. temp = document['neighbors']
  732. filter.neighbors = int(temp) # 确保将其转换为整数
  733. return True
  734. else:
  735. return False
  736. except json.JSONDecodeError:
  737. print("Invalid JSON response: unable to parse")
  738. return False
  739. except Exception as e:
  740. print(f"Error in get_filter_neighbors: {e}")
  741. return False
  742. # 设置过滤邻居数量
  743. def put_filter_neighbors(self, filter: Filter):
  744. try:
  745. neighbors_tep = filter.neighbors
  746. if neighbors_tep < 0:
  747. print("put_filter_neighbors: neighbors must > 0")
  748. return False
  749. if neighbors_tep > 10:
  750. print("put_filter_neighbors: neighbors must < 10")
  751. return False
  752. neighbors_str = str(neighbors_tep)
  753. cmdback = self.put_async_http('/api/v1/sensor/filter/neighbors', neighbors_str)
  754. if cmdback.startswith('HTTP/1.1 200 OK'):
  755. return True
  756. else:
  757. print("put_filter_neighbors: operation failed")
  758. return False
  759. except Exception as e:
  760. print(f"Unexpected error in put_filter_neighbors: {e}")
  761. return False
  762. # 获取主机配置并更新Host对象
  763. async def get_host(self, host: Host):
  764. try:
  765. cmdback = await self.get_async_http('/api/v1/sensor/host')
  766. document = json.loads(cmdback)
  767. if not isinstance(document, dict):
  768. print("Invalid JSON response: not an object")
  769. return False
  770. if 'host' in document:
  771. value = document['host']
  772. host.ip = value['ip']
  773. host.port = value['port']
  774. return True
  775. else:
  776. return False
  777. except json.JSONDecodeError:
  778. print("Invalid JSON response: unable to parse")
  779. return False
  780. except Exception as e:
  781. print(f"Error in get_host: {e}")
  782. return False
  783. # 获取主机IP并更新Host对象
  784. async def get_host_ip(self, host: Host):
  785. try:
  786. cmdback = await self.get_async_http('/api/v1/sensor/host/ip')
  787. document = json.loads(cmdback)
  788. if not isinstance(document, dict):
  789. print("Invalid JSON response: not an object")
  790. return False
  791. if 'ip' in document:
  792. host.ip = document['ip']
  793. return True
  794. else:
  795. return False
  796. except json.JSONDecodeError:
  797. print("Invalid JSON response: unable to parse")
  798. return False
  799. except Exception as e:
  800. print(f"Error in get_host_ip: {e}")
  801. return False
  802. # 设置主机IP地址
  803. def put_host_ip(self, host: Host):
  804. try:
  805. ip = host.ip
  806. ip_size = len(ip)
  807. for i in range(ip_size):
  808. if not ((ip[i] >= '0' and ip[i] <= '9') or ip[i] == '.'):
  809. print("put_host_ip: not ipv4 format")
  810. return False
  811. try:
  812. parts = ip.split('.')
  813. if len(parts) != 4:
  814. print("put_host_ip: not ipv4 format")
  815. return False
  816. for part in parts:
  817. num = int(part)
  818. if num < 0 or num > 255:
  819. print("put_host_ip: not ipv4 format")
  820. return False
  821. except (ValueError, IndexError):
  822. print("put_host_ip: not ipv4 format")
  823. return False
  824. cmdback = self.put_async_http('/api/v1/sensor/host/ip', ip)
  825. if cmdback.startswith('HTTP/1.1 200 OK'):
  826. return True
  827. else:
  828. print("put_host_ip: operation failed")
  829. return False
  830. except Exception as e:
  831. print(f"Unexpected error in put_host_ip: {e}")
  832. return False
  833. # 获取主机端口并更新Host对象
  834. async def get_host_port(self, host: Host):
  835. try:
  836. cmdback = await self.get_async_http('/api/v1/sensor/host/port')
  837. document = json.loads(cmdback)
  838. if not isinstance(document, dict):
  839. print("Invalid JSON response: not an object")
  840. return False
  841. if 'port' in document:
  842. temp = document['port']
  843. host.port = int(temp)
  844. return True
  845. else:
  846. return False
  847. except json.JSONDecodeError:
  848. print("Invalid JSON response: unable to parse")
  849. return False
  850. except Exception as e:
  851. print(f"Error in get_host_port: {e}")
  852. return False
  853. # 设置主机端口
  854. def put_host_port(self, host: Host):
  855. goon = True
  856. ip = host.ip
  857. cmdback = self.put_async_http('/api/v1/sensor/host/port', ip)
  858. res = cmdback.find('HTTP/1.1 200 OK')
  859. if res == -1:
  860. goon = False
  861. message = "put_host_port: operation failed"
  862. else:
  863. message = f"Operation succeeded for IP: {ip}"
  864. return goon, message
  865. # 发送 GET 请求并处理响应
  866. def get_http(self, target: str):
  867. try:
  868. full_url = f"{self.base_url}{target}"
  869. headers = {
  870. 'User-Agent': 'Python-Requests',
  871. 'Connection': 'close'
  872. }
  873. response = requests.get(full_url, headers=headers)
  874. response.raise_for_status()
  875. response_text = response.text
  876. start = response_text.find("{")
  877. end = response_text.rfind("}")
  878. if start != -1 and end != -1 and end > start:
  879. cutstring = response_text[start:end + 1]
  880. return cutstring
  881. return ""
  882. except requests.RequestException as e:
  883. print(f"HTTP 请求错误: {e}")
  884. return ""
  885. except Exception as e:
  886. print(f"处理响应时发生错误: {e}")
  887. return ""
  888. # 发送 PUT 请求并处理响应
  889. def put_http(self, target: str, message: str):
  890. try:
  891. full_url = "{}{}".format(self.base_url, target)
  892. headers = {
  893. 'User-Agent': 'Python-Requests',
  894. 'Content-Type': 'application/json',
  895. 'Connection': 'close'
  896. }
  897. response = requests.put(full_url, headers=headers, data=message)
  898. response.raise_for_status()
  899. return response.text
  900. except requests.RequestException as e:
  901. print(f"HTTP 请求错误: {e}")
  902. return ""
  903. except Exception as e:
  904. print(f"处理响应时发生错误: {e}")
  905. return ""
  906. # 发送 DELETE 请求并处理响应
  907. def delete_http(self, target: str):
  908. try:
  909. full_url = "{}{}".format(self.base_url, target)
  910. headers = {
  911. 'User-Agent': 'Python-Requests',
  912. 'Connection': 'close'
  913. }
  914. response = requests.delete(full_url, headers=headers)
  915. response.raise_for_status()
  916. return response.text
  917. except requests.RequestException as e:
  918. print(f"HTTP 请求错误: {e}")
  919. return ""
  920. except Exception as e:
  921. print(f"处理响应时发生错误: {e}")
  922. return ""
  923. # 发送异步 GET 请求并处理响应
  924. async def get_async_http(self, target: str):
  925. full_url = f"{self.base_url}{target}"
  926. try:
  927. async with aiohttp.ClientSession() as session:
  928. async with session.get(full_url) as response:
  929. response_text = await response.text()
  930. return response_text
  931. except aiohttp.ClientError as e:
  932. print(f"HTTP 请求错误: {e}")
  933. return ""
  934. except Exception as e:
  935. print(f"处理响应时发生错误: {e}")
  936. return ""
  937. # 发送异步 PUT 请求并处理响应
  938. async def put_async_http(self, target: str, message: str):
  939. full_url = f"{self.base_url}{target}"
  940. try:
  941. async with aiohttp.ClientSession() as session:
  942. headers = {
  943. 'Content-Type': 'application/json',
  944. 'User-Agent': 'Python-aiohttp',
  945. 'Connection': 'close'
  946. }
  947. async with session.put(full_url,
  948. headers=headers,
  949. data=message,
  950. timeout=aiohttp.ClientTimeout(total=10)) as response:
  951. response.raise_for_status()
  952. response_text = await response.text()
  953. if not response_text:
  954. return "put_async_http: can't connect to http"
  955. start = response_text.find("{")
  956. end = response_text.rfind("}")
  957. if start != -1 and end != -1 and end >= start:
  958. return response_text[start:end + 1]
  959. return response_text
  960. except aiohttp.ClientError as e:
  961. print(f"HTTP 请求错误: {e}")
  962. return f"put_async_http: can't connect to http - {str(e)}"
  963. except asyncio.TimeoutError:
  964. print("请求超时")
  965. return "put_async_http: request timeout"
  966. except Exception as e:
  967. print(f"处理响应时发生错误: {e}")
  968. return f"put_async_http: unexpected error - {str(e)}"
  969. # 发送异步 DELETE 请求并处理响应
  970. async def delete_async_http(self, target: str) -> str:
  971. full_url = f"{self.base_url}{target}"
  972. try:
  973. async with aiohttp.ClientSession() as session:
  974. headers = {
  975. 'User-Agent': 'Python-aiohttp',
  976. 'Connection': 'close'
  977. }
  978. async with session.delete(full_url,
  979. headers=headers,
  980. timeout=aiohttp.ClientTimeout(total=10)) as response:
  981. response.raise_for_status()
  982. response_text = await response.text()
  983. if not response_text:
  984. return "delete_async_http: can't connect to http"
  985. start = response_text.find("{")
  986. end = response_text.rfind("}")
  987. if start != -1 and end != -1 and end >= start:
  988. return response_text[start:end + 1]
  989. return response_text
  990. except aiohttp.ClientError as e:
  991. print(f"HTTP 请求错误: {e}")
  992. return f"delete_async_http: can't connect to http - {str(e)}"
  993. except asyncio.TimeoutError:
  994. print("请求超时")
  995. return "delete_async_http: request timeout"
  996. except Exception as e:
  997. print(f"处理响应时发生错误: {e}")
  998. return f"delete_async_http: unexpected error - {str(e)}"
  999. # 检查本地和目标 IP 地址的有效性
  1000. def check_ipconfig(self) -> bool:
  1001. goon = True
  1002. try:
  1003. try:
  1004. local_ip_obj = ipaddress.ip_address(self.local_ip)
  1005. if not isinstance(local_ip_obj, ipaddress.IPv4Address):
  1006. raise ValueError("Not an IPv4 address")
  1007. except (ValueError, TypeError):
  1008. goon = False
  1009. self.local_ip = self.HOST_IP_DEFAULT
  1010. print(f"Invalid local IP, using default: {self.local_ip}")
  1011. try:
  1012. web_ip_obj = ipaddress.ip_address(self.web_ip)
  1013. if not isinstance(web_ip_obj, ipaddress.IPv4Address):
  1014. raise ValueError("Not an IPv4 address")
  1015. except (ValueError, TypeError):
  1016. goon = False
  1017. self.web_ip = self.HTTP_IP_DEFAULT
  1018. print(f"Invalid web IP, using default: {self.web_ip}")
  1019. except Exception as e:
  1020. print(f"Unexpected error in check_ipconfig: {e}")
  1021. goon = False
  1022. return goon