Qwen3-ASR-0.6B使用技巧:如何提高语音识别准确率

你是不是也遇到过这种情况:用语音识别工具转写会议录音,结果发现识别出来的文字错漏百出,特别是人名、专业术语,还有带点口音的发言,简直没法看。或者想用语音识别做个字幕,结果时间轴对不上,文字也乱七八糟。

我最近在做一个多语言会议记录的项目,试了好几个语音识别模型,最后发现Qwen3-ASR-0.6B在准确率上确实有优势,特别是对中文方言和英语各种口音的支持。但再好的模型,如果不会用,效果也会大打折扣。经过一个多月的实际使用和调试,我总结出了一套提升识别准确率的实用技巧,今天全部分享给你。

1. 理解影响识别准确率的关键因素

在开始调优之前,我们得先搞清楚,到底是什么在影响语音识别的准确率。很多人一上来就调模型参数,其实方向可能就错了。

语音识别准确率主要受四个因素影响,我把它们叫做“识别四要素”:

音频质量:这是最基础也是最重要的。如果音频本身噪音大、声音小、有回声,再好的模型也识别不准。就像你让一个听力再好的人,在嘈杂的菜市场里听人说话,他也听不清。

说话人特征:每个人的声音都不一样。有的人说话快,有的人说话慢;有的人普通话标准,有的人带口音;有的人声音洪亮,有的人声音低沉。这些都会影响识别效果。

语言和内容:Qwen3-ASR-0.6B支持52种语言和方言,但不同语言的识别难度不一样。中文和英文的识别准确率通常比较高,一些小语种或者方言可能就需要特别处理。另外,专业术语、人名、地名这些专有名词,模型可能没见过,容易识别错。

模型设置:最后才是模型本身的设置。比如采样率、语言检测模式、batch size这些参数,调好了能提升效果,但前提是前面三个因素都处理好了。

我刚开始用的时候,犯过一个错误:拿到一段质量很差的会议录音,直接扔给模型,然后抱怨识别不准。后来才发现,问题出在音频本身,不是模型不行。

2. 音频预处理:让模型“听”得更清楚

音频预处理就像给模型戴上一副好耳机,让它能听清楚你在说什么。这一步做得好,识别准确率能提升30%以上。

2.1 音频格式和采样率处理

Qwen3-ASR-0.6B对音频格式有一定要求,虽然它支持wav、mp3、flac等多种格式,但为了最好的效果,我建议统一转换成wav格式,采样率设为16000Hz。

import librosa
import soundfile as sf
import numpy as np

def preprocess_audio(input_path, output_path=None):
    """
    音频预处理函数
    将任意格式的音频转换为模型友好的格式
    """
    if output_path is None:
        output_path = input_path.replace('.mp3', '_processed.wav').replace('.m4a', '_processed.wav')
    
    try:
        # 读取音频,保持原始采样率
        audio, sr = librosa.load(input_path, sr=None, mono=True)
        
        print(f"原始音频信息 - 采样率: {sr}Hz, 时长: {len(audio)/sr:.2f}秒, 声道数: {'单声道' if len(audio.shape) == 1 else '立体声'}")
        
        # 如果是立体声,转换为单声道(取平均值)
        if len(audio.shape) > 1:
            audio = np.mean(audio, axis=1)
            print("已转换为单声道")
        
        # 重采样到16000Hz(模型推荐采样率)
        if sr != 16000:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
            sr = 16000
            print(f"已重采样到{sr}Hz")
        
        # 标准化音量(避免声音太小或太大)
        max_amplitude = np.max(np.abs(audio))
        if max_amplitude > 0:
            audio = audio / max_amplitude * 0.9  # 缩放到90%的最大音量
            print("已标准化音量")
        
        # 保存为wav格式
        sf.write(output_path, audio, sr, subtype='PCM_16')
        print(f"处理完成,保存到: {output_path}")
        
        return output_path, audio, sr
        
    except Exception as e:
        print(f"音频处理失败: {e}")
        return None, None, None

# 使用示例
processed_file, audio_data, sample_rate = preprocess_audio("你的音频文件.mp3")

这个预处理函数做了几件事:统一转换成单声道、重采样到16000Hz、标准化音量。特别是音量标准化,很多人会忽略这一点。如果音频声音太小,模型可能听不清;声音太大,又可能产生削波失真。

2.2 降噪处理

背景噪音是语音识别的大敌。会议室的环境噪音、键盘敲击声、空调声,都会干扰识别。这里我推荐一个简单有效的降噪方法:

