跳转至

LangGraph 八股文

一、LangGraph 概述

1. LangGraph 是什么?它的核心定位是什么?

LangGraph 是专注于 Agent 编排(orchestration) 的低层框架。

核心定位:

  • 专注于 Agent 编排,而非 Chain 的顺序执行
  • 提供 有状态、多角色 的工作流编排能力
  • 支持 循环图 结构(突破 DAG 限制)

2. LangGraph解决了什么痛点?

参考答案

LangChain 推出 LangGraph 主要是为了解决四个痛点:

第一,Chain 天然是 DAG,只能顺序执行,难以处理复杂编排;

第二,Agent 执行是黑盒,无法精细干预;

第三,缺乏状态管理能力;

第四,不支持持续执行、断点续跑等;

LangGraph 通过提供循环图、显式状态管理、精细流程控制和持久化执行,弥补了这些不足。

3. LangGraph vs LangChain:两者是什么关系?各自适用场景?

维度 LangChain LangGraph
定位 高层抽象,快速构建 LLM 应用 低层编排,精细控制工作流
执行模型 DAG(有向无环图) 循环图(允许循环)
状态管理 隐式传递 显式 State 共享
流程控制 固定链式调用 动态路由、条件分支
适用场景 简单问答、固定流程 复杂 Agent、多轮对话、协作工作流

关系:LangGraph 是 LangChain 生态的一部分,两者可配合使用。LangGraph 底层仍可使用 LangChain 的 Chain、Tool 等组件。

参考答案

LangChain 和 LangGraph 的定位不同:LangChain 是高层抽象,用于快速构建 LLM 应用,执行模型是 DAG;LangGraph 是低层编排,用于精细控制工作流,支持循环图和显式状态管理。两者关系是:LangGraph 是 LangChain 生态的一部分,可以配合使用。适用场景上,LangChain 适合简单问答、固定流程;LangGraph 适合复杂 Agent、多轮对话、协作工作流。

4. LangGraph 的核心优势有哪些?

主要看前三个就行

优势 说明
Durable Execution 故障恢复、断点续跑、长时间运行
Human-in-the-loop 可随时中断、人工审核、修改后继续
Comprehensive Memory 短期会话记忆 + 跨会话长期记忆
Debugging with LangSmith 可视化追踪、状态转换、运行时指标
Production-ready 专为有状态工作流设计的可扩展架构

提示

主要特性是前三个: durable execution、human-in-the-loop、comprehensive memory。

5. 什么场景下应该选择 LangGraph 而不是 LangChain?

场景 为什么选 LangGraph
多轮对话 Agent 需维护对话状态和上下文记忆
复杂工作流 循环执行、条件分支、动态路由
人工审核流程 关键节点需人工确认或修改
长期运行任务 需故障恢复、断点续跑
Multi-Agent 协作 多 Agent 间有依赖或通信

参考答案

判断标准很简单:需要精细控制流程、维护状态、人工介入时,选 LangGraph。典型场景有 5 类:多轮对话 Agent、复杂工作流(循环执行、条件分支、动态路由)、人工审核流程、长期运行任务、Multi-Agent 协作。举个例子,做一个客服 Agent,需要根据用户问题动态路由到不同专家 Agent,还要维护对话历史,这种情况下 LangChain 的 Chain 就很难处理,而 LangGraph 可以用条件边实现动态路由,用 State 维护对话状态。

二、State(状态管理)

1. State 在 LangGraph 中是什么?

State 是图中所有节点共享的数据结构,代表应用的当前快照。

2. LangGraph 支持哪几种 State 定义方式?

支持三种定义方式:

方式 适用场景 代码示例
TypedDict 大多数场景(最常用) class State(TypedDict):
dataclass 需要默认值时 @dataclass class State:
Pydantic 需要参数校验时 class State(BaseModel):
# TypedDict(最常用)
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    count: int

# dataclass(需要默认值)
@dataclass
class State:
    messages: list = field(default_factory=list)
    retries: int = 3

# Pydantic(需要校验)
class AgentState(BaseModel):
    name: str = Field(..., max_length=10)
    age: int = Field(..., gt=0)

注意

LangChain 的 create_agent 工厂函数不支持 Pydantic 定义的 State。

3. TypedDict、dataclass、Pydantic 三种方式各适用什么场景?

方式 适用场景 优点 局限
TypedDict 大多数场景 简洁、类型检查 不支持默认值
dataclass 需要默认值 支持默认值、类型检查 稍 verbose
Pydantic 需要参数校验 自动校验、序列化 create_agent 不支持

参考答案

默认用 TypedDict,需要默认值用 dataclass,需要校验用 Pydantic(但避开 create_agent)。

4. 什么是 Reducer?它的函数签名是什么?

Reducer 是定义 State 中每个键如何被更新的函数。

函数签名

reducer(current_value, new_value) -> merged_value

参考答案

Reducer 是定义 State 中每个键如何被更新的函数,函数签名是 reducer(current_value, new_value) -> merged_value。每个 State 字段都对应一个 Reducer,决定该字段如何被更新。

