| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import socket
- import threading
- import ctypes
- import queue
- import logging
- # 配置日志
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
- # 定义深度数据结构
- class Depth_t(ctypes.Structure):
- _fields_ = [
- ("distance0", ctypes.c_uint16), # 深度数据
- ("rssi0", ctypes.c_uint8), # 回波强度
- ("distance1", ctypes.c_uint16), # 深度数据
- ("rssi1", ctypes.c_uint8) # 回波强度
- ]
- # 定义小包结构体
- class LittlePacket(ctypes.Structure):
- _fields_ = [
- ("azimuth", ctypes.c_uint16), # 测距数据的角度值
- ("depth", Depth_t * 16), # 深度数据点位包
- ("timestamp", ctypes.c_uint32), # 数据头
- ]
- # 创建两个共享的队列作为双缓冲区
- buffer1 = queue.Queue(maxsize=1440)
- buffer2 = queue.Queue(maxsize=1440)
- # 创建一个事件对象
- zeroFoundEvent = threading.Event()
- currentBuffer = 1 # 指示当前使用的缓冲区
- def parse_msop_packet(data):
- global currentBuffer
- # 提取数据块
- for i in range(12): # 假设有12个数据块
- start = i * 100 # 每个数据块100字节
- end = start + 100
- data_block = data[start:end]
- # 检查标志位
- if data_block[0:2] == b'\xff\xee':
- little_packet = LittlePacket()
- little_packet.azimuth = int.from_bytes(data_block[2:4], byteorder='little')
- little_packet.timestamp = int.from_bytes(data_block[1201:1205], byteorder='little')
- # 提取16个测距数据
- for j in range(16):
- depth_data = Depth_t()
- depth_data.distance0 = int.from_bytes(data_block[4 + j * 6: 4 + j * 6 + 2], byteorder='little')
- depth_data.rssi0 = int.from_bytes(data_block[4 + j * 6 + 2: 4 + j * 6 + 3], byteorder='little')
- depth_data.distance1 = int.from_bytes(data_block[4 + j * 6 + 3: 4 + j * 6 + 5], byteorder='little')
- depth_data.rssi1 = int.from_bytes(data_block[4 + j * 6 + 5: 4 + j * 6 + 6], byteorder='little')
- little_packet.depth[j] = depth_data
- # 如果发现 0 角度数据,设置事件并切换缓冲区
- if little_packet.azimuth == 0:
- currentBuffer = 2 if currentBuffer == 1 else 1 # 切换到另一个缓冲区
- logging.debug(f"切换到缓冲区 {currentBuffer}, 缓冲区1大小 {buffer1.qsize()}, 缓冲区2大小 {buffer2.qsize()}")
- zeroFoundEvent.set()
- # 将当前包加入到当前缓冲区
- if currentBuffer == 1:
- buffer1.put(little_packet)
- else:
- buffer2.put(little_packet)
- def read_buffer():
- while True:
- if zeroFoundEvent.is_set(): # 如果发现 0 角度数据
- # 处理当前缓冲区的数据
- if currentBuffer == 2:
- while not buffer1.empty():
- little_packet = buffer1.get()
- logging.debug(f"Consumer from buffer1: azimuth: {little_packet.azimuth}")
- else:
- while not buffer2.empty():
- little_packet = buffer2.get()
- logging.debug(f"Consumer from buffer2: azimuth: {little_packet.azimuth}")
- zeroFoundEvent.clear() # 清除事件
- def receive_udp_data():
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.bind(('192.168.8.1', 2368))
- while True:
- try:
- data, addr = sock.recvfrom(1206) # 读取UDP数据包
- parse_msop_packet(data)
- except Exception as e:
- logging.error(f"Error receiving data: {e}")
- receive_thread = threading.Thread(target=receive_udp_data)
- read_thread = threading.Thread(target=read_buffer)
- receive_thread.start()
- read_thread.start()
- # 主线程可以继续执行其他任务
- try:
- while True:
- pass
- except KeyboardInterrupt:
- print("Exiting...")
|