步骤一 环境安装

1.1Paddle安装

找到paddlepaddle 官方网址:开始使用

根据你的硬件情况选择合适的安装方式,我这里选择的是docker, 因为我系统环境cuda version是12.0,所以我选择cuda11.8的镜像。
在这里插入图片描述
在这里插入图片描述
拉取预安装 PaddlePaddle 的镜像:

docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddle:3.3.0-gpu-cuda11.8-cudnn8.9

我使用的服务器拉取不了,我还得本地执行,然后分包****xxxx 上传img的tar包+docker load

然后用镜像构建并进入Docker容器:

docker run --gpus all --name paddle -it -v $PWD:/paddle ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddle:3.3.0-gpu-cuda11.8-cudnn8.9 /bin/bash

1.2 paddleocr安装

进入容器,回到paddleocr安装文档

安装paddleocr:

# 只希望使用基础文字识别功能(返回文字位置坐标和文本内容),包含 PP-OCR 系列
python -m pip install paddleocr
# 希望使用文档解析、文档理解、文档翻译、关键信息抽取等全部功能
# python -m pip install "paddleocr[all]"

步骤二 服务部署

基础API 服务

paddleocr官方给了4种示例, 本人使用的是第4种Run inference by API。
因为我只是做OCR识别,不做版面分析啥的,而且资源有限,所以这几个都配置的False
use_doc_orientation_classify/use_doc_unwarping/use_textline_orientation
具体的服务代码如下:

"""
OCR API Service - 基于 PaddleOCR ppocr_v5
支持 100 并发,接受 base64_str 输入
启动命令: uvicorn ocr_service:app --host 0.0.0.0 --port 8000 --workers 4
"""
import asyncio
import base64
import io
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from typing import Optional

import threading

import numpy as np
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from paddleocr import PaddleOCR
from PIL import Image
from pydantic import BaseModel, Field

# ─────────────────────────── 日志配置 ────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("ocr_service")


# ─────────────────────────── 配置项 ──────────────────────────────
class Settings:
    # OCR 模型配置(ppocr_v5 新版参数)
    OCR_LANG: str = "ch"                        # 识别语言:ch / en / japan 等
    OCR_DEVICE: str = "gpu"                     # 推理设备:'gpu' 或 'cpu'
    OCR_USE_DOC_ORIENTATION: bool = False       # 文档方向分类(一般关闭提速)
    OCR_USE_DOC_UNWARPING: bool = False         # 文档矫正(一般关闭提速)
    OCR_USE_TEXTLINE_ORIENTATION: bool = False  # 文本行方向分类(一般关闭提速)

    # 并发配置
    THREAD_POOL_SIZE: int = 16          # 线程池大小(CPU 核数 × 4 为宜)
    MAX_CONCURRENT_REQUESTS: int = 100  # 最大并发数(信号量控制)
    REQUEST_TIMEOUT: int = 60          # 单请求超时(秒)

    # 图片限制
    MAX_IMAGE_SIZE_MB: int = 10         # 最大图片大小

settings = Settings()
# ─────────────────────────── 全局资源 ────────────────────────────
class AppState:
    ocr_engine: Optional[PaddleOCR] = None
    executor: Optional[ThreadPoolExecutor] = None
    semaphore: Optional[asyncio.Semaphore] = None
    ocr_lock: threading.Lock = threading.Lock()  # ← 修复线程安全问题


state = AppState()


# ─────────────────────────── 启动 / 关闭 ─────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理:启动时初始化模型,关闭时释放资源"""
    logger.info("🚀 正在初始化 PaddleOCR 引擎 (ppocr_v5)...")
    t0 = time.perf_counter()

    # PaddleOCR 不是线程安全的,使用单例 + 线程池串行调用
    # ppocr_v5 新版初始化参数(对应图示)
    state.ocr_engine = PaddleOCR(
        use_doc_orientation_classify=settings.OCR_USE_DOC_ORIENTATION,
        use_doc_unwarping=settings.OCR_USE_DOC_UNWARPING,
        use_textline_orientation=settings.OCR_USE_TEXTLINE_ORIENTATION,
        lang=settings.OCR_LANG,
        device=settings.OCR_DEVICE,   # 'gpu' 使用 GPU 推理
    )
    # 线程池:PaddleOCR 推理是阻塞 CPU/GPU 操作,放到线程池避免阻塞事件循环
    state.executor = ThreadPoolExecutor(
        max_workers=settings.THREAD_POOL_SIZE,
        thread_name_prefix="ocr_worker",
    )

    # 信号量:控制最大并发数
    state.semaphore = asyncio.Semaphore(settings.MAX_CONCURRENT_REQUESTS)
    elapsed = time.perf_counter() - t0
    logger.info(f"✅ OCR 引擎初始化完成,耗时 {elapsed:.2f}s")
    yield  # ← 应用运行中
    logger.info("🛑 正在关闭 OCR 服务...")
    state.executor.shutdown(wait=False)
    logger.info("👋 资源已释放")


