OpenCV from Beginner to Mastery, Chapter 6: 3D Vision and Multimodal Perception
6.1 Depth Cameras and Point Cloud Processing
6.1.1 RGB-D Alignment: Joint Intrinsic-Extrinsic Calibration and Registration
The core challenge for RGB-D cameras is establishing pixel-level correspondence between the color image and the depth image. This depends on accurately calibrated intrinsics (focal length, principal point, distortion coefficients) and extrinsics (the rotation and translation between the two sensors). Joint calibration optimizes the parameters of the color and depth cameras simultaneously, minimizing reprojection error to achieve sub-pixel alignment accuracy.
Depth cameras typically use structured light or time-of-flight (ToF) technology to capture scene geometry, while the color camera captures texture. Because the two sensors sit at different physical positions, there is parallax between them, and the rigid transform relating them must be recovered through extrinsic calibration. During calibration, a board with known geometry (a checkerboard or AprilTag target) is observed by both sensors; corner features are detected in each image, and the optimal intrinsic and extrinsic matrices are solved by minimizing the error between projected 3D points and their observed image locations.
The registration stage applies an iterative closest point (ICP) optimization, combined with a color-consistency constraint, to further refine the alignment. For real-time use, the depth-to-color coordinate mapping is precomputed offline into a lookup table (LUT), so the online stage achieves millisecond-level alignment with table lookups alone.
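The geometry underlying this alignment is a three-step chain: back-project a depth pixel with the depth intrinsics, transform the resulting 3D point into the color camera frame with the extrinsics, then project it with the color intrinsics. A minimal NumPy sketch for a single pixel follows; the intrinsic values and the 2 cm baseline are illustrative placeholders, not parameters of any real device.
Python
import numpy as np

# Hypothetical intrinsics (fx, fy, cx, cy) -- placeholder values for illustration
K_depth = np.array([[580.0, 0.0, 320.0], [0.0, 580.0, 240.0], [0.0, 0.0, 1.0]])
K_rgb   = np.array([[615.0, 0.0, 325.0], [0.0, 615.0, 235.0], [0.0, 0.0, 1.0]])
R = np.eye(3)                       # assume nearly parallel sensors
t = np.array([0.02, 0.0, 0.0])      # assume a 2 cm horizontal baseline

def depth_pixel_to_rgb(u, v, z):
    """Map a depth pixel (u, v) with depth z (meters) to RGB pixel coordinates."""
    # Back-project: p = z * K_depth^-1 @ [u, v, 1]
    p_depth = z * np.linalg.inv(K_depth) @ np.array([u, v, 1.0])
    # Rigid transform into the RGB camera frame
    p_rgb = R @ p_depth + t
    # Perspective projection with the RGB intrinsics
    uv = K_rgb @ (p_rgb / p_rgb[2])
    return uv[0], uv[1]

print(depth_pixel_to_rgb(320, 240, 1.5))  # the pixel shift grows as depth decreases
The full script below builds exactly this mapping for every pixel and wraps it in calibration, LUT generation, and ICP-based refinement.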
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: rgbd_calibration_registration.py
Content: RGB-D相机联合标定与配准系统
Usage:
1. 准备标定板图像(推荐棋盘格,pattern_size=(9,6))
2. 同时采集RGB和Depth图像对,保存至./calib_data/rgb/和./calib_data/depth/
3. 运行: python rgbd_calibration_registration.py --mode calibrate --rgb_dir ./calib_data/rgb --depth_dir ./calib_data/depth
4. 标定完成后,运行配准: python rgbd_calibration_registration.py --mode register --calib_file camera_params.yml
Features:
- 支持OpenCV标准棋盘格和ChArUco标定板
- 自动深度图孔洞剔除与边缘优化
- 基于ICP的外参精修
- 实时配准查找表生成
"""
import cv2
import numpy as np
import os
import glob
import argparse
from typing import List, Tuple, Optional
import open3d as o3d
from scipy.optimize import least_squares
class RGBDCalibrator:
"""
RGB-D相机联合标定器
实现可见光相机与深度相机的内外参联合标定
"""
def __init__(self, pattern_size: Tuple[int, int] = (9, 6),
square_size: float = 0.025):
"""
初始化标定参数
Args:
pattern_size: 棋盘格内角点数量 (w, h)
square_size: 棋盘格方格物理尺寸(米)
"""
self.pattern_size = pattern_size
self.square_size = square_size
# 准备世界坐标系下的3D角点模板
self.objp = np.zeros((pattern_size[0] * pattern_size[1], 3), np.float32)
self.objp[:, :2] = np.mgrid[0:pattern_size[0], 0:pattern_size[1]].T.reshape(-1, 2)
self.objp *= square_size
# 存储标定数据
self.rgb_objpoints = [] # 3D世界坐标
self.rgb_imgpoints = [] # 2D图像坐标
self.depth_objpoints = []
self.depth_imgpoints = []
        # Camera parameters
        self.rgb_camera_matrix = None
        self.rgb_dist_coeffs = None
        self.depth_camera_matrix = None
        self.depth_dist_coeffs = None
        self.R_stereo = None  # rotation matrix (Depth -> RGB)
        self.T_stereo = None  # translation vector (Depth -> RGB)
        self.image_size = None  # (w, h), recorded when calibration pairs are added
def find_chessboard_corners(self, rgb_image: np.ndarray,
depth_image: np.ndarray) -> Tuple[bool, np.ndarray, np.ndarray]:
"""
检测棋盘格角点,同时进行RGB和Depth图像的角点提取
Args:
rgb_image: 彩色图像 (H, W, 3)
depth_image: 深度图像 (H, W),单位毫米或米
Returns:
(success, rgb_corners, depth_corners)
"""
gray_rgb = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2GRAY)
# 对深度图进行预处理以辅助角点检测
depth_normalized = cv2.normalize(depth_image, None, 0, 255, cv2.NORM_MINMAX)
depth_normalized = np.uint8(depth_normalized)
# RGB图像角点检测
ret_rgb, corners_rgb = cv2.findChessboardCorners(
gray_rgb, self.pattern_size,
cv2.CALIB_CB_ADAPTIVE_THRESH + cv2.CALIB_CB_NORMALIZE_IMAGE
)
if not ret_rgb:
return False, None, None
# 优化角点位置(亚像素级)
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
corners_rgb = cv2.cornerSubPix(gray_rgb, corners_rgb, (11, 11), (-1, -1), criteria)
# Depth图像角点检测(使用对应区域的深度值)
# 将RGB角点投影到Depth图像(近似),在局部窗口搜索精确角点
corners_depth = self._map_rgb_corners_to_depth(corners_rgb, depth_image)
if corners_depth is None:
return False, None, None
return True, corners_rgb, corners_depth
def _map_rgb_corners_to_depth(self, rgb_corners: np.ndarray,
depth_image: np.ndarray) -> Optional[np.ndarray]:
"""
将RGB图像中的角点映射到深度图像对应位置
利用深度图的几何连续性,在RGB角点周围搜索深度图中的对应角点
"""
corners_depth = rgb_corners.copy()
# 对于每个RGB角点,在深度图的对应位置周围搜索局部极值
for i, corner in enumerate(corners_depth):
x, y = int(corner[0][0]), int(corner[0][1])
# 确保不越界
h, w = depth_image.shape
window_size = 15
x_min, x_max = max(0, x - window_size), min(w, x + window_size)
y_min, y_max = max(0, y - window_size), min(h, y + window_size)
# 提取局部深度区域
local_depth = depth_image[y_min:y_max, x_min:x_max]
# 剔除无效深度值(0或异常值)
valid_mask = (local_depth > 0) & (local_depth < 10000) # 假设单位mm,最大10m
if np.sum(valid_mask) < 10:
return None # 有效深度点不足
# 在有效深度区域寻找角点特征(深度梯度变化)
grad_x = cv2.Sobel(local_depth, cv2.CV_32F, 1, 0, ksize=3)
grad_y = cv2.Sobel(local_depth, cv2.CV_32F, 0, 1, ksize=3)
grad_magnitude = np.sqrt(grad_x**2 + grad_y**2)
# 寻找梯度最大的位置作为角点
max_idx = np.unravel_index(np.argmax(grad_magnitude * valid_mask), grad_magnitude.shape)
corners_depth[i][0][0] = x_min + max_idx[1]
corners_depth[i][0][1] = y_min + max_idx[0]
return corners_depth
def add_calibration_pair(self, rgb_image: np.ndarray, depth_image: np.ndarray) -> bool:
"""
添加一对标定图像
Args:
rgb_image: 彩色图像
depth_image: 深度图像(与RGB对齐的尺寸,或原始尺寸)
Returns:
是否成功检测到角点并添加
"""
        # Resize the depth map to the RGB resolution if needed
        if depth_image.shape[:2] != rgb_image.shape[:2]:
            depth_image = cv2.resize(depth_image, (rgb_image.shape[1], rgb_image.shape[0]),
                                     interpolation=cv2.INTER_NEAREST)
        # Record the (w, h) image size for calibrateCamera / stereoCalibrate
        self.image_size = (rgb_image.shape[1], rgb_image.shape[0])
success, corners_rgb, corners_depth = self.find_chessboard_corners(rgb_image, depth_image)
if success:
self.rgb_objpoints.append(self.objp)
self.rgb_imgpoints.append(corners_rgb)
self.depth_objpoints.append(self.objp)
self.depth_imgpoints.append(corners_depth)
# 可视化检测到的角点
vis_rgb = rgb_image.copy()
cv2.drawChessboardCorners(vis_rgb, self.pattern_size, corners_rgb, success)
cv2.imshow('RGB Corners', vis_rgb)
cv2.waitKey(100)
return success
def calibrate_intrinsics(self) -> Tuple[bool, dict]:
"""
分别标定RGB和Depth相机的内参
"""
if len(self.rgb_objpoints) < 3:
return False, {"error": "至少需要3组标定图像"}
        # Use the image size recorded from the calibration images;
        # fall back to VGA only if no pairs were recorded
        image_size = self.image_size if self.image_size is not None else (640, 480)
        # RGB camera intrinsic calibration
        ret_rgb, mtx_rgb, dist_rgb, rvecs_rgb, tvecs_rgb = cv2.calibrateCamera(
            self.rgb_objpoints, self.rgb_imgpoints,
            image_size, None, None
        )
        # Depth camera intrinsic calibration (corners detected in the depth image,
        # which was resized to the RGB resolution in add_calibration_pair)
        ret_depth, mtx_depth, dist_depth, rvecs_depth, tvecs_depth = cv2.calibrateCamera(
            self.depth_objpoints, self.depth_imgpoints,
            image_size, None, None
        )
if ret_rgb and ret_depth:
self.rgb_camera_matrix = mtx_rgb
self.rgb_dist_coeffs = dist_rgb
self.depth_camera_matrix = mtx_depth
self.depth_dist_coeffs = dist_depth
# 计算重投影误差评估标定质量
rgb_error = self._compute_reprojection_error(
self.rgb_objpoints, self.rgb_imgpoints, rvecs_rgb, tvecs_rgb, mtx_rgb, dist_rgb
)
depth_error = self._compute_reprojection_error(
self.depth_objpoints, self.depth_imgpoints, rvecs_depth, tvecs_depth, mtx_depth, dist_depth
)
return True, {
"rgb_matrix": mtx_rgb,
"rgb_dist": dist_rgb,
"depth_matrix": mtx_depth,
"depth_dist": dist_depth,
"rgb_reproj_error": rgb_error,
"depth_reproj_error": depth_error
}
return False, {}
def _compute_reprojection_error(self, objpoints, imgpoints, rvecs, tvecs, camera_matrix, dist_coeffs):
"""计算重投影误差"""
total_error = 0
for i in range(len(objpoints)):
imgpoints2, _ = cv2.projectPoints(objpoints[i], rvecs[i], tvecs[i],
camera_matrix, dist_coeffs)
error = cv2.norm(imgpoints[i], imgpoints2, cv2.NORM_L2) / len(imgpoints2)
total_error += error
return total_error / len(objpoints)
def calibrate_stereo(self) -> Tuple[bool, dict]:
"""
立体标定:计算RGB和Depth相机之间的外参(R, T)
"""
if self.rgb_camera_matrix is None:
success, _ = self.calibrate_intrinsics()
if not success:
return False, {"error": "内参标定失败"}
# 立体标定
flags = cv2.CALIB_FIX_INTRINSIC # 固定内参,仅优化外参
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 1e-5)
        image_size = self.image_size if self.image_size is not None else (640, 480)
        ret, mtx_rgb, dist_rgb, mtx_depth, dist_depth, R, T, E, F = cv2.stereoCalibrate(
            self.rgb_objpoints, self.rgb_imgpoints, self.depth_imgpoints,
            self.rgb_camera_matrix, self.rgb_dist_coeffs,
            self.depth_camera_matrix, self.depth_dist_coeffs,
            image_size,  # actual image size recorded during calibration
            criteria=criteria,
            flags=flags
        )
if ret:
self.R_stereo = R
self.T_stereo = T
return True, {
"R": R,
"T": T,
"E": E,
"F": F,
"stereo_error": ret
}
return False, {}
def refine_extrinsics_with_icp(self, rgb_images: List[np.ndarray],
depth_images: List[np.ndarray]) -> Tuple[bool, np.ndarray, np.ndarray]:
"""
使用ICP算法精修外参
通过将深度图生成的点云与RGB图像重建的点云对齐,优化R和T
"""
if self.R_stereo is None:
return False, None, None
# 收集所有视角的点云
rgb_points_world = []
depth_points_camera = []
        for i, (rgb_img, depth_img) in enumerate(zip(rgb_images, depth_images)):
            # Sparse correspondences: reuse the checkerboard 3D points
            # (a full pipeline could obtain a sparse cloud via SfM / COLMAP)
            obj_pts = self.objp
            # Transform the 3D points into the RGB camera frame using the current extrinsics
            obj_pts_rgb = (self.R_stereo @ obj_pts.T + self.T_stereo).T
            # Read the corresponding 3D points from the depth map,
            # using the corners detected for this frame
            if i >= len(self.rgb_imgpoints):
                break
            obj_pts_depth = self._depth_to_3d(depth_img, self.rgb_imgpoints[i])
if obj_pts_depth is not None:
rgb_points_world.extend(obj_pts_rgb)
depth_points_camera.extend(obj_pts_depth)
if len(rgb_points_world) < 10:
return False, None, None
# 转换为Open3D点云格式进行ICP
source = o3d.geometry.PointCloud()
source.points = o3d.utility.Vector3dVector(np.array(depth_points_camera))
target = o3d.geometry.PointCloud()
target.points = o3d.utility.Vector3dVector(np.array(rgb_points_world))
# 执行ICP
threshold = 0.02 # 2cm阈值
trans_init = np.eye(4)
trans_init[:3, :3] = self.R_stereo
trans_init[:3, 3] = self.T_stereo.flatten()
reg_p2p = o3d.pipelines.registration.registration_icp(
source, target, threshold, trans_init,
o3d.pipelines.registration.TransformationEstimationPointToPoint()
)
# 提取优化后的R和T
R_refined = reg_p2p.transformation[:3, :3]
T_refined = reg_p2p.transformation[:3, 3].reshape(3, 1)
return True, R_refined, T_refined
def _depth_to_3d(self, depth_img: np.ndarray, img_points: np.ndarray) -> Optional[np.ndarray]:
"""
将图像坐标和深度值转换为3D点云
"""
if self.depth_camera_matrix is None:
return None
fx = self.depth_camera_matrix[0, 0]
fy = self.depth_camera_matrix[1, 1]
cx = self.depth_camera_matrix[0, 2]
cy = self.depth_camera_matrix[1, 2]
points_3d = []
for pt in img_points:
u, v = int(pt[0][0]), int(pt[0][1])
if 0 <= u < depth_img.shape[1] and 0 <= v < depth_img.shape[0]:
z = depth_img[v, u]
if z > 0:
x = (u - cx) * z / fx
y = (v - cy) * z / fy
points_3d.append([x, y, z])
else:
return None # 无效深度
else:
return None
return np.array(points_3d) if len(points_3d) == len(img_points) else None
def generate_registration_lut(self, image_size: Tuple[int, int] = (640, 480)) -> np.ndarray:
"""
生成查找表(LUT)用于实时配准
将深度图坐标映射到RGB图坐标
"""
if self.R_stereo is None or self.depth_camera_matrix is None:
return None
        w, h = image_size  # image_size is given as (width, height)
        lut = np.zeros((h, w, 2), dtype=np.float32)
        t = self.T_stereo.flatten()  # scalar components; T_stereo itself is a (3, 1) column
# 计算投影矩阵
# P = K_rgb * [R | T] * K_depth^-1
R_T = np.hstack([self.R_stereo, self.T_stereo])
P = self.rgb_camera_matrix @ R_T
# 为每个深度图像素预计算映射
fx_d = self.depth_camera_matrix[0, 0]
fy_d = self.depth_camera_matrix[1, 1]
cx_d = self.depth_camera_matrix[0, 2]
cy_d = self.depth_camera_matrix[1, 2]
for v in range(h):
for u in range(w):
# 归一化坐标
x = (u - cx_d) / fx_d
y = (v - cy_d) / fy_d
# 假设Z=1(单位深度),实际应用时乘以真实深度
X = x
Y = y
Z = 1.0
# 变换到RGB相机坐标系
                X_rgb = self.R_stereo[0, 0] * X + self.R_stereo[0, 1] * Y + self.R_stereo[0, 2] * Z + t[0]
                Y_rgb = self.R_stereo[1, 0] * X + self.R_stereo[1, 1] * Y + self.R_stereo[1, 2] * Z + t[1]
                Z_rgb = self.R_stereo[2, 0] * X + self.R_stereo[2, 1] * Y + self.R_stereo[2, 2] * Z + t[2]
# 投影到RGB图像平面
if Z_rgb > 0:
u_rgb = (self.rgb_camera_matrix[0, 0] * X_rgb / Z_rgb + self.rgb_camera_matrix[0, 2])
v_rgb = (self.rgb_camera_matrix[1, 1] * Y_rgb / Z_rgb + self.rgb_camera_matrix[1, 2])
lut[v, u] = [u_rgb, v_rgb]
else:
lut[v, u] = [-1, -1] # 无效映射
return lut
def save_calibration(self, filename: str):
"""保存标定参数到YAML文件"""
fs = cv2.FileStorage(filename, cv2.FILE_STORAGE_WRITE)
fs.write("rgb_camera_matrix", self.rgb_camera_matrix)
fs.write("rgb_distortion_coefficients", self.rgb_dist_coeffs)
fs.write("depth_camera_matrix", self.depth_camera_matrix)
fs.write("depth_distortion_coefficients", self.depth_dist_coeffs)
fs.write("rotation_matrix", self.R_stereo)
fs.write("translation_vector", self.T_stereo)
fs.release()
print(f"标定参数已保存至: {filename}")
def load_calibration(self, filename: str):
"""从YAML文件加载标定参数"""
fs = cv2.FileStorage(filename, cv2.FILE_STORAGE_READ)
self.rgb_camera_matrix = fs.getNode("rgb_camera_matrix").mat()
self.rgb_dist_coeffs = fs.getNode("rgb_distortion_coefficients").mat()
self.depth_camera_matrix = fs.getNode("depth_camera_matrix").mat()
self.depth_dist_coeffs = fs.getNode("depth_distortion_coefficients").mat()
self.R_stereo = fs.getNode("rotation_matrix").mat()
self.T_stereo = fs.getNode("translation_vector").mat()
fs.release()
class RGBDRegister:
"""
RGB-D实时配准器
使用预计算的LUT实现高效的深度-彩色对齐
"""
def __init__(self, calib_file: str):
"""
初始化配准器
Args:
calib_file: 标定参数YAML文件路径
"""
self.calibrator = RGBDCalibrator()
self.calibrator.load_calibration(calib_file)
# 预计算查找表
self.lut = self.calibrator.generate_registration_lut()
self.lut_ready = self.lut is not None
if not self.lut_ready:
print("警告: LUT生成失败,将使用在线重投影")
def register(self, rgb_image: np.ndarray, depth_image: np.ndarray,
depth_scale: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
"""
对齐RGB和深度图像
Args:
rgb_image: 彩色图像 (H, W, 3)
depth_image: 深度图像 (H, W),单位取决于depth_scale
depth_scale: 深度单位缩放因子(如1000表示mm转m)
Returns:
(aligned_depth, colored_point_cloud)
aligned_depth: 与RGB对齐的深度图
colored_point_cloud: (N, 6)数组,[x,y,z,r,g,b]
"""
if self.lut_ready:
aligned_depth = self._apply_lut(depth_image, depth_scale)
else:
aligned_depth = self._project_depth_online(rgb_image, depth_image, depth_scale)
# 生成彩色点云
colored_pcd = self._create_colored_point_cloud(rgb_image, aligned_depth)
return aligned_depth, colored_pcd
def _apply_lut(self, depth_image: np.ndarray, depth_scale: float) -> np.ndarray:
"""使用查找表快速映射深度图到RGB图像空间"""
h, w = depth_image.shape
aligned_depth = np.zeros((h, w), dtype=np.float32)
# 向量化操作加速
u_coords = self.lut[:, :, 0]
v_coords = self.lut[:, :, 1]
# 仅处理有效映射
valid_mask = (u_coords >= 0) & (u_coords < w) & (v_coords >= 0) & (v_coords < h)
# 双线性插值获取深度值
valid_u = u_coords[valid_mask].astype(np.int32)
valid_v = v_coords[valid_mask].astype(np.int32)
# 简单最近邻插值(可优化为双线性)
aligned_depth[valid_v, valid_u] = depth_image[valid_mask] * depth_scale
return aligned_depth
def _project_depth_online(self, rgb_image: np.ndarray, depth_image: np.ndarray,
depth_scale: float) -> np.ndarray:
"""在线重投影(无LUT时使用)"""
h, w = rgb_image.shape[:2]
aligned_depth = np.zeros((h, w), dtype=np.float32)
# 创建点云
fx_d = self.calibrator.depth_camera_matrix[0, 0]
fy_d = self.calibrator.depth_camera_matrix[1, 1]
cx_d = self.calibrator.depth_camera_matrix[0, 2]
cy_d = self.calibrator.depth_camera_matrix[1, 2]
# 反向映射:从RGB像素查询对应深度
# 遍历深度图像素,投影到RGB
for v in range(depth_image.shape[0]):
for u in range(depth_image.shape[1]):
z = depth_image[v, u] * depth_scale
if z <= 0:
continue
# 深度相机坐标系下的3D点
x = (u - cx_d) * z / fx_d
y = (v - cy_d) * z / fy_d
# 变换到RGB相机坐标系
point = np.array([x, y, z])
point_rgb = self.calibrator.R_stereo @ point + self.calibrator.T_stereo.flatten()
# 投影到RGB图像
if point_rgb[2] > 0:
u_rgb = int((point_rgb[0] * self.calibrator.rgb_camera_matrix[0, 0] / point_rgb[2]) +
self.calibrator.rgb_camera_matrix[0, 2])
v_rgb = int((point_rgb[1] * self.calibrator.rgb_camera_matrix[1, 1] / point_rgb[2]) +
self.calibrator.rgb_camera_matrix[1, 2])
if 0 <= u_rgb < w and 0 <= v_rgb < h:
# Z-buffer处理:保留最近深度
if aligned_depth[v_rgb, u_rgb] == 0 or z < aligned_depth[v_rgb, u_rgb]:
aligned_depth[v_rgb, u_rgb] = z
return aligned_depth
def _create_colored_point_cloud(self, rgb_image: np.ndarray,
aligned_depth: np.ndarray) -> np.ndarray:
"""从对齐后的图像生成彩色点云"""
h, w = aligned_depth.shape
# 生成像素网格
u, v = np.meshgrid(np.arange(w), np.arange(h))
# 反投影到3D空间
fx = self.calibrator.rgb_camera_matrix[0, 0]
fy = self.calibrator.rgb_camera_matrix[1, 1]
cx = self.calibrator.rgb_camera_matrix[0, 2]
cy = self.calibrator.rgb_camera_matrix[1, 2]
z = aligned_depth
x = (u - cx) * z / fx
y = (v - cy) * z / fy
# 仅保留有效深度点
valid_mask = z > 0
points = np.stack([x[valid_mask], y[valid_mask], z[valid_mask]], axis=-1)
colors = rgb_image[valid_mask] / 255.0
return np.hstack([points, colors])
def main():
parser = argparse.ArgumentParser(description='RGB-D相机标定与配准工具')
parser.add_argument('--mode', choices=['calibrate', 'register'], required=True,
help='运行模式:标定或配准')
parser.add_argument('--rgb_dir', type=str, help='RGB图像目录')
parser.add_argument('--depth_dir', type=str, help='深度图像目录')
parser.add_argument('--calib_file', type=str, default='camera_params.yml',
help='标定参数文件路径')
parser.add_argument('--output_dir', type=str, default='./output',
help='输出目录')
args = parser.parse_args()
if args.mode == 'calibrate':
# 执行标定流程
calibrator = RGBDCalibrator(pattern_size=(9, 6), square_size=0.025)
# 加载图像对
rgb_files = sorted(glob.glob(os.path.join(args.rgb_dir, '*.png')))
depth_files = sorted(glob.glob(os.path.join(args.depth_dir, '*.png')))
print(f"找到 {len(rgb_files)} 对图像")
for rgb_file, depth_file in zip(rgb_files, depth_files):
rgb_img = cv2.imread(rgb_file)
depth_img = cv2.imread(depth_file, cv2.IMREAD_ANYDEPTH)
if rgb_img is None or depth_img is None:
continue
success = calibrator.add_calibration_pair(rgb_img, depth_img)
if success:
print(f"成功处理: {os.path.basename(rgb_file)}")
# 执行标定
print("\n执行内参标定...")
success, intrinsics = calibrator.calibrate_intrinsics()
if success:
print(f"RGB重投影误差: {intrinsics['rgb_reproj_error']:.4f}")
print(f"Depth重投影误差: {intrinsics['depth_reproj_error']:.4f}")
print("\n执行立体标定...")
success, stereo = calibrator.calibrate_stereo()
if success:
print(f"立体标定误差: {stereo['stereo_error']:.4f}")
print(f"平移向量: {stereo['T'].flatten()}")
# 保存结果
calibrator.save_calibration(args.calib_file)
# 精修外参(可选)
# calibrator.refine_extrinsics_with_icp(rgb_images, depth_images)
cv2.destroyAllWindows()
elif args.mode == 'register':
# 执行实时配准
register = RGBDRegister(args.calib_file)
# 示例:处理单个图像对
test_rgb = cv2.imread(os.path.join(args.rgb_dir, 'test_rgb.png'))
test_depth = cv2.imread(os.path.join(args.depth_dir, 'test_depth.png'), cv2.IMREAD_ANYDEPTH)
if test_rgb is not None and test_depth is not None:
aligned_depth, colored_pcd = register.register(test_rgb, test_depth, depth_scale=0.001)
# 可视化结果
depth_colored = cv2.applyColorMap(
cv2.normalize(aligned_depth, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8),
cv2.COLORMAP_JET
)
combined = np.hstack([test_rgb, depth_colored])
cv2.imshow('RGB-D Registration', combined)
cv2.waitKey(0)
            # 保存点云
            os.makedirs(args.output_dir, exist_ok=True)
            np.save(os.path.join(args.output_dir, 'colored_pointcloud.npy'), colored_pcd)
print(f"彩色点云已保存,共 {len(colored_pcd)} 个点")
if __name__ == '__main__':
main()
6.1.2 Point Cloud Basics: Converting Between PCL and OpenCV Point Cloud Formats
Point clouds are the core data representation in 3D vision, and different libraries (PCL, Open3D, OpenCV) use different memory layouts and data structures. PCL offers a templated PointCloud class with point types such as XYZ, XYZRGB, XYZRGBA, and XYZI, stored in contiguous memory blocks that lend themselves to SSE optimization. Open3D adopts a more modern tensor-based design with GPU acceleration. OpenCV's Mat is general-purpose but carries no point cloud semantics.
The key to format conversion is understanding each library's memory alignment and coordinate conventions. Point clouds back-projected from depth images live in the camera frame (X right, Y down, Z forward, consistent with OpenCV's image axes), while robotics pipelines often use a right-handed Z-up world frame, so axes may need remapping. Conversions must also mind color channel order (BGR vs. RGB) and floating-point precision (float32 vs. float64).
Efficient conversion strategies include zero-copy memory mapping (when memory layouts are compatible) and batched vectorized operations. For large clouds (millions of points), chunked conversion avoids peak-memory spikes. In addition, pre-filtering with Open3D's voxel_down_sample significantly reduces the computational load of downstream processing.
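As a concrete illustration of the zero-copy point, np.asarray on an Open3D point cloud wraps the underlying buffer as a view rather than a copy, so in-place NumPy edits are visible to Open3D. The sketch below (on a randomly generated cloud) also shows the channel-order and dtype conversions, and a voxel downsample.
Python
import numpy as np
import open3d as o3d

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np.random.rand(100000, 3))

# Zero-copy: np.asarray returns a view over the existing buffer
pts = np.asarray(pcd.points)
pts[:, 2] += 1.0                     # in-place edit, visible through pcd
print(pcd.get_center())              # z component shifted by ~1.0

# Colors: Open3D stores float64 in [0, 1]; OpenCV expects uint8 BGR,
# so this conversion necessarily copies (dtype and channel order change)
pcd.colors = o3d.utility.Vector3dVector(np.random.rand(100000, 3))
bgr = (np.asarray(pcd.colors)[:, ::-1] * 255).astype(np.uint8)

# Voxel downsampling before heavy downstream processing
down = pcd.voxel_down_sample(voxel_size=0.02)
print(len(pcd.points), '->', len(down.points))
The full converter below generalizes these conversions across file formats.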
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: pointcloud_format_converter.py
Content: PCL、Open3D与OpenCV点云格式高效互转工具
Usage:
1. 安装依赖: pip install open3d pclpy numpy opencv-python
2. 转换PCD到PLY: python pointcloud_format_converter.py --input cloud.pcd --output cloud.ply --format ply
3. 格式验证: python pointcloud_format_converter.py --verify --input cloud.pcd
4. 批量转换: python pointcloud_format_converter.py --batch --input_dir ./pcd_files --output_dir ./ply_files
Features:
- 支持PCD、PLY、XYZ、LAS格式互转
- 零拷贝内存映射(兼容布局时)
- 批量并行转换
- 点云统计与质量验证
"""
import numpy as np
import cv2
import open3d as o3d
import argparse
import os
import struct
from typing import Union, List, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import warnings
# 尝试导入pclpy(PCL的Python绑定)
try:
import pclpy
from pclpy import pcl
PCL_AVAILABLE = True
except ImportError:
PCL_AVAILABLE = False
warnings.warn("pclpy未安装,PCL格式支持受限。使用pip install pclpy安装完整功能。")
@dataclass
class PointCloudMetadata:
"""点云元数据结构"""
num_points: int
has_colors: bool
has_normals: bool
has_intensity: bool
bounds: Tuple[np.ndarray, np.ndarray] # min, max
density: float # 点每立方米
class PointCloudConverter:
"""
点云格式转换器
实现PCL、Open3D、NumPy/OpenCV格式的高效互转
"""
SUPPORTED_FORMATS = ['pcd', 'ply', 'xyz', 'xyzrgb', 'xyzn', 'las', 'npz', 'bin']
def __init__(self):
self.conversion_stats = {
'converted': 0,
'errors': 0,
'time_elapsed': 0.0
}
def load(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""
加载点云文件,返回(points, colors, normals)
Args:
filepath: 点云文件路径
Returns:
points: (N, 3) float32数组
colors: (N, 3) uint8数组或None
normals: (N, 3) float32数组或None
"""
ext = Path(filepath).suffix.lower()
if ext == '.pcd':
return self._load_pcd(filepath)
elif ext == '.ply':
return self._load_ply(filepath)
elif ext == '.xyz' or ext == '.txt':
return self._load_xyz(filepath)
elif ext == '.npz':
return self._load_npz(filepath)
elif ext == '.bin':
return self._load_kitti_bin(filepath)
else:
# 尝试使用Open3D作为通用加载器
return self._load_with_open3d(filepath)
def _load_pcd(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""加载PCD文件(PCL格式)"""
if PCL_AVAILABLE:
# 使用pclpy加载
cloud = pclpy.pcl.PointCloud.PointXYZRGB()
reader = pclpy.pcl.io.PCDReader()
reader.read(filepath, cloud)
points = np.array([[p.x, p.y, p.z] for p in cloud.points], dtype=np.float32)
# 提取RGB颜色(PCL将RGB打包为float)
colors = None
if hasattr(cloud.points[0], 'rgb'):
colors = np.array([
[(p.rgb >> 16) & 0xFF, (p.rgb >> 8) & 0xFF, p.rgb & 0xFF]
for p in cloud.points
], dtype=np.uint8)
return points, colors, None
else:
# 回退到Open3D
pcd = o3d.io.read_point_cloud(filepath)
return self._open3d_to_numpy(pcd)
def _load_ply(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""加载PLY文件"""
pcd = o3d.io.read_point_cloud(filepath)
return self._open3d_to_numpy(pcd)
def _load_xyz(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""加载纯文本XYZ文件"""
data = np.loadtxt(filepath)
if data.shape[1] >= 3:
points = data[:, :3].astype(np.float32)
colors = data[:, 3:6].astype(np.uint8) if data.shape[1] >= 6 else None
normals = data[:, 6:9].astype(np.float32) if data.shape[1] >= 9 else None
return points, colors, normals
return data.astype(np.float32), None, None
def _load_npz(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""加载NPZ压缩格式(自定义格式)"""
data = np.load(filepath)
points = data['points']
colors = data.get('colors', None)
normals = data.get('normals', None)
return points, colors, normals
def _load_kitti_bin(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""
加载KITTI点云bin文件(自动驾驶数据集格式)
每点包含[x,y,z,intensity],存储为float32二进制
"""
points = np.fromfile(filepath, dtype=np.float32).reshape(-1, 4)
xyz = points[:, :3]
intensity = points[:, 3:4]
# 将intensity映射为灰度颜色
colors = np.clip(intensity * 255, 0, 255).astype(np.uint8)
colors = np.repeat(colors, 3, axis=1) # 转为RGB
return xyz, colors, None
def _load_with_open3d(self, filepath: str) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""使用Open3D作为通用加载器"""
pcd = o3d.io.read_point_cloud(filepath)
if len(pcd.points) == 0:
raise ValueError(f"无法加载文件: {filepath}")
return self._open3d_to_numpy(pcd)
def _open3d_to_numpy(self, pcd: o3d.geometry.PointCloud) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
"""Open3D点云转NumPy数组"""
points = np.asarray(pcd.points, dtype=np.float32)
colors = None
if pcd.has_colors():
# Open3D存储为float64 [0,1],转换为uint8 [0,255]
colors = (np.asarray(pcd.colors) * 255).astype(np.uint8)
# RGB转BGR(OpenCV默认)
colors = colors[:, ::-1]
normals = np.asarray(pcd.normals, dtype=np.float32) if pcd.has_normals() else None
return points, colors, normals
def save(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray] = None,
normals: Optional[np.ndarray] = None,
binary: bool = True):
"""
保存点云到文件
Args:
filepath: 输出路径
points: (N, 3) float32点坐标
colors: (N, 3) uint8颜色(BGR顺序)
normals: (N, 3) float32法向量
binary: 是否使用二进制格式(更小更快)
"""
ext = Path(filepath).suffix.lower()
if ext == '.pcd':
self._save_pcd(filepath, points, colors, normals, binary)
elif ext == '.ply':
self._save_ply(filepath, points, colors, normals, binary)
elif ext == '.xyz':
self._save_xyz(filepath, points, colors, normals)
elif ext == '.npz':
self._save_npz(filepath, points, colors, normals)
elif ext == '.bin':
self._save_kitti_bin(filepath, points, colors)
else:
# 默认使用PLY
self._save_ply(filepath, points, colors, normals, binary)
def _save_pcd(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray], normals: Optional[np.ndarray],
binary: bool):
"""保存为PCD格式(PCL兼容)"""
if PCL_AVAILABLE:
# 使用PCL保存以获得最大兼容性
if colors is not None:
cloud = pclpy.pcl.PointCloud.PointXYZRGB()
cloud.resize(points.shape[0])
for i, (pt, color) in enumerate(zip(points, colors)):
cloud.points[i].x, cloud.points[i].y, cloud.points[i].z = pt
# BGR转RGB打包
rgb = (int(color[2]) << 16) | (int(color[1]) << 8) | int(color[0])
cloud.points[i].rgb = rgb
else:
cloud = pclpy.pcl.PointCloud.PointXYZ()
cloud.from_array(points.astype(np.float32))
writer = pclpy.pcl.io.PCDWriter()
writer.write(filepath, cloud, binary)
else:
# 使用Open3D保存PCD
pcd = self._numpy_to_open3d(points, colors, normals)
o3d.io.write_point_cloud(filepath, pcd, write_ascii=not binary)
def _save_ply(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray], normals: Optional[np.ndarray],
binary: bool):
"""保存为PLY格式"""
pcd = self._numpy_to_open3d(points, colors, normals)
o3d.io.write_point_cloud(filepath, pcd, write_ascii=not binary)
def _save_xyz(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray], normals: Optional[np.ndarray]):
"""保存为纯文本XYZ格式"""
data = [points]
header = "x y z"
if colors is not None:
data.append(colors.astype(np.float32))
header += " r g b"
if normals is not None:
data.append(normals)
header += " nx ny nz"
combined = np.hstack(data)
np.savetxt(filepath, combined, header=header, comments='')
def _save_npz(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray], normals: Optional[np.ndarray]):
"""保存为NPZ压缩格式(高效存储)"""
save_dict = {'points': points}
if colors is not None:
save_dict['colors'] = colors
if normals is not None:
save_dict['normals'] = normals
np.savez_compressed(filepath, **save_dict)
def _save_kitti_bin(self, filepath: str, points: np.ndarray,
colors: Optional[np.ndarray]):
"""保存为KITTI bin格式(用于自动驾驶)"""
# 将颜色转换为intensity(取平均)
if colors is not None:
intensity = colors.mean(axis=1, keepdims=True) / 255.0
else:
intensity = np.zeros((points.shape[0], 1), dtype=np.float32)
data = np.hstack([points.astype(np.float32), intensity.astype(np.float32)])
data.tofile(filepath)
def _numpy_to_open3d(self, points: np.ndarray,
colors: Optional[np.ndarray],
normals: Optional[np.ndarray]) -> o3d.geometry.PointCloud:
"""NumPy数组转Open3D点云"""
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points.astype(np.float64))
if colors is not None:
# BGR转RGB并归一化到[0,1]
rgb = colors[:, ::-1] / 255.0
pcd.colors = o3d.utility.Vector3dVector(rgb.astype(np.float64))
if normals is not None:
pcd.normals = o3d.utility.Vector3dVector(normals.astype(np.float64))
return pcd
def convert(self, input_path: str, output_path: str,
voxel_size: Optional[float] = None) -> bool:
"""
执行格式转换,可选体素降采样
Args:
input_path: 输入文件路径
output_path: 输出文件路径
voxel_size: 体素大小(用于降采样,None表示不降采样)
Returns:
转换是否成功
"""
try:
# 加载
points, colors, normals = self.load(input_path)
# 降采样(如果指定)
if voxel_size is not None and voxel_size > 0:
points, colors, normals = self._voxel_downsample(
points, colors, normals, voxel_size
)
# 保存
self.save(output_path, points, colors, normals)
self.conversion_stats['converted'] += 1
return True
except Exception as e:
print(f"转换失败 {input_path}: {str(e)}")
self.conversion_stats['errors'] += 1
return False
def _voxel_downsample(self, points: np.ndarray, colors: Optional[np.ndarray],
normals: Optional[np.ndarray], voxel_size: float) -> Tuple:
"""使用Open3D进行体素降采样"""
pcd = self._numpy_to_open3d(points, colors, normals)
down_pcd = pcd.voxel_down_sample(voxel_size)
return self._open3d_to_numpy(down_pcd)
def batch_convert(self, input_dir: str, output_dir: str,
input_format: str, output_format: str,
voxel_size: Optional[float] = None,
parallel: bool = True):
"""
批量转换目录中的点云文件
Args:
input_dir: 输入目录
output_dir: 输出目录
input_format: 输入格式扩展名(如'pcd')
output_format: 输出格式扩展名(如'ply')
voxel_size: 降采样体素大小
parallel: 是否并行处理(使用多进程)
"""
import multiprocessing as mp
from functools import partial
input_files = list(Path(input_dir).glob(f'*.{input_format}'))
if not input_files:
print(f"未找到 .{input_format} 文件")
return
os.makedirs(output_dir, exist_ok=True)
convert_func = partial(self._convert_worker,
output_dir=output_dir,
output_format=output_format,
voxel_size=voxel_size)
if parallel and len(input_files) > 1:
with mp.Pool(processes=mp.cpu_count()) as pool:
pool.map(convert_func, input_files)
else:
for f in input_files:
convert_func(f)
def _convert_worker(self, input_file: Path, output_dir: str,
output_format: str, voxel_size: Optional[float]):
"""多进程工作函数"""
output_file = Path(output_dir) / f"{input_file.stem}.{output_format}"
success = self.convert(str(input_file), str(output_file), voxel_size)
if success:
print(f"✓ {input_file.name} -> {output_file.name}")
def analyze(self, filepath: str) -> PointCloudMetadata:
"""
分析点云统计信息
Args:
filepath: 点云文件路径
Returns:
PointCloudMetadata结构
"""
points, colors, normals = self.load(filepath)
# 计算边界框
min_bounds = points.min(axis=0)
max_bounds = points.max(axis=0)
dimensions = max_bounds - min_bounds
volume = np.prod(dimensions)
density = len(points) / volume if volume > 0 else 0
return PointCloudMetadata(
num_points=len(points),
has_colors=colors is not None,
has_normals=normals is not None,
has_intensity=points.shape[1] > 3 if len(points) > 0 else False,
bounds=(min_bounds, max_bounds),
density=density
)
def to_opencv_mat(self, points: np.ndarray, colors: Optional[np.ndarray] = None) -> cv2.Mat:
"""
转换为OpenCV Mat格式(用于传统CV算法)
创建(N, 6)矩阵:[x,y,z,b,g,r]
"""
if colors is not None:
# 确保颜色是uint8
if colors.dtype != np.uint8:
colors = (colors * 255).astype(np.uint8) if colors.max() <= 1.0 else colors.astype(np.uint8)
data = np.hstack([points.astype(np.float32), colors])
else:
data = points.astype(np.float32)
return cv2.Mat(data)
def main():
parser = argparse.ArgumentParser(description='点云格式转换工具')
parser.add_argument('--input', '-i', type=str, help='输入文件路径')
parser.add_argument('--output', '-o', type=str, help='输出文件路径')
parser.add_argument('--format', '-f', type=str, choices=PointCloudConverter.SUPPORTED_FORMATS,
help='目标格式(当输出路径未指定扩展名时使用)')
parser.add_argument('--voxel-size', '-v', type=float, default=None,
help='体素降采样大小(米)')
parser.add_argument('--batch', '-b', action='store_true',
help='批量模式(处理整个目录)')
parser.add_argument('--input_dir', type=str, help='批量输入目录')
parser.add_argument('--output_dir', type=str, help='批量输出目录')
parser.add_argument('--input_format', type=str, help='批量输入格式')
parser.add_argument('--output_format', type=str, help='批量输出格式')
parser.add_argument('--analyze', '-a', action='store_true',
help='分析点云统计信息')
parser.add_argument('--verify', action='store_true',
help='验证文件完整性')
args = parser.parse_args()
converter = PointCloudConverter()
if args.analyze and args.input:
# 分析模式
print(f"分析点云: {args.input}")
meta = converter.analyze(args.input)
print(f"点数: {meta.num_points:,}")
print(f"包含颜色: {meta.has_colors}")
print(f"包含法线: {meta.has_normals}")
print(f"边界框: {meta.bounds[0]} to {meta.bounds[1]}")
print(f"密度: {meta.density:.2f} points/m³")
elif args.batch:
# 批量转换
converter.batch_convert(
args.input_dir, args.output_dir,
args.input_format, args.output_format,
args.voxel_size
)
elif args.input and args.output:
# 单文件转换
success = converter.convert(args.input, args.output, args.voxel_size)
if success:
print(f"转换成功: {args.input} -> {args.output}")
# 显示统计信息
in_meta = converter.analyze(args.input)
out_meta = converter.analyze(args.output)
print(f"原文件点数: {in_meta.num_points:,}")
print(f"输出文件点数: {out_meta.num_points:,}")
if args.voxel_size:
print(f"降采样比率: {out_meta.num_points/in_meta.num_points:.2%}")
else:
parser.print_help()
if __name__ == '__main__':
main()
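Beyond the command line, the converter can be used directly as a library; a short sketch, assuming the script above is importable as pointcloud_format_converter and that an input file scan.pcd exists (the filename is illustrative):
Python
from pointcloud_format_converter import PointCloudConverter

conv = PointCloudConverter()
# One call: load scan.pcd, voxel-downsample to 1 cm, write PLY
conv.convert('scan.pcd', 'scan.ply', voxel_size=0.01)
print(conv.analyze('scan.ply').num_points)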
6.1.3 Voxelization and Octrees: Real-Time Spatial Occupancy Computation
Voxelization discretizes continuous 3D space into a regular grid and is a foundation for collision detection, path planning, and 3D reconstruction. A uniform voxel grid is simple to implement but severely memory-inefficient at scale, because empty regions consume as much storage as occupied ones (a 10 m cube at 1 cm resolution already requires 10^9 cells). An octree adaptively subdivides space, using large cells in sparse regions and small cells in dense ones, striking a balance between memory footprint and query efficiency.
Open3D's Octree implementation supports a maximum depth of 16, corresponding to up to 2^16 cells per dimension. Insertion starts at the root and recursively checks which child's bounds contain the point, until the maximum depth is reached or a child holds fewer points than a threshold. Queries use axis-aligned bounding boxes (AABB) to prune irrelevant branches, so nearest-neighbor search runs in O(log N) time.
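Before implementing a custom octree, it is worth seeing the built-in one in action; a minimal sketch of building it from a randomly generated cloud and locating the leaf that contains a query point:
Python
import numpy as np
import open3d as o3d

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(np.random.rand(5000, 3))

octree = o3d.geometry.Octree(max_depth=8)
# size_expand pads the root bounding box slightly so boundary points fit
octree.convert_from_point_cloud(pcd, size_expand=0.01)

# Locate the leaf node containing a query point (returns (None, None) if outside)
leaf, info = octree.locate_leaf_node(np.array([0.5, 0.5, 0.5]))
if info is not None:
    print('leaf depth:', info.depth, 'origin:', info.origin, 'size:', info.size)
The custom implementation in the script below adds what the built-in octree lacks for this use case: dynamic insertion and deletion, nearest-neighbor and radius queries, and JSON serialization.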
In real-time applications, voxelization faces the challenge of dynamic updates. An incremental octree supports insertion and deletion by maintaining a point count and centroid per node, enabling fast updates. For an RGB-D camera stream, a sliding-window strategy updates only the voxels inside the current view frustum; combined with GPU parallelism (CUDA TSDF), spatial occupancy can be refreshed at 30 Hz.
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: voxelization_octree_occupancy.py
Content: 实时体素化与八叉树空间占据计算系统
Usage:
1. 点云体素化: python voxelization_octree_occupancy.py --mode voxelize --input scene.ply --voxel_size 0.05
2. 八叉树构建: python voxelization_octree_occupancy.py --mode octree --input scene.ply --max_depth 10
3. 实时占据更新: python voxelization_octree_occupancy.py --mode realtime --device 0 --voxel_size 0.02
4. 碰撞检测: python voxelization_octree_occupancy.py --mode collision --octree_file tree.json --query_points queries.npy
Features:
- 高效体素网格生成(支持GPU加速)
- 自适应八叉树构建与动态更新
- 实时RGB-D流处理与TSDF融合
- 快速碰撞检测与最近邻查询
- 空间占据可视化与导出
"""
import numpy as np
import cv2
import open3d as o3d
import argparse
import json
import time
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, asdict
from collections import deque
import threading
from queue import Queue, Empty
@dataclass
class VoxelGridConfig:
"""体素网格配置"""
voxel_size: float = 0.05 # 米
min_bounds: Optional[np.ndarray] = None
max_bounds: Optional[np.ndarray] = None
auto_bounds: bool = True # 自动计算边界
@dataclass
class OctreeConfig:
"""八叉树配置"""
max_depth: int = 10 # 最大深度
min_points_per_node: int = 10 # 节点最小点数(低于则不再细分)
max_points_per_node: int = 1000 # 节点最大点数(超过则强制细分)
class Voxelizer:
"""
高性能体素化器
支持均匀体素网格和TSDF(截断符号距离函数)融合
"""
def __init__(self, config: VoxelGridConfig):
self.config = config
self.voxel_grid = None
self.tsdf_volume = None
def voxelize(self, points: np.ndarray, colors: Optional[np.ndarray] = None) -> o3d.geometry.VoxelGrid:
"""
点云体素化
Args:
points: (N, 3) float32点云
colors: 可选颜色信息
Returns:
Open3D VoxelGrid对象
"""
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points.astype(np.float64))
if colors is not None:
pcd.colors = o3d.utility.Vector3dVector(colors.astype(np.float64) / 255.0)
# 自动计算边界
if self.config.auto_bounds:
bounds = pcd.get_axis_aligned_bounding_box()
self.config.min_bounds = np.array(bounds.min_bound)
self.config.max_bounds = np.array(bounds.max_bound)
# 创建体素网格
self.voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(
pcd, voxel_size=self.config.voxel_size
)
return self.voxel_grid
def create_tsdf_volume(self, volume_size: float = 3.0,
resolution: int = 512) -> o3d.pipelines.integration.ScalableTSDFVolume:
"""
创建TSDF体积(用于实时三维重建)
Args:
volume_size: 物理体积大小(米)
resolution: 体素分辨率
Returns:
TSDF体积对象
"""
voxel_length = volume_size / resolution
self.tsdf_volume = o3d.pipelines.integration.ScalableTSDFVolume(
voxel_length=voxel_length,
sdf_trunc=0.04, # 截断距离
color_type=o3d.pipelines.integration.TSDFVolumeColorType.RGB8
)
return self.tsdf_volume
def integrate_rgbd_frame(self, color_image: np.ndarray,
depth_image: np.ndarray,
camera_intrinsics: np.ndarray,
camera_extrinsics: np.ndarray):
"""
融合单帧RGB-D图像到TSDF体积
Args:
color_image: (H, W, 3) uint8彩色图
depth_image: (H, W) float32深度图(米)
camera_intrinsics: (3, 3) 内参矩阵
camera_extrinsics: (4, 4) 外参矩阵(世界到相机)
"""
if self.tsdf_volume is None:
raise RuntimeError("TSDF volume not initialized")
# 创建RGBD图像
color_o3d = o3d.geometry.Image(color_image)
depth_o3d = o3d.geometry.Image(depth_image.astype(np.float32))
rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
color_o3d, depth_o3d,
depth_scale=1.0, # 深度已为米
depth_trunc=3.0, # 最大深度3米
convert_rgb_to_intensity=False
)
# 设置相机参数
intrinsic_o3d = o3d.camera.PinholeCameraIntrinsic(
width=color_image.shape[1],
height=color_image.shape[0],
fx=camera_intrinsics[0, 0],
fy=camera_intrinsics[1, 1],
cx=camera_intrinsics[0, 2],
cy=camera_intrinsics[1, 2]
)
# 融合到TSDF
self.tsdf_volume.integrate(rgbd, intrinsic_o3d, camera_extrinsics)
def extract_mesh_from_tsdf(self) -> o3d.geometry.TriangleMesh:
"""从TSDF体积提取三角网格(Marching Cubes算法)"""
if self.tsdf_volume is None:
raise RuntimeError("TSDF volume is empty")
return self.tsdf_volume.extract_triangle_mesh()
def extract_point_cloud_from_tsdf(self) -> o3d.geometry.PointCloud:
"""从TSDF体积提取点云"""
if self.tsdf_volume is None:
raise RuntimeError("TSDF volume is empty")
return self.tsdf_volume.extract_point_cloud()
class AdaptiveOctree:
"""
自适应八叉树实现
支持动态插入、删除、最近邻搜索和碰撞检测
"""
class Node:
"""八叉树节点"""
__slots__ = ['center', 'size', 'depth', 'points', 'children', 'is_leaf', 'point_indices']
def __init__(self, center: np.ndarray, size: float, depth: int):
self.center = center # 节点中心 (3,)
self.size = size # 节点边长
self.depth = depth # 当前深度
self.points = [] # 存储的点 [(idx, point), ...]
self.children = [] # 八个子节点
self.is_leaf = True # 是否为叶节点
self.point_indices = set() # 点索引集合(用于快速删除)
def __init__(self, config: OctreeConfig, root_center: np.ndarray = None,
root_size: float = 10.0):
"""
初始化八叉树
Args:
config: 八叉树配置
root_center: 根节点中心,默认原点
root_size: 根节点大小(米)
"""
self.config = config
self.root_center = root_center if root_center is not None else np.zeros(3)
self.root_size = root_size
self.root = self.Node(self.root_center, root_size, 0)
self.all_points = [] # 全局点存储
self.point_to_node = {} # 点索引到节点的映射
def insert(self, point: np.ndarray, point_idx: int = None) -> int:
"""
插入点到八叉树
Args:
point: (3,) 三维点坐标
point_idx: 点索引(外部指定,未指定则自动分配)
Returns:
点索引
"""
if point_idx is None:
point_idx = len(self.all_points)
self.all_points.append(point)
else:
while len(self.all_points) <= point_idx:
self.all_points.append(None)
self.all_points[point_idx] = point
self._insert_recursive(self.root, point, point_idx, 0)
return point_idx
def _insert_recursive(self, node: 'AdaptiveOctree.Node',
point: np.ndarray, point_idx: int, depth: int):
"""递归插入"""
# 检查点是否在节点范围内
if not self._point_in_node(point, node):
return
node.point_indices.add(point_idx)
if node.is_leaf:
node.points.append((point_idx, point))
# 检查是否需要细分
if (len(node.points) > self.config.max_points_per_node and
depth < self.config.max_depth):
self._subdivide(node)
else:
# 递归插入子节点
for child in node.children:
if self._point_in_node(point, child):
self._insert_recursive(child, point, point_idx, depth + 1)
break
def _subdivide(self, node: 'AdaptiveOctree.Node'):
"""细分节点为八个子节点"""
if node.depth >= self.config.max_depth:
return
half_size = node.size / 2
quarter_size = node.size / 4
# 创建八个子节点(按象限)
offsets = [
[-1, -1, -1], [1, -1, -1], [-1, 1, -1], [1, 1, -1],
[-1, -1, 1], [1, -1, 1], [-1, 1, 1], [1, 1, 1]
]
for offset in offsets:
child_center = node.center + np.array(offset) * quarter_size
child = self.Node(child_center, half_size, node.depth + 1)
node.children.append(child)
# 将当前点分配到子节点
for p_idx, p in node.points:
if self._point_in_node(p, child):
child.points.append((p_idx, p))
child.point_indices.add(p_idx)
node.is_leaf = False
node.points = [] # 清空当前节点存储的点(已分配到子节点)
def _point_in_node(self, point: np.ndarray, node: 'AdaptiveOctree.Node') -> bool:
"""检查点是否在节点边界内"""
half_size = node.size / 2
return np.all(np.abs(point - node.center) <= half_size)
def delete(self, point_idx: int) -> bool:
"""
删除点
Args:
point_idx: 点索引
Returns:
是否成功删除
"""
if point_idx >= len(self.all_points) or self.all_points[point_idx] is None:
return False
# 找到包含该点的节点
node = self._find_node_containing_point(self.root, point_idx)
if node:
node.point_indices.discard(point_idx)
node.points = [(i, p) for i, p in node.points if i != point_idx]
self.all_points[point_idx] = None
return True
return False
def _find_node_containing_point(self, node: 'AdaptiveOctree.Node',
point_idx: int) -> Optional['AdaptiveOctree.Node']:
"""查找包含指定点的节点"""
if point_idx in node.point_indices:
if node.is_leaf:
return node
for child in node.children:
result = self._find_node_containing_point(child, point_idx)
if result:
return result
return None
def nearest_neighbor(self, query_point: np.ndarray) -> Tuple[int, float]:
"""
最近邻搜索
Args:
query_point: (3,) 查询点
Returns:
(最近点索引, 距离)
"""
best_idx = -1
best_dist = float('inf')
stack = [(self.root, self._min_dist_to_node(query_point, self.root))]
while stack:
node, min_dist = stack.pop()
# 剪枝:如果当前节点最小距离大于已知最佳距离,跳过
if min_dist > best_dist:
continue
if node.is_leaf:
# 在叶节点中线性搜索
for p_idx, p in node.points:
if p_idx >= len(self.all_points) or self.all_points[p_idx] is None:
continue
dist = np.linalg.norm(self.all_points[p_idx] - query_point)
if dist < best_dist:
best_dist = dist
best_idx = p_idx
else:
# 将子节点按距离排序后加入栈(优先搜索近的)
children_with_dist = []
for child in node.children:
d = self._min_dist_to_node(query_point, child)
children_with_dist.append((d, child))
                children_with_dist.sort(key=lambda item: item[0], reverse=True)  # 栈是LIFO,所以逆序; sort on distance only (Node objects are not comparable)
for d, child in children_with_dist:
stack.append((child, d))
return best_idx, best_dist
def _min_dist_to_node(self, point: np.ndarray, node: 'AdaptiveOctree.Node') -> float:
"""计算点到节点的最小距离(用于剪枝)"""
half_size = node.size / 2
# 计算点到AABB的距离
closest = np.clip(point, node.center - half_size, node.center + half_size)
return np.linalg.norm(point - closest)
def radius_search(self, query_point: np.ndarray, radius: float) -> List[Tuple[int, float]]:
"""
半径搜索
Args:
query_point: (3,) 查询点
radius: 搜索半径
Returns:
[(点索引, 距离), ...]
"""
results = []
self._radius_search_recursive(self.root, query_point, radius, results)
return results
def _radius_search_recursive(self, node: 'AdaptiveOctree.Node',
query_point: np.ndarray, radius: float,
results: List[Tuple[int, float]]):
"""递归半径搜索"""
min_dist = self._min_dist_to_node(query_point, node)
# 剪枝:如果节点最小距离大于半径,跳过
if min_dist > radius:
return
if node.is_leaf:
for p_idx, p in node.points:
if p_idx >= len(self.all_points) or self.all_points[p_idx] is None:
continue
dist = np.linalg.norm(self.all_points[p_idx] - query_point)
if dist <= radius:
results.append((p_idx, dist))
else:
for child in node.children:
self._radius_search_recursive(child, query_point, radius, results)
def collision_detect(self, object_points: np.ndarray,
object_radius: float = 0.0) -> bool:
"""
碰撞检测
Args:
object_points: (N, 3) 物体点云
object_radius: 物体膨胀半径(用于保守检测)
Returns:
是否发生碰撞
"""
for pt in object_points:
# 最近邻距离小于阈值则碰撞
_, dist = self.nearest_neighbor(pt)
if dist <= object_radius:
return True
return False
def get_occupied_voxels(self) -> np.ndarray:
"""
获取所有被占据的体素中心
Returns:
(M, 3) 体素中心坐标
"""
voxels = []
self._collect_leaf_centers(self.root, voxels)
return np.array(voxels)
def _collect_leaf_centers(self, node: 'AdaptiveOctree.Node', voxels: List[np.ndarray]):
"""收集叶节点中心"""
if node.is_leaf and len(node.points) > 0:
voxels.append(node.center)
else:
for child in node.children:
self._collect_leaf_centers(child, voxels)
def to_dict(self) -> dict:
"""序列化为字典(用于JSON保存)"""
def node_to_dict(node):
return {
'center': node.center.tolist(),
'size': node.size,
'depth': node.depth,
'is_leaf': node.is_leaf,
'point_indices': list(node.point_indices) if node.is_leaf else [],
'children': [node_to_dict(c) for c in node.children] if not node.is_leaf else []
}
return {
'config': {
'max_depth': self.config.max_depth,
'min_points_per_node': self.config.min_points_per_node,
'max_points_per_node': self.config.max_points_per_node
},
'root_center': self.root_center.tolist(),
'root_size': self.root_size,
'tree': node_to_dict(self.root),
'points': [p.tolist() if p is not None else None for p in self.all_points]
}
@classmethod
def from_dict(cls, data: dict) -> 'AdaptiveOctree':
"""从字典反序列化"""
config = OctreeConfig(**data['config'])
tree = cls(config, np.array(data['root_center']), data['root_size'])
# 递归重建树结构
def dict_to_node(node_data, parent=None):
node = cls.Node(np.array(node_data['center']),
node_data['size'],
node_data['depth'])
node.is_leaf = node_data['is_leaf']
node.point_indices = set(node_data.get('point_indices', []))
if not node.is_leaf:
for child_data in node_data['children']:
node.children.append(dict_to_node(child_data, node))
else:
# 恢复点数据
for p_idx in node.point_indices:
if p_idx < len(data['points']) and data['points'][p_idx] is not None:
p = np.array(data['points'][p_idx])
node.points.append((p_idx, p))
return node
tree.root = dict_to_node(data['tree'])
tree.all_points = [np.array(p) if p is not None else None for p in data['points']]
return tree
class RealtimeOccupancyMapper:
"""
实时空间占据映射器
结合体素网格和八叉树,处理RGB-D相机流
"""
def __init__(self, voxel_size: float = 0.05, max_depth: int = 12):
self.voxel_size = voxel_size
self.voxelizer = Voxelizer(VoxelGridConfig(voxel_size=voxel_size))
self.octree = AdaptiveOctree(OctreeConfig(max_depth=max_depth))
self.tsdf_volume = None
# 多线程处理
self.frame_queue = Queue(maxsize=5)
self.processing_thread = None
self.is_running = False
# 统计信息
self.stats = {
'frames_processed': 0,
'points_added': 0,
'processing_time_ms': 0
}
def start(self):
"""启动实时处理线程"""
self.is_running = True
self.processing_thread = threading.Thread(target=self._processing_loop)
self.processing_thread.start()
def stop(self):
"""停止处理"""
self.is_running = False
if self.processing_thread:
self.processing_thread.join()
def _processing_loop(self):
"""后台处理循环"""
while self.is_running:
try:
frame_data = self.frame_queue.get(timeout=0.1)
self._process_frame(frame_data)
            except Empty:  # queue timed out with no frame; keep polling
                continue
def _process_frame(self, frame_data: dict):
"""处理单帧数据"""
start_time = time.time()
color = frame_data['color']
depth = frame_data['depth']
intrinsics = frame_data['intrinsics']
pose = frame_data['pose']
# TSDF融合
if self.tsdf_volume is None:
self.tsdf_volume = self.voxelizer.create_tsdf_volume()
self.voxelizer.integrate_rgbd_frame(color, depth, intrinsics, pose)
# 提取新点并更新八叉树
pcd = self.voxelizer.extract_point_cloud_from_tsdf()
points = np.asarray(pcd.points)
# 批量插入八叉树
for pt in points:
self.octree.insert(pt)
self.stats['points_added'] += 1
self.stats['frames_processed'] += 1
self.stats['processing_time_ms'] = (time.time() - start_time) * 1000
def add_frame(self, color: np.ndarray, depth: np.ndarray,
intrinsics: np.ndarray, pose: np.ndarray):
"""
添加一帧RGB-D数据(线程安全)
Args:
color: (H, W, 3) uint8
depth: (H, W) float32(米)
intrinsics: (3, 3) 内参
pose: (4, 4) 相机位姿(世界到相机)
"""
if not self.frame_queue.full():
self.frame_queue.put({
'color': color,
'depth': depth,
'intrinsics': intrinsics,
'pose': pose
})
def query_occupancy(self, query_points: np.ndarray) -> np.ndarray:
"""
查询空间占据状态
Args:
query_points: (N, 3) 查询点
Returns:
(N,) bool数组,True表示被占据
"""
occupied = np.zeros(len(query_points), dtype=bool)
for i, pt in enumerate(query_points):
_, dist = self.octree.nearest_neighbor(pt)
occupied[i] = (dist < self.voxel_size * 0.5)
return occupied
def get_occupancy_grid(self, bounds: Tuple[np.ndarray, np.ndarray],
resolution: float = None) -> np.ndarray:
"""
获取均匀占据网格(用于路径规划)
Args:
bounds: (min_bound, max_bound) 边界
resolution: 网格分辨率(默认使用体素大小)
Returns:
3D numpy数组,0表示空闲,1表示占据
"""
if resolution is None:
resolution = self.voxel_size
min_b, max_b = bounds
dims = ((max_b - min_b) / resolution).astype(int) + 1
grid = np.zeros(dims, dtype=np.uint8)
# 获取所有占据体素
occupied_voxels = self.octree.get_occupied_voxels()
# 映射到网格索引
for voxel in occupied_voxels:
idx = ((voxel - min_b) / resolution).astype(int)
if np.all(idx >= 0) and np.all(idx < dims):
grid[tuple(idx)] = 1
return grid
def main():
parser = argparse.ArgumentParser(description='体素化与八叉树空间占据计算')
parser.add_argument('--mode', choices=['voxelize', 'octree', 'realtime', 'collision'],
required=True, help='运行模式')
parser.add_argument('--input', '-i', type=str, help='输入点云文件')
parser.add_argument('--output', '-o', type=str, help='输出文件')
parser.add_argument('--voxel_size', '-v', type=float, default=0.05,
help='体素大小(米)')
parser.add_argument('--max_depth', '-d', type=int, default=10,
help='八叉树最大深度')
parser.add_argument('--device', type=int, default=0,
help='相机设备ID(实时模式)')
args = parser.parse_args()
if args.mode == 'voxelize':
# 点云体素化
print(f"加载点云: {args.input}")
pcd = o3d.io.read_point_cloud(args.input)
points = np.asarray(pcd.points)
colors = np.asarray(pcd.colors) if pcd.has_colors() else None
print(f"原始点数: {len(points)}")
voxelizer = Voxelizer(VoxelGridConfig(voxel_size=args.voxel_size))
voxel_grid = voxelizer.voxelize(points, colors)
print(f"体素数量: {len(voxel_grid.get_voxels())}")
# 保存体素网格
o3d.io.write_voxel_grid(args.output, voxel_grid)
print(f"已保存至: {args.output}")
# 可视化
o3d.visualization.draw_geometries([voxel_grid])
elif args.mode == 'octree':
# 构建八叉树
print(f"加载点云: {args.input}")
pcd = o3d.io.read_point_cloud(args.input)
points = np.asarray(pcd.points)
print(f"构建八叉树,最大深度: {args.max_depth}")
config = OctreeConfig(max_depth=args.max_depth)
# 计算合适的根节点大小
bounds = pcd.get_axis_aligned_bounding_box()
center = bounds.get_center()
size = max(bounds.get_extent()) * 1.2 # 留20%余量
octree = AdaptiveOctree(config, center, size)
start_time = time.time()
for i, pt in enumerate(points):
octree.insert(pt, i)
if i % 10000 == 0:
print(f"已插入 {i}/{len(points)} 点")
build_time = time.time() - start_time
print(f"构建完成,耗时: {build_time:.2f}s")
# 保存八叉树
tree_data = octree.to_dict()
with open(args.output, 'w') as f:
json.dump(tree_data, f)
print(f"八叉树已保存至: {args.output}")
# 测试最近邻搜索
query = center + np.random.randn(3) * 0.1
idx, dist = octree.nearest_neighbor(query)
print(f"最近邻测试: 查询点 {query}, 最近点索引 {idx}, 距离 {dist:.4f}m")
elif args.mode == 'realtime':
# 实时RGB-D处理(示例,需连接真实相机)
print("启动实时占据映射...")
mapper = RealtimeOccupancyMapper(voxel_size=args.voxel_size,
max_depth=args.max_depth)
mapper.start()
# 模拟数据(实际应使用RealSense等相机)
try:
while True:
# 这里应替换为真实相机读取
# color, depth = camera.capture()
# intrinsics = camera.get_intrinsics()
# pose = slam.get_pose()
# mapper.add_frame(color, depth, intrinsics, pose)
time.sleep(0.033) # 30fps
print(f"已处理帧: {mapper.stats['frames_processed']}, "
f"平均耗时: {mapper.stats['processing_time_ms']:.1f}ms")
except KeyboardInterrupt:
print("停止实时处理")
mapper.stop()
elif args.mode == 'collision':
# 碰撞检测
print("加载八叉树...")
with open(args.input, 'r') as f:
tree_data = json.load(f)
octree = AdaptiveOctree.from_dict(tree_data)
# 加载查询物体点云
obj_pcd = o3d.io.read_point_cloud(args.output) # 这里output参数复用为查询物体
obj_points = np.asarray(obj_pcd.points)
print(f"执行碰撞检测,物体点数: {len(obj_points)}")
collision = octree.collision_detect(obj_points, object_radius=0.02)
print(f"碰撞检测结果: {'发生碰撞' if collision else '无碰撞'}")
if __name__ == '__main__':
main()
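Used as a library rather than through the CLI, the adaptive octree supports incremental updates and queries directly; a short sketch on random points (module name as in the script above):
Python
import numpy as np
from voxelization_octree_occupancy import AdaptiveOctree, OctreeConfig

octree = AdaptiveOctree(OctreeConfig(max_depth=8), root_center=np.zeros(3), root_size=2.0)
for p in np.random.uniform(-1, 1, size=(2000, 3)):
    octree.insert(p)

idx, dist = octree.nearest_neighbor(np.zeros(3))    # closest stored point
neighbors = octree.radius_search(np.zeros(3), 0.1)  # all points within 10 cm
print(idx, dist, len(neighbors))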
6.1.4 Depth Map Post-Processing: Hole Filling and Edge Alignment
Consumer-grade depth cameras (RealSense, Kinect) frequently lose depth at object boundaries, on specular surfaces, and in distant regions, which shows up as zero-valued holes in the depth image. These holes break geometric continuity and directly degrade downstream point cloud registration and surface reconstruction. Post-processing therefore proceeds in three stages: hole detection, selection of a filling strategy, and edge refinement.
Hole detection relies on connected-component analysis to distinguish small noise holes from large invalid regions. The filling strategy should match the geometric context around each hole: small holes respond well to median or Gaussian filtering; elongated holes along edges call for directional interpolation; large missing regions require color-guided depth completion, which uses texture continuity in the color image to steer the recovered depth.
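For small and medium holes, the OpenCV inpainting functions used by the pipeline below already work well on depth maps once a hole mask is available; a minimal sketch, assuming a 16-bit depth image named depth.png in which zeros mark missing pixels:
Python
import cv2
import numpy as np

depth = cv2.imread('depth.png', cv2.IMREAD_ANYDEPTH)  # assumed 16-bit, 0 = missing
mask = np.uint8(depth == 0) * 255                     # pixels to repair

# cv2.inpaint expects 8-bit input: scale down, fill, then restore the range
d8 = cv2.normalize(depth, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
filled8 = cv2.inpaint(d8, mask, inpaintRadius=5, flags=cv2.INPAINT_TELEA)

filled = filled8.astype(np.float32) / 255.0 * (depth.max() - depth.min()) + depth.min()
filled[mask == 0] = depth[mask == 0]                  # keep measured pixels untouched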
Edge alignment corrects the boundary misalignment caused by parallax between the RGB and depth sensors. A joint bilateral filter uses the color image as the guide: at every depth pixel it weights neighboring depth values by both color similarity and spatial distance, so noise is smoothed while depth edges stay locked to color edges, giving cross-modal edge-preserving smoothing.
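When the opencv-contrib package is installed, cv2.ximgproc provides a ready-made joint bilateral filter that can replace a hand-rolled CPU loop; a minimal sketch (file names and parameter values are illustrative):
Python
import cv2

color = cv2.imread('color.png')                       # 8-bit guide image
depth = cv2.imread('depth.png', cv2.IMREAD_ANYDEPTH)
depth_f = depth.astype('float32')

# Smooth the depth map while snapping its edges to the color edges
# (d is the neighborhood diameter; requires opencv-contrib-python)
smoothed = cv2.ximgproc.jointBilateralFilter(color, depth_f, d=9,
                                             sigmaColor=25, sigmaSpace=7)
The script below implements the same idea from scratch, which makes the weighting explicit and allows a hole mask to be taken into account.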
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: depth_postprocessing.py
Content: 深度图后处理管线:空洞填充、边缘对齐与噪声抑制
Usage:
1. 单图处理: python depth_postprocessing.py --input depth.png --output processed.png --method fast_marching
2. 批量处理: python depth_postprocessing.py --batch --input_dir ./raw_depth/ --output_dir ./processed/
3. 联合滤波: python depth_postprocessing.py --input depth.png --color color.png --output aligned.png --method joint_bilateral
4. 实时处理: python depth_postprocessing.py --realtime --device 0 --visualize
Features:
- 多种空洞填充算法(Navier-Stokes、Fast Marching、颜色引导)
- 联合双边滤波实现RGB-D边缘对齐
- 自适应噪声抑制与边缘保持
- 基于OpenCV CUDA的GPU加速
- 实时流处理能力
"""
import cv2
import numpy as np
import argparse
import os
from typing import Optional, Tuple, List
from enum import Enum
from dataclasses import dataclass
import time
from scipy import ndimage
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
class DepthFillMethod(Enum):
"""深度填充方法枚举"""
NAIVE_INPAINT = "naive" # 简单邻域插值
FAST_MARCHING = "fast_marching" # 快速行进法(Telea)
NS_INPAINT = "navier_stokes" # Navier-Stokes流体动力学
COLOR_GUIDED = "color_guided" # 颜色引导填充
ANISOTROPIC = "anisotropic" # 各向异性扩散
PLANAR_FIT = "planar_fit" # 局部平面拟合
@dataclass
class ProcessingConfig:
"""处理配置参数"""
fill_method: DepthFillMethod = DepthFillMethod.FAST_MARCHING
inpaint_radius: int = 5 # 修复半径
bilateral_sigma_color: float = 0.05 # 颜色空间sigma(归一化)
bilateral_sigma_space: float = 5.0 # 空间域sigma(像素)
edge_threshold: float = 0.1 # 边缘检测阈值
min_valid_depth: float = 0.1 # 最小有效深度(米)
max_valid_depth: float = 10.0 # 最大有效深度(米)
use_gpu: bool = False # 使用CUDA加速
class DepthPostProcessor:
"""
深度图后处理器
实现空洞检测、填充、边缘对齐和噪声抑制
"""
def __init__(self, config: ProcessingConfig):
self.config = config
self.gpu_available = cv2.cuda.getCudaEnabledDeviceCount() > 0 if config.use_gpu else False
def detect_holes(self, depth_image: np.ndarray) -> np.ndarray:
"""
检测深度图中的空洞区域
Args:
depth_image: (H, W) float32深度图
Returns:
(H, W) uint8掩码,255表示空洞
"""
# 零值或异常值检测
hole_mask = (depth_image < self.config.min_valid_depth) | \
(depth_image > self.config.max_valid_depth) | \
np.isnan(depth_image) | np.isinf(depth_image)
# 形态学操作清理孤立噪声点
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
hole_mask = cv2.morphologyEx(hole_mask.astype(np.uint8) * 255,
cv2.MORPH_CLOSE, kernel)
return hole_mask
def analyze_holes(self, hole_mask: np.ndarray) -> List[dict]:
"""
分析空洞特征,分类处理策略
Returns:
空洞列表,包含面积、周长、位置等信息
"""
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
hole_mask, connectivity=8
)
holes = []
for i in range(1, num_labels): # 跳过背景
area = stats[i, cv2.CC_STAT_AREA]
if area < 10: # 忽略过小噪声
continue
x, y, w, h = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP], \
stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]
# 计算周长(轮廓长度)
component_mask = (labels == i).astype(np.uint8) * 255
contours, _ = cv2.findContours(component_mask, cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)
perimeter = cv2.arcLength(contours[0], True)
# 计算圆形度(区分线性边缘空洞和块状空洞)
circularity = 4 * np.pi * area / (perimeter ** 2) if perimeter > 0 else 0
holes.append({
'id': i,
'area': area,
'bbox': (x, y, w, h),
'centroid': centroids[i],
'perimeter': perimeter,
'circularity': circularity,
'mask': component_mask
})
return holes
def fill_holes(self, depth_image: np.ndarray,
color_image: Optional[np.ndarray] = None,
hole_mask: Optional[np.ndarray] = None) -> np.ndarray:
"""
填充深度图空洞
Args:
depth_image: 输入深度图
color_image: 可选的彩色引导图
hole_mask: 预计算的空洞掩码(可选)
Returns:
填充后的深度图
"""
if hole_mask is None:
hole_mask = self.detect_holes(depth_image)
# 归一化到0-255用于OpenCV修复算法
depth_normalized = cv2.normalize(depth_image, None, 0, 255, cv2.NORM_MINMAX)
depth_uint8 = depth_normalized.astype(np.uint8)
if self.config.fill_method == DepthFillMethod.NAIVE_INPAINT:
filled = self._naive_inpaint(depth_uint8, hole_mask)
elif self.config.fill_method == DepthFillMethod.FAST_MARCHING:
filled = cv2.inpaint(depth_uint8, hole_mask,
self.config.inpaint_radius,
cv2.INPAINT_TELEA)
elif self.config.fill_method == DepthFillMethod.NS_INPAINT:
filled = cv2.inpaint(depth_uint8, hole_mask,
self.config.inpaint_radius,
cv2.INPAINT_NS)
elif self.config.fill_method == DepthFillMethod.COLOR_GUIDED:
if color_image is None:
raise ValueError("颜色引导填充需要提供color_image")
filled = self._color_guided_fill(depth_image, color_image, hole_mask)
elif self.config.fill_method == DepthFillMethod.ANISOTROPIC:
filled = self._anisotropic_diffusion(depth_uint8, hole_mask)
elif self.config.fill_method == DepthFillMethod.PLANAR_FIT:
filled = self._planar_fit_fill(depth_image, hole_mask)
else:
filled = depth_uint8
# 反归一化
filled = filled.astype(np.float32) / 255.0
filled = filled * (depth_image.max() - depth_image.min()) + depth_image.min()
# 保留原始有效深度值
valid_mask = hole_mask == 0
filled[valid_mask] = depth_image[valid_mask]
return filled
def _naive_inpaint(self, depth_uint8: np.ndarray,
hole_mask: np.ndarray) -> np.ndarray:
"""简单邻域插值(快速但质量较低)"""
result = depth_uint8.copy()
hole_coords = np.where(hole_mask > 0)
for y, x in zip(hole_coords[0], hole_coords[1]):
# 提取邻域
y1, y2 = max(0, y-2), min(depth_uint8.shape[0], y+3)
x1, x2 = max(0, x-2), min(depth_uint8.shape[1], x+3)
neighborhood = depth_uint8[y1:y2, x1:x2]
mask = hole_mask[y1:y2, x1:x2] == 0
if np.any(mask):
result[y, x] = np.mean(neighborhood[mask])
else:
result[y, x] = 128 # 默认值
return result
def _color_guided_fill(self, depth_image: np.ndarray,
color_image: np.ndarray,
hole_mask: np.ndarray) -> np.ndarray:
"""
颜色引导的深度填充
利用彩色图像的纹理连续性指导深度插值
"""
        # 转换到Lab颜色空间(更符合人眼感知), normalized to [0, 1] so that the
        # normalized bilateral_sigma_color parameter is meaningful
        if color_image.shape[2] == 3:
            color_lab = cv2.cvtColor(color_image, cv2.COLOR_BGR2Lab).astype(np.float32) / 255.0
        else:
            color_lab = color_image.astype(np.float32) / 255.0
result = depth_image.copy()
hole_coords = np.where(hole_mask > 0)
for y, x in zip(hole_coords[0], hole_coords[1]):
# 定义搜索窗口
window_size = self.config.inpaint_radius * 2 + 1
y1 = max(0, y - self.config.inpaint_radius)
y2 = min(depth_image.shape[0], y + self.config.inpaint_radius + 1)
x1 = max(0, x - self.config.inpaint_radius)
x2 = min(depth_image.shape[1], x + self.config.inpaint_radius + 1)
# 提取窗口
depth_window = depth_image[y1:y2, x1:x2]
color_window = color_lab[y1:y2, x1:x2]
mask_window = (hole_mask[y1:y2, x1:x2] == 0) & (depth_window > 0)
if not np.any(mask_window):
continue
            # 计算颜色相似性权重 (Gaussian of the squared color distance)
            center_color = color_lab[y, x]
            color_diff = np.linalg.norm(color_window - center_color, axis=2)
            color_weight = np.exp(-color_diff**2 / (2 * self.config.bilateral_sigma_color ** 2))
            # 空间距离权重 (Gaussian of the squared pixel distance)
            yy, xx = np.ogrid[y1:y2, x1:x2]
            spatial_dist2 = (yy - y)**2 + (xx - x)**2
            spatial_weight = np.exp(-spatial_dist2 / (2 * self.config.bilateral_sigma_space ** 2))
# 组合权重
weights = color_weight * spatial_weight * mask_window
if weights.sum() > 0:
result[y, x] = np.sum(depth_window * weights) / np.sum(weights)
return result
def _anisotropic_diffusion(self, depth_uint8: np.ndarray,
hole_mask: np.ndarray,
iterations: int = 50,
kappa: float = 30,
gamma: float = 0.1) -> np.ndarray:
"""
各向异性扩散填充
沿边缘方向平滑,垂直边缘方向保持
"""
result = depth_uint8.astype(np.float32)
mask = (hole_mask > 0).astype(np.float32)
for _ in range(iterations):
# 计算梯度
grad_n = result[:-2, 1:-1] - result[1:-1, 1:-1] # 北
grad_s = result[2:, 1:-1] - result[1:-1, 1:-1] # 南
grad_e = result[1:-1, 2:] - result[1:-1, 1:-1] # 东
grad_w = result[1:-1, :-2] - result[1:-1, 1:-1] # 西
# 扩散系数(Perona-Malik)
c_n = np.exp(-(grad_n / kappa) ** 2)
c_s = np.exp(-(grad_s / kappa) ** 2)
c_e = np.exp(-(grad_e / kappa) ** 2)
c_w = np.exp(-(grad_w / kappa) ** 2)
# 更新(仅在空洞区域)
diff = gamma * (
c_n * grad_n + c_s * grad_s +
c_e * grad_e + c_w * grad_w
)
update_mask = mask[1:-1, 1:-1]
result[1:-1, 1:-1] += diff * update_mask
return np.clip(result, 0, 255).astype(np.uint8)
def _planar_fit_fill(self, depth_image: np.ndarray,
hole_mask: np.ndarray) -> np.ndarray:
"""
局部平面拟合填充
假设局部表面为平面,使用最小二乘拟合
"""
result = depth_image.copy()
holes = self.analyze_holes(hole_mask)
for hole in holes:
x, y, w, h = hole['bbox']
# 扩展边界获取邻域
margin = max(w, h)
x1 = max(0, x - margin)
y1 = max(0, y - margin)
x2 = min(depth_image.shape[1], x + w + margin)
y2 = min(depth_image.shape[0], y + h + margin)
# 提取邻域有效点
neighborhood = depth_image[y1:y2, x1:x2]
mask = (hole_mask[y1:y2, x1:x2] == 0) & (neighborhood > 0)
if np.sum(mask) < 3:
continue
# 构建2D坐标
yy, xx = np.where(mask)
yy += y1
xx += x1
zz = neighborhood[mask]
# 拟合平面 z = ax + by + c
A = np.column_stack([xx, yy, np.ones_like(xx)])
coeffs, _, _, _ = np.linalg.lstsq(A, zz, rcond=None)
# 填充空洞
hole_yy, hole_xx = np.where(hole['mask'] > 0)
hole_yy += y
hole_xx += x
for hy, hx in zip(hole_yy, hole_xx):
result[hy, hx] = coeffs[0] * hx + coeffs[1] * hy + coeffs[2]
return result
def joint_bilateral_filter(self, depth_image: np.ndarray,
color_image: np.ndarray,
hole_mask: Optional[np.ndarray] = None) -> np.ndarray:
"""
联合双边滤波实现RGB-D边缘对齐
Args:
depth_image: 深度图
color_image: 彩色引导图
hole_mask: 空洞掩码(可选)
Returns:
滤波后的深度图
"""
if self.gpu_available:
return self._joint_bilateral_gpu(depth_image, color_image)
else:
return self._joint_bilateral_cpu(depth_image, color_image, hole_mask)
def _joint_bilateral_cpu(self, depth_image: np.ndarray,
color_image: np.ndarray,
hole_mask: Optional[np.ndarray] = None) -> np.ndarray:
"""CPU版联合双边滤波"""
# 归一化
depth_norm = cv2.normalize(depth_image, None, 0, 1, cv2.NORM_MINMAX)
# 转换颜色到浮点
if color_image.dtype == np.uint8:
color_float = color_image.astype(np.float32) / 255.0
else:
color_float = color_image
result = np.zeros_like(depth_norm)
radius = int(self.config.bilateral_sigma_space * 2)
# 对每个像素
for y in range(depth_image.shape[0]):
for x in range(depth_image.shape[1]):
if hole_mask is not None and hole_mask[y, x] > 0:
continue
# 定义窗口
y1, y2 = max(0, y-radius), min(depth_image.shape[0], y+radius+1)
x1, x2 = max(0, x-radius), min(depth_image.shape[1], x+radius+1)
# 提取窗口
depth_window = depth_norm[y1:y2, x1:x2]
color_window = color_float[y1:y2, x1:x2]
# 中心像素
center_depth = depth_norm[y, x]
center_color = color_float[y, x]
# 空间权重(高斯核,距离取平方)
yy, xx = np.ogrid[y1:y2, x1:x2]
spatial_dist_sq = (yy - y)**2 + (xx - x)**2
spatial_weight = np.exp(-spatial_dist_sq / (2 * self.config.bilateral_sigma_space**2))
# 颜色权重
color_dist = np.linalg.norm(color_window - center_color, axis=2)
color_weight = np.exp(-color_dist**2 / (2 * self.config.bilateral_sigma_color**2))
# 深度权重(避免跨边缘平滑)
depth_dist = np.abs(depth_window - center_depth)
depth_weight = np.exp(-depth_dist**2 / (2 * 0.1**2))  # 固定sigma=0.1(归一化深度)
# 组合权重
weights = spatial_weight * color_weight * depth_weight
# 归一化滤波
if weights.sum() > 0:
result[y, x] = np.sum(depth_window * weights) / np.sum(weights)
else:
result[y, x] = center_depth
# 反归一化
result = result * (depth_image.max() - depth_image.min()) + depth_image.min()
return result
def _joint_bilateral_gpu(self, depth_image: np.ndarray,
color_image: np.ndarray) -> np.ndarray:
"""GPU加速版联合双边滤波(使用OpenCV CUDA)"""
# 上传数据到GPU
depth_gpu = cv2.cuda_GpuMat()
depth_gpu.upload(depth_image.astype(np.float32))
color_gpu = cv2.cuda_GpuMat()
color_gpu.upload(color_image.astype(np.float32))
# 注意:cv2.cuda.bilateralFilter是普通双边滤波(不支持彩色引导),此处仅作GPU加速近似;
# CUDA版的核直径必须为正数,按空间sigma推算并取奇数
ksize = max(3, int(self.config.bilateral_sigma_space * 2) | 1)
result_gpu = cv2.cuda.bilateralFilter(depth_gpu, ksize,
                                      self.config.bilateral_sigma_color * 255,
                                      self.config.bilateral_sigma_space)
# 下载结果
result = result_gpu.download()
return result
def edge_align(self, depth_image: np.ndarray,
color_image: np.ndarray) -> np.ndarray:
"""
RGB-D边缘对齐主流程
Args:
depth_image: 原始深度图
color_image: 彩色图
Returns:
边缘对齐后的深度图
"""
# 1. 检测空洞
hole_mask = self.detect_holes(depth_image)
# 2. 填充空洞
filled_depth = self.fill_holes(depth_image, color_image, hole_mask)
# 3. 联合双边滤波对齐边缘
aligned_depth = self.joint_bilateral_filter(filled_depth, color_image, hole_mask)
# 4. 后处理:去除残余噪声
aligned_depth = cv2.medianBlur(aligned_depth.astype(np.float32), 5)
return aligned_depth
def process_sequence(self, depth_frames: List[np.ndarray],
color_frames: Optional[List[np.ndarray]] = None) -> List[np.ndarray]:
"""
处理序列数据(支持时序一致性)
Args:
depth_frames: 深度帧列表
color_frames: 彩色帧列表(可选)
Returns:
处理后的深度帧列表
"""
processed = []
prev_depth = None
for i, depth in enumerate(depth_frames):
color = color_frames[i] if color_frames else None
# 时序一致性滤波(使用前一帧信息)
if prev_depth is not None:
# 检测突变(可能为噪声)
diff = np.abs(depth - prev_depth)
sudden_change = diff > 0.5 # 阈值(米)
# 对突变区域降低权重
depth = np.where(sudden_change, (depth + prev_depth) / 2, depth)
# 标准处理流程
processed_depth = self.edge_align(depth, color) if color is not None else \
self.fill_holes(depth)
processed.append(processed_depth)
prev_depth = processed_depth
return processed
def main():
parser = argparse.ArgumentParser(description='深度图后处理工具')
parser.add_argument('--input', '-i', type=str, help='输入深度图')
parser.add_argument('--color', '-c', type=str, help='输入彩色图(联合滤波用)')
parser.add_argument('--output', '-o', type=str, help='输出路径')
parser.add_argument('--method', '-m', type=str,
choices=[m.value for m in DepthFillMethod],
default='fast_marching',
help='填充方法')
parser.add_argument('--radius', '-r', type=int, default=5, help='修复半径')
parser.add_argument('--batch', '-b', action='store_true', help='批量处理')
parser.add_argument('--input_dir', type=str, help='输入目录(批量模式)')
parser.add_argument('--output_dir', type=str, help='输出目录(批量模式)')
parser.add_argument('--realtime', action='store_true', help='实时处理模式')
parser.add_argument('--device', type=int, default=0, help='相机设备ID')
parser.add_argument('--visualize', '-v', action='store_true', help='可视化结果')
args = parser.parse_args()
# 创建配置
config = ProcessingConfig(
fill_method=DepthFillMethod(args.method),
inpaint_radius=args.radius,
use_gpu=cv2.cuda.getCudaEnabledDeviceCount() > 0
)
processor = DepthPostProcessor(config)
if args.batch:
# 批量处理
os.makedirs(args.output_dir, exist_ok=True)
depth_files = sorted([f for f in os.listdir(args.input_dir)
if f.endswith(('.png', '.jpg', '.tiff'))])
for fname in depth_files:
depth_path = os.path.join(args.input_dir, fname)
depth = cv2.imread(depth_path, cv2.IMREAD_ANYDEPTH).astype(np.float32) / 1000.0
color = None
if args.color:
color_path = os.path.join(args.color, fname.replace('depth', 'color'))
if os.path.exists(color_path):
color = cv2.imread(color_path)
processed = processor.edge_align(depth, color) if color is not None else \
processor.fill_holes(depth)
# 保存为16位PNG(毫米精度)
output_path = os.path.join(args.output_dir, fname)
cv2.imwrite(output_path, (processed * 1000).astype(np.uint16))
print(f"处理完成: {fname}")
elif args.realtime:
# 实时处理(示例)
cap = cv2.VideoCapture(args.device)
while True:
ret, frame = cap.read()
if not ret:
break
# 假设输入为RGB-D相机,这里简化处理
# 实际应分别获取彩色和深度流
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
# 模拟深度图(实际应来自深度相机)
depth = gray * 5.0 # 假设最大5米
# 添加模拟空洞
depth[200:250, 300:350] = 0
processed = processor.edge_align(depth, frame)
if args.visualize:
# 可视化
depth_vis = cv2.applyColorMap(
(depth / 5.0 * 255).astype(np.uint8), cv2.COLORMAP_JET
)
proc_vis = cv2.applyColorMap(
(processed / 5.0 * 255).astype(np.uint8), cv2.COLORMAP_JET
)
cv2.imshow('Original', depth_vis)
cv2.imshow('Processed', proc_vis)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
else:
# 单图处理
depth = cv2.imread(args.input, cv2.IMREAD_ANYDEPTH).astype(np.float32)
# 根据文件类型判断单位(假设16位PNG为毫米)
if depth.max() > 100:
depth = depth / 1000.0 # 转为米
color = cv2.imread(args.color) if args.color else None
print(f"原始深度范围: [{depth.min():.3f}, {depth.max():.3f}]m")
# 检测空洞
holes = processor.detect_holes(depth)
hole_ratio = np.sum(holes > 0) / holes.size * 100
print(f"空洞比例: {hole_ratio:.2f}%")
# 处理
import time  # 计时依赖(若脚本头部未导入)
start_time = time.time()
processed = processor.edge_align(depth, color) if color is not None else \
processor.fill_holes(depth)
elapsed = time.time() - start_time
print(f"处理耗时: {elapsed*1000:.1f}ms")
print(f"处理后深度范围: [{processed.min():.3f}, {processed.max():.3f}]m")
# 保存
cv2.imwrite(args.output, (processed * 1000).astype(np.uint16))
print(f"已保存至: {args.output}")
if args.visualize:
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes[0, 0].imshow(depth, cmap='jet', vmin=0, vmax=5)
axes[0, 0].set_title('Original Depth')
axes[0, 0].axis('off')
axes[0, 1].imshow(holes, cmap='gray')
axes[0, 1].set_title('Hole Mask')
axes[0, 1].axis('off')
axes[1, 0].imshow(processed, cmap='jet', vmin=0, vmax=5)
axes[1, 0].set_title('Processed Depth')
axes[1, 0].axis('off')
diff = np.abs(processed - depth)
axes[1, 1].imshow(diff, cmap='hot')
axes[1, 1].set_title(f'Difference (mean: {diff.mean():.3f}m)')
axes[1, 1].axis('off')
plt.tight_layout()
plt.show()
if __name__ == '__main__':
main()
6.2 三维重建管线
6.2.1 多视图几何:SfM(运动恢复结构)基础
运动恢复结构(Structure from Motion, SfM)是从无序图像集合中恢复相机位姿和三维场景结构的核心技术。其数学基础是多视图几何中的对极几何(Epipolar Geometry)和射影重建理论。给定两幅图像间的特征匹配,本质矩阵(Essential Matrix)编码了相机的相对旋转和平移;通过奇异值分解(SVD)可提取相机运动参数,进而三角化得到三维点云。
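在进入完整实现之前,先看一个两视图相对位姿估计与三角化的最小示意(假设 pts1、pts2 为已匹配好的 (N, 2) 像素坐标、K 为已知内参,函数名与默认参数均为示例设定):
Python
import cv2
import numpy as np

def two_view_reconstruct(pts1, pts2, K):
    """pts1/pts2: (N,2) 匹配像素坐标; K: (3,3) 内参。返回 R, t 与三角化点。"""
    pts1 = np.asarray(pts1, dtype=np.float64)
    pts2 = np.asarray(pts2, dtype=np.float64)
    # RANSAC估计本质矩阵
    E, mask = cv2.findEssentialMat(pts1, pts2, K, method=cv2.RANSAC,
                                   prob=0.999, threshold=1.0)
    # recoverPose内部对E做SVD分解,并用手性检查从4组(R,t)中选出唯一解
    _, R, t, mask_pose = cv2.recoverPose(E, pts1, pts2, K, mask=mask)
    # 以第一相机为世界坐标系构建投影矩阵,并三角化内点
    P1 = K @ np.hstack([np.eye(3), np.zeros((3, 1))])
    P2 = K @ np.hstack([R, t])
    inl = mask_pose.ravel() > 0
    pts4d = cv2.triangulatePoints(P1, P2, pts1[inl].T, pts2[inl].T)
    pts3d = (pts4d[:3] / pts4d[3]).T  # 齐次坐标归一化
    return R, t, pts3d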
增量式SfM从两幅图像的初始重建开始,迭代添加新图像并优化已有结构。每次添加新图像时,通过PnP(Perspective-n-Point)算法估计新相机位姿,然后对新可见的三维点进行三角化。全局优化采用光束法平差(Bundle Adjustment),最小化所有三维点的重投影误差,同时精修相机参数和点坐标。这一非线性优化问题的规模随图像与三维点数量增长,直接求解稠密正规方程的代价约为相机参数维度的三次方,因此采用稀疏BA和共视性图(Covisibility Graph)来降低计算复杂度。
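作为补充,下面给出一个基于 scipy.optimize.least_squares 的最小光束法平差示意(固定内参K、逐观测循环构造残差、未利用稀疏雅可比,仅用于说明BA的参数化与残差形式;函数与变量名均为示例):
Python
import cv2
import numpy as np
from scipy.optimize import least_squares

def ba_residuals(params, n_cams, n_pts, cam_idx, pt_idx, pts2d, K):
    """残差 = 所有观测的重投影误差;params = 每相机6维(rvec, tvec) + 每点3维"""
    cams = params[:n_cams * 6].reshape(n_cams, 6)
    pts3d = params[n_cams * 6:].reshape(n_pts, 3)
    res = []
    for ci, pi, obs in zip(cam_idx, pt_idx, pts2d):
        proj, _ = cv2.projectPoints(pts3d[pi].reshape(1, 3),
                                    cams[ci, :3], cams[ci, 3:], K, None)
        res.append(proj.ravel() - obs)
    return np.concatenate(res)

def run_ba(cams0, pts3d0, cam_idx, pt_idx, pts2d, K):
    """cams0: (M,6) 初值; pts3d0: (N,3); cam_idx/pt_idx/pts2d: 每条观测的索引与像素坐标"""
    x0 = np.concatenate([cams0.ravel(), pts3d0.ravel()])
    sol = least_squares(ba_residuals, x0, loss='huber', f_scale=2.0,
                        args=(len(cams0), len(pts3d0), cam_idx, pt_idx, pts2d, K))
    n = len(cams0) * 6
    return sol.x[:n].reshape(-1, 6), sol.x[n:].reshape(-1, 3)

实际工程中应提供jac_sparsity稀疏结构或改用Ceres Solver、g2o等专用库,否则规模稍大即不可行。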
OpenCV提供了SfM模块的基础组件,包括特征检测与匹配、本质矩阵估计、三角化和BA接口。对于大规模场景,需结合COLMAP等成熟框架,或实现分布式SfM策略,将图像集划分为重叠的子集分别重建,最后进行全局融合。
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: sfm_reconstruction.py
Content: 增量式SfM(运动恢复结构)实现
Usage:
1. 重建场景: python sfm_reconstruction.py --images ./image_folder --output ./reconstruction
2. 使用已知内参: python sfm_reconstruction.py --images ./imgs --intrinsics camera.yml --output ./out
3. 仅匹配特征: python sfm_reconstruction.py --images ./imgs --mode match_only --matches matches.txt
4. 可视化结果: python sfm_reconstruction.py --input ./reconstruction --mode visualize
Features:
- 增量式SfM完整管线
- 支持SIFT、ORB、SuperPoint特征
- 鲁棒的几何验证(RANSAC)
- 稀疏光束法平差(Ceres Solver)
- 共视性图构建与优化
"""
import cv2
import numpy as np
import os
import argparse
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass, field
from pathlib import Path
import json
import pickle
from collections import defaultdict
import networkx as nx
import open3d as o3d  # 点云导出与可视化依赖
@dataclass
class Camera:
"""相机模型"""
id: int
width: int
height: int
K: np.ndarray # 内参矩阵 (3, 3)
dist_coeffs: np.ndarray # 畸变系数
R: Optional[np.ndarray] = None # 旋转矩阵 (3, 3)
t: Optional[np.ndarray] = None # 平移向量 (3,)
is_registered: bool = False
def projection_matrix(self) -> np.ndarray:
"""计算投影矩阵 P = K[R|t]"""
if self.R is None or self.t is None:
return None
Rt = np.hstack([self.R, self.t.reshape(3, 1)])
return self.K @ Rt
def center(self) -> np.ndarray:
"""计算相机中心 C = -R^T * t"""
if self.R is None:
return None
return -self.R.T @ self.t
@dataclass
class Track:
"""三维点轨迹(被多个图像观测到的点)"""
id: int
point3d: np.ndarray # 3D坐标 (3,)
color: np.ndarray # RGB颜色 (3,)
observations: Dict[int, np.ndarray] = field(default_factory=dict) # {image_id: 2d_point}
def add_observation(self, image_id: int, point2d: np.ndarray):
self.observations[image_id] = point2d
class IncrementalSfM:
"""
增量式SfM重建器
实现完整的SfM管线:特征提取 -> 匹配 -> 几何验证 -> 增量重建 -> BA优化
"""
def __init__(self,
feature_type: str = 'sift',
matcher_type: str = 'flann',
min_matches: int = 50,
reproj_threshold: float = 4.0):
"""
初始化SfM重建器
Args:
feature_type: 特征类型 ('sift', 'orb', 'superpoint')
matcher_type: 匹配器类型 ('flann', 'bf')
min_matches: 最小匹配点数
reproj_threshold: 重投影误差阈值(像素)
"""
self.feature_type = feature_type
self.matcher_type = matcher_type
self.min_matches = min_matches
self.reproj_threshold = reproj_threshold
# 初始化特征检测器
if feature_type == 'sift':
self.detector = cv2.SIFT_create(nfeatures=5000)
self.matcher = cv2.FlannBasedMatcher(dict(algorithm=1, trees=5), dict(checks=50))
elif feature_type == 'orb':
self.detector = cv2.ORB_create(nfeatures=5000)
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
else:
raise ValueError(f"不支持的特征类型: {feature_type}")
# 数据存储
self.images: Dict[int, np.ndarray] = {} # 原始图像
self.keypoints: Dict[int, List[cv2.KeyPoint]] = {} # 特征点
self.descriptors: Dict[int, np.ndarray] = {} # 描述子
self.cameras: Dict[int, Camera] = {} # 相机
self.tracks: Dict[int, Track] = {} # 三维点
self.next_track_id = 0
# 共视性图
self.covisibility_graph = nx.Graph()
# 重建状态
self.reconstructed = False
def add_image(self, image_id: int, image_path: str,
camera_params: Optional[Dict] = None):
"""
添加图像到重建系统
Args:
image_id: 图像唯一ID
image_path: 图像文件路径
camera_params: 相机参数 {'K': ..., 'dist_coeffs': ...}
"""
# 读取图像
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图像: {image_path}")
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
self.images[image_id] = img
# 提取特征
kps, descs = self.detector.detectAndCompute(gray, None)
self.keypoints[image_id] = kps
self.descriptors[image_id] = descs
print(f"图像 {image_id}: 提取了 {len(kps)} 个特征点")
# 创建相机
h, w = img.shape[:2]
if camera_params:
K = camera_params['K']
dist = camera_params.get('dist_coeffs', np.zeros(4))
else:
# 假设标准焦距
focal = max(w, h) * 1.2
K = np.array([[focal, 0, w/2],
[0, focal, h/2],
[0, 0, 1]])
dist = np.zeros(4)
self.cameras[image_id] = Camera(
id=image_id, width=w, height=h,
K=K, dist_coeffs=dist
)
# 添加到共视性图
self.covisibility_graph.add_node(image_id)
def match_images(self, img_id1: int, img_id2: int) -> List[cv2.DMatch]:
"""
匹配两幅图像的特征
Args:
img_id1, img_id2: 图像ID
Returns:
匹配点列表
"""
desc1 = self.descriptors[img_id1]
desc2 = self.descriptors[img_id2]
if desc1 is None or desc2 is None or len(desc1) < 2 or len(desc2) < 2:
return []
# KNN匹配
try:
matches = self.matcher.knnMatch(desc1, desc2, k=2)
except:
return []
# 比率测试(Lowe's ratio test)
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
return good_matches
def verify_geometry(self, img_id1: int, img_id2: int,
matches: List[cv2.DMatch]) -> Tuple[bool, np.ndarray, np.ndarray, List[int]]:
"""
几何验证:估计本质矩阵并恢复相机位姿
Args:
img_id1, img_id2: 图像ID
matches: 特征匹配
Returns:
(success, R, t, inlier_indices)
"""
if len(matches) < self.min_matches:
return False, None, None, []
# 获取匹配点坐标
pts1 = np.float32([self.keypoints[img_id1][m.queryIdx].pt for m in matches])
pts2 = np.float32([self.keypoints[img_id2][m.trainIdx].pt for m in matches])
# 使用对极几何估计本质矩阵
K1 = self.cameras[img_id1].K
K2 = self.cameras[img_id2].K
# 归一化坐标
pts1_norm = cv2.undistortPoints(pts1.reshape(-1, 1, 2), K1, None).reshape(-1, 2)
pts2_norm = cv2.undistortPoints(pts2.reshape(-1, 1, 2), K2, None).reshape(-1, 2)
# 估计本质矩阵
E, mask = cv2.findEssentialMat(
pts1_norm, pts2_norm,
method=cv2.RANSAC,
prob=0.999,
threshold=3.0/self.cameras[img_id1].K[0,0] # 归一化坐标阈值
)
if E is None or mask.sum() < self.min_matches:
return False, None, None, []
# 从本质矩阵恢复位姿
inliers = mask.ravel() > 0
pts1_inliers = pts1_norm[inliers]
pts2_inliers = pts2_norm[inliers]
_, R, t, mask_pose = cv2.recoverPose(E, pts1_inliers, pts2_inliers)
# 获取内点索引
inlier_indices = np.where(inliers)[0][mask_pose.ravel() > 0]
if len(inlier_indices) < self.min_matches:
return False, None, None, []
return True, R, t, inlier_indices
def triangulate_points(self, img_id1: int, img_id2: int,
matches: List[cv2.DMatch],
inlier_indices: List[int]) -> List[Track]:
"""
三角化三维点
Args:
img_id1, img_id2: 图像ID
matches: 匹配点
inlier_indices: 内点索引
Returns:
三维轨迹列表
"""
cam1 = self.cameras[img_id1]
cam2 = self.cameras[img_id2]
# 构建投影矩阵
P1 = cam1.projection_matrix()
P2 = cam2.projection_matrix()
tracks = []
for idx in inlier_indices:
match = matches[idx]
# 获取图像坐标
kp1 = self.keypoints[img_id1][match.queryIdx]
kp2 = self.keypoints[img_id2][match.trainIdx]
pt1 = np.array(kp1.pt)
pt2 = np.array(kp2.pt)
# 三角化
pts4d = cv2.triangulatePoints(
P1, P2,
pt1.reshape(2, 1), pt2.reshape(2, 1)
)
pts3d = (pts4d[:3] / pts4d[3]).ravel()
# 检查深度(应在两相机前方)
if pts3d[2] <= 0:
    continue
if (cam2.R @ pts3d + cam2.t)[2] <= 0:
    continue
# 获取颜色(从第一幅图像)
x, y = int(kp1.pt[0]), int(kp1.pt[1])
color = self.images[img_id1][y, x] if y < self.images[img_id1].shape[0] and \
x < self.images[img_id1].shape[1] else [128, 128, 128]
# 创建轨迹
track = Track(
id=self.next_track_id,
point3d=pts3d,
color=color[::-1] if len(color) == 3 else [128, 128, 128] # BGR to RGB
)
track.add_observation(img_id1, pt1)
track.add_observation(img_id2, pt2)
self.tracks[self.next_track_id] = track
self.next_track_id += 1
tracks.append(track)
return tracks
def initialize_reconstruction(self, seed_pair: Optional[Tuple[int, int]] = None):
"""
初始化重建(选择最佳图像对)
Args:
seed_pair: 指定的种子图像对(可选)
"""
if seed_pair:
best_pair = seed_pair
matches = self.match_images(seed_pair[0], seed_pair[1])
success, R, t, inliers = self.verify_geometry(seed_pair[0], seed_pair[1], matches)
else:
# 寻找最佳初始对(匹配点最多且基线足够)
best_pair = None
best_score = 0
best_matches = None
best_inliers = None
best_R, best_t = None, None
image_ids = list(self.images.keys())
for i in range(len(image_ids)):
for j in range(i+1, len(image_ids)):
id1, id2 = image_ids[i], image_ids[j]
matches = self.match_images(id1, id2)
if len(matches) < self.min_matches:
continue
success, R, t, inliers = self.verify_geometry(id1, id2, matches)
if not success:
continue
# 评分:内点数 * 基线长度(避免退化配置)
baseline = np.linalg.norm(t)
if baseline < 0.1: # 基线太短
continue
score = len(inliers) * baseline
if score > best_score:
best_score = score
best_pair = (id1, id2)
best_matches = matches
best_inliers = inliers
best_R, best_t = R, t
if best_pair is None:
raise RuntimeError("无法找到合适的初始图像对")
print(f"初始图像对: {best_pair}, 内点数: {len(best_inliers)}")
# 设置第一幅相机的位姿(世界坐标系原点)
self.cameras[best_pair[0]].R = np.eye(3)
self.cameras[best_pair[0]].t = np.zeros(3)
self.cameras[best_pair[0]].is_registered = True
# 设置第二幅相机的位姿
self.cameras[best_pair[1]].R = best_R
self.cameras[best_pair[1]].t = best_t.ravel()
self.cameras[best_pair[1]].is_registered = True
# 三角化初始点云
tracks = self.triangulate_points(best_pair[0], best_pair[1],
best_matches, best_inliers)
print(f"初始三角化: {len(tracks)} 个三维点")
# 添加到共视性图
self.covisibility_graph.add_edge(best_pair[0], best_pair[1],
weight=len(best_inliers))
return best_pair
def register_next_image(self) -> Optional[int]:
"""
注册下一幅最佳图像(PnP)
Returns:
注册的图像ID,若无可用图像则返回None
"""
# 寻找与已重建图像有最多共视点的未注册图像
best_image = None
best_score = 0
best_2d_3d_pairs = None
registered_ids = [cid for cid, cam in self.cameras.items() if cam.is_registered]
# 建立"已注册图像观测点 -> 轨迹"查找表(坐标量化到0.1像素作键);
# 未注册图像的2D-3D对应需经由与已注册图像的特征匹配间接获得
obs_lookup = {}
for track in self.tracks.values():
    for reg_id in registered_ids:
        if reg_id in track.observations:
            pt = track.observations[reg_id]
            obs_lookup[(reg_id, round(pt[0], 1), round(pt[1], 1))] = track
for img_id, cam in self.cameras.items():
    if cam.is_registered:
        continue
    # 收集2D-3D对应
    points_2d, points_3d, track_refs = [], [], []
    for reg_id in registered_ids:
        for m in self.match_images(img_id, reg_id):
            kp_reg = self.keypoints[reg_id][m.trainIdx].pt
            track = obs_lookup.get((reg_id, round(kp_reg[0], 1), round(kp_reg[1], 1)))
            if track is not None:
                points_2d.append(self.keypoints[img_id][m.queryIdx].pt)
                points_3d.append(track.point3d)
                track_refs.append(track)
    if len(points_2d) < 6:  # PnP至少需要6个点
        continue
    score = len(points_2d)
    if score > best_score:
        best_score = score
        best_image = img_id
        best_2d_3d_pairs = (np.array(points_2d, dtype=np.float64),
                            np.array(points_3d, dtype=np.float64),
                            track_refs)
if best_image is None:
return None
points_2d, points_3d, track_refs = best_2d_3d_pairs
# PnP求解位姿
success, rvec, tvec, inliers = cv2.solvePnPRansac(
points_3d, points_2d,
self.cameras[best_image].K,
self.cameras[best_image].dist_coeffs,
iterationsCount=1000,
reprojectionError=self.reproj_threshold
)
if not success or len(inliers) < 6:
print(f"图像 {best_image} PnP失败")
return None
# 转换到矩阵形式
R, _ = cv2.Rodrigues(rvec)
self.cameras[best_image].R = R
self.cameras[best_image].t = tvec.ravel()
self.cameras[best_image].is_registered = True
print(f"注册图像 {best_image}: {len(inliers)} 个内点")
# 更新共视性图
for reg_id in registered_ids:
matches = self.match_images(best_image, reg_id)
if len(matches) > 10:
self.covisibility_graph.add_edge(best_image, reg_id, weight=len(matches))
return best_image
def triangulate_new_points(self, new_image_id: int):
"""
为新注册的图像三角化新的三维点
Args:
new_image_id: 新注册的图像ID
"""
new_cam = self.cameras[new_image_id]
new_tracks = 0
# 与所有已注册图像进行三角化
for other_id, other_cam in self.cameras.items():
if other_id == new_image_id or not other_cam.is_registered:
continue
matches = self.match_images(new_image_id, other_id)
if len(matches) < 10:
continue
# 检查哪些匹配点尚未三角化
new_matches = []
for m in matches:
# 检查是否已存在包含这两个关键点的轨迹
exists = False
for track in self.tracks.values():
if (new_image_id in track.observations and
other_id in track.observations):
kp_new = self.keypoints[new_image_id][m.queryIdx].pt
kp_other = self.keypoints[other_id][m.trainIdx].pt
if (np.allclose(track.observations[new_image_id], kp_new) and
np.allclose(track.observations[other_id], kp_other)):
exists = True
break
if not exists:
new_matches.append(m)
if len(new_matches) < 10:
continue
# 几何验证
success, _, _, inliers = self.verify_geometry(new_image_id, other_id, new_matches)
if not success:
continue
# 三角化新点
tracks = self.triangulate_points(new_image_id, other_id, new_matches, inliers)
new_tracks += len(tracks)
print(f"新三角化: {new_tracks} 个点")
def bundle_adjustment(self):
"""
光束法平差(简化版,使用OpenCV的Levenberg-Marquardt)
完整实现应使用Ceres Solver或g2o
"""
# 收集所有相机和点
registered_cams = {cid: cam for cid, cam in self.cameras.items() if cam.is_registered}
if len(registered_cams) < 2:
return
# 准备数据
camera_params = []
point_params = []
observations = []
# 相机参数: [fx, fy, cx, cy, r1, r2, r3, t1, t2, t3](Rodrigues旋转)
for cam in registered_cams.values():
rvec, _ = cv2.Rodrigues(cam.R)
params = np.concatenate([
[cam.K[0,0], cam.K[1,1], cam.K[0,2], cam.K[1,2]], # 内参
rvec.ravel(), # 旋转
cam.t # 平移
])
camera_params.append(params)
camera_params = np.array(camera_params)
# 点参数和观测
point_id_map = {}
for tid, track in self.tracks.items():
if len(track.observations) < 2:
continue
point_id_map[tid] = len(point_params)
point_params.append(track.point3d)
for img_id, pt2d in track.observations.items():
if img_id in registered_cams:
cam_idx = list(registered_cams.keys()).index(img_id)
observations.append((cam_idx, point_id_map[tid], pt2d))
if len(point_params) == 0:
return
point_params = np.array(point_params)
# 使用OpenCV的BA(简化版,实际应使用更优的库)
# 这里仅进行几轮重投影误差过滤
# 注:上面构建的参数向量展示了BA的参数化方式;此处的简化实现直接按轨迹
# 重新计算重投影误差并过滤外点,保证误差与轨迹一一对应
for iteration in range(3):
    errors = []
    removed_tracks = 0
    for tid, track in list(self.tracks.items()):
        track_errors = []
        for img_id, obs in track.observations.items():
            if img_id not in registered_cams:
                continue
            # 重投影
            proj = registered_cams[img_id].projection_matrix()
            proj_pt = proj @ np.append(track.point3d, 1)
            proj_pt = proj_pt[:2] / proj_pt[2]
            track_errors.append(np.linalg.norm(proj_pt - obs))
        if not track_errors:
            continue
        errors.extend(track_errors)
        # 移除含高误差观测的轨迹
        if max(track_errors) > self.reproj_threshold * 3:
            del self.tracks[tid]
            removed_tracks += 1
    mean_error = np.mean(errors) if errors else 0.0
    print(f"BA迭代 {iteration}: 平均误差 {mean_error:.2f}px, 移除 {removed_tracks} 个外点轨迹")
    if removed_tracks == 0:
        break
def reconstruct(self):
"""执行完整重建"""
print("开始增量式SfM重建...")
# 1. 初始化
self.initialize_reconstruction()
# 2. 增量添加图像
while True:
new_id = self.register_next_image()
if new_id is None:
break
# 3. 三角化新点
self.triangulate_new_points(new_id)
# 4. 定期BA优化
if len([c for c in self.cameras.values() if c.is_registered]) % 5 == 0:
self.bundle_adjustment()
# 5. 最终全局BA
print("执行全局光束法平差...")
self.bundle_adjustment()
self.reconstructed = True
print(f"重建完成: {len(self.cameras)} 个相机, {len(self.tracks)} 个三维点")
def export_colmap_format(self, output_dir: str):
"""
导出为COLMAP格式(便于后续稠密重建)
Args:
output_dir: 输出目录
"""
os.makedirs(output_dir, exist_ok=True)
# cameras.txt
with open(os.path.join(output_dir, 'cameras.txt'), 'w') as f:
f.write("# Camera list with one line of data per camera:\n")
f.write("# CAMERA_ID, MODEL, WIDTH, HEIGHT, PARAMS[]\n")
for cam in self.cameras.values():
f.write(f"{cam.id} PINHOLE {cam.width} {cam.height} "
f"{cam.K[0,0]} {cam.K[1,1]} {cam.K[0,2]} {cam.K[1,2]}\n")
# images.txt
with open(os.path.join(output_dir, 'images.txt'), 'w') as f:
f.write("# Image list with two lines of data per image:\n")
f.write("# IMAGE_ID, QW, QX, QY, QZ, TX, TY, TZ, CAMERA_ID, NAME\n")
f.write("# POINTS2D[] as (X, Y, POINT3D_ID)\n")
for cam in self.cameras.values():
if not cam.is_registered:
continue
# 旋转矩阵转四元数
q = self._rotation_matrix_to_quaternion(cam.R)
t = cam.t
f.write(f"{cam.id} {q[0]} {q[1]} {q[2]} {q[3]} "
f"{t[0]} {t[1]} {t[2]} {cam.id} image_{cam.id}.jpg\n")
# 写入2D-3D对应
points_2d_str = []
for tid, track in self.tracks.items():
if cam.id in track.observations:
pt2d = track.observations[cam.id]
points_2d_str.append(f"{pt2d[0]} {pt2d[1]} {tid}")
f.write(" ".join(points_2d_str) + "\n")
# points3D.txt
with open(os.path.join(output_dir, 'points3D.txt'), 'w') as f:
f.write("# 3D point list with one line of data per point:\n")
f.write("# POINT3D_ID, X, Y, Z, R, G, B, ERROR, TRACK[] as (IMAGE_ID, POINT2D_IDX)\n")
for tid, track in self.tracks.items():
error = 0.0 # 可计算实际重投影误差
track_str = []
for img_id, pt2d in track.observations.items():
# 找到对应的特征点索引
kp_idx = -1
for i, kp in enumerate(self.keypoints[img_id]):
if np.allclose(kp.pt, pt2d):
kp_idx = i
break
if kp_idx >= 0:
track_str.append(f"{img_id} {kp_idx}")
f.write(f"{tid} {track.point3d[0]} {track.point3d[1]} {track.point3d[2]} "
f"{int(track.color[0])} {int(track.color[1])} {int(track.color[2])} "
f"{error} {' '.join(track_str)}\n")
print(f"已导出至COLMAP格式: {output_dir}")
def _rotation_matrix_to_quaternion(self, R: np.ndarray) -> np.ndarray:
"""旋转矩阵转四元数 (w, x, y, z)"""
trace = np.trace(R)
if trace > 0:
s = 0.5 / np.sqrt(trace + 1.0)
w = 0.25 / s
x = (R[2,1] - R[1,2]) * s
y = (R[0,2] - R[2,0]) * s
z = (R[1,0] - R[0,1]) * s
elif R[0,0] > R[1,1] and R[0,0] > R[2,2]:
s = 2.0 * np.sqrt(1.0 + R[0,0] - R[1,1] - R[2,2])
w = (R[2,1] - R[1,2]) / s
x = 0.25 * s
y = (R[0,1] + R[1,0]) / s
z = (R[0,2] + R[2,0]) / s
elif R[1,1] > R[2,2]:
s = 2.0 * np.sqrt(1.0 + R[1,1] - R[0,0] - R[2,2])
w = (R[0,2] - R[2,0]) / s
x = (R[0,1] + R[1,0]) / s
y = 0.25 * s
z = (R[1,2] + R[2,1]) / s
else:
s = 2.0 * np.sqrt(1.0 + R[2,2] - R[0,0] - R[1,1])
w = (R[1,0] - R[0,1]) / s
x = (R[0,2] + R[2,0]) / s
y = (R[1,2] + R[2,1]) / s
z = 0.25 * s
return np.array([w, x, y, z])
def save(self, filepath: str):
"""保存重建结果"""
data = {
'cameras': {cid: {
'K': cam.K.tolist(),
'dist': cam.dist_coeffs.tolist(),
'R': cam.R.tolist() if cam.R is not None else None,
't': cam.t.tolist() if cam.t is not None else None,
'registered': cam.is_registered
} for cid, cam in self.cameras.items()},
'tracks': {tid: {
'point3d': track.point3d.tolist(),
'color': track.color.tolist(),
'observations': {k: v.tolist() for k, v in track.observations.items()}
} for tid, track in self.tracks.items()},
'covisibility': nx.to_dict_of_dicts(self.covisibility_graph)
}
with open(filepath, 'wb') as f:
pickle.dump(data, f)
print(f"重建结果已保存: {filepath}")
@classmethod
def load(cls, filepath: str) -> 'IncrementalSfM':
"""加载重建结果"""
with open(filepath, 'rb') as f:
data = pickle.load(f)
sfm = cls()
# 恢复相机
for cid, cdata in data['cameras'].items():
cam = Camera(
id=cid,
width=0, height=0, # 需从图像重新获取
K=np.array(cdata['K']),
dist_coeffs=np.array(cdata['dist']),
R=np.array(cdata['R']) if cdata['R'] else None,
t=np.array(cdata['t']) if cdata['t'] else None,
is_registered=cdata['registered']
)
sfm.cameras[cid] = cam
# 恢复轨迹
for tid, tdata in data['tracks'].items():
track = Track(
id=tid,
point3d=np.array(tdata['point3d']),
color=np.array(tdata['color']),
observations={int(k): np.array(v) for k, v in tdata['observations'].items()}
)
sfm.tracks[tid] = track
sfm.next_track_id = max(sfm.tracks.keys()) + 1 if sfm.tracks else 0
sfm.covisibility_graph = nx.from_dict_of_dicts(data['covisibility'])
sfm.reconstructed = True
return sfm
def main():
parser = argparse.ArgumentParser(description='增量式SfM重建')
parser.add_argument('--images', '-i', type=str, help='图像目录')
parser.add_argument('--output', '-o', type=str, default='./sfm_output',
help='输出目录')
parser.add_argument('--intrinsics', type=str, help='相机内参YAML文件(可选)')
parser.add_argument('--mode', type=str, default='reconstruct',
choices=['reconstruct', 'match_only', 'visualize'],
help='运行模式')
parser.add_argument('--matches', type=str, help='匹配输出文件(match_only模式)')
parser.add_argument('--input', type=str, help='输入重建文件(visualize模式)')
args = parser.parse_args()
if args.mode == 'reconstruct':
# 完整重建模式
sfm = IncrementalSfM(feature_type='sift')
# 加载图像
image_files = sorted([f for f in os.listdir(args.images)
if f.lower().endswith(('.jpg', '.png', '.jpeg'))])
print(f"找到 {len(image_files)} 张图像")
# 加载相机内参(如果提供)
camera_params = None
if args.intrinsics:
fs = cv2.FileStorage(args.intrinsics, cv2.FILE_STORAGE_READ)
K = fs.getNode('K').mat()
dist = fs.getNode('dist').mat()
camera_params = {'K': K, 'dist_coeffs': dist}
fs.release()
# 添加图像
for idx, fname in enumerate(image_files):
img_path = os.path.join(args.images, fname)
sfm.add_image(idx, img_path, camera_params)
# 执行重建
sfm.reconstruct()
# 导出结果
os.makedirs(args.output, exist_ok=True)
sfm.export_colmap_format(args.output)
sfm.save(os.path.join(args.output, 'reconstruction.pkl'))
# 导出PLY点云
pcd = o3d.geometry.PointCloud()
points = np.array([t.point3d for t in sfm.tracks.values()])
colors = np.array([t.color for t in sfm.tracks.values()]) / 255.0
pcd.points = o3d.utility.Vector3dVector(points)
pcd.colors = o3d.utility.Vector3dVector(colors)
o3d.io.write_point_cloud(os.path.join(args.output, 'sparse.ply'), pcd)
print(f"重建完成,结果保存至: {args.output}")
elif args.mode == 'match_only':
# 仅特征匹配模式
sfm = IncrementalSfM()
image_files = sorted([f for f in os.listdir(args.images)
if f.lower().endswith(('.jpg', '.png', '.jpeg'))])
# 加载图像并提取特征
for idx, fname in enumerate(image_files):
sfm.add_image(idx, os.path.join(args.images, fname))
# 计算所有匹配对
all_matches = {}
for i in range(len(image_files)):
for j in range(i+1, len(image_files)):
matches = sfm.match_images(i, j)
if len(matches) > 10:
all_matches[(i, j)] = [(m.queryIdx, m.trainIdx, m.distance) for m in matches]
# 保存匹配结果
with open(args.matches, 'wb') as f:
pickle.dump(all_matches, f)
print(f"匹配结果已保存: {args.matches}")
elif args.mode == 'visualize':
# 可视化模式
sfm = IncrementalSfM.load(args.input)
# 创建点云可视化
pcd = o3d.geometry.PointCloud()
points = np.array([t.point3d for t in sfm.tracks.values()])
colors = np.array([t.color for t in sfm.tracks.values()]) / 255.0
pcd.points = o3d.utility.Vector3dVector(points)
pcd.colors = o3d.utility.Vector3dVector(colors)
# 创建相机可视化
camera_frames = []
for cam in sfm.cameras.values():
if not cam.is_registered:
continue
# 创建相机坐标系
center = cam.center()
if center is None:
continue
# 相机朝向
z_axis = cam.R[2, :] # 相机Z轴(朝向)
y_axis = -cam.R[1, :] # 相机Y轴反向(图像Y向下,取反得向上方向)
x_axis = np.cross(y_axis, z_axis)
# 绘制相机金字塔
scale = 0.5
points = [
center,
center + (z_axis + x_axis*0.5 + y_axis*0.5) * scale,
center + (z_axis - x_axis*0.5 + y_axis*0.5) * scale,
center + (z_axis - x_axis*0.5 - y_axis*0.5) * scale,
center + (z_axis + x_axis*0.5 - y_axis*0.5) * scale
]
lines = [[0, 1], [0, 2], [0, 3], [0, 4], [1, 2], [2, 3], [3, 4], [4, 1]]
line_set = o3d.geometry.LineSet()
line_set.points = o3d.utility.Vector3dVector(points)
line_set.lines = o3d.utility.Vector2iVector(lines)
line_set.colors = o3d.utility.Vector3dVector([[1, 0, 0] for _ in lines])
camera_frames.append(line_set)
# 显示
o3d.visualization.draw_geometries([pcd] + camera_frames)
if __name__ == '__main__':
main()
6.2.2 稠密重建:PatchMatch与ColMap接口
稀疏SfM重建得到的是特征点云,缺乏表面细节。稠密重建旨在为每个像素恢复深度值,生成密集的三维点云或网格。PatchMatch是一种高效的近似最近邻搜索算法,被广泛应用于多视图立体(MVS)中,通过在图像平面传播和随机扰动深度与法向假设,快速收敛到最优三维表面。
COLMAP是目前最成熟的开源MVS系统之一,集成了PatchMatch Stereo算法。其核心思想是为参考图像的每个像素维护深度与法向假设,通过邻域传播和随机扰动迭代寻找匹配代价最小的局部平面,候选对应始终受对极几何约束。匹配代价通常采用归一化互相关(NCC)或零均值NCC(ZNCC)计算图像块相似性。为提高鲁棒性,COLMAP采用多尺度金字塔、几何一致性约束和滤波策略去除异常值。
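ZNCC本身的计算很直接,下面是一个最小实现示意(输入为两个等尺寸图像块,输出相关性,取值范围[-1, 1];零方差块返回0):
Python
import numpy as np

def zncc(patch_a: np.ndarray, patch_b: np.ndarray) -> float:
    """零均值归一化互相关:去均值后的归一化点积"""
    a = patch_a.astype(np.float64).ravel()
    b = patch_b.astype(np.float64).ravel()
    a -= a.mean()
    b -= b.mean()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(a @ b / denom) if denom > 1e-12 else 0.0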
OpenCV与COLMAP的接口主要通过图像和相机参数的传递实现。首先运行COLMAP的稀疏重建获取相机位姿,然后执行PatchMatch Stereo计算深度图,最后进行深度图融合生成点云。对于自定义实现,可利用OpenCV的stereo模块和SGBM算法作为基线,结合PatchMatch的传播机制进行优化。
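作为基线,下面给出用OpenCV的SGBM计算视差并换算深度的最小示意(假设输入为已极线校正的左右视图;焦距fx与基线baseline为示例数值,深度由 Z = f·B/d 得到):
Python
import cv2
import numpy as np

def sgbm_depth(rect_left, rect_right, fx=700.0, baseline=0.12):
    """rect_left/rect_right: 已校正的左右视图(BGR)。返回深度图(米)。"""
    gray_l = cv2.cvtColor(rect_left, cv2.COLOR_BGR2GRAY)
    gray_r = cv2.cvtColor(rect_right, cv2.COLOR_BGR2GRAY)
    block = 5
    sgbm = cv2.StereoSGBM_create(
        minDisparity=0, numDisparities=128, blockSize=block,
        P1=8 * block * block, P2=32 * block * block,
        uniquenessRatio=10, speckleWindowSize=100, speckleRange=2)
    disp = sgbm.compute(gray_l, gray_r).astype(np.float32) / 16.0  # SGBM输出为Q4定点
    depth = np.zeros_like(disp)
    valid = disp > 0
    depth[valid] = fx * baseline / disp[valid]  # Z = f*B/d
    return depth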
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: dense_reconstruction_colmap.py
Content: 稠密重建管线:PatchMatch MVS与COLMAP接口
Usage:
1. 完整管线: python dense_reconstruction_colmap.py --images ./imgs --workspace ./workspace
2. 仅深度估计: python dense_reconstruction_colmap.py --sparse_dir ./sparse --workspace ./dense --mode stereo
3. 深度图融合: python dense_reconstruction_colmap.py --depth_dir ./depths --output fused.ply --mode fusion
4. 自定义PatchMatch: python dense_reconstruction_colmap.py --images ./imgs --mode custom_pm
Features:
- COLMAP自动化调用(稀疏重建、PatchMatch、融合)
- 自定义PatchMatch实现(传播+随机搜索)
- 深度图滤波与一致性检查
- 多尺度深度估计
- GPU加速支持(CUDA)
"""
import cv2
import numpy as np
import os
import argparse
import subprocess
import json
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from pathlib import Path
import open3d as o3d
from scipy.ndimage import gaussian_filter
import re
@dataclass
class PatchMatchParams:
"""PatchMatch参数配置"""
max_iterations: int = 10 # 传播迭代次数
patch_size: int = 11 # 匹配窗口大小(奇数)
num_samples: int = 15 # 随机采样数
geom_consistency: bool = True # 几何一致性约束
filter_min_ncc: float = 0.1 # 最小NCC阈值
filter_min_triangulation_angle: float = 1.0 # 最小三角化角度(度)
filter_min_num_consistent: int = 2 # 最小一致视图数
class ColmapInterface:
"""
COLMAP命令行接口封装
自动化执行SfM和MVS管线
"""
def __init__(self, colmap_path: str = 'colmap'):
self.colmap_path = colmap_path
self._check_colmap()
def _check_colmap(self):
"""检查COLMAP是否可用"""
try:
result = subprocess.run([self.colmap_path, 'help'],
capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError("COLMAP执行失败")
except FileNotFoundError:
raise RuntimeError("未找到COLMAP,请安装并添加到PATH")
def run_feature_extractor(self, database_path: str, image_path: str):
"""特征提取"""
cmd = [
self.colmap_path, 'feature_extractor',
'--database_path', database_path,
'--image_path', image_path,
'--ImageReader.single_camera', '1',
'--SiftExtraction.use_gpu', '1'
]
subprocess.run(cmd, check=True)
print("特征提取完成")
def run_exhaustive_matcher(self, database_path: str):
"""穷尽匹配"""
cmd = [
self.colmap_path, 'exhaustive_matcher',
'--database_path', database_path,
'--SiftMatching.use_gpu', '1'
]
subprocess.run(cmd, check=True)
print("特征匹配完成")
def run_mapper(self, database_path: str, image_path: str,
output_path: str):
"""稀疏重建(SfM)"""
cmd = [
self.colmap_path, 'mapper',
'--database_path', database_path,
'--image_path', image_path,
'--output_path', output_path
]
subprocess.run(cmd, check=True)
print("稀疏重建完成")
def run_image_undistorter(self, image_path: str,
input_path: str, output_path: str):
"""图像去畸变"""
cmd = [
self.colmap_path, 'image_undistorter',
'--image_path', image_path,
'--input_path', input_path,
'--output_path', output_path,
'--output_type', 'COLMAP'
]
subprocess.run(cmd, check=True)
print("图像去畸变完成")
def run_patch_match_stereo(self, workspace_path: str):
"""PatchMatch立体匹配"""
cmd = [
self.colmap_path, 'patch_match_stereo',
'--workspace_path', workspace_path,
'--workspace_format', 'COLMAP',
'--PatchMatchStereo.geom_consistency', 'true',
'--PatchMatchStereo.filter', 'true'
]
subprocess.run(cmd, check=True)
print("PatchMatch立体匹配完成")
def run_stereo_fusion(self, workspace_path: str, output_path: str):
"""深度图融合"""
cmd = [
self.colmap_path, 'stereo_fusion',
'--workspace_path', workspace_path,
'--workspace_format', 'COLMAP',
'--input_type', 'geometric',
'--output_path', output_path
]
subprocess.run(cmd, check=True)
print(f"深度图融合完成: {output_path}")
def run_poisson_mesher(self, input_path: str, output_path: str):
"""泊松表面重建"""
cmd = [
self.colmap_path, 'poisson_mesher',
'--input_path', input_path,
'--output_path', output_path
]
subprocess.run(cmd, check=True)
print(f"网格重建完成: {output_path}")
def run_full_pipeline(self, image_path: str, workspace_path: str):
"""执行完整重建管线"""
database_path = os.path.join(workspace_path, 'database.db')
sparse_path = os.path.join(workspace_path, 'sparse')
dense_path = os.path.join(workspace_path, 'dense')
os.makedirs(workspace_path, exist_ok=True)
# 1. 特征提取与匹配
self.run_feature_extractor(database_path, image_path)
self.run_exhaustive_matcher(database_path)
# 2. 稀疏重建
self.run_mapper(database_path, image_path, sparse_path)
# 3. 图像去畸变
# 找到最大的重建(可能有多个)
sparse_subdirs = [d for d in os.listdir(sparse_path)
if os.path.isdir(os.path.join(sparse_path, d))]
if not sparse_subdirs:
raise RuntimeError("稀疏重建失败")
largest_sparse = max(sparse_subdirs,
key=lambda d: os.path.getsize(
os.path.join(sparse_path, d, 'points3D.bin')))
self.run_image_undistorter(
image_path,
os.path.join(sparse_path, largest_sparse),
dense_path
)
# 4. PatchMatch稠密重建
self.run_patch_match_stereo(dense_path)
# 5. 深度图融合
fused_path = os.path.join(dense_path, 'fused.ply')
self.run_stereo_fusion(dense_path, fused_path)
# 6. 网格重建(可选)
meshed_path = os.path.join(dense_path, 'meshed-poisson.ply')
self.run_poisson_mesher(fused_path, meshed_path)
return {
'sparse': os.path.join(sparse_path, largest_sparse),
'dense': dense_path,
'fused': fused_path,
'meshed': meshed_path
}
class CustomPatchMatch:
"""
自定义PatchMatch实现
基于传播和随机搜索的深度估计
"""
def __init__(self, params: PatchMatchParams):
self.params = params
def compute_depth(self, ref_image: np.ndarray,
src_images: List[np.ndarray],
ref_K: np.ndarray, src_Ks: List[np.ndarray],
ref_Rt: np.ndarray, src_Rts: List[np.ndarray]) -> np.ndarray:
"""
计算参考图像的深度图
Args:
ref_image: 参考图像 (H, W, 3)
src_images: 源图像列表 [(H, W, 3), ...]
ref_K: 参考相机内参 (3, 3)
src_Ks: 源相机内参列表 [(3, 3), ...]
ref_Rt: 参考相机外参 [R|t] (3, 4)
src_Rts: 源相机外参列表 [(3, 4), ...]
Returns:
depth_map: (H, W) 深度图
"""
h, w = ref_image.shape[:2]
# 构建图像金字塔(pyramid[i]为下采样2^i倍)
num_scales = 3
ref_pyramid = self._build_pyramid(ref_image, num_scales)
src_pyramids = [self._build_pyramid(src, num_scales) for src in src_images]
# 在最粗尺度初始化随机深度和法向
h0, w0 = ref_pyramid[num_scales - 1].shape[:2]
depth_map = np.random.uniform(0.1, 10.0, (h0, w0))
normal_map = self._init_random_normals(h0, w0)
# 从粗到细优化
for scale in range(num_scales-1, -1, -1):
scale_factor = 2 ** scale
# 缩放参数
ref_K_s = ref_K.copy()
ref_K_s[:2, :] /= scale_factor
src_Ks_s = [K.copy() for K in src_Ks]
for K in src_Ks_s:
K[:2, :] /= scale_factor
# 缩放深度图
if scale < num_scales - 1:
depth_map = cv2.resize(depth_map,
(ref_pyramid[scale].shape[1], ref_pyramid[scale].shape[0]),
interpolation=cv2.INTER_LINEAR)
normal_map = cv2.resize(normal_map,
(ref_pyramid[scale].shape[1], ref_pyramid[scale].shape[0]),
interpolation=cv2.INTER_LINEAR)
# 重新归一化法向
norm = np.linalg.norm(normal_map, axis=2, keepdims=True)
normal_map = normal_map / (norm + 1e-8)
# 执行PatchMatch迭代
for iteration in range(self.params.max_iterations):
# 空间传播
depth_map, normal_map = self._spatial_propagation(
ref_pyramid[scale], src_pyramids, depth_map, normal_map,
ref_K_s, src_Ks_s, ref_Rt, src_Rts, iteration
)
# 视图传播(多视图一致性)
if self.params.geom_consistency:
depth_map = self._view_propagation(
ref_pyramid[scale], src_pyramids, depth_map, normal_map,
ref_K_s, src_Ks_s, ref_Rt, src_Rts
)
# 随机搜索
depth_map, normal_map = self._random_search(
ref_pyramid[scale], src_pyramids, depth_map, normal_map,
ref_K_s, src_Ks_s, ref_Rt, src_Rts, iteration
)
return depth_map
def _init_random_normals(self, h: int, w: int) -> np.ndarray:
"""初始化随机法向图(指向相机前方)"""
normals = np.random.randn(h, w, 3)
normals[:, :, 2] = np.abs(normals[:, :, 2]) # Z分量为正
normals = normals / np.linalg.norm(normals, axis=2, keepdims=True)
return normals
def _build_pyramid(self, image: np.ndarray, num_levels: int) -> List[np.ndarray]:
    """构建高斯金字塔(pyramid[i]为下采样2^i倍,索引0为原始分辨率)"""
    pyramid = [image]
    for _ in range(num_levels - 1):
        pyramid.append(cv2.pyrDown(pyramid[-1]))
    return pyramid
def _spatial_propagation(self, ref_img: np.ndarray, src_pyramids: List[List[np.ndarray]],
depth_map: np.ndarray, normal_map: np.ndarray,
ref_K: np.ndarray, src_Ks: List[np.ndarray],
ref_Rt: np.ndarray, src_Rts: List[np.ndarray],
iteration: int) -> Tuple[np.ndarray, np.ndarray]:
"""
空间传播:从邻域像素传播深度假设
奇数迭代:从左到右,从上到下
偶数迭代:从右到左,从下到上
"""
h, w = depth_map.shape
new_depth = depth_map.copy()
new_normal = normal_map.copy()
# 确定扫描方向
reverse = iteration % 2 == 1
y_range = range(h-1, -1, -1) if reverse else range(h)
x_range = range(w-1, -1, -1) if reverse else range(w)
for y in y_range:
for x in x_range:
best_cost = self._compute_cost(
ref_img, src_pyramids, x, y,
depth_map[y, x], normal_map[y, x],
ref_K, src_Ks, ref_Rt, src_Rts
)
# 检查邻域(根据扫描方向)
neighbors = []
if y > 0 and not reverse:
neighbors.append((x, y-1))
if y < h-1 and reverse:
neighbors.append((x, y+1))
if x > 0 and not reverse:
neighbors.append((x-1, y))
if x < w-1 and reverse:
neighbors.append((x+1, y))
for nx, ny in neighbors:
cost = self._compute_cost(
ref_img, src_pyramids, x, y,
new_depth[ny, nx], new_normal[ny, nx],
ref_K, src_Ks, ref_Rt, src_Rts
)
if cost < best_cost:
best_cost = cost
new_depth[y, x] = new_depth[ny, nx]
new_normal[y, x] = new_normal[ny, nx]
return new_depth, new_normal
def _view_propagation(self, ref_img: np.ndarray, src_pyramids: List[List[np.ndarray]],
depth_map: np.ndarray, normal_map: np.ndarray,
ref_K: np.ndarray, src_Ks: List[np.ndarray],
ref_Rt: np.ndarray, src_Rts: List[np.ndarray]) -> np.ndarray:
"""视图传播:利用多视图几何一致性优化深度"""
# 简化的几何一致性检查
# 实际实现应检查重投影误差和深度一致性
return depth_map
def _random_search(self, ref_img: np.ndarray, src_pyramids: List[List[np.ndarray]],
depth_map: np.ndarray, normal_map: np.ndarray,
ref_K: np.ndarray, src_Ks: List[np.ndarray],
ref_Rt: np.ndarray, src_Rts: List[np.ndarray],
iteration: int) -> Tuple[np.ndarray, np.ndarray]:
"""
随机搜索:在假设空间随机采样
搜索范围随迭代减小
"""
h, w = depth_map.shape
new_depth = depth_map.copy()
new_normal = normal_map.copy()
# 当前搜索半径
max_depth_range = 10.0 * (0.5 ** iteration)
max_angle_range = np.pi / 4 * (0.5 ** iteration)
for y in range(h):
for x in range(w):
current_depth = depth_map[y, x]
current_normal = normal_map[y, x]
best_cost = self._compute_cost(
ref_img, src_pyramids, x, y,
current_depth, current_normal,
ref_K, src_Ks, ref_Rt, src_Rts
)
# 随机采样
for _ in range(self.params.num_samples):
# 随机深度扰动
depth_sample = current_depth + np.random.uniform(-max_depth_range, max_depth_range)
depth_sample = np.clip(depth_sample, 0.1, 100.0)
# 随机法向扰动
angle = np.random.uniform(0, max_angle_range)
axis = np.random.randn(3)
axis = axis / np.linalg.norm(axis)
R_perturb = self._axis_angle_to_matrix(axis, angle)
normal_sample = R_perturb @ current_normal
cost = self._compute_cost(
ref_img, src_pyramids, x, y,
depth_sample, normal_sample,
ref_K, src_Ks, ref_Rt, src_Rts
)
if cost < best_cost:
best_cost = cost
new_depth[y, x] = depth_sample
new_normal[y, x] = normal_sample
return new_depth, new_normal
def _compute_cost(self, ref_img: np.ndarray, src_pyramids: List[List[np.ndarray]],
x: int, y: int, depth: float, normal: np.ndarray,
ref_K: np.ndarray, src_Ks: List[np.ndarray],
ref_Rt: np.ndarray, src_Rts: List[np.ndarray]) -> float:
"""
计算匹配代价(ZNCC)
"""
h, w = ref_img.shape[:2]
# 构建局部平面
point3d = self._backproject(x, y, depth, ref_K)
# 计算源视图中的投影
total_cost = 0.0
valid_views = 0
for src_idx, (src_pyr, src_K, src_Rt) in enumerate(zip(src_pyramids, src_Ks, src_Rts)):
# 将3D点投影到源视图(选择与当前参考图像尺度一致的金字塔层级)
level = min(range(len(src_pyr)),
            key=lambda i: abs(src_pyr[i].shape[0] - ref_img.shape[0]))
src_img = src_pyr[level]
# 计算单应性(平面诱导)
H = self._compute_homography(ref_K, src_K, ref_Rt, src_Rt, normal, point3d)
# 采样参考块
patch_ref = self._sample_patch(ref_img, x, y, self.params.patch_size)
if patch_ref is None:
continue
# 在源视图中寻找对应块
src_coord = H @ np.array([x, y, 1])
src_coord = src_coord[:2] / src_coord[2]
patch_src = self._sample_patch(src_img, src_coord[0], src_coord[1],
self.params.patch_size)
if patch_src is None:
continue
# 计算ZNCC
zncc = self._compute_zncc(patch_ref, patch_src)
total_cost += 1.0 - zncc # 代价 = 1 - 相关性
valid_views += 1
return total_cost / valid_views if valid_views > 0 else 1.0
def _backproject(self, x: int, y: int, depth: float, K: np.ndarray) -> np.ndarray:
"""反投影图像坐标到3D空间"""
X = (x - K[0, 2]) * depth / K[0, 0]
Y = (y - K[1, 2]) * depth / K[1, 1]
Z = depth
return np.array([X, Y, Z])
def _compute_homography(self, K1: np.ndarray, K2: np.ndarray,
Rt1: np.ndarray, Rt2: np.ndarray,
normal: np.ndarray, point3d: np.ndarray) -> np.ndarray:
"""
计算平面诱导的单应性矩阵
"""
R1, t1 = Rt1[:, :3], Rt1[:, 3]
R2, t2 = Rt2[:, :3], Rt2[:, 3]
# 平面方程: n^T * X + d = 0
d = -normal.dot(point3d)
# 相对位姿
R_rel = R2 @ R1.T
t_rel = t2 - R_rel @ t1
# 单应性
H = K2 @ (R_rel - np.outer(t_rel, normal) / d) @ np.linalg.inv(K1)
return H
def _sample_patch(self, image: np.ndarray, x: float, y: float,
patch_size: int) -> Optional[np.ndarray]:
"""采样图像块"""
half = patch_size // 2
x0, y0 = int(x) - half, int(y) - half
x1, y1 = x0 + patch_size, y0 + patch_size
if x0 < 0 or y0 < 0 or x1 >= image.shape[1] or y1 >= image.shape[0]:
return None
patch = image[y0:y1, x0:x1]
return patch.astype(np.float32)
def _compute_zncc(self, patch1: np.ndarray, patch2: np.ndarray) -> float:
"""计算零均值归一化互相关"""
p1 = patch1.reshape(-1, 3)
p2 = patch2.reshape(-1, 3)
p1 = p1 - p1.mean(axis=0)
p2 = p2 - p2.mean(axis=0)
norm1 = np.linalg.norm(p1)
norm2 = np.linalg.norm(p2)
if norm1 < 1e-6 or norm2 < 1e-6:
return 0.0
return np.sum(p1 * p2) / (norm1 * norm2)
def _axis_angle_to_matrix(self, axis: np.ndarray, angle: float) -> np.ndarray:
"""轴角表示转旋转矩阵(Rodrigues公式)"""
K = np.array([[0, -axis[2], axis[1]],
[axis[2], 0, -axis[0]],
[-axis[1], axis[0], 0]])
R = np.eye(3) + np.sin(angle) * K + (1 - np.cos(angle)) * (K @ K)
return R
class DepthMapFusion:
"""
深度图融合器
将多视角深度图融合为一致的三维点云
"""
def __init__(self,
consistency_threshold: float = 1.0, # 一致性阈值(深度的百分比系数)
min_consistent_views: int = 2):
self.consistency_threshold = consistency_threshold
self.min_consistent_views = min_consistent_views
def fuse(self, depth_maps: List[np.ndarray],
images: List[np.ndarray],
Ks: List[np.ndarray], Rts: List[np.ndarray]) -> o3d.geometry.PointCloud:
"""
融合多视角深度图
Args:
depth_maps: 深度图列表 [(H, W), ...]
images: 彩色图像列表 [(H, W, 3), ...]
Ks: 内参列表 [(3, 3), ...]
Rts: 外参列表 [(3, 4), ...]
Returns:
融合后的点云
"""
points = []
colors = []
num_views = len(depth_maps)
for ref_idx in range(num_views):
ref_depth = depth_maps[ref_idx]
ref_img = images[ref_idx]
ref_K = Ks[ref_idx]
ref_Rt = Rts[ref_idx]
h, w = ref_depth.shape
# 遍历每个像素
for y in range(h):
for x in range(w):
depth = ref_depth[y, x]
if depth <= 0 or np.isnan(depth) or np.isinf(depth):
continue
# 反投影到3D
point3d = self._backproject_pixel(x, y, depth, ref_K, ref_Rt)
# 几何一致性检查
consistent_views = 1
for src_idx in range(num_views):
if src_idx == ref_idx:
continue
# 投影到源视图
src_K = Ks[src_idx]
src_Rt = Rts[src_idx]
src_depth = depth_maps[src_idx]
projected = self._project_point(point3d, src_K, src_Rt)
if projected is None:
continue
px, py = int(projected[0]), int(projected[1])
sh, sw = src_depth.shape  # 源视图尺寸(可能与参考视图不同)
if px < 0 or px >= sw or py < 0 or py >= sh:
    continue
# 检查深度一致性
src_d = src_depth[py, px]
if src_d <= 0:
continue
# 计算源视图中的3D点
src_point3d = self._backproject_pixel(px, py, src_d, src_K, src_Rt)
# 距离检查
distance = np.linalg.norm(point3d - src_point3d)
if distance < self.consistency_threshold * depth * 0.01: # 1%相对误差
consistent_views += 1
# 保留足够一致的点
if consistent_views >= self.min_consistent_views:
points.append(point3d)
colors.append(ref_img[y, x])
# 构建点云
pcd = o3d.geometry.PointCloud()
if len(points) > 0:
pcd.points = o3d.utility.Vector3dVector(np.array(points))
pcd.colors = o3d.utility.Vector3dVector(np.array(colors) / 255.0)
# 统计滤波去噪
pcd, _ = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
return pcd
def _backproject_pixel(self, x: int, y: int, depth: float,
K: np.ndarray, Rt: np.ndarray) -> np.ndarray:
"""像素反投影到世界坐标系"""
X = (x - K[0, 2]) * depth / K[0, 0]
Y = (y - K[1, 2]) * depth / K[1, 1]
Z = depth
point_cam = np.array([X, Y, Z])
R, t = Rt[:, :3], Rt[:, 3]
# 相机坐标 -> 世界坐标: X_w = R^T (X_c - t)
return R.T @ (point_cam - t)
def _project_point(self, point3d: np.ndarray, K: np.ndarray,
Rt: np.ndarray) -> Optional[np.ndarray]:
"""3D点投影到图像平面"""
R, t = Rt[:, :3], Rt[:, 3]
point_cam = R @ point3d + t
if point_cam[2] <= 0:
return None
x = K[0, 0] * point_cam[0] / point_cam[2] + K[0, 2]
y = K[1, 1] * point_cam[1] / point_cam[2] + K[1, 2]
return np.array([x, y])
def main():
parser = argparse.ArgumentParser(description='稠密重建工具')
parser.add_argument('--images', '-i', type=str, help='输入图像目录')
parser.add_argument('--workspace', '-w', type=str, help='工作目录')
parser.add_argument('--sparse_dir', type=str, help='稀疏重建目录(用于stereo模式)')
parser.add_argument('--depth_dir', type=str, help='深度图目录(用于fusion模式)')
parser.add_argument('--output', '-o', type=str, default='output.ply', help='输出文件')
parser.add_argument('--mode', type=str, default='full',
choices=['full', 'stereo', 'fusion', 'custom_pm'],
help='运行模式')
parser.add_argument('--colmap_path', type=str, default='colmap',
help='COLMAP可执行文件路径')
args = parser.parse_args()
if args.mode == 'full':
# 完整COLMAP管线
colmap = ColmapInterface(args.colmap_path)
results = colmap.run_full_pipeline(args.images, args.workspace)
print("\n重建结果:")
for key, path in results.items():
print(f" {key}: {path}")
elif args.mode == 'stereo':
# 仅PatchMatch立体匹配
colmap = ColmapInterface(args.colmap_path)
colmap.run_patch_match_stereo(args.workspace)
elif args.mode == 'fusion':
# 深度图融合
# 加载深度图和相机参数
depth_files = sorted([f for f in os.listdir(args.depth_dir)
if f.endswith(('.png', '.exr', '.pfm'))])
depth_maps = []
for f in depth_files:
depth_path = os.path.join(args.depth_dir, f)
if f.endswith('.pfm'):
depth = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED)
else:
depth = cv2.imread(depth_path, cv2.IMREAD_ANYDEPTH).astype(np.float32) / 1000.0
depth_maps.append(depth)
# 这里应加载对应的图像和相机参数
# 简化示例,实际需从COLMAP格式读取
print(f"加载了 {len(depth_maps)} 张深度图")
# 执行融合(需要额外的相机参数)
# fusion = DepthMapFusion()
# pcd = fusion.fuse(depth_maps, images, Ks, Rts)
# o3d.io.write_point_cloud(args.output, pcd)
elif args.mode == 'custom_pm':
# 使用自定义PatchMatch
# 加载图像和相机参数(从COLMAP或自定义格式)
print("自定义PatchMatch模式需要预先计算的相机参数")
print("请使用 --mode full 先执行稀疏重建获取位姿")
if __name__ == '__main__':
main()
6.2.3 网格生成:泊松重建与Delaunay三角化
点云重建得到的是离散的三维点集合,缺乏拓扑连接关系。表面重建算法将这些点转换为连续的三角网格,便于渲染、分析和制造。泊松重建(Poisson Surface Reconstruction)是目前最鲁棒的隐式表面重建方法,通过求解泊松方程,从有向点云(带法向)中重建指示函数(Indicator Function)的等值面,隐式地处理噪声和数据缺失。
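下面先给出泊松重建在Open3D中的最小调用示意(输入文件名与depth取值均为示例;重建前必须为点云估计并定向法向):
Python
import open3d as o3d

pcd = o3d.io.read_point_cloud("cloud.ply")       # 示例输入文件
pcd.estimate_normals()                           # 泊松重建要求点云带法向
pcd.orient_normals_consistent_tangent_plane(30)  # 沿切平面传播定向法向
mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(
    pcd, depth=9)                                # depth越大分辨率越高、越耗时
o3d.io.write_triangle_mesh("mesh.ply", mesh)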
Delaunay三角化则是显式方法的代表,将点云投影到二维平面(如切平面或参数域),在平面内执行Delaunay三角化,再反投影回三维空间。其优势在于保持局部几何特征,适合采样均匀、噪声较小的数据。Open3D提供了create_from_point_cloud_poisson、create_from_point_cloud_alpha_shape(基于Delaunay的Alpha形状)等接口,并支持重建后的网格简化、平滑和孔洞填充。
网格质量优化包括顶点去噪、边折叠简化(Quadric Error Metrics)和拉普拉斯平滑。对于彩色点云,还需进行纹理坐标生成(UV映射),将顶点颜色或外部图像映射到网格表面。
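这些优化步骤在Open3D中均有现成接口,下面是一个最小的后处理链示意(输入/输出文件名与目标面数均为示例假设):
Python
import open3d as o3d

mesh = o3d.io.read_triangle_mesh("mesh.ply")               # 示例输入
mesh = mesh.simplify_quadric_decimation(20000)             # QEM简化到约2万面
mesh = mesh.filter_smooth_taubin(number_of_iterations=10)  # 保体积平滑
mesh.remove_degenerate_triangles()                         # 清理退化面
mesh.compute_vertex_normals()
o3d.io.write_triangle_mesh("mesh_opt.ply", mesh)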
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: mesh_generation.py
Content: 网格生成与优化:泊松重建、Delaunay三角化及后处理
Usage:
1. 泊松重建: python mesh_generation.py --input cloud.ply --output mesh.ply --method poisson --depth 10
2. Delaunay重建: python mesh_generation.py --input cloud.ply --output mesh.ply --method delaunay --alpha 0.5
3. 网格简化: python mesh_generation.py --input mesh.ply --output simplified.ply --mode simplify --target_faces 10000
4. 纹理映射: python mesh_generation.py --input mesh.ply --images ./imgs --poses poses.txt --output textured.obj --mode texture
Features:
- 泊松表面重建(带法向估计)
- 约束Delaunay三角化(Alpha Shapes)
- 网格简化与优化(QEM、拉普拉斯平滑)
- 自动UV展开与纹理映射
- 网格质量评估与修复
"""
import open3d as o3d
import numpy as np
import cv2
import argparse
import os
from typing import Optional, List, Tuple
from dataclasses import dataclass
from scipy.spatial import Delaunay, ConvexHull
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
import trimesh
@dataclass
class PoissonParams:
"""泊松重建参数"""
depth: int = 10 # 八叉树深度(分辨率)
scale: float = 1.1 # 重建包围盒相对点云包围盒的放大比例
linear_fit: bool = False # 使用线性插值
boundary_type: int = 1 # 边界类型(1=Dirichlet)
@dataclass
class DelaunayParams:
"""Delaunay重建参数"""
alpha: float = 0.5 # Alpha形状参数
tolerance: float = 0.001 # 容差
hole_filling: bool = True # 填充小孔洞
class MeshReconstructor:
"""
网格重建器
实现泊松重建、Delaunay三角化和网格优化
"""
def __init__(self):
self.point_cloud = None
self.mesh = None
def load_point_cloud(self, filepath: str) -> o3d.geometry.PointCloud:
"""加载点云"""
self.point_cloud = o3d.io.read_point_cloud(filepath)
if len(self.point_cloud.points) == 0:
raise ValueError(f"无法加载点云或点云为空: {filepath}")
print(f"加载点云: {len(self.point_cloud.points)} 个点")
# 检查并估计法向
if not self.point_cloud.has_normals():
print("估计法向...")
self.point_cloud.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30)
)
# 定向法向(朝向相机中心)
self.point_cloud.orient_normals_consistent_tangent_plane(100)
return self.point_cloud
def poisson_reconstruction(self, params: PoissonParams) -> o3d.geometry.TriangleMesh:
"""
泊松表面重建
Args:
params: 泊松重建参数
Returns:
重建的三角网格
"""
print(f"执行泊松重建,深度: {params.depth}")
mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(
self.point_cloud,
depth=params.depth,
scale=params.scale,
linear_fit=params.linear_fit,
n_threads=-1 # 使用所有CPU核心
)
# 根据密度过滤低质量区域
densities = np.asarray(densities)
density_mask = densities > np.quantile(densities, 0.1)
mesh.remove_vertices_by_mask(~density_mask)
# 从点云投影顶点颜色(最近邻)
if self.point_cloud.has_colors():
    self._project_vertex_colors(mesh)
self.mesh = mesh
print(f"泊松重建完成: {len(mesh.vertices)} 顶点, {len(mesh.triangles)} 面")
return mesh
def delaunay_reconstruction(self, params: DelaunayParams) -> o3d.geometry.TriangleMesh:
"""
Alpha形状(约束Delaunay)重建
Args:
params: Delaunay参数
Returns:
重建的三角网格
"""
points = np.asarray(self.point_cloud.points)
print(f"执行Alpha形状重建,alpha: {params.alpha}")
# 使用Open3D的Alpha形状
mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_alpha_shape(
self.point_cloud, params.alpha
)
# 后处理:移除小连通组件
with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Debug) as cm:
triangle_clusters, cluster_n_triangles, cluster_area = (
mesh.cluster_connected_triangles()
)
triangle_clusters = np.asarray(triangle_clusters)
cluster_n_triangles = np.asarray(cluster_n_triangles)
# 保留最大的连通组件
largest_cluster_idx = cluster_n_triangles.argmax()
triangles_to_remove = triangle_clusters != largest_cluster_idx
mesh.remove_triangles_by_mask(triangles_to_remove)
mesh.remove_unreferenced_vertices()
self.mesh = mesh
print(f"Delaunay重建完成: {len(mesh.vertices)} 顶点, {len(mesh.triangles)} 面")
return mesh
def ball_pivoting_reconstruction(self, radii: List[float] = None) -> o3d.geometry.TriangleMesh:
"""
滚球算法(Ball Pivoting)重建
Args:
radii: 球体半径列表,默认自动估计
Returns:
重建的三角网格
"""
if radii is None:
# 自动估计半径(基于点云密度)
distances = self.point_cloud.compute_nearest_neighbor_distance()
avg_dist = np.mean(distances)
radii = [avg_dist * 1.5, avg_dist * 3.0, avg_dist * 6.0]
print(f"执行滚球算法,半径: {radii}")
mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(
self.point_cloud, o3d.utility.DoubleVector(radii)
)
self.mesh = mesh
print(f"滚球重建完成: {len(mesh.vertices)} 顶点, {len(mesh.triangles)} 面")
return mesh
def _project_vertex_colors(self, mesh: o3d.geometry.TriangleMesh):
"""将点云颜色投影到网格顶点"""
# 构建KD树
pcd_tree = o3d.geometry.KDTreeFlann(self.point_cloud)
colors = np.asarray(self.point_cloud.colors)
vertex_colors = []
for vertex in mesh.vertices:
[k, idx, _] = pcd_tree.search_knn_vector_3d(vertex, 1)
vertex_colors.append(colors[idx[0]])
mesh.vertex_colors = o3d.utility.Vector3dVector(np.array(vertex_colors))
class MeshOptimizer:
"""
网格优化器
实现简化、平滑、修复和UV展开
"""
def __init__(self, mesh: o3d.geometry.TriangleMesh):
self.mesh = mesh
self.original_mesh = mesh.copy()
def simplify_quadric_decimation(self, target_triangles: int) -> o3d.geometry.TriangleMesh:
"""
二次误差度量(QEM)网格简化
Args:
target_triangles: 目标面数
Returns:
简化后的网格
"""
print(f"简化网格至 {target_triangles} 个面...")
simplified = self.mesh.simplify_quadric_decimation(target_triangles)
# 轻度平滑一次,缓解QEM简化产生的尖锐伪影
simplified = simplified.filter_smooth_simple(number_of_iterations=1)
self.mesh = simplified
print(f"简化完成: {len(simplified.vertices)} 顶点, {len(simplified.triangles)} 面")
return simplified
def laplacian_smooth(self, iterations: int = 10, lambda_filter: float = 0.5) -> o3d.geometry.TriangleMesh:
"""
拉普拉斯平滑
Args:
iterations: 迭代次数
lambda_filter: 滤波系数(0-1)
Returns:
平滑后的网格
"""
print(f"拉普拉斯平滑,迭代 {iterations} 次...")
smoothed = self.mesh.filter_smooth_laplacian(
number_of_iterations=iterations,
lambda_filter=lambda_filter
)
self.mesh = smoothed
return smoothed
def taubin_smooth(self, iterations: int = 10, lambda_filter: float = 0.5,
mu: float = -0.53) -> o3d.geometry.TriangleMesh:
"""
Taubin平滑(保持体积)
Args:
iterations: 迭代次数
lambda_filter: 收缩系数
mu: 膨胀系数(负值)
Returns:
平滑后的网格
"""
print(f"Taubin平滑,迭代 {iterations} 次...")
smoothed = self.mesh.filter_smooth_taubin(
number_of_iterations=iterations,
lambda_filter=lambda_filter,
mu=mu
)
self.mesh = smoothed
return smoothed
def remove_duplicates(self, tolerance: float = 0.0001) -> o3d.geometry.TriangleMesh:
"""移除重复顶点和退化面"""
self.mesh.remove_duplicated_vertices()
self.mesh.remove_duplicated_triangles()
self.mesh.remove_degenerate_triangles()
self.mesh.remove_unreferenced_vertices()
return self.mesh
def hole_filling(self, hole_size: float = 1e6) -> o3d.geometry.TriangleMesh:
    """
    填充小孔洞
    Args:
        hole_size: 可填充孔洞的近似尺寸上限(模型单位)
    Returns:
        修复后的网格
    """
    print("检测并填充孔洞...")
    # 存在边界边(非水密)即说明有孔洞
    if not self.mesh.is_watertight():
        # 孔洞填充由Open3D的tensor API提供
        t_mesh = o3d.t.geometry.TriangleMesh.from_legacy(self.mesh)
        t_mesh = t_mesh.fill_holes(hole_size=hole_size)
        self.mesh = t_mesh.to_legacy()
        print("孔洞填充完成")
    return self.mesh
return self.mesh
def compute_uv_atlas(self, size: int = 2048, gutter: int = 2) -> np.ndarray:
"""
Compute UV coordinates (xatlas parameterization)
Args:
size: intended texture resolution (used when rasterizing later)
gutter: chart spacing (only honored by the xatlas Atlas/PackOptions API)
Returns:
UV coordinate array (N, 2)
"""
import xatlas # pip install xatlas
vertices = np.asarray(self.mesh.vertices)
triangles = np.asarray(self.mesh.triangles)
# The one-shot parametrize() API takes only positions and indices; packing
# options such as resolution and padding require the xatlas.Atlas API
vmapping, indices, uvs = xatlas.parametrize(vertices, triangles)
# Re-index the mesh so the returned UVs are per-vertex
new_vertices = vertices[vmapping]
self.mesh.vertices = o3d.utility.Vector3dVector(new_vertices)
self.mesh.triangles = o3d.utility.Vector3iVector(indices.astype(np.int32))
# Open3D stores UVs per triangle corner (triangle_uvs), not per vertex
self.mesh.triangle_uvs = o3d.utility.Vector2dVector(uvs[indices.reshape(-1)])
print(f"UV unwrap done: {len(uvs)} UV coordinates")
return uvs
def texture_mapping(self, images: List[np.ndarray],
camera_poses: List[np.ndarray],
camera_intrinsics: List[np.ndarray],
texture_size: int = 2048) -> o3d.geometry.TriangleMesh:
"""
多图像纹理映射
Args:
images: 彩色图像列表
camera_poses: 相机位姿列表 (4, 4)
camera_intrinsics: 相机内参列表 (3, 3)
texture_size: 输出纹理尺寸
Returns:
带纹理的网格
"""
print("生成纹理映射...")
# Make sure UV coordinates exist
if not self.mesh.has_triangle_uvs():
self.compute_uv_atlas(texture_size)
# 创建纹理图(使用Open3D的纹理映射)
# 这里简化处理,实际应使用多图融合策略
# 计算每个三角形的最佳视角
self.mesh.compute_triangle_normals()
triangle_normals = np.asarray(self.mesh.triangle_normals)
vertices_arr = np.asarray(self.mesh.vertices)
triangles_arr = np.asarray(self.mesh.triangles)
triangle_centers = vertices_arr[triangles_arr].mean(axis=1)
# 选择每个三角形最可见的图像
best_views = []
for center, normal in zip(triangle_centers, triangle_normals):
best_view = -1
best_score = -1
for idx, (pose, K) in enumerate(zip(camera_poses, camera_intrinsics)):
# 转换到相机坐标系
center_cam = pose[:3, :3] @ center + pose[:3, 3]
if center_cam[2] <= 0: # 在相机后方
continue
# 投影到图像平面
u = K[0, 0] * center_cam[0] / center_cam[2] + K[0, 2]
v = K[1, 1] * center_cam[1] / center_cam[2] + K[1, 2]
h, w = images[idx].shape[:2]
if u < 0 or u >= w or v < 0 or v >= h:
continue
# 视角评分(考虑距离、角度、图像中心度)
view_dir = -center_cam / np.linalg.norm(center_cam)
normal_cam = pose[:3, :3] @ normal
angle_score = max(0, view_dir.dot(normal_cam))
dist_score = 1.0 / (1.0 + np.linalg.norm(center_cam))
center_score = 1.0 - (abs(u - w/2) / (w/2) + abs(v - h/2) / (h/2)) / 2
score = angle_score * dist_score * center_score
if score > best_score:
best_score = score
best_view = idx
best_views.append(best_view)
# 创建纹理图(简化:仅使用最佳视角的颜色)
texture_image = np.zeros((texture_size, texture_size, 3), dtype=np.uint8)
# Rasterize the triangles into the texture image
# triangle_uvs stores three corner UVs per triangle: reshape to (T, 3, 2)
uvs_per_tri = np.asarray(self.mesh.triangle_uvs).reshape(-1, 3, 2)
triangles = np.asarray(self.mesh.triangles)
for tri_idx, (tri, view_idx) in enumerate(zip(triangles, best_views)):
if view_idx < 0:
continue
# UV coordinates of this triangle's corners
uv_tri = uvs_per_tri[tri_idx] * texture_size
# 计算包围盒
min_u = int(np.min(uv_tri[:, 0]))
max_u = int(np.max(uv_tri[:, 0])) + 1
min_v = int(np.min(uv_tri[:, 1]))
max_v = int(np.max(uv_tri[:, 1])) + 1
# 裁剪到纹理范围
min_u = max(0, min_u)
max_u = min(texture_size, max_u)
min_v = max(0, min_v)
max_v = min(texture_size, max_v)
# 获取3D顶点
verts_3d = np.asarray(self.mesh.vertices)[tri]
# 投影到最佳视角图像
pose = camera_poses[view_idx]
K = camera_intrinsics[view_idx]
img = images[view_idx]
# 简单采样:使用重心坐标插值
for v in range(min_v, max_v):
for u in range(min_u, max_u):
# 检查是否在三角形内
p = np.array([u, v])
if self._point_in_triangle(p, uv_tri):
# 计算重心坐标
bary = self._barycentric_coordinates(p, uv_tri)
# 插值3D位置
point_3d = bary[0] * verts_3d[0] + bary[1] * verts_3d[1] + bary[2] * verts_3d[2]
# 投影到图像
point_cam = pose[:3, :3] @ point_3d + pose[:3, 3]
img_u = int(K[0, 0] * point_cam[0] / point_cam[2] + K[0, 2])
img_v = int(K[1, 1] * point_cam[1] / point_cam[2] + K[1, 2])
if 0 <= img_u < img.shape[1] and 0 <= img_v < img.shape[0]:
texture_image[v, u] = img[img_v, img_u]
# 应用纹理
self.mesh.textures = [o3d.geometry.Image(texture_image)]
return self.mesh
def _point_in_triangle(self, p: np.ndarray, tri: np.ndarray) -> bool:
"""检查点是否在三角形内(2D)"""
def sign(p1, p2, p3):
return (p1[0] - p3[0]) * (p2[1] - p3[1]) - (p2[0] - p3[0]) * (p1[1] - p3[1])
d1 = sign(p, tri[0], tri[1])
d2 = sign(p, tri[1], tri[2])
d3 = sign(p, tri[2], tri[0])
has_neg = (d1 < 0) or (d2 < 0) or (d3 < 0)
has_pos = (d1 > 0) or (d2 > 0) or (d3 > 0)
return not (has_neg and has_pos)
def _barycentric_coordinates(self, p: np.ndarray, tri: np.ndarray) -> np.ndarray:
"""计算重心坐标"""
v0 = tri[2] - tri[0]
v1 = tri[1] - tri[0]
v2 = p - tri[0]
dot00 = np.dot(v0, v0)
dot01 = np.dot(v0, v1)
dot02 = np.dot(v0, v2)
dot11 = np.dot(v1, v1)
dot12 = np.dot(v1, v2)
inv_denom = 1 / (dot00 * dot11 - dot01 * dot01)
u = (dot11 * dot02 - dot01 * dot12) * inv_denom
v = (dot00 * dot12 - dot01 * dot02) * inv_denom
w = 1 - u - v
return np.array([w, v, u])
def evaluate_quality(self) -> dict:
"""
评估网格质量
Returns:
质量指标字典
"""
metrics = {}
# 基本统计
metrics['num_vertices'] = len(self.mesh.vertices)
metrics['num_triangles'] = len(self.mesh.triangles)
# 流形性检查
metrics['is_watertight'] = self.mesh.is_watertight()
metrics['is_edge_manifold'] = self.mesh.is_edge_manifold()
metrics['is_vertex_manifold'] = self.mesh.is_vertex_manifold()
metrics['is_self_intersecting'] = self.mesh.is_self_intersecting()
# Geometric quality
metrics['surface_area'] = self.mesh.get_surface_area()  # total area (float)
# 三角形质量(内切圆/外接圆半径比)
vertices = np.asarray(self.mesh.vertices)
triangles = np.asarray(self.mesh.triangles)
qualities = []
for tri in triangles:
v0, v1, v2 = vertices[tri]
a = np.linalg.norm(v1 - v0)
b = np.linalg.norm(v2 - v1)
c = np.linalg.norm(v0 - v2)
# 使用海伦公式计算面积
s = (a + b + c) / 2
area = np.sqrt(max(0, s * (s-a) * (s-b) * (s-c)))
# 内切圆半径
r_in = area / s if s > 0 else 0
# 外接圆半径
r_circ = (a * b * c) / (4 * area) if area > 0 else 0
quality = r_in / r_circ if r_circ > 0 else 0
qualities.append(quality)
qualities = np.array(qualities)
metrics['min_triangle_quality'] = qualities.min()
metrics['mean_triangle_quality'] = qualities.mean()
metrics['num_bad_triangles'] = np.sum(qualities < 0.1)
return metrics
def main():
parser = argparse.ArgumentParser(description='网格生成与优化工具')
parser.add_argument('--input', '-i', type=str, required=True, help='输入文件')
parser.add_argument('--output', '-o', type=str, required=True, help='输出文件')
parser.add_argument('--method', '-m', type=str, default='poisson',
choices=['poisson', 'delaunay', 'ball_pivoting'],
help='重建方法')
parser.add_argument('--depth', type=int, default=10, help='泊松深度')
parser.add_argument('--alpha', type=float, default=0.5, help='Alpha形状参数')
parser.add_argument('--mode', type=str, default='reconstruct',
choices=['reconstruct', 'simplify', 'smooth', 'texture', 'repair'],
help='操作模式')
parser.add_argument('--target_faces', type=int, default=10000, help='简化目标面数')
parser.add_argument('--smooth_iterations', type=int, default=10, help='平滑迭代次数')
parser.add_argument('--images', type=str, help='纹理图像目录(纹理模式)')
parser.add_argument('--poses', type=str, help='相机位姿文件(纹理模式)')
args = parser.parse_args()
if args.mode == 'reconstruct':
# 从点云重建网格
reconstructor = MeshReconstructor()
pcd = reconstructor.load_point_cloud(args.input)
if args.method == 'poisson':
params = PoissonParams(depth=args.depth)
mesh = reconstructor.poisson_reconstruction(params)
elif args.method == 'delaunay':
params = DelaunayParams(alpha=args.alpha)
mesh = reconstructor.delaunay_reconstruction(params)
else: # ball_pivoting
mesh = reconstructor.ball_pivoting_reconstruction()
# 保存
o3d.io.write_triangle_mesh(args.output, mesh)
print(f"网格已保存: {args.output}")
elif args.mode == 'simplify':
# 简化现有网格
mesh = o3d.io.read_triangle_mesh(args.input)
optimizer = MeshOptimizer(mesh)
optimizer.remove_duplicates()
simplified = optimizer.simplify_quadric_decimation(args.target_faces)
o3d.io.write_triangle_mesh(args.output, simplified)
print(f"简化网格已保存: {args.output}")
elif args.mode == 'smooth':
# 平滑网格
mesh = o3d.io.read_triangle_mesh(args.input)
optimizer = MeshOptimizer(mesh)
smoothed = optimizer.taubin_smooth(iterations=args.smooth_iterations)
o3d.io.write_triangle_mesh(args.output, smoothed)
print(f"平滑网格已保存: {args.output}")
elif args.mode == 'repair':
# 修复网格
mesh = o3d.io.read_triangle_mesh(args.input)
optimizer = MeshOptimizer(mesh)
optimizer.remove_duplicates()
optimizer.hole_filling()
# 评估质量
metrics = optimizer.evaluate_quality()
print("\n网格质量评估:")
for key, value in metrics.items():
print(f" {key}: {value}")
o3d.io.write_triangle_mesh(args.output, optimizer.mesh)
elif args.mode == 'texture':
# 纹理映射(需要额外的图像和位姿数据)
mesh = o3d.io.read_triangle_mesh(args.input)
optimizer = MeshOptimizer(mesh)
# 这里应加载图像和位姿数据
print("纹理映射模式需要 --images 和 --poses 参数")
print("示例用法: python mesh_generation.py -i mesh.ply --mode texture "
"--images ./imgs --poses poses.txt -o textured.obj")
if __name__ == '__main__':
main()
6.2.4 Texture Mapping: UV Unwrapping and Multi-Image Fusion
Texture mapping projects high-resolution image content onto the 3D mesh surface, greatly improving visual realism. UV unwrapping parameterizes the 3D surface into a 2D texture space; the key quality criteria are minimal angle distortion and minimal area distortion. Complex topologies require seams that cut the mesh open so it can be flattened, and seam locations should be hidden in visually insensitive regions (back faces, concave corners).
Multi-image fusion addresses the limited resolution and occlusions of any single image. The best view is first selected for each mesh triangle (based on visibility, resolution and viewing angle), and overlapping contributions are then blended in texture space. Blending strategies include direct overwrite, feathering and multi-band blending. Multi-band blending decomposes the images into frequency bands: low-frequency bands are blended with wide support to keep illumination consistent, while high-frequency bands preserve detail sharpness, as in the sketch below.
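As a concrete illustration of the frequency-band idea, the following minimal sketch blends two aligned images with Laplacian pyramids built from cv2.pyrDown/cv2.pyrUp. The synthetic inputs and the band count are assumptions for illustration; the texture pipeline later in this section applies the same principle per band in UV space.
Python
import cv2
import numpy as np

def multiband_blend(img_a, img_b, mask, num_bands=5):
    """Blend two aligned float32 images (0-255) with a soft mask in [0, 1]."""
    # Gaussian pyramid of the blend mask
    gm = [mask.astype(np.float32)]
    for _ in range(num_bands):
        gm.append(cv2.pyrDown(gm[-1]))

    def laplacian_pyramid(img):
        gp = [img.astype(np.float32)]
        for _ in range(num_bands):
            gp.append(cv2.pyrDown(gp[-1]))
        lp = [gp[i] - cv2.pyrUp(gp[i + 1], dstsize=(gp[i].shape[1], gp[i].shape[0]))
              for i in range(num_bands)]
        lp.append(gp[-1])  # coarsest level keeps the Gaussian residual
        return lp

    la, lb = laplacian_pyramid(img_a), laplacian_pyramid(img_b)
    # Blend each band with the matching mask level, then collapse the pyramid
    blended = [m[..., None] * a + (1 - m[..., None]) * b
               for a, b, m in zip(la, lb, gm)]
    out = blended[-1]
    for level in reversed(blended[:-1]):
        out = cv2.pyrUp(out, dstsize=(level.shape[1], level.shape[0])) + level
    return np.clip(out, 0, 255).astype(np.uint8)

# Synthetic demo: a dark and a bright image joined by a soft vertical seam
a = np.full((256, 256, 3), 40, np.float32)
b = np.full((256, 256, 3), 210, np.float32)
m = np.zeros((256, 256), np.float32)
m[:, :128] = 1.0
result = multiband_blend(a, b, m)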
Open3D supports only basic texture mapping; complex scenes call for dedicated tools such as Blender or xatlas for high-quality UV unwrapping. Once texture coordinates exist, the multi-view images are rasterized into UV space and color consistency across chart boundaries is enforced (via histogram matching or Poisson blending) to remove visible seams.
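When seams remain after per-band blending, gradient-domain (Poisson) fusion is available directly in OpenCV as seamlessClone, a convenient stand-in for a custom UV-space Poisson solver. The file names and the cloned region below are placeholders.
Python
import cv2
import numpy as np

patch = cv2.imread('patch_view.png')     # texture patch rendered from another view (placeholder)
atlas = cv2.imread('texture_atlas.png')  # current texture atlas (placeholder)
mask = np.full(patch.shape[:2], 255, dtype=np.uint8)  # clone the whole patch
center = (atlas.shape[1] // 2, atlas.shape[0] // 2)   # target position in the atlas
# NORMAL_CLONE preserves the patch's gradients while matching atlas colors at the boundary
fused = cv2.seamlessClone(patch, atlas, mask, center, cv2.NORMAL_CLONE)
cv2.imwrite('texture_atlas_fused.png', fused)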
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: texture_mapping.py
Content: 纹理映射与多图融合:UV展开、视角选择与无缝融合
Usage:
1. 自动UV展开: python texture_mapping.py --input mesh.ply --output uv_mesh.obj --mode unwrap
2. 单图纹理: python texture_mapping.py --input mesh.obj --image texture.jpg --output textured.obj --mode single
3. 多图融合: python texture_mapping.py --input mesh.obj --images ./imgs --poses poses.npy --output fused.obj --mode multi
4. 无缝融合: python texture_mapping.py --input mesh.obj --images ./imgs --poses poses.npy --output seamless.obj --mode seamless --blend bands
Features:
- 自动UV参数化(Least Squares Conformal Maps)
- 最佳视角选择与可见性计算
- 多频段纹理融合(消除接缝)
- 直方图匹配(光照一致性)
- 纹理图集打包与优化
"""
import open3d as o3d
import numpy as np
import cv2
import argparse
import os
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
from scipy.ndimage import gaussian_filter
import xatlas
@dataclass
class ViewInfo:
"""视图信息"""
image: np.ndarray # 彩色图像
pose: np.ndarray # 相机位姿 (4, 4)
K: np.ndarray # 内参 (3, 3)
image_id: int
class UVUnwrapper:
"""
UV展开器
实现LSCM(最小二乘共形映射)和xatlas参数化
"""
def __init__(self, mesh: o3d.geometry.TriangleMesh):
self.mesh = mesh
self.uv_coords = None
def unwrap_lscm(self) -> np.ndarray:
"""
Least Squares Conformal Maps (LSCM) UV unwrapping
Minimizes angle distortion
"""
# A full LSCM solve assembles a sparse complex linear system from per-triangle
# conformal constraints and pins two vertices to fix the gauge. That machinery
# is out of scope here, so we delegate to xatlas, whose parameterization
# minimizes a comparable conformal energy.
return self.unwrap_xatlas()
def unwrap_xatlas(self, resolution: int = 2048,
gutter: int = 2,
padding: int = 4) -> np.ndarray:
"""
High-quality UV unwrapping via xatlas
Args:
resolution: texture resolution (packing option)
gutter: chart spacing (packing option)
padding: chart padding (packing option)
Returns:
UV coordinate array (N, 2)
"""
vertices = np.asarray(self.mesh.vertices)
triangles = np.asarray(self.mesh.triangles)
# The one-shot xatlas.parametrize() API exposes no packing options, so use
# the Atlas API; folding gutter into padding is an approximation here
atlas = xatlas.Atlas()
atlas.add_mesh(vertices, triangles)
pack_options = xatlas.PackOptions()
pack_options.resolution = resolution
pack_options.padding = max(gutter, padding)
atlas.generate(pack_options=pack_options)
vmapping, indices, uvs = atlas[0]
# Re-index the mesh so the UVs are per-vertex
new_vertices = vertices[vmapping]
self.mesh.vertices = o3d.utility.Vector3dVector(new_vertices)
self.mesh.triangles = o3d.utility.Vector3iVector(indices.astype(np.int32))
# Store per-vertex UVs and Open3D's per-corner triangle_uvs
self.uv_coords = uvs
self.mesh.triangle_uvs = o3d.utility.Vector2dVector(uvs[indices.reshape(-1)])
print(f"UV unwrap done: {len(uvs)} coordinates")
return uvs
def visualize_uv_layout(self, size: int = 1024) -> np.ndarray:
"""
可视化UV布局
Args:
size: 图像尺寸
Returns:
UV布局可视化图像
"""
if self.uv_coords is None:
raise RuntimeError("请先执行UV展开")
image = np.ones((size, size, 3), dtype=np.uint8) * 255
triangles = np.asarray(self.mesh.triangles)
uvs = (self.uv_coords * size).astype(np.int32)
# 随机颜色
np.random.seed(42)
for tri in triangles:
color = tuple(np.random.randint(0, 255, 3).tolist())
pts = uvs[tri].reshape(-1, 2)
# 绘制三角形
cv2.polylines(image, [pts], True, color, 1)
return image
class ViewSelector:
"""
最佳视角选择器
为每个网格面选择最佳观测图像
"""
def __init__(self, mesh: o3d.geometry.TriangleMesh):
self.mesh = mesh
self.mesh.compute_triangle_normals()
def select_best_views(self, views: List[ViewInfo]) -> np.ndarray:
"""
为每个三角形选择最佳视角
Args:
views: 视图列表
Returns:
每个面的最佳视图索引数组 (M,)
"""
vertices_arr = np.asarray(self.mesh.vertices)
triangles_arr = np.asarray(self.mesh.triangles)
triangle_centers = vertices_arr[triangles_arr].mean(axis=1)
triangle_normals = np.asarray(self.mesh.triangle_normals)
n_faces = len(self.mesh.triangles)
best_views = np.full(n_faces, -1, dtype=np.int32)
best_scores = np.full(n_faces, -np.inf)
for view_idx, view in enumerate(views):
print(f"处理视图 {view_idx+1}/{len(views)}...")
R = view.pose[:3, :3]
t = view.pose[:3, 3]
K = view.K
h, w = view.image.shape[:2]
# 计算可见性分数
for face_idx in range(n_faces):
center = triangle_centers[face_idx]
normal = triangle_normals[face_idx]
# 转换到相机坐标系
center_cam = R @ center + t
if center_cam[2] <= 0.1: # 在相机后方或太近
continue
# 投影到图像平面
u = K[0, 0] * center_cam[0] / center_cam[2] + K[0, 2]
v = K[1, 1] * center_cam[1] / center_cam[2] + K[1, 2]
if u < 0 or u >= w or v < 0 or v >= h:
continue
# 视角评分
view_dir = -center_cam / np.linalg.norm(center_cam)
normal_cam = R @ normal
# 角度评分(法向与视线对齐)
angle_score = max(0, view_dir.dot(normal_cam))
# 距离评分(适中距离最佳)
dist = np.linalg.norm(center_cam)
dist_score = np.exp(-((dist - 2.0) ** 2) / 2.0)
# 图像中心度
center_score = (1 - abs(u - w/2) / (w/2)) * (1 - abs(v - h/2) / (h/2))
# 综合评分
score = angle_score * dist_score * center_score
if score > best_scores[face_idx]:
best_scores[face_idx] = score
best_views[face_idx] = view_idx
# 处理未被观测到的面(使用最近邻填充)
unobserved = best_views == -1
if np.any(unobserved):
observed_centers = triangle_centers[~unobserved]
observed_views = best_views[~unobserved]
for idx in np.where(unobserved)[0]:
# 找到最近的已观测面
distances = np.linalg.norm(observed_centers - triangle_centers[idx], axis=1)
nearest = np.argmin(distances)
best_views[idx] = observed_views[nearest]
return best_views
class TextureBlender:
"""
纹理融合器
实现多图融合与无缝处理
"""
def __init__(self, mesh: o3d.geometry.TriangleMesh,
texture_size: int = 4096):
self.mesh = mesh
self.texture_size = texture_size
self.texture_image = np.zeros((texture_size, texture_size, 3), dtype=np.float32)
self.weight_map = np.zeros((texture_size, texture_size), dtype=np.float32)
def rasterize_face(self, face_idx: int, view: ViewInfo,
uv_coords: np.ndarray, blend_mode: str = 'linear'):
"""
光栅化单个面到纹理图
Args:
face_idx: 面索引
view: 视图信息
uv_coords: 面的UV坐标 (3, 2)
blend_mode: 混合模式 ('linear', 'multiband')
"""
# 获取3D顶点
tri = np.asarray(self.mesh.triangles)[face_idx]
vertices_3d = np.asarray(self.mesh.vertices)[tri]
# 投影到图像
R, t = view.pose[:3, :3], view.pose[:3, 3]
K = view.K
# 转换到相机坐标并投影
verts_cam = (R @ vertices_3d.T + t.reshape(3, 1)).T
verts_2d = (K @ verts_cam.T).T
verts_2d = verts_2d[:, :2] / verts_2d[:, 2:3]
# UV包围盒
uv_pixels = uv_coords * self.texture_size
min_u = int(np.min(uv_pixels[:, 0]))
max_u = int(np.max(uv_pixels[:, 0])) + 1
min_v = int(np.min(uv_pixels[:, 1]))
max_v = int(np.max(uv_pixels[:, 1])) + 1
# 裁剪
min_u = max(0, min_u)
max_u = min(self.texture_size, max_u)
min_v = max(0, min_v)
max_v = min(self.texture_size, max_v)
# 光栅化
for v in range(min_v, max_v):
for u in range(min_u, max_u):
p = np.array([u, v])
# 重心坐标测试
if not self._point_in_triangle(p, uv_pixels):
continue
bary = self._barycentric_coords(p, uv_pixels)
# 计算3D位置
point_3d = bary[0] * vertices_3d[0] + \
bary[1] * vertices_3d[1] + \
bary[2] * vertices_3d[2]
# 投影到图像获取颜色
point_cam = R @ point_3d + t
img_u = int(K[0, 0] * point_cam[0] / point_cam[2] + K[0, 2])
img_v = int(K[1, 1] * point_cam[1] / point_cam[2] + K[1, 2])
h, w = view.image.shape[:2]
if not (0 <= img_u < w and 0 <= img_v < h):
continue
color = view.image[img_v, img_u].astype(np.float32)
# 计算权重(基于视角角度)
normal = np.asarray(self.mesh.triangle_normals)[face_idx]
view_dir = -point_cam / np.linalg.norm(point_cam)
normal_cam = R @ normal
weight = max(0, view_dir.dot(normal_cam))
# 混合
if blend_mode == 'linear':
self.texture_image[v, u] += color * weight
self.weight_map[v, u] += weight
elif blend_mode == 'feather':
# 羽化边缘
bary_min = np.min(bary)
feather_weight = weight * min(1.0, bary_min * 10)
self.texture_image[v, u] += color * feather_weight
self.weight_map[v, u] += feather_weight
def _point_in_triangle(self, p: np.ndarray, tri: np.ndarray) -> bool:
"""检查点是否在三角形内"""
def area(p1, p2, p3):
return abs((p1[0]*(p2[1]-p3[1]) + p2[0]*(p3[1]-p1[1]) + p3[0]*(p1[1]-p2[1])) / 2.0)
A = area(tri[0], tri[1], tri[2])
A1 = area(p, tri[1], tri[2])
A2 = area(tri[0], p, tri[2])
A3 = area(tri[0], tri[1], p)
return abs(A - (A1 + A2 + A3)) < 1e-6
def _barycentric_coords(self, p: np.ndarray, tri: np.ndarray) -> np.ndarray:
"""计算重心坐标"""
v0 = tri[1] - tri[0]
v1 = tri[2] - tri[0]
v2 = p - tri[0]
d00 = np.dot(v0, v0)
d01 = np.dot(v0, v1)
d11 = np.dot(v1, v1)
d20 = np.dot(v2, v0)
d21 = np.dot(v2, v1)
denom = d00 * d11 - d01 * d01
if abs(denom) < 1e-10:
return np.array([1/3, 1/3, 1/3])
v = (d11 * d20 - d01 * d21) / denom
w = (d00 * d21 - d01 * d20) / denom
u = 1 - v - w
return np.array([u, v, w])
def normalize_texture(self):
"""归一化纹理图(除以权重)"""
mask = self.weight_map > 0
self.texture_image[mask] /= self.weight_map[mask, np.newaxis]
self.texture_image = np.clip(self.texture_image, 0, 255).astype(np.uint8)
def multiband_blend(self, views: List[ViewInfo], best_views: np.ndarray,
num_bands: int = 6) -> np.ndarray:
"""
多频段融合
Args:
views: 视图列表
best_views: 每个面的最佳视图
num_bands: 频段数量
Returns:
融合后的纹理图
"""
print("执行多频段融合...")
# 为每个频段创建权重图和颜色图
band_textures = []
for band in range(num_bands):
sigma = 2 ** band
print(f" 处理频段 {band+1}/{num_bands}, sigma={sigma}")
# 创建该频段的纹理
band_texture = np.zeros((self.texture_size, self.texture_size, 3))
band_weight = np.zeros((self.texture_size, self.texture_size))
# 收集贡献
for face_idx, view_idx in enumerate(best_views):
if view_idx < 0:
continue
view = views[view_idx]
# 获取面的UV
tri = np.asarray(self.mesh.triangles)[face_idx]
uvs = np.asarray(self.mesh.triangle_uvs).reshape(-1, 3, 2)[face_idx]
# 临时光栅化
temp_tex = np.zeros((self.texture_size, self.texture_size, 3))
temp_weight = np.zeros((self.texture_size, self.texture_size))
# 简化的光栅化(实际应使用更高效的实现)
self._rasterize_face_simple(face_idx, view, uvs, temp_tex, temp_weight)
# 高斯模糊(当前频段)
if sigma > 0:
temp_tex = gaussian_filter(temp_tex, sigma=sigma)
temp_weight = gaussian_filter(temp_weight, sigma=sigma)
# 如果是第一个频段,保留全部;否则减去低频
if band > 0:
prev_sigma = 2 ** (band - 1)
low_freq = gaussian_filter(temp_tex, sigma=prev_sigma)
temp_tex = temp_tex - low_freq
band_texture += temp_tex
band_weight += temp_weight
# 归一化
mask = band_weight > 0
band_texture[mask] /= band_weight[mask, np.newaxis]
band_textures.append(band_texture)
# 合并所有频段
final_texture = np.sum(band_textures, axis=0)
final_texture = np.clip(final_texture, 0, 255).astype(np.uint8)
return final_texture
def _rasterize_face_simple(self, face_idx: int, view: ViewInfo,
uv_coords: np.ndarray,
texture: np.ndarray, weight: np.ndarray):
"""简化的面光栅化"""
# 类似rasterize_face的实现,但写入外部数组
pass # 实现细节省略,与rasterize_face类似
def poisson_blend(self, views: List[ViewInfo], best_views: np.ndarray) -> np.ndarray:
"""
泊松融合(无缝融合)
在梯度域进行融合,保持细节同时消除接缝
"""
print("执行泊松融合...")
# 构建目标梯度场
target_gradient = np.zeros((self.texture_size, self.texture_size, 3, 2))
guidance_field = np.zeros((self.texture_size, self.texture_size, 3, 2))
# 收集所有贡献的梯度
for face_idx, view_idx in enumerate(best_views):
if view_idx < 0:
continue
view = views[view_idx]
# 计算该视图在该面的梯度贡献
# 实际实现需要计算图像梯度并投影到UV空间
pass # 复杂实现,需构建稀疏线性系统
# 求解泊松方程
# 简化:使用OpenCV的seamlessClone作为替代
result = self.texture_image.copy()
return result.astype(np.uint8)
class TextureMapper:
"""
纹理映射主类
整合UV展开、视角选择和融合
"""
def __init__(self, mesh: o3d.geometry.TriangleMesh):
self.mesh = mesh
self.unwrapper = UVUnwrapper(mesh)
self.view_selector = ViewSelector(mesh)
def process(self, views: List[ViewInfo],
blend_mode: str = 'multiband',
texture_size: int = 4096) -> o3d.geometry.TriangleMesh:
"""
完整纹理映射流程
Args:
views: 视图列表
blend_mode: 融合模式 ('linear', 'multiband', 'poisson')
texture_size: 纹理尺寸
Returns:
带纹理的网格
"""
# 1. UV展开
print("步骤1: UV展开...")
self.unwrapper.unwrap_xatlas(resolution=texture_size)
# 2. 最佳视角选择
print("步骤2: 最佳视角选择...")
best_views = self.view_selector.select_best_views(views)
# 3. 纹理融合
print("步骤3: 纹理融合...")
blender = TextureBlender(self.mesh, texture_size)
if blend_mode == 'multiband':
texture = blender.multiband_blend(views, best_views)
elif blend_mode == 'poisson':
texture = blender.poisson_blend(views, best_views)
else: # linear
for face_idx, view_idx in enumerate(best_views):
if view_idx < 0:
continue
view = views[view_idx]
tri = np.asarray(self.mesh.triangles)[face_idx]
uvs = np.asarray(self.mesh.triangle_uvs).reshape(-1, 3, 2)[face_idx]
blender.rasterize_face(face_idx, view, uvs, 'linear')
blender.normalize_texture()
texture = blender.texture_image
# 4. 应用纹理
self.mesh.textures = [o3d.geometry.Image(texture)]
# 保存最佳视图信息(用于调试)
self.best_views = best_views
return self.mesh
def save_textured_mesh(self, filepath: str):
"""保存带纹理的网格"""
# 确保目录存在
os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else '.',
exist_ok=True)
# 保存OBJ格式(包含纹理坐标和材质引用)
success = o3d.io.write_triangle_mesh(filepath, self.mesh,
write_triangle_uvs=True,
print_progress=True)
if success:
print(f"已保存: {filepath}")
# 同时保存纹理图像
texture_path = filepath.replace('.obj', '_texture.png').replace('.ply', '_texture.png')
if self.mesh.textures:
o3d.io.write_image(texture_path, self.mesh.textures[0])
print(f"纹理已保存: {texture_path}")
else:
print("保存失败")
def main():
parser = argparse.ArgumentParser(description='纹理映射工具')
parser.add_argument('--input', '-i', type=str, required=True, help='输入网格')
parser.add_argument('--output', '-o', type=str, required=True, help='输出网格')
parser.add_argument('--mode', type=str, default='multi',
choices=['unwrap', 'single', 'multi', 'seamless'],
help='处理模式')
parser.add_argument('--image', type=str, help='单张纹理图(single模式)')
parser.add_argument('--images', type=str, help='图像目录(multi/seamless模式)')
parser.add_argument('--poses', type=str, help='相机位姿文件 (.npy)')
parser.add_argument('--intrinsics', type=str, help='相机内参文件 (.npy)')
parser.add_argument('--texture_size', type=int, default=4096, help='纹理尺寸')
parser.add_argument('--blend', type=str, default='multiband',
choices=['linear', 'multiband', 'poisson'],
help='融合算法')
args = parser.parse_args()
# 加载网格
mesh = o3d.io.read_triangle_mesh(args.input)
if args.mode == 'unwrap':
# 仅UV展开
unwrapper = UVUnwrapper(mesh)
unwrapper.unwrap_xatlas(resolution=args.texture_size)
# 保存UV布局可视化
uv_vis = unwrapper.visualize_uv_layout()
cv2.imwrite(args.output.replace('.obj', '_uv_layout.png'), uv_vis)
o3d.io.write_triangle_mesh(args.output, mesh)
elif args.mode == 'single':
# 单图纹理映射
image = cv2.imread(args.image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# 需要UV坐标
if not mesh.has_triangle_uvs():
unwrapper = UVUnwrapper(mesh)
unwrapper.unwrap_xatlas()
# 简单投影(假设正交或已知相机参数)
mesh.textures = [o3d.geometry.Image(image)]
o3d.io.write_triangle_mesh(args.output, mesh)
elif args.mode in ['multi', 'seamless']:
# 多图融合
# 加载图像
image_files = sorted([f for f in os.listdir(args.images)
if f.endswith(('.jpg', '.png'))])
images = [cv2.imread(os.path.join(args.images, f)) for f in image_files]
images = [cv2.cvtColor(img, cv2.COLOR_BGR2RGB) for img in images]
# 加载位姿和内参
poses = np.load(args.poses) # (N, 4, 4)
intrinsics = np.load(args.intrinsics) # (N, 3, 3) 或 (3, 3)
if intrinsics.ndim == 2:
intrinsics = np.repeat(intrinsics[np.newaxis], len(images), axis=0)
# 构建视图列表
views = []
for i, (img, pose, K) in enumerate(zip(images, poses, intrinsics)):
views.append(ViewInfo(image=img, pose=pose, K=K, image_id=i))
# 执行纹理映射
mapper = TextureMapper(mesh)
textured_mesh = mapper.process(views,
blend_mode=args.blend,
texture_size=args.texture_size)
# 保存
mapper.save_textured_mesh(args.output)
if __name__ == '__main__':
main()
6.3 Multi-Sensor Fusion
6.3.1 Camera-LiDAR Joint Calibration (Hand-Eye Calibration)
Joint calibration of a camera and a LiDAR underpins multi-sensor perception: its goal is to establish the rigid transform between the two sensor coordinate frames. Hand-eye calibration originates in robotics, where it solves the "eye-in-hand" and "eye-to-hand" problems. A rigidly co-mounted sensor pair corresponds to the eye-to-hand configuration; collecting multiple data pairs yields a least-squares solution for the rotation and translation.
Calibration methods fall into feature-based and motion-based families. Feature-based methods use a calibration target (checkerboard or circle grid): corners are detected in the image, planes or edges in the point cloud, and 3D-3D or 3D-2D correspondences are established. Motion-based methods (such as camera-IMU calibration in Kalibr) record synchronized motion trajectories and solve the hand-eye equation AX = XB for the extrinsics, where A and B are the relative motions of the camera and the LiDAR respectively; a ready-made OpenCV solver is sketched below.
In practice one must also handle time synchronization (hardware triggering or software alignment), reflectivity differences (the LiDAR's response to high- vs. low-reflectance surfaces) and field-of-view mismatch (the camera FOV is usually narrower than the LiDAR's). Calibration accuracy is quantified via reprojection error (pixel deviation of the point cloud projected into the image) and depth consistency (correlation between image-derived and LiDAR depth).
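As a minimal sketch of the AX = XB step, OpenCV's built-in cv2.calibrateHandEye can be repurposed for a camera-LiDAR rig by passing the per-frame target poses estimated on the LiDAR side through the gripper-pose argument slots. This helper is illustrative only; the frame conventions of the inputs and of the returned transform should be verified against the calibrateHandEye documentation for your setup.
Python
import cv2
import numpy as np

def lidar_camera_handeye(R_target2cam, t_target2cam, R_target2lidar, t_target2lidar):
    """Solve AX = XB with OpenCV's hand-eye solver (Tsai-Lenz method here).
    Inputs are lists of 3x3 rotations and 3-vectors, one pair per calibration frame."""
    R, t = cv2.calibrateHandEye(
        R_gripper2base=R_target2lidar, t_gripper2base=t_target2lidar,
        R_target2cam=R_target2cam, t_target2cam=t_target2cam,
        method=cv2.CALIB_HAND_EYE_TSAI)
    T = np.eye(4)
    T[:3, :3], T[:3, 3] = R, t.ravel()
    return T  # homogeneous LiDAR-to-camera estimate (check the frame convention)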
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: camera_lidar_calibration.py
Content: 相机-激光雷达联合标定(手眼标定与基于特征的标定)
Usage:
1. 采集数据: python camera_lidar_calibration.py --mode capture --lidar_topic /velodyne_points --image_topic /camera/image --output ./calib_data
2. 特征检测: python camera_lidar_calibration.py --mode detect --input ./calib_data --pattern chessboard
3. 执行标定: python camera_lidar_calibration.py --mode calibrate --input ./calib_data --camera camera.yml --output calib_result.yml
4. 验证标定: python camera_lidar_calibration.py --mode verify --calib calib_result.yml --pointcloud scan.pcd --image img.png
Features:
- 支持棋盘格和圆形标定板
- 自动点云平面分割与边缘提取
- 手眼标定(AX=XB求解)
- 基于重投影误差的优化
- 实时验证与可视化
"""
import cv2
import numpy as np
import open3d as o3d
import argparse
import os
import yaml
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
from scipy.optimize import least_squares
from scipy.spatial.transform import Rotation
@dataclass
class CalibrationData:
"""标定数据对"""
image: np.ndarray # 彩色图像
pointcloud: o3d.geometry.PointCloud # 激光雷达点云
timestamp: float # 时间戳
@dataclass
class CalibrationResult:
"""标定结果"""
R: np.ndarray # 旋转矩阵 (3, 3) 雷达->相机
t: np.ndarray # 平移向量 (3,) 雷达->相机
reprojection_error: float
inlier_ratio: float
class CameraLiDARCalibrator:
"""
相机-激光雷达联合标定器
实现基于特征和手眼标定方法
"""
def __init__(self, camera_matrix: np.ndarray,
dist_coeffs: np.ndarray,
pattern_size: Tuple[int, int] = (9, 6),
square_size: float = 0.08):
"""
初始化标定器
Args:
camera_matrix: 相机内参 (3, 3)
dist_coeffs: 畸变系数
pattern_size: 标定板角点数 (w, h)
square_size: 方格尺寸(米)
"""
self.camera_matrix = camera_matrix
self.dist_coeffs = dist_coeffs
self.pattern_size = pattern_size
self.square_size = square_size
# 准备标定板3D模型
self.board_3d = np.zeros((pattern_size[0] * pattern_size[1], 3), np.float32)
self.board_3d[:, :2] = np.mgrid[0:pattern_size[0], 0:pattern_size[1]].T.reshape(-1, 2)
self.board_3d *= square_size
# 存储数据
self.calibration_data: List[CalibrationData] = []
self.detected_corners: List[np.ndarray] = [] # 图像角点
self.detected_planes: List[np.ndarray] = [] # 点云平面点
def detect_chessboard_in_image(self, image: np.ndarray) -> Optional[np.ndarray]:
"""
在图像中检测棋盘格角点
Args:
image: 输入图像
Returns:
角点像素坐标 (N, 2) 或 None
"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
ret, corners = cv2.findChessboardCorners(
gray, self.pattern_size,
cv2.CALIB_CB_ADAPTIVE_THRESH +
cv2.CALIB_CB_NORMALIZE_IMAGE +
cv2.CALIB_CB_FAST_CHECK
)
if ret:
# 亚像素优化
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
corners = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria)
return corners.reshape(-1, 2)
return None
def detect_board_in_pointcloud(self, pointcloud: o3d.geometry.PointCloud,
distance_threshold: float = 0.02) -> Optional[Tuple[np.ndarray, np.ndarray]]:
"""
在点云中检测标定板平面
Args:
pointcloud: 输入点云
distance_threshold: 平面拟合距离阈值
Returns:
(平面内点索引, 平面参数 (nx, ny, nz, d)) 或 None
"""
# RANSAC平面分割
plane_model, inliers = pointcloud.segment_plane(
distance_threshold=distance_threshold,
ransac_n=3,
num_iterations=1000
)
if len(inliers) < 100: # 平面点太少
return None
# 提取平面点云
plane_cloud = pointcloud.select_by_index(inliers)
points = np.asarray(plane_cloud.points)
# 检查平面尺寸(标定板应有一定大小)
bbox = plane_cloud.get_axis_aligned_bounding_box()
extent = bbox.get_extent()
min_board_size = self.pattern_size[0] * self.square_size * 0.8
max_board_size = self.pattern_size[0] * self.square_size * 1.5
if not (min_board_size < extent[0] < max_board_size and
min_board_size < extent[1] < max_board_size):
return None
# 检查平面法向(应大致朝向传感器)
normal = np.array(plane_model[:3])
centroid = np.mean(points, axis=0)
# 法向应指向原点(传感器)
if np.dot(normal, centroid) > 0:
normal = -normal
plane_model[:3] = normal
return np.array(inliers), plane_model
def extract_board_corners_from_pointcloud(self,
pointcloud: o3d.geometry.PointCloud,
plane_model: np.ndarray,
image_corners: np.ndarray) -> Optional[np.ndarray]:
"""
从点云平面提取对应于图像角点的3D位置
策略:利用图像角点的相对位置关系,在点云平面上找到对应模式
"""
# 提取平面点
plane_inliers, _ = pointcloud.segment_plane(
distance_threshold=0.02, ransac_n=3, num_iterations=1000
)
plane_points = np.asarray(pointcloud.points)[plane_inliers]
# 将平面点投影到2D坐标系(PCA)
centroid = np.mean(plane_points, axis=0)
centered = plane_points - centroid
# 计算主成分
_, _, Vt = np.linalg.svd(centered)
basis_x = Vt[0] # 平面内第一主方向
basis_y = Vt[1] # 平面内第二主方向
# 投影到2D
coords_2d = np.column_stack([
centered @ basis_x,
centered @ basis_y
])
# 在图像中,角点是有序的网格
# 在点云中,我们需要找到类似的网格模式
# 使用图像角点的相对位置作为模板
template = self.board_3d[:, :2] # (N, 2)
# 使用ICP或模板匹配找到最佳对齐
# 简化:使用密度峰值检测找到角点候选
# 实际实现应使用更鲁棒的角点检测算法
# 这里使用几何中心+尺度估计作为近似
# 计算边界框
min_xy = coords_2d.min(axis=0)
max_xy = coords_2d.max(axis=0)
# 根据图像角点比例放置3D角点
image_size = np.array([self.pattern_size[0] - 1, self.pattern_size[1] - 1])
corners_3d = []
for corner_2d in self.board_3d[:, :2]:
# 归一化坐标
norm_coord = corner_2d / (image_size * self.square_size)
# 映射到点云2D坐标
pc_coord_2d = min_xy + norm_coord * (max_xy - min_xy)
# 反投影到3D
pc_coord_3d = centroid + pc_coord_2d[0] * basis_x + pc_coord_2d[1] * basis_y
# 投影回平面(确保在平面上)
distance_to_plane = (np.dot(pc_coord_3d, plane_model[:3]) + plane_model[3])
pc_coord_3d = pc_coord_3d - distance_to_plane * plane_model[:3]
corners_3d.append(pc_coord_3d)
return np.array(corners_3d)
def add_calibration_frame(self, image: np.ndarray,
pointcloud: o3d.geometry.PointCloud) -> bool:
"""
添加一帧标定数据
Returns:
是否成功检测到标定板
"""
# 检测图像角点
image_corners = self.detect_chessboard_in_image(image)
if image_corners is None:
print("未能在图像中检测到棋盘格")
return False
# 检测点云平面
result = self.detect_board_in_pointcloud(pointcloud)
if result is None:
print("未能在点云中检测到标定板平面")
return False
plane_inliers, plane_model = result
# 提取点云角点(近似)
pointcloud_corners = self.extract_board_corners_from_pointcloud(
pointcloud, plane_model, image_corners
)
if pointcloud_corners is None:
return False
# 存储数据
self.calibration_data.append(CalibrationData(
image=image,
pointcloud=pointcloud,
timestamp=0.0
))
self.detected_corners.append(image_corners)
self.detected_planes.append(pointcloud_corners)
print(f"成功添加标定帧 #{len(self.calibration_data)}")
return True
def calibrate(self) -> CalibrationResult:
"""
执行标定(基于3D-2D对应)
使用PnP求解相机相对于标定板的位姿,
然后建立雷达-相机变换
"""
if len(self.calibration_data) < 3:
raise ValueError("至少需要3组标定数据")
# 收集所有3D-2D对应
all_obj_points = []
all_img_points = []
for corners_3d, corners_2d in zip(self.detected_planes, self.detected_corners):
all_obj_points.append(corners_3d)
all_img_points.append(corners_2d)
# Approach 1: solve each view's pose separately, then run hand-eye calibration.
# Note: corners_3d are already expressed in the LiDAR frame, so the PnP below
# already yields a direct LiDAR->camera estimate; the AX=XB step mainly
# demonstrates the hand-eye formulation on top of it.
camera_poses = [] # board-to-camera transforms
lidar_poses = [] # board-to-LiDAR transforms (simplified)
for obj_pts, img_pts in zip(all_obj_points, all_img_points):
# PnP求解相机位姿
success, rvec, tvec = cv2.solvePnP(
obj_pts.astype(np.float32),
img_pts.astype(np.float32),
self.camera_matrix,
self.dist_coeffs
)
if success:
R_cam, _ = cv2.Rodrigues(rvec)
# 标定板到相机的变换
T_cam = np.eye(4)
T_cam[:3, :3] = R_cam
T_cam[:3, 3] = tvec.flatten()
camera_poses.append(T_cam)
# 标定板到雷达的变换(假设雷达坐标系中,标定板在z=0平面)
# 简化:假设点云角点已在雷达坐标系中
T_lidar = np.eye(4)
T_lidar[:3, 3] = -obj_pts[0] # 平移到第一个角点
lidar_poses.append(T_lidar)
# 手眼标定:求解雷达到相机的变换 X
# 对于每个视角:A * X = X * B
# 其中 A = camera_pose[i]^-1 * camera_pose[j]
# B = lidar_pose[i]^-1 * lidar_pose[j]
A_list = []
B_list = []
for i in range(len(camera_poses)):
for j in range(i+1, len(camera_poses)):
A = np.linalg.inv(camera_poses[i]) @ camera_poses[j]
B = np.linalg.inv(lidar_poses[i]) @ lidar_poses[j]
A_list.append(A)
B_list.append(B)
# 求解AX=XB(Tsai-Lenz算法)
R_lidar_to_cam, t_lidar_to_cam = self._solve_hand_eye(A_list, B_list)
# 优化:使用所有数据联合优化
result = self._optimize_extrinsics(
R_lidar_to_cam, t_lidar_to_cam,
all_obj_points, all_img_points
)
return result
def _solve_hand_eye(self, A_list: List[np.ndarray],
B_list: List[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
"""
求解手眼标定方程 AX = XB
使用Tsai-Lenz算法
"""
# 提取旋转和平移
n = len(A_list)
# 构建方程组求解旋转
S = np.zeros((3*n, 3))
v = np.zeros(3*n)
for i, (A, B) in enumerate(zip(A_list, B_list)):
# 旋转的轴角表示
Ra = Rotation.from_matrix(A[:3, :3]).as_rotvec()
Rb = Rotation.from_matrix(B[:3, :3]).as_rotvec()
# 构建方程
S[3*i:3*i+3] = self._skew_symmetric(Ra + Rb)
v[3*i:3*i+3] = Rb - Ra
# 求解
try:
# 使用最小二乘
theta = np.linalg.lstsq(S, v, rcond=None)[0]
theta_norm = np.linalg.norm(theta)
if theta_norm > 1e-6:
R = Rotation.from_rotvec(theta * np.pi / theta_norm).as_matrix()
else:
R = np.eye(3)
except:
R = np.eye(3)
# 求解平移
C = np.zeros((3*n, 3))
d = np.zeros(3*n)
for i, (A, B) in enumerate(zip(A_list, B_list)):
C[3*i:3*i+3] = A[:3, :3] - np.eye(3)
d[3*i:3*i+3] = R @ B[:3, 3] - A[:3, 3]
try:
t = np.linalg.lstsq(C, d, rcond=None)[0]
except np.linalg.LinAlgError:
t = np.zeros(3)
return R, t
def _skew_symmetric(self, v: np.ndarray) -> np.ndarray:
"""向量转反对称矩阵"""
return np.array([[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]])
def _optimize_extrinsics(self, R_init: np.ndarray, t_init: np.ndarray,
obj_points_list: List[np.ndarray],
img_points_list: List[np.ndarray]) -> CalibrationResult:
"""
使用非线性优化精修外参
最小化重投影误差
"""
# 参数化:使用轴角表示旋转
rvec_init, _ = cv2.Rodrigues(R_init)
params_init = np.concatenate([rvec_init.flatten(), t_init])
def residual_func(params):
rvec = params[:3]
t = params[3:6]
R, _ = cv2.Rodrigues(rvec)
residuals = []
for obj_pts, img_pts in zip(obj_points_list, img_points_list):
# 将雷达坐标系中的点变换到相机坐标系
obj_pts_cam = (R @ obj_pts.T + t.reshape(3, 1)).T
# 投影到图像
proj_pts, _ = cv2.projectPoints(
obj_pts_cam,
np.zeros(3), np.zeros(3),
self.camera_matrix, self.dist_coeffs
)
proj_pts = proj_pts.reshape(-1, 2)
# 残差
residuals.extend((proj_pts - img_pts).ravel())
return np.array(residuals)
# 最小二乘优化
result = least_squares(residual_func, params_init, method='lm')
# 提取优化结果
rvec_opt = result.x[:3]
t_opt = result.x[3:6]
R_opt, _ = cv2.Rodrigues(rvec_opt)
# 计算最终误差
residuals = residual_func(result.x)
rmse = np.sqrt(np.mean(residuals**2))
# 计算内点率
inlier_threshold = 3.0 # 像素
inliers = np.abs(residuals) < inlier_threshold
inlier_ratio = np.sum(inliers) / len(residuals)
return CalibrationResult(
R=R_opt,
t=t_opt,
reprojection_error=rmse,
inlier_ratio=inlier_ratio
)
def project_pointcloud_to_image(self, pointcloud: o3d.geometry.PointCloud,
result: CalibrationResult,
image_shape: Tuple[int, int]) -> np.ndarray:
"""
将点云投影到图像平面(用于验证)
Args:
pointcloud: 输入点云
result: 标定结果
image_shape: 图像尺寸 (H, W)
Returns:
投影图像
"""
h, w = image_shape[:2]
image = np.zeros((h, w, 3), dtype=np.uint8)
points = np.asarray(pointcloud.points)
colors = np.asarray(pointcloud.colors) if pointcloud.has_colors() else \
np.ones((len(points), 3))  # keep values in [0, 1]; the drawing code scales by 255
# 变换到相机坐标系
points_cam = (result.R @ points.T + result.t.reshape(3, 1)).T
# 仅保留相机前方的点
valid = points_cam[:, 2] > 0
points_cam = points_cam[valid]
colors = colors[valid]
# 投影到图像
proj = (self.camera_matrix @ points_cam.T).T
proj = proj[:, :2] / proj[:, 2:3]
# 绘制
for (u, v), color in zip(proj, colors):
u, v = int(u), int(v)
if 0 <= u < w and 0 <= v < h:
cv2.circle(image, (u, v), 2,
(int(color[2]*255), int(color[1]*255), int(color[0]*255)), -1)
return image
def save_calibration(self, filepath: str, result: CalibrationResult):
"""保存标定结果"""
data = {
'R': result.R.tolist(),
't': result.t.tolist(),
'reprojection_error': float(result.reprojection_error),
'inlier_ratio': float(result.inlier_ratio),
'camera_matrix': self.camera_matrix.tolist(),
'dist_coeffs': self.dist_coeffs.tolist()
}
with open(filepath, 'w') as f:
yaml.dump(data, f)
print(f"标定结果已保存: {filepath}")
def load_calibration(self, filepath: str) -> CalibrationResult:
"""加载标定结果"""
with open(filepath, 'r') as f:
data = yaml.safe_load(f)
return CalibrationResult(
R=np.array(data['R']),
t=np.array(data['t']),
reprojection_error=data['reprojection_error'],
inlier_ratio=data['inlier_ratio']
)
def main():
parser = argparse.ArgumentParser(description='相机-激光雷达联合标定')
parser.add_argument('--mode', type=str, required=True,
choices=['capture', 'detect', 'calibrate', 'verify'],
help='运行模式')
parser.add_argument('--input', '-i', type=str, help='输入目录')
parser.add_argument('--output', '-o', type=str, help='输出文件')
parser.add_argument('--camera', type=str, help='相机参数文件')
parser.add_argument('--calib', type=str, help='标定结果文件(验证模式)')
parser.add_argument('--pattern_size', type=str, default='9,6', help='棋盘格尺寸')
parser.add_argument('--square_size', type=float, default=0.08, help='方格尺寸(米)')
args = parser.parse_args()
if args.mode == 'capture':
# 数据采集模式(需要ROS或实际传感器驱动)
print("数据采集模式需要连接实际传感器")
print("请使用ROS bag录制或自定义驱动程序")
elif args.mode == 'detect':
# 特征检测模式
print("检测标定板特征...")
# 加载图像和点云,检测特征并保存
elif args.mode == 'calibrate':
# 执行标定
# 加载相机参数
fs = cv2.FileStorage(args.camera, cv2.FILE_STORAGE_READ)
camera_matrix = fs.getNode('camera_matrix').mat()
dist_coeffs = fs.getNode('dist_coeffs').mat()
fs.release()
pattern_size = tuple(map(int, args.pattern_size.split(',')))
calibrator = CameraLiDARCalibrator(camera_matrix, dist_coeffs,
pattern_size, args.square_size)
# 加载数据并执行标定
# ... 加载图像和点云文件 ...
result = calibrator.calibrate()
print(f"\n标定结果:")
print(f" 旋转矩阵:\n{result.R}")
print(f" 平移向量: {result.t}")
print(f" 重投影误差: {result.reprojection_error:.3f} 像素")
print(f" 内点率: {result.inlier_ratio:.2%}")
calibrator.save_calibration(args.output, result)
elif args.mode == 'verify':
# 验证标定结果
# 加载标定结果
fs = cv2.FileStorage(args.camera, cv2.FILE_STORAGE_READ)
camera_matrix = fs.getNode('camera_matrix').mat()
dist_coeffs = fs.getNode('dist_coeffs').mat()
fs.release()
calibrator = CameraLiDARCalibrator(camera_matrix, dist_coeffs)
result = calibrator.load_calibration(args.calib)
# 加载点云和图像进行投影验证
pcd = o3d.io.read_point_cloud(args.input)
image = cv2.imread(args.output)  # in verify mode the image path is passed via --output
projected = calibrator.project_pointcloud_to_image(
pcd, result, image.shape
)
# 融合显示
overlay = cv2.addWeighted(image, 0.5, projected, 0.5, 0)
cv2.imshow('Calibration Verification', overlay)
cv2.waitKey(0)
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
6.3.2 Temporal-Spatial Synchronization: Hardware Triggering and Software Alignment
Time synchronization across sensors is a prerequisite for data fusion. Cameras and LiDARs typically run at different rates (for instance 30 Hz for the camera and 10 Hz for the LiDAR), with transmission latency and capture jitter on top. Hardware triggering forces the sensors to capture at the same instant via a shared physical signal (such as PWM or GPIO) and can reach microsecond-level accuracy. Software alignment instead relies on timestamps, matching frames across sensors via nearest-neighbor search or interpolation.
Timestamp sources include system time (software), PTP (Precision Time Protocol, network-synchronized) and GPS. Applications such as SLAM must additionally account for in-sensor processing delays (for example the LiDAR's scan accumulation time). Alignment strategies comprise hard synchronization (hardware trigger), soft synchronization (timestamp matching) and asynchronous fusion (compensation inside the state estimator). Combined with ROS or a custom driver, OpenCV pipelines can implement timestamp-based message filtering and synchronization; a minimal pairing sketch follows.
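The core of soft synchronization is timestamp pairing. The sketch below matches each LiDAR scan to the nearest camera frame with np.searchsorted and keeps only pairs within a tolerance; the timestamps are synthetic placeholders standing in for real driver output.
Python
import numpy as np

def pair_by_timestamp(cam_ts, lidar_ts, tolerance=0.05):
    """Return (lidar_idx, cam_idx) pairs whose time gap is within tolerance (s)."""
    cam_ts = np.asarray(cam_ts)  # must be sorted ascending
    pairs = []
    for i, t in enumerate(lidar_ts):
        j = np.searchsorted(cam_ts, t)  # insertion point among camera timestamps
        candidates = [k for k in (j - 1, j) if 0 <= k < len(cam_ts)]
        k = min(candidates, key=lambda k: abs(cam_ts[k] - t))
        if abs(cam_ts[k] - t) <= tolerance:
            pairs.append((i, k))
    return pairs

cam_ts = np.arange(0.0, 2.0, 1 / 30.0)            # 30 Hz camera
lidar_ts = np.arange(0.0, 2.0, 1 / 10.0) + 0.012  # 10 Hz LiDAR with 12 ms latency
print(pair_by_timestamp(cam_ts, lidar_ts, tolerance=0.02))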
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: temporal_spatial_sync.py
Content: 多传感器时空同步系统:硬件触发接口与软件对齐
Usage:
1. 软件同步: python temporal_spatial_sync.py --mode software --camera_topic /cam --lidar_topic /lidar --tolerance 0.05
2. 硬件触发配置: python temporal_spatial_sync.py --mode hardware --device /dev/ttyUSB0 --freq 10
3. 时间戳校准: python temporal_spatial_sync.py --mode calibrate --bag input.bag --output offsets.txt
4. 回放同步数据: python temporal_spatial_sync.py --mode playback --sync_bag synced.bag
Features:
- 基于时间戳的最近邻匹配
- 线性插值对齐(IMU高频数据)
- 硬件触发信号生成(Arduino/树莓派GPIO)
- PTP/gPTP时间同步支持
- 延迟估计与补偿
"""
import numpy as np
import cv2
import argparse
import time
from typing import List, Dict, Optional, Tuple, Callable, Any
from dataclasses import dataclass, field
from collections import deque
import threading
from queue import Queue, PriorityQueue
import serial
import struct
@dataclass(order=True)
class TimestampedData:
"""带时间戳的数据"""
timestamp: float
sensor_type: str = field(compare=False)
data: Any = field(compare=False)
sequence: int = field(default=0, compare=False)
class SoftwareSynchronizer:
"""
软件同步器
基于时间戳的数据对齐与插值
"""
def __init__(self, tolerance: float = 0.05):
"""
初始化同步器
Args:
tolerance: 同步容差(秒)
"""
self.tolerance = tolerance
self.buffers: Dict[str, deque] = {}
self.callbacks: List[Callable] = []
self.running = False
def register_sensor(self, sensor_id: str, buffer_size: int = 100):
"""注册传感器数据流"""
self.buffers[sensor_id] = deque(maxlen=buffer_size)
def add_data(self, sensor_id: str, timestamp: float, data: any):
"""
添加传感器数据
Args:
sensor_id: 传感器标识
timestamp: 时间戳(秒,Unix时间或相对时间)
data: 传感器数据
"""
if sensor_id not in self.buffers:
self.register_sensor(sensor_id)
item = TimestampedData(timestamp, sensor_id, data)
self.buffers[sensor_id].append(item)
# 尝试同步
self._try_synchronize()
def _try_synchronize(self):
"""尝试找到同步的数据帧"""
if len(self.buffers) < 2:
return
# 获取所有传感器的最新时间戳
latest_timestamps = {}
for sid, buf in self.buffers.items():
if len(buf) > 0:
latest_timestamps[sid] = buf[-1].timestamp
if len(latest_timestamps) < 2:
return
# 找到时间戳最接近的组
reference_time = np.mean(list(latest_timestamps.values()))
synced_data = {}
for sid, buf in self.buffers.items():
# 找到最接近参考时间的数据
closest = min(buf, key=lambda x: abs(x.timestamp - reference_time))
if abs(closest.timestamp - reference_time) < self.tolerance:
synced_data[sid] = closest
# 如果所有传感器都有有效数据,触发回调
if len(synced_data) == len(self.buffers):
self._trigger_callbacks(synced_data)
# 移除已使用的数据
for sid, item in synced_data.items():
while self.buffers[sid] and self.buffers[sid][0].timestamp <= item.timestamp:
self.buffers[sid].popleft()
def _trigger_callbacks(self, synced_data: Dict[str, TimestampedData]):
"""触发同步回调"""
for callback in self.callbacks:
try:
callback(synced_data)
except Exception as e:
print(f"回调错误: {e}")
def register_callback(self, callback: Callable):
"""注册同步数据回调函数"""
self.callbacks.append(callback)
def interpolate_imu(self, target_time: float,
imu_buffer: deque) -> Optional[TimestampedData]:
"""
对IMU数据进行插值(线性插值)
Args:
target_time: 目标时间戳
imu_buffer: IMU数据缓冲区
Returns:
插值后的数据
"""
# 找到相邻的两个数据点
times = np.array([item.timestamp for item in imu_buffer])
if len(times) < 2:
return None
# 找到包围target_time的索引
idx = np.searchsorted(times, target_time)
if idx == 0 or idx >= len(times):
return None
# 线性插值
t0, t1 = times[idx-1], times[idx]
alpha = (target_time - t0) / (t1 - t0)
data0 = imu_buffer[idx-1].data
data1 = imu_buffer[idx].data
# 假设数据为numpy数组
interp_data = data0 + alpha * (data1 - data0)
return TimestampedData(target_time, 'imu', interp_data)
class HardwareTrigger:
"""
硬件触发器
生成同步触发信号
"""
def __init__(self, device: str = None, frequency: float = 10.0):
"""
初始化硬件触发器
Args:
device: 串口设备路径(如'/dev/ttyUSB0'或'COM3')
frequency: 触发频率(Hz)
"""
self.device = device
self.frequency = frequency
self.period = 1.0 / frequency
self.serial = None
self.running = False
self.trigger_thread = None
if device:
try:
self.serial = serial.Serial(device, 115200, timeout=1)
print(f"已连接硬件触发设备: {device}")
except Exception as e:
print(f"无法连接硬件设备: {e}")
def start(self):
"""Start generating trigger signals"""
# Configure GPIO (Raspberry Pi) before the trigger thread starts using it
try:
import RPi.GPIO as GPIO
self.use_gpio = True
self.trigger_pin = 18
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.trigger_pin, GPIO.OUT)
print("Using GPIO hardware trigger")
except ImportError:
self.use_gpio = False
self.running = True
self.trigger_thread = threading.Thread(target=self._trigger_loop)
self.trigger_thread.start()
def stop(self):
"""停止触发"""
self.running = False
if self.trigger_thread:
self.trigger_thread.join()
if self.serial:
self.serial.close()
def _trigger_loop(self):
"""触发循环"""
next_trigger = time.time()
while self.running:
current_time = time.time()
if current_time >= next_trigger:
self._send_trigger()
next_trigger += self.period
# 精确睡眠
sleep_time = next_trigger - time.time()
if sleep_time > 0:
time.sleep(max(0, sleep_time - 0.001))
def _send_trigger(self):
"""发送触发信号"""
timestamp = time.time()
# 串口触发
if self.serial and self.serial.is_open:
# 发送时间戳(8字节double)
self.serial.write(struct.pack('d', timestamp))
self.serial.write(b'\n')
# GPIO触发(树莓派)
if hasattr(self, 'use_gpio') and self.use_gpio:
import RPi.GPIO as GPIO
GPIO.output(self.trigger_pin, GPIO.HIGH)
time.sleep(0.0001) # 100us脉冲
GPIO.output(self.trigger_pin, GPIO.LOW)
# 软件回调
self.on_trigger(timestamp)
def on_trigger(self, timestamp: float):
"""触发回调(可重写)"""
pass
class TimeCalibration:
"""
时间校准器
估计传感器间的时间偏移
"""
def __init__(self):
self.offsets = {}
def estimate_offset(self, sensor_a_data: List[Tuple[float, float]],
sensor_b_data: List[Tuple[float, float]]) -> float:
"""
估计两个传感器间的时间偏移(使用互相关)
Args:
sensor_a_data: [(timestamp, value), ...]
sensor_b_data: [(timestamp, value), ...]
Returns:
时间偏移(秒),正值表示B比A晚
"""
# 提取时间序列
times_a, values_a = zip(*sensor_a_data)
times_b, values_b = zip(*sensor_b_data)
# 重采样到统一时间轴
common_start = max(min(times_a), min(times_b))
common_end = min(max(times_a), max(times_b))
if common_start >= common_end:
return 0.0
# 插值
from scipy.interpolate import interp1d
interp_a = interp1d(times_a, values_a, kind='linear',
bounds_error=False, fill_value='extrapolate')
interp_b = interp1d(times_b, values_b, kind='linear',
bounds_error=False, fill_value='extrapolate')
# 统一采样
sample_times = np.linspace(common_start, common_end, 1000)
signal_a = interp_a(sample_times)
signal_b = interp_b(sample_times)
# 计算互相关
from scipy.signal import correlate
correlation = correlate(signal_a - signal_a.mean(),
signal_b - signal_b.mean(),
mode='full')
lags = np.arange(-len(signal_b) + 1, len(signal_a))
# 找到最大相关
max_idx = np.argmax(correlation)
lag = lags[max_idx]
# 转换为时间偏移
dt = sample_times[1] - sample_times[0]
offset = lag * dt
return offset
def calibrate_from_motion(self,
trajectory_a: List[Tuple[float, np.ndarray]],
trajectory_b: List[Tuple[float, np.ndarray]]) -> float:
"""
从运动轨迹估计时间偏移(适用于SLAM轨迹对齐)
Args:
trajectory_a: [(timestamp, pose), ...] pose为4x4矩阵或6D向量
trajectory_b: [(timestamp, pose), ...]
Returns:
时间偏移
"""
# 简化:使用位置范数作为信号
signal_a = [(t, np.linalg.norm(p[:3, 3])) for t, p in trajectory_a]
signal_b = [(t, np.linalg.norm(p[:3, 3])) for t, p in trajectory_b]
return self.estimate_offset(signal_a, signal_b)
class SpatialAligner:
"""
空间对齐器
基于运动的外参标定辅助
"""
def __init__(self):
self.trajectories = {}
def add_pose(self, sensor_id: str, timestamp: float, pose: np.ndarray):
"""
添加位姿数据
Args:
sensor_id: 传感器标识
timestamp: 时间戳
pose: 4x4变换矩阵
"""
if sensor_id not in self.trajectories:
self.trajectories[sensor_id] = []
self.trajectories[sensor_id].append((timestamp, pose))
def align_trajectories(self, ref_sensor: str, target_sensor: str) -> Tuple[np.ndarray, np.ndarray]:
"""
对齐两条轨迹(Umeyama算法)
Args:
ref_sensor: 参考传感器
target_sensor: 目标传感器
Returns:
(R, t) 目标到参考的变换
"""
ref_traj = np.array([p for t, p in self.trajectories[ref_sensor]])
tgt_traj = np.array([p for t, p in self.trajectories[target_sensor]])
# 提取平移部分
ref_pts = ref_traj[:, :3, 3]
tgt_pts = tgt_traj[:, :3, 3]
# Umeyama算法
return self._umeyama_alignment(tgt_pts.T, ref_pts.T)
def _umeyama_alignment(self, X: np.ndarray, Y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Umeyama算法:最小二乘刚性对齐
Args:
X: (3, N) 源点集
Y: (3, N) 目标点集
Returns:
(R, t) 使得 Y ≈ R @ X + t
"""
# 计算质心
mx = np.mean(X, axis=1, keepdims=True)
my = np.mean(Y, axis=1, keepdims=True)
# 去质心
Xc = X - mx
Yc = Y - my
# 计算协方差
S = Yc @ Xc.T / X.shape[1]
# SVD decomposition
U, _, Vt = np.linalg.svd(S)
# Reflection correction so that det(R) = +1
S_corr = np.eye(3)
S_corr[2, 2] = np.linalg.det(U) * np.linalg.det(Vt)
R = U @ S_corr @ Vt
# 计算平移
t = my - R @ mx
return R, t.ravel()
def main():
parser = argparse.ArgumentParser(description='时空同步工具')
parser.add_argument('--mode', type=str, required=True,
choices=['software', 'hardware', 'calibrate', 'playback'],
help='运行模式')
parser.add_argument('--tolerance', type=float, default=0.05,
help='同步容差(秒)')
parser.add_argument('--device', type=str, help='硬件设备路径')
parser.add_argument('--frequency', type=float, default=10.0,
help='触发频率(Hz)')
parser.add_argument('--bag', type=str, help='ROS bag文件')
parser.add_argument('--output', '-o', type=str, help='输出文件')
args = parser.parse_args()
if args.mode == 'software':
# 软件同步示例
sync = SoftwareSynchronizer(tolerance=args.tolerance)
# 注册传感器
sync.register_sensor('camera')
sync.register_sensor('lidar')
sync.register_sensor('imu')
# 注册回调
def on_sync(synced_data):
print(f"同步数据 @ {list(synced_data.values())[0].timestamp:.3f}s")
for sid, data in synced_data.items():
print(f" {sid}: seq={data.sequence}")
sync.register_callback(on_sync)
# 模拟数据输入
print("模拟数据同步...")
base_time = time.time()
for i in range(100):
# 相机 30Hz
if i % 3 == 0:
sync.add_data('camera', base_time + i*0.033, f'frame_{i//3}')
# 雷达 10Hz
if i % 10 == 0:
sync.add_data('lidar', base_time + i*0.1 + 0.01, f'scan_{i//10}') # 10ms延迟
# IMU 100Hz
sync.add_data('imu', base_time + i*0.01, np.random.randn(6))
time.sleep(0.01)
elif args.mode == 'hardware':
# 硬件触发
trigger = HardwareTrigger(device=args.device, frequency=args.frequency)
def on_trigger(ts):
print(f"触发 @ {ts:.6f}")
trigger.on_trigger = on_trigger
print(f"启动硬件触发,频率 {args.frequency}Hz")
trigger.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
trigger.stop()
elif args.mode == 'calibrate':
# 时间校准
print("时间校准模式需要ROS bag文件")
print("分析bag文件中的传感器数据,估计时间偏移...")
# 这里应使用rosbag库读取数据
# 简化示例
elif args.mode == 'playback':
# 回放同步数据
print("回放模式")
if __name__ == '__main__':
main()
6.3.3 Multi-Camera Surround View: BEV (Bird's Eye View) Generation
A surround-view system covers the full 360 degrees around a vehicle with several wide-angle cameras; after undistortion, perspective transformation and image stitching it produces a top-down bird's eye view (BEV). The BEV representation projects the 3D world onto the ground plane, removing perspective distortion and simplifying obstacle detection, parking-slot recognition and path planning.
The key steps are camera intrinsic calibration (undistortion), extrinsic calibration (camera to vehicle frame), ground-plane estimation and projective mapping. Undistortion corrects the wide-angle lens with a fisheye model such as Kannala-Brandt (KB); a calibration sketch follows this paragraph. The perspective transform maps each camera image onto the BEV plane via a homography, which requires accurate estimates of camera height, pitch and the ground normal. Stitching blends the overlapping regions with multi-band fusion to remove brightness differences and seams.
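Intrinsic calibration of the wide-angle cameras can be done with OpenCV's fisheye module, which implements the Kannala-Brandt model. The sketch below assumes checkerboard images in a placeholder directory; the board geometry and paths are illustrative.
Python
import cv2
import numpy as np
import glob

pattern = (9, 6)  # inner corners of the checkerboard (assumption)
objp = np.zeros((1, pattern[0] * pattern[1], 3), np.float32)
objp[0, :, :2] = np.mgrid[0:pattern[0], 0:pattern[1]].T.reshape(-1, 2) * 0.025

objpoints, imgpoints, img_size = [], [], None
for path in glob.glob('./fisheye_calib/*.png'):  # placeholder directory
    gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    img_size = gray.shape[::-1]
    ok, corners = cv2.findChessboardCorners(gray, pattern)
    if ok:
        objpoints.append(objp)
        imgpoints.append(corners.reshape(1, -1, 2))

K, D = np.zeros((3, 3)), np.zeros((4, 1))
flags = cv2.fisheye.CALIB_RECOMPUTE_EXTRINSIC | cv2.fisheye.CALIB_FIX_SKEW
rms, K, D, rvecs, tvecs = cv2.fisheye.calibrate(
    objpoints, imgpoints, img_size, K, D, flags=flags)
print(f"RMS reprojection error: {rms:.3f} px")
print("K =", K, "D =", D.ravel(), sep="\n")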
Advanced BEV systems additionally perform depth estimation (lifting image pixels into 3D) and temporal fusion (accumulating frames to extend coverage). OpenCV's fisheye module and warpPerspective support the basic transforms (see the IPM sketch below), whereas end-to-end deep learning approaches (such as Lift-Splat-Shoot and BEVFormer) generate BEV representations directly from multi-view features without explicit geometric transforms.
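The geometric heart of BEV generation is inverse perspective mapping (IPM): four ground-plane correspondences determine the homography that warps an undistorted camera frame onto a metric top-down canvas. The pixel coordinates below are placeholders that would normally come from the extrinsic calibration.
Python
import cv2
import numpy as np

# Ground points in the image (pixels) and their targets on the BEV canvas
# (roughly 2 cm per pixel on an 800 x 1000 canvas; values are placeholders)
src = np.float32([[480, 520], [800, 520], [1100, 700], [180, 700]])
dst = np.float32([[200, 0], [600, 0], [600, 1000], [200, 1000]])

H = cv2.getPerspectiveTransform(src, dst)  # exact homography from 4 point pairs
frame = cv2.imread('front_camera.png')     # undistorted front-camera frame (placeholder)
bev = cv2.warpPerspective(frame, H, (800, 1000), flags=cv2.INTER_LINEAR)
cv2.imwrite('bev_front.png', bev)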
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: bev_generation.py
Content: 多相机环视系统与BEV(鸟瞰图)生成
Usage:
1. 标定相机: python bev_generation.py --mode calibrate --config camera_config.yml --images ./calib_imgs
2. 生成BEV: python bev_generation.py --mode generate --config camera_config.yml --input 4ch_video.mp4 --output bev_view.mp4
3. 实时BEV: python bev_generation.py --mode realtime --config camera_config.yml --sources 0,1,2,3
4. 3D BEV: python bev_generation.py --mode 3d --config camera_config.yml --depth_model midas --output 3d_bev
Features:
- 鱼眼相机去畸变(Kannala-Brandt模型)
- 自动外参标定(基于地平面)
- 多相机图像拼接与融合
- 深度学习深度估计(伪LiDAR)
- 时序BEV融合(扩展视野)
"""
import cv2
import numpy as np
import argparse
import yaml
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
import os
@dataclass
class CameraConfig:
"""相机配置"""
camera_id: str
width: int
height: int
fisheye: bool
K: np.ndarray # 内参
D: np.ndarray # 畸变系数
R_vehicle: np.ndarray # rotation: vehicle frame -> camera frame (as used in projection below)
t_vehicle: np.ndarray # translation: vehicle frame -> camera frame
ground_height: float = 0.0 # 相机离地面高度
# BEV参数
bev_width: int = 800
bev_height: int = 1200
bev_resolution: float = 0.02 # 米/像素
bev_range_x: Tuple[float, float] = (-10, 10) # 左右范围(米)
bev_range_z: Tuple[float, float] = (0, 20) # 前后范围(米)
class FisheyeUndistorter:
"""
鱼眼相机去畸变器
支持Kannala-Brandt和等距投影模型
"""
def __init__(self, config: CameraConfig):
self.config = config
if config.fisheye:
# 初始化鱼眼映射
self.map1, self.map2 = self._init_fisheye_maps()
else:
self.map1, self.map2 = cv2.initUndistortRectifyMap(
config.K, config.D, None, config.K,
(config.width, config.height), cv2.CV_32FC1
)
def _init_fisheye_maps(self) -> Tuple[np.ndarray, np.ndarray]:
"""Initialize the fisheye undistortion maps (Kannala-Brandt model via cv2.fisheye)"""
# Shrink the focal length slightly so more of the wide-angle content survives
Knew = self.config.K.copy()
Knew[0, 0] *= 0.8
Knew[1, 1] *= 0.8
map1, map2 = cv2.fisheye.initUndistortRectifyMap(
self.config.K, self.config.D, np.eye(3), Knew,
(self.config.width, self.config.height), cv2.CV_32FC1
)
return map1, map2
def undistort(self, image: np.ndarray) -> np.ndarray:
"""去畸变图像"""
return cv2.remap(image, self.map1, self.map2,
interpolation=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT)
class BEVProjector:
"""
BEV投影器
将透视图像投影到鸟瞰平面
"""
def __init__(self, config: CameraConfig):
self.config = config
self.H = None # 单应性矩阵
self._compute_homography()
def _compute_homography(self):
"""
计算透视图像到BEV平面的单应性矩阵
假设地面为平面 z=0(车辆坐标系)
"""
# 车辆坐标系中的地面点(BEV平面)
# x: 左右, z: 前后, y=0: 地面
# BEV图像边界对应的地面范围
x_min, x_max = self.config.bev_range_x
z_min, z_max = self.config.bev_range_z
# BEV四个角对应的3D地面点(车辆坐标系)
ground_3d = np.array([
[x_max, 0, z_max], # 前右
[x_min, 0, z_max], # 前左
[x_min, 0, z_min], # 后左
[x_max, 0, z_min], # 后右
])
# 转换到相机坐标系
R = self.config.R_vehicle
t = self.config.t_vehicle
# 相机坐标系中的地面点
ground_cam = (R @ ground_3d.T + t.reshape(3, 1)).T
# 投影到图像平面
K = self.config.K
# 仅保留相机前方的点
valid = ground_cam[:, 2] > 0.1
if not np.all(valid):
print(f"警告: 相机 {self.config.camera_id} 部分BEV区域在后方")
ground_2d = (K @ ground_cam.T).T
ground_2d = ground_2d[:, :2] / ground_2d[:, 2:3]
# BEV图像坐标
bev_w = self.config.bev_width
bev_h = self.config.bev_height
bev_corners = np.array([
[bev_w, 0], # 前右 -> 右上
[0, 0], # 前左 -> 左上
[0, bev_h], # 后左 -> 左下
[bev_w, bev_h], # 后右 -> 右下
], dtype=np.float32)
# With exactly four correspondences the homography is fully determined,
# so compute it directly instead of running a RANSAC fit
self.H = cv2.getPerspectiveTransform(
ground_2d.astype(np.float32), bev_corners
)
def project(self, image: np.ndarray) -> np.ndarray:
"""
投影图像到BEV平面
Args:
image: 去畸变后的透视图像
Returns:
BEV图像
"""
bev = cv2.warpPerspective(
image, self.H,
(self.config.bev_width, self.config.bev_height),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
return bev
def create_blend_mask(self) -> np.ndarray:
"""
创建融合权重掩码(距离相机越远权重越小)
"""
# 创建距离场
y_coords, x_coords = np.mgrid[0:self.config.bev_height, 0:self.config.bev_width]
# 转换为车辆坐标
x_vehicle = self.config.bev_range_x[0] + x_coords * \
(self.config.bev_range_x[1] - self.config.bev_range_x[0]) / self.config.bev_width
z_vehicle = self.config.bev_range_z[1] - y_coords * \
(self.config.bev_range_z[1] - self.config.bev_range_z[0]) / self.config.bev_height
# 相机位置(车辆坐标系)
cam_pos = -self.config.R_vehicle.T @ self.config.t_vehicle
# 距离
distances = np.sqrt((x_vehicle - cam_pos[0])**2 + (z_vehicle - cam_pos[2])**2)
# Exponential distance falloff (~5 m length scale)
weight = np.exp(-distances / 5.0)
return weight
class SurroundViewSystem:
"""
环视系统
整合多相机BEV生成与拼接
"""
def __init__(self, config_file: str):
"""
初始化环视系统
Args:
config_file: 相机配置文件(YAML)
"""
self.cameras: Dict[str, CameraConfig] = {}
self.undistorters: Dict[str, FisheyeUndistorter] = {}
self.projectors: Dict[str, BEVProjector] = {}
self._load_config(config_file)
def _load_config(self, config_file: str):
"""加载相机配置"""
with open(config_file, 'r') as f:
data = yaml.safe_load(f)
for cam_data in data['cameras']:
config = CameraConfig(
camera_id=cam_data['id'],
width=cam_data['width'],
height=cam_data['height'],
fisheye=cam_data.get('fisheye', False),
K=np.array(cam_data['K']),
D=np.array(cam_data['D']),
R_vehicle=np.array(cam_data['R_vehicle']),
t_vehicle=np.array(cam_data['t_vehicle']),
ground_height=cam_data.get('ground_height', 1.5),
bev_width=data.get('bev_width', 800),
bev_height=data.get('bev_height', 1200),
bev_resolution=data.get('bev_resolution', 0.02)
)
self.cameras[config.camera_id] = config
self.undistorters[config.camera_id] = FisheyeUndistorter(config)
self.projectors[config.camera_id] = BEVProjector(config)
print(f"加载了 {len(self.cameras)} 个相机配置")
def process_frame(self, images: Dict[str, np.ndarray]) -> np.ndarray:
"""
处理一帧多相机图像
Args:
images: {camera_id: image} 字典
Returns:
拼接后的BEV图像
"""
bev_images = {}
masks = {}
# 生成各相机的BEV
for cam_id, image in images.items():
if cam_id not in self.cameras:
continue
# 去畸变
undistorted = self.undistorters[cam_id].undistort(image)
# 投影到BEV
bev = self.projectors[cam_id].project(undistorted)
bev_images[cam_id] = bev
# 生成权重掩码
masks[cam_id] = self.projectors[cam_id].create_blend_mask()
# 多频段融合拼接
return self._blend_bev_images(bev_images, masks)
def _blend_bev_images(self, bev_images: Dict[str, np.ndarray],
masks: Dict[str, np.ndarray]) -> np.ndarray:
"""
多频段融合多个BEV图像
"""
# 简单加权融合(可改进为多频段融合)
result = np.zeros_like(list(bev_images.values())[0], dtype=np.float32)
weight_sum = np.zeros((*result.shape[:2], 1), dtype=np.float32)
for cam_id, bev in bev_images.items():
mask = masks[cam_id][:, :, np.newaxis]
result += bev.astype(np.float32) * mask
weight_sum += mask
# 归一化
result = result / (weight_sum + 1e-6)
result = np.clip(result, 0, 255).astype(np.uint8)
return result
def add_vehicle_overlay(self, bev_image: np.ndarray,
vehicle_length: float = 4.5,
vehicle_width: float = 1.8) -> np.ndarray:
"""
在BEV上叠加车辆图标
Args:
bev_image: BEV图像
vehicle_length: 车长(米)
vehicle_width: 车宽(米)
Returns:
叠加后的图像
"""
result = bev_image.copy()
h, w = result.shape[:2]
# 计算车辆像素尺寸
res = list(self.cameras.values())[0].bev_resolution
px_length = int(vehicle_length / res)
px_width = int(vehicle_width / res)
# 车辆中心(图像底部中央)
center_x = w // 2
center_y = h - px_length // 2
# 绘制车辆矩形
x1 = center_x - px_width // 2
y1 = center_y - px_length // 2
x2 = center_x + px_width // 2
y2 = center_y + px_length // 2
cv2.rectangle(result, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.circle(result, (center_x, y2), 3, (0, 0, 255), -1) # 后轴中心
return result
class DepthBEVGenerator:
"""
基于深度估计的3D BEV生成器
将图像提升到3D空间后投影到BEV
"""
def __init__(self, depth_model_path: Optional[str] = None):
"""
初始化深度估计器
Args:
depth_model_path: MiDaS或类似模型路径
"""
self.use_depth = depth_model_path is not None
if self.use_depth:
# 加载MiDaS
self.depth_model = cv2.dnn.readNetFromONNX(depth_model_path)
# 或使用torch.hub加载
# self.midas = torch.hub.load("intel-isl/MiDaS", "MiDaS")
def estimate_depth(self, image: np.ndarray) -> np.ndarray:
"""
估计单目深度
Args:
image: 输入图像
Returns:
深度图(米)
"""
if not self.use_depth:
# 假设地面为平面,使用恒定深度
return np.ones(image.shape[:2]) * 10.0
        # MiDaS预处理(简化:blobFromImage先减均值再乘scalefactor,
        # 故均值需为0-255量纲;标准MiDaS还需按std归一化,此处省略)
        input_blob = cv2.dnn.blobFromImage(
            image, 1/255.0, (384, 384), (123.675, 116.28, 103.53), swapRB=True
        )
self.depth_model.setInput(input_blob)
depth = self.depth_model.forward()
        # 后处理:假设模型输出为(1, 1, H, W)形状的相对逆深度
        depth = cv2.resize(depth[0, 0], (image.shape[1], image.shape[0]))
        depth = 1.0 / (depth + 1e-6)  # 逆深度转相对深度
        # 尺度对齐:单目深度无绝对尺度,需外部信息标定;此处假设尺度因子为10
        depth = depth * 10.0
return depth
def generate_3d_bev(self, image: np.ndarray,
camera_config: CameraConfig,
depth: Optional[np.ndarray] = None) -> np.ndarray:
"""
生成3D BEV(包含高度信息)
Args:
image: 输入图像
camera_config: 相机配置
depth: 预计算深度图(可选)
Returns:
BEV图像(可包含高度通道)
"""
if depth is None:
depth = self.estimate_depth(image)
h, w = image.shape[:2]
# 创建像素网格
u = np.arange(w)
v = np.arange(h)
uu, vv = np.meshgrid(u, v)
# 反投影到相机坐标系
K_inv = np.linalg.inv(camera_config.K)
# 归一化坐标
pixels = np.stack([uu, vv, np.ones_like(uu)], axis=-1).reshape(-1, 3)
        rays = (K_inv @ pixels.T).T
        rays = rays / np.linalg.norm(rays, axis=1, keepdims=True)
        # 3D点(注:此处将深度视为射线长度range;
        # 若深度图为z深度,应改用 rays / rays[:, 2:3] * depth)
        depth_flat = depth.reshape(-1, 1)
        points_cam = rays * depth_flat
# 转换到车辆坐标系
R = camera_config.R_vehicle
t = camera_config.t_vehicle
points_vehicle = (R @ points_cam.T + t.reshape(3, 1)).T
# 筛选地面以上点
ground_mask = points_vehicle[:, 1] > -0.5 # y > -0.5m(车辆坐标系y向上)
points_vehicle = points_vehicle[ground_mask]
colors = image.reshape(-1, 3)[ground_mask]
# 投影到BEV(忽略高度,保留最高/最近点)
bev_h = camera_config.bev_height
bev_w = camera_config.bev_width
bev_image = np.zeros((bev_h, bev_w, 3), dtype=np.uint8)
bev_height = np.full((bev_h, bev_w), -np.inf)
x_range = camera_config.bev_range_x
z_range = camera_config.bev_range_z
for pt, color in zip(points_vehicle, colors):
x, y, z = pt
# 映射到BEV像素
bev_x = int((x - x_range[0]) / (x_range[1] - x_range[0]) * bev_w)
bev_y = int((z_range[1] - z) / (z_range[1] - z_range[0]) * bev_h)
if 0 <= bev_x < bev_w and 0 <= bev_y < bev_h:
# 保留最高点(或最近点)
if y > bev_height[bev_y, bev_x]:
bev_height[bev_y, bev_x] = y
bev_image[bev_y, bev_x] = color
return bev_image
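    def generate_3d_bev_fast(self, points_vehicle: np.ndarray,
                             colors: np.ndarray,
                             camera_config: CameraConfig) -> np.ndarray:
        """
        向量化BEV投影草图(补充示例,假设输入为已过滤的车辆系点云及对应颜色):
        按高度升序排序后用花式索引一次性写入,最高点最后写入从而覆盖低点,
        避免generate_3d_bev中的逐点Python循环
        """
        bev_h, bev_w = camera_config.bev_height, camera_config.bev_width
        x_range = camera_config.bev_range_x
        z_range = camera_config.bev_range_z
        bev_x = ((points_vehicle[:, 0] - x_range[0]) /
                 (x_range[1] - x_range[0]) * bev_w).astype(np.int32)
        bev_y = ((z_range[1] - points_vehicle[:, 2]) /
                 (z_range[1] - z_range[0]) * bev_h).astype(np.int32)
        valid = (bev_x >= 0) & (bev_x < bev_w) & (bev_y >= 0) & (bev_y < bev_h)
        bev_x, bev_y = bev_x[valid], bev_y[valid]
        heights = points_vehicle[valid, 1]
        cols = colors[valid]
        order = np.argsort(heights)  # 升序:最高点最后写入
        bev_image = np.zeros((bev_h, bev_w, 3), dtype=np.uint8)
        bev_image[bev_y[order], bev_x[order]] = cols[order]
        return bev_image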
def main():
parser = argparse.ArgumentParser(description='BEV生成工具')
parser.add_argument('--mode', type=str, required=True,
choices=['calibrate', 'generate', 'realtime', '3d'],
help='运行模式')
parser.add_argument('--config', '-c', type=str, required=True,
help='相机配置文件')
parser.add_argument('--input', '-i', type=str, help='输入视频或图像')
parser.add_argument('--output', '-o', type=str, help='输出路径')
parser.add_argument('--sources', type=str, help='相机源ID(逗号分隔)')
parser.add_argument('--depth_model', type=str, help='深度模型路径')
args = parser.parse_args()
if args.mode == 'calibrate':
# 相机标定模式
print("相机标定模式")
print("请提供标定板图像,执行内外参标定")
elif args.mode == 'generate':
# 离线生成BEV
print(f"从视频生成BEV: {args.input}")
system = SurroundViewSystem(args.config)
        # 打开视频(此处假设四路相机已按2x2拼接为单个视频文件)
cap = cv2.VideoCapture(args.input)
# 获取视频信息
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 假设为2x2布局
cam_width = width // 2
cam_height = height // 2
# 创建输出视频
bev_size = (800, 1200) # (width, height)
writer = cv2.VideoWriter(args.output,
cv2.VideoWriter_fourcc(*'mp4v'),
fps, bev_size)
while True:
ret, frame = cap.read()
if not ret:
break
# 分离4个相机图像
front = frame[:cam_height, :cam_width]
right = frame[:cam_height, cam_width:]
rear = frame[cam_height:, :cam_width]
left = frame[cam_height:, cam_width:]
images = {
'front': front,
'right': right,
'rear': rear,
'left': left
}
# 生成BEV
bev = system.process_frame(images)
bev = system.add_vehicle_overlay(bev)
# 调整尺寸
bev = cv2.resize(bev, bev_size)
writer.write(bev)
cv2.imshow('BEV', bev)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
writer.release()
cv2.destroyAllWindows()
print(f"BEV视频已保存: {args.output}")
elif args.mode == 'realtime':
# 实时BEV
print("实时BEV模式")
system = SurroundViewSystem(args.config)
# 打开相机
source_ids = [int(s) for s in args.sources.split(',')]
caps = [cv2.VideoCapture(sid) for sid in source_ids]
cam_ids = ['front', 'right', 'rear', 'left']
while True:
images = {}
for cam_id, cap in zip(cam_ids, caps):
ret, frame = cap.read()
if ret:
images[cam_id] = frame
if len(images) == 4:
bev = system.process_frame(images)
bev = system.add_vehicle_overlay(bev)
cv2.imshow('Surround View BEV', bev)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
for cap in caps:
cap.release()
cv2.destroyAllWindows()
elif args.mode == '3d':
# 3D BEV(带深度)
print("3D BEV生成模式")
system = SurroundViewSystem(args.config)
depth_generator = DepthBEVGenerator(args.depth_model)
# 处理流程与generate类似,但使用深度估计
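        # 补充草图(假设性示例,原文此处省略):逐帧估计深度并做3D BEV投影
        if args.input:
            cap = cv2.VideoCapture(args.input)
            cfg = list(system.cameras.values())[0]  # 示意:取第一路相机配置
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                depth = depth_generator.estimate_depth(frame)
                bev = depth_generator.generate_3d_bev(frame, cfg, depth)
                cv2.imshow('3D BEV', bev)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            cap.release()
            cv2.destroyAllWindows()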
if __name__ == '__main__':
main()
6.3.4 事件相机(Event Camera)数据处理入门
事件相机(如DAVIS、Prophesee)是一类仿生视觉传感器,以异步方式输出像素级亮度变化(事件),而非传统的整帧图像。每个事件包含时间戳(微秒级)、像素坐标和极性(变亮或变暗)。这种数据驱动机制提供微秒级时间分辨率(等效采样率可超过1MHz)和超高动态范围(>120dB),适用于高速运动场景和HDR环境。
事件数据处理的核心挑战在于其稀疏性和异步性。传统视觉算法需适配为事件流处理模式,或累积事件生成帧状表示(Event Frame)。常用表示包括:时间表面(Time Surface,记录每个像素最近事件的时间戳)、体素网格(Voxel Grid,将事件离散化为3D时空体素)和事件点云(将每个事件视为四维点t,x,y,p)。OpenCV结合Metavision SDK(Prophesee官方)或dv-processing库,可实现事件读取、可视化和基础处理。
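在进入完整脚本之前,先看一个最小的向量化时间表面示例(补充草图:假设事件已存入带t/x/y字段的NumPy结构化数组,函数名与字段命名均为示意),说明如何避免逐事件的Python循环:
Python
import numpy as np

def time_surface_vectorized(events: np.ndarray, width: int, height: int,
                            t_ref: float, tau: float = 0.05) -> np.ndarray:
    """events: 结构化数组,字段t(秒)、x、y;为简洁起见此处忽略极性"""
    last_t = np.full((height, width), -np.inf, dtype=np.float64)
    # 每个像素取最近(最大)事件时间戳
    np.maximum.at(last_t, (events['y'], events['x']), events['t'])
    ts = np.zeros((height, width), dtype=np.float32)
    mask = np.isfinite(last_t)
    ts[mask] = np.exp(-(t_ref - last_t[mask]) / tau)  # 指数衰减
    return ts

# 用法示意
ev = np.zeros(3, dtype=[('t', 'f8'), ('x', 'i4'), ('y', 'i4')])
ev['t'] = [0.01, 0.02, 0.03]
ev['x'] = [10, 11, 12]
ev['y'] = [5, 5, 5]
print(time_surface_vectorized(ev, 640, 480, t_ref=0.03).max())  # 最近事件处为1.0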
高级应用包括光流估计(基于事件的时间梯度)、深度估计(立体事件匹配)和SLAM(基于事件的位姿跟踪)。事件相机与帧相机的融合(Hybrid System)结合了两者的优势:事件提供高速响应,帧提供纹理细节。
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: event_camera_processing.py
Content: 事件相机数据处理:读取、可视化与基础算法
Usage:
1. 播放事件流: python event_camera_processing.py --input events.raw --mode play --display time_surface
2. 生成事件帧: python event_camera_processing.py --input events.raw --output frames/ --mode to_frames --fps 30
3. 计算光流: python event_camera_processing.py --input events.raw --mode optical_flow --output flow.mp4
4. 事件相机标定: python event_camera_processing.py --mode calibrate --pattern chessboard --input calibration.raw
Features:
- 支持Raw、DAT、HDF5格式事件数据
- 时间表面(Time Surface)生成
- 事件体素化(Voxel Grid)
- 基于事件的光流估计(Lucas-Kanade适配)
- 事件与帧融合可视化
"""
import numpy as np
import cv2
import argparse
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
from collections import deque
import os
@dataclass
class Event:
"""单个事件"""
timestamp: float # 秒
x: int
y: int
polarity: int # +1 (ON) 或 -1 (OFF)
class EventReader:
"""
事件数据读取器
支持多种事件相机格式
"""
def __init__(self, filepath: str):
self.filepath = filepath
self.format = self._detect_format()
self.events = []
self.width = 0
self.height = 0
self._load_events()
def _detect_format(self) -> str:
"""检测文件格式"""
ext = os.path.splitext(self.filepath)[1].lower()
if ext == '.raw':
return 'metavision_raw'
elif ext == '.dat':
return 'dat'
elif ext in ['.h5', '.hdf5']:
return 'hdf5'
else:
raise ValueError(f"不支持的文件格式: {ext}")
def _load_events(self):
"""加载事件数据"""
if self.format == 'metavision_raw':
self._load_metavision_raw()
elif self.format == 'dat':
self._load_dat()
elif self.format == 'hdf5':
self._load_hdf5()
print(f"加载了 {len(self.events)} 个事件")
print(f"时间范围: {self.events[0].timestamp:.3f}s - {self.events[-1].timestamp:.3f}s")
def _load_metavision_raw(self):
"""加载Metavision RAW格式(需要SDK)"""
try:
from metavision_core.event_io import EventsIterator
# 获取尺寸信息
mv_iterator = EventsIterator(input_path=self.filepath, delta_t=1000)
self.height, self.width = mv_iterator.get_size()
# 读取所有事件
for evs in mv_iterator:
for ev in evs:
self.events.append(Event(
timestamp=ev['t'] / 1e6, # 微秒转秒
x=int(ev['x']),
y=int(ev['y']),
polarity=1 if ev['p'] == 1 else -1
))
except ImportError:
print("警告: 未安装Metavision SDK,使用模拟数据")
self._generate_simulated_events()
def _load_dat(self):
"""加载DAT格式(AEDAT)"""
# DAT格式: t x y p (每行一个事件,时间戳通常为微秒)
data = np.loadtxt(self.filepath)
# 检测尺寸
self.width = int(data[:, 1].max()) + 1
self.height = int(data[:, 2].max()) + 1
for row in data:
self.events.append(Event(
timestamp=row[0] / 1e6,
x=int(row[1]),
y=int(row[2]),
polarity=1 if row[3] == 1 else -1
))
def _load_hdf5(self):
"""加载HDF5格式"""
import h5py
with h5py.File(self.filepath, 'r') as f:
events_data = f['events']
self.width = f.attrs.get('width', 640)
self.height = f.attrs.get('height', 480)
for i in range(len(events_data['t'])):
self.events.append(Event(
timestamp=events_data['t'][i] / 1e6,
x=int(events_data['x'][i]),
y=int(events_data['y'][i]),
polarity=1 if events_data['p'][i] == 1 else -1
))
def _generate_simulated_events(self):
"""生成模拟事件数据(用于测试)"""
self.width = 640
self.height = 480
# 模拟一个移动的边缘
for t in np.arange(0, 1.0, 0.001):
x = int(320 + 100 * np.sin(2 * np.pi * t))
y = 240
if 0 <= x < self.width:
self.events.append(Event(timestamp=t, x=x, y=y, polarity=1))
self.events.append(Event(timestamp=t, x=x, y=y+1, polarity=-1))
def get_slice(self, start_time: float, duration: float) -> List[Event]:
"""获取时间切片内的事件"""
end_time = start_time + duration
return [e for e in self.events if start_time <= e.timestamp < end_time]
class EventRepresentation:
"""
事件表示转换器
将事件流转换为各种表示形式
"""
def __init__(self, width: int, height: int):
self.width = width
self.height = height
def to_event_frame(self, events: List[Event],
accumulation_time: float = 0.033) -> np.ndarray:
"""
累积事件为帧图像
Args:
events: 事件列表
            accumulation_time: 累积时间(秒);当前实现假设events已按窗口切片,该参数仅作接口保留
Returns:
(H, W, 3) BGR图像
"""
frame = np.zeros((self.height, self.width, 3), dtype=np.uint8)
for ev in events:
if ev.polarity > 0:
frame[ev.y, ev.x] = [255, 0, 0] # 蓝色表示ON事件
else:
frame[ev.y, ev.x] = [0, 0, 255] # 红色表示OFF事件
return frame
def to_time_surface(self, events: List[Event],
decay_time: float = 0.05) -> np.ndarray:
"""
生成时间表面(Time Surface)
每个像素记录最近事件的时间,随时间指数衰减
Args:
events: 事件列表
decay_time: 衰减时间常数(秒)
Returns:
(H, W) 时间表面
"""
if not events:
return np.zeros((self.height, self.width))
# 最近事件时间戳
latest_time = events[-1].timestamp
# 时间表面
time_surface = np.zeros((self.height, self.width))
for ev in events:
# 指数衰减
delta_t = latest_time - ev.timestamp
value = np.exp(-delta_t / decay_time)
# ON事件为正,OFF为负
if ev.polarity > 0:
time_surface[ev.y, ev.x] = max(time_surface[ev.y, ev.x], value)
else:
time_surface[ev.y, ev.x] = min(time_surface[ev.y, ev.x], -value)
return time_surface
def to_voxel_grid(self, events: List[Event],
num_bins: int = 5) -> np.ndarray:
"""
将事件离散化为体素网格(3D时空体素)
Args:
events: 事件列表
num_bins: 时间维度分箱数
Returns:
(num_bins, H, W) 体素网格
"""
if not events:
return np.zeros((num_bins, self.height, self.width))
        t0 = events[0].timestamp
        t1 = events[-1].timestamp
        duration = max(t1 - t0, 1e-9)  # 防止单事件或相同时间戳导致除零
voxel_grid = np.zeros((num_bins, self.height, self.width))
for ev in events:
# 时间分箱
bin_idx = int((ev.timestamp - t0) / duration * (num_bins - 1))
bin_idx = np.clip(bin_idx, 0, num_bins - 1)
# 根据极性添加值
value = 1.0 if ev.polarity > 0 else -1.0
voxel_grid[bin_idx, ev.y, ev.x] += value
return voxel_grid
def to_event_cloud(self, events: List[Event]) -> np.ndarray:
"""
将事件转换为点云格式 (t, x, y, p)
Args:
events: 事件列表
Returns:
(N, 4) 事件点云
"""
cloud = np.array([[e.timestamp, e.x, e.y, e.polarity] for e in events])
return cloud
class EventOpticalFlow:
"""
基于事件的光流估计
适配Lucas-Kanade方法到事件数据
"""
def __init__(self, width: int, height: int):
self.width = width
self.height = height
# 时间表面(用于计算梯度)
self.time_surface_on = np.zeros((height, width))
self.time_surface_off = np.zeros((height, width))
# 光流场
self.flow = np.zeros((height, width, 2))
def update(self, events: List[Event]):
"""
更新光流估计
基于事件的光流约束:
∇I · (u, v) + ∂I/∂t = 0
其中I是时间表面,∂I/∂t由事件触发隐式表示
"""
# 更新时间表面
for ev in events:
if ev.polarity > 0:
self.time_surface_on[ev.y, ev.x] = ev.timestamp
else:
self.time_surface_off[ev.y, ev.x] = ev.timestamp
# 计算时间表面梯度
grad_x_on = cv2.Sobel(self.time_surface_on, cv2.CV_32F, 1, 0, ksize=3)
grad_y_on = cv2.Sobel(self.time_surface_on, cv2.CV_32F, 0, 1, ksize=3)
# 计算光流(简化实现)
# 实际应使用局部窗口最小化事件生成模型
self.flow[:, :, 0] = -grad_x_on / (np.abs(grad_x_on) + np.abs(grad_y_on) + 1e-6)
self.flow[:, :, 1] = -grad_y_on / (np.abs(grad_x_on) + np.abs(grad_y_on) + 1e-6)
def get_flow_visualization(self) -> np.ndarray:
"""获取光流可视化"""
return self._flow_to_color(self.flow)
def _flow_to_color(self, flow: np.ndarray) -> np.ndarray:
"""将光流转换为颜色图像"""
mag, ang = cv2.cartToPolar(flow[:, :, 0], flow[:, :, 1])
hsv = np.zeros((*flow.shape[:2], 3), dtype=np.uint8)
hsv[:, :, 0] = ang * 180 / np.pi / 2
hsv[:, :, 1] = 255
hsv[:, :, 2] = np.minimum(mag * 20, 255)
return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
class EventFrameFusion:
"""
事件与帧融合
结合事件相机和常规相机的数据
"""
def __init__(self, width: int, height: int):
self.width = width
self.height = height
self.event_rep = EventRepresentation(width, height)
def fuse(self, frame: np.ndarray, events: List[Event],
alpha: float = 0.5) -> np.ndarray:
"""
融合帧图像和事件数据
Args:
frame: 灰度或彩色帧
events: 事件列表
alpha: 融合权重
Returns:
融合后的图像
"""
# 生成事件帧
event_frame = self.event_rep.to_event_frame(events, accumulation_time=0.033)
# 调整尺寸
if frame.shape[:2] != (self.height, self.width):
frame = cv2.resize(frame, (self.width, self.height))
# 融合
if len(frame.shape) == 2:
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
fused = cv2.addWeighted(frame, 1 - alpha, event_frame, alpha, 0)
return fused
def deblur_frame(self, blurry_frame: np.ndarray,
events: List[Event]) -> np.ndarray:
"""
使用事件数据去模糊(基于事件的去模糊)
原理:事件记录运动期间的亮度变化,可用于估计模糊核
"""
# 简化实现:使用事件梯度增强边缘
time_surface = self.event_rep.to_time_surface(events, decay_time=0.05)
# 归一化到0-255
ts_vis = ((time_surface + 1) / 2 * 255).astype(np.uint8)
# 使用事件信息增强高频
if len(blurry_frame.shape) == 3:
gray = cv2.cvtColor(blurry_frame, cv2.COLOR_BGR2GRAY)
else:
gray = blurry_frame
# 高频增强
laplacian = cv2.Laplacian(gray, cv2.CV_32F)
enhanced = gray.astype(np.float32) + 0.5 * laplacian * np.abs(time_surface)
enhanced = np.clip(enhanced, 0, 255).astype(np.uint8)
if len(blurry_frame.shape) == 3:
enhanced = cv2.cvtColor(enhanced, cv2.COLOR_GRAY2BGR)
return enhanced
def main():
parser = argparse.ArgumentParser(description='事件相机处理工具')
parser.add_argument('--input', '-i', type=str, help='输入事件文件')
parser.add_argument('--output', '-o', type=str, help='输出路径')
parser.add_argument('--mode', type=str, required=True,
choices=['play', 'to_frames', 'optical_flow', 'calibrate', 'deblur'],
help='运行模式')
parser.add_argument('--display', type=str, default='event_frame',
choices=['event_frame', 'time_surface', 'voxel_grid'],
help='显示模式')
parser.add_argument('--fps', type=int, default=30, help='输出帧率')
parser.add_argument('--duration', type=float, default=0.033, help='累积时间(秒)')
args = parser.parse_args()
if args.mode == 'play':
# 播放事件流
reader = EventReader(args.input)
rep = EventRepresentation(reader.width, reader.height)
start_time = reader.events[0].timestamp
end_time = reader.events[-1].timestamp
current_time = start_time
while current_time < end_time:
# 获取时间窗口内的事件
events = reader.get_slice(current_time, args.duration)
# 生成表示
if args.display == 'event_frame':
display = rep.to_event_frame(events, args.duration)
elif args.display == 'time_surface':
ts = rep.to_time_surface(events, decay_time=args.duration)
# 归一化显示
display = ((ts + 1) / 2 * 255).astype(np.uint8)
display = cv2.applyColorMap(display, cv2.COLORMAP_JET)
elif args.display == 'voxel_grid':
voxel = rep.to_voxel_grid(events, num_bins=5)
                # 显示第一个时间切片(体素为带极性的计数累加,显示前需截断)
                display = np.clip((voxel[0] + 1) / 2 * 255, 0, 255).astype(np.uint8)
display = cv2.applyColorMap(display, cv2.COLORMAP_HOT)
cv2.imshow('Event Camera', display)
if cv2.waitKey(int(1000 / args.fps)) & 0xFF == ord('q'):
break
current_time += args.duration
cv2.destroyAllWindows()
elif args.mode == 'to_frames':
# 转换为帧序列
reader = EventReader(args.input)
rep = EventRepresentation(reader.width, reader.height)
os.makedirs(args.output, exist_ok=True)
start_time = reader.events[0].timestamp
end_time = reader.events[-1].timestamp
current_time = start_time
frame_idx = 0
while current_time < end_time:
events = reader.get_slice(current_time, args.duration)
frame = rep.to_event_frame(events, args.duration)
cv2.imwrite(os.path.join(args.output, f'frame_{frame_idx:06d}.png'), frame)
current_time += args.duration
frame_idx += 1
if frame_idx % 100 == 0:
print(f"已生成 {frame_idx} 帧")
print(f"完成,共生成 {frame_idx} 帧")
elif args.mode == 'optical_flow':
# 光流估计
reader = EventReader(args.input)
flow_estimator = EventOpticalFlow(reader.width, reader.height)
rep = EventRepresentation(reader.width, reader.height)
writer = cv2.VideoWriter(args.output,
cv2.VideoWriter_fourcc(*'mp4v'),
args.fps, (reader.width, reader.height))
        start_time = reader.events[0].timestamp
        end_time = reader.events[-1].timestamp
        current_time = start_time
while current_time < end_time:
events = reader.get_slice(current_time, args.duration)
flow_estimator.update(events)
flow_vis = flow_estimator.get_flow_visualization()
# 叠加事件
event_frame = rep.to_event_frame(events, args.duration)
combined = cv2.addWeighted(flow_vis, 0.7, event_frame, 0.3, 0)
writer.write(combined)
cv2.imshow('Event Optical Flow', combined)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
current_time += args.duration
writer.release()
cv2.destroyAllWindows()
elif args.mode == 'deblur':
# 事件辅助去模糊(需要同时提供模糊帧和事件)
print("去模糊模式需要同时提供帧图像和事件数据")
if __name__ == '__main__':
main()
6.4 神经渲染与SLAM
6.4.1 NeRF数据预处理:相机位姿估计与图像筛选
神经辐射场(NeRF)通过多层感知机(MLP)隐式表示场景的密度和颜色,实现照片级真实感渲染。训练NeRF需要精确的相机位姿和高质量的输入图像。数据预处理阶段的核心任务是:从原始图像中估计相机位姿(通常使用COLMAP),筛选信息冗余度低的图像(避免过采样),以及归一化场景坐标系。
COLMAP的稀疏重建输出可直接用于NeRF,但需转换为NeRF坐标系(通常将场景中心置于原点,并缩放到单位球内)。图像筛选基于视角差异(相机中心距离和朝向角度),移除视差过小的图像对。此外,需检测并剔除动态物体(使用语义分割或光流一致性),因为NeRF假设静态场景。
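下面用几行代码演示这一坐标系转换的核心(补充草图,函数名为示意;后文脚本中的convert_to_nerf_format采用同一思路):COLMAP输出世界到相机(w2c)的R、t,NeRF需要相机到世界(c2w),且需翻转y/z轴以从OpenCV约定转到OpenGL约定:
Python
import numpy as np

def colmap_to_nerf_c2w(R: np.ndarray, t: np.ndarray) -> np.ndarray:
    """COLMAP的w2c(R, t) -> NeRF的c2w(4x4),并翻转y/z轴"""
    w2c = np.eye(4)
    w2c[:3, :3] = R
    w2c[:3, 3] = t
    c2w = np.linalg.inv(w2c)
    c2w[:3, 1:3] *= -1  # 翻转旋转部分的y、z列(OpenCV -> OpenGL)
    return c2w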
数据增强策略包括:颜色抖动(增强光照鲁棒性)、随机射线采样(降低内存占用)和多尺度训练(加速收敛)。OpenCV在预处理中负责图像去畸变、Resize和格式转换,而位姿估计依赖COLMAP或自定义SfM实现。
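例如,预处理中的去畸变可以这样完成(补充草图:K、dist与图像路径均为示意值,实际应取自COLMAP标定结果):
Python
import cv2
import numpy as np

# 示意内参与畸变系数(实际取自COLMAP的OPENCV相机模型)
K = np.array([[1000.0, 0, 640.0], [0, 1000.0, 360.0], [0, 0, 1.0]])
dist = np.array([-0.1, 0.01, 0.0, 0.0, 0.0])  # k1, k2, p1, p2, k3

img = cv2.imread('raw_images/0001.jpg')  # 假设的输入路径
h, w = img.shape[:2]
# alpha=0: 收紧视场,保证去畸变后无无效像素
new_K, roi = cv2.getOptimalNewCameraMatrix(K, dist, (w, h), 0)
undistorted = cv2.undistort(img, K, dist, None, new_K)
x, y, rw, rh = roi
undistorted = undistorted[y:y + rh, x:x + rw]  # 按有效ROI裁剪
cv2.imwrite('images/0001.jpg', undistorted)
# 注意:NeRF假设无畸变针孔模型,transforms.json中的焦距应使用new_K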
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: nerf_preprocessing.py
Content: NeRF数据预处理:位姿估计、图像筛选与格式转换
Usage:
1. 完整预处理: python nerf_preprocessing.py --images ./raw_images --output ./nerf_data --run_colmap
2. 仅筛选图像: python nerf_preprocessing.py --colmap_dir ./sparse --images ./imgs --output ./selected --mode select
3. 生成transforms.json: python nerf_preprocessing.py --colmap_dir ./sparse --output transforms.json --mode convert
4. 检测动态物体: python nerf_preprocessing.py --images ./imgs --poses poses.npy --mode detect_dynamic
Features:
- 自动化COLMAP调用与结果解析
- 基于视差的图像筛选(避免冗余)
- 动态物体检测与掩码生成
- LLFF格式与NeRF格式互转
- 相机参数归一化与坐标系对齐
"""
import cv2
import numpy as np
import json
import argparse
import os
import shutil
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from pathlib import Path
import subprocess
from scipy.spatial.transform import Rotation
@dataclass
class CameraPose:
"""相机位姿"""
image_name: str
R: np.ndarray # 旋转矩阵 (3, 3)
t: np.ndarray # 平移向量 (3,)
K: np.ndarray # 内参 (3, 3)
height: int
width: int
class ColmapParser:
"""
COLMAP结果解析器
读取sparse重建结果(bin或txt格式)
"""
def __init__(self, sparse_dir: str):
self.sparse_dir = sparse_dir
self.cameras = {}
self.images = {}
self.points3D = {}
self._read_cameras()
self._read_images()
self._read_points3D()
def _read_cameras(self):
"""读取相机参数"""
cameras_file = os.path.join(self.sparse_dir, 'cameras.txt')
if not os.path.exists(cameras_file):
# 尝试bin格式
self._read_cameras_bin()
return
with open(cameras_file, 'r') as f:
for line in f:
if line.startswith('#') or not line.strip():
continue
parts = line.strip().split()
cam_id = int(parts[0])
model = parts[1]
width = int(parts[2])
height = int(parts[3])
# PINHOLE模型: fx, fy, cx, cy
params = list(map(float, parts[4:]))
K = np.array([[params[0], 0, params[2]],
[0, params[1], params[3]],
[0, 0, 1]])
self.cameras[cam_id] = {
'model': model,
'width': width,
'height': height,
'K': K
}
def _read_cameras_bin(self):
"""读取二进制相机文件"""
import struct
cameras_file = os.path.join(self.sparse_dir, 'cameras.bin')
with open(cameras_file, 'rb') as f:
num_cameras = struct.unpack('Q', f.read(8))[0]
for _ in range(num_cameras):
cam_id = struct.unpack('I', f.read(4))[0]
model_id = struct.unpack('I', f.read(4))[0]
width = struct.unpack('Q', f.read(8))[0]
height = struct.unpack('Q', f.read(8))[0]
                # 按model_id确定参数个数,保证字节流对齐
                # (COLMAP约定: 0=SIMPLE_PINHOLE:3, 1=PINHOLE:4, 2=SIMPLE_RADIAL:4,
                #  3=RADIAL:5, 4=OPENCV:8;其余模型此处未覆盖)
                model_num_params = {0: 3, 1: 4, 2: 4, 3: 5, 4: 8}
                num_params = model_num_params.get(model_id, 4)
                params = struct.unpack('d' * num_params, f.read(8 * num_params))
                if model_id in (0, 2, 3):  # 参数顺序为 f, cx, cy, (畸变...)
                    fx = fy = params[0]
                    cx, cy = params[1], params[2]
                else:  # 参数顺序为 fx, fy, cx, cy, (畸变...)
                    fx, fy = params[0], params[1]
                    cx, cy = params[2], params[3]
                K = np.array([[fx, 0, cx],
                              [0, fy, cy],
                              [0, 0, 1]])
self.cameras[cam_id] = {
'model': 'PINHOLE',
'width': width,
'height': height,
'K': K
}
def _read_images(self):
"""读取图像位姿"""
images_file = os.path.join(self.sparse_dir, 'images.txt')
if not os.path.exists(images_file):
self._read_images_bin()
return
with open(images_file, 'r') as f:
lines = f.readlines()
i = 0
while i < len(lines):
line = lines[i].strip()
if line.startswith('#') or not line:
i += 1
continue
# 图像信息行
parts = line.split()
img_id = int(parts[0])
qw, qx, qy, qz = map(float, parts[1:5])
tx, ty, tz = map(float, parts[5:8])
cam_id = int(parts[8])
img_name = parts[9]
# 四元数转旋转矩阵
R = Rotation.from_quat([qx, qy, qz, qw]).as_matrix()
t = np.array([tx, ty, tz])
self.images[img_id] = {
'R': R,
't': t,
'camera_id': cam_id,
'name': img_name
}
i += 2 # 跳过点观测行
def _read_images_bin(self):
"""读取二进制图像文件"""
import struct
images_file = os.path.join(self.sparse_dir, 'images.bin')
with open(images_file, 'rb') as f:
num_images = struct.unpack('Q', f.read(8))[0]
for _ in range(num_images):
img_id = struct.unpack('I', f.read(4))[0]
qw = struct.unpack('d', f.read(8))[0]
qx = struct.unpack('d', f.read(8))[0]
qy = struct.unpack('d', f.read(8))[0]
qz = struct.unpack('d', f.read(8))[0]
tx = struct.unpack('d', f.read(8))[0]
ty = struct.unpack('d', f.read(8))[0]
tz = struct.unpack('d', f.read(8))[0]
cam_id = struct.unpack('I', f.read(4))[0]
                # 读取图像名(COLMAP二进制格式中为以\x00结尾的字符串,而非长度前缀)
                name_bytes = b''
                c = f.read(1)
                while c and c != b'\x00':
                    name_bytes += c
                    c = f.read(1)
                img_name = name_bytes.decode('utf-8')
# 读取点观测(跳过)
num_points2D = struct.unpack('Q', f.read(8))[0]
for _ in range(num_points2D):
f.read(8 * 2) # xy
f.read(8) # point3D_id
R = Rotation.from_quat([qx, qy, qz, qw]).as_matrix()
t = np.array([tx, ty, tz])
self.images[img_id] = {
'R': R,
't': t,
'camera_id': cam_id,
'name': img_name
}
def _read_points3D(self):
"""读取3D点(可选)"""
points_file = os.path.join(self.sparse_dir, 'points3D.txt')
if not os.path.exists(points_file):
return
# 解析略,NeRF通常不需要3D点
def get_camera_poses(self) -> List[CameraPose]:
"""获取所有相机位姿"""
poses = []
for img_id, img_data in self.images.items():
cam_id = img_data['camera_id']
cam_data = self.cameras[cam_id]
pose = CameraPose(
image_name=img_data['name'],
R=img_data['R'],
t=img_data['t'],
K=cam_data['K'],
height=cam_data['height'],
width=cam_data['width']
)
poses.append(pose)
return poses
class NeRFDataPreprocessor:
"""
NeRF数据预处理器
整合COLMAP结果处理、图像筛选和格式转换
"""
def __init__(self, output_dir: str):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
self.poses = []
self.selected_images = []
def run_colmap(self, image_dir: str, colmap_path: str = 'colmap'):
"""
运行COLMAP稀疏重建
Args:
image_dir: 输入图像目录
colmap_path: COLMAP可执行文件路径
"""
        database_path = os.path.join(self.output_dir, 'database.db')
        sparse_dir = os.path.join(self.output_dir, 'sparse')
        os.makedirs(sparse_dir, exist_ok=True)  # COLMAP mapper要求输出目录已存在
# 特征提取
print("1. 特征提取...")
subprocess.run([
colmap_path, 'feature_extractor',
'--database_path', database_path,
'--image_path', image_dir,
'--ImageReader.single_camera', '1',
'--SiftExtraction.use_gpu', '1'
], check=True)
# 特征匹配
print("2. 特征匹配...")
subprocess.run([
colmap_path, 'exhaustive_matcher',
'--database_path', database_path,
'--SiftMatching.use_gpu', '1'
], check=True)
# 稀疏重建
print("3. 稀疏重建...")
subprocess.run([
colmap_path, 'mapper',
'--database_path', database_path,
'--image_path', image_dir,
'--output_path', sparse_dir
], check=True)
print(f"COLMAP重建完成: {sparse_dir}")
# 返回最大的重建结果目录
sparse_subdirs = [d for d in os.listdir(sparse_dir)
if os.path.isdir(os.path.join(sparse_dir, d))]
if sparse_subdirs:
return os.path.join(sparse_dir, max(sparse_subdirs))
return None
def select_images(self, poses: List[CameraPose],
min_angle_deg: float = 5.0,
min_distance: float = 0.1) -> List[CameraPose]:
"""
基于视差筛选图像,移除冗余视角
Args:
poses: 所有相机位姿
min_angle_deg: 最小角度差(度)
min_distance: 最小距离(米)
Returns:
筛选后的位姿列表
"""
if len(poses) <= 2:
return poses
selected = [poses[0]]
for pose in poses[1:]:
# 计算与已选视角的差异
is_redundant = False
            for sel_pose in selected:
                # 距离检查(比较相机中心 C = -R^T t,而非平移向量本身)
                c1 = -pose.R.T @ pose.t
                c2 = -sel_pose.R.T @ sel_pose.t
                dist = np.linalg.norm(c1 - c2)
# 角度检查(相机朝向)
# 计算相对旋转
R_rel = pose.R @ sel_pose.R.T
angle = np.arccos(np.clip((np.trace(R_rel) - 1) / 2, -1, 1))
angle_deg = np.degrees(angle)
if dist < min_distance and angle_deg < min_angle_deg:
is_redundant = True
break
if not is_redundant:
selected.append(pose)
print(f"图像筛选: {len(poses)} -> {len(selected)}")
return selected
def normalize_scene(self, poses: List[CameraPose]) -> Tuple[List[CameraPose], Dict]:
"""
归一化场景坐标系
将场景中心置于原点,缩放到单位球内
Returns:
(归一化后的位姿, 归一化参数)
"""
# 计算相机中心
centers = np.array([-pose.R.T @ pose.t for pose in poses])
# 场景中心
scene_center = np.mean(centers, axis=0)
# 场景范围
scene_radius = np.max(np.linalg.norm(centers - scene_center, axis=1))
# 归一化参数
norm_params = {
'center': scene_center.tolist(),
'radius': float(scene_radius),
'scale': 1.0 / scene_radius
}
# 归一化位姿
normalized_poses = []
        for pose in poses:
            # 平移归一化:x_cam = R x_w + t,令 x_w = radius·x' + center,
            # 则新平移为 (t + R·center) / radius
            t_new = (pose.t + pose.R @ scene_center) / scene_radius
# 内参调整(如果需要)
K_new = pose.K.copy()
norm_pose = CameraPose(
image_name=pose.image_name,
R=pose.R,
t=t_new,
K=K_new,
height=pose.height,
width=pose.width
)
normalized_poses.append(norm_pose)
print(f"场景归一化: 中心 {scene_center}, 半径 {scene_radius}")
return normalized_poses, norm_params
def detect_dynamic_objects(self, image_dir: str, poses: List[CameraPose],
threshold: float = 0.5) -> Dict[str, np.ndarray]:
"""
检测动态物体(基于光流一致性)
Args:
image_dir: 图像目录
poses: 相机位姿
threshold: 动态判断阈值
Returns:
图像名到动态掩码的映射
"""
dynamic_masks = {}
# 加载图像
images = {}
for pose in poses:
img_path = os.path.join(image_dir, pose.image_name)
img = cv2.imread(img_path)
if img is not None:
images[pose.image_name] = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 计算相邻帧的光流
img_names = list(images.keys())
for i in range(len(img_names) - 1):
name1, name2 = img_names[i], img_names[i+1]
img1, img2 = images[name1], images[name2]
# 计算光流
flow = cv2.calcOpticalFlowFarneback(
img1, img2, None,
pyr_scale=0.5, levels=3, winsize=15,
iterations=3, poly_n=5, poly_sigma=1.2, flags=0
)
flow_mag = np.sqrt(flow[:, :, 0]**2 + flow[:, :, 1]**2)
            # 基于位姿计算期望的自运动光流(简化:实际应结合位姿与深度投影3D点,
            # 将残差流视为动态)。此处用统计阈值近似:幅值显著高于均值者判为动态
            dynamic_mask = flow_mag > np.mean(flow_mag) + threshold * np.std(flow_mag)
            dynamic_masks[name2] = dynamic_mask.astype(np.uint8) * 255
return dynamic_masks
def convert_to_nerf_format(self, poses: List[CameraPose],
norm_params: Dict,
image_dir: str) -> Dict:
"""
转换为NeRF的transforms.json格式
Args:
poses: 相机位姿列表
norm_params: 归一化参数
image_dir: 图像目录(用于复制文件)
Returns:
NeRF格式的字典
"""
frames = []
for pose in poses:
# NeRF使用相机到世界的变换(c2w)
# COLMAP提供的是世界到相机(w2c),需要求逆
# 构建w2c矩阵
w2c = np.eye(4)
w2c[:3, :3] = pose.R
w2c[:3, 3] = pose.t
# c2w是w2c的逆
c2w = np.linalg.inv(w2c)
# NeRF坐标系: x右, y上, z后(与COLMAP不同,需要转换)
# 转换矩阵
convert_mat = np.array([
[1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, -1, 0],
[0, 0, 0, 1]
])
c2w = c2w @ convert_mat
frame = {
'file_path': f"./images/{pose.image_name}",
'transform_matrix': c2w.tolist()
}
frames.append(frame)
        # 构建transforms.json(以首个相机的内参计算水平FOV)
        ref = poses[0]
        transforms = {
            'camera_angle_x': float(2 * np.arctan2(ref.width / 2, ref.K[0, 0])),
            'frames': frames
        }
return transforms
def save_nerf_data(self, transforms: Dict, image_dir: str,
dynamic_masks: Optional[Dict] = None):
"""
保存NeRF训练数据
Args:
transforms: transforms.json内容
image_dir: 原始图像目录
dynamic_masks: 动态物体掩码(可选)
"""
# 保存transforms.json
transforms_path = os.path.join(self.output_dir, 'transforms.json')
with open(transforms_path, 'w') as f:
json.dump(transforms, f, indent=2)
# 复制图像
images_output = os.path.join(self.output_dir, 'images')
os.makedirs(images_output, exist_ok=True)
for frame in transforms['frames']:
img_name = os.path.basename(frame['file_path'])
src_path = os.path.join(image_dir, img_name)
dst_path = os.path.join(images_output, img_name)
if os.path.exists(src_path):
if dynamic_masks and img_name in dynamic_masks:
# 应用动态掩码(将动态区域设为黑色或白色)
img = cv2.imread(src_path)
mask = dynamic_masks[img_name]
img[mask > 0] = [128, 128, 128] # 灰色标记动态区域
cv2.imwrite(dst_path, img)
else:
shutil.copy2(src_path, dst_path)
print(f"NeRF数据已保存至: {self.output_dir}")
def main():
parser = argparse.ArgumentParser(description='NeRF数据预处理')
parser.add_argument('--images', '-i', type=str, help='输入图像目录')
parser.add_argument('--output', '-o', type=str, required=True, help='输出目录')
parser.add_argument('--colmap_dir', type=str, help='COLMAP稀疏重建目录')
parser.add_argument('--mode', type=str, default='full',
choices=['full', 'select', 'convert', 'detect_dynamic'],
help='运行模式')
parser.add_argument('--run_colmap', action='store_true', help='运行COLMAP')
parser.add_argument('--colmap_path', type=str, default='colmap', help='COLMAP路径')
parser.add_argument('--min_angle', type=float, default=5.0, help='最小角度差')
parser.add_argument('--min_distance', type=float, default=0.1, help='最小距离')
args = parser.parse_args()
preprocessor = NeRFDataPreprocessor(args.output)
if args.mode == 'full':
# 完整预处理流程
if args.run_colmap:
sparse_dir = preprocessor.run_colmap(args.images, args.colmap_path)
else:
sparse_dir = args.colmap_dir
# 解析COLMAP结果
        colmap_parser = ColmapParser(sparse_dir)
        poses = colmap_parser.get_camera_poses()
# 图像筛选
selected_poses = preprocessor.select_images(
poses, args.min_angle, args.min_distance
)
# 场景归一化
normalized_poses, norm_params = preprocessor.normalize_scene(selected_poses)
# 检测动态物体(可选)
dynamic_masks = preprocessor.detect_dynamic_objects(
args.images, normalized_poses
)
# 转换为NeRF格式
transforms = preprocessor.convert_to_nerf_format(
normalized_poses, norm_params, args.images
)
# 保存
preprocessor.save_nerf_data(transforms, args.images, dynamic_masks)
print("预处理完成!")
elif args.mode == 'select':
# 仅图像筛选
        colmap_parser = ColmapParser(args.colmap_dir)
        poses = colmap_parser.get_camera_poses()
selected = preprocessor.select_images(poses, args.min_angle, args.min_distance)
# 保存筛选结果
selected_names = [p.image_name for p in selected]
with open(os.path.join(args.output, 'selected_images.txt'), 'w') as f:
f.write('\n'.join(selected_names))
print(f"已选择 {len(selected)} 张图像")
elif args.mode == 'convert':
# 仅格式转换
        colmap_parser = ColmapParser(args.colmap_dir)
        poses = colmap_parser.get_camera_poses()
normalized_poses, norm_params = preprocessor.normalize_scene(poses)
transforms = preprocessor.convert_to_nerf_format(
normalized_poses, norm_params, args.images
)
with open(args.output, 'w') as f:
json.dump(transforms, f, indent=2)
print(f"已保存: {args.output}")
elif args.mode == 'detect_dynamic':
# 仅动态物体检测
        colmap_parser = ColmapParser(args.colmap_dir)
        poses = colmap_parser.get_camera_poses()
masks = preprocessor.detect_dynamic_objects(args.images, poses)
# 保存掩码
masks_dir = os.path.join(args.output, 'dynamic_masks')
os.makedirs(masks_dir, exist_ok=True)
for name, mask in masks.items():
cv2.imwrite(os.path.join(masks_dir, name), mask)
print(f"已保存 {len(masks)} 个动态掩码")
if __name__ == '__main__':
    main()