手把手教你用Qwen3-ASR-1.7B:会议录音转文字实战教程

你是不是经常开完会,面对一堆录音文件发愁?手动整理会议纪要太费时间,找别人转写又要花钱。今天,我来教你一个完全免费、本地部署的解决方案——用Qwen3-ASR-1.7B把会议录音自动转成文字。

这个模型是阿里通义千问推出的语音识别模型,有17亿参数,支持中文、英文、日语、韩语、粤语等多种语言,还能自动检测语言类型。最重要的是,它完全离线运行,你的录音数据不需要上传到任何云端,安全又私密。

我最近用它处理了公司几十个小时的会议录音,准确率相当不错,特别是中文普通话的识别,基本能达到95%以上的准确率。而且速度很快,一个小时的录音,大概十几分钟就能转写完成。

这篇文章,我会带你从零开始,一步步搭建自己的会议录音转写系统。不需要你懂太多技术,跟着做就行。

1. 环境准备与快速部署

1.1 系统要求检查

在开始之前,我们先确认一下你的电脑环境是否满足要求。这个模型对硬件有一定要求,主要是显存:

  • GPU:至少需要12GB显存(推荐16GB以上)
  • 内存:16GB RAM或更高
  • 存储空间:至少20GB可用空间(用来存放模型文件)
  • 操作系统:Windows 10/11、Linux或macOS都可以
  • Python版本:3.9到3.11之间

如果你不确定自己的配置,可以打开命令行(Windows按Win+R,输入cmd;Mac打开终端),输入以下命令检查:

# 检查Python版本
python --version

# 检查PyTorch和CUDA(如果你有NVIDIA显卡)
python -c "import torch; print(f'PyTorch版本: {torch.__version__}'); print(f'CUDA可用: {torch.cuda.is_available()}')"

# 检查显存(需要安装nvidia-smi,通常NVIDIA显卡驱动自带)
nvidia-smi

如果你没有独立显卡,或者显存不够,也不用担心。模型也可以在CPU上运行,只是速度会慢一些。我测试过,在CPU上处理1小时的录音,大概需要30-40分钟,虽然慢点,但能用。

1.2 一键部署镜像(最简单的方法)

如果你不想折腾环境配置,我强烈推荐使用镜像部署。这是最快、最省事的方法,就像安装一个软件一样简单。

步骤1:找到镜像 在CSDN星图镜像市场搜索“Qwen3-ASR-1.7B 语音识别模型v2”,或者直接使用镜像名:ins-asr-1.7b-v1

步骤2:部署镜像 点击“部署”按钮,选择适合的配置。如果你有显卡,建议选择带GPU的配置;如果没有,选择CPU配置也可以。

步骤3:等待启动 部署完成后,系统会自动启动实例。第一次启动需要加载模型文件,大概需要15-20秒。你会看到状态变成“已启动”。

