YOLOv12模型服务化:使用Flask构建高并发RESTful API接口

你是不是也遇到过这样的场景?自己训练了一个很棒的YOLOv12模型,检测效果非常精准,但每次想用的时候,都得打开Python脚本,加载模型,处理图片,然后才能看到结果。这要是自己用用还行,可要是想给同事用,或者集成到其他系统里,就变得特别麻烦。

其实,把模型变成一个Web服务,让任何人都能通过一个简单的网络请求来调用它,这才是模型真正发挥价值的方式。今天,我就来手把手教你,怎么用最轻量、最流行的Flask框架,把YOLOv12模型包装成一个既稳定又能扛住高并发访问的RESTful API。

整个过程就像给模型盖一个对外营业的“小窗口”,你只需要把图片递进去,它就能把检测结果吐出来。学完这篇,你不仅能部署自己的模型服务,还能掌握一套应对多人同时访问的实用技巧。

1. 环境准备与项目搭建

在开始敲代码之前,我们得先把“厨房”收拾好,把需要的“食材”和“工具”备齐。整个过程很简单,跟着步骤走就行。

1.1 安装核心依赖

首先,确保你的电脑上已经安装了Python(建议3.8或以上版本)。然后,我们打开命令行,创建一个新的项目文件夹,并安装几个关键的包。

# 创建一个新的项目目录
mkdir yolov12_flask_api
cd yolov12_flask_api

# 创建并激活虚拟环境(可选,但强烈推荐,可以避免包版本冲突)
python -m venv venv
# Windows系统激活命令:
# venv\Scripts\activate
# Mac/Linux系统激活命令:
# source venv/bin/activate

# 安装核心依赖
pip install flask torch torchvision opencv-python pillow numpy

这里简单解释一下这几个包是干什么的:

  • flask: 我们用来构建Web服务的轻量级框架,非常容易上手。
  • torch & torchvision: PyTorch深度学习框架,YOLOv12模型运行的基础。
  • opencv-python (cv2): 用来读取和处理图片,功能非常强大。
  • pillow (PIL): 另一个常用的图像处理库,和Flask配合得很好。
  • numpy: 科学计算的基础包,处理数组数据离不开它。

1.2 准备YOLOv12模型文件

接下来,你需要准备好训练好的YOLOv12模型权重文件(通常是 .pt.pth 格式)。假设你的模型文件叫 yolov12_best.pt,把它放到项目根目录下。

如果你还没有现成的模型,只是想先测试流程,可以去YOLO的官方仓库下载一个预训练模型,比如 yolov12s.pt,同样放到项目根目录。

现在,你的项目文件夹结构看起来应该是这样的:

yolov12_flask_api/
├── yolov12_best.pt  # 你的模型权重文件
├── venv/            # 虚拟环境目录(如果创建了)
└── (后续会创建app.py等文件)

好了,准备工作完成,我们可以开始动手搭建服务了。

2. 构建Flask应用骨架

Flask应用的核心就是一个Python脚本。我们在项目根目录下创建一个名为 app.py 的文件,这是我们的主程序入口。

2.1 创建基础应用

打开 app.py,我们先写一个最基础的“Hello World”服务,确保Flask能跑起来。

from flask import Flask, request, jsonify
import cv2
import numpy as np
from PIL import Image
import io
import torch
import threading
import queue
import time
import logging

# 初始化Flask应用
app = Flask(__name__)

# 设置日志,方便查看运行情况
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 第一个路由:健康检查端点
@app.route('/health', methods=['GET'])
def health_check():
    """健康检查接口,用于确认服务是否正常运行"""
    return jsonify({'status': 'healthy', 'message': 'YOLOv12 API service is running.'})

# 启动服务
if __name__ == '__main__':
    # debug=True 仅用于开发环境,生产环境必须设为False
    app.run(host='0.0.0.0', port=5000, debug=True)

保存文件,然后在命令行运行:

python app.py

你应该会看到类似这样的输出:

* Serving Flask app 'app'
* Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://192.168.x.x:5000

打开浏览器,访问 http://127.0.0.1:5000/health,如果看到返回的JSON信息 {"status": "healthy", ...},恭喜你,Flask服务已经成功启动了!

2.2 加载YOLOv12模型

