项目:计算机视觉数据质量分析与模型评估系统(基于FiftyOne)
【代码】项目:计算机视觉数据质量分析与模型评估系统(基于FiftyOne)
·
1. 环境部署与安装
# 安装FiftyOne
pip install fiftyone
pip install torch torchvision # 用于模型评估
pip install scikit-learn # 用于数据分析
pip install umap-learn # 用于 compute_visualization(method="umap") 的降维可视化
2. 数据质量分析功能实现
import fiftyone as fo
import fiftyone.brain as fob
import fiftyone.zoo as foz
from fiftyone import ViewField as F
import numpy as np
class DataQualityAnalyzer:
    """Dataset-level quality checks built on top of a FiftyOne dataset.

    Provides label-distribution statistics, an uncertainty ranking based on
    prediction confidences, and near-duplicate image detection.
    """

    def __init__(self, dataset_name):
        """Open the dataset named ``dataset_name``, creating it if necessary.

        ``fo.Dataset(name)`` raises when a dataset with that name already
        exists, so an existing dataset is loaded instead of re-created.
        """
        if fo.dataset_exists(dataset_name):
            self.dataset = fo.load_dataset(dataset_name)
        else:
            self.dataset = fo.Dataset(dataset_name)
        # Handle to the App session opened by analyze_label_distribution().
        self.session = None

    def load_sample_data(self):
        """Load a 1000-sample slice of COCO-2017 validation as example data.

        The loaded dataset replaces ``self.dataset`` and is also returned.
        """
        dataset = foz.load_zoo_dataset(
            "coco-2017",
            split="validation",
            max_samples=1000,
            dataset_name="coco_sample",
        )
        self.dataset = dataset
        return dataset

    def analyze_label_distribution(self):
        """Count ground-truth objects per class and open the dataset in the App.

        Returns:
            dict mapping each ``ground_truth`` detection label to its count.
        """
        label_counts = self.dataset.count_values("ground_truth.detections.label")

        # The former ``fo.Plot(x=..., y=..., labels=..., title=...)`` call was
        # dropped: it is not a supported way to build a plot and raised before
        # the counts could be returned. The App is still launched as before;
        # the session is kept on the instance so the handle is not lost.
        self.session = fo.launch_app(self.dataset)
        return label_counts

    @staticmethod
    def _confidence_entropy(confidences):
        """Return ``-sum(c * log(c))`` over the non-``None`` confidences.

        Always returns a Python ``float`` (``0.0`` when nothing is usable) so
        that the stored field has a single, consistent type.
        """
        usable = [c for c in confidences if c is not None]
        if not usable:
            return 0.0
        probs = np.asarray(usable, dtype=float)
        # The epsilon keeps log() finite for a confidence of exactly 0.
        return float(-np.sum(probs * np.log(probs + 1e-10)))

    def uncertainty_sampling(self, model_predictions_field="predictions"):
        """Rank samples by the entropy-style score of their detections.

        Every sample receives an ``uncertainty_score`` float field; samples
        without predictions get ``0.0``.

        Args:
            model_predictions_field: name of the detections field that holds
                the model predictions.

        Returns:
            A view of the 100 samples with the highest ``uncertainty_score``.

        Raises:
            KeyError: if ``model_predictions_field`` is not a field of the
                dataset.
        """
        if not self.dataset.has_sample_field(model_predictions_field):
            raise KeyError(
                f"Dataset has no field '{model_predictions_field}'; "
                "apply a model before running uncertainty sampling"
            )

        # Retained from the original flow; it populates sample metadata but
        # is not an input to the score itself.
        self.dataset.compute_metadata()

        # autosave batches the writes instead of one save() per sample.
        for sample in self.dataset.iter_samples(autosave=True, progress=True):
            predictions = sample[model_predictions_field]
            detections = predictions.detections if predictions is not None else []
            sample["uncertainty_score"] = self._confidence_entropy(
                det.confidence for det in detections
            )

        return self.dataset.sort_by("uncertainty_score", reverse=True).limit(100)

    def detect_duplicate_images(self, similarity_threshold=0.95):
        """Find near-duplicate images via a similarity index.

        The index uses cosine distance so that the threshold can be expressed
        as a similarity: two images are reported when their cosine distance
        is at most ``1 - similarity_threshold``.

        Args:
            similarity_threshold: minimum cosine similarity for two images to
                be treated as duplicates.

        Returns:
            list of ``(sample_id, duplicate_sample_id)`` tuples.
        """
        # NOTE(review): no model/embeddings are passed, so the Brain's default
        # embedding model is used - confirm this is the intended one.
        index = fob.compute_similarity(
            self.dataset,
            brain_key="image_similarity",
            backend="sklearn",
            metric="cosine",
        )
        index.find_duplicates(thresh=1.0 - similarity_threshold)

        duplicates = []
        # neighbors_map: retained sample ID -> [(duplicate ID, distance), ...]
        for kept_id, neighbors in index.neighbors_map.items():
            for duplicate_id, _distance in neighbors:
                duplicates.append((kept_id, duplicate_id))
        return duplicates