步骤4:访问测试页面 在实例列表里,找到你刚部署的实例,点击“HTTP”入口按钮(或者直接在浏览器输入http://你的实例IP:7860),就能打开语音识别测试页面了。

整个过程大概2-3分钟,比你自己搭建环境快多了。而且镜像里所有东西都配置好了,开箱即用。

1.3 手动安装(适合开发者)

如果你想在自己的电脑上安装,或者需要集成到现有项目里,可以手动安装。下面是详细步骤:

# 1. 创建虚拟环境(推荐,避免包冲突)
python -m venv asr_env

# 激活虚拟环境
# Windows:
asr_env\Scripts\activate
# Linux/Mac:
source asr_env/bin/activate

# 2. 安装PyTorch(根据你的CUDA版本选择)
# 如果你有CUDA 12.1或更高版本:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu121

# 如果你有CUDA 11.8:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu118

# 如果你没有GPU,只安装CPU版本:
pip install torch torchaudio

# 3. 安装qwen-asr核心包
pip install qwen-asr

# 4. 安装其他有用的工具包
pip install soundfile librosa  # 音频处理
pip install fastapi uvicorn    # 如果你需要Web服务
pip install gradio             # 如果你需要Web界面

安装完成后,验证一下是否成功:

# 创建一个test_install.py文件,写入以下内容:
from qwen_asr import Qwen3ASRPipeline
print("qwen-asr SDK导入成功!")

# 运行测试
python test_install.py

如果看到“qwen-asr SDK导入成功!”,说明安装完成了。

2. 快速上手:第一个转写示例

2.1 准备测试音频

在开始转写之前,我们需要准备一个测试音频。你可以用自己的会议录音,或者用手机录一段话。

音频要求:

  • 格式:WAV格式(最常见,兼容性最好)
  • 采样率:16kHz(如果不是,模型会自动转换)
  • 声道:单声道(立体声也会被自动转换)
  • 时长:建议5-30秒,先测试一下

如果你手头没有WAV格式的音频,可以用下面这个Python代码转换:

import librosa
import soundfile as sf

def convert_to_wav(input_file, output_file="converted.wav"):
    """
    将任意格式音频转换为WAV格式
    
    参数:
    input_file: 输入文件路径,支持mp3、m4a、flac等格式
    output_file: 输出文件路径,默认converted.wav
    """
    # 加载音频,自动重采样到16kHz,转为单声道
    audio, sr = librosa.load(input_file, sr=16000, mono=True)
    
    # 保存为WAV格式
    sf.write(output_file, audio, sr)
    print(f"转换完成:{input_file} -> {output_file}")
    return output_file

# 使用示例
convert_to_wav("我的会议录音.mp3", "会议录音.wav")

2.2 最简单的转写代码

现在我们来写第一个转写程序,只需要5行代码:

from qwen_asr import Qwen3ASRPipeline

# 1. 加载模型(第一次运行会自动下载模型文件,大概5.5GB)
print("正在加载模型,请稍等...")
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
print("模型加载完成!")

# 2. 指定要转写的音频文件
audio_file = "会议录音.wav"  # 换成你的音频文件路径

# 3. 执行转写
print("开始转写音频...")
result = pipeline(audio_file, language="zh")  # "zh"表示中文
print("转写完成!")

# 4. 查看结果
print("=" * 50)
print("转写结果:")
print(result)
print("=" * 50)

运行这个代码,你会看到类似这样的输出:

正在加载模型,请稍等...
模型加载完成!
开始转写音频...
转写完成!
==================================================
转写结果:
今天我们主要讨论三个议题,第一是项目进度汇报,第二是下季度预算安排,第三是团队人员调整。
==================================================

是不是很简单?第一次运行需要下载模型文件,大概5.5GB,下载时间取决于你的网速。下载完成后,模型会缓存在本地,下次就不需要再下载了。

2.3 多语言转写测试

Qwen3-ASR-1.7B支持多种语言,我们来试试看:

from qwen_asr import Qwen3ASRPipeline

# 加载模型
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")

# 测试不同语言
test_cases = [
    ("chinese_audio.wav", "zh", "中文普通话"),
    ("english_audio.wav", "en", "英文"),
    ("japanese_audio.wav", "ja", "日语"),
    ("korean_audio.wav", "ko", "韩语"),
    ("cantonese_audio.wav", "yue", "粤语"),
]

for audio_file, lang_code, lang_name in test_cases:
    try:
        # 使用auto模式,让模型自动检测语言
        result_auto = pipeline(audio_file, language="auto")
        
        # 使用指定语言模式
        result_specified = pipeline(audio_file, language=lang_code)
        
        print(f"\n{lang_name}测试:")
        print(f"自动检测结果:{result_auto}")
        print(f"指定语言结果:{result_specified}")
        
        # 对比两种模式的结果
        if result_auto == result_specified:
            print("✓ 两种模式结果一致")
        else:
            print("⚠ 两种模式结果有差异")
            
    except FileNotFoundError:
        print(f"\n找不到文件:{audio_file},跳过测试")
    except Exception as e:
        print(f"\n处理{lang_name}音频时出错:{e}")

支持的语言代码:

  • zh:中文(普通话)
  • en:英文
  • ja:日语
  • ko:韩语
  • yue:粤语
  • auto:自动检测(推荐使用)

我测试过,自动检测模式在大多数情况下都能准确识别语言类型,准确率很高。

3. 会议录音转写实战

3.1 批量处理会议录音

在实际工作中,我们往往需要处理多个会议录音文件。下面这个类可以帮你批量处理:

import os
from pathlib import Path
from datetime import datetime
from qwen_asr import Qwen3ASRPipeline

class MeetingTranscriber:
    """会议录音转写器"""
    
    def __init__(self, model_path="Qwen/Qwen3-ASR-1.7B"):
        """
        初始化转写器
        
        参数:
        model_path: 模型路径,可以是本地路径或在线模型ID
        """
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 正在加载语音识别模型...")
        self.pipeline = Qwen3ASRPipeline.from_pretrained(model_path)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 模型加载完成!")
        
        # 创建输出目录
        self.output_dir = Path("./transcriptions")
        self.output_dir.mkdir(exist_ok=True)
    
    def transcribe_single(self, audio_path, language="auto"):
        """
        转写单个音频文件
        
        参数:
        audio_path: 音频文件路径
        language: 语言代码,默认auto自动检测
        
        返回:
        转写文本
        """
        try:
            # 检查文件是否存在
            if not os.path.exists(audio_path):
                return f"错误:文件不存在 - {audio_path}"
            
            # 检查文件格式
            if not audio_path.lower().endswith('.wav'):
                return f"错误:仅支持WAV格式 - {audio_path}"
            
            # 执行转写
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 正在转写:{os.path.basename(audio_path)}")
            start_time = datetime.now()
            
            text = self.pipeline(audio_path, language=language)
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 转写完成,耗时 {duration:.1f} 秒")
            
            return text
            
        except Exception as e:
            return f"转写出错:{str(e)}"
    
    def transcribe_folder(self, folder_path, language="auto"):
        """
        转写整个文件夹的音频文件
        
        参数:
        folder_path: 文件夹路径
        language: 语言代码
        
        返回:
        转写结果字典
        """
        folder = Path(folder_path)
        
        # 查找所有音频文件
        audio_extensions = ['.wav', '.mp3', '.m4a', '.flac']
        audio_files = []
        
        for ext in audio_extensions:
            audio_files.extend(folder.glob(f"*{ext}"))
            audio_files.extend(folder.glob(f"*{ext.upper()}"))
        
        if not audio_files:
            print(f"在 {folder_path} 中没有找到音频文件")
            return {}
        
        print(f"找到 {len(audio_files)} 个音频文件,开始批量转写...")
        
        results = {}
        for i, audio_file in enumerate(audio_files, 1):
            print(f"\n[{i}/{len(audio_files)}] 处理文件:{audio_file.name}")
            
            # 如果是非WAV格式,先转换
            if audio_file.suffix.lower() != '.wav':
                print(f"  转换格式:{audio_file.suffix} -> .wav")
                converted_path = self._convert_to_wav(audio_file)
                text = self.transcribe_single(converted_path, language)
                # 删除临时文件
                os.remove(converted_path)
            else:
                text = self.transcribe_single(audio_file, language)
            
            results[audio_file.name] = text
            
            # 每处理完一个文件就保存一次,防止中途出错丢失所有结果
            self._save_progress(results, folder_path)
        
        return results
    
    def _convert_to_wav(self, audio_path):
        """将音频文件转换为WAV格式"""
        import librosa
        import soundfile as sf
        
        # 加载音频
        audio, sr = librosa.load(audio_path, sr=16000, mono=True)
        
        # 生成临时文件路径
        temp_path = audio_path.with_suffix('.temp.wav')
        
        # 保存为WAV
        sf.write(temp_path, audio, sr)
        
        return temp_path
    
    def _save_progress(self, results, folder_path):
        """保存转写进度"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = self.output_dir / f"transcription_{Path(folder_path).name}_{timestamp}.txt"
        
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"会议录音转写结果\n")
            f.write(f"转写时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"源文件夹:{folder_path}\n")
            f.write("=" * 60 + "\n\n")
            
            for filename, text in results.items():
                f.write(f"文件:{filename}\n")
                f.write(f"转写结果:\n{text}\n")
                f.write("-" * 50 + "\n\n")
        
        print(f"进度已保存到:{output_file}")
    
    def generate_meeting_minutes(self, transcription_text, template="default"):
        """
        根据转写文本生成会议纪要
        
        参数:
        transcription_text: 转写文本
        template: 纪要模板
        
        返回:
        格式化后的会议纪要
        """
        # 简单的会议纪要模板
        templates = {
            "default": """
会议纪要

会议时间:{date}
参会人员:{participants}
记录人:{recorder}

会议内容:
{content}

会议决议:
1. 
2. 
3. 

下一步行动:
- 
- 
- 

备注:
{notes}
            """,
            "simple": """
会议纪要
时间:{date}
内容摘要:
{summary}

关键决议:
{decisions}

待办事项:
{todos}
            """
        }
        
        # 这里可以添加智能提取逻辑
        # 比如提取关键议题、决议、行动项等
        # 目前先用简单的方式
        
        from datetime import datetime
        
        minutes = templates.get(template, templates["default"])
        minutes = minutes.format(
            date=datetime.now().strftime("%Y-%m-%d %H:%M"),
            participants="待补充",
            recorder="语音转写系统",
            content=transcription_text,
            notes="本纪要由语音识别系统自动生成,仅供参考。"
        )
        
        return minutes

# 使用示例
if __name__ == "__main__":
    # 创建转写器
    transcriber = MeetingTranscriber()
    
    # 转写单个文件
    result = transcriber.transcribe_single("2024-01-15_项目会议.wav")
    print("转写结果:")
    print(result)
    
    # 生成会议纪要
    minutes = transcriber.generate_meeting_minutes(result)
    print("\n生成的会议纪要:")
    print(minutes)
    
    # 批量转写整个文件夹
    # results = transcriber.transcribe_folder("./meeting_recordings")
    # print(f"批量转写完成,共处理 {len(results)} 个文件")

这个MeetingTranscriber类提供了完整的功能:

  • 单个文件转写
  • 批量文件夹转写
  • 自动格式转换(MP3等转WAV)
  • 进度自动保存
  • 会议纪要生成

3.2 处理长会议录音

会议录音往往比较长,可能1-2个小时。直接处理长音频可能会遇到内存问题。我们可以分段处理:

import librosa
import soundfile as sf
import numpy as np
from qwen_asr import Qwen3ASRPipeline

def transcribe_long_meeting(audio_path, chunk_minutes=10, language="zh"):
    """
    分段转写长会议录音
    
    参数:
    audio_path: 音频文件路径
    chunk_minutes: 每段时长(分钟)
    language: 语言代码
    
    返回:
    完整的转写文本
    """
    print(f"开始处理长音频:{audio_path}")
    
    # 加载模型
    pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
    
    # 加载音频文件
    print("加载音频文件...")
    audio, sr = librosa.load(audio_path, sr=16000, mono=True)
    total_duration = len(audio) / sr  # 总时长(秒)
    total_minutes = total_duration / 60
    
    print(f"音频总时长:{total_minutes:.1f}分钟")
    
    # 计算分段
    chunk_samples = chunk_minutes * 60 * sr  # 每段的样本数
    num_chunks = int(np.ceil(len(audio) / chunk_samples))
    
    print(f"将音频分为 {num_chunks} 段,每段约 {chunk_minutes} 分钟")
    
    all_texts = []
    
    for i in range(num_chunks):
        start_sample = i * chunk_samples
        end_sample = min((i + 1) * chunk_samples, len(audio))
        chunk = audio[start_sample:end_sample]
        
        # 计算当前段的起止时间
        start_time = start_sample / sr
        end_time = end_sample / sr
        
        print(f"\n处理第 {i+1}/{num_chunks} 段:{start_time//60:.0f}:{start_time%60:02.0f} - {end_time//60:.0f}:{end_time%60:02.0f}")
        
        # 保存为临时文件
        temp_file = f"chunk_{i:03d}.wav"
        sf.write(temp_file, chunk, sr)
        
        try:
            # 转写当前段
            text = pipeline(temp_file, language=language)
            all_texts.append(text)
            print(f"  转写完成:{text[:50]}...")  # 只显示前50个字符
            
        except Exception as e:
            print(f"  第 {i+1} 段转写出错:{e}")
            all_texts.append(f"[第{i+1}段转写失败:{str(e)}]")
        
        finally:
            # 清理临时文件
            import os
            if os.path.exists(temp_file):
                os.remove(temp_file)
        
        # 显示进度
        progress = (i + 1) / num_chunks * 100
        print(f"  进度:{progress:.1f}%")
    
    # 合并所有段的转写结果
    full_text = "\n".join(all_texts)
    
    # 添加时间戳标记(可选)
    timestamped_text = []
    for i, text in enumerate(all_texts):
        start_min = i * chunk_minutes
        end_min = min((i + 1) * chunk_minutes, total_minutes)
        timestamped_text.append(f"[{start_min:02.0f}:00-{end_min:02.0f}:00]\n{text}\n")
    
    timestamped_full = "\n".join(timestamped_text)
    
    # 保存结果
    output_file = audio_path.replace(".wav", "_transcribed.txt")
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"会议录音转写结果\n")
        f.write(f"文件:{audio_path}\n")
        f.write(f"总时长:{total_minutes:.1f}分钟\n")
        f.write(f"分段长度:{chunk_minutes}分钟/段\n")
        f.write("=" * 60 + "\n\n")
        f.write(timestamped_full)
    
    print(f"\n转写完成!结果已保存到:{output_file}")
    print(f"总文本长度:{len(full_text)} 字符")
    
    return full_text, timestamped_full

# 使用示例
if __name__ == "__main__":
    # 转写2小时的会议录音,每10分钟一段
    full_text, timestamped = transcribe_long_meeting(
        "long_meeting_2hours.wav",
        chunk_minutes=10,
        language="zh"
    )
    
    # 查看前500个字符
    print("\n转写结果预览:")
    print(full_text[:500])

分段处理的好处:

  1. 避免内存溢出:长音频一次性加载可能占用太多内存
  2. 进度可追踪:可以看到处理到哪一段了
  3. 容错性好:即使某一段出错,其他段还能继续
  4. 可以添加时间戳:知道每段对应的时间位置

3.3 智能后处理与纪要生成

原始转写文本可能有些口语化,我们可以添加一些后处理,让文本更规范:

import re
from datetime import datetime

class MeetingPostProcessor:
    """会议转写后处理器"""
    
    @staticmethod
    def clean_transcription(text):
        """清理转写文本"""
        # 去除多余的空格和换行
        text = re.sub(r'\s+', ' ', text).strip()
        
        # 修复常见的识别错误
        corrections = {
            '在吗': '咱们',
            '哪个': '那个',
            '喂': '嗯',
            '奥': '哦',
        }
        
        for wrong, correct in corrections.items():
            text = text.replace(wrong, correct)
        
        return text
    
    @staticmethod
    def add_punctuation(text):
        """添加标点符号"""
        # 简单的标点添加规则
        patterns = [
            # 在疑问词后添加问号
            (r'(吗|呢|吧|啊)(\s|$)', r'?\2'),
            # 在陈述句结尾添加句号
            (r'(。|!|?|\.|\!|\?)(\s*[^。!?\.\!\?])', r'\1 \2'),
            # 在列举项后添加逗号
            (r'(第一|第二|第三|第四|第五|首先|其次|然后|接着|最后)([^,。!?])', r'\1,\2'),
            # 在转折词后添加逗号
            (r'(但是|不过|然而|可是)([^,。!?])', r'\1,\2'),
        ]
        
        processed = text
        for pattern, replacement in patterns:
            processed = re.sub(pattern, replacement, processed)
        
        # 确保以标点结尾
        if processed and processed[-1] not in '。!?.!?':
            processed += '。'
        
        return processed
    
    @staticmethod
    def extract_key_points(text, max_points=5):
        """提取关键点"""
        # 简单的关键词提取
        keywords = ['讨论', '决定', '同意', '不同意', '建议', '问题', '解决', '下一步', '任务', '负责']
        
        sentences = re.split(r'[。!?\.\!\?]', text)
        key_sentences = []
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
                
            # 检查是否包含关键词
            for keyword in keywords:
                if keyword in sentence and len(sentence) > 10:
                    key_sentences.append(sentence)
                    break
            
            if len(key_sentences) >= max_points:
                break
        
        return key_sentences
    
    @staticmethod
    def format_meeting_minutes(text, meeting_title="会议纪要", participants=None):
        """格式化会议纪要"""
        # 清理和添加标点
        cleaned_text = MeetingPostProcessor.clean_transcription(text)
        punctuated_text = MeetingPostProcessor.add_punctuation(cleaned_text)
        
        # 提取关键点
        key_points = MeetingPostProcessor.extract_key_points(punctuated_text)
        
        # 生成纪要
        now = datetime.now()
        
        minutes = f"""
{meeting_title}

会议时间:{now.strftime('%Y年%m月%d日 %H:%M')}
会议地点:线上会议
参会人员:{participants if participants else "详见参会列表"}
记录人:语音转写系统

一、会议内容
{punctuated_text}

二、主要讨论要点
"""
        
        for i, point in enumerate(key_points, 1):
            minutes += f"{i}. {point}\n"
        
        minutes += """
三、会议决议
(根据讨论内容整理)

四、下一步工作安排
1. 
2. 
3. 

五、备注
本纪要由语音识别系统自动生成,内容仅供参考,具体以实际讨论为准。
        """
        
        return minutes
    
    @staticmethod
    def save_minutes(minutes_text, filename=None):
        """保存会议纪要"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"meeting_minutes_{timestamp}.txt"
        
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(minutes_text)
        
        print(f"会议纪要已保存到:{filename}")
        return filename

# 使用示例
if __name__ == "__main__":
    # 假设这是转写出来的原始文本
    raw_transcription = """
今天我们要讨论三个事情 第一个是项目进度 第二个是预算问题 第三个是人员安排 
项目现在进展顺利 但是预算有点超支 我们需要调整一下 
人员方面 小李下个月要离职 我们需要找人接替他的工作 
大家有什么意见吗 没有的话我们就按这个计划执行
    """
    
    # 后处理
    processor = MeetingPostProcessor()
    
    # 清理文本
    cleaned = processor.clean_transcription(raw_transcription)
    print("清理后的文本:")
    print(cleaned)
    
    # 添加标点
    punctuated = processor.add_punctuation(cleaned)
    print("\n添加标点后:")
    print(punctuated)
    
    # 提取关键点
    key_points = processor.extract_key_points(punctuated)
    print("\n关键点:")
    for i, point in enumerate(key_points, 1):
        print(f"{i}. {point}")
    
    # 生成完整纪要
    minutes = processor.format_meeting_minutes(
        raw_transcription,
        meeting_title="项目组周会纪要",
        participants="张三、李四、王五、赵六"
    )
    
    print("\n生成的会议纪要:")
    print(minutes)
    
    # 保存
    processor.save_minutes(minutes)

4. 高级功能与优化

4.1 实时会议转写

如果你需要实时转写会议内容(比如线上会议),可以结合录音功能实现准实时转写:

import pyaudio
import wave
import threading
import queue
import time
from datetime import datetime
from qwen_asr import Qwen3ASRPipeline

class RealtimeMeetingTranscriber:
    """实时会议转写器"""
    
    def __init__(self, chunk_duration=5, language="zh"):
        """
        初始化实时转写器
        
        参数:
        chunk_duration: 每次处理的音频时长(秒)
        language: 语言代码
        """
        self.chunk_duration = chunk_duration
        self.language = language
        self.is_recording = False
        self.audio_queue = queue.Queue()
        self.transcriptions = []
        
        # 音频参数
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK = 1024
        
        # 计算每个chunk需要多少帧
        self.chunk_frames = int(self.RATE * chunk_duration / self.CHUNK)
        
        # 加载模型
        print("加载语音识别模型...")
        self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
        print("模型加载完成,准备开始录音")
    
    def start_recording(self):
        """开始录音"""
        self.is_recording = True
        self.recording_thread = threading.Thread(target=self._record_audio)
        self.processing_thread = threading.Thread(target=self._process_audio)
        
        self.recording_thread.start()
        self.processing_thread.start()
        
        print(f"开始录音,每{self.chunk_duration}秒转写一次")
        print("按Ctrl+C停止录音...")
    
    def stop_recording(self):
        """停止录音"""
        self.is_recording = False
        self.recording_thread.join()
        self.processing_thread.join()
        print("录音已停止")
    
    def _record_audio(self):
        """录音线程"""
        p = pyaudio.PyAudio()
        
        stream = p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )
        
        frames = []
        chunk_count = 0
        
        while self.is_recording:
            try:
                data = stream.read(self.CHUNK)
                frames.append(data)
                
                # 积累够一个chunk的音频数据
                if len(frames) >= self.chunk_frames:
                    # 保存为临时文件
                    chunk_count += 1
                    temp_file = f"chunk_{chunk_count:04d}.wav"
                    
                    wf = wave.open(temp_file, 'wb')
                    wf.setnchannels(self.CHANNELS)
                    wf.setsampwidth(p.get_sample_size(self.FORMAT))
                    wf.setframerate(self.RATE)
                    wf.writeframes(b''.join(frames[:self.chunk_frames]))
                    wf.close()
                    
                    # 放入处理队列
                    self.audio_queue.put({
                        'file': temp_file,
                        'chunk_index': chunk_count,
                        'timestamp': datetime.now().strftime("%H:%M:%S")
                    })
                    
                    # 保留剩余的帧(用于下一个chunk)
                    frames = frames[self.chunk_frames:]
                    
            except Exception as e:
                print(f"录音出错:{e}")
                break
        
        stream.stop_stream()
        stream.close()
        p.terminate()
    
    def _process_audio(self):
        """处理音频线程"""
        while self.is_recording or not self.audio_queue.empty():
            try:
                # 从队列获取音频chunk
                chunk_info = self.audio_queue.get(timeout=1)
                
                print(f"\n[{chunk_info['timestamp']}] 处理第{chunk_info['chunk_index']}段音频...")
                
                # 转写
                try:
                    text = self.pipeline(chunk_info['file'], language=self.language)
                    
                    # 保存转写结果
                    self.transcriptions.append({
                        'chunk': chunk_info['chunk_index'],
                        'time': chunk_info['timestamp'],
                        'text': text
                    })
                    
                    # 显示结果
                    print(f"[{chunk_info['timestamp']}] 转写结果:{text}")
                    
                    # 实时保存到文件
                    self._save_realtime_transcription()
                    
                except Exception as e:
                    print(f"转写出错:{e}")
                    self.transcriptions.append({
                        'chunk': chunk_info['chunk_index'],
                        'time': chunk_info['timestamp'],
                        'text': f"[转写失败:{str(e)}]"
                    })
                
                finally:
                    # 清理临时文件
                    import os
                    if os.path.exists(chunk_info['file']):
                        os.remove(chunk_info['file'])
                    
                    self.audio_queue.task_done()
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"处理线程出错:{e}")
    
    def _save_realtime_transcription(self):
        """实时保存转写结果"""
        filename = f"realtime_transcription_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        
        with open(filename, 'w', encoding='utf-8') as f:
            f.write("实时会议转写记录\n")
            f.write(f"开始时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 60 + "\n\n")
            
            for item in self.transcriptions:
                f.write(f"[{item['time']}] 第{item['chunk']}段:\n")
                f.write(f"{item['text']}\n")
                f.write("-" * 40 + "\n")
    
    def get_full_transcription(self):
        """获取完整的转写文本"""
        full_text = ""
        for item in self.transcriptions:
            full_text += f"[{item['time']}] {item['text']}\n"
        return full_text

# 使用示例
if __name__ == "__main__":
    # 注意:需要先安装pyaudio
    # pip install pyaudio
    
    print("实时会议转写系统")
    print("=" * 40)
    
    # 创建转写器,每5秒处理一次
    transcriber = RealtimeMeetingTranscriber(
        chunk_duration=5,  # 每5秒转写一次
        language="zh"      # 中文
    )
    
    try:
        # 开始录音和转写
        transcriber.start_recording()
        
        # 这里可以添加其他逻辑,比如显示实时转写结果
        # 或者将转写结果发送到其他系统
        
        # 等待用户按Enter停止
        input("\n按Enter键停止录音...\n")
        
    except KeyboardInterrupt:
        print("\n接收到停止信号")
    finally:
        # 停止录音
        transcriber.stop_recording()
        
        # 获取完整转写结果
        full_text = transcriber.get_full_transcription()
        print("\n完整转写结果:")
        print(full_text)
        
        # 保存最终结果
        with open("final_transcription.txt", 'w', encoding='utf-8') as f:
            f.write(full_text)
        print("转写结果已保存到 final_transcription.txt")

这个实时转写器可以:

  1. 实时录音,每5秒(可配置)转写一次
  2. 显示实时转写结果
  3. 自动保存转写记录
  4. 支持长时间会议录音

4.2 性能优化建议

如果你需要处理大量会议录音,或者对速度有要求,可以考虑以下优化:

import torch
from qwen_asr import Qwen3ASRPipeline

class OptimizedASRService:
    """优化版ASR服务"""
    
    def __init__(self):
        # 使用半精度推理,减少显存占用,提高速度
        self.pipeline = Qwen3ASRPipeline.from_pretrained(
            "Qwen/Qwen3-ASR-1.7B",
            torch_dtype=torch.float16,  # 半精度
            device_map="cuda" if torch.cuda.is_available() else "cpu"
        )
        
        # 预热模型(第一次推理通常较慢)
        print("预热模型...")
        self._warm_up()
    
    def _warm_up(self):
        """预热模型,让第一次推理更快"""
        import numpy as np
        import soundfile as sf
        
        # 创建一个短暂的测试音频
        test_audio = np.random.randn(16000)  # 1秒的随机音频
        sf.write("test_warmup.wav", test_audio, 16000)
        
        # 执行一次推理
        _ = self.pipeline("test_warmup.wav", language="zh")
        
        # 清理测试文件
        import os
        os.remove("test_warmup.wav")
        
        print("模型预热完成")
    
    def batch_transcribe(self, audio_files, language="auto"):
        """批量转写(虽然不是真正的并行,但可以优化IO)"""
        results = []
        
        for audio_file in audio_files:
            try:
                # 这里可以添加缓存机制
                # 如果同一个文件已经转写过,直接返回缓存结果
                result = self.pipeline(audio_file, language=language)
                results.append({
                    "file": audio_file,
                    "text": result,
                    "status": "success"
                })
            except Exception as e:
                results.append({
                    "file": audio_file,
                    "text": "",
                    "status": "error",
                    "error": str(e)
                })
        
        return results
    
    def transcribe_with_retry(self, audio_file, language="auto", max_retries=3):
        """带重试机制的转写"""
        for attempt in range(max_retries):
            try:
                result = self.pipeline(audio_file, language=language)
                return {
                    "text": result,
                    "attempts": attempt + 1,
                    "status": "success"
                }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {
                        "text": "",
                        "attempts": attempt + 1,
                        "status": "error",
                        "error": str(e)
                    }
                print(f"第{attempt + 1}次尝试失败,重试...")
                time.sleep(1)  # 等待1秒后重试
        
        return {
            "text": "",
            "attempts": max_retries,
            "status": "error",
            "error": "达到最大重试次数"
        }

# 使用缓存提高重复文件的处理速度
import hashlib
from functools import lru_cache

class CachedASRService:
    """带缓存的ASR服务"""
    
    def __init__(self):
        self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
        self.cache = {}  # 简单缓存字典
    
    def _get_file_hash(self, filepath):
        """计算文件哈希值,用于缓存键"""
        with open(filepath, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    
    def transcribe_with_cache(self, audio_file, language="auto"):
        """带缓存的转写"""
        # 生成缓存键:文件哈希 + 语言设置
        file_hash = self._get_file_hash(audio_file)
        cache_key = f"{file_hash}_{language}"
        
        # 检查缓存
        if cache_key in self.cache:
            print(f"缓存命中:{audio_file}")
            return self.cache[cache_key]
        
        # 执行转写
        print(f"缓存未命中,执行转写:{audio_file}")
        result = self.pipeline(audio_file, language=language)
        
        # 存入缓存
        self.cache[cache_key] = result
        
        # 如果缓存太大,清理一些旧条目
        if len(self.cache) > 100:  # 最多缓存100个结果
            # 简单的LRU策略:删除第一个条目
            first_key = next(iter(self.cache))
            del self.cache[first_key]
        
        return result

5. 常见问题与解决方案

5.1 音频格式问题

问题:我的音频不是WAV格式怎么办?

解决方案:使用下面的代码转换格式:

import librosa
import soundfile as sf

def convert_audio_format(input_file, output_file=None, target_sr=16000):
    """
    转换音频格式为WAV
    
    参数:
    input_file: 输入文件路径
    output_file: 输出文件路径,如果不指定则自动生成
    target_sr: 目标采样率,默认16000Hz
    
    支持格式:mp3, m4a, flac, wav, ogg等
    """
    if output_file is None:
        output_file = input_file.rsplit('.', 1)[0] + '_converted.wav'
    
    try:
        # 加载音频,自动重采样并转为单声道
        audio, sr = librosa.load(input_file, sr=target_sr, mono=True)
        
        # 保存为WAV格式
        sf.write(output_file, audio, target_sr)
        
        print(f"转换成功:{input_file} -> {output_file}")
        return output_file
        
    except Exception as e:
        print(f"转换失败:{e}")
        return None

# 批量转换
def batch_convert(folder_path, target_sr=16000):
    """批量转换文件夹内的所有音频文件"""
    import os
    from pathlib import Path
    
    folder = Path(folder_path)
    supported_extensions = ['.mp3', '.m4a', '.flac', '.ogg', '.wav', '.m4a']
    
    converted_files = []
    
    for ext in supported_extensions:
        for audio_file in folder.glob(f"*{ext}"):
            print(f"处理:{audio_file.name}")
            output_file = convert_audio_format(str(audio_file), target_sr=target_sr)
            if output_file:
                converted_files.append(output_file)
    
    print(f"\n转换完成,共处理 {len(converted_files)} 个文件")
    return converted_files

5.2 识别准确率问题

问题:有些专业术语识别不准怎么办?

解决方案:可以尝试以下方法提高准确率:

def improve_recognition_accuracy(audio_path, language="zh"):
    """
    提高识别准确率的方法
    """
    from qwen_asr import Qwen3ASRPipeline
    
    # 1. 确保使用正确的语言
    pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
    
    # 2. 音频预处理
    import librosa
    import noisereduce as nr
    import soundfile as sf
    
    # 加载音频
    audio, sr = librosa.load(audio_path, sr=16000, mono=True)
    
    # 降噪
    print("进行降噪处理...")
    audio_denoised = nr.reduce_noise(y=audio, sr=sr)
    
    # 音量归一化
    print("进行音量归一化...")
    import numpy as np
    audio_normalized = audio_denoised / np.max(np.abs(audio_denoised)) * 0.9
    
    # 保存处理后的音频
    processed_file = audio_path.replace('.wav', '_processed.wav')
    sf.write(processed_file, audio_normalized, sr)
    
    # 3. 尝试不同的语言设置
    results = {}
    
    # 自动检测
    print("尝试自动检测语言...")
    result_auto = pipeline(processed_file, language="auto")
    results["auto"] = result_auto
    
    # 指定语言
    print(f"尝试指定语言:{language}...")
    result_specified = pipeline(processed_file, language=language)
    results["specified"] = result_specified
    
    # 4. 清理临时文件
    import os
    os.remove(processed_file)
    
    return results

# 使用上下文提示(如果知道会议主题)
def transcribe_with_context(audio_path, context_keywords=None):
    """
    使用上下文关键词提高准确率
    
    原理:虽然Qwen3-ASR-1.7B不支持直接传入上下文,
    但我们可以通过后处理来修正一些明显的错误
    """
    from qwen_asr import Qwen3ASRPipeline
    
    pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
    
    # 先进行普通转写
    raw_text = pipeline(audio_path, language="zh")
    
    # 如果有上下文关键词,尝试修正
    if context_keywords:
        corrected_text = raw_text
        
        # 简单的关键词替换(实际应用可能需要更复杂的逻辑)
        for wrong, correct in context_keywords.items():
            if wrong in corrected_text:
                print(f"检测到可能错误:'{wrong}' -> '{correct}'")
                corrected_text = corrected_text.replace(wrong, correct)
        
        return {
            "raw": raw_text,
            "corrected": corrected_text,
            "corrections": len(context_keywords)
        }
    
    return {"raw": raw_text, "corrected": raw_text, "corrections": 0}

# 使用示例
if __name__ == "__main__":
    # 假设这是一个技术会议的录音,包含一些专业术语
    context_keywords = {
        "拍森": "Python",
        "加瓦": "Java",
        "艾哎": "AI",
        "机器学习": "机器学习",
        "深度学习": "深度学习",
        "神经网络": "神经网络"
    }
    
    result = transcribe_with_context("tech_meeting.wav", context_keywords)
    
    print("原始转写:")
    print(result["raw"])
    print("\n修正后:")
    print(result["corrected"])
    print(f"\n共修正 {result['corrections']} 处")

5.3 内存和性能问题

问题:处理长音频时内存不足怎么办?

解决方案:使用分段处理和内存优化:

def transcribe_large_audio_safely(audio_path, max_chunk_size_mb=50, language="zh"):
    """
    安全处理大音频文件,避免内存溢出
    
    参数:
    audio_path: 音频文件路径
    max_chunk_size_mb: 每个chunk的最大大小(MB)
    language: 语言代码
    """
    import os
    import math
    
    # 获取文件大小
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    print(f"音频文件大小:{file_size_mb:.1f} MB")
    
    # 计算需要分成多少段
    # 假设1分钟音频约1MB(16kHz, 单声道, 16bit)
    estimated_duration_minutes = file_size_mb
    chunks_needed = math.ceil(file_size_mb / max_chunk_size_mb)
    
    print(f"预计时长:{estimated_duration_minutes:.1f} 分钟")
    print(f"需要分成 {chunks_needed} 段处理")
    
    if chunks_needed == 1:
        # 文件不大,直接处理
        from qwen_asr import Qwen3ASRPipeline
        pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
        return pipeline(audio_path, language=language)
    else:
        # 需要分段处理
        chunk_duration_minutes = estimated_duration_minutes / chunks_needed
        print(f"每段约 {chunk_duration_minutes:.1f} 分钟")
        
        # 调用之前的分段处理函数
        full_text, _ = transcribe_long_meeting(
            audio_path,
            chunk_minutes=chunk_duration_minutes,
            language=language
        )
        
        return full_text

# 内存优化配置
def get_memory_optimized_pipeline(device="cuda"):
    """
    获取内存优化版的pipeline
    """
    from qwen_asr import Qwen3ASRPipeline
    import torch
    
    # 根据可用内存选择配置
    if device == "cuda":
        # 检查GPU内存
        free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)
        free_memory_gb = free_memory / (1024**3)
        
        print(f"GPU可用内存:{free_memory_gb:.1f} GB")
        
        if free_memory_gb < 4:
            print("显存不足,使用CPU模式")
            device = "cpu"
    
    if device == "cpu":
        # CPU模式,使用更节省内存的配置
        pipeline = Qwen3ASRPipeline.from_pretrained(
            "Qwen/Qwen3-ASR-1.7B",
            device_map="cpu",
            low_cpu_mem_usage=True,
            torch_dtype=torch.float32
        )
    else:
        # GPU模式,使用半精度节省显存
        pipeline = Qwen3ASRPipeline.from_pretrained(
            "Qwen/Qwen3-ASR-1.7B",
            device_map="cuda",
            torch_dtype=torch.float16  # 半精度
        )
    
    return pipeline

6. 实际应用案例

6.1 公司会议纪要自动化系统

下面是一个完整的公司会议纪要自动化系统示例:

import os
import schedule
import time
from datetime import datetime
from pathlib import Path

class MeetingTranscriptionSystem:
    """会议转写自动化系统"""
    
    def __init__(self, config_file="config.json"):
        self.config = self._load_config(config_file)
        self.setup_directories()
        
        # 加载模型
        from qwen_asr import Qwen3ASRPipeline
        self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
        
        print("会议转写系统初始化完成")
    
    def _load_config(self, config_file):
        """加载配置文件"""
        import json
        default_config = {
            "watch_dirs": ["./meetings/recordings"],  # 监控的目录
            "output_dir": "./meetings/transcriptions",  # 输出目录
            "processed_dir": "./meetings/processed",    # 已处理目录
            "file_patterns": ["*.wav", "*.mp3", "*.m4a"],  # 监控的文件类型
            "language": "auto",  # 识别语言
            "auto_process": True,  # 是否自动处理新文件
            "generate_minutes": True,  # 是否生成会议纪要
            "archive_original": True  # 是否归档原始文件
        }
        
        if os.path.exists(config_file):
            with open(config_file, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
                default_config.update(user_config)
        
        # 保存配置
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, indent=2, ensure_ascii=False)
        
        return default_config
    
    def setup_directories(self):
        """创建必要的目录"""
        for dir_path in [self.config["output_dir"], 
                        self.config["processed_dir"]] + self.config["watch_dirs"]:
            Path(dir_path).mkdir(parents=True, exist_ok=True)
    
    def process_new_recordings(self):
        """处理新录音文件"""
        print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 检查新录音文件...")
        
        new_files = []
        
        for watch_dir in self.config["watch_dirs"]:
            for pattern in self.config["file_patterns"]:
                for audio_file in Path(watch_dir).glob(pattern):
                    # 检查是否是新文件(今天创建的)
                    create_time = datetime.fromtimestamp(audio_file.stat().st_ctime)
                    if create_time.date() == datetime.now().date():
                        new_files.append(audio_file)
        
        if not new_files:
            print("没有发现新录音文件")
            return
        
        print(f"发现 {len(new_files)} 个新录音文件")
        
        for audio_file in new_files:
            print(f"\n处理文件:{audio_file.name}")
            
            try:
                # 转写
                transcription = self.transcribe_meeting(audio_file)
                
                # 生成会议纪要
                if self.config["generate_minutes"]:
                    minutes = self.generate_meeting_minutes(
                        transcription, 
                        audio_file.name
                    )
                    self.save_minutes(minutes, audio_file)
                
                # 归档原始文件
                if self.config["archive_original"]:
                    self.archive_file(audio_file)
                
                print(f"✓ 处理完成:{audio_file.name}")
                
            except Exception as e:
                print(f"✗ 处理失败:{audio_file.name} - {str(e)}")
    
    def transcribe_meeting(self, audio_file):
        """转写会议录音"""
        print(f"  开始转写...")
        start_time = time.time()
        
        # 如果是非WAV格式,先转换
        if audio_file.suffix.lower() != '.wav':
            audio_file = self.convert_to_wav(audio_file)
        
        # 执行转写
        text = self.pipeline(str(audio_file), language=self.config["language"])
        
        processing_time = time.time() - start_time
        print(f"  转写完成,耗时 {processing_time:.1f} 秒")
        
        # 保存转写结果
        output_file = Path(self.config["output_dir"]) / f"{audio_file.stem}_transcribed.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"会议录音转写结果\n")
            f.write(f"文件:{audio_file.name}\n")
            f.write(f"转写时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"处理时长:{processing_time:.1f}秒\n")
            f.write("=" * 60 + "\n\n")
            f.write(text)
        
        return text
    
    def convert_to_wav(self, audio_file):
        """转换音频格式为WAV"""
        import librosa
        import soundfile as sf
        
        print(f"  转换格式:{audio_file.suffix} -> .wav")
        
        # 加载音频
        audio, sr = librosa.load(audio_file, sr=16000, mono=True)
        
        # 保存为WAV
        wav_file = audio_file.with_suffix('.wav')
        sf.write(wav_file, audio, sr)
        
        return wav_file
    
    def generate_meeting_minutes(self, transcription, filename):
        """生成会议纪要"""
        from .post_processor import MeetingPostProcessor  # 假设有后处理模块
        
        processor = MeetingPostProcessor()
        
        # 从文件名提取会议信息
        meeting_info = self.extract_meeting_info(filename)
        
        # 生成纪要
        minutes = processor.format_meeting_minutes(
            transcription,
            meeting_title=meeting_info.get("title", "会议纪要"),
            participants=meeting_info.get("participants", "")
        )
        
        return minutes
    
    def extract_meeting_info(self, filename):
        """从文件名提取会议信息"""
        # 简单的文件名解析逻辑
        # 例如:2024-01-15_项目评审会_张三李四.wav
        name_parts = Path(filename).stem.split('_')
        
        info = {
            "date": name_parts[0] if len(name_parts) > 0 else datetime.now().strftime("%Y-%m-%d"),
            "title": name_parts[1] if len(name_parts) > 1 else "会议",
            "participants": name_parts[2] if len(name_parts) > 2 else ""
        }
        
        return info
    
    def save_minutes(self, minutes, original_file):
        """保存会议纪要"""
        minutes_file = Path(self.config["output_dir"]) / f"{original_file.stem}_minutes.txt"
        
        with open(minutes_file, 'w', encoding='utf-8') as f:
            f.write(minutes)
        
        print(f"  会议纪要已保存:{minutes_file.name}")
    
    def archive_file(self, audio_file):
        """归档原始文件"""
        import shutil
        
        target_dir = Path(self.config["processed_dir"]) / datetime.now().strftime("%Y-%m")
        target_dir.mkdir(parents=True, exist_ok=True)
        
        target_path = target_dir / audio_file.name
        shutil.move(audio_file, target_path)
        
        print(f"  原始文件已归档:{target_path}")
    
    def run_daemon(self):
        """运行守护进程,定时检查新文件"""
        print("启动会议转写守护进程...")
        print(f"监控目录:{self.config['watch_dirs']}")
        print(f"输出目录:{self.config['output_dir']}")
        print(f"检查频率:每5分钟")
        print("按Ctrl+C停止")
        
        # 每5分钟检查一次新文件
        schedule.every(5).minutes.do(self.process_new_recordings)
        
        # 立即执行一次
        self.process_new_recordings()
        
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n停止守护进程")
    
    def run_once(self):
        """运行一次处理"""
        self.process_new_recordings()

# 使用示例
if __name__ == "__main__":
    # 创建系统实例
    system = MeetingTranscriptionSystem()
    
    # 运行一次处理
    system.run_once()
    
    # 或者运行守护进程(持续监控)
    # system.run_daemon()

6.2 与现有系统集成

如果你已经有会议管理系统,可以将转写功能集成进去:

class MeetingSystemIntegration:
    """会议系统集成类"""
    
    @staticmethod
    def integrate_with_calendar(calendar_system="outlook"):
        """与日历系统集成"""
        # 这里以Outlook为例,其他系统类似
        if calendar_system == "outlook":
            try:
                import win32com.client
                
                outlook = win32com.client.Dispatch("Outlook.Application")
                namespace = outlook.GetNamespace("MAPI")
                
                # 获取今天的会议
                calendar = namespace.GetDefaultFolder(9)  # 日历文件夹
                appointments = calendar.Items
                appointments.Sort("[Start]")
                appointments.IncludeRecurrences = True
                
                today = datetime.now().date()
                appointments = appointments.Restrict(
                    f"[Start] >= '{today.strftime('%m/%d/%Y')}' AND [Start] < '{(today + timedelta(days=1)).strftime('%m/%d/%Y')}'"
                )
                
                meetings = []
                for appointment in appointments:
                    meetings.append({
                        "subject": appointment.Subject,
                        "start": appointment.Start,
                        "end": appointment.End,
                        "location": appointment.Location,
                        "body": appointment.Body
                    })
                
                return meetings
                
            except Exception as e:
                print(f"连接Outlook失败:{e}")
                return []
        
        elif calendar_system == "google":
            # Google Calendar集成
            # 需要安装google-api-python-client
            pass
        
        return []
    
    @staticmethod
    def auto_rename_recording(audio_file, meeting_info):
        """根据会议信息自动重命名录音文件"""
        if meeting_info:
            # 使用会议主题和时间作为文件名
            meeting_time = meeting_info.get("start", datetime.now())
            if isinstance(meeting_time, str):
                meeting_time = datetime.fromisoformat(meeting_time.replace('Z', '+00:00'))
            
            new_name = f"{meeting_time.strftime('%Y-%m-%d_%H%M')}_{meeting_info.get('subject', '会议')}.wav"
            new_path = audio_file.parent / new_name
            
            # 重命名文件
            audio_file.rename(new_path)
            print(f"文件已重命名为:{new_name}")
            
            return new_path
        
        return audio_file
    
    @staticmethod
    def send_to_notion(transcription, meeting_info, notion_token, database_id):
        """将转写结果发送到Notion"""
        try:
            from notion_client import Client
            
            notion = Client(auth=notion_token)
            
            # 创建页面
            new_page = {
                "parent": {"database_id": database_id},
                "properties": {
                    "标题": {
                        "title": [
                            {
                                "text": {
                                    "content": meeting_info.get("subject", "会议纪要")
                                }
                            }
                        ]
                    },
                    "日期": {
                        "date": {
                            "start": meeting_info.get("start", datetime.now().isoformat())
                        }
                    },
                    "状态": {
                        "select": {
                            "name": "已完成"
                        }
                    }
                },
                "children": [
                    {
                        "object": "block",
                        "type": "heading_2",
                        "heading_2": {
                            "rich_text": [{"type": "text", "text": {"content": "会议内容"}}]
                        }
                    },
                    {
                        "object": "block",
                        "type": "paragraph",
                        "paragraph": {
                            "rich_text": [{"type": "text", "text": {"content": transcription}}]
                        }
                    }
                ]
            }
            
            response = notion.pages.create(**new_page)
            print(f"已发送到Notion:{response['url']}")
            
            return response
            
        except Exception as e:
            print(f"发送到Notion失败:{e}")
            return None
    
    @staticmethod
    def create_summary_email(transcription, meeting_info, recipients):
        """创建总结邮件"""
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart
        import smtplib
        
        # 提取关键点
        from post_processor import MeetingPostProcessor
        processor = MeetingPostProcessor()
        key_points = processor.extract_key_points(transcription)
        
        # 创建邮件内容
        msg = MIMEMultipart()
        msg['Subject'] = f"会议纪要:{meeting_info.get('subject', '会议')}"
        msg['From'] = "meeting_transcriber@company.com"
        msg['To'] = ", ".join(recipients)
        
        # 正文
        body = f"""
会议主题:{meeting_info.get('subject', '会议')}
会议时间:{meeting_info.get('start', datetime.now().strftime('%Y-%m-%d %H:%M'))}
参会人员:{meeting_info.get('participants', '详见邀请')}

会议内容摘要:
{transcription[:500]}...(完整内容请查看附件)

主要讨论要点:
"""
        
        for i, point in enumerate(key_points, 1):
            body += f"{i}. {point}\n"
        
        body += """
下一步行动:
1. 待补充

备注:本纪要由语音识别系统自动生成,仅供参考。
"""
        
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        
        # 添加附件(完整转写)
        attachment = MIMEText(transcription, 'plain', 'utf-8')
        attachment.add_header('Content-Disposition', 'attachment', 
                            filename=f"会议纪要_{datetime.now().strftime('%Y%m%d')}.txt")
        msg.attach(attachment)
        
        return msg

7. 总结

通过这篇教程,你应该已经掌握了使用Qwen3-ASR-1.7B进行会议录音转写的完整流程。我们从最简单的单文件转写开始,一步步实现了批量处理、实时转写、会议纪要生成等高级功能。

7.1 核心要点回顾

  1. 部署简单:无论是使用镜像一键部署,还是手动安装,都能快速上手
  2. 多语言支持:中文、英文、日语、韩语、粤语都能识别,还能自动检测语言
  3. 完全离线:所有处理都在本地完成,数据安全有保障
  4. 性能优秀:实时因子RTF<0.3,处理速度很快
  5. 易于集成:可以轻松集成到现有系统中

7.2 实际应用建议

根据我的使用经验,给你几个实用建议:

对于个人使用:

  • 直接用镜像部署,最简单快捷
  • 每周花10分钟批量处理会议录音
  • 结合后处理脚本,自动生成会议纪要

对于团队使用:

  • 搭建一个共享的转写服务
  • 设置自动监控文件夹,有新录音自动处理
  • 将结果自动同步到团队协作工具(如Notion、Confluence)

对于企业级应用:

  • 考虑高可用部署,确保服务稳定
  • 添加用户管理和权限控制
  • 集成到现有的会议管理系统中
  • 定期备份转写数据

7.3 遇到的坑和解决方案

在我实际使用中,遇到过这些问题,你可以提前避免:

  1. 音频格式问题:一定要确保是WAV格式,如果不是就先转换
  2. 长音频处理:超过10分钟的录音最好分段处理
  3. 专业术语识别:对于专业会议,可以准备一个术语对照表做后处理
  4. 多人同时说话:模型对重叠语音的识别还有提升空间,尽量保证录音质量

7.4 下一步可以做什么

如果你对这个方案感兴趣,还可以继续优化:

  1. 添加说话人分离:结合说话人识别技术,区分不同发言者
  2. 集成时间戳:为每个句子添加时间戳,方便回听核对
  3. 情感分析:分析发言者的情感倾向
  4. 关键词提取:自动提取会议关键词和行动项
  5. 多模态分析:结合会议PPT、聊天记录等多维度信息

语音转文字技术正在改变我们的工作方式。以前需要人工逐字听写的会议纪要,现在可以自动化完成。虽然现在的准确率还不能达到100%,但对于大多数日常会议已经足够用了。

希望这篇教程能帮你节省大量整理会议记录的时间。技术应该服务于人,而不是增加人的负担。用好这些工具,让我们有更多时间思考真正重要的事情。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