import noisereduce as nr
import soundfile as sf

def reduce_noise(audio_path, output_path=None):
    """
    降低音频背景噪音
    """
    if output_path is None:
        output_path = audio_path.replace('.wav', '_denoised.wav')
    
    try:
        # 读取音频
        audio, sr = sf.read(audio_path)
        
        # 如果音频太长,只取前5秒作为噪音样本
        # 假设前5秒是纯噪音(比如会议开始前的安静时刻)
        noise_sample_duration = 5  # 秒
        noise_samples = int(noise_sample_duration * sr)
        
        if len(audio) > noise_samples:
            # 取前5秒作为噪音样本
            noise_clip = audio[:noise_samples]
            
            # 使用noisereduce降噪
            reduced_noise = nr.reduce_noise(
                y=audio,
                sr=sr,
                y_noise=noise_clip,
                prop_decrease=0.8,  # 降噪强度,0.8表示降低80%的噪音
                stationary=True
            )
            
            # 保存降噪后的音频
            sf.write(output_path, reduced_noise, sr)
            print(f"降噪完成,保存到: {output_path}")
            
            return output_path
        else:
            print("音频太短,无法提取噪音样本")
            return audio_path
            
    except Exception as e:
        print(f"降噪失败: {e}")
        return audio_path

# 使用前需要安装noisereduce
# pip install noisereduce

这个方法的关键是找到一个纯噪音的片段作为样本。在会议录音中,通常会议开始前的那几秒比较安静,只有环境噪音,正好可以用来做噪音样本。

2.3 语音增强

对于声音特别小或者说话人距离麦克风比较远的录音,可以用语音增强技术:

def enhance_speech(audio_path, output_path=None):
    """
    语音增强,提升语音清晰度
    """
    if output_path is None:
        output_path = audio_path.replace('.wav', '_enhanced.wav')
    
    try:
        audio, sr = sf.read(audio_path)
        
        # 方法1:动态范围压缩(让小声变大,大声变小)
        # 这是一个简化的实现
        compressed = np.tanh(audio * 2) * 0.5  # 简单的压缩函数
        
        # 方法2:高频增强(提升语音清晰度)
        # 使用一个简单的高通滤波器
        from scipy import signal
        b, a = signal.butter(4, 100/(sr/2), btype='high')  # 100Hz高通滤波
        enhanced = signal.filtfilt(b, a, compressed)
        
        # 方法3:音量归一化
        enhanced = enhanced / np.max(np.abs(enhanced)) * 0.8
        
        sf.write(output_path, enhanced, sr)
        print(f"语音增强完成,保存到: {output_path}")
        
        return output_path
        
    except Exception as e:
        print(f"语音增强失败: {e}")
        return audio_path

语音增强要适度,过度处理反而会引入失真。我建议先试听一下处理后的效果,确保语音听起来自然。

3. 模型参数调优:让Qwen3-ASR发挥最佳性能

音频处理好之后,接下来就是调整模型参数了。Qwen3-ASR-0.6B提供了不少可调参数,但并不是所有参数都需要调。我根据实际使用经验,总结出了几个最关键参数。

3.1 语言设置:自动检测 vs 手动指定

Qwen3-ASR-0.6B支持自动语言检测,但在某些情况下,手动指定语言效果更好。

from qwen_asr import Qwen3ASRModel
import torch

# 初始化模型
model = Qwen3ASRModel.from_pretrained(
    "Qwen/Qwen3-ASR-0.6B",
    dtype=torch.float16,
    device_map="cuda:0" if torch.cuda.is_available() else "cpu",
)

# 场景1:明确知道语言 - 手动指定
# 比如你知道这段音频是中文普通话
result_zh = model.transcribe(
    audio="中文音频.wav",
    language="zh",  # 明确指定中文
)
print(f"手动指定中文: {result_zh[0].text}")

# 场景2:混合语言 - 让模型自动检测
# 比如中英文混合的会议
result_auto = model.transcribe(
    audio="中英文混合音频.wav",
    language=None,  # 自动检测
)
print(f"自动检测: {result_auto[0].text} (语言: {result_auto[0].language})")

# 场景3:方言识别
# 如果需要识别方言,可以指定具体的方言代码
result_dialect = model.transcribe(
    audio="粤语音频.wav",
    language="yue",  # 粤语
)
print(f"粤语识别: {result_dialect[0].text}")

