python asyncio 异步调用实例
本文介绍了使用WebSocket 实现ASR音频转文字服务的交互流程,基于Python异步处理代码实现
·
使用场景
采用websocket方式,客户端采集音频后,实现ASR服务转换为文字
交互流程
- 业务后端向算法服务发送start指令,建立连接
- 算法服务立即返回task_id
- 业务端接收返回消息,获取task_id
- 业务后端开始向算法服务发送音频(binary格式)
- 算法服务实时识别音频,在识别到开始指令“患者你好”后,返回sub_task_id
- 业务端接收返回消息,获取sub_task_id(需异步方式发送音频、接收消息)
- 在音频发送结束、并获取sub_task_id后,业务后端向算法服务发送结束指令(入参需包含task_id和sub_task_id)
使用说明
- 异步函数定义:async def 函数名():
- 创建异步任务:audio_task = asyncio.create_task(异步函数名(self))
- 等待异步任务完成:await audio_task  # await [任务名]
- 调用方式:
  user = WebSocketUser(websocketAddr, audio)
  asyncio.run(user.send_and_receive_message(interval=0.02))
简单样例
# Imports
import asyncio
import json

import websockets


# Example skeleton: upload audio and poll for messages concurrently
# over one WebSocket connection.
class WebSocketUser:
    async def send_and_receive_message(self, interval=0.02):
        """Send the audio stream and wait for sub_task_id at the same time.

        interval: pause (seconds) between audio chunks; also the parameter
        the caller passes in asyncio.run(...) below, so it must exist here
        (the original sample omitted it and would raise TypeError).
        """

        async def receive_message():
            """Poll the socket (at most ~10 s) until sub_task_id arrives."""
            print(f"调用异步函数接收信息 receive_message")
            wait_time = 1
            while wait_time < 10 and self.sub_task_id is None:
                # await asyncio.sleep, NOT time.sleep: a blocking sleep would
                # freeze the event loop and stop send_binary_data from running.
                await asyncio.sleep(1)
                wait_time += 1
                response = self.ws.recv()
                print('receive response:', response)
                if response and 'sub_task_id' in response:
                    response_json = json.loads(response)
                    self.sub_task_id = response_json["header"]["sub_task_id"]  # extract key from JSON
                    print(f"打印接收的sub_task_id: {self.sub_task_id}")

        async def send_binary_data():
            """Send the audio as binary frames (body elided in this sample)."""
            print(f"调用异步函数 send_binary_data发送音频")
            '''发送音频'''

        # Create the async tasks: audio upload and message polling.
        audio_task = asyncio.create_task(send_binary_data())
        receive_task = asyncio.create_task(receive_message())
        # Wait for the audio upload to finish.
        audio_success = await audio_task
        # Wait until sub_task_id has been obtained.
        receive_success = await receive_task


# Invoke the async entry point.
if __name__ == "__main__":
    user = WebSocketUser(websocketAddr, audio)
    asyncio.run(user.send_and_receive_message(interval=0.02))
