roi_dir.py

import glob

import cv2
import numpy as np
import json
import os

class ROIDrawer:
    def __init__(self, image_o, label="tiaosheng"):
        self.drawing = False
        self.ix, self.iy = -1, -1
        self.rois = []  # 存储多个ROI
        self.image_o = image_o
        self.image = self.image_o.copy()
        self.temp_image = self.image.copy()
        self.ok = False
        self.label = label  # 目标标签
    
    def draw_crosshair(self, event, x, y, flags, param):
        self.temp_image = self.image.copy()  # 每次更新临时图像
        
        # 关键修改:无论是否有已框选的ROI,只要不在绘制中就显示十字星
        if not self.drawing:  # 只有当不处于拖拽框选状态时,才显示十字星
            cv2.line(self.temp_image, (x, 0), (x, self.temp_image.shape[0]), (0, 255, 0), 1)
            cv2.line(self.temp_image, (0, y), (self.temp_image.shape[1], y), (0, 255, 0), 1)
        
        # 鼠标按下:开始框选
        if event == cv2.EVENT_LBUTTONDOWN:
            self.drawing = True
            self.ix, self.iy = x, y
        
        # 鼠标移动:实时绘制矩形(此时不显示十字星,因为drawing=True)
        elif event == cv2.EVENT_MOUSEMOVE:
            if self.drawing:
                # 绘制当前正在拖拽的矩形
                cv2.rectangle(self.temp_image, (self.ix, self.iy), (x, y), (255, 0, 0), 2)
        
        # 鼠标左键释放:确认当前ROI(继续框选)
        elif event == cv2.EVENT_LBUTTONUP:
            self.drawing = False  # 结束绘制,恢复十字星显示
            # 计算规范的坐标(确保x1 < x2, y1 < y2)
            x1, y1 = min(self.ix, x), min(self.iy, y)
            x2, y2 = max(self.ix, x), max(self.iy, y)
            # 绘制最终矩形到原始图像
            cv2.rectangle(self.image, (x1, y1), (x2, y2), (255, 0, 0), 2)
            # 保存ROI坐标
            self.rois.append([[x1, y1], [x2, y2]])
        
        # 鼠标右键释放:完成框选
        elif event == cv2.EVENT_RBUTTONUP:
            self.drawing = False  # 结束绘制,恢复十字星显示
            x1, y1 = min(self.ix, x), min(self.iy, y)
            x2, y2 = max(self.ix, x), max(self.iy, y)
            cv2.rectangle(self.image, (x1, y1), (x2, y2), (255, 0, 0), 2)
            self.rois.append([[x1, y1], [x2, y2]])
            print(f"已添加ROI: {[x1, y1]} - {[x2, y2]} (共{len(self.rois)}个)")
            self.ok = True
    
    def save_json(self, image_path, output_json_path=None):
        if not self.rois:
            print("没有框选任何目标,不保存JSON")
            return
        
        if not output_json_path:
            img_dir, img_name = os.path.split(image_path)
            img_base = os.path.splitext(img_name)[0]
            output_json_path = os.path.join(img_dir, f"{img_base}.json")
        
        # 构建JSON结构
        json_data = {"version": "1.0.0", "flags": {}, "shapes": [], "imagePath": image_path, "imageHeight": self.image_o.shape[0], "imageWidth": self.image_o.shape[1]}
        
        for points in self.rois:
            (x1, y1), (x2, y2) = points
            bbox = [x1, y1, x2, y2]
            
            shape = {"label": self.label, "shape_type": "rectangle", "points": points,  # 仍然保留原来的 [[x1,y1],[x2,y2]]
                     "bbox": bbox, "description": "", "flags": {}}
            json_data["shapes"].append(shape)
        
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(json_data, f, ensure_ascii=False, indent=4)
    
    def run(self, image_path, output_json=None):
        cv2.namedWindow('Draw ROI')
        cv2.setMouseCallback('Draw ROI', self.draw_crosshair)
        

        
        while True:
            cv2.imshow('Draw ROI', self.temp_image)
            key = cv2.waitKey(1) & 0xFF
            
            if key == 27:  # Esc键:取消操作
                print("已取消操作")
                self.rois = []
                break
            elif key == 13:  # Enter键:保存并退出
                break
            elif self.ok:  # 右键结束框选
                break
        
        cv2.destroyAllWindows()
        if output_json:
            self.save_json(image_path, output_json)
        return self.rois