什么时候该用自动检测,什么时候该手动指定?我的经验是:

  • 如果音频是单一语言,而且你知道是什么语言,手动指定准确率更高
  • 如果音频中有多种语言混合,或者你不确定是什么语言,用自动检测
  • 对于方言,如果模型支持该方言,手动指定方言代码效果更好

Qwen3-ASR支持的语言代码可以在官方文档里找到,常见的有:

  • zh: 中文普通话
  • en: 英语
  • yue: 粤语
  • ja: 日语
  • ko: 韩语

3.2 调整识别参数

除了语言设置,还有几个参数对识别效果影响很大:

def transcribe_with_optimized_params(audio_path, language=None):
    """
    使用优化参数进行语音识别
    """
    # 加载模型时调整参数
    model = Qwen3ASRModel.from_pretrained(
        "Qwen/Qwen3-ASR-0.6B",
        dtype=torch.float16,
        device_map="cuda:0" if torch.cuda.is_available() else "cpu",
        
        # 关键参数调整
        max_inference_batch_size=4,  # 批处理大小,根据GPU内存调整
        max_new_tokens=512,  # 最大生成token数,对于长音频可以调大
        temperature=0.1,  # 温度参数,越低结果越确定
        repetition_penalty=1.1,  # 重复惩罚,避免重复词语
    )
    
    # 转录时调整参数
    results = model.transcribe(
        audio=audio_path,
        language=language,
        
        # 音频相关参数
        sample_rate=16000,  # 确保和音频采样率一致
        chunk_length_s=30,  # 分块长度,长音频需要分块处理
        stride_length_s=5,  # 分块重叠,避免边界处识别错误
        
        # 解码参数
        beam_size=5,  # beam search大小,越大越准但越慢
        patience=1.0,  # beam search耐心值
        length_penalty=1.0,  # 长度惩罚
    )
    
    return results

# 参数说明:
# 1. max_inference_batch_size: 如果你的GPU内存小(比如8GB),设为2或4
# 2. chunk_length_s: 对于超过30秒的音频,分块处理效果更好
# 3. stride_length_s: 分块重叠,避免在分块边界处丢失内容
# 4. beam_size: 如果对准确率要求高,可以设为5或10,但速度会变慢

这里重点说一下chunk_length_sstride_length_s这两个参数。对于长音频(比如1小时的会议录音),直接整段识别效果可能不好。分块处理,每块30秒,块与块之间重叠5秒,这样既能处理长音频,又不会在分块边界处丢失内容。

3.3 处理长音频的最佳实践

长音频的识别需要特别处理,我总结了一个完整的长音频处理流程:

import os
from pydub import AudioSegment
import tempfile

def transcribe_long_audio(audio_path, language=None, chunk_duration=30, overlap=5):
    """
    长音频转录优化方案
    """
    # 1. 加载音频
    audio = AudioSegment.from_file(audio_path)
    duration_ms = len(audio)
    chunk_ms = chunk_duration * 1000
    overlap_ms = overlap * 1000
    
    print(f"音频总时长: {duration_ms/1000:.1f}秒")
    print(f"分块处理: 每块{chunk_duration}秒, 重叠{overlap}秒")
    
    # 2. 初始化模型
    model = Qwen3ASRModel.from_pretrained(
        "Qwen/Qwen3-ASR-0.6B",
        dtype=torch.float16,
        device_map="cuda:0" if torch.cuda.is_available() else "cpu",
        max_inference_batch_size=2,  # 长音频用小batch size
    )
    
    # 3. 分块处理
    all_texts = []
    start_ms = 0
    
    with tempfile.TemporaryDirectory() as temp_dir:
        while start_ms < duration_ms:
            # 计算当前块的结束时间(考虑重叠)
            end_ms = min(start_ms + chunk_ms + overlap_ms, duration_ms)
            
            # 提取音频块
            chunk = audio[start_ms:end_ms]
            
            # 保存临时文件
            chunk_path = os.path.join(temp_dir, f"chunk_{start_ms//1000}.wav")
            chunk.export(chunk_path, format="wav")
            
            print(f"处理块 {start_ms//1000}-{end_ms//1000}秒...")
            
            # 转录当前块
            try:
                results = model.transcribe(
                    audio=chunk_path,
                    language=language,
                    chunk_length_s=chunk_duration,
                    stride_length_s=overlap,
                )
                
                if results and len(results) > 0:
                    chunk_text = results[0].text
                    all_texts.append({
                        'start': start_ms/1000,
                        'end': end_ms/1000,
                        'text': chunk_text
                    })
                    print(f"  识别结果: {chunk_text[:50]}...")
                else:
                    print(f"  块 {start_ms//1000}-{end_ms//1000}秒识别失败")
                    
            except Exception as e:
                print(f"  处理失败: {e}")
            
            # 移动到下一块(减去重叠部分)
            start_ms += chunk_ms
    
    # 4. 合并结果(简单的重叠处理)
    full_text = ""
    for i, chunk in enumerate(all_texts):
        if i == 0:
            full_text += chunk['text']
        else:
            # 对于重叠部分,取后一段的内容
            # 这里可以更智能地处理重叠,比如根据置信度选择
            full_text += chunk['text']
    
    print(f"\n转录完成,总文本长度: {len(full_text)}字符")
    return full_text, all_texts