5. Reducer 的默认行为是什么?如何自定义 Reducer?

默认行为:直接覆盖(Replace)

class State(TypedDict):
    name: str  # 默认 Reducer 是直接覆盖
    messages: list  # 也是直接覆盖

自定义 Reducer:使用 Annotated[type, reducer]

def custom_reducer(current: list, new: list) -> list:
    """去重合并"""
    return current + [x for x in new if x not in current]

class State(TypedDict):
    items: Annotated[list[str], custom_reducer]

参考答案

Reducer 的默认行为是直接覆盖。自定义 Reducer 可以使用 Annotated[type, reducer] 语法,比如 Annotated[list[str], custom_reducer]。LangGraph 还内置了一些常用 Reducer,如 operator.add、add_messages、merge_dicts 等。

6. 内置 Reducer 有哪些?

Reducer 说明 示例
operator.add 列表追加/数值累加 Annotated[list, operator.add]
add_messages 消息追加(LangGraph 内置) Annotated[list[Message], add_messages]
operator.mul 数值累乘 Annotated[int, operator.mul]
merge_dicts 字典合并 Annotated[dict, merge_dicts]

参考答案

LangGraph 内置的 Reducer 有:operator.add(列表追加/数值累加)、add_messages(消息追加,LangGraph 内置)、operator.mul(数值累乘)、merge_dicts(字典合并)。使用时可以用 Annotated[type, reducer] 语法指定,比如 Annotated[list, operator.add]

7. 如何绕过 Reducer 直接覆盖 State?

使用 Overwrite 类型可绕过 Reducer 直接覆盖:

from langgraph.graph import Overwrite

class State(TypedDict):
    bar: Annotated[list[str], operator.add]  # 配置了追加策略

def force_overwrite_node(state: State):
    return {"bar": Overwrite(["new_value"])}  # 强制覆盖,不追加

适用场景:某些节点需要重置某个字段,而不是按 Reducer 逻辑累积。

参考答案

使用 Overwrite 类型可以绕过 Reducer 直接覆盖 State。比如某个字段配置了 operator.add 追加策略,但某节点需要强制覆盖而非追加,可以返回 {"bar": Overwrite(["new_value"])}。适用场景是某些节点需要重置某个字段,而不是按 Reducer 逻辑累积。

8. Reducer 初始值陷阱是什么?如何解决?

问题:非覆盖型 Reducer(如 operator.mul)与默认初始值运算会产生意外结果。

class State(TypedDict):
    age: Annotated[float, operator.mul]  # float 默认值 0.0

app.invoke({'age': 1})  # 结果:age = 0.0 ❌
# 原因:0.0 * 1 = 0.0

解决方案

方案 说明
不用 Annotated 直接 age: float,节点返回什么就是什么
使用 Overwrite app.invoke({'age': Overwrite(1)})
自定义 Reducer 处理初始值特殊情况

参考答案

Reducer 初始值陷阱是指:非覆盖型 Reducer(如 operator.mul)与默认初始值运算会产生意外结果。比如 Annotated[float, operator.mul],float 默认值是 0.0,传入 1 后结果是 0.0 * 1 = 0.0。解决方案有三种:第一,不用 Annotated,直接声明类型;第二,使用 Overwrite 包裹初始值;第三,自定义 Reducer 处理初始值。注意累加型 reducer 没问题(0 + x = x),但累乘型必须特殊处理。

9. 什么是 Private State?为什么需要 Input/Output Schema 分离?

Private State:仅内部节点使用的状态,不暴露给图的输入/输出。

Schema 分离的原因

  • 内部节点传递的信息不需要出现在图的输入/输出中
  • 图的输入/输出 Schema 与内部 Schema 不同
  • 输出只包含少数关键字段,隐藏内部细节

定义方式

class PrivateState(TypedDict):
    bar: str  # 仅内部使用

class InputState(TypedDict):
    user_input: str

class OutputState(TypedDict):
    graph_output: str

class OverallState(TypedDict):
    foo: str
    user_input: str
    graph_output: str

# 创建图时指定 input/output schema
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)

参考答案

Private State 是仅内部节点使用的状态,不暴露给图的输入/输出。Input/Output Schema 分离的原因是:内部节点传递的信息不需要出现在图的输入/输出中;图的输入/输出 Schema 可能与内部 Schema 不同;输出只包含少数关键字段,可以隐藏内部细节。实现方式是在创建 StateGraph 时指定 input_schema 和 output_schema 参数。

10. 图的 State Schema 是如何整合的?节点能写入哪些字段?

整合规则

  1. 图的 State 是所有定义 Schema 的合集(OverallState + PrivateState + InputState + OutputState)
  2. 节点可写入图中任何已定义的字段:即使节点输入 Schema 不包含某字段,只要该字段在图的某个 Schema 中定义过,节点就能写入
  3. 新 Schema 自动注册:只要 Schema 类定义存在,节点就能使用,LangGraph 自动注册

示例

