多传感器融合:工业场景下的状态决策逻辑

在这里插入图片描述

关于作者

我接触视觉整整 10 年

机器视觉、烟草、煤矿等行业都有深度开发经验。从硬件选型、算法开发、模型训练,到上位机开发及部署,都在一线磨过

之前是多家公司人工智能团队的技术负责人。现在自己创业了,还在继续做视觉落地这件事。


作者说

在做视觉这件事之前,我以为最难的是算法开发和模型精度。

后来发现,真正让人崩溃的从来不是算法本身,而是现场那些"没想到"的情况——

  • 摄像头突然逆光,画面一片白
  • 皮带头晃动,带动的尘雾让检测全乱
  • 机器停了,但传感器报告还在"运行"
  • 边缘设备算力不够,一运行就卡死

这些问题是实验室里遇不到的。

所以我打算写一个系列,记录我们在真实工业场景下踩过的坑、解决过的问题。不讲多么酷炫的算法,只聊怎么让算法稳定跑在客户现场

这是第四篇。


踩坑实录:三个"正确"的判断,却做出了错误的决定

做第三个项目的时候,我已经用上了 YOLO + 光流 + 粉尘检测 + 人员检测,四管齐下。

但还是出问题了。

有一天现场报故障:机器明明在运行,但系统显示"停止"。我调出日志一看:

  • YOLO:检测到目标 ✓
  • 光流:检测到运动 ✓
  • 粉尘:正常 ✓
  • 人员:没人 ✓

每个模块的判断都是对的,但合在一起却错了——为什么?

这是第三个大坑:单一模块正常,不代表系统正常。

从那以后,我们开始认真研究多传感器融合状态决策逻辑


01 为什么需要多方法融合?

1.1 单一方法的局限

方法 能解决的问题 解决不了的
YOLO 检测"有什么" 判断"动不动态"、镜头脏
光流 判断运动 区分目标运动和背景干扰
暗通道 检测粉尘 判断机器是否在运行
人员检测 人员闯入 机器状态

每种方法都有盲区,必须融合。

1.2 融合的核心挑战

  • 优先级:谁先谁后?冲突时听谁的?
  • 时序:不同方法的检测频率不同步(YOLO 可能是 5fps,光流是 30fps)
  • 误检叠加:一个误检还能接受,多个误检就会导致错误决策

02 分层决策架构

2.1 设计原则

我们采用分层决策,核心原则是:优先级高的信息,直接覆盖其他判断

┌─────────────────────────────────────┐
│           最高优先级                │
│         人员安全相关                 │
│    (检测到人 → 必须停止一切)        │
└──────────────┬──────────────────────┘
               │
┌──────────────▼──────────────────────┐
│           次优先级                   │
│         环境异常                     │
│    (粉尘遮挡 → 维持状态)           │
└──────────────┬──────────────────────┘
               │
┌──────────────▼──────────────────────┐
│           基础判断                   │
│         目标运动检测                 │
│    (光流法 → 运行/停止)            │
└─────────────────────────────────────┘

2.2 代码实现

def evaluate_status(operator_detected, occlusion_found, flow_score, threshold):
    """
    分层状态决策
    
    参数:
        operator_detected: 操作人员检测结果
        occlusion_found: 遮挡检测结果(粉尘)
        flow_score: 光流投票结果(运动帧数/总帧数)
        threshold: 判定阈值(0.0-1.0)
    
    返回:
        status: 'run' / 'stop' / 'hold'
        reason: 判断原因
    """
    # 最高优先级:人员检测
    if operator_detected:
        return 'stop', 'operator_detected'
    
    # 次优先级:遮挡判断
    if occlusion_found:
        return 'hold', 'occlusion_detected'
    
    # 基础判断:光流检测
    if flow_score > threshold:
        return 'run', 'flow_detected'
    elif flow_score < (threshold - 0.1):
        return 'stop', 'no_activity'
    else:
        # 模糊区间,维持原状态
        return 'hold', 'ambiguous'

2.3 原始代码中的实现

在我原来的代码里,决策逻辑是通过 evaluate_system_state 方法实现的:

# 原始代码片段
def evaluate_system_state(self):
    # 1. 检查是否有人员
    if self.human_visible:
        return "stop", "human_detected"
    
    # 2. 检查遮挡状态
    if self.environment_blocked:
        return "hold", "environment_blocked"
    
    # 3. 检查光流状态
    if activity_detected:
        return "run", "activity_confirmed"
    else:
        return "stop", "no_activity"

核心思想是一致的:按优先级逐层判断,高优先级直接覆盖


03 时间窗口投票:平滑误检波动

3.1 问题

单帧判断容易受噪声影响——某一帧误检,结果就变了。

3.2 解决思路

用时间窗口投票:积累多帧结果,只有稳定超过阈值才变更状态。

3.3 代码实现

class SlidingWindow:
    """滑动窗口投票器"""
    
    def __init__(self, size=30, ratio=0.75):
        self.window_size = size
        self.trigger_ratio = ratio
        self.buffer = deque(maxlen=size)
    
    def append(self, detected):
        """记录检测结果"""
        self.buffer.append(1 if detected else 0)
    
    def compute(self):
        """计算投票结果"""
        if len(self.buffer) < self.window_size:
            return None
        
        ratio = sum(self.buffer) / len(self.buffer)
        return ratio >= self.trigger_ratio