# ─────────────────────────── FastAPI App ─────────────────────────
app = FastAPI(
    title="OCR API Service",
    description="基于 PaddleOCR ppocr_v5 的高并发 OCR 识别服务",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)


# ─────────────────────────── 请求 / 响应模型 ──────────────────────
class OCRRequest(BaseModel):
    base64_str: str = Field(..., description="图片的 base64 编码字符串")
    task: str = Field(default="ocr", description="任务类型,当前仅支持 'ocr'")


class OCRResponse(BaseModel):
    result: str = Field(..., description="OCR 识别结果文本")
    elapsed_ms: float = Field(..., description="推理耗时(毫秒)")


# ─────────────────────────── 核心推理函数 ─────────────────────────
def _run_ocr(image_np: np.ndarray) -> str:
    """
    在线程池中执行 OCR 推理(阻塞)

    ppocr_v5 (PaddleX 新版) 返回格式:
    [
      {
        'rec_texts':  ['文字1', '文字2', ...],
        'rec_scores': [0.99, 0.98, ...],
        'dt_polys':   [...],
        ...
      }
    ]
    """
    with state.ocr_lock:   # ← PaddleOCR 非线程安全,必须串行调用
        results = state.ocr_engine.ocr(image_np)
    logger.info(f"OCR raw result type={type(results)}, value={str(results)[:300]}")

    if not results:
        return ""

    lines = []
    for item in results:
        # ── 新版 PaddleX 格式:rec_texts 是列表 ──
        if isinstance(item, dict):
            texts = item.get("rec_texts") or item.get("rec_text") or []
            if isinstance(texts, list):
                lines.extend([t for t in texts if t])
            elif isinstance(texts, str) and texts:
                lines.append(texts)

        # ── 旧版格式:[[bbox, (text, conf)], ...] ──
        elif isinstance(item, (list, tuple)):
            for line in item:
                if line and len(line) >= 2:
                    text_info = line[1]
                    if isinstance(text_info, (list, tuple)) and text_info:
                        lines.append(text_info[0])
                    elif isinstance(text_info, str):
                        lines.append(text_info)
    return "\n".join(lines)

def _decode_base64_image(base64_str: str) -> np.ndarray:
    """
    将 base64 字符串解码为 numpy 图像数组
    支持带 data:image/xxx;base64, 前缀的格式
    """
    # 去除可能存在的 data URL 前缀
    if "," in base64_str:
        base64_str = base64_str.split(",", 1)[1]

    img_bytes = base64.b64decode(base64_str)
    # 检查图片大小
    size_mb = len(img_bytes) / (1024 * 1024)
    if size_mb > settings.MAX_IMAGE_SIZE_MB:
        raise ValueError(f"图片大小 {size_mb:.1f}MB 超出限制 {settings.MAX_IMAGE_SIZE_MB}MB")

    img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
    return np.array(img)

# ─────────────────────────── 路由 ─────────────────────────────────
@app.post("/ocr", response_model=OCRResponse, summary="OCR 识别接口")
async def ocr_endpoint(request: OCRRequest):
    """
    接受 base64 编码的图片,返回 OCR 识别文本。
    
    - **base64_str**: 图片 base64 字符串(支持带 data URL 前缀)
    - **task**: 任务类型(目前固定为 ocr)
    """
    if request.task != "ocr":
        raise HTTPException(status_code=400, detail=f"不支持的任务类型: {request.task}")

    async with state.semaphore:  # 控制并发数上限
        t0 = time.perf_counter()
        loop = asyncio.get_event_loop()

        try:
            # 1. 解码图片(CPU 密集,放线程池)
            image_np = await loop.run_in_executor(
                state.executor,
                _decode_base64_image,
                request.base64_str,
            )

            # 2. OCR 推理(放线程池,不阻塞事件循环)
            text = await asyncio.wait_for(
                loop.run_in_executor(state.executor, _run_ocr, image_np),
                timeout=settings.REQUEST_TIMEOUT,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="OCR 推理超时")
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        except Exception as e:
            logger.exception("OCR 推理异常")
            raise HTTPException(status_code=500, detail=f"推理失败: {str(e)}")

        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.info(f"OCR 完成,耗时 {elapsed_ms:.1f}ms,文本长度 {len(text)}")

        return OCRResponse(result=text, elapsed_ms=round(elapsed_ms, 2))

