python使用websocket服务并在fastAPI中启动websocket服务
【代码】python使用websocket服务。
·
- 依赖
pip install websockets-routes
- 代码
import asyncio import json from typing import Union import websockets import websockets_routes from websockets.legacy.server import WebSocketServerProtocol from websockets_routes import RoutedPath # 初始化一个router对象 router = websockets_routes.Router() # 连接句柄 connections: dict[str, Union[None, WebSocketServerProtocol]] = { 'setting': None, 'fastapi': None, 'administrator': None } flags: dict[str, any] = { 'is_allow_query': True, 'opt_success': False } # 消息监听 @router.route("/command/{identification}") async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath): """ 更新连接句柄 """ if path.params['identification'] == 'setting': connections['setting'] = websocket elif path.params["identification"] == 'fastapi': connections['fastapi'] = websocket elif path.params["identification"] == 'administrator': connections['administrator'] = websocket else: return """ 处理消息 """ async for message in websocket: # 处理setting用户的业务 if path.params['identification'] == 'setting': if connections['fastapi'] is None: await websocket.send("fastapi的websocket未连接") else: msg: dict = json.loads(message) if msg['cmd'] == 'is_allow_query': flags['is_allow_query'] = True if msg['data'] == 'true' else False # 处理fastapi用户的业务 elif path.params["identification"] == 'fastapi': if connections['setting'] is None: await websocket.send("setting的websocket未连接") else: await connections['setting'].send(message) await websocket.send("发送成功") # 处理administrator用户的业务 elif path.params["identification"] == 'administrator': await websocket.send("【administrator】你发给我的消息是:" + json.dumps(message)) # 其他 else: await websocket.send("身份错误") # 启动WebSocket服务器 async def main(): # 启动WebSocket服务 async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089): await asyncio.Future() # run forever def start(): # 启动WebSocket服务 asyncio.run(main())
-
fastAPI中调用
-
连接服务
-
在fastAPI中启动websocket服务
import asyncio import websockets import websockets_routes from websockets.legacy.server import WebSocketServerProtocol from websockets_routes import RoutedPath # 初始化一个router对象 router = websockets_routes.Router() # 消息监听 @router.route("/command/{identification}") async def light_status(websocket: WebSocketServerProtocol, path: RoutedPath): # 收到消息 async for message in websocket: # 处理setting用户的业务 if path.params['identification'] == 'setting': await websocket.send("你发给我的消息是:" + message) # 处理administrator用户的业务 elif path.params["identification"] == 'administrator': await websocket.send("你发给我的消息是:" + message) else: await websocket.send("指令码错误") # 启动WebSocket服务器 async def main(): # 启动WebSocket服务 async with websockets.serve(lambda x, y: router(x, y), "localhost", 8089): await asyncio.Future() # run forever def start(): # 启动WebSocket服务 asyncio.run(main())
if __name__ == "__main__": # 开启一个线程去启动WebSocket服务器 thread = Thread(target=start) thread.start() # 启动Web服务 uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
更多推荐
已为社区贡献13条内容
所有评论(0)