OpenCV from Beginner to Mastery, Chapter 9: Video Action Recognition and Localization Systems
Contents
Video Action Recognition and Localization Systems
Temporal Segment Networks (TSN) model long-range temporal structure through a sparse sampling strategy. The method divides the input video into several segments of equal duration and randomly samples a short snippet from each segment for feature extraction. All snippets share the convolutional network's parameters, and a consensus function aggregates the per-snippet predictions into a video-level prediction. This architecture sharply reduces the computational cost of long video sequences while still capturing temporal dependencies across segments. Training uses cross-modality pre-training, initializing the optical-flow branch from the RGB model, together with partial batch normalization and heavy dropout to suppress overfitting. For untrimmed videos, TSN adds a multi-scale temporal window integration scheme: sliding windows densely sample snippets, max pooling selects the peak-activation window, and scores are fused across scales. For real-time deployment, TSN replaces optical-flow extraction with RGB differences or the motion vectors already present in compressed video, raising inference speed to 340 frames per second while maintaining accuracy.
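The RGB-difference modality mentioned above is simple enough to sketch directly: stacking signed frame-to-frame differences approximates the motion cue that optical flow provides, at a fraction of the cost. The helper below is a minimal illustration, not TSN's reference implementation; the function name and the 5-pair stacking (mirroring the 10-channel flow stack used later) are assumptions.
Python
import cv2
import numpy as np

def rgb_diff_stack(frames, size=(224, 224), num_pairs=5):
    """Approximate motion with stacked RGB differences (hypothetical flow substitute)."""
    diffs = []
    for prev, curr in zip(frames[:-1], frames[1:]):
        prev = cv2.resize(prev, size).astype(np.float32)
        curr = cv2.resize(curr, size).astype(np.float32)
        diffs.append((curr - prev) / 255.0)  # Signed difference in [-1, 1]
        if len(diffs) == num_pairs:
            break
    return np.concatenate(diffs, axis=2)  # [H, W, 3 * num_pairs]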
SlowFast and X3D form complementary paradigms for efficient video understanding. SlowFast uses a dual-pathway architecture: the slow pathway captures spatial semantics at a low frame rate, the fast pathway processes motion at high temporal resolution with a high frame rate, and lateral connections fuse information between the two. X3D is an expandable 3D convolutional network that progressively expands a 2D image-classification architecture along four axes (temporal duration, spatial resolution, width, and depth), cutting parameters with depthwise-separable convolutions and channel-separation strategies. Deployment optimization involves kernel fusion and hardware-aware compilation: TensorRT or ONNX Runtime performs operator fusion, merging convolution, batch normalization, and activation into a single kernel. Mobile deployment combines knowledge distillation with quantization, mapping floating-point weights to 8-bit integers, plus channel pruning to remove redundant feature maps.
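Trying out operator fusion does not require a hand-built TensorRT pipeline: ONNX Runtime performs Conv+BN+activation fusion automatically when graph optimization is enabled. A minimal sketch, assuming an exported x3d.onnx file exists; the file names are placeholders.
Python
import numpy as np
import onnxruntime as ort

opts = ort.SessionOptions()
# ORT_ENABLE_ALL turns on graph-level fusions (Conv+BN, Conv+Relu, ...)
opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
opts.optimized_model_filepath = "x3d_fused.onnx"  # Dump the fused graph for inspection

sess = ort.InferenceSession("x3d.onnx", opts, providers=["CPUExecutionProvider"])
clip = np.random.randn(1, 3, 16, 224, 224).astype(np.float32)
logits = sess.run(None, {sess.get_inputs()[0].name: clip})[0]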
Temporal action proposal generation aims to localize candidate action intervals in untrimmed video. Temporal Action Grouping (TAG) takes a bottom-up approach: it runs a temporal watershed algorithm over the sequence of actionness scores, clusters contiguous high-response regions into proposals, and suppresses background noise with an adaptive threshold. The Boundary-Matching Network (BMN) introduces a boundary-matching mechanism that represents each proposal as a matched pair of start and end boundaries, building a two-dimensional boundary-matching confidence map to evaluate all possible intervals densely. The network has two branches, boundary classification and confidence evaluation, jointly optimizing boundary localization and proposal reliability estimation. Implementations extract multi-scale temporal features with dilated convolutions (enlarged receptive fields) and remove overlapping proposals with non-maximum suppression in post-processing.
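The dense interval enumeration behind the BM confidence map reduces to an outer product with an upper-triangular validity mask: cell (s, e) scores the interval from snippet s to snippet e. A toy numpy sketch with random boundary probabilities (illustrative values only):
Python
import numpy as np

T = 100                          # Number of temporal snippets
start_prob = np.random.rand(T)   # Toy start-boundary probabilities
end_prob = np.random.rand(T)     # Toy end-boundary probabilities

# Upper triangle (k=1) keeps only pairs with end strictly after start
valid = np.triu(np.ones((T, T), dtype=bool), k=1)
bm_map = np.outer(start_prob, end_prob) * valid  # bm_map[s, e] scores interval [s, e]

s, e = np.unravel_index(bm_map.argmax(), bm_map.shape)
print(f"Top proposal: snippets {s}..{e}, score {bm_map[s, e]:.3f}")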
Online video understanding requires the model to obey a strict causality constraint: predictions for the current time step may use only past frames. Causal convolutions pad with zeros on the left so each kernel sees only current and past features, eliminating leakage of future information. Streaming inference architectures process long video sequences in overlapping temporal chunks, passing recurrent state across chunks to carry historical context. For memory efficiency, a fixed-capacity feature buffer stores historical activations, and gradient checkpointing trades computation for memory. When integrating with OpenCV, build a multi-threaded pipeline that separates the video decoding, preprocessing, and inference threads, using ring buffers to smooth throughput differences across the asynchronous stages.
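The causality constraint is easy to verify empirically: with left-only padding, perturbing future inputs must leave all earlier outputs untouched. A minimal sketch of that check (toy channel counts):
Python
import torch
import torch.nn as nn

k, d = 3, 2
conv = nn.Conv1d(8, 8, kernel_size=k, dilation=d)
pad = (k - 1) * d  # Left padding only -> causal

def causal(x):
    return conv(nn.functional.pad(x, (pad, 0)))

x = torch.randn(1, 8, 16)
y1 = causal(x)
x2 = x.clone()
x2[:, :, 10:] += 1.0  # Perturb the "future" timesteps
y2 = causal(x2)
assert torch.allclose(y1[:, :, :10], y2[:, :, :10])
print("Outputs at t < 10 are unchanged by the future perturbation")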
Python
"""
Script: TSN_Inference_Optimized.py
Content: Temporal Segment Networks deployment with OpenCV optimization
Usage: python TSN_Inference_Optimized.py --video input.mp4 --segments 7 --device cpu|cuda
Features: Multi-threaded frame extraction, sparse sampling, two-stream fusion, real-time FPS monitoring
"""
import cv2
import numpy as np
import torch
import torch.nn as nn
from threading import Thread, Lock
from queue import Queue, Full, Empty
import argparse
from typing import List, Tuple, Optional
import time
class TSNArch(nn.Module):
def __init__(self, num_classes: int, num_segments: int = 7, modality: str = 'RGB'):
super(TSNArch, self).__init__()
self.num_segments = num_segments
self.modality = modality
self.num_classes = num_classes
# Backbone initialization (BN-Inception or ResNet-152)
self.base_model = self._create_base_model()
        self.spatial_pool = nn.AdaptiveAvgPool2d(1)  # Global spatial pooling; segmental consensus happens in forward()
self.dropout = nn.Dropout(p=0.8)
self.fc = nn.Linear(2048, num_classes)
# Cross-modality initialization handling
if modality != 'RGB':
self._modify_first_conv()
def _create_base_model(self) -> nn.Module:
import torchvision.models as models
model = models.resnet152(pretrained=True)
return nn.Sequential(*list(model.children())[:-2])
def _modify_first_conv(self):
"""Adapt first layer for optical flow input (10 channels for flow stacks)"""
with torch.no_grad():
conv1 = self.base_model[0].weight
            # TSN cross-modality init: average the RGB kernels and replicate
            # across the 10 flow channels (plain replication, no extra scaling)
            new_weight = conv1.mean(dim=1, keepdim=True)
            new_weight = new_weight.repeat(1, 10, 1, 1)
self.base_model[0] = nn.Conv2d(10, 64, kernel_size=7,
stride=2, padding=3, bias=False)
self.base_model[0].weight.data = new_weight
def forward(self, x: torch.Tensor) -> torch.Tensor:
batch_size = x.size(0)
# Segment-level feature extraction with shared weights
base_out = self.base_model(x.view(-1, x.size(2), x.size(3), x.size(4)))
        base_out = self.spatial_pool(base_out).squeeze(-1).squeeze(-1)
base_out = self.dropout(base_out)
output = self.fc(base_out)
# Segmental consensus aggregation
output = output.view(batch_size, self.num_segments, -1)
consensus_output = output.mean(dim=1) # Average pooling consensus
return consensus_output
class VideoBuffer:
"""Thread-safe circular buffer for streaming video frames"""
def __init__(self, capacity: int = 128):
self.queue = Queue(maxsize=capacity)
self.lock = Lock()
self.finished = False
    def push(self, frame: np.ndarray) -> bool:
        try:
            self.queue.put_nowait(frame)
            return True
        except Full:
            return False
    def pop(self) -> Optional[np.ndarray]:
        try:
            return self.queue.get_nowait()
        except Empty:
            return None
def size(self) -> int:
return self.queue.qsize()
class OpticalFlowExtractor:
"""GPU-accelerated optical flow computation for TSN temporal stream"""
def __init__(self, device: str = 'cuda', frame_interval: int = 5):
self.device = device
self.frame_interval = frame_interval
self.prev_frame = None
self.flow_buffer = []
# OpenCV CUDA optimization for optical flow
if device == 'cuda' and cv2.cuda.getCudaEnabledDeviceCount() > 0:
self.gpu_enabled = True
self.gpu_flow = cv2.cuda.FarnebackOpticalFlow_create(
numLevels=3, pyrScale=0.5, winSize=15,
numIters=3, polyN=5, polySigma=1.2, flags=0
)
else:
self.gpu_enabled = False
def compute_flow(self, frame: np.ndarray) -> np.ndarray:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if self.prev_frame is None:
            self.prev_frame = gray
            # Flow has two channels (dx, dy), unlike the 3-channel BGR frame
            return np.zeros((*gray.shape, 2), dtype=np.float32)
if self.gpu_enabled:
gpu_prev = cv2.cuda_GpuMat()
gpu_curr = cv2.cuda_GpuMat()
gpu_prev.upload(self.prev_frame)
gpu_curr.upload(gray)
gpu_flow = self.gpu_flow.calc(gpu_prev, gpu_curr, None)
flow = gpu_flow.download()
else:
flow = cv2.calcOpticalFlowFarneback(
self.prev_frame, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
)
self.prev_frame = gray
return flow
    def extract_flow_stack(self, frames: List[np.ndarray]) -> np.ndarray:
        """Generate stacked optical flow (x,y) * 5 = 10 channels"""
        flows = []
        self.prev_frame = None
        self.compute_flow(frames[0])  # Seed prev_frame; the first result is all-zero
        for i in range(self.frame_interval, len(frames), self.frame_interval):
            flows.append(self.compute_flow(frames[i]))
            if len(flows) == 5:
                break
# Normalize and stack
flow_stack = np.zeros((256, 256, 10), dtype=np.float32)
for idx, flow in enumerate(flows[:5]):
flow_resized = cv2.resize(flow, (256, 256))
flow_stack[:, :, idx*2] = flow_resized[:, :, 0] / 20.0 # x-component
flow_stack[:, :, idx*2+1] = flow_resized[:, :, 1] / 20.0 # y-component
return flow_stack
class TSNInferenceEngine:
"""Optimized TSN inference with multi-threading and batch processing"""
def __init__(self, model_path: str, num_segments: int = 7, device: str = 'cpu'):
self.device = device
self.num_segments = num_segments
# Model initialization
        self.model = TSNArch(num_classes=101, num_segments=num_segments)
        # Load pretrained weights if available (otherwise fall back to the ImageNet backbone)
        try:
            self.model.load_state_dict(torch.load(model_path, map_location='cpu'), strict=False)
        except (FileNotFoundError, RuntimeError):
            print(f"Warning: could not load weights from {model_path}; using backbone init")
        self.model.eval()
if device == 'cuda' and torch.cuda.is_available():
self.model = self.model.cuda()
# Two-stream preparation
self.rgb_buffer = []
self.flow_extractor = OpticalFlowExtractor(device=device)
# Threading components
self.frame_buffer = VideoBuffer(capacity=256)
self.result_queue = Queue()
self.capture_thread = None
def sparse_sampling(self, total_frames: int) -> List[int]:
"""Sparse temporal sampling: divide video into K segments, sample 1 per segment"""
segment_length = total_frames // self.num_segments
indices = []
for i in range(self.num_segments):
start = i * segment_length
end = (i + 1) * segment_length
# Random sampling during training, center sampling during inference
idx = start + segment_length // 2
indices.append(min(idx, total_frames - 1))
return indices
def preprocess_rgb(self, frame: np.ndarray) -> torch.Tensor:
"""Standard ImageNet preprocessing"""
frame = cv2.resize(frame, (256, 256))
# Center crop 224x224
h, w = frame.shape[:2]
start_h, start_w = (h - 224) // 2, (w - 224) // 2
frame = frame[start_h:start_h+224, start_w:start_w+224]
frame = frame.astype(np.float32) / 255.0
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
        frame = (frame - mean) / std  # float32 throughout; avoids float64 promotion
        frame = torch.from_numpy(frame).permute(2, 0, 1).unsqueeze(0)
return frame
def inference(self, video_path: str) -> Tuple[int, float]:
"""Video-level prediction with sparse sampling"""
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
indices = self.sparse_sampling(total_frames)
# Sparse frame extraction
frames = []
for idx in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
frames.append(self.preprocess_rgb(frame))
cap.release()
if len(frames) < self.num_segments:
# Padding with last frame if video too short
while len(frames) < self.num_segments:
frames.append(frames[-1] if frames else torch.zeros(1, 3, 224, 224))
batch = torch.stack(frames).squeeze(1)
if self.device == 'cuda':
batch = batch.cuda()
        with torch.no_grad():
            # Mixed precision is only meaningful (and only enabled) on CUDA
            with torch.cuda.amp.autocast(enabled=(self.device == 'cuda')):
                output = self.model(batch.unsqueeze(0))
probs = torch.softmax(output, dim=1)
confidence, pred_class = probs.max(1)
return pred_class.item(), confidence.item()
def stream_inference(self, source: int = 0):
"""Real-time streaming inference with thread pool optimization"""
def capture_frames():
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize latency
while not self.frame_buffer.finished:
ret, frame = cap.read()
if ret:
self.frame_buffer.push(frame)
cap.release()
self.capture_thread = Thread(target=capture_frames)
self.capture_thread.start()
# Processing loop with sliding window
window_frames = []
while True:
frame = self.frame_buffer.pop()
if frame is None:
continue
window_frames.append(frame)
            if len(window_frames) == self.num_segments:
                # Batch inference on the sliding window
                batch = [self.preprocess_rgb(f) for f in window_frames]
                batch_tensor = torch.stack(batch).squeeze(1)  # [K, 3, 224, 224]
                if self.device == 'cuda':
                    batch_tensor = batch_tensor.cuda()
                with torch.no_grad():
                    # Model expects [B, K, C, H, W]; consensus averages segments internally
                    output = self.model(batch_tensor.unsqueeze(0))
                pred = torch.argmax(output, dim=1).item()
                window_frames.pop(0)  # Slide the window by one frame
                yield pred, frame
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--video', type=str, default='0')
parser.add_argument('--segments', type=int, default=7)
parser.add_argument('--device', type=str, default='cpu', choices=['cpu', 'cuda'])
args = parser.parse_args()
engine = TSNInferenceEngine(
model_path='tsn_resnet152.pth',
num_segments=args.segments,
device=args.device
)
    if args.video.isdigit():
        # Real-time demonstration from a camera index
        for pred_class, frame in engine.stream_inference(int(args.video)):
            cv2.putText(frame, f"Class: {pred_class}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('TSN Streaming', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        engine.frame_buffer.finished = True  # Signal the capture thread to stop
        cv2.destroyAllWindows()
    else:
        # Offline video-level prediction with sparse sampling
        pred_class, confidence = engine.inference(args.video)
        print(f"Predicted class {pred_class} (confidence {confidence:.3f})")
SlowFast and X3D Deployment
Python
"""
Script: SlowFast_X3D_Deployment.py
Content: SlowFast and X3D model optimization for edge deployment
Usage: python SlowFast_X3D_Deployment.py --model_type slowfast --quantization int8 --source video.mp4
Features: TensorRT conversion, kernel fusion, precision calibration, multi-stream batching
"""
import cv2
import torch
import torch.nn as nn
import numpy as np
from typing import Dict, List, Tuple
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
from collections import deque
import argparse
import time
class SlowFastPathway(nn.Module):
"""Dual-pathway architecture with lateral connections"""
def __init__(self, slow_channels: int = 64, fast_channels: int = 8):
super().__init__()
self.slow_path = self._create_slow_path(slow_channels)
self.fast_path = self._create_fast_path(fast_channels)
        self.lateral_connections = nn.ModuleList([
            nn.Conv3d(fast_channels, slow_channels, kernel_size=(5, 1, 1),
                      stride=(8, 1, 1), padding=(2, 0, 0))  # temporal stride = alpha = 8, so 32 fast frames map to 4 slow frames
            for _ in range(4)  # ResNet stages
        ])
def _create_slow_path(self, channels: int) -> nn.Module:
# Low frame rate (4 frames per clip), high channel capacity
return nn.Sequential(
nn.Conv3d(3, channels, kernel_size=(1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3)),
nn.BatchNorm3d(channels),
nn.ReLU(inplace=True)
)
    def _create_fast_path(self, channels: int) -> nn.Module:
        # High frame rate (32 frames), low channel capacity (beta = 1/8)
return nn.Sequential(
nn.Conv3d(3, channels, kernel_size=(5, 7, 7), stride=(1, 2, 2), padding=(2, 3, 3)),
nn.BatchNorm3d(channels),
nn.ReLU(inplace=True)
)
def forward(self, slow_input: torch.Tensor, fast_input: torch.Tensor) -> torch.Tensor:
slow_feat = self.slow_path(slow_input)
fast_feat = self.fast_path(fast_input)
# Lateral fusion: fast to slow
lateral_feat = self.lateral_connections[0](fast_feat)
fused = slow_feat + lateral_feat
return fused, fast_feat
class X3DScaleEfficient(nn.Module):
"""Expandable 3D convolution with depthwise separable decomposition"""
def __init__(self, input_dim: int = 3, expansion_factor: float = 2.25):
super().__init__()
self.expansion_factor = expansion_factor
# Progressive expansion along 4 dimensions: {gamma_t, gamma_s, gamma_w, gamma_d}
self.stem = nn.Sequential(
nn.Conv3d(input_dim, 24, kernel_size=(1, 3, 3), stride=(1, 2, 2),
padding=(0, 1, 1), bias=False),
nn.BatchNorm3d(24),
nn.ReLU(inplace=True)
)
# Depthwise separable 3D conv for mobile optimization
self.block = self._create_x3d_block(24, int(24 * expansion_factor))
    def _create_x3d_block(self, in_ch: int, out_ch: int) -> nn.Module:
        mid_ch = int(in_ch * self.expansion_factor)  # Conv3d requires integer channel counts
return nn.Sequential(
# Depthwise
nn.Conv3d(in_ch, in_ch, kernel_size=(3, 3, 3),
groups=in_ch, padding=(1, 1, 1), bias=False),
nn.BatchNorm3d(in_ch),
nn.ReLU(inplace=True),
# Pointwise expansion
nn.Conv3d(in_ch, mid_ch, kernel_size=1, bias=False),
nn.BatchNorm3d(mid_ch),
nn.ReLU(inplace=True),
# Pointwise projection
nn.Conv3d(mid_ch, out_ch, kernel_size=1, bias=False),
nn.BatchNorm3d(out_ch)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.block(self.stem(x))
class TensorRTOptimizer:
"""TensorRT deployment optimization with layer fusion and precision calibration"""
def __init__(self, model: nn.Module, input_shapes: Dict[str, Tuple],
precision: str = 'fp16', max_batch: int = 8):
self.model = model
self.input_shapes = input_shapes
self.precision = precision
self.max_batch = max_batch
self.logger = trt.Logger(trt.Logger.WARNING)
self.engine = None
self.context = None
def build_engine(self, onnx_path: str) -> trt.ICudaEngine:
"""Convert ONNX model to optimized TensorRT engine"""
builder = trt.Builder(self.logger)
network = builder.create_network(
1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
)
parser = trt.OnnxParser(network, self.logger)
with open(onnx_path, 'rb') as f:
parser.parse(f.read())
config = builder.create_builder_config()
config.max_workspace_size = 1 << 30 # 1GB workspace
if self.precision == 'int8':
config.set_flag(trt.BuilderFlag.INT8)
# Calibration for quantization
config.int8_calibrator = self._create_calibrator()
elif self.precision == 'fp16':
config.set_flag(trt.BuilderFlag.FP16)
# Enable kernel fusion and DLA acceleration if available
config.set_flag(trt.BuilderFlag.STRICT_TYPES)
self.engine = builder.build_engine(network, config)
self.context = self.engine.create_execution_context()
return self.engine
def _create_calibrator(self):
"""INT8 calibration using entropy minimization"""
class Int8Calibrator(trt.IInt8EntropyCalibrator2):
def __init__(self, data_loader, cache_file):
super().__init__()
self.data_loader = data_loader
self.cache_file = cache_file
self.batch_size = 8
self.current_index = 0
            def get_batch(self, names, p_str=None):
                if self.data_loader is None or self.current_index >= len(self.data_loader):
                    return None  # None tells TensorRT the calibration data is exhausted
                # Return device pointer to the calibration batch
                batch = next(iter(self.data_loader))
                self.current_index += 1
                return [batch.cuda().data_ptr()]
def get_batch_size(self):
return self.batch_size
return Int8Calibrator(None, 'calibration.cache')
def infer(self, inputs: Dict[str, torch.Tensor]) -> torch.Tensor:
"""Optimized inference with pinned memory and async execution"""
# Allocate device memory
d_inputs = {}
h_outputs = {}
d_outputs = {}
bindings = []
for name, shape in self.input_shapes.items():
size = trt.volume(shape) * self.max_batch
d_inputs[name] = cuda.mem_alloc(size * 4) # float32
# Pinned memory for host-to-device transfer
pinned = cuda.pagelocked_empty(size, np.float32)
cuda.memcpy_htod_async(d_inputs[name], pinned, cuda.Stream())
bindings.append(int(d_inputs[name]))
# Execute async
stream = cuda.Stream()
self.context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
stream.synchronize()
return torch.randn(self.max_batch, 400) # Placeholder for output
class StreamingVideoProcessor:
"""Real-time video processing with batch aggregation and latency hiding"""
def __init__(self, engine: TensorRTOptimizer, clip_length: int = 32,
sampling_rate: int = 2):
self.engine = engine
self.clip_length = clip_length
self.sampling_rate = sampling_rate
self.frame_buffer = deque(maxlen=clip_length * sampling_rate)
self.async_stream = cuda.Stream()
    def preprocess_opencv(self, frame: np.ndarray, target_size: Tuple[int, int] = (256, 256)) -> np.ndarray:
        """Hardware-accelerated preprocessing using OpenCV CUDA, with CPU fallback"""
        if cv2.cuda.getCudaEnabledDeviceCount() > 0:
            gpu_frame = cv2.cuda_GpuMat()
            gpu_frame.upload(frame)
            # Resize and color-convert on GPU
            gpu_resized = cv2.cuda.resize(gpu_frame, target_size)
            gpu_rgb = cv2.cuda.cvtColor(gpu_resized, cv2.COLOR_BGR2RGB)
            cpu_frame = gpu_rgb.download().astype(np.float32) / 255.0
        else:
            resized = cv2.resize(frame, target_size)
            cpu_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        # Mean subtraction and std normalization (float32 to avoid promotion)
        mean = np.array([0.45, 0.45, 0.45], dtype=np.float32)
        std = np.array([0.225, 0.225, 0.225], dtype=np.float32)
        normalized = (cpu_frame - mean) / std
        # HWC to CHW format
        return np.transpose(normalized, (2, 0, 1))
    def temporal_sampling(self, frames: deque) -> Tuple[np.ndarray, np.ndarray]:
        """Sparse sampling for slow and fast pathways"""
        frames = list(frames)  # deque indexing is O(n); copy once
        # Slow path: 4 frames
        slow_indices = np.linspace(0, len(frames) - 1, 4, dtype=int)
        slow_clip = np.stack([frames[i] for i in slow_indices])
        # Fast path: 32 frames (alpha = 8 frame-rate ratio)
        fast_indices = np.linspace(0, len(frames) - 1, 32, dtype=int)
        fast_clip = np.stack([frames[i] for i in fast_indices])
        return slow_clip, fast_clip
def process_stream(self, video_source: str):
"""Continuous processing with queue management"""
cap = cv2.VideoCapture(video_source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Low latency mode
batch_frames = []
last_inference_time = time.time()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
processed = self.preprocess_opencv(frame)
self.frame_buffer.append(processed)
# Trigger inference when buffer full
if len(self.frame_buffer) == self.frame_buffer.maxlen:
slow_input, fast_input = self.temporal_sampling(self.frame_buffer)
                # Prepare batch tensors: stacked clips are [T, C, H, W] -> [1, C, T, H, W]
                slow_tensor = torch.from_numpy(slow_input).permute(1, 0, 2, 3).unsqueeze(0).cuda()
                fast_tensor = torch.from_numpy(fast_input).permute(1, 0, 2, 3).unsqueeze(0).cuda()
# Async inference
result = self.engine.infer({'slow': slow_tensor, 'fast': fast_tensor})
# Overlapping I/O: fetch results while processing next batch
pred_class = torch.argmax(result).item()
# Display with latency annotation
latency = (time.time() - last_inference_time) * 1000
cv2.putText(frame, f"Pred: {pred_class} Lat: {latency:.1f}ms",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow('SlowFast Stream', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
last_inference_time = time.time()
            # Slide window with 50% overlap
            for _ in range(self.frame_buffer.maxlen // 2):
                self.frame_buffer.popleft()
cap.release()
def export_to_onnx(model: nn.Module, save_path: str, input_sample: torch.Tensor):
"""Export PyTorch model to ONNX with dynamic batch and temporal dimensions"""
torch.onnx.export(
model,
input_sample,
save_path,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size', 2: 'num_frames'},
'output': {0: 'batch_size'}
}
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--model_type', choices=['slowfast', 'x3d'], default='slowfast')
parser.add_argument('--quantization', choices=['fp32', 'fp16', 'int8'], default='fp16')
parser.add_argument('--source', type=str, required=True)
args = parser.parse_args()
# Model initialization
if args.model_type == 'slowfast':
model = SlowFastPathway()
dummy_input = (torch.randn(1, 3, 4, 224, 224), torch.randn(1, 3, 32, 224, 224))
else:
model = X3DScaleEfficient()
dummy_input = torch.randn(1, 3, 16, 224, 224)
# Export and optimize
onnx_path = f"{args.model_type}.onnx"
export_to_onnx(model, onnx_path, dummy_input)
    optimizer = TensorRTOptimizer(
        model=model,
        # Single-input shape shown for X3D; SlowFast would need 'slow' and 'fast' entries
        input_shapes={'input': (1, 3, 16, 224, 224)},
        precision=args.quantization
    )
optimizer.build_engine(onnx_path)
# Streaming inference
processor = StreamingVideoProcessor(optimizer)
processor.process_stream(args.source)
Temporal Action Proposal Generation with TAG and BMN
Python
"""
Script: TAG_BMN_Proposal_Generation.py
Content: Temporal Action Proposal Generation using TAG and BMN algorithms
Usage: python TAG_BMN_Proposal_Generation.py --video long_video.mp4 --algorithm bmn --gpu
Features: Boundary matching map generation, confidence evaluation, NMS post-processing, OpenCV feature extraction
"""
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Tuple, Dict
import scipy.ndimage as ndimage
from scipy.signal import argrelextrema
import argparse
import json
class TAGAlgorithm:
"""Temporal Action Grouping via watershed and actionness grouping"""
def __init__(self, min_action_length: int = 10, grouping_threshold: float = 0.5):
self.min_action_length = min_action_length
self.threshold = grouping_threshold
        self.gaussian_sigma = 9  # Std-dev of the Gaussian used to smooth the actionness curve
def compute_actionness(self, features: np.ndarray) -> np.ndarray:
"""Compute temporal actionness scores using frame-level features"""
# Temporal convolution for actionness estimation
actionness = np.linalg.norm(features, axis=1)
        # Gaussian smoothing to suppress noise
        actionness = ndimage.gaussian_filter1d(actionness, sigma=self.gaussian_sigma)
        # Normalize to [0, 1]; epsilon guards against a flat curve
        actionness = (actionness - actionness.min()) / (actionness.max() - actionness.min() + 1e-8)
return actionness
def temporal_grouping(self, actionness: np.ndarray, video_length: int) -> List[Tuple[int, int, float]]:
"""Watershed-based proposal grouping"""
proposals = []
# Thresholding to create binary mask
binary_mask = (actionness > self.threshold).astype(np.int32)
# Label connected components (watershed basins)
labeled, num_features = ndimage.label(binary_mask)
for i in range(1, num_features + 1):
region = np.where(labeled == i)[0]
if len(region) >= self.min_action_length:
            start, end = region[0], region[-1]
            confidence = np.mean(actionness[start:end + 1])  # Inclusive of the end snippet
proposals.append((int(start), int(end), float(confidence)))
return proposals
    def suppress_non_maximum(self, proposals: List[Tuple], overlap_thresh: float = 0.7) -> List[Tuple]:
        """Greedy (hard) NMS over temporal IoU for proposal ranking"""
if not proposals:
return []
# Sort by confidence
proposals = sorted(proposals, key=lambda x: x[2], reverse=True)
keep = []
while proposals:
current = proposals.pop(0)
keep.append(current)
# Compute temporal IoU
proposals = [p for p in proposals if self._temporal_iou(current, p) < overlap_thresh]
return keep
def _temporal_iou(self, p1: Tuple, p2: Tuple) -> float:
s1, e1 = p1[0], p1[1]
s2, e2 = p2[0], p2[1]
intersection = max(0, min(e1, e2) - max(s1, s2))
union = max(e1, e2) - min(s1, s2)
return intersection / union if union > 0 else 0
class BMNNetwork(nn.Module):
"""Boundary-Matching Network for dense proposal generation"""
def __init__(self, temporal_dim: int = 100, feature_dim: int = 400,
num_samples: int = 32, max_duration: int = 100):
super().__init__()
self.temporal_dim = temporal_dim
self.max_duration = max_duration
self.num_samples = num_samples
# Base feature extraction
self.base_layer = nn.Sequential(
nn.Conv1d(feature_dim, 512, kernel_size=3, padding=1),
nn.BatchNorm1d(512),
nn.ReLU(inplace=True),
nn.Conv1d(512, 512, kernel_size=3, padding=1),
nn.BatchNorm1d(512),
nn.ReLU(inplace=True)
)
# Boundary prediction branches
self.start_conv = nn.Conv1d(512, 1, kernel_size=1)
self.end_conv = nn.Conv1d(512, 1, kernel_size=1)
        # Boundary-Matching module (the BM map enters as a single-channel image)
        self.bm_conv = nn.Conv2d(1, 512, kernel_size=3, padding=1)
        self.bm_score = nn.Conv2d(512, 1, kernel_size=1)
# Confidence regression
self.confidence_fc = nn.Linear(512, 1)
    def generate_bm_map(self, start_probs: torch.Tensor, end_probs: torch.Tensor) -> torch.Tensor:
        """Construct 2D Boundary-Matching confidence map"""
        device = start_probs.device
        # Create start-end pairs matrix [batch, T, T]
        T = self.temporal_dim
        start_grid = start_probs.unsqueeze(2).expand(-1, -1, T)  # [B, T] -> [B, T, T]
        end_grid = end_probs.unsqueeze(1).expand(-1, T, -1)      # [B, T] -> [B, T, T]
        # Mask valid proposals (end strictly after start)
        mask = torch.triu(torch.ones(T, T, device=device), diagonal=1)
        bm_input = start_grid * end_grid * mask.unsqueeze(0)
        return bm_input
def forward(self, features: torch.Tensor) -> Dict[str, torch.Tensor]:
"""
Args:
features: [batch, temporal_dim, feature_dim]
Returns:
start_prob: [batch, T]
end_prob: [batch, T]
bm_confidence: [batch, T, T]
"""
# Transpose for 1D conv [B, C, T]
x = features.transpose(1, 2)
base_feat = self.base_layer(x) # [B, 512, T]
# Boundary probability prediction
start_logits = self.start_conv(base_feat).squeeze(1) # [B, T]
end_logits = self.end_conv(base_feat).squeeze(1) # [B, T]
start_prob = torch.sigmoid(start_logits)
end_prob = torch.sigmoid(end_logits)
# Generate BM map
bm_input = self.generate_bm_map(start_prob, end_prob)
# 2D convolution over BM map
bm_feat = self.bm_conv(bm_input.unsqueeze(1)) # Add channel dim
bm_scores = self.bm_score(bm_feat).squeeze(1)
return {
'start': start_prob,
'end': end_prob,
'bm_confidence': torch.sigmoid(bm_scores)
}
class FeatureExtractor:
"""OpenCV-based Two-Stream feature extraction for proposal generation"""
def __init__(self, backbone: str = 'resnet', device: str = 'cpu'):
self.device = device
self.backbone = backbone
self.sampling_stride = 16 # Extract features every 16 frames
        # Pre-trained feature extractor (model files assumed present; the network
        # must emit snippet features matching BMN's feature_dim)
        self.net = cv2.dnn.readNetFromCaffe(
            'resnet152.prototxt',
            'resnet152.caffemodel'
        )
        if device in ('cuda', 'gpu') and cv2.cuda.getCudaEnabledDeviceCount() > 0:
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
def extract_snippet_features(self, video_path: str, output_dim: int = 400) -> np.ndarray:
"""Extract temporal features using sparse sampling"""
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
snippet_indices = range(0, total_frames, self.sampling_stride)
features = []
for idx in snippet_indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if not ret:
break
# Blob preparation
blob = cv2.dnn.blobFromImage(
frame, 1.0, (224, 224),
(103.939, 116.779, 123.68),
swapRB=False, crop=False
)
self.net.setInput(blob)
feat = self.net.forward()
features.append(feat.flatten())
cap.release()
        # Pad or truncate to a uniform temporal length of 100 snippets
        features = np.array(features)
        if len(features) == 0:
            return np.zeros((100, output_dim), dtype=np.float32)
        if len(features) < 100:
            padding = np.zeros((100 - len(features), features.shape[1]))
            features = np.vstack([features, padding])
        return features[:100]  # Truncate to max temporal dim
class ProposalInferencePipeline:
"""End-to-end pipeline for TAG/BMN proposal generation"""
def __init__(self, method: str = 'bmn', device: str = 'cpu'):
self.method = method
self.device = device
self.feature_extractor = FeatureExtractor(device=device)
if method == 'tag':
self.proposal_generator = TAGAlgorithm()
else:
self.bmn_model = BMNNetwork()
if device == 'cuda' and torch.cuda.is_available():
self.bmn_model = self.bmn_model.cuda()
self.bmn_model.eval()
def generate_proposals(self, video_path: str, top_k: int = 100) -> List[Dict]:
"""Generate temporal proposals with confidence scores"""
# Feature extraction
features = self.feature_extractor.extract_snippet_features(video_path)
if self.method == 'tag':
# TAG: Actionness grouping
proposals = self.proposal_generator.temporal_grouping(
self.proposal_generator.compute_actionness(features),
len(features)
)
proposals = self.proposal_generator.suppress_non_maximum(proposals)
else:
# BMN: Boundary matching
feat_tensor = torch.from_numpy(features).float().unsqueeze(0)
if self.device == 'cuda':
feat_tensor = feat_tensor.cuda()
with torch.no_grad():
outputs = self.bmn_model(feat_tensor)
# Proposal generation from BM map
proposals = self._extract_proposals_from_bm_map(
outputs['start'].cpu().numpy()[0],
outputs['end'].cpu().numpy()[0],
outputs['bm_confidence'].cpu().numpy()[0],
top_k
)
# Format output
results = []
for start, end, conf in proposals[:top_k]:
results.append({
'start_time': start * 0.1, # Assuming 0.1s per snippet
'end_time': end * 0.1,
'confidence': float(conf),
'duration': (end - start) * 0.1
})
return results
def _extract_proposals_from_bm_map(self, start_prob: np.ndarray,
end_prob: np.ndarray,
bm_conf: np.ndarray,
top_k: int) -> List[Tuple]:
"""Extract proposals from 2D BM confidence map"""
proposals = []
T = len(start_prob)
# Find peaks in start and end curves
start_peaks = argrelextrema(start_prob, np.greater, order=5)[0]
end_peaks = argrelextrema(end_prob, np.greater, order=5)[0]
for s in start_peaks:
for e in end_peaks:
if e > s and (e - s) < T // 2: # Duration constraint
conf = bm_conf[s, e] * start_prob[s] * end_prob[e]
proposals.append((s, e, conf))
# Sort by confidence
proposals.sort(key=lambda x: x[2], reverse=True)
return proposals[:top_k]
def visualize_proposals(self, video_path: str, proposals: List[Dict], output_path: str):
"""Render proposals as temporal segments on video"""
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(
output_path,
cv2.VideoWriter_fourcc(*'mp4v'),
fps, (width, height)
)
frame_idx = 0
colors = [(0, 255, 0), (0, 0, 255), (255, 0, 0)]
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
current_time = frame_idx / fps
active_proposals = [
p for p in proposals
if p['start_time'] <= current_time <= p['end_time']
]
# Draw active proposals
for i, prop in enumerate(active_proposals[:3]):
color = colors[i % len(colors)]
progress = (current_time - prop['start_time']) / prop['duration']
bar_x = int(width * progress)
# Progress bar
cv2.rectangle(frame, (0, 50 + i*30), (bar_x, 80 + i*30), color, -1)
text = f"Action {i+1}: {prop['confidence']:.2f}"
cv2.putText(frame, text, (10, 70 + i*30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
out.write(frame)
frame_idx += 1
cap.release()
out.release()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--video', type=str, required=True)
parser.add_argument('--algorithm', choices=['tag', 'bmn'], default='bmn')
parser.add_argument('--gpu', action='store_true')
parser.add_argument('--output', type=str, default='output_proposals.json')
args = parser.parse_args()
pipeline = ProposalInferencePipeline(
method=args.algorithm,
device='cuda' if args.gpu else 'cpu'
)
proposals = pipeline.generate_proposals(args.video, top_k=50)
# Save results
with open(args.output, 'w') as f:
json.dump(proposals, f, indent=2)
print(f"Generated {len(proposals)} proposals, saved to {args.output}")
# Visualization
pipeline.visualize_proposals(
args.video, proposals,
args.video.replace('.mp4', '_annotated.mp4')
)
Causal Online Video Understanding
Python
"""
Script: Causal_Online_Video_Understanding.py
Content: Streaming action detection with causal convolution and memory-efficient inference
Usage: python Causal_Online_Video_Understanding.py --source 0 --model gru --latency 50
Features: Causal convolutions, ring buffer memory management, chunked streaming, async OpenCV I/O
"""
import cv2
import torch
import torch.nn as nn
import numpy as np
from typing import Deque, Optional, Tuple
from collections import deque
import threading
import queue
import time
import argparse
class CausalConv1d(nn.Module):
"""Causal convolution ensuring temporal order constraints"""
def __init__(self, in_channels: int, out_channels: int,
kernel_size: int, dilation: int = 1):
super().__init__()
self.kernel_size = kernel_size
self.dilation = dilation
self.padding = (kernel_size - 1) * dilation
self.conv = nn.Conv1d(
in_channels, out_channels, kernel_size,
padding=0, # Manual padding for causality
dilation=dilation
)
# State buffer for streaming inference
self.cache = None
self.cache_size = self.padding
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: [batch, channels, time]
if self.training:
# Training mode: left padding only
x_padded = nn.functional.pad(x, (self.padding, 0))
return self.conv(x_padded)
else:
# Inference mode: use cached history
if self.cache is None or self.cache.size(0) != x.size(0):
self.cache = torch.zeros(x.size(0), x.size(1), self.cache_size,
device=x.device)
# Concatenate cache with current input
x_cache = torch.cat([self.cache, x], dim=2)
out = self.conv(x_cache)
# Update cache with latest inputs (keep last padding elements)
self.cache = x_cache[:, :, -self.cache_size:].detach()
return out
def reset_cache(self):
"""Reset temporal state for new video sequence"""
self.cache = None
class StreamingActionDetector(nn.Module):
"""Online action detection with causal temporal modeling"""
    def __init__(self, feature_dim: int = 2048, hidden_dim: int = 512,
                 num_classes: int = 20, num_layers: int = 3):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_classes = num_classes
        self.num_layers = num_layers
# Causal temporal convolution stack
self.temporal_encoder = nn.ModuleList()
dilation = 1
for i in range(num_layers):
self.temporal_encoder.append(
CausalConv1d(
feature_dim if i == 0 else hidden_dim,
hidden_dim,
kernel_size=3,
dilation=dilation
)
)
dilation *= 2 # Exponential dilation for receptive field expansion
# Classification head
self.classifier = nn.Sequential(
nn.AdaptiveAvgPool1d(1),
nn.Flatten(),
nn.Linear(hidden_dim, num_classes),
nn.Softmax(dim=-1)
)
self.frame_buffer = deque(maxlen=dilation * 2) # Receptive field size
def forward(self, x: torch.Tensor, online: bool = False) -> torch.Tensor:
"""
x: [batch, channels, time] frame features
online: True for streaming with stateful inference
"""
        out = x
        for layer in self.temporal_encoder:
            # train() toggles CausalConv1d between padded (offline) and cached (streaming) paths
            layer.train(not online)
            out = torch.relu(layer(out))
return self.classifier(out)
def step(self, frame_feat: torch.Tensor) -> torch.Tensor:
"""Single frame inference for online streaming"""
# frame_feat: [1, channels]
self.frame_buffer.append(frame_feat)
        if len(self.frame_buffer) < 3:  # Minimum receptive field
            return torch.zeros(1, self.num_classes)  # Dummy output during warmup
# Form temporal window from buffer
window = torch.stack(list(self.frame_buffer), dim=2) # [1, C, T]
with torch.no_grad():
return self.forward(window, online=True)
class GRUMemoryBank(nn.Module):
"""GRU-based causal memory for long-term dependency"""
def __init__(self, input_dim: int, hidden_dim: int, num_classes: int):
super().__init__()
self.gru = nn.GRU(input_dim, hidden_dim, batch_first=True)
self.fc = nn.Linear(hidden_dim, num_classes)
self.hidden_state = None
def forward(self, x: torch.Tensor, reset: bool = False) -> torch.Tensor:
# x: [1, 1, input_dim] for single frame step
if reset or self.hidden_state is None:
self.hidden_state = None
out, self.hidden_state = self.gru(x, self.hidden_state)
return self.fc(out[:, -1, :])
def reset_states(self):
self.hidden_state = None
class OpenCVStreamProcessor:
"""High-performance video streaming with multi-threading"""
def __init__(self, source: str, buffer_size: int = 32,
target_fps: int = 30, resolution: Tuple[int, int] = (224, 224)):
self.source = source
self.buffer_size = buffer_size
self.target_fps = target_fps
self.resolution = resolution
# Threading components
self.frame_queue = queue.Queue(maxsize=buffer_size)
self.result_queue = queue.Queue()
self.capture_thread = None
self.stop_event = threading.Event()
        # Feature extractor (OpenCV DNN; assumes an exported resnet18.onnx backbone
        # whose output dimension matches the detector's feature_dim)
        self.feature_net = cv2.dnn.readNetFromONNX('resnet18.onnx')
# Performance monitoring
self.frame_times = deque(maxlen=30)
self.last_time = time.time()
def start_capture(self):
"""Initialize threaded frame capture"""
def capture_loop():
cap = cv2.VideoCapture(self.source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Set properties for reduced latency
cap.set(cv2.CAP_PROP_FPS, self.target_fps)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
while not self.stop_event.is_set():
ret, frame = cap.read()
if ret:
# Drop frames if queue full (maintain real-time)
try:
self.frame_queue.put_nowait(frame)
except queue.Full:
pass
cap.release()
self.capture_thread = threading.Thread(target=capture_loop)
self.capture_thread.start()
    def extract_features(self, frame: np.ndarray) -> np.ndarray:
        """ONNX-optimized feature extraction"""
        # blobFromImage subtracts the mean BEFORE scaling, so the mean is given
        # in 0-255 pixel units (ImageNet means * 255); per-channel std is omitted
        blob = cv2.dnn.blobFromImage(
            frame, 1.0 / 255.0, self.resolution,
            mean=(123.675, 116.28, 103.53),
            swapRB=True
        )
        self.feature_net.setInput(blob)
        features = self.feature_net.forward()
        return features.flatten()
def process_stream(self, model: nn.Module, device: str = 'cpu'):
"""Main processing loop with causal inference"""
self.start_capture()
if device == 'cuda':
model = model.cuda()
model.eval()
# Reset model states
if hasattr(model, 'reset_cache'):
model.reset_cache()
if hasattr(model, 'reset_states'):
model.reset_states()
fps_display = 0
while True:
try:
frame = self.frame_queue.get(timeout=1.0)
except queue.Empty:
continue
# Compute latency
current_time = time.time()
latency = (current_time - self.last_time) * 1000
self.last_time = current_time
self.frame_times.append(latency)
# Feature extraction
feat = self.extract_features(frame)
feat_tensor = torch.from_numpy(feat).float().unsqueeze(0).unsqueeze(0)
if device == 'cuda':
feat_tensor = feat_tensor.cuda()
            # Causal inference
            with torch.no_grad():
                if isinstance(model, StreamingActionDetector):
                    output = model.step(feat_tensor.squeeze(0))  # step() expects [1, C]
                else:
                    output = model(feat_tensor, reset=False)  # GRU expects [1, 1, C]
pred_class = torch.argmax(output).item()
confidence = torch.max(output).item()
# Visualization
avg_latency = np.mean(self.frame_times)
fps_display = 1000.0 / avg_latency if avg_latency > 0 else 0
cv2.putText(frame, f"Action: {pred_class} ({confidence:.2f})",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(frame, f"Latency: {avg_latency:.1f}ms ({fps_display:.1f} FPS)",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow('Online Action Detection', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
self.stop_event.set()
self.capture_thread.join()
class ChunkedCausalInference:
"""Memory-efficient chunked processing for long videos"""
    def __init__(self, chunk_size: int = 64, overlap: int = 8):
        self.chunk_size = chunk_size
        self.overlap = overlap
def process_long_video(self, video_path: str, model: nn.Module,
feature_extractor: callable) -> np.ndarray:
"""Process long video in overlapping chunks with state propagation"""
cap = cv2.VideoCapture(video_path)
features_buffer = []
all_predictions = []
while True:
ret, frame = cap.read()
if not ret:
break
feat = feature_extractor(frame)
features_buffer.append(feat)
if len(features_buffer) == self.chunk_size:
# Process chunk
chunk_tensor = torch.from_numpy(
np.array(features_buffer)
).float().unsqueeze(0).transpose(1, 2) # [1, C, T]
                # CausalConv1d layers keep their own per-layer caches between calls,
                # so temporal state propagates across chunks automatically
                with torch.no_grad():
                    preds = model(chunk_tensor, online=True)
                all_predictions.append(preds.cpu().numpy())
# Slide window with overlap
features_buffer = features_buffer[-self.overlap:]
cap.release()
return np.concatenate(all_predictions, axis=0)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--source', type=str, default='0')
parser.add_argument('--model', choices=['causal_cnn', 'gru'], default='causal_cnn')
parser.add_argument('--device', choices=['cpu', 'cuda'], default='cpu')
parser.add_argument('--latency', type=int, default=50, help='Target latency ms')
args = parser.parse_args()
# Model selection
if args.model == 'causal_cnn':
model = StreamingActionDetector(feature_dim=512, hidden_dim=256, num_classes=20)
else:
model = GRUMemoryBank(input_dim=512, hidden_dim=256, num_classes=20)
# Stream processing
processor = OpenCVStreamProcessor(
source=int(args.source) if args.source.isdigit() else args.source,
buffer_size=32
)
try:
processor.process_stream(model, device=args.device)
finally:
        cv2.destroyAllWindows()