@app.get("/health", summary="健康检查")
async def health_check():
    """服务健康检查,确认 OCR 引擎已就绪"""
    if state.ocr_engine is None:
        raise HTTPException(status_code=503, detail="OCR 引擎未初始化")
    return {"status": "ok", "engine": "PaddleOCR ppocr_v5"}

@app.get("/", include_in_schema=False)
async def root():
    return {"message": "OCR Service Running. POST /ocr to use."}

# ─────────────────────────── 全局异常处理 ─────────────────────────
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"未捕获异常: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "服务器内部错误"},
    )
# ─────────────────────────── 入口 ─────────────────────────────────
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=8000, help="监听端口")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="监听地址")
    args = parser.parse_args()

    uvicorn.run(
        "ocr_service:app",
        host=args.host,
        port=args.port,
        workers=1,
        loop="uvloop",
        http="httptools",
        log_level="info",
        access_log=True,
    )

启动时:

CUDA_VISIBLE_DEVICES=1 nohup python ocr_service.py  --port 8008 > logs/ocr_8008.log 2>&1 &

负载均衡

单独服务不够稳定所以要用ng进行多节点配置

# /etc/nginx/conf.d/ocr.conf
# 3个 OCR 实例负载均衡

upstream ocr_backend {
    least_conn;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    server 127.0.0.1:8003;
    keepalive 64;
}

server {
    listen 6666;

    location /ocr {
        proxy_pass         http://ocr_backend;
        proxy_http_version 1.1;
        proxy_set_header   Connection "";
        proxy_set_header   Host $host;

        proxy_read_timeout  120s;    # OCR 推理最长等待
        proxy_send_timeout  120s;
        proxy_connect_timeout 5s;

        # 失败自动切换到下一个实例
        proxy_next_upstream error timeout http_500;
        proxy_next_upstream_tries 2;
    }

    location /health {
        proxy_pass http://ocr_backend;
    }
}

然后运行更新nginx配置

# 更新 nginx 配置
sudo cp nginx_ocr.conf /etc/nginx/conf.d/ocr.conf
sudo nginx -s reload

然后我们要启动多节点服务

#!/bin/bash
# 启动多个 OCR 实例 —— 5张 A40,每实例独占一张卡
# 用法:
#   bash start_ocr.sh          # 启动5实例
#   bash start_ocr.sh stop     # 停止所有实例

LOG_DIR="./logs"
mkdir -p "$LOG_DIR"

start_all() {
    echo "🚀 启动模式:3个实例共享 GPU 3(剩余显存约43GB,足够)"

    CUDA_VISIBLE_DEVICES=3 nohup python ocr_service.py \
        --port 8001 > "$LOG_DIR/ocr_8001.log" 2>&1 &
    echo "  ✅ 实例1  GPU:3  port=8001  PID=$!"

    sleep 15   # 等第一个实例模型加载完再启动下一个,避免同时抢显存

    CUDA_VISIBLE_DEVICES=3 nohup python ocr_service.py \
        --port 8002 > "$LOG_DIR/ocr_8002.log" 2>&1 &
    echo "  ✅ 实例2  GPU:3  port=8002  PID=$!"

    sleep 15

    CUDA_VISIBLE_DEVICES=3 nohup python ocr_service.py \
        --port 8003 > "$LOG_DIR/ocr_8003.log" 2>&1 &
    echo "  ✅ 实例3  GPU:3  port=8003  PID=$!"
}

stop_all() {
    echo "🛑 停止所有 OCR 实例..."
    pkill -f "ocr_service.py" && echo "  ✅ 已停止" || echo "  ⚠️  未找到运行中的实例"
}

wait_ready() {
    echo ""
    echo "⏳ 等待服务就绪(模型加载约 10-20 秒)..."
    for port in 8001 8002 8003; do
        for i in $(seq 1 30); do
            if curl -sf "http://localhost:$port/health" > /dev/null 2>&1; then
                echo "  ✅ port=$port 就绪"
                break
            fi
            sleep 2
        done
    done
    echo ""
    echo "🎉 所有实例已就绪!统一入口: http://localhost:8000/ocr(经过Nginx)"
}

case "$1" in
    stop) stop_all ;;
    *)
        stop_all 2>/dev/null
        start_all
        wait_ready ;;
esac```

### 三 测试
### 单测
```python
"""
快速测试 OCR 是否能识别文字
用法: python quick_test.py [图片路径]
"""
import base64
import sys
import requests

URL = "http://localhost:8000/ocr"

# 读取图片
image_path = sys.argv[1] if len(sys.argv) > 1 else "test.jpg"

with open(image_path, "rb") as f:
    b64 = base64.b64encode(f.read()).decode()

# 发送请求
resp = requests.post(URL, json={"base64_str": b64, "task": "ocr"})
print("状态码:", resp.status_code)
print("识别结果:\n", resp.json().get("result", resp.text))

