WebAPI扩展实践:SenseVoice-Small ONNX后端服务接口开发教程

1. 引言

语音识别技术正变得越来越普及,从智能助手到会议纪要,它正在改变我们与机器交互的方式。然而,对于开发者而言,将先进的语音识别能力集成到自己的应用中,常常面临几个现实挑战:模型太大、部署复杂、接口不友好,以及隐私数据的安全顾虑。

今天,我们将一起动手,将一个轻量、高效、完全本地运行的语音识别工具——SenseVoice-Small ONNX,从简单的可视化界面,扩展成一个功能完备的后端WebAPI服务。这个服务将允许你通过标准的HTTP请求,轻松地将语音识别能力集成到你的网站、移动应用或任何需要语音转文字功能的系统中。

通过本教程,你将学会如何:

  • 理解SenseVoice-Small ONNX工具的核心架构与数据处理流程
  • 使用FastAPI框架构建一个高性能、异步的WebAPI后端
  • 设计合理、易用的RESTful接口,支持文件上传、状态查询和结果获取
  • 实现音频文件的异步处理与任务队列管理
  • 为你的API添加基础的监控、日志和错误处理机制

无论你是想为自己的产品添加语音输入功能,还是希望构建一个供团队内部使用的语音处理服务,这篇教程都将为你提供一条清晰的实践路径。我们从一个已经可以运行的Streamlit应用出发,一步步将其“后端化”,整个过程注重代码的清晰度和可维护性。

2. 项目核心架构解析

在开始编写代码之前,我们需要深入理解现有SenseVoice-Small ONNX工具的内部工作原理。这就像盖房子前要先看明白图纸,知道承重墙在哪里,管道如何走线。

2.1 核心组件与数据流

现有的Streamlit工具虽然界面简单,但其后台逻辑已经包含了语音识别的一个完整流水线。我们可以将其抽象为以下几个核心模块:

  1. 模型加载器 (Model Loader):负责在服务启动时,从指定路径加载Int8量化后的SenseVoiceSmall主模型和CT-Transformer标点模型。
  2. 音频预处理器 (Audio Preprocessor):接收用户上传的音频文件(WAV, MP3等),将其转换为模型推理所需的格式(如采样率16kHz的PCM数据)。
  3. 推理引擎 (Inference Engine):这是核心,调用已加载的模型进行语音识别。它依次执行:
    • 自动语种识别:判断音频中的语言。
    • 语音转文本:生成原始的识别文本。
    • 逆文本正则化 (ITN):将“一百二十三”这样的口语表述转换为“123”。
  4. 后处理器 (Post-Processor):对推理引擎的原始输出进行加工。
    • 文本清洗:移除模型可能输出的富文本标签等无关字符。
    • 标点恢复:调用CT-Transformer模型,为文本添加逗号、句号等标点,极大提升可读性。
  5. 结果返回器 (Result Returner):将最终处理好的、带标点的文本返回给用户。

在Streamlit版本中,这些模块的调用是线性、同步的:上传文件 -> 点击按钮 -> 依次执行上述所有步骤 -> 页面显示结果。而在WebAPI版本中,我们需要将其改造为异步、任务驱动的架构,以支持并发请求和长时间任务的处理。

2.2 从同步到异步:架构演进思路

将工具升级为WebAPI服务,关键在于解耦“请求接收”和“任务执行”。我们不能让用户的一个HTTP请求一直等待整个语音识别流程完成(尤其是长音频),这会导致请求超时和糟糕的用户体验。

我们的新架构将引入两个核心概念:

  • 异步端点 (Async Endpoint):API接口收到请求后,立即返回一个“任务ID”,告诉用户“任务已接收,正在处理”,而不是最终结果。
  • 任务队列与后台Worker:识别任务被放入一个队列(如内存队列或Redis),由后台的工作进程(Worker)异步地从队列中取出任务并执行。Worker会严格按顺序执行我们上面分析的识别流水线。

这样,前端应用可以先快速拿到任务ID,然后通过另一个接口,轮询查询这个ID对应的任务状态和结果。这种模式非常适合处理耗时不确定的AI推理任务。

接下来,我们就开始用代码实现这个新的架构。

3. 使用FastAPI构建WebAPI后端

FastAPI是一个现代、快速(高性能)的Python Web框架,特别适合构建API。它基于标准Python类型提示,自动生成交互式API文档,并且原生支持异步编程,这与我们的需求完美契合。

3.1 项目初始化与依赖安装

首先,创建一个新的项目目录,并建立虚拟环境。

# 创建项目目录
mkdir sensevoice-api-server
cd sensevoice-api-server

# 创建虚拟环境(推荐使用Python 3.8+)
python -m venv venv

# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Mac:
source venv/bin/activate

# 安装核心依赖
pip install fastapi uvicorn python-multipart
pip install funasr
pip install modelscope