3.4 在状态决策中的应用

class SystemController:
    def __init__(self):
        self.motion_window = SlidingWindow(size=30, ratio=0.75)
        self.current_status = "unknown"
        self.last_switch_time = 0
        self.min_interval = 10  # 状态最短保持时间(秒)
    
    def refresh(self, motion_flag, occlusion_flag, operator_flag):
        # 添加投票
        self.motion_window.append(motion_flag)
        vote_outcome = self.motion_window.compute()
        
        # 人员检测:直接停机
        if operator_flag:
            self._switch_status("stop", "operator_present")
            return
        
        # 遮挡判断:维持状态
        if occlusion_flag:
            return
        
        # 运动投票
        if vote_outcome is not None:
            if vote_outcome:
                self._switch_status("run", "motion_confirmed")
            else:
                self._switch_status("stop", "motion_absent")
    
    def _switch_status(self, new_status, cause):
        """带防抖的状态切换"""
        now = time.time()
        
        # 检查是否在保持期内
        if self.current_status == new_status:
            return
        
        if now - self.last_switch_time < self.min_interval:
            return
        
        self.current_status = new_status
        self.last_switch_time = now

04 状态维持与防抖机制

4.1 问题

工业现场最怕状态跳变——一会儿"运行",一会儿"停止"。

4.2 解决思路

两种防抖策略:

  1. 时间窗口投票:不是单帧判断,而是积累多帧结果
  2. 状态维持:状态变更后,必须保持一定时间才允许再次变更

4.3 代码实现

class SystemEvaluator:
    def __init__(self, interval=10, ratio=0.75):
        self.min_interval = interval  # 状态最短保持时间
        self.vote_ratio = ratio  # 投票阈值
        
        self.previous_status = "unknown"
        self.last_toggle_time = 0
        self.vote_history = deque(maxlen=30)  # 30帧投票历史
    
    def evaluate(self, motion_flag, occlusion_flag, operator_flag, current_time):
        """
        状态评估
        
        参数:
            motion_flag: 运动检测结果
            occlusion_flag: 遮挡检测结果
            operator_flag: 人员检测结果
            current_time: 当前时间戳
        
        返回:
            status: 评估结果
            source: 决策来源
        """
        # 1. 人员检测:最高优先级,直接停止
        if operator_flag:
            self._update_state("stop", "operator_present", current_time)
            return self.previous_status, "operator_present"
        
        # 2. 遮挡检测:维持在当前状态
        if occlusion_flag:
            self._preserve_state(current_time)
            return self.previous_status, "occlusion_detected"
        
        # 3. 添加到投票历史
        self.vote_history.append(1 if motion_flag else 0)
        
        # 4. 投票判断
        if len(self.vote_history) >= 20:  # 至少20帧
            vote_score = sum(self.vote_history) / len(self.vote_history)
            
            if vote_score >= self.vote_ratio:
                new_state = "run"
            elif vote_score <= (self.vote_ratio - 0.1):
                new_state = "stop"
            else:
                # 模糊区间,维持状态
                return self.previous_status, "unclear"
            
            self._update_state(new_state, "vote_result", current_time)
        
        return self.previous_status, "no_change"
    
    def _update_state(self, new_state, src, time_now):
        """更新状态(带防抖)"""
        # 检查是否在保持期内
        if new_state == self.previous_status:
            self.last_toggle_time = time_now
            return
        
        # 检查保持时间
        if time_now - self.last_toggle_time < self.min_interval:
            return
        
        self.previous_status = new_state
        self.last_toggle_time = time_now
    
    def _preserve_state(self, time_now):
        """维持当前状态"""
        # 遮挡时更新时间,但不改变状态
        self.last_toggle_time = time_now

05 实际效果:误检率从 15% 降到 2%

5.1 优化前后对比

指标 优化前 优化后
状态跳变频率 5-10次/天 0-1次/天
误报率 15% 2%
漏报率 8% 1%
平均响应时间 3秒 2秒

5.2 关键经验

  1. 分层决策:让高优先级信息(如人员)直接覆盖
  2. 时间窗口投票:平滑单帧误检的影响
  3. 状态维持:防止频繁跳变
  4. 日志记录:每次状态变更都要记录原因,方便排查

06 总结

本文分享了多传感器融合与状态决策的实战方案:

  1. 分层决策架构:按优先级逐层判断,人员安全 > 环境异常 > 基础运动检测
  2. 时间窗口投票:积累多帧结果,避免单帧误检影响
  3. 状态维持机制:状态变更后必须保持一定时间,防止频繁跳变
  4. 完整代码实现:提供了可直接复用的状态机代码

核心教训不是算法越准系统越稳,而是决策逻辑越合理系统越稳。


07 写在最后

工业视觉落地,拼的不是算法多先进,而是能不能在客户现场稳定跑起来。

这篇文章讲的是方法,但真正值钱的,是我们在多个项目里积累的那些"没想到"和"怎么办"。

后续我会继续更新这个系列,分享更多实战经验——

  • 边缘设备部署避坑指南
  • 数据闭环与自动标注实践

如果你也有工业视觉落地的困扰,欢迎一起交流。


本文所有代码均为示意,核心思路可复现,具体参数需根据实际场景调整。

📎 相关阅读

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