SenseVoice-small-onnx语音识别实战:Python批量处理1000+音频文件

1. 引言

你有没有遇到过这样的场景?手头积压了几百甚至上千个音频文件,可能是会议录音、访谈记录、播客内容,或者是用户上传的语音反馈。一个个点开听、手动转成文字,不仅耗时耗力,还容易出错。如果这些音频里还混杂着中文、英文、粤语,那工作量更是让人头疼。

今天,我们就来解决这个痛点。我将带你用 SenseVoice-small-onnx 这个轻量高效的语音识别模型,写一个Python脚本,一次性自动处理成百上千个音频文件。这个模型最大的特点就是“小而快”——它经过了ONNX格式的量化优化,体积小巧(约230M),但识别能力一点不弱,支持包括中文、粤语、英语、日语、韩语在内的50多种语言,还能自动检测。

想象一下,原本需要几天才能完成的工作,现在可能一杯咖啡的时间就搞定了。这篇文章就是你的实战手册,我会从环境搭建、脚本编写,到批量处理、结果整理,一步步带你走完整个流程。即使你之前没怎么接触过语音识别,跟着做也能轻松上手。

2. 准备工作:环境与模型

在开始写批量处理的脚本之前,我们需要先把“工具”准备好。这里主要分两步:安装必要的Python库,以及确保语音识别模型就位。

2.1 安装Python依赖

SenseVoice-small-onnx模型主要通过 funasr-onnx 这个库来调用。我们还需要一些辅助库来处理音频文件和构建简单的Web界面(可选)。打开你的终端或命令行,执行以下命令:

pip install funasr-onnx soundfile

简单解释一下:

  • funasr-onnx: 核心库,提供了调用ONNX格式SenseVoice模型的接口。
  • soundfile: 一个非常常用的音频文件读写库,支持wav, flac, ogg等多种格式。

如果你想体验模型自带的Web界面或API服务,可以安装更完整的依赖:

pip install funasr-onnx gradio fastapi uvicorn soundfile jieba

不过对于纯粹的批量处理脚本,我们只需要前两个就足够了。

2.2 获取与确认模型

根据你提供的资料,模型已经预置在了一个特定路径。在批量处理脚本中,我们需要告诉程序模型在哪里。

模型路径通常是:/root/ai-models/danieldong/sensevoice-small-onnx-quant

关键点:这个路径下应该有一个名为 model_quant.onnx 的主模型文件。你的脚本能否成功运行,第一步就是确认这个路径和文件是否存在。

如果你是在其他环境(比如你自己的电脑),可能需要先下载模型。funasr-onnx 库通常支持自动下载,但指定一个明确的缓存路径是更稳妥的做法。

3. 核心脚本:批量识别引擎

现在,我们来构建批量处理的核心。我会把脚本分成几个部分,并详细解释每一块的作用。

3.1 脚本框架与思路

整个脚本的流程其实很清晰:

  1. 扫描:找到指定文件夹里所有的音频文件。
  2. 加载:初始化语音识别模型。
  3. 循环处理:对每一个音频文件,调用模型进行转写。
  4. 保存结果:把识别出来的文本,按照一定格式(比如和音频文件同名的txt文件)保存下来。
  5. 汇总报告:最后生成一个简单的处理日志,告诉我们成功了几个,失败了几个。

下面就是这个思路的代码实现:

import os
from pathlib import Path
import soundfile as sf
from funasr_onnx import SenseVoiceSmall
import time
import json

