LangChain Checkpointer结合Redis实现对话记忆功能并持久化存储

构建具备多轮对话能力和高鲁棒性的智能体(Agent)系统,核心挑战在于为其配置一个能够持久化状态的机制。在 LangChain 框架中,这正是通过 LangGraph 底层提供的 Checkpointer 机制实现的 。

Checkpointer 机制是实现智能体 记忆(Memory)弹性(Resilience) 的关键,它将整个工作流的状态保存下来,从而赋予智能体强大的运行能力 。

一. Checkpointer 的核心功能与架构定位

Checkpointer 是一种状态持久化服务,它将 LangGraph 工作流中离散的执行步骤(称为“超步骤”)连接起来,形成一个连续的、有状态的会话 。

1.1. Checkpointer 实现的关键能力

通过将 LangGraph 工作流与 Checkpointer 绑定,智能体获得了以下关键能力 :

  1. 1.

    短期记忆(Short-term Memory): 允许智能体追踪并记住多轮对话中的上下文,使其能够进行连贯、有意义的交互 。

  2. 2.

    弹性与容错(Resilience): 允许长期运行的智能体在执行中断或遇到错误时,能够从上一次已保存的检查点恢复状态,避免从头开始计算 。

  3. 3.

    人机协作(Human-in-the-Loop): 支持图执行流程的中断,以等待用户的外部输入,然后从精确的暂停点恢复 。

  4. 4.

    时间旅行与调试(Time Travel): 能够检索历史上的任何检查点状态,允许回滚、重试或分支执行路径,这对于复杂的调试和审计至关重要 。

1.2 与Redis的集成

LangGraph 通过 langgraph-checkpoint-redis 库提供了 RedisSaver,这是在高并发生产环境中实现 Checkpointer 的理想选择。

Redis 被广泛用于 LLM 对话记忆中的存储,主要因为它提供了高性能和灵活的存储解决方案。

  1. 1.

    高性能持久化: Redis 是一种内存数据库,提供亚毫秒级(<1ms)的超快读写延迟。这对于需要频繁检索和保存 Checkpoint 状态的 LangGraph 智能体至关重要,能显著降低 I/O 带来的性能开销。

  2. 2.

    灵活的内存类型: Redis 不仅支持线程级(短期)的 Checkpoint 记忆,还可通过 RedisStore 支持跨线程(长期)的记忆。

  3. 3.

    可扩展性与模块支持: Redis 支持线性扩展,并可通过 Redis Stack 提供的模块(如 RedisJSON 和 RediSearch)对复杂的 Checkpoint 状态进行优化存储和高性能索引。

在LangGraph中使用Redis作为Checkpointer,需要安装langgraph-checkpoint-redis,如下命令:

1
#pip install langgraph-checkpoint-redis

然后,在代码中如下方式使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import redis
from langgraph.checkpoint.redis import RedisSaver

# -----------------------------------------------------
# 1. Redis 连接池与 Checkpointer 初始化
# -----------------------------------------------------

# Gunicorn 部署下的最佳实践:使用显式连接池实现资源隔离
REDIS_POOL: Optional[redis.ConnectionPool] = None

def initialize_redis_checkpointer():
"""初始化 Redis 连接池和 Checkpointer 实例。"""
global REDIS_POOL

# 实例化同步连接池
if REDIS_POOL is None:
REDIS_POOL = redis.ConnectionPool(
host=config.REDIS_HOST,
port=config.REDIS_PORT,
password=config.REDIS_PASSWORD,
db=config.REDIS_DB,
max_connections=50, # 限制每个 Gunicorn 进程的最大连接数
)

# 实例化同步 Redis 客户端(使用连接池)
redis_client_instance = redis.Redis(connection_pool=REDIS_POOL)

# 使用显式客户端初始化 RedisSaver
# 传入 redis_client 实例,以确保 LangGraph 使用我们配置的连接池
checkpointer = RedisSaver(
redis_client=redis_client_instance,
# 设置 TTL 自动清理不活跃的会话状态
ttl={"checkpoints": 86400}
)

# 提示:首次运行时,需要调用 checkpointer.setup() 来初始化 Redis 中的索引。
# checkpointer.setup()

return checkpointer

1.3. 在 create_agent 中使用checkpointer

create_agent 函数是 LangChain 框架中用于创建生产级智能体的核心接口 。要启用状态持久化和记忆功能,只需在创建智能体时通过 checkpointer 参数传入一个配置好的检查点保存器实例 。

1
2
3
# 核心集成模式
from langchain.agents import create_agent
agent = create_agent(model, tools=tools, checkpointer=redis_checkpointer_instance)

1.3. Checkpointer详解

Checkpointer 的强大之处在于它不仅仅保存了数据,还保存了精确的流程控制元数据,这使得智能体可以智能地恢复执行。

