打造流式对话 AI:FastAPI + SSE + DeepSeek
在第一周,我们已经完成了 FastAPI 基础与 DeepSeek API 的同步调用。但真正的 AI 对话应该是“边想边说”的,并且能记住上下文。大模型 API 本身是无状态的,每次请求独立。要实现多轮对话,必须由客户端维护历史记录,并在每次请求时发送给后端。存储所有对话,每次发送时深拷贝后传给后端,收到完整回复后再 push 进数组。不是普通的等待,而是“等但不死”——在等待期间,程序可以去做
第二周学习总结:从零实现一个具备多轮记忆和打字机效果的 AI 对话应用
一、背景与目标
在第一周,我们已经完成了 FastAPI 基础与 DeepSeek API 的同步调用。但真正的 AI 对话应该是“边想边说”的,并且能记住上下文。因此第二周的目标是:
- 理解异步编程,为流式输出打下基础
- 使用 SSE(Server-Sent Events)实现逐字推送
- 前端接收流式数据,呈现打字机效果
- 增加多轮对话记忆,让 AI 能记住之前聊过什么
最终产出一个完整的、可本地运行的流式对话应用。
二、技术栈
- 后端:FastAPI + Uvicorn + OpenAI 库(兼容 DeepSeek)
- 前端:原生 HTML/JavaScript + Fetch API + EventSource(后期改用 fetch 流式读取)
- 异步:Python asyncio
- 协议:SSE(Server-Sent Events)
三、异步编程速成
在实现流式输出前,必须先理解 Python 的异步。核心概念如下:
| 概念 | 说明 |
|---|---|
async def |
定义一个协程函数,调用时返回协程对象,不会立即执行 |
await |
等待一个可等待对象(协程、任务、Future),非阻塞,允许事件循环切换任务 |
asyncio.gather() |
并发运行多个协程,等待所有完成 |
asyncio.sleep() |
非阻塞延时,模拟 IO 操作 |
同步与异步的直观对比:
# 同步阻塞版本
def sync_task():
time.sleep(2) # 整个线程卡住 2 秒
return "done"
# 异步非阻塞版本
async def async_task():
await asyncio.sleep(2) # 仅当前协程让出控制权,事件循环可执行其他任务
return "done"
关键理解:await 不是普通的等待,而是“等但不死”——在等待期间,程序可以去做别的事。
四、后端实现 SSE 流式输出
SSE 是一种基于 HTTP 的单向推送协议,非常适合 AI 逐字回复。
4.1 SSE 消息格式
每条消息以 data: 开头,以两个换行符 \\n\\n 结尾。例如:
data: 你
data: 好
data: [DONE]
4.2 FastAPI 中的实现
使用 StreamingResponse 配合异步生成器:
from fastapi.responses import StreamingResponse
from openai import OpenAI
import asyncio
client = OpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com/v1")
async def generate_stream(prompt: str):
stream = client.chat.completions.create(
model="deepseek-chat",
messages=[{"role": "user", "content": prompt}],
stream=True # 关键!
)
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content # 注意是 delta.content
yield f"data:{content}\n\n"
await asyncio.sleep(0.02) # 模拟打字间隔
yield "data: [DONE]\n\n"
@app.get("/chat-stream")
async def chat_stream(prompt: str):
return StreamingResponse(generate_stream(prompt), media_type="text/event-stream")
要点:
stream=True让 API 返回增量块- 使用
delta.content而非message.content - 结束标志
data: [DONE]\\n\\n前端依靠它关闭连接
五、多轮对话记忆(Memory)
大模型 API 本身是无状态的,每次请求独立。要实现多轮对话,必须由客户端维护历史记录,并在每次请求时发送给后端。
5.1 消息格式
messages = [
{"role": "user", "content": "我叫张三"},
{"role": "assistant", "content": "你好张三!"},
{"role": "user", "content": "我叫什么名字?"}
]
role可以是user、assistant、system- 顺序必须按时间交替排列
5.2 后端接收历史
定义 Pydantic 模型:
class ChatRequest(BaseModel):
message: str
history: list[dict] | None = None
在生成器中合并历史:
async def generate_stream(prompt: str, history: list[dict] | None = None):
messages = []
if history:
messages.extend(history)
messages.append({"role": "user", "content": prompt})
# 调用 API...
5.3 前端维护历史
前端用一个数组 history 存储所有对话,每次发送时深拷贝后传给后端,收到完整回复后再 push 进数组。
六、前端集成:打字机效果(为深入学习原理,仅对基础进行浅层学习)
因为 EventSource 只支持 GET 且无法携带复杂的 JSON,我们改用 fetch + response.body.getReader() 实现流式读取。
核心代码片段:
const response = await fetch('/chat-stream-history', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: prompt, history: currentHistory })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let fullReply = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
// 保存历史
history.push({ role: 'user', content: prompt });
history.push({ role: 'assistant', content: fullReply });
return;
} else {
fullReply += data;
// 实时更新页面
aiMsgDiv.textContent = fullReply;
}
}
}
}
七、踩坑记录与解决方案
| 问题 | 原因 | 解决 |
|---|---|---|
| 流式接口一次性返回全部内容 | stream=False 或用了 .content |
设置 stream=True,提取 .delta.content |
前端收不到 [DONE],历史不保存 |
后端返回 data:[DONE](无空格) |
统一为 data: [DONE]\\n\\n(有空格) |
| AI 记不住之前的对话 | 前端未正确维护 history 数组 |
在收到 [DONE] 后 push 本轮对话 |
| CORS 错误 | 后端未配置跨域 | 添加 CORSMiddleware,开发时 allow_origins=["*"] |
| Postman 请求报 422 | Body 未选择 JSON 格式 | 在 Body 标签页选择 raw 并设为 JSON |
更多推荐

所有评论(0)