Browse Source

Intial commit

hdy 1 year ago
commit
d74617e925
3 changed files with 1448 additions and 0 deletions
  1. BIN
      __pycache__/test1.cpython-313.pyc
  2. 1244 0
      test1.py
  3. 204 0
      test2.py

BIN
__pycache__/test1.cpython-313.pyc


+ 1244 - 0
test1.py

@@ -0,0 +1,1244 @@
+import requests
+import json
+import ipaddress
+import re
+import aiohttp # type: ignore
+import asyncio
+
+# 固件信息类
+class Firmware:
+    def __init__(self, model: str, sn: str, fpga: str, core: str, aux: str):
+        self.model = model
+        self.sn = sn
+        self.fpga = fpga
+        self.core = core
+        self.aux = aux
+
+# 系统监控数据类
+class Monitor:
+    def __init__(self, load_average: float, men_useage: float, uptime: float):
+        self.load_average = load_average
+        self.men_useage = men_useage
+        self.uptime = uptime
+
+# 网络信息类
+class NetWork:
+    def __init__(self, carrier: bool, duplex: str, ethaddr: str, hostname: str, ipv4_dhcp: bool, ipv4_addr: str, ipv4_override: str, speed: int):
+        self.carrier = carrier
+        self.duplex = duplex
+        self.ethaddr = ethaddr
+        self.hostname = hostname
+        self.ipv4_dhcp = ipv4_dhcp
+        self.ipv4_addr = ipv4_addr
+        self.ipv4_override = ipv4_override
+        self.speed = speed
+
+# 服务器数据类
+class OverView:
+    def __init__(self, scanfreq: int, motor_rpm: int, laser_enable: bool, resolution: float, scan_range_start: int, scan_range_stop: int, filter_level: int, window: int, filter_min_angle: int,
+                 filter_max_angle: int, filter_neighbors: int, host_ip: str, host_port: int):
+        self.scanfreq = scanfreq
+        self.motor_rpm = motor_rpm
+        self.laser_enable = laser_enable
+        self.resolution = resolution
+        self.scan_range_start = scan_range_start
+        self.scan_range_stop = scan_range_stop
+        self.filter_level = filter_level
+        self.window = window
+        self.filter_min_angle = filter_min_angle
+        self.filter_max_angle = filter_max_angle
+        self.filter_neighbors = filter_neighbors
+        self.host_ip = host_ip
+        self.host_port = host_port
+
+# 雷达角度扫面范围封装类
+class ScanRange:
+    def __init__(self, start: int, stop: int):
+        self.start = start
+        self.stop = stop
+
+# 滤波器参数类
+class Filter:
+    def __init__(self, level: int, window: int, min_angle: int, max_angle: int, neighbors: int):
+        self.level = level
+        self.window = window
+        self.min_angle = min_angle
+        self.max_angle = max_angle
+        self.neighbors = neighbors
+
+# 主机配置
+class Host:
+    def __init__(self, ip: str, port: int):
+        self.ip = ip
+        self.port = port
+
+class LakiBeamHTTP:
+    HOST_IP_DEFAULT = "192.168.8.1"
+    HTTP_IP_DEFAULT = "192.168.8.2"
+    def __init__(self, local_ip: str, local_port: str, web_ip: str, web_port: str):
+        self.local_ip = local_ip
+        self.local_port = local_port
+        self.web_ip = web_ip
+        self.web_port = web_port
+        self.base_url = "http://{}:{}".format(web_ip, web_port)
+        self.check_ipconfig()
+
+    # 获取固件信息并填充到firemware
+    async def get_firemware(self, firemware: Firmware):
+        goon = True
+        try:
+            cmdback = await self.get_async_http('/api/v1/system/firmware')
+            document = json.loads(cmdback)
+            if not isinstance(document, dict):
+                return False
+
+            for field in ['model', 'sn', 'fpga', 'core', 'aux']:
+                if field in document:
+                    setattr(firemware, field, document[field])
+                else:
+                    goon = False
+                    break
+            return True 
+        except (requests.RequestException, json.JSONDecodeError) as e:
+            print(f"Error fetching data: {e}")
+            return False
+        
+    # 获取监控信息并填充到monitor
+    async def get_monitor(self, monitor: Monitor):
+        try:
+
+            cmdback = await self.get_async_http('/api/v1/system/monitor')
+            document = json.loads(cmdback)
+
+            if not isinstance(document, dict):
+                print("Error: Response is not a dictionary")
+                return False
+
+            required_fields = ['load_average', 'mem_useage', 'uptime']
+
+            for field in required_fields:
+                if field not in document:
+                    print(f"Error: Missing required field {field}")
+                    return False
+
+            monitor.load_average = document['load_average']
+            monitor.men_useage = document['mem_useage']
+            monitor.uptime = document['uptime']
+
+            return True
+
+        except json.JSONDecodeError:
+            print("Error: Failed to decode JSON")
+            return False
+        except aiohttp.ClientError as e:
+            print(f"Network error: {e}")
+            return False
+        except Exception as e:
+            print(f"Unexpected error: {e}")
+            return False
+
+    #从JSON数据中解析网络配置信息    
+    async def get_network(self, network: NetWork):
+        goon = True
+        try:
+            cmdback = await self.get_async_http('/api/v1/system/network')
+            document = json.loads(cmdback)
+            
+            if not isinstance(document, dict):
+                return False  
+
+            if 'carrier' in document:
+                network.carrier = document['carrier']
+            else:
+                goon = False
+            
+            if 'duplex' in document:
+                network.duplex = document['duplex']
+            else:
+                goon = False
+            
+            if 'ethaddr' in document:
+                network.ethaddr = document['ethaddr']
+            else:
+                goon = False
+            
+            if 'hostname' in document:
+                network.hostname = document['hostname']
+            else:
+                goon = False
+            
+            if 'ipv4' in document:
+                ipv4_info = document['ipv4']
+                if isinstance(ipv4_info, dict):
+                    network.ipv4_dhcp = ipv4_info.get('dhcp', False)
+                    network.ipv4_addr = ipv4_info.get('addr', "")
+                    network.ipv4_override = ipv4_info.get('override', "")
+                else:
+                    goon = False
+            
+            if 'speed' in document:
+                network.speed = document['speed']
+            
+            return goon
+        except json.JSONDecodeError:
+            print("Error decoding JSON response")
+            return False
+        except requests.RequestException as e:
+            print(f"Request error: {e}")
+            return False
+        except Exception as e:
+            print(f"Unexpected error in get_network: {e}")
+            return False
+    
+    # 验证IP地址格式合法性、检查IP地址是否为IPv4地址、发生HTTP请求来设置IP地址
+    def put_override(self, override):
+        goon = True
+        result = ""
+        try:
+            if not re.match(r'^[0-9.]+$', override):
+                goon = False
+                result = "put_override: not ipv4 format"
+                return goon, result
+
+            if "/" not in override:
+                goon = False
+                result = "put_override: format is wrong"
+                return goon, result
+
+            ip_part = override.split('/')[0]
+            
+            try:
+                ip = ipaddress.ip_address(ip_part)
+                if not isinstance(ip, ipaddress.IPv4Address):
+                    goon = False
+                    result = "put_override: wrong ip format\r\n"
+                    return goon, result
+            except ValueError:
+                goon = False
+                result = "put_override: wrong ip format\r\n"
+                return goon, result
+
+            if goon:
+                result = ""
+                cmdback = self.put_async_http('/api/v1/system/network/override', override)
+
+                if 'HTTP/1.1 200 OK' in cmdback:
+                    result = 'HTTP/1.1 200 OK'
+                else:
+                    goon = False
+                    result = "put_override: operation failed"
+
+        except Exception as e:
+            goon = False
+            result = f"put_override: error - {str(e)}"
+
+        return goon, result
+    
+    # 删除静态模式配置
+    def delete_override(self):
+        goon = True
+        result = ""
+
+        try:
+            response = requests.delete('/api/v1/system/network/override')
+            cmdback = response.text
+            
+            if 'HTTP/1.1 200 OK' in cmdback:
+                result = 'HTTP/1.1 200 OK'
+            else:
+                goon = False
+                result = "delete_override: operation failed"
+        except requests.RequestException as e:
+            goon = False
+            result = f"delete_override: request error - {str(e)}"
+
+        return goon, result      
+
+    # 执行系统重置
+    def put_reset(self):
+        result = ""
+        goon = True
+        
+        try:
+            response = requests.put('/api/v1/system/reset', json={'reset': True})
+            cmdback = response.text
+            
+            if 'HTTP/1.1 200 OK' in cmdback:
+                result = 'HTTP/1.1 200 OK'
+            else:
+                goon = False
+                result = "put_reset: operation failed"
+        
+        except requests.RequestException as e:
+            goon = False
+            result = f"put_reset: request error - {str(e)}"
+
+        return goon, result  
+
+    # 获取传感器的概述信息
+    async def get_overview(self, overview: OverView):
+        goon = True
+        OVERVIEW_FIELDS = {
+            'scanfreq': ('scanfreq', int),
+            'motor_rpm': ('motor_rpm', int),
+            'laser_enable': ('laser_enable', bool),
+            'resolution': ('resolution', float),
+            'scan_range': ('scan_range', {
+                'start': ('start', int),
+                'stop': ('stop', int)
+            }),
+            'filter': ('filter', {
+                'level': ('level', int),
+                'min_angle': ('min_angle', int),
+                'max_angle': ('max_angle', int),
+                'neighbors': ('neighbors', int),
+                'window': ('window', int)  
+            }),
+            'host': ('host', {
+                'ip': ('ip', str),
+                'port': ('port', int)
+            })
+        }
+        
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/overview')
+            document = json.loads(cmdback)
+            
+            
+            if not isinstance(document, dict):
+                return False
+
+            for field_name, (json_key, value_type) in OVERVIEW_FIELDS.items():
+                if isinstance(value_type, dict):  
+                    if json_key in document:
+                        nested_data = document[json_key]
+                        for nested_field, (nested_key, nested_type) in value_type.items():
+                            if nested_key in nested_data:
+
+                                setattr(overview, field_name + '_' + nested_field, nested_type(nested_data[nested_key]))
+                            else:
+                                print(f"Missing nested key: {nested_key}")
+                                goon = False
+                    else:
+                        print(f"Missing JSON key: {json_key}")
+                        goon = False
+                else:  
+                    if json_key in document:
+
+                        setattr(overview, field_name, value_type(document[json_key]))
+                    else:
+                        goon = False
+
+        except json.JSONDecodeError as e:
+            print(f"JSON Decode Error: {e}")
+            goon = False
+        except Exception as e:
+            print(f"Unexpected error: {e}")
+            goon = False
+
+        return goon
+          
+
+    # 设置扫描频率
+    def put_scanfreq(self, freq):
+        goon = False
+
+        try:
+            freq_tep = int(freq)
+        except ValueError:
+            goon = False
+            return False, "put_scanfreq: frequency must be an integer"
+
+        if freq_tep not in [10, 20, 25, 30]:
+            goon = False
+            return False, "put_scanfreq: wrong freq"
+        
+        if goon:
+            freq_str = str(freq_tep)
+            try:
+                response = requests.put('/api/v1/sensor/scanfreq', json={'freq': freq_str})
+                cmdback = response.text
+                
+                if 'HTTP/1.1 200 OK' not in cmdback:
+                    goon = False
+                    return False, "put_scanfreq: operate failed"
+            except requests.RequestException as e:
+                goon = False
+                return False, f"put_scanfreq: request error - {str(e)}"
+
+        return goon, ""
+
+    # 获取扫描频率
+    async def get_scanfreq(self):
+        result = ""
+        goon = True
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/scanfreq')
+            document = json.loads(cmdback)
+            if not isinstance(document, dict):
+                goon = False
+            else:
+                if 'freq' in document:
+                      output_freq = document['freq']
+                      result = str(output_freq)  
+                else:
+                    goon = False
+
+        except requests.RequestException:
+            goon = False
+        except json.JSONDecodeError:
+            goon = False
+
+        return goon, result
+
+    # 获取激光状态
+    def get_motor_rpm(self):
+
+            motor_rpm = ""
+            goon = True
+            
+            try:
+                cmdback = self.get_async_http('/api/v1/sensor/motor_rpm')
+                try:
+                    document = json.loads(cmdback)
+                except json.JSONDecodeError:
+                    return False, ""
+                
+                if not isinstance(document, dict):
+                    return False, ""
+                
+                if 'rpm' in document:
+                    try:
+                        output_rpm = int(document['rpm'])
+                        motor_rpm = str(output_rpm)
+                    except (ValueError, TypeError):
+                        goon = False
+                else:
+                    goon = False
+
+            except requests.RequestException:
+                goon = False
+            
+            return goon, motor_rpm
+
+    # 切换激光状态
+    async def get_laser_enable(self):
+        laser_state = ""
+        goon = True
+        
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/laser_enable')
+            try:
+                document = json.loads(cmdback)
+            except json.JSONDecodeError:
+                return False, "false"
+            
+            if not isinstance(document, dict):
+                return False, "false"
+            
+            if 'HTTP/1.1 200 OK' in document:
+                goon = 'HTTP/1.1 200 OK'  
+            else:
+                goon = False
+
+        except requests.RequestException:
+            goon = False
+
+        laser_state = "true" if goon else "false"
+        
+        return goon, laser_state
+
+    # 获取雷达当前水平角分辨率
+    def put_laser_enable(self, config_state):
+        goon = True
+        
+        if config_state == "true":
+            config_state = "true"
+        elif config_state == "false":
+            config_state = "false"
+        else:
+            goon = False
+            config_state = "put_laser_enable: wrong parameter"
+        
+        if goon:
+            try:
+                cmdback = self.put_async_http('/api/v1/sensor/laser_enable', config_state)
+                
+                if not cmdback.startswith('HTTP/1.1 200 OK'):
+                    goon = False
+                    config_state = "put_laser_enable: operation failed"
+                else:
+                    config_state = "true"  
+
+            except requests.RequestException:
+                goon = False
+                config_state = "put_laser_enable: request error"
+
+        return goon, config_state
+
+    # 获取激光分辨率
+    async def get_resolution(self, overview: OverView):
+        resolution = ""
+        goon = True
+        
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/resolution')
+            
+            try:
+                document = json.loads(cmdback)
+            except json.JSONDecodeError:
+                return False, ""  
+            
+            if not isinstance(document, dict):
+                goon = False
+            else:
+                if 'resolution' in document:
+                    output_resolution = document['resolution']
+                    overview.resolution = str(output_resolution)  
+                else:
+                    goon = False
+
+        except requests.RequestException:
+            goon = False
+        
+        return goon, overview.resolution
+
+    # 获取激光扫描起始角度
+    async def get_scan_range(self, scan_range: ScanRange):
+
+        goon = True
+        
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/scan_range')
+
+            try:
+                document = json.loads(cmdback)
+            except json.JSONDecodeError:
+                return False, scan_range  
+
+            if not isinstance(document, dict):
+                goon = False
+            else:
+                if 'scan_range' in document:
+                    value = document['scan_range']
+                    scan_range.start = value['start'] 
+                    scan_range.stop = value['stop']    
+                else:
+                    goon = False
+
+        except requests.RequestException:
+            goon = False
+        
+        return goon, scan_range
+
+    # 获取激光扫描起始角度
+    async def get_laser_start(self, scan_range: ScanRange):
+        goon = True
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/scan_range/start')
+            try:
+                document = json.loads(cmdback)          
+                if isinstance(document, dict) and 'start' in document:
+                    output_start = document['start']
+                    scan_range.start = output_start
+                else:
+                    goon = False
+
+            except json.JSONDecodeError as e:
+                return False, scan_range
+
+        except Exception as e:
+            goon = False
+
+        return goon, scan_range
+
+    # 获取激光扫描停止角度
+    async def get_laser_stop(self, scan_range: ScanRange):
+        goon = True
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/scan_range/stop')
+            try:
+                document = json.loads(cmdback)          
+                if isinstance(document, dict) and 'stop' in document:
+                    output_stop = document['stop']
+                    scan_range.stop = output_stop
+                else:
+                    goon = False
+
+            except json.JSONDecodeError as e:
+                return False, scan_range
+
+        except Exception as e:
+            goon = False
+
+        return goon, scan_range
+
+    # 设置激光扫描开始角度
+    def put_laser_start(self, scan_range: ScanRange):
+        goon = True
+        error_message = ""
+
+        start_freq = scan_range.start
+        if start_freq > 315 or start_freq < 45:
+            goon = False
+            error_message = "put_laser_start: start must < 315 and > 45"
+            return goon, scan_range, error_message
+
+        if goon:
+            temp_scan_range = ScanRange()
+            stop_success, temp_scan_range = self.get_laser_stop(temp_scan_range)
+            
+            if not stop_success:
+                goon = False
+                error_message = "Failed to retrieve current stop position"
+                return goon, scan_range, error_message
+
+            if start_freq >= temp_scan_range.stop:
+                goon = False
+                error_message = "put_laser_start: start must > stop"
+                return goon, scan_range, error_message
+
+        if goon:
+            try:
+                start_str = str(start_freq)
+
+                cmdback = self.put_async_http('/api/v1/sensor/scan_range/start', start_str)
+                
+                if not cmdback.startswith('HTTP/1.1 200 OK'):
+                    goon = False
+                    error_message = "put_laser_start: operation failed"
+                    return goon, scan_range, error_message
+
+            except Exception as e:
+                goon = False
+                error_message = f"put_laser_start: operation failed - {str(e)}"
+                return goon, scan_range, error_message
+
+        return goon, scan_range, error_message   
+
+    # 设置激光扫描结束角度
+    def put_laser_stop(self, scan_range: ScanRange):
+        goon = True
+        error_message = ""
+
+        stop_freq = scan_range.stop
+        
+        if stop_freq > 315 or stop_freq < 45:
+            goon = False
+            error_message = "put_laser_stop: stop must < 315 and > 45"
+            return goon, scan_range, error_message
+
+        if goon:
+            try:
+                cmdback = self.get_async_http('/api/v1/sensor/scan_range/start')
+                
+                if not cmdback.startswith('HTTP/1.1 200 OK'):
+                    goon = False
+                    error_message = "Failed to retrieve start position"
+                    return goon, scan_range, error_message
+
+                try:
+                    document = json.loads(cmdback)
+                except json.JSONDecodeError:
+                    goon = False
+                    error_message = "Failed to parse start position data"
+                    return goon, scan_range, error_message
+
+                if 'start' in document:
+                    start_freq = document['start']
+                else:
+                    goon = False
+                    error_message = "Start position not found"
+                    return goon, scan_range, error_message
+
+                if start_freq >= stop_freq:
+                    goon = False
+                    error_message = "put_laser_stop: stop must > start"
+                    return goon, scan_range, error_message
+
+            except requests.RequestException as e:
+                goon = False
+                error_message = f"Failed to retrieve start position: {str(e)}"
+                return goon, scan_range, error_message
+
+ 
+        if goon:
+            try:
+                stop_str = str(stop_freq)
+                cmdback = self.put_async_http('/api/v1/sensor/scan_range/stop', stop_str)
+                
+                # 验证响应
+                if not cmdback.startswith('HTTP/1.1 200 OK'):
+                    goon = False
+                    error_message = "put_laser_stop: operation failed"
+                    return goon, scan_range, error_message
+
+            except Exception as e:
+                goon = False
+                error_message = f"put_laser_stop: operation failed - {str(e)}"
+                return goon, scan_range, error_message
+
+        return goon, scan_range, error_message
+
+    # 获取滤波器的设置
+    async def get_filter_level(self, filter: Filter):
+        goon = True
+        
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/filter')
+            document = json.loads(cmdback)
+            
+            if not isinstance(document, dict):
+                goon = False
+            else:
+                if 'filter' in document:
+                    value = document['filter']
+                    filter.level = value['level']
+                    filter.window = value['window']
+                    filter.min_angle = value['min_angle']
+                    filter.max_angle = value['max_angle']
+                    filter.neighbors = value['neighbors']
+                else:
+                    goon = False
+
+        except (requests.RequestException, json.JSONDecodeError) as e:
+            goon = False
+            print(f"Error occurred: {str(e)}")
+
+        return goon
+
+    # 设置过滤级别
+    def put_filter_level(self, filter: Filter):
+        try:
+            level = filter.level
+            if level not in [0, 1, 2, 3]:
+                raise ValueError("put_filter_level: wrong level parameter")
+            level_str = str(level)
+            cmdback = self.put_async_http('/api/v1/sensor/filter/level', level_str)
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                raise ValueError("put_filter_level: operation failed")
+        
+        except Exception as e:
+            print(str(e))
+            return False
+
+    # 获取过滤窗口设置
+    def get_filter_window(self, filter: Filter):
+        try:
+            cmdback = self.get_async_http('/api/v1/sensor/filter/window')
+            document = json.loads(cmdback)
+            
+            if 'window' in document:
+                temp = document['window']
+                filter.window = temp
+                return True
+            else:
+                return False
+        
+        except (json.JSONDecodeError, Exception) as e:
+            print(f"Error in get_filter_window: {e}")
+            return False
+
+    # 设置过滤窗口
+    def put_filter_window(self, filter: Filter):
+        try:
+            window_tep = filter.window
+            
+            if window_tep < 0:
+                raise ValueError("put_filter_window: window must > 0")
+            
+            if window_tep > 10:
+                raise ValueError("put_filter_window: window must <= 10")
+            
+            window_str = str(window_tep)
+            
+            cmdback = self.put_async_http('/api/v1/sensor/filter/window', window_str)
+            
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                raise ValueError("put_filter_window: operation failed")
+        
+        except Exception as e:
+            print(str(e))
+            return False
+
+    # 获取过滤最大角度设置并更新Filter对象
+    def get_filter_max_angle(self, filter: Filter):
+
+        try:
+            cmdback = self.get_async_http('/api/v1/sensor/filter/max_angle')
+            document = json.loads(cmdback)
+            
+            # 检查是否为有效的JSON对象
+            if not isinstance(document, dict):
+                print("Invalid JSON response: not an object")
+                return False
+            
+            if 'max_angle' in document:
+                temp = document['max_angle']
+                filter.max_angle = int(temp)
+                return True
+            else:
+                print(f"No key found in response")
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response: unable to parse")
+            return False
+        except Exception as e:
+            print(f"Error in get_filter_max_angle: {e}")
+            return False
+   
+    # 设置过滤最大角度
+    def put_filter_max_angle(self, filter: Filter):
+        try:
+            max_angle_tep = filter.max_angle
+            
+            # 更严格的参数校验
+            if not isinstance(max_angle_tep, int):
+                raise TypeError("Max angle must be an integer")
+            
+            # 检查最大角度值的有效范围(根据实际需求调整)
+            if max_angle_tep < 0:
+                raise ValueError("Max angle must be >= 0")
+            
+            if max_angle_tep > 180:  # 假设角度范围在0-180之间
+                raise ValueError("Max angle must be <= 180")
+            
+            # 将最大角度值转换为字符串
+            max_angle_str = str(max_angle_tep)
+            
+            # 发送异步HTTP请求
+            cmdback = self.put_async_http('/api/v1/sensor/filter/max_angle', max_angle_str)
+            
+            # 检查响应是否包含预期的确认信息
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                raise ValueError("Put filter max angle operation failed")
+        
+        except (ValueError, TypeError) as e:
+            print(f"Validation error: {e}")
+            return False
+        except Exception as e:
+            print(f"Unexpected error in put_filter_max_angle: {e}")
+            return False
+
+    def get_filter_min_angle(self, filter: Filter):
+        try:
+            # 发送HTTP GET请求
+            cmdback = self.get_async_http('/api/v1/sensor/filter/min_angle')
+            document = json.loads(cmdback)
+            
+            # 检查返回的JSON对象是否包含特定的键
+            if 'min_angle' in document:
+                temp = document['min_angle']
+                filter.min_angle = int(temp)  # 更新Filter对象的min_angle属性
+                return True
+            else:
+                print("No 'min_angle' key found in response")
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response")
+            return False
+        except Exception as e:
+            print(f"Error in get_filter_min_angle: {e}")
+            return False
+
+    # 设置过滤最小角度
+    def put_filter_min_angle(self, filter: Filter):
+        try:
+            # 获取最小角度值
+            min_angle_tep = filter.min_angle
+            if min_angle_tep < 0:
+                print("put_filter_min_angle: min_angle must >= 0")
+                return False
+            if min_angle_tep > 359:
+                print("put_filter_min_angle: min_angle must <= 359")
+                return False
+
+            max_angle_str = ""
+            if self.get_filter_max_angle(max_angle_str):
+                max_angle_tep = int(max_angle_str)
+                if min_angle_tep > max_angle_tep:
+                    print("put_filter_min_angle: max_angle must >= min_angle")
+                    return False
+            else:
+                print("Failed to get max angle for comparison")
+                return False
+
+            min_angle_str = str(min_angle_tep)
+            cmdback = self.put_async_http('/api/v1/sensor/filter/min_angle', min_angle_str)
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                print("put_filter_min_angle: operation failed")
+                return False
+        
+        except Exception as e:
+            print(f"Unexpected error in put_filter_min_angle: {e}")
+            return False
+
+    # 获取过滤邻居设置并更新Filter对象
+    def get_filter_neighbors(self, filter: Filter):
+        try:
+            cmdback = self.get_async_http('/api/v1/sensor/filter/neighbors')
+            document = json.loads(cmdback)
+            if not isinstance(document, dict):
+                print("Invalid JSON response: not an object")
+                return False
+            
+            # 检查返回的JSON对象是否包含特定的键
+            if 'neighbors' in document:
+                # 获取邻居数量并更新Filter对象的neighbors属性
+                temp = document['neighbors']
+                filter.neighbors = int(temp)  # 确保将其转换为整数
+                return True
+            else:
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response: unable to parse")
+            return False
+        except Exception as e:
+            print(f"Error in get_filter_neighbors: {e}")
+            return False
+
+    # 设置过滤邻居数量
+    def put_filter_neighbors(self, filter: Filter):
+        try:
+            neighbors_tep = filter.neighbors
+            if neighbors_tep < 0:
+                print("put_filter_neighbors: neighbors must > 0")
+                return False
+            if neighbors_tep > 10:
+                print("put_filter_neighbors: neighbors must < 10")
+                return False
+            neighbors_str = str(neighbors_tep)
+            cmdback = self.put_async_http('/api/v1/sensor/filter/neighbors', neighbors_str)
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                print("put_filter_neighbors: operation failed")
+                return False
+        
+        except Exception as e:
+            print(f"Unexpected error in put_filter_neighbors: {e}")
+            return False    
+
+    # 获取主机配置并更新Host对象
+    async def get_host(self, host: Host):
+        try:
+
+            cmdback = await self.get_async_http('/api/v1/sensor/host')
+            document = json.loads(cmdback)
+
+            if not isinstance(document, dict):
+                print("Invalid JSON response: not an object")
+                return False
+            
+            if 'host' in document:
+                value = document['host']
+                host.ip = value['ip']
+                host.port = value['port']
+                return True
+            else:
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response: unable to parse")
+            return False
+        except Exception as e:
+            print(f"Error in get_host: {e}")
+            return False
+
+    # 获取主机IP并更新Host对象
+    async def get_host_ip(self, host: Host):
+
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/host/ip')
+            document = json.loads(cmdback)
+            
+            if not isinstance(document, dict):
+                print("Invalid JSON response: not an object")
+                return False
+            
+            if 'ip' in document:
+                host.ip = document['ip']
+                return True
+            else:
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response: unable to parse")
+            return False
+        except Exception as e:
+            print(f"Error in get_host_ip: {e}")
+            return False
+
+    # 设置主机IP地址
+    def put_host_ip(self, host: Host):
+        try:
+            ip = host.ip
+            ip_size = len(ip)
+            for i in range(ip_size):
+                if not ((ip[i] >= '0' and ip[i] <= '9') or ip[i] == '.'):
+                    print("put_host_ip: not ipv4 format")
+                    return False
+            try:
+                parts = ip.split('.')
+                if len(parts) != 4:
+                    print("put_host_ip: not ipv4 format")
+                    return False
+                
+                for part in parts:
+                    num = int(part)
+                    if num < 0 or num > 255:
+                        print("put_host_ip: not ipv4 format")
+                        return False
+            
+            except (ValueError, IndexError):
+                print("put_host_ip: not ipv4 format")
+                return False
+
+            cmdback = self.put_async_http('/api/v1/sensor/host/ip', ip)
+            if cmdback.startswith('HTTP/1.1 200 OK'):
+                return True
+            else:
+                print("put_host_ip: operation failed")
+                return False
+        
+        except Exception as e:
+            print(f"Unexpected error in put_host_ip: {e}")
+            return False
+
+    # 获取主机端口并更新Host对象
+    async def get_host_port(self, host: Host):
+        try:
+            cmdback = await self.get_async_http('/api/v1/sensor/host/port')
+            document = json.loads(cmdback)
+            
+            if not isinstance(document, dict):
+                print("Invalid JSON response: not an object")
+                return False
+            
+            if 'port' in document:
+                temp = document['port']
+                host.port = int(temp)
+                return True
+            else:
+                return False
+        
+        except json.JSONDecodeError:
+            print("Invalid JSON response: unable to parse")
+            return False
+        except Exception as e:
+            print(f"Error in get_host_port: {e}")
+            return False
+
+    # 设置主机端口
+    def put_host_port(self, host: Host):
+        goon = True
+        ip = host.ip
+        cmdback = self.put_async_http('/api/v1/sensor/host/port', ip)
+
+        res = cmdback.find('HTTP/1.1 200 OK')
+        
+        if res == -1: 
+            goon = False
+            message = "put_host_port: operation failed"
+        else:
+            message = f"Operation succeeded for IP: {ip}"
+
+        return goon, message
+
+    # 发送 GET 请求并处理响应
+    def get_http(self, target: str):
+
+        try:
+            full_url = f"{self.base_url}{target}"
+            
+            headers = {
+                'User-Agent': 'Python-Requests',
+                'Connection': 'close'
+            }
+            
+            response = requests.get(full_url, headers=headers)
+            
+            response.raise_for_status()
+            
+            response_text = response.text
+            
+            start = response_text.find("{")
+            end = response_text.rfind("}")
+            
+            if start != -1 and end != -1 and end > start:
+                cutstring = response_text[start:end + 1]
+                return cutstring
+            
+            return ""
+        
+        except requests.RequestException as e:
+            print(f"HTTP 请求错误: {e}")
+            return ""
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return ""
+
+    # 发送 PUT 请求并处理响应
+    def put_http(self, target: str, message: str):
+        try:
+            full_url = "{}{}".format(self.base_url, target)
+            headers = {
+                'User-Agent': 'Python-Requests',
+                'Content-Type': 'application/json',
+                'Connection': 'close'
+            } 
+            response = requests.put(full_url, headers=headers, data=message)
+            response.raise_for_status()  
+            return response.text
+        except requests.RequestException as e:
+            print(f"HTTP 请求错误: {e}")
+            return ""
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return ""
+
+    # 发送 DELETE 请求并处理响应
+    def delete_http(self, target: str):
+        try:
+            full_url = "{}{}".format(self.base_url, target)
+            headers = {
+                'User-Agent': 'Python-Requests',
+                'Connection': 'close'
+            }            
+            response = requests.delete(full_url, headers=headers)
+            response.raise_for_status()
+            return response.text
+        except requests.RequestException as e:
+            print(f"HTTP 请求错误: {e}")
+            return ""
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return ""
+
+    # 发送异步 GET 请求并处理响应
+    async def get_async_http(self, target: str):
+        full_url = f"{self.base_url}{target}"
+        try:
+            async with aiohttp.ClientSession() as session:
+                async with session.get(full_url) as response:
+                    response_text = await response.text()
+                    return response_text  
+        except aiohttp.ClientError as e:
+            print(f"HTTP 请求错误: {e}")
+            return ""  
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return ""
+        
+    # 发送异步 PUT 请求并处理响应
+    async def put_async_http(self, target: str, message: str):
+        full_url = f"{self.base_url}{target}"
+        try:
+            async with aiohttp.ClientSession() as session:
+                headers = {
+                    'Content-Type': 'application/json',
+                    'User-Agent': 'Python-aiohttp',
+                    'Connection': 'close'
+                }
+
+                async with session.put(full_url, 
+                                       headers=headers, 
+                                       data=message,
+                                       timeout=aiohttp.ClientTimeout(total=10)) as response:
+                    response.raise_for_status()
+                    response_text = await response.text()
+                    if not response_text:
+                        return "put_async_http: can't connect to http"
+                    
+                    start = response_text.find("{")
+                    end = response_text.rfind("}")
+                    
+                    if start != -1 and end != -1 and end >= start:
+                        return response_text[start:end + 1]
+                    
+                    return response_text
+        
+        except aiohttp.ClientError as e:
+            print(f"HTTP 请求错误: {e}")
+            return f"put_async_http: can't connect to http - {str(e)}"
+        except asyncio.TimeoutError:
+            print("请求超时")
+            return "put_async_http: request timeout"
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return f"put_async_http: unexpected error - {str(e)}"
+
+    # 发送异步 DELETE 请求并处理响应
+    async def delete_async_http(self, target: str) -> str:
+        full_url = f"{self.base_url}{target}"
+        
+        try:
+            async with aiohttp.ClientSession() as session:
+                headers = {
+                    'User-Agent': 'Python-aiohttp',
+                    'Connection': 'close'
+                }
+                async with session.delete(full_url, 
+                                          headers=headers,
+                                          timeout=aiohttp.ClientTimeout(total=10)) as response:
+                    response.raise_for_status()
+                    response_text = await response.text()
+                    if not response_text:
+                        return "delete_async_http: can't connect to http"
+                    start = response_text.find("{")
+                    end = response_text.rfind("}")
+                    
+                    if start != -1 and end != -1 and end >= start:
+                        return response_text[start:end + 1]
+                    
+                    return response_text
+        
+        except aiohttp.ClientError as e:
+            print(f"HTTP 请求错误: {e}")
+            return f"delete_async_http: can't connect to http - {str(e)}"
+        except asyncio.TimeoutError:
+            print("请求超时")
+            return "delete_async_http: request timeout"
+        except Exception as e:
+            print(f"处理响应时发生错误: {e}")
+            return f"delete_async_http: unexpected error - {str(e)}"
+
+    # 检查本地和目标 IP 地址的有效性
+    def check_ipconfig(self) -> bool:
+        goon = True
+
+        try:
+            try:
+                local_ip_obj = ipaddress.ip_address(self.local_ip)
+                if not isinstance(local_ip_obj, ipaddress.IPv4Address):
+                    raise ValueError("Not an IPv4 address")
+            except (ValueError, TypeError):
+                goon = False
+                self.local_ip = self.HOST_IP_DEFAULT
+                print(f"Invalid local IP, using default: {self.local_ip}")
+
+            try:
+                web_ip_obj = ipaddress.ip_address(self.web_ip)
+                if not isinstance(web_ip_obj, ipaddress.IPv4Address):
+                    raise ValueError("Not an IPv4 address")
+            except (ValueError, TypeError):
+                goon = False
+                self.web_ip = self.HTTP_IP_DEFAULT
+                print(f"Invalid web IP, using default: {self.web_ip}")
+
+        except Exception as e:
+            print(f"Unexpected error in check_ipconfig: {e}")
+            goon = False
+
+        return goon