接下来,创建我们的项目文件结构。一个清晰的结构有助于长期维护。

sensevoice-api-server/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI应用主文件
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py    # 配置文件
│   │   └── models.py    # 数据模型(Pydantic)
│   ├── api/
│   │   ├── __init__.py
│   │   └── endpoints/
│   │       ├── __init__.py
│   │       ├── tasks.py # 任务相关接口
│   │       └── health.py # 健康检查接口
│   ├── services/
│   │   ├── __init__.py
│   │   ├── inference.py # 核心推理服务
│   │   └── task_manager.py # 任务管理服务
│   └── utils/
│       ├── __init__.py
│       ├── audio_utils.py # 音频处理工具
│       └── file_utils.py  # 文件操作工具
├── models/              # 存放本地模型文件(如果需要)
├── tasks/               # 临时任务文件存储
├── requirements.txt
└── README.md

3.2 核心服务层实现

服务层是业务逻辑的核心。我们先实现最关键的推理服务。

文件:app/services/inference.py 这个文件封装了原始的语音识别逻辑,使其可以被API层调用。

import os
import tempfile
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict, Any

from funasr import AutoModel

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SenseVoiceInferenceService:
    """
    SenseVoice-Small ONNX 模型推理服务
    封装了模型加载、音频推理、后处理等核心逻辑
    """
    
    def __init__(self, model_dir: str = "./models"):
        """
        初始化推理服务
        
        Args:
            model_dir: 本地模型文件目录路径
        """
        self.model_dir = Path(model_dir)
        self.model = None
        self._model_loaded = False
        logger.info(f"初始化推理服务,模型目录: {model_dir}")
        
    def load_model(self):
        """加载语音识别模型"""
        if self._model_loaded:
            logger.info("模型已加载,跳过重复加载")
            return
            
        try:
            # 构建模型路径,假设模型文件位于 model_dir 下
            # 根据你的实际模型文件结构调整路径
            model_path = str(self.model_dir / "sensevoice_small_int8.onnx")
            
            # 初始化FunASR模型管道
            # 注意:这里需要根据你实际的模型配置调整参数
            self.model = AutoModel(
                model=model_path,
                model_revision="v1.0.0",  # 模型版本
                vad_model="fsmn-vad",      # 语音活动检测模型
                vad_model_revision="v1.0.0",
                punc_model="ct-punc",      # 标点模型
                punc_model_revision="v1.0.0",
                # 启用量化模式
                quantize=True,
                # 批处理大小设为1,适合API单请求场景
                batch_size=1,
                # 设备类型,自动选择可用设备
                device="cuda" if torch.cuda.is_available() else "cpu",
                # 启用逆文本正则化
                use_itn=True,
            )
            
            self._model_loaded = True
            logger.info("SenseVoice-Small ONNX 模型加载成功")
            
        except Exception as e:
            logger.error(f"模型加载失败: {e}")
            raise RuntimeError(f"无法加载语音识别模型: {e}")
    
    def transcribe_audio(
        self, 
        audio_path: str, 
        language: str = "auto"
    ) -> Dict[str, Any]:
        """
        转录音频文件
        
        Args:
            audio_path: 音频文件路径
            language: 语言设置,"auto"为自动检测
            
        Returns:
            包含转录结果和元数据的字典
        """
        if not self._model_loaded:
            self.load_model()
            
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"音频文件不存在: {audio_path}")
            
        try:
            logger.info(f"开始转录音频: {audio_path}, 语言: {language}")
            
            # 执行语音识别
            # 注意:resample_fn 用于音频重采样,确保输入格式正确
            result = self.model.generate(
                input=audio_path,
                language=language,
                cache={},  # 可选的缓存,用于提升长音频处理性能
            )
            
            # 解析结果
            # FunASR返回的结果结构可能需要根据实际输出调整
            if result and len(result) > 0:
                # 假设result[0]包含文本和可能的其他信息
                text_result = result[0].get("text", "") if isinstance(result[0], dict) else result[0]
                
                # 后处理:清理文本(移除可能的HTML标签等)
                clean_text = self._clean_text(text_result)
                
                return {
                    "success": True,
                    "text": clean_text,
                    "language": result[0].get("lang", "unknown") if isinstance(result[0], dict) else "unknown",
                    "audio_duration": self._get_audio_duration(audio_path),
                    "model_info": {
                        "name": "SenseVoice-Small-ONNX-Int8",
                        "quantized": True
                    }
                }
            else:
                return {
                    "success": False,
                    "error": "模型未返回有效结果",
                    "text": ""
                }
                
        except Exception as e:
            logger.error(f"音频转录失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "text": ""
            }
    
    def _clean_text(self, text: str) -> str:
        """清理识别文本,移除不必要的标签和字符"""
        if not text:
            return text
            
        # 移除可能的HTML标签
        import re
        clean = re.sub(r'<[^>]+>', '', text)
        # 移除多余的空格和换行
        clean = ' '.join(clean.split())
        return clean
    
    def _get_audio_duration(self, audio_path: str) -> float:
        """获取音频时长(秒)"""
        try:
            import librosa
            duration = librosa.get_duration(path=audio_path)
            return round(duration, 2)
        except:
            # 如果librosa不可用,返回估计值或0
            return 0.0
    
    def is_ready(self) -> bool:
        """检查服务是否就绪"""
        return self._model_loaded and self.model is not None