压力测试

如何要提供生产稳定的服务,肯定要做压力测试

"""
OCR 服务压测脚本
用法: python stress_test.py --image test.jpg --concurrency 100 --total 500
"""
import argparse
import asyncio
import base64
import sys
import time
import statistics
from collections import defaultdict

import aiohttp

URL = "http://localhost:8000/ocr"


def load_image(path: str) -> str:
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()


async def single_request(session, b64: str, idx: int) -> dict:
    t0 = time.perf_counter()
    try:
        async with session.post(
            URL,
            json={"base64_str": b64, "task": "ocr"},
            timeout=aiohttp.ClientTimeout(total=60),
        ) as resp:
            elapsed = (time.perf_counter() - t0) * 1000
            data = await resp.json()
            return {
                "idx": idx,
                "ok": resp.status == 200,
                "status": resp.status,
                "elapsed_ms": elapsed,
                "text_len": len(data.get("result", "")),
            }
    except Exception as e:
        elapsed = (time.perf_counter() - t0) * 1000
        return {"idx": idx, "ok": False, "status": -1, "elapsed_ms": elapsed, "error": str(e)}


def print_stats(results: list, total_ms: float, concurrency: int):
    ok = [r for r in results if r["ok"]]
    fail = [r for r in results if not r["ok"]]
    latencies = sorted(r["elapsed_ms"] for r in ok)

    def pct(n):
        if not latencies:
            return 0
        idx = min(int(len(latencies) * n / 100), len(latencies) - 1)
        return latencies[idx]

    print("\n" + "━" * 52)
    print(f"  📊 压测结果汇总")
    print("━" * 52)
    print(f"  总请求数     : {len(results)}")
    print(f"  并发数       : {concurrency}")
    print(f"  成功         : {len(ok)}  ({100*len(ok)//len(results)}%)")
    print(f"  失败         : {len(fail)}")
    print(f"  总耗时       : {total_ms/1000:.2f}s")
    print(f"  吞吐量       : {len(results) / (total_ms/1000):.1f} req/s")
    print("─" * 52)
    if latencies:
        print(f"  延迟 avg     : {statistics.mean(latencies):.0f} ms")
        print(f"  延迟 p50     : {pct(50):.0f} ms")
        print(f"  延迟 p90     : {pct(90):.0f} ms")
        print(f"  延迟 p95     : {pct(95):.0f} ms")
        print(f"  延迟 p99     : {pct(99):.0f} ms")
        print(f"  延迟 max     : {max(latencies):.0f} ms")
        print(f"  延迟 min     : {min(latencies):.0f} ms")
    print("─" * 52)
    if fail:
        err_counts = defaultdict(int)
        for r in fail:
            err_counts[r.get("error", f"HTTP {r['status']}")] += 1
        print("  ❌ 失败原因:")
        for msg, cnt in err_counts.items():
            print(f"     {cnt}x  {msg[:60]}")
    print("━" * 52)


async def run(image_path: str, concurrency: int, total: int):
    print(f"🚀 开始压测")
    print(f"   图片: {image_path}")
    print(f"   并发: {concurrency}  总请求: {total}")

    b64 = load_image(image_path)
    print(f"   图片大小: {len(b64)//1024} KB (base64)\n")

    connector = aiohttp.TCPConnector(limit=concurrency + 20)
    sem = asyncio.Semaphore(concurrency)
    results = []
    done = 0

    async def bounded(session, idx):
        nonlocal done
        async with sem:
            r = await single_request(session, b64, idx)
            results.append(r)
            done += 1
            # 进度打印
            if done % max(1, total // 10) == 0 or done == total:
                ok_so_far = sum(1 for x in results if x["ok"])
                avg = statistics.mean(r["elapsed_ms"] for r in results if r["ok"]) if ok_so_far else 0
                print(f"  进度: {done}/{total}  成功: {ok_so_far}  avg延迟: {avg:.0f}ms")

    async with aiohttp.ClientSession(connector=connector) as session:
        t0 = time.perf_counter()
        await asyncio.gather(*[bounded(session, i) for i in range(total)])
        total_ms = (time.perf_counter() - t0) * 1000

    print_stats(results, total_ms, concurrency)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--image", required=True, help="测试图片路径")
    parser.add_argument("--concurrency", type=int, default=20, help="并发数(默认20)")
    parser.add_argument("--total", type=int, default=100, help="总请求数(默认100)")
    args = parser.parse_args()

    asyncio.run(run(args.image, args.concurrency, args.total))

运行方法

python stress_test.py --image /paddle/test.png --concurrency 30 --total 500

以上,完成了一个基础ocr服务的部署和测试,后续可以比较稳定的使用

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