3. 异常数据检测(标注错误检测)
class AnomalyDetector:
    """Finds suspicious annotations and outlier images in a dataset."""

    def __init__(self, dataset):
        """Store the dataset (or view) that every check operates on."""
        self.dataset = dataset

    def detect_label_mistakes(self, predictions_field="predictions",
                              ground_truth_field="ground_truth"):
        """Rank samples by how likely their ground truth is mislabeled.

        Args:
            predictions_field: field holding the model predictions.
            ground_truth_field: field holding the annotations to audit.

        Returns:
            A view of the 50 samples with the highest ``mistakenness``.
        """
        # compute_mistakenness() has no ``brain_key`` argument; the output
        # field is chosen with ``mistakenness_field``.
        fob.compute_mistakenness(
            self.dataset,
            predictions_field,
            label_field=ground_truth_field,
            mistakenness_field="mistakenness",
        )
        return self.dataset.sort_by("mistakenness", reverse=True).limit(50)

    def detect_outliers(self):
        """Flag outlier images with an Isolation Forest over image embeddings.

        Writes a boolean ``is_outlier`` field on every sample (``False`` for
        samples that have no embedding) and returns the flagged samples.

        Returns:
            A view containing the samples whose ``is_outlier`` is ``True``.
        """
        from sklearn.ensemble import IsolationForest

        # The "embeddings" field was previously read without ever being
        # computed; generate it on first use.
        if not self.dataset.has_sample_field("embeddings"):
            model = foz.load_zoo_model("mobilenet-v2-imagenet-torch")
            self.dataset.compute_embeddings(model, embeddings_field="embeddings")

        # Low-dimensional layout for inspection; requires the umap-learn
        # package. It is not an input to the outlier model below.
        fob.compute_visualization(
            self.dataset,
            embeddings="embeddings",
            brain_key="image_embeddings",
            method="umap",
        )

        embeddings = self.dataset.values("embeddings")
        usable = [i for i, vector in enumerate(embeddings) if vector is not None]
        flags = [False] * len(embeddings)

        if usable:
            matrix = np.array([embeddings[i] for i in usable])
            # Fixed seed so repeated runs flag the same samples.
            forest = IsolationForest(contamination=0.1, random_state=42)
            for i, verdict in zip(usable, forest.fit_predict(matrix)):
                flags[i] = bool(verdict == -1)  # -1 marks an outlier

        # One bulk write, in the same order values() returned the samples.
        self.dataset.set_values("is_outlier", flags)
        return self.dataset.match(F("is_outlier") == True)  # noqa: E712

    def analyze_annotation_quality(self, ground_truth_field="ground_truth",
                                   small_object_area=0.01):
        """Count simple annotation problems across the dataset.

        Args:
            ground_truth_field: name of the detections field to inspect.
            small_object_area: boxes whose relative area (width * height of
                the ``[x, y, width, height]`` box) is below this value are
                counted as small objects.

        Returns:
            dict with the counters ``missing_annotations`` (samples with no
            detections), ``small_objects`` and ``boundary_issues`` (boxes
            extending outside ``[0, 1]``), plus ``class_confusion``, which is
            returned empty because nothing in this method populates it.
        """
        quality_metrics = {
            "missing_annotations": 0,
            "small_objects": 0,
            "boundary_issues": 0,
            "class_confusion": {},
        }
        # Absorbs float rounding in boxes that touch the image border exactly.
        tolerance = 1e-6

        for sample in self.dataset:
            labels = getattr(sample, ground_truth_field, None)
            detections = labels.detections if labels is not None else None
            if not detections:
                quality_metrics["missing_annotations"] += 1
                continue

            for detection in detections:
                x, y, width, height = detection.bounding_box
                if width * height < small_object_area:
                    quality_metrics["small_objects"] += 1
                out_of_bounds = (
                    x < -tolerance
                    or y < -tolerance
                    or x + width > 1 + tolerance
                    or y + height > 1 + tolerance
                )
                if out_of_bounds:
                    quality_metrics["boundary_issues"] += 1

        return quality_metrics
