在这里插入图片描述

🔥 系列进度:LangGraph系列已更新5篇核心内容,从Checkpointer状态持久化到人机协作(HIL),累计帮助上千名开发者落地生产级Agent!今天聚焦Agent工具集成的核心标准——模型上下文协议(MCP),实现"一次编写,到处运行",让工具复用、跨框架迁移不再内耗。


一、引言:终结Agent工具集成的"战国时代"

做过大模型Agent开发的同学,几乎都被工具集成的"内耗"折磨过:

  • 切换框架必返工:用LangChain写的工具,切换到AutoGen、CrewAI就必须重写调用逻辑;
  • 模型适配成本高:OpenAI、Anthropic等不同供应商的Function Calling格式不统一;
  • 工具复用性极差:同一个工具(如天气查询、文件写入),在不同项目中需要重复编写;
  • 安全性难以保障:权限控制、资源审计全靠业务层硬编码,易出现路径穿越、越权访问等安全隐患。

模型上下文协议(Model Context Protocol, MCP) 正是为了终结这种混乱——它是基于 JSON-RPC 2.0 的开放标准,实现了工具定义与框架、模型的解耦,让工具"一次编写,到处运行"。

核心结论:MCP 不是简单的"接口规范",而是 Agentic Workflow 迈向工业化标准化的基石,是 2025 年每一位大模型开发者必须掌握的核心协议。


二、架构深度解析:MCP 与 LangGraph 的强强联手

LangGraph 与 MCP 的结合,是"复杂工作流"与"标准化工具"的完美搭配——LangGraph 负责搭建有状态、可循环的复杂工作流,MCP 负责提供标准化的工具接入,二者协同构建出工业级 Agent 的核心架构。

2.1 MCP 与 LangGraph 架构协同图

服务端(MCP Server)

客户端(LangGraph Agent)

基于 JSON-RPC 2.0 通信

LangGraph 状态机

MCP Client

Agent 决策节点

ToolNode 工具调用节点

全局 State(显式共享状态)

MCP Server

工具注册与管理

安全审计:路径过滤 / 鉴权

工具逻辑执行

说明:原图中存在两处重复节点 ID(BF),Mermaid 不允许同一节点 ID 在 subgraph 中既作为源又作为目标出现重复定义,已修正为规范写法。

2.2 传统 Function Calling 与 MCP 标准对比

特性 传统 Function Calling MCP 标准(基于 JSON-RPC 2.0)
扩展性 强耦合于特定模型 API,切换模型/框架需重写 插件化架构,与框架、模型解耦,支持跨生态复用
复用性 工具代码需为不同项目重复编写 “一次编写,到处运行”,标准 Server 接入,跨项目复用
安全性 权限控制通常在业务层硬编码,易遗漏 协议层原生支持资源审计、路径过滤与标准鉴权
控制流 主要是线性触发,状态传递隐式 深度集成图状态机,支持显式共享状态管理
适用场景 简单工具调用、快速 MVP 开发 工业级 Agent、多框架协同、高安全要求场景

2.3 MCP 通信流交互图

MCP Client(LangGraph Agent)与 MCP Server 之间的交互遵循严格的 JSON-RPC 2.0 规范,分为 4 个阶段:

MCP Server (Tools) LangGraph Agent (MCP Client) MCP Server (Tools) LangGraph Agent (MCP Client) 第一阶段:初始化(协商版本/功能) 第二阶段:工具发现(动态获取工具列表) 第三阶段:工具执行(核心交互) 第四阶段:状态更新(LangGraph 更新全局 State) initialize(携带客户端版本、支持的协议类型) initialized(返回服务端元数据、支持的工具类型) list_tools(请求获取所有可用工具) tools_metadata(返回工具名称、描述、参数 Schema) call_tool(工具名、结构化参数、可选状态上下文) 执行本地逻辑(调用外部 API / 文件操作 / 数据库) 安全审计(路径过滤、权限校验) tool_result(结构化结果、执行状态、错误信息) 更新 AgentState(存入工具返回结果、执行日志)

三、环境准备:工业级实战基座

⚠️ 版本说明:以下版本号为写作时的参考版本,实际安装时建议查阅各库最新稳定版。langgraph 当前正式版已达 0.2.xfastmcp 已达 2.x,请以 PyPI 实际发布为准。

# 1. 安装 LangGraph 图编排引擎
pip install "langgraph>=0.2.0"

# 2. 安装 MCP 协议核心库与 FastMCP 开发框架
pip install "mcp>=1.0.0" "fastmcp>=2.0.0"

