YOLO12+FastAPI:打造高性能目标检测服务(附完整代码)
Object detection is one of the core tasks in computer vision: from security surveillance to autonomous driving, from industrial inspection to smart photo albums, it is everywhere. Turning a state-of-the-art detection model into a stable, efficient, easy-to-use online service, however, is a constant headache for developers: complex environment setup, slow inference, fiddly API design, and painful deployment and maintenance.
In this post I will walk you through building a high-performance object detection service from scratch with YOLO12 and FastAPI. The stack offers real-time inference (up to 131 FPS with the nano model), plus a complete REST API and a visual interface, so you can have a production-grade detection service running in about ten minutes.
1. Why YOLO12 + FastAPI?
Before diving in, let's talk about why this combination is worth your time.
YOLO12 is the latest release from Ultralytics, launched in 2025 as the successor to YOLOv11. It keeps the real-time speed the YOLO family is known for while introducing attention mechanisms into the feature extraction network, noticeably improving detection accuracy. Just as importantly, it ships in five sizes from nano to xlarge, so you can pick a model that matches your hardware and accuracy requirements.
FastAPI is one of the most popular web frameworks in the Python ecosystem. Built on standard Python type hints, it generates API documentation automatically, supports asynchronous request handling, and delivers performance close to NodeJS and Go. For an AI service that needs high concurrency, it is an ideal choice.
Put them together and you get:
- Fast deployment: a prebuilt Docker image you can launch with one click
- Dual-mode service: a REST API for programs, a web UI for manual testing
- Flexible sizing: five model sizes to match different hardware and accuracy needs
- Production readiness: solid error handling, logging, and performance monitoring
The diagram below shows the service architecture we are about to build:
2. Environment Setup and Quick Deployment
2.1 System Requirements
Before you start, make sure your environment meets the following requirements:
- Operating system: Ubuntu 20.04/22.04 or CentOS 8+ (Ubuntu recommended)
- Python: 3.8-3.11 (3.11 recommended)
- CUDA: 11.8 or 12.4 (if you have a GPU)
- Memory: at least 8GB RAM
- Storage: at least 10GB free space
- GPU (optional): an NVIDIA GPU (RTX 3060 or better recommended) speeds up inference dramatically
No suitable GPU? No problem. You can use the prebuilt image on the CSDN星图 platform, which ships with all dependencies configured and works out of the box.
2.2 One-Click Deployment
For most developers I recommend deploying from the prebuilt image. It is the least painful route:
- Open the image marketplace: go to the CSDN星图 image gallery
- Search for the image: enter `ins-yolo12-independent-v1` in the search box
- Deploy an instance: click "Deploy Instance" and pick a suitable hardware configuration
- Wait for startup: after about 1-2 minutes the instance status changes to "Running"
Once deployed, you get two entry points:
- API service: port 8000, for programmatic access
- Web UI: port 7860, for manual testing and debugging
2.3 Local Development Setup
If you prefer to develop and debug locally, set up the environment manually:

```bash
# 1. Clone the project
git clone https://github.com/your-repo/yolo12-fastapi-service.git
cd yolo12-fastapi-service

# 2. Create and activate a virtual environment
python -m venv venv
source venv/bin/activate   # Linux/Mac
# venv\Scripts\activate    # Windows

# 3. Install dependencies
pip install -r requirements.txt

# 4. Download model weights (nano shown here)
mkdir -p models/yolo12
wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov12n.pt -O models/yolo12/yolov12n.pt

# 5. Start the service
python main.py
```
The requirements.txt file looks like this (note that YOLO12 support requires the ultralytics 8.3.x series, not 8.0.0):

```text
fastapi==0.104.1
uvicorn[standard]==0.24.0
ultralytics>=8.3.0
opencv-python==4.8.1.78
pillow==10.1.0
numpy==1.24.3
python-multipart==0.0.6
gradio==4.0.0
```
3. Core Code Walkthrough
Now let's dig into the core code of the service. I'll go module by module so you understand what each part does.
3.1 Model Loading and Initialization
First, the model loading module. The key here is to initialize the YOLO12 model correctly and handle different hardware environments.
```python
# model_loader.py
import os
import logging
from typing import Optional

import numpy as np
import torch
from ultralytics import YOLO

logger = logging.getLogger(__name__)


class YOLO12ModelLoader:
    """YOLO12 model loader with multi-size support and hardware auto-detection."""

    def __init__(self, model_size: str = "n", device: Optional[str] = None):
        """
        Initialize the model loader.

        Args:
            model_size: model size, one of 'n' (nano), 's' (small), 'm' (medium),
                        'l' (large), 'x' (xlarge)
            device: target device such as 'cuda:0' or 'cpu'; None selects automatically
        """
        self.model_size = model_size.lower()
        self.device = self._auto_select_device(device)
        self.model = None
        self.model_path = f"models/yolo12/yolov12{self.model_size}.pt"

        # Supported model sizes
        self.supported_sizes = {
            'n': {'name': 'nano', 'params': 3.7, 'size_mb': 5.6},
            's': {'name': 'small', 'params': 11.2, 'size_mb': 19},
            'm': {'name': 'medium', 'params': 25.9, 'size_mb': 40},
            'l': {'name': 'large', 'params': 43.7, 'size_mb': 53},
            'x': {'name': 'xlarge', 'params': 68.2, 'size_mb': 119}
        }

        if self.model_size not in self.supported_sizes:
            raise ValueError(
                f"Unsupported model size: {model_size}, "
                f"choose one of {list(self.supported_sizes.keys())}"
            )

    def _auto_select_device(self, device: Optional[str]) -> str:
        """Pick the best available device."""
        if device:
            return device
        if torch.cuda.is_available():
            # Inspect CUDA version and available VRAM
            cuda_version = torch.version.cuda
            gpu_name = torch.cuda.get_device_name(0)
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9  # GB
            logger.info(f"GPU detected: {gpu_name}, VRAM: {gpu_memory:.1f}GB, CUDA: {cuda_version}")
            # Match the model size against available VRAM
            if self.model_size == 'x' and gpu_memory < 8:
                logger.warning("The xlarge model needs at least 8GB of VRAM; consider small or medium")
                return "cpu"
            return "cuda:0"
        logger.info("No GPU detected, falling back to CPU inference")
        return "cpu"

    def load_model(self):
        """Load the YOLO12 model."""
        if not os.path.exists(self.model_path):
            raise FileNotFoundError(f"Model file not found: {self.model_path}")

        spec = self.supported_sizes[self.model_size]
        logger.info(f"Loading model: yolov12{self.model_size}.pt")
        logger.info(f"Model size: {spec['name']}")
        logger.info(f"Parameters: {spec['params']}M")
        logger.info(f"File size: {spec['size_mb']}MB")
        logger.info(f"Device: {self.device}")

        try:
            self.model = YOLO(self.model_path)
            # Warm up the model (the first inference pass is slow).
            # Ultralytics expects an image-like input, so use a blank uint8
            # frame rather than a raw random tensor.
            logger.info("Warming up the model...")
            dummy_input = np.zeros((640, 640, 3), dtype=np.uint8)
            _ = self.model(dummy_input, device=self.device, verbose=False)
            logger.info("Model loaded")
            return self.model
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise

    def get_model_info(self) -> dict:
        """Return model metadata."""
        if self.model is None:
            return {"status": "not loaded"}
        info = {
            "model_size": self.model_size,
            "model_name": f"yolov12{self.model_size}",
            "device": self.device,
            "specs": self.supported_sizes[self.model_size],
            "status": "loaded"
        }
        if "cuda" in self.device:
            total = torch.cuda.get_device_properties(0).total_memory / 1e9
            info["gpu_memory"] = f"{torch.cuda.memory_allocated()/1e9:.2f}GB / {total:.2f}GB"
        return info
```
This loader has a few key design points:
- Automatic device selection: it prefers the GPU but gives sensible advice based on VRAM
- Model warm-up: one inference pass runs before serving, avoiding first-request latency in production
- Rich introspection: detailed model information and status reporting
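The VRAM-based device heuristic is easy to unit-test once pulled out as a pure function. A minimal sketch (the `select_device` helper and `gpu_memory_gb=None` convention for "no GPU" are illustrative, not part of the project; the 8GB cutoff for xlarge mirrors the loader above):

```python
from typing import Optional

def select_device(model_size: str, gpu_memory_gb: Optional[float]) -> str:
    """Mirror of the loader's heuristic: prefer the GPU, but send the
    xlarge model to the CPU when less than 8GB of VRAM is available."""
    if gpu_memory_gb is None:  # no CUDA device detected
        return "cpu"
    if model_size == "x" and gpu_memory_gb < 8:
        return "cpu"
    return "cuda:0"

print(select_device("x", 6.0))   # cpu
print(select_device("m", 6.0))   # cuda:0
```

Keeping the decision rule separate from `torch.cuda` calls like this also makes it testable on machines without a GPU.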
3.2 The FastAPI Server
Next is the FastAPI server core. It exposes two main endpoints: single-image detection and batch detection.
```python
# main.py
import io
import json
import logging
import os
import time
from typing import List

import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from PIL import Image

from model_loader import YOLO12ModelLoader
from utils.image_processor import ImageProcessor

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# FastAPI application
app = FastAPI(
    title="YOLO12 Object Detection API",
    description="High-performance object detection service built on YOLO12 and FastAPI",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Globals initialized at startup
model_loader = None
image_processor = None


@app.on_event("startup")
async def startup_event():
    """Load the model when the service starts."""
    global model_loader, image_processor
    try:
        # Model size comes from an environment variable, defaulting to nano
        model_size = os.getenv("YOLO_MODEL", "n").lower().replace("yolov12", "").replace(".pt", "")
        model_loader = YOLO12ModelLoader(model_size=model_size)
        model = model_loader.load_model()
        image_processor = ImageProcessor(model)
        logger.info("Startup complete, model loaded")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise


@app.get("/")
async def root():
    """Root path: service information."""
    return {
        "service": "YOLO12 Object Detection API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "single-image detection": "POST /predict",
            "batch detection": "POST /batch_predict",
            "service status": "GET /status",
            "API docs": "GET /docs"
        }
    }


@app.get("/status")
async def get_status():
    """Service status and model information."""
    if model_loader is None:
        raise HTTPException(status_code=503, detail="Service not ready")
    status = {
        "service_status": "healthy",
        "timestamp": time.time(),
        "model_info": model_loader.get_model_info(),
        "performance": {
            "total_requests": image_processor.total_requests if image_processor else 0,
            "avg_inference_time": image_processor.avg_inference_time if image_processor else 0
        }
    }
    return JSONResponse(content=status)


@app.post("/predict")
async def predict(
    file: UploadFile = File(...),
    confidence: float = 0.25,
    iou_threshold: float = 0.45,
    return_image: bool = False
):
    """
    Detect objects in a single image.

    Args:
        file: uploaded image file
        confidence: confidence threshold (0.1-1.0)
        iou_threshold: IoU threshold (0.1-1.0)
        return_image: whether to return the annotated image

    Returns:
        JSON detection results: bounding boxes, confidences, classes, etc.
    """
    # Parameter validation
    if confidence < 0.1 or confidence > 1.0:
        raise HTTPException(status_code=400, detail="confidence must be between 0.1 and 1.0")
    if iou_threshold < 0.1 or iou_threshold > 1.0:
        raise HTTPException(status_code=400, detail="iou_threshold must be between 0.1 and 1.0")

    try:
        # Read the image
        contents = await file.read()
        image = Image.open(io.BytesIO(contents))

        start_time = time.time()
        result = image_processor.detect(
            image=image,
            confidence=confidence,
            iou=iou_threshold,
            return_image=return_image
        )
        inference_time = (time.time() - start_time) * 1000  # milliseconds

        # Build the response
        response = {
            "success": True,
            "filename": file.filename,
            "inference_time_ms": round(inference_time, 2),
            "detections_count": len(result["detections"]),
            "detections": result["detections"]
        }
        # Attach the base64-encoded annotated image if requested
        if return_image and "annotated_image" in result:
            response["annotated_image"] = result["annotated_image"]

        logger.info(
            f"Done: {file.filename}, {inference_time:.2f}ms, "
            f"{len(result['detections'])} objects detected"
        )
        return JSONResponse(content=response)
    except Exception as e:
        logger.error(f"Detection failed: {e}")
        raise HTTPException(status_code=500, detail=f"Detection failed: {e}")


@app.post("/batch_predict")
async def batch_predict(
    files: List[UploadFile] = File(...),
    confidence: float = 0.25,
    iou_threshold: float = 0.45,
    max_batch_size: int = 10
):
    """
    Detect objects in a batch of images.

    Args:
        files: list of uploaded image files
        confidence: confidence threshold
        iou_threshold: IoU threshold
        max_batch_size: maximum batch size

    Returns:
        a list of per-image detection results
    """
    if len(files) > max_batch_size:
        raise HTTPException(
            status_code=400,
            detail=f"Batch size must not exceed {max_batch_size}, got {len(files)}"
        )

    results = []
    total_start_time = time.time()
    for file in files:
        try:
            # Reuse the single-image endpoint logic
            result = await predict(file, confidence, iou_threshold, False)
            results.append(json.loads(result.body))
        except Exception as e:
            results.append({
                "success": False,
                "filename": file.filename,
                "error": str(e)
            })

    total_time = (time.time() - total_start_time) * 1000
    return JSONResponse(content={
        "success": True,
        "total_files": len(files),
        "total_time_ms": round(total_time, 2),
        "avg_time_per_image_ms": round(total_time / len(files), 2),
        "results": results
    })


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,  # hot reload for development
        log_level="info"
    )
```
A few highlights of this server code:
- Thorough error handling: inputs are validated and all exceptions are caught
- Performance monitoring: inference times and request counts are recorded
- Batch support: multiple images can be processed in one request
- Flexible responses: the annotated image is returned only when requested
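From the caller's side, the `/predict` response is plain JSON, so a thin helper is enough to post-process it. A sketch that summarizes a response into per-class counts (`summarize_response` is illustrative and the sample payload below is hypothetical, but it follows the response schema defined above):

```python
from collections import Counter
from typing import Any, Dict

def summarize_response(response: Dict[str, Any]) -> Dict[str, int]:
    """Count detections per class in a /predict JSON response."""
    return dict(Counter(d["class"] for d in response.get("detections", [])))

# Hypothetical payload matching the /predict response schema
sample = {
    "success": True,
    "detections_count": 3,
    "detections": [
        {"bbox": [10, 10, 50, 80], "confidence": 0.91, "class": "person", "class_id": 0},
        {"bbox": [60, 20, 120, 90], "confidence": 0.85, "class": "person", "class_id": 0},
        {"bbox": [5, 40, 200, 120], "confidence": 0.78, "class": "car", "class_id": 2},
    ],
}
print(summarize_response(sample))  # {'person': 2, 'car': 1}
```

Because the schema is stable JSON, the same helper works whether the response came from `/predict` directly or from one entry of a `/batch_predict` result list.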
3.3 Image Processing and Result Parsing
The image processing module converts raw images into model input and parses the model output into structured results.
```python
# utils/image_processor.py
import base64
import io
import logging
import time
from typing import Any, Dict, List

import cv2
import numpy as np
from PIL import Image

logger = logging.getLogger(__name__)


class ImageProcessor:
    """Handles image pre-processing and result parsing."""

    def __init__(self, model):
        self.model = model
        self.total_requests = 0
        self.total_inference_time = 0
        self.avg_inference_time = 0
        # The 80 COCO class names
        self.coco_classes = [
            'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
            'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
            'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
            'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
            'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
            'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
            'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
            'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
            'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
            'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
            'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush'
        ]

    def _pil_to_cv2(self, pil_image: Image.Image) -> np.ndarray:
        """Convert a PIL image to OpenCV BGR format."""
        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    def _cv2_to_pil(self, cv2_image: np.ndarray) -> Image.Image:
        """Convert an OpenCV BGR image to PIL format."""
        return Image.fromarray(cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB))

    def _encode_image_to_base64(self, image: Image.Image) -> str:
        """Encode a PIL image as a base64 JPEG string."""
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG", quality=95)
        return base64.b64encode(buffered.getvalue()).decode('utf-8')

    def _draw_detections(self, image: np.ndarray, detections: List[Dict]) -> np.ndarray:
        """Draw detection boxes and labels onto the image."""
        result_image = image.copy()
        # Give each class a distinct color by spreading hues across the palette
        colors = {}
        for i, class_name in enumerate(self.coco_classes):
            hue = int(180 * i / len(self.coco_classes))
            colors[class_name] = cv2.cvtColor(
                np.uint8([[[hue, 255, 255]]]), cv2.COLOR_HSV2BGR
            )[0][0].tolist()

        for detection in detections:
            # Bounding box coordinates
            x1, y1, x2, y2 = map(int, detection["bbox"])
            confidence = detection["confidence"]
            class_name = detection["class"]

            color = colors.get(class_name, [0, 255, 0])  # default: green

            # Box
            cv2.rectangle(result_image, (x1, y1), (x2, y2), color, 2)

            # Label background
            label = f"{class_name} {confidence:.2f}"
            (text_width, text_height), baseline = cv2.getTextSize(
                label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
            )
            cv2.rectangle(
                result_image,
                (x1, y1 - text_height - 10),
                (x1 + text_width, y1),
                color,
                -1
            )
            # Label text
            cv2.putText(
                result_image,
                label,
                (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (255, 255, 255),
                1,
                cv2.LINE_AA
            )
        return result_image

    def detect(
        self,
        image: Image.Image,
        confidence: float = 0.25,
        iou: float = 0.45,
        return_image: bool = False
    ) -> Dict[str, Any]:
        """
        Run object detection.

        Args:
            image: PIL image
            confidence: confidence threshold
            iou: IoU threshold
            return_image: whether to return the annotated image

        Returns:
            detection result dictionary
        """
        start_time = time.time()
        try:
            # Convert the image
            cv2_image = self._pil_to_cv2(image)
            original_height, original_width = cv2_image.shape[:2]

            # Inference
            results = self.model(
                cv2_image,
                conf=confidence,
                iou=iou,
                verbose=False  # suppress per-image logging
            )

            # Parse the results
            detections = []
            for result in results:
                boxes = result.boxes
                if boxes is not None:
                    for box in boxes:
                        # Box coordinates in original image scale
                        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                        # Confidence and class
                        conf = float(box.conf[0].cpu().numpy())
                        cls_id = int(box.cls[0].cpu().numpy())
                        cls_name = (
                            self.coco_classes[cls_id]
                            if cls_id < len(self.coco_classes)
                            else f"class_{cls_id}"
                        )
                        detections.append({
                            "bbox": [x1, y1, x2, y2],
                            "confidence": round(conf, 4),
                            "class": cls_name,
                            "class_id": cls_id,
                            "area": (x2 - x1) * (y2 - y1)
                        })

            # Sort by confidence, highest first
            detections.sort(key=lambda x: x["confidence"], reverse=True)

            inference_time = (time.time() - start_time) * 1000

            # Update running statistics
            self.total_requests += 1
            self.total_inference_time += inference_time
            self.avg_inference_time = self.total_inference_time / self.total_requests

            result = {
                "detections": detections,
                "image_info": {
                    "width": original_width,
                    "height": original_height,
                    "format": image.format
                },
                "inference_time_ms": round(inference_time, 2)
            }

            # Annotated image, if requested
            if return_image and detections:
                annotated_image = self._draw_detections(cv2_image, detections)
                pil_annotated = self._cv2_to_pil(annotated_image)
                result["annotated_image"] = self._encode_image_to_base64(pil_annotated)

            return result
        except Exception as e:
            logger.error(f"Detection error: {e}")
            raise

    def get_statistics(self) -> Dict[str, Any]:
        """Return processing statistics."""
        return {
            "total_requests": self.total_requests,
            "avg_inference_time_ms": round(self.avg_inference_time, 2),
            "total_inference_time_ms": round(self.total_inference_time, 2)
        }
```
This processor takes care of several jobs:
- Format conversion: moving between PIL, OpenCV, and numpy representations
- Result parsing: turning model output into structured JSON
- Visualization: drawing boxes and labels on the image
- Statistics: tracking inference times and request counts
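Since the `annotated_image` field travels as a base64 string, a client has to decode it back to JPEG bytes before saving or displaying it. A minimal round-trip sketch using only the standard library (`decode_annotated_image` is an illustrative helper, and the byte payload is a stand-in, not a real JPEG):

```python
import base64

def decode_annotated_image(b64_string: str) -> bytes:
    """Decode the base64 `annotated_image` field back into raw JPEG bytes."""
    return base64.b64decode(b64_string)

# Round trip: what the server does (encode) vs. what the client does (decode)
jpeg_bytes = b"\xff\xd8\xff\xe0fake-jpeg-payload"
encoded = base64.b64encode(jpeg_bytes).decode("utf-8")  # server side
assert decode_annotated_image(encoded) == jpeg_bytes    # client side
```

The decoded bytes can be written straight to a `.jpg` file or opened with `PIL.Image.open(io.BytesIO(...))`, as the Gradio client in the next section does.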
3.4 The Gradio Web UI
To let non-developers test the model easily, we add a Gradio web interface.
````python
# webui/app.py
import base64
import io
import os

import gradio as gr
import requests
from PIL import Image

# Address of the API service
API_URL = os.getenv("API_URL", "http://localhost:8000")


def predict_image(image, confidence_threshold):
    """Call the detection API."""
    try:
        # Serialize the image to JPEG bytes
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        image_bytes = buffered.getvalue()

        # Build the request
        files = {"file": ("image.jpg", image_bytes, "image/jpeg")}
        params = {
            "confidence": confidence_threshold,
            "return_image": True
        }
        response = requests.post(
            f"{API_URL}/predict",
            files=files,
            params=params
        )

        if response.status_code == 200:
            result = response.json()
            # Decode the annotated image
            if "annotated_image" in result:
                annotated_bytes = base64.b64decode(result["annotated_image"])
                annotated_image = Image.open(io.BytesIO(annotated_bytes))
            else:
                annotated_image = image

            # Build the result text
            detections_text = f"Detected {result['detections_count']} objects:\n\n"
            for i, detection in enumerate(result["detections"], 1):
                detections_text += f"{i}. {detection['class']} (confidence: {detection['confidence']:.2%})\n"
                detections_text += f"   bbox: [{detection['bbox'][0]}, {detection['bbox'][1]}, {detection['bbox'][2]}, {detection['bbox'][3]}]\n"
                detections_text += f"   area: {detection['area']} px\n\n"
            detections_text += f"\nInference time: {result['inference_time_ms']}ms"

            return annotated_image, detections_text
        else:
            error_msg = f"API request failed: {response.status_code}\n{response.text}"
            return image, error_msg
    except Exception as e:
        return image, f"Prediction error: {e}"


def create_webui():
    """Build the Gradio web interface."""
    with gr.Blocks(title="YOLO12 Object Detection Demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# YOLO12 Real-Time Object Detection Demo")
        gr.Markdown("Upload an image to try out YOLO12's detection capabilities")

        with gr.Row():
            with gr.Column(scale=1):
                # Input widgets
                image_input = gr.Image(
                    label="Upload image",
                    type="pil",
                    height=400
                )
                confidence_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.25,
                    step=0.05,
                    label="Confidence threshold",
                    info="Lower values detect more objects (possibly with false positives); higher values keep only confident detections"
                )
                detect_button = gr.Button("Detect", variant="primary")
                gr.Markdown("### Model info")
                gr.Markdown("""
                - **Current model**: YOLOv12n (nano)
                - **Parameters**: 3.7M
                - **Model size**: 5.6MB
                - **Classes**: 80 common object categories
                - **Input resolution**: 640×640
                """)
            with gr.Column(scale=1):
                # Output widgets
                image_output = gr.Image(
                    label="Detection result",
                    type="pil",
                    height=400
                )
                text_output = gr.Textbox(
                    label="Detection details",
                    lines=15,
                    max_lines=20
                )

        # Wire up the button
        detect_button.click(
            fn=predict_image,
            inputs=[image_input, confidence_slider],
            outputs=[image_output, text_output]
        )

        # Example images
        gr.Markdown("### 🖼 Example images")
        with gr.Row():
            example_images = [
                "examples/person_car.jpg",
                "examples/street_scene.jpg",
                "examples/indoor.jpg"
            ]
            for img_path in example_images:
                if os.path.exists(img_path):
                    gr.Examples(
                        examples=[[img_path, 0.25]],
                        inputs=[image_input, confidence_slider],
                        outputs=[image_output, text_output],
                        fn=predict_image,
                        label=os.path.basename(img_path)
                    )

        # API usage notes
        with gr.Accordion("API usage", open=False):
            gr.Markdown("""
            ### REST API endpoints

            **Single-image detection** (POST `/predict`; note that `confidence` is a query parameter, not a form field)
            ```bash
            curl -X POST "http://localhost:8000/predict?confidence=0.25" \\
                 -H "accept: application/json" \\
                 -F "file=@/path/to/image.jpg"
            ```

            **Batch detection** (POST `/batch_predict`)
            ```bash
            curl -X POST "http://localhost:8000/batch_predict" \\
                 -H "accept: application/json" \\
                 -F "files=@image1.jpg" \\
                 -F "files=@image2.jpg"
            ```

            **Service status** (GET `/status`)
            ```bash
            curl "http://localhost:8000/status"
            ```
            """)
    return demo


if __name__ == "__main__":
    demo = create_webui()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )
````
This web UI gives you:
- An intuitive workflow: upload an image, tweak parameters, view results
- Live visualization: the annotated image plus detailed detection info
- Example images for a quick first try
- Built-in API notes for developers
4. Performance Optimization and Production Deployment
4.1 Performance Tuning
In production we need to tune the service for stability and throughput. Here are some key optimizations:
```python
# optimizations/performance_optimizer.py
import logging
import time
from contextlib import contextmanager

import torch
import torch.nn as nn

logger = logging.getLogger(__name__)


class PerformanceOptimizer:
    """Bundles several optimization strategies."""

    def __init__(self, model: nn.Module, device: str = "cuda"):
        self.model = model
        self.device = device
        self.original_training = model.training

    @contextmanager
    def inference_mode(self):
        """Context manager that switches the model to inference mode."""
        original_training = self.model.training
        try:
            self.model.eval()
            with torch.no_grad():
                yield
        finally:
            if original_training:
                self.model.train()

    def enable_tensorrt(self, precision: str = "fp16"):
        """Enable TensorRT acceleration if it is available."""
        try:
            import tensorrt as trt  # noqa: F401
            logger.info(f"TensorRT acceleration enabled, precision: {precision}")
            # The TensorRT conversion itself is omitted here; a real setup
            # needs engine building tailored to the model architecture.
            if precision == "fp16":
                torch.backends.cuda.matmul.allow_tf32 = True
                torch.backends.cudnn.allow_tf32 = True
            return True
        except ImportError:
            logger.warning("TensorRT not installed, skipping")
            return False

    def optimize_memory(self, max_batch_size: int = 8):
        """Memory tuning; returns a VRAM monitor callable."""
        if "cuda" in self.device:
            # CUDA allocator hygiene
            torch.cuda.empty_cache()
            torch.backends.cudnn.benchmark = True

            def memory_monitor():
                allocated = torch.cuda.memory_allocated() / 1e9
                cached = torch.cuda.memory_reserved() / 1e9
                logger.info(f"VRAM: {allocated:.2f}GB allocated, {cached:.2f}GB reserved")
            return memory_monitor
        return lambda: None

    def batch_processing_optimization(self, batch_size: int = 4):
        """Pick a batch size appropriate for the available VRAM."""
        if "cuda" in self.device:
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
            if gpu_memory < 4:        # under 4GB
                recommended_batch = 1
            elif gpu_memory < 8:      # 4-8GB
                recommended_batch = 2
            elif gpu_memory < 16:     # 8-16GB
                recommended_batch = 4
            else:                     # 16GB and above
                recommended_batch = 8
            actual_batch = min(batch_size, recommended_batch)
            logger.info(f"Recommended batch size for {gpu_memory:.1f}GB VRAM: {actual_batch}")
            return actual_batch
        return min(batch_size, 4)  # cap the batch size on CPU

    def create_warmup_script(self, warmup_iterations: int = 10):
        """Build a warm-up callable."""
        def warmup():
            logger.info(f"Warming up ({warmup_iterations} iterations)")
            dummy_input = torch.randn(1, 3, 640, 640).to(self.device)
            with self.inference_mode():
                for i in range(warmup_iterations):
                    start_time = time.time()
                    _ = self.model(dummy_input)
                    iter_time = (time.time() - start_time) * 1000
                    if i == 0:
                        logger.info(f"First pass: {iter_time:.2f}ms")
                    elif i == warmup_iterations - 1:
                        logger.info(f"Last pass: {iter_time:.2f}ms")
            logger.info("Warm-up complete")
        return warmup
```
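The VRAM-to-batch-size mapping in `batch_processing_optimization` can likewise be expressed as a pure function, which makes the tier thresholds easy to verify without a GPU (`recommended_batch` is an illustrative helper, not part of the project):

```python
def recommended_batch(gpu_memory_gb: float, requested: int = 4) -> int:
    """Cap the requested batch size by a VRAM-based recommendation,
    using the same tiers as PerformanceOptimizer above."""
    if gpu_memory_gb < 4:
        tier = 1
    elif gpu_memory_gb < 8:
        tier = 2
    elif gpu_memory_gb < 16:
        tier = 4
    else:
        tier = 8
    return min(requested, tier)

print(recommended_batch(6.0, 8))   # 2
print(recommended_batch(24.0, 8))  # 8
```

Note that the function never raises the batch size above what the caller asked for; it only caps it.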
4.2 Docker Deployment
For production I strongly recommend containerizing with Docker. Here is a complete Dockerfile:
```dockerfile
# Dockerfile
FROM nvidia/cuda:12.4.0-runtime-ubuntu22.04

# Environment variables
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV YOLO_MODEL=n

# System dependencies
RUN apt-get update && apt-get install -y \
    python3.11 \
    python3.11-dev \
    python3.11-venv \
    python3-pip \
    git \
    wget \
    curl \
    libgl1-mesa-glx \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*

# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies before copying the source,
# so this layer is cached independently of code changes
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy the project
COPY . .

# Download model weights
RUN mkdir -p models/yolo12 && \
    wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov12n.pt -O models/yolo12/yolov12n.pt && \
    wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov12s.pt -O models/yolo12/yolov12s.pt && \
    wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov12m.pt -O models/yolo12/yolov12m.pt

# Startup script (printf is used instead of echo so each line is
# written out literally; plain echo does not interpret \n escapes)
RUN printf '%s\n' \
    '#!/bin/bash' \
    '' \
    '# Select the model size' \
    'if [ -n "$YOLO_MODEL" ]; then' \
    '    echo "Using model size: $YOLO_MODEL"' \
    'else' \
    '    export YOLO_MODEL=n' \
    '    echo "Using default model size: nano"' \
    'fi' \
    '' \
    '# Start the FastAPI service' \
    'python main.py &' \
    '' \
    '# Start the Gradio web UI' \
    'python webui/app.py &' \
    '' \
    '# Wait for both services' \
    'wait' \
    > /usr/local/bin/start.sh && \
    chmod +x /usr/local/bin/start.sh

# Exposed ports
EXPOSE 8000 7860

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/ || exit 1

# Entry point
CMD ["/usr/local/bin/start.sh"]
```
The matching docker-compose.yml:

```yaml
# docker-compose.yml
version: '3.8'

services:
  yolo12-service:
    build: .
    container_name: yolo12-detection
    ports:
      - "8000:8000"   # FastAPI
      - "7860:7860"   # Gradio
    environment:
      - YOLO_MODEL=n  # change to s/m/l/x as needed
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    volumes:
      - ./logs:/app/logs
      - ./models:/app/models
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
```
4.3 Monitoring and Logging
Production also calls for proper monitoring and logging:
```python
# monitoring/logger_config.py
import json
import logging
import sys
from datetime import datetime


class JSONFormatter(logging.Formatter):
    """Formats log records as JSON."""

    def format(self, record: logging.LogRecord) -> str:
        log_object = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        # Exception details
        if record.exc_info:
            log_object["exception"] = self.formatException(record.exc_info)
        # Extra fields
        if hasattr(record, "extra_fields"):
            log_object.update(record.extra_fields)
        return json.dumps(log_object, ensure_ascii=False)


def setup_logging(log_level: str = "INFO", log_file: str = None):
    """Configure the logging system."""
    # Root logger
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, log_level.upper()))
    # Drop any existing handlers
    logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)

    # File handler, if a log file was given
    handlers = [console_handler]
    if log_file:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        handlers.append(file_handler)

    # Attach the JSON formatter
    json_formatter = JSONFormatter()
    for handler in handlers:
        handler.setFormatter(json_formatter)
        logger.addHandler(handler)

    # Quiet down third-party loggers
    logging.getLogger("uvicorn").setLevel(logging.WARNING)
    logging.getLogger("fastapi").setLevel(logging.WARNING)

    return logger
```
```python
# monitoring/metrics_collector.py
from prometheus_client import Counter, Gauge, Histogram, generate_latest


class MetricsCollector:
    """Collects Prometheus metrics for the service."""

    def __init__(self):
        # Request metrics
        self.requests_total = Counter(
            'yolo12_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status']
        )
        self.request_duration = Histogram(
            'yolo12_request_duration_seconds',
            'Request duration in seconds',
            ['endpoint'],
            buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0)
        )
        # Detection metrics
        self.detections_total = Counter(
            'yolo12_detections_total',
            'Total number of detections',
            ['class']
        )
        self.inference_duration = Histogram(
            'yolo12_inference_duration_seconds',
            'Inference duration in seconds',
            ['model_size'],
            buckets=(0.001, 0.005, 0.01, 0.05, 0.1, 0.5)
        )
        # System resource metrics
        self.gpu_memory_usage = Gauge(
            'yolo12_gpu_memory_usage_bytes',
            'GPU memory usage in bytes'
        )
        self.cpu_usage = Gauge(
            'yolo12_cpu_usage_percent',
            'CPU usage percentage'
        )
        # Business metrics
        self.active_models = Gauge(
            'yolo12_active_models',
            'Number of active models'
        )
        self.avg_confidence = Gauge(
            'yolo12_avg_confidence',
            'Average detection confidence'
        )

    def record_request(self, method: str, endpoint: str, status: str, duration: float):
        """Record request metrics."""
        self.requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
        self.request_duration.labels(endpoint=endpoint).observe(duration)

    def record_detection(self, class_name: str, confidence: float, duration: float):
        """Record detection metrics. `class` is a Python keyword, so the
        label must be passed via dict unpacking rather than labels(class=...)."""
        self.detections_total.labels(**{'class': class_name}).inc()
        self.inference_duration.labels(model_size="n").observe(duration)
        self.avg_confidence.set(confidence)

    def update_system_metrics(self):
        """Refresh system resource metrics."""
        try:
            import psutil
            import torch

            # CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            self.cpu_usage.set(cpu_percent)

            # GPU memory, when available
            if torch.cuda.is_available():
                allocated = torch.cuda.memory_allocated()
                self.gpu_memory_usage.set(allocated)
        except ImportError:
            pass

    def get_metrics(self) -> str:
        """Return all metrics in Prometheus text format."""
        self.update_system_metrics()
        return generate_latest().decode('utf-8')
```
5. Real-World Use Cases
5.1 Smart Security Monitoring
Let's look at a concrete application: a smart security monitoring system that analyzes a camera stream in real time and flags abnormal activity.
```python
# examples/security_monitor.py
import threading
import time
from datetime import datetime
from queue import Queue

import cv2
import requests


class SecurityMonitor:
    """Smart security monitoring system."""

    def __init__(self, camera_url, api_url: str = "http://localhost:8000"):
        self.camera_url = camera_url  # int for a local camera, str for an RTSP URL
        self.api_url = api_url
        self.frame_queue = Queue(maxsize=10)
        self.alerts = []
        self.running = False
        # Classes we care about
        self.target_classes = ['person', 'car', 'truck', 'motorcycle', 'bicycle']
        # Alert thresholds
        self.alert_thresholds = {
            'person': {'count': 5, 'confidence': 0.7},  # alert on 5 or more people
            'car': {'count': 3, 'confidence': 0.8},     # alert on 3 or more cars
        }

    def capture_frames(self):
        """Capture frames from the camera."""
        cap = cv2.VideoCapture(self.camera_url)
        while self.running:
            ret, frame = cap.read()
            if not ret:
                print("Could not read a frame")
                time.sleep(1)
                continue
            # If the queue is full, drop the oldest frame
            if self.frame_queue.full():
                try:
                    self.frame_queue.get_nowait()
                except Exception:
                    pass
            self.frame_queue.put(frame)
            time.sleep(0.2)  # throttle to about 5 FPS
        cap.release()

    def process_frame(self, frame):
        """Run detection on a single frame."""
        try:
            # Encode the frame as JPEG
            _, buffer = cv2.imencode('.jpg', frame)
            image_bytes = buffer.tobytes()
            # Call the detection API
            files = {"file": ("frame.jpg", image_bytes, "image/jpeg")}
            response = requests.post(
                f"{self.api_url}/predict",
                files=files,
                params={"confidence": 0.5}
            )
            if response.status_code == 200:
                return self.analyze_detections(response.json())
            print(f"API request failed: {response.status_code}")
            return None
        except Exception as e:
            print(f"Error processing frame: {e}")
            return None

    def analyze_detections(self, result):
        """Analyze detections and raise alerts when thresholds are exceeded."""
        detections = result.get("detections", [])
        alerts = []

        # Collect per-class confidences
        class_counts = {}
        for detection in detections:
            class_name = detection["class"]
            confidence = detection["confidence"]
            if class_name in self.target_classes:
                class_counts.setdefault(class_name, []).append(confidence)

        # Check alert thresholds
        for class_name, confidences in class_counts.items():
            count = len(confidences)
            avg_confidence = sum(confidences) / count
            if class_name in self.alert_thresholds:
                threshold = self.alert_thresholds[class_name]
                if count >= threshold['count'] and avg_confidence >= threshold['confidence']:
                    alert = {
                        "timestamp": datetime.now().isoformat(),
                        "type": "high_density",
                        "class": class_name,
                        "count": count,
                        "confidence": avg_confidence,
                        "message": f"Detected {count} x {class_name}, possible crowding"
                    }
                    alerts.append(alert)
                    self.alerts.append(alert)

        return {
            "timestamp": datetime.now().isoformat(),
            "detections_count": len(detections),
            "class_counts": {k: len(v) for k, v in class_counts.items()},
            "alerts": alerts,
            "inference_time": result.get("inference_time_ms", 0)
        }

    def start_monitoring(self):
        """Start the monitoring loop."""
        self.running = True
        # Capture thread
        capture_thread = threading.Thread(target=self.capture_frames)
        capture_thread.daemon = True
        capture_thread.start()
        print(f"Monitoring started: {self.camera_url}")
        try:
            while self.running:
                if not self.frame_queue.empty():
                    frame = self.frame_queue.get()
                    result = self.process_frame(frame)
                    if result and result["alerts"]:
                        print(f"ALERT: {result['alerts']}")
                    # Optionally display the frame
                    self.display_frame(frame, result)
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("Monitoring stopped")
        finally:
            self.running = False

    def display_frame(self, frame, result):
        """Display a frame with a summary overlay."""
        if result:
            text = f"detections: {result['detections_count']} | alerts: {len(result['alerts'])}"
            cv2.putText(frame, text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        cv2.imshow('Security Monitor', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.running = False

    def get_alerts(self, limit: int = 10):
        """Return the most recent alerts."""
        return self.alerts[-limit:] if self.alerts else []


# Usage
if __name__ == "__main__":
    # Local camera or RTSP stream
    monitor = SecurityMonitor(
        camera_url=0,  # 0 = local camera, or "rtsp://username:password@ip:port/stream"
        api_url="http://localhost:8000"
    )
    monitor.start_monitoring()
```
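The alert rule used above ("N or more detections of a class with a high enough average confidence") is worth isolating so it can be tested without a camera or a running API. A minimal sketch (`should_alert` is an illustrative helper mirroring `analyze_detections`):

```python
from typing import List

def should_alert(confidences: List[float], min_count: int, min_confidence: float) -> bool:
    """True when enough detections of one class arrive with a high enough
    average confidence -- the same rule SecurityMonitor applies per class."""
    if len(confidences) < min_count:
        return False
    return sum(confidences) / len(confidences) >= min_confidence

# Five people at 0.8 average confidence trips the 'person' threshold (5, 0.7)
print(should_alert([0.8, 0.8, 0.8, 0.8, 0.8], 5, 0.7))  # True
```

Keeping the rule as a pure function also makes it trivial to tune thresholds against recorded detection logs before deploying them.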
5.2 Batch Image Processing Tool
Another common scenario is batch processing, for example auto-tagging photos for a smart album:
```python
# examples/batch_processor.py
import base64
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tqdm import tqdm


class BatchImageProcessor:
    """Batch image processor."""

    def __init__(self, api_url: str = "http://localhost:8000", max_workers: int = 4):
        self.api_url = api_url
        self.max_workers = max_workers

    def process_single_image(self, image_path: str, output_dir: str = None):
        """Process one image."""
        try:
            with open(image_path, 'rb') as f:
                image_data = f.read()
            # Call the API
            files = {"file": (os.path.basename(image_path), image_data, "image/jpeg")}
            response = requests.post(
                f"{self.api_url}/predict",
                files=files,
                params={"confidence": 0.3, "return_image": True}
            )
            if response.status_code == 200:
                result = response.json()
                if output_dir:
                    self.save_results(image_path, result, output_dir)
                return {
                    "success": True,
                    "image": image_path,
                    "detections": len(result["detections"]),
                    "time": result["inference_time_ms"]
                }
            return {
                "success": False,
                "image": image_path,
                "error": f"API error: {response.status_code}"
            }
        except Exception as e:
            return {
                "success": False,
                "image": image_path,
                "error": str(e)
            }

    def save_results(self, image_path: str, result: dict, output_dir: str):
        """Persist the results for one image."""
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "annotated"), exist_ok=True)
        os.makedirs(os.path.join(output_dir, "json"), exist_ok=True)

        base_name = os.path.splitext(os.path.basename(image_path))[0]

        # JSON result
        json_path = os.path.join(output_dir, "json", f"{base_name}.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        # Annotated image, if present
        if "annotated_image" in result:
            image_data = base64.b64decode(result["annotated_image"])
            annotated_path = os.path.join(output_dir, "annotated", f"{base_name}_annotated.jpg")
            with open(annotated_path, 'wb') as f:
                f.write(image_data)

    def process_directory(self, input_dir: str, output_dir: str = None):
        """Process every image under a directory."""
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
        image_files = []
        for root, _, files in os.walk(input_dir):
            for file in files:
                if os.path.splitext(file)[1].lower() in image_extensions:
                    image_files.append(os.path.join(root, file))
        print(f"Found {len(image_files)} images")

        # Process in parallel with a thread pool
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_image = {
                executor.submit(self.process_single_image, img, output_dir): img
                for img in image_files
            }
            # Show progress as results come in
            with tqdm(total=len(image_files), desc="Processing") as pbar:
                for future in as_completed(future_to_image):
                    image_path = future_to_image[future]
                    try:
                        results.append(future.result())
                    except Exception as e:
                        results.append({
                            "success": False,
                            "image": image_path,
                            "error": str(e)
                        })
                    pbar.update(1)

        # Summary report
        self.generate_report(results, output_dir)
        return results

    def generate_report(self, results: list, output_dir: str):
        """Produce a summary report."""
        successful = [r for r in results if r["success"]]
        failed = [r for r in results if not r["success"]]
        total_detections = sum(r.get("detections", 0) for r in successful)
        avg_time = sum(r.get("time", 0) for r in successful) / len(successful) if successful else 0

        report = {
            "summary": {
                "total_images": len(results),
                "successful": len(successful),
                "failed": len(failed),
                "success_rate": len(successful) / len(results) * 100 if results else 0,
                "total_detections": total_detections,
                "avg_detections_per_image": total_detections / len(successful) if successful else 0,
                "avg_inference_time_ms": avg_time
            },
            "failed_images": failed,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        print("\nDone!")
        print(f"Succeeded: {len(successful)}/{len(results)} ({report['summary']['success_rate']:.1f}%)")
        print(f"Total detections: {total_detections}")
        print(f"Avg detections per image: {report['summary']['avg_detections_per_image']:.1f}")
        print(f"Avg inference time: {avg_time:.1f}ms")

        # Save the report only when an output directory was given
        if output_dir:
            report_path = os.path.join(output_dir, "processing_report.json")
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
            print(f"Report saved to: {report_path}")


# Usage
if __name__ == "__main__":
    processor = BatchImageProcessor(
        api_url="http://localhost:8000",
        max_workers=4  # tune to your CPU core count
    )
    # Process a directory of images
    results = processor.process_directory(
        input_dir="./input_images",
        output_dir="./output_results"
    )
```
6. Summary
With this walkthrough you now have the complete recipe for building a high-performance object detection service with YOLO12 and FastAPI. Let's recap the key points:
6.1 Core Strengths
- Strong performance: the YOLO12 nano model reaches up to 131 FPS on an RTX 4090, comfortably real-time
- Five flexible sizes: from the 3.7M-parameter nano to the tens-of-millions-parameter xlarge, covering different scenarios
- A complete service stack: REST API plus web UI, serving both development and testing
- Production-grade reliability: solid error handling, logging, and performance monitoring
- Easy deployment: Docker containerization with one-command startup
6.2 Practical Value
This stack has proven its worth in several real scenarios:
- Security monitoring: real-time camera stream analysis and anomaly detection
- Smart albums: automatic photo tagging for classification and search
- Industrial inspection: defect detection to improve production efficiency
- Teaching demos: an intuitive showcase of how object detection works
- Rapid prototyping: plug-and-play detection capability for AI applications
6.3 Where to Go Next
If you need to push the service further, consider:
- Model fine-tuning: train a custom model for your specific scenario
- Video stream support: process RTSP/HLS streams directly
- Distributed deployment: multi-GPU, multi-node load balancing
- Model version management: hot updates and A/B testing
- Edge deployment: optimize the model for edge devices
6.4 Start Your Own Project
You now have all the tools and knowledge to build a production-grade detection service. Whether you want to validate an idea quickly or add AI capabilities to an existing system, this stack has you covered.
Remember, the best way to learn is by doing. Start by deploying a simple instance, then work your way up to more complex scenarios. If you run into problems along the way, feel free to discuss them in the comments.
More AI Images
Want to explore more AI images and use cases? Visit the CSDN星图 image gallery, which offers a rich set of prebuilt images covering LLM inference, image generation, video generation, model fine-tuning, and more, all with one-click deployment.