class PrivateState(TypedDict):
    internal_data: str

class OverallState(TypedDict):
    user_input: str
    output: str

builder = StateGraph(OverallState)

def node_1(state: OverallState):
    # 可写入 PrivateState 中定义的字段
    return {"internal_data": "hello"}  # ✅ 合法

def node_2(state: OverallState):
    # 可写入 OverallState 中定义的字段
    return {"output": "world"}  # ✅ 合法

参考答案

图的 State Schema 整合规则有三点:第一,图的 State 是所有定义 Schema 的合集;第二,节点可写入图中任何已定义的字段,即使节点输入 Schema 不包含某字段,只要该字段在图的某个 Schema 中定义过,节点就能写入;第三,新 Schema 自动注册,只要 Schema 类定义存在,LangGraph 会自动注册。

三、Node(节点)

1. Node 在 LangGraph 中是什么?本质是什么?

Node(节点) 是图中的处理单元,本质是一个 Python 函数(同步或异步)。它接收当前 State,执行计算或副作用(如调用 LLM、API),返回 State 的部分更新。Node 会被转换为 RunnableLambda,自动获得 batch、async、tracing 支持。

2. Node 函数可以接收哪些参数?

Node 函数可接收 3 种参数(可选):

参数 说明
state 图的当前 State
config RunnableConfig 对象,包含 thread_id、tags 等配置信息
runtime Runtime 对象,包含 contextstorestream_writerexecution_info
# 只接收 state
def plain_node(state: State):
    return state

# 接收 state + config
def node_with_config(state: State, config: RunnableConfig):
    thread_id = config["configurable"]["thread_id"]
    return {"result": f"Hello from thread {thread_id}"}

# 接收 state + runtime
def node_with_runtime(state: State, runtime: Runtime):
    user_id = runtime.context.user_id
    return {"result": f"Hello user {user_id}"}

参考答案

Node 函数可接收 3 种参数:第一,state,图的当前 State;第二,config,RunnableConfig 对象,包含 thread_id、tags 等配置信息;第三,runtime,Runtime 对象,包含 context、store、stream_writer、execution_info 等。

3. RunnableConfig 包含哪些信息?典型用途是什么?