# 3. 安装 LangChain 集成库(按需选择模型提供商)
pip install "langchain-anthropic>=0.2.0" "langchain-openai>=0.2.0"

# 4. 可选:安装 LangSmith(全链路追踪,生产级监控必备)
pip install "langsmith>=0.1.100"

# 5. 可选:安装 PostgreSQL 依赖(Checkpointer 持久化)
pip install psycopg2-binary

# 验证安装
python -c "import langgraph, mcp, fastmcp; print('依赖安装成功')"

四、实战演示 A:用 FastMCP 开发本地工具服务器(生产级)

本节开发一个具备"实时天气查询"和"本地报告写入"功能的工具服务器,重点实现路径过滤安全审计(防止路径穿越攻击)。

# mcp_server.py
from fastmcp import FastMCP
import os

# 1. 初始化工业级 MCP 工具服务器
mcp = FastMCP("Production_Tool_Server", version="1.0.0")

# 2. 注册工具1:实时天气查询
@mcp.tool()
def get_weather(location: str) -> str:
    """获取指定地点的实时天气信息。

    Args:
        location: 地点名称(如"北京"、"上海"),支持国内主要城市

    Returns:
        str: 结构化的天气信息,包含天气状况、气温、体感
    """
    try:
        # 模拟不同城市的天气返回(生产级可替换为真实天气 API)
        city_weather = {
            "北京": "晴转多云,气温 22°C,体感舒适,东北风2级",
            "上海": "阴有小雨,气温 19°C,体感微凉,东南风3级",
            "广州": "晴,气温 28°C,体感炎热,南风1级",
        }
        return city_weather.get(location, f"暂未获取到 {location} 的天气信息,请检查地点名称")
    except Exception as e:
        return f"天气查询失败:{e}"


# 3. 注册工具2:本地报告写入(含路径过滤安全审计)
@mcp.tool()
def save_report(filename: str, content: str) -> str:
    """将 Agent 生成的报告安全地写入本地指定目录。

    Args:
        filename: 报告文件名(需包含后缀,如"report_2025.txt"),禁止包含路径分隔符
        content: 报告内容(字符串格式)

    Returns:
        str: 执行结果(成功提示或错误信息)
    """
    # ── 安全审计:防止路径穿越攻击 ──────────────────────────────
    # 拒绝文件名中包含路径分隔符,例如 "../../etc/passwd"
    if os.sep in filename or (os.altsep and os.altsep in filename):
        return "Error: 文件名不得包含路径分隔符,拒绝写入。"

    safe_dir = os.path.abspath("./agent_outputs")
    os.makedirs(safe_dir, exist_ok=True)

    target_path = os.path.abspath(os.path.join(safe_dir, filename))
    # 二次校验:确保最终路径在安全目录内
    if not target_path.startswith(safe_dir + os.sep):
        return "Error: 越权访问,禁止写入安全目录以外的路径。"
    # ─────────────────────────────────────────────────────────────

    try:
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(content)
        return f"报告已成功持久化至: {target_path}"
    except OSError as e:
        return f"报告写入失败:{e}"


if __name__ == "__main__":
    # 启动 MCP Server(默认 stdio 模式,HTTP 模式请参考 FastMCP 文档)
    mcp.run()

避坑提示

  • 原代码中 target_path.startswith(safe_dir) 存在路径前缀混淆漏洞——若 safe_dir/app/outputs,攻击者传入 /app/outputs_evil/x 也会通过校验。修正方式:将比较改为 startswith(safe_dir + os.sep),同时在入口处拒绝含路径分隔符的文件名。
  • fastmcp 2.x 默认通信方式为 stdio,不再直接支持 mcp.run(port=8000) 的 HTTP 启动方式;若需 HTTP,请参考 FastMCP 文档配置 transport="http"

五、实战演示 B:将 MCP 服务接入 LangGraph 客户端(核心实战)

⚠️ 重要说明:原代码中使用的 from mcp.client import MCPClient 并不存在于 mcp 官方 SDKmcp Python SDK 的客户端 API 为异步接口,且 LangGraph 官方提供了 langchain-mcp-adapters 作为标准集成方案。以下代码已按官方方式重写。

# langgraph_agent.py
import asyncio
from typing import Annotated, List
import operator

from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from typing_extensions import TypedDict


