# rblidar.py import ctypes import logging import numpy as np import matplotlib.pyplot as plt def plot_point_cloud(point_data_array, filename='point_cloud.png'): # 提取点数据的坐标 x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000 y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000 # logging.info(point_data_array['dist'][1000:1200]) logging.info(y[1000:1200]) # 绘制点云 fig, ax = plt.subplots(figsize=(8, 8)) scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis') ax.set_xlim([-2, 2]) ax.set_ylim([-2, 2]) ax.set_aspect('equal') ax.set_title('Point Cloud') ax.set_xlabel('X (m)') ax.set_ylabel('Y (m)') cbar = fig.colorbar(scatter, ax=ax) cbar.set_label('RSSI') # 保存为 PNG 文件 plt.savefig(filename, dpi=300) # dpi 可以根据需要调整 plt.close(fig) # 关闭图形,释放内存 class point_data_t(ctypes.Structure): _pack_ = 1 # 设置对齐方式为 1 字节 _fields_ = [ ("azimuth", ctypes.c_uint16), ("dist", ctypes.c_uint16), ("rssi", ctypes.c_uint16), ("timestamp", ctypes.c_uint32) ] # 创建与结构体相对应的 numpy dtype point_data_dtype = np.dtype([ ('azimuth', np.uint16), ('dist', np.uint16), ('rssi', np.uint16), ('timestamp', np.uint32) ]) rblidar_lib = ctypes.CDLL('./rb_lidar.so') CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int) def my_callback(data, length): logging.info("Received length: %d", length) byte_data = ctypes.string_at(data, length) point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype) for point in point_data_array: logging.info(f"Azimuth: {point['azimuth']}, Distance: {point['dist']}, RSSI: {point['rssi']}, Timestamp: {point['timestamp']}") # 绘制点云 # plot_point_cloud(point_data_array) class RBLidar: def __init__(self, ip: str, port: int, frame_callback=None): logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') # self.callback = CALLBACK_TYPE(my_callback) self.frame_callback = frame_callback self.callback = CALLBACK_TYPE(self._callback_wrapper) self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback) def _callback_wrapper(self,data,length): my_callback(data, length) if self.frame_callback: self.frame_callback(data,length) def __del__(self): rblidar_lib.rblidar_destroy(self.lidar) if __name__ == "__main__": def my_frame_callback(data, length): logging.info("Frame callback processing data") lidar = RBLidar("192.168.8.1", 2368,frame_callback=my_frame_callback) try: while True: pass except KeyboardInterrupt: logging.info("Stopping...")