🚀 从组件到状态机,全面掌握 AI Agent 开发的核心框架


📑 目录


1. 🌟 引言:为什么需要 LangChain 和 LangGraph?

在这里插入图片描述

在 AI Agent 开发的浪潮中,如何高效地构建可靠、可扩展的智能应用成为了开发者面临的核心挑战。LangChainLangGraph 作为 LangChain 公司推出的两大核心框架,为开发者提供了从快速原型到生产部署的完整解决方案。

💡 思考:传统 LLM 应用开发面临哪些痛点?

在没有统一框架之前,开发者需要:

  1. 手动处理与多个 LLM 提供商的 API 集成
  2. 自行实现对话历史管理和上下文窗口控制
  3. 从零构建工具调用和结果处理逻辑
  4. 缺乏标准化的 Agent 执行流程
  5. 难以实现复杂的多步骤工作流和状态管理

🤔 解答:LangChain 和 LangGraph 如何解决这些问题?

挑战 LangChain 解决方案 LangGraph 增强能力
多模型集成 统一的 Chat Model 接口 动态模型选择
工具调用 @tool 装饰器 + 自动 Schema 工具状态追踪
对话管理 Messages + Memory 持久化检查点
复杂工作流 Chain 链式调用 图状态机编排
人工干预 基础回调支持 Human-in-the-Loop
错误恢复 简单重试 持久化执行恢复
┌─────────────────────────────────────────────────────────────────┐
│                    LangChain 生态系统全景图                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐        │
│   │  LangChain  │───▶│  LangGraph  │───▶│  LangSmith  │        │
│   │   (高层API)  │    │  (编排引擎)  │    │  (可观测性)  │        │
│   └─────────────┘    └─────────────┘    └─────────────┘        │
│          │                   │                  │               │
│          ▼                   ▼                  ▼               │
│   ┌─────────────────────────────────────────────────────┐      │
│   │              Integrations (100+ 集成)                │      │
│   │  OpenAI │ Anthropic │ Google │ Azure │ HuggingFace  │      │
│   └─────────────────────────────────────────────────────┘      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2. 🏗️ LangChain 整体架构

2.1 核心设计理念

LangChain 的设计遵循几个核心原则:

  1. 模块化(Modularity):每个组件都可以独立使用,也可以组合使用
  2. 可组合性(Composability):通过 LCEL 表达式语言实现灵活的组件组合
  3. 互换性(Interoperability):统一接口让不同提供商的模型可以无缝切换
  4. 可观测性(Observability):与 LangSmith 深度集成,提供完整的调试和监控能力

2.2 模块化架构图

┌────────────────────────────────────────────────────────────────────────────┐
│                           LangChain 核心架构                                │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                        Application Layer (应用层)                     │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                │  │
│  │  │    Agents    │  │    Chains    │  │   Workflows  │                │  │
│  │  │   (智能体)    │  │   (链式调用)  │  │   (工作流)   │                │  │
│  │  └──────────────┘  └──────────────┘  └──────────────┘                │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                    │                                       │
│                                    ▼                                       │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                      Component Layer (组件层)                         │  │
│  │                                                                       │  │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐   │  │
│  │   │ Models  │  │Messages │  │  Tools  │  │ Prompts │  │ Memory  │   │  │
│  │   │  模型   │  │  消息   │  │  工具   │  │ 提示词  │  │  记忆   │   │  │
│  │   └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘   │  │
│  │                                                                       │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                    │                                       │
│                                    ▼                                       │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                    Integration Layer (集成层)                         │  │
│  │                                                                       │  │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐   │  │
│  │   │ OpenAI  │  │Anthropic│  │ Google  │  │  Azure  │  │ Bedrock │   │  │
│  │   └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘   │  │
│  │                                                                       │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

3. 🔧 LangChain 核心组件详解

3.1 Models(模型层)

Models 是 LangChain 的基础,提供了统一的接口来访问各种 LLM 提供商。

3.1.1 初始化模型
from langchain.chat_models import init_chat_model

# 方式一:使用 init_chat_model(推荐)
model = init_chat_model(
    "claude-sonnet-4-5-20250929",  # 模型名称
    temperature=0.7,               # 温度参数
    max_tokens=1000,               # 最大 token 数
    timeout=30                     # 超时时间
)

# 方式二:使用 "provider:model" 格式
model = init_chat_model("openai:gpt-4o")

# 方式三:直接使用提供商类
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
    model="gpt-4o",
    temperature=0.1,
    max_tokens=1000
)
3.1.2 模型调用方式
# 1. 同步调用 - invoke
response = model.invoke("为什么天空是蓝色的?")
print(response.content)

# 2. 流式输出 - stream
for chunk in model.stream("讲一个关于 AI 的故事"):
    print(chunk.content, end="", flush=True)

# 3. 批量处理 - batch
responses = model.batch([
    "什么是机器学习?",
    "什么是深度学习?",
    "什么是强化学习?"
])
3.1.3 模型能力探测
# 获取模型能力配置
profile = model.profile
print(f"最大输入 tokens: {profile.get('max_input_tokens')}")
print(f"支持图像输入: {profile.get('image_inputs')}")
print(f"支持工具调用: {profile.get('tool_calling')}")
print(f"支持推理输出: {profile.get('reasoning_output')}")

💡 思考:如何在运行时动态选择模型?

在实际应用中,我们可能需要根据任务复杂度、成本考虑或用户等级来动态选择模型。

🤔 解答:使用 Middleware 实现动态模型路由

from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse

# 定义不同级别的模型
basic_model = ChatOpenAI(model="gpt-4.1-mini")
advanced_model = ChatOpenAI(model="gpt-4.1")

@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    """根据对话复杂度动态选择模型"""
    message_count = len(request.state["messages"])
    
    # 对话轮次超过 10 轮,使用高级模型
    if message_count > 10:
        model = advanced_model
    else:
        model = basic_model
    
    return handler(request.override(model=model))

# 创建使用动态模型选择的 Agent
agent = create_agent(
    model=basic_model,
    tools=tools,
    middleware=[dynamic_model_selection]
)

3.2 Messages(消息系统)

LangChain 的消息系统是与 LLM 交互的核心抽象。

from langchain.messages import (
    HumanMessage,    # 用户消息
    AIMessage,       # AI 回复
    SystemMessage,   # 系统提示
    ToolMessage      # 工具调用结果
)

# 构建对话历史
conversation = [
    SystemMessage(content="你是一个专业的技术助手。"),
    HumanMessage(content="什么是 LangChain?"),
    AIMessage(content="LangChain 是一个用于构建 LLM 应用的框架..."),
    HumanMessage(content="它和 LangGraph 有什么区别?")
]

# 调用模型
response = model.invoke(conversation)
消息类型对照表
消息类型 角色标识 用途
SystemMessage system 设置 AI 的行为和角色
HumanMessage user 用户输入
AIMessage assistant AI 的回复
ToolMessage tool 工具执行结果

3.3 Tools(工具系统)

Tools 赋予 Agent 与外部世界交互的能力。

3.3.1 基础工具定义
from langchain.tools import tool

@tool
def search_database(query: str, limit: int = 10) -> str:
    """在客户数据库中搜索匹配的记录。
    
    Args:
        query: 搜索关键词
        limit: 返回结果的最大数量
    """
    # 实际的数据库查询逻辑
    return f"找到 {limit} 条匹配 '{query}' 的记录"

@tool
def get_weather(location: str) -> str:
    """获取指定城市的天气信息。
    
    Args:
        location: 城市名称
    """
    # 实际的天气 API 调用
    return f"{location}:晴天,25°C"

# 将工具绑定到模型
model_with_tools = model.bind_tools([search_database, get_weather])
3.3.2 高级工具:访问运行时上下文
from langchain.tools import tool, ToolRuntime
from langgraph.types import Command

@tool
def get_user_account(runtime: ToolRuntime) -> str:
    """获取当前用户的账户信息。"""
    # 访问运行时上下文
    user_id = runtime.context.user_id
    
    # 访问当前状态
    messages = runtime.state["messages"]
    
    # 访问持久化存储
    user_info = runtime.store.get(("users",), user_id)
    
    return f"用户 {user_id} 的账户信息: {user_info}"