class BatchAudioTranscriber:
    """
    批量音频转写器
    核心功能:遍历文件夹,识别所有音频文件的内容并保存为文本
    """
    def __init__(self, model_dir, batch_size=4, language="auto", use_itn=True):
        """
        初始化转写器
        :param model_dir: ONNX量化模型所在的目录路径
        :param batch_size: 批处理大小,同时处理多个文件可提升效率
        :param language: 识别语言,'auto'为自动检测
        :param use_itn: 是否启用逆文本正则化(如将“三”转为“3”)
        """
        self.model_dir = model_dir
        self.batch_size = batch_size
        self.language = language
        self.use_itn = use_itn
        self.model = None
        self.supported_extensions = {'.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac'}

    def initialize_model(self):
        """初始化语音识别模型"""
        print(f"[INFO] 正在加载模型,路径: {self.model_dir}")
        try:
            self.model = SenseVoiceSmall(
                model_dir=self.model_dir,
                batch_size=self.batch_size,
                quantize=True  # 使用量化模型
            )
            print("[INFO] 模型加载成功!")
        except Exception as e:
            print(f"[ERROR] 模型加载失败: {e}")
            raise

    def is_audio_file(self, file_path):
        """检查文件是否为支持的音频格式"""
        return Path(file_path).suffix.lower() in self.supported_extensions

    def find_audio_files(self, input_dir):
        """递归查找输入目录下的所有音频文件"""
        input_path = Path(input_dir)
        if not input_path.exists():
            raise FileNotFoundError(f"输入目录不存在: {input_dir}")

        audio_files = []
        for ext in self.supported_extensions:
            audio_files.extend(input_path.rglob(f"*{ext}"))
        # 去重并转换为字符串路径
        audio_files = list(set([str(f) for f in audio_files]))
        audio_files.sort()  # 排序,保证处理顺序一致
        return audio_files

    def transcribe_audio(self, audio_path):
        """
        转写单个音频文件
        :return: 识别文本,如果失败则返回None
        """
        try:
            # 检查音频文件是否可读
            if not os.path.exists(audio_path):
                print(f"[WARN] 文件不存在: {audio_path}")
                return None

            # 调用模型进行识别
            # 注意:模型接收一个文件路径列表,这里我们传入单个文件的列表
            results = self.model([audio_path], language=self.language, use_itn=self.use_itn)
            
            if results and len(results) > 0:
                return results[0]  # 返回第一个(也是唯一一个)结果
            else:
                print(f"[WARN] 未识别到有效内容: {audio_path}")
                return ""
                
        except Exception as e:
            print(f"[ERROR] 处理文件失败 {audio_path}: {e}")
            return None

    def save_transcription(self, audio_path, text, output_dir):
        """将识别文本保存到文件"""
        audio_name = Path(audio_path).stem  # 获取文件名(不含扩展名)
        output_path = Path(output_dir) / f"{audio_name}.txt"
        
        # 确保输出目录存在
        output_path.parent.mkdir(parents=True, exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(text)
        return str(output_path)

    def process_batch(self, input_dir, output_dir):
        """
        批量处理主函数
        :param input_dir: 音频文件所在目录
        :param output_dir: 文本输出目录
        :return: 处理结果统计
        """
        print("=" * 50)
        print("开始批量音频转写任务")
        print(f"输入目录: {input_dir}")
        print(f"输出目录: {output_dir}")
        print(f"语言设置: {self.language}")
        print("=" * 50)
        
        # 初始化模型
        self.initialize_model()
        
        # 查找所有音频文件
        audio_files = self.find_audio_files(input_dir)
        total_files = len(audio_files)
        
        if total_files == 0:
            print("[INFO] 未找到任何支持的音频文件,任务结束。")
            return {"total": 0, "success": 0, "failed": 0, "skipped": 0}
        
        print(f"[INFO] 共找到 {total_files} 个音频文件")
        
        # 准备输出目录
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        
        # 统计变量
        success_count = 0
        failed_count = 0
        skipped_count = 0
        results = []
        
        # 开始批量处理
        start_time = time.time()
        
        for i, audio_file in enumerate(audio_files, 1):
            print(f"\n[{i}/{total_files}] 处理中: {Path(audio_file).name}")
            
            # 转写音频
            text = self.transcribe_audio(audio_file)
            
            if text is None:
                # 处理失败
                failed_count += 1
                results.append({
                    "file": audio_file,
                    "status": "failed",
                    "text": None
                })
                print(f"  -> 处理失败")
                
            elif text == "":
                # 空结果(可能是静音或无法识别)
                skipped_count += 1
                results.append({
                    "file": audio_file,
                    "status": "skipped",
                    "text": ""
                })
                print(f"  -> 无有效内容,已跳过")
                
            else:
                # 处理成功
                output_file = self.save_transcription(audio_file, text, output_dir)
                success_count += 1
                results.append({
                    "file": audio_file,
                    "status": "success",
                    "text": text[:100] + "..." if len(text) > 100 else text,  # 预览
                    "output": output_file
                })
                print(f"  -> 识别成功,已保存至: {Path(output_file).name}")
                print(f"     内容预览: {text[:80]}...")
        
        # 计算耗时
        elapsed_time = time.time() - start_time
        
        # 生成统计报告
        report = {
            "total_files": total_files,
            "success": success_count,
            "failed": failed_count,
            "skipped": skipped_count,
            "time_elapsed": round(elapsed_time, 2),
            "avg_time_per_file": round(elapsed_time / total_files, 2) if total_files > 0 else 0,
            "output_dir": output_dir,
            "language": self.language,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        
        # 保存详细报告
        report_file = Path(output_dir) / "transcription_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump({
                "summary": report,
                "details": results
            }, f, ensure_ascii=False, indent=2)
        
        # 打印总结
        print("\n" + "=" * 50)
        print("批量转写任务完成!")
        print(f"总计处理: {total_files} 个文件")
        print(f"成功: {success_count} 个")
        print(f"失败: {failed_count} 个")
        print(f"跳过: {skipped_count} 个")
        print(f"总耗时: {elapsed_time:.2f} 秒")
        print(f"平均每个文件: {report['avg_time_per_file']:.2f} 秒")
        print(f"详细报告已保存至: {report_file}")
        print("=" * 50)
        
        return report