服务跑起来了,现在要把我们的“主角”——YOLOv12模型请上场。我们需要一个地方来加载并保存模型,避免每次请求都重复加载,那样太慢了。

我们在 app.py 的开头,初始化Flask应用之后,添加模型加载的代码。这里假设你使用的是Ultralytics版本的YOLO。

# ... 前面的import语句 ...

# 初始化Flask应用
app = Flask(__name__)

# --- 模型加载部分 ---
MODEL_PATH = 'yolov12_best.pt'  # 你的模型文件路径
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"Using device: {device}")

# 加载模型(这里以Ultralytics YOLO为例)
try:
    # 注意:这里需要根据你实际使用的YOLO版本调整导入和加载方式
    # 如果是Ultralytics YOLOv8/v10/v12,通常这样加载
    from ultralytics import YOLO
    model = YOLO(MODEL_PATH).to(device)
    model.eval()  # 设置为评估模式
    logger.info(f"YOLOv12 model loaded successfully from {MODEL_PATH}")
except ImportError:
    logger.error("Failed to import 'ultralytics'. Please install it: pip install ultralytics")
    model = None
except Exception as e:
    logger.error(f"Failed to load model: {e}")
    model = None

# 定义模型能够识别的类别名称(根据你自己的模型修改)
# 这里是一个COCO数据集的示例,共80类
CLASS_NAMES = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat',
    'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
    'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack',
    'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
    'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair',
    'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
    'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator',
    'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
] if model is not None else []

# ... 后面的health_check路由和启动代码 ...

重要提示:上面的代码使用了 ultralytics 库来加载YOLO模型。如果你的模型是基于其他框架(比如原版Darknet或PyTorch重实现)训练的,加载方式会不同。你需要根据自己模型的实际情况调整这部分代码。核心思想就是:在服务启动时一次性加载模型到内存(或显存)中,后面所有请求都共用这个加载好的模型。

现在,模型已经准备好了,接下来我们要设计一个接口,让它能接收图片并返回检测结果。

3. 设计核心检测API接口

一个友好的API接口,最好能灵活支持多种图片输入方式。我们设计一个接口,同时支持Base64编码字符串和直接上传图片文件两种形式。

3.1 图片预处理与后处理函数

在写主接口之前,我们先封装几个工具函数,让代码更清晰。

# ... 前面的代码 ...

def preprocess_image(image_data, input_type='file'):
    """
    将接收到的图片数据转换为模型需要的格式(numpy数组)。
    支持文件和base64两种输入类型。

    参数:
        image_data: 图片数据,可能是文件对象或base64字符串。
        input_type: 'file' 或 'base64'。

    返回:
        numpy.ndarray: 处理后的图片数组,形状为(H, W, C)。
    """
    try:
        if input_type == 'file':
            # 从上传的文件对象读取
            image = Image.open(io.BytesIO(image_data.read())).convert('RGB')
            img_np = np.array(image)
        elif input_type == 'base64':
            # 从base64字符串解码
            # 假设传入的base64字符串不包含头部信息(如'data:image/jpeg;base64,')
            import base64
            img_bytes = base64.b64decode(image_data)
            img_np = np.array(Image.open(io.BytesIO(img_bytes)).convert('RGB'))
        else:
            raise ValueError(f"Unsupported input_type: {input_type}")

        # 确保是彩色三通道图像
        if len(img_np.shape) == 2:
            img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)
        elif img_np.shape[2] == 4:
            img_np = cv2.cvtColor(img_np, cv2.COLOR_RGBA2RGB)

        return img_np
    except Exception as e:
        logger.error(f"Error in preprocess_image: {e}")
        raise

def run_inference(image_np):
    """
    使用加载好的模型对图片进行推理。

    参数:
        image_np (numpy.ndarray): 预处理后的图片数组。

    返回:
        list: 检测结果的列表,每个元素是一个字典,包含bbox、置信度、类别等信息。
    """
    if model is None:
        raise RuntimeError("Model is not loaded.")

    # 使用模型进行预测
    # 注意:ultralytics YOLO模型的predict方法接收numpy数组或图片路径
    results = model(image_np, verbose=False)  # verbose=False关闭预测日志

    # 解析结果
    detections = []
    for result in results:
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()  # 边界框 [x1, y1, x2, y2]
            confs = result.boxes.conf.cpu().numpy()  # 置信度
            cls_ids = result.boxes.cls.cpu().numpy().astype(int)  # 类别ID

            for box, conf, cls_id in zip(boxes, confs, cls_ids):
                detections.append({
                    'bbox': box.tolist(),  # 转为Python list
                    'confidence': float(conf),
                    'class_id': int(cls_id),
                    'class_name': CLASS_NAMES[cls_id] if cls_id < len(CLASS_NAMES) else f'class_{cls_id}'
                })
    return detections

