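💡 One-sentence summary: LangGraph 1.2 is not a pile of new features. It turns three questions into first-class APIs: what happens when a cloud function is killed by SIGTERM, what happens when an external API dies halfway through an LLM stream, and what happens when 3 attempts have failed and you need to send a notification email and write to a compensation table.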
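Three ways a production Agent dies
If your LangGraph runs on Kubernetes, Cloud Run, or Lambda, sooner or later you will run into these three failure modes:
- Preemptive restart: during a rolling node upgrade the Pod receives SIGTERM, followed by SIGKILL 30 seconds later. Your Agent is halfway through a run, its state is lost, and the user sees "system busy".
- Zombie node: while calling OpenAI, Pinecone, or a slow in-house service, the remote socket neither closes nor responds, and aiohttp waits idly for 10 minutes. The whole graph hangs and downstream users time out.
- Nobody cleans up after a failure: a third-party webhook returns 500 on all 3 attempts and the graph exits gracefully, but charge_card has already taken the money, nothing triggers the refund, and customer support only notices the next day.
LangGraph 1.0/1.1 gave fairly vague answers to these three problems: write your own finally blocks, add your own wait_for timeouts, attach your own PostgresSaver. LangGraph 1.2 (released 2026-05-11) promotes all three scenarios to first-class APIs: error_handler, TimeoutPolicy, and RunControl.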
Three new weapons
1. error_handler: the fallback once retries are exhausted
from langgraph.graph import StateGraph
from langgraph.errors import NodeError
from langgraph.types import Command, RetryPolicy

# OrderState, charge_card, write_compensation_log and notify_ops are defined elsewhere.
async def charge_card_handler(error: NodeError, state: dict) -> Command:
    # All 3 attempts failed: run the compensation logic
    await write_compensation_log(state["order_id"], error)
    await notify_ops(error)
    return Command(
        update={"status": "charge_failed", "error": str(error)},
        goto="refund_node",  # jump to the rollback node
    )

graph = StateGraph(OrderState)
graph.add_node(
    "charge",
    charge_card,
    retry_policy=RetryPolicy(max_attempts=3, backoff_factor=2.0),
    error_handler=charge_card_handler,
)
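Key point: error_handler receives a typed NodeError (carrying the original exception, the attempt count, and the node name) and returns a Command that decides the state update and the next hop. This is the standard shape of the Saga/compensation pattern: a failed step triggers a compensating action instead of leaving partial side effects behind.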
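To make the control flow concrete, here is a minimal standard-library sketch of "retry up to N attempts, then hand the last failure to a handler". It does not use LangGraph at all: AttemptFailure, run_with_fallback, flaky_charge, and compensate are hypothetical names invented for illustration, and the real library may order or time things differently.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class AttemptFailure:
    """Hypothetical stand-in for a typed node error (not a LangGraph class)."""
    node: str
    attempt: int
    exception: Exception


async def run_with_fallback(node_name, node_fn, handler, state,
                            max_attempts=3, initial_interval=0.0, backoff_factor=2.0):
    """Call node_fn up to max_attempts times, then pass the last failure to handler."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    failure = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await node_fn(state)
        except Exception as exc:
            failure = AttemptFailure(node_name, attempt, exc)
            if attempt < max_attempts:
                # Exponential backoff between attempts (zero by default to keep the demo fast).
                await asyncio.sleep(initial_interval * backoff_factor ** (attempt - 1))
    # Every attempt failed: the handler decides the state update and the next hop.
    return await handler(failure, state)


async def flaky_charge(state):
    """Illustrative node that always fails and counts how often it was called."""
    state["calls"] += 1
    raise ConnectionError("payment gateway unavailable")


async def compensate(failure, state):
    """Illustrative handler: record the failure and route to a rollback step."""
    return {"status": "charge_failed", "attempts": failure.attempt, "goto": "refund_node"}


demo_state = {"calls": 0}
result = asyncio.run(run_with_fallback("charge", flaky_charge, compensate, demo_state))
print(result)
```

The point of the sketch is the ordering: the handler runs once, after the final attempt, and sees how many attempts were made.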
2. TimeoutPolicy: two-layer timeouts
from langgraph.types import TimeoutPolicy

graph.add_node(
    "llm_summarize",
    summarize_with_streaming,
    timeout=TimeoutPolicy(
        run_timeout=300,  # total duration capped at 5 minutes
        idle_timeout=30,  # 30 seconds without progress counts as hung
    ),
)
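run_timeout is a hard wall-clock cap, while idle_timeout resets on every stream chunk or state write. For LLM streaming, idle_timeout is the more sensible liveness check: a full response may take 4 minutes, but as long as tokens keep arriving the node should not be killed.
When a timeout fires, NodeTimeoutError is raised, all state writes from the attempt that timed out are discarded, and the retry policy then decides whether to retry.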
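The difference between the two limits is easier to see in a small standard-library sketch. The wrapper below enforces a total deadline and a per-item idle window over any async iterator. guarded_stream, RunTimeout, IdleTimeout, and the tokens generator are hypothetical names for illustration, not LangGraph APIs, and the short durations are only there to keep the demo fast.

```python
import asyncio
import time


class RunTimeout(Exception):
    """Total wall-clock budget exceeded."""


class IdleTimeout(Exception):
    """No item arrived within the idle window."""


async def guarded_stream(source, run_timeout, idle_timeout):
    """Yield items from source under a total deadline and a per-item idle limit."""
    deadline = time.monotonic() + run_timeout
    iterator = source.__aiter__()
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise RunTimeout(f"exceeded {run_timeout}s in total")
        # Wait for whichever limit expires first and remember which one it is.
        run_limit_is_closer = remaining < idle_timeout
        try:
            item = await asyncio.wait_for(
                iterator.__anext__(), timeout=min(remaining, idle_timeout)
            )
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            if run_limit_is_closer:
                raise RunTimeout(f"exceeded {run_timeout}s in total") from None
            raise IdleTimeout(f"no progress for {idle_timeout}s") from None
        yield item  # each delivered item starts a fresh idle window


async def tokens(delays):
    """Illustrative token stream: sleeps for each delay, then emits a token."""
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield f"tok{i}"


async def collect(delays, run_timeout, idle_timeout):
    """Drain the guarded stream and report what arrived and how it ended."""
    received = []
    try:
        async for tok in guarded_stream(tokens(delays), run_timeout, idle_timeout):
            received.append(tok)
        return received, "completed"
    except (RunTimeout, IdleTimeout) as exc:
        return received, type(exc).__name__


steady = asyncio.run(collect([0.01] * 5, run_timeout=5.0, idle_timeout=0.3))
stalled = asyncio.run(collect([0.01, 0.01, 1.0], run_timeout=5.0, idle_timeout=0.3))
over_budget = asyncio.run(collect([0.05] * 20, run_timeout=0.3, idle_timeout=1.0))
print(steady[1], stalled[1], over_budget[1])
```

A steady stream finishes, a stream that stalls mid-way trips the idle limit long before the total budget, and a stream that keeps producing but takes too long overall trips the run limit.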
3. RunControl + request_drain(): resumable graceful shutdown
import signal
from langgraph.types import RunControl

run_control = RunControl()

# Inject run_control when the graph starts
async def main():
    async for event in graph.astream(
        initial_state,
        config={"configurable": {"thread_id": "user-123"}},
        run_control=run_control,
    ):
        yield event

# Trigger the graceful shutdown from another thread or a signal handler
def handle_sigterm(signum, frame):
    # Exit once the current superstep has finished and its checkpoint is persisted
    run_control.request_drain()

signal.signal(signal.SIGTERM, handle_sigterm)
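When drained, the graph raises GraphDrained, but the checkpoint has already been persisted. Once the new Pod is up, calling graph.astream() with the same thread_id resumes from that checkpoint, so the interruption is invisible to the user.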
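The drain-then-resume idea can be sketched without LangGraph: run steps one at a time, checkpoint after each, and stop between steps once a drain flag is set. Everything here (DrainRequested, run_steps, the in-memory checkpoints dict) is a hypothetical stand-in for illustration; a real deployment would persist checkpoints to a database rather than a dict.

```python
import asyncio
import threading


class DrainRequested(Exception):
    """Hypothetical stand-in for the drain signal described above."""


async def run_steps(steps, checkpoints, thread_id, drain):
    """Run steps in order, checkpoint after each, and stop between steps when draining."""
    saved = checkpoints.get(thread_id, {"next_step": 0, "state": []})
    index, state = saved["next_step"], list(saved["state"])
    while index < len(steps):
        state.append(await steps[index]())
        index += 1
        # Persist progress before checking the drain flag, so nothing is lost.
        checkpoints[thread_id] = {"next_step": index, "state": list(state)}
        if drain.is_set() and index < len(steps):
            raise DrainRequested(thread_id)
    return state


async def demo():
    checkpoints = {}            # in-memory stand-in for a durable checkpoint store
    drain = threading.Event()   # safe to set from a signal handler or another thread

    async def validate():
        return "validated"

    async def charge():
        drain.set()             # simulate SIGTERM arriving while this step runs
        return "charged"

    async def ship():
        return "shipped"

    steps = [validate, charge, ship]
    try:
        await run_steps(steps, checkpoints, "user-123", drain)
        first_outcome = "completed"
    except DrainRequested:
        first_outcome = "drained"
    saved = dict(checkpoints["user-123"])
    # "New Pod": fresh drain flag, same thread_id, same checkpoint store.
    final_state = await run_steps(steps, checkpoints, "user-123", threading.Event())
    return first_outcome, saved, final_state


first_outcome, saved, final_state = asyncio.run(demo())
print(first_outcome, saved, final_state)
```

Note that the charge step is not executed again on resume, because the checkpoint already records it as done. That property is what makes a drain safe for steps with side effects.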
Full walkthrough: a fault-tolerant order settlement Agent
Below is a production-style order settlement graph that ties the three weapons together:
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, END
from langgraph.types import Command, RetryPolicy, TimeoutPolicy
from langgraph.errors import NodeError, NodeTimeoutError
from langgraph.checkpoint.postgres import PostgresSaver

class OrderState(TypedDict):
    order_id: str
    items: list
    user_id: str
    status: str
    logs: Annotated[list, add]
    error: str | None
# Node 1: validate inventory
async def validate_inventory(state: OrderState) -> dict:
    result = await inventory_service.check(state["items"])
    if not result.all_available:
        return {"status": "out_of_stock", "logs": ["inventory_failed"]}
    return {"status": "validated", "logs": ["inventory_ok"]}

# Node 2: charge the card (not safe to retry blindly, so it gets a single attempt below)
async def charge_card(state: OrderState) -> dict:
    txn_id = await payment_service.charge(
        user_id=state["user_id"],
        amount=calc_total(state["items"]),
        idempotency_key=state["order_id"],  # order_id doubles as the idempotency key
    )
    return {"status": "charged", "logs": [f"charged:{txn_id}"]}

# Node 3: LLM generates the order confirmation email (streaming, guarded by idle_timeout)
async def generate_email(state: OrderState):
    async for chunk in llm.astream(build_email_prompt(state)):
        yield {"logs": [chunk]}

# Node 4: ship the order
async def ship_order(state: OrderState) -> dict:
    tracking = await shipping_service.create(state["order_id"])
    return {"status": "shipped", "logs": [f"tracking:{tracking}"]}
# error_handler: compensation when charging fails
async def charge_failed_handler(error: NodeError, state: OrderState) -> Command:
    await ops_alert.send(
        f"Order {state['order_id']} charge failed after {error.attempt} attempts: {error}"
    )
    return Command(
        update={"status": "charge_failed", "error": str(error)},
        goto=END,
    )

# error_handler: a shipping failure requires a refund
async def ship_failed_handler(error: NodeError, state: OrderState) -> Command:
    await payment_service.refund(state["order_id"])
    await ops_alert.send(f"Order {state['order_id']} shipping failed, refunded: {error}")
    return Command(
        update={"status": "refunded", "error": str(error)},
        goto=END,
    )
# Assemble the graph
graph = StateGraph(OrderState)
graph.add_node(
    "validate",
    validate_inventory,
    retry_policy=RetryPolicy(max_attempts=3, backoff_factor=1.5),
    timeout=TimeoutPolicy(run_timeout=10),
)
graph.add_node(
    "charge",
    charge_card,
    retry_policy=RetryPolicy(max_attempts=1),  # single attempt: a blind retry risks a double charge
    timeout=TimeoutPolicy(run_timeout=30),
    error_handler=charge_failed_handler,
)
graph.add_node(
    "email",
    generate_email,
    retry_policy=RetryPolicy(max_attempts=2),
    timeout=TimeoutPolicy(run_timeout=120, idle_timeout=20),  # two-layer timeout for the LLM
)
graph.add_node(
    "ship",
    ship_order,
    retry_policy=RetryPolicy(max_attempts=5, backoff_factor=2.0),
    timeout=TimeoutPolicy(run_timeout=60),
    error_handler=ship_failed_handler,  # important: a shipping failure must trigger a refund
)
graph.add_edge("validate", "charge")
graph.add_edge("charge", "email")
graph.add_edge("email", "ship")
graph.add_edge("ship", END)
graph.set_entry_point("validate")
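# Compile with a Postgres checkpointer attached.
# Caveat: in released versions of langgraph-checkpoint-postgres, from_conn_string() is a
# context manager, and async graphs need AsyncPostgresSaver from
# langgraph.checkpoint.postgres.aio, so real code wraps the compile step in
# `async with ... as checkpointer:`.
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = graph.compile(checkpointer=checkpointer)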
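This graph now has all of the following:
- Inventory validation is retried automatically, up to 3 attempts (10-second timeout)
- Charging is attempted only once, with compensation on failure (30-second timeout)
- Email generation uses LLM streaming with two-layer timeouts (120 s total / 20 s idle)
- Shipping is attempted up to 5 times, with an automatic refund if the final attempt fails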
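It can help to turn these settings into a rough worst-case time budget per node. The arithmetic below is only a sketch under stated assumptions: every attempt runs into its full run_timeout, backoff sleeps follow initial_interval * backoff_factor ** n with no jitter and no cap, the initial interval is 0.5 s, and the email node uses a backoff factor of 2.0 because the graph above does not set one. None of these values are confirmed by the text, and worst_case_seconds is a hypothetical helper.

```python
def worst_case_seconds(max_attempts, run_timeout, backoff_factor=2.0, initial_interval=0.5):
    """Upper bound: every attempt hits run_timeout, plus the backoff sleeps in between."""
    sleeps = sum(initial_interval * backoff_factor ** n for n in range(max_attempts - 1))
    return max_attempts * run_timeout + sleeps


# Settings copied from the graph above; backoff defaults are assumptions.
budgets = {
    "validate": worst_case_seconds(3, 10, backoff_factor=1.5),
    "charge": worst_case_seconds(1, 30),
    "email": worst_case_seconds(2, 120),
    "ship": worst_case_seconds(5, 60, backoff_factor=2.0),
}
total = sum(budgets.values())
print(budgets, total)
```

Under these assumptions the ship node alone can take about 307.5 s in the worst case, well above the 90-second terminationGracePeriodSeconds used in the Kubernetes section, so it is worth checking how a drain behaves while a node is mid-retry.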
Kubernetes integration: preStop + checkpoint recovery
spec:
  terminationGracePeriodSeconds: 90
  containers:
    - name: agent
      lifecycle:
        preStop:
          exec:
            command:
              - /bin/sh
              - -c
              - "curl -X POST http://localhost:8080/internal/drain && sleep 60"
Server side:
from fastapi import FastAPI
from langgraph.types import RunControl

app_http = FastAPI()
# One RunControl per run currently executing in this process
active_runs: dict[str, RunControl] = {}

@app_http.post("/internal/drain")
async def drain():
    for run_control in active_runs.values():
        run_control.request_drain()
    return {"draining": len(active_runs)}
Once the new Pod is up, as long as the thread_id matches, calling app.astream(state, config={"configurable": {"thread_id": "user-123"}}) resumes from the checkpoint and carries on.
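The drain endpoint only works if every in-flight run is registered in active_runs and removed when it ends, which the snippet above leaves implicit. One possible way to do that bookkeeping is a small context manager. This is a sketch only: FakeRunControl is a hypothetical stand-in so the example runs without LangGraph, and tracked_run and drain_all are illustrative names.

```python
import contextlib
import uuid


class FakeRunControl:
    """Hypothetical stand-in exposing only the request_drain() call used above."""

    def __init__(self):
        self.drain_requested = False

    def request_drain(self):
        self.drain_requested = True


active_runs = {}


@contextlib.contextmanager
def tracked_run(registry, control, run_id=None):
    """Register a run for its lifetime and always unregister it afterwards."""
    run_id = run_id or uuid.uuid4().hex
    registry[run_id] = control
    try:
        yield run_id
    finally:
        registry.pop(run_id, None)


def drain_all(registry):
    """Ask every registered run to drain; return how many were asked."""
    controls = list(registry.values())  # snapshot, in case runs unregister meanwhile
    for control in controls:
        control.request_drain()
    return len(controls)


control = FakeRunControl()
with tracked_run(active_runs, control, run_id="user-123"):
    drained_while_running = drain_all(active_runs)
drained_after_exit = drain_all(active_runs)
print(drained_while_running, drained_after_exit)
```

The finally clause matters: a run that crashes still leaves the registry, so the drain endpoint never reports runs that no longer exist.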
Monitoring: four metrics you must instrument
from prometheus_client import Counter, Histogram

node_errors = Counter("langgraph_node_errors_total", "Node errors", ["node", "error_type"])
node_timeouts = Counter("langgraph_node_timeouts_total", "Node timeouts", ["node", "timeout_type"])
error_handler_fired = Counter("langgraph_error_handler_fired_total", "Error handler fires", ["node"])
drain_events = Counter("langgraph_drain_events_total", "Graceful drains")

# Instrument inside the error_handler
async def charge_failed_handler(error, state):
    error_handler_fired.labels(node="charge").inc()
    # ...
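Suggested alert rules:
- error_handler_fired{node="charge"} fires more than 5 times within 5 minutes: charge failures are swinging abnormally, investigate immediately
- node_timeouts{timeout_type="idle"} spikes: the LLM or a downstream service is slow
- A thread_id is still not resumed 30 seconds after drain_events fires: something is wrong with checkpointing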
Upgrade checklist
pip install -U "langgraph>=1.2.0" "langgraph-prebuilt>=1.0.2" "langgraph-checkpoint-postgres>=2.0.0"
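If you previously used RecursionLimit (the recursion_limit setting) as a way to bound run time, consider migrating to TimeoutPolicy: RecursionLimit guards against infinite loops, TimeoutPolicy guards against slow calls, and those are two different problems.
If you previously hand-wrote finally blocks as an error fallback, you can move that logic into error_handler. The benefit is that the outcome is written to the checkpoint automatically, so a new Pod resuming the run will not re-run the compensation.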
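Closing thoughts
LangGraph 1.2 noticeably narrows the gap between a "production Agent" and a "demo Agent". Saga/compensation, two-layer timeouts, and resumable graceful shutdown used to require Celery + Temporal + a hand-written state machine; now a single graph DSL covers all three. If your LangGraph is still on 1.0 or 1.1, this upgrade is worth a day of your time.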