@tool
def update_conversation_topic(topic: str, runtime: ToolRuntime) -> Command:
    """更新对话主题并修改 Agent 状态。"""
    return Command(
        update={"current_topic": topic},  # 更新状态
        goto="topic_handler"               # 路由到特定节点
    )
3.3.3 工具错误处理
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage

@wrap_tool_call
def handle_tool_errors(request, handler):
    """统一处理工具执行错误"""
    try:
        return handler(request)
    except Exception as e:
        # 返回自定义错误消息给模型
        return ToolMessage(
            content=f"工具执行失败: {str(e)}。请检查参数后重试。",
            tool_call_id=request.tool_call["id"]
        )

agent = create_agent(
    model="gpt-4o",
    tools=[search_database, get_weather],
    middleware=[handle_tool_errors]
)

3.4 Prompts(提示词模板)

3.4.1 基础提示词模板
from langchain.prompts import ChatPromptTemplate

# 创建提示词模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个{role},请用{language}回答问题。"),
    ("human", "{question}")
])

# 格式化模板
messages = prompt.format_messages(
    role="专业程序员",
    language="中文",
    question="如何学习 Python?"
)

response = model.invoke(messages)
3.4.2 动态系统提示
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest

@dynamic_prompt
def user_role_prompt(request: ModelRequest) -> str:
    """根据用户角色动态生成系统提示"""
    user_role = request.runtime.context.get("user_role", "user")
    
    base_prompt = "你是一个专业的技术助手。"
    
    if user_role == "expert":
        return f"{base_prompt}请提供详细的技术分析和代码示例。"
    elif user_role == "beginner":
        return f"{base_prompt}请用简单易懂的语言解释,避免使用专业术语。"
    
    return base_prompt

agent = create_agent(
    model="gpt-4o",
    tools=tools,
    middleware=[user_role_prompt]
)

3.5 Memory(记忆系统)

记忆系统分为短期记忆(会话内)和长期记忆(跨会话)。

3.5.1 短期记忆:自定义状态
from langchain.agents import AgentState, create_agent

# 定义扩展状态
class CustomState(AgentState):
    user_preferences: dict
    conversation_summary: str
    interaction_count: int

agent = create_agent(
    model="gpt-4o",
    tools=tools,
    state_schema=CustomState
)

# 调用时传入初始状态
result = agent.invoke({
    "messages": [{"role": "user", "content": "你好"}],
    "user_preferences": {"language": "zh-CN", "verbosity": "detailed"},
    "conversation_summary": "",
    "interaction_count": 0
})
3.5.2 长期记忆:使用 Store
from langgraph.store.memory import InMemoryStore
from langchain.tools import tool, ToolRuntime

# 创建持久化存储
store = InMemoryStore()

@tool
def remember_user_preference(key: str, value: str, runtime: ToolRuntime) -> str:
    """记住用户的偏好设置"""
    user_id = runtime.context.user_id
    runtime.store.put(("preferences", user_id), key, {"value": value})
    return f"已保存偏好: {key}={value}"

@tool
def recall_user_preference(key: str, runtime: ToolRuntime) -> str:
    """回忆用户的偏好设置"""
    user_id = runtime.context.user_id
    pref = runtime.store.get(("preferences", user_id), key)
    return pref.value["value"] if pref else "未找到该偏好设置"

agent = create_agent(
    model="gpt-4o",
    tools=[remember_user_preference, recall_user_preference],
    store=store
)

4. ⛓️ Chain:链式调用的艺术

4.1 什么是 Chain?

Chain 是 LangChain 中最基础的抽象之一,它将多个组件串联起来,形成一个处理流水线。

┌────────────────────────────────────────────────────────────────┐
│                        Chain 执行流程                           │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   Input ──▶ [Prompt Template] ──▶ [Model] ──▶ [Output Parser]  │
│                                                                │
│     │              │               │               │           │
│     ▼              ▼               ▼               ▼           │
│   用户问题    格式化提示词      LLM 生成        结构化输出       │
│                                                                │
└────────────────────────────────────────────────────────────────┘

4.2 LCEL 表达式语言

LCEL(LangChain Expression Language)是 LangChain 的核心表达式语言,使用 | 管道符连接组件。

from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser

# 定义组件
prompt = ChatPromptTemplate.from_template(
    "用一句话解释什么是 {concept}"
)
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

# 使用 LCEL 组合成 Chain
chain = prompt | model | output_parser

# 调用 Chain
result = chain.invoke({"concept": "机器学习"})
print(result)  # "机器学习是让计算机从数据中自动学习模式的技术。"
LCEL 的核心优势
# 1. 支持流式输出
for chunk in chain.stream({"concept": "深度学习"}):
    print(chunk, end="", flush=True)

# 2. 支持异步调用
import asyncio

async def async_call():
    result = await chain.ainvoke({"concept": "神经网络"})
    return result

# 3. 支持批量处理
results = chain.batch([
    {"concept": "CNN"},
    {"concept": "RNN"},
    {"concept": "Transformer"}
])

# 4. 支持并行执行
from langchain.schema.runnable import RunnableParallel

parallel_chain = RunnableParallel(
    explanation=chain,
    example=prompt | model | StrOutputParser()
)

4.3 常用 Chain 类型

4.3.1 检索增强生成(RAG)Chain
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema.runnable import RunnablePassthrough

# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
    texts=["LangChain 是一个 LLM 框架", "LangGraph 用于构建状态机"],
    embedding=embeddings
)
retriever = vectorstore.as_retriever()

# RAG 提示词模板
rag_prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题:

上下文:
{context}

问题:{question}