# 使用示例
long_audio_path = "长时间会议录音.wav"
full_text, chunks = transcribe_long_audio(long_audio_path, language="zh")

这个长音频处理方案有几个优点:分块处理避免内存不足、重叠分块避免边界错误、保存时间戳方便后续处理。

4. 后处理技巧:提升识别文本的质量

模型识别出来的文本,往往还需要一些后处理才能用。特别是标点符号、数字格式、专有名词这些,模型可能处理得不够好。

4.1 自动添加标点

语音识别出来的文本通常没有标点,或者标点位置不对。我们可以用规则或者小模型来添加标点:

import re

def add_punctuation(text):
    """
    给识别文本添加标点符号(基于规则的方法)
    """
    if not text:
        return text
    
    # 1. 处理常见的句子结尾
    # 在"吗、呢、吧、啊"等疑问词后加问号
    text = re.sub(r'([吗呢吧啊])([^?。!])', r'\1?\2', text)
    
    # 2. 根据关键词添加句号
    # 在"所以、但是、然后、因此"等词前加句号
    sentence_enders = ['所以', '但是', '然后', '因此', '另外', '同时', '最后']
    for word in sentence_enders:
        pattern = rf'([^。!?]){word}'
        text = re.sub(pattern, rf'\1。{word}', text)
    
    # 3. 处理数字和单位
    # 在数字和单位之间加空格,如"100元" -> "100 元"
    text = re.sub(r'(\d)([元个件张本])', r'\1 \2', text)
    
    # 4. 处理常见的专有名词
    # 公司名、产品名等(这里只是示例,实际需要根据领域定制)
    proper_nouns = {
        'Qwen': 'Qwen',
        'ASR': 'ASR',
        'AI': 'AI',
        'GPU': 'GPU',
        'CPU': 'CPU',
    }
    
    for word, replacement in proper_nouns.items():
        text = text.replace(word.lower(), replacement)
    
    # 5. 确保文本以标点结尾
    if text and text[-1] not in ['。', '!', '?', '.', '!', '?']:
        text += '。'
    
    return text

# 使用更智能的标点恢复模型(如果有的话)
def add_punctuation_with_model(text):
    """
    使用预训练模型添加标点(如果有相关模型)
    """
    # 这里可以集成一个标点恢复模型
    # 比如使用BERT等模型进行标点预测
    # 由于篇幅限制,这里只展示思路
    
    # 如果安装了相关库,可以这样用:
    # from punctuator import Punctuator
    # p = Punctuator('model.pcl')
    # return p.punctuate(text)
    
    # 暂时用规则方法
    return add_punctuation(text)

# 使用示例
raw_text = "今天天气真好我们一起去公园吧你想去吗我想去但是我要先完成工作"
punctuated = add_punctuation(raw_text)
print(f"原始文本: {raw_text}")
print(f"加标点后: {punctuated}")
# 输出: 今天天气真好。我们一起去公园吧?你想去吗?我想去,但是我要先完成工作。

4.2 数字和单位规范化

语音识别中,数字和单位经常识别错误。比如"一百"可能识别成"1百","3点5"可能识别成"3.5"。