def roi_video(video_path, output_json_path=None):

    json_path = os.path.splitext(video_path)[0] + ".json"
    cap = cv2.VideoCapture(video_path)
    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    loaded_frames = []
    prompts = {}
    roi_box = True
    image_path = "roi_box.jpg"
    
    video_result = {}
    video_name = os.path.basename(video_path)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        h, w = frame.shape[:2]
        target_area = 1000 * 1500
        orig_area = h * w
        if orig_area > target_area:
            scale = np.sqrt(target_area / orig_area)
            new_w = int(w * scale)
            new_h = int(h * scale)
            new_w -= new_w % 2
            new_h -= new_h % 2
            frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
        
        if len(prompts) < 1:
            if roi_box:
                print(f"正在处理视频: {video_name}")
                roi_drawer = ROIDrawer(frame)
                selected_rois = roi_drawer.run(image_path, output_json=json_path)
                if len(selected_rois) > 0:
                    boxes = []
                    for p_i, points in enumerate(selected_rois):
                        (x1, y1), (x2, y2) = points
                        bbox = [x1, y1, x2, y2, p_i]
                        boxes.append(bbox)
                    
                    # 构建返回结果
                    video_result = {'video_name': video_name,  'boxes': boxes, 'frame_resolution': f"{new_w}x{new_h}" if orig_area > target_area else f"{w}x{h}"}
                    break  # 获取到boxes后退出循环
        else:
            break
    
    cap.release()
    return video_name, video_result

def roi_video_directory(directory_path, video_extensions=None):
    print("1. 左键拖拽框选目标(松开后继续框选下一个,十字星始终显示)")
    print("2. 右键拖拽框选最后一个目标(松开后结束框选)")
    print("3. 按Esc键取消操作,按Enter键保存并退出")
    
    video_files=glob.glob(directory_path+"/*.mp4")
    
    if not video_files:
        print(f"警告: 在目录 '{directory_path}' 中未找到视频文件")
        return {}
    
    print(f"找到 {len(video_files)} 个视频文件:")
    for i, video_file in enumerate(video_files, 1):
        print(f"{i}. {video_file}")
    
    results = {}
    
    for video_file in video_files:
        try:
            video_name, video_result = roi_video(str(video_file))
            if video_result:  # 只有当有结果时才添加
                results[video_name] = video_result
                print(results)
            else:
                results[video_name] = {'video_name': video_name, 'video_path': str(video_file), 'boxes': [],'frame_resolution': 'unknown'}
        except Exception as e:
            print(f"处理视频 '{video_file.name}' 时出错: {e}")
            results[video_file.name] = {'video_name': video_file.name, 'video_path': str(video_file), 'boxes': [],  'frame_resolution': 'error', 'error': str(e)}
    
    return results

def print_results_summary(results_dict):

    print("处理结果摘要:")
    print("=" * 60)
    
    total_videos = len(results_dict)
    
    print(f"总视频数: {total_videos}")
    print("-" * 60)
    
    # 详细输出每个视频的boxes
    print("\n详细boxes信息:")
    print("=" * 60)
    for video_name, result in results_dict.items():
        boxes = result.get('boxes', [])
        if boxes:
            print(f"\n{video_name}:")
            for i, box in enumerate(boxes, 1):
                print(f"  ROI{i - 1}: {box}")
        else:
            print(f"\n{video_name}: 无ROI")
    
    print("=" * 60)

def save_results_to_json(results_dict, output_path="video_rois_results.json"):

    # 简化结果以便保存(移除可能包含非JSON序列化的对象)
    simplified_results = {}
    for video_name, result in results_dict.items():
        simplified_results[video_name] = {'video_name': result.get('video_name', video_name), 'boxes': result.get('boxes', []),
           'frame_resolution': result.get('frame_resolution', 'unknown')}
        if 'error' in result:
            simplified_results[video_name]['error'] = result['error']
    
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(simplified_results, f, ensure_ascii=False, indent=4)
    
    print(f"\n结果已保存到: {output_path}")


if __name__ == '__main__':
    directory_path = r"B:\data\tiaosheng\20260112"
    results = roi_video_directory(directory_path)
    
    print_results_summary(results)
    
    save_results_to_json(results, "video_rois_results.json")
    
    # 也可以直接打印原始字典
    print("\n原始结果字典:")
    print(json.dumps(results, ensure_ascii=False, indent=2))

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