1. 环境部署与安装

# 安装FiftyOne
pip install fiftyone
pip install torch torchvision  # 用于模型评估
pip install scikit-learn  # 用于数据分析

2. 数据质量分析功能实现

import fiftyone as fo
import fiftyone.brain as fob
import fiftyone.zoo as foz
from fiftyone import ViewField as F
import numpy as np

class DataQualityAnalyzer:
    def __init__(self, dataset_name):
        self.dataset = fo.Dataset(dataset_name)
        
    def load_sample_data(self):
        """加载示例数据集"""
        # 使用COCO数据集的子集作为示例
        dataset = foz.load_zoo_dataset(
            "coco-2017",
            split="validation",
            max_samples=1000,
            dataset_name="coco_sample"
        )
        self.dataset = dataset
        return dataset
    
    def analyze_label_distribution(self):
        """分析标签分布"""
        # 统计每个类别的样本数量
        label_counts = self.dataset.count_values("ground_truth.detections.label")
        
        # 可视化标签分布
        session = fo.launch_app(self.dataset)
        
        # 创建标签分布视图
        plot = fo.Plot(
            x="label",
            y="count",
            labels=label_counts,
            title="Label Distribution Analysis"
        )
        
        return label_counts
    
    def uncertainty_sampling(self, model_predictions_field="predictions"):
        """不确定性采样 - 找出模型最不确定的样本"""
        # 计算预测置信度的熵
        self.dataset.compute_metadata()
        
        # 添加不确定性分数
        for sample in self.dataset:
            if sample[model_predictions_field]:
                confidences = [det.confidence for det in sample[model_predictions_field].detections]
                if confidences:
                    # 计算熵作为不确定性度量
                    entropy = -sum([c * np.log(c + 1e-10) for c in confidences])
                    sample["uncertainty_score"] = entropy
                else:
                    sample["uncertainty_score"] = 0
            sample.save()
        
        # 获取最不确定的样本
        uncertain_view = self.dataset.sort_by("uncertainty_score", reverse=True).limit(100)
        
        return uncertain_view
    
    def detect_duplicate_images(self):
        """检测重复图像"""
        # 计算图像相似度
        fob.compute_similarity(
            self.dataset,
            brain_key="image_similarity",
            backend="sklearn",
            metric="euclidean"
        )
        
        # 找出潜在的重复图像
        duplicates = []
        for sample in self.dataset:
            similar_samples = self.dataset.sort_by_similarity(
                sample.id,
                brain_key="image_similarity",
                k=2  # 包括自己和最相似的一个
            )
            if len(similar_samples) > 1:
                similarity_score = similar.samples[1].similarity
                if similarity_score > 0.95:  # 相似度阈值
                    duplicates.append((sample.id, similar_samples[1].id))
        
        return duplicates

3. 异常数据检测(标注错误检测)

class AnomalyDetector:
    def __init__(self, dataset):
        self.dataset = dataset
    
    def detect_label_mistakes(self, predictions_field="predictions", 
                            ground_truth_field="ground_truth"):
        """检测潜在的标注错误"""
        
        # 1. 使用FiftyOne Brain的错误标签检测
        fob.compute_mistakenness(
            self.dataset,
            predictions_field,
            label_field=ground_truth_field,
            brain_key="mistakenness"
        )
        
        # 2. 找出可能标注错误的样本
        potential_mistakes = self.dataset.sort_by("mistakenness", reverse=True).limit(50)
        
        return potential_mistakes
    
    def detect_outliers(self):
        """检测异常样本"""
        # 使用图像embeddings检测异常值
        fob.compute_visualization(
            self.dataset,
            embeddings="embeddings",
            brain_key="image_embeddings",
            method="umap"
        )
        
        # 计算每个样本到聚类中心的距离
        embeddings = np.array([s.embeddings for s in self.dataset])
        from sklearn.ensemble import IsolationForest
        
        clf = IsolationForest(contamination=0.1)
        outliers = clf.fit_predict(embeddings)
        
        # 标记异常样本
        for sample, is_outlier in zip(self.dataset, outliers):
            sample["is_outlier"] = bool(is_outlier == -1)
            sample.save()
        
        outlier_view = self.dataset.match(F("is_outlier") == True)
        return outlier_view
    
    def analyze_annotation_quality(self):
        """分析标注质量"""
        quality_metrics = {
            "missing_annotations": 0,
            "small_objects": 0,
            "boundary_issues": 0,
            "class_confusion": {}
        }
        
        for sample in self.dataset:
            if not sample.ground_truth or not sample.ground_truth.detections:
                quality_metrics["missing_annotations"] += 1
                continue
                
            for detection in sample.ground_truth.detections:
                # 检查小目标
                bbox = detection.bounding_box
                area = bbox[2] * bbox[3]
                if area < 0.01:  # 小于图像面积的1%
                    quality_metrics["small_objects"] += 1
                
                # 检查边界问题
                if bbox[0] < 0 or bbox[1] < 0 or \
                   bbox[0] + bbox[2] > 1 or bbox[1] + bbox[3] > 1:
                    quality_metrics["boundary_issues"] += 1
        
        return quality_metrics

