import socket import threading import ctypes import queue import logging # 配置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # 定义深度数据结构 class Depth_t(ctypes.Structure): _fields_ = [ ("distance0", ctypes.c_uint16), # 深度数据 ("rssi0", ctypes.c_uint8), # 回波强度 ("distance1", ctypes.c_uint16), # 深度数据 ("rssi1", ctypes.c_uint8) # 回波强度 ] # 定义小包结构体 class LittlePacket(ctypes.Structure): _fields_ = [ ("azimuth", ctypes.c_uint16), # 测距数据的角度值 ("depth", Depth_t * 16), # 深度数据点位包 ("timestamp", ctypes.c_uint32), # 数据头 ] # 创建两个共享的队列作为双缓冲区 buffer1 = queue.Queue(maxsize=1440) buffer2 = queue.Queue(maxsize=1440) # 创建一个事件对象 zeroFoundEvent = threading.Event() currentBuffer = 1 # 指示当前使用的缓冲区 def parse_msop_packet(data): global currentBuffer # 提取数据块 for i in range(12): # 假设有12个数据块 start = i * 100 # 每个数据块100字节 end = start + 100 data_block = data[start:end] # 检查标志位 if data_block[0:2] == b'\xff\xee': little_packet = LittlePacket() little_packet.azimuth = int.from_bytes(data_block[2:4], byteorder='little') little_packet.timestamp = int.from_bytes(data_block[1201:1205], byteorder='little') # 提取16个测距数据 for j in range(16): depth_data = Depth_t() depth_data.distance0 = int.from_bytes(data_block[4 + j * 6: 4 + j * 6 + 2], byteorder='little') depth_data.rssi0 = int.from_bytes(data_block[4 + j * 6 + 2: 4 + j * 6 + 3], byteorder='little') depth_data.distance1 = int.from_bytes(data_block[4 + j * 6 + 3: 4 + j * 6 + 5], byteorder='little') depth_data.rssi1 = int.from_bytes(data_block[4 + j * 6 + 5: 4 + j * 6 + 6], byteorder='little') little_packet.depth[j] = depth_data # 如果发现 0 角度数据,设置事件并切换缓冲区 if little_packet.azimuth == 0: currentBuffer = 2 if currentBuffer == 1 else 1 # 切换到另一个缓冲区 logging.debug(f"切换到缓冲区 {currentBuffer}, 缓冲区1大小 {buffer1.qsize()}, 缓冲区2大小 {buffer2.qsize()}") zeroFoundEvent.set() # 将当前包加入到当前缓冲区 if currentBuffer == 1: buffer1.put(little_packet) else: buffer2.put(little_packet) def read_buffer(): while True: if zeroFoundEvent.is_set(): # 如果发现 0 角度数据 # 处理当前缓冲区的数据 if currentBuffer == 2: while not buffer1.empty(): little_packet = buffer1.get() logging.debug(f"Consumer from buffer1: azimuth: {little_packet.azimuth}") else: while not buffer2.empty(): little_packet = buffer2.get() logging.debug(f"Consumer from buffer2: azimuth: {little_packet.azimuth}") zeroFoundEvent.clear() # 清除事件 def receive_udp_data(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('192.168.8.1', 2368)) while True: try: data, addr = sock.recvfrom(1206) # 读取UDP数据包 parse_msop_packet(data) except Exception as e: logging.error(f"Error receiving data: {e}") receive_thread = threading.Thread(target=receive_udp_data) read_thread = threading.Thread(target=read_buffer) receive_thread.start() read_thread.start() # 主线程可以继续执行其他任务 try: while True: pass except KeyboardInterrupt: print("Exiting...")