def normalize_numbers(text):
    """
    规范化数字格式
    """
    if not text:
        return text
    
    # 中文数字转阿拉伯数字
    chinese_numbers = {
        '零': '0', '一': '1', '二': '2', '两': '2', '三': '3', '四': '4',
        '五': '5', '六': '6', '七': '7', '八': '8', '九': '9', '十': '10',
        '百': '100', '千': '1000', '万': '10000', '亿': '100000000'
    }
    
    # 简单的数字转换(实际项目可能需要更复杂的逻辑)
    for cn, num in chinese_numbers.items():
        text = text.replace(cn, num)
    
    # 规范化小数点和千分位
    text = re.sub(r'(\d)点(\d)', r'\1.\2', text)  # "3点5" -> "3.5"
    text = re.sub(r'(\d)点', r'\1.', text)  # "3点" -> "3."
    
    # 规范化百分比
    text = re.sub(r'(\d+)百分之', r'\1%', text)
    text = re.sub(r'百分之(\d+)', r'\1%', text)
    
    # 规范化货币
    text = re.sub(r'(\d+)块钱', r'\1元', text)
    text = re.sub(r'(\d+)块', r'\1元', text)
    
    return text

def normalize_units(text):
    """
    规范化单位
    """
    if not text:
        return text
    
    # 常见单位规范化
    unit_mapping = {
        'g b': 'GB',
        'm b': 'MB',
        'k b': 'KB',
        'g h z': 'GHz',
        'm h z': 'MHz',
        'k m': 'km',
        'c m': 'cm',
        'm m': 'mm',
    }
    
    for wrong, correct in unit_mapping.items():
        text = text.replace(wrong, correct)
    
    # 时间单位
    text = re.sub(r'(\d+)个小时', r'\1小时', text)
    text = re.sub(r'(\d+)个分钟', r'\1分钟', text)
    
    return text

# 使用示例
raw_text = "这个手机有八GB内存CPU频率是三点五GHz价格一千二百元"
normalized = normalize_numbers(raw_text)
normalized = normalize_units(normalized)
print(f"原始: {raw_text}")
print(f"规范化后: {normalized}")
# 输出: 这个手机有8GB内存CPU频率是3.5GHz价格1200元

4.3 领域术语校正

如果你在特定领域使用语音识别(比如医疗、法律、技术),领域术语的识别准确率可能不高。这时候需要建立一个术语库来校正。