4. 模型评估方法
class ModelEvaluator:
    """Evaluates an object-detection model against ground-truth labels."""

    def __init__(self, dataset):
        """Store the dataset that holds both predictions and ground truth."""
        self.dataset = dataset
        # Figures produced by the latest evaluate_detection_model() call.
        self.plots = {}

    def evaluate_detection_model(self, predictions_field="predictions",
                                 ground_truth_field="ground_truth",
                                 eval_key="eval"):
        """Run a detection evaluation and summarise the headline metrics.

        Args:
            predictions_field: field holding the model predictions.
            ground_truth_field: field holding the annotations.
            eval_key: key under which the evaluation is stored on the dataset.

        Returns:
            ``(metrics, results)`` where ``metrics`` is a dict with ``mAP``,
            ``precision``, ``recall`` and ``f1_score`` and ``results`` is the
            object returned by ``evaluate_detections``.
        """
        results = self.dataset.evaluate_detections(
            predictions_field,
            gt_field=ground_truth_field,
            eval_key=eval_key,
            compute_mAP=True,
        )

        # The results object exposes aggregate precision/recall through
        # metrics(); it has no precision()/recall() methods.
        summary = results.metrics()
        precision = summary["precision"]
        recall = summary["recall"]
        metrics = {
            "mAP": results.mAP(),
            "precision": precision,
            "recall": recall,
            # Epsilon avoids a division by zero when both values are 0.
            "f1_score": 2 * (precision * recall) / (precision + recall + 1e-10),
        }

        classes = self.dataset.distinct(f"{ground_truth_field}.detections.label")
        # Keep the figures reachable instead of discarding them.
        self.plots = {
            "confusion_matrix": results.plot_confusion_matrix(
                classes=classes,
                backend="matplotlib",
            ),
            "pr_curves": results.plot_pr_curves(
                classes=classes,
                backend="matplotlib",
            ),
        }
        return metrics, results

    def analyze_model_failures(self, eval_key="eval"):
        """Collect false positives/negatives and per-class scores.

        Relies on an evaluation previously stored under ``eval_key``.

        Args:
            eval_key: key of the stored evaluation run.

        Returns:
            ``(fp_view, fn_view, class_performance)``: a view of predictions
            tagged ``"fp"``, a view of ground truth tagged ``"fn"``, and a
            dict mapping each ground-truth class to its ``precision``,
            ``recall`` and ``support`` as reported by the stored results.
        """
        results = self.dataset.load_evaluation_results(eval_key)
        pred_field = results.config.pred_field
        gt_field = results.config.gt_field

        fp_view = self.dataset.filter_labels(pred_field, F(eval_key) == "fp")
        fn_view = self.dataset.filter_labels(gt_field, F(eval_key) == "fn")

        # Read per-class numbers from the stored run. Re-evaluating a view
        # that filters only the ground truth counts every other-class
        # prediction as a false positive and so understates precision.
        classes = self.dataset.distinct(f"{gt_field}.detections.label")
        report = results.report(classes=classes)
        class_performance = {
            class_name: {
                "precision": report[class_name]["precision"],
                "recall": report[class_name]["recall"],
                "support": report[class_name]["support"],
            }
            for class_name in classes
        }
        return fp_view, fn_view, class_performance

    def cross_validation_analysis(self, n_folds=5,
                                  predictions_field="predictions",
                                  ground_truth_field="ground_truth"):
        """Measure how the metrics vary across ``n_folds`` random splits.

        No model is trained here: the existing predictions are evaluated
        separately on each validation fold.

        Args:
            n_folds: number of folds to split the samples into.
            predictions_field: field holding the model predictions.
            ground_truth_field: field holding the annotations.

        Returns:
            list with one dict per fold containing ``fold``, ``mAP``,
            ``precision`` and ``recall``.
        """
        from sklearn.model_selection import KFold

        sample_ids = self.dataset.values("id")
        splitter = KFold(n_splits=n_folds, shuffle=True, random_state=42)

        fold_metrics = []
        for fold_idx, (_train_idx, val_idx) in enumerate(splitter.split(sample_ids)):
            val_view = self.dataset.select([sample_ids[i] for i in val_idx])
            # mAP() is only available when compute_mAP=True was requested.
            results = val_view.evaluate_detections(
                predictions_field,
                gt_field=ground_truth_field,
                compute_mAP=True,
            )
            summary = results.metrics()
            fold_metrics.append({
                "fold": fold_idx,
                "mAP": results.mAP(),
                "precision": summary["precision"],
                "recall": summary["recall"],
            })
        return fold_metrics