A. 线程(Thread)与会话标识符

线程是一个唯一的标识符(thread_id),它代表了智能体与特定用户的一系列连续运行的累积状态。在调用智能体时,必须在配置中指定一个 thread_id,Checkpointer 将以此 ID 为索引来存储和检索会话状态。

B. 检查点(Checkpointer)的组件

一个检查点是一个 TypedDict 类型的状态快照,用于在给定时间点捕获图的完整执行上下文。核心组件包括:

组件 (键) 类型 功能描述与架构意义 如何支持记忆与弹性
channel_values dict[str, Any] 图中所有通道(Channels)的最新数据值 [4]。对话历史 (messages 通道) 就是其中最关键的值。 对话记忆:存储和恢复完整的会话历史。
channel_versions defaultdict[str, int] 每个通道最后更新的逻辑时间步(版本)。 时序依赖:是判断数据新旧、维护数据流时序的基准。
versions_seen defaultdict[str, defaultdict[str, int]] 映射:节点 ID -> 通道名称 -> 已处理的版本。 流程控制:恢复时,LangGraph通过比较这个值来精确计算哪些节点收到了新数据,从而确定下一步应该执行哪个节点。
tasks tuple 包含需要执行的待处理操作或潜在的错误信息。 故障恢复:用于捕获执行中断或失败状态,系统恢复后可继续处理挂起的任务。
next tuple 元组,列出了在下一个超步骤中预定执行的节点名称。 状态移交:确保执行流能够从中断点无缝地转移到下一个正确的节点。

二. LangGraph的Checkpointer 实现

LangGraph 提供了多种 Checkpointer 实现,以适应不同的部署环境和规模需求。

A. Checkpointer 实现列表

实现 后端技术 推荐用例 部署环境
InMemorySaver Python 进程内存 原型设计、测试、短期实验。 本地开发
SqliteSaver SQLite 数据库 本地工作流、单用户应用。 本地/小型规模
PostgresSaver PostgreSQL 数据库 企业级生产环境、高可用(HA)和强事务完整性要求。 平台部署/大型企业
RedisSaver Redis 数据库 高性能、低延迟、高并发的多租户应用。 高并发服务

B. 浅层与深层持久化 (Shallow vs. Deep)

默认的 Checkpointer 实现(如 RedisSaver)会存储一个线程的所有历史检查点(深层持久化)。这对于长周期任务和详细审计至关重要。

然而,对于只需要对话记忆(即只需要最新的状态来生成下一个回复)的应用,LangGraph 还提供了浅层(Shallow)检查点保存器(如 ShallowRedisSaver)。浅层保存器仅存储最新的检查点,可以显著节省存储空间和内存占用。

三. 使用 Redis进行对话记忆的示例

以下示例展示了如何在追求高性能的同步 Web 环境(如 Flask + Gunicorn)中,使用显式连接池将 RedisSaver 集成到 LangChain 智能体中。

  • 同步环境与 Redis 关联: 在同步(WSGI)架构下,必须使用同步的 RedisSaverredis.ConnectionPool 来管理连接,确保在高并发下连接的安全复用和低延迟。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    import os
    import redis
    from langgraph.checkpoint.redis import RedisSaver
    from langchain.agents import create_agent
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage
    from langchain.tools import tool
    from typing import Optional

    # -----------------------------------------------------
    # 2. 智能体创建与调用 (实现记忆)
    # -----------------------------------------------------

    # 定义模型和工具
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    @tool
    def demo_tool(name: str) -> str:
    """这里需要描述你的工具作用."""
    return f"User name {name} Demo Tool."

    # 创建 Checkpointer 实例
    redis_checkpointer = initialize_redis_checkpointer()

    # 创建智能体,并传入 Checkpointer 启用记忆功能
    chat_agent = create_agent(
    model=model,
    tools=[demo_tool],
    checkpointer=redis_checkpointer
    )

    # --- 示例:模拟两次同步调用,实现会话记忆 ---

    # 定义会话 ID (thread_id)
    chat_session_id = ""
    agent_config = {"configurable": {"thread_id": chat_session_id }} # 传递线程 ID

    print("--- Turn 1: 首次调用 ---")
    chat_agent.invoke(
    {"messages": [HumanMessage(content="你好,我叫 Jane。")]},
    agent_config
    )

    # 第二次调用 (Turn 2): 检索状态并继续对话
    print("\n--- Turn 2: 再次调用 ---")
    # Checkpointer 自动检索 Turn 1 的完整状态(包括 "你好,我叫 Jane。")
    res_t2 = chat_agent.invoke(
    {"messages": [HumanMessage(content="我刚才告诉了你什么名字?")]},
    agent_config # 保持相同的 SESSION_ID
    )

    print(f"智能体回复(基于记忆):{res_t2['messages'][-1].content}")
    ```