LangGraph 统一工具集成标准——模型上下文协议 (MCP) 工业级实战
本文介绍了模型上下文协议(MCP)作为标准化工具集成的解决方案,解决Agent开发中工具复用性差、跨框架迁移难等问题。MCP基于JSON-RPC 2.0协议,实现工具与框架、模型的解耦,支持“一次编写,到处运行”。通过LangGraph与MCP的协同架构,可构建工业级Agent工作流。文章对比了传统Function Calling与MCP的差异,并演示了使用FastMCP开发本地工具服务器的实战案

🔥 系列进度:LangGraph系列已更新5篇核心内容,从Checkpointer状态持久化到人机协作(HIL),累计帮助上千名开发者落地生产级Agent!今天聚焦Agent工具集成的核心标准——模型上下文协议(MCP),实现"一次编写,到处运行",让工具复用、跨框架迁移不再内耗。
一、引言:终结Agent工具集成的"战国时代"
做过大模型Agent开发的同学,几乎都被工具集成的"内耗"折磨过:
- 切换框架必返工:用LangChain写的工具,切换到AutoGen、CrewAI就必须重写调用逻辑;
- 模型适配成本高:OpenAI、Anthropic等不同供应商的Function Calling格式不统一;
- 工具复用性极差:同一个工具(如天气查询、文件写入),在不同项目中需要重复编写;
- 安全性难以保障:权限控制、资源审计全靠业务层硬编码,易出现路径穿越、越权访问等安全隐患。
模型上下文协议(Model Context Protocol, MCP) 正是为了终结这种混乱——它是基于 JSON-RPC 2.0 的开放标准,实现了工具定义与框架、模型的解耦,让工具"一次编写,到处运行"。
核心结论:MCP 不是简单的"接口规范",而是 Agentic Workflow 迈向工业化标准化的基石,是 2025 年每一位大模型开发者必须掌握的核心协议。
二、架构深度解析:MCP 与 LangGraph 的强强联手
LangGraph 与 MCP 的结合,是"复杂工作流"与"标准化工具"的完美搭配——LangGraph 负责搭建有状态、可循环的复杂工作流,MCP 负责提供标准化的工具接入,二者协同构建出工业级 Agent 的核心架构。
2.1 MCP 与 LangGraph 架构协同图
说明:原图中存在两处重复节点 ID(
B和F),Mermaid 不允许同一节点 ID 在subgraph中既作为源又作为目标出现重复定义,已修正为规范写法。
2.2 传统 Function Calling 与 MCP 标准对比
| 特性 | 传统 Function Calling | MCP 标准(基于 JSON-RPC 2.0) |
|---|---|---|
| 扩展性 | 强耦合于特定模型 API,切换模型/框架需重写 | 插件化架构,与框架、模型解耦,支持跨生态复用 |
| 复用性 | 工具代码需为不同项目重复编写 | “一次编写,到处运行”,标准 Server 接入,跨项目复用 |
| 安全性 | 权限控制通常在业务层硬编码,易遗漏 | 协议层原生支持资源审计、路径过滤与标准鉴权 |
| 控制流 | 主要是线性触发,状态传递隐式 | 深度集成图状态机,支持显式共享状态管理 |
| 适用场景 | 简单工具调用、快速 MVP 开发 | 工业级 Agent、多框架协同、高安全要求场景 |
2.3 MCP 通信流交互图
MCP Client(LangGraph Agent)与 MCP Server 之间的交互遵循严格的 JSON-RPC 2.0 规范,分为 4 个阶段:
三、环境准备:工业级实战基座
⚠️ 版本说明:以下版本号为写作时的参考版本,实际安装时建议查阅各库最新稳定版。
langgraph当前正式版已达0.2.x,fastmcp已达2.x,请以 PyPI 实际发布为准。
# 1. 安装 LangGraph 图编排引擎
pip install "langgraph>=0.2.0"
# 2. 安装 MCP 协议核心库与 FastMCP 开发框架
pip install "mcp>=1.0.0" "fastmcp>=2.0.0"
# 3. 安装 LangChain 集成库(按需选择模型提供商)
pip install "langchain-anthropic>=0.2.0" "langchain-openai>=0.2.0"
# 4. 可选:安装 LangSmith(全链路追踪,生产级监控必备)
pip install "langsmith>=0.1.100"
# 5. 可选:安装 PostgreSQL 依赖(Checkpointer 持久化)
pip install psycopg2-binary
# 验证安装
python -c "import langgraph, mcp, fastmcp; print('依赖安装成功')"
四、实战演示 A:用 FastMCP 开发本地工具服务器(生产级)
本节开发一个具备"实时天气查询"和"本地报告写入"功能的工具服务器,重点实现路径过滤安全审计(防止路径穿越攻击)。
# mcp_server.py
from fastmcp import FastMCP
import os
# 1. 初始化工业级 MCP 工具服务器
mcp = FastMCP("Production_Tool_Server", version="1.0.0")
# 2. 注册工具1:实时天气查询
@mcp.tool()
def get_weather(location: str) -> str:
"""获取指定地点的实时天气信息。
Args:
location: 地点名称(如"北京"、"上海"),支持国内主要城市
Returns:
str: 结构化的天气信息,包含天气状况、气温、体感
"""
try:
# 模拟不同城市的天气返回(生产级可替换为真实天气 API)
city_weather = {
"北京": "晴转多云,气温 22°C,体感舒适,东北风2级",
"上海": "阴有小雨,气温 19°C,体感微凉,东南风3级",
"广州": "晴,气温 28°C,体感炎热,南风1级",
}
return city_weather.get(location, f"暂未获取到 {location} 的天气信息,请检查地点名称")
except Exception as e:
return f"天气查询失败:{e}"
# 3. 注册工具2:本地报告写入(含路径过滤安全审计)
@mcp.tool()
def save_report(filename: str, content: str) -> str:
"""将 Agent 生成的报告安全地写入本地指定目录。
Args:
filename: 报告文件名(需包含后缀,如"report_2025.txt"),禁止包含路径分隔符
content: 报告内容(字符串格式)
Returns:
str: 执行结果(成功提示或错误信息)
"""
# ── 安全审计:防止路径穿越攻击 ──────────────────────────────
# 拒绝文件名中包含路径分隔符,例如 "../../etc/passwd"
if os.sep in filename or (os.altsep and os.altsep in filename):
return "Error: 文件名不得包含路径分隔符,拒绝写入。"
safe_dir = os.path.abspath("./agent_outputs")
os.makedirs(safe_dir, exist_ok=True)
target_path = os.path.abspath(os.path.join(safe_dir, filename))
# 二次校验:确保最终路径在安全目录内
if not target_path.startswith(safe_dir + os.sep):
return "Error: 越权访问,禁止写入安全目录以外的路径。"
# ─────────────────────────────────────────────────────────────
try:
with open(target_path, "w", encoding="utf-8") as f:
f.write(content)
return f"报告已成功持久化至: {target_path}"
except OSError as e:
return f"报告写入失败:{e}"
if __name__ == "__main__":
# 启动 MCP Server(默认 stdio 模式,HTTP 模式请参考 FastMCP 文档)
mcp.run()
避坑提示:
- 原代码中
target_path.startswith(safe_dir)存在路径前缀混淆漏洞——若safe_dir为/app/outputs,攻击者传入/app/outputs_evil/x也会通过校验。修正方式:将比较改为startswith(safe_dir + os.sep),同时在入口处拒绝含路径分隔符的文件名。fastmcp2.x 默认通信方式为stdio,不再直接支持mcp.run(port=8000)的 HTTP 启动方式;若需 HTTP,请参考 FastMCP 文档配置transport="http"。
五、实战演示 B:将 MCP 服务接入 LangGraph 客户端(核心实战)
⚠️ 重要说明:原代码中使用的
from mcp.client import MCPClient并不存在于 mcp 官方 SDK。mcpPython SDK 的客户端 API 为异步接口,且 LangGraph 官方提供了langchain-mcp-adapters作为标准集成方案。以下代码已按官方方式重写。
# langgraph_agent.py
import asyncio
from typing import Annotated, List
import operator
from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from typing_extensions import TypedDict
# ── 1. 定义有状态的数据结构 ────────────────────────────────────────
class AgentState(TypedDict):
messages: Annotated[List, operator.add] # 对话历史 / 工具调用记录
approval_status: str # pending / approved / rejected
tool_result: str # 工具调用结果缓存(可选辅助字段)
# ── 2. 构建 LangGraph Agent(异步,官方推荐方式) ──────────────────
async def build_and_run_agent():
"""通过 langchain-mcp-adapters 连接 MCP Server,构建并运行 LangGraph Agent。"""
# 连接本地 MCP Server(stdio 方式,与 server.py 进程通信)
async with MultiServerMCPClient(
{
"production_tools": {
"command": "python",
"args": ["mcp_server.py"], # 启动 server 子进程
"transport": "stdio",
}
}
) as mcp_client:
# 获取所有 MCP 工具(转换为 LangChain 可识别格式)
tools = mcp_client.get_tools()
print("可用 MCP 工具:", [t.name for t in tools])
# 初始化模型并绑定工具
model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0.1)
model_with_tools = model.bind_tools(tools)
# ── 3. 定义 Agent 决策节点 ─────────────────────────────────
def agent_node(state: AgentState):
response = model_with_tools.invoke(state["messages"])
return {"messages": [response]}
# ── 4. 构建 LangGraph 状态机 ───────────────────────────────
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode(tools))
builder.set_entry_point("agent")
# tools_condition:内置条件函数,自动判断是否有工具调用请求
builder.add_conditional_edges(
"agent",
tools_condition, # 有工具调用 → "tools",否则 → END
{"tools": "tools", END: END},
)
builder.add_edge("tools", "agent") # 工具执行完成后返回 Agent 决策
graph = builder.compile()
# ── 5. 运行 Agent ──────────────────────────────────────────
initial_state: AgentState = {
"messages": [
("user", "查询北京的天气,然后将天气信息写入报告,文件名:beijing_weather.txt")
],
"approval_status": "pending",
"tool_result": "",
}
final_state = await graph.ainvoke(initial_state)
print("\n" + "=" * 50)
print("流程执行完成,最终对话历史:")
for msg in final_state["messages"]:
print(f" [{type(msg).__name__}] {getattr(msg, 'content', msg)}")
if __name__ == "__main__":
asyncio.run(build_and_run_agent())
关键修正说明:
- 使用
langchain-mcp-adapters(pip install langchain-mcp-adapters)替代不存在的mcp.client.MCPClient;- 使用内置
tools_condition替代手写条件路由,避免直接解析last_msg["tool_calls"](字段名在不同消息类型间存在差异);- 原代码缺少"无工具调用时结束流程"的边,会导致 Agent 无限循环,已通过
tools_condition修正;- 使用
ainvoke异步接口,与 MCP 异步客户端兼容。
六、核心突破:Human-in-the-Loop(HIL)生产级审计
对于文件写入、数据库操作、资金转账等敏感 MCP 工具,必须集成 LangGraph 的中断(interrupt)逻辑,实现"审核-批准/拒绝-反馈"的闭环。
# langgraph_agent_hil.py
import asyncio
from typing import Annotated, List
import operator
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from typing_extensions import TypedDict
class AgentState(TypedDict):
messages: Annotated[List, operator.add]
approval_status: str # pending / approved / rejected
tool_result: str
# ── 条件路由:判断是否需要人工审计 ────────────────────────────────
SENSITIVE_TOOLS = {"save_report", "delete_record", "write_db", "transfer_money"}
def route_after_agent(state: AgentState) -> str:
"""Agent 决策后的路由逻辑。"""
last_msg = state["messages"][-1]
# 非 AI 消息或无工具调用 → 结束
if not isinstance(last_msg, AIMessage) or not last_msg.tool_calls:
return END
# 任一工具调用属于敏感工具 → 进入人工审计
for tc in last_msg.tool_calls:
if tc["name"] in SENSITIVE_TOOLS:
return "human_review"
# 非敏感工具 → 直接执行
return "tools"
# ── 人工审核节点 ────────────────────────────────────────────────
def human_review_node(state: AgentState) -> dict:
"""人工审核节点:敏感工具调用前中断,等待人工审批。
生产级场景:此处应挂起流程,通过消息队列/webhook 等待外部审批回调。
本示例以控制台输入模拟人工审批。
"""
last_msg = state["messages"][-1]
tool_calls_info = [
f"{tc['name']}({tc['args']})" for tc in last_msg.tool_calls
] if isinstance(last_msg, AIMessage) else []
print("\n" + "=" * 50)
print("【HIL 人工审计】敏感操作预警!")
print(f"Agent 申请调用敏感工具:{', '.join(tool_calls_info)}")
print("=" * 50)
decision = input("请输入审批结果(y=批准,其他=拒绝并说明原因):").strip()
if decision.lower() == "y":
return {
"approval_status": "approved",
"messages": [("user", "人工审批通过,允许执行工具操作。")],
}
else:
feedback = decision if decision else "未说明原因"
return {
"approval_status": "rejected",
"messages": [("user", f"人工审批拒绝,原因:{feedback},请调整策略。")],
}
# ── 审计后路由:根据审批结果决定执行还是重新决策 ────────────────
def route_after_review(state: AgentState) -> str:
return "tools" if state["approval_status"] == "approved" else "agent"
# ── 构建带 HIL 的 LangGraph Agent ───────────────────────────────
async def build_hil_agent():
async with MultiServerMCPClient(
{
"production_tools": {
"command": "python",
"args": ["mcp_server.py"],
"transport": "stdio",
}
}
) as mcp_client:
tools = mcp_client.get_tools()
model = ChatAnthropic(model="claude-3-5-sonnet-latest", temperature=0.1)
model_with_tools = model.bind_tools(tools)
def agent_node(state: AgentState):
response = model_with_tools.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("human_review", human_review_node)
builder.add_node("tools", ToolNode(tools))
builder.set_entry_point("agent")
builder.add_conditional_edges(
"agent",
route_after_agent,
{"human_review": "human_review", "tools": "tools", END: END},
)
builder.add_conditional_edges(
"human_review",
route_after_review,
{"tools": "tools", "agent": "agent"},
)
builder.add_edge("tools", "agent")
# 生产级:使用 PostgreSQL Checkpointer 实现持久化
# from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
# async with AsyncPostgresSaver.from_conn_string("postgresql://...") as saver:
# graph = builder.compile(checkpointer=saver, interrupt_before=["human_review"])
#
# 开发 / 测试:使用内存 Checkpointer
checkpointer = MemorySaver()
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"], # 在人工审核节点前中断
)
initial_state: AgentState = {
"messages": [
("user", "查询北京的天气,然后将天气信息写入报告,文件名:beijing_weather.txt")
],
"approval_status": "pending",
"tool_result": "",
}
# 使用 thread_id 支持断点续跑
config = {"configurable": {"thread_id": "session_001"}}
final_state = await graph.ainvoke(initial_state, config=config)
print("\n流程执行完成,最终状态:", final_state["approval_status"])
if __name__ == "__main__":
asyncio.run(build_hil_agent())
关键修正说明:
- 原代码通过
last_msg["tool_calls"]字典访问消息字段,实际 LangChain 消息对象应使用last_msg.tool_calls属性,已修正;compile_with_checkpointer不应在build_langgraph_agent_with_hil内部通过asyncio.run()嵌套调用——在已有事件循环的环境中会抛出RuntimeError,已重构为单一异步函数;- 新增审计拒绝后返回 Agent 重新决策的路由,使拒绝流程形成完整闭环;
interrupt_before配置在compile()时指定,而非运行时。
七、高阶架构:将 LangGraph Agent 逆向封装为 MCP 服务
将复杂 LangGraph Agent 封装成标准 MCP Server,使任何支持 MCP 协议的客户端(如 Claude Desktop、IDE 插件、其他框架的 Agent)都能直接调用。
# research_agent_server.py
import asyncio
from typing import Annotated, List
import operator
from fastmcp import FastMCP
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# ── 1. 定义复杂 LangGraph Agent(科研推理示例) ────────────────────
class ResearchAgentState(TypedDict):
messages: Annotated[List, operator.add]
research_result: str
def research_node(state: ResearchAgentState) -> dict:
"""多步科研推理节点(生产级可集成文献检索、数据计算等工具)。"""
# 取最后一条用户消息
last = state["messages"][-1]
query = last[1] if isinstance(last, tuple) else getattr(last, "content", str(last))
result = f"针对查询「{query}」的科研推理结果:(多步分析内容占位)"
return {
"research_result": result,
"messages": [("assistant", result)],
}
def build_research_graph():
builder = StateGraph(ResearchAgentState)
builder.add_node("research", research_node)
builder.set_entry_point("research")
builder.add_edge("research", END)
return builder.compile()
compiled_graph = build_research_graph()
# ── 2. 将 LangGraph Agent 封装为 MCP 工具 ─────────────────────────
mcp = FastMCP("Complex_Research_Agent_Server", version="1.0.0")
@mcp.tool()
def complex_research_agent(query: str) -> str:
"""调用内部多步科研推理 LangGraph Agent。
Args:
query: 科研查询关键词(如"LangGraph MCP 集成最佳实践")
Returns:
str: 多步推理后的科研结果
"""
initial_state: ResearchAgentState = {
"messages": [("user", query)],
"research_result": "",
}
# 注意:如果内部 Agent 使用异步接口,需改为 asyncio.run(compiled_graph.ainvoke(...))
final_state = compiled_graph.invoke(initial_state)
return final_state["research_result"]
if __name__ == "__main__":
mcp.run()
关键修正说明:
- 原代码
research_node直接用state["messages"][-1][1]索引消息,若消息为 LangChain 消息对象而非 tuple,会抛出TypeError,已做类型兼容处理;- 高阶封装中,若内部 LangGraph Agent 含异步工具,
compiled_graph.invoke()需替换为asyncio.run(compiled_graph.ainvoke(...)),已在注释中说明。
八、生产环境最佳实践:监控、持久化与安全
8.1 状态持久化
严禁在生产环境仅依赖内存存储状态,必须配置 Checkpointer:
# 生产级:PostgreSQL Checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async with AsyncPostgresSaver.from_conn_string(
"postgresql://user:pass@host:5432/langgraph_db"
) as saver:
graph = builder.compile(checkpointer=saver)
# 开发 / 测试:内存 Checkpointer(不跨进程持久化)
from langgraph.checkpoint.memory import MemorySaver
graph = builder.compile(checkpointer=MemorySaver())
持久化的核心价值:
- 应对 HIL 模式下的长时间挂起(人工审批可能耗时数小时);
- 支持"时间旅行"调试:可回溯到任意历史状态;
- 多节点部署时共享状态,适配高并发场景。
8.2 多维性能监控(LangSmith 配置)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "mcp-langgraph-production"
工业级监控阈值:
| 指标 | 安全阈值 | 告警阈值 |
|---|---|---|
| 单次工具调用耗时 | < 10s | > 30s |
| 单次请求工具调用次数 | < 10 次 | > 20 次(防死循环) |
| 工具调用错误率 | < 0.5% | > 1% |
| Checkpoint 缓存命中率 | > 80% | < 60% |
8.3 安全合规红线
| 安全项 | 要求 |
|---|---|
| 路径沙箱化 | 文件操作工具必须实施严格路径过滤,同时拒绝含分隔符的文件名 |
| 强制 HIL 审计 | delete_record、write_db、transfer_money 等敏感工具禁止无人工审批自动执行 |
| 身份验证 | MCP Server 必须配置 API Key 或 OAuth 2.0,防止未授权访问 |
| 日志审计 | 所有工具调用、审批记录、状态更新必须留存 ≥ 30 天 |
九、总结
MCP 与 LangGraph 的结合,标志着 AI Agent 开发从"作坊式原型"向"工业化生产"的跨越。
核心收获:
- MCP 是基于 JSON-RPC 2.0 的开放标准,终结工具集成的碎片化,实现跨框架、跨模型复用;
- FastMCP 可快速开发生产级 MCP Server,内置安全审计、工具注册等功能;
- 官方推荐通过
langchain-mcp-adapters将 MCP 工具接入 LangGraph,使用异步接口; - HIL 审计结合 Checkpointer 持久化,构建完整的"安全-可靠-可追溯"生产链路;
- 高阶场景中,可将 LangGraph Agent 逆向封装为 MCP 服务,实现"Agent 即工具"的复用模式。
💡 如果本文对你有帮助,欢迎点赞、收藏、评论!如果在 MCP 实战中遇到跨平台连接、状态同步问题,欢迎在评论区交流~
更多推荐
所有评论(0)