FastAPI 使用案例 & 长链接(SSE & WebSocket)& 缓存框架
·
目录
1️⃣ FastAPI 最简路由拆分案例
一、最终目录结构(极简版)
your_project/
├── main.py # 主入口文件,挂载所有路由
└── app/ # 应用目录
└── api/ # 所有二级API路由都放这里
├── user.py # 用户相关接口
└── order.py # 订单相关接口
二、核心文件代码
1. 主入口:main.py
from fastapi import FastAPI
# 导入我们写的二级路由
from app.api.user import router as user_router
from app.api.order import router as order_router
# 初始化FastAPI应用
app = FastAPI(
title="FastAPI路由拆分示例",
version="1.0.0"
)
# ✅ 挂载用户路由
# prefix: 所有用户接口自动加前缀 /api/user
# tags: 自动生成的文档会按标签分类
app.include_router(
user_router,
prefix="/api/user", # 一级路由,访问时拼接上二级路由的接口
tags=["用户模块"]
)
# ✅ 挂载订单路由
app.include_router(
order_router,
prefix="/api/order", # 一级路由
tags=["订单模块"]
)
# 根路径测试
@app.get("/", summary="根路径")
async def root():
return {"message": "FastAPI服务运行中", "docs": "/docs"}
# 所有异常,全部走这里
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
# 统一返回固定格式的json
return JSONResponse(
status_code=500,
content={
"code": 500,
"msg": "服务器内部错误",
"data": None,
"error": str(exc)
}
)
if __name__ == "__main__":
import uvicorn
# 开发环境启动命令
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True # 热加载,当替换 / 修改 .py 文件 → 自动重启加载新代码【生产环境禁止开启】
)
2. 二级路由示例:app/api/user.py
from fastapi import APIRouter
# ✅ 创建二级路由对象,这是路由拆分的核心
router = APIRouter()
# 异步接口示例(I/O密集型推荐)
@router.get("/{user_id}", summary="获取用户信息")
async def get_user(user_id: int):
"""根据用户ID获取用户信息"""
return {
"code": 0,
"data": {
"id": user_id,
"name": f"用户{user_id}",
"email": f"user{user_id}@example.com"
}
}
# 同步接口示例(CPU密集型用)
@router.post("/", summary="创建用户")
def create_user(username: str, password: str):
"""创建新用户"""
return {
"code": 0,
"message": "用户创建成功",
"data": {"username": username}
}
3. 二级路由示例:app/api/order.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/{order_id}", summary="获取订单信息")
async def get_order(order_id: int):
"""根据订单ID获取订单详情"""
return {
"code": 0,
"data": {
"id": order_id,
"status": "已支付",
"amount": 99.9
}
}
@router.delete("/{order_id}", summary="删除订单")
async def delete_order(order_id: int):
"""删除指定订单"""
return {
"code": 0,
"message": f"订单{order_id}删除成功"
}
三、启动与测试
Uvicorn + FastAPI:(Uvicorn 高性能 Web 服务器)
async def 异步接口→ 走原生协程事件循环,框架无默认并发数量上限,非阻塞高并发;仅受操作系统连接数、下游数据库 / 缓存连接池、服务器资源限制。普通 def 同步接口→ FastAPI 自动内置线程池处理,默认最大并发 40,超出请求排队等待线程释放。
- 安装依赖 fastapi uvicorn
pip install fastapi uvicorn
- 开发环境启动服务
python main.py
- 生产环境启动服务
# 在main.py同级目录执行
uvicorn main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \ # 核心:设置为CPU核心数(利用多核能力,并发能力 × 4)
--limit-concurrency 1000 \ # 最大并发连接
--loop uvloop \ # 高性能异步
--http httptools # 高性能解析
- 访问测试
访问格式:http://localhost:8000{一级路由}{二级路由}
- 自动生成的接口文档:http://localhost:8000/docs
- 用户接口:http://localhost:8000/api/user/1
- 订单接口:http://localhost:8000/api/order/1001
四、关键说明与常见坑
- 路由自动分类:在
include_router时加tags,Swagger文档会自动按模块分组,非常清晰 - 前缀统一管理:
prefix="/api/user"让所有该模块的接口自动加前缀,不用每个接口都写 - 并发特性保留:路由拆分后,
async def和普通def的并发特性完全不变 - ❌ 常见错误:忘记在
main.py里include_router,或者写错导入路径 - 扩展方式:新增模块时,只需要在
app/api下新建文件,然后在main.py里挂载即可
2️⃣ fastapi-cache2 缓存框架
特点 :
- 集成简单,可用于接口和普通函数上
- 支持 Redis、Memcached、DynamoDB、内存缓存
- 自动处理 Pydantic 模型序列化
- 支持 HTTP 缓存头(ETag、Cache-Control)
安装
pip install “fastapi-cache2[redis]”
from contextlib import asynccontextmanager
from typing import AsyncIterator, Any, List
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
# app.py(项目入口)
# 初始化的是一个 全局缓存管理器 ,之后在任何地方使用 @cache 都会自动复用这个配置,不需要重复初始化。
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""应用启动时连接 Redis 并初始化缓存"""
redis = aioredis.from_url("redis://localhost:6379/0") # /0 表示使用第 0 号数据库
# prefix 为缓存键前缀,避免与其他应用冲突
FastAPICache.init(RedisBackend(redis), prefix="myapp:cache")
yield
# 创建 FastAPI 应用实例
app = FastAPI(lifespan=lifespan)
# 可以在其他任何文件中使用@cache注解,直接导入 @cache 使用即可,不需要再初始化
@cache(expire=300)
async def get_prediction(model_name: str, features: List[float]) -> dict[str, Any]:
"""
缓存返回结果,有效期 300 秒
缓存键生成规则:默认由函数模块名 + 函数名 + 参数值组成 MD5 哈希
"""
return {"prediction": 0.8}
3️⃣ SSE 流式响应案例
SSE(Server-Sent Events)建立单向 HTTP 长连接(后端 -> 前端),
自动断线重连,
适用场景:AI 对话(逐字输出)、通知、流式输出。
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
async def generate():
"""
业务函数:实时输出数据到前端
"""
messages = [
"第一条消息",
"第二条消息",
"推送完成"
]
for msg in messages:
# yield 实时输出数据到前端
yield {"event": "message", "data": msg}
await asyncio.sleep(1) # 模拟间隔
@app.get("/stream")
async def stream():
"""SSE 流式推送:一次请求,后端持续推送"""
# 将函数 generate() 包装成符合 SSE 规范的 HTTP 响应
return EventSourceResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
4️⃣ WebSocket 流式响应案例
业务场景:
- 后端收到前端消息后,不等待全部处理完成,而是边处理边推送,
- 实现类似 ChatGPT 的逐字输出效果。
特点:
- 一次连接,多次对话(长连接复用)
- 后端逐段推送,前端实时展示
- 降低用户等待感,提升交互体验
典型应用:
- AI 对话(逐字输出)
- 数据流实时展示
- 实时日志推送
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/chat")
async def ws_chat(websocket: WebSocket):
# 接受 WebSocket 建立长连接
await websocket.accept()
try:
# 循环不退出,持续监听
while True:
# 接收前端发送的消息(阻塞等待,收到消息才继续)
message = await websocket.receive_text()
# 模拟流式返回:逐段发送数据
response_parts = [
"你好,",
"我是",
"AI",
"助手,"
]
for part in response_parts:
# 实时推送数据到前端
await websocket.send_text(part)
await asyncio.sleep(0.3) # 模拟生成延迟
# 前端断开,会触发 WebSocketDisconnect 异常
except WebSocketDisconnect:
print("客户端断开连接,FastAPI 会自动清理连接资源")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
4.1. HTML 测试建立长链接
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>WebSocket 测试 - qwen-turbo</title>
<style>
body { font-family: ui-sans-serif, system-ui, -apple-system; margin: 24px; max-width: 800px; }
.row { margin-bottom: 16px; }
label { display: block; margin-bottom: 6px; font-weight: 500; }
input[type="text"], input[type="number"], textarea, select {
width: 100%; padding: 10px; border: 1px solid #ccc; border-radius: 6px; box-sizing: border-box; font-size: 14px;
}
textarea { min-height: 100px; resize: vertical; }
button {
padding: 10px 16px; background: #0078d4; color: white; border: none; border-radius: 6px; cursor: pointer; margin-right: 8px;
font-size: 14px;
}
button:disabled { background: #cccccc; cursor: not-allowed; }
pre {
background: #f5f5f5; border: 1px solid #e0e0e0; padding: 12px; border-radius: 6px;
min-height: 60px; white-space: pre-wrap; font-family: inherit; margin: 0; color: #333;
}
.log { color: #666; font-size: 12px; margin-top: 4px; }
</style>
</head>
<body>
<h2>💬 WebSocket 与 Qwen 对话测试</h2>
<div class="row">
<label>WebSocket 地址</label>
<input id="wsUrl" value="ws://127.0.0.1:8000/ws/chat" placeholder="ws://your-server/ws/chat" />
</div>
<div class="row">
<label>提示词 (Prompt)</label>
<textarea id="prompt" rows="3">用一句话解释 Transformer 是什么?</textarea>
</div>
<div class="row">
<label>模型名称</label>
<input id="model" value="qwen-turbo" />
</div>
<div class="row">
<label>温度 temperature (0~2)</label>
<input id="temperature" type="number" step="0.1" min="0" max="2" value="0.7" placeholder="0.7" />
</div>
<div class="row">
<button id="connectBtn">🔗 连接</button>
<button id="sendBtn" disabled>📤 发送</button>
<button id="closeBtn" disabled>❌ 断开</button>
</div>
<h3>输出</h3>
<pre id="output"></pre>
<div class="log" id="statusLog"></div>
<script>
let ws = null;
const outputEl = document.getElementById('output');
const statusLogEl = document.getElementById('statusLog');
function logStatus(msg) {
statusLogEl.textContent = `[${new Date().toLocaleTimeString()}] ${msg}`;
}
function appendOutput(text) {
outputEl.textContent += text;
}
function clearOutput() {
outputEl.textContent = '';
}
document.getElementById('connectBtn').onclick = () => {
const url = document.getElementById('wsUrl').value.trim();
if (!url.startsWith('ws://') && !url.startsWith('wss://')) {
alert('请输入有效的 WebSocket URL,如:ws://127.0.0.1:8000/ws/chat');
return;
}
ws = new WebSocket(url);
clearOutput();
logStatus('正在连接...');
ws.onopen = () => {
logStatus('✅ 已连接');
document.getElementById('sendBtn').disabled = false;
document.getElementById('closeBtn').disabled = false;
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.event === 'start') {
appendOutput(`【使用模型:${data.model}】\n`);
} else if (data.event === 'delta') {
appendOutput(data.data);
} else if (data.event === 'done') {
appendOutput("\n\n✅ 回答结束。\n");
} else if (data.event === 'error') {
appendOutput(`\n❌ 错误:${data.message}\n`);
} else {
appendOutput(`\nℹ️ ${JSON.stringify(data)}\n`);
}
} catch (e) {
appendOutput(`\n📦 ${event.data}\n`);
}
};
ws.onerror = (err) => {
logStatus('❌ 连接出错');
console.error('WebSocket error:', err);
};
ws.onclose = () => {
logStatus('🔌 连接已关闭');
document.getElementById('sendBtn').disabled = true;
document.getElementById('closeBtn').disabled = true;
};
};
document.getElementById('sendBtn').onclick = () => {
if (!ws || ws.readyState !== WebSocket.OPEN) {
alert('请先连接!');
return;
}
const prompt = document.getElementById('prompt').value.trim();
if (!prompt) {
alert('请输入提示词');
return;
}
const model = document.getElementById('model').value.trim() || 'qwen-turbo';
const temperatureStr = document.getElementById('temperature').value.trim();
const payload = { prompt, model };
if (temperatureStr) payload.temperature = Number(temperatureStr);
ws.send(JSON.stringify(payload));
clearOutput();
appendOutput("🧑💻:" + prompt + "\n\n🤖:");
};
document.getElementById('closeBtn').onclick = () => {
if (ws) {
ws.close(1000, "用户手动断开");
}
};
</script>
</body>
</html>
更多推荐
所有评论(0)