WebAPI扩展实践:SenseVoice-Small ONNX后端服务接口开发教程
本文介绍了如何在星图GPU平台上自动化部署⚡ SenseVoice-Small ONNX语音识别工具,并构建高性能WebAPI服务。该平台简化了部署流程,使开发者能快速搭建语音转文字服务,典型应用场景包括为在线会议、内容创作平台等自动生成字幕或会议纪要,提升工作效率。
WebAPI扩展实践:SenseVoice-Small ONNX后端服务接口开发教程
1. 引言
语音识别技术正变得越来越普及,从智能助手到会议纪要,它正在改变我们与机器交互的方式。然而,对于开发者而言,将先进的语音识别能力集成到自己的应用中,常常面临几个现实挑战:模型太大、部署复杂、接口不友好,以及隐私数据的安全顾虑。
今天,我们将一起动手,将一个轻量、高效、完全本地运行的语音识别工具——SenseVoice-Small ONNX,从简单的可视化界面,扩展成一个功能完备的后端WebAPI服务。这个服务将允许你通过标准的HTTP请求,轻松地将语音识别能力集成到你的网站、移动应用或任何需要语音转文字功能的系统中。
通过本教程,你将学会如何:
- 理解SenseVoice-Small ONNX工具的核心架构与数据处理流程
- 使用FastAPI框架构建一个高性能、异步的WebAPI后端
- 设计合理、易用的RESTful接口,支持文件上传、状态查询和结果获取
- 实现音频文件的异步处理与任务队列管理
- 为你的API添加基础的监控、日志和错误处理机制
无论你是想为自己的产品添加语音输入功能,还是希望构建一个供团队内部使用的语音处理服务,这篇教程都将为你提供一条清晰的实践路径。我们从一个已经可以运行的Streamlit应用出发,一步步将其“后端化”,整个过程注重代码的清晰度和可维护性。
2. 项目核心架构解析
在开始编写代码之前,我们需要深入理解现有SenseVoice-Small ONNX工具的内部工作原理。这就像盖房子前要先看明白图纸,知道承重墙在哪里,管道如何走线。
2.1 核心组件与数据流
现有的Streamlit工具虽然界面简单,但其后台逻辑已经包含了语音识别的一个完整流水线。我们可以将其抽象为以下几个核心模块:
- 模型加载器 (Model Loader):负责在服务启动时,从指定路径加载Int8量化后的SenseVoiceSmall主模型和CT-Transformer标点模型。
- 音频预处理器 (Audio Preprocessor):接收用户上传的音频文件(WAV, MP3等),将其转换为模型推理所需的格式(如采样率16kHz的PCM数据)。
- 推理引擎 (Inference Engine):这是核心,调用已加载的模型进行语音识别。它依次执行:
- 自动语种识别:判断音频中的语言。
- 语音转文本:生成原始的识别文本。
- 逆文本正则化 (ITN):将“一百二十三”这样的口语表述转换为“123”。
- 后处理器 (Post-Processor):对推理引擎的原始输出进行加工。
- 文本清洗:移除模型可能输出的富文本标签等无关字符。
- 标点恢复:调用CT-Transformer模型,为文本添加逗号、句号等标点,极大提升可读性。
- 结果返回器 (Result Returner):将最终处理好的、带标点的文本返回给用户。
在Streamlit版本中,这些模块的调用是线性、同步的:上传文件 -> 点击按钮 -> 依次执行上述所有步骤 -> 页面显示结果。而在WebAPI版本中,我们需要将其改造为异步、任务驱动的架构,以支持并发请求和长时间任务的处理。
2.2 从同步到异步:架构演进思路
将工具升级为WebAPI服务,关键在于解耦“请求接收”和“任务执行”。我们不能让用户的一个HTTP请求一直等待整个语音识别流程完成(尤其是长音频),这会导致请求超时和糟糕的用户体验。
我们的新架构将引入两个核心概念:
- 异步端点 (Async Endpoint):API接口收到请求后,立即返回一个“任务ID”,告诉用户“任务已接收,正在处理”,而不是最终结果。
- 任务队列与后台Worker:识别任务被放入一个队列(如内存队列或Redis),由后台的工作进程(Worker)异步地从队列中取出任务并执行。Worker会严格按顺序执行我们上面分析的识别流水线。
这样,前端应用可以先快速拿到任务ID,然后通过另一个接口,轮询查询这个ID对应的任务状态和结果。这种模式非常适合处理耗时不确定的AI推理任务。
接下来,我们就开始用代码实现这个新的架构。
3. 使用FastAPI构建WebAPI后端
FastAPI是一个现代、快速(高性能)的Python Web框架,特别适合构建API。它基于标准Python类型提示,自动生成交互式API文档,并且原生支持异步编程,这与我们的需求完美契合。
3.1 项目初始化与依赖安装
首先,创建一个新的项目目录,并建立虚拟环境。
# 创建项目目录
mkdir sensevoice-api-server
cd sensevoice-api-server
# 创建虚拟环境(推荐使用Python 3.8+)
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Mac:
source venv/bin/activate
# 安装核心依赖
pip install fastapi uvicorn python-multipart
pip install funasr
pip install modelscope
接下来,创建我们的项目文件结构。一个清晰的结构有助于长期维护。
sensevoice-api-server/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI应用主文件
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py # 配置文件
│ │ └── models.py # 数据模型(Pydantic)
│ ├── api/
│ │ ├── __init__.py
│ │ └── endpoints/
│ │ ├── __init__.py
│ │ ├── tasks.py # 任务相关接口
│ │ └── health.py # 健康检查接口
│ ├── services/
│ │ ├── __init__.py
│ │ ├── inference.py # 核心推理服务
│ │ └── task_manager.py # 任务管理服务
│ └── utils/
│ ├── __init__.py
│ ├── audio_utils.py # 音频处理工具
│ └── file_utils.py # 文件操作工具
├── models/ # 存放本地模型文件(如果需要)
├── tasks/ # 临时任务文件存储
├── requirements.txt
└── README.md
3.2 核心服务层实现
服务层是业务逻辑的核心。我们先实现最关键的推理服务。
文件:app/services/inference.py 这个文件封装了原始的语音识别逻辑,使其可以被API层调用。
import os
import tempfile
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
from funasr import AutoModel
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SenseVoiceInferenceService:
"""
SenseVoice-Small ONNX 模型推理服务
封装了模型加载、音频推理、后处理等核心逻辑
"""
def __init__(self, model_dir: str = "./models"):
"""
初始化推理服务
Args:
model_dir: 本地模型文件目录路径
"""
self.model_dir = Path(model_dir)
self.model = None
self._model_loaded = False
logger.info(f"初始化推理服务,模型目录: {model_dir}")
def load_model(self):
"""加载语音识别模型"""
if self._model_loaded:
logger.info("模型已加载,跳过重复加载")
return
try:
# 构建模型路径,假设模型文件位于 model_dir 下
# 根据你的实际模型文件结构调整路径
model_path = str(self.model_dir / "sensevoice_small_int8.onnx")
# 初始化FunASR模型管道
# 注意:这里需要根据你实际的模型配置调整参数
self.model = AutoModel(
model=model_path,
model_revision="v1.0.0", # 模型版本
vad_model="fsmn-vad", # 语音活动检测模型
vad_model_revision="v1.0.0",
punc_model="ct-punc", # 标点模型
punc_model_revision="v1.0.0",
# 启用量化模式
quantize=True,
# 批处理大小设为1,适合API单请求场景
batch_size=1,
# 设备类型,自动选择可用设备
device="cuda" if torch.cuda.is_available() else "cpu",
# 启用逆文本正则化
use_itn=True,
)
self._model_loaded = True
logger.info("SenseVoice-Small ONNX 模型加载成功")
except Exception as e:
logger.error(f"模型加载失败: {e}")
raise RuntimeError(f"无法加载语音识别模型: {e}")
def transcribe_audio(
self,
audio_path: str,
language: str = "auto"
) -> Dict[str, Any]:
"""
转录音频文件
Args:
audio_path: 音频文件路径
language: 语言设置,"auto"为自动检测
Returns:
包含转录结果和元数据的字典
"""
if not self._model_loaded:
self.load_model()
if not os.path.exists(audio_path):
raise FileNotFoundError(f"音频文件不存在: {audio_path}")
try:
logger.info(f"开始转录音频: {audio_path}, 语言: {language}")
# 执行语音识别
# 注意:resample_fn 用于音频重采样,确保输入格式正确
result = self.model.generate(
input=audio_path,
language=language,
cache={}, # 可选的缓存,用于提升长音频处理性能
)
# 解析结果
# FunASR返回的结果结构可能需要根据实际输出调整
if result and len(result) > 0:
# 假设result[0]包含文本和可能的其他信息
text_result = result[0].get("text", "") if isinstance(result[0], dict) else result[0]
# 后处理:清理文本(移除可能的HTML标签等)
clean_text = self._clean_text(text_result)
return {
"success": True,
"text": clean_text,
"language": result[0].get("lang", "unknown") if isinstance(result[0], dict) else "unknown",
"audio_duration": self._get_audio_duration(audio_path),
"model_info": {
"name": "SenseVoice-Small-ONNX-Int8",
"quantized": True
}
}
else:
return {
"success": False,
"error": "模型未返回有效结果",
"text": ""
}
except Exception as e:
logger.error(f"音频转录失败: {e}")
return {
"success": False,
"error": str(e),
"text": ""
}
def _clean_text(self, text: str) -> str:
"""清理识别文本,移除不必要的标签和字符"""
if not text:
return text
# 移除可能的HTML标签
import re
clean = re.sub(r'<[^>]+>', '', text)
# 移除多余的空格和换行
clean = ' '.join(clean.split())
return clean
def _get_audio_duration(self, audio_path: str) -> float:
"""获取音频时长(秒)"""
try:
import librosa
duration = librosa.get_duration(path=audio_path)
return round(duration, 2)
except:
# 如果librosa不可用,返回估计值或0
return 0.0
def is_ready(self) -> bool:
"""检查服务是否就绪"""
return self._model_loaded and self.model is not None
文件:app/services/task_manager.py 这个类负责管理异步任务,是连接API请求和后台Worker的桥梁。
import uuid
import asyncio
import logging
from datetime import datetime
from enum import Enum
from typing import Dict, Optional, Any
from concurrent.futures import ThreadPoolExecutor
from app.services.inference import SenseVoiceInferenceService
logger = logging.getLogger(__name__)
class TaskStatus(str, Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待中
PROCESSING = "processing" # 处理中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
class TranscriptionTask:
"""转录任务类"""
def __init__(self, task_id: str, audio_path: str, language: str = "auto"):
self.task_id = task_id
self.audio_path = audio_path
self.language = language
self.status = TaskStatus.PENDING
self.result: Optional[Dict[str, Any]] = None
self.error: Optional[str] = None
self.created_at = datetime.now()
self.started_at: Optional[datetime] = None
self.completed_at: Optional[datetime] = None
self.progress: float = 0.0 # 进度 0-1
class TaskManager:
"""
任务管理器
负责创建、跟踪和管理语音识别任务
"""
def __init__(self, inference_service: SenseVoiceInferenceService, max_workers: int = 2):
"""
初始化任务管理器
Args:
inference_service: 推理服务实例
max_workers: 最大并发工作线程数
"""
self.inference_service = inference_service
self.tasks: Dict[str, TranscriptionTask] = {}
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self._loop = asyncio.get_event_loop()
logger.info(f"任务管理器初始化完成,最大工作线程: {max_workers}")
def create_task(self, audio_path: str, language: str = "auto") -> str:
"""
创建新的转录任务
Args:
audio_path: 音频文件路径
language: 语言设置
Returns:
任务ID
"""
task_id = str(uuid.uuid4())
task = TranscriptionTask(task_id, audio_path, language)
self.tasks[task_id] = task
# 异步执行任务
asyncio.create_task(self._execute_task(task))
logger.info(f"创建新任务: {task_id}, 音频: {audio_path}")
return task_id
async def _execute_task(self, task: TranscriptionTask):
"""在后台线程中执行转录任务"""
task.status = TaskStatus.PROCESSING
task.started_at = datetime.now()
task.progress = 0.1 # 开始处理
try:
# 在线程池中执行阻塞的推理操作
result = await self._loop.run_in_executor(
self.executor,
self._run_inference,
task
)
task.result = result
task.status = TaskStatus.COMPLETED
task.progress = 1.0
logger.info(f"任务完成: {task.task_id}")
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
logger.error(f"任务失败 {task.task_id}: {e}")
finally:
task.completed_at = datetime.now()
def _run_inference(self, task: TranscriptionTask) -> Dict[str, Any]:
"""执行实际的语音识别推理"""
# 更新进度
task.progress = 0.3
# 调用推理服务
result = self.inference_service.transcribe_audio(
audio_path=task.audio_path,
language=task.language
)
# 更新进度
task.progress = 0.8
return result
def get_task(self, task_id: str) -> Optional[TranscriptionTask]:
"""根据任务ID获取任务"""
return self.tasks.get(task_id)
def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
"""获取任务状态信息"""
task = self.get_task(task_id)
if not task:
return None
return {
"task_id": task.task_id,
"status": task.status.value,
"progress": task.progress,
"created_at": task.created_at.isoformat() if task.created_at else None,
"started_at": task.started_at.isoformat() if task.started_at else None,
"completed_at": task.completed_at.isoformat() if task.completed_at else None,
"has_result": task.result is not None,
"error": task.error
}
def cleanup_old_tasks(self, max_age_hours: int = 24):
"""清理超过指定时间的老任务"""
now = datetime.now()
to_delete = []
for task_id, task in self.tasks.items():
if task.completed_at:
age = (now - task.completed_at).total_seconds() / 3600
if age > max_age_hours:
to_delete.append(task_id)
for task_id in to_delete:
del self.tasks[task_id]
if to_delete:
logger.info(f"清理了 {len(to_delete)} 个老任务")
3.3 API接口层设计
现在,我们使用FastAPI来创建对外提供服务的HTTP接口。
文件:app/core/models.py 使用Pydantic定义清晰的数据模型,这有助于自动生成API文档和请求验证。
from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum
class LanguageOption(str, Enum):
"""支持的语言选项"""
AUTO = "auto"
ZH = "zh" # 中文
EN = "en" # 英文
# 可根据需要添加其他语言代码
class TranscriptionRequest(BaseModel):
"""语音转录请求模型"""
language: LanguageOption = Field(
default=LanguageOption.AUTO,
description="识别语言,'auto'为自动检测"
)
class Config:
schema_extra = {
"example": {
"language": "auto"
}
}
class TaskStatusResponse(BaseModel):
"""任务状态响应模型"""
task_id: str = Field(..., description="任务唯一ID")
status: str = Field(..., description="任务状态: pending/processing/completed/failed")
progress: float = Field(..., description="处理进度 0-1")
created_at: Optional[str] = Field(None, description="任务创建时间")
started_at: Optional[str] = Field(None, description="任务开始处理时间")
completed_at: Optional[str] = Field(None, description="任务完成时间")
error: Optional[str] = Field(None, description="错误信息(如果有)")
class TranscriptionResult(BaseModel):
"""转录结果模型"""
success: bool = Field(..., description="是否成功")
text: str = Field(..., description="转录文本")
language: Optional[str] = Field(None, description="检测到的语言")
audio_duration: Optional[float] = Field(None, description="音频时长(秒)")
model_info: Optional[dict] = Field(None, description="模型信息")
error: Optional[str] = Field(None, description="错误信息")
class HealthResponse(BaseModel):
"""健康检查响应模型"""
status: str = Field(..., description="服务状态")
model_loaded: bool = Field(..., description="模型是否已加载")
total_tasks: int = Field(..., description="总任务数")
active_tasks: int = Field(..., description="活跃任务数")
uptime: Optional[float] = Field(None, description="服务运行时间(秒)")
文件:app/api/endpoints/tasks.py 实现具体的任务相关API端点。
import os
import tempfile
import logging
from pathlib import Path
from typing import List
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from app.core.models import (
TranscriptionRequest,
TaskStatusResponse,
TranscriptionResult,
LanguageOption
)
from app.services.task_manager import TaskManager, TaskStatus
from app.utils.file_utils import save_upload_file
# 创建路由
router = APIRouter(prefix="/tasks", tags=["tasks"])
logger = logging.getLogger(__name__)
# 依赖注入:在实际应用中,这些应该通过FastAPI的Depends注入
# 这里为了简化,我们假设已经初始化好了
from app.main import get_task_manager
task_manager = get_task_manager()
@router.post("/transcribe", response_model=dict)
async def create_transcription_task(
background_tasks: BackgroundTasks,
audio_file: UploadFile = File(..., description="音频文件 (WAV, MP3, M4A等)"),
language: LanguageOption = Form(LanguageOption.AUTO, description="识别语言")
):
"""
创建语音转录任务
- **audio_file**: 上传的音频文件
- **language**: 识别语言,默认为自动检测
返回任务ID,用于查询任务状态和结果
"""
# 验证文件类型
allowed_extensions = {'.wav', '.mp3', '.m4a', '.ogg', '.flac', '.aac'}
file_ext = Path(audio_file.filename).suffix.lower() if audio_file.filename else ''
if file_ext not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"不支持的文件格式。支持格式: {', '.join(allowed_extensions)}"
)
# 保存上传的文件到临时目录
try:
# 创建临时目录(如果不存在)
temp_dir = Path("tasks/temp")
temp_dir.mkdir(parents=True, exist_ok=True)
# 保存文件
temp_file_path = await save_upload_file(audio_file, temp_dir)
logger.info(f"文件上传成功: {audio_file.filename} -> {temp_file_path}")
# 创建转录任务
task_id = task_manager.create_task(
audio_path=str(temp_file_path),
language=language.value
)
# 添加后台任务:处理完成后清理临时文件
background_tasks.add_task(
cleanup_temp_file,
task_id,
str(temp_file_path)
)
return {
"task_id": task_id,
"message": "转录任务已创建",
"status_url": f"/tasks/{task_id}/status",
"result_url": f"/tasks/{task_id}/result"
}
except Exception as e:
logger.error(f"创建任务失败: {e}")
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
@router.get("/{task_id}/status", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
"""
获取任务状态
- **task_id**: 任务ID
返回任务的当前状态和处理进度
"""
status_info = task_manager.get_task_status(task_id)
if not status_info:
raise HTTPException(status_code=404, detail="任务不存在")
return status_info
@router.get("/{task_id}/result", response_model=TranscriptionResult)
async def get_task_result(task_id: str):
"""
获取任务结果
- **task_id**: 任务ID
如果任务完成,返回转录结果;否则返回错误
"""
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
if task.status == TaskStatus.PENDING:
raise HTTPException(status_code=202, detail="任务等待处理中")
if task.status == TaskStatus.PROCESSING:
raise HTTPException(status_code=202, detail="任务处理中")
if task.status == TaskStatus.FAILED:
raise HTTPException(
status_code=500,
detail=f"任务处理失败: {task.error}"
)
if task.status == TaskStatus.COMPLETED and task.result:
return TranscriptionResult(**task.result)
raise HTTPException(status_code=500, detail="任务状态异常")
@router.get("/", response_model=List[TaskStatusResponse])
async def list_tasks(limit: int = 20, offset: int = 0):
"""
列出所有任务(用于调试和管理)
- **limit**: 返回的最大任务数
- **offset**: 偏移量,用于分页
返回任务状态列表
"""
# 获取所有任务ID
all_task_ids = list(task_manager.tasks.keys())
# 应用分页
start = offset
end = offset + limit
paginated_ids = all_task_ids[start:end]
# 获取每个任务的状态
tasks_status = []
for task_id in paginated_ids:
status = task_manager.get_task_status(task_id)
if status:
tasks_status.append(status)
return tasks_status
async def cleanup_temp_file(task_id: str, file_path: str):
"""
清理临时文件的后台任务
当转录任务完成后,删除临时音频文件
"""
import asyncio
from pathlib import Path
# 等待任务完成
while True:
await asyncio.sleep(5) # 每5秒检查一次
task = task_manager.get_task(task_id)
if not task:
# 任务不存在,可能是被清理了
break
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
# 任务完成或失败,清理文件
try:
path = Path(file_path)
if path.exists():
path.unlink()
logger.info(f"已清理临时文件: {file_path}")
break
except Exception as e:
logger.error(f"清理临时文件失败 {file_path}: {e}")
break
文件:app/api/endpoints/health.py 健康检查端点,用于监控服务状态。
import time
import logging
from fastapi import APIRouter
from app.core.models import HealthResponse
router = APIRouter(prefix="/health", tags=["health"])
logger = logging.getLogger(__name__)
# 服务启动时间
START_TIME = time.time()
# 依赖注入:在实际应用中,这些应该通过FastAPI的Depends注入
from app.main import get_inference_service, get_task_manager
inference_service = get_inference_service()
task_manager = get_task_manager()
@router.get("/", response_model=HealthResponse)
async def health_check():
"""
服务健康检查
返回服务状态、模型加载情况、任务统计等信息
"""
# 计算运行时间
uptime = time.time() - START_TIME
# 统计任务
total_tasks = len(task_manager.tasks) if task_manager else 0
# 统计活跃任务(处理中)
active_tasks = 0
if task_manager:
for task in task_manager.tasks.values():
if task.status.value == "processing":
active_tasks += 1
return HealthResponse(
status="healthy",
model_loaded=inference_service.is_ready() if inference_service else False,
total_tasks=total_tasks,
active_tasks=active_tasks,
uptime=round(uptime, 2)
)
@router.get("/ready")
async def readiness_check():
"""
服务就绪检查
检查服务是否准备好接收请求(模型是否加载完成)
"""
if inference_service and inference_service.is_ready():
return {"status": "ready"}
else:
return {"status": "not_ready"}, 503
3.4 工具函数与配置
文件:app/utils/file_utils.py 处理文件上传和保存的工具函数。
import shutil
import logging
from pathlib import Path
from typing import BinaryIO
from fastapi import UploadFile
logger = logging.getLogger(__name__)
async def save_upload_file(upload_file: UploadFile, destination: Path) -> Path:
"""
保存上传的文件到指定目录
Args:
upload_file: FastAPI UploadFile对象
destination: 目标目录Path对象
Returns:
保存后的文件完整路径
"""
try:
# 确保目标目录存在
destination.mkdir(parents=True, exist_ok=True)
# 生成安全的文件名
safe_filename = get_safe_filename(upload_file.filename)
file_path = destination / safe_filename
# 保存文件
with open(file_path, "wb") as buffer:
# 分块读取和写入,避免内存问题
shutil.copyfileobj(upload_file.file, buffer)
logger.debug(f"文件保存成功: {file_path}")
return file_path
except Exception as e:
logger.error(f"保存文件失败: {e}")
raise
finally:
# 确保文件指针关闭
await upload_file.close()
def get_safe_filename(filename: str) -> str:
"""
生成安全的文件名,避免路径遍历攻击
Args:
filename: 原始文件名
Returns:
安全的文件名
"""
if not filename:
return "uploaded_file"
# 只保留文件名部分,移除路径
safe_name = Path(filename).name
# 可选:添加时间戳避免重名
# from datetime import datetime
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# safe_name = f"{timestamp}_{safe_name}"
return safe_name
def cleanup_directory(directory: Path, max_age_seconds: int = 3600):
"""
清理目录中超过指定时间的文件
Args:
directory: 要清理的目录
max_age_seconds: 最大文件年龄(秒)
"""
import time
import os
if not directory.exists():
return
current_time = time.time()
for file_path in directory.iterdir():
if file_path.is_file():
file_age = current_time - os.path.getmtime(file_path)
if file_age > max_age_seconds:
try:
file_path.unlink()
logger.info(f"清理旧文件: {file_path}")
except Exception as e:
logger.error(f"清理文件失败 {file_path}: {e}")
文件:app/core/config.py 配置文件,集中管理各种设置。
import os
from pathlib import Path
from typing import Optional
from pydantic import BaseSettings
class Settings(BaseSettings):
"""应用配置"""
# 应用设置
app_name: str = "SenseVoice-Small ONNX API Server"
app_version: str = "1.0.0"
debug: bool = False
# 服务器设置
host: str = "0.0.0.0"
port: int = 8000
workers: int = 1
# 模型设置
model_dir: str = "./models"
model_path: Optional[str] = None
# 任务设置
max_workers: int = 2 # 最大并发任务数
temp_file_ttl: int = 3600 # 临时文件存活时间(秒)
# 日志设置
log_level: str = "INFO"
log_file: Optional[str] = None
# CORS设置
cors_origins: list = ["*"]
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
def get_settings() -> Settings:
"""获取配置实例"""
return Settings()
# 创建必要的目录
def init_directories():
"""初始化应用所需的目录"""
directories = [
"models",
"tasks/temp",
"logs"
]
for dir_path in directories:
Path(dir_path).mkdir(parents=True, exist_ok=True)
3.5 主应用入口
文件:app/main.py 这是FastAPI应用的入口点,负责组装所有组件。
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import Settings, init_directories
from app.services.inference import SenseVoiceInferenceService
from app.services.task_manager import TaskManager
from app.api.endpoints import tasks, health
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 全局服务实例
_inference_service = None
_task_manager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理
启动时初始化服务,关闭时清理资源
"""
global _inference_service, _task_manager
# 启动时
logger.info("正在启动 SenseVoice-Small ONNX API 服务...")
# 初始化目录
init_directories()
# 初始化配置
settings = Settings()
# 初始化推理服务
try:
_inference_service = SenseVoiceInferenceService(
model_dir=settings.model_dir
)
_inference_service.load_model()
logger.info("推理服务初始化完成")
except Exception as e:
logger.error(f"推理服务初始化失败: {e}")
# 根据实际情况决定是否退出
# raise
# 初始化任务管理器
if _inference_service:
_task_manager = TaskManager(
inference_service=_inference_service,
max_workers=settings.max_workers
)
logger.info("任务管理器初始化完成")
yield # 应用运行期间
# 关闭时
logger.info("正在关闭服务...")
if _task_manager:
_task_manager.executor.shutdown(wait=True)
logger.info("任务管理器已关闭")
# 创建FastAPI应用
app = FastAPI(
title="SenseVoice-Small ONNX API",
description="基于SenseVoice-Small ONNX量化模型的语音识别API服务",
version="1.0.0",
lifespan=lifespan
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应限制为具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(tasks.router)
app.include_router(health.router)
# 根路由
@app.get("/")
async def root():
"""根端点,返回服务信息"""
return {
"service": "SenseVoice-Small ONNX API",
"version": "1.0.0",
"docs": "/docs",
"endpoints": {
"create_task": "POST /tasks/transcribe",
"check_status": "GET /tasks/{task_id}/status",
"get_result": "GET /tasks/{task_id}/result",
"health": "GET /health"
}
}
# 获取服务实例的函数(用于依赖注入)
def get_inference_service():
return _inference_service
def get_task_manager():
return _task_manager
if __name__ == "__main__":
import uvicorn
settings = Settings()
uvicorn.run(
"app.main:app",
host=settings.host,
port=settings.port,
reload=settings.debug,
workers=settings.workers
)
4. 部署与使用指南
现在,我们已经完成了WebAPI服务的代码开发。接下来,让我们看看如何部署和使用这个服务。
4.1 环境配置与启动
首先,确保你已经准备好了模型文件。根据原始工具的说明,你需要:
-
准备模型文件:将SenseVoiceSmall ONNX量化模型文件放置在
./models/目录下。具体的模型文件名和结构可能需要根据你的实际情况调整app/services/inference.py中的加载逻辑。 -
安装依赖:创建并激活虚拟环境后,安装所有依赖。
# 安装项目依赖
pip install -r requirements.txt
# requirements.txt 内容示例:
fastapi==0.104.1
uvicorn[standard]==0.24.0
funasr==0.1.0
modelscope==1.11.0
python-multipart==0.0.6
librosa==0.10.1
pydantic==2.5.0
- 启动服务:有多种方式可以启动FastAPI服务。
方式一:直接运行(开发环境)
python -m app.main
方式二:使用Uvicorn(生产环境推荐)
# 开发模式(带热重载)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# 生产模式(多worker)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
方式三:使用Gunicorn + Uvicorn(高并发生产环境)
# 安装gunicorn
pip install gunicorn
# 启动(Linux/Mac)
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app.main:app
# 或使用配置文件
gunicorn -c gunicorn.conf.py app.main:app
服务启动后,你会看到类似下面的输出:
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
4.2 API接口使用示例
服务启动后,可以通过以下方式使用API:
4.2.1 使用交互式文档
FastAPI自动生成了交互式API文档,访问 http://localhost:8000/docs 即可查看和测试所有接口。
4.2.2 使用cURL命令行测试
1. 创建转录任务
curl -X POST "http://localhost:8000/tasks/transcribe" \
-H "accept: application/json" \
-H "Content-Type: multipart/form-data" \
-F "audio_file=@/path/to/your/audio.wav" \
-F "language=auto"
成功响应示例:
{
"task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"message": "转录任务已创建",
"status_url": "/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/status",
"result_url": "/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result"
}
2. 查询任务状态
curl -X GET "http://localhost:8000/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/status" \
-H "accept: application/json"
响应示例(处理中):
{
"task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"status": "processing",
"progress": 0.5,
"created_at": "2024-01-15T10:30:00",
"started_at": "2024-01-15T10:30:01",
"completed_at": null,
"error": null
}
3. 获取转录结果
curl -X GET "http://localhost:8000/tasks/a1b2c3d4-e5f6-7890-abcd-ef1234567890/result" \
-H "accept: application/json"
成功响应示例:
{
"success": true,
"text": "你好,这是一个测试音频。今天天气真好,适合出去散步。",
"language": "zh",
"audio_duration": 5.23,
"model_info": {
"name": "SenseVoice-Small-ONNX-Int8",
"quantized": true
},
"error": null
}
4.2.3 使用Python客户端
你也可以编写Python代码来调用这个API服务:
import requests
import time
class SenseVoiceClient:
def __init__(self, base_url="http://localhost:8000"):
self.base_url = base_url
def transcribe_audio(self, audio_path, language="auto"):
"""上传音频并创建转录任务"""
with open(audio_path, 'rb') as f:
files = {'audio_file': f}
data = {'language': language}
response = requests.post(
f"{self.base_url}/tasks/transcribe",
files=files,
data=data
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"创建任务失败: {response.text}")
def get_task_status(self, task_id):
"""查询任务状态"""
response = requests.get(f"{self.base_url}/tasks/{task_id}/status")
return response.json()
def get_result(self, task_id, wait_for_completion=True, poll_interval=2):
"""获取任务结果,可选等待完成"""
if wait_for_completion:
while True:
status = self.get_task_status(task_id)
if status['status'] == 'completed':
break
elif status['status'] == 'failed':
raise Exception(f"任务失败: {status.get('error', '未知错误')}")
print(f"任务处理中... 进度: {status['progress']:.0%}")
time.sleep(poll_interval)
response = requests.get(f"{self.base_url}/tasks/{task_id}/result")
return response.json()
# 使用示例
if __name__ == "__main__":
client = SenseVoiceClient()
# 创建任务
task_info = client.transcribe_audio("test_audio.wav", language="auto")
task_id = task_info["task_id"]
print(f"任务已创建: {task_id}")
# 等待并获取结果
try:
result = client.get_result(task_id, wait_for_completion=True)
if result["success"]:
print(f"识别结果: {result['text']}")
print(f"音频时长: {result['audio_duration']}秒")
print(f"检测语言: {result['language']}")
else:
print(f"识别失败: {result['error']}")
except Exception as e:
print(f"获取结果时出错: {e}")
4.3 生产环境部署建议
对于生产环境,你还需要考虑以下几个方面:
- 反向代理:使用Nginx或Apache作为反向代理,处理SSL/TLS、负载均衡和静态文件服务。
Nginx配置示例:
server {
listen 80;
server_name your-domain.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket支持(如果需要)
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
- 进程管理:使用systemd或supervisor管理服务进程,确保服务在崩溃后自动重启。
systemd服务文件示例 (/etc/systemd/system/sensevoice-api.service):
[Unit]
Description=SenseVoice-Small ONNX API Service
After=network.target
[Service]
User=www-data
Group=www-data
WorkingDirectory=/path/to/your/sensevoice-api-server
Environment="PATH=/path/to/venv/bin"
ExecStart=/path/to/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
-
监控与日志:配置日志轮转,集成监控系统(如Prometheus + Grafana)监控API性能。
-
安全加固:
- 使用HTTPS(通过Let's Encrypt获取免费证书)
- 限制CORS来源
- 添加API密钥认证(如果需要)
- 设置文件上传大小限制
5. 总结
通过本教程,我们成功地将一个本地的SenseVoice-Small ONNX语音识别工具,扩展成了一个功能完备的WebAPI服务。让我们回顾一下这个过程中的关键收获:
5.1 核心成果
我们构建的API服务具有以下特点:
- 完整的RESTful接口:提供了任务创建、状态查询、结果获取的标准HTTP接口,易于集成到任何系统中。
- 异步任务处理:采用任务队列模式,支持并发处理,避免长时任务阻塞请求。
- 健壮的错误处理:完善的异常捕获和错误信息返回,让客户端能够准确了解处理状态。
- 自动资源管理:自动清理临时文件,避免磁盘空间浪费。
- 完整的监控支持:健康检查接口让运维人员能够随时了解服务状态。
- 交互式文档:FastAPI自动生成的Swagger UI文档,让API测试和调试变得非常简单。
5.2 实际应用场景
这个API服务可以轻松集成到各种应用中:
- 内容创作平台:为用户提供语音转文字功能,快速生成视频字幕、会议纪要
- 在线教育系统:将授课音频自动转为文字,方便学生复习和搜索
- 客服系统:分析客服通话录音,提取关键信息,进行质量检查
- 智能家居:通过语音指令控制设备,将语音转为文本指令
- 移动应用:为App添加语音输入功能,提升用户体验
5.3 后续优化方向
虽然我们已经构建了一个可用的API服务,但还有不少可以继续优化的地方:
- 性能优化:
- 添加模型预热机制,减少首次推理延迟
- 实现请求批处理,提升GPU利用率
- 添加结果缓存,对相同音频文件避免重复处理
- 功能增强:
- 支持实时流式语音识别(WebSocket接口)
- 添加更多音频预处理选项(降噪、音量标准化)
- 支持自定义词汇表,提升特定领域术语识别准确率
- 添加语音活动检测(VAD)参数调节
- 运维改进:
- 添加Prometheus指标导出,监控QPS、延迟、错误率等
- 实现灰度发布和A/B测试支持
- 添加请求限流和配额管理
- 用户体验:
- 提供更多客户端SDK(JavaScript、Java、Go等)
- 添加Web界面,让非技术人员也能轻松使用
- 支持任务进度WebSocket推送,避免客户端轮询
5.4 开始使用
现在,你已经拥有了一个功能完整的语音识别API服务。你可以:
- 直接使用:按照第4部分的指南部署服务,立即开始使用
- 二次开发:基于现有代码,添加你需要的特定功能
- 集成测试:将API集成到你的应用中,体验语音识别的便利
这个项目展示了如何将一个本地AI工具"服务化"的完整过程。同样的思路也可以应用到其他AI模型上,无论是图像识别、自然语言处理还是其他机器学习模型。希望这个教程对你有所帮助,祝你在AI应用开发的道路上越走越远!
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐
所有评论(0)