Table of Contents

Video Action Recognition and Localization System

SlowFast_X3D Deployment

Temporal Action Proposal Generation with TAG and BMN

Causal Online Video Understanding


Video Action Recognition and Localization System

Temporal Segment Networks (TSN) model long-range temporal structure through a sparse sampling strategy. The input video is divided into several equal-length segments, and a short snippet is randomly sampled from each segment for feature extraction. All snippets share the same convolutional weights, and a consensus function aggregates the per-snippet predictions into a video-level prediction. This architecture sharply reduces the computational cost of long videos while still capturing temporal dependencies across segments. During training, cross-modality pre-training initializes the optical-flow branch from the RGB model, while partial batch normalization and heavy dropout suppress overfitting. For untrimmed videos, TSN adds a multi-scale temporal window integration scheme: snippets are densely sampled with sliding windows, max pooling picks the peak-activation window, and scores are fused across scales. For real-time deployment, TSN replaces optical-flow extraction with RGB differences or motion vectors decoded from compressed video, pushing inference to roughly 340 frames per second with little loss in accuracy.
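
To make the real-time motion representation above concrete, here is a minimal sketch (an illustration, not TSN's reference implementation) that stacks RGB differences between consecutive snippet frames as a drop-in substitute for a stacked optical-flow input; the 5-step depth and the [0, 1] float frames are assumptions for the demo.

Python

import numpy as np

def rgb_diff_stack(frames, depth: int = 5) -> np.ndarray:
    """Stack consecutive RGB differences as a cheap motion representation.

    frames: list of HxWx3 float32 arrays in [0, 1]; returns HxWx(3*depth).
    The depth of 5 mirrors the usual 5-frame flow stack (an assumption here).
    """
    assert len(frames) >= depth + 1, "need depth + 1 frames"
    diffs = [frames[i + 1] - frames[i] for i in range(depth)]
    return np.concatenate(diffs, axis=2)  # channel-stacked, like flow (dx, dy) pairs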

SlowFast and X3D form complementary paradigms for efficient video understanding. SlowFast uses a dual-pathway architecture: the slow pathway captures spatial semantics at a low frame rate, the fast pathway processes motion at high temporal resolution, and lateral connections fuse fast-pathway features into the slow pathway. X3D, an expandable 3D convolutional network, progressively expands a 2D image-classification architecture along four axes (temporal duration, spatial resolution, width, and depth), and cuts parameters with depthwise-separable convolutions and channel-separation strategies. Deployment optimization centers on kernel fusion and hardware-aware compilation: TensorRT or ONNX Runtime fuses operators, merging convolution, batch normalization, and activation into a single kernel. Mobile deployment adds knowledge distillation and quantization, mapping floating-point weights to 8-bit integers, together with channel pruning to remove redundant feature maps.
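
The operator fusion mentioned above can be reproduced offline in plain PyTorch: folding a BatchNorm layer's affine parameters into the preceding convolution is the arithmetic behind conv+BN kernel fusion. Below is a minimal sketch of the standard folding identity, not TensorRT's actual fusion pass.

Python

import torch
import torch.nn as nn

def fuse_conv_bn(conv: nn.Conv2d, bn: nn.BatchNorm2d) -> nn.Conv2d:
    """Fold BN into conv: w' = w * g / sqrt(var + eps), b' = (b - mean) * g / sqrt(var + eps) + beta"""
    fused = nn.Conv2d(conv.in_channels, conv.out_channels, conv.kernel_size,
                      conv.stride, conv.padding, conv.dilation, conv.groups, bias=True)
    scale = bn.weight / torch.sqrt(bn.running_var + bn.eps)  # per-channel gamma / sigma
    fused.weight.data = conv.weight.data * scale.reshape(-1, 1, 1, 1)
    b = conv.bias.data if conv.bias is not None else torch.zeros(conv.out_channels)
    fused.bias.data = (b - bn.running_mean) * scale + bn.bias
    return fused

In eval mode, fused(x) matches bn(conv(x)) up to floating-point error, which is why inference engines can apply the rewrite without retraining.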

Temporal action proposal generation aims to localize candidate action intervals in untrimmed videos. Temporal Action Grouping (TAG) works bottom-up: it runs a temporal watershed procedure over the actionness score sequence, clusters contiguous high-response regions into proposals, and suppresses background noise with adaptive thresholds. The Boundary-Matching Network (BMN) introduces a boundary-matching mechanism that represents each proposal as a matched pair of start and end boundaries and builds a 2D boundary-matching confidence map to densely evaluate every candidate interval. The network has two branches, boundary classification and confidence evaluation, jointly optimizing boundary localization and proposal reliability. Implementations extract multi-scale temporal features with dilated convolutions to widen the receptive field, and remove overlapping proposals with non-maximum suppression in post-processing.
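
A toy run of the grouping step makes the idea tangible: threshold a synthetic actionness curve and turn each contiguous above-threshold run into a (start, end, confidence) proposal. The curve and the 0.5 threshold are invented for illustration.

Python

import numpy as np
from scipy import ndimage

actionness = np.clip(np.sin(np.linspace(0, 6 * np.pi, 120)), 0, 1)  # synthetic curve
labels, n = ndimage.label(actionness > 0.5)  # contiguous above-threshold runs
proposals = []
for k in range(1, n + 1):
    idx = np.where(labels == k)[0]
    proposals.append((int(idx[0]), int(idx[-1]), float(actionness[idx].mean())))
print(proposals)  # [(start, end, confidence), ...] per candidate interval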

Online video understanding requires the model to respect strict causal constraints, predicting at each moment from historical frames only. Causal convolutions pad with zeros on the left so that each kernel sees only current and past features, eliminating leakage of future information. Streaming inference architectures use chunked processing, splitting a long video into overlapping temporal blocks and carrying history across blocks through recurrent state. On the memory side, a fixed-capacity feature buffer stores past activations, and gradient checkpointing trades compute for memory footprint. OpenCV integration calls for a multi-threaded pipeline that separates video decoding, preprocessing, and inference threads, with a ring buffer smoothing throughput differences across the asynchronous stages.
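
The causality claim is easy to verify numerically: with left-only padding, perturbing future time steps must leave earlier outputs untouched. A small self-check, with arbitrary layer sizes:

Python

import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
conv = nn.Conv1d(4, 4, kernel_size=3)  # no built-in padding

def causal(conv: nn.Conv1d, x: torch.Tensor) -> torch.Tensor:
    # Left-pad by (k - 1) so output[t] sees only x[..., :t + 1]
    return conv(F.pad(x, (conv.kernel_size[0] - 1, 0)))

x = torch.randn(1, 4, 10)
x_future = x.clone()
x_future[..., 7:] += 1.0  # perturb only "future" steps
same = torch.allclose(causal(conv, x)[..., :7], causal(conv, x_future)[..., :7])
print(same)  # True: outputs before t=7 are unaffected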

Python

"""
Script: TSN_Inference_Optimized.py
Content: Temporal Segment Networks deployment with OpenCV optimization
Usage: python TSN_Inference_Optimized.py --video input.mp4 --segments 7 --device cpu/gpu
Features: Multi-threaded frame extraction, sparse sampling, two-stream fusion, real-time FPS monitoring
"""

import cv2
import numpy as np
import torch
import torch.nn as nn
from threading import Thread, Lock
from queue import Queue, Full, Empty
from collections import deque
import argparse
from typing import List, Tuple, Optional
import time

class TSNArch(nn.Module):
    def __init__(self, num_classes: int, num_segments: int = 7, modality: str = 'RGB'):
        super(TSNArch, self).__init__()
        self.num_segments = num_segments
        self.modality = modality
        self.num_classes = num_classes
        
        # Backbone initialization (BN-Inception or ResNet-152)
        self.base_model = self._create_base_model()
        self.consensus = nn.AdaptiveAvgPool2d(1)  # Spatial pooling; segment consensus is the mean in forward()
        self.dropout = nn.Dropout(p=0.8)
        self.fc = nn.Linear(2048, num_classes)
        
        # Cross-modality initialization handling
        if modality != 'RGB':
            self._modify_first_conv()
    
    def _create_base_model(self) -> nn.Module:
        import torchvision.models as models
        model = models.resnet152(pretrained=True)
        return nn.Sequential(*list(model.children())[:-2])
    
    def _modify_first_conv(self):
        """Adapt first layer for optical flow input (10 channels for flow stacks)"""
        with torch.no_grad():
            conv1 = self.base_model[0].weight
            # Average RGB channels and replicate for flow depth
            new_weight = conv1.mean(dim=1, keepdim=True)
            new_weight = new_weight.repeat(1, 10, 1, 1) / 10.0
            self.base_model[0] = nn.Conv2d(10, 64, kernel_size=7, 
                                            stride=2, padding=3, bias=False)
            self.base_model[0].weight.data = new_weight
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        batch_size = x.size(0)
        # Segment-level feature extraction with shared weights
        base_out = self.base_model(x.view(-1, x.size(2), x.size(3), x.size(4)))
        base_out = self.consensus(base_out).squeeze(-1).squeeze(-1)
        base_out = self.dropout(base_out)
        output = self.fc(base_out)
        
        # Segmental consensus aggregation
        output = output.view(batch_size, self.num_segments, -1)
        consensus_output = output.mean(dim=1)  # Average pooling consensus
        return consensus_output

class VideoBuffer:
    """Thread-safe circular buffer for streaming video frames"""
    def __init__(self, capacity: int = 128):
        self.queue = Queue(maxsize=capacity)
        self.lock = Lock()
        self.finished = False
    
    def push(self, frame: np.ndarray) -> bool:
        try:
            self.queue.put_nowait(frame)
            return True
        except Full:
            return False  # Drop the frame when the buffer is full to stay real-time
    
    def pop(self) -> Optional[np.ndarray]:
        try:
            return self.queue.get_nowait()
        except Empty:
            return None
    
    def size(self) -> int:
        return self.queue.qsize()

class OpticalFlowExtractor:
    """GPU-accelerated optical flow computation for TSN temporal stream"""
    def __init__(self, device: str = 'cuda', frame_interval: int = 5):
        self.device = device
        self.frame_interval = frame_interval
        self.prev_frame = None
        self.flow_buffer = []
        
        # OpenCV CUDA optimization for optical flow
        if device == 'cuda' and cv2.cuda.getCudaEnabledDeviceCount() > 0:
            self.gpu_enabled = True
            self.gpu_flow = cv2.cuda.FarnebackOpticalFlow_create(
                numLevels=3, pyrScale=0.5, winSize=15,
                numIters=3, polyN=5, polySigma=1.2, flags=0
            )
        else:
            self.gpu_enabled = False
    
    def compute_flow(self, frame: np.ndarray) -> np.ndarray:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_frame is None:
            self.prev_frame = gray
            # Optical flow has two channels (dx, dy), unlike the 3-channel frame
            return np.zeros((*gray.shape, 2), dtype=np.float32)
        
        if self.gpu_enabled:
            gpu_prev = cv2.cuda_GpuMat()
            gpu_curr = cv2.cuda_GpuMat()
            gpu_prev.upload(self.prev_frame)
            gpu_curr.upload(gray)
            gpu_flow = self.gpu_flow.calc(gpu_prev, gpu_curr, None)
            flow = gpu_flow.download()
        else:
            flow = cv2.calcOpticalFlowFarneback(
                self.prev_frame, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )
        
        self.prev_frame = gray
        return flow
    
    def extract_flow_stack(self, frames: List[np.ndarray]) -> np.ndarray:
        """Generate stacked optical flow (x,y) * 5 = 10 channels"""
        flows = []
        for i in range(0, len(frames)-1, self.frame_interval):
            flow = self.compute_flow(frames[i])
            flows.append(flow)
            if len(flows) == 5:
                break
        
        # Normalize and stack
        flow_stack = np.zeros((256, 256, 10), dtype=np.float32)
        for idx, flow in enumerate(flows[:5]):
            flow_resized = cv2.resize(flow, (256, 256))
            flow_stack[:, :, idx*2] = flow_resized[:, :, 0] / 20.0  # x-component
            flow_stack[:, :, idx*2+1] = flow_resized[:, :, 1] / 20.0  # y-component
        
        return flow_stack

class TSNInferenceEngine:
    """Optimized TSN inference with multi-threading and batch processing"""
    def __init__(self, model_path: str, num_segments: int = 7, device: str = 'cpu'):
        self.device = device
        self.num_segments = num_segments
        
        # Model initialization
        self.model = TSNArch(num_classes=101, num_segments=num_segments)
        try:
            state = torch.load(model_path, map_location='cpu')
            self.model.load_state_dict(state, strict=False)
        except FileNotFoundError:
            pass  # Fall back to the ImageNet-initialized backbone
        self.model.eval()
        if device == 'cuda' and torch.cuda.is_available():
            self.model = self.model.cuda()
        
        # Two-stream preparation
        self.rgb_buffer = []
        self.flow_extractor = OpticalFlowExtractor(device=device)
        
        # Threading components
        self.frame_buffer = VideoBuffer(capacity=256)
        self.result_queue = Queue()
        self.capture_thread = None
        
    def sparse_sampling(self, total_frames: int) -> List[int]:
        """Sparse temporal sampling: divide video into K segments, sample 1 per segment"""
        segment_length = max(1, total_frames // self.num_segments)
        indices = []
        for i in range(self.num_segments):
            start = i * segment_length
            end = (i + 1) * segment_length
            # Random sampling during training, center sampling during inference
            idx = start + segment_length // 2
            indices.append(min(idx, total_frames - 1))
        return indices
    
    def preprocess_rgb(self, frame: np.ndarray) -> torch.Tensor:
        """Standard ImageNet preprocessing"""
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # ImageNet stats assume RGB order
        frame = cv2.resize(frame, (256, 256))
        # Center crop 224x224
        h, w = frame.shape[:2]
        start_h, start_w = (h - 224) // 2, (w - 224) // 2
        frame = frame[start_h:start_h+224, start_w:start_w+224]
        
        frame = frame.astype(np.float32) / 255.0
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        frame = ((frame - mean) / std).astype(np.float32)  # Keep float32 for the model
        frame = torch.from_numpy(frame).permute(2, 0, 1).unsqueeze(0)
        return frame
    
    def inference(self, video_path: str) -> Tuple[int, float]:
        """Video-level prediction with sparse sampling"""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        indices = self.sparse_sampling(total_frames)
        
        # Sparse frame extraction
        frames = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                frames.append(self.preprocess_rgb(frame))
        cap.release()
        
        if len(frames) < self.num_segments:
            # Padding with last frame if video too short
            while len(frames) < self.num_segments:
                frames.append(frames[-1] if frames else torch.zeros(1, 3, 224, 224))
        
        batch = torch.stack(frames).squeeze(1)
        if self.device == 'cuda':
            batch = batch.cuda()
        
        with torch.no_grad():
            # Mixed precision only applies on CUDA devices
            with torch.cuda.amp.autocast(enabled=(self.device == 'cuda')):
                output = self.model(batch.unsqueeze(0))
                probs = torch.softmax(output, dim=1)
                confidence, pred_class = probs.max(1)
        
        return pred_class.item(), confidence.item()
    
    def stream_inference(self, source: int = 0):
        """Real-time streaming inference with thread pool optimization"""
        def capture_frames():
            cap = cv2.VideoCapture(source)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Minimize latency
            while not self.frame_buffer.finished:
                ret, frame = cap.read()
                if ret:
                    self.frame_buffer.push(frame)
            cap.release()
        
        self.capture_thread = Thread(target=capture_frames)
        self.capture_thread.start()
        
        # Processing loop with sliding window
        window_frames = []
        recent_probs = deque(maxlen=5)  # Recent window predictions for temporal fusion
        while True:
            frame = self.frame_buffer.pop()
            if frame is None:
                continue
            
            window_frames.append(frame)
            if len(window_frames) == self.num_segments:
                # Batch inference on window; the model expects [B, segments, C, H, W]
                batch = [self.preprocess_rgb(f) for f in window_frames]
                batch_tensor = torch.stack(batch).squeeze(1).unsqueeze(0)
                if self.device == 'cuda':
                    batch_tensor = batch_tensor.cuda()
                
                with torch.no_grad():
                    output = self.model(batch_tensor)  # [1, num_classes]
                
                # Temporal fusion: weight recent window predictions higher
                recent_probs.append(torch.softmax(output, dim=1)[0])
                weights = torch.linspace(0.5, 1.0, len(recent_probs), device=output.device)
                fused = (torch.stack(list(recent_probs)) * weights.unsqueeze(1)).mean(0)
                pred = torch.argmax(fused).item()
                
                window_frames.pop(0)
                
                yield pred, frame

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--video', type=str, default='0')
    parser.add_argument('--segments', type=int, default=7)
    parser.add_argument('--device', type=str, default='cpu', choices=['cpu', 'cuda'])
    args = parser.parse_args()
    
    engine = TSNInferenceEngine(
        model_path='tsn_resnet152.pth',
        num_segments=args.segments,
        device=args.device
    )
    
    # Numeric strings select a camera index; anything else is a file path
    source = int(args.video) if args.video.isdigit() else args.video
    
    # Real-time demonstration
    for pred_class, frame in engine.stream_inference(source):
        cv2.putText(frame, f"Class: {pred_class}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('TSN Streaming', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cv2.destroyAllWindows()

SlowFast_X3D Deployment

Python

"""
Script: SlowFast_X3D_Deployment.py
Content: SlowFast and X3D model optimization for edge deployment
Usage: python SlowFast_X3D_Deployment.py --model_type slowfast --quantization int8 --source video.mp4
Features: TensorRT conversion, kernel fusion, precision calibration, multi-stream batching
"""

import cv2
import torch
import torch.nn as nn
import numpy as np
from typing import Dict, List, Tuple
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
from collections import deque
import argparse
import time

class SlowFastPathway(nn.Module):
    """Dual-pathway architecture with lateral connections"""
    def __init__(self, slow_channels: int = 64, fast_channels: int = 8):
        super().__init__()
        self.slow_path = self._create_slow_path(slow_channels)
        self.fast_path = self._create_fast_path(fast_channels)
        self.lateral_connections = nn.ModuleList([
            # Stride 8 matches the frame-rate ratio alpha=8 (32 fast frames -> 4 slow frames)
            nn.Conv3d(fast_channels, slow_channels, kernel_size=(5, 1, 1), 
                     stride=(8, 1, 1), padding=(2, 0, 0))
            for _ in range(4)  # ResNet stages
        ])
    
    def _create_slow_path(self, channels: int) -> nn.Module:
        # Low frame rate (4 frames per clip), high channel capacity
        return nn.Sequential(
            nn.Conv3d(3, channels, kernel_size=(1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3)),
            nn.BatchNorm3d(channels),
            nn.ReLU(inplace=True)
        )
    
    def _create_fast_path(self, channels: int) -> nn.Module:
        # High frame rate (32 frames), low channel capacity (beta=1/8 of slow channels)
        return nn.Sequential(
            nn.Conv3d(3, channels, kernel_size=(5, 7, 7), stride=(1, 2, 2), padding=(2, 3, 3)),
            nn.BatchNorm3d(channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, slow_input: torch.Tensor, fast_input: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        slow_feat = self.slow_path(slow_input)
        fast_feat = self.fast_path(fast_input)
        
        # Lateral fusion: fast to slow
        lateral_feat = self.lateral_connections[0](fast_feat)
        fused = slow_feat + lateral_feat
        
        return fused, fast_feat

class X3DScaleEfficient(nn.Module):
    """Expandable 3D convolution with depthwise separable decomposition"""
    def __init__(self, input_dim: int = 3, expansion_factor: float = 2.25):
        super().__init__()
        self.expansion_factor = expansion_factor
        
        # Progressive expansion along 4 dimensions: {gamma_t, gamma_s, gamma_w, gamma_d}
        self.stem = nn.Sequential(
            nn.Conv3d(input_dim, 24, kernel_size=(1, 3, 3), stride=(1, 2, 2), 
                     padding=(0, 1, 1), bias=False),
            nn.BatchNorm3d(24),
            nn.ReLU(inplace=True)
        )
        
        # Depthwise separable 3D conv for mobile optimization
        self.block = self._create_x3d_block(24, int(24 * expansion_factor))
    
    def _create_x3d_block(self, in_ch: int, out_ch: int) -> nn.Module:
        mid_ch = int(in_ch * self.expansion_factor)  # Channel counts must be integers
        return nn.Sequential(
            # Depthwise
            nn.Conv3d(in_ch, in_ch, kernel_size=(3, 3, 3), 
                     groups=in_ch, padding=(1, 1, 1), bias=False),
            nn.BatchNorm3d(in_ch),
            nn.ReLU(inplace=True),
            # Pointwise expansion
            nn.Conv3d(in_ch, mid_ch, kernel_size=1, bias=False),
            nn.BatchNorm3d(mid_ch),
            nn.ReLU(inplace=True),
            # Pointwise projection
            nn.Conv3d(mid_ch, out_ch, kernel_size=1, bias=False),
            nn.BatchNorm3d(out_ch)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.block(self.stem(x))

class TensorRTOptimizer:
    """TensorRT deployment optimization with layer fusion and precision calibration"""
    def __init__(self, model: nn.Module, input_shapes: Dict[str, Tuple], 
                 precision: str = 'fp16', max_batch: int = 8):
        self.model = model
        self.input_shapes = input_shapes
        self.precision = precision
        self.max_batch = max_batch
        self.logger = trt.Logger(trt.Logger.WARNING)
        self.engine = None
        self.context = None
        
    def build_engine(self, onnx_path: str) -> trt.ICudaEngine:
        """Convert ONNX model to optimized TensorRT engine"""
        builder = trt.Builder(self.logger)
        network = builder.create_network(
            1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        )
        parser = trt.OnnxParser(network, self.logger)
        
        with open(onnx_path, 'rb') as f:
            parser.parse(f.read())
        
        config = builder.create_builder_config()
        config.max_workspace_size = 1 << 30  # 1GB workspace
        
        if self.precision == 'int8':
            config.set_flag(trt.BuilderFlag.INT8)
            # Calibration for quantization
            config.int8_calibrator = self._create_calibrator()
        elif self.precision == 'fp16':
            config.set_flag(trt.BuilderFlag.FP16)
        
        # Enforce the requested precision constraints during kernel selection
        config.set_flag(trt.BuilderFlag.STRICT_TYPES)
        
        self.engine = builder.build_engine(network, config)
        self.context = self.engine.create_execution_context()
        return self.engine
    
    def _create_calibrator(self):
        """INT8 calibration using entropy minimization"""
        class Int8Calibrator(trt.IInt8EntropyCalibrator2):
            def __init__(self, data_loader, cache_file):
                super().__init__()
                self.data_loader = data_loader
                self.cache_file = cache_file
                self.batch_size = 8
                self.current_index = 0
            
            def get_batch(self, names, p_str=None):
                if self.current_index >= len(self.data_loader):
                    return None
                # Return calibration batch
                batch = next(iter(self.data_loader))
                self.current_index += 1
                return [batch.cuda().data_ptr()]
            
            def get_batch_size(self):
                return self.batch_size
        
        return Int8Calibrator(None, 'calibration.cache')
    
    def infer(self, inputs: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Optimized inference with pinned memory and async execution"""
        # Allocate device memory
        d_inputs = {}
        h_outputs = {}
        d_outputs = {}
        
        bindings = []
        for name, shape in self.input_shapes.items():
            size = trt.volume(shape) * self.max_batch
            d_inputs[name] = cuda.mem_alloc(size * 4)  # float32
            
            # Pinned memory for host-to-device transfer
            pinned = cuda.pagelocked_empty(size, np.float32)
            cuda.memcpy_htod_async(d_inputs[name], pinned, cuda.Stream())
            bindings.append(int(d_inputs[name]))
        
        # Execute async
        stream = cuda.Stream()
        self.context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
        stream.synchronize()
        
        # Placeholder output: a real deployment copies device results back to host here
        return torch.randn(self.max_batch, 400)

class StreamingVideoProcessor:
    """Real-time video processing with batch aggregation and latency hiding"""
    def __init__(self, engine: TensorRTOptimizer, clip_length: int = 32, 
                 sampling_rate: int = 2):
        self.engine = engine
        self.clip_length = clip_length
        self.sampling_rate = sampling_rate
        self.frame_buffer = deque(maxlen=clip_length * sampling_rate)
        self.async_stream = cuda.Stream()
        
    def preprocess_opencv(self, frame: np.ndarray, target_size: Tuple[int, int] = (256, 256)) -> np.ndarray:
        """Hardware-accelerated preprocessing using OpenCV CUDA"""
        gpu_frame = cv2.cuda_GpuMat()
        gpu_frame.upload(frame)
        
        # Resize on GPU
        gpu_resized = cv2.cuda.resize(gpu_frame, target_size)
        
        # Normalize and convert to tensor format
        frame_rgb = cv2.cuda.cvtColor(gpu_resized, cv2.COLOR_BGR2RGB)
        cpu_frame = frame_rgb.download().astype(np.float32) / 255.0
        
        # Mean subtraction and std normalization
        mean = np.array([0.45, 0.45, 0.45], dtype=np.float32)
        std = np.array([0.225, 0.225, 0.225], dtype=np.float32)
        normalized = ((cpu_frame - mean) / std).astype(np.float32)
        
        # HWC to CHW format
        return np.transpose(normalized, (2, 0, 1))
    
    def temporal_sampling(self, frames: deque) -> Tuple[np.ndarray, np.ndarray]:
        """Sparse sampling for slow and fast pathways"""
        # Slow path: 4 frames
        slow_indices = np.linspace(0, len(frames)-1, 4, dtype=int)
        slow_clip = np.stack([frames[i] for i in slow_indices])
        
        # Fast path: 32 frames (alpha=8 temporal resolution)
        fast_indices = np.linspace(0, len(frames)-1, 32, dtype=int)
        fast_clip = np.stack([frames[i] for i in fast_indices])
        
        return slow_clip, fast_clip
    
    def process_stream(self, video_source: str):
        """Continuous processing with queue management"""
        cap = cv2.VideoCapture(video_source)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Low latency mode
        
        batch_frames = []
        last_inference_time = time.time()
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            
            processed = self.preprocess_opencv(frame)
            self.frame_buffer.append(processed)
            
            # Trigger inference when buffer full
            if len(self.frame_buffer) == self.frame_buffer.maxlen:
                slow_input, fast_input = self.temporal_sampling(self.frame_buffer)
                
                # Prepare batch tensors in [B, C, T, H, W] layout for Conv3d
                slow_tensor = torch.from_numpy(slow_input).unsqueeze(0).permute(0, 2, 1, 3, 4).cuda()
                fast_tensor = torch.from_numpy(fast_input).unsqueeze(0).permute(0, 2, 1, 3, 4).cuda()
                
                # Async inference
                result = self.engine.infer({'slow': slow_tensor, 'fast': fast_tensor})
                
                # Overlapping I/O: fetch results while processing next batch
                pred_class = torch.argmax(result).item()
                
                # Display with latency annotation
                latency = (time.time() - last_inference_time) * 1000
                cv2.putText(frame, f"Pred: {pred_class} Lat: {latency:.1f}ms", 
                           (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                cv2.imshow('SlowFast Stream', frame)
                
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                
                last_inference_time = time.time()
                # Slide window with 50% overlap (tau=0.5)
                for _ in range(self.frame_buffer.maxlen // 2):
                    self.frame_buffer.popleft()
        
        cap.release()

def export_to_onnx(model: nn.Module, save_path: str, input_sample: torch.Tensor):
    """Export PyTorch model to ONNX with dynamic batch and temporal dimensions"""
    torch.onnx.export(
        model,
        input_sample,
        save_path,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size', 2: 'num_frames'},
            'output': {0: 'batch_size'}
        }
    )

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_type', choices=['slowfast', 'x3d'], default='slowfast')
    parser.add_argument('--quantization', choices=['fp32', 'fp16', 'int8'], default='fp16')
    parser.add_argument('--source', type=str, required=True)
    args = parser.parse_args()
    
    # Model initialization
    if args.model_type == 'slowfast':
        model = SlowFastPathway()
        dummy_input = (torch.randn(1, 3, 4, 224, 224), torch.randn(1, 3, 32, 224, 224))
    else:
        model = X3DScaleEfficient()
        dummy_input = torch.randn(1, 3, 16, 224, 224)
    
    # Export and optimize
    onnx_path = f"{args.model_type}.onnx"
    export_to_onnx(model, onnx_path, dummy_input)
    
    optimizer = TensorRTOptimizer(
        model=model,
        input_shapes={'input': (1, 3, 16, 224, 224)},
        precision=args.quantization
    )
    optimizer.build_engine(onnx_path)
    
    # Streaming inference
    processor = StreamingVideoProcessor(optimizer)
    processor.process_stream(args.source)

Temporal Action Proposal Generation with TAG and BMN
Content: temporal action proposal generation using the TAG and BMN algorithms
Usage: python TAG_BMN_Proposal_Generation.py --video long_video.mp4 --algorithm bmn --gpu
Features: boundary-matching map generation, confidence evaluation, NMS post-processing, OpenCV feature extraction

Python


"""
Script: TAG_BMN_Proposal_Generation.py
Content: Temporal Action Proposal Generation using TAG and BMN algorithms
Usage: python TAG_BMN_Proposal_Generation.py --video long_video.mp4 --algorithm bmn --gpu
Features: Boundary matching map generation, confidence evaluation, NMS post-processing, OpenCV feature extraction
"""

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Tuple, Dict
import scipy.ndimage as ndimage
from scipy.signal import argrelextrema
import argparse
import json

class TAGAlgorithm:
    """Temporal Action Grouping via watershed and actionness grouping"""
    def __init__(self, min_action_length: int = 10, grouping_threshold: float = 0.5):
        self.min_action_length = min_action_length
        self.threshold = grouping_threshold
        self.gaussian_kernel = 9  # Smoothing for actionness curve
    
    def compute_actionness(self, features: np.ndarray) -> np.ndarray:
        """Compute temporal actionness scores using frame-level features"""
        # Feature-magnitude proxy for actionness (stand-in for a learned classifier)
        actionness = np.linalg.norm(features, axis=1)
        # Gaussian smoothing to suppress noise
        actionness = ndimage.gaussian_filter1d(actionness, sigma=self.gaussian_kernel)
        # Normalize to 0-1 (epsilon guards constant curves)
        actionness = (actionness - actionness.min()) / (actionness.max() - actionness.min() + 1e-8)
        return actionness
    
    def temporal_grouping(self, actionness: np.ndarray, video_length: int) -> List[Tuple[int, int, float]]:
        """Watershed-based proposal grouping"""
        proposals = []
        
        # Thresholding to create binary mask
        binary_mask = (actionness > self.threshold).astype(np.int32)
        
        # Label connected components (watershed basins)
        labeled, num_features = ndimage.label(binary_mask)
        
        for i in range(1, num_features + 1):
            region = np.where(labeled == i)[0]
            if len(region) >= self.min_action_length:
                start, end = region[0], region[-1]
                confidence = np.mean(actionness[start:end + 1])  # Inclusive of the end frame
                proposals.append((int(start), int(end), float(confidence)))
        
        return proposals
    
    def suppress_non_maximum(self, proposals: List[Tuple], overlap_thresh: float = 0.7) -> List[Tuple]:
        """Greedy hard NMS for proposal ranking"""
        if not proposals:
            return []
        
        # Sort by confidence
        proposals = sorted(proposals, key=lambda x: x[2], reverse=True)
        keep = []
        
        while proposals:
            current = proposals.pop(0)
            keep.append(current)
            
            # Compute temporal IoU
            proposals = [p for p in proposals if self._temporal_iou(current, p) < overlap_thresh]
        
        return keep
    
    def _temporal_iou(self, p1: Tuple, p2: Tuple) -> float:
        s1, e1 = p1[0], p1[1]
        s2, e2 = p2[0], p2[1]
        intersection = max(0, min(e1, e2) - max(s1, s2))
        union = max(e1, e2) - min(s1, s2)
        return intersection / union if union > 0 else 0

class BMNNetwork(nn.Module):
    """Boundary-Matching Network for dense proposal generation"""
    def __init__(self, temporal_dim: int = 100, feature_dim: int = 400, 
                 num_samples: int = 32, max_duration: int = 100):
        super().__init__()
        self.temporal_dim = temporal_dim
        self.max_duration = max_duration
        self.num_samples = num_samples
        
        # Base feature extraction
        self.base_layer = nn.Sequential(
            nn.Conv1d(feature_dim, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True)
        )
        
        # Boundary prediction branches
        self.start_conv = nn.Conv1d(512, 1, kernel_size=1)
        self.end_conv = nn.Conv1d(512, 1, kernel_size=1)
        
        # Boundary-Matching module (input is the 1-channel BM map)
        self.bm_conv = nn.Conv2d(1, 512, kernel_size=3, padding=1)
        self.bm_score = nn.Conv2d(512, 1, kernel_size=1)
        
        # Confidence regression head (reserved; unused in this simplified forward)
        self.confidence_fc = nn.Linear(512, 1)
    
    def generate_bm_map(self, start_probs: torch.Tensor, end_probs: torch.Tensor) -> torch.Tensor:
        """Construct 2D Boundary-Matching confidence map"""
        batch_size = start_probs.size(0)
        device = start_probs.device
        
        # Create start-end pairs matrix [batch, T, T]
        T = self.temporal_dim
        start_grid = start_probs.unsqueeze(2).expand(-1, -1, T)  # [B, T, 1] -> [B, T, T]
        end_grid = end_probs.unsqueeze(1).expand(-1, T, -1)      # [B, 1, T] -> [B, T, T]
        
        # Mask valid proposals (end > start): strictly upper-triangular
        mask = torch.triu(torch.ones(T, T, device=device), diagonal=1)
        bm_input = start_grid * end_grid * mask.unsqueeze(0)
        
        return bm_input
    
    def forward(self, features: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Args:
            features: [batch, temporal_dim, feature_dim]
        Returns:
            start_prob: [batch, T]
            end_prob: [batch, T]
            bm_confidence: [batch, T, T]
        """
        # Transpose for 1D conv [B, C, T]
        x = features.transpose(1, 2)
        base_feat = self.base_layer(x)  # [B, 512, T]
        
        # Boundary probability prediction
        start_logits = self.start_conv(base_feat).squeeze(1)  # [B, T]
        end_logits = self.end_conv(base_feat).squeeze(1)    # [B, T]
        
        start_prob = torch.sigmoid(start_logits)
        end_prob = torch.sigmoid(end_logits)
        
        # Generate BM map
        bm_input = self.generate_bm_map(start_prob, end_prob)
        
        # 2D convolution over BM map
        bm_feat = self.bm_conv(bm_input.unsqueeze(1))  # Add channel dim
        bm_scores = self.bm_score(bm_feat).squeeze(1)
        
        return {
            'start': start_prob,
            'end': end_prob,
            'bm_confidence': torch.sigmoid(bm_scores)
        }

class FeatureExtractor:
    """OpenCV-based Two-Stream feature extraction for proposal generation"""
    def __init__(self, backbone: str = 'resnet', device: str = 'cpu'):
        self.device = device
        self.backbone = backbone
        self.sampling_stride = 16  # Extract features every 16 frames
        
        # Pre-trained feature extractor
        self.net = cv2.dnn.readNetFromCaffe(
            'resnet152.prototxt', 
            'resnet152.caffemodel'
        )
        if device in ('gpu', 'cuda') and cv2.cuda.getCudaEnabledDeviceCount() > 0:
            self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
            self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
    
    def extract_snippet_features(self, video_path: str, output_dim: int = 400) -> np.ndarray:
        """Extract temporal features using sparse sampling"""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        snippet_indices = range(0, total_frames, self.sampling_stride)
        
        features = []
        
        for idx in snippet_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                break
            
            # Blob preparation
            blob = cv2.dnn.blobFromImage(
                frame, 1.0, (224, 224), 
                (103.939, 116.779, 123.68), 
                swapRB=False, crop=False
            )
            
            self.net.setInput(blob)
            feat = self.net.forward().flatten()
            # Truncate or zero-pad so the feature dim matches BMN's expected input
            if feat.shape[0] >= output_dim:
                feat = feat[:output_dim]
            else:
                feat = np.pad(feat, (0, output_dim - feat.shape[0]))
            features.append(feat.astype(np.float32))
        
        cap.release()
        
        # Pad or truncate to a uniform temporal length of 100 snippets
        features = np.array(features, dtype=np.float32)
        if features.size == 0:
            features = np.zeros((0, output_dim), dtype=np.float32)
        if len(features) < 100:
            padding = np.zeros((100 - len(features), output_dim), dtype=np.float32)
            features = np.vstack([features, padding])
        return features[:100]  # Truncate to max temporal dim

class ProposalInferencePipeline:
    """End-to-end pipeline for TAG/BMN proposal generation"""
    def __init__(self, method: str = 'bmn', device: str = 'cpu'):
        self.method = method
        self.device = device
        self.feature_extractor = FeatureExtractor(device=device)
        
        if method == 'tag':
            self.proposal_generator = TAGAlgorithm()
        else:
            self.bmn_model = BMNNetwork()
            if device == 'cuda' and torch.cuda.is_available():
                self.bmn_model = self.bmn_model.cuda()
            self.bmn_model.eval()
    
    def generate_proposals(self, video_path: str, top_k: int = 100) -> List[Dict]:
        """Generate temporal proposals with confidence scores"""
        # Feature extraction
        features = self.feature_extractor.extract_snippet_features(video_path)
        
        if self.method == 'tag':
            # TAG: Actionness grouping
            proposals = self.proposal_generator.temporal_grouping(
                self.proposal_generator.compute_actionness(features),
                len(features)
            )
            proposals = self.proposal_generator.suppress_non_maximum(proposals)
        else:
            # BMN: Boundary matching
            feat_tensor = torch.from_numpy(features).float().unsqueeze(0)
            if self.device == 'cuda':
                feat_tensor = feat_tensor.cuda()
            
            with torch.no_grad():
                outputs = self.bmn_model(feat_tensor)
            
            # Proposal generation from BM map
            proposals = self._extract_proposals_from_bm_map(
                outputs['start'].cpu().numpy()[0],
                outputs['end'].cpu().numpy()[0],
                outputs['bm_confidence'].cpu().numpy()[0],
                top_k
            )
        
        # Format output
        results = []
        for start, end, conf in proposals[:top_k]:
            results.append({
                'start_time': start * 0.1,  # Assuming 0.1s per snippet
                'end_time': end * 0.1,
                'confidence': float(conf),
                'duration': (end - start) * 0.1
            })
        
        return results
    
    def _extract_proposals_from_bm_map(self, start_prob: np.ndarray, 
                                      end_prob: np.ndarray,
                                      bm_conf: np.ndarray,
                                      top_k: int) -> List[Tuple]:
        """Extract proposals from 2D BM confidence map"""
        proposals = []
        T = len(start_prob)
        
        # Find peaks in start and end curves
        start_peaks = argrelextrema(start_prob, np.greater, order=5)[0]
        end_peaks = argrelextrema(end_prob, np.greater, order=5)[0]
        
        for s in start_peaks:
            for e in end_peaks:
                if e > s and (e - s) < T // 2:  # Duration constraint
                    conf = bm_conf[s, e] * start_prob[s] * end_prob[e]
                    proposals.append((s, e, conf))
        
        # Sort by confidence
        proposals.sort(key=lambda x: x[2], reverse=True)
        return proposals[:top_k]
    
    def visualize_proposals(self, video_path: str, proposals: List[Dict], output_path: str):
        """Render proposals as temporal segments on video"""
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        out = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps, (width, height)
        )
        
        frame_idx = 0
        colors = [(0, 255, 0), (0, 0, 255), (255, 0, 0)]
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            
            current_time = frame_idx / fps
            active_proposals = [
                p for p in proposals 
                if p['start_time'] <= current_time <= p['end_time']
            ]
            
            # Draw active proposals
            for i, prop in enumerate(active_proposals[:3]):
                color = colors[i % len(colors)]
                progress = (current_time - prop['start_time']) / prop['duration']
                bar_x = int(width * progress)
                
                # Progress bar
                cv2.rectangle(frame, (0, 50 + i*30), (bar_x, 80 + i*30), color, -1)
                text = f"Action {i+1}: {prop['confidence']:.2f}"
                cv2.putText(frame, text, (10, 70 + i*30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            out.write(frame)
            frame_idx += 1
        
        cap.release()
        out.release()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--video', type=str, required=True)
    parser.add_argument('--algorithm', choices=['tag', 'bmn'], default='bmn')
    parser.add_argument('--gpu', action='store_true')
    parser.add_argument('--output', type=str, default='output_proposals.json')
    args = parser.parse_args()
    
    pipeline = ProposalInferencePipeline(
        method=args.algorithm,
        device='cuda' if args.gpu else 'cpu'
    )
    
    proposals = pipeline.generate_proposals(args.video, top_k=50)
    
    # Save results
    with open(args.output, 'w') as f:
        json.dump(proposals, f, indent=2)
    
    print(f"Generated {len(proposals)} proposals, saved to {args.output}")
    
    # Visualization
    pipeline.visualize_proposals(
        args.video, proposals, 
        args.video.replace('.mp4', '_annotated.mp4')
    )

Causal Online Video Understanding

Content: streaming action detection with causal convolutions and memory-efficient inference
Usage: python Causal_Online_Video_Understanding.py --source 0 --model gru --latency 50
Features: causal convolutions, ring-buffer memory management, chunked streaming, async OpenCV I/O

Python


"""
Script: Causal_Online_Video_Understanding.py
Content: Streaming action detection with causal convolution and memory-efficient inference
Usage: python Causal_Online_Video_Understanding.py --source 0 --model gru --latency 50
Features: Causal convolutions, ring buffer memory management, chunked streaming, async OpenCV I/O
"""

import cv2
import torch
import torch.nn as nn
import numpy as np
from typing import Deque, Optional, Tuple
from collections import deque
import threading
import queue
import time
import argparse

class CausalConv1d(nn.Module):
    """Causal convolution ensuring temporal order constraints"""
    def __init__(self, in_channels: int, out_channels: int, 
                 kernel_size: int, dilation: int = 1):
        super().__init__()
        self.kernel_size = kernel_size
        self.dilation = dilation
        self.padding = (kernel_size - 1) * dilation
        
        self.conv = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            padding=0,  # Manual padding for causality
            dilation=dilation
        )
        
        # State buffer for streaming inference
        self.cache = None
        self.cache_size = self.padding
    
    def forward(self, x: torch.Tensor, streaming: bool = False) -> torch.Tensor:
        # x: [batch, channels, time]
        if not streaming:
            # Offline mode: left padding only, so no future frames are visible
            x_padded = nn.functional.pad(x, (self.padding, 0))
            return self.conv(x_padded)
        else:
            # Streaming mode: cached history replaces the zero padding
            if self.cache is None or self.cache.size(0) != x.size(0):
                self.cache = torch.zeros(x.size(0), x.size(1), self.cache_size, 
                                       device=x.device)
            
            # Concatenate cache with current input
            x_cache = torch.cat([self.cache, x], dim=2)
            out = self.conv(x_cache)
            
            # Update cache with latest inputs (keep last padding elements)
            self.cache = x_cache[:, :, -self.cache_size:].detach()
            return out
    
    def reset_cache(self):
        """Reset temporal state for new video sequence"""
        self.cache = None

class StreamingActionDetector(nn.Module):
    """Online action detection with causal temporal modeling"""
    def __init__(self, feature_dim: int = 2048, hidden_dim: int = 512, 
                 num_classes: int = 20, num_layers: int = 3):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # Causal temporal convolution stack
        self.temporal_encoder = nn.ModuleList()
        dilation = 1
        for i in range(num_layers):
            self.temporal_encoder.append(
                CausalConv1d(
                    feature_dim if i == 0 else hidden_dim,
                    hidden_dim,
                    kernel_size=3,
                    dilation=dilation
                )
            )
            dilation *= 2  # Exponential dilation for receptive field expansion
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(hidden_dim, num_classes),
            nn.Softmax(dim=-1)
        )
        
    
    def forward(self, x: torch.Tensor, online: bool = False) -> torch.Tensor:
        """
        x: [batch, channels, time] frame features
        online: True for streaming with stateful causal caches
        """
        out = x
        for layer in self.temporal_encoder:
            out = torch.relu(layer(out, streaming=online))
        
        return self.classifier(out)
    
    def step(self, frame_feat: torch.Tensor) -> torch.Tensor:
        """Single-frame inference for online streaming.
        
        The causal-conv caches carry the history, so each call feeds only the
        new frame instead of re-feeding an overlapping window of past frames.
        """
        x = frame_feat.unsqueeze(2)  # [1, channels] -> [1, channels, 1]
        with torch.no_grad():
            return self.forward(x, online=True)
    
    def reset_cache(self):
        """Clear all layer caches before a new video sequence"""
        for layer in self.temporal_encoder:
            layer.reset_cache()

class GRUMemoryBank(nn.Module):
    """GRU-based causal memory for long-term dependency"""
    def __init__(self, input_dim: int, hidden_dim: int, num_classes: int):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)
        self.hidden_state = None
    
    def forward(self, x: torch.Tensor, reset: bool = False) -> torch.Tensor:
        # x: [1, 1, input_dim] for single frame step
        if reset:
            self.hidden_state = None
        
        out, self.hidden_state = self.gru(x, self.hidden_state)
        self.hidden_state = self.hidden_state.detach()  # Avoid unbounded autograd history
        return self.fc(out[:, -1, :])
    
    def reset_states(self):
        self.hidden_state = None

class OpenCVStreamProcessor:
    """High-performance video streaming with multi-threading"""
    def __init__(self, source: str, buffer_size: int = 32, 
                 target_fps: int = 30, resolution: Tuple[int, int] = (224, 224)):
        self.source = source
        self.buffer_size = buffer_size
        self.target_fps = target_fps
        self.resolution = resolution
        
        # Threading components
        self.frame_queue = queue.Queue(maxsize=buffer_size)
        self.result_queue = queue.Queue()
        self.capture_thread = None
        self.stop_event = threading.Event()
        
        # Feature extractor (OpenCV DNN)
        self.feature_net = cv2.dnn.readNetFromONNX('resnet18.onnx')
        
        # Performance monitoring
        self.frame_times = deque(maxlen=30)
        self.last_time = time.time()
    
    def start_capture(self):
        """Initialize threaded frame capture"""
        def capture_loop():
            cap = cv2.VideoCapture(self.source)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            
            # Set properties for reduced latency
            cap.set(cv2.CAP_PROP_FPS, self.target_fps)
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
            
            while not self.stop_event.is_set():
                ret, frame = cap.read()
                if ret:
                    # Drop frames if queue full (maintain real-time)
                    try:
                        self.frame_queue.put_nowait(frame)
                    except queue.Full:
                        pass
            cap.release()
        
        self.capture_thread = threading.Thread(target=capture_loop)
        self.capture_thread.start()
    
    def extract_features(self, frame: np.ndarray) -> np.ndarray:
        """ONNX-optimized feature extraction"""
        # blobFromImage computes scale * (img - mean), so the mean must be in
        # 0-255 pixel scale; the uniform 1/58.395 scale approximates the ImageNet
        # per-channel std (blobFromImage only supports a single scalefactor)
        blob = cv2.dnn.blobFromImage(
            frame, 1.0 / 58.395, self.resolution, 
            mean=(123.675, 116.28, 103.53),
            swapRB=True
        )
        
        self.feature_net.setInput(blob)
        features = self.feature_net.forward()
        return features.flatten()
    
    def process_stream(self, model: nn.Module, device: str = 'cpu'):
        """Main processing loop with causal inference"""
        self.start_capture()
        
        if device == 'cuda':
            model = model.cuda()
        model.eval()
        
        # Reset model states
        if hasattr(model, 'reset_cache'):
            model.reset_cache()
        if hasattr(model, 'reset_states'):
            model.reset_states()
        
        fps_display = 0
        
        while True:
            try:
                frame = self.frame_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            
            # Compute latency
            current_time = time.time()
            latency = (current_time - self.last_time) * 1000
            self.last_time = current_time
            self.frame_times.append(latency)
            
            # Feature extraction
            feat = self.extract_features(frame)
            feat_tensor = torch.from_numpy(feat).float().unsqueeze(0)  # [1, D]
            if device == 'cuda':
                feat_tensor = feat_tensor.cuda()
            
            # Causal inference
            with torch.no_grad():
                if isinstance(model, StreamingActionDetector):
                    output = model.step(feat_tensor)  # expects [1, D]
                else:
                    output = model(feat_tensor.unsqueeze(1), reset=False)  # GRU expects [1, 1, D]
            
            pred_class = torch.argmax(output).item()
            confidence = torch.max(output).item()
            
            # Visualization
            avg_latency = np.mean(self.frame_times)
            fps_display = 1000.0 / avg_latency if avg_latency > 0 else 0
            
            cv2.putText(frame, f"Action: {pred_class} ({confidence:.2f})", 
                       (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(frame, f"Latency: {avg_latency:.1f}ms ({fps_display:.1f} FPS)", 
                       (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            
            cv2.imshow('Online Action Detection', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
        self.stop_event.set()
        self.capture_thread.join()

class ChunkedCausalInference:
    """Memory-efficient chunked processing for long videos"""
    def __init__(self, chunk_size: int = 64, overlap: int = 8):
        self.chunk_size = chunk_size
        self.overlap = overlap
    
    def process_long_video(self, video_path: str, model: nn.Module, 
                           feature_extractor: callable) -> np.ndarray:
        """Process a long video in overlapping chunks for temporal continuity"""
        cap = cv2.VideoCapture(video_path)
        features_buffer = []
        
        all_predictions = []
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            feat = feature_extractor(frame)
            features_buffer.append(feat)
            
            if len(features_buffer) == self.chunk_size:
                # Process chunk
                chunk_tensor = torch.from_numpy(
                    np.array(features_buffer)
                ).float().unsqueeze(0).transpose(1, 2)  # [1, C, T]
                
                with torch.no_grad():
                    # Offline causal padding per chunk; the retained tail of the
                    # previous chunk supplies left context for continuity
                    preds = model(chunk_tensor, online=False)
                    all_predictions.append(preds.cpu().numpy())
                
                # Slide window, keeping the last `overlap` frames as context
                features_buffer = features_buffer[-self.overlap:]
        
        cap.release()
        if not all_predictions:
            return np.empty((0,))
        return np.concatenate(all_predictions, axis=0)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--source', type=str, default='0')
    parser.add_argument('--model', choices=['causal_cnn', 'gru'], default='causal_cnn')
    parser.add_argument('--device', choices=['cpu', 'cuda'], default='cpu')
    parser.add_argument('--latency', type=int, default=50, help='Target latency ms')
    args = parser.parse_args()
    
    # Model selection
    if args.model == 'causal_cnn':
        model = StreamingActionDetector(feature_dim=512, hidden_dim=256, num_classes=20)
    else:
        model = GRUMemoryBank(input_dim=512, hidden_dim=256, num_classes=20)
    
    # Stream processing
    processor = OpenCVStreamProcessor(
        source=int(args.source) if args.source.isdigit() else args.source,
        buffer_size=32
    )
    
    try:
        processor.process_stream(model, device=args.device)
    finally:
        cv2.destroyAllWindows()