答案:
""")

# 构建 RAG Chain
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | ChatOpenAI(model="gpt-4o")
    | StrOutputParser()
)

# 调用
answer = rag_chain.invoke("LangChain 是什么?")
4.3.2 路由 Chain
from langchain.schema.runnable import RunnableBranch

# 定义不同的处理链
math_chain = ChatPromptTemplate.from_template(
    "你是数学专家。解答:{question}"
) | model | StrOutputParser()

code_chain = ChatPromptTemplate.from_template(
    "你是编程专家。解答:{question}"
) | model | StrOutputParser()

general_chain = ChatPromptTemplate.from_template(
    "回答:{question}"
) | model | StrOutputParser()

# 分类函数
def classify_question(input_dict):
    question = input_dict["question"].lower()
    if any(word in question for word in ["计算", "数学", "公式"]):
        return "math"
    elif any(word in question for word in ["代码", "编程", "函数"]):
        return "code"
    return "general"

# 构建路由 Chain
router_chain = RunnableBranch(
    (lambda x: classify_question(x) == "math", math_chain),
    (lambda x: classify_question(x) == "code", code_chain),
    general_chain  # 默认分支
)

5. 🤖 Agent:智能体的灵魂

5.1 Agent 核心概念

Agent 是 LangChain 中最强大的抽象,它将 LLM 与工具结合,形成能够自主决策和执行任务的智能体。

┌─────────────────────────────────────────────────────────────────────┐
│                         Agent 工作原理                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                      ┌─────────────────┐                            │
│                      │    用户输入      │                            │
│                      └────────┬────────┘                            │
│                               │                                     │
│                               ▼                                     │
│   ┌───────────────────────────────────────────────────────────┐    │
│   │                      Agent 循环                            │    │
│   │  ┌─────────────────────────────────────────────────────┐  │    │
│   │  │                                                      │  │    │
│   │  │   ┌─────────┐     ┌─────────┐     ┌─────────────┐   │  │    │
│   │  │   │  思考   │────▶│  决策   │────▶│ 执行工具    │   │  │    │
│   │  │   │(Reason) │     │ (Act)   │     │ (Tool Call) │   │  │    │
│   │  │   └─────────┘     └─────────┘     └──────┬──────┘   │  │    │
│   │  │        ▲                                  │          │  │    │
│   │  │        │          ┌─────────────┐         │          │  │    │
│   │  │        └──────────│  观察结果   │◀────────┘          │  │    │
│   │  │                   │(Observation)│                    │  │    │
│   │  │                   └─────────────┘                    │  │    │
│   │  │                                                      │  │    │
│   │  └──────────────────────────────────────────────────────┘  │    │
│   │              │                                              │    │
│   │              ▼ (满足停止条件)                                │    │
│   │       ┌─────────────┐                                      │    │
│   │       │   最终答案   │                                      │    │
│   │       └─────────────┘                                      │    │
│   └───────────────────────────────────────────────────────────┘    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

5.2 ReAct 模式详解

ReAct(Reasoning + Acting)是 Agent 的核心执行模式,它交替进行推理和行动。

ReAct 执行示例
用户问题: 查找最受欢迎的无线耳机并确认是否有货

================================ Human Message =================================
查找最受欢迎的无线耳机并确认是否有货

================================== AI Message ==================================
💭 思考: 需要先搜索当前最受欢迎的无线耳机产品

Tool Calls:
  search_products (call_abc123)
    Args:
      query: 无线耳机 热销

================================= Tool Message =================================
找到 5 款产品。热销榜单: 1. Sony WH-1000XM5, 2. AirPods Pro...

================================== AI Message ==================================
💭 思考: 找到了热销耳机,需要确认库存情况

Tool Calls:
  check_inventory (call_def456)
    Args:
      product_id: WH-1000XM5

================================= Tool Message =================================
Sony WH-1000XM5: 库存 10 件

================================== AI Message ==================================
根据我的搜索结果:

最受欢迎的无线耳机是 **Sony WH-1000XM5**,目前有 **10 件** 库存。

5.3 创建第一个 Agent

from langchain.agents import create_agent
from langchain.tools import tool

# 定义工具
@tool
def get_weather(city: str) -> str:
    """获取城市天气信息"""
    weather_data = {
        "北京": "晴天,25°C",
        "上海": "多云,28°C",
        "深圳": "雷阵雨,30°C"
    }
    return weather_data.get(city, f"未找到 {city} 的天气信息")

@tool
def search_flights(origin: str, destination: str, date: str) -> str:
    """搜索航班信息"""
    return f"从 {origin}{destination}{date} 有 5 个航班可选"

@tool
def book_hotel(city: str, checkin: str, checkout: str) -> str:
    """预订酒店"""
    return f"已在 {city} 预订酒店,入住: {checkin},退房: {checkout}"

# 创建 Agent
agent = create_agent(
    model="anthropic:claude-sonnet-4-5",
    tools=[get_weather, search_flights, book_hotel],
    system_prompt="你是一个专业的旅行规划助手。帮助用户规划行程、查询天气、预订航班和酒店。"
)

# 运行 Agent
result = agent.invoke({
    "messages": [{
        "role": "user",
        "content": "我想下周去上海出差3天,帮我查下天气和航班"
    }]
})

print(result["messages"][-1].content)

5.4 Middleware 中间件机制

Middleware 提供了在 Agent 执行的各个阶段插入自定义逻辑的能力。

┌──────────────────────────────────────────────────────────────────┐
│                      Middleware 执行流程                          │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Input ─────────────────────────────────────────────────▶ Output│
│     │                                                       ▲    │
│     ▼                                                       │    │
│  ┌──────────────┐                                           │    │
│  │ before_model │ ─ 预处理状态、注入上下文                   │    │
│  └──────┬───────┘                                           │    │
│         │                                                   │    │
│         ▼                                                   │    │
│  ┌──────────────┐                                           │    │
│  │wrap_model_call│ ─ 动态选择模型、修改工具列表              │    │
│  └──────┬───────┘                                           │    │
│         │                                                   │    │
│         ▼                                                   │    │
│  ┌──────────────┐                                           │    │
│  │  Model Call  │ ─ 实际调用 LLM                            │    │
│  └──────┬───────┘                                           │    │
│         │                                                   │    │
│         ▼                                                   │    │
│  ┌──────────────┐                                           │    │
│  │wrap_tool_call │ ─ 工具调用前后处理                        │    │
│  └──────┬───────┘                                           │    │
│         │                                                   │    │
│         ▼                                                   │    │
│  ┌──────────────┐                                           │    │
│  │ after_model  │ ─ 后处理、日志记录                         │────┘
│  └──────────────┘                                                │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
自定义 Middleware 示例
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import AgentMiddleware, ModelRequest
from langchain.agents.middleware.types import ToolCallRequest
from typing import Any

class LoggingMiddleware(AgentMiddleware):
    """日志记录中间件"""
    
    def before_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
        """模型调用前记录"""
        print(f"📥 收到消息: {state['messages'][-1].content[:50]}...")
        return None
    
    def wrap_model_call(self, request: ModelRequest, handler):
        """包装模型调用"""
        import time
        start = time.time()
        response = handler(request)
        elapsed = time.time() - start
        print(f"⏱️ 模型响应耗时: {elapsed:.2f}s")
        return response
    
    def wrap_tool_call(self, request: ToolCallRequest, handler):
        """包装工具调用"""
        tool_name = request.tool_call["name"]
        print(f"🔧 调用工具: {tool_name}")
        result = handler(request)
        print(f"✅ 工具返回: {str(result)[:100]}...")
        return result

class PermissionMiddleware(AgentMiddleware):
    """权限控制中间件"""
    
    def wrap_model_call(self, request: ModelRequest, handler):
        """根据用户权限过滤工具"""
        user_role = request.runtime.context.get("user_role", "guest")
        
        if user_role == "admin":
            # 管理员可以使用所有工具
            tools = request.tools
        else:
            # 普通用户只能使用只读工具
            tools = [t for t in request.tools if not t.name.startswith("delete_")]
        
        return handler(request.override(tools=tools))

# 组合使用多个 Middleware
agent = create_agent(
    model="gpt-4o",
    tools=tools,
    middleware=[LoggingMiddleware(), PermissionMiddleware()]
)

6. 📊 LangGraph:状态机的力量

6.1 LangGraph 核心理念

LangGraph 是 LangChain 公司推出的低级别 Agent 编排框架,专注于构建长时间运行的、有状态的工作流。

💡 思考:为什么需要 LangGraph?LangChain Agent 不够用吗?

🤔 解答:

场景 LangChain Agent LangGraph
简单工具调用 ✅ 足够 过于复杂
线性工作流 ✅ Chain 解决 可选
复杂分支逻辑 ❌ 困难 ✅ 条件边
多 Agent 协作 ❌ 需要自行实现 ✅ 子图
人工干预 ❌ 有限 ✅ 中断/恢复
故障恢复 ❌ 从头开始 ✅ 检查点恢复
长时间任务 ❌ 容易超时 ✅ 持久化执行
┌─────────────────────────────────────────────────────────────────────────────┐
│                        LangGraph vs LangChain Agent                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   LangChain Agent                        LangGraph                          │
│   ┌──────────────────┐                   ┌──────────────────────────────┐   │
│   │                  │                   │                              │   │
│   │   ┌──────────┐   │                   │  ┌──────┐    ┌──────┐       │   │
│   │   │   LLM    │◀─┐│                   │  │Node A│───▶│Node B│       │   │
│   │   └────┬─────┘  ││                   │  └──┬───┘    └──┬───┘       │   │
│   │        │        ││                   │     │    ╲      │           │   │
│   │        ▼        ││                   │     │     ╲     ▼           │   │
│   │   ┌──────────┐  ││                   │     │      ╲ ┌──────┐       │   │
│   │   │   Tool   │──┘│                   │     │       ▶│Node C│       │   │
│   │   └──────────┘   │                   │     │        └──┬───┘       │   │
│   │                  │                   │     ▼           │           │   │
│   │   单一循环模式    │                   │  ┌──────┐       │           │   │
│   │                  │                   │  │Node D│◀──────┘           │   │
│   └──────────────────┘                   │  └──────┘                   │   │
│                                          │                              │   │
│                                          │  灵活的图结构                 │   │
│                                          └──────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 Graph API 深入剖析

6.2.1 基础图构建
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 1. 定义状态 Schema
class State(TypedDict):
    text: str
    count: int

# 2. 定义节点函数
def node_a(state: State) -> dict:
    """处理节点 A"""
    return {
        "text": state["text"] + " -> A",
        "count": state["count"] + 1
    }

def node_b(state: State) -> dict:
    """处理节点 B"""
    return {
        "text": state["text"] + " -> B",
        "count": state["count"] + 1
    }

# 3. 构建图
graph = StateGraph(State)

# 添加节点
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)

# 添加边
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", END)

# 4. 编译图
compiled_graph = graph.compile()

# 5. 执行
result = compiled_graph.invoke({
    "text": "开始",
    "count": 0
})

print(result)
# {'text': '开始 -> A -> B', 'count': 2}

6.3 State(状态管理)

6.3.1 使用 TypedDict 定义状态
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
from langgraph.graph.message import add_messages
from langchain.messages import AnyMessage

class AgentState(TypedDict):
    # 基础字段
    query: str
    
    # 使用 Annotated 指定归约器
    messages: Annotated[list[AnyMessage], add_messages]  # 消息累加
    steps: Annotated[list[str], add]                      # 步骤累加
    
    # 普通字段(覆盖更新)
    final_answer: str
    confidence: float
6.3.2 多 Schema 设计
# 输入 Schema
class InputState(TypedDict):
    user_query: str

# 输出 Schema
class OutputState(TypedDict):
    final_response: str
    source_documents: list[str]

# 内部完整 Schema
class FullState(TypedDict):
    user_query: str
    retrieved_docs: list[str]
    generated_response: str
    final_response: str
    source_documents: list[str]
    # 私有状态(不暴露给输入/输出)
    _internal_cache: dict

# 私有状态(节点间通信)
class PrivateState(TypedDict):
    intermediate_result: str

# 构建图时指定不同 Schema
builder = StateGraph(
    FullState,
    input_schema=InputState,
    output_schema=OutputState
)

6.4 Nodes(节点)与 Edges(边)

6.4.1 节点类型
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
from dataclasses import dataclass

@dataclass
class AppContext:
    user_id: str
    session_id: str

# 1. 简单节点
def simple_node(state: State) -> dict:
    return {"result": "处理完成"}

# 2. 带配置的节点
def node_with_config(state: State, config: RunnableConfig) -> dict:
    thread_id = config["configurable"]["thread_id"]
    return {"result": f"线程 {thread_id} 处理完成"}

# 3. 带运行时上下文的节点
def node_with_runtime(state: State, runtime: Runtime[AppContext]) -> dict:
    user_id = runtime.context.user_id
    
    # 访问持久化存储
    user_prefs = runtime.store.get(("users",), user_id)
    
    # 流式输出
    runtime.stream_writer(f"正在处理用户 {user_id} 的请求...")
    
    return {"result": f"用户 {user_id} 处理完成"}

# 添加节点
graph.add_node("simple", simple_node)
graph.add_node("with_config", node_with_config)
graph.add_node("with_runtime", node_with_runtime)
6.4.2 边类型
from langgraph.graph import START, END
from typing import Literal

# 1. 普通边:固定跳转
graph.add_edge("node_a", "node_b")
graph.add_edge(START, "node_a")
graph.add_edge("node_b", END)

# 2. 条件边:动态路由
def route_by_confidence(state: State) -> Literal["high_quality", "needs_review"]:
    """根据置信度路由"""
    if state["confidence"] > 0.8:
        return "high_quality"
    return "needs_review"

graph.add_conditional_edges(
    "evaluation_node",
    route_by_confidence,
    {
        "high_quality": "output_node",
        "needs_review": "human_review_node"
    }
)

# 3. 条件入口点
def route_by_intent(state: State) -> Literal["qa", "summarize", "chat"]:
    """根据用户意图路由到不同处理流程"""
    intent = classify_intent(state["user_query"])
    return intent

graph.add_conditional_edges(START, route_by_intent)

6.5 Reducers(归约器)

Reducers 决定了状态更新时如何合并新旧值。

from typing import Annotated
from operator import add
from langgraph.graph.message import add_messages

class AdvancedState(TypedDict):
    # 1. 默认归约器:覆盖
    current_step: str  # 新值覆盖旧值
    
    # 2. 累加归约器:使用 operator.add
    all_steps: Annotated[list[str], add]  # 列表累加
    token_count: Annotated[int, add]       # 数值累加
    
    # 3. 消息归约器:智能合并
    messages: Annotated[list[AnyMessage], add_messages]
    
    # 4. 自定义归约器
    scores: Annotated[list[float], lambda old, new: old + new if old else new]

# 自定义归约器函数
def merge_dicts(old: dict, new: dict) -> dict:
    """合并字典,新值优先"""
    if old is None:
        return new
    return {**old, **new}

class StateWithCustomReducer(TypedDict):
    metadata: Annotated[dict, merge_dicts]
add_messages 归约器详解
from langgraph.graph.message import add_messages, REMOVE_ALL_MESSAGES
from langchain.messages import HumanMessage, AIMessage, RemoveMessage

# add_messages 的智能行为:
# 1. 新消息追加到列表
# 2. 相同 ID 的消息会被更新
# 3. 支持删除操作

# 示例状态更新
state = {"messages": [HumanMessage(content="你好", id="msg1")]}

# 追加新消息
update1 = {"messages": [AIMessage(content="你好!", id="msg2")]}
# 结果: [HumanMessage("你好"), AIMessage("你好!")]

# 更新已有消息(相同 ID)
update2 = {"messages": [HumanMessage(content="你好呀", id="msg1")]}
# 结果: [HumanMessage("你好呀"), AIMessage("你好!")]

# 删除特定消息
update3 = {"messages": [RemoveMessage(id="msg1")]}
# 结果: [AIMessage("你好!")]

# 清空所有消息
update4 = {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)]}
# 结果: []

7. 🔄 LangGraph 高级特性

7.1 条件边与动态路由

7.1.1 使用 Command 实现动态控制流
from langgraph.types import Command
from typing import Literal

def decision_node(state: State) -> Command[Literal["process", "reject", "escalate"]]:
    """根据条件动态决定下一步"""
    score = state["evaluation_score"]
    
    if score > 0.9:
        return Command(
            update={"status": "approved"},
            goto="process"
        )
    elif score < 0.3:
        return Command(
            update={"status": "rejected", "reason": "低分"},
            goto="reject"
        )
    else:
        return Command(
            update={"status": "pending_review"},
            goto="escalate"
        )

# 在子图中跳转到父图节点
def subgraph_node(state: State) -> Command[Literal["parent_handler"]]:
    """从子图跳转到父图"""
    return Command(
        update={"handoff_data": state["result"]},
        goto="parent_handler",
        graph=Command.PARENT  # 指定跳转到父图
    )
7.1.2 使用 Send 实现 Map-Reduce
from langgraph.types import Send

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], add]

class JokeState(TypedDict):
    subject: str

def generate_subjects(state: OverallState) -> dict:
    """生成主题列表"""
    return {"subjects": ["猫", "狗", "程序员"]}

def continue_to_jokes(state: OverallState) -> list[Send]:
    """为每个主题创建一个并行任务"""
    return [
        Send("generate_joke", {"subject": subject})
        for subject in state["subjects"]
    ]

def generate_joke(state: JokeState) -> dict:
    """为单个主题生成笑话"""
    joke = f"为什么{state['subject']}喜欢编程?因为..."
    return {"jokes": [joke]}

# 构建 Map-Reduce 图
graph = StateGraph(OverallState)
graph.add_node("generate_subjects", generate_subjects)
graph.add_node("generate_joke", generate_joke)
graph.add_edge(START, "generate_subjects")
graph.add_conditional_edges("generate_subjects", continue_to_jokes)
graph.add_edge("generate_joke", END)

7.2 Human-in-the-Loop

人工干预是 LangGraph 的核心特性之一。

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

class ApprovalState(TypedDict):
    request: str
    approval_status: str
    reviewer_notes: str

def submit_request(state: ApprovalState) -> dict:
    """提交请求"""
    return {"request": state["request"]}

def wait_for_approval(state: ApprovalState) -> dict:
    """等待人工审批 - 使用 interrupt"""
    # 中断执行,等待人工输入
    approval = interrupt({
        "type": "approval_required",
        "request": state["request"],
        "options": ["approve", "reject", "modify"]
    })
    
    return {
        "approval_status": approval["decision"],
        "reviewer_notes": approval.get("notes", "")
    }

def process_approved(state: ApprovalState) -> dict:
    """处理已批准的请求"""
    return {"status": "completed"}

def handle_rejection(state: ApprovalState) -> dict:
    """处理被拒绝的请求"""
    return {"status": "rejected"}

# 构建审批工作流
graph = StateGraph(ApprovalState)
graph.add_node("submit", submit_request)
graph.add_node("await_approval", wait_for_approval)
graph.add_node("process", process_approved)
graph.add_node("reject", handle_rejection)

graph.add_edge(START, "submit")
graph.add_edge("submit", "await_approval")
graph.add_conditional_edges(
    "await_approval",
    lambda s: "process" if s["approval_status"] == "approve" else "reject"
)
graph.add_edge("process", END)
graph.add_edge("reject", END)

# 编译时配置检查点
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 执行到中断点
config = {"configurable": {"thread_id": "approval-001"}}
result = app.invoke({"request": "申请服务器资源"}, config)

# 人工审批后恢复执行
app.invoke(
    Command(resume={"decision": "approve", "notes": "资源已分配"}),
    config
)

7.3 持久化与检查点

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# 1. 内存检查点(开发/测试)
from langgraph.checkpoint.memory import MemorySaver
memory_saver = MemorySaver()

# 2. SQLite 检查点(单机部署)
sqlite_saver = SqliteSaver.from_conn_string("checkpoints.db")

# 3. PostgreSQL 检查点(生产部署)
# postgres_saver = PostgresSaver.from_conn_string(
#     "postgresql://user:pass@localhost/checkpoints"
# )

# 编译图时使用检查点
app = graph.compile(checkpointer=memory_saver)

# 执行并保存状态
config = {"configurable": {"thread_id": "session-123"}}
result = app.invoke({"query": "你好"}, config)

# 获取检查点历史
for checkpoint in app.get_state_history(config):
    print(f"步骤: {checkpoint.metadata.get('langgraph_step')}")
    print(f"状态: {checkpoint.values}")
    print("---")

# 回滚到特定检查点
app.update_state(config, {"query": "重新开始"})

7.4 子图与多 Agent 编排

from langgraph.graph import StateGraph, START, END

# ============ 定义子图:研究 Agent ============
class ResearchState(TypedDict):
    topic: str
    findings: str

def research_node(state: ResearchState) -> dict:
    """执行研究任务"""
    return {"findings": f"关于 {state['topic']} 的研究结果..."}

research_graph = StateGraph(ResearchState)
research_graph.add_node("research", research_node)
research_graph.add_edge(START, "research")
research_graph.add_edge("research", END)
research_subgraph = research_graph.compile()

# ============ 定义子图:写作 Agent ============
class WritingState(TypedDict):
    content: str
    article: str

def writing_node(state: WritingState) -> dict:
    """执行写作任务"""
    return {"article": f"基于以下内容的文章: {state['content'][:100]}..."}

writing_graph = StateGraph(WritingState)
writing_graph.add_node("write", writing_node)
writing_graph.add_edge(START, "write")
writing_graph.add_edge("write", END)
writing_subgraph = writing_graph.compile()

# ============ 主图:协调多个子图 ============
class OrchestratorState(TypedDict):
    user_request: str
    research_topic: str
    research_result: str
    final_article: str

def parse_request(state: OrchestratorState) -> dict:
    """解析用户请求"""
    return {"research_topic": state["user_request"]}

def call_research_agent(state: OrchestratorState) -> dict:
    """调用研究子图"""
    result = research_subgraph.invoke({
        "topic": state["research_topic"]
    })
    return {"research_result": result["findings"]}

def call_writing_agent(state: OrchestratorState) -> dict:
    """调用写作子图"""
    result = writing_subgraph.invoke({
        "content": state["research_result"]
    })
    return {"final_article": result["article"]}

# 构建主图
main_graph = StateGraph(OrchestratorState)
main_graph.add_node("parse", parse_request)
main_graph.add_node("research_agent", call_research_agent)
main_graph.add_node("writing_agent", call_writing_agent)

main_graph.add_edge(START, "parse")
main_graph.add_edge("parse", "research_agent")
main_graph.add_edge("research_agent", "writing_agent")
main_graph.add_edge("writing_agent", END)

# 编译并运行
orchestrator = main_graph.compile()
result = orchestrator.invoke({
    "user_request": "写一篇关于人工智能发展历史的文章"
})
┌─────────────────────────────────────────────────────────────────────────────┐
│                           多 Agent 协作架构                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Orchestrator (主图)                          │   │
│   │                                                                      │   │
│   │   ┌──────────┐      ┌─────────────────┐      ┌─────────────────┐    │   │
│   │   │  Parse   │─────▶│ Research Agent  │─────▶│ Writing Agent   │    │   │
│   │   └──────────┘      │    (子图)        │      │    (子图)        │    │   │
│   │                     └────────┬────────┘      └────────┬────────┘    │   │
│   │                              │                        │              │   │
│   └──────────────────────────────┼────────────────────────┼──────────────┘   │
│                                  │                        │                  │
│                    ┌─────────────┴─────────────┐  ┌──────┴──────────┐       │
│                    │                           │  │                 │       │
│                    ▼                           │  ▼                 │       │
│            ┌──────────────┐                    │ ┌──────────────┐   │       │
│            │   Research   │                    │ │    Write     │   │       │
│            │    Node      │                    │ │    Node      │   │       │
│            └──────────────┘                    │ └──────────────┘   │       │
│                    │                           │        │           │       │
│                    ▼                           │        ▼           │       │
│              研究结果 ────────────────────────────────▶ 最终文章      │       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

8. 🛠️ 实战案例

8.1 案例一:智能客服 Agent

from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
from langchain_openai import ChatOpenAI
from dataclasses import dataclass

# 定义用户上下文
@dataclass
class CustomerContext:
    customer_id: str
    tier: str  # "standard", "premium", "vip"

# 模拟客户数据库
CUSTOMER_DB = {
    "C001": {"name": "张三", "orders": ["ORD001", "ORD002"], "balance": 1500},
    "C002": {"name": "李四", "orders": ["ORD003"], "balance": 300},
}

ORDER_DB = {
    "ORD001": {"product": "笔记本电脑", "status": "已发货", "amount": 5999},
    "ORD002": {"product": "无线鼠标", "status": "待发货", "amount": 199},
    "ORD003": {"product": "键盘", "status": "已完成", "amount": 399},
}

# 定义客服工具
@tool
def get_customer_info(runtime: ToolRuntime[CustomerContext]) -> str:
    """获取当前客户的账户信息"""
    customer_id = runtime.context.customer_id
    customer = CUSTOMER_DB.get(customer_id)
    if customer:
        return f"""