文件:app/services/task_manager.py 这个类负责管理异步任务,是连接API请求和后台Worker的桥梁。

import uuid
import asyncio
import logging
from datetime import datetime
from enum import Enum
from typing import Dict, Optional, Any
from concurrent.futures import ThreadPoolExecutor

from app.services.inference import SenseVoiceInferenceService

logger = logging.getLogger(__name__)

class TaskStatus(str, Enum):
    """任务状态枚举"""
    PENDING = "pending"      # 等待中
    PROCESSING = "processing" # 处理中
    COMPLETED = "completed"  # 已完成
    FAILED = "failed"        # 失败

class TranscriptionTask:
    """转录任务类"""
    
    def __init__(self, task_id: str, audio_path: str, language: str = "auto"):
        self.task_id = task_id
        self.audio_path = audio_path
        self.language = language
        self.status = TaskStatus.PENDING
        self.result: Optional[Dict[str, Any]] = None
        self.error: Optional[str] = None
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.progress: float = 0.0  # 进度 0-1

class TaskManager:
    """
    任务管理器
    负责创建、跟踪和管理语音识别任务
    """
    
    def __init__(self, inference_service: SenseVoiceInferenceService, max_workers: int = 2):
        """
        初始化任务管理器
        
        Args:
            inference_service: 推理服务实例
            max_workers: 最大并发工作线程数
        """
        self.inference_service = inference_service
        self.tasks: Dict[str, TranscriptionTask] = {}
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self._loop = asyncio.get_event_loop()
        logger.info(f"任务管理器初始化完成,最大工作线程: {max_workers}")
    
    def create_task(self, audio_path: str, language: str = "auto") -> str:
        """
        创建新的转录任务
        
        Args:
            audio_path: 音频文件路径
            language: 语言设置
            
        Returns:
            任务ID
        """
        task_id = str(uuid.uuid4())
        task = TranscriptionTask(task_id, audio_path, language)
        self.tasks[task_id] = task
        
        # 异步执行任务
        asyncio.create_task(self._execute_task(task))
        
        logger.info(f"创建新任务: {task_id}, 音频: {audio_path}")
        return task_id
    
    async def _execute_task(self, task: TranscriptionTask):
        """在后台线程中执行转录任务"""
        task.status = TaskStatus.PROCESSING
        task.started_at = datetime.now()
        task.progress = 0.1  # 开始处理
        
        try:
            # 在线程池中执行阻塞的推理操作
            result = await self._loop.run_in_executor(
                self.executor,
                self._run_inference,
                task
            )
            
            task.result = result
            task.status = TaskStatus.COMPLETED
            task.progress = 1.0
            logger.info(f"任务完成: {task.task_id}")
            
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
            logger.error(f"任务失败 {task.task_id}: {e}")
            
        finally:
            task.completed_at = datetime.now()
    
    def _run_inference(self, task: TranscriptionTask) -> Dict[str, Any]:
        """执行实际的语音识别推理"""
        # 更新进度
        task.progress = 0.3
        
        # 调用推理服务
        result = self.inference_service.transcribe_audio(
            audio_path=task.audio_path,
            language=task.language
        )
        
        # 更新进度
        task.progress = 0.8
        
        return result
    
    def get_task(self, task_id: str) -> Optional[TranscriptionTask]:
        """根据任务ID获取任务"""
        return self.tasks.get(task_id)
    
    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """获取任务状态信息"""
        task = self.get_task(task_id)
        if not task:
            return None
            
        return {
            "task_id": task.task_id,
            "status": task.status.value,
            "progress": task.progress,
            "created_at": task.created_at.isoformat() if task.created_at else None,
            "started_at": task.started_at.isoformat() if task.started_at else None,
            "completed_at": task.completed_at.isoformat() if task.completed_at else None,
            "has_result": task.result is not None,
            "error": task.error
        }
    
    def cleanup_old_tasks(self, max_age_hours: int = 24):
        """清理超过指定时间的老任务"""
        now = datetime.now()
        to_delete = []
        
        for task_id, task in self.tasks.items():
            if task.completed_at:
                age = (now - task.completed_at).total_seconds() / 3600
                if age > max_age_hours:
                    to_delete.append(task_id)
        
        for task_id in to_delete:
            del self.tasks[task_id]
        
        if to_delete:
            logger.info(f"清理了 {len(to_delete)} 个老任务")

