1. 依赖
    pip install websockets-routes
  2. 代码
    import asyncio
    import json
    from typing import Union
    
    import websockets
    import websockets_routes
    from websockets.legacy.server import WebSocketServerProtocol
    from websockets_routes import RoutedPath
    
    # Initialize a router object; handlers are registered on it with @router.route(...)
    router = websockets_routes.Router()
    
    # Connection handles: one slot per known client identity, None until that
    # client has connected.
    connections: dict[str, Union[None, WebSocketServerProtocol]] = dict.fromkeys(
        ('setting', 'fastapi', 'administrator')
    )
    
    # Runtime switches shared by the message handlers; every value is a boolean.
    # NOTE: the builtin function `any` is not a type, so the values are
    # annotated as `bool` instead.
    flags: dict[str, bool] = {
        'is_allow_query': True,
        'opt_success': False,
    }
    
    
    # Message listener
    @router.route("/command/{identification}")
    async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
        """Register the connecting client and process its messages.

        The ``identification`` path parameter selects the client role:

        * ``setting``: sends JSON commands; ``{"cmd": "is_allow_query",
          "data": "true"|...}`` updates ``flags['is_allow_query']``. Commands
          are only processed while a ``fastapi`` client is connected.
        * ``fastapi``: every message is forwarded to the ``setting`` client.
        * ``administrator``: every message is echoed back, JSON-encoded.

        Any other identification receives an error message and the handler
        returns without registering the connection.

        Args:
            websocket: The connection that was just opened.
            path: Routed path carrying the ``identification`` parameter.
        """
        identification = path.params['identification']
        if identification not in connections:
            # Unknown role: tell the client why instead of closing silently.
            await websocket.send("身份错误")
            return

        # Update the connection handle for this role
        connections[identification] = websocket
        try:
            # Process incoming messages
            async for message in websocket:
                # Business logic for the "setting" client
                if identification == 'setting':
                    if connections['fastapi'] is None:
                        await websocket.send("fastapi的websocket未连接")
                        continue
                    try:
                        msg = json.loads(message)
                        if msg['cmd'] == 'is_allow_query':
                            flags['is_allow_query'] = msg['data'] == 'true'
                    except (ValueError, KeyError, TypeError):
                        # Invalid JSON, a missing key, or a non-object payload
                        # must not tear down the whole connection.
                        await websocket.send("消息格式错误")

                # Business logic for the "fastapi" client
                elif identification == 'fastapi':
                    target = connections['setting']
                    if target is None:
                        await websocket.send("setting的websocket未连接")
                    else:
                        await target.send(message)
                        await websocket.send("发送成功")

                # Business logic for the "administrator" client
                else:
                    await websocket.send("【administrator】你发给我的消息是:" + json.dumps(message))
        finally:
            # Drop the handle on disconnect so other handlers never forward to a
            # closed socket; skip if a newer connection already took the slot.
            if connections[identification] is websocket:
                connections[identification] = None
    
    
    # Start the WebSocket server
    async def main():
        """Serve the router on localhost:8089 and keep running forever."""
        # The two-argument lambda forwards (websocket, path) to the router
        server = websockets.serve(lambda ws, uri: router(ws, uri), "localhost", 8089)
        async with server:
            # Awaiting a Future that is never resolved keeps the server alive
            await asyncio.Future()
    
    
    def start():
        """Run the WebSocket server via asyncio.run; blocks the calling thread."""
        server_coroutine = main()
        asyncio.run(server_coroutine)
    
  3. 在 FastAPI 中调用

     

  4. 连接服务

  5. 在 FastAPI 中启动 WebSocket 服务

    import asyncio
    import websockets
    import websockets_routes
    from websockets.legacy.server import WebSocketServerProtocol
    from websockets_routes import RoutedPath
    
    # Initialize a router object; handlers are registered on it with @router.route(...)
    router = websockets_routes.Router()
    
    
    # Message listener
    @router.route("/command/{identification}")
    async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath):
        """Echo each message back to "setting" and "administrator" clients.

        Clients connecting with any other identification receive an
        error reply for every message they send.
        """
        echo_roles = ('setting', 'administrator')
        # Handle every message received on this connection
        async for message in websocket:
            if path.params['identification'] in echo_roles:
                reply = "你发给我的消息是:" + message
            else:
                reply = "指令码错误"
            await websocket.send(reply)
    
    
    # Start the WebSocket server
    async def main():
        """Serve the router on localhost:8089 and keep running forever."""
        never_done = asyncio.Future()
        # The two-argument lambda forwards (websocket, path) to the router
        async with websockets.serve(lambda conn, route: router(conn, route), "localhost", 8089):
            # The Future is never resolved, so this await keeps the server alive
            await never_done
    
    
    def start():
        """Run the WebSocket server via asyncio.run; blocks the calling thread."""
        server_coroutine = main()
        asyncio.run(server_coroutine)
    
    if __name__ == "__main__":
        # Thread and uvicorn are not imported at the top of this snippet, so
        # bring them into scope here to avoid a NameError.
        from threading import Thread

        import uvicorn

        # Run the WebSocket server on a background thread. It is a daemon thread
        # because the server loop never finishes on its own; a non-daemon thread
        # would keep the process alive after the web service has stopped.
        thread = Thread(target=start, daemon=True)
        thread.start()
        # Start the web service (blocks until uvicorn shuts down)
        uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