4. 模型评估方法

class ModelEvaluator:
    def __init__(self, dataset):
        self.dataset = dataset
        
    def evaluate_detection_model(self, predictions_field="predictions",
                               ground_truth_field="ground_truth"):
        """评估目标检测模型"""
        
        # 1. 计算mAP (Mean Average Precision)
        results = self.dataset.evaluate_detections(
            predictions_field,
            gt_field=ground_truth_field,
            eval_key="eval",
            compute_mAP=True
        )
        
        # 2. 获取详细指标
        metrics = {
            "mAP": results.mAP(),
            "precision": results.precision(),
            "recall": results.recall(),
            "f1_score": 2 * (results.precision() * results.recall()) / 
                       (results.precision() + results.recall() + 1e-10)
        }
        
        # 3. 生成混淆矩阵
        confusion_matrix = results.plot_confusion_matrix(
            classes=self.dataset.distinct(f"{ground_truth_field}.detections.label"),
            backend="matplotlib"
        )
        
        # 4. PR曲线
        pr_curve = results.plot_pr_curves(
            classes=self.dataset.distinct(f"{ground_truth_field}.detections.label"),
            backend="matplotlib"
        )
        
        return metrics, results
    
    def analyze_model_failures(self, eval_key="eval"):
        """分析模型失败案例"""
        
        # 1. 找出False Positives
        fp_view = self.dataset.filter_labels(
            "predictions",
            F(f"{eval_key}") == "fp"
        )
        
        # 2. 找出False Negatives  
        fn_view = self.dataset.filter_labels(
            "ground_truth",
            F(f"{eval_key}") == "fn"
        )
        
        # 3. 按类别分析错误
        class_performance = {}
        for class_name in self.dataset.distinct("ground_truth.detections.label"):
            class_view = self.dataset.filter_labels(
                "ground_truth",
                F("label") == class_name
            )
            
            class_results = class_view.evaluate_detections(
                "predictions",
                gt_field="ground_truth"
            )
            
            class_performance[class_name] = {
                "precision": class_results.precision(),
                "recall": class_results.recall(),
                "support": len(class_view)
            }
            
        return fp_view, fn_view, class_performance
    
    def cross_validation_analysis(self, n_folds=5):
        """交叉验证分析"""
        from sklearn.model_selection import KFold
        
        # 为数据集添加fold标签
        sample_ids = [s.id for s in self.dataset]
        kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
        
        fold_metrics = []
        
        for fold_idx, (train_idx, val_idx) in enumerate(kf.split(sample_ids)):
            # 创建训练和验证视图
            train_ids = [sample_ids[i] for i in train_idx]
            val_ids = [sample_ids[i] for i in val_idx]
            
            train_view = self.dataset.select(train_ids)
            val_view = self.dataset.select(val_ids)
            
            # 评估验证集
            results = val_view.evaluate_detections(
                "predictions",
                gt_field="ground_truth"
            )
            
            fold_metrics.append({
                "fold": fold_idx,
                "mAP": results.mAP(),
                "precision": results.precision(),
                "recall": results.recall()
            })
            
        return fold_metrics

5. 数据集管理功能

class DatasetManager:
    def __init__(self):
        self.datasets = {}
        
    def create_dataset(self, name, persistent=True):
        """创建新数据集"""
        dataset = fo.Dataset(name, persistent=persistent)
        self.datasets[name] = dataset
        return dataset
    
    def version_control(self, dataset, version_name):
        """数据集版本控制"""
        # 创建数据集快照
        snapshot = dataset.clone(name=f"{dataset.name}_v_{version_name}")
        
        # 记录版本信息
        snapshot.info = {
            "version": version_name,
            "created_at": datetime.now().isoformat(),
            "parent_dataset": dataset.name,
            "num_samples": len(dataset)
        }
        
        return snapshot
    
    def merge_datasets(self, dataset1, dataset2, merge_key="filepath"):
        """合并数据集"""
        merged = dataset1.clone()
        
        # 合并样本
        for sample in dataset2:
            existing = merged.match(F(merge_key) == sample[merge_key])
            if len(existing) == 0:
                merged.add_sample(sample)
            else:
                # 更新现有样本
                for s in existing:
                    s.merge(sample)
                    s.save()
                    
        return merged
    
    def filter_and_export(self, dataset, filter_criteria, export_path, 
                         export_format="coco"):
        """筛选并导出数据"""
        # 应用筛选条件
        filtered_view = dataset.match(filter_criteria)
        
        # 导出数据
        if export_format == "coco":
            filtered_view.export(
                export_dir=export_path,
                dataset_type=fo.types.COCODetectionDataset,
                label_field="ground_truth"
            )
        elif export_format == "yolo":
            filtered_view.export(
                export_dir=export_path,
                dataset_type=fo.types.YOLOv5Dataset,
                label_field="ground_truth"
            )
            
        return filtered_view
    
    def add_metadata(self, dataset):
        """添加元数据"""
        # 计算图像统计信息
        dataset.compute_metadata()
        
        # 添加自定义元数据
        for sample in dataset:
            metadata = sample.metadata
            sample["image_quality_score"] = self._calculate_image_quality(sample)
            sample["annotation_complexity"] = len(sample.ground_truth.detections) if sample.ground_truth else 0
            sample.save()
            
    def _calculate_image_quality(self, sample):
        """计算图像质量分数"""
        # 简单的图像质量评估
        metadata = sample.metadata
        
        # 基于分辨率、亮度等因素
        width, height = metadata.width, metadata.height
        resolution_score = min(1.0, (width * height) / (1920 * 1080))
        
        return resolution_score