3.3 API接口层设计

现在,我们使用FastAPI来创建对外提供服务的HTTP接口。

文件:app/core/models.py 使用Pydantic定义清晰的数据模型,这有助于自动生成API文档和请求验证。

from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum

class LanguageOption(str, Enum):
    """支持的语言选项"""
    AUTO = "auto"
    ZH = "zh"  # 中文
    EN = "en"  # 英文
    # 可根据需要添加其他语言代码

class TranscriptionRequest(BaseModel):
    """语音转录请求模型"""
    language: LanguageOption = Field(
        default=LanguageOption.AUTO,
        description="识别语言,'auto'为自动检测"
    )
    
    class Config:
        schema_extra = {
            "example": {
                "language": "auto"
            }
        }

class TaskStatusResponse(BaseModel):
    """任务状态响应模型"""
    task_id: str = Field(..., description="任务唯一ID")
    status: str = Field(..., description="任务状态: pending/processing/completed/failed")
    progress: float = Field(..., description="处理进度 0-1")
    created_at: Optional[str] = Field(None, description="任务创建时间")
    started_at: Optional[str] = Field(None, description="任务开始处理时间")
    completed_at: Optional[str] = Field(None, description="任务完成时间")
    error: Optional[str] = Field(None, description="错误信息(如果有)")

class TranscriptionResult(BaseModel):
    """转录结果模型"""
    success: bool = Field(..., description="是否成功")
    text: str = Field(..., description="转录文本")
    language: Optional[str] = Field(None, description="检测到的语言")
    audio_duration: Optional[float] = Field(None, description="音频时长(秒)")
    model_info: Optional[dict] = Field(None, description="模型信息")
    error: Optional[str] = Field(None, description="错误信息")

class HealthResponse(BaseModel):
    """健康检查响应模型"""
    status: str = Field(..., description="服务状态")
    model_loaded: bool = Field(..., description="模型是否已加载")
    total_tasks: int = Field(..., description="总任务数")
    active_tasks: int = Field(..., description="活跃任务数")
    uptime: Optional[float] = Field(None, description="服务运行时间(秒)")

文件:app/api/endpoints/tasks.py 实现具体的任务相关API端点。

import os
import tempfile
import logging
from pathlib import Path
from typing import List

from fastapi import APIRouter, UploadFile, File, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse

from app.core.models import (
    TranscriptionRequest, 
    TaskStatusResponse, 
    TranscriptionResult,
    LanguageOption
)
from app.services.task_manager import TaskManager, TaskStatus
from app.utils.file_utils import save_upload_file

# 创建路由
router = APIRouter(prefix="/tasks", tags=["tasks"])

logger = logging.getLogger(__name__)

# 依赖注入:在实际应用中,这些应该通过FastAPI的Depends注入
# 这里为了简化,我们假设已经初始化好了
from app.main import get_task_manager
task_manager = get_task_manager()

@router.post("/transcribe", response_model=dict)
async def create_transcription_task(
    background_tasks: BackgroundTasks,
    audio_file: UploadFile = File(..., description="音频文件 (WAV, MP3, M4A等)"),
    language: LanguageOption = Form(LanguageOption.AUTO, description="识别语言")
):
    """
    创建语音转录任务
    
    - **audio_file**: 上传的音频文件
    - **language**: 识别语言,默认为自动检测
    
    返回任务ID,用于查询任务状态和结果
    """
    # 验证文件类型
    allowed_extensions = {'.wav', '.mp3', '.m4a', '.ogg', '.flac', '.aac'}
    file_ext = Path(audio_file.filename).suffix.lower() if audio_file.filename else ''
    
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"不支持的文件格式。支持格式: {', '.join(allowed_extensions)}"
        )
    
    # 保存上传的文件到临时目录
    try:
        # 创建临时目录(如果不存在)
        temp_dir = Path("tasks/temp")
        temp_dir.mkdir(parents=True, exist_ok=True)
        
        # 保存文件
        temp_file_path = await save_upload_file(audio_file, temp_dir)
        
        logger.info(f"文件上传成功: {audio_file.filename} -> {temp_file_path}")
        
        # 创建转录任务
        task_id = task_manager.create_task(
            audio_path=str(temp_file_path),
            language=language.value
        )
        
        # 添加后台任务:处理完成后清理临时文件
        background_tasks.add_task(
            cleanup_temp_file,
            task_id,
            str(temp_file_path)
        )
        
        return {
            "task_id": task_id,
            "message": "转录任务已创建",
            "status_url": f"/tasks/{task_id}/status",
            "result_url": f"/tasks/{task_id}/result"
        }
        
    except Exception as e:
        logger.error(f"创建任务失败: {e}")
        raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")