3.2 如何使用这个脚本

把上面的代码保存为一个文件,比如叫做 batch_transcribe.py。然后,在同一目录下创建一个 main.py 来调用它:

#!/usr/bin/env python3
"""
批量语音识别主程序
用法:python main.py --input 音频目录 --output 结果目录
"""

import argparse
from batch_transcribe import BatchAudioTranscriber

def main():
    parser = argparse.ArgumentParser(description='批量语音识别工具')
    parser.add_argument('--input', '-i', required=True, help='音频文件所在目录')
    parser.add_argument('--output', '-o', default='./transcription_results', help='文本输出目录')
    parser.add_argument('--model_dir', '-m', default='/root/ai-models/danieldong/sensevoice-small-onnx-quant',
                       help='ONNX模型目录路径')
    parser.add_argument('--language', '-l', default='auto', help='识别语言,如 zh, en, yue, auto')
    parser.add_argument('--batch_size', '-b', type=int, default=4, help='批处理大小')
    
    args = parser.parse_args()
    
    # 创建转写器实例
    transcriber = BatchAudioTranscriber(
        model_dir=args.model_dir,
        batch_size=args.batch_size,
        language=args.language,
        use_itn=True
    )
    
    # 开始批量处理
    report = transcriber.process_batch(args.input, args.output)
    
    # 根据结果给出提示
    if report['success'] == 0 and report['total_files'] > 0:
        print("\n⚠️  所有文件处理均失败,请检查:")
        print("   1. 模型路径是否正确")
        print("   2. 音频文件格式是否支持")
        print("   3. 音频文件是否损坏")

if __name__ == "__main__":
    main()

现在,打开终端,进入脚本所在目录,就可以运行了:

# 基本用法:处理当前目录下的audio_files文件夹
python main.py --input ./audio_files --output ./results

# 指定语言为中文
python main.py --input ./meetings --output ./meeting_texts --language zh

# 指定模型路径和批处理大小
python main.py -i ./podcasts -o ./transcripts -m /path/to/your/model -b 8

运行后,你会看到实时的处理进度,每个文件识别成功后还会预览前80个字符。所有结果都会保存到指定的输出目录,每个音频文件对应一个同名的 .txt 文件。