代码实现
from websocket import create_connection
import asyncio
import json
import time
import uuid
import logging
import wave
import threading
import datetime
# Configure root logging: timestamped INFO-level messages (applies process-wide).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
#定义类
class WebSocketUser:
    """Streams a WAV recording to an ASR backend over a raw WebSocket.

    Lifecycle: on_start() opens the connection, send_and_receive_message()
    runs the StartSession / audio-upload / end-message exchange inside an
    asyncio event loop, and on_stop() closes the socket.
    """

    def __init__(self, websocketAddr, audio):
        # websocketAddr: ws:// endpoint of the ASR service
        # audio: path to a 16 kHz, mono, 16-bit WAV file
        self.ws = None           # websocket connection, set by on_start()
        self._lock = None        # created in on_start(); not used elsewhere yet
        self.task_id = None      # filled from the StartSession response
        self.sub_task_id = None  # filled once the service detects the start phrase
        self.websocketAddr = websocketAddr
        self.audio = audio

    # Open the connection
    def on_start(self):
        """Connect to the WebSocket server; re-raises on failure."""
        self._lock = threading.Lock()
        try:
            # Explicitly build a connection that supports binary frames.
            self.ws = create_connection(
                self.websocketAddr,
                timeout=10
            )
            self.ws.settimeout(30)
            logging.info("WebSocket 连接成功")
        except Exception as e:
            logging.error(f"连接失败: {e}")
            self.ws = None
            raise

    # Close the connection
    def on_stop(self):
        """Close the WebSocket connection if it was opened (best effort)."""
        if self.ws:
            try:
                self.ws.close()
                logging.info("WebSocket 连接已关闭")
            except Exception as e:
                logging.warning(f"关闭失败: {e}")

    def read_wav_pcm_chunks(self, file_path, chunk_size=320):
        """Yield raw PCM byte chunks from a 16 kHz mono 16-bit WAV file
        (the wave module skips the RIFF header, so only sample data is read).

        Raises ValueError when the format does not match, and re-raises any
        read error after logging it.
        """
        try:
            with wave.open(file_path, 'rb') as wav:
                if wav.getframerate() != 16000:
                    raise ValueError(f"采样率错误: {wav.getframerate()},应为 16000")
                if wav.getnchannels() != 1:
                    raise ValueError(f"通道数错误: {wav.getnchannels()},应为 1")
                if wav.getsampwidth() != 2:
                    raise ValueError(f"位深错误: {wav.getsampwidth() * 8}bit,应为 16bit")
                frames = wav.readframes(wav.getnframes())  # bytes of sample data
                for i in range(0, len(frames), chunk_size):
                    yield frames[i:i + chunk_size]
        except Exception as e:
            logging.error(f"读取 WAV 失败: {e}")
            raise

    # Async entry point: send the audio and receive messages concurrently
    async def send_and_receive_message(self, interval=0.02):
        """Run the full exchange: StartSession, concurrent audio upload and
        sub_task_id polling, then the ForceEndInteraction message.

        interval: pause (seconds) between PCM chunks, pacing the upload.

        Fix over the original version: the nested coroutines used blocking
        time.sleep() and ws.recv(), which froze the event loop, so the two
        "concurrent" tasks actually ran one after the other.  They now await
        asyncio.sleep() and run the blocking recv() in a worker thread.
        """
        if not self.ws:
            logging.error("WebSocket 未连接")
            return
        random_24 = uuid.uuid4().hex[:24]
        timestamp = int(time.time() * 1000)
        logging.info(f"开始任务(message id): {random_24}")

        # 1. Send start (TextMessage) — the reply carries task_id.
        start_msg = {
            "header": {
                "namespace": "SpeechRecognizer",
                "name": "StartSession",
                "message_id": random_24,
                "timestamp": timestamp
            },
            "payload": {
                "type": 1
            }
        }
        logging.info(f"就诊录音的start_msg: {start_msg}")
        try:
            self.ws.send(json.dumps(start_msg))
            logging.info("Start 消息发送成功")
            # A blocking recv is acceptable here: no other coroutine runs yet.
            response = self.ws.recv()
            print('receive response:', response)
            if response:
                response_json = json.loads(response)
                self.task_id = response_json["header"]["task_id"]
                print(f"{self.task_id}")
            await asyncio.sleep(0.05)

            loop = asyncio.get_running_loop()

            # Async helper: poll for the sub_task_id message
            async def receive_message():
                """Poll the socket (at most ~10 s) until sub_task_id shows up."""
                print(f"调用异步函数接收信息 receive_message")
                wait_time = 1
                while wait_time < 10 and self.sub_task_id is None:
                    await asyncio.sleep(1)  # yield so the sender keeps running
                    wait_time += 1
                    # recv() blocks; run it in a worker thread so the event
                    # loop (and the audio upload) keeps making progress.
                    response = await loop.run_in_executor(None, self.ws.recv)
                    print('receive response_2:', response)
                    if response and 'sub_task_id' in response:
                        response_json = json.loads(response)
                        self.sub_task_id = response_json["header"]["sub_task_id"]
                        print(f"打印sub_task_id: {self.sub_task_id}")

            # Async helper: send the PCM audio
            async def send_binary_data():
                """2. Send the PCM data as binary WebSocket frames."""
                print(f"调用异步函数 send_binary_data发送音频")
                i = 1
                sent_bytes = 0
                for chunk in self.read_wav_pcm_chunks(self.audio, 640):  # was 320
                    if not isinstance(chunk, bytes):
                        logging.error(f"Chunk 不是 bytes 类型!类型为: {type(chunk)}")
                        continue
                    logging.debug(f"发送第 {i} 个 PCM 块,大小: {len(chunk)} 字节")
                    # send_binary guarantees a binary-opcode frame
                    # (plain send(chunk) would also work, but this is safer).
                    self.ws.send_binary(chunk)
                    sent_bytes += len(chunk)
                    await asyncio.sleep(interval)  # pace upload, yield to receiver
                    i += 1
                logging.info(f"PCM 数据发送完成,共 {i - 1} 块,{sent_bytes} 字节")
                await asyncio.sleep(0.05)

            # Create the async tasks: audio upload and message polling.
            audio_task = asyncio.create_task(send_binary_data())
            receive_task = asyncio.create_task(receive_message())
            # Wait for the upload, then for sub_task_id (or the poll timeout).
            await audio_task
            await receive_task
            await asyncio.sleep(0.05)

            # 3. When the recording contains no spoken end command, send this
            #    forced-end message (StopSession is the graceful alternative).
            end_msg = {
                "header": {
                    "namespace": "SpeechRecognizer",
                    "name": "ForceEndInteraction",
                    "message_id": random_24,
                    "task_id": self.task_id,
                    "sub_task_id": self.sub_task_id,
                    "timestamp": int(time.time() * 1000)
                }
            }
            logging.info(f"就诊录音的end_msg: {end_msg}")
            self.ws.send(json.dumps(end_msg))
            logging.info("End 消息发送完成")
            # Give the server a moment to process before the caller closes.
            await asyncio.sleep(0.5)
        except Exception as e:
            logging.error(f"发送过程中出错: {e}")
            raise
if __name__ == "__main__":
    # Test fixture: local recording and the staging ASR endpoint.
    # (Removed the unused `now = datetime.datetime.now()` and the pointless
    # f-prefix on a placeholder-free string.)
    audio = 'F:\\work\\auto_script\\atp-case\\case\\bdry\\audio_file\\王志强.wav'
    websocketAddr = 'ws://bdry-asr-backend.stage.dm-ai.com'
    user = WebSocketUser(websocketAddr, audio)
    try:
        user.on_start()
        # Run the async send/receive workflow to completion.
        asyncio.run(user.send_and_receive_message(interval=0.02))
    except Exception as e:
        logging.error(f"测试失败: {e}")
    finally:
        # Always release the socket, even when the exchange failed.
        user.on_stop()
更多推荐
所有评论(0)