def postprocess_to_json(detections, image_np):
    """
    将检测结果组织成结构化的JSON格式。

    参数:
        detections (list): run_inference返回的检测结果列表。
        image_np (numpy.ndarray): 原始图片数组,用于获取尺寸。

    返回:
        dict: 结构化的检测结果。
    """
    height, width = image_np.shape[:2]
    return {
        'image_info': {
            'width': width,
            'height': height,
            'channels': image_np.shape[2] if len(image_np.shape) > 2 else 1
        },
        'detections': detections,
        'detection_count': len(detections),
        'timestamp': time.time()
    }

3.2 实现主检测接口

工具函数准备好了,现在来实现最核心的 /detect 接口。

# ... 前面的代码 ...

@app.route('/detect', methods=['POST'])
def detect():
    """
    主检测接口。
    支持两种图片上传方式:
    1. 表单文件上传 (multipart/form-data): 参数名为 'image'
    2. JSON Base64上传 (application/json): JSON体为 {'image': 'base64_string'}

    返回:
        JSON格式的检测结果。
    """
    start_time = time.time()
    response_data = {'success': False, 'message': '', 'data': None}

    try:
        # 判断请求内容类型
        content_type = request.headers.get('Content-Type', '')

        image_np = None
        input_type = 'unknown'

        if 'multipart/form-data' in content_type:
            # 方式1: 通过表单上传文件
            if 'image' not in request.files:
                response_data['message'] = 'No image file provided in form data.'
                return jsonify(response_data), 400

            file = request.files['image']
            if file.filename == '':
                response_data['message'] = 'No selected file.'
                return jsonify(response_data), 400

            # 检查文件类型
            if not file.filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
                response_data['message'] = 'Unsupported file format. Please upload an image file.'
                return jsonify(response_data), 400

            image_np = preprocess_image(file, input_type='file')
            input_type = 'file_upload'
            logger.info(f"Received image file: {file.filename}")

        elif 'application/json' in content_type:
            # 方式2: 通过JSON Body传递Base64字符串
            data = request.get_json()
            if not data or 'image' not in data:
                response_data['message'] = 'No image data provided in JSON body.'
                return jsonify(response_data), 400

            base64_str = data['image']
            # 简单清理可能存在的base64头部信息
            if ',' in base64_str:
                base64_str = base64_str.split(',')[1]

            image_np = preprocess_image(base64_str, input_type='base64')
            input_type = 'base64_json'
            logger.info("Received image via base64 JSON.")

        else:
            response_data['message'] = f'Unsupported Content-Type: {content_type}. Use multipart/form-data or application/json.'
            return jsonify(response_data), 400

        # 执行推理
        detections = run_inference(image_np)

        # 组织返回结果
        result_json = postprocess_to_json(detections, image_np)
        result_json['input_type'] = input_type

        response_data.update({
            'success': True,
            'message': 'Detection completed successfully.',
            'data': result_json
        })

        process_time = time.time() - start_time
        logger.info(f"Detection finished. Took {process_time:.3f}s, found {len(detections)} objects.")

    except ValueError as ve:
        response_data['message'] = f'Input error: {str(ve)}'
        logger.warning(f"Input error: {ve}")
        return jsonify(response_data), 400
    except RuntimeError as re:
        response_data['message'] = f'Model error: {str(re)}'
        logger.error(f"Model error: {re}")
        return jsonify(response_data), 500
    except Exception as e:
        response_data['message'] = f'Internal server error: {str(e)}'
        logger.error(f"Unexpected error in /detect: {e}", exc_info=True)
        return jsonify(response_data), 500

    return jsonify(response_data)

# ... 后面的启动代码 ...

保存文件,重启Flask服务(按Ctrl+C停止,再运行 python app.py)。现在,你的模型服务就有了一个功能完整的检测接口了!

