| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- # rblidar.py
- import ctypes
- import logging
- import numpy as np
- import matplotlib.pyplot as plt
- def plot_point_cloud(point_data_array, filename='point_cloud.png'):
- # 提取点数据的坐标
- x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000
- y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000
- # logging.info(point_data_array['dist'][1000:1200])
- logging.info(y[1000:1200])
- # 绘制点云
- fig, ax = plt.subplots(figsize=(8, 8))
- scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis')
- ax.set_xlim([-2, 2])
- ax.set_ylim([-2, 2])
- ax.set_aspect('equal')
- ax.set_title('Point Cloud')
- ax.set_xlabel('X (m)')
- ax.set_ylabel('Y (m)')
- cbar = fig.colorbar(scatter, ax=ax)
- cbar.set_label('RSSI')
- # 保存为 PNG 文件
- plt.savefig(filename, dpi=300) # dpi 可以根据需要调整
- plt.close(fig) # 关闭图形,释放内存
- class point_data_t(ctypes.Structure):
- _pack_ = 1 # 设置对齐方式为 1 字节
- _fields_ = [
- ("azimuth", ctypes.c_uint16),
- ("dist", ctypes.c_uint16),
- ("rssi", ctypes.c_uint16),
- ("timestamp", ctypes.c_uint32)
- ]
- # 创建与结构体相对应的 numpy dtype
- point_data_dtype = np.dtype([
- ('azimuth', np.uint16),
- ('dist', np.uint16),
- ('rssi', np.uint16),
- ('timestamp', np.uint32)
- ])
- # 加载C库
- rblidar_lib = ctypes.CDLL('./rb_lidar.so') # 确保路径正确
- # 定义回调函数类型
- CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
- def my_callback(data, length):
- logging.info("Received length: %d", length)
- byte_data = ctypes.string_at(data, length)
- # 将接收到的字节数组转换为 NumPy 数组
- point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
- # 绘制点云
- plot_point_cloud(point_data_array)
- # 在这里添加您的处理逻辑
- # 例如,您可以将数据存储到文件或数据库中
- class RBLidar:
- def __init__(self, ip: str, port: int):
- # 初始化日志记录
- logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
- # 创建回调函数
- self.callback = CALLBACK_TYPE(my_callback)
- # 创建C库中的RBLidar实例
- self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
- def __del__(self):
- # 清理资源
- rblidar_lib.rblidar_destroy(self.lidar)
- # 示例使用
- if __name__ == "__main__":
- lidar = RBLidar("192.168.8.1", 2368) # 替换为实际的IP和端口
- try:
- while True:
- pass # 持续运行以接收数据
- except KeyboardInterrupt:
- logging.info("Stopping...")
|