# ── 1. 定义有状态的数据结构 ────────────────────────────────────────
class AgentState(TypedDict):
    messages: Annotated[List, operator.add]   # 对话历史 / 工具调用记录
    approval_status: str                       # pending / approved / rejected
    tool_result: str                           # 工具调用结果缓存(可选辅助字段)


# ── 2. 构建 LangGraph Agent(异步,官方推荐方式) ──────────────────
async def build_and_run_agent():
    """通过 langchain-mcp-adapters 连接 MCP Server,构建并运行 LangGraph Agent。"""

    # 连接本地 MCP Server(stdio 方式,与 server.py 进程通信)
    async with MultiServerMCPClient(
        {
            "production_tools": {
                "command": "python",
                "args": ["mcp_server.py"],  # 启动 server 子进程
                "transport": "stdio",
            }
        }
    ) as mcp_client:
        # 获取所有 MCP 工具(转换为 LangChain 可识别格式)
        tools = mcp_client.get_tools()
        print("可用 MCP 工具:", [t.name for t in tools])

        # 初始化模型并绑定工具
        model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0.1)
        model_with_tools = model.bind_tools(tools)

        # ── 3. 定义 Agent 决策节点 ─────────────────────────────────
        def agent_node(state: AgentState):
            response = model_with_tools.invoke(state["messages"])
            return {"messages": [response]}

        # ── 4. 构建 LangGraph 状态机 ───────────────────────────────
        builder = StateGraph(AgentState)
        builder.add_node("agent", agent_node)
        builder.add_node("tools", ToolNode(tools))

        builder.set_entry_point("agent")

        # tools_condition:内置条件函数,自动判断是否有工具调用请求
        builder.add_conditional_edges(
            "agent",
            tools_condition,           # 有工具调用 → "tools",否则 → END
            {"tools": "tools", END: END},
        )
        builder.add_edge("tools", "agent")  # 工具执行完成后返回 Agent 决策

        graph = builder.compile()

        # ── 5. 运行 Agent ──────────────────────────────────────────
        initial_state: AgentState = {
            "messages": [
                ("user", "查询北京的天气,然后将天气信息写入报告,文件名:beijing_weather.txt")
            ],
            "approval_status": "pending",
            "tool_result": "",
        }

        final_state = await graph.ainvoke(initial_state)

        print("\n" + "=" * 50)
        print("流程执行完成,最终对话历史:")
        for msg in final_state["messages"]:
            print(f"  [{type(msg).__name__}] {getattr(msg, 'content', msg)}")


if __name__ == "__main__":
    asyncio.run(build_and_run_agent())

关键修正说明

  • 使用 langchain-mcp-adapterspip install langchain-mcp-adapters)替代不存在的 mcp.client.MCPClient
  • 使用内置 tools_condition 替代手写条件路由,避免直接解析 last_msg["tool_calls"](字段名在不同消息类型间存在差异);
  • 原代码缺少"无工具调用时结束流程"的边,会导致 Agent 无限循环,已通过 tools_condition 修正;
  • 使用 ainvoke 异步接口,与 MCP 异步客户端兼容。

六、核心突破:Human-in-the-Loop(HIL)生产级审计

对于文件写入、数据库操作、资金转账等敏感 MCP 工具,必须集成 LangGraph 的中断(interrupt)逻辑,实现"审核-批准/拒绝-反馈"的闭环。

# langgraph_agent_hil.py
import asyncio
from typing import Annotated, List
import operator

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from typing_extensions import TypedDict


class AgentState(TypedDict):
    messages: Annotated[List, operator.add]
    approval_status: str   # pending / approved / rejected
    tool_result: str


# ── 条件路由:判断是否需要人工审计 ────────────────────────────────
SENSITIVE_TOOLS = {"save_report", "delete_record", "write_db", "transfer_money"}


def route_after_agent(state: AgentState) -> str:
    """Agent 决策后的路由逻辑。"""
    last_msg = state["messages"][-1]

    # 非 AI 消息或无工具调用 → 结束
    if not isinstance(last_msg, AIMessage) or not last_msg.tool_calls:
        return END

    # 任一工具调用属于敏感工具 → 进入人工审计
    for tc in last_msg.tool_calls:
        if tc["name"] in SENSITIVE_TOOLS:
            return "human_review"

    # 非敏感工具 → 直接执行
    return "tools"


