rblidar.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # rblidar.py
  2. import ctypes
  3. import logging
  4. import numpy as np
  5. import matplotlib.pyplot as plt
  6. def plot_point_cloud(point_data_array, filename='point_cloud.png'):
  7. # 提取点数据的坐标
  8. x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000
  9. y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000
  10. # logging.info(point_data_array['dist'][1000:1200])
  11. logging.info(y[1000:1200])
  12. # 绘制点云
  13. fig, ax = plt.subplots(figsize=(8, 8))
  14. scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis')
  15. ax.set_xlim([-2, 2])
  16. ax.set_ylim([-2, 2])
  17. ax.set_aspect('equal')
  18. ax.set_title('Point Cloud')
  19. ax.set_xlabel('X (m)')
  20. ax.set_ylabel('Y (m)')
  21. cbar = fig.colorbar(scatter, ax=ax)
  22. cbar.set_label('RSSI')
  23. # 保存为 PNG 文件
  24. plt.savefig(filename, dpi=300) # dpi 可以根据需要调整
  25. plt.close(fig) # 关闭图形,释放内存
  26. class point_data_t(ctypes.Structure):
  27. _pack_ = 1 # 设置对齐方式为 1 字节
  28. _fields_ = [
  29. ("azimuth", ctypes.c_uint16),
  30. ("dist", ctypes.c_uint16),
  31. ("rssi", ctypes.c_uint16),
  32. ("timestamp", ctypes.c_uint32)
  33. ]
  34. # 创建与结构体相对应的 numpy dtype
  35. point_data_dtype = np.dtype([
  36. ('azimuth', np.uint16),
  37. ('dist', np.uint16),
  38. ('rssi', np.uint16),
  39. ('timestamp', np.uint32)
  40. ])
  41. rblidar_lib = ctypes.CDLL('./rb_lidar.so')
  42. CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
  43. def my_callback(data, length):
  44. logging.info("Received length: %d", length)
  45. byte_data = ctypes.string_at(data, length)
  46. point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
  47. for point in point_data_array:
  48. logging.info(f"Azimuth: {point['azimuth']}, Distance: {point['dist']}, RSSI: {point['rssi']}, Timestamp: {point['timestamp']}")
  49. # 绘制点云
  50. # plot_point_cloud(point_data_array)
  51. class RBLidar:
  52. def __init__(self, ip: str, port: int, frame_callback=None):
  53. logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
  54. # self.callback = CALLBACK_TYPE(my_callback)
  55. self.frame_callback = frame_callback
  56. self.callback = CALLBACK_TYPE(self._callback_wrapper)
  57. self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
  58. def _callback_wrapper(self,data,length):
  59. my_callback(data, length)
  60. if self.frame_callback:
  61. self.frame_callback(data,length)
  62. def __del__(self):
  63. rblidar_lib.rblidar_destroy(self.lidar)
  64. if __name__ == "__main__":
  65. def my_frame_callback(data, length):
  66. logging.info("Frame callback processing data")
  67. lidar = RBLidar("192.168.8.1", 2368,frame_callback=my_frame_callback)
  68. try:
  69. while True:
  70. pass
  71. except KeyboardInterrupt:
  72. logging.info("Stopping...")