||
- import requests
- import json
- import ipaddress
- import re
- import aiohttp # type: ignore
- import asyncio
- # 固件信息类
- class Firmware:
- def __init__(self, model: str, sn: str, fpga: str, core: str, aux: str):
- self.model = model
- self.sn = sn
- self.fpga = fpga
- self.core = core
- self.aux = aux
- # 系统监控数据类
- class Monitor:
- def __init__(self, load_average: float, men_useage: float, uptime: float):
- self.load_average = load_average
- self.men_useage = men_useage
- self.uptime = uptime
- # 网络信息类
- class NetWork:
- def __init__(self, carrier: bool, duplex: str, ethaddr: str, hostname: str, ipv4_dhcp: bool, ipv4_addr: str, ipv4_override: str, speed: int):
- self.carrier = carrier
- self.duplex = duplex
- self.ethaddr = ethaddr
- self.hostname = hostname
- self.ipv4_dhcp = ipv4_dhcp
- self.ipv4_addr = ipv4_addr
- self.ipv4_override = ipv4_override
- self.speed = speed
- # 服务器数据类
- class OverView:
- def __init__(self, scanfreq: int, motor_rpm: int, laser_enable: bool, resolution: float, scan_range_start: int, scan_range_stop: int, filter_level: int, window: int, filter_min_angle: int,
- filter_max_angle: int, filter_neighbors: int, host_ip: str, host_port: int):
- self.scanfreq = scanfreq
- self.motor_rpm = motor_rpm
- self.laser_enable = laser_enable
- self.resolution = resolution
- self.scan_range_start = scan_range_start
- self.scan_range_stop = scan_range_stop
- self.filter_level = filter_level
- self.window = window
- self.filter_min_angle = filter_min_angle
- self.filter_max_angle = filter_max_angle
- self.filter_neighbors = filter_neighbors
- self.host_ip = host_ip
- self.host_port = host_port
- # 雷达角度扫面范围封装类
- class ScanRange:
- def __init__(self, start: int, stop: int):
- self.start = start
- self.stop = stop
- # 滤波器参数类
- class Filter:
- def __init__(self, level: int, window: int, min_angle: int, max_angle: int, neighbors: int):
- self.level = level
- self.window = window
- self.min_angle = min_angle
- self.max_angle = max_angle
- self.neighbors = neighbors
- # 主机配置
- class Host:
- def __init__(self, ip: str, port: int):
- self.ip = ip
- self.port = port
- class LakiBeamHTTP:
- HOST_IP_DEFAULT = "192.168.8.1"
- HTTP_IP_DEFAULT = "192.168.8.2"
- def __init__(self, local_ip: str, local_port: str, web_ip: str, web_port: str):
- self.local_ip = local_ip
- self.local_port = local_port
- self.web_ip = web_ip
- self.web_port = web_port
- self.base_url = "http://{}:{}".format(web_ip, web_port)
- self.check_ipconfig()
- # 获取固件信息并填充到firemware
- async def get_firemware(self, firemware: Firmware):
- goon = True
- try:
- cmdback = await self.get_async_http('/api/v1/system/firmware')
- document = json.loads(cmdback)
- if not isinstance(document, dict):
- return False
- for field in ['model', 'sn', 'fpga', 'core', 'aux']:
- if field in document:
- setattr(firemware, field, document[field])
- else:
- goon = False
- break
- return True
- except (requests.RequestException, json.JSONDecodeError) as e:
- print(f"Error fetching data: {e}")
- return False
-
- # 获取监控信息并填充到monitor
- async def get_monitor(self, monitor: Monitor):
- try:
- cmdback = await self.get_async_http('/api/v1/system/monitor')
- document = json.loads(cmdback)
- if not isinstance(document, dict):
- print("Error: Response is not a dictionary")
- return False
- required_fields = ['load_average', 'mem_useage', 'uptime']
- for field in required_fields:
- if field not in document:
- print(f"Error: Missing required field {field}")
- return False
- monitor.load_average = document['load_average']
- monitor.men_useage = document['mem_useage']
- monitor.uptime = document['uptime']
- return True
- except json.JSONDecodeError:
- print("Error: Failed to decode JSON")
- return False
- except aiohttp.ClientError as e:
- print(f"Network error: {e}")
- return False
- except Exception as e:
- print(f"Unexpected error: {e}")
- return False
- #从JSON数据中解析网络配置信息
- async def get_network(self, network: NetWork):
- goon = True
- try:
- cmdback = await self.get_async_http('/api/v1/system/network')
- document = json.loads(cmdback)
-
- if not isinstance(document, dict):
- return False
- if 'carrier' in document:
- network.carrier = document['carrier']
- else:
- goon = False
-
- if 'duplex' in document:
- network.duplex = document['duplex']
- else:
- goon = False
-
- if 'ethaddr' in document:
- network.ethaddr = document['ethaddr']
- else:
- goon = False
-
- if 'hostname' in document:
- network.hostname = document['hostname']
- else:
- goon = False
-
- if 'ipv4' in document:
- ipv4_info = document['ipv4']
- if isinstance(ipv4_info, dict):
- network.ipv4_dhcp = ipv4_info.get('dhcp', False)
- network.ipv4_addr = ipv4_info.get('addr', "")
- network.ipv4_override = ipv4_info.get('override', "")
- else:
- goon = False
-
- if 'speed' in document:
- network.speed = document['speed']
-
- return goon
- except json.JSONDecodeError:
- print("Error decoding JSON response")
- return False
- except requests.RequestException as e:
- print(f"Request error: {e}")
- return False
- except Exception as e:
- print(f"Unexpected error in get_network: {e}")
- return False
-
- # 验证IP地址格式合法性、检查IP地址是否为IPv4地址、发生HTTP请求来设置IP地址
- def put_override(self, override):
- goon = True
- result = ""
- try:
- if not re.match(r'^[0-9.]+$', override):
- goon = False
- result = "put_override: not ipv4 format"
- return goon, result
- if "/" not in override:
- goon = False
- result = "put_override: format is wrong"
- return goon, result
- ip_part = override.split('/')[0]
-
- try:
- ip = ipaddress.ip_address(ip_part)
- if not isinstance(ip, ipaddress.IPv4Address):
- goon = False
- result = "put_override: wrong ip format\r\n"
- return goon, result
- except ValueError:
- goon = False
- result = "put_override: wrong ip format\r\n"
- return goon, result
- if goon:
- result = ""
- cmdback = self.put_async_http('/api/v1/system/network/override', override)
- if 'HTTP/1.1 200 OK' in cmdback:
- result = 'HTTP/1.1 200 OK'
- else:
- goon = False
- result = "put_override: operation failed"
- except Exception as e:
- goon = False
- result = f"put_override: error - {str(e)}"
- return goon, result
-
- # 删除静态模式配置
- def delete_override(self):
- goon = True
- result = ""
- try:
- response = requests.delete('/api/v1/system/network/override')
- cmdback = response.text
-
- if 'HTTP/1.1 200 OK' in cmdback:
- result = 'HTTP/1.1 200 OK'
- else:
- goon = False
- result = "delete_override: operation failed"
- except requests.RequestException as e:
- goon = False
- result = f"delete_override: request error - {str(e)}"
- return goon, result
- # 执行系统重置
- def put_reset(self):
- result = ""
- goon = True
-
- try:
- response = requests.put('/api/v1/system/reset', json={'reset': True})
- cmdback = response.text
-
- if 'HTTP/1.1 200 OK' in cmdback:
- result = 'HTTP/1.1 200 OK'
- else:
- goon = False
- result = "put_reset: operation failed"
-
- except requests.RequestException as e:
- goon = False
- result = f"put_reset: request error - {str(e)}"
- return goon, result
- # 获取传感器的概述信息
- async def get_overview(self, overview: OverView):
- goon = True
- OVERVIEW_FIELDS = {
- 'scanfreq': ('scanfreq', int),
- 'motor_rpm': ('motor_rpm', int),
- 'laser_enable': ('laser_enable', bool),
- 'resolution': ('resolution', float),
- 'scan_range': ('scan_range', {
- 'start': ('start', int),
- 'stop': ('stop', int)
- }),
- 'filter': ('filter', {
- 'level': ('level', int),
- 'min_angle': ('min_angle', int),
- 'max_angle': ('max_angle', int),
- 'neighbors': ('neighbors', int),
- 'window': ('window', int)
- }),
- 'host': ('host', {
- 'ip': ('ip', str),
- 'port': ('port', int)
- })
- }
-
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/overview')
- document = json.loads(cmdback)
-
-
- if not isinstance(document, dict):
- return False
- for field_name, (json_key, value_type) in OVERVIEW_FIELDS.items():
- if isinstance(value_type, dict):
- if json_key in document:
- nested_data = document[json_key]
- for nested_field, (nested_key, nested_type) in value_type.items():
- if nested_key in nested_data:
- setattr(overview, field_name + '_' + nested_field, nested_type(nested_data[nested_key]))
- else:
- print(f"Missing nested key: {nested_key}")
- goon = False
- else:
- print(f"Missing JSON key: {json_key}")
- goon = False
- else:
- if json_key in document:
- setattr(overview, field_name, value_type(document[json_key]))
- else:
- goon = False
- except json.JSONDecodeError as e:
- print(f"JSON Decode Error: {e}")
- goon = False
- except Exception as e:
- print(f"Unexpected error: {e}")
- goon = False
- return goon
-
- # 设置扫描频率
- def put_scanfreq(self, freq):
- goon = False
- try:
- freq_tep = int(freq)
- except ValueError:
- goon = False
- return False, "put_scanfreq: frequency must be an integer"
- if freq_tep not in [10, 20, 25, 30]:
- goon = False
- return False, "put_scanfreq: wrong freq"
-
- if goon:
- freq_str = str(freq_tep)
- try:
- response = requests.put('/api/v1/sensor/scanfreq', json={'freq': freq_str})
- cmdback = response.text
-
- if 'HTTP/1.1 200 OK' not in cmdback:
- goon = False
- return False, "put_scanfreq: operate failed"
- except requests.RequestException as e:
- goon = False
- return False, f"put_scanfreq: request error - {str(e)}"
- return goon, ""
- # 获取扫描频率
- async def get_scanfreq(self):
- result = ""
- goon = True
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/scanfreq')
- document = json.loads(cmdback)
- if not isinstance(document, dict):
- goon = False
- else:
- if 'freq' in document:
- output_freq = document['freq']
- result = str(output_freq)
- else:
- goon = False
- except requests.RequestException:
- goon = False
- except json.JSONDecodeError:
- goon = False
- return goon, result
- # 获取激光状态
- def get_motor_rpm(self):
- motor_rpm = ""
- goon = True
-
- try:
- cmdback = self.get_async_http('/api/v1/sensor/motor_rpm')
- try:
- document = json.loads(cmdback)
- except json.JSONDecodeError:
- return False, ""
-
- if not isinstance(document, dict):
- return False, ""
-
- if 'rpm' in document:
- try:
- output_rpm = int(document['rpm'])
- motor_rpm = str(output_rpm)
- except (ValueError, TypeError):
- goon = False
- else:
- goon = False
- except requests.RequestException:
- goon = False
-
- return goon, motor_rpm
- # 切换激光状态
- async def get_laser_enable(self):
- laser_state = ""
- goon = True
-
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/laser_enable')
- try:
- document = json.loads(cmdback)
- except json.JSONDecodeError:
- return False, "false"
-
- if not isinstance(document, dict):
- return False, "false"
-
- if 'HTTP/1.1 200 OK' in document:
- goon = 'HTTP/1.1 200 OK'
- else:
- goon = False
- except requests.RequestException:
- goon = False
- laser_state = "true" if goon else "false"
-
- return goon, laser_state
- # 获取雷达当前水平角分辨率
- def put_laser_enable(self, config_state):
- goon = True
-
- if config_state == "true":
- config_state = "true"
- elif config_state == "false":
- config_state = "false"
- else:
- goon = False
- config_state = "put_laser_enable: wrong parameter"
-
- if goon:
- try:
- cmdback = self.put_async_http('/api/v1/sensor/laser_enable', config_state)
-
- if not cmdback.startswith('HTTP/1.1 200 OK'):
- goon = False
- config_state = "put_laser_enable: operation failed"
- else:
- config_state = "true"
- except requests.RequestException:
- goon = False
- config_state = "put_laser_enable: request error"
- return goon, config_state
- # 获取激光分辨率
- async def get_resolution(self, overview: OverView):
- resolution = ""
- goon = True
-
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/resolution')
-
- try:
- document = json.loads(cmdback)
- except json.JSONDecodeError:
- return False, ""
-
- if not isinstance(document, dict):
- goon = False
- else:
- if 'resolution' in document:
- output_resolution = document['resolution']
- overview.resolution = str(output_resolution)
- else:
- goon = False
- except requests.RequestException:
- goon = False
-
- return goon, overview.resolution
- # 获取激光扫描起始角度
- async def get_scan_range(self, scan_range: ScanRange):
- goon = True
-
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/scan_range')
- try:
- document = json.loads(cmdback)
- except json.JSONDecodeError:
- return False, scan_range
- if not isinstance(document, dict):
- goon = False
- else:
- if 'scan_range' in document:
- value = document['scan_range']
- scan_range.start = value['start']
- scan_range.stop = value['stop']
- else:
- goon = False
- except requests.RequestException:
- goon = False
-
- return goon, scan_range
- # 获取激光扫描起始角度
- async def get_laser_start(self, scan_range: ScanRange):
- goon = True
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/scan_range/start')
- try:
- document = json.loads(cmdback)
- if isinstance(document, dict) and 'start' in document:
- output_start = document['start']
- scan_range.start = output_start
- else:
- goon = False
- except json.JSONDecodeError as e:
- return False, scan_range
- except Exception as e:
- goon = False
- return goon, scan_range
- # 获取激光扫描停止角度
- async def get_laser_stop(self, scan_range: ScanRange):
- goon = True
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/scan_range/stop')
- try:
- document = json.loads(cmdback)
- if isinstance(document, dict) and 'stop' in document:
- output_stop = document['stop']
- scan_range.stop = output_stop
- else:
- goon = False
- except json.JSONDecodeError as e:
- return False, scan_range
- except Exception as e:
- goon = False
- return goon, scan_range
- # 设置激光扫描开始角度
- def put_laser_start(self, scan_range: ScanRange):
- goon = True
- error_message = ""
- start_freq = scan_range.start
- if start_freq > 315 or start_freq < 45:
- goon = False
- error_message = "put_laser_start: start must < 315 and > 45"
- return goon, scan_range, error_message
- if goon:
- temp_scan_range = ScanRange()
- stop_success, temp_scan_range = self.get_laser_stop(temp_scan_range)
-
- if not stop_success:
- goon = False
- error_message = "Failed to retrieve current stop position"
- return goon, scan_range, error_message
- if start_freq >= temp_scan_range.stop:
- goon = False
- error_message = "put_laser_start: start must > stop"
- return goon, scan_range, error_message
- if goon:
- try:
- start_str = str(start_freq)
- cmdback = self.put_async_http('/api/v1/sensor/scan_range/start', start_str)
-
- if not cmdback.startswith('HTTP/1.1 200 OK'):
- goon = False
- error_message = "put_laser_start: operation failed"
- return goon, scan_range, error_message
- except Exception as e:
- goon = False
- error_message = f"put_laser_start: operation failed - {str(e)}"
- return goon, scan_range, error_message
- return goon, scan_range, error_message
- # 设置激光扫描结束角度
- def put_laser_stop(self, scan_range: ScanRange):
- goon = True
- error_message = ""
- stop_freq = scan_range.stop
-
- if stop_freq > 315 or stop_freq < 45:
- goon = False
- error_message = "put_laser_stop: stop must < 315 and > 45"
- return goon, scan_range, error_message
- if goon:
- try:
- cmdback = self.get_async_http('/api/v1/sensor/scan_range/start')
-
- if not cmdback.startswith('HTTP/1.1 200 OK'):
- goon = False
- error_message = "Failed to retrieve start position"
- return goon, scan_range, error_message
- try:
- document = json.loads(cmdback)
- except json.JSONDecodeError:
- goon = False
- error_message = "Failed to parse start position data"
- return goon, scan_range, error_message
- if 'start' in document:
- start_freq = document['start']
- else:
- goon = False
- error_message = "Start position not found"
- return goon, scan_range, error_message
- if start_freq >= stop_freq:
- goon = False
- error_message = "put_laser_stop: stop must > start"
- return goon, scan_range, error_message
- except requests.RequestException as e:
- goon = False
- error_message = f"Failed to retrieve start position: {str(e)}"
- return goon, scan_range, error_message
-
- if goon:
- try:
- stop_str = str(stop_freq)
- cmdback = self.put_async_http('/api/v1/sensor/scan_range/stop', stop_str)
-
- # 验证响应
- if not cmdback.startswith('HTTP/1.1 200 OK'):
- goon = False
- error_message = "put_laser_stop: operation failed"
- return goon, scan_range, error_message
- except Exception as e:
- goon = False
- error_message = f"put_laser_stop: operation failed - {str(e)}"
- return goon, scan_range, error_message
- return goon, scan_range, error_message
- # 获取滤波器的设置
- async def get_filter_level(self, filter: Filter):
- goon = True
-
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/filter')
- document = json.loads(cmdback)
-
- if not isinstance(document, dict):
- goon = False
- else:
- if 'filter' in document:
- value = document['filter']
- filter.level = value['level']
- filter.window = value['window']
- filter.min_angle = value['min_angle']
- filter.max_angle = value['max_angle']
- filter.neighbors = value['neighbors']
- else:
- goon = False
- except (requests.RequestException, json.JSONDecodeError) as e:
- goon = False
- print(f"Error occurred: {str(e)}")
- return goon
- # 设置过滤级别
- def put_filter_level(self, filter: Filter):
- try:
- level = filter.level
- if level not in [0, 1, 2, 3]:
- raise ValueError("put_filter_level: wrong level parameter")
- level_str = str(level)
- cmdback = self.put_async_http('/api/v1/sensor/filter/level', level_str)
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- raise ValueError("put_filter_level: operation failed")
-
- except Exception as e:
- print(str(e))
- return False
- # 获取过滤窗口设置
- def get_filter_window(self, filter: Filter):
- try:
- cmdback = self.get_async_http('/api/v1/sensor/filter/window')
- document = json.loads(cmdback)
-
- if 'window' in document:
- temp = document['window']
- filter.window = temp
- return True
- else:
- return False
-
- except (json.JSONDecodeError, Exception) as e:
- print(f"Error in get_filter_window: {e}")
- return False
- # 设置过滤窗口
- def put_filter_window(self, filter: Filter):
- try:
- window_tep = filter.window
-
- if window_tep < 0:
- raise ValueError("put_filter_window: window must > 0")
-
- if window_tep > 10:
- raise ValueError("put_filter_window: window must <= 10")
-
- window_str = str(window_tep)
-
- cmdback = self.put_async_http('/api/v1/sensor/filter/window', window_str)
-
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- raise ValueError("put_filter_window: operation failed")
-
- except Exception as e:
- print(str(e))
- return False
- # 获取过滤最大角度设置并更新Filter对象
- def get_filter_max_angle(self, filter: Filter):
- try:
- cmdback = self.get_async_http('/api/v1/sensor/filter/max_angle')
- document = json.loads(cmdback)
-
- # 检查是否为有效的JSON对象
- if not isinstance(document, dict):
- print("Invalid JSON response: not an object")
- return False
-
- if 'max_angle' in document:
- temp = document['max_angle']
- filter.max_angle = int(temp)
- return True
- else:
- print(f"No key found in response")
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response: unable to parse")
- return False
- except Exception as e:
- print(f"Error in get_filter_max_angle: {e}")
- return False
-
- # 设置过滤最大角度
- def put_filter_max_angle(self, filter: Filter):
- try:
- max_angle_tep = filter.max_angle
-
- # 更严格的参数校验
- if not isinstance(max_angle_tep, int):
- raise TypeError("Max angle must be an integer")
-
- # 检查最大角度值的有效范围(根据实际需求调整)
- if max_angle_tep < 0:
- raise ValueError("Max angle must be >= 0")
-
- if max_angle_tep > 180: # 假设角度范围在0-180之间
- raise ValueError("Max angle must be <= 180")
-
- # 将最大角度值转换为字符串
- max_angle_str = str(max_angle_tep)
-
- # 发送异步HTTP请求
- cmdback = self.put_async_http('/api/v1/sensor/filter/max_angle', max_angle_str)
-
- # 检查响应是否包含预期的确认信息
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- raise ValueError("Put filter max angle operation failed")
-
- except (ValueError, TypeError) as e:
- print(f"Validation error: {e}")
- return False
- except Exception as e:
- print(f"Unexpected error in put_filter_max_angle: {e}")
- return False
- def get_filter_min_angle(self, filter: Filter):
- try:
- # 发送HTTP GET请求
- cmdback = self.get_async_http('/api/v1/sensor/filter/min_angle')
- document = json.loads(cmdback)
-
- # 检查返回的JSON对象是否包含特定的键
- if 'min_angle' in document:
- temp = document['min_angle']
- filter.min_angle = int(temp) # 更新Filter对象的min_angle属性
- return True
- else:
- print("No 'min_angle' key found in response")
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response")
- return False
- except Exception as e:
- print(f"Error in get_filter_min_angle: {e}")
- return False
- # 设置过滤最小角度
- def put_filter_min_angle(self, filter: Filter):
- try:
- # 获取最小角度值
- min_angle_tep = filter.min_angle
- if min_angle_tep < 0:
- print("put_filter_min_angle: min_angle must >= 0")
- return False
- if min_angle_tep > 359:
- print("put_filter_min_angle: min_angle must <= 359")
- return False
- max_angle_str = ""
- if self.get_filter_max_angle(max_angle_str):
- max_angle_tep = int(max_angle_str)
- if min_angle_tep > max_angle_tep:
- print("put_filter_min_angle: max_angle must >= min_angle")
- return False
- else:
- print("Failed to get max angle for comparison")
- return False
- min_angle_str = str(min_angle_tep)
- cmdback = self.put_async_http('/api/v1/sensor/filter/min_angle', min_angle_str)
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- print("put_filter_min_angle: operation failed")
- return False
-
- except Exception as e:
- print(f"Unexpected error in put_filter_min_angle: {e}")
- return False
- # 获取过滤邻居设置并更新Filter对象
- def get_filter_neighbors(self, filter: Filter):
- try:
- cmdback = self.get_async_http('/api/v1/sensor/filter/neighbors')
- document = json.loads(cmdback)
- if not isinstance(document, dict):
- print("Invalid JSON response: not an object")
- return False
-
- # 检查返回的JSON对象是否包含特定的键
- if 'neighbors' in document:
- # 获取邻居数量并更新Filter对象的neighbors属性
- temp = document['neighbors']
- filter.neighbors = int(temp) # 确保将其转换为整数
- return True
- else:
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response: unable to parse")
- return False
- except Exception as e:
- print(f"Error in get_filter_neighbors: {e}")
- return False
- # 设置过滤邻居数量
- def put_filter_neighbors(self, filter: Filter):
- try:
- neighbors_tep = filter.neighbors
- if neighbors_tep < 0:
- print("put_filter_neighbors: neighbors must > 0")
- return False
- if neighbors_tep > 10:
- print("put_filter_neighbors: neighbors must < 10")
- return False
- neighbors_str = str(neighbors_tep)
- cmdback = self.put_async_http('/api/v1/sensor/filter/neighbors', neighbors_str)
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- print("put_filter_neighbors: operation failed")
- return False
-
- except Exception as e:
- print(f"Unexpected error in put_filter_neighbors: {e}")
- return False
- # 获取主机配置并更新Host对象
- async def get_host(self, host: Host):
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/host')
- document = json.loads(cmdback)
- if not isinstance(document, dict):
- print("Invalid JSON response: not an object")
- return False
-
- if 'host' in document:
- value = document['host']
- host.ip = value['ip']
- host.port = value['port']
- return True
- else:
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response: unable to parse")
- return False
- except Exception as e:
- print(f"Error in get_host: {e}")
- return False
- # 获取主机IP并更新Host对象
- async def get_host_ip(self, host: Host):
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/host/ip')
- document = json.loads(cmdback)
-
- if not isinstance(document, dict):
- print("Invalid JSON response: not an object")
- return False
-
- if 'ip' in document:
- host.ip = document['ip']
- return True
- else:
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response: unable to parse")
- return False
- except Exception as e:
- print(f"Error in get_host_ip: {e}")
- return False
- # 设置主机IP地址
- def put_host_ip(self, host: Host):
- try:
- ip = host.ip
- ip_size = len(ip)
- for i in range(ip_size):
- if not ((ip[i] >= '0' and ip[i] <= '9') or ip[i] == '.'):
- print("put_host_ip: not ipv4 format")
- return False
- try:
- parts = ip.split('.')
- if len(parts) != 4:
- print("put_host_ip: not ipv4 format")
- return False
-
- for part in parts:
- num = int(part)
- if num < 0 or num > 255:
- print("put_host_ip: not ipv4 format")
- return False
-
- except (ValueError, IndexError):
- print("put_host_ip: not ipv4 format")
- return False
- cmdback = self.put_async_http('/api/v1/sensor/host/ip', ip)
- if cmdback.startswith('HTTP/1.1 200 OK'):
- return True
- else:
- print("put_host_ip: operation failed")
- return False
-
- except Exception as e:
- print(f"Unexpected error in put_host_ip: {e}")
- return False
- # 获取主机端口并更新Host对象
- async def get_host_port(self, host: Host):
- try:
- cmdback = await self.get_async_http('/api/v1/sensor/host/port')
- document = json.loads(cmdback)
-
- if not isinstance(document, dict):
- print("Invalid JSON response: not an object")
- return False
-
- if 'port' in document:
- temp = document['port']
- host.port = int(temp)
- return True
- else:
- return False
-
- except json.JSONDecodeError:
- print("Invalid JSON response: unable to parse")
- return False
- except Exception as e:
- print(f"Error in get_host_port: {e}")
- return False
- # 设置主机端口
- def put_host_port(self, host: Host):
- goon = True
- ip = host.ip
- cmdback = self.put_async_http('/api/v1/sensor/host/port', ip)
- res = cmdback.find('HTTP/1.1 200 OK')
-
- if res == -1:
- goon = False
- message = "put_host_port: operation failed"
- else:
- message = f"Operation succeeded for IP: {ip}"
- return goon, message
- # 发送 GET 请求并处理响应
- def get_http(self, target: str):
- try:
- full_url = f"{self.base_url}{target}"
-
- headers = {
- 'User-Agent': 'Python-Requests',
- 'Connection': 'close'
- }
-
- response = requests.get(full_url, headers=headers)
-
- response.raise_for_status()
-
- response_text = response.text
-
- start = response_text.find("{")
- end = response_text.rfind("}")
-
- if start != -1 and end != -1 and end > start:
- cutstring = response_text[start:end + 1]
- return cutstring
-
- return ""
-
- except requests.RequestException as e:
- print(f"HTTP 请求错误: {e}")
- return ""
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return ""
- # 发送 PUT 请求并处理响应
- def put_http(self, target: str, message: str):
- try:
- full_url = "{}{}".format(self.base_url, target)
- headers = {
- 'User-Agent': 'Python-Requests',
- 'Content-Type': 'application/json',
- 'Connection': 'close'
- }
- response = requests.put(full_url, headers=headers, data=message)
- response.raise_for_status()
- return response.text
- except requests.RequestException as e:
- print(f"HTTP 请求错误: {e}")
- return ""
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return ""
- # 发送 DELETE 请求并处理响应
- def delete_http(self, target: str):
- try:
- full_url = "{}{}".format(self.base_url, target)
- headers = {
- 'User-Agent': 'Python-Requests',
- 'Connection': 'close'
- }
- response = requests.delete(full_url, headers=headers)
- response.raise_for_status()
- return response.text
- except requests.RequestException as e:
- print(f"HTTP 请求错误: {e}")
- return ""
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return ""
- # 发送异步 GET 请求并处理响应
- async def get_async_http(self, target: str):
- full_url = f"{self.base_url}{target}"
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(full_url) as response:
- response_text = await response.text()
- return response_text
- except aiohttp.ClientError as e:
- print(f"HTTP 请求错误: {e}")
- return ""
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return ""
-
- # 发送异步 PUT 请求并处理响应
- async def put_async_http(self, target: str, message: str):
- full_url = f"{self.base_url}{target}"
- try:
- async with aiohttp.ClientSession() as session:
- headers = {
- 'Content-Type': 'application/json',
- 'User-Agent': 'Python-aiohttp',
- 'Connection': 'close'
- }
- async with session.put(full_url,
- headers=headers,
- data=message,
- timeout=aiohttp.ClientTimeout(total=10)) as response:
- response.raise_for_status()
- response_text = await response.text()
- if not response_text:
- return "put_async_http: can't connect to http"
-
- start = response_text.find("{")
- end = response_text.rfind("}")
-
- if start != -1 and end != -1 and end >= start:
- return response_text[start:end + 1]
-
- return response_text
-
- except aiohttp.ClientError as e:
- print(f"HTTP 请求错误: {e}")
- return f"put_async_http: can't connect to http - {str(e)}"
- except asyncio.TimeoutError:
- print("请求超时")
- return "put_async_http: request timeout"
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return f"put_async_http: unexpected error - {str(e)}"
- # 发送异步 DELETE 请求并处理响应
- async def delete_async_http(self, target: str) -> str:
- full_url = f"{self.base_url}{target}"
-
- try:
- async with aiohttp.ClientSession() as session:
- headers = {
- 'User-Agent': 'Python-aiohttp',
- 'Connection': 'close'
- }
- async with session.delete(full_url,
- headers=headers,
- timeout=aiohttp.ClientTimeout(total=10)) as response:
- response.raise_for_status()
- response_text = await response.text()
- if not response_text:
- return "delete_async_http: can't connect to http"
- start = response_text.find("{")
- end = response_text.rfind("}")
-
- if start != -1 and end != -1 and end >= start:
- return response_text[start:end + 1]
-
- return response_text
-
- except aiohttp.ClientError as e:
- print(f"HTTP 请求错误: {e}")
- return f"delete_async_http: can't connect to http - {str(e)}"
- except asyncio.TimeoutError:
- print("请求超时")
- return "delete_async_http: request timeout"
- except Exception as e:
- print(f"处理响应时发生错误: {e}")
- return f"delete_async_http: unexpected error - {str(e)}"
- # 检查本地和目标 IP 地址的有效性
- def check_ipconfig(self) -> bool:
- goon = True
- try:
- try:
- local_ip_obj = ipaddress.ip_address(self.local_ip)
- if not isinstance(local_ip_obj, ipaddress.IPv4Address):
- raise ValueError("Not an IPv4 address")
- except (ValueError, TypeError):
- goon = False
- self.local_ip = self.HOST_IP_DEFAULT
- print(f"Invalid local IP, using default: {self.local_ip}")
- try:
- web_ip_obj = ipaddress.ip_address(self.web_ip)
- if not isinstance(web_ip_obj, ipaddress.IPv4Address):
- raise ValueError("Not an IPv4 address")
- except (ValueError, TypeError):
- goon = False
- self.web_ip = self.HTTP_IP_DEFAULT
- print(f"Invalid web IP, using default: {self.web_ip}")
- except Exception as e:
- print(f"Unexpected error in check_ipconfig: {e}")
- goon = False
- return goon
|