【Agents篇】11:LangChain & LangGraph 深度解析
本文系统介绍了LangChain和LangGraph两大框架在AI Agent开发中的应用。LangChain通过模块化设计(Models、Messages、Tools等核心组件)和LCEL表达式语言实现灵活组合,解决多模型集成、工具调用等痛点。LangGraph作为编排引擎,通过状态机机制增强复杂工作流处理能力,支持动态路由、人工干预等高级特性。文章详细解析了从组件到状态机的完整开发框架,包含智
🚀 从组件到状态机,全面掌握 AI Agent 开发的核心框架
📑 目录
- 1. 🌟 引言:为什么需要 LangChain 和 LangGraph?
- 2. 🏗️ LangChain 整体架构
- 3. 🔧 LangChain 核心组件详解
- 4. ⛓️ Chain:链式调用的艺术
- 5. 🤖 Agent:智能体的灵魂
- 6. 📊 LangGraph:状态机的力量
- 7. 🔄 LangGraph 高级特性
- 8. 🛠️ 实战案例
- 9. 🎯 LangChain vs LangGraph:如何选择?
- 10. 🔮 最佳实践与性能优化
- 11. 📚 总结与展望
- 12. 📖 参考文献
1. 🌟 引言:为什么需要 LangChain 和 LangGraph?

