langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 是 LangGraph 库中 langgraph.checkpoint.mysql.pymysql 模块的一个检查点保存器类,继承自 BaseCheckpointSaver,用于将状态图的检查点保存到 MySQL 数据库中。LangGraph 是 LangChain 生态的扩展框架,专注于构建复杂、有状态的 AI 系统,通过状态图(StateGraph)管理节点和边,支持动态路由、循环和状态管理。检查点(Checkpoint)是 LangGraph 的核心功能,用于在图执行的每一步保存状态,支持状态持久化、恢复和多轮交互。PyMySQLSaver 使用 PyMySQL 库实现同步数据库操作,适合需要持久化存储的同步编程环境。


1. 定义与功能

1.1 类定义

PyMySQLSaverBaseCheckpointSaver 的子类,定义如下:

from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver

class PyMySQLSaver(BaseCheckpointSaver):
    """
    使用 PyMySQL 连接 MySQL 数据库的检查点保存器(同步操作)。

    参数:
        connection: PyMySQL 连接对象,需设置 autocommit=True。
        serde: 可选的序列化器,默认为 JsonPlusSerializer。

    示例:
        import pymysql
        from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver

        connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
        checkpointer = PyMySQLSaver(connection=connection)
        checkpointer.setup()
    """
  • 继承:继承自 BaseCheckpointSaver,实现其抽象方法,提供 MySQL 存储逻辑。
  • 依赖:使用 PyMySQL 库(纯 Python 的 MySQL 客户端)与 MySQL 数据库交互。
  • 作用:将检查点数据持久化存储到 MySQL 数据库,支持同步操作,适合生产环境中的同步编程场景。

1.2 核心功能

  • 持久化存储:将检查点保存到 MySQL 数据库,数据在应用重启后仍可恢复。
  • 线程隔离:通过 thread_id 管理多线程,确保不同会话的状态独立。
  • 同步操作:提供同步方法(如 getput),适合同步编程环境。
  • 数据库初始化:通过 setup() 方法创建必要的数据库表(如 checkpoints)。
  • 连接管理:支持传入 PyMySQL 连接对象,需配置 autocommit=True
  • 序列化支持:通过 serde 参数支持自定义序列化,默认使用 JsonPlusSerializer,兼容复杂数据类型。

1.3 使用场景

  • 生产环境:需要持久化存储的 AI 应用,如聊天机器人、自动化工作流。
  • 多轮对话:保存对话历史,支持上下文连续性。
  • 状态恢复:从中断点恢复任务,确保工作流连续性。
  • 同步编程:在不需要异步优化的场景中,适合同步操作环境。
  • MySQL 偏好:在已有 MySQL 基础设施的系统中,优先选择。

2. 参数与初始化