客户信息:
- 姓名:{customer['name']}
- 账户余额:{customer['balance']} 元
- 订单数量:{len(customer['orders'])} 个
- 会员等级:{runtime.context.tier}
"""
    return "未找到客户信息"

@tool
def query_order_status(order_id: str) -> str:
    """查询订单状态
    
    Args:
        order_id: 订单编号,如 ORD001
    """
    order = ORDER_DB.get(order_id)
    if order:
        return f"订单 {order_id}{order['product']},状态:{order['status']},金额:{order['amount']} 元"
    return f"未找到订单 {order_id}"

@tool
def list_recent_orders(runtime: ToolRuntime[CustomerContext]) -> str:
    """列出客户最近的订单"""
    customer_id = runtime.context.customer_id
    customer = CUSTOMER_DB.get(customer_id)
    if not customer:
        return "未找到客户信息"
    
    orders_info = []
    for order_id in customer["orders"]:
        order = ORDER_DB.get(order_id)
        if order:
            orders_info.append(f"- {order_id}: {order['product']} ({order['status']})")
    
    return "最近订单:\n" + "\n".join(orders_info) if orders_info else "暂无订单"

@tool
def request_refund(order_id: str, reason: str) -> str:
    """申请退款
    
    Args:
        order_id: 要退款的订单编号
        reason: 退款原因
    """
    order = ORDER_DB.get(order_id)
    if not order:
        return f"订单 {order_id} 不存在"
    
    if order["status"] == "已完成":
        return f"退款申请已提交。订单 {order_id}{order['product']}),退款原因:{reason}。预计 3-5 个工作日处理。"
    elif order["status"] == "已发货":
        return f"订单 {order_id} 正在配送中,请先拒收后再申请退款。"
    else:
        return f"订单 {order_id} 尚未发货,已直接取消,款项将原路退回。"

# 创建客服 Agent
customer_service_agent = create_agent(
    model=ChatOpenAI(model="gpt-4o", temperature=0.3),
    tools=[get_customer_info, query_order_status, list_recent_orders, request_refund],
    system_prompt="""你是一名专业的电商客服助手。请遵循以下原则:

1. 始终保持礼貌和专业
2. 优先查询客户信息以提供个性化服务
3. 对于退款等敏感操作,先确认客户意图
4. VIP 客户享有优先处理权
5. 如果无法解决问题,建议转人工客服

当前日期:2024年12月15日
""",
    context_schema=CustomerContext
)

# 使用示例
result = customer_service_agent.invoke(
    {"messages": [{"role": "user", "content": "我想查一下我最近的订单状态,还有那个键盘能退款吗?"}]},
    context=CustomerContext(customer_id="C002", tier="premium")
)

print(result["messages"][-1].content)

8.2 案例二:多步骤数据分析工作流

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from typing import Annotated
from typing_extensions import TypedDict
import pandas as pd
import json

class AnalysisState(TypedDict):
    # 输入
    raw_data: str
    analysis_goal: str
    
    # 处理过程
    cleaned_data: str
    statistics: str
    insights: Annotated[list[str], lambda x, y: (x or []) + y]
    
    # 输出
    final_report: str
    visualizations: list[str]

model = ChatOpenAI(model="gpt-4o", temperature=0)

# 节点 1:数据清洗
def clean_data(state: AnalysisState) -> dict:
    """清洗和预处理数据"""
    prompt = ChatPromptTemplate.from_template("""
你是一个数据工程师。请分析以下原始数据,识别并处理:
1. 缺失值
2. 异常值
3. 数据类型问题
4. 重复记录