在 AI Agent 开发的浪潮中,如何高效地构建可靠、可扩展的智能应用成为了开发者面临的核心挑战。LangChain 和 LangGraph 作为 LangChain 公司推出的两大核心框架,为开发者提供了从快速原型到生产部署的完整解决方案。
💡 思考:传统 LLM 应用开发面临哪些痛点?
在没有统一框架之前,开发者需要:
- 手动处理与多个 LLM 提供商的 API 集成
- 自行实现对话历史管理和上下文窗口控制
- 从零构建工具调用和结果处理逻辑
- 缺乏标准化的 Agent 执行流程
- 难以实现复杂的多步骤工作流和状态管理
🤔 解答:LangChain 和 LangGraph 如何解决这些问题?
| 挑战 | LangChain 解决方案 | LangGraph 增强能力 |
|---|---|---|
| 多模型集成 | 统一的 Chat Model 接口 | 动态模型选择 |
| 工具调用 | @tool 装饰器 + 自动 Schema | 工具状态追踪 |
| 对话管理 | Messages + Memory | 持久化检查点 |
| 复杂工作流 | Chain 链式调用 | 图状态机编排 |
| 人工干预 | 基础回调支持 | Human-in-the-Loop |
| 错误恢复 | 简单重试 | 持久化执行恢复 |
┌─────────────────────────────────────────────────────────────────┐
│ LangChain 生态系统全景图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LangChain │───▶│ LangGraph │───▶│ LangSmith │ │
│ │ (高层API) │ │ (编排引擎) │ │ (可观测性) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Integrations (100+ 集成) │ │
│ │ OpenAI │ Anthropic │ Google │ Azure │ HuggingFace │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
2. 🏗️ LangChain 整体架构
2.1 核心设计理念
LangChain 的设计遵循几个核心原则:
- 模块化(Modularity):每个组件都可以独立使用,也可以组合使用
- 可组合性(Composability):通过 LCEL 表达式语言实现灵活的组件组合
- 互换性(Interoperability):统一接口让不同提供商的模型可以无缝切换
- 可观测性(Observability):与 LangSmith 深度集成,提供完整的调试和监控能力
2.2 模块化架构图
┌────────────────────────────────────────────────────────────────────────────┐
│ LangChain 核心架构 │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Application Layer (应用层) │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Agents │ │ Chains │ │ Workflows │ │ │
│ │ │ (智能体) │ │ (链式调用) │ │ (工作流) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Component Layer (组件层) │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Models │ │Messages │ │ Tools │ │ Prompts │ │ Memory │ │ │
│ │ │ 模型 │ │ 消息 │ │ 工具 │ │ 提示词 │ │ 记忆 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Integration Layer (集成层) │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ OpenAI │ │Anthropic│ │ Google │ │ Azure │ │ Bedrock │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
3. 🔧 LangChain 核心组件详解
3.1 Models(模型层)
Models 是 LangChain 的基础,提供了统一的接口来访问各种 LLM 提供商。
3.1.1 初始化模型
from langchain.chat_models import init_chat_model
# 方式一:使用 init_chat_model(推荐)
model = init_chat_model(
"claude-sonnet-4-5-20250929", # 模型名称
temperature=0.7, # 温度参数
max_tokens=1000, # 最大 token 数
timeout=30 # 超时时间
)
# 方式二:使用 "provider:model" 格式
model = init_chat_model("openai:gpt-4o")
# 方式三:直接使用提供商类
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4o",
temperature=0.1,
max_tokens=1000
)
3.1.2 模型调用方式
# 1. 同步调用 - invoke
response = model.invoke("为什么天空是蓝色的?")
print(response.content)
# 2. 流式输出 - stream
for chunk in model.stream("讲一个关于 AI 的故事"):
print(chunk.content, end="", flush=True)
# 3. 批量处理 - batch
responses = model.batch([
"什么是机器学习?",
"什么是深度学习?",
"什么是强化学习?"
])
3.1.3 模型能力探测
# 获取模型能力配置
profile = model.profile
print(f"最大输入 tokens: {profile.get('max_input_tokens')}")
print(f"支持图像输入: {profile.get('image_inputs')}")
print(f"支持工具调用: {profile.get('tool_calling')}")
print(f"支持推理输出: {profile.get('reasoning_output')}")
💡 思考:如何在运行时动态选择模型?
在实际应用中,我们可能需要根据任务复杂度、成本考虑或用户等级来动态选择模型。
🤔 解答:使用 Middleware 实现动态模型路由
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
# 定义不同级别的模型
basic_model = ChatOpenAI(model="gpt-4.1-mini")
advanced_model = ChatOpenAI(model="gpt-4.1")
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
"""根据对话复杂度动态选择模型"""
message_count = len(request.state["messages"])
# 对话轮次超过 10 轮,使用高级模型
if message_count > 10:
model = advanced_model
else:
model = basic_model
return handler(request.override(model=model))
# 创建使用动态模型选择的 Agent
agent = create_agent(
model=basic_model,
tools=tools,
middleware=[dynamic_model_selection]
)
3.2 Messages(消息系统)
LangChain 的消息系统是与 LLM 交互的核心抽象。
from langchain.messages import (
HumanMessage, # 用户消息
AIMessage, # AI 回复
SystemMessage, # 系统提示
ToolMessage # 工具调用结果
)
# 构建对话历史
conversation = [
SystemMessage(content="你是一个专业的技术助手。"),
HumanMessage(content="什么是 LangChain?"),
AIMessage(content="LangChain 是一个用于构建 LLM 应用的框架..."),
HumanMessage(content="它和 LangGraph 有什么区别?")
]
# 调用模型
response = model.invoke(conversation)
消息类型对照表
| 消息类型 | 角色标识 | 用途 |
|---|---|---|
SystemMessage |
system | 设置 AI 的行为和角色 |
HumanMessage |
user | 用户输入 |
AIMessage |
assistant | AI 的回复 |
ToolMessage |
tool | 工具执行结果 |
3.3 Tools(工具系统)
Tools 赋予 Agent 与外部世界交互的能力。
3.3.1 基础工具定义
from langchain.tools import tool
@tool
def search_database(query: str, limit: int = 10) -> str:
"""在客户数据库中搜索匹配的记录。
Args:
query: 搜索关键词
limit: 返回结果的最大数量
"""
# 实际的数据库查询逻辑
return f"找到 {limit} 条匹配 '{query}' 的记录"
@tool
def get_weather(location: str) -> str:
"""获取指定城市的天气信息。
Args:
location: 城市名称
"""
# 实际的天气 API 调用
return f"{location}:晴天,25°C"
# 将工具绑定到模型
model_with_tools = model.bind_tools([search_database, get_weather])
3.3.2 高级工具:访问运行时上下文
from langchain.tools import tool, ToolRuntime
from langgraph.types import Command
@tool
def get_user_account(runtime: ToolRuntime) -> str:
"""获取当前用户的账户信息。"""
# 访问运行时上下文
user_id = runtime.context.user_id
# 访问当前状态
messages = runtime.state["messages"]
# 访问持久化存储
user_info = runtime.store.get(("users",), user_id)
return f"用户 {user_id} 的账户信息: {user_info}"
@tool
def update_conversation_topic(topic: str, runtime: ToolRuntime) -> Command:
"""更新对话主题并修改 Agent 状态。"""
return Command(
update={"current_topic": topic}, # 更新状态
goto="topic_handler" # 路由到特定节点
)
3.3.3 工具错误处理
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
@wrap_tool_call
def handle_tool_errors(request, handler):
"""统一处理工具执行错误"""
try:
return handler(request)
except Exception as e:
# 返回自定义错误消息给模型
return ToolMessage(
content=f"工具执行失败: {str(e)}。请检查参数后重试。",
tool_call_id=request.tool_call["id"]
)
agent = create_agent(
model="gpt-4o",
tools=[search_database, get_weather],
middleware=[handle_tool_errors]
)
3.4 Prompts(提示词模板)
3.4.1 基础提示词模板
from langchain.prompts import ChatPromptTemplate
# 创建提示词模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个{role},请用{language}回答问题。"),
("human", "{question}")
])
# 格式化模板
messages = prompt.format_messages(
role="专业程序员",
language="中文",
question="如何学习 Python?"
)
response = model.invoke(messages)
3.4.2 动态系统提示
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest
@dynamic_prompt
def user_role_prompt(request: ModelRequest) -> str:
"""根据用户角色动态生成系统提示"""
user_role = request.runtime.context.get("user_role", "user")
base_prompt = "你是一个专业的技术助手。"
if user_role == "expert":
return f"{base_prompt}请提供详细的技术分析和代码示例。"
elif user_role == "beginner":
return f"{base_prompt}请用简单易懂的语言解释,避免使用专业术语。"
return base_prompt
agent = create_agent(
model="gpt-4o",
tools=tools,
middleware=[user_role_prompt]
)
3.5 Memory(记忆系统)
记忆系统分为短期记忆(会话内)和长期记忆(跨会话)。
3.5.1 短期记忆:自定义状态
from langchain.agents import AgentState, create_agent
# 定义扩展状态
class CustomState(AgentState):
user_preferences: dict
conversation_summary: str
interaction_count: int
agent = create_agent(
model="gpt-4o",
tools=tools,
state_schema=CustomState
)
# 调用时传入初始状态
result = agent.invoke({
"messages": [{"role": "user", "content": "你好"}],
"user_preferences": {"language": "zh-CN", "verbosity": "detailed"},
"conversation_summary": "",
"interaction_count": 0
})
3.5.2 长期记忆:使用 Store
from langgraph.store.memory import InMemoryStore
from langchain.tools import tool, ToolRuntime
# 创建持久化存储
store = InMemoryStore()
@tool
def remember_user_preference(key: str, value: str, runtime: ToolRuntime) -> str:
"""记住用户的偏好设置"""
user_id = runtime.context.user_id
runtime.store.put(("preferences", user_id), key, {"value": value})
return f"已保存偏好: {key}={value}"
@tool
def recall_user_preference(key: str, runtime: ToolRuntime) -> str:
"""回忆用户的偏好设置"""
user_id = runtime.context.user_id
pref = runtime.store.get(("preferences", user_id), key)
return pref.value["value"] if pref else "未找到该偏好设置"
agent = create_agent(
model="gpt-4o",
tools=[remember_user_preference, recall_user_preference],
store=store
)
4. ⛓️ Chain:链式调用的艺术
4.1 什么是 Chain?
Chain 是 LangChain 中最基础的抽象之一,它将多个组件串联起来,形成一个处理流水线。
┌────────────────────────────────────────────────────────────────┐
│ Chain 执行流程 │
├────────────────────────────────────────────────────────────────┤
│ │
│ Input ──▶ [Prompt Template] ──▶ [Model] ──▶ [Output Parser] │
│ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 用户问题 格式化提示词 LLM 生成 结构化输出 │
│ │
└────────────────────────────────────────────────────────────────┘
4.2 LCEL 表达式语言
LCEL(LangChain Expression Language)是 LangChain 的核心表达式语言,使用 | 管道符连接组件。
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
# 定义组件
prompt = ChatPromptTemplate.from_template(
"用一句话解释什么是 {concept}"
)
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()
# 使用 LCEL 组合成 Chain
chain = prompt | model | output_parser
# 调用 Chain
result = chain.invoke({"concept": "机器学习"})
print(result) # "机器学习是让计算机从数据中自动学习模式的技术。"
LCEL 的核心优势
# 1. 支持流式输出
for chunk in chain.stream({"concept": "深度学习"}):
print(chunk, end="", flush=True)
# 2. 支持异步调用
import asyncio
async def async_call():
result = await chain.ainvoke({"concept": "神经网络"})
return result
# 3. 支持批量处理
results = chain.batch([
{"concept": "CNN"},
{"concept": "RNN"},
{"concept": "Transformer"}
])
# 4. 支持并行执行
from langchain.schema.runnable import RunnableParallel
parallel_chain = RunnableParallel(
explanation=chain,
example=prompt | model | StrOutputParser()
)
4.3 常用 Chain 类型
4.3.1 检索增强生成(RAG)Chain
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema.runnable import RunnablePassthrough
# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
texts=["LangChain 是一个 LLM 框架", "LangGraph 用于构建状态机"],
embedding=embeddings
)
retriever = vectorstore.as_retriever()
# RAG 提示词模板
rag_prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题:
上下文:
{context}
问题:{question}
答案:
""")
# 构建 RAG Chain
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| rag_prompt
| ChatOpenAI(model="gpt-4o")
| StrOutputParser()
)
# 调用
answer = rag_chain.invoke("LangChain 是什么?")
4.3.2 路由 Chain
from langchain.schema.runnable import RunnableBranch
# 定义不同的处理链
math_chain = ChatPromptTemplate.from_template(
"你是数学专家。解答:{question}"
) | model | StrOutputParser()
code_chain = ChatPromptTemplate.from_template(
"你是编程专家。解答:{question}"
) | model | StrOutputParser()
general_chain = ChatPromptTemplate.from_template(
"回答:{question}"
) | model | StrOutputParser()
# 分类函数
def classify_question(input_dict):
question = input_dict["question"].lower()
if any(word in question for word in ["计算", "数学", "公式"]):
return "math"
elif any(word in question for word in ["代码", "编程", "函数"]):
return "code"
return "general"
# 构建路由 Chain
router_chain = RunnableBranch(
(lambda x: classify_question(x) == "math", math_chain),
(lambda x: classify_question(x) == "code", code_chain),
general_chain # 默认分支
)
5. 🤖 Agent:智能体的灵魂
5.1 Agent 核心概念
Agent 是 LangChain 中最强大的抽象,它将 LLM 与工具结合,形成能够自主决策和执行任务的智能体。
┌─────────────────────────────────────────────────────────────────────┐
│ Agent 工作原理 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ 用户输入 │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Agent 循环 │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │ │
│ │ │ │ 思考 │────▶│ 决策 │────▶│ 执行工具 │ │ │ │
│ │ │ │(Reason) │ │ (Act) │ │ (Tool Call) │ │ │ │
│ │ │ └─────────┘ └─────────┘ └──────┬──────┘ │ │ │
│ │ │ ▲ │ │ │ │
│ │ │ │ ┌─────────────┐ │ │ │ │
│ │ │ └──────────│ 观察结果 │◀────────┘ │ │ │
│ │ │ │(Observation)│ │ │ │
│ │ │ └─────────────┘ │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ (满足停止条件) │ │
│ │ ┌─────────────┐ │ │
│ │ │ 最终答案 │ │ │
│ │ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
5.2 ReAct 模式详解
ReAct(Reasoning + Acting)是 Agent 的核心执行模式,它交替进行推理和行动。
ReAct 执行示例
用户问题: 查找最受欢迎的无线耳机并确认是否有货
================================ Human Message =================================
查找最受欢迎的无线耳机并确认是否有货
================================== AI Message ==================================
💭 思考: 需要先搜索当前最受欢迎的无线耳机产品
Tool Calls:
search_products (call_abc123)
Args:
query: 无线耳机 热销
================================= Tool Message =================================
找到 5 款产品。热销榜单: 1. Sony WH-1000XM5, 2. AirPods Pro...
================================== AI Message ==================================
💭 思考: 找到了热销耳机,需要确认库存情况
Tool Calls:
check_inventory (call_def456)
Args:
product_id: WH-1000XM5
================================= Tool Message =================================
Sony WH-1000XM5: 库存 10 件
================================== AI Message ==================================
根据我的搜索结果:
最受欢迎的无线耳机是 **Sony WH-1000XM5**,目前有 **10 件** 库存。
5.3 创建第一个 Agent
from langchain.agents import create_agent
from langchain.tools import tool
# 定义工具
@tool
def get_weather(city: str) -> str:
"""获取城市天气信息"""
weather_data = {
"北京": "晴天,25°C",
"上海": "多云,28°C",
"深圳": "雷阵雨,30°C"
}
return weather_data.get(city, f"未找到 {city} 的天气信息")
@tool
def search_flights(origin: str, destination: str, date: str) -> str:
"""搜索航班信息"""
return f"从 {origin} 到 {destination},{date} 有 5 个航班可选"
@tool
def book_hotel(city: str, checkin: str, checkout: str) -> str:
"""预订酒店"""
return f"已在 {city} 预订酒店,入住: {checkin},退房: {checkout}"
# 创建 Agent
agent = create_agent(
model="anthropic:claude-sonnet-4-5",
tools=[get_weather, search_flights, book_hotel],
system_prompt="你是一个专业的旅行规划助手。帮助用户规划行程、查询天气、预订航班和酒店。"
)
# 运行 Agent
result = agent.invoke({
"messages": [{
"role": "user",
"content": "我想下周去上海出差3天,帮我查下天气和航班"
}]
})
print(result["messages"][-1].content)
5.4 Middleware 中间件机制
Middleware 提供了在 Agent 执行的各个阶段插入自定义逻辑的能力。
┌──────────────────────────────────────────────────────────────────┐
│ Middleware 执行流程 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ Input ─────────────────────────────────────────────────▶ Output│
│ │ ▲ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ before_model │ ─ 预处理状态、注入上下文 │ │
│ └──────┬───────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │wrap_model_call│ ─ 动态选择模型、修改工具列表 │ │
│ └──────┬───────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ Model Call │ ─ 实际调用 LLM │ │
│ └──────┬───────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │wrap_tool_call │ ─ 工具调用前后处理 │ │
│ └──────┬───────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ after_model │ ─ 后处理、日志记录 │────┘
│ └──────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
自定义 Middleware 示例
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import AgentMiddleware, ModelRequest
from langchain.agents.middleware.types import ToolCallRequest
from typing import Any
class LoggingMiddleware(AgentMiddleware):
"""日志记录中间件"""
def before_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
"""模型调用前记录"""
print(f"📥 收到消息: {state['messages'][-1].content[:50]}...")
return None
def wrap_model_call(self, request: ModelRequest, handler):
"""包装模型调用"""
import time
start = time.time()
response = handler(request)
elapsed = time.time() - start
print(f"⏱️ 模型响应耗时: {elapsed:.2f}s")
return response
def wrap_tool_call(self, request: ToolCallRequest, handler):
"""包装工具调用"""
tool_name = request.tool_call["name"]
print(f"🔧 调用工具: {tool_name}")
result = handler(request)
print(f"✅ 工具返回: {str(result)[:100]}...")
return result
class PermissionMiddleware(AgentMiddleware):
"""权限控制中间件"""
def wrap_model_call(self, request: ModelRequest, handler):
"""根据用户权限过滤工具"""
user_role = request.runtime.context.get("user_role", "guest")
if user_role == "admin":
# 管理员可以使用所有工具
tools = request.tools
else:
# 普通用户只能使用只读工具
tools = [t for t in request.tools if not t.name.startswith("delete_")]
return handler(request.override(tools=tools))
# 组合使用多个 Middleware
agent = create_agent(
model="gpt-4o",
tools=tools,
middleware=[LoggingMiddleware(), PermissionMiddleware()]
)
6. 📊 LangGraph:状态机的力量
6.1 LangGraph 核心理念
LangGraph 是 LangChain 公司推出的低级别 Agent 编排框架,专注于构建长时间运行的、有状态的工作流。
💡 思考:为什么需要 LangGraph?LangChain Agent 不够用吗?
🤔 解答:
| 场景 | LangChain Agent | LangGraph |
|---|---|---|
| 简单工具调用 | ✅ 足够 | 过于复杂 |
| 线性工作流 | ✅ Chain 解决 | 可选 |
| 复杂分支逻辑 | ❌ 困难 | ✅ 条件边 |
| 多 Agent 协作 | ❌ 需要自行实现 | ✅ 子图 |
| 人工干预 | ❌ 有限 | ✅ 中断/恢复 |
| 故障恢复 | ❌ 从头开始 | ✅ 检查点恢复 |
| 长时间任务 | ❌ 容易超时 | ✅ 持久化执行 |
┌─────────────────────────────────────────────────────────────────────────────┐
│ LangGraph vs LangChain Agent │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ LangChain Agent LangGraph │
│ ┌──────────────────┐ ┌──────────────────────────────┐ │
│ │ │ │ │ │
│ │ ┌──────────┐ │ │ ┌──────┐ ┌──────┐ │ │
│ │ │ LLM │◀─┐│ │ │Node A│───▶│Node B│ │ │
│ │ └────┬─────┘ ││ │ └──┬───┘ └──┬───┘ │ │
│ │ │ ││ │ │ ╲ │ │ │
│ │ ▼ ││ │ │ ╲ ▼ │ │
│ │ ┌──────────┐ ││ │ │ ╲ ┌──────┐ │ │
│ │ │ Tool │──┘│ │ │ ▶│Node C│ │ │
│ │ └──────────┘ │ │ │ └──┬───┘ │ │
│ │ │ │ ▼ │ │ │
│ │ 单一循环模式 │ │ ┌──────┐ │ │ │
│ │ │ │ │Node D│◀──────┘ │ │
│ └──────────────────┘ │ └──────┘ │ │
│ │ │ │
│ │ 灵活的图结构 │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
6.2 Graph API 深入剖析
6.2.1 基础图构建
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# 1. 定义状态 Schema
class State(TypedDict):
text: str
count: int
# 2. 定义节点函数
def node_a(state: State) -> dict:
"""处理节点 A"""
return {
"text": state["text"] + " -> A",
"count": state["count"] + 1
}
def node_b(state: State) -> dict:
"""处理节点 B"""
return {
"text": state["text"] + " -> B",
"count": state["count"] + 1
}
# 3. 构建图
graph = StateGraph(State)
# 添加节点
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
# 添加边
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", END)
# 4. 编译图
compiled_graph = graph.compile()
# 5. 执行
result = compiled_graph.invoke({
"text": "开始",
"count": 0
})
print(result)
# {'text': '开始 -> A -> B', 'count': 2}
6.3 State(状态管理)
6.3.1 使用 TypedDict 定义状态
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
from langgraph.graph.message import add_messages
from langchain.messages import AnyMessage
class AgentState(TypedDict):
# 基础字段
query: str
# 使用 Annotated 指定归约器
messages: Annotated[list[AnyMessage], add_messages] # 消息累加
steps: Annotated[list[str], add] # 步骤累加
# 普通字段(覆盖更新)
final_answer: str
confidence: float
6.3.2 多 Schema 设计
# 输入 Schema
class InputState(TypedDict):
user_query: str
# 输出 Schema
class OutputState(TypedDict):
final_response: str
source_documents: list[str]
# 内部完整 Schema
class FullState(TypedDict):
user_query: str
retrieved_docs: list[str]
generated_response: str
final_response: str
source_documents: list[str]
# 私有状态(不暴露给输入/输出)
_internal_cache: dict
# 私有状态(节点间通信)
class PrivateState(TypedDict):
intermediate_result: str
# 构建图时指定不同 Schema
builder = StateGraph(
FullState,
input_schema=InputState,
output_schema=OutputState
)
6.4 Nodes(节点)与 Edges(边)
6.4.1 节点类型
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
from dataclasses import dataclass
@dataclass
class AppContext:
user_id: str
session_id: str
# 1. 简单节点
def simple_node(state: State) -> dict:
return {"result": "处理完成"}
# 2. 带配置的节点
def node_with_config(state: State, config: RunnableConfig) -> dict:
thread_id = config["configurable"]["thread_id"]
return {"result": f"线程 {thread_id} 处理完成"}
# 3. 带运行时上下文的节点
def node_with_runtime(state: State, runtime: Runtime[AppContext]) -> dict:
user_id = runtime.context.user_id
# 访问持久化存储
user_prefs = runtime.store.get(("users",), user_id)
# 流式输出
runtime.stream_writer(f"正在处理用户 {user_id} 的请求...")
return {"result": f"用户 {user_id} 处理完成"}
# 添加节点
graph.add_node("simple", simple_node)
graph.add_node("with_config", node_with_config)
graph.add_node("with_runtime", node_with_runtime)
6.4.2 边类型
from langgraph.graph import START, END
from typing import Literal
# 1. 普通边:固定跳转
graph.add_edge("node_a", "node_b")
graph.add_edge(START, "node_a")
graph.add_edge("node_b", END)
# 2. 条件边:动态路由
def route_by_confidence(state: State) -> Literal["high_quality", "needs_review"]:
"""根据置信度路由"""
if state["confidence"] > 0.8:
return "high_quality"
return "needs_review"
graph.add_conditional_edges(
"evaluation_node",
route_by_confidence,
{
"high_quality": "output_node",
"needs_review": "human_review_node"
}
)
# 3. 条件入口点
def route_by_intent(state: State) -> Literal["qa", "summarize", "chat"]:
"""根据用户意图路由到不同处理流程"""
intent = classify_intent(state["user_query"])
return intent
graph.add_conditional_edges(START, route_by_intent)
6.5 Reducers(归约器)
Reducers 决定了状态更新时如何合并新旧值。
from typing import Annotated
from operator import add
from langgraph.graph.message import add_messages
class AdvancedState(TypedDict):
# 1. 默认归约器:覆盖
current_step: str # 新值覆盖旧值
# 2. 累加归约器:使用 operator.add
all_steps: Annotated[list[str], add] # 列表累加
token_count: Annotated[int, add] # 数值累加
# 3. 消息归约器:智能合并
messages: Annotated[list[AnyMessage], add_messages]
# 4. 自定义归约器
scores: Annotated[list[float], lambda old, new: old + new if old else new]
# 自定义归约器函数
def merge_dicts(old: dict, new: dict) -> dict:
"""合并字典,新值优先"""
if old is None:
return new
return {**old, **new}
class StateWithCustomReducer(TypedDict):
metadata: Annotated[dict, merge_dicts]
add_messages 归约器详解
from langgraph.graph.message import add_messages, REMOVE_ALL_MESSAGES
from langchain.messages import HumanMessage, AIMessage, RemoveMessage
# add_messages 的智能行为:
# 1. 新消息追加到列表
# 2. 相同 ID 的消息会被更新
# 3. 支持删除操作
# 示例状态更新
state = {"messages": [HumanMessage(content="你好", id="msg1")]}
# 追加新消息
update1 = {"messages": [AIMessage(content="你好!", id="msg2")]}
# 结果: [HumanMessage("你好"), AIMessage("你好!")]
# 更新已有消息(相同 ID)
update2 = {"messages": [HumanMessage(content="你好呀", id="msg1")]}
# 结果: [HumanMessage("你好呀"), AIMessage("你好!")]
# 删除特定消息
update3 = {"messages": [RemoveMessage(id="msg1")]}
# 结果: [AIMessage("你好!")]
# 清空所有消息
update4 = {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)]}
# 结果: []
7. 🔄 LangGraph 高级特性
7.1 条件边与动态路由
7.1.1 使用 Command 实现动态控制流
from langgraph.types import Command
from typing import Literal
def decision_node(state: State) -> Command[Literal["process", "reject", "escalate"]]:
"""根据条件动态决定下一步"""
score = state["evaluation_score"]
if score > 0.9:
return Command(
update={"status": "approved"},
goto="process"
)
elif score < 0.3:
return Command(
update={"status": "rejected", "reason": "低分"},
goto="reject"
)
else:
return Command(
update={"status": "pending_review"},
goto="escalate"
)
# 在子图中跳转到父图节点
def subgraph_node(state: State) -> Command[Literal["parent_handler"]]:
"""从子图跳转到父图"""
return Command(
update={"handoff_data": state["result"]},
goto="parent_handler",
graph=Command.PARENT # 指定跳转到父图
)
7.1.2 使用 Send 实现 Map-Reduce
from langgraph.types import Send
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], add]
class JokeState(TypedDict):
subject: str
def generate_subjects(state: OverallState) -> dict:
"""生成主题列表"""
return {"subjects": ["猫", "狗", "程序员"]}
def continue_to_jokes(state: OverallState) -> list[Send]:
"""为每个主题创建一个并行任务"""
return [
Send("generate_joke", {"subject": subject})
for subject in state["subjects"]
]
def generate_joke(state: JokeState) -> dict:
"""为单个主题生成笑话"""
joke = f"为什么{state['subject']}喜欢编程?因为..."
return {"jokes": [joke]}
# 构建 Map-Reduce 图
graph = StateGraph(OverallState)
graph.add_node("generate_subjects", generate_subjects)
graph.add_node("generate_joke", generate_joke)
graph.add_edge(START, "generate_subjects")
graph.add_conditional_edges("generate_subjects", continue_to_jokes)
graph.add_edge("generate_joke", END)
7.2 Human-in-the-Loop
人工干预是 LangGraph 的核心特性之一。
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
class ApprovalState(TypedDict):
request: str
approval_status: str
reviewer_notes: str
def submit_request(state: ApprovalState) -> dict:
"""提交请求"""
return {"request": state["request"]}
def wait_for_approval(state: ApprovalState) -> dict:
"""等待人工审批 - 使用 interrupt"""
# 中断执行,等待人工输入
approval = interrupt({
"type": "approval_required",
"request": state["request"],
"options": ["approve", "reject", "modify"]
})
return {
"approval_status": approval["decision"],
"reviewer_notes": approval.get("notes", "")
}
def process_approved(state: ApprovalState) -> dict:
"""处理已批准的请求"""
return {"status": "completed"}
def handle_rejection(state: ApprovalState) -> dict:
"""处理被拒绝的请求"""
return {"status": "rejected"}
# 构建审批工作流
graph = StateGraph(ApprovalState)
graph.add_node("submit", submit_request)
graph.add_node("await_approval", wait_for_approval)
graph.add_node("process", process_approved)
graph.add_node("reject", handle_rejection)
graph.add_edge(START, "submit")
graph.add_edge("submit", "await_approval")
graph.add_conditional_edges(
"await_approval",
lambda s: "process" if s["approval_status"] == "approve" else "reject"
)
graph.add_edge("process", END)
graph.add_edge("reject", END)
# 编译时配置检查点
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
# 执行到中断点
config = {"configurable": {"thread_id": "approval-001"}}
result = app.invoke({"request": "申请服务器资源"}, config)
# 人工审批后恢复执行
app.invoke(
Command(resume={"decision": "approve", "notes": "资源已分配"}),
config
)
7.3 持久化与检查点
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
# 1. 内存检查点(开发/测试)
from langgraph.checkpoint.memory import MemorySaver
memory_saver = MemorySaver()
# 2. SQLite 检查点(单机部署)
sqlite_saver = SqliteSaver.from_conn_string("checkpoints.db")
# 3. PostgreSQL 检查点(生产部署)
# postgres_saver = PostgresSaver.from_conn_string(
# "postgresql://user:pass@localhost/checkpoints"
# )
# 编译图时使用检查点
app = graph.compile(checkpointer=memory_saver)
# 执行并保存状态
config = {"configurable": {"thread_id": "session-123"}}
result = app.invoke({"query": "你好"}, config)
# 获取检查点历史
for checkpoint in app.get_state_history(config):
print(f"步骤: {checkpoint.metadata.get('langgraph_step')}")
print(f"状态: {checkpoint.values}")
print("---")
# 回滚到特定检查点
app.update_state(config, {"query": "重新开始"})
7.4 子图与多 Agent 编排
from langgraph.graph import StateGraph, START, END
# ============ 定义子图:研究 Agent ============
class ResearchState(TypedDict):
topic: str
findings: str
def research_node(state: ResearchState) -> dict:
"""执行研究任务"""
return {"findings": f"关于 {state['topic']} 的研究结果..."}
research_graph = StateGraph(ResearchState)
research_graph.add_node("research", research_node)
research_graph.add_edge(START, "research")
research_graph.add_edge("research", END)
research_subgraph = research_graph.compile()
# ============ 定义子图:写作 Agent ============
class WritingState(TypedDict):
content: str
article: str
def writing_node(state: WritingState) -> dict:
"""执行写作任务"""
return {"article": f"基于以下内容的文章: {state['content'][:100]}..."}
writing_graph = StateGraph(WritingState)
writing_graph.add_node("write", writing_node)
writing_graph.add_edge(START, "write")
writing_graph.add_edge("write", END)
writing_subgraph = writing_graph.compile()
# ============ 主图:协调多个子图 ============
class OrchestratorState(TypedDict):
user_request: str
research_topic: str
research_result: str
final_article: str
def parse_request(state: OrchestratorState) -> dict:
"""解析用户请求"""
return {"research_topic": state["user_request"]}
def call_research_agent(state: OrchestratorState) -> dict:
"""调用研究子图"""
result = research_subgraph.invoke({
"topic": state["research_topic"]
})
return {"research_result": result["findings"]}
def call_writing_agent(state: OrchestratorState) -> dict:
"""调用写作子图"""
result = writing_subgraph.invoke({
"content": state["research_result"]
})
return {"final_article": result["article"]}
# 构建主图
main_graph = StateGraph(OrchestratorState)
main_graph.add_node("parse", parse_request)
main_graph.add_node("research_agent", call_research_agent)
main_graph.add_node("writing_agent", call_writing_agent)
main_graph.add_edge(START, "parse")
main_graph.add_edge("parse", "research_agent")
main_graph.add_edge("research_agent", "writing_agent")
main_graph.add_edge("writing_agent", END)
# 编译并运行
orchestrator = main_graph.compile()
result = orchestrator.invoke({
"user_request": "写一篇关于人工智能发展历史的文章"
})
┌─────────────────────────────────────────────────────────────────────────────┐
│ 多 Agent 协作架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Orchestrator (主图) │ │
│ │ │ │
│ │ ┌──────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Parse │─────▶│ Research Agent │─────▶│ Writing Agent │ │ │
│ │ └──────────┘ │ (子图) │ │ (子图) │ │ │
│ │ └────────┬────────┘ └────────┬────────┘ │ │
│ │ │ │ │ │
│ └──────────────────────────────┼────────────────────────┼──────────────┘ │
│ │ │ │
│ ┌─────────────┴─────────────┐ ┌──────┴──────────┐ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ ┌──────────────┐ │ ┌──────────────┐ │ │
│ │ Research │ │ │ Write │ │ │
│ │ Node │ │ │ Node │ │ │
│ └──────────────┘ │ └──────────────┘ │ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ 研究结果 ────────────────────────────────▶ 最终文章 │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
8. 🛠️ 实战案例
8.1 案例一:智能客服 Agent
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
from langchain_openai import ChatOpenAI
from dataclasses import dataclass
# 定义用户上下文
@dataclass
class CustomerContext:
customer_id: str
tier: str # "standard", "premium", "vip"
# 模拟客户数据库
CUSTOMER_DB = {
"C001": {"name": "张三", "orders": ["ORD001", "ORD002"], "balance": 1500},
"C002": {"name": "李四", "orders": ["ORD003"], "balance": 300},
}
ORDER_DB = {
"ORD001": {"product": "笔记本电脑", "status": "已发货", "amount": 5999},
"ORD002": {"product": "无线鼠标", "status": "待发货", "amount": 199},
"ORD003": {"product": "键盘", "status": "已完成", "amount": 399},
}
# 定义客服工具
@tool
def get_customer_info(runtime: ToolRuntime[CustomerContext]) -> str:
"""获取当前客户的账户信息"""
customer_id = runtime.context.customer_id
customer = CUSTOMER_DB.get(customer_id)
if customer:
return f"""
客户信息:
- 姓名:{customer['name']}
- 账户余额:{customer['balance']} 元
- 订单数量:{len(customer['orders'])} 个
- 会员等级:{runtime.context.tier}
"""
return "未找到客户信息"
@tool
def query_order_status(order_id: str) -> str:
"""查询订单状态
Args:
order_id: 订单编号,如 ORD001
"""
order = ORDER_DB.get(order_id)
if order:
return f"订单 {order_id}:{order['product']},状态:{order['status']},金额:{order['amount']} 元"
return f"未找到订单 {order_id}"
@tool
def list_recent_orders(runtime: ToolRuntime[CustomerContext]) -> str:
"""列出客户最近的订单"""
customer_id = runtime.context.customer_id
customer = CUSTOMER_DB.get(customer_id)
if not customer:
return "未找到客户信息"
orders_info = []
for order_id in customer["orders"]:
order = ORDER_DB.get(order_id)
if order:
orders_info.append(f"- {order_id}: {order['product']} ({order['status']})")
return "最近订单:\n" + "\n".join(orders_info) if orders_info else "暂无订单"
@tool
def request_refund(order_id: str, reason: str) -> str:
"""申请退款
Args:
order_id: 要退款的订单编号
reason: 退款原因
"""
order = ORDER_DB.get(order_id)
if not order:
return f"订单 {order_id} 不存在"
if order["status"] == "已完成":
return f"退款申请已提交。订单 {order_id}({order['product']}),退款原因:{reason}。预计 3-5 个工作日处理。"
elif order["status"] == "已发货":
return f"订单 {order_id} 正在配送中,请先拒收后再申请退款。"
else:
return f"订单 {order_id} 尚未发货,已直接取消,款项将原路退回。"
# 创建客服 Agent
customer_service_agent = create_agent(
model=ChatOpenAI(model="gpt-4o", temperature=0.3),
tools=[get_customer_info, query_order_status, list_recent_orders, request_refund],
system_prompt="""你是一名专业的电商客服助手。请遵循以下原则:
1. 始终保持礼貌和专业
2. 优先查询客户信息以提供个性化服务
3. 对于退款等敏感操作,先确认客户意图
4. VIP 客户享有优先处理权
5. 如果无法解决问题,建议转人工客服
当前日期:2024年12月15日
""",
context_schema=CustomerContext
)
# 使用示例
result = customer_service_agent.invoke(
{"messages": [{"role": "user", "content": "我想查一下我最近的订单状态,还有那个键盘能退款吗?"}]},
context=CustomerContext(customer_id="C002", tier="premium")
)
print(result["messages"][-1].content)
8.2 案例二:多步骤数据分析工作流
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from typing import Annotated
from typing_extensions import TypedDict
import pandas as pd
import json
class AnalysisState(TypedDict):
# 输入
raw_data: str
analysis_goal: str
# 处理过程
cleaned_data: str
statistics: str
insights: Annotated[list[str], lambda x, y: (x or []) + y]
# 输出
final_report: str
visualizations: list[str]
model = ChatOpenAI(model="gpt-4o", temperature=0)
# 节点 1:数据清洗
def clean_data(state: AnalysisState) -> dict:
"""清洗和预处理数据"""
prompt = ChatPromptTemplate.from_template("""
你是一个数据工程师。请分析以下原始数据,识别并处理:
1. 缺失值
2. 异常值
3. 数据类型问题
4. 重复记录
原始数据:
{raw_data}
请返回清洗后的数据摘要和处理说明。
""")
chain = prompt | model
result = chain.invoke({"raw_data": state["raw_data"]})
return {"cleaned_data": result.content}
# 节点 2:统计分析
def compute_statistics(state: AnalysisState) -> dict:
"""计算描述性统计"""
prompt = ChatPromptTemplate.from_template("""
基于清洗后的数据,计算以下统计指标:
1. 基本描述统计(均值、中位数、标准差等)
2. 分布特征
3. 相关性分析
清洗后数据:
{cleaned_data}
分析目标:{goal}
请提供详细的统计结果。
""")
chain = prompt | model
result = chain.invoke({
"cleaned_data": state["cleaned_data"],
"goal": state["analysis_goal"]
})
return {"statistics": result.content}
# 节点 3:洞察提取
def extract_insights(state: AnalysisState) -> dict:
"""从数据中提取关键洞察"""
prompt = ChatPromptTemplate.from_template("""
基于统计分析结果,提取 3-5 个关键业务洞察。
统计结果:
{statistics}
分析目标:{goal}
请用简洁的语言描述每个洞察,并说明其业务意义。
""")
chain = prompt | model
result = chain.invoke({
"statistics": state["statistics"],
"goal": state["analysis_goal"]
})
# 解析洞察列表
insights = [line.strip() for line in result.content.split("\n") if line.strip()]
return {"insights": insights}
# 节点 4:生成报告
def generate_report(state: AnalysisState) -> dict:
"""生成最终分析报告"""
prompt = ChatPromptTemplate.from_template("""
请生成一份专业的数据分析报告,包含以下部分:
## 1. 执行摘要
## 2. 数据概述
## 3. 分析方法
## 4. 主要发现
## 5. 建议措施
## 6. 附录:详细统计
数据清洗结果:
{cleaned_data}
统计分析:
{statistics}
关键洞察:
{insights}
分析目标:{goal}
""")
chain = prompt | model
result = chain.invoke({
"cleaned_data": state["cleaned_data"],
"statistics": state["statistics"],
"insights": "\n".join(state["insights"]),
"goal": state["analysis_goal"]
})
return {"final_report": result.content}
# 节点 5:生成可视化建议
def suggest_visualizations(state: AnalysisState) -> dict:
"""建议数据可视化方案"""
prompt = ChatPromptTemplate.from_template("""
基于分析结果,建议 3 种最适合的数据可视化方案。
对于每种可视化,请说明:
1. 图表类型
2. 使用的数据字段
3. 能够展示的洞察
统计结果:
{statistics}
关键洞察:
{insights}
""")
chain = prompt | model
result = chain.invoke({
"statistics": state["statistics"],
"insights": "\n".join(state["insights"])
})
return {"visualizations": [result.content]}
# 构建分析工作流图
analysis_graph = StateGraph(AnalysisState)
# 添加节点
analysis_graph.add_node("clean", clean_data)
analysis_graph.add_node("statistics", compute_statistics)
analysis_graph.add_node("insights", extract_insights)
analysis_graph.add_node("report", generate_report)
analysis_graph.add_node("visualize", suggest_visualizations)
# 添加边(定义执行顺序)
analysis_graph.add_edge(START, "clean")
analysis_graph.add_edge("clean", "statistics")
analysis_graph.add_edge("statistics", "insights")
analysis_graph.add_edge("insights", "report")
analysis_graph.add_edge("insights", "visualize") # 并行执行
analysis_graph.add_edge("report", END)
analysis_graph.add_edge("visualize", END)
# 编译图
analysis_pipeline = analysis_graph.compile()
# 使用示例
sample_data = """
日期,销售额,客户数,退货率
2024-01,150000,320,0.05
2024-02,180000,380,0.04
2024-03,165000,350,0.06
2024-04,null,290,0.08
2024-05,210000,420,0.03
2024-06,195000,400,0.04
"""
result = analysis_pipeline.invoke({
"raw_data": sample_data,
"analysis_goal": "分析销售趋势并识别改进机会",
"insights": []
})
print("=== 最终报告 ===")
print(result["final_report"])
8.3 案例三:多 Agent 协作系统
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from typing import Annotated, Literal
from typing_extensions import TypedDict
from operator import add
# ============ 定义共享状态 ============
class TeamState(TypedDict):
task: str
current_agent: str
research_notes: Annotated[list[str], add]
code_snippets: Annotated[list[str], add]
review_comments: Annotated[list[str], add]
final_output: str
iteration_count: int
model = ChatOpenAI(model="gpt-4o", temperature=0.7)
# ============ 研究员 Agent ============
@tool
def web_search(query: str) -> str:
"""搜索网络获取信息"""
return f"搜索结果: 关于 '{query}' 的相关信息..."
@tool
def read_documentation(topic: str) -> str:
"""阅读技术文档"""
return f"文档内容: {topic} 的详细说明..."
def researcher_agent(state: TeamState) -> dict:
"""研究员:负责收集和整理信息"""
from langchain.agents import create_agent
agent = create_agent(
model=model,
tools=[web_search, read_documentation],
system_prompt="""你是团队的研究员。你的职责是:
1. 理解任务需求
2. 搜索相关资料
3. 整理关键信息供开发者使用
当前任务:{task}
请收集相关信息。"""
)
result = agent.invoke({
"messages": [{"role": "user", "content": f"研究任务:{state['task']}"}]
})
return {
"research_notes": [result["messages"][-1].content],
"current_agent": "researcher"
}
# ============ 开发者 Agent ============
@tool
def write_code(description: str, language: str = "python") -> str:
"""编写代码"""
return f"```{language}\n# 根据 {description} 编写的代码\ndef solution():\n pass\n```"
@tool
def run_tests(code: str) -> str:
"""运行测试"""
return "测试结果: 全部通过 ✓"
def developer_agent(state: TeamState) -> dict:
"""开发者:负责编写代码"""
from langchain.agents import create_agent
agent = create_agent(
model=model,
tools=[write_code, run_tests],
system_prompt="""你是团队的开发者。你的职责是:
1. 根据研究结果编写代码
2. 确保代码质量和可测试性
3. 处理代码审查反馈
研究笔记:
{research_notes}
请基于这些信息编写解决方案。"""
)
result = agent.invoke({
"messages": [{"role": "user", "content": f"开发任务:{state['task']}\n\n研究笔记:{state['research_notes']}"}]
})
return {
"code_snippets": [result["messages"][-1].content],
"current_agent": "developer"
}
# ============ 审查员 Agent ============
def reviewer_agent(state: TeamState) -> dict:
"""审查员:负责代码审查"""
from langchain.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("""
你是团队的代码审查员。请审查以下代码:
代码:
{code}
请检查:
1. 代码逻辑是否正确
2. 是否有潜在的 bug
3. 代码风格和可读性
4. 性能问题
如果代码质量良好,回复 "APPROVED"。
如果需要修改,提供具体建议。
""")
chain = prompt | model
result = chain.invoke({"code": "\n".join(state["code_snippets"])})
content = result.content
is_approved = "APPROVED" in content.upper()
return {
"review_comments": [content],
"current_agent": "reviewer",
"iteration_count": state["iteration_count"] + 1
}
# ============ 协调器:决定下一步 ============
def coordinator(state: TeamState) -> Command[Literal["researcher", "developer", "reviewer", "finalize"]]:
"""协调各个 Agent 的工作流程"""
current = state.get("current_agent", "")
iterations = state.get("iteration_count", 0)
# 流程逻辑
if not current:
# 开始:先研究
return Command(goto="researcher")
elif current == "researcher":
# 研究完成,开始开发
return Command(goto="developer")
elif current == "developer":
# 开发完成,进行审查
return Command(goto="reviewer")
elif current == "reviewer":
# 检查审查结果
last_review = state["review_comments"][-1] if state["review_comments"] else ""
if "APPROVED" in last_review.upper() or iterations >= 3:
# 审查通过或达到最大迭代次数
return Command(goto="finalize")
else:
# 需要修改,返回开发
return Command(goto="developer")
return Command(goto="finalize")
# ============ 最终整合 ============
def finalize(state: TeamState) -> dict:
"""整合所有工作成果"""
output = f"""
# 任务完成报告
## 原始任务
{state['task']}
## 研究成果
{chr(10).join(state['research_notes'])}
## 最终代码
{chr(10).join(state['code_snippets'])}
## 审查意见
{chr(10).join(state['review_comments'])}
## 迭代次数
{state['iteration_count']}
"""
return {"final_output": output}
# ============ 构建协作图 ============
team_graph = StateGraph(TeamState)
# 添加所有 Agent 节点
team_graph.add_node("coordinator", coordinator)
team_graph.add_node("researcher", researcher_agent)
team_graph.add_node("developer", developer_agent)
team_graph.add_node("reviewer", reviewer_agent)
team_graph.add_node("finalize", finalize)
# 从 START 进入协调器
team_graph.add_edge(START, "coordinator")
# 从各个 Agent 返回协调器
team_graph.add_edge("researcher", "coordinator")
team_graph.add_edge("developer", "coordinator")
team_graph.add_edge("reviewer", "coordinator")
# 最终输出
team_graph.add_edge("finalize", END)
# 编译
team = team_graph.compile()
# 运行
result = team.invoke({
"task": "实现一个 Python 函数,计算斐波那契数列的第 n 项",
"research_notes": [],
"code_snippets": [],
"review_comments": [],
"final_output": "",
"iteration_count": 0
})
print(result["final_output"])
┌─────────────────────────────────────────────────────────────────────────────┐
│ 多 Agent 协作流程图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ START │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ ┌────────▶│ Coordinator │◀────────┐ │
│ │ │ (协调决策) │ │ │
│ │ └────────┬─────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────┼─────────────┐ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ │Researcher│ │Developer │ │Reviewer │ │
│ │ │ (研究) │ │ (开发) │ │ (审查) │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ │
│ └──────┴────────────┴────────────┴─────┘ │
│ │ │
│ │ (完成条件满足) │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Finalize │ │
│ │ (最终整合) │ │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ END │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
9. 🎯 LangChain vs LangGraph:如何选择?
决策流程图
┌─────────────────────────────────────────────────────────────────────────────┐
│ 框架选择决策树 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 你的需求是什么? │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ 简单对话 │ │ 工具调用 │ │ 复杂工作流 │ │
│ │ 应用 │ │ Agent │ │ 系统 │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ ▼ │ │ │
│ ┌──────────────┐ │ │ │
│ │ LangChain │ │ │ │
│ │ Chain │ │ │ │
│ └──────────────┘ │ │ │
│ │ │ │
│ 需要人工干预? │ │
│ │ │ │
│ ┌─────────┼─────────┐ │ │
│ │ │ │ │ │
│ No │ Yes │ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ ┌──────────────┐ │ ┌──────────────┐ │ │
│ │ LangChain │ │ │ LangGraph │ │ │
│ │ create_agent │ │ │ + Interrupt │ │ │
│ └──────────────┘ │ └──────────────┘ │ │
│ │ │ │
│ 需要故障恢复? │ │
│ │ ▼ │
│ ┌─────────┼─────────┐ ┌──────────────┐ │
│ │ │ │ │ LangGraph │ │
│ No │ Yes │ (推荐) │ │
│ │ │ │ └──────────────┘ │
│ ▼ │ ▼ │
│ ┌──────────────┐ │ ┌──────────────┐ │
│ │ LangChain │ │ │ LangGraph │ │
│ │ create_agent │ │ │ + Checkpoint │ │
│ └──────────────┘ │ └──────────────┘ │
│ │ │
└─────────────────────────────────────────────────────────────────────────────┘
对比总结表
| 维度 | LangChain (create_agent) | LangGraph |
|---|---|---|
| 学习曲线 | 简单,快速上手 | 较陡,需理解图论概念 |
| 适用场景 | 简单 Agent、快速原型 | 复杂工作流、生产系统 |
| 控制粒度 | 高层抽象 | 细粒度控制 |
| 状态管理 | 自动处理 | 显式定义 Schema |
| 执行流程 | 固定 ReAct 循环 | 自定义图结构 |
| 人工干预 | 有限支持 | 完整 HITL 支持 |
| 故障恢复 | 需自行实现 | 内置检查点 |
| 并行执行 | 不支持 | 原生支持 |
| 可视化 | 有限 | 图形化调试 |
💡 建议
- 初学者/快速原型:从 LangChain
create_agent开始 - 需要人工审批:使用 LangGraph + interrupt
- 多步骤工作流:使用 LangGraph StateGraph
- 多 Agent 系统:使用 LangGraph 子图
- 生产部署:LangGraph + 持久化检查点 + LangSmith
10. 🔮 最佳实践与性能优化
10.1 状态设计最佳实践
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
# ✅ 好的状态设计
class WellDesignedState(TypedDict):
# 1. 使用明确的类型注解
messages: Annotated[list, add_messages]
# 2. 分离输入、处理、输出字段
user_input: str # 输入
processed_data: dict # 中间状态
final_result: str # 输出
# 3. 包含必要的元数据
created_at: str
updated_at: str
version: int
# ❌ 避免的设计
class PoorlyDesignedState(TypedDict):
data: dict # 太宽泛,难以追踪
temp: str # 命名不清晰
10.2 递归限制与防护
from langgraph.managed import RemainingSteps
from langgraph.graph import StateGraph, START, END
from typing import Literal
class SafeState(TypedDict):
messages: list
remaining_steps: RemainingSteps # 自动追踪剩余步骤
def safe_agent_node(state: SafeState) -> dict:
"""带有递归保护的 Agent 节点"""
remaining = state["remaining_steps"]
if remaining <= 2:
# 接近限制,返回安全响应
return {"messages": [{"role": "assistant", "content": "正在总结当前进度..."}]}
# 正常处理
return {"messages": [{"role": "assistant", "content": "处理中..."}]}
def safe_router(state: SafeState) -> Literal["continue", "finalize"]:
"""安全的路由决策"""
if state["remaining_steps"] <= 2:
return "finalize"
return "continue"
# 设置递归限制
result = graph.invoke(
{"messages": []},
config={"recursion_limit": 25} # 明确设置限制
)
10.3 流式输出优化
# 使用 stream_mode 获取实时更新
for event in graph.stream(
{"messages": [{"role": "user", "content": "分析数据"}]},
stream_mode="updates" # 只返回增量更新
):
node_name = list(event.keys())[0]
node_output = event[node_name]
print(f"[{node_name}] {node_output}")
# 在工具中使用 stream_writer
@tool
def long_running_task(runtime: ToolRuntime) -> str:
"""长时间运行的任务,实时反馈进度"""
writer = runtime.stream_writer
writer("开始处理...")
# 模拟耗时操作
for i in range(5):
time.sleep(1)
writer(f"进度: {(i+1)*20}%")
return "任务完成"
10.4 节点缓存
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
import time
class State(TypedDict):
query: str
result: str
def expensive_computation(state: State) -> dict:
"""模拟耗时计算"""
time.sleep(3) # 耗时操作
return {"result": f"计算结果: {state['query']}"}
# 配置缓存策略
builder = StateGraph(State)
builder.add_node(
"compute",
expensive_computation,
cache_policy=CachePolicy(ttl=3600) # 缓存 1 小时
)
# 编译时启用缓存
graph = builder.compile(cache=InMemoryCache())
# 第一次调用:3秒
result1 = graph.invoke({"query": "test"})
# 第二次调用相同输入:立即返回(使用缓存)
result2 = graph.invoke({"query": "test"})
10.5 错误处理与重试
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
import time
class RetryMiddleware:
"""带重试的工具调用中间件"""
def __init__(self, max_retries: int = 3, delay: float = 1.0):
self.max_retries = max_retries
self.delay = delay
@wrap_tool_call
def __call__(self, request, handler):
last_error = None
for attempt in range(self.max_retries):
try:
return handler(request)
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
time.sleep(self.delay * (attempt + 1)) # 指数退避
continue
# 所有重试失败
return ToolMessage(
content=f"工具调用失败(重试 {self.max_retries} 次): {str(last_error)}",
tool_call_id=request.tool_call["id"]
)
agent = create_agent(
model="gpt-4o",
tools=tools,
middleware=[RetryMiddleware(max_retries=3)]
)
11. 📚 总结与展望
核心要点回顾
┌─────────────────────────────────────────────────────────────────────────────┐
│ LangChain & LangGraph 知识图谱 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ LangChain │ │
│ │ 生态系统 │ │
│ └──────┬───────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LangChain │ │ LangGraph │ │ LangSmith │ │
│ │ (组件库) │ │ (编排引擎) │ │ (可观测性) │ │
│ └──────┬──────┘ └──────┬──────┘ └─────────────┘ │
│ │ │ │
│ ┌──────┴──────┐ ┌──────┴──────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────────┐ │
│ │Models│ │Tools │ │State │ │Checkpoint│ │
│ │Chain │ │Agent │ │Nodes │ │Interrupt │ │
│ │Memory│ │LCEL │ │Edges │ │Subgraph │ │
│ └──────┘ └──────┘ └──────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
学习路径建议
-
入门阶段
- 掌握 LangChain 基础组件(Models, Messages, Tools)
- 学会使用
create_agent构建简单 Agent - 理解 LCEL 表达式语言
-
进阶阶段
- 深入 LangGraph 状态机概念
- 实践条件边和动态路由
- 掌握 Human-in-the-Loop 模式
-
高级阶段
- 设计多 Agent 协作系统
- 实现持久化和故障恢复
- 优化生产环境性能
未来展望
随着 AI Agent 技术的快速发展,LangChain 和 LangGraph 也在不断演进:
- 更强大的编排能力:支持更复杂的工作流模式
- 更好的可观测性:与 LangSmith 更深度集成
- 更丰富的集成:支持更多 LLM 提供商和工具
- 更低的延迟:优化执行引擎性能
- 更强的安全性:内置更多安全防护机制
12. 📖 参考文献
官方文档
-
LangChain Documentation
- 主站: https://docs.langchain.com/oss/python/langchain/overview
- Agents: https://docs.langchain.com/oss/python/langchain/agents
- Tools: https://docs.langchain.com/oss/python/langchain/tools
-
LangGraph Documentation
- 主站: https://docs.langchain.com/oss/python/langgraph/overview
- Graph API: https://docs.langchain.com/oss/python/langgraph/graph-api
- Concepts: https://docs.langchain.com/oss/python/langgraph/concepts
-
LangSmith Documentation
- 主站: https://docs.langchain.com/langsmith/home
GitHub 仓库
-
LangChain
- https://github.com/langchain-ai/langchain
- 描述:🦜🔗 The platform for reliable agents
-
LangGraph
- https://github.com/langchain-ai/langgraph
- 描述:Build resilient language agents as graphs
API 参考
-
LangChain API Reference
- https://reference.langchain.com/python/langchain
-
LangGraph API Reference
- https://reference.langchain.com/python/langgraph
学习资源
-
LangChain Academy
- https://academy.langchain.com/
- 免费官方课程
-
LangChain Forum
- https://forum.langchain.com/
- 社区讨论
学术参考
-
ReAct: Synergizing Reasoning and Acting in Language Models
- Yao et al., 2022
- https://arxiv.org/abs/2210.03629
-
Pregel: A System for Large-Scale Graph Processing
- Google Research
- https://research.google/pubs/pub37252/
📝 作者注:本文基于 LangChain 和 LangGraph 2024-2025 年最新版本编写。由于框架更新较快,建议读者参考官方文档获取最新 API 变更。
🔗 系列导航:本文是【Agents篇】系列第 11 篇,更多内容请关注系列其他文章。
💬 反馈交流:欢迎在评论区留言讨论,一起探索 AI Agent 开发的无限可能!
更多推荐
所有评论(0)