4. 应对高并发:从同步到异步处理

如果你的服务只是自己用,上面的代码已经足够了。但想象一下,如果同时有几十、上百个人给你的接口发送图片,会怎么样?Flask默认是同步处理请求的,意味着前一个人的请求没处理完,后一个人就得干等着。这显然不行。

我们需要让服务能同时处理多个请求。这里介绍两种实用的方法:多线程和异步队列。

4.1 使用Flask的多线程模式

最简单的方法,就是利用Flask/Werkzeug内置的多线程能力。修改启动方式即可。

找到 app.py 文件最后启动服务的那行代码:

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

修改 app.run 的参数,启用多线程并设置线程数:

if __name__ == '__main__':
    # 生产环境务必设置 debug=False
    # threaded=True 启用多线程处理
    # 通过 processes 参数可以设置多进程,但通常配合WSGI服务器使用
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

这样修改后,Flask就能用多个线程来处理并发请求了。对于CPU推理(如果你的模型在CPU上跑)来说,这能有效利用多核。但要注意,Python的全局解释器锁(GIL)可能会限制多线程在CPU密集型任务上的性能。对于GPU推理,由于计算主要在GPU上,多线程通常能很好地提升并发能力。

4.2 实现一个简单的请求队列(生产者-消费者模型)

对于更高并发或更复杂的场景,我们可以引入一个任务队列。思路是:API接口(生产者)快速接收请求,把任务丢进一个队列;后台有一组工作线程(消费者)从队列里取任务,执行耗时的模型推理,然后把结果存起来;接口立刻返回一个“任务ID”,客户端可以用这个ID稍后查询结果。

这种方法能避免HTTP连接长时间等待,特别适合推理速度较慢的模型。

我们在 app.py 中添加队列相关的代码:

# ... 在文件开头import部分之后,定义全局变量 ...

# 任务队列和结果字典
task_queue = queue.Queue()
result_dict = {}
result_dict_lock = threading.Lock()

# ... 在模型加载代码之后,可以定义一个工作线程函数 ...

def worker_thread(worker_id):
    """工作线程函数,不断从队列中取任务并执行推理。"""
    logger.info(f"Worker thread {worker_id} started.")
    while True:
        try:
            # 从队列获取任务,最多等待10秒
            task_id, image_np, input_type = task_queue.get(timeout=10)
            logger.info(f"Worker {worker_id} processing task {task_id}")

            try:
                detections = run_inference(image_np)
                result = postprocess_to_json(detections, image_np)
                result['input_type'] = input_type
                result['status'] = 'completed'

                with result_dict_lock:
                    result_dict[task_id] = result

                logger.info(f"Worker {worker_id} completed task {task_id}, found {len(detections)} objects.")
            except Exception as e:
                logger.error(f"Worker {worker_id} failed on task {task_id}: {e}")
                with result_dict_lock:
                    result_dict[task_id] = {'status': 'failed', 'error': str(e)}
            finally:
                task_queue.task_done()  # 告诉队列这个任务处理完了

        except queue.Empty:
            # 队列为空,继续等待
            continue
        except Exception as e:
            logger.error(f"Worker {worker_id} encountered an error: {e}")
            time.sleep(1)  # 出错后稍作休息

# 启动工作线程
NUM_WORKERS = 2  # 根据你的CPU/GPU能力调整,通常等于CPU核心数或GPU能并行处理的任务数
for i in range(NUM_WORKERS):
    thread = threading.Thread(target=worker_thread, args=(i,), daemon=True)
    thread.start()
logger.info(f"Started {NUM_WORKERS} worker threads.")

然后,我们创建两个新的API接口:一个用于提交异步任务,一个用于查询结果。

# ... 在 /detect 接口后面,添加新的接口 ...

import uuid  # 用于生成唯一任务ID

