main.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import socket
  2. import threading
  3. import ctypes
  4. import queue
  5. import logging
  6. # 配置日志
  7. logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
  8. # 定义深度数据结构
  9. class Depth_t(ctypes.Structure):
  10. _fields_ = [
  11. ("distance0", ctypes.c_uint16), # 深度数据
  12. ("rssi0", ctypes.c_uint8), # 回波强度
  13. ("distance1", ctypes.c_uint16), # 深度数据
  14. ("rssi1", ctypes.c_uint8) # 回波强度
  15. ]
  16. # 定义小包结构体
  17. class LittlePacket(ctypes.Structure):
  18. _fields_ = [
  19. ("azimuth", ctypes.c_uint16), # 测距数据的角度值
  20. ("depth", Depth_t * 16), # 深度数据点位包
  21. ("timestamp", ctypes.c_uint32), # 数据头
  22. ]
  23. # 创建两个共享的队列作为双缓冲区
  24. buffer1 = queue.Queue(maxsize=1440)
  25. buffer2 = queue.Queue(maxsize=1440)
  26. # 创建一个事件对象
  27. zeroFoundEvent = threading.Event()
  28. currentBuffer = 1 # 指示当前使用的缓冲区
  29. def parse_msop_packet(data):
  30. global currentBuffer
  31. # 提取数据块
  32. for i in range(12): # 假设有12个数据块
  33. start = i * 100 # 每个数据块100字节
  34. end = start + 100
  35. data_block = data[start:end]
  36. # 检查标志位
  37. if data_block[0:2] == b'\xff\xee':
  38. little_packet = LittlePacket()
  39. little_packet.azimuth = int.from_bytes(data_block[2:4], byteorder='little')
  40. little_packet.timestamp = int.from_bytes(data_block[1201:1205], byteorder='little')
  41. # 提取16个测距数据
  42. for j in range(16):
  43. depth_data = Depth_t()
  44. depth_data.distance0 = int.from_bytes(data_block[4 + j * 6: 4 + j * 6 + 2], byteorder='little')
  45. depth_data.rssi0 = int.from_bytes(data_block[4 + j * 6 + 2: 4 + j * 6 + 3], byteorder='little')
  46. depth_data.distance1 = int.from_bytes(data_block[4 + j * 6 + 3: 4 + j * 6 + 5], byteorder='little')
  47. depth_data.rssi1 = int.from_bytes(data_block[4 + j * 6 + 5: 4 + j * 6 + 6], byteorder='little')
  48. little_packet.depth[j] = depth_data
  49. # 如果发现 0 角度数据,设置事件并切换缓冲区
  50. if little_packet.azimuth == 0:
  51. currentBuffer = 2 if currentBuffer == 1 else 1 # 切换到另一个缓冲区
  52. logging.debug(f"切换到缓冲区 {currentBuffer}, 缓冲区1大小 {buffer1.qsize()}, 缓冲区2大小 {buffer2.qsize()}")
  53. zeroFoundEvent.set()
  54. # 将当前包加入到当前缓冲区
  55. if currentBuffer == 1:
  56. buffer1.put(little_packet)
  57. else:
  58. buffer2.put(little_packet)
  59. def read_buffer():
  60. while True:
  61. if zeroFoundEvent.is_set(): # 如果发现 0 角度数据
  62. # 处理当前缓冲区的数据
  63. if currentBuffer == 2:
  64. while not buffer1.empty():
  65. little_packet = buffer1.get()
  66. logging.debug(f"Consumer from buffer1: azimuth: {little_packet.azimuth}")
  67. else:
  68. while not buffer2.empty():
  69. little_packet = buffer2.get()
  70. logging.debug(f"Consumer from buffer2: azimuth: {little_packet.azimuth}")
  71. zeroFoundEvent.clear() # 清除事件
  72. def receive_udp_data():
  73. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  74. sock.bind(('192.168.8.1', 2368))
  75. while True:
  76. try:
  77. data, addr = sock.recvfrom(1206) # 读取UDP数据包
  78. parse_msop_packet(data)
  79. except Exception as e:
  80. logging.error(f"Error receiving data: {e}")
  81. receive_thread = threading.Thread(target=receive_udp_data)
  82. read_thread = threading.Thread(target=read_buffer)
  83. receive_thread.start()
  84. read_thread.start()
  85. # 主线程可以继续执行其他任务
  86. try:
  87. while True:
  88. pass
  89. except KeyboardInterrupt:
  90. print("Exiting...")