6. 完整的使用示例

def main():
    # 1. 初始化数据质量分析器
    analyzer = DataQualityAnalyzer("my_dataset")
    
    # 2. 加载数据
    dataset = analyzer.load_sample_data()
    
    # 3. 数据质量分析
    print("=== 数据质量分析 ===")
    
    # 标签分布分析
    label_dist = analyzer.analyze_label_distribution()
    print(f"标签分布: {label_dist}")
    
    # 不确定性采样
    uncertain_samples = analyzer.uncertainty_sampling()
    print(f"发现 {len(uncertain_samples)} 个不确定样本")
    
    # 4. 异常检测
    print("\n=== 异常数据检测 ===")
    detector = AnomalyDetector(dataset)
    
    # 检测标注错误
    mistakes = detector.detect_label_mistakes()
    print(f"潜在标注错误: {len(mistakes)} 个")
    
    # 检测异常值
    outliers = detector.detect_outliers()
    print(f"异常样本: {len(outliers)} 个")
    
    # 标注质量分析
    quality_metrics = detector.analyze_annotation_quality()
    print(f"标注质量指标: {quality_metrics}")
    
    # 5. 模型评估
    print("\n=== 模型评估 ===")
    evaluator = ModelEvaluator(dataset)
    
    # 基础评估
    metrics, results = evaluator.evaluate_detection_model()
    print(f"模型指标: {metrics}")
    
    # 失败案例分析
    fp_view, fn_view, class_perf = evaluator.analyze_model_failures()
    print(f"False Positives: {len(fp_view)}")
    print(f"False Negatives: {len(fn_view)}")
    print(f"类别性能: {class_perf}")
    
    # 6. 数据集管理
    print("\n=== 数据集管理 ===")
    manager = DatasetManager()
    
    # 版本控制
    snapshot = manager.version_control(dataset, "v1.0")
    print(f"创建数据集快照: {snapshot.name}")
    
    # 筛选高质量数据
    high_quality_view = dataset.match(F("image_quality_score") > 0.8)
    manager.filter_and_export(
        dataset,
        F("image_quality_score") > 0.8,
        "./high_quality_export",
        "coco"
    )
    
    # 7. 启动可视化界面
    session = fo.launch_app(dataset)
    
    # 在界面中可以:
    # - 查看数据分布
    # - 交互式筛选样本
    # - 比较预测和真实标签
    # - 查看评估指标
    # - 标注和修正数据

if __name__ == "__main__":
    main()

7. 高级功能扩展

class AdvancedAnalytics:
    """高级分析功能"""
    
    def active_learning_selection(self, dataset, model, n_samples=100):
        """主动学习样本选择"""
        # 1. 熵采样
        entropy_samples = self._entropy_sampling(dataset, model, n_samples//3)
        
        # 2. 边缘采样
        margin_samples = self._margin_sampling(dataset, model, n_samples//3)
        
        # 3. 多样性采样
        diverse_samples = self._diversity_sampling(dataset, n_samples//3)
        
        # 合并选择的样本
        selected = entropy_samples + margin_samples + diverse_samples
        return dataset.select(selected)
    
    def data_augmentation_analysis(self, dataset):
        """数据增强需求分析"""
        augmentation_needs = {
            "low_light_images": 0,
            "small_objects_dominant": 0,
            "limited_viewpoints": 0,
            "class_imbalance": {}
        }
        
        # 分析每个样本
        for sample in dataset:
            # 检查亮度
            if sample.metadata.brightness < 0.3:
                augmentation_needs["low_light_images"] += 1
                
            # 检查目标大小
            if sample.ground_truth:
                small_obj_count = sum(1 for det in sample.ground_truth.detections 
                                    if det.bounding_box[2] * det.bounding_box[3] < 0.05)
                if small_obj_count > len(sample.ground_truth.detections) * 0.5:
                    augmentation_needs["small_objects_dominant"] += 1
                    
        return augmentation_needs
    
    def temporal_consistency_check(self, video_dataset):
        """视频数据时序一致性检查"""
        inconsistencies = []
        
        for video in video_dataset:
            frames = video.frames
            
            for i in range(1, len(frames)):
                prev_frame = frames[i-1]
                curr_frame = frames[i]
                
                # 检查标签突变
                if self._check_label_discontinuity(prev_frame, curr_frame):
                    inconsistencies.append({
                        "video": video.id,
                        "frame": i,
                        "type": "label_discontinuity"
                    })
                    
        return inconsistencies
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