@app.route('/async/detect', methods=['POST'])
def async_detect():
    """
    异步检测接口。
    接收图片,立即返回一个任务ID,推理在后台进行。
    参数格式同 /detect 接口。
    """
    # 图片接收和预处理逻辑与 /detect 接口前面部分完全相同
    # ... (这里省略重复的代码,请参考上面/detect接口的请求解析部分) ...
    # 假设我们得到了 image_np 和 input_type

    # 生成唯一任务ID
    task_id = str(uuid.uuid4())

    # 将任务放入队列
    # 注意:这里放入的是预处理好的image_np,避免在worker中重复IO操作
    task_queue.put((task_id, image_np, input_type))

    logger.info(f"Async task submitted: {task_id}")

    # 立即返回任务ID
    return jsonify({
        'success': True,
        'message': 'Detection task submitted successfully.',
        'task_id': task_id,
        'status_url': f'/async/result/{task_id}'  # 告知客户端查询结果的URL
    })

@app.route('/async/result/<task_id>', methods=['GET'])
def get_async_result(task_id):
    """
    查询异步任务结果。
    """
    with result_dict_lock:
        result = result_dict.get(task_id)

    if result is None:
        return jsonify({'success': False, 'message': 'Task not found or still processing.'}), 404

    if result.get('status') == 'completed':
        return jsonify({
            'success': True,
            'message': 'Task completed.',
            'task_id': task_id,
            'data': result
        })
    elif result.get('status') == 'failed':
        return jsonify({
            'success': False,
            'message': f"Task failed: {result.get('error')}",
            'task_id': task_id
        }), 500
    else:
        # 理论上不会走到这里,因为status只有上面两种
        return jsonify({'success': False, 'message': 'Task status unknown.'}), 500

现在,你的服务就有了同步(/detect)和异步(/async/detect)两种调用方式。对于轻量级、要求实时返回的请求,用同步接口;对于大批量、可以接受延迟的请求,用异步接口,体验会更好。

5. 编写API文档与客户端调用示例

服务写好了,得告诉别人怎么用。一个好的API,文档是必不可少的。我们直接在代码里用注释写好,并提供一个简单的调用示例。

5.1 完善接口文档

我们给每个接口函数都加上详细的文档字符串(Docstring),说明其用途、参数和返回值。上面代码中已经包含了一些,这里再强调一下格式,这有助于自动生成API文档。

@app.route('/detect', methods=['POST'])
def detect():
    """
    主检测接口(同步)。
    支持两种图片上传方式。

    **请求格式 1 (multipart/form-data):**
    - 使用表单上传,字段名必须为 `image`。
    - curl示例: `curl -X POST -F "image=@test.jpg" http://localhost:5000/detect`

    **请求格式 2 (application/json):**
    - JSON体格式: `{"image": "base64_encoded_string"}`
    - curl示例: `curl -X POST -H "Content-Type: application/json" -d '{"image":"<base64_str>"}' http://localhost:5000/detect`

    **成功响应 (200 OK):**
    ```json
    {
      "success": true,
      "message": "Detection completed successfully.",
      "data": {
        "image_info": {"width": 640, "height": 480, "channels": 3},
        "detections": [
          {
            "bbox": [100, 150, 200, 300],
            "confidence": 0.95,
            "class_id": 2,
            "class_name": "car"
          }
        ],
        "detection_count": 1,
        "timestamp": 1678886400.123,
        "input_type": "file_upload"
      }
    }
    ```

    **错误响应:**
    - 400 Bad Request: 请求参数错误。
    - 500 Internal Server Error: 服务器内部错误(如模型加载失败)。
    """
    # ... 函数实现 ...

5.2 提供客户端调用示例

光有文档还不够,最好再提供一个实实在在能跑的客户端脚本,让用户能快速测试。我们在项目根目录创建一个 client_example.py 文件。

# client_example.py
import requests
import base64
import json
import time

# API的基础地址
BASE_URL = "http://127.0.0.1:5000"

def test_health():
    """测试健康检查接口"""
    print("Testing health check...")
    resp = requests.get(f"{BASE_URL}/health")
    print(f"Status: {resp.status_code}")
    print(f"Response: {resp.json()}")
    print("-" * 40)

def test_sync_detect_with_file(image_path):
    """测试同步接口:通过文件上传"""
    print(f"Testing sync detection with file: {image_path}")
    with open(image_path, 'rb') as f:
        files = {'image': f}
        resp = requests.post(f"{BASE_URL}/detect", files=files)

    print(f"Status: {resp.status_code}")
    if resp.status_code == 200:
        result = resp.json()
        if result['success']:
            detections = result['data']['detections']
            print(f"Found {len(detections)} objects:")
            for det in detections[:3]:  # 只打印前3个检测结果
                print(f"  - {det['class_name']} (conf: {det['confidence']:.2f}) at {det['bbox']}")
        else:
            print(f"Error: {result['message']}")
    else:
        print(f"Request failed: {resp.text}")
    print("-" * 40)

