|
@@ -1,108 +1,108 @@
|
|
|
-import socket
|
|
|
|
|
-import threading
|
|
|
|
|
-import ctypes
|
|
|
|
|
-import queue
|
|
|
|
|
-import logging
|
|
|
|
|
-
|
|
|
|
|
-# 配置日志
|
|
|
|
|
-logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
-
|
|
|
|
|
-# 定义深度数据结构
|
|
|
|
|
-class Depth_t(ctypes.Structure):
|
|
|
|
|
- _fields_ = [
|
|
|
|
|
- ("distance0", ctypes.c_uint16), # 深度数据
|
|
|
|
|
- ("rssi0", ctypes.c_uint8), # 回波强度
|
|
|
|
|
- ("distance1", ctypes.c_uint16), # 深度数据
|
|
|
|
|
- ("rssi1", ctypes.c_uint8) # 回波强度
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
-# 定义小包结构体
|
|
|
|
|
-class LittlePacket(ctypes.Structure):
|
|
|
|
|
- _fields_ = [
|
|
|
|
|
- ("azimuth", ctypes.c_uint16), # 测距数据的角度值
|
|
|
|
|
- ("depth", Depth_t * 16), # 深度数据点位包
|
|
|
|
|
- ("timestamp", ctypes.c_uint32), # 数据头
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
-# 创建两个共享的队列作为双缓冲区
|
|
|
|
|
-buffer1 = queue.Queue(maxsize=1440)
|
|
|
|
|
-buffer2 = queue.Queue(maxsize=1440)
|
|
|
|
|
-
|
|
|
|
|
-# 创建一个事件对象
|
|
|
|
|
-zeroFoundEvent = threading.Event()
|
|
|
|
|
-currentBuffer = 1 # 指示当前使用的缓冲区
|
|
|
|
|
-
|
|
|
|
|
-def parse_msop_packet(data):
|
|
|
|
|
- global currentBuffer
|
|
|
|
|
-
|
|
|
|
|
- # 提取数据块
|
|
|
|
|
- for i in range(12): # 假设有12个数据块
|
|
|
|
|
- start = i * 100 # 每个数据块100字节
|
|
|
|
|
- end = start + 100
|
|
|
|
|
- data_block = data[start:end]
|
|
|
|
|
-
|
|
|
|
|
- # 检查标志位
|
|
|
|
|
- if data_block[0:2] == b'\xff\xee':
|
|
|
|
|
- little_packet = LittlePacket()
|
|
|
|
|
- little_packet.azimuth = int.from_bytes(data_block[2:4], byteorder='little')
|
|
|
|
|
- little_packet.timestamp = int.from_bytes(data_block[1201:1205], byteorder='little')
|
|
|
|
|
-
|
|
|
|
|
- # 提取16个测距数据
|
|
|
|
|
- for j in range(16):
|
|
|
|
|
- depth_data = Depth_t()
|
|
|
|
|
- depth_data.distance0 = int.from_bytes(data_block[4 + j * 6: 4 + j * 6 + 2], byteorder='little')
|
|
|
|
|
- depth_data.rssi0 = int.from_bytes(data_block[4 + j * 6 + 2: 4 + j * 6 + 3], byteorder='little')
|
|
|
|
|
- depth_data.distance1 = int.from_bytes(data_block[4 + j * 6 + 3: 4 + j * 6 + 5], byteorder='little')
|
|
|
|
|
- depth_data.rssi1 = int.from_bytes(data_block[4 + j * 6 + 5: 4 + j * 6 + 6], byteorder='little')
|
|
|
|
|
- little_packet.depth[j] = depth_data
|
|
|
|
|
-
|
|
|
|
|
- # 如果发现 0 角度数据,设置事件并切换缓冲区
|
|
|
|
|
- if little_packet.azimuth == 0:
|
|
|
|
|
- currentBuffer = 2 if currentBuffer == 1 else 1 # 切换到另一个缓冲区
|
|
|
|
|
- logging.debug(f"切换到缓冲区 {currentBuffer}, 缓冲区1大小 {buffer1.qsize()}, 缓冲区2大小 {buffer2.qsize()}")
|
|
|
|
|
- zeroFoundEvent.set()
|
|
|
|
|
-
|
|
|
|
|
- # 将当前包加入到当前缓冲区
|
|
|
|
|
- if currentBuffer == 1:
|
|
|
|
|
- buffer1.put(little_packet)
|
|
|
|
|
- else:
|
|
|
|
|
- buffer2.put(little_packet)
|
|
|
|
|
-
|
|
|
|
|
-def read_buffer():
|
|
|
|
|
- while True:
|
|
|
|
|
- if zeroFoundEvent.is_set(): # 如果发现 0 角度数据
|
|
|
|
|
- # 处理当前缓冲区的数据
|
|
|
|
|
- if currentBuffer == 2:
|
|
|
|
|
- while not buffer1.empty():
|
|
|
|
|
- little_packet = buffer1.get()
|
|
|
|
|
- logging.debug(f"Consumer from buffer1: azimuth: {little_packet.azimuth}")
|
|
|
|
|
- else:
|
|
|
|
|
- while not buffer2.empty():
|
|
|
|
|
- little_packet = buffer2.get()
|
|
|
|
|
- logging.debug(f"Consumer from buffer2: azimuth: {little_packet.azimuth}")
|
|
|
|
|
-
|
|
|
|
|
- zeroFoundEvent.clear() # 清除事件
|
|
|
|
|
-
|
|
|
|
|
-def receive_udp_data():
|
|
|
|
|
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
- sock.bind(('192.168.8.1', 2368))
|
|
|
|
|
-
|
|
|
|
|
- while True:
|
|
|
|
|
- try:
|
|
|
|
|
- data, addr = sock.recvfrom(1206) # 读取UDP数据包
|
|
|
|
|
- parse_msop_packet(data)
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logging.error(f"Error receiving data: {e}")
|
|
|
|
|
-
|
|
|
|
|
-receive_thread = threading.Thread(target=receive_udp_data)
|
|
|
|
|
-read_thread = threading.Thread(target=read_buffer)
|
|
|
|
|
-
|
|
|
|
|
-receive_thread.start()
|
|
|
|
|
-read_thread.start()
|
|
|
|
|
-
|
|
|
|
|
-# 主线程可以继续执行其他任务
|
|
|
|
|
-try:
|
|
|
|
|
- while True:
|
|
|
|
|
- pass
|
|
|
|
|
-except KeyboardInterrupt:
|
|
|
|
|
- print("Exiting...")
|
|
|
|
|
|
|
+import socket
|
|
|
|
|
+import threading
|
|
|
|
|
+import ctypes
|
|
|
|
|
+import queue
|
|
|
|
|
+import logging
|
|
|
|
|
+
|
|
|
|
|
+# 配置日志
|
|
|
|
|
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
+
|
|
|
|
|
+# 定义深度数据结构
|
|
|
|
|
+class Depth_t(ctypes.Structure):
|
|
|
|
|
+ _fields_ = [
|
|
|
|
|
+ ("distance0", ctypes.c_uint16), # 深度数据
|
|
|
|
|
+ ("rssi0", ctypes.c_uint8), # 回波强度
|
|
|
|
|
+ ("distance1", ctypes.c_uint16), # 深度数据
|
|
|
|
|
+ ("rssi1", ctypes.c_uint8) # 回波强度
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+# 定义小包结构体
|
|
|
|
|
+class LittlePacket(ctypes.Structure):
|
|
|
|
|
+ _fields_ = [
|
|
|
|
|
+ ("azimuth", ctypes.c_uint16), # 测距数据的角度值
|
|
|
|
|
+ ("depth", Depth_t * 16), # 深度数据点位包
|
|
|
|
|
+ ("timestamp", ctypes.c_uint32), # 数据头
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+# 创建两个共享的队列作为双缓冲区
|
|
|
|
|
+buffer1 = queue.Queue(maxsize=1440)
|
|
|
|
|
+buffer2 = queue.Queue(maxsize=1440)
|
|
|
|
|
+
|
|
|
|
|
+# 创建一个事件对象
|
|
|
|
|
+zeroFoundEvent = threading.Event()
|
|
|
|
|
+currentBuffer = 1 # 指示当前使用的缓冲区
|
|
|
|
|
+
|
|
|
|
|
+def parse_msop_packet(data):
|
|
|
|
|
+ global currentBuffer
|
|
|
|
|
+
|
|
|
|
|
+ # 提取数据块
|
|
|
|
|
+ for i in range(12): # 假设有12个数据块
|
|
|
|
|
+ start = i * 100 # 每个数据块100字节
|
|
|
|
|
+ end = start + 100
|
|
|
|
|
+ data_block = data[start:end]
|
|
|
|
|
+
|
|
|
|
|
+ # 检查标志位
|
|
|
|
|
+ if data_block[0:2] == b'\xff\xee':
|
|
|
|
|
+ little_packet = LittlePacket()
|
|
|
|
|
+ little_packet.azimuth = int.from_bytes(data_block[2:4], byteorder='little')
|
|
|
|
|
+ little_packet.timestamp = int.from_bytes(data_block[1201:1205], byteorder='little')
|
|
|
|
|
+
|
|
|
|
|
+ # 提取16个测距数据
|
|
|
|
|
+ for j in range(16):
|
|
|
|
|
+ depth_data = Depth_t()
|
|
|
|
|
+ depth_data.distance0 = int.from_bytes(data_block[4 + j * 6: 4 + j * 6 + 2], byteorder='little')
|
|
|
|
|
+ depth_data.rssi0 = int.from_bytes(data_block[4 + j * 6 + 2: 4 + j * 6 + 3], byteorder='little')
|
|
|
|
|
+ depth_data.distance1 = int.from_bytes(data_block[4 + j * 6 + 3: 4 + j * 6 + 5], byteorder='little')
|
|
|
|
|
+ depth_data.rssi1 = int.from_bytes(data_block[4 + j * 6 + 5: 4 + j * 6 + 6], byteorder='little')
|
|
|
|
|
+ little_packet.depth[j] = depth_data
|
|
|
|
|
+
|
|
|
|
|
+ # 如果发现 0 角度数据,设置事件并切换缓冲区
|
|
|
|
|
+ if little_packet.azimuth == 0:
|
|
|
|
|
+ currentBuffer = 2 if currentBuffer == 1 else 1 # 切换到另一个缓冲区
|
|
|
|
|
+ logging.debug(f"切换到缓冲区 {currentBuffer}, 缓冲区1大小 {buffer1.qsize()}, 缓冲区2大小 {buffer2.qsize()}")
|
|
|
|
|
+ zeroFoundEvent.set()
|
|
|
|
|
+
|
|
|
|
|
+ # 将当前包加入到当前缓冲区
|
|
|
|
|
+ if currentBuffer == 1:
|
|
|
|
|
+ buffer1.put(little_packet)
|
|
|
|
|
+ else:
|
|
|
|
|
+ buffer2.put(little_packet)
|
|
|
|
|
+
|
|
|
|
|
+def read_buffer():
|
|
|
|
|
+ while True:
|
|
|
|
|
+ if zeroFoundEvent.is_set(): # 如果发现 0 角度数据
|
|
|
|
|
+ # 处理当前缓冲区的数据
|
|
|
|
|
+ if currentBuffer == 2:
|
|
|
|
|
+ while not buffer1.empty():
|
|
|
|
|
+ little_packet = buffer1.get()
|
|
|
|
|
+ logging.debug(f"Consumer from buffer1: azimuth: {little_packet.azimuth}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ while not buffer2.empty():
|
|
|
|
|
+ little_packet = buffer2.get()
|
|
|
|
|
+ logging.debug(f"Consumer from buffer2: azimuth: {little_packet.azimuth}")
|
|
|
|
|
+
|
|
|
|
|
+ zeroFoundEvent.clear() # 清除事件
|
|
|
|
|
+
|
|
|
|
|
+def receive_udp_data():
|
|
|
|
|
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
+ sock.bind(('192.168.8.1', 2368))
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ try:
|
|
|
|
|
+ data, addr = sock.recvfrom(1206) # 读取UDP数据包
|
|
|
|
|
+ parse_msop_packet(data)
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logging.error(f"Error receiving data: {e}")
|
|
|
|
|
+
|
|
|
|
|
+receive_thread = threading.Thread(target=receive_udp_data)
|
|
|
|
|
+read_thread = threading.Thread(target=read_buffer)
|
|
|
|
|
+
|
|
|
|
|
+receive_thread.start()
|
|
|
|
|
+read_thread.start()
|
|
|
|
|
+
|
|
|
|
|
+# 主线程可以继续执行其他任务
|
|
|
|
|
+try:
|
|
|
|
|
+ while True:
|
|
|
|
|
+ pass
|
|
|
|
|
+except KeyboardInterrupt:
|
|
|
|
|
+ print("Exiting...")
|