|
|
@@ -1,91 +1,93 @@
|
|
|
-# rblidar.py
|
|
|
-import ctypes
|
|
|
-import logging
|
|
|
-import numpy as np
|
|
|
-import matplotlib.pyplot as plt
|
|
|
-
|
|
|
-
|
|
|
-def plot_point_cloud(point_data_array, filename='point_cloud.png'):
|
|
|
- # 提取点数据的坐标
|
|
|
- x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000
|
|
|
- y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000
|
|
|
-
|
|
|
- # logging.info(point_data_array['dist'][1000:1200])
|
|
|
- logging.info(y[1000:1200])
|
|
|
-
|
|
|
- # 绘制点云
|
|
|
- fig, ax = plt.subplots(figsize=(8, 8))
|
|
|
- scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis')
|
|
|
- ax.set_xlim([-2, 2])
|
|
|
- ax.set_ylim([-2, 2])
|
|
|
- ax.set_aspect('equal')
|
|
|
- ax.set_title('Point Cloud')
|
|
|
- ax.set_xlabel('X (m)')
|
|
|
- ax.set_ylabel('Y (m)')
|
|
|
- cbar = fig.colorbar(scatter, ax=ax)
|
|
|
- cbar.set_label('RSSI')
|
|
|
-
|
|
|
- # 保存为 PNG 文件
|
|
|
- plt.savefig(filename, dpi=300) # dpi 可以根据需要调整
|
|
|
- plt.close(fig) # 关闭图形,释放内存
|
|
|
-
|
|
|
-class point_data_t(ctypes.Structure):
|
|
|
- _pack_ = 1 # 设置对齐方式为 1 字节
|
|
|
- _fields_ = [
|
|
|
- ("azimuth", ctypes.c_uint16),
|
|
|
- ("dist", ctypes.c_uint16),
|
|
|
- ("rssi", ctypes.c_uint16),
|
|
|
- ("timestamp", ctypes.c_uint32)
|
|
|
- ]
|
|
|
-
|
|
|
-
|
|
|
-# 创建与结构体相对应的 numpy dtype
|
|
|
-point_data_dtype = np.dtype([
|
|
|
- ('azimuth', np.uint16),
|
|
|
- ('dist', np.uint16),
|
|
|
- ('rssi', np.uint16),
|
|
|
- ('timestamp', np.uint32)
|
|
|
-])
|
|
|
-
|
|
|
-# 加载C库
|
|
|
-rblidar_lib = ctypes.CDLL('./rb_lidar.so') # 确保路径正确
|
|
|
-
|
|
|
-# 定义回调函数类型
|
|
|
-CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
|
|
|
-
|
|
|
-def my_callback(data, length):
|
|
|
- logging.info("Received length: %d", length)
|
|
|
- byte_data = ctypes.string_at(data, length)
|
|
|
-
|
|
|
- # 将接收到的字节数组转换为 NumPy 数组
|
|
|
- point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
|
|
|
-
|
|
|
-
|
|
|
- # 绘制点云
|
|
|
- plot_point_cloud(point_data_array)
|
|
|
-
|
|
|
- # 在这里添加您的处理逻辑
|
|
|
- # 例如,您可以将数据存储到文件或数据库中
|
|
|
-
|
|
|
-class RBLidar:
|
|
|
- def __init__(self, ip: str, port: int):
|
|
|
- # 初始化日志记录
|
|
|
- logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
|
|
|
-
|
|
|
- # 创建回调函数
|
|
|
- self.callback = CALLBACK_TYPE(my_callback)
|
|
|
- # 创建C库中的RBLidar实例
|
|
|
- self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
|
|
|
-
|
|
|
- def __del__(self):
|
|
|
- # 清理资源
|
|
|
- rblidar_lib.rblidar_destroy(self.lidar)
|
|
|
-
|
|
|
-# 示例使用
|
|
|
-if __name__ == "__main__":
|
|
|
- lidar = RBLidar("192.168.8.1", 2368) # 替换为实际的IP和端口
|
|
|
- try:
|
|
|
- while True:
|
|
|
- pass # 持续运行以接收数据
|
|
|
- except KeyboardInterrupt:
|
|
|
- logging.info("Stopping...")
|
|
|
+# rblidar.py
|
|
|
+import ctypes
|
|
|
+import logging
|
|
|
+import numpy as np
|
|
|
+import matplotlib.pyplot as plt
|
|
|
+
|
|
|
+
|
|
|
+def plot_point_cloud(point_data_array, filename='point_cloud.png'):
|
|
|
+ # 提取点数据的坐标
|
|
|
+ x = point_data_array['dist'] * np.cos(np.deg2rad(point_data_array['azimuth']/100))/1000
|
|
|
+ y = point_data_array['dist'] * np.sin(np.deg2rad(point_data_array['azimuth']/100))/1000
|
|
|
+
|
|
|
+ # logging.info(point_data_array['dist'][1000:1200])
|
|
|
+ logging.info(y[1000:1200])
|
|
|
+
|
|
|
+ # 绘制点云
|
|
|
+ fig, ax = plt.subplots(figsize=(8, 8))
|
|
|
+ scatter = ax.scatter(x, y, s=2, c=point_data_array['rssi'], cmap='viridis')
|
|
|
+ ax.set_xlim([-2, 2])
|
|
|
+ ax.set_ylim([-2, 2])
|
|
|
+ ax.set_aspect('equal')
|
|
|
+ ax.set_title('Point Cloud')
|
|
|
+ ax.set_xlabel('X (m)')
|
|
|
+ ax.set_ylabel('Y (m)')
|
|
|
+ cbar = fig.colorbar(scatter, ax=ax)
|
|
|
+ cbar.set_label('RSSI')
|
|
|
+
|
|
|
+ # 保存为 PNG 文件
|
|
|
+ plt.savefig(filename, dpi=300) # dpi 可以根据需要调整
|
|
|
+ plt.close(fig) # 关闭图形,释放内存
|
|
|
+
|
|
|
+class point_data_t(ctypes.Structure):
|
|
|
+ _pack_ = 1 # 设置对齐方式为 1 字节
|
|
|
+ _fields_ = [
|
|
|
+ ("azimuth", ctypes.c_uint16),
|
|
|
+ ("dist", ctypes.c_uint16),
|
|
|
+ ("rssi", ctypes.c_uint16),
|
|
|
+ ("timestamp", ctypes.c_uint32)
|
|
|
+ ]
|
|
|
+
|
|
|
+
|
|
|
+# 创建与结构体相对应的 numpy dtype
|
|
|
+point_data_dtype = np.dtype([
|
|
|
+ ('azimuth', np.uint16),
|
|
|
+ ('dist', np.uint16),
|
|
|
+ ('rssi', np.uint16),
|
|
|
+ ('timestamp', np.uint32)
|
|
|
+])
|
|
|
+
|
|
|
+# 加载C库
|
|
|
+rblidar_lib = ctypes.CDLL('./rb_lidar.so') # 确保路径正确
|
|
|
+
|
|
|
+# 定义回调函数类型
|
|
|
+CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
|
|
|
+
|
|
|
+def my_callback(data, length):
|
|
|
+ logging.info("Received length: %d", length)
|
|
|
+ byte_data = ctypes.string_at(data, length)
|
|
|
+
|
|
|
+ # 将接收到的字节数组转换为 NumPy 数组
|
|
|
+ point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
|
|
|
+
|
|
|
+ for point in point_data_array:
|
|
|
+ logging.info(f"Azimuth: {point['azimuth']}, Distance: {point['dist']}, RSSI: {point['rssi']}, Timestamp: {point['timestamp']}")
|
|
|
+
|
|
|
+ # 绘制点云
|
|
|
+ # plot_point_cloud(point_data_array)
|
|
|
+
|
|
|
+ # 在这里添加您的处理逻辑
|
|
|
+ # 例如,您可以将数据存储到文件或数据库中
|
|
|
+
|
|
|
+class RBLidar:
|
|
|
+ def __init__(self, ip: str, port: int):
|
|
|
+ # 初始化日志记录
|
|
|
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
|
|
|
+
|
|
|
+ # 创建回调函数
|
|
|
+ self.callback = CALLBACK_TYPE(my_callback)
|
|
|
+ # 创建C库中的RBLidar实例
|
|
|
+ self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
|
|
|
+
|
|
|
+ def __del__(self):
|
|
|
+ # 清理资源
|
|
|
+ rblidar_lib.rblidar_destroy(self.lidar)
|
|
|
+
|
|
|
+# 示例使用
|
|
|
+if __name__ == "__main__":
|
|
|
+ lidar = RBLidar("192.168.8.1", 2368) # 替换为实际的IP和端口
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ pass # 持续运行以接收数据
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logging.info("Stopping...")
|