def test_sync_detect_with_base64(image_path):
    """测试同步接口:通过Base64 JSON上传"""
    print(f"Testing sync detection with base64: {image_path}")
    with open(image_path, 'rb') as f:
        img_bytes = f.read()
        img_b64 = base64.b64encode(img_bytes).decode('utf-8')

    payload = {'image': img_b64}
    headers = {'Content-Type': 'application/json'}
    resp = requests.post(f"{BASE_URL}/detect", json=payload, headers=headers)

    print(f"Status: {resp.status_code}")
    if resp.status_code == 200:
        result = resp.json()
        if result['success']:
            print(f"Found {result['data']['detection_count']} objects.")
        else:
            print(f"Error: {result['message']}")
    else:
        print(f"Request failed: {resp.text}")
    print("-" * 40)

def test_async_detect(image_path):
    """测试异步接口"""
    print(f"Testing async detection: {image_path}")
    with open(image_path, 'rb') as f:
        files = {'image': f}
        resp = requests.post(f"{BASE_URL}/async/detect", files=files)

    print(f"Submit Status: {resp.status_code}")
    if resp.status_code == 200:
        result = resp.json()
        if result['success']:
            task_id = result['task_id']
            status_url = result['status_url']
            print(f"Task ID: {task_id}")
            print(f"Polling for result...")

            # 轮询查询结果,最多尝试10次
            for i in range(10):
                time.sleep(1)  # 每秒查一次
                result_resp = requests.get(f"{BASE_URL}{status_url}")
                if result_resp.status_code == 200:
                    task_result = result_resp.json()
                    if task_result['success']:
                        data = task_result['data']
                        print(f"Task completed! Found {data['detection_count']} objects.")
                        break
                    else:
                        print(f"Task failed: {task_result['message']}")
                        break
                elif result_resp.status_code == 404:
                    print(f"Still processing... ({i+1}/10)")
                else:
                    print(f"Error polling result: {result_resp.status_code}")
                    break
            else:
                print("Polling timeout after 10 seconds.")
        else:
            print(f"Submit Error: {result['message']}")
    else:
        print(f"Submit Request failed: {resp.text}")
    print("-" * 40)

if __name__ == '__main__':
    # 替换成你自己的测试图片路径
    test_image = "test.jpg"  # 确保项目目录下有一张名为test.jpg的图片

    test_health()
    test_sync_detect_with_file(test_image)
    test_sync_detect_with_base64(test_image)
    test_async_detect(test_image)
    print("All tests completed.")

运行这个客户端脚本,就能完整地测试你刚搭建好的所有API功能了。

6. 总结与后续优化建议

跟着走完一遍,你应该已经成功搭建起一个属于自己的YOLOv12模型服务了。从最基础的Flask应用,到加载模型、设计灵活的接口,再到考虑高并发引入异步处理,最后还提供了完整的文档和测试客户端。这个过程其实也是很多AI模型服务化的标准路径。

实际用起来,这个基础版本已经能解决大部分个人或小团队的需求了。不过,如果你打算把它用到更正式的生产环境,或者面对更大的流量,还有几个方向可以考虑优化。

首先是性能,如果推理速度是瓶颈,可以研究一下模型量化、剪枝或者用TensorRT、ONNX Runtime这些推理引擎来加速。其次是部署,用Flask自带的开发服务器跑生产环境是不推荐的,最好搭配Gunicorn、uWSGI或者用Docker容器化部署,这样更稳定。然后是监控,给服务加上日志记录、性能指标(比如请求耗时、QPS)和健康检查,出了问题能快速定位。

最后,功能上也可以扩展,比如增加批量图片处理接口、支持视频流检测、添加简单的认证机制,或者做一个更友好的Web前端界面。这些都可以根据你的实际需求慢慢加上去。

最关键的还是动手试试。把你训练好的模型放进去,用客户端脚本调用一下,看看效果。遇到问题就查查文档,或者调整一下代码。这个过程里积累的经验,比看十篇教程都有用。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