【LangGraph】langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 类:将状态图的检查点保存到 MySQL 数据库
langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 是 LangGraph 的检查点保存器,基于 MySQL 数据库实现持久化存储,适合生产环境。它继承自 BaseCheckpointSaver,支持同步操作,通过 setup() 初始化表结构,提供线程隔离和序列化支持。核心功能包括持久化检查点、状态恢复,适用于多轮对话和复杂工作流。与 InMemory
langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 是 LangGraph 库中 langgraph.checkpoint.mysql.pymysql 模块的一个检查点保存器类,继承自 BaseCheckpointSaver,用于将状态图的检查点保存到 MySQL 数据库中。LangGraph 是 LangChain 生态的扩展框架,专注于构建复杂、有状态的 AI 系统,通过状态图(StateGraph)管理节点和边,支持动态路由、循环和状态管理。检查点(Checkpoint)是 LangGraph 的核心功能,用于在图执行的每一步保存状态,支持状态持久化、恢复和多轮交互。PyMySQLSaver 使用 PyMySQL 库实现同步数据库操作,适合需要持久化存储的同步编程环境。
1. 定义与功能
1.1 类定义
PyMySQLSaver 是 BaseCheckpointSaver 的子类,定义如下:
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
class PyMySQLSaver(BaseCheckpointSaver):
"""
使用 PyMySQL 连接 MySQL 数据库的检查点保存器(同步操作)。
参数:
connection: PyMySQL 连接对象,需设置 autocommit=True。
serde: 可选的序列化器,默认为 JsonPlusSerializer。
示例:
import pymysql
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
checkpointer = PyMySQLSaver(connection=connection)
checkpointer.setup()
"""
- 继承:继承自
BaseCheckpointSaver,实现其抽象方法,提供 MySQL 存储逻辑。 - 依赖:使用 PyMySQL 库(纯 Python 的 MySQL 客户端)与 MySQL 数据库交互。
- 作用:将检查点数据持久化存储到 MySQL 数据库,支持同步操作,适合生产环境中的同步编程场景。
1.2 核心功能
- 持久化存储:将检查点保存到 MySQL 数据库,数据在应用重启后仍可恢复。
- 线程隔离:通过
thread_id管理多线程,确保不同会话的状态独立。 - 同步操作:提供同步方法(如
get、put),适合同步编程环境。 - 数据库初始化:通过
setup()方法创建必要的数据库表(如checkpoints)。 - 连接管理:支持传入 PyMySQL 连接对象,需配置
autocommit=True。 - 序列化支持:通过
serde参数支持自定义序列化,默认使用JsonPlusSerializer,兼容复杂数据类型。
1.3 使用场景
- 生产环境:需要持久化存储的 AI 应用,如聊天机器人、自动化工作流。
- 多轮对话:保存对话历史,支持上下文连续性。
- 状态恢复:从中断点恢复任务,确保工作流连续性。
- 同步编程:在不需要异步优化的场景中,适合同步操作环境。
- MySQL 偏好:在已有 MySQL 基础设施的系统中,优先选择。
2. 参数与初始化
2.1 初始化参数
connection:- 类型:
pymysql.connections.Connection - 描述:PyMySQL 连接对象,必须设置
autocommit=True以确保事务一致性。 - 示例:
import pymysql connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
- 类型:
serde:- 类型:
Optional[SerializerProtocol] - 默认值:
None(使用JsonPlusSerializer) - 描述:序列化器,处理检查点数据的序列化和反序列化,支持 LangChain 和 LangGraph 原生类型。
- 类型:
conn_string(通过from_conn_string类方法):- 类型:
str - 描述:MySQL 连接字符串(如
"mysql://user:password@localhost:3306/dbname")。
- 类型:
2.2 初始化方法
- 直接初始化:
import pymysql from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True) checkpointer = PyMySQLSaver(connection=connection) - 使用连接字符串:
checkpointer = PyMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname")
2.3 数据库初始化
- 方法:
setup()- 描述:创建必要的数据库表(如
checkpoints),首次使用时必须调用。 - 调用:
checkpointer.setup() - 注意:
- 确保数据库用户有创建表的权限。
- 表结构通常包括
thread_id、checkpoint_data等字段。
- 描述:创建必要的数据库表(如
3. 使用方法
3.1 安装与环境准备
- 安装依赖:
pip install langgraph-checkpoint-mysql[pymysql]langgraph-checkpoint-mysql:提供PyMySQLSaver。pymysql:MySQL 客户端库,支持同步操作。
- 数据库配置:
- 确保 MySQL 数据库运行,推荐版本:
- MySQL >= 8.0.19
- MariaDB >= 10.7.1
- 配置连接信息(主机、端口、数据库名、用户名、密码)。
- 确保 MySQL 数据库运行,推荐版本:
- 连接设置:
- 创建 PyMySQL 连接时,设置
autocommit=True:import pymysql connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
- 创建 PyMySQL 连接时,设置
3.2 集成到状态图
- 创建状态图:
from langgraph.graph import StateGraph builder = StateGraph(int) builder.add_node("add_one", lambda x: x + 1) builder.set_entry_point("add_one") builder.set_finish_point("add_one") - 编译图:
checkpointer = PyMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname") checkpointer.setup() # 创建表 graph = builder.compile(checkpointer=checkpointer) - 运行图:
config = {"configurable": {"thread_id": "thread-1"}} result = graph.invoke(1, config=config) print(result) # 输出: 2
3.3 操作检查点
- 获取检查点:
checkpoint = checkpointer.get_tuple(config) print(checkpoint) # 输出: CheckpointTuple(...) - 列出检查点:
checkpoints = list(checkpointer.list(config)) for cp in checkpoints: print(cp) - 保存检查点:由状态图自动调用
put,无需手动操作。
3.4 完整示例:多轮对话
以下示例展示如何使用 PyMySQLSaver 实现多轮对话:
from typing import List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from langchain_core.messages import HumanMessage
import pymysql
# 定义状态
class State(TypedDict):
messages: List[dict]
# 定义节点
def agent_node(state: State) -> State:
last_message = state["messages"][-1]["content"]
return {"messages": state["messages"] + [{"role": "assistant", "content": "Echo: " + last_message}]}
# 创建数据库连接
connection = pymysql.connect(
host='localhost',
user='user',
password='password',
database='dbname',
autocommit=True
)
# 初始化 PyMySQLSaver
checkpointer = PyMySQLSaver(connection=connection)
checkpointer.setup()
# 构建状态图
builder = StateGraph(State)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.set_finish_point("agent")
# 编译图
graph = builder.compile(checkpointer=checkpointer)
# 运行多轮对话
config = {"configurable": {"thread_id": "thread-1"}}
result1 = graph.invoke({"messages": [{"role": "user", "content": "Hello"}]}, config=config)
print(result1["messages"][-1]["content"]) # 输出: Echo: Hello
result2 = graph.invoke({"messages": [{"role": "user", "content": "How are you?"}]}, config=config)
print(result2["messages"][-1]["content"]) # 输出: Echo: How are you?
# 关闭连接
connection.close()
解析
- 状态:
State包含消息列表,模拟对话历史。 - 检查点:
PyMySQLSaver将状态保存到 MySQL 数据库,thread_id确保连续性。 - 结果:对话历史持久化存储,支持跨会话访问。
3.5 异步替代
- 限制:
PyMySQLSaver仅支持同步操作。 - 替代方案:
- 使用
AIOMySQLSaver或AsyncMySaver(需安装aiomysql或asyncmy)。 - 示例:
from langgraph.checkpoint.mysql.aiomysql import AIOMySQLSaver checkpointer = AIOMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname")
- 使用
4. 实现原理
4.1 数据库存储
- 表结构:
PostgresSaver创建checkpoints表,典型字段包括:thread_id:会话标识。checkpoint_data:序列化的检查点数据(JSON 或 Pickle 格式)。timestamp:记录保存时间。
- SQL 操作:通过
psycopg执行插入、查询和更新操作,事务确保数据一致性。
4.2 线程隔离
- 使用
thread_id区分会话,存储在数据库中,防止状态冲突。 checkpoint_ns支持命名空间管理。
4.3 序列化
- 默认使用
JsonPlusSerializer,支持 LangChain 和 LangGraph 原生类型。 - 可自定义
serde,如PickleCheckpointSerializer,但需注意安全。
4.4 同步操作
- 所有方法均为同步,适合同步编程环境。
- 使用 PyMySQL 的同步接口,确保操作顺序。
5. 适用场景与限制
5.1 适用场景
- 生产环境:需要持久化存储的 AI 应用,如聊天机器人、自动化工作流。
- 多轮对话:保存对话历史,支持上下文连续性。
- 状态恢复:从中断点恢复任务。
- 同步编程:在同步编程环境中使用。
5.2 限制
- 数据库依赖:需要 MySQL 数据库,增加部署复杂性。
- 初始化要求:首次使用需调用
setup()创建表。 - 性能:依赖数据库配置,未优化可能导致延迟。
- 版本要求:MySQL >= 8.0.19 或 MariaDB >= 10.7.1。
- 异步支持:仅同步,异步需求需用
AIOMySQLSaver或AsyncMySaver.
6. 对比其他检查点保存器
| 特性 | PyMySQLSaver | InMemorySaver | PostgresSaver |
|---|---|---|---|
| 存储位置 | MySQL 数据库 | 内存 | PostgreSQL 数据库 |
| 适用场景 | 生产环境、同步操作 | 调试、测试、短期实验 | 生产环境、长期运行 |
| 持久化 | 是 | 否(重启后丢失) | 是 |
| 安装要求 | 需 langgraph-checkpoint-mysql[pymysql] |
无需额外安装 | 需 langgraph-checkpoint-postgres |
| 异步支持 | 否(同步) | 是 | 是 |
选择建议:
- 开发阶段:使用
InMemorySaver,快速迭代。 - 生产环境:根据数据库偏好选择
PyMySQLSaver(MySQL)或PostgresSaver(PostgreSQL)。
7. 注意事项
- 数据库配置:
- 确保 MySQL 版本符合要求(>= 8.0.19 或 MariaDB >= 10.7.1)。
- 配置连接时设置
autocommit=True。
- 初始化:
- 调用
setup()创建表,确保用户有权限。
- 调用
- 序列化安全:
- 默认
JsonPlusSerializer安全。 - 使用
PickleCheckpointSerializer时,仅反序列化可信数据。
- 默认
- 性能优化:
- 使用连接池(如
pymysqlpool)优化连接管理。 - 优化数据库索引,加速查询。
- 使用连接池(如
- 调试:
- 启用
debug=True查看日志。 - 使用 SQL 工具检查
checkpoints表。
- 启用
- 版本兼容性:
- 确保
langgraph-checkpoint-mysql与 LangGraph 版本匹配(建议 0.2+)。 - 参考 LangGraph 文档 和 langgraph-checkpoint-mysql GitHub。
- 确保
8. 学习建议
- 基础知识:掌握 LangGraph 的状态图、检查点和 MySQL 基础。
- 文档:阅读 LangGraph 文档 和 langgraph-checkpoint-mysql GitHub。
- 实践:
- 搭建本地 MySQL 数据库,测试简单状态图。
- 实现多轮对话,验证持久化效果。
- 社区:加入 LangChain Discord,查看 GitHub 示例。
- 调试:使用日志和 SQL 工具分析检查点数据。
9. 总结
langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 是 LangGraph 的检查点保存器,基于 MySQL 数据库实现持久化存储,适合生产环境。它继承自 BaseCheckpointSaver,支持同步操作,通过 setup() 初始化表结构,提供线程隔离和序列化支持。核心功能包括持久化检查点、状态恢复,适用于多轮对话和复杂工作流。与 InMemorySaver 相比,提供持久化优势;与 PostgresSaver 类似,适用于 MySQL 环境。通过示例和实践,开发者可以快速掌握其用法,构建可靠的 AI 应用。
关键引用
更多推荐
所有评论(0)