Workshop

LangGraph 1.2 实战:用 error_handler + per-node timeout 把 Agent 容错率拉到生产级

3 min read ·

💡 一句话总结:LangGraph 1.2 不是新功能堆砌,是把”在云函数被 SIGTERM 杀掉怎么办、LLM 流到一半外部 API 挂了怎么办、重试 3 次还失败需要发邮件通知 + 写补偿表怎么办”这三个问题做成了一等公民 API。

生产环境 Agent 的三种死法

如果你的 LangGraph 跑在 Kubernetes、Cloud Run 或 Lambda 上,你迟早会遇到这三种死法:

  1. 抢占式重启:节点滚动升级,Pod 收到 SIGTERM,30 秒后 SIGKILL。你的 Agent 跑到一半,state 丢了,用户那边看到”系统繁忙”。
  2. 僵尸节点:调用 OpenAI、Pinecone 或自家慢服务时,对方 socket 不断不连,aiohttp 干等 10 分钟。整个 graph 卡死,下游用户超时。
  3. 失败后无人收尾:第三方 webhook 重试 3 次都返回 500,graph 优雅退出了,但 charge_card 已经扣了钱,refund 没人触发,客服第二天才发现。

LangGraph 1.0/1.1 对这三个问题的答案都比较含糊:自己写 finally 块、自己埋 wait_for 超时、自己挂 PostgresSaver。LangGraph 1.2(2026-05-11 发布)把这三个场景提升为一等公民 API:error_handlerTimeoutPolicyRunControl

三件新武器

1. error_handler:重试耗尽后的兜底

from langgraph.graph import StateGraph
from langgraph.errors import NodeError
from langgraph.types import Command, RetryPolicy

async def charge_card_handler(error: NodeError, state: dict) -> Command:
    # 重试 3 次都失败,跑补偿逻辑
    await write_compensation_log(state["order_id"], error)
    await notify_ops(error)
    return Command(
        update={"status": "charge_failed", "error": str(error)},
        goto="refund_node",  # 跳到回滚 node
    )

graph = StateGraph(OrderState)
graph.add_node(
    "charge",
    charge_card,
    retry_policy=RetryPolicy(max_attempts=3, backoff_factor=2.0),
    error_handler=charge_card_handler,
)

关键点:error_handler 接收一个 typed NodeError(包含原始 exception、attempt 计数、node name),返回 Command 决定状态变更和下一跳。这就是 Saga/补偿模式的标准实现。

2. TimeoutPolicy:双层超时

from langgraph.types import TimeoutPolicy

graph.add_node(
    "llm_summarize",
    summarize_with_streaming,
    timeout=TimeoutPolicy(
        run_timeout=300,   # 总时长不超过 5 分钟
        idle_timeout=30,   # 任何 30 秒没进展即视为挂死
    ),
)

run_timeout 是墙钟硬上限,idle_timeout 在每次 stream chunk 或 state write 时重置。对 LLM 流式场景,idle_timeout 是更合理的判活方式——整体可能要 4 分钟但只要在吐 token 就不该被杀。

触发超时时抛 NodeTimeoutError,被抛出的 attempt 的所有 state writes 会被清空,然后交给 retry policy 决定是否重试。

3. RunControl + request_drain():可恢复的优雅关闭

from langgraph.types import RunControl

run_control = RunControl()

# 在 graph 启动时把 run_control 注入
async def main():
    async for event in graph.astream(
        initial_state,
        config={"configurable": {"thread_id": "user-123"}},
        run_control=run_control,
    ):
        yield event

# 在另一个线程 / signal handler 里触发优雅关闭
import signal
def handle_sigterm(signum, frame):
    run_control.request_drain()  # 等当前 superstep 跑完 + checkpoint 落盘后退出
signal.signal(signal.SIGTERM, handle_sigterm)

被 drain 时 graph 抛 GraphDrained,但 checkpoint 已经落盘。新 Pod 启动后用同样的 thread_idgraph.astream(),会从 checkpoint 恢复继续跑——对用户完全无感。

完整实战:抗故障的订单结算 Agent

下面是一个生产级的订单结算 graph,把上面三件武器都串起来:

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, END
from langgraph.types import Command, RetryPolicy, TimeoutPolicy
from langgraph.errors import NodeError, NodeTimeoutError
from langgraph.checkpoint.postgres import PostgresSaver