原始数据:
{raw_data}

请返回清洗后的数据摘要和处理说明。
""")
    
    chain = prompt | model
    result = chain.invoke({"raw_data": state["raw_data"]})
    
    return {"cleaned_data": result.content}

# 节点 2:统计分析
def compute_statistics(state: AnalysisState) -> dict:
    """计算描述性统计"""
    prompt = ChatPromptTemplate.from_template("""
基于清洗后的数据,计算以下统计指标:
1. 基本描述统计(均值、中位数、标准差等)
2. 分布特征
3. 相关性分析

清洗后数据:
{cleaned_data}

分析目标:{goal}

请提供详细的统计结果。
""")
    
    chain = prompt | model
    result = chain.invoke({
        "cleaned_data": state["cleaned_data"],
        "goal": state["analysis_goal"]
    })
    
    return {"statistics": result.content}

# 节点 3:洞察提取
def extract_insights(state: AnalysisState) -> dict:
    """从数据中提取关键洞察"""
    prompt = ChatPromptTemplate.from_template("""
基于统计分析结果,提取 3-5 个关键业务洞察。

统计结果:
{statistics}

分析目标:{goal}

请用简洁的语言描述每个洞察,并说明其业务意义。
""")
    
    chain = prompt | model
    result = chain.invoke({
        "statistics": state["statistics"],
        "goal": state["analysis_goal"]
    })
    
    # 解析洞察列表
    insights = [line.strip() for line in result.content.split("\n") if line.strip()]
    
    return {"insights": insights}

# 节点 4:生成报告
def generate_report(state: AnalysisState) -> dict:
    """生成最终分析报告"""
    prompt = ChatPromptTemplate.from_template("""
请生成一份专业的数据分析报告,包含以下部分:

## 1. 执行摘要
## 2. 数据概述
## 3. 分析方法
## 4. 主要发现
## 5. 建议措施
## 6. 附录:详细统计

数据清洗结果:
{cleaned_data}

统计分析:
{statistics}

关键洞察:
{insights}

分析目标:{goal}
""")
    
    chain = prompt | model
    result = chain.invoke({
        "cleaned_data": state["cleaned_data"],
        "statistics": state["statistics"],
        "insights": "\n".join(state["insights"]),
        "goal": state["analysis_goal"]
    })
    
    return {"final_report": result.content}

# 节点 5:生成可视化建议
def suggest_visualizations(state: AnalysisState) -> dict:
    """建议数据可视化方案"""
    prompt = ChatPromptTemplate.from_template("""
基于分析结果,建议 3 种最适合的数据可视化方案。

对于每种可视化,请说明:
1. 图表类型
2. 使用的数据字段
3. 能够展示的洞察

统计结果:
{statistics}

关键洞察:
{insights}
""")
    
    chain = prompt | model
    result = chain.invoke({
        "statistics": state["statistics"],
        "insights": "\n".join(state["insights"])
    })
    
    return {"visualizations": [result.content]}

# 构建分析工作流图
analysis_graph = StateGraph(AnalysisState)

# 添加节点
analysis_graph.add_node("clean", clean_data)
analysis_graph.add_node("statistics", compute_statistics)
analysis_graph.add_node("insights", extract_insights)
analysis_graph.add_node("report", generate_report)
analysis_graph.add_node("visualize", suggest_visualizations)

# 添加边(定义执行顺序)
analysis_graph.add_edge(START, "clean")
analysis_graph.add_edge("clean", "statistics")
analysis_graph.add_edge("statistics", "insights")
analysis_graph.add_edge("insights", "report")
analysis_graph.add_edge("insights", "visualize")  # 并行执行
analysis_graph.add_edge("report", END)
analysis_graph.add_edge("visualize", END)

# 编译图
analysis_pipeline = analysis_graph.compile()

# 使用示例
sample_data = """
日期,销售额,客户数,退货率
2024-01,150000,320,0.05
2024-02,180000,380,0.04
2024-03,165000,350,0.06
2024-04,null,290,0.08
2024-05,210000,420,0.03
2024-06,195000,400,0.04
"""

result = analysis_pipeline.invoke({
    "raw_data": sample_data,
    "analysis_goal": "分析销售趋势并识别改进机会",
    "insights": []
})

print("=== 最终报告 ===")
print(result["final_report"])

8.3 案例三:多 Agent 协作系统

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from typing import Annotated, Literal
from typing_extensions import TypedDict
from operator import add

# ============ 定义共享状态 ============
class TeamState(TypedDict):
    task: str
    current_agent: str
    research_notes: Annotated[list[str], add]
    code_snippets: Annotated[list[str], add]
    review_comments: Annotated[list[str], add]
    final_output: str
    iteration_count: int

model = ChatOpenAI(model="gpt-4o", temperature=0.7)

# ============ 研究员 Agent ============
@tool
def web_search(query: str) -> str:
    """搜索网络获取信息"""
    return f"搜索结果: 关于 '{query}' 的相关信息..."

@tool
def read_documentation(topic: str) -> str:
    """阅读技术文档"""
    return f"文档内容: {topic} 的详细说明..."

def researcher_agent(state: TeamState) -> dict:
    """研究员:负责收集和整理信息"""
    from langchain.agents import create_agent
    
    agent = create_agent(
        model=model,
        tools=[web_search, read_documentation],
        system_prompt="""你是团队的研究员。你的职责是:
1. 理解任务需求
2. 搜索相关资料
3. 整理关键信息供开发者使用

当前任务:{task}
请收集相关信息。"""
    )
    
    result = agent.invoke({
        "messages": [{"role": "user", "content": f"研究任务:{state['task']}"}]
    })
    
    return {
        "research_notes": [result["messages"][-1].content],
        "current_agent": "researcher"
    }

# ============ 开发者 Agent ============
@tool
def write_code(description: str, language: str = "python") -> str:
    """编写代码"""
    return f"```{language}\n# 根据 {description} 编写的代码\ndef solution():\n    pass\n```"

@tool
def run_tests(code: str) -> str:
    """运行测试"""
    return "测试结果: 全部通过 ✓"

def developer_agent(state: TeamState) -> dict:
    """开发者:负责编写代码"""
    from langchain.agents import create_agent
    
    agent = create_agent(
        model=model,
        tools=[write_code, run_tests],
        system_prompt="""你是团队的开发者。你的职责是:
1. 根据研究结果编写代码
2. 确保代码质量和可测试性
3. 处理代码审查反馈

