既然aiohttp也能实现Websocket,那就直接测试应用一下吧

1)先写个服务端test_aiohttp_WSServer.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
' a test aiohttp Websocket Server '

__author__ = 'TianJiang Gui'

import aiohttp
from aiohttp import web
import json

async def hello(request):
    return web.Response(text="Hello, world")

async def websocket_handler(request):

    # ws对象
    ws = web.WebSocketResponse()
    # 等待用户连接
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close':
                await ws.close()
            else:
                #---gtj 注意如果带中文的显示内容
                print('msg.data:',type(msg.data),msg.data)
                jddd=json.loads(msg.data)
                #---gtj 如果带中文就显示正确了
                print('jddd',type(jddd),jddd)
                #---gtj str转换dict
                dicts = eval(jddd)
                #---gtj 从json中获取数据
                name = dicts['name']
                age = dicts['age']
                await ws.send_str('[%s]:%s 你好'%(name,age))
                print('dicts:',type(dicts),dicts)

        elif msg.type == aiohttp.WSMsgType.ERROR:
            print('ws connection closed with exception %s' % ws.exception())

    # 断开连接了
    print('websocket connection closed')
    return ws

if __name__ == '__main__':
    app = web.Application()
    app.add_routes([web.get('/', hello)])
    app.add_routes([web.get('/ws', websocket_handler)])

    web.run_app(app,host='127.0.0.1',port=8900)

代码中注意中文问题:

2)再写个客户端test_aiohttp_WSClient.py用于测试

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
' a test aiohttp Websocket Client '

__author__ = 'TianJiang Gui'

import aiohttp
from aiohttp import web
import asyncio

import json

async def callback(msg):
    print('callback:',msg)

async def websocket(session,url):
    params = {'name': '桂天江', 'age': '33'}
    async with session.ws_connect(url) as ws:
        #---gtj 一定注意中文发送前要采用ensure_ascii=False
        jsondata=json.dumps(params,ensure_ascii=False)
        print('send jsondata:',type(jsondata),jsondata,type(params),params)
        await ws.send_json(jsondata)
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                await callback(msg.data)
            elif msg.type == aiohttp.WSMsgType.CLOSED:
                break
            elif msg.type == aiohttp.WSMsgType.ERROR:
                break
        await ws.close()

async def main(app):
    session = aiohttp.ClientSession()
    await websocket(session,'http://127.0.0.1:8900/ws')

if __name__ == '__main__':
    app = web.Application()
    asyncio.run(main(app))

 此处要注意中文发送前的转换

最后看看运行结果吧:

服务端: 

 客户端:

最后总结:希望此文能帮到正在苦于寻找没有完整的客户端/服务端案例的你

其他前置基础文章请参见:

python小技巧大应用--用aiohttp实现HTTP C/S收发JSON数据

python小技巧大应用--用aiohttp实现HTTP C/S应用小测试

写文不易,请多关注,点赞

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