5. 数据集管理功能
class DatasetManager:
    """Creates, snapshots, merges, exports and annotates FiftyOne datasets."""

    # export_format value -> name of the dataset type in ``fo.types``.
    _EXPORT_TYPES = {
        "coco": "COCODetectionDataset",
        "yolo": "YOLOv5Dataset",
    }

    def __init__(self):
        """Start with an empty registry of datasets created by this manager."""
        self.datasets = {}

    def create_dataset(self, name, persistent=True):
        """Create a new dataset called ``name`` and register it.

        Args:
            name: name of the dataset to create.
            persistent: forwarded to ``fo.Dataset``.

        Returns:
            The newly created dataset.
        """
        dataset = fo.Dataset(name, persistent=persistent)
        self.datasets[name] = dataset
        return dataset

    def version_control(self, dataset, version_name):
        """Clone ``dataset`` into a named snapshot and record its provenance.

        The snapshot is named ``"<dataset.name>_v_<version_name>"``. Version
        details are merged into the info inherited from the parent rather
        than replacing it, and the snapshot is saved as persistent so the
        version outlives the current session.

        Args:
            dataset: dataset to snapshot.
            version_name: label of the version.

        Returns:
            The cloned snapshot dataset.
        """
        # Local import: this name was used without being imported anywhere.
        from datetime import datetime

        snapshot = dataset.clone(name=f"{dataset.name}_v_{version_name}")

        info = dict(snapshot.info or {})
        info.update({
            "version": version_name,
            "created_at": datetime.now().isoformat(),
            "parent_dataset": dataset.name,
            "num_samples": len(dataset),
        })
        snapshot.info = info
        snapshot.persistent = True
        # Changes to ``info`` are only written once save() is called.
        snapshot.save()
        return snapshot

    def merge_datasets(self, dataset1, dataset2, merge_key="filepath"):
        """Return a clone of ``dataset1`` with ``dataset2`` merged into it.

        Samples of ``dataset2`` whose ``merge_key`` matches an existing sample
        are merged into that sample; all others are added as new samples.
        Neither input dataset is modified.

        Args:
            dataset1: base dataset; it is cloned first.
            dataset2: dataset whose samples are merged into the clone.
            merge_key: field used to decide whether two samples match.

        Returns:
            The merged clone.
        """
        merged = dataset1.clone()
        # One bulk operation instead of one query per incoming sample.
        merged.merge_samples(dataset2, key_field=merge_key)
        return merged

    def filter_and_export(self, dataset, filter_criteria, export_path,
                          export_format="coco", label_field="ground_truth"):
        """Export the samples matching ``filter_criteria``.

        Args:
            dataset: dataset to filter.
            filter_criteria: expression passed to ``dataset.match``.
            export_path: directory to write the export into.
            export_format: ``"coco"`` or ``"yolo"``.
            label_field: label field to export.

        Returns:
            The filtered view that was exported.

        Raises:
            ValueError: if ``export_format`` is not supported. Previously an
                unknown format silently exported nothing.
        """
        if export_format not in self._EXPORT_TYPES:
            supported = ", ".join(sorted(self._EXPORT_TYPES))
            raise ValueError(
                f"Unsupported export_format '{export_format}'; "
                f"expected one of: {supported}"
            )

        filtered_view = dataset.match(filter_criteria)
        filtered_view.export(
            export_dir=export_path,
            dataset_type=getattr(fo.types, self._EXPORT_TYPES[export_format]),
            label_field=label_field,
        )
        return filtered_view

    def add_metadata(self, dataset):
        """Populate metadata and two derived per-sample fields.

        Writes ``image_quality_score`` (float) and ``annotation_complexity``
        (the number of ``ground_truth`` detections, ``0`` when there are
        none) on every sample.
        """
        dataset.compute_metadata()
        has_ground_truth = dataset.has_sample_field("ground_truth")

        # autosave batches the writes instead of one save() per sample.
        for sample in dataset.iter_samples(autosave=True, progress=True):
            ground_truth = sample["ground_truth"] if has_ground_truth else None
            sample["image_quality_score"] = self._calculate_image_quality(sample)
            sample["annotation_complexity"] = (
                len(ground_truth.detections) if ground_truth is not None else 0
            )

    def _calculate_image_quality(self, sample):
        """Score a sample by resolution relative to 1920x1080, capped at 1.0.

        Returns ``0.0`` when the sample has no metadata or no dimensions.
        """
        metadata = sample.metadata
        if metadata is None:
            return 0.0

        width = getattr(metadata, "width", None)
        height = getattr(metadata, "height", None)
        if not width or not height:
            return 0.0

        return min(1.0, (width * height) / (1920 * 1080))