研究笔记:
{research_notes}

请基于这些信息编写解决方案。"""
    )
    
    result = agent.invoke({
        "messages": [{"role": "user", "content": f"开发任务:{state['task']}\n\n研究笔记:{state['research_notes']}"}]
    })
    
    return {
        "code_snippets": [result["messages"][-1].content],
        "current_agent": "developer"
    }

# ============ 审查员 Agent ============
def reviewer_agent(state: TeamState) -> dict:
    """审查员:负责代码审查"""
    from langchain.prompts import ChatPromptTemplate
    
    prompt = ChatPromptTemplate.from_template("""
你是团队的代码审查员。请审查以下代码:

代码:
{code}

请检查:
1. 代码逻辑是否正确
2. 是否有潜在的 bug
3. 代码风格和可读性
4. 性能问题

如果代码质量良好,回复 "APPROVED"。
如果需要修改,提供具体建议。
""")
    
    chain = prompt | model
    result = chain.invoke({"code": "\n".join(state["code_snippets"])})
    
    content = result.content
    is_approved = "APPROVED" in content.upper()
    
    return {
        "review_comments": [content],
        "current_agent": "reviewer",
        "iteration_count": state["iteration_count"] + 1
    }

# ============ 协调器:决定下一步 ============
def coordinator(state: TeamState) -> Command[Literal["researcher", "developer", "reviewer", "finalize"]]:
    """协调各个 Agent 的工作流程"""
    
    current = state.get("current_agent", "")
    iterations = state.get("iteration_count", 0)
    
    # 流程逻辑
    if not current:
        # 开始:先研究
        return Command(goto="researcher")
    elif current == "researcher":
        # 研究完成,开始开发
        return Command(goto="developer")
    elif current == "developer":
        # 开发完成,进行审查
        return Command(goto="reviewer")
    elif current == "reviewer":
        # 检查审查结果
        last_review = state["review_comments"][-1] if state["review_comments"] else ""
        
        if "APPROVED" in last_review.upper() or iterations >= 3:
            # 审查通过或达到最大迭代次数
            return Command(goto="finalize")
        else:
            # 需要修改,返回开发
            return Command(goto="developer")
    
    return Command(goto="finalize")

# ============ 最终整合 ============
def finalize(state: TeamState) -> dict:
    """整合所有工作成果"""
    output = f"""
# 任务完成报告

## 原始任务
{state['task']}

## 研究成果
{chr(10).join(state['research_notes'])}

## 最终代码
{chr(10).join(state['code_snippets'])}

## 审查意见
{chr(10).join(state['review_comments'])}