@router.get("/{task_id}/status", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
    """
    获取任务状态
    
    - **task_id**: 任务ID
    
    返回任务的当前状态和处理进度
    """
    status_info = task_manager.get_task_status(task_id)
    
    if not status_info:
        raise HTTPException(status_code=404, detail="任务不存在")
    
    return status_info

@router.get("/{task_id}/result", response_model=TranscriptionResult)
async def get_task_result(task_id: str):
    """
    获取任务结果
    
    - **task_id**: 任务ID
    
    如果任务完成,返回转录结果;否则返回错误
    """
    task = task_manager.get_task(task_id)
    
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")
    
    if task.status == TaskStatus.PENDING:
        raise HTTPException(status_code=202, detail="任务等待处理中")
    
    if task.status == TaskStatus.PROCESSING:
        raise HTTPException(status_code=202, detail="任务处理中")
    
    if task.status == TaskStatus.FAILED:
        raise HTTPException(
            status_code=500,
            detail=f"任务处理失败: {task.error}"
        )
    
    if task.status == TaskStatus.COMPLETED and task.result:
        return TranscriptionResult(**task.result)
    
    raise HTTPException(status_code=500, detail="任务状态异常")

@router.get("/", response_model=List[TaskStatusResponse])
async def list_tasks(limit: int = 20, offset: int = 0):
    """
    列出所有任务(用于调试和管理)
    
    - **limit**: 返回的最大任务数
    - **offset**: 偏移量,用于分页
    
    返回任务状态列表
    """
    # 获取所有任务ID
    all_task_ids = list(task_manager.tasks.keys())
    
    # 应用分页
    start = offset
    end = offset + limit
    paginated_ids = all_task_ids[start:end]
    
    # 获取每个任务的状态
    tasks_status = []
    for task_id in paginated_ids:
        status = task_manager.get_task_status(task_id)
        if status:
            tasks_status.append(status)
    
    return tasks_status

async def cleanup_temp_file(task_id: str, file_path: str):
    """
    清理临时文件的后台任务
    
    当转录任务完成后,删除临时音频文件
    """
    import asyncio
    from pathlib import Path
    
    # 等待任务完成
    while True:
        await asyncio.sleep(5)  # 每5秒检查一次
        
        task = task_manager.get_task(task_id)
        if not task:
            # 任务不存在,可能是被清理了
            break
            
        if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
            # 任务完成或失败,清理文件
            try:
                path = Path(file_path)
                if path.exists():
                    path.unlink()
                    logger.info(f"已清理临时文件: {file_path}")
                break
            except Exception as e:
                logger.error(f"清理临时文件失败 {file_path}: {e}")
                break

文件:app/api/endpoints/health.py 健康检查端点,用于监控服务状态。

import time
import logging
from fastapi import APIRouter
from app.core.models import HealthResponse

router = APIRouter(prefix="/health", tags=["health"])

logger = logging.getLogger(__name__)

# 服务启动时间
START_TIME = time.time()

# 依赖注入:在实际应用中,这些应该通过FastAPI的Depends注入
from app.main import get_inference_service, get_task_manager
inference_service = get_inference_service()
task_manager = get_task_manager()

@router.get("/", response_model=HealthResponse)
async def health_check():
    """
    服务健康检查
    
    返回服务状态、模型加载情况、任务统计等信息
    """
    # 计算运行时间
    uptime = time.time() - START_TIME
    
    # 统计任务
    total_tasks = len(task_manager.tasks) if task_manager else 0
    
    # 统计活跃任务(处理中)
    active_tasks = 0
    if task_manager:
        for task in task_manager.tasks.values():
            if task.status.value == "processing":
                active_tasks += 1
    
    return HealthResponse(
        status="healthy",
        model_loaded=inference_service.is_ready() if inference_service else False,
        total_tasks=total_tasks,
        active_tasks=active_tasks,
        uptime=round(uptime, 2)
    )

@router.get("/ready")
async def readiness_check():
    """
    服务就绪检查
    
    检查服务是否准备好接收请求(模型是否加载完成)
    """
    if inference_service and inference_service.is_ready():
        return {"status": "ready"}
    else:
        return {"status": "not_ready"}, 503

3.4 工具函数与配置

文件:app/utils/file_utils.py 处理文件上传和保存的工具函数。

import shutil
import logging
from pathlib import Path
from typing import BinaryIO
from fastapi import UploadFile

logger = logging.getLogger(__name__)

async def save_upload_file(upload_file: UploadFile, destination: Path) -> Path:
    """
    保存上传的文件到指定目录
    
    Args:
        upload_file: FastAPI UploadFile对象
        destination: 目标目录Path对象
        
    Returns:
        保存后的文件完整路径
    """
    try:
        # 确保目标目录存在
        destination.mkdir(parents=True, exist_ok=True)
        
        # 生成安全的文件名
        safe_filename = get_safe_filename(upload_file.filename)
        file_path = destination / safe_filename
        
        # 保存文件
        with open(file_path, "wb") as buffer:
            # 分块读取和写入,避免内存问题
            shutil.copyfileobj(upload_file.file, buffer)
        
        logger.debug(f"文件保存成功: {file_path}")
        return file_path
        
    except Exception as e:
        logger.error(f"保存文件失败: {e}")
        raise
    finally:
        # 确保文件指针关闭
        await upload_file.close()

def get_safe_filename(filename: str) -> str:
    """
    生成安全的文件名,避免路径遍历攻击
    
    Args:
        filename: 原始文件名
        
    Returns:
        安全的文件名
    """
    if not filename:
        return "uploaded_file"
    
    # 只保留文件名部分,移除路径
    safe_name = Path(filename).name
    
    # 可选:添加时间戳避免重名
    # from datetime import datetime
    # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # safe_name = f"{timestamp}_{safe_name}"
    
    return safe_name

def cleanup_directory(directory: Path, max_age_seconds: int = 3600):
    """
    清理目录中超过指定时间的文件
    
    Args:
        directory: 要清理的目录
        max_age_seconds: 最大文件年龄(秒)
    """
    import time
    import os
    
    if not directory.exists():
        return
    
    current_time = time.time()
    
    for file_path in directory.iterdir():
        if file_path.is_file():
            file_age = current_time - os.path.getmtime(file_path)
            if file_age > max_age_seconds:
                try:
                    file_path.unlink()
                    logger.info(f"清理旧文件: {file_path}")
                except Exception as e:
                    logger.error(f"清理文件失败 {file_path}: {e}")

文件:app/core/config.py 配置文件,集中管理各种设置。

import os
from pathlib import Path
from typing import Optional
from pydantic import BaseSettings

class Settings(BaseSettings):
    """应用配置"""
    
    # 应用设置
    app_name: str = "SenseVoice-Small ONNX API Server"
    app_version: str = "1.0.0"
    debug: bool = False
    
    # 服务器设置
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 1
    
    # 模型设置
    model_dir: str = "./models"
    model_path: Optional[str] = None
    
    # 任务设置
    max_workers: int = 2  # 最大并发任务数
    temp_file_ttl: int = 3600  # 临时文件存活时间(秒)
    
    # 日志设置
    log_level: str = "INFO"
    log_file: Optional[str] = None
    
    # CORS设置
    cors_origins: list = ["*"]
    
    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

def get_settings() -> Settings:
    """获取配置实例"""
    return Settings()

# 创建必要的目录
def init_directories():
    """初始化应用所需的目录"""
    directories = [
        "models",
        "tasks/temp",
        "logs"
    ]
    
    for dir_path in directories:
        Path(dir_path).mkdir(parents=True, exist_ok=True)

3.5 主应用入口

文件:app/main.py 这是FastAPI应用的入口点,负责组装所有组件。

import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import Settings, init_directories
from app.services.inference import SenseVoiceInferenceService
from app.services.task_manager import TaskManager
from app.api.endpoints import tasks, health

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 全局服务实例
_inference_service = None
_task_manager = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理
    
    启动时初始化服务,关闭时清理资源
    """
    global _inference_service, _task_manager
    
    # 启动时
    logger.info("正在启动 SenseVoice-Small ONNX API 服务...")
    
    # 初始化目录
    init_directories()
    
    # 初始化配置
    settings = Settings()
    
    # 初始化推理服务
    try:
        _inference_service = SenseVoiceInferenceService(
            model_dir=settings.model_dir
        )
        _inference_service.load_model()
        logger.info("推理服务初始化完成")
    except Exception as e:
        logger.error(f"推理服务初始化失败: {e}")
        # 根据实际情况决定是否退出
        # raise
    
    # 初始化任务管理器
    if _inference_service:
        _task_manager = TaskManager(
            inference_service=_inference_service,
            max_workers=settings.max_workers
        )
        logger.info("任务管理器初始化完成")
    
    yield  # 应用运行期间
    
    # 关闭时
    logger.info("正在关闭服务...")
    if _task_manager:
        _task_manager.executor.shutdown(wait=True)
        logger.info("任务管理器已关闭")

# 创建FastAPI应用
app = FastAPI(
    title="SenseVoice-Small ONNX API",
    description="基于SenseVoice-Small ONNX量化模型的语音识别API服务",
    version="1.0.0",
    lifespan=lifespan
)

# 添加CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境应限制为具体域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(tasks.router)
app.include_router(health.router)

# 根路由
@app.get("/")
async def root():
    """根端点,返回服务信息"""
    return {
        "service": "SenseVoice-Small ONNX API",
        "version": "1.0.0",
        "docs": "/docs",
        "endpoints": {
            "create_task": "POST /tasks/transcribe",
            "check_status": "GET /tasks/{task_id}/status",
            "get_result": "GET /tasks/{task_id}/result",
            "health": "GET /health"
        }
    }

# 获取服务实例的函数(用于依赖注入)
def get_inference_service():
    return _inference_service

def get_task_manager():
    return _task_manager

if __name__ == "__main__":
    import uvicorn
    
    settings = Settings()
    
    uvicorn.run(
        "app.main:app",
        host=settings.host,
        port=settings.port,
        reload=settings.debug,
        workers=settings.workers
    )

4. 部署与使用指南

现在,我们已经完成了WebAPI服务的代码开发。接下来,让我们看看如何部署和使用这个服务。

4.1 环境配置与启动

首先,确保你已经准备好了模型文件。根据原始工具的说明,你需要:

  1. 准备模型文件:将SenseVoiceSmall ONNX量化模型文件放置在./models/目录下。具体的模型文件名和结构可能需要根据你的实际情况调整app/services/inference.py中的加载逻辑。

  2. 安装依赖:创建并激活虚拟环境后,安装所有依赖。

# 安装项目依赖
pip install -r requirements.txt

# requirements.txt 内容示例:
fastapi==0.104.1
uvicorn[standard]==0.24.0
funasr==0.1.0
modelscope==1.11.0
python-multipart==0.0.6
librosa==0.10.1
pydantic==2.5.0
  1. 启动服务:有多种方式可以启动FastAPI服务。

方式一:直接运行(开发环境)

python -m app.main

方式二:使用Uvicorn(生产环境推荐)

# 开发模式(带热重载)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# 生产模式(多worker)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

方式三:使用Gunicorn + Uvicorn(高并发生产环境)

# 安装gunicorn
pip install gunicorn

# 启动(Linux/Mac)
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app.main:app

# 或使用配置文件
gunicorn -c gunicorn.conf.py app.main:app

服务启动后,你会看到类似下面的输出:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

4.2 API接口使用示例

服务启动后,可以通过以下方式使用API:

4.2.1 使用交互式文档

FastAPI自动生成了交互式API文档,访问 http://localhost:8000/docs 即可查看和测试所有接口。

4.2.2 使用cURL命令行测试

1. 创建转录任务

curl -X POST "http://localhost:8000/tasks/transcribe" \
  -H "accept: application/json" \
  -H "Content-Type: multipart/form-data" \
  -F "audio_file=@/path/to/your/audio.wav" \
  -F "language=auto"

成功响应示例:

{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "message": "转录任务已创建",
  "status_url": "/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/status",
  "result_url": "/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result"
}

2. 查询任务状态

curl -X GET "http://localhost:8000/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/status" \
  -H "accept: application/json"

响应示例(处理中):

{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "processing",
  "progress": 0.5,
  "created_at": "2024-01-15T10:30:00",
  "started_at": "2024-01-15T10:30:01",
  "completed_at": null,
  "error": null
}

3. 获取转录结果

curl -X GET "http://localhost:8000/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result" \
  -H "accept: application/json"

成功响应示例:

{
  "success": true,
  "text": "你好,这是一个测试音频。今天天气真好,适合出去散步。",
  "language": "zh",
  "audio_duration": 5.23,
  "model_info": {
    "name": "SenseVoice-Small-ONNX-Int8",
    "quantized": true
  },
  "error": null
}
4.2.3 使用Python客户端

你也可以编写Python代码来调用这个API服务:

import requests
import time

class SenseVoiceClient:
    def __init__(self, base_url="http://localhost:8000"):
        self.base_url = base_url
    
    def transcribe_audio(self, audio_path, language="auto"):
        """上传音频并创建转录任务"""
        with open(audio_path, 'rb') as f:
            files = {'audio_file': f}
            data = {'language': language}
            
            response = requests.post(
                f"{self.base_url}/tasks/transcribe",
                files=files,
                data=data
            )
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"创建任务失败: {response.text}")
    
    def get_task_status(self, task_id):
        """查询任务状态"""
        response = requests.get(f"{self.base_url}/tasks/{task_id}/status")
        return response.json()
    
    def get_result(self, task_id, wait_for_completion=True, poll_interval=2):
        """获取任务结果,可选等待完成"""
        if wait_for_completion:
            while True:
                status = self.get_task_status(task_id)
                if status['status'] == 'completed':
                    break
                elif status['status'] == 'failed':
                    raise Exception(f"任务失败: {status.get('error', '未知错误')}")
                print(f"任务处理中... 进度: {status['progress']:.0%}")
                time.sleep(poll_interval)
        
        response = requests.get(f"{self.base_url}/tasks/{task_id}/result")
        return response.json()

# 使用示例
if __name__ == "__main__":
    client = SenseVoiceClient()
    
    # 创建任务
    task_info = client.transcribe_audio("test_audio.wav", language="auto")
    task_id = task_info["task_id"]
    print(f"任务已创建: {task_id}")
    
    # 等待并获取结果
    try:
        result = client.get_result(task_id, wait_for_completion=True)
        if result["success"]:
            print(f"识别结果: {result['text']}")
            print(f"音频时长: {result['audio_duration']}秒")
            print(f"检测语言: {result['language']}")
        else:
            print(f"识别失败: {result['error']}")
    except Exception as e:
        print(f"获取结果时出错: {e}")

4.3 生产环境部署建议

对于生产环境,你还需要考虑以下几个方面:

  1. 反向代理:使用Nginx或Apache作为反向代理,处理SSL/TLS、负载均衡和静态文件服务。

Nginx配置示例

server {
    listen 80;
    server_name your-domain.com;
    
    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # WebSocket支持(如果需要)
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
  1. 进程管理:使用systemd或supervisor管理服务进程,确保服务在崩溃后自动重启。

systemd服务文件示例 (/etc/systemd/system/sensevoice-api.service):

[Unit]
Description=SenseVoice-Small ONNX API Service
After=network.target

[Service]
User=www-data
Group=www-data
WorkingDirectory=/path/to/your/sensevoice-api-server
Environment="PATH=/path/to/venv/bin"
ExecStart=/path/to/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
  1. 监控与日志:配置日志轮转,集成监控系统(如Prometheus + Grafana)监控API性能。

  2. 安全加固

  • 使用HTTPS(通过Let's Encrypt获取免费证书)
  • 限制CORS来源
  • 添加API密钥认证(如果需要)
  • 设置文件上传大小限制

5. 总结

通过本教程,我们成功地将一个本地的SenseVoice-Small ONNX语音识别工具,扩展成了一个功能完备的WebAPI服务。让我们回顾一下这个过程中的关键收获:

5.1 核心成果

我们构建的API服务具有以下特点:

  1. 完整的RESTful接口:提供了任务创建、状态查询、结果获取的标准HTTP接口,易于集成到任何系统中。
  2. 异步任务处理:采用任务队列模式,支持并发处理,避免长时任务阻塞请求。
  3. 健壮的错误处理:完善的异常捕获和错误信息返回,让客户端能够准确了解处理状态。
  4. 自动资源管理:自动清理临时文件,避免磁盘空间浪费。
  5. 完整的监控支持:健康检查接口让运维人员能够随时了解服务状态。
  6. 交互式文档:FastAPI自动生成的Swagger UI文档,让API测试和调试变得非常简单。

5.2 实际应用场景

这个API服务可以轻松集成到各种应用中:

  • 内容创作平台:为用户提供语音转文字功能,快速生成视频字幕、会议纪要
  • 在线教育系统:将授课音频自动转为文字,方便学生复习和搜索
  • 客服系统:分析客服通话录音,提取关键信息,进行质量检查
  • 智能家居:通过语音指令控制设备,将语音转为文本指令
  • 移动应用:为App添加语音输入功能,提升用户体验

5.3 后续优化方向

虽然我们已经构建了一个可用的API服务,但还有不少可以继续优化的地方:

  1. 性能优化
  • 添加模型预热机制,减少首次推理延迟
  • 实现请求批处理,提升GPU利用率
  • 添加结果缓存,对相同音频文件避免重复处理
  1. 功能增强
  • 支持实时流式语音识别(WebSocket接口)
  • 添加更多音频预处理选项(降噪、音量标准化)
  • 支持自定义词汇表,提升特定领域术语识别准确率
  • 添加语音活动检测(VAD)参数调节
  1. 运维改进
  • 添加Prometheus指标导出,监控QPS、延迟、错误率等
  • 实现灰度发布和A/B测试支持
  • 添加请求限流和配额管理
  1. 用户体验
  • 提供更多客户端SDK(JavaScript、Java、Go等)
  • 添加Web界面,让非技术人员也能轻松使用
  • 支持任务进度WebSocket推送,避免客户端轮询

5.4 开始使用

现在,你已经拥有了一个功能完整的语音识别API服务。你可以:

  1. 直接使用:按照第4部分的指南部署服务,立即开始使用
  2. 二次开发:基于现有代码,添加你需要的特定功能
  3. 集成测试:将API集成到你的应用中,体验语音识别的便利

这个项目展示了如何将一个本地AI工具"服务化"的完整过程。同样的思路也可以应用到其他AI模型上,无论是图像识别、自然语言处理还是其他机器学习模型。希望这个教程对你有所帮助,祝你在AI应用开发的道路上越走越远!


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