# ── 人工审核节点 ────────────────────────────────────────────────
def human_review_node(state: AgentState) -> dict:
    """人工审核节点:敏感工具调用前中断,等待人工审批。

    生产级场景:此处应挂起流程,通过消息队列/webhook 等待外部审批回调。
    本示例以控制台输入模拟人工审批。
    """
    last_msg = state["messages"][-1]
    tool_calls_info = [
        f"{tc['name']}({tc['args']})" for tc in last_msg.tool_calls
    ] if isinstance(last_msg, AIMessage) else []

    print("\n" + "=" * 50)
    print("【HIL 人工审计】敏感操作预警!")
    print(f"Agent 申请调用敏感工具:{', '.join(tool_calls_info)}")
    print("=" * 50)

    decision = input("请输入审批结果(y=批准,其他=拒绝并说明原因):").strip()

    if decision.lower() == "y":
        return {
            "approval_status": "approved",
            "messages": [("user", "人工审批通过,允许执行工具操作。")],
        }
    else:
        feedback = decision if decision else "未说明原因"
        return {
            "approval_status": "rejected",
            "messages": [("user", f"人工审批拒绝,原因:{feedback},请调整策略。")],
        }


# ── 审计后路由:根据审批结果决定执行还是重新决策 ────────────────
def route_after_review(state: AgentState) -> str:
    return "tools" if state["approval_status"] == "approved" else "agent"


# ── 构建带 HIL 的 LangGraph Agent ───────────────────────────────
async def build_hil_agent():
    async with MultiServerMCPClient(
        {
            "production_tools": {
                "command": "python",
                "args": ["mcp_server.py"],
                "transport": "stdio",
            }
        }
    ) as mcp_client:
        tools = mcp_client.get_tools()
        model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0.1)
        model_with_tools = model.bind_tools(tools)

        def agent_node(state: AgentState):
            response = model_with_tools.invoke(state["messages"])
            return {"messages": [response]}

        builder = StateGraph(AgentState)
        builder.add_node("agent", agent_node)
        builder.add_node("human_review", human_review_node)
        builder.add_node("tools", ToolNode(tools))

        builder.set_entry_point("agent")

        builder.add_conditional_edges(
            "agent",
            route_after_agent,
            {"human_review": "human_review", "tools": "tools", END: END},
        )
        builder.add_conditional_edges(
            "human_review",
            route_after_review,
            {"tools": "tools", "agent": "agent"},
        )
        builder.add_edge("tools", "agent")

        # 生产级:使用 PostgreSQL Checkpointer 实现持久化
        # from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
        # async with AsyncPostgresSaver.from_conn_string("postgresql://...") as saver:
        #     graph = builder.compile(checkpointer=saver, interrupt_before=["human_review"])
        #
        # 开发 / 测试:使用内存 Checkpointer
        checkpointer = MemorySaver()
        graph = builder.compile(
            checkpointer=checkpointer,
            interrupt_before=["human_review"],  # 在人工审核节点前中断
        )

        initial_state: AgentState = {
            "messages": [
                ("user", "查询北京的天气,然后将天气信息写入报告,文件名:beijing_weather.txt")
            ],
            "approval_status": "pending",
            "tool_result": "",
        }

        # 使用 thread_id 支持断点续跑
        config = {"configurable": {"thread_id": "session_001"}}
        final_state = await graph.ainvoke(initial_state, config=config)
        print("\n流程执行完成,最终状态:", final_state["approval_status"])


if __name__ == "__main__":
    asyncio.run(build_hil_agent())

关键修正说明

  • 原代码通过 last_msg["tool_calls"] 字典访问消息字段,实际 LangChain 消息对象应使用 last_msg.tool_calls 属性,已修正;
  • compile_with_checkpointer 不应在 build_langgraph_agent_with_hil 内部通过 asyncio.run() 嵌套调用——在已有事件循环的环境中会抛出 RuntimeError,已重构为单一异步函数;
  • 新增审计拒绝后返回 Agent 重新决策的路由,使拒绝流程形成完整闭环;
  • interrupt_before 配置在 compile() 时指定,而非运行时。

七、高阶架构:将 LangGraph Agent 逆向封装为 MCP 服务

将复杂 LangGraph Agent 封装成标准 MCP Server,使任何支持 MCP 协议的客户端(如 Claude Desktop、IDE 插件、其他框架的 Agent)都能直接调用。

# research_agent_server.py
import asyncio
from typing import Annotated, List
import operator

from fastmcp import FastMCP
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict


# ── 1. 定义复杂 LangGraph Agent(科研推理示例) ────────────────────
class ResearchAgentState(TypedDict):
    messages: Annotated[List, operator.add]
    research_result: str