2.1 初始化参数

  • connection
    • 类型pymysql.connections.Connection
    • 描述:PyMySQL 连接对象,必须设置 autocommit=True 以确保事务一致性。
    • 示例
      import pymysql
      connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
      
  • serde
    • 类型Optional[SerializerProtocol]
    • 默认值None(使用 JsonPlusSerializer
    • 描述:序列化器,处理检查点数据的序列化和反序列化,支持 LangChain 和 LangGraph 原生类型。
  • conn_string(通过 from_conn_string 类方法):
    • 类型str
    • 描述:MySQL 连接字符串(如 "mysql://user:password@localhost:3306/dbname")。

2.2 初始化方法

  • 直接初始化
    import pymysql
    from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
    
    connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
    checkpointer = PyMySQLSaver(connection=connection)
    
  • 使用连接字符串
    checkpointer = PyMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname")
    

2.3 数据库初始化

  • 方法setup()
    • 描述:创建必要的数据库表(如 checkpoints),首次使用时必须调用。
    • 调用
      checkpointer.setup()
      
    • 注意
      • 确保数据库用户有创建表的权限。
      • 表结构通常包括 thread_idcheckpoint_data 等字段。

3. 使用方法

3.1 安装与环境准备

  1. 安装依赖
    pip install langgraph-checkpoint-mysql[pymysql]
    
    • langgraph-checkpoint-mysql:提供 PyMySQLSaver
    • pymysql:MySQL 客户端库,支持同步操作。
  2. 数据库配置
    • 确保 MySQL 数据库运行,推荐版本:
      • MySQL >= 8.0.19
      • MariaDB >= 10.7.1
    • 配置连接信息(主机、端口、数据库名、用户名、密码)。
  3. 连接设置
    • 创建 PyMySQL 连接时,设置 autocommit=True
      import pymysql
      connection = pymysql.connect(host='localhost', user='user', password='password', database='dbname', autocommit=True)
      

3.2 集成到状态图

  1. 创建状态图
    from langgraph.graph import StateGraph
    
    builder = StateGraph(int)
    builder.add_node("add_one", lambda x: x + 1)
    builder.set_entry_point("add_one")
    builder.set_finish_point("add_one")
    
  2. 编译图
    checkpointer = PyMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname")
    checkpointer.setup()  # 创建表
    graph = builder.compile(checkpointer=checkpointer)
    
  3. 运行图
    config = {"configurable": {"thread_id": "thread-1"}}
    result = graph.invoke(1, config=config)
    print(result)  # 输出: 2
    

3.3 操作检查点

  • 获取检查点
    checkpoint = checkpointer.get_tuple(config)
    print(checkpoint)  # 输出: CheckpointTuple(...)
    
  • 列出检查点
    checkpoints = list(checkpointer.list(config))
    for cp in checkpoints:
        print(cp)
    
  • 保存检查点:由状态图自动调用 put,无需手动操作。

3.4 完整示例:多轮对话

以下示例展示如何使用 PyMySQLSaver 实现多轮对话:

from typing import List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from langchain_core.messages import HumanMessage
import pymysql

# 定义状态
class State(TypedDict):
    messages: List[dict]

# 定义节点
def agent_node(state: State) -> State:
    last_message = state["messages"][-1]["content"]
    return {"messages": state["messages"] + [{"role": "assistant", "content": "Echo: " + last_message}]}

# 创建数据库连接
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='password',
    database='dbname',
    autocommit=True
)

# 初始化 PyMySQLSaver
checkpointer = PyMySQLSaver(connection=connection)
checkpointer.setup()

# 构建状态图
builder = StateGraph(State)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.set_finish_point("agent")

# 编译图
graph = builder.compile(checkpointer=checkpointer)

# 运行多轮对话
config = {"configurable": {"thread_id": "thread-1"}}
result1 = graph.invoke({"messages": [{"role": "user", "content": "Hello"}]}, config=config)
print(result1["messages"][-1]["content"])  # 输出: Echo: Hello

result2 = graph.invoke({"messages": [{"role": "user", "content": "How are you?"}]}, config=config)
print(result2["messages"][-1]["content"])  # 输出: Echo: How are you?

# 关闭连接
connection.close()
解析
  • 状态State 包含消息列表,模拟对话历史。
  • 检查点PyMySQLSaver 将状态保存到 MySQL 数据库,thread_id 确保连续性。
  • 结果:对话历史持久化存储,支持跨会话访问。

3.5 异步替代

  • 限制PyMySQLSaver 仅支持同步操作。
  • 替代方案
    • 使用 AIOMySQLSaverAsyncMySaver(需安装 aiomysqlasyncmy)。
    • 示例:
      from langgraph.checkpoint.mysql.aiomysql import AIOMySQLSaver
      checkpointer = AIOMySQLSaver.from_conn_string("mysql://user:password@localhost:3306/dbname")
      

4. 实现原理

4.1 数据库存储

  • 表结构PostgresSaver 创建 checkpoints 表,典型字段包括:
    • thread_id:会话标识。
    • checkpoint_data:序列化的检查点数据(JSON 或 Pickle 格式)。
    • timestamp:记录保存时间。
  • SQL 操作:通过 psycopg 执行插入、查询和更新操作,事务确保数据一致性。

