rblidar.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. # rblidar.py
  2. import ctypes
  3. import logging
  4. import numpy as np
  5. import matplotlib.pyplot as plt
  6. def plot_point_cloud(point_data_array, filename='point_cloud.png'):
  7. # 提取点数据的坐标
  8. x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000
  9. y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000
  10. # logging.info(point_data_array['dist'][1000:1200])
  11. logging.info(y[1000:1200])
  12. # 绘制点云
  13. fig, ax = plt.subplots(figsize=(8, 8))
  14. scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis')
  15. ax.set_xlim([-2, 2])
  16. ax.set_ylim([-2, 2])
  17. ax.set_aspect('equal')
  18. ax.set_title('Point Cloud')
  19. ax.set_xlabel('X (m)')
  20. ax.set_ylabel('Y (m)')
  21. cbar = fig.colorbar(scatter, ax=ax)
  22. cbar.set_label('RSSI')
  23. # 保存为 PNG 文件
  24. plt.savefig(filename, dpi=300) # dpi 可以根据需要调整
  25. plt.close(fig) # 关闭图形,释放内存
  26. class point_data_t(ctypes.Structure):
  27. _pack_ = 1 # 设置对齐方式为 1 字节
  28. _fields_ = [
  29. ("azimuth", ctypes.c_uint16),
  30. ("dist", ctypes.c_uint16),
  31. ("rssi", ctypes.c_uint16),
  32. ("timestamp", ctypes.c_uint32)
  33. ]
  34. # 创建与结构体相对应的 numpy dtype
  35. point_data_dtype = np.dtype([
  36. ('azimuth', np.uint16),
  37. ('dist', np.uint16),
  38. ('rssi', np.uint16),
  39. ('timestamp', np.uint32)
  40. ])
  41. # 加载C库
  42. rblidar_lib = ctypes.CDLL('./rb_lidar.so') # 确保路径正确
  43. # 定义回调函数类型
  44. CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
  45. def my_callback(data, length):
  46. logging.info("Received length: %d", length)
  47. byte_data = ctypes.string_at(data, length)
  48. # 将接收到的字节数组转换为 NumPy 数组
  49. point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
  50. for point in point_data_array:
  51. logging.info(f"Azimuth: {point['azimuth']}, Distance: {point['dist']}, RSSI: {point['rssi']}, Timestamp: {point['timestamp']}")
  52. # 绘制点云
  53. # plot_point_cloud(point_data_array)
  54. # 在这里添加您的处理逻辑
  55. # 例如,您可以将数据存储到文件或数据库中
  56. class RBLidar:
  57. def __init__(self, ip: str, port: int):
  58. # 初始化日志记录
  59. logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
  60. # 创建回调函数
  61. self.callback = CALLBACK_TYPE(my_callback)
  62. # 创建C库中的RBLidar实例
  63. self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
  64. def __del__(self):
  65. # 清理资源
  66. rblidar_lib.rblidar_destroy(self.lidar)
  67. # 示例使用
  68. if __name__ == "__main__":
  69. lidar = RBLidar("192.168.8.1", 2368) # 替换为实际的IP和端口
  70. try:
  71. while True:
  72. pass # 持续运行以接收数据
  73. except KeyboardInterrupt:
  74. logging.info("Stopping...")