|
|
@@ -0,0 +1,1196 @@
|
|
|
+import requests
|
|
|
+import json
|
|
|
+import ipaddress
|
|
|
+import re
|
|
|
+import aiohttp # type: ignore
|
|
|
+import asyncio
|
|
|
+
|
|
|
+# 固件信息类
|
|
|
+class Firmware:
|
|
|
+ def __init__(self, model: str, sn: str, fpga: str, core: str, aux: str):
|
|
|
+ self.model = model
|
|
|
+ self.sn = sn
|
|
|
+ self.fpga = fpga
|
|
|
+ self.core = core
|
|
|
+ self.aux = aux
|
|
|
+
|
|
|
+# 系统监控数据类
|
|
|
+class Monitor:
|
|
|
+ def __init__(self, load_average: float, men_useage: float, uptime: float):
|
|
|
+ self.load_average = load_average
|
|
|
+ self.men_useage = men_useage
|
|
|
+ self.uptime = uptime
|
|
|
+
|
|
|
+# 网络信息类
|
|
|
+class NetWork:
|
|
|
+ def __init__(self, carrier: bool, duplex: str, ethaddr: str, hostname: str, ipv4_dhcp: bool, ipv4_addr: str, ipv4_override: str, speed: int):
|
|
|
+ self.carrier = carrier
|
|
|
+ self.duplex = duplex
|
|
|
+ self.ethaddr = ethaddr
|
|
|
+ self.hostname = hostname
|
|
|
+ self.ipv4_dhcp = ipv4_dhcp
|
|
|
+ self.ipv4_addr = ipv4_addr
|
|
|
+ self.ipv4_override = ipv4_override
|
|
|
+ self.speed = speed
|
|
|
+
|
|
|
+# 服务器数据类
|
|
|
+class OverView:
|
|
|
+ def __init__(self, scanfreq: int, motor_rpm: int, laser_enable: bool, resolution: float, scan_range_start: int, scan_range_stop: int, filter_level: int, window: int, filter_min_angle: int,
|
|
|
+ filter_max_angle: int, filter_neighbors: int, host_ip: str, host_port: int):
|
|
|
+ self.scanfreq = scanfreq
|
|
|
+ self.motor_rpm = motor_rpm
|
|
|
+ self.laser_enable = laser_enable
|
|
|
+ self.resolution = resolution
|
|
|
+ self.scan_range_start = scan_range_start
|
|
|
+ self.scan_range_stop = scan_range_stop
|
|
|
+ self.filter_level = filter_level
|
|
|
+ self.window = window
|
|
|
+ self.filter_min_angle = filter_min_angle
|
|
|
+ self.filter_max_angle = filter_max_angle
|
|
|
+ self.filter_neighbors = filter_neighbors
|
|
|
+ self.host_ip = host_ip
|
|
|
+ self.host_port = host_port
|
|
|
+
|
|
|
+# 雷达角度扫面范围封装类
|
|
|
+class ScanRange:
|
|
|
+ def __init__(self, start=None, stop=None):
|
|
|
+ self.start = start
|
|
|
+ self.stop = stop
|
|
|
+
|
|
|
+# 滤波器参数类
|
|
|
+class Filter:
|
|
|
+ def __init__(self, level: int, window: int, min_angle: int, max_angle: int, neighbors: int):
|
|
|
+ self.level = level
|
|
|
+ self.window = window
|
|
|
+ self.min_angle = min_angle
|
|
|
+ self.max_angle = max_angle
|
|
|
+ self.neighbors = neighbors
|
|
|
+
|
|
|
+# 主机配置
|
|
|
+class Host:
|
|
|
+ def __init__(self, ip: str, port: int):
|
|
|
+ self.ip = ip
|
|
|
+ self.port = port
|
|
|
+
|
|
|
+class LakiBeamHTTP:
|
|
|
+ HOST_IP_DEFAULT = "192.168.198.1"
|
|
|
+ HTTP_IP_DEFAULT = "192.168.198.2"
|
|
|
+ def __init__(self, local_ip: str, local_port: str, web_ip: str, web_port: str):
|
|
|
+ self.local_ip = local_ip
|
|
|
+ self.local_port = local_port
|
|
|
+ self.web_ip = web_ip
|
|
|
+ self.web_port = web_port
|
|
|
+ self.base_url = "http://{}:{}".format(web_ip, web_port)
|
|
|
+ self.check_ipconfig()
|
|
|
+
|
|
|
+ # 获取固件信息并填充到firemware
|
|
|
+ async def get_firemware(self, firemware: Firmware):
|
|
|
+ goon = True
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/system/firmware')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ return False
|
|
|
+
|
|
|
+ for field in ['model', 'sn', 'fpga', 'core', 'aux']:
|
|
|
+ if field in document:
|
|
|
+ setattr(firemware, field, document[field])
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+ break
|
|
|
+ return True
|
|
|
+ except (requests.RequestException, json.JSONDecodeError) as e:
|
|
|
+ print(f"Error fetching data: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 获取监控信息并填充到monitor
|
|
|
+ async def get_monitor(self, monitor: Monitor):
|
|
|
+ try:
|
|
|
+
|
|
|
+ cmdback = await self.get_async_http('/api/v1/system/monitor')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Error: Response is not a dictionary")
|
|
|
+ return False
|
|
|
+
|
|
|
+ required_fields = ['load_average', 'mem_useage', 'uptime']
|
|
|
+
|
|
|
+ for field in required_fields:
|
|
|
+ if field not in document:
|
|
|
+ print(f"Error: Missing required field {field}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ monitor.load_average = document['load_average']
|
|
|
+ monitor.men_useage = document['mem_useage']
|
|
|
+ monitor.uptime = document['uptime']
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Error: Failed to decode JSON")
|
|
|
+ return False
|
|
|
+ except aiohttp.ClientError as e:
|
|
|
+ print(f"Network error: {e}")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ #从JSON数据中解析网络配置信息
|
|
|
+ async def get_network(self, network: NetWork):
|
|
|
+ goon = True
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/system/network')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'carrier' in document:
|
|
|
+ network.carrier = document['carrier']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ if 'duplex' in document:
|
|
|
+ network.duplex = document['duplex']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ if 'ethaddr' in document:
|
|
|
+ network.ethaddr = document['ethaddr']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ if 'hostname' in document:
|
|
|
+ network.hostname = document['hostname']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ if 'ipv4' in document:
|
|
|
+ ipv4_info = document['ipv4']
|
|
|
+ if isinstance(ipv4_info, dict):
|
|
|
+ network.ipv4_dhcp = ipv4_info.get('dhcp', False)
|
|
|
+ network.ipv4_addr = ipv4_info.get('addr', "")
|
|
|
+ network.ipv4_override = ipv4_info.get('override', "")
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ if 'speed' in document:
|
|
|
+ network.speed = document['speed']
|
|
|
+
|
|
|
+ return goon
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Error decoding JSON response")
|
|
|
+ return False
|
|
|
+ except requests.RequestException as e:
|
|
|
+ print(f"Request error: {e}")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error in get_network: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 验证IP地址格式合法性、检查IP地址是否为IPv4地址、发生HTTP请求来设置IP地址
|
|
|
+ def put_override(self, override):
|
|
|
+ goon = True
|
|
|
+ result = ""
|
|
|
+ try:
|
|
|
+ if not re.match(r'^[0-9.]+$', override):
|
|
|
+ goon = False
|
|
|
+ result = "put_override: not ipv4 format"
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ if "/" not in override:
|
|
|
+ goon = False
|
|
|
+ result = "put_override: format is wrong"
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ ip_part = override.split('/')[0]
|
|
|
+
|
|
|
+ try:
|
|
|
+ ip = ipaddress.ip_address(ip_part)
|
|
|
+ if not isinstance(ip, ipaddress.IPv4Address):
|
|
|
+ goon = False
|
|
|
+ result = "put_override: wrong ip format\r\n"
|
|
|
+ return goon, result
|
|
|
+ except ValueError:
|
|
|
+ goon = False
|
|
|
+ result = "put_override: wrong ip format\r\n"
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ if goon:
|
|
|
+ result = ""
|
|
|
+ cmdback = self.put_async_http('/api/v1/system/network/override', override)
|
|
|
+
|
|
|
+ if 'HTTP/1.1 200 OK' in cmdback:
|
|
|
+ result = 'HTTP/1.1 200 OK'
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+ result = "put_override: operation failed"
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ goon = False
|
|
|
+ result = f"put_override: error - {str(e)}"
|
|
|
+
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ # 删除静态模式配置
|
|
|
+ def delete_override(self):
|
|
|
+ goon = True
|
|
|
+ result = ""
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.delete('/api/v1/system/network/override')
|
|
|
+ cmdback = response.text
|
|
|
+
|
|
|
+ if 'HTTP/1.1 200 OK' in cmdback:
|
|
|
+ result = 'HTTP/1.1 200 OK'
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+ result = "delete_override: operation failed"
|
|
|
+ except requests.RequestException as e:
|
|
|
+ goon = False
|
|
|
+ result = f"delete_override: request error - {str(e)}"
|
|
|
+
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ # 执行系统重置
|
|
|
+ def put_reset(self):
|
|
|
+ result = ""
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.put('/api/v1/system/reset', json={'reset': True})
|
|
|
+ cmdback = response.text
|
|
|
+
|
|
|
+ if 'HTTP/1.1 200 OK' in cmdback:
|
|
|
+ result = 'HTTP/1.1 200 OK'
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+ result = "put_reset: operation failed"
|
|
|
+
|
|
|
+ except requests.RequestException as e:
|
|
|
+ goon = False
|
|
|
+ result = f"put_reset: request error - {str(e)}"
|
|
|
+
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ # 获取传感器的概述信息
|
|
|
+ async def get_overview(self, overview: OverView):
|
|
|
+ goon = True
|
|
|
+ OVERVIEW_FIELDS = {
|
|
|
+ 'scanfreq': ('scanfreq', int),
|
|
|
+ 'motor_rpm': ('motor_rpm', int),
|
|
|
+ 'laser_enable': ('laser_enable', bool),
|
|
|
+ 'resolution': ('resolution', float),
|
|
|
+ 'scan_range': ('scan_range', {
|
|
|
+ 'start': ('start', int),
|
|
|
+ 'stop': ('stop', int)
|
|
|
+ }),
|
|
|
+ 'filter': ('filter', {
|
|
|
+ 'level': ('level', int),
|
|
|
+ 'min_angle': ('min_angle', int),
|
|
|
+ 'max_angle': ('max_angle', int),
|
|
|
+ 'neighbors': ('neighbors', int),
|
|
|
+ 'window': ('window', int)
|
|
|
+ }),
|
|
|
+ 'host': ('host', {
|
|
|
+ 'ip': ('ip', str),
|
|
|
+ 'port': ('port', int)
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/overview')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ return False
|
|
|
+
|
|
|
+ for field_name, (json_key, value_type) in OVERVIEW_FIELDS.items():
|
|
|
+ if isinstance(value_type, dict):
|
|
|
+ if json_key in document:
|
|
|
+ nested_data = document[json_key]
|
|
|
+ for nested_field, (nested_key, nested_type) in value_type.items():
|
|
|
+ if nested_key in nested_data:
|
|
|
+
|
|
|
+ setattr(overview, field_name + '_' + nested_field, nested_type(nested_data[nested_key]))
|
|
|
+ else:
|
|
|
+ print(f"Missing nested key: {nested_key}")
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ print(f"Missing JSON key: {json_key}")
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ if json_key in document:
|
|
|
+
|
|
|
+ setattr(overview, field_name, value_type(document[json_key]))
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ print(f"JSON Decode Error: {e}")
|
|
|
+ goon = False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error: {e}")
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon
|
|
|
+
|
|
|
+
|
|
|
+ # 设置扫描频率
|
|
|
+ def put_scanfreq(freq):
|
|
|
+ try:
|
|
|
+ freq_tep = int(freq)
|
|
|
+ except ValueError:
|
|
|
+ return False, "put_scanfreq: frequency must be an integer"
|
|
|
+
|
|
|
+ if freq_tep not in [10, 20, 25, 30]:
|
|
|
+ return False, "put_scanfreq: wrong freq"
|
|
|
+
|
|
|
+ freq_str = str(freq_tep)
|
|
|
+ try:
|
|
|
+ response = requests.put('http://192.168.8.2/api/v1/sensor/scanfreq', data=freq_str)
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ return True, "Frequency updated successfully"
|
|
|
+ else:
|
|
|
+ return False, f"put_scanfreq: operate failed - {response.text}"
|
|
|
+ except requests.RequestException as e:
|
|
|
+ return False, f"put_scanfreq: request error - {str(e)}"
|
|
|
+
|
|
|
+
|
|
|
+ # 获取扫描频率
|
|
|
+ async def get_scanfreq(self):
|
|
|
+ result = ""
|
|
|
+ goon = True
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/scanfreq')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ if 'freq' in document:
|
|
|
+ output_freq = document['freq']
|
|
|
+ result = str(output_freq)
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, result
|
|
|
+
|
|
|
+ # 获取激光状态
|
|
|
+ def get_motor_rpm(self):
|
|
|
+
|
|
|
+ motor_rpm = ""
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = self.get_async_http('/api/v1/sensor/motor_rpm')
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return False, ""
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ return False, ""
|
|
|
+
|
|
|
+ if 'rpm' in document:
|
|
|
+ try:
|
|
|
+ output_rpm = int(document['rpm'])
|
|
|
+ motor_rpm = str(output_rpm)
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, motor_rpm
|
|
|
+
|
|
|
+ # 切换激光状态
|
|
|
+ async def get_laser_enable(self):
|
|
|
+ laser_state = ""
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/laser_enable')
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return False, "false"
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ return False, "false"
|
|
|
+
|
|
|
+ if 'HTTP/1.1 200 OK' in document:
|
|
|
+ goon = 'HTTP/1.1 200 OK'
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ laser_state = "true" if goon else "false"
|
|
|
+
|
|
|
+ return goon, laser_state
|
|
|
+
|
|
|
+ # 获取雷达当前水平角分辨率
|
|
|
+ def put_laser_enable(self, config_state):
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ if config_state == "true":
|
|
|
+ config_state = "true"
|
|
|
+ elif config_state == "false":
|
|
|
+ config_state = "false"
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+ config_state = "put_laser_enable: wrong parameter"
|
|
|
+
|
|
|
+ if goon:
|
|
|
+ try:
|
|
|
+ cmdback = self.put_async_http('/api/v1/sensor/laser_enable', config_state)
|
|
|
+
|
|
|
+ if not cmdback.startswith('HTTP/1.1 200 OK'):
|
|
|
+ goon = False
|
|
|
+ config_state = "put_laser_enable: operation failed"
|
|
|
+ else:
|
|
|
+ config_state = "true"
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+ config_state = "put_laser_enable: request error"
|
|
|
+
|
|
|
+ return goon, config_state
|
|
|
+
|
|
|
+ # 获取激光分辨率
|
|
|
+ async def get_resolution(self, overview: OverView):
|
|
|
+ resolution = ""
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/resolution')
|
|
|
+
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return False, ""
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ if 'resolution' in document:
|
|
|
+ output_resolution = document['resolution']
|
|
|
+ overview.resolution = str(output_resolution)
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, overview.resolution
|
|
|
+
|
|
|
+ # 获取激光扫描起始角度
|
|
|
+ async def get_scan_range(self, scan_range: ScanRange):
|
|
|
+
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/scan_range')
|
|
|
+
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return False, scan_range
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ if 'scan_range' in document:
|
|
|
+ value = document['scan_range']
|
|
|
+ scan_range.start = value['start']
|
|
|
+ scan_range.stop = value['stop']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except requests.RequestException:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, scan_range
|
|
|
+
|
|
|
+ # 获取激光扫描起始角度
|
|
|
+ async def get_laser_start(self, scan_range: ScanRange):
|
|
|
+ goon = True
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/scan_range/start')
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ if isinstance(document, dict) and 'start' in document:
|
|
|
+ output_start = document['start']
|
|
|
+ scan_range.start = output_start
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ return False, scan_range
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, scan_range
|
|
|
+
|
|
|
+ # 获取激光扫描停止角度
|
|
|
+ async def get_laser_stop(self, scan_range: ScanRange):
|
|
|
+ goon = True
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/scan_range/stop')
|
|
|
+ try:
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ if isinstance(document, dict) and 'stop' in document:
|
|
|
+ output_stop = document['stop']
|
|
|
+ scan_range.stop = output_stop
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ return False, scan_range
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon, scan_range
|
|
|
+
|
|
|
+ # 设置激光扫描开始角度
|
|
|
+ async def put_laser_start(self, scan_range: ScanRange):
|
|
|
+ goon = True
|
|
|
+ error_message = ""
|
|
|
+
|
|
|
+ start_freq = scan_range.start
|
|
|
+ if start_freq > 315 or start_freq < 45:
|
|
|
+ goon = False
|
|
|
+ error_message = "put_laser_start: start must < 315 and > 45"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ if goon:
|
|
|
+ temp_scan_range = ScanRange()
|
|
|
+ stop_success, temp_scan_range = await self.get_laser_stop(temp_scan_range)
|
|
|
+
|
|
|
+ if not stop_success:
|
|
|
+ goon = False
|
|
|
+ error_message = "Failed to retrieve current stop position"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ if start_freq >= temp_scan_range.stop:
|
|
|
+ goon = False
|
|
|
+ error_message = "put_laser_start: start must > stop"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ if goon:
|
|
|
+ try:
|
|
|
+ start_str = str(start_freq)
|
|
|
+
|
|
|
+ cmdback = await self.put_async_http('/api/v1/sensor/scan_range/start', start_str)
|
|
|
+
|
|
|
+ if not cmdback.startswith('HTTP/1.1 200 OK'):
|
|
|
+ goon = False
|
|
|
+ error_message = "put_laser_start: operation failed"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ goon = False
|
|
|
+ error_message = f"put_laser_start: operation failed - {str(e)}"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+
|
|
|
+ # 设置激光扫描结束角度
|
|
|
+ async def put_laser_stop(self, scan_range: ScanRange):
|
|
|
+ goon = True
|
|
|
+ error_message = ""
|
|
|
+
|
|
|
+ stop_freq = scan_range.stop
|
|
|
+
|
|
|
+ if stop_freq > 315 or stop_freq < 45:
|
|
|
+ goon = False
|
|
|
+ error_message = "put_laser_stop: stop must < 315 and > 45"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+ if goon:
|
|
|
+ try:
|
|
|
+ stop_str = str(stop_freq)
|
|
|
+ cmdback = await self.put_async_http('/api/v1/sensor/scan_range/stop', stop_str)
|
|
|
+
|
|
|
+ # 验证响应
|
|
|
+ if not cmdback.startswith('HTTP/1.1 200 OK'):
|
|
|
+ goon = False
|
|
|
+ error_message = "put_laser_stop: operation failed"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ goon = False
|
|
|
+ error_message = f"put_laser_stop: operation failed - {str(e)}"
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ return goon, scan_range, error_message
|
|
|
+
|
|
|
+ # 获取滤波器的设置
|
|
|
+ async def get_filter_level(self, filter: Filter):
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/filter')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ goon = False
|
|
|
+ else:
|
|
|
+ if 'filter' in document:
|
|
|
+ value = document['filter']
|
|
|
+ filter.level = value['level']
|
|
|
+ filter.window = value['window']
|
|
|
+ filter.min_angle = value['min_angle']
|
|
|
+ filter.max_angle = value['max_angle']
|
|
|
+ filter.neighbors = value['neighbors']
|
|
|
+ else:
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ except (requests.RequestException, json.JSONDecodeError) as e:
|
|
|
+ goon = False
|
|
|
+ print(f"Error occurred: {str(e)}")
|
|
|
+
|
|
|
+ return goon
|
|
|
+
|
|
|
+ # 设置过滤级别
|
|
|
+ async def put_filter_level(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ level = filter.level
|
|
|
+ if level not in [0, 1, 2, 3]:
|
|
|
+ raise ValueError("put_filter_level: wrong level parameter")
|
|
|
+ level_str = str(level)
|
|
|
+ status, response_text = await self.put_async_http('/api/v1/sensor/filter/level', level_str)
|
|
|
+
|
|
|
+ if status == 200:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ raise ValueError(f"put_filter_level: operation failed, status code: {status}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(str(e))
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+ # 获取过滤窗口设置
|
|
|
+ def get_filter_window(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ cmdback = self.get_async_http('/api/v1/sensor/filter/window')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if 'window' in document:
|
|
|
+ temp = document['window']
|
|
|
+ filter.window = temp
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ except (json.JSONDecodeError, Exception) as e:
|
|
|
+ print(f"Error in get_filter_window: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置过滤窗口
|
|
|
+ async def put_filter_window(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ window_tep = filter.window
|
|
|
+
|
|
|
+ if window_tep < 0:
|
|
|
+ raise ValueError("put_filter_window: window must > 0")
|
|
|
+
|
|
|
+ if window_tep > 10:
|
|
|
+ raise ValueError("put_filter_window: window must <= 10")
|
|
|
+
|
|
|
+ window_str = str(window_tep)
|
|
|
+ status, response_text = await self.put_async_http('/api/v1/sensor/filter/window', window_str)
|
|
|
+
|
|
|
+
|
|
|
+ if status == 200:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ raise ValueError(f"put_filter_level: operation failed, status code: {status}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(str(e))
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+ # 获取过滤最大角度设置并更新Filter对象
|
|
|
+ async def get_filter_max_angle(self, filter: Filter):
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/filter/max_angle')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ # 检查是否为有效的JSON对象
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Invalid JSON response: not an object")
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'max_angle' in document:
|
|
|
+ temp = document['max_angle']
|
|
|
+ filter.max_angle = int(temp)
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print(f"No key found in response")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response: unable to parse")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_filter_max_angle: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置过滤最大角度
|
|
|
+ async def put_filter_max_angle(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ max_angle_tep = filter.max_angle
|
|
|
+
|
|
|
+ # 更严格的参数校验
|
|
|
+ if not isinstance(max_angle_tep, int):
|
|
|
+ raise TypeError("Max angle must be an integer")
|
|
|
+
|
|
|
+ # 检查最大角度值的有效范围(根据实际需求调整)
|
|
|
+ if max_angle_tep < 0:
|
|
|
+ raise ValueError("Max angle must be >= 0")
|
|
|
+
|
|
|
+ if max_angle_tep > 180: # 假设角度范围在0-180之间
|
|
|
+ raise ValueError("Max angle must be <= 180")
|
|
|
+
|
|
|
+ # 将最大角度值转换为字符串
|
|
|
+ max_angle_str = str(max_angle_tep)
|
|
|
+
|
|
|
+
|
|
|
+ status, response_text = await self.put_async_http('/api/v1/sensor/filter/max_angle', max_angle_str)
|
|
|
+
|
|
|
+ if status == 200:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ raise ValueError(f"put_filter_level: operation failed, status code: {status}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(str(e))
|
|
|
+ return False
|
|
|
+
|
|
|
+ def get_filter_min_angle(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ # 发送HTTP GET请求
|
|
|
+ cmdback = self.get_async_http('/api/v1/sensor/filter/min_angle')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ # 检查返回的JSON对象是否包含特定的键
|
|
|
+ if 'min_angle' in document:
|
|
|
+ temp = document['min_angle']
|
|
|
+ filter.min_angle = int(temp) # 更新Filter对象的min_angle属性
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print("No 'min_angle' key found in response")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_filter_min_angle: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置过滤最小角度
|
|
|
+ async def put_filter_min_angle(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ # 获取最小角度值
|
|
|
+ min_angle_tep = filter.min_angle
|
|
|
+ if min_angle_tep < 0:
|
|
|
+ print("put_filter_min_angle: min_angle must >= 0")
|
|
|
+ return False
|
|
|
+ if min_angle_tep > 359:
|
|
|
+ print("put_filter_min_angle: min_angle must <= 359")
|
|
|
+ return False
|
|
|
+
|
|
|
+ max_angle_result = await self.get_filter_max_angle(filter) # 使用 await 等待结果
|
|
|
+ if max_angle_result:
|
|
|
+ max_angle_tep = filter.max_angle
|
|
|
+ if min_angle_tep > max_angle_tep:
|
|
|
+ print("put_filter_min_angle: max_angle must >= min_angle")
|
|
|
+ return False
|
|
|
+ else:
|
|
|
+ print("Failed to get max angle for comparison")
|
|
|
+ return False
|
|
|
+
|
|
|
+ min_angle_str = str(min_angle_tep)
|
|
|
+ cmdback = await self.put_async_http('/api/v1/sensor/filter/min_angle', min_angle_str) # 使用 await 等待结果
|
|
|
+ # if cmdback.startswith('HTTP/1.1 200 OK'):
|
|
|
+ # return True
|
|
|
+ # else:
|
|
|
+ # print("put_filter_min_angle: operation failed")
|
|
|
+ # return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error in put_filter_min_angle: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+ # 获取过滤邻居设置并更新Filter对象
|
|
|
+ def get_filter_neighbors(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ cmdback = self.get_async_http('/api/v1/sensor/filter/neighbors')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Invalid JSON response: not an object")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检查返回的JSON对象是否包含特定的键
|
|
|
+ if 'neighbors' in document:
|
|
|
+ # 获取邻居数量并更新Filter对象的neighbors属性
|
|
|
+ temp = document['neighbors']
|
|
|
+ filter.neighbors = int(temp) # 确保将其转换为整数
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response: unable to parse")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_filter_neighbors: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置过滤邻居数量
|
|
|
+ async def put_filter_neighbors(self, filter: Filter):
|
|
|
+ try:
|
|
|
+ neighbors_tep = filter.neighbors
|
|
|
+ if neighbors_tep < 0:
|
|
|
+ print("put_filter_neighbors: neighbors must > 0")
|
|
|
+ return False
|
|
|
+ if neighbors_tep > 10:
|
|
|
+ print("put_filter_neighbors: neighbors must < 10")
|
|
|
+ return False
|
|
|
+ neighbors_str = str(neighbors_tep)
|
|
|
+ status, response_text = await self.put_async_http('/api/v1/sensor/filter/neighbors', neighbors_str)
|
|
|
+ if status == 200:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print("put_filter_neighbors: operation failed")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error in put_filter_neighbors: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+ # 获取主机配置并更新Host对象
|
|
|
+ async def get_host(self, host: Host):
|
|
|
+ try:
|
|
|
+
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/host')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Invalid JSON response: not an object")
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'host' in document:
|
|
|
+ value = document['host']
|
|
|
+ host.ip = value['ip']
|
|
|
+ host.port = value['port']
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response: unable to parse")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_host: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 获取主机IP并更新Host对象
|
|
|
+ async def get_host_ip(self, host: Host):
|
|
|
+
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/host/ip')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Invalid JSON response: not an object")
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'ip' in document:
|
|
|
+ host.ip = document['ip']
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response: unable to parse")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_host_ip: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置主机IP地址
|
|
|
+ async def put_host_ip(self, host: Host):
|
|
|
+ try:
|
|
|
+ ip = host.ip
|
|
|
+ ip_size = len(ip)
|
|
|
+ for i in range(ip_size):
|
|
|
+ if not ((ip[i] >= '0' and ip[i] <= '9') or ip[i] == '.'):
|
|
|
+ print("put_host_ip: not ipv4 format")
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ parts = ip.split('.')
|
|
|
+ if len(parts) != 4:
|
|
|
+ print("put_host_ip: not ipv4 format")
|
|
|
+ return False
|
|
|
+
|
|
|
+ for part in parts:
|
|
|
+ num = int(part)
|
|
|
+ if num < 0 or num > 255:
|
|
|
+ print("put_host_ip: not ipv4 format")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except (ValueError, IndexError):
|
|
|
+ print("put_host_ip: not ipv4 format")
|
|
|
+ return False
|
|
|
+
|
|
|
+ status, response_text = await self.put_async_http('/api/v1/sensor/host/ip', ip)
|
|
|
+
|
|
|
+ if status == 200:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print("put_filter_neighbors: operation failed")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error in put_filter_neighbors: {e}")
|
|
|
+ return False
|
|
|
+ # 获取主机端口并更新Host对象
|
|
|
+ async def get_host_port(self, host: Host):
|
|
|
+ try:
|
|
|
+ cmdback = await self.get_async_http('/api/v1/sensor/host/port')
|
|
|
+ document = json.loads(cmdback)
|
|
|
+
|
|
|
+ if not isinstance(document, dict):
|
|
|
+ print("Invalid JSON response: not an object")
|
|
|
+ return False
|
|
|
+
|
|
|
+ if 'port' in document:
|
|
|
+ temp = document['port']
|
|
|
+ host.port = int(temp)
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ print("Invalid JSON response: unable to parse")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Error in get_host_port: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 设置主机端口
|
|
|
+ async def put_host_port(self, host: Host):
|
|
|
+ goon = True
|
|
|
+ port = str(host.port)
|
|
|
+ cmdback = await self.put_async_http('/api/v1/sensor/host/port', port)
|
|
|
+
|
|
|
+ res = cmdback.find('HTTP/1.1 200 OK')
|
|
|
+
|
|
|
+ if res == -1:
|
|
|
+ goon = False
|
|
|
+ message = "put_host_port: operation failed"
|
|
|
+ else:
|
|
|
+ message = f"Operation succeeded for IP: {port}"
|
|
|
+
|
|
|
+ return goon, message
|
|
|
+
|
|
|
+ # 发送 GET 请求并处理响应
|
|
|
+ def get_http(self, target: str):
|
|
|
+
|
|
|
+ try:
|
|
|
+ full_url = f"{self.base_url}{target}"
|
|
|
+
|
|
|
+ headers = {
|
|
|
+ 'User-Agent': 'Python-Requests',
|
|
|
+ 'Connection': 'close'
|
|
|
+ }
|
|
|
+
|
|
|
+ response = requests.get(full_url, headers=headers)
|
|
|
+
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ response_text = response.text
|
|
|
+
|
|
|
+ start = response_text.find("{")
|
|
|
+ end = response_text.rfind("}")
|
|
|
+
|
|
|
+ if start != -1 and end != -1 and end > start:
|
|
|
+ cutstring = response_text[start:end + 1]
|
|
|
+ return cutstring
|
|
|
+
|
|
|
+ return ""
|
|
|
+
|
|
|
+ except requests.RequestException as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return ""
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 发送 PUT 请求并处理响应
|
|
|
+ def put_http(self, target: str, message: str):
|
|
|
+ try:
|
|
|
+ full_url = "{}{}".format(self.base_url, target)
|
|
|
+ headers = {
|
|
|
+ 'User-Agent': 'Python-Requests',
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
+ 'Connection': 'close'
|
|
|
+ }
|
|
|
+ response = requests.put(full_url, headers=headers, data=message)
|
|
|
+ response.raise_for_status()
|
|
|
+ return response.text
|
|
|
+ except requests.RequestException as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return ""
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 发送 DELETE 请求并处理响应
|
|
|
+ def delete_http(self, target: str):
|
|
|
+ try:
|
|
|
+ full_url = "{}{}".format(self.base_url, target)
|
|
|
+ headers = {
|
|
|
+ 'User-Agent': 'Python-Requests',
|
|
|
+ 'Connection': 'close'
|
|
|
+ }
|
|
|
+ response = requests.delete(full_url, headers=headers)
|
|
|
+ response.raise_for_status()
|
|
|
+ return response.text
|
|
|
+ except requests.RequestException as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return ""
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 发送异步 GET 请求并处理响应
|
|
|
+ async def get_async_http(self, target: str):
|
|
|
+ full_url = f"{self.base_url}{target}"
|
|
|
+ try:
|
|
|
+ async with aiohttp.ClientSession() as session:
|
|
|
+ async with session.get(full_url) as response:
|
|
|
+ response_text = await response.text()
|
|
|
+ return response_text
|
|
|
+ except aiohttp.ClientError as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return ""
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 发送异步 PUT 请求并处理响应
|
|
|
+ async def put_async_http(self, target: str, message: str):
|
|
|
+ full_url = f"{self.base_url}{target}"
|
|
|
+ try:
|
|
|
+ async with aiohttp.ClientSession() as session:
|
|
|
+ headers = {
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
+ 'User-Agent': 'Python-aiohttp',
|
|
|
+ 'Connection': 'close'
|
|
|
+ }
|
|
|
+
|
|
|
+ async with session.put(full_url,
|
|
|
+ headers=headers,
|
|
|
+ data=message,
|
|
|
+ timeout=aiohttp.ClientTimeout(total=10)) as response:
|
|
|
+ response.raise_for_status()
|
|
|
+ response_text = await response.json()
|
|
|
+ return response.status, response_text
|
|
|
+
|
|
|
+ except aiohttp.ClientResponseError as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return e.status, f"put_async_http: can't connect to http - {str(e)}"
|
|
|
+ except asyncio.TimeoutError:
|
|
|
+ print("请求超时")
|
|
|
+ return 408, "put_async_http: request timeout"
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return 500, f"put_async_http: unexpected error - {str(e)}"
|
|
|
+
|
|
|
+
|
|
|
+ # 发送异步 DELETE 请求并处理响应
|
|
|
+ async def delete_async_http(self, target: str) -> str:
|
|
|
+ full_url = f"{self.base_url}{target}"
|
|
|
+
|
|
|
+ try:
|
|
|
+ async with aiohttp.ClientSession() as session:
|
|
|
+ headers = {
|
|
|
+ 'User-Agent': 'Python-aiohttp',
|
|
|
+ 'Connection': 'close'
|
|
|
+ }
|
|
|
+ async with session.delete(full_url,
|
|
|
+ headers=headers,
|
|
|
+ timeout=aiohttp.ClientTimeout(total=10)) as response:
|
|
|
+ response.raise_for_status()
|
|
|
+ response_text = await response.text()
|
|
|
+ if not response_text:
|
|
|
+ return "delete_async_http: can't connect to http"
|
|
|
+ start = response_text.find("{")
|
|
|
+ end = response_text.rfind("}")
|
|
|
+
|
|
|
+ if start != -1 and end != -1 and end >= start:
|
|
|
+ return response_text[start:end + 1]
|
|
|
+
|
|
|
+ return response_text
|
|
|
+
|
|
|
+ except aiohttp.ClientError as e:
|
|
|
+ print(f"HTTP 请求错误: {e}")
|
|
|
+ return f"delete_async_http: can't connect to http - {str(e)}"
|
|
|
+ except asyncio.TimeoutError:
|
|
|
+ print("请求超时")
|
|
|
+ return "delete_async_http: request timeout"
|
|
|
+ except Exception as e:
|
|
|
+ print(f"处理响应时发生错误: {e}")
|
|
|
+ return f"delete_async_http: unexpected error - {str(e)}"
|
|
|
+
|
|
|
+ # 检查本地和目标 IP 地址的有效性
|
|
|
+ def check_ipconfig(self) -> bool:
|
|
|
+ goon = True
|
|
|
+
|
|
|
+ try:
|
|
|
+ try:
|
|
|
+ local_ip_obj = ipaddress.ip_address(self.local_ip)
|
|
|
+ if not isinstance(local_ip_obj, ipaddress.IPv4Address):
|
|
|
+ raise ValueError("Not an IPv4 address")
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ goon = False
|
|
|
+ self.local_ip = self.HOST_IP_DEFAULT
|
|
|
+ print(f"Invalid local IP, using default: {self.local_ip}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ web_ip_obj = ipaddress.ip_address(self.web_ip)
|
|
|
+ if not isinstance(web_ip_obj, ipaddress.IPv4Address):
|
|
|
+ raise ValueError("Not an IPv4 address")
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ goon = False
|
|
|
+ self.web_ip = self.HTTP_IP_DEFAULT
|
|
|
+ print(f"Invalid web IP, using default: {self.web_ip}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Unexpected error in check_ipconfig: {e}")
|
|
|
+ goon = False
|
|
|
+
|
|
|
+ return goon
|