class OrderState(TypedDict):
    order_id: str
    items: list
    user_id: str
    status: str
    logs: Annotated[list, add]
    error: str | None

# Node 1: 校验库存
async def validate_inventory(state: OrderState) -> dict:
    result = await inventory_service.check(state["items"])
    if not result.all_available:
        return {"status": "out_of_stock", "logs": ["inventory_failed"]}
    return {"status": "validated", "logs": ["inventory_ok"]}

# Node 2: 扣款(不可幂等,retry 必须 0)
async def charge_card(state: OrderState) -> dict:
    txn_id = await payment_service.charge(
        user_id=state["user_id"],
        amount=calc_total(state["items"]),
        idempotency_key=state["order_id"],  # 用 order_id 做幂等键
    )
    return {"status": "charged", "logs": [f"charged:{txn_id}"]}

# Node 3: LLM 生成订单确认邮件(流式,用 idle_timeout)
async def generate_email(state: OrderState):
    async for chunk in llm.astream(build_email_prompt(state)):
        yield {"logs": [chunk]}

# Node 4: 发货
async def ship_order(state: OrderState) -> dict:
    tracking = await shipping_service.create(state["order_id"])
    return {"status": "shipped", "logs": [f"tracking:{tracking}"]}

# error_handler:扣款失败的补偿
async def charge_failed_handler(error: NodeError, state: OrderState) -> Command:
    await ops_alert.send(
        f"Order {state['order_id']} charge failed after {error.attempt} attempts: {error}"
    )
    return Command(
        update={"status": "charge_failed", "error": str(error)},
        goto=END,
    )

# error_handler:发货失败需要 refund
async def ship_failed_handler(error: NodeError, state: OrderState) -> Command:
    await payment_service.refund(state["order_id"])
    await ops_alert.send(f"Order {state['order_id']} shipped failed, refunded: {error}")
    return Command(
        update={"status": "refunded", "error": str(error)},
        goto=END,
    )

# 组装 graph
graph = StateGraph(OrderState)

graph.add_node(
    "validate",
    validate_inventory,
    retry_policy=RetryPolicy(max_attempts=3, backoff_factor=1.5),
    timeout=TimeoutPolicy(run_timeout=10),
)

graph.add_node(
    "charge",
    charge_card,
    retry_policy=RetryPolicy(max_attempts=1),  # 不可幂等,只试一次
    timeout=TimeoutPolicy(run_timeout=30),
    error_handler=charge_failed_handler,
)

graph.add_node(
    "email",
    generate_email,
    retry_policy=RetryPolicy(max_attempts=2),
    timeout=TimeoutPolicy(run_timeout=120, idle_timeout=20),  # LLM 用双超时
)

graph.add_node(
    "ship",
    ship_order,
    retry_policy=RetryPolicy(max_attempts=5, backoff_factor=2.0),
    timeout=TimeoutPolicy(run_timeout=60),
    error_handler=ship_failed_handler,  # 重要:发货失败必须 refund
)

graph.add_edge("validate", "charge")
graph.add_edge("charge", "email")
graph.add_edge("email", "ship")
graph.add_edge("ship", END)
graph.set_entry_point("validate")

# 编译,挂 PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = graph.compile(checkpointer=checkpointer)

这个 graph 同时具备:

Kubernetes 集成:preStop + checkpoint 恢复

spec:
  terminationGracePeriodSeconds: 90
  containers:
    - name: agent
      lifecycle:
        preStop:
          exec:
            command:
              - /bin/sh
              - -c
              - "curl -X POST http://localhost:8080/internal/drain && sleep 60"

服务端:

from fastapi import FastAPI

app_http = FastAPI()
active_runs: dict[str, RunControl] = {}

@app_http.post("/internal/drain")
async def drain():
    for run_control in active_runs.values():
        run_control.request_drain()
    return {"draining": len(active_runs)}

新 Pod 启动后只要 thread_id 一致,调 app.astream(state, config={"configurable": {"thread_id": "user-123"}}) 就会从 checkpoint 恢复继续跑。

监控:四个必须埋的指标

from prometheus_client import Counter, Histogram

node_errors = Counter("langgraph_node_errors_total", "Node errors", ["node", "error_type"])
node_timeouts = Counter("langgraph_node_timeouts_total", "Node timeouts", ["node", "timeout_type"])
error_handler_fired = Counter("langgraph_error_handler_fired_total", "Error handler fires", ["node"])
drain_events = Counter("langgraph_drain_events_total", "Graceful drains")

# 在 error_handler 里埋点
async def charge_failed_handler(error, state):
    error_handler_fired.labels(node="charge").inc()
    # ...

报警规则建议:

升级清单

pip install -U "langgraph>=1.2.0" "langgraph-prebuilt>=1.0.2" "langgraph-checkpoint-postgres>=2.0.0"

如果你之前用 RecursionLimit 控制超时,建议迁移到 TimeoutPolicyRecursionLimit 防的是死循环,TimeoutPolicy 防的是慢调用,两件事。

如果你之前自己写过 finally 块做错误兜底,可以把它挪到 error_handler 里——好处是会自动写入 checkpoint,新 Pod 恢复时不会重跑补偿。

写在最后

LangGraph 1.2 把”生产 Agent”和”demo Agent”的距离明显缩短了。Saga/补偿、双超时、可恢复优雅关闭这三件事过去要靠 Celery + Temporal + 自己写状态机,现在一个 graph DSL 全搞定。如果你的 LangGraph 还在跑 1.0 或 1.1,这次升级值得花一天时间做完。

Frequently asked questions

LangGraph 1.2 和 1.0/1.1 的兼容性怎么样?老代码要改吗?
1.2 是向后兼容的 minor 升级,绝大多数 1.0/1.1 代码不需要改。需要注意的两点:(1) langgraph-prebuilt 1.0.2 引入了硬性版本约束,如果你 pin 死了 langgraph 1.0.x,需要同步升级;(2) interrupt() 的语义在 1.2 收紧了,之前 race condition 下可能漏掉的 interrupt 现在会准确触发,如果你之前用 sleep 绕过过这个 bug 的话,可以删掉了。错误处理器、timeout、graceful shutdown 三个新 API 都是纯增量,不影响老 graph。
error_handler 和 retry policy 是什么关系?应该用哪个?
两者互补不冲突。retry policy 决定一次 node 调用失败后重试几次(默认 0 次,可设最大次数和退避策略),error_handler 是在所有重试都失败后才会触发的兜底函数。典型组合是:对幂等的 LLM 调用配 retry=3 + 指数退避,error_handler 里跑补偿逻辑(写补偿事务、通知下游、跳到回滚 node)。如果 node 本身不可幂等(比如 charge_card),retry 设 0,所有失败逻辑都在 error_handler 里处理。
run_timeout 和 idle_timeout 怎么区分?什么时候用哪个?
run_timeout 是单次 node 调用的硬性墙钟上限,从开始计时到完成,超了就 NodeTimeoutError。idle_timeout 是空闲超时,每次 node 产出新的 stream chunk 或 state 写入时会重置计时器。LLM 流式输出场景几乎都用 idle_timeout(10-30 秒比较合理),因为整体生成可能要几分钟但只要还在吐 token 就不算挂;同步工具调用(数据库查询、HTTP 调用)用 run_timeout 即可。两者可以同时设,TimeoutPolicy(run_timeout=300, idle_timeout=30) 表示总时长不超过 5 分钟、且任何 30 秒没进展就视为挂死。
graceful shutdown 在 Kubernetes 环境怎么集成?preStop 钩子要写什么?
在 Pod 的 preStop 钩子里调一个内部 HTTP 接口触发 RunControl.request_drain(),然后让 terminationGracePeriodSeconds 给足时间(比如 60 秒),LangGraph 会等当前 superstep 跑完、checkpoint 落盘后再优雅退出。新启动的 Pod 会从 checkpoint 恢复 thread_id 对应的运行状态,对用户无感。注意 checkpoint backend 必须是持久化的(PostgreSQL、Redis with persistence、S3),不能用 InMemorySaver,否则 Pod 重启会丢状态。
DeltaChannel 和普通 Channel 性能差多少?什么场景必须用?
DeltaChannel 只在 state 变化时把 diff 写入 checkpoint 而不是全量写,对于大 state(消息历史长、工具输出大)能减少 60%-90% 的 checkpoint 大小和写入耗时。压力测试里 1MB state 的 graph 每秒处理能力从 25 req/s 提升到 180 req/s。两个场景必须用:(1) state 里挂了完整对话历史的 chatbot;(2) state 里缓存了 RAG 检索结果或文件内容的 retrieval agent。短链路(state < 10KB)用普通 Channel 就行,DeltaChannel 的 diff 计算反而是开销。
// next.txt ›

Some outbound links in this post are affiliate links — see disclosure.