|
@@ -47,17 +47,17 @@ point_data_dtype = np.dtype([
|
|
|
('timestamp', np.uint32)
|
|
('timestamp', np.uint32)
|
|
|
])
|
|
])
|
|
|
|
|
|
|
|
-# 加载C库
|
|
|
|
|
-rblidar_lib = ctypes.CDLL('./rb_lidar.so') # 确保路径正确
|
|
|
|
|
|
|
|
|
|
-# 定义回调函数类型
|
|
|
|
|
|
|
+rblidar_lib = ctypes.CDLL('./rb_lidar.so')
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
|
|
CALLBACK_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int)
|
|
|
|
|
|
|
|
def my_callback(data, length):
|
|
def my_callback(data, length):
|
|
|
logging.info("Received length: %d", length)
|
|
logging.info("Received length: %d", length)
|
|
|
byte_data = ctypes.string_at(data, length)
|
|
byte_data = ctypes.string_at(data, length)
|
|
|
|
|
|
|
|
- # 将接收到的字节数组转换为 NumPy 数组
|
|
|
|
|
|
|
+
|
|
|
point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
|
|
point_data_array = np.frombuffer(byte_data, dtype=point_data_dtype)
|
|
|
|
|
|
|
|
for point in point_data_array:
|
|
for point in point_data_array:
|
|
@@ -66,28 +66,34 @@ def my_callback(data, length):
|
|
|
# 绘制点云
|
|
# 绘制点云
|
|
|
# plot_point_cloud(point_data_array)
|
|
# plot_point_cloud(point_data_array)
|
|
|
|
|
|
|
|
- # 在这里添加您的处理逻辑
|
|
|
|
|
- # 例如,您可以将数据存储到文件或数据库中
|
|
|
|
|
|
|
|
|
|
class RBLidar:
|
|
class RBLidar:
|
|
|
- def __init__(self, ip: str, port: int):
|
|
|
|
|
- # 初始化日志记录
|
|
|
|
|
|
|
+ def __init__(self, ip: str, port: int, frame_callback=None):
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
|
|
|
|
|
|
|
|
- # 创建回调函数
|
|
|
|
|
- self.callback = CALLBACK_TYPE(my_callback)
|
|
|
|
|
- # 创建C库中的RBLidar实例
|
|
|
|
|
|
|
+ # self.callback = CALLBACK_TYPE(my_callback)
|
|
|
|
|
+
|
|
|
|
|
+ self.frame_callback = frame_callback
|
|
|
|
|
+ self.callback = CALLBACK_TYPE(self._callback_wrapper)
|
|
|
|
|
+
|
|
|
self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
|
|
self.lidar = rblidar_lib.rblidar_create(ip.encode('utf-8'), port, self.callback)
|
|
|
|
|
|
|
|
|
|
+ def _callback_wrapper(self,data,length):
|
|
|
|
|
+ my_callback(data, length)
|
|
|
|
|
+
|
|
|
|
|
+ if self.frame_callback:
|
|
|
|
|
+ self.frame_callback(data,length)
|
|
|
|
|
+
|
|
|
def __del__(self):
|
|
def __del__(self):
|
|
|
- # 清理资源
|
|
|
|
|
rblidar_lib.rblidar_destroy(self.lidar)
|
|
rblidar_lib.rblidar_destroy(self.lidar)
|
|
|
|
|
|
|
|
-# 示例使用
|
|
|
|
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
- lidar = RBLidar("192.168.8.1", 2368) # 替换为实际的IP和端口
|
|
|
|
|
|
|
+ def my_frame_callback(data, length):
|
|
|
|
|
+ logging.info("Frame callback processing data")
|
|
|
|
|
+ lidar = RBLidar("192.168.8.1", 2368,frame_callback=my_frame_callback)
|
|
|
try:
|
|
try:
|
|
|
while True:
|
|
while True:
|
|
|
- pass # 持续运行以接收数据
|
|
|
|
|
|
|
+ pass
|
|
|
except KeyboardInterrupt:
|
|
except KeyboardInterrupt:
|
|
|
logging.info("Stopping...")
|
|
logging.info("Stopping...")
|