4.2 线程隔离

  • 使用 thread_id 区分会话,存储在数据库中,防止状态冲突。
  • checkpoint_ns 支持命名空间管理。

4.3 序列化

  • 默认使用 JsonPlusSerializer,支持 LangChain 和 LangGraph 原生类型。
  • 可自定义 serde,如 PickleCheckpointSerializer,但需注意安全。

4.4 同步操作

  • 所有方法均为同步,适合同步编程环境。
  • 使用 PyMySQL 的同步接口,确保操作顺序。

5. 适用场景与限制

5.1 适用场景

  • 生产环境:需要持久化存储的 AI 应用,如聊天机器人、自动化工作流。
  • 多轮对话:保存对话历史,支持上下文连续性。
  • 状态恢复:从中断点恢复任务。
  • 同步编程:在同步编程环境中使用。

5.2 限制

  • 数据库依赖:需要 MySQL 数据库,增加部署复杂性。
  • 初始化要求:首次使用需调用 setup() 创建表。
  • 性能:依赖数据库配置,未优化可能导致延迟。
  • 版本要求:MySQL >= 8.0.19 或 MariaDB >= 10.7.1。
  • 异步支持:仅同步,异步需求需用 AIOMySQLSaverAsyncMySaver.

6. 对比其他检查点保存器

特性 PyMySQLSaver InMemorySaver PostgresSaver
存储位置 MySQL 数据库 内存 PostgreSQL 数据库
适用场景 生产环境、同步操作 调试、测试、短期实验 生产环境、长期运行
持久化 否(重启后丢失)
安装要求 langgraph-checkpoint-mysql[pymysql] 无需额外安装 langgraph-checkpoint-postgres
异步支持 否(同步)

选择建议

  • 开发阶段:使用 InMemorySaver,快速迭代。
  • 生产环境:根据数据库偏好选择 PyMySQLSaver(MySQL)或 PostgresSaver(PostgreSQL)。

7. 注意事项

  1. 数据库配置
    • 确保 MySQL 版本符合要求(>= 8.0.19 或 MariaDB >= 10.7.1)。
    • 配置连接时设置 autocommit=True
  2. 初始化
    • 调用 setup() 创建表,确保用户有权限。
  3. 序列化安全
    • 默认 JsonPlusSerializer 安全。
    • 使用 PickleCheckpointSerializer 时,仅反序列化可信数据。
  4. 性能优化
    • 使用连接池(如 pymysqlpool)优化连接管理。
    • 优化数据库索引,加速查询。
  5. 调试
    • 启用 debug=True 查看日志。
    • 使用 SQL 工具检查 checkpoints 表。
  6. 版本兼容性

8. 学习建议

  • 基础知识:掌握 LangGraph 的状态图、检查点和 MySQL 基础。
  • 文档:阅读 LangGraph 文档langgraph-checkpoint-mysql GitHub
  • 实践
    • 搭建本地 MySQL 数据库,测试简单状态图。
    • 实现多轮对话,验证持久化效果。
  • 社区:加入 LangChain Discord,查看 GitHub 示例。
  • 调试:使用日志和 SQL 工具分析检查点数据。

9. 总结

langgraph.checkpoint.mysql.pymysql.PyMySQLSaver 是 LangGraph 的检查点保存器,基于 MySQL 数据库实现持久化存储,适合生产环境。它继承自 BaseCheckpointSaver,支持同步操作,通过 setup() 初始化表结构,提供线程隔离和序列化支持。核心功能包括持久化检查点、状态恢复,适用于多轮对话和复杂工作流。与 InMemorySaver 相比,提供持久化优势;与 PostgresSaver 类似,适用于 MySQL 环境。通过示例和实践,开发者可以快速掌握其用法,构建可靠的 AI 应用。

关键引用

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