6. 完整的使用示例
def main():
    """Run the full demo: load data, analyse, evaluate, export, open the App.

    The example dataset ships with ground truth only, so model predictions
    and image embeddings are generated first; the later steps read both.
    """
    # 1. Initialise the analyzer and load the example data.
    analyzer = DataQualityAnalyzer("my_dataset")
    dataset = analyzer.load_sample_data()

    # Uncertainty sampling, mistakenness and evaluation all read
    # "predictions"; create them once if the dataset has none.
    if not dataset.has_sample_field("predictions"):
        detection_model = foz.load_zoo_model("faster-rcnn-resnet50-fpn-coco-torch")
        dataset.apply_model(detection_model, label_field="predictions")

    # Outlier detection reads the "embeddings" field.
    if not dataset.has_sample_field("embeddings"):
        embedding_model = foz.load_zoo_model("mobilenet-v2-imagenet-torch")
        dataset.compute_embeddings(embedding_model, embeddings_field="embeddings")

    # 2. Data quality analysis.
    print("=== 数据质量分析 ===")
    label_dist = analyzer.analyze_label_distribution()
    print(f"标签分布: {label_dist}")
    uncertain_samples = analyzer.uncertainty_sampling()
    print(f"发现 {len(uncertain_samples)} 个不确定样本")

    # 3. Anomaly detection.
    print("\n=== 异常数据检测 ===")
    detector = AnomalyDetector(dataset)
    mistakes = detector.detect_label_mistakes()
    print(f"潜在标注错误: {len(mistakes)} 个")
    outliers = detector.detect_outliers()
    print(f"异常样本: {len(outliers)} 个")
    quality_metrics = detector.analyze_annotation_quality()
    print(f"标注质量指标: {quality_metrics}")

    # 4. Model evaluation.
    print("\n=== 模型评估 ===")
    evaluator = ModelEvaluator(dataset)
    metrics, results = evaluator.evaluate_detection_model()
    print(f"模型指标: {metrics}")
    fp_view, fn_view, class_perf = evaluator.analyze_model_failures()
    print(f"False Positives: {len(fp_view)}")
    print(f"False Negatives: {len(fn_view)}")
    print(f"类别性能: {class_perf}")

    # 5. Dataset management.
    print("\n=== 数据集管理 ===")
    manager = DatasetManager()
    snapshot = manager.version_control(dataset, "v1.0")
    print(f"创建数据集快照: {snapshot.name}")

    # "image_quality_score" only exists after add_metadata() has run.
    manager.add_metadata(dataset)
    manager.filter_and_export(
        dataset,
        F("image_quality_score") > 0.8,
        "./high_quality_export",
        "coco",
    )

    # 6. Open the App to browse distributions, filter samples, compare
    #    predictions with ground truth and inspect the evaluation.
    session = fo.launch_app(dataset)
    # Block until the App is closed; otherwise the script exits at once and
    # takes the App down with it.
    session.wait()


if __name__ == "__main__":
    main()