def research_node(state: ResearchAgentState) -> dict:
    """多步科研推理节点(生产级可集成文献检索、数据计算等工具)。"""
    # 取最后一条用户消息
    last = state["messages"][-1]
    query = last[1] if isinstance(last, tuple) else getattr(last, "content", str(last))
    result = f"针对查询「{query}」的科研推理结果:(多步分析内容占位)"
    return {
        "research_result": result,
        "messages": [("assistant", result)],
    }


def build_research_graph():
    builder = StateGraph(ResearchAgentState)
    builder.add_node("research", research_node)
    builder.set_entry_point("research")
    builder.add_edge("research", END)
    return builder.compile()


compiled_graph = build_research_graph()


# ── 2. 将 LangGraph Agent 封装为 MCP 工具 ─────────────────────────
mcp = FastMCP("Complex_Research_Agent_Server", version="1.0.0")


@mcp.tool()
def complex_research_agent(query: str) -> str:
    """调用内部多步科研推理 LangGraph Agent。

    Args:
        query: 科研查询关键词(如"LangGraph MCP 集成最佳实践")

    Returns:
        str: 多步推理后的科研结果
    """
    initial_state: ResearchAgentState = {
        "messages": [("user", query)],
        "research_result": "",
    }
    # 注意:如果内部 Agent 使用异步接口,需改为 asyncio.run(compiled_graph.ainvoke(...))
    final_state = compiled_graph.invoke(initial_state)
    return final_state["research_result"]


if __name__ == "__main__":
    mcp.run()

关键修正说明

  • 原代码 research_node 直接用 state["messages"][-1][1] 索引消息,若消息为 LangChain 消息对象而非 tuple,会抛出 TypeError,已做类型兼容处理;
  • 高阶封装中,若内部 LangGraph Agent 含异步工具,compiled_graph.invoke() 需替换为 asyncio.run(compiled_graph.ainvoke(...)),已在注释中说明。

八、生产环境最佳实践:监控、持久化与安全

8.1 状态持久化

严禁在生产环境仅依赖内存存储状态,必须配置 Checkpointer:

# 生产级:PostgreSQL Checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with AsyncPostgresSaver.from_conn_string(
    "postgresql://user:pass@host:5432/langgraph_db"
) as saver:
    graph = builder.compile(checkpointer=saver)

# 开发 / 测试:内存 Checkpointer(不跨进程持久化)
from langgraph.checkpoint.memory import MemorySaver
graph = builder.compile(checkpointer=MemorySaver())

持久化的核心价值

  • 应对 HIL 模式下的长时间挂起(人工审批可能耗时数小时);
  • 支持"时间旅行"调试:可回溯到任意历史状态;
  • 多节点部署时共享状态,适配高并发场景。

8.2 多维性能监控(LangSmith 配置)

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "mcp-langgraph-production"

工业级监控阈值

指标 安全阈值 告警阈值
单次工具调用耗时 < 10s > 30s
单次请求工具调用次数 < 10 次 > 20 次(防死循环)
工具调用错误率 < 0.5% > 1%
Checkpoint 缓存命中率 > 80% < 60%

8.3 安全合规红线

安全项 要求
路径沙箱化 文件操作工具必须实施严格路径过滤,同时拒绝含分隔符的文件名
强制 HIL 审计 delete_recordwrite_dbtransfer_money 等敏感工具禁止无人工审批自动执行
身份验证 MCP Server 必须配置 API Key 或 OAuth 2.0,防止未授权访问
日志审计 所有工具调用、审批记录、状态更新必须留存 ≥ 30 天

九、总结

MCP 与 LangGraph 的结合,标志着 AI Agent 开发从"作坊式原型"向"工业化生产"的跨越。

核心收获

  • MCP 是基于 JSON-RPC 2.0 的开放标准,终结工具集成的碎片化,实现跨框架、跨模型复用;
  • FastMCP 可快速开发生产级 MCP Server,内置安全审计、工具注册等功能;
  • 官方推荐通过 langchain-mcp-adapters 将 MCP 工具接入 LangGraph,使用异步接口;
  • HIL 审计结合 Checkpointer 持久化,构建完整的"安全-可靠-可追溯"生产链路;
  • 高阶场景中,可将 LangGraph Agent 逆向封装为 MCP 服务,实现"Agent 即工具"的复用模式。

💡 如果本文对你有帮助,欢迎点赞、收藏、评论!如果在 MCP 实战中遇到跨平台连接、状态同步问题,欢迎在评论区交流~

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