class DomainTermCorrector:
    def __init__(self, term_file=None):
        """
        领域术语校正器
        term_file: 术语表文件,每行"错误术语|正确术语"
        """
        self.term_map = {}
        
        # 内置一些常见错误
        self.common_errors = {
            '语音识别': '语音识别',
            '人工智能': '人工智能',
            '机器学习': '机器学习',
            '深度学习': '深度学习',
            '神经网络': '神经网络',
        }
        
        self.term_map.update(self.common_errors)
        
        # 从文件加载术语表
        if term_file and os.path.exists(term_file):
            with open(term_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line and '|' in line:
                        wrong, correct = line.split('|', 1)
                        self.term_map[wrong.strip()] = correct.strip()
    
    def add_term(self, wrong, correct):
        """添加术语对"""
        self.term_map[wrong] = correct
    
    def correct_text(self, text):
        """校正文本中的术语"""
        if not text:
            return text
        
        # 按术语长度排序,先匹配长术语
        sorted_terms = sorted(self.term_map.items(), key=lambda x: len(x[0]), reverse=True)
        
        for wrong, correct in sorted_terms:
            if wrong in text:
                text = text.replace(wrong, correct)
        
        return text
    
    def build_from_transcripts(self, transcripts, correct_versions):
        """
        从转录文本和正确版本自动构建术语表
        transcripts: 识别出的文本列表
        correct_versions: 对应的正确文本列表
        """
        from difflib import SequenceMatcher
        
        for trans, correct in zip(transcripts, correct_versions):
            # 使用difflib找出差异部分
            matcher = SequenceMatcher(None, trans, correct)
            
            for tag, i1, i2, j1, j2 in matcher.get_opcodes():
                if tag == 'replace':  # 替换操作,可能是术语错误
                    wrong_term = trans[i1:i2]
                    right_term = correct[j1:j2]
                    
                    # 只处理长度适中的差异(避免单个字的差异)
                    if 2 <= len(wrong_term) <= 20 and 2 <= len(right_term) <= 20:
                        self.add_term(wrong_term, right_term)

# 使用示例
corrector = DomainTermCorrector()

# 添加一些技术术语
corrector.add_term('qwen', 'Qwen')
corrector.add_term('asr', 'ASR')
corrector.add_term('gpu', 'GPU')
corrector.add_term('transformer', 'Transformer')

# 校正文本
raw_text = "我们使用qwen asr模型在gpu上进行推理"
corrected = corrector.correct_text(raw_text)
print(f"原始: {raw_text}")
print(f"校正后: {corrected}")
# 输出: 我们使用Qwen ASR模型在GPU上进行推理

5. 针对特定场景的优化策略

不同的使用场景,优化策略也不一样。我根据实际项目经验,总结了几种常见场景的优化方案。

5.1 会议录音转写

会议录音的特点是:多人说话、有重叠、有背景噪音、有专业术语。

class MeetingTranscriber:
    def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B"):
        """会议转录优化器"""
        self.model = Qwen3ASRModel.from_pretrained(
            model_name,
            dtype=torch.float16,
            device_map="cuda:0" if torch.cuda.is_available() else "cpu",
            max_inference_batch_size=2,  # 会议音频通常较长,用小batch
        )
        
        # 会议特定术语
        self.meeting_terms = {
            '开会': '开会',
            '讨论': '讨论',
            '议题': '议题',
            '决议': '决议',
            'action item': 'Action Item',
            'todo': 'TODO',
            'okr': 'OKR',
            'kpi': 'KPI',
        }
    
    def transcribe_meeting(self, audio_path, speaker_info=None):
        """
        转录会议录音
        speaker_info: 可选,说话人信息 {start_time: '姓名', ...}
        """
        # 1. 预处理:降噪和语音增强
        print("1. 音频预处理...")
        cleaned_audio = self.preprocess_meeting_audio(audio_path)
        
        # 2. 分说话人处理(如果有说话人信息)
        if speaker_info:
            print("2. 分说话人处理...")
            segments = self.split_by_speaker(cleaned_audio, speaker_info)
        else:
            # 如果没有说话人信息,整段处理
            segments = [(0, 'unknown', cleaned_audio)]
        
        # 3. 分段转录
        print("3. 分段转录...")
        transcripts = []
        
        for start_time, speaker, segment_audio in segments:
            # 保存临时音频文件
            temp_file = f"temp_segment_{start_time}.wav"
            sf.write(temp_file, segment_audio, 16000)
            
            # 转录
            results = self.model.transcribe(
                audio=temp_file,
                language="zh",  # 会议通常有主要语言
                chunk_length_s=20,  # 会议片段通常20秒左右
                stride_length_s=3,  # 重叠3秒避免遗漏
                beam_size=3,  # 中等beam size平衡速度和准确率
            )
            
            if results and len(results) > 0:
                text = results[0].text
                # 术语校正
                text = self.correct_meeting_terms(text)
                # 添加说话人标签
                if speaker != 'unknown':
                    text = f"{speaker}: {text}"
                
                transcripts.append({
                    'start_time': start_time,
                    'speaker': speaker,
                    'text': text
                })
            
            # 清理临时文件
            os.remove(temp_file)
        
        # 4. 后处理:添加时间戳,合并文本
        print("4. 后处理...")
        final_text = self.postprocess_meeting(transcripts)
        
        return final_text, transcripts
    
    def preprocess_meeting_audio(self, audio_path):
        """会议音频专用预处理"""
        # 会议音频通常需要更强的降噪
        audio, sr = sf.read(audio_path)
        
        # 使用更激进的降噪参数
        if len(audio) > sr * 5:  # 如果有至少5秒的音频
            noise_sample = audio[:sr * 5]  # 取前5秒作为噪音样本
            import noisereduce as nr
            audio = nr.reduce_noise(
                y=audio,
                sr=sr,
                y_noise=noise_sample,
                prop_decrease=0.9,  # 90%降噪
                stationary=True
            )
        
        return audio
    
    def correct_meeting_terms(self, text):
        """校正会议术语"""
        for wrong, correct in self.meeting_terms.items():
            text = text.replace(wrong, correct)
        return text
    
    def postprocess_meeting(self, transcripts):
        """会议转录后处理"""
        # 按时间排序
        transcripts.sort(key=lambda x: x['start_time'])
        
        # 生成带时间戳的文本
        result_lines = []
        for item in transcripts:
            # 格式化时间戳
            minutes = int(item['start_time'] // 60)
            seconds = int(item['start_time'] % 60)
            timestamp = f"[{minutes:02d}:{seconds:02d}]"
            
            line = f"{timestamp} {item['text']}"
            result_lines.append(line)
        
        return "\n".join(result_lines)

# 使用示例
transcriber = MeetingTranscriber()
meeting_text, segments = transcriber.transcribe_meeting("会议录音.wav")
print(meeting_text)

5.2 视频字幕生成

视频字幕的特点是:需要时间轴对齐、有背景音乐和音效、可能有多种语言。

class SubtitleGenerator:
    def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B"):
        """字幕生成器"""
        self.model = Qwen3ASRModel.from_pretrained(
            model_name,
            dtype=torch.float16,
            device_map="cuda:0" if torch.cuda.is_available() else "cpu",
        )
        
    def generate_subtitles(self, audio_path, output_srt_path, language=None):
        """
        生成SRT字幕文件
        """
        # 1. 提取音频(如果输入是视频)
        if audio_path.endswith(('.mp4', '.avi', '.mov')):
            audio_path = self.extract_audio_from_video(audio_path)
        
        # 2. 语音识别,带时间戳
        print("进行语音识别...")
        results = self.model.transcribe(
            audio=audio_path,
            language=language,
            return_timestamps=True,  # 关键:返回时间戳
            chunk_length_s=10,  # 字幕通常10秒一段
            stride_length_s=2,  # 重叠2秒
        )
        
        # 3. 生成SRT格式
        print("生成SRT字幕...")
        srt_content = self.create_srt_from_timestamps(results)
        
        # 4. 保存文件
        with open(output_srt_path, 'w', encoding='utf-8') as f:
            f.write(srt_content)
        
        print(f"字幕已保存到: {output_srt_path}")
        return srt_content
    
    def create_srt_from_timestamps(self, results):
        """从时间戳创建SRT格式"""
        srt_lines = []
        
        for i, segment in enumerate(results.segments, 1):
            # SRT格式的时间戳:00:00:00,000 --> 00:00:05,000
            start_time = self.format_timestamp(segment.start)
            end_time = self.format_timestamp(segment.end)
            
            # 字幕文本(每行最多20个中文字符)
            text = segment.text
            wrapped_text = self.wrap_subtitle_text(text, max_chars=20)
            
            # 添加到SRT
            srt_lines.append(str(i))
            srt_lines.append(f"{start_time} --> {end_time}")
            srt_lines.append(wrapped_text)
            srt_lines.append("")  # 空行分隔
            
        return "\n".join(srt_lines)
    
    def format_timestamp(self, seconds):
        """格式化时间戳为SRT格式"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds - int(seconds)) * 1000)
        
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
    
    def wrap_subtitle_text(self, text, max_chars=20):
        """字幕文本换行"""
        if len(text) <= max_chars:
            return text
        
        # 简单的中文文本换行
        lines = []
        current_line = ""
        
        for char in text:
            if len(current_line) + 1 <= max_chars:
                current_line += char
            else:
                lines.append(current_line)
                current_line = char
        
        if current_line:
            lines.append(current_line)
        
        return "\n".join(lines)
    
    def extract_audio_from_video(self, video_path):
        """从视频提取音频"""
        try:
            import moviepy.editor as mp
            
            # 创建临时音频文件
            temp_audio = video_path.replace('.mp4', '_audio.wav').replace('.avi', '_audio.wav')
            
            # 提取音频
            video = mp.VideoFileClip(video_path)
            video.audio.write_audiofile(temp_audio, codec='pcm_s16le')
            
            return temp_audio
            
        except ImportError:
            print("需要安装moviepy: pip install moviepy")
            return video_path

# 使用示例
generator = SubtitleGenerator()
srt_content = generator.generate_subtitles(
    audio_path="视频文件.mp4",
    output_srt_path="字幕.srt",
    language="zh"  # 指定语言
)

5.3 实时语音识别

实时识别对延迟要求高,需要特别优化。

class RealtimeASROptimized:
    def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B", use_vad=True):
        """优化的实时语音识别"""
        self.use_vad = use_vad  # 是否使用语音活动检测
        self.sample_rate = 16000
        self.chunk_duration = 2.0  # 每次处理2秒音频
        self.chunk_size = int(self.sample_rate * self.chunk_duration)
        
        # 加载模型,使用优化配置
        self.model = Qwen3ASRModel.from_pretrained(
            model_name,
            dtype=torch.float16,
            device_map="cuda:0" if torch.cuda.is_available() else "cpu",
            max_inference_batch_size=1,  # 实时识别用batch size 1
            max_new_tokens=128,  # 实时识别不需要太长的文本
        )
        
        # 初始化语音活动检测(如果启用)
        if use_vad:
            try:
                import webrtcvad
                self.vad = webrtcvad.Vad(2)  # 中等灵敏度
            except ImportError:
                print("未安装webrtcvad,语音活动检测不可用")
                self.use_vad = False
        
        # 音频缓冲区
        self.audio_buffer = []
        self.buffer_duration = 0
        self.max_buffer_duration = 10.0  # 最大缓冲10秒
        
    def process_realtime_audio(self, audio_chunk):
        """
        处理实时音频流
        audio_chunk: 音频数据数组
        """
        # 添加到缓冲区
        self.audio_buffer.append(audio_chunk)
        self.buffer_duration += len(audio_chunk) / self.sample_rate
        
        # 如果缓冲区足够长,或者检测到语音结束
        should_process = False
        
        if self.use_vad:
            # 使用VAD检测是否有语音
            is_speech = self.detect_speech(audio_chunk)
            if not is_speech and self.buffer_duration > 1.0:
                # 没有语音且缓冲区有内容,处理缓冲区
                should_process = True
        elif self.buffer_duration >= self.chunk_duration:
            # 不使用VAD,定时处理
            should_process = True
        
        if should_process and self.audio_buffer:
            # 处理缓冲区中的音频
            full_audio = np.concatenate(self.audio_buffer)
            
            # 转录
            results = self.model.transcribe(
                audio=full_audio,
                sample_rate=self.sample_rate,
                language=None,  # 自动检测
                chunk_length_s=min(self.buffer_duration, 5.0),  # 不超过5秒
                stride_length_s=0.5,
                beam_size=2,  # 实时识别用小beam size
            )
            
            # 清空缓冲区
            self.audio_buffer = []
            self.buffer_duration = 0
            
            if results and len(results) > 0:
                return results[0].text
        
        return None
    
    def detect_speech(self, audio_chunk):
        """检测音频块是否有语音"""
        if not self.use_vad:
            return True
        
        try:
            # 转换为16-bit PCM
            audio_int16 = (audio_chunk * 32767).astype(np.int16)
            
            # 分帧检测(每帧30ms)
            frame_duration = 0.03  # 30ms
            frame_size = int(self.sample_rate * frame_duration)
            
            has_speech = False
            for i in range(0, len(audio_int16), frame_size):
                frame = audio_int16[i:i+frame_size]
                if len(frame) < frame_size:
                    continue
                
                # 检测这一帧是否有语音
                if self.vad.is_speech(frame.tobytes(), self.sample_rate):
                    has_speech = True
                    break
            
            return has_speech
            
        except Exception as e:
            print(f"VAD检测失败: {e}")
            return True

# 使用示例(简化版)
realtime_asr = RealtimeASROptimized(use_vad=True)

# 模拟实时音频流
def simulate_realtime_stream(audio_file, chunk_size=3200):  # 3200 samples = 0.2秒
    audio, sr = sf.read(audio_file)
    
    for i in range(0, len(audio), chunk_size):
        chunk = audio[i:i+chunk_size]
        if len(chunk) < chunk_size:
            break
        
        # 处理音频块
        text = realtime_asr.process_realtime_audio(chunk)
        if text:
            print(f"识别结果: {text}")

6. 总结

提高Qwen3-ASR-0.6B的语音识别准确率,不是简单地调几个参数就能解决的。它需要从音频预处理、模型参数调优、到后处理的一整套方案。

根据我这一个多月的使用经验,最重要的几点是:

音频质量是关键。再好的模型也识别不了质量很差的音频。一定要做好降噪、音量标准化、格式转换这些预处理工作。我建议把音频预处理做成一个标准流程,每段音频都先走一遍预处理。

参数要因场景而异。会议录音、视频字幕、实时识别,不同的场景需要不同的参数设置。会议录音可以容忍一定的延迟,可以用更大的beam size追求准确率;实时识别则要优先考虑速度,用小beam size。

后处理不能少。模型识别出来的文本,特别是标点、数字、专有名词,往往需要后处理才能用。建立一个领域术语库,能大幅提升专业场景的识别准确率。

长音频要分块。超过1分钟的音频,一定要分块处理,并且块与块之间要有重叠。这样既能避免内存不足,又能防止在分块边界处丢失内容。

实时识别用VAD。如果做实时识别,一定要用语音活动检测(VAD)。只在实际有语音的时候才识别,能减少无效计算,提高响应速度。

最后,Qwen3-ASR-0.6B本身是个很优秀的模型,支持52种语言和方言,体积小速度快。把这些技巧用上,你会发现它的识别准确率能有明显的提升。特别是在中文场景下,对普通话和各种方言的支持都很不错。

实际用的时候,建议你先从简单的开始,把音频预处理做好,然后用默认参数跑一下看看效果。根据效果再调整参数,最后加上后处理。这样一步步来,既能看到改进的效果,也不会一开始就被复杂的调参搞晕。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