+ 204 - 0
test2.py

@@ -0,0 +1,204 @@
+import asyncio
+from test1 import LakiBeamHTTP, Firmware, Monitor, NetWork, OverView, ScanRange, Filter, Host
+
+# 主测试函数
+async def main():
+    # 创建 LakiBeamHTTP 实例
+    laki_http = LakiBeamHTTP(
+        local_ip="192.168.8.1", 
+        local_port="8080", 
+        web_ip="192.168.8.2", 
+        web_port="80"
+    )
+
+    # 检查 IP 配置
+    if laki_http.check_ipconfig():
+        print("IP 配置有效")
+    else:
+        print("IP 配置无效,已使用默认值")
+
+    # 测试获取固件信息
+    firmware = Firmware("", "", "", "", "")
+    if await laki_http.get_firemware(firmware):  
+        print("+------------------+--------------------------------------+\n"
+              "| Device Information|                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Model            | {firmware.model:<38} |\n"
+              f"| SN               | {firmware.sn:<38} |\n"
+              f"| FPGA             | {firmware.fpga:<38} |\n"
+              f"| Core             | {firmware.core:<38} |\n"
+              f"| Aux              | {firmware.aux:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取固件信息失败")
+
+    # 测试获取监控信息
+    monitor = Monitor(0.0, 0.0, 0.0)
+    if await laki_http.get_monitor(monitor):  
+        print("+------------------+--------------------------------------+\n"
+              "|  System Monitor   |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Load Average     | {monitor.load_average:<38} |\n"
+              f"| Mem Usage        | {monitor.men_useage}%{' '*(37-len(str(monitor.men_useage)+'%'))} |\n"
+              f"| Uptime           | {monitor.uptime:.2f}s{' ' * (38 - len(f'{monitor.uptime:.2f}s'))} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取监控信息失败")
+
+    # 测试获取网络信息
+    network = NetWork(False, "", "", "", False, "", "", 0)
+    if await laki_http.get_network(network):  
+        print("+------------------+--------------------------------------+\n"
+              "| Network Information |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Carrier          | {'True' if network.carrier else 'False':<38} |\n"
+              f"| Duplex           | {network.duplex:<38} |\n"
+              f"| Ethaddr          | {network.ethaddr:<38} |\n"
+              f"| Hostname         | {network.hostname:<38} |\n"
+              f"| Speed            | {network.speed:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取网络信息失败")
+
+    # 测试获取概述信息
+    overview = OverView(0, 0, False, 0.0, 0, 0, 0, 0, 0, 0, 0, "", 0)
+    if await laki_http.get_overview(overview):
+        print("+------------------+--------------------------------------+\n"
+              "| Overview Information |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Scan Frequency    | {overview.scanfreq:<38} |\n"
+              f"| Motor RPM        | {overview.motor_rpm:<38} |\n"
+              f"| Laser Enable     | {'True' if overview.laser_enable else 'False':<38} |\n"
+              f"| Resolution       | {overview.resolution:<38} |\n"
+              f"| Scan Range Start | {overview.scan_range_start:<38} |\n"
+              f"| Scan Range Stop  | {overview.scan_range_stop:<38} |\n"
+              f"| Filter Level     | {overview.filter_level:<38} |\n"
+              f"| Window           | {overview.filter_window:<38} |\n"
+              f"| Min Angle        | {overview.filter_min_angle:<38} |\n"
+              f"| Max Angle        | {overview.filter_max_angle:<38} |\n"
+              f"| Neighbors        | {overview.filter_neighbors:<38} |\n"
+              f"| Host IP          | {overview.host_ip:<38} |\n"
+              f"| Host Port        | {overview.host_port:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取概述信息失败")
+
+    # 测试获取扫描范围
+    scan_range = ScanRange(0, 0)
+    if await laki_http.get_scan_range(scan_range):
+        print("+------------------+--------------------------------------+\n"
+              "|     Scan Range    |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Start            | {scan_range.start:<38} |\n"
+              f"| Stop             | {scan_range.stop:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取扫描范围失败")
+
+    # 测试获取滤波器设置
+    filter_settings = Filter(0, 0, 0, 0, 0)
+    if await laki_http.get_filter_level(filter_settings):
+        print("+------------------+--------------------------------------+\n"
+              "|  Filter Settings  |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Level            | {filter_settings.level:<38} |\n"
+              f"| Window           | {filter_settings.window:<38} |\n"
+              f"| Min Angle        | {filter_settings.min_angle:<38} |\n"
+              f"| Max Angle        | {filter_settings.max_angle:<38} |\n"
+              f"| Neighbors        | {filter_settings.neighbors:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取滤波器设置失败")
+
+    # 测试获取主机配置
+    host = Host("", 0)
+    if await laki_http.get_host(host):
+        print("+------------------+--------------------------------------+\n"
+              "|Host Configuration |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| IP               | {host.ip:<38} |\n"
+              f"| Port             | {host.port:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取主机配置失败")
+
+    # 测试获取主机 IP
+    if await laki_http.get_host_ip(host):
+        print("+------------------+--------------------------------------+\n"
+              "|    Host IP       |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| IP               | {host.ip:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取主机 IP 失败")
+
+    # 测试获取主机端口
+    if await laki_http.get_host_port(host):
+        print("+------------------+--------------------------------------+\n"
+              "|    Host Port     |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Port             | {host.port:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取主机端口失败")
+
+    # 测试获取激光状态
+    laser_state = ""
+    if await laki_http.get_laser_enable():
+        laser_state = "true" if laser_state else "false"
+        print("+------------------+--------------------------------------+\n"
+              "|   Laser State     |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Laser Enable     | {laser_state:<38} |\n"
+              "+------------------+--------------------------------------+")
+
+    else:
+        print("获取激光状态失败")
+
+    # 测试获取激光分辨率
+    overview = OverView(0, 0, False, 0.0, 0, 0, 0, 0, 0, 0, 0, "", 0)
+    success, resolution = await laki_http.get_resolution(overview)
+    if success:
+        print("+------------------+--------------------------------------+\n"
+              "|Laser Resolution   |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Resolution       | {resolution:<38} |\n"
+              "+------------------+--------------------------------------+")
+    else:
+        print("获取激光分辨率失败")
+
+    # 测试获取激光扫描起始角度
+    scan_range_start = ScanRange(0, 0)
+    success, scan_range_start = await laki_http.get_laser_start(scan_range_start)  # 解构返回值
+    if success:  # 检查操作是否成功
+        print("+------------------+--------------------------------------+\n"
+              "| Laser Start Angle |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Start            | {scan_range_start.start:<38} |\n"
+              "+------------------+--------------------------------------+")
+    else:
+        print("获取激光扫描起始角度失败")
+
+    # 测试获取激光扫描结束角度
+    scan_range_stop = ScanRange(0, 0)  # 创建 ScanRange 对象用于结束角度
+    success, scan_range_stop = await laki_http.get_laser_stop(scan_range_stop)  # 调用获取结束角度的函数
+    if success:  # 检查操作是否成功
+        print("+------------------+--------------------------------------+\n"
+              "|  Laser Stop Angle |                                      |\n"
+              "+------------------+--------------------------------------+\n"
+              f"| Stop             | {scan_range_stop.stop:<38} |\n"
+              "+------------------+--------------------------------------+")
+    else:
+        print("获取激光扫描结束角度失败")
+
+if __name__ == "__main__":
+    asyncio.run(main())