4. 实战技巧与优化建议

脚本能跑起来只是第一步,要高效稳定地处理大量文件,还需要一些技巧。

4.1 处理不同类型的音频

现实中的音频文件可能五花八门。我们的脚本虽然支持常见格式,但还有几点要注意:

  • 长音频处理:如果单个音频文件很长(比如超过30分钟),可以考虑先将其分割成小段。模型对短音频的识别效率和准确率通常更高。
  • 音频质量:背景噪音过大、音量过低或过高的音频,识别效果会打折扣。对于重要的批量任务,可以先用小样本测试一下。
  • 采样率funasr-onnx 模型通常期望16kHz采样率的音频。如果您的音频是其他采样率,soundfile 库在读取时可能会自动重采样,但最稳妥的方式是使用 librosapydub 进行统一的预处理。

4.2 性能优化点

当你需要处理成千上万个文件时,效率很重要。

  1. 调整 batch_size:这是最重要的参数。SenseVoiceSmall 初始化时的 batch_size 决定了模型一次能处理多少个音频。如果你的显卡内存足够(比如有8G以上显存),可以适当调大(如8或16)。对于纯CPU环境,保持较小的值(如2或4)更稳定。
  2. 文件I/O优化:脚本是顺序处理文件的。如果文件存储在慢速硬盘(如机械硬盘)上,I/O可能成为瓶颈。可以考虑将待处理的音频文件先复制到SSD上进行处理。
  3. 错误处理与重试:网络波动或临时性错误可能导致个别文件处理失败。可以在 transcribe_audio 方法中添加简单的重试逻辑。
  4. 并发处理(高级):对于极大量的文件,可以使用Python的 concurrent.futures 模块实现多进程处理,将文件列表拆分,同时运行多个处理实例。但要注意模型本身可能不是线程安全的,通常建议用多进程而非多线程。

4.3 结果后处理

识别出来的原始文本,可能还需要进一步加工才能用。

  • 标点与分段:SenseVoice模型本身已经包含了较好的标点预测。如果觉得段落过长,可以基于句号、问号等标点进行简单分段。
  • 说话人分离:目前的脚本没有区分说话人。如果音频中有多人对话,并且需要区分,你需要使用支持“说话人日志”功能的模型,这比单纯的语音识别更复杂一些。
  • 关键词过滤与提取:对于会议记录,你可能想提取“决议”、“待办”等关键词附近的句子。这可以在保存文本后,用另一个脚本进行正则匹配或简单的NLP处理来实现。
  • 生成结构化数据:你可以修改 save_transcription 方法,将结果保存为CSV或JSON格式,方便导入数据库或Excel进行分析。

5. 总结

我们来回顾一下今天完成的事情。我们利用 SenseVoice-small-onnx 这个轻量且支持多语言的语音识别模型,构建了一个完整的、健壮的Python批量处理工具。这个工具能够:

  1. 自动遍历文件夹,找到所有支持的音频文件(wav, mp3, flac等)。
  2. 高效调用模型,进行准确的语音转文字,并支持中文、粤语、英语等多语言自动识别。
  3. 智能保存结果,每个音频生成一个对应的文本文件,并保留原始文件名,方便对照。
  4. 生成处理报告,详细记录每个文件的状态、识别预览和整体统计信息,让你对处理结果一目了然。

这个脚本的价值在于将重复劳动自动化。无论是处理客户服务录音、整理内部会议纪要,还是为视频内容生成字幕,它都能节省你大量的时间和精力。模型本身的量化版本速度很快,在普通的CPU机器上也能获得不错的速度,而如果你有GPU,批处理(batch_size)更能让效率飞起。

你可以以今天的脚本为基础,根据自己具体的业务需求进行扩展,比如添加音频预处理降噪、集成说话人识别、或者将结果直接推送至你的笔记软件或知识库。语音识别的技术已经非常成熟,让它为你高效工作的关键,就是这样一个能够贴合你工作流的自动化脚本。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