7. 高级功能扩展
class AdvancedAnalytics:
    """Advanced analysis helpers: active learning, augmentation, video checks.

    The sampling strategies and the frame comparison are extension points:
    ``_entropy_sampling``, ``_margin_sampling``, ``_diversity_sampling`` and
    ``_check_label_discontinuity`` raise ``NotImplementedError`` until a
    subclass provides them.
    """

    def active_learning_selection(self, dataset, model, n_samples=100):
        """Pick samples to label next by combining three sampling strategies.

        The budget is split evenly between entropy, margin and diversity
        sampling; any remainder goes to diversity sampling so that the full
        ``n_samples`` budget is requested.

        Args:
            dataset: dataset to select from.
            model: model passed to the entropy and margin strategies.
            n_samples: total number of samples to request.

        Returns:
            ``dataset.select(...)`` over the combined, de-duplicated picks.
        """
        per_strategy, remainder = divmod(n_samples, 3)

        # NOTE(review): each strategy is assumed to return an iterable of
        # sample IDs - confirm when implementing them.
        entropy_samples = self._entropy_sampling(dataset, model, per_strategy)
        margin_samples = self._margin_sampling(dataset, model, per_strategy)
        diverse_samples = self._diversity_sampling(dataset, per_strategy + remainder)

        # A sample may be chosen by more than one strategy; keep first-seen order.
        selected = list(dict.fromkeys(
            [*entropy_samples, *margin_samples, *diverse_samples]
        ))
        return dataset.select(selected)

    def data_augmentation_analysis(self, dataset):
        """Count samples that suggest a need for data augmentation.

        Args:
            dataset: iterable of samples exposing ``metadata`` and
                ``ground_truth``.

        Returns:
            dict with ``low_light_images`` (metadata brightness below 0.3),
            ``small_objects_dominant`` (more than half of the boxes cover
            under 5% of the image), and ``limited_viewpoints`` /
            ``class_imbalance``, which are returned untouched because nothing
            in this method populates them.
        """
        augmentation_needs = {
            "low_light_images": 0,
            "small_objects_dominant": 0,
            "limited_viewpoints": 0,
            "class_imbalance": {},
        }

        for sample in dataset:
            # Brightness is not guaranteed to be present on the metadata;
            # samples without it are skipped instead of raising.
            brightness = getattr(sample.metadata, "brightness", None)
            if brightness is not None and brightness < 0.3:
                augmentation_needs["low_light_images"] += 1

            ground_truth = getattr(sample, "ground_truth", None)
            detections = ground_truth.detections if ground_truth is not None else None
            if not detections:
                continue

            small_obj_count = sum(
                1 for det in detections
                if det.bounding_box[2] * det.bounding_box[3] < 0.05
            )
            if small_obj_count > len(detections) * 0.5:
                augmentation_needs["small_objects_dominant"] += 1

        return augmentation_needs

    def temporal_consistency_check(self, video_dataset):
        """Report label discontinuities between successive frames of videos.

        Args:
            video_dataset: iterable of video samples exposing ``frames``.

        Returns:
            list of dicts with ``video`` (sample ID), ``frame`` (frame number
            of the later frame of the pair) and ``type``.
        """
        inconsistencies = []
        for video in video_dataset:
            previous_frame = None
            # Frame numbers are 1-based, so positional ``frames[i - 1]``
            # access starting at index 0 is invalid; iterate the stored
            # (frame_number, frame) pairs instead.
            # NOTE(review): stored frames may be sparse, in which case
            # successive pairs are not necessarily adjacent in time.
            for frame_number, current_frame in video.frames.items():
                if previous_frame is not None and self._check_label_discontinuity(
                    previous_frame, current_frame
                ):
                    inconsistencies.append({
                        "video": video.id,
                        "frame": frame_number,
                        "type": "label_discontinuity",
                    })
                previous_frame = current_frame
        return inconsistencies

    def _entropy_sampling(self, dataset, model, n_samples):
        """Extension point: pick the samples with the highest entropy."""
        raise NotImplementedError("_entropy_sampling is not implemented")

    def _margin_sampling(self, dataset, model, n_samples):
        """Extension point: pick the samples with the smallest margin."""
        raise NotImplementedError("_margin_sampling is not implemented")

    def _diversity_sampling(self, dataset, n_samples):
        """Extension point: pick a diverse subset of samples."""
        raise NotImplementedError("_diversity_sampling is not implemented")

    def _check_label_discontinuity(self, prev_frame, curr_frame):
        """Extension point: return True when labels change abruptly."""
        raise NotImplementedError("_check_label_discontinuity is not implemented")
更多推荐
所有评论(0)