RunnableConfig 包含:

  • configurable:用户自定义配置(如 thread_id
  • tags:用于追踪和日志的标签
  • metadata:额外元数据
  • callbacks:回调函数列表

典型用途

  • 获取 thread_id 实现会话隔离
  • 添加 tracing 标签便于 LangSmith 调试
  • 注入自定义配置信息

参考答案

RunnableConfig 包含 configurable(用户自定义配置,如 thread_id)、tags(用于追踪和日志的标签)、metadata(额外元数据)、callbacks(回调函数列表)。典型用途是获取 thread_id 实现会话隔离,添加 tracing 标签便于 LangSmith 调试,注入自定义配置信息。

4. Runtime 对象包含哪些信息?execution_info 的作用?

Runtime 对象包含:

  • context:运行时上下文(如 user_id
  • store:持久化存储接口
  • stream_writer:流式输出写入器
  • execution_info:执行信息

execution_info 属性

属性 说明
node_attempt 当前执行尝试次数(1-indexed),重试时递增
node_first_attempt_time 第一次尝试的 Unix 时间戳

典型用途:重试时获取当前重试次数

def my_node(state: State, runtime: Runtime):
    info = runtime.execution_info
    if info.node_attempt > 1:
        return {"result": call_fallback()}  # 重试时使用备用方案
    return {"result": call_primary()}

参考答案

Runtime 对象包含 context(运行时上下文)、store(持久化存储接口)、stream_writer(流式输出写入器)、execution_info(执行信息)。execution_info 的作用是获取当前执行状态,主要有两个属性:node_attempt(当前执行尝试次数,重试时递增)和 node_first_attempt_time(第一次尝试的 Unix 时间戳)。典型用途是在重试时获取当前重试次数,根据重试次数选择不同的处理策略。


5. 什么是 START 和 END 节点?它们的作用?

START:入口节点常量,表示用户输入进入图的起点。

from langgraph.graph import START
graph.add_edge(START, "node_a")  # 指定首个执行的节点

END:终止节点常量,表示流程结束。

from langgraph.graph import END
graph.add_edge("node_a", END)  # node_a 执行完毕后结束

它们不是真正的节点,而是表示"开始"和"结束"的标记常量。

6. Node 缓存(Cache)如何使用?适用什么场景?

使用步骤

  1. 编译图时指定 cache
  2. 添加节点时指定 cache_policy
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

def expensive_node(state: State):
    time.sleep(2)  # 耗时计算
    return {"result": state["x"] * 2}

builder.add_node("expensive_node", expensive_node,
                 cache_policy=CachePolicy(ttl=3))
graph = builder.compile(cache=InMemoryCache())

# 第一次:耗时 2s
graph.invoke({"x": 5})  # {'result': 10}

# 第二次(3s 内):命中缓存
graph.invoke({"x": 5})  # {'result': 10, '__metadata__': {'cached': True}}

适用场景:耗时计算节点(如 LLM 调用、数据处理),相同输入频繁执行时。

参考答案

使用 Node 缓存分两步:第一,编译图时指定 cache;第二,添加节点时指定 cache_policy。适用场景是耗时计算节点,如 LLM 调用、数据处理,相同输入频繁执行时可以显著提升性能。

7. CachePolicy 的 ttl 和 key_func 参数作用?

CachePolicy 参数

参数 说明
ttl 缓存过期时间(秒),不指定则永不过期
key_func 生成缓存 key 的函数,默认对输入做 pickle hash
# 自定义 key_func
def custom_key_func(state):
    return state["user_id"]  # 按用户 ID 生成缓存 key

CachePolicy(key_func=custom_key_func, ttl=300)  # 5 分钟过期

参考答案

CachePolicy 有两个参数:ttl 是缓存过期时间(秒),不指定则永不过期;key_func 是生成缓存 key 的函数,默认对输入做 pickle hash。

8. Node 异常重试(RetryPolicy)如何配置?

基本用法

from langgraph.types import RetryPolicy

builder.add_node("node_name", node_function,
                 retry_policy=RetryPolicy(max_attempts=3))

RetryPolicy 参数

参数 说明
max_attempts 最大重试次数
retry_on 触发重试的异常列表或判断函数
wait 重试等待策略(如指数退避)

参考答案

配置 Node 异常重试使用 RetryPolicy,基本用法是在添加节点时指定 retry_policy 参数。RetryPolicy 有三个参数:max_attempts(最大重试次数)、retry_on(触发重试的异常列表或判断函数)、wait(重试等待策略,如指数退避)。

9. 默认重试策略对哪些异常不重试?为什么?

默认不重试的异常(认为是代码逻辑错误,重试无意义):

  • ValueErrorTypeErrorArithmeticError
  • ImportErrorLookupErrorNameError
  • SyntaxErrorRuntimeErrorReferenceError
  • StopIterationStopAsyncIteration
  • OSError(部分情况)

HTTP 库特殊处理:仅对 5xx 状态码 重试,4xx 不重试。

参考答案

默认不重试的异常包括:ValueError、TypeError、ArithmeticError、ImportError、LookupError、NameError、SyntaxError、RuntimeError、ReferenceError、StopIteration、StopAsyncIteration、OSError 等。这些异常通常表示代码 bug 或参数错误,重试不会改变结果。HTTP 库特殊处理:仅对 5xx 状态码重试,4xx 不重试。

10. 如何在重试时获取当前重试次数?

通过 runtime.execution_info.node_attempt 获取:

from langgraph.runtime import Runtime
from langgraph.types import RetryPolicy

def my_node(state: State, runtime: Runtime):
    info = runtime.execution_info
    print(f"当前是第 {info.node_attempt} 次尝试")

    if info.node_attempt > 1:
        return {"result": call_fallback()}  # 重试时使用备用方案

    return {"result": call_primary_api()}

builder.add_node("my_node", my_node,
                 retry_policy=RetryPolicy(max_attempts=3))

node_attempt 从 1 开始计数,首次执行是 1,第一次重试是 2。

参考答案

通过 runtime.execution_info.node_attempt 获取当前重试次数。node_attempt 从 1 开始计数,首次执行是 1,第一次重试是 2。典型用途是在重试时根据重试次数选择不同的处理策略,比如重试时使用备用方案。

四、Edge(边)

1. Edge 有哪几种类型?

类型 说明
Normal Edge 固定从一个节点到另一个节点
Conditional Edge 调用函数动态决定下一个节点
Entry Point 指定图的入口节点
Conditional Entry Point 调用函数动态决定入口节点

参考答案

Edge 有四种类型:Normal Edge(固定从一个节点到另一个节点)、Conditional Edge(调用函数动态决定下一个节点)、Entry Point(指定图的入口节点)、Conditional Entry Point(调用函数动态决定入口节点)。

2. Normal Edge 和 Conditional Edge 的区别?

维度 Normal Edge Conditional Edge
路由方式 固定跳转 动态路由
定义方式 add_edge(from, to) add_conditional_edges(from, func)
返回值 函数返回目标节点名
适用场景 固定流程 条件分支、动态路由
# Normal Edge:A → B(固定)
graph.add_edge("node_a", "node_b")

# Conditional Edge:根据条件决定去 B 还是 C
def route(state):
    return "node_b" if state["success"] else "node_c"

graph.add_conditional_edges("node_a", route)

参考答案

Normal Edge 和 Conditional Edge 的区别有四点:第一,路由方式不同,Normal Edge 是固定跳转,Conditional Edge 是动态路由;第二,定义方式不同;第三,返回值不同;第四,适用场景不同,Normal Edge 适合固定流程,Conditional Edge 适合条件分支、动态路由。

3. 条件边的返回值是什么?

条件边函数返回 目标节点的名称(字符串)。

def routing_function(state: State) -> str:
    return "node_b" if state["success"] else "node_c"

# 可映射返回值到节点名
graph.add_conditional_edges(
    "node_a",
    routing_function,
    {True: "node_b", False: "node_c"}
)

参考答案

条件边函数返回目标节点的名称(字符串)。也可以通过映射字典将返回值映射到节点名,比如 {True: "node_b", False: "node_c"}

4. Entry Point 和 Conditional Entry Point 的区别?

维度 Entry Point Conditional Entry Point
目标 固定入口节点 动态决定入口节点
定义 add_edge(START, "node_a") add_conditional_edges(START, func)
适用 单一入口 多入口、条件路由
# Entry Point:固定从 node_a 开始
graph.add_edge(START, "node_a")

# Conditional Entry Point:根据条件决定入口
def choose_entry(state):
    return "node_b" if state["urgent"] else "node_c"

graph.add_conditional_edges(START, choose_entry)

参考答案

Entry Point 和 Conditional Entry Point 的区别有三点:第一,目标不同,Entry Point 是固定入口节点,Conditional Entry Point 是动态决定入口节点;第二,定义方式不同;第三,适用场景不同,Entry Point 适合单一入口,Conditional Entry Point 适合多入口、条件路由。

5. 一个节点可以有多个出边吗?执行行为是什么?

可以。一个节点可以有多个出边,所有目标节点将在下一个 superstep并行执行

# node_a 的出边指向 node_b 和 node_c
graph.add_edge("node_a", "node_b")
graph.add_edge("node_a", "node_c")

# 执行顺序:
# 1. node_a 执行
# 2. node_b 和 node_c 并行执行

参考答案

一个节点可以有多个出边,所有目标节点将在下一个 superstep 中并行执行。Superstep 是 LangGraph 的执行模型,每轮执行所有可执行的节点。

6. 什么是 Command?它和条件边有什么区别?

Command 是条件边的替代方案,可同时更新状态和路由

from langgraph.types import Command

def my_node(state: State):
    # 同时返回状态更新和路由决策
    return Command(
        update={"result": "processed"},  # 更新状态
        goto="next_node"  # 路由到下一节点
    )

与条件边的区别

维度 条件边 Command
状态更新 ❌ 只能路由 ✅ 可同时更新状态
路由
简洁性 需要单独节点更新状态 一个函数完成两件事

参考答案

Command 是条件边的替代方案,可同时更新状态和路由。与条件边的区别是:条件边只能路由,不能更新状态;Command 可同时更新状态和路由,一个函数完成两件事,更加简洁。

五、Send(动态分发)

1. Send 的使用场景是什么?为什么需要它?

使用场景

  • 边的数量预先未知(如动态生成的列表)
  • 需要不同版本的 State 同时存在(如 Map-Reduce 模式)

默认情况下,节点和边在编译前就已定义,所有节点共享同一个 State。Send 用于支持动态分发模式。

参考答案

Send 的使用场景有两个:第一,边的数量预先未知,如动态生成的列表;第二,需要不同版本的 State 同时存在,如 Map-Reduce 模式。默认情况下,节点和边在编译前就已定义,所有节点共享同一个 State,而 Send 用于支持动态分发模式。

2. Send 对象的两个参数是什么?

Send(node_name, state_dict)
参数 说明
node_name 目标节点名称
state_dict 传递给该节点的 State(可以是不同的 State)

参考答案

Send 对象有两个参数:node_name(目标节点名称)和 state_dict(传递给该节点的 State,可以是不同的 State)。

3. 如何用 Send 实现 Map-Reduce 模式?

from langgraph.types import Send

def continue_to_jokes(state: OverallState):
    # 动态生成多个 Send,每个 subject 发送到 generate_joke 节点
    return [
        Send("generate_joke", {"subject": s})
        for s in state['subjects']
    ]

graph.add_conditional_edges("node_a", continue_to_jokes)

# 后续添加 reduce 节点汇总结果
graph.add_edge("generate_joke", "reduce_node")

执行流程

  1. node_a 生成 subjects 列表
  2. 条件边返回多个 Send,每个 subject 发送到 generate_joke
  3. 所有 generate_joke 实例并行执行
  4. 结果汇总到 reduce_node

参考答案

使用 Send 实现 Map-Reduce 模式的步骤:首先在一个节点中返回 Send 列表,每个 Send 指向同一个节点但传入不同的 State;然后所有实例并行执行;最后添加一个 reduce 节点汇总结果。执行流程是:node_a 生成 subjects 列表,条件边返回多个 Send,每个 subject 发送到 generate_joke 节点并行执行,结果汇总到 reduce_node。

六、Graph 构建

1. LangGraph 构建图的基本流程?

from langgraph.graph import StateGraph, START, END

# 1. 定义 State
class State(TypedDict):
    messages: list
    count: int

# 2. 创建图
graph = StateGraph(State)

# 3. 添加节点
graph.add_node('node_a', node_a_func)
graph.add_node('node_b', node_b_func)

# 4. 添加边
graph.add_edge(START, 'node_a')
graph.add_edge('node_a', 'node_b')
graph.add_edge('node_b', END)

# 5. 编译
app = graph.compile()

# 6. 执行
result = app.invoke({"messages": [], "count": 0})

参考答案

LangGraph 构建图的基本流程有 6 步:第一,定义 State;第二,创建图(StateGraph);第三,添加节点;第四,添加边(包括 START 和 END);第五,编译(compile);第六,执行(invoke)。

2. StateGraph 构造函数可以指定哪些 Schema?

StateGraph(state_schema, input_schema=None, output_schema=None)
参数 说明
state_schema 图的整体 State Schema(必需)
input_schema 输入 Schema,约束图的输入
output_schema 输出 Schema,约束图的输出

示例

class InputState(TypedDict):
    user_input: str

class OutputState(TypedDict):
    graph_output: str

class OverallState(TypedDict):
    foo: str
    user_input: str
    graph_output: str

builder = StateGraph(OverallState,
                     input_schema=InputState,
                     output_schema=OutputState)

参考答案

StateGraph 构造函数可以指定三个 Schema:state_schema(图的整体 State Schema,必需)、input_schema(输入 Schema,约束图的输入)、output_schema(输出 Schema,约束图的输出)。

3. compile() 方法的作用?编译后返回什么?

compile() 方法将图编译为可执行的 RunnableGraph

作用

  • 验证图结构完整性(无死循环、所有节点可达)
  • 初始化执行引擎
  • 配置缓存、持久化等特性

返回值CompiledGraph 对象,支持:

  • invoke(input):同步执行
  • ainvoke(input):异步执行
  • stream(input):流式输出
  • get_state(thread_id):获取指定线程的状态

参考答案

compile() 方法将图编译为可执行的 RunnableGraph。它的作用是验证图结构完整性、初始化执行引擎、配置缓存和持久化等特性。返回值是 CompiledGraph 对象,支持 invoke(同步执行)、ainvoke(异步执行)、stream(流式输出)、get_state(获取指定线程的状态)等方法。

七、高级特性

1. 什么是 Durable Execution?它支持哪些能力?

Durable Execution(持久化执行) 是指工作流可以在故障后恢复、支持长时间运行。

支持的能力:

  • 故障恢复:进程崩溃后可从断点继续
  • 断点续跑:长时间任务可暂停后继续
  • 状态持久化:State 自动保存到存储
  • 时间旅行:可查看历史任意时刻的状态

参考答案

Durable Execution(持久化执行)是指工作流可以在故障后恢复、支持长时间运行。它支持的能力有:故障恢复(进程崩溃后可从断点继续)、断点续跑(长时间任务可暂停后继续)、状态持久化(State 自动保存到存储)、时间旅行(可查看历史任意时刻的状态)。适用场景是需要运行数小时甚至数天的长期任务。

2. Human-in-the-loop 是什么?如何实现?

Human-in-the-loop 是指在工作流中引入人工审核节点,让人可以在关键节点检查、修改状态后再继续。

实现方式

  1. 在需要人工介入的节点前设置中断点
  2. 使用 interrupt() 函数暂停执行
  3. 人工检查/修改状态后,调用 continue() 恢复
from langgraph.types import interrupt

def review_node(state: State):
    # 暂停执行,等待人工审核
    decision = interrupt({
        "type": "review",
        "data": state["pending_content"]
    })

    if decision["approve"]:
        return {"status": "approved"}
    else:
        return {"status": "rejected"}

参考答案

Human-in-the-loop 是指在工作流中引入人工审核节点,让人可以在关键节点检查、修改状态后再继续。实现方式是:在需要人工介入的节点前设置中断点,使用 interrupt() 函数暂停执行,人工检查/修改状态后,调用 continue() 恢复。

3. LangGraph 的 Memory 机制是什么?

LangGraph 提供两层记忆

类型 绑定对象 存储位置 用途
短期记忆 thread_id State 的 messages 字段 会话内对话历史
长期记忆 user_id Store 接口 用户偏好、跨会话记忆

参考答案

LangGraph 有两层记忆:短期记忆绑定 thread_id,存储在 State 的 messages 字段,用于会话内对话历史;长期记忆绑定 user_id,通过 Store 接口存取,用于用户偏好和跨会话记忆。简单说:短期记对话,长期记偏好。


4. Checkpointer 的作用是什么?

Checkpointer(检查点器) 是 LangGraph 的内置持久化层,每个 Super-step 边界自动保存 State 快照。它是后续实现时间旅行的基础。


5. Checkpointer 有哪些核心概念?

参考答案

Checkpointer 有四个核心概念:Threads(线程,关联 thread_id)、StateSnapshot(State 快照)、Super-step(一轮并行执行的最小单元)、Checkpoint namespace(标记属于哪个图)。


6. 如何获取和更新 State 快照?

StateSnapshot 字段

  • values:该 checkpoint 的 State 值
  • next:下一步要执行的节点(空表示完成)
  • config:包含 thread_id、checkpoint_ns、checkpoint_id
  • metadata:执行元数据(source、writes、step)
  • parent_config:上一个 checkpoint 的配置

获取状态

# 获取最新状态
latest_state = graph.get_state(config)

# 获取特定 checkpoint
historical_state = graph.get_state(config)

# 获取历史记录(逆序,最新在前)
history = list(graph.get_state_history(config))

更新状态

# 增量更新(产生新 checkpoint)
graph.update_state(config, {"foo": "new_value"})

# 伪装更新来源,改变下一步走向
graph.update_state(config, {"foo": "new"}, as_node="node_a")

参考答案

获取 State 快照使用 get_state() 方法,获取历史记录使用 get_state_history()。更新状态使用 update_state() 方法,每次更新产生新 checkpoint;通过 as_node 参数可伪装更新来源,改变下一步执行路径。


7. 什么是 Replay 和 Fork?有什么区别?

维度 Replay(重放) Fork(分支)
目的 重新执行历史路径 探索不同执行路径
历史记录 沿用原历史 创建新分支,原历史不变
状态修改 不可修改 可修改后再执行
# Replay:从特定 checkpoint 重放
result = graph.invoke(None, before_joke.config)

# Fork:修改状态,创建分支
fork_config = graph.update_state(
    before_joke.config,
    values={"topic": "chickens"},
)
fork_result = graph.invoke(None, fork_config)

参考答案

Replay 是从历史 checkpoint 恢复执行,checkpoint 之前的节点跳过,之后的节点重新执行,不可修改状态。Fork 是从历史 checkpoint 创建分支,可修改状态后继续执行,原历史记录保持不变。简单说:Replay 是重新走老路,Fork 是开辟新路径。


8. Checkpointer 有哪些实现?

实现 适用场景
InMemorySaver 开发测试,重启丢失
SqliteSaver 本地开发、小型应用
PostgresSaver 生产环境(推荐)
RedisSaver 需要高性能缓存
MongoDBSaver 已有 MongoDB 基础设施

参考答案

Checkpointer 有五种实现:InMemorySaver(开发测试)、SqliteSaver(本地开发)、PostgresSaver(生产推荐)、RedisSaver(高性能)、MongoDBSaver(已有基础设施)。


9. Store 和 Checkpointer 有什么区别?

维度 Checkpointer Store
绑定对象 thread_id(会话) user_id(用户)
作用范围 线程内持久化 跨线程持久化
用途 保存对话进度 保存用户偏好、长期记忆
接口 自动保存 手动 put/search

参考答案

Store 和 Checkpointer 的区别:Checkpointer 绑定 thread_id,负责当前对话的"进度条";Store 绑定 user_id,负责长期留存的"档案袋"。一个管会话进度,一个管用户档案。


10. Store 如何存取数据和语义搜索?

基础存取

namespace = ("user_123", "memories")
store.put(namespace, uuid, {"food_preference": "I like pizza"})
memories = store.search(namespace)

语义搜索(需配置 Embedding):

memories = store.search(
    namespace,
    query="What does the user like to eat?",  # 模糊匹配
    limit=3
)

参考答案

Store 使用元组 (用户 ID, 分类名) 作为命名空间。put() 存入数据,search() 检索数据。配置 Embedding 后,可通过自然语言进行语义搜索,打破精准关键字匹配的限制。


11. Streaming 有哪些模式?

模式 说明 适用场景
values 每步后的完整 State 监控完整状态变化
updates 每步后的State 更新 监控节点输出
messages LLM token 级别流式 实时展示 LLM 回复
custom 自定义数据流式 进度条、日志
debug 最全信息 调试场景

参考答案

Streaming 有七种模式:values(完整 State)、updates(State 更新)、messages(token 级流式)、custom(自定义数据)、checkpoints(检查点事件)、tasks(任务事件)、debug(最全信息)。常用的是 updates(监控节点输出)、messages(实时展示 LLM 输出)、custom(进度条)。


12. 如何实现 LLM Token 级别的流式输出?

使用 messages 模式:

for chunk in graph.stream(inputs, stream_mode="messages", version="v2"):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        if msg.content:
            print(msg.content, end="", flush=True)

可通过 tagslanggraph_node 过滤特定节点或模型的输出。

参考答案

使用 messages 模式实现 LLM Token 级别流式输出。messages 模式返回 (msg, metadata) 二元组,可通过 tagslanggraph_node 过滤特定输出。


13. 什么是 Recursion Limit?如何设置?

Recursion Limit 限制图执行的最大步数,超过则抛出 GraphRecursionError

  • 默认限制:1000 步(v1.0.6+)
  • 设置方式config={"recursion_limit": 5}
  • 注意recursion_limit 是独立配置项,不放在 configurable

参考答案

Recursion Limit 限制图执行的最大步数,超过则抛出 GraphRecursionError。默认 1000 步,通过 config={"recursion_limit": 5} 设置。注意它是独立配置项,不放在 configurable 中。


14. 如何获取当前执行步数?RemainingSteps 的作用?

获取当前步数

current_step = config["metadata"]["langgraph_step"]

RemainingSteps(剩余步数追踪)

from langgraph.managed import RemainingSteps

class State(TypedDict):
    remaining_steps: RemainingSteps  # 自动填充

主动式 vs 被动式: - 主动式(RemainingSteps):达到限制前在图内主动降级 - 被动式:超过限制后在图外 try/catch

参考答案

通过 config["metadata"]["langgraph_step"] 获取当前执行步数。RemainingSteps 是自动追踪剩余步数的管理值,可在节点内根据剩余步数主动降级。主动式优势是优雅降级、图正常完成;被动式优势是实现简单。


15. 什么是 Context Schema?

Context Schema 用于向节点传递不属于 State 的运行时信息(如模型名称、数据库连接、用户信息等)。

@dataclass
class ContextSchema:
    llm_provider: str = "openai"

graph = StateGraph(State, context_schema=ContextSchema)
graph.invoke(inputs, context={"llm_provider": "anthropic"})

# 节点中访问
def node_a(state: State, runtime: Runtime[ContextSchema]):
    llm = get_llm(runtime.context.llm_provider)

参考答案

Context Schema 用于向节点传递不属于 State 的运行时信息,如配置、连接、用户信息等。定义方式是创建 dataclass 并传给 StateGraph 的 context_schema 参数,执行时通过 context 传入,节点中通过 runtime.context 访问。


八、Subgraphs(子图)

1. 子图的使用场景?

  • 多 Agent 系统:每个 Agent 是独立子图
  • 代码复用:通用逻辑封装为子图
  • 团队协作:不同团队开发独立子图

参考答案

子图是作为节点嵌入另一个图中的图,适用于多 Agent 系统、代码复用、团队协作开发。


2. 父子图通信有哪两种模式?

模式 适用场景 说明
节点内调用子图 父子图State Schema 不同 需编写包装函数,手动转换 State
子图作为节点添加 父子图共享 State key 直接将编译后的子图传给 add_node

参考答案

父子图通信有两种模式:第一,节点内调用子图,适用于父子图 State Schema 不同(无共享 key),需编写包装函数手动转换 State;第二,将子图作为节点添加,适用于父子图共享 State key 的场景,直接将编译后的子图传给 add_node。


3. 子图的持久化有哪三种模式?

模式 checkpointer 说明
Per-invocation(默认) None 每次调用重新开始
Per-thread True 跨调用累积状态
Stateless False 无持久化,不支持 interrupt

注意

Per-thread 模式下,同一子图不能在单次调用中并行执行多次。

参考答案

子图持久化有三种模式:Per-invocation(默认),每次调用重新开始;Per-thread,跨调用累积状态;Stateless,无持久化,不支持 interrupt。多 Agent 系统推荐 Per-invocation 模式,子 Agent 不需要记忆历史。


九、Multi-Agent(多 Agent 协作)

1. 为什么需要多 Agent 系统?

需求 说明
上下文管理 提供专门知识而不压垮上下文窗口
分布式开发 不同团队独立开发和维护能力
并行化 生成专门化的 worker 并发执行子任务

适用场景

  • 单个 Agent 工具过多,决策困难
  • 任务需要专门知识和大量上下文
  • 需要顺序约束,某些能力仅在特定条件后解锁

参考答案

多 Agent 系统解决三个问题:上下文管理(不压垮上下文窗口)、分布式开发(独立开发)、并行化(并发执行子任务)。适用场景:单个 Agent 工具过多决策困难、任务需要专门知识、需要顺序约束。


2. 多 Agent 有哪些协作模式?

模式 工作方式
Subagents(子代理) 主 Agent 将子 Agent 作为工具协调
Handoffs(交接) Agent 通过 Tool 调用转移控制权
Skills(技能) 单 Agent 按需加载专门化 prompt
Router(路由器) 分类输入后分发给专门 Agent
Custom Workflow 用 LangGraph 构建自定义执行流

参考答案

多 Agent 协作有五种模式:Subagents(子代理,主 Agent 将子 Agent 作为工具)、Handoffs(交接,通过 Tool 调用转移控制权)、Skills(技能,单 Agent 按需加载专门化 prompt)、Router(路由器,分类后分发)、Custom Workflow(自定义工作流)。


3. 如何实现 Subagents 模式?

官方推荐三步:

  1. 创建子 Agent
  2. 将子 Agent 包装为 Tool
  3. 创建主 Agent,挂载子 Agent 作为 Tool

特点

  • 主 Agent 作为唯一入口,统一调度
  • 子 Agent 彼此隔离,各自维护独立上下文
  • 每次调用子 Agent 都是从头开始
  • 支持并行调用多个不同的子 Agent

参考答案

实现 Subagents 模式分三步:创建子 Agent、将子 Agent 包装为 Tool、创建主 Agent 并挂载子 Agent 作为 Tool。特点是主 Agent 统一调度,子 Agent 彼此隔离,每次调用从头开始,支持并行调用。