## 迭代次数
{state['iteration_count']}
"""
    return {"final_output": output}

# ============ 构建协作图 ============
team_graph = StateGraph(TeamState)

# 添加所有 Agent 节点
team_graph.add_node("coordinator", coordinator)
team_graph.add_node("researcher", researcher_agent)
team_graph.add_node("developer", developer_agent)
team_graph.add_node("reviewer", reviewer_agent)
team_graph.add_node("finalize", finalize)

# 从 START 进入协调器
team_graph.add_edge(START, "coordinator")

# 从各个 Agent 返回协调器
team_graph.add_edge("researcher", "coordinator")
team_graph.add_edge("developer", "coordinator")
team_graph.add_edge("reviewer", "coordinator")

# 最终输出
team_graph.add_edge("finalize", END)

# 编译
team = team_graph.compile()

# 运行
result = team.invoke({
    "task": "实现一个 Python 函数,计算斐波那契数列的第 n 项",
    "research_notes": [],
    "code_snippets": [],
    "review_comments": [],
    "final_output": "",
    "iteration_count": 0
})

print(result["final_output"])
┌─────────────────────────────────────────────────────────────────────────────┐
│                        多 Agent 协作流程图                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                           ┌──────────────┐                                  │
│                           │   START      │                                  │
│                           └──────┬───────┘                                  │
│                                  │                                          │
│                                  ▼                                          │
│                        ┌──────────────────┐                                 │
│              ┌────────▶│  Coordinator     │◀────────┐                       │
│              │         │  (协调决策)       │         │                       │
│              │         └────────┬─────────┘         │                       │
│              │                  │                   │                       │
│              │    ┌─────────────┼─────────────┐     │                       │
│              │    │             │             │     │                       │
│              │    ▼             ▼             ▼     │                       │
│              │ ┌──────────┐ ┌──────────┐ ┌──────────┐                       │
│              │ │Researcher│ │Developer │ │Reviewer  │                       │
│              │ │ (研究)    │ │ (开发)   │ │ (审查)   │                       │
│              │ └────┬─────┘ └────┬─────┘ └────┬─────┘                       │
│              │      │            │            │     │                       │
│              └──────┴────────────┴────────────┴─────┘                       │
│                                  │                                          │
│                                  │ (完成条件满足)                            │
│                                  ▼                                          │
│                        ┌──────────────────┐                                 │
│                        │    Finalize      │                                 │
│                        │   (最终整合)      │                                 │
│                        └────────┬─────────┘                                 │
│                                 │                                           │
│                                 ▼                                           │
│                           ┌──────────┐                                      │
│                           │   END    │                                      │
│                           └──────────┘                                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

9. 🎯 LangChain vs LangGraph:如何选择?

决策流程图

┌─────────────────────────────────────────────────────────────────────────────┐
│                          框架选择决策树                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                          你的需求是什么?                                     │
│                               │                                             │
│               ┌───────────────┼───────────────┐                             │
│               │               │               │                             │
│               ▼               ▼               ▼                             │
│        ┌───────────┐   ┌───────────┐   ┌───────────┐                       │
│        │ 简单对话  │   │ 工具调用  │   │ 复杂工作流 │                       │
│        │   应用    │   │  Agent   │   │   系统    │                       │
│        └─────┬─────┘   └─────┬─────┘   └─────┬─────┘                       │
│              │               │               │                             │
│              ▼               │               │                             │
│     ┌──────────────┐         │               │                             │
│     │ LangChain    │         │               │                             │
│     │   Chain      │         │               │                             │
│     └──────────────┘         │               │                             │
│                              │               │                             │
│                    需要人工干预?            │                             │
│                        │                     │                             │
│              ┌─────────┼─────────┐           │                             │
│              │         │         │           │                             │
│              No        │        Yes          │                             │
│              │         │         │           │                             │
│              ▼         │         ▼           │                             │
│     ┌──────────────┐   │   ┌──────────────┐  │                             │
│     │ LangChain    │   │   │  LangGraph   │  │                             │
│     │ create_agent │   │   │ + Interrupt  │  │                             │
│     └──────────────┘   │   └──────────────┘  │                             │
│                        │                     │                             │
│                需要故障恢复?                │                             │
│                    │                         ▼                             │
│          ┌─────────┼─────────┐     ┌──────────────┐                        │
│          │         │         │     │  LangGraph   │                        │
│          No        │        Yes    │  (推荐)      │                        │
│          │         │         │     └──────────────┘                        │
│          ▼         │         ▼                                             │
│   ┌──────────────┐ │   ┌──────────────┐                                    │
│   │ LangChain    │ │   │  LangGraph   │                                    │
│   │ create_agent │ │   │ + Checkpoint │                                    │
│   └──────────────┘ │   └──────────────┘                                    │
│                    │                                                        │
└─────────────────────────────────────────────────────────────────────────────┘

对比总结表

维度 LangChain (create_agent) LangGraph
学习曲线 简单,快速上手 较陡,需理解图论概念
适用场景 简单 Agent、快速原型 复杂工作流、生产系统
控制粒度 高层抽象 细粒度控制
状态管理 自动处理 显式定义 Schema
执行流程 固定 ReAct 循环 自定义图结构
人工干预 有限支持 完整 HITL 支持
故障恢复 需自行实现 内置检查点
并行执行 不支持 原生支持
可视化 有限 图形化调试

💡 建议

  1. 初学者/快速原型:从 LangChain create_agent 开始
  2. 需要人工审批:使用 LangGraph + interrupt
  3. 多步骤工作流:使用 LangGraph StateGraph
  4. 多 Agent 系统:使用 LangGraph 子图
  5. 生产部署:LangGraph + 持久化检查点 + LangSmith

10. 🔮 最佳实践与性能优化

10.1 状态设计最佳实践

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

# ✅ 好的状态设计
class WellDesignedState(TypedDict):
    # 1. 使用明确的类型注解
    messages: Annotated[list, add_messages]
    
    # 2. 分离输入、处理、输出字段
    user_input: str              # 输入
    processed_data: dict         # 中间状态
    final_result: str            # 输出
    
    # 3. 包含必要的元数据
    created_at: str
    updated_at: str
    version: int

# ❌ 避免的设计
class PoorlyDesignedState(TypedDict):
    data: dict  # 太宽泛,难以追踪
    temp: str   # 命名不清晰

10.2 递归限制与防护

from langgraph.managed import RemainingSteps
from langgraph.graph import StateGraph, START, END
from typing import Literal

class SafeState(TypedDict):
    messages: list
    remaining_steps: RemainingSteps  # 自动追踪剩余步骤

def safe_agent_node(state: SafeState) -> dict:
    """带有递归保护的 Agent 节点"""
    remaining = state["remaining_steps"]
    
    if remaining <= 2:
        # 接近限制,返回安全响应
        return {"messages": [{"role": "assistant", "content": "正在总结当前进度..."}]}
    
    # 正常处理
    return {"messages": [{"role": "assistant", "content": "处理中..."}]}

def safe_router(state: SafeState) -> Literal["continue", "finalize"]:
    """安全的路由决策"""
    if state["remaining_steps"] <= 2:
        return "finalize"
    return "continue"

# 设置递归限制
result = graph.invoke(
    {"messages": []},
    config={"recursion_limit": 25}  # 明确设置限制
)

10.3 流式输出优化

# 使用 stream_mode 获取实时更新
for event in graph.stream(
    {"messages": [{"role": "user", "content": "分析数据"}]},
    stream_mode="updates"  # 只返回增量更新
):
    node_name = list(event.keys())[0]
    node_output = event[node_name]
    print(f"[{node_name}] {node_output}")

# 在工具中使用 stream_writer
@tool
def long_running_task(runtime: ToolRuntime) -> str:
    """长时间运行的任务,实时反馈进度"""
    writer = runtime.stream_writer
    
    writer("开始处理...")
    # 模拟耗时操作
    for i in range(5):
        time.sleep(1)
        writer(f"进度: {(i+1)*20}%")
    
    return "任务完成"

10.4 节点缓存

from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
import time

class State(TypedDict):
    query: str
    result: str

def expensive_computation(state: State) -> dict:
    """模拟耗时计算"""
    time.sleep(3)  # 耗时操作
    return {"result": f"计算结果: {state['query']}"}

# 配置缓存策略
builder = StateGraph(State)
builder.add_node(
    "compute",
    expensive_computation,
    cache_policy=CachePolicy(ttl=3600)  # 缓存 1 小时
)

# 编译时启用缓存
graph = builder.compile(cache=InMemoryCache())

# 第一次调用:3秒
result1 = graph.invoke({"query": "test"})

# 第二次调用相同输入:立即返回(使用缓存)
result2 = graph.invoke({"query": "test"})

10.5 错误处理与重试

from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
import time

class RetryMiddleware:
    """带重试的工具调用中间件"""
    
    def __init__(self, max_retries: int = 3, delay: float = 1.0):
        self.max_retries = max_retries
        self.delay = delay
    
    @wrap_tool_call
    def __call__(self, request, handler):
        last_error = None
        
        for attempt in range(self.max_retries):
            try:
                return handler(request)
            except Exception as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    time.sleep(self.delay * (attempt + 1))  # 指数退避
                    continue
        
        # 所有重试失败
        return ToolMessage(
            content=f"工具调用失败(重试 {self.max_retries} 次): {str(last_error)}",
            tool_call_id=request.tool_call["id"]
        )

agent = create_agent(
    model="gpt-4o",
    tools=tools,
    middleware=[RetryMiddleware(max_retries=3)]
)

11. 📚 总结与展望

核心要点回顾

┌─────────────────────────────────────────────────────────────────────────────┐
│                          LangChain & LangGraph 知识图谱                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                            ┌──────────────┐                                 │
│                            │  LangChain   │                                 │
│                            │    生态系统   │                                 │
│                            └──────┬───────┘                                 │
│                                   │                                         │
│           ┌───────────────────────┼───────────────────────┐                 │
│           │                       │                       │                 │
│           ▼                       ▼                       ▼                 │
│    ┌─────────────┐         ┌─────────────┐         ┌─────────────┐         │
│    │  LangChain  │         │  LangGraph  │         │  LangSmith  │         │
│    │   (组件库)   │         │  (编排引擎)  │         │  (可观测性)  │         │
│    └──────┬──────┘         └──────┬──────┘         └─────────────┘         │
│           │                       │                                         │
│    ┌──────┴──────┐         ┌──────┴──────┐                                  │
│    │             │         │             │                                  │
│    ▼             ▼         ▼             ▼                                  │
│ ┌──────┐    ┌──────┐   ┌──────┐    ┌──────────┐                            │
│ │Models│    │Tools │   │State │    │Checkpoint│                            │
│ │Chain │    │Agent │   │Nodes │    │Interrupt │                            │
│ │Memory│    │LCEL  │   │Edges │    │Subgraph  │                            │
│ └──────┘    └──────┘   └──────┘    └──────────┘                            │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

学习路径建议

  1. 入门阶段

    • 掌握 LangChain 基础组件(Models, Messages, Tools)
    • 学会使用 create_agent 构建简单 Agent
    • 理解 LCEL 表达式语言
  2. 进阶阶段

    • 深入 LangGraph 状态机概念
    • 实践条件边和动态路由
    • 掌握 Human-in-the-Loop 模式
  3. 高级阶段

    • 设计多 Agent 协作系统
    • 实现持久化和故障恢复
    • 优化生产环境性能

未来展望

随着 AI Agent 技术的快速发展,LangChain 和 LangGraph 也在不断演进:

  1. 更强大的编排能力:支持更复杂的工作流模式
  2. 更好的可观测性:与 LangSmith 更深度集成
  3. 更丰富的集成:支持更多 LLM 提供商和工具
  4. 更低的延迟:优化执行引擎性能
  5. 更强的安全性:内置更多安全防护机制

12. 📖 参考文献

官方文档

  1. LangChain Documentation

    • 主站: https://docs.langchain.com/oss/python/langchain/overview
    • Agents: https://docs.langchain.com/oss/python/langchain/agents
    • Tools: https://docs.langchain.com/oss/python/langchain/tools
  2. LangGraph Documentation

    • 主站: https://docs.langchain.com/oss/python/langgraph/overview
    • Graph API: https://docs.langchain.com/oss/python/langgraph/graph-api
    • Concepts: https://docs.langchain.com/oss/python/langgraph/concepts
  3. LangSmith Documentation

    • 主站: https://docs.langchain.com/langsmith/home

GitHub 仓库

  1. LangChain

    • https://github.com/langchain-ai/langchain
    • 描述:🦜🔗 The platform for reliable agents
  2. LangGraph

    • https://github.com/langchain-ai/langgraph
    • 描述:Build resilient language agents as graphs

API 参考

  1. LangChain API Reference

    • https://reference.langchain.com/python/langchain
  2. LangGraph API Reference

    • https://reference.langchain.com/python/langgraph

学习资源

  1. LangChain Academy

    • https://academy.langchain.com/
    • 免费官方课程
  2. LangChain Forum

    • https://forum.langchain.com/
    • 社区讨论

学术参考

  1. ReAct: Synergizing Reasoning and Acting in Language Models

    • Yao et al., 2022
    • https://arxiv.org/abs/2210.03629
  2. Pregel: A System for Large-Scale Graph Processing

    • Google Research
    • https://research.google/pubs/pub37252/

📝 作者注:本文基于 LangChain 和 LangGraph 2024-2025 年最新版本编写。由于框架更新较快,建议读者参考官方文档获取最新 API 变更。

🔗 系列导航:本文是【Agents篇】系列第 11 篇,更多内容请关注系列其他文章。

💬 反馈交流:欢迎在评论区留言讨论,一起探索 AI Agent 开发的无限可能!


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