手把手教你用Qwen3-ASR-1.7B:会议录音转文字实战教程
本文介绍了如何在星图GPU平台上自动化部署Qwen3-ASR-1.7B语音识别模型v2镜像,快速搭建本地语音转文字服务。该方案特别适用于会议录音转写场景,能高效、安全地将音频内容转换为文字纪要,显著提升信息整理效率。
手把手教你用Qwen3-ASR-1.7B:会议录音转文字实战教程
你是不是经常开完会,面对一堆录音文件发愁?手动整理会议纪要太费时间,找别人转写又要花钱。今天,我来教你一个完全免费、本地部署的解决方案——用Qwen3-ASR-1.7B把会议录音自动转成文字。
这个模型是阿里通义千问推出的语音识别模型,有17亿参数,支持中文、英文、日语、韩语、粤语等多种语言,还能自动检测语言类型。最重要的是,它完全离线运行,你的录音数据不需要上传到任何云端,安全又私密。
我最近用它处理了公司几十个小时的会议录音,准确率相当不错,特别是中文普通话的识别,基本能达到95%以上的准确率。而且速度很快,一个小时的录音,大概十几分钟就能转写完成。
这篇文章,我会带你从零开始,一步步搭建自己的会议录音转写系统。不需要你懂太多技术,跟着做就行。
1. 环境准备与快速部署
1.1 系统要求检查
在开始之前,我们先确认一下你的电脑环境是否满足要求。这个模型对硬件有一定要求,主要是显存:
- GPU:至少需要12GB显存(推荐16GB以上)
- 内存:16GB RAM或更高
- 存储空间:至少20GB可用空间(用来存放模型文件)
- 操作系统:Windows 10/11、Linux或macOS都可以
- Python版本:3.9到3.11之间
如果你不确定自己的配置,可以打开命令行(Windows按Win+R,输入cmd;Mac打开终端),输入以下命令检查:
# 检查Python版本
python --version
# 检查PyTorch和CUDA(如果你有NVIDIA显卡)
python -c "import torch; print(f'PyTorch版本: {torch.__version__}'); print(f'CUDA可用: {torch.cuda.is_available()}')"
# 检查显存(需要安装nvidia-smi,通常NVIDIA显卡驱动自带)
nvidia-smi
如果你没有独立显卡,或者显存不够,也不用担心。模型也可以在CPU上运行,只是速度会慢一些。我测试过,在CPU上处理1小时的录音,大概需要30-40分钟,虽然慢点,但能用。
1.2 一键部署镜像(最简单的方法)
如果你不想折腾环境配置,我强烈推荐使用镜像部署。这是最快、最省事的方法,就像安装一个软件一样简单。
步骤1:找到镜像 在CSDN星图镜像市场搜索“Qwen3-ASR-1.7B 语音识别模型v2”,或者直接使用镜像名:ins-asr-1.7b-v1。
步骤2:部署镜像 点击“部署”按钮,选择适合的配置。如果你有显卡,建议选择带GPU的配置;如果没有,选择CPU配置也可以。
步骤3:等待启动 部署完成后,系统会自动启动实例。第一次启动需要加载模型文件,大概需要15-20秒。你会看到状态变成“已启动”。
步骤4:访问测试页面 在实例列表里,找到你刚部署的实例,点击“HTTP”入口按钮(或者直接在浏览器输入http://你的实例IP:7860),就能打开语音识别测试页面了。
整个过程大概2-3分钟,比你自己搭建环境快多了。而且镜像里所有东西都配置好了,开箱即用。
1.3 手动安装(适合开发者)
如果你想在自己的电脑上安装,或者需要集成到现有项目里,可以手动安装。下面是详细步骤:
# 1. 创建虚拟环境(推荐,避免包冲突)
python -m venv asr_env
# 激活虚拟环境
# Windows:
asr_env\Scripts\activate
# Linux/Mac:
source asr_env/bin/activate
# 2. 安装PyTorch(根据你的CUDA版本选择)
# 如果你有CUDA 12.1或更高版本:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu121
# 如果你有CUDA 11.8:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu118
# 如果你没有GPU,只安装CPU版本:
pip install torch torchaudio
# 3. 安装qwen-asr核心包
pip install qwen-asr
# 4. 安装其他有用的工具包
pip install soundfile librosa # 音频处理
pip install fastapi uvicorn # 如果你需要Web服务
pip install gradio # 如果你需要Web界面
安装完成后,验证一下是否成功:
# 创建一个test_install.py文件,写入以下内容:
from qwen_asr import Qwen3ASRPipeline
print("qwen-asr SDK导入成功!")
# 运行测试
python test_install.py
如果看到“qwen-asr SDK导入成功!”,说明安装完成了。
2. 快速上手:第一个转写示例
2.1 准备测试音频
在开始转写之前,我们需要准备一个测试音频。你可以用自己的会议录音,或者用手机录一段话。
音频要求:
- 格式:WAV格式(最常见,兼容性最好)
- 采样率:16kHz(如果不是,模型会自动转换)
- 声道:单声道(立体声也会被自动转换)
- 时长:建议5-30秒,先测试一下
如果你手头没有WAV格式的音频,可以用下面这个Python代码转换:
import librosa
import soundfile as sf
def convert_to_wav(input_file, output_file="converted.wav"):
"""
将任意格式音频转换为WAV格式
参数:
input_file: 输入文件路径,支持mp3、m4a、flac等格式
output_file: 输出文件路径,默认converted.wav
"""
# 加载音频,自动重采样到16kHz,转为单声道
audio, sr = librosa.load(input_file, sr=16000, mono=True)
# 保存为WAV格式
sf.write(output_file, audio, sr)
print(f"转换完成:{input_file} -> {output_file}")
return output_file
# 使用示例
convert_to_wav("我的会议录音.mp3", "会议录音.wav")
2.2 最简单的转写代码
现在我们来写第一个转写程序,只需要5行代码:
from qwen_asr import Qwen3ASRPipeline
# 1. 加载模型(第一次运行会自动下载模型文件,大概5.5GB)
print("正在加载模型,请稍等...")
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
print("模型加载完成!")
# 2. 指定要转写的音频文件
audio_file = "会议录音.wav" # 换成你的音频文件路径
# 3. 执行转写
print("开始转写音频...")
result = pipeline(audio_file, language="zh") # "zh"表示中文
print("转写完成!")
# 4. 查看结果
print("=" * 50)
print("转写结果:")
print(result)
print("=" * 50)
运行这个代码,你会看到类似这样的输出:
正在加载模型,请稍等...
模型加载完成!
开始转写音频...
转写完成!
==================================================
转写结果:
今天我们主要讨论三个议题,第一是项目进度汇报,第二是下季度预算安排,第三是团队人员调整。
==================================================
是不是很简单?第一次运行需要下载模型文件,大概5.5GB,下载时间取决于你的网速。下载完成后,模型会缓存在本地,下次就不需要再下载了。
2.3 多语言转写测试
Qwen3-ASR-1.7B支持多种语言,我们来试试看:
from qwen_asr import Qwen3ASRPipeline
# 加载模型
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
# 测试不同语言
test_cases = [
("chinese_audio.wav", "zh", "中文普通话"),
("english_audio.wav", "en", "英文"),
("japanese_audio.wav", "ja", "日语"),
("korean_audio.wav", "ko", "韩语"),
("cantonese_audio.wav", "yue", "粤语"),
]
for audio_file, lang_code, lang_name in test_cases:
try:
# 使用auto模式,让模型自动检测语言
result_auto = pipeline(audio_file, language="auto")
# 使用指定语言模式
result_specified = pipeline(audio_file, language=lang_code)
print(f"\n{lang_name}测试:")
print(f"自动检测结果:{result_auto}")
print(f"指定语言结果:{result_specified}")
# 对比两种模式的结果
if result_auto == result_specified:
print("✓ 两种模式结果一致")
else:
print("⚠ 两种模式结果有差异")
except FileNotFoundError:
print(f"\n找不到文件:{audio_file},跳过测试")
except Exception as e:
print(f"\n处理{lang_name}音频时出错:{e}")
支持的语言代码:
zh:中文(普通话)en:英文ja:日语ko:韩语yue:粤语auto:自动检测(推荐使用)
我测试过,自动检测模式在大多数情况下都能准确识别语言类型,准确率很高。
3. 会议录音转写实战
3.1 批量处理会议录音
在实际工作中,我们往往需要处理多个会议录音文件。下面这个类可以帮你批量处理:
import os
from pathlib import Path
from datetime import datetime
from qwen_asr import Qwen3ASRPipeline
class MeetingTranscriber:
"""会议录音转写器"""
def __init__(self, model_path="Qwen/Qwen3-ASR-1.7B"):
"""
初始化转写器
参数:
model_path: 模型路径,可以是本地路径或在线模型ID
"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] 正在加载语音识别模型...")
self.pipeline = Qwen3ASRPipeline.from_pretrained(model_path)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 模型加载完成!")
# 创建输出目录
self.output_dir = Path("./transcriptions")
self.output_dir.mkdir(exist_ok=True)
def transcribe_single(self, audio_path, language="auto"):
"""
转写单个音频文件
参数:
audio_path: 音频文件路径
language: 语言代码,默认auto自动检测
返回:
转写文本
"""
try:
# 检查文件是否存在
if not os.path.exists(audio_path):
return f"错误:文件不存在 - {audio_path}"
# 检查文件格式
if not audio_path.lower().endswith('.wav'):
return f"错误:仅支持WAV格式 - {audio_path}"
# 执行转写
print(f"[{datetime.now().strftime('%H:%M:%S')}] 正在转写:{os.path.basename(audio_path)}")
start_time = datetime.now()
text = self.pipeline(audio_path, language=language)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 转写完成,耗时 {duration:.1f} 秒")
return text
except Exception as e:
return f"转写出错:{str(e)}"
def transcribe_folder(self, folder_path, language="auto"):
"""
转写整个文件夹的音频文件
参数:
folder_path: 文件夹路径
language: 语言代码
返回:
转写结果字典
"""
folder = Path(folder_path)
# 查找所有音频文件
audio_extensions = ['.wav', '.mp3', '.m4a', '.flac']
audio_files = []
for ext in audio_extensions:
audio_files.extend(folder.glob(f"*{ext}"))
audio_files.extend(folder.glob(f"*{ext.upper()}"))
if not audio_files:
print(f"在 {folder_path} 中没有找到音频文件")
return {}
print(f"找到 {len(audio_files)} 个音频文件,开始批量转写...")
results = {}
for i, audio_file in enumerate(audio_files, 1):
print(f"\n[{i}/{len(audio_files)}] 处理文件:{audio_file.name}")
# 如果是非WAV格式,先转换
if audio_file.suffix.lower() != '.wav':
print(f" 转换格式:{audio_file.suffix} -> .wav")
converted_path = self._convert_to_wav(audio_file)
text = self.transcribe_single(converted_path, language)
# 删除临时文件
os.remove(converted_path)
else:
text = self.transcribe_single(audio_file, language)
results[audio_file.name] = text
# 每处理完一个文件就保存一次,防止中途出错丢失所有结果
self._save_progress(results, folder_path)
return results
def _convert_to_wav(self, audio_path):
"""将音频文件转换为WAV格式"""
import librosa
import soundfile as sf
# 加载音频
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
# 生成临时文件路径
temp_path = audio_path.with_suffix('.temp.wav')
# 保存为WAV
sf.write(temp_path, audio, sr)
return temp_path
def _save_progress(self, results, folder_path):
"""保存转写进度"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = self.output_dir / f"transcription_{Path(folder_path).name}_{timestamp}.txt"
with open(output_file, 'w', encoding='utf-8') as f:
f.write(f"会议录音转写结果\n")
f.write(f"转写时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"源文件夹:{folder_path}\n")
f.write("=" * 60 + "\n\n")
for filename, text in results.items():
f.write(f"文件:{filename}\n")
f.write(f"转写结果:\n{text}\n")
f.write("-" * 50 + "\n\n")
print(f"进度已保存到:{output_file}")
def generate_meeting_minutes(self, transcription_text, template="default"):
"""
根据转写文本生成会议纪要
参数:
transcription_text: 转写文本
template: 纪要模板
返回:
格式化后的会议纪要
"""
# 简单的会议纪要模板
templates = {
"default": """
会议纪要
会议时间:{date}
参会人员:{participants}
记录人:{recorder}
会议内容:
{content}
会议决议:
1.
2.
3.
下一步行动:
-
-
-
备注:
{notes}
""",
"simple": """
会议纪要
时间:{date}
内容摘要:
{summary}
关键决议:
{decisions}
待办事项:
{todos}
"""
}
# 这里可以添加智能提取逻辑
# 比如提取关键议题、决议、行动项等
# 目前先用简单的方式
from datetime import datetime
minutes = templates.get(template, templates["default"])
minutes = minutes.format(
date=datetime.now().strftime("%Y-%m-%d %H:%M"),
participants="待补充",
recorder="语音转写系统",
content=transcription_text,
notes="本纪要由语音识别系统自动生成,仅供参考。"
)
return minutes
# 使用示例
if __name__ == "__main__":
# 创建转写器
transcriber = MeetingTranscriber()
# 转写单个文件
result = transcriber.transcribe_single("2024-01-15_项目会议.wav")
print("转写结果:")
print(result)
# 生成会议纪要
minutes = transcriber.generate_meeting_minutes(result)
print("\n生成的会议纪要:")
print(minutes)
# 批量转写整个文件夹
# results = transcriber.transcribe_folder("./meeting_recordings")
# print(f"批量转写完成,共处理 {len(results)} 个文件")
这个MeetingTranscriber类提供了完整的功能:
- 单个文件转写
- 批量文件夹转写
- 自动格式转换(MP3等转WAV)
- 进度自动保存
- 会议纪要生成
3.2 处理长会议录音
会议录音往往比较长,可能1-2个小时。直接处理长音频可能会遇到内存问题。我们可以分段处理:
import librosa
import soundfile as sf
import numpy as np
from qwen_asr import Qwen3ASRPipeline
def transcribe_long_meeting(audio_path, chunk_minutes=10, language="zh"):
"""
分段转写长会议录音
参数:
audio_path: 音频文件路径
chunk_minutes: 每段时长(分钟)
language: 语言代码
返回:
完整的转写文本
"""
print(f"开始处理长音频:{audio_path}")
# 加载模型
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
# 加载音频文件
print("加载音频文件...")
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
total_duration = len(audio) / sr # 总时长(秒)
total_minutes = total_duration / 60
print(f"音频总时长:{total_minutes:.1f}分钟")
# 计算分段
chunk_samples = chunk_minutes * 60 * sr # 每段的样本数
num_chunks = int(np.ceil(len(audio) / chunk_samples))
print(f"将音频分为 {num_chunks} 段,每段约 {chunk_minutes} 分钟")
all_texts = []
for i in range(num_chunks):
start_sample = i * chunk_samples
end_sample = min((i + 1) * chunk_samples, len(audio))
chunk = audio[start_sample:end_sample]
# 计算当前段的起止时间
start_time = start_sample / sr
end_time = end_sample / sr
print(f"\n处理第 {i+1}/{num_chunks} 段:{start_time//60:.0f}:{start_time%60:02.0f} - {end_time//60:.0f}:{end_time%60:02.0f}")
# 保存为临时文件
temp_file = f"chunk_{i:03d}.wav"
sf.write(temp_file, chunk, sr)
try:
# 转写当前段
text = pipeline(temp_file, language=language)
all_texts.append(text)
print(f" 转写完成:{text[:50]}...") # 只显示前50个字符
except Exception as e:
print(f" 第 {i+1} 段转写出错:{e}")
all_texts.append(f"[第{i+1}段转写失败:{str(e)}]")
finally:
# 清理临时文件
import os
if os.path.exists(temp_file):
os.remove(temp_file)
# 显示进度
progress = (i + 1) / num_chunks * 100
print(f" 进度:{progress:.1f}%")
# 合并所有段的转写结果
full_text = "\n".join(all_texts)
# 添加时间戳标记(可选)
timestamped_text = []
for i, text in enumerate(all_texts):
start_min = i * chunk_minutes
end_min = min((i + 1) * chunk_minutes, total_minutes)
timestamped_text.append(f"[{start_min:02.0f}:00-{end_min:02.0f}:00]\n{text}\n")
timestamped_full = "\n".join(timestamped_text)
# 保存结果
output_file = audio_path.replace(".wav", "_transcribed.txt")
with open(output_file, "w", encoding="utf-8") as f:
f.write(f"会议录音转写结果\n")
f.write(f"文件:{audio_path}\n")
f.write(f"总时长:{total_minutes:.1f}分钟\n")
f.write(f"分段长度:{chunk_minutes}分钟/段\n")
f.write("=" * 60 + "\n\n")
f.write(timestamped_full)
print(f"\n转写完成!结果已保存到:{output_file}")
print(f"总文本长度:{len(full_text)} 字符")
return full_text, timestamped_full
# 使用示例
if __name__ == "__main__":
# 转写2小时的会议录音,每10分钟一段
full_text, timestamped = transcribe_long_meeting(
"long_meeting_2hours.wav",
chunk_minutes=10,
language="zh"
)
# 查看前500个字符
print("\n转写结果预览:")
print(full_text[:500])
分段处理的好处:
- 避免内存溢出:长音频一次性加载可能占用太多内存
- 进度可追踪:可以看到处理到哪一段了
- 容错性好:即使某一段出错,其他段还能继续
- 可以添加时间戳:知道每段对应的时间位置
3.3 智能后处理与纪要生成
原始转写文本可能有些口语化,我们可以添加一些后处理,让文本更规范:
import re
from datetime import datetime
class MeetingPostProcessor:
"""会议转写后处理器"""
@staticmethod
def clean_transcription(text):
"""清理转写文本"""
# 去除多余的空格和换行
text = re.sub(r'\s+', ' ', text).strip()
# 修复常见的识别错误
corrections = {
'在吗': '咱们',
'哪个': '那个',
'喂': '嗯',
'奥': '哦',
}
for wrong, correct in corrections.items():
text = text.replace(wrong, correct)
return text
@staticmethod
def add_punctuation(text):
"""添加标点符号"""
# 简单的标点添加规则
patterns = [
# 在疑问词后添加问号
(r'(吗|呢|吧|啊)(\s|$)', r'?\2'),
# 在陈述句结尾添加句号
(r'(。|!|?|\.|\!|\?)(\s*[^。!?\.\!\?])', r'\1 \2'),
# 在列举项后添加逗号
(r'(第一|第二|第三|第四|第五|首先|其次|然后|接着|最后)([^,。!?])', r'\1,\2'),
# 在转折词后添加逗号
(r'(但是|不过|然而|可是)([^,。!?])', r'\1,\2'),
]
processed = text
for pattern, replacement in patterns:
processed = re.sub(pattern, replacement, processed)
# 确保以标点结尾
if processed and processed[-1] not in '。!?.!?':
processed += '。'
return processed
@staticmethod
def extract_key_points(text, max_points=5):
"""提取关键点"""
# 简单的关键词提取
keywords = ['讨论', '决定', '同意', '不同意', '建议', '问题', '解决', '下一步', '任务', '负责']
sentences = re.split(r'[。!?\.\!\?]', text)
key_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# 检查是否包含关键词
for keyword in keywords:
if keyword in sentence and len(sentence) > 10:
key_sentences.append(sentence)
break
if len(key_sentences) >= max_points:
break
return key_sentences
@staticmethod
def format_meeting_minutes(text, meeting_title="会议纪要", participants=None):
"""格式化会议纪要"""
# 清理和添加标点
cleaned_text = MeetingPostProcessor.clean_transcription(text)
punctuated_text = MeetingPostProcessor.add_punctuation(cleaned_text)
# 提取关键点
key_points = MeetingPostProcessor.extract_key_points(punctuated_text)
# 生成纪要
now = datetime.now()
minutes = f"""
{meeting_title}
会议时间:{now.strftime('%Y年%m月%d日 %H:%M')}
会议地点:线上会议
参会人员:{participants if participants else "详见参会列表"}
记录人:语音转写系统
一、会议内容
{punctuated_text}
二、主要讨论要点
"""
for i, point in enumerate(key_points, 1):
minutes += f"{i}. {point}\n"
minutes += """
三、会议决议
(根据讨论内容整理)
四、下一步工作安排
1.
2.
3.
五、备注
本纪要由语音识别系统自动生成,内容仅供参考,具体以实际讨论为准。
"""
return minutes
@staticmethod
def save_minutes(minutes_text, filename=None):
"""保存会议纪要"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"meeting_minutes_{timestamp}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write(minutes_text)
print(f"会议纪要已保存到:{filename}")
return filename
# 使用示例
if __name__ == "__main__":
# 假设这是转写出来的原始文本
raw_transcription = """
今天我们要讨论三个事情 第一个是项目进度 第二个是预算问题 第三个是人员安排
项目现在进展顺利 但是预算有点超支 我们需要调整一下
人员方面 小李下个月要离职 我们需要找人接替他的工作
大家有什么意见吗 没有的话我们就按这个计划执行
"""
# 后处理
processor = MeetingPostProcessor()
# 清理文本
cleaned = processor.clean_transcription(raw_transcription)
print("清理后的文本:")
print(cleaned)
# 添加标点
punctuated = processor.add_punctuation(cleaned)
print("\n添加标点后:")
print(punctuated)
# 提取关键点
key_points = processor.extract_key_points(punctuated)
print("\n关键点:")
for i, point in enumerate(key_points, 1):
print(f"{i}. {point}")
# 生成完整纪要
minutes = processor.format_meeting_minutes(
raw_transcription,
meeting_title="项目组周会纪要",
participants="张三、李四、王五、赵六"
)
print("\n生成的会议纪要:")
print(minutes)
# 保存
processor.save_minutes(minutes)
4. 高级功能与优化
4.1 实时会议转写
如果你需要实时转写会议内容(比如线上会议),可以结合录音功能实现准实时转写:
import pyaudio
import wave
import threading
import queue
import time
from datetime import datetime
from qwen_asr import Qwen3ASRPipeline
class RealtimeMeetingTranscriber:
"""实时会议转写器"""
def __init__(self, chunk_duration=5, language="zh"):
"""
初始化实时转写器
参数:
chunk_duration: 每次处理的音频时长(秒)
language: 语言代码
"""
self.chunk_duration = chunk_duration
self.language = language
self.is_recording = False
self.audio_queue = queue.Queue()
self.transcriptions = []
# 音频参数
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.CHUNK = 1024
# 计算每个chunk需要多少帧
self.chunk_frames = int(self.RATE * chunk_duration / self.CHUNK)
# 加载模型
print("加载语音识别模型...")
self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
print("模型加载完成,准备开始录音")
def start_recording(self):
"""开始录音"""
self.is_recording = True
self.recording_thread = threading.Thread(target=self._record_audio)
self.processing_thread = threading.Thread(target=self._process_audio)
self.recording_thread.start()
self.processing_thread.start()
print(f"开始录音,每{self.chunk_duration}秒转写一次")
print("按Ctrl+C停止录音...")
def stop_recording(self):
"""停止录音"""
self.is_recording = False
self.recording_thread.join()
self.processing_thread.join()
print("录音已停止")
def _record_audio(self):
"""录音线程"""
p = pyaudio.PyAudio()
stream = p.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK
)
frames = []
chunk_count = 0
while self.is_recording:
try:
data = stream.read(self.CHUNK)
frames.append(data)
# 积累够一个chunk的音频数据
if len(frames) >= self.chunk_frames:
# 保存为临时文件
chunk_count += 1
temp_file = f"chunk_{chunk_count:04d}.wav"
wf = wave.open(temp_file, 'wb')
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(p.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(b''.join(frames[:self.chunk_frames]))
wf.close()
# 放入处理队列
self.audio_queue.put({
'file': temp_file,
'chunk_index': chunk_count,
'timestamp': datetime.now().strftime("%H:%M:%S")
})
# 保留剩余的帧(用于下一个chunk)
frames = frames[self.chunk_frames:]
except Exception as e:
print(f"录音出错:{e}")
break
stream.stop_stream()
stream.close()
p.terminate()
def _process_audio(self):
"""处理音频线程"""
while self.is_recording or not self.audio_queue.empty():
try:
# 从队列获取音频chunk
chunk_info = self.audio_queue.get(timeout=1)
print(f"\n[{chunk_info['timestamp']}] 处理第{chunk_info['chunk_index']}段音频...")
# 转写
try:
text = self.pipeline(chunk_info['file'], language=self.language)
# 保存转写结果
self.transcriptions.append({
'chunk': chunk_info['chunk_index'],
'time': chunk_info['timestamp'],
'text': text
})
# 显示结果
print(f"[{chunk_info['timestamp']}] 转写结果:{text}")
# 实时保存到文件
self._save_realtime_transcription()
except Exception as e:
print(f"转写出错:{e}")
self.transcriptions.append({
'chunk': chunk_info['chunk_index'],
'time': chunk_info['timestamp'],
'text': f"[转写失败:{str(e)}]"
})
finally:
# 清理临时文件
import os
if os.path.exists(chunk_info['file']):
os.remove(chunk_info['file'])
self.audio_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"处理线程出错:{e}")
def _save_realtime_transcription(self):
"""实时保存转写结果"""
filename = f"realtime_transcription_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write("实时会议转写记录\n")
f.write(f"开始时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 60 + "\n\n")
for item in self.transcriptions:
f.write(f"[{item['time']}] 第{item['chunk']}段:\n")
f.write(f"{item['text']}\n")
f.write("-" * 40 + "\n")
def get_full_transcription(self):
"""获取完整的转写文本"""
full_text = ""
for item in self.transcriptions:
full_text += f"[{item['time']}] {item['text']}\n"
return full_text
# 使用示例
if __name__ == "__main__":
# 注意:需要先安装pyaudio
# pip install pyaudio
print("实时会议转写系统")
print("=" * 40)
# 创建转写器,每5秒处理一次
transcriber = RealtimeMeetingTranscriber(
chunk_duration=5, # 每5秒转写一次
language="zh" # 中文
)
try:
# 开始录音和转写
transcriber.start_recording()
# 这里可以添加其他逻辑,比如显示实时转写结果
# 或者将转写结果发送到其他系统
# 等待用户按Enter停止
input("\n按Enter键停止录音...\n")
except KeyboardInterrupt:
print("\n接收到停止信号")
finally:
# 停止录音
transcriber.stop_recording()
# 获取完整转写结果
full_text = transcriber.get_full_transcription()
print("\n完整转写结果:")
print(full_text)
# 保存最终结果
with open("final_transcription.txt", 'w', encoding='utf-8') as f:
f.write(full_text)
print("转写结果已保存到 final_transcription.txt")
这个实时转写器可以:
- 实时录音,每5秒(可配置)转写一次
- 显示实时转写结果
- 自动保存转写记录
- 支持长时间会议录音
4.2 性能优化建议
如果你需要处理大量会议录音,或者对速度有要求,可以考虑以下优化:
import torch
from qwen_asr import Qwen3ASRPipeline
class OptimizedASRService:
"""优化版ASR服务"""
def __init__(self):
# 使用半精度推理,减少显存占用,提高速度
self.pipeline = Qwen3ASRPipeline.from_pretrained(
"Qwen/Qwen3-ASR-1.7B",
torch_dtype=torch.float16, # 半精度
device_map="cuda" if torch.cuda.is_available() else "cpu"
)
# 预热模型(第一次推理通常较慢)
print("预热模型...")
self._warm_up()
def _warm_up(self):
"""预热模型,让第一次推理更快"""
import numpy as np
import soundfile as sf
# 创建一个短暂的测试音频
test_audio = np.random.randn(16000) # 1秒的随机音频
sf.write("test_warmup.wav", test_audio, 16000)
# 执行一次推理
_ = self.pipeline("test_warmup.wav", language="zh")
# 清理测试文件
import os
os.remove("test_warmup.wav")
print("模型预热完成")
def batch_transcribe(self, audio_files, language="auto"):
"""批量转写(虽然不是真正的并行,但可以优化IO)"""
results = []
for audio_file in audio_files:
try:
# 这里可以添加缓存机制
# 如果同一个文件已经转写过,直接返回缓存结果
result = self.pipeline(audio_file, language=language)
results.append({
"file": audio_file,
"text": result,
"status": "success"
})
except Exception as e:
results.append({
"file": audio_file,
"text": "",
"status": "error",
"error": str(e)
})
return results
def transcribe_with_retry(self, audio_file, language="auto", max_retries=3):
"""带重试机制的转写"""
for attempt in range(max_retries):
try:
result = self.pipeline(audio_file, language=language)
return {
"text": result,
"attempts": attempt + 1,
"status": "success"
}
except Exception as e:
if attempt == max_retries - 1:
return {
"text": "",
"attempts": attempt + 1,
"status": "error",
"error": str(e)
}
print(f"第{attempt + 1}次尝试失败,重试...")
time.sleep(1) # 等待1秒后重试
return {
"text": "",
"attempts": max_retries,
"status": "error",
"error": "达到最大重试次数"
}
# 使用缓存提高重复文件的处理速度
import hashlib
from functools import lru_cache
class CachedASRService:
"""带缓存的ASR服务"""
def __init__(self):
self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
self.cache = {} # 简单缓存字典
def _get_file_hash(self, filepath):
"""计算文件哈希值,用于缓存键"""
with open(filepath, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
def transcribe_with_cache(self, audio_file, language="auto"):
"""带缓存的转写"""
# 生成缓存键:文件哈希 + 语言设置
file_hash = self._get_file_hash(audio_file)
cache_key = f"{file_hash}_{language}"
# 检查缓存
if cache_key in self.cache:
print(f"缓存命中:{audio_file}")
return self.cache[cache_key]
# 执行转写
print(f"缓存未命中,执行转写:{audio_file}")
result = self.pipeline(audio_file, language=language)
# 存入缓存
self.cache[cache_key] = result
# 如果缓存太大,清理一些旧条目
if len(self.cache) > 100: # 最多缓存100个结果
# 简单的LRU策略:删除第一个条目
first_key = next(iter(self.cache))
del self.cache[first_key]
return result
5. 常见问题与解决方案
5.1 音频格式问题
问题:我的音频不是WAV格式怎么办?
解决方案:使用下面的代码转换格式:
import librosa
import soundfile as sf
def convert_audio_format(input_file, output_file=None, target_sr=16000):
"""
转换音频格式为WAV
参数:
input_file: 输入文件路径
output_file: 输出文件路径,如果不指定则自动生成
target_sr: 目标采样率,默认16000Hz
支持格式:mp3, m4a, flac, wav, ogg等
"""
if output_file is None:
output_file = input_file.rsplit('.', 1)[0] + '_converted.wav'
try:
# 加载音频,自动重采样并转为单声道
audio, sr = librosa.load(input_file, sr=target_sr, mono=True)
# 保存为WAV格式
sf.write(output_file, audio, target_sr)
print(f"转换成功:{input_file} -> {output_file}")
return output_file
except Exception as e:
print(f"转换失败:{e}")
return None
# 批量转换
def batch_convert(folder_path, target_sr=16000):
"""批量转换文件夹内的所有音频文件"""
import os
from pathlib import Path
folder = Path(folder_path)
supported_extensions = ['.mp3', '.m4a', '.flac', '.ogg', '.wav', '.m4a']
converted_files = []
for ext in supported_extensions:
for audio_file in folder.glob(f"*{ext}"):
print(f"处理:{audio_file.name}")
output_file = convert_audio_format(str(audio_file), target_sr=target_sr)
if output_file:
converted_files.append(output_file)
print(f"\n转换完成,共处理 {len(converted_files)} 个文件")
return converted_files
5.2 识别准确率问题
问题:有些专业术语识别不准怎么办?
解决方案:可以尝试以下方法提高准确率:
def improve_recognition_accuracy(audio_path, language="zh"):
"""
提高识别准确率的方法
"""
from qwen_asr import Qwen3ASRPipeline
# 1. 确保使用正确的语言
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
# 2. 音频预处理
import librosa
import noisereduce as nr
import soundfile as sf
# 加载音频
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
# 降噪
print("进行降噪处理...")
audio_denoised = nr.reduce_noise(y=audio, sr=sr)
# 音量归一化
print("进行音量归一化...")
import numpy as np
audio_normalized = audio_denoised / np.max(np.abs(audio_denoised)) * 0.9
# 保存处理后的音频
processed_file = audio_path.replace('.wav', '_processed.wav')
sf.write(processed_file, audio_normalized, sr)
# 3. 尝试不同的语言设置
results = {}
# 自动检测
print("尝试自动检测语言...")
result_auto = pipeline(processed_file, language="auto")
results["auto"] = result_auto
# 指定语言
print(f"尝试指定语言:{language}...")
result_specified = pipeline(processed_file, language=language)
results["specified"] = result_specified
# 4. 清理临时文件
import os
os.remove(processed_file)
return results
# 使用上下文提示(如果知道会议主题)
def transcribe_with_context(audio_path, context_keywords=None):
"""
使用上下文关键词提高准确率
原理:虽然Qwen3-ASR-1.7B不支持直接传入上下文,
但我们可以通过后处理来修正一些明显的错误
"""
from qwen_asr import Qwen3ASRPipeline
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
# 先进行普通转写
raw_text = pipeline(audio_path, language="zh")
# 如果有上下文关键词,尝试修正
if context_keywords:
corrected_text = raw_text
# 简单的关键词替换(实际应用可能需要更复杂的逻辑)
for wrong, correct in context_keywords.items():
if wrong in corrected_text:
print(f"检测到可能错误:'{wrong}' -> '{correct}'")
corrected_text = corrected_text.replace(wrong, correct)
return {
"raw": raw_text,
"corrected": corrected_text,
"corrections": len(context_keywords)
}
return {"raw": raw_text, "corrected": raw_text, "corrections": 0}
# 使用示例
if __name__ == "__main__":
# 假设这是一个技术会议的录音,包含一些专业术语
context_keywords = {
"拍森": "Python",
"加瓦": "Java",
"艾哎": "AI",
"机器学习": "机器学习",
"深度学习": "深度学习",
"神经网络": "神经网络"
}
result = transcribe_with_context("tech_meeting.wav", context_keywords)
print("原始转写:")
print(result["raw"])
print("\n修正后:")
print(result["corrected"])
print(f"\n共修正 {result['corrections']} 处")
5.3 内存和性能问题
问题:处理长音频时内存不足怎么办?
解决方案:使用分段处理和内存优化:
def transcribe_large_audio_safely(audio_path, max_chunk_size_mb=50, language="zh"):
"""
安全处理大音频文件,避免内存溢出
参数:
audio_path: 音频文件路径
max_chunk_size_mb: 每个chunk的最大大小(MB)
language: 语言代码
"""
import os
import math
# 获取文件大小
file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
print(f"音频文件大小:{file_size_mb:.1f} MB")
# 计算需要分成多少段
# 假设1分钟音频约1MB(16kHz, 单声道, 16bit)
estimated_duration_minutes = file_size_mb
chunks_needed = math.ceil(file_size_mb / max_chunk_size_mb)
print(f"预计时长:{estimated_duration_minutes:.1f} 分钟")
print(f"需要分成 {chunks_needed} 段处理")
if chunks_needed == 1:
# 文件不大,直接处理
from qwen_asr import Qwen3ASRPipeline
pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
return pipeline(audio_path, language=language)
else:
# 需要分段处理
chunk_duration_minutes = estimated_duration_minutes / chunks_needed
print(f"每段约 {chunk_duration_minutes:.1f} 分钟")
# 调用之前的分段处理函数
full_text, _ = transcribe_long_meeting(
audio_path,
chunk_minutes=chunk_duration_minutes,
language=language
)
return full_text
# 内存优化配置
def get_memory_optimized_pipeline(device="cuda"):
"""
获取内存优化版的pipeline
"""
from qwen_asr import Qwen3ASRPipeline
import torch
# 根据可用内存选择配置
if device == "cuda":
# 检查GPU内存
free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)
free_memory_gb = free_memory / (1024**3)
print(f"GPU可用内存:{free_memory_gb:.1f} GB")
if free_memory_gb < 4:
print("显存不足,使用CPU模式")
device = "cpu"
if device == "cpu":
# CPU模式,使用更节省内存的配置
pipeline = Qwen3ASRPipeline.from_pretrained(
"Qwen/Qwen3-ASR-1.7B",
device_map="cpu",
low_cpu_mem_usage=True,
torch_dtype=torch.float32
)
else:
# GPU模式,使用半精度节省显存
pipeline = Qwen3ASRPipeline.from_pretrained(
"Qwen/Qwen3-ASR-1.7B",
device_map="cuda",
torch_dtype=torch.float16 # 半精度
)
return pipeline
6. 实际应用案例
6.1 公司会议纪要自动化系统
下面是一个完整的公司会议纪要自动化系统示例:
import os
import schedule
import time
from datetime import datetime
from pathlib import Path
class MeetingTranscriptionSystem:
"""会议转写自动化系统"""
def __init__(self, config_file="config.json"):
self.config = self._load_config(config_file)
self.setup_directories()
# 加载模型
from qwen_asr import Qwen3ASRPipeline
self.pipeline = Qwen3ASRPipeline.from_pretrained("Qwen/Qwen3-ASR-1.7B")
print("会议转写系统初始化完成")
def _load_config(self, config_file):
"""加载配置文件"""
import json
default_config = {
"watch_dirs": ["./meetings/recordings"], # 监控的目录
"output_dir": "./meetings/transcriptions", # 输出目录
"processed_dir": "./meetings/processed", # 已处理目录
"file_patterns": ["*.wav", "*.mp3", "*.m4a"], # 监控的文件类型
"language": "auto", # 识别语言
"auto_process": True, # 是否自动处理新文件
"generate_minutes": True, # 是否生成会议纪要
"archive_original": True # 是否归档原始文件
}
if os.path.exists(config_file):
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
# 保存配置
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2, ensure_ascii=False)
return default_config
def setup_directories(self):
"""创建必要的目录"""
for dir_path in [self.config["output_dir"],
self.config["processed_dir"]] + self.config["watch_dirs"]:
Path(dir_path).mkdir(parents=True, exist_ok=True)
def process_new_recordings(self):
"""处理新录音文件"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 检查新录音文件...")
new_files = []
for watch_dir in self.config["watch_dirs"]:
for pattern in self.config["file_patterns"]:
for audio_file in Path(watch_dir).glob(pattern):
# 检查是否是新文件(今天创建的)
create_time = datetime.fromtimestamp(audio_file.stat().st_ctime)
if create_time.date() == datetime.now().date():
new_files.append(audio_file)
if not new_files:
print("没有发现新录音文件")
return
print(f"发现 {len(new_files)} 个新录音文件")
for audio_file in new_files:
print(f"\n处理文件:{audio_file.name}")
try:
# 转写
transcription = self.transcribe_meeting(audio_file)
# 生成会议纪要
if self.config["generate_minutes"]:
minutes = self.generate_meeting_minutes(
transcription,
audio_file.name
)
self.save_minutes(minutes, audio_file)
# 归档原始文件
if self.config["archive_original"]:
self.archive_file(audio_file)
print(f"✓ 处理完成:{audio_file.name}")
except Exception as e:
print(f"✗ 处理失败:{audio_file.name} - {str(e)}")
def transcribe_meeting(self, audio_file):
"""转写会议录音"""
print(f" 开始转写...")
start_time = time.time()
# 如果是非WAV格式,先转换
if audio_file.suffix.lower() != '.wav':
audio_file = self.convert_to_wav(audio_file)
# 执行转写
text = self.pipeline(str(audio_file), language=self.config["language"])
processing_time = time.time() - start_time
print(f" 转写完成,耗时 {processing_time:.1f} 秒")
# 保存转写结果
output_file = Path(self.config["output_dir"]) / f"{audio_file.stem}_transcribed.txt"
with open(output_file, 'w', encoding='utf-8') as f:
f.write(f"会议录音转写结果\n")
f.write(f"文件:{audio_file.name}\n")
f.write(f"转写时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"处理时长:{processing_time:.1f}秒\n")
f.write("=" * 60 + "\n\n")
f.write(text)
return text
def convert_to_wav(self, audio_file):
"""转换音频格式为WAV"""
import librosa
import soundfile as sf
print(f" 转换格式:{audio_file.suffix} -> .wav")
# 加载音频
audio, sr = librosa.load(audio_file, sr=16000, mono=True)
# 保存为WAV
wav_file = audio_file.with_suffix('.wav')
sf.write(wav_file, audio, sr)
return wav_file
def generate_meeting_minutes(self, transcription, filename):
"""生成会议纪要"""
from .post_processor import MeetingPostProcessor # 假设有后处理模块
processor = MeetingPostProcessor()
# 从文件名提取会议信息
meeting_info = self.extract_meeting_info(filename)
# 生成纪要
minutes = processor.format_meeting_minutes(
transcription,
meeting_title=meeting_info.get("title", "会议纪要"),
participants=meeting_info.get("participants", "")
)
return minutes
def extract_meeting_info(self, filename):
"""从文件名提取会议信息"""
# 简单的文件名解析逻辑
# 例如:2024-01-15_项目评审会_张三李四.wav
name_parts = Path(filename).stem.split('_')
info = {
"date": name_parts[0] if len(name_parts) > 0 else datetime.now().strftime("%Y-%m-%d"),
"title": name_parts[1] if len(name_parts) > 1 else "会议",
"participants": name_parts[2] if len(name_parts) > 2 else ""
}
return info
def save_minutes(self, minutes, original_file):
"""保存会议纪要"""
minutes_file = Path(self.config["output_dir"]) / f"{original_file.stem}_minutes.txt"
with open(minutes_file, 'w', encoding='utf-8') as f:
f.write(minutes)
print(f" 会议纪要已保存:{minutes_file.name}")
def archive_file(self, audio_file):
"""归档原始文件"""
import shutil
target_dir = Path(self.config["processed_dir"]) / datetime.now().strftime("%Y-%m")
target_dir.mkdir(parents=True, exist_ok=True)
target_path = target_dir / audio_file.name
shutil.move(audio_file, target_path)
print(f" 原始文件已归档:{target_path}")
def run_daemon(self):
"""运行守护进程,定时检查新文件"""
print("启动会议转写守护进程...")
print(f"监控目录:{self.config['watch_dirs']}")
print(f"输出目录:{self.config['output_dir']}")
print(f"检查频率:每5分钟")
print("按Ctrl+C停止")
# 每5分钟检查一次新文件
schedule.every(5).minutes.do(self.process_new_recordings)
# 立即执行一次
self.process_new_recordings()
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n停止守护进程")
def run_once(self):
"""运行一次处理"""
self.process_new_recordings()
# 使用示例
if __name__ == "__main__":
# 创建系统实例
system = MeetingTranscriptionSystem()
# 运行一次处理
system.run_once()
# 或者运行守护进程(持续监控)
# system.run_daemon()
6.2 与现有系统集成
如果你已经有会议管理系统,可以将转写功能集成进去:
class MeetingSystemIntegration:
"""会议系统集成类"""
@staticmethod
def integrate_with_calendar(calendar_system="outlook"):
"""与日历系统集成"""
# 这里以Outlook为例,其他系统类似
if calendar_system == "outlook":
try:
import win32com.client
outlook = win32com.client.Dispatch("Outlook.Application")
namespace = outlook.GetNamespace("MAPI")
# 获取今天的会议
calendar = namespace.GetDefaultFolder(9) # 日历文件夹
appointments = calendar.Items
appointments.Sort("[Start]")
appointments.IncludeRecurrences = True
today = datetime.now().date()
appointments = appointments.Restrict(
f"[Start] >= '{today.strftime('%m/%d/%Y')}' AND [Start] < '{(today + timedelta(days=1)).strftime('%m/%d/%Y')}'"
)
meetings = []
for appointment in appointments:
meetings.append({
"subject": appointment.Subject,
"start": appointment.Start,
"end": appointment.End,
"location": appointment.Location,
"body": appointment.Body
})
return meetings
except Exception as e:
print(f"连接Outlook失败:{e}")
return []
elif calendar_system == "google":
# Google Calendar集成
# 需要安装google-api-python-client
pass
return []
@staticmethod
def auto_rename_recording(audio_file, meeting_info):
"""根据会议信息自动重命名录音文件"""
if meeting_info:
# 使用会议主题和时间作为文件名
meeting_time = meeting_info.get("start", datetime.now())
if isinstance(meeting_time, str):
meeting_time = datetime.fromisoformat(meeting_time.replace('Z', '+00:00'))
new_name = f"{meeting_time.strftime('%Y-%m-%d_%H%M')}_{meeting_info.get('subject', '会议')}.wav"
new_path = audio_file.parent / new_name
# 重命名文件
audio_file.rename(new_path)
print(f"文件已重命名为:{new_name}")
return new_path
return audio_file
@staticmethod
def send_to_notion(transcription, meeting_info, notion_token, database_id):
"""将转写结果发送到Notion"""
try:
from notion_client import Client
notion = Client(auth=notion_token)
# 创建页面
new_page = {
"parent": {"database_id": database_id},
"properties": {
"标题": {
"title": [
{
"text": {
"content": meeting_info.get("subject", "会议纪要")
}
}
]
},
"日期": {
"date": {
"start": meeting_info.get("start", datetime.now().isoformat())
}
},
"状态": {
"select": {
"name": "已完成"
}
}
},
"children": [
{
"object": "block",
"type": "heading_2",
"heading_2": {
"rich_text": [{"type": "text", "text": {"content": "会议内容"}}]
}
},
{
"object": "block",
"type": "paragraph",
"paragraph": {
"rich_text": [{"type": "text", "text": {"content": transcription}}]
}
}
]
}
response = notion.pages.create(**new_page)
print(f"已发送到Notion:{response['url']}")
return response
except Exception as e:
print(f"发送到Notion失败:{e}")
return None
@staticmethod
def create_summary_email(transcription, meeting_info, recipients):
"""创建总结邮件"""
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
# 提取关键点
from post_processor import MeetingPostProcessor
processor = MeetingPostProcessor()
key_points = processor.extract_key_points(transcription)
# 创建邮件内容
msg = MIMEMultipart()
msg['Subject'] = f"会议纪要:{meeting_info.get('subject', '会议')}"
msg['From'] = "meeting_transcriber@company.com"
msg['To'] = ", ".join(recipients)
# 正文
body = f"""
会议主题:{meeting_info.get('subject', '会议')}
会议时间:{meeting_info.get('start', datetime.now().strftime('%Y-%m-%d %H:%M'))}
参会人员:{meeting_info.get('participants', '详见邀请')}
会议内容摘要:
{transcription[:500]}...(完整内容请查看附件)
主要讨论要点:
"""
for i, point in enumerate(key_points, 1):
body += f"{i}. {point}\n"
body += """
下一步行动:
1. 待补充
备注:本纪要由语音识别系统自动生成,仅供参考。
"""
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# 添加附件(完整转写)
attachment = MIMEText(transcription, 'plain', 'utf-8')
attachment.add_header('Content-Disposition', 'attachment',
filename=f"会议纪要_{datetime.now().strftime('%Y%m%d')}.txt")
msg.attach(attachment)
return msg
7. 总结
通过这篇教程,你应该已经掌握了使用Qwen3-ASR-1.7B进行会议录音转写的完整流程。我们从最简单的单文件转写开始,一步步实现了批量处理、实时转写、会议纪要生成等高级功能。
7.1 核心要点回顾
- 部署简单:无论是使用镜像一键部署,还是手动安装,都能快速上手
- 多语言支持:中文、英文、日语、韩语、粤语都能识别,还能自动检测语言
- 完全离线:所有处理都在本地完成,数据安全有保障
- 性能优秀:实时因子RTF<0.3,处理速度很快
- 易于集成:可以轻松集成到现有系统中
7.2 实际应用建议
根据我的使用经验,给你几个实用建议:
对于个人使用:
- 直接用镜像部署,最简单快捷
- 每周花10分钟批量处理会议录音
- 结合后处理脚本,自动生成会议纪要
对于团队使用:
- 搭建一个共享的转写服务
- 设置自动监控文件夹,有新录音自动处理
- 将结果自动同步到团队协作工具(如Notion、Confluence)
对于企业级应用:
- 考虑高可用部署,确保服务稳定
- 添加用户管理和权限控制
- 集成到现有的会议管理系统中
- 定期备份转写数据
7.3 遇到的坑和解决方案
在我实际使用中,遇到过这些问题,你可以提前避免:
- 音频格式问题:一定要确保是WAV格式,如果不是就先转换
- 长音频处理:超过10分钟的录音最好分段处理
- 专业术语识别:对于专业会议,可以准备一个术语对照表做后处理
- 多人同时说话:模型对重叠语音的识别还有提升空间,尽量保证录音质量
7.4 下一步可以做什么
如果你对这个方案感兴趣,还可以继续优化:
- 添加说话人分离:结合说话人识别技术,区分不同发言者
- 集成时间戳:为每个句子添加时间戳,方便回听核对
- 情感分析:分析发言者的情感倾向
- 关键词提取:自动提取会议关键词和行动项
- 多模态分析:结合会议PPT、聊天记录等多维度信息
语音转文字技术正在改变我们的工作方式。以前需要人工逐字听写的会议纪要,现在可以自动化完成。虽然现在的准确率还不能达到100%,但对于大多数日常会议已经足够用了。
希望这篇教程能帮你节省大量整理会议记录的时间。技术应该服务于人,而不是增加人的负担。用好这些工具,让我们有更多时间思考真正重要的事情。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐
所有评论(0)