使用场景

采用 WebSocket 方式:客户端采集音频后实时发送给 ASR 服务,由 ASR 服务将语音转换为文字。

交互流程

  • 业务后端与算法服务建立 WebSocket 连接,并发送 start 指令
  • 算法服务立即返回task_id
  • 业务后端接收返回消息,获取 task_id
  • 业务后端开始向算法服务发送音频(binary格式)
  • 算法服务实时识别音频,在识别到开始指令“患者你好”后,返回sub_task_id
  • 业务后端接收返回消息,获取 sub_task_id(音频发送与消息接收需以异步方式并发进行)
  • 在音频发送结束并获取 sub_task_id 后,业务后端向算法服务发送结束指令(入参需包含 task_id 和 sub_task_id)

使用说明

  • 异步函数定义
    async def 函数名():
  • 创建异步任务
    audio_task = asyncio.create_task(异步函数名(self))
  • 等待异步任务完成
    await audio_task #await [任务名]
  • 调用方式
    user = WebSocketUser(websocketAddr, audio)
    asyncio.run(user.send_and_receive_message(interval=0.02))

简单样例

#引入库
import asyncio
import websockets

# Skeleton of the pattern: send audio and receive messages as concurrent asyncio tasks.
class WebSocketUser:
    """Minimal skeleton of the concurrent send/receive flow.

    ``self.ws`` is assumed to be an already-opened blocking connection with
    ``recv()`` / ``send_binary()`` (as in the websocket-client based full
    implementation); opening it is not shown in this snippet.
    """

    def __init__(self, websocketAddr, audio):
        self.websocketAddr = websocketAddr  # ASR service URL
        self.audio = audio                  # path of the WAV file to send
        self.ws = None                      # connection object, opened elsewhere
        self.sub_task_id = None             # filled in by receive_message

    async def send_and_receive_message(self, interval=0.02):
        """Run the audio sender and the message receiver as concurrent tasks.

        Args:
            interval: pause in seconds between audio chunks.
        """
        import json  # local import: this snippet's import list only has asyncio/websockets

        loop = asyncio.get_running_loop()

        async def receive_message(self):
            print("调用异步函数接收信息 receive_message")
            wait_time = 1
            while wait_time < 10 and self.sub_task_id is None:
                # Await instead of time.sleep(): a blocking sleep freezes the
                # event loop and the sender task could not run meanwhile.
                await asyncio.sleep(1)
                wait_time += 1
                # ws.recv() blocks, so run it in a worker thread.
                response = await loop.run_in_executor(None, self.ws.recv)
                print('receive response:', response)

                if response and 'sub_task_id' in response:
                    response_json = json.loads(response)
                    self.sub_task_id = response_json["header"]["sub_task_id"]  # extract the key from the JSON
                print(f"打印接收的sub_task_id: {self.sub_task_id}")

        async def send_binary_data(self):
            print("调用异步函数 send_binary_data发送音频")
            # Send the audio here: for each PCM chunk call self.ws.send_binary(chunk),
            # then `await asyncio.sleep(interval)` so receive_message gets to run.
            await asyncio.sleep(interval)

        # Create the async tasks: send audio, receive messages
        audio_task = asyncio.create_task(send_binary_data(self))
        receive_task = asyncio.create_task(receive_message(self))

        # Wait until the audio has been sent
        await audio_task
        # Wait until sub_task_id is obtained (or the attempts run out)
        await receive_task

# Entry point. Usage: <script> <websocket-url> <wav-file>
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 3:
        sys.exit("usage: <script> <websocket-url> <wav-file>")
    websocketAddr, audio = sys.argv[1], sys.argv[2]
    user = WebSocketUser(websocketAddr, audio)
    asyncio.run(user.send_and_receive_message(interval=0.02))  # run the coroutine on a new event loop
    

代码实现

from websocket import create_connection
import asyncio
import json
import time
import uuid
import logging
import wave
import threading
import datetime


# Root logger setup: INFO and above, formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Streaming ASR WebSocket client.
class WebSocketUser:
    """Client for the streaming ASR WebSocket service.

    One recognition session is: StartSession (the server replies with a
    task_id) -> stream PCM audio while concurrently listening for the
    sub_task_id the server sends after recognising the trigger phrase ->
    ForceEndInteraction carrying both ids.

    Typical use::

        user = WebSocketUser(websocketAddr, audio)
        try:
            user.on_start()
            asyncio.run(user.send_and_receive_message(interval=0.02))
        finally:
            user.on_stop()
    """

    def __init__(self, websocketAddr, audio):
        """Store connection settings; no network I/O happens here.

        Args:
            websocketAddr: WebSocket URL of the ASR service.
            audio: path of the WAV file to stream (16 kHz, mono, 16-bit).
        """
        self.ws = None           # connection object, set by on_start()
        self._lock = None        # created by on_start(); not used elsewhere in this class
        self.task_id = None      # from the reply to StartSession
        self.sub_task_id = None  # sent by the server after the trigger phrase
        self.websocketAddr = websocketAddr
        self.audio = audio

    def on_start(self):
        """Open the WebSocket connection.

        Connects with a 10 s socket timeout, then raises it to 30 s for the
        later send/recv calls. On failure the error is logged, ``self.ws`` is
        reset to None and the exception is re-raised.
        """
        self._lock = threading.Lock()
        try:
            self.ws = create_connection(
                self.websocketAddr,
                timeout=10
            )
            self.ws.settimeout(30)
            logging.info("WebSocket 连接成功")
        except Exception as e:
            logging.error(f"连接失败: {e}")
            self.ws = None
            raise

    def on_stop(self):
        """Close the connection if one is open; close errors are logged, not raised."""
        if self.ws:
            try:
                self.ws.close()
                logging.info("WebSocket 连接已关闭")
            except Exception as e:
                logging.warning(f"关闭失败: {e}")

    def read_wav_pcm_chunks(self, file_path, chunk_size=320):
        """Yield the raw PCM payload of a WAV file in ``chunk_size``-byte chunks.

        The WAV header is not included and the last chunk may be shorter.
        The file must be 16 kHz, mono, 16-bit, otherwise ValueError is raised.
        Because this is a generator, those checks run on the first ``next()``.
        Errors are logged and re-raised.
        """
        try:
            with wave.open(file_path, 'rb') as wav:
                if wav.getframerate() != 16000:
                    raise ValueError(f"采样率错误: {wav.getframerate()},应为 16000")
                if wav.getnchannels() != 1:
                    raise ValueError(f"通道数错误: {wav.getnchannels()},应为 1")
                if wav.getsampwidth() != 2:
                    raise ValueError(f"位深错误: {wav.getsampwidth() * 8}bit,应为 16bit")

                frames = wav.readframes(wav.getnframes())  # raw PCM bytes
                for i in range(0, len(frames), chunk_size):
                    yield frames[i:i + chunk_size]
        except Exception as e:
            logging.error(f"读取 WAV 失败: {e}")
            raise

    @staticmethod
    def _parse_header_field(response, key):
        """Return ``message["header"][key]`` from a JSON frame, or None.

        Binary frames are decoded as UTF-8 first. Empty, non-JSON or
        differently-shaped messages give None instead of raising.
        """
        if not response:
            return None
        if isinstance(response, (bytes, bytearray)):
            response = response.decode("utf-8", errors="replace")
        try:
            message = json.loads(response)
        except ValueError:
            return None
        header = message.get("header") if isinstance(message, dict) else None
        if not isinstance(header, dict):
            return None
        return header.get(key)

    def _start_session(self, message_id):
        """Send StartSession and store the task_id from the reply.

        Raises:
            RuntimeError: if the reply carries no ``header.task_id``.
        """
        start_msg = {
            "header": {
                "namespace": "SpeechRecognizer",
                "name": "StartSession",
                "message_id": message_id,
                "timestamp": int(time.time() * 1000),
            },
            "payload": {
                "type": 1
            }
        }
        logging.info(f"就诊录音的start_msg: {start_msg}")
        self.ws.send(json.dumps(start_msg))
        logging.info("Start 消息发送成功")

        # The service answers StartSession right away with the task_id.
        response = self.ws.recv()
        logging.info(f"receive response: {response}")
        self.task_id = self._parse_header_field(response, "task_id")
        if self.task_id is None:
            raise RuntimeError(f"未获取到 task_id: {response!r}")
        logging.info(f"task_id: {self.task_id}")

    async def _receive_sub_task_id(self, poll_interval, max_attempts):
        """Read server messages until a sub_task_id arrives or attempts run out.

        ``ws.recv()`` blocks, so each call runs in the default thread-pool
        executor. Meanwhile the audio sender keeps running on the event loop.
        A failed recv (e.g. no frame within the socket timeout) is logged and
        counted as one attempt.
        """
        logging.info("调用异步函数接收信息 receive_message")
        loop = asyncio.get_running_loop()
        attempts = 0
        while attempts < max_attempts and self.sub_task_id is None:
            attempts += 1
            await asyncio.sleep(poll_interval)
            try:
                response = await loop.run_in_executor(None, self.ws.recv)
            except asyncio.CancelledError:
                # Re-raise first: on Python 3.7 CancelledError is an Exception subclass.
                raise
            except Exception as e:
                logging.warning(f"接收消息失败: {e}")
                continue
            logging.info(f"receive response_2: {response}")

            sub_task_id = self._parse_header_field(response, "sub_task_id")
            if sub_task_id is not None:
                self.sub_task_id = sub_task_id
            logging.info(f"打印sub_task_id: {self.sub_task_id}")

    async def _send_audio(self, interval, chunk_size=640):
        """Stream the WAV payload as binary frames, pausing ``interval`` s after each.

        640 bytes of 16 kHz / 16-bit / mono PCM is 20 ms of audio, so
        ``interval=0.02`` paces the upload at roughly real time. The pause uses
        ``await asyncio.sleep`` so the receiver task can run in between.
        """
        logging.info("调用异步函数 send_binary_data发送音频")
        count = 0
        sent_bytes = 0
        for chunk in self.read_wav_pcm_chunks(self.audio, chunk_size):
            count += 1
            logging.debug(f"发送第 {count} 个 PCM 块,大小: {len(chunk)} 字节")
            self.ws.send_binary(chunk)  # explicit binary frame
            sent_bytes += len(chunk)
            await asyncio.sleep(interval)
        logging.info(f"PCM 数据发送完成,共 {count} 块,{sent_bytes} 字节")

    def _end_session(self, message_id):
        """Send ForceEndInteraction carrying task_id and sub_task_id.

        A forced end is used because the audio may not contain a spoken end
        command ("StopSession" is the other end message name).
        """
        if self.sub_task_id is None:
            logging.warning("未获取到 sub_task_id,结束指令中 sub_task_id 为空")
        end_msg = {
            "header": {
                "namespace": "SpeechRecognizer",
                "name": "ForceEndInteraction",
                # NOTE(review): reuses the StartSession message_id — confirm the
                # service does not require a unique id per message.
                "message_id": message_id,
                "task_id": self.task_id,
                "sub_task_id": self.sub_task_id,
                "timestamp": int(time.time() * 1000)
            }
        }
        logging.info(f"就诊录音的end_msg: {end_msg}")
        self.ws.send(json.dumps(end_msg))
        logging.info("End 消息发送完成")

    async def send_and_receive_message(self, interval=0.02, *, recv_poll_interval=1.0,
                                       max_recv_attempts=9, end_wait=0.5):
        """Run one recognition session over the already-open connection.

        Audio streaming and waiting for the sub_task_id run concurrently. The
        end message is sent after both finish.

        Args:
            interval: pause in seconds between 640-byte audio frames.
            recv_poll_interval: pause in seconds before each receive attempt.
            max_recv_attempts: maximum receive attempts while waiting for sub_task_id.
            end_wait: seconds to wait after the end message so the server can process it.

        Returns early (after logging) if ``on_start()`` has not connected.
        Any error during the exchange is logged and re-raised.
        """
        if not self.ws:
            logging.error("WebSocket 未连接")
            return

        # Ids from a previous session must not leak into this one.
        self.task_id = None
        self.sub_task_id = None

        message_id = uuid.uuid4().hex[:24]
        logging.info(f"开始任务(message id): {message_id}")
        try:
            self._start_session(message_id)
            await asyncio.sleep(0.05)

            receive_task = asyncio.create_task(
                self._receive_sub_task_id(recv_poll_interval, max_recv_attempts))
            try:
                await self._send_audio(interval)
                await receive_task
            finally:
                # If sending failed, do not leave the receiver running.
                if not receive_task.done():
                    receive_task.cancel()

            await asyncio.sleep(0.05)
            self._end_session(message_id)
            await asyncio.sleep(end_wait)
        except Exception as e:
            logging.error(f"发送过程中出错: {e}")
            raise


if __name__ == "__main__":
    import sys

    # Defaults may be overridden from the command line:
    #   <script> [wav_path] [websocket_url]
    audio = sys.argv[1] if len(sys.argv) > 1 else r'F:\work\auto_script\atp-case\case\bdry\audio_file\王志强.wav'
    websocketAddr = sys.argv[2] if len(sys.argv) > 2 else 'ws://bdry-asr-backend.stage.dm-ai.com'
    user = WebSocketUser(websocketAddr, audio)

    exit_code = 0
    try:
        user.on_start()
        asyncio.run(user.send_and_receive_message(interval=0.02))  # run the async session
    except Exception as e:
        # logging.exception also records the traceback.
        logging.exception(f"测试失败: {e}")
        exit_code = 1
    finally:
        user.on_stop()
    # A non-zero exit status lets CI / calling scripts detect the failure.
    sys.exit(exit_code)
 




Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