Workshop

Mistral Workflows 实战:用 Temporal 引擎在 30 分钟把 AI 流程从 PoC 推到生产

5 min read ·

💡 一句话总结:Mistral Workflows 不是又一个 Agent 框架,是把 Netflix / Stripe 早就在用的 Temporal durable execution 引擎给 AI 流程做了 LLM-native 封装。LangGraph 是『可用』,Workflows 是『可生产』。

为什么需要又一个编排引擎

过去 12 个月,我们看着团队拿 LangGraph 跑的 Agent 频繁出问题:

LangGraph 1.2 在 5/14 加了 per-node timeout、error recovery、graceful shutdown,但底层仍是『进程内状态』模型——只要 K8s 把 pod 杀掉,进程外的状态就只能祈祷 checkpoint 写全了。

Mistral 的判断是:AI 流程已经到了需要 Temporal 这种 durable execution 引擎托底的阶段。4 月 28 日他们把 Workflows 推向公开预览时,Le Chat 已经在生产跑了几个月,单日执行数百万次。

30 分钟跑通一个发票审批流

我们从一个真实场景起步:每天上千张发票需要解析、和 ERP 对账、超过阈值送人工审批、最终入账。这种流程有三个让传统编排崩溃的点:

  1. 长持续:等待人工审批可能 8 小时也可能 3 天
  2. 混合 IO:要调 LLM(解析)、ERP API(查 PO)、Slack(通知)、数据库(落账)
  3. 必须可重放:审计要求每张发票的处理路径都能追溯

项目初始化

pip install mistralai mistralai-workflows temporalio
mistral-workflows init invoice-approval
cd invoice-approval

CLI 会生成:

invoice-approval/
├── workflows/
│   └── invoice.py          # 你的 workflow 定义
├── activities/
│   ├── llm.py              # LLM 调用 activity
│   ├── erp.py              # ERP 接入 activity
│   └── notify.py           # 通知 activity
├── helm/
│   └── values.yaml         # Worker 部署
├── studio.yaml             # Studio 注册元数据
└── .env.example

写第一个 workflow

from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities.llm import parse_invoice
    from activities.erp import lookup_po, post_to_gl
    from activities.notify import slack_request_approval

@workflow.defn
class InvoiceApproval:
    @workflow.run
    async def run(self, invoice_url: str) -> dict:
        retry_3x = RetryPolicy(maximum_attempts=3)

        parsed = await workflow.execute_activity(
            parse_invoice,
            invoice_url,
            start_to_close_timeout=timedelta(minutes=2),
            retry_policy=retry_3x,
        )

        po = await workflow.execute_activity(
            lookup_po,
            parsed["po_number"],
            start_to_close_timeout=timedelta(seconds=10),
        )

        if parsed["amount"] > 10000:
            decision = await workflow.execute_activity(
                slack_request_approval,
                {"invoice": parsed, "po": po},
                start_to_close_timeout=timedelta(hours=72),
            )
            if decision != "approved":
                return {"status": "rejected", "reason": decision}

        gl_entry = await workflow.execute_activity(
            post_to_gl,
            {"invoice": parsed, "po": po},
            start_to_close_timeout=timedelta(seconds=30),
        )

        return {"status": "posted", "gl_id": gl_entry["id"]}

注意三个关键点:

activity 实现示例

from temporalio import activity
from mistralai import Mistral

mistral = Mistral(api_key=os.environ["MISTRAL_API_KEY"])

@activity.defn
async def parse_invoice(invoice_url: str) -> dict:
    activity.logger.info(f"parsing {invoice_url}")
    resp = await mistral.chat.complete_async(
        model="mistral-large-2026",
        messages=[
            {"role": "system", "content": "提取发票字段,返回 JSON"},
            {"role": "user", "content": [
                {"type": "document_url", "document_url": invoice_url}
            ]},
        ],
        response_format={"type": "json_object"},
    )
    return json.loads(resp.choices[0].message.content)

activity 可以失败、可以重试、可以超时,但每次重试都是『从 activity 边界重新开始』,workflow 主流程的状态不会丢。

部署 worker 到 K8s

# helm/values.yaml
worker:
  image: registry.company.com/invoice-approval:0.1.0
  replicas: 3
  env:
    MISTRAL_API_KEY:
      valueFrom:
        secretKeyRef:
          name: mistral-keys
          key: api-key
    ERP_BASE_URL: https://erp.internal.company.com
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
helm install invoice-worker mistral-workflows/worker -f helm/values.yaml

worker 启动后会自动连接 Mistral 托管的 Temporal cluster,注册自己能处理的 task queue。Mistral 的控制平面看到 worker 上线,就把所有标记 invoice-approval 的 workflow 实例分发过来。

Studio 里发生了什么

打开 studio.mistral.ai,进入 Workflows tab,能看到刚部署的 InvoiceApproval。每次执行的页面包含:

最有用的是『signal』和『query』两个接口。signal 让外部系统(比如 Slack bot)能往运行中的 workflow 发消息(『审批结果是 approved』);query 让运维能问『这个 workflow 现在在哪一步』而不影响主流程。

Le Chat 集成:让业务人员自然语言触发

studio.yaml 里声明:

name: invoice-approval
display_name: 发票审批
description: 自动解析发票、对账、超阈值审批、入账
trigger:
  type: chat_action
  examples:
    - "帮我跑一遍今天的发票审批"
    - "审批这张发票 https://files/inv-2026-05-001.pdf"
inputs:
  - name: invoice_url
    type: url
    description: 发票 PDF 链接
permissions:
  roles:
    - finance.operator
    - finance.manager

部署后业务人员在 Le Chat 里输入『审批这张发票 https://…』,Le Chat 自动识别意图、参数提取、触发 workflow、流式显示执行进度。审批环节的 Slack 通知也会附带『回到 Le Chat 查看完整 timeline』的链接。

容错策略的三层防御

生产部署后我们踩过的坑,浓缩成三条 best practice:

第一层:activity 内部用 RetryPolicy 处理瞬时故障

RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_attempts=5,
    non_retryable_error_types=["InvalidInvoiceFormat"],
)

业务校验错误(发票格式不对)不要重试,会浪费 LLM tokens;网络/超时错误指数退避重试。

第二层:workflow 主流程用 try/except 处理业务异常

try:
    parsed = await workflow.execute_activity(parse_invoice, ...)
except ActivityError as e:
    await workflow.execute_activity(
        notify_ops, f"发票解析失败:{e}",
        start_to_close_timeout=timedelta(seconds=10),
    )
    return {"status": "manual_review", "reason": str(e)}

第三层:用 saga 模式做补偿

如果『入账』成功但『更新 ERP 状态』失败,需要回滚入账。Workflows 推荐显式写补偿 activity 而不是依赖事务:

compensations = []
try:
    gl_id = await workflow.execute_activity(post_to_gl, ...)
    compensations.append(lambda: workflow.execute_activity(reverse_gl, gl_id))

    await workflow.execute_activity(update_erp_status, parsed["po_number"])
except Exception:
    for comp in reversed(compensations):
        await comp()
    raise

成本和性能账

跑了一个月的真实数据(每天约 800 张发票):

指标LangGraph 方案(迁移前)Workflows 方案
流程成功率91.2%99.6%
平均端到端耗时4.2 min3.8 min
因 worker 重启重跑成本$180/月$0
人工干预次数68 次/月9 次/月
月固定开销$0$1200(Mistral Pro 套餐 + Helm worker EC2)
月可变成本$850$620
综合月成本$850 + 工程师 30h debug$1820 + 工程师 4h 运维

按工程师 $80/h 算,LangGraph 方案隐性成本 $3250,Workflows 方案 $2140,单月节省 $1100,年化 $13K。规模到日万级发票后差距会进一步拉大。

什么时候不该用 Workflows

不是所有 AI 流程都需要 durable execution:

判断标准简单:『如果这个流程跑失败一次,业务方会要求你排查到底跑了什么』,就该上 Workflows。

下一步

Mistral 在 5/22 announce 了 Workflows 的几个新特性:

要上手,先从 mistral-workflows init 跑个 hello-world,再 fork 一个模板改造,比从零搭强。文档在 mistral.ai/docs/workflows,社区频道在 Discord 的 #workflows 子频道。

Agent 框架的『可用性竞赛』2024-2025 卷完了,2026 的关键词是『可生产』。Workflows 是这条路线上目前最清晰的答案。

Frequently asked questions

Mistral Workflows 和 LangGraph、CrewAI、Mastra 这些 Agent 框架的本质区别?
定位差一档。LangGraph / CrewAI / Mastra 是『单进程内的 Agent 编排』,状态在 Python/JS 进程里,挂了就丢;它们靠 checkpoint 半持久化但语义不强,重试/补偿/调度都要自己写。Mistral Workflows 是『durable execution 平台』,每个 step 的输入输出和异常都被 Temporal 持久化到 event store,进程挂掉重启会自动从断点恢复,比 LangGraph 1.2 的 error recovery 强一档。代价是写法不再是 Python async 函数随便加 LLM 调用,每个 step 必须是幂等的 @activity 函数,对开发者纪律要求更高。简单 demo 用 Mastra/LangGraph,跑 7×24 业务流必须 Temporal 级。
为什么选 Temporal 而不是 Airflow / Prefect / Dagster?
Airflow / Dagster 是 batch 任务调度器,时间粒度是分钟到天,不适合需要秒级响应、长会话保持的 AI 业务流。Prefect 介于两者之间但 stateful 流程仍然偏弱。Temporal 的核心优势是『durable function』——开发者写起来像普通 Python 函数,框架自动把每行代码的执行记录写入 event log,挂了重启时 replay 到断点继续。这对 AI 流程关键:一个 LLM 调用 200 美元的成本不能因为 worker 重启就重新跑。Mistral 在 Temporal 之上加了 LLM streaming、payload handling、multi-tenancy、observability,覆盖了 vanilla Temporal 的 AI 短板。
控制平面在 Mistral、数据平面在自己 K8s,敏感数据真的不出企业网络吗?
对。架构上 Mistral 只看到 workflow 的 metadata(哪个 step 在跑、状态、时间戳),实际 payload(document content、LLM input/output)通过 Mistral 提供的 codec 接口本地编码后传输,可选 envelope encryption。worker 跑在你的 Helm chart 部署的 pod 里,调 LLM 时可以选 Mistral 私有部署的 endpoint、Azure OpenAI、本地 vLLM、Anthropic API,控制权完全在你这边。La Banque Postale 这类金融客户的选型就是『Mistral 托管控制平面 + 私有 Mistral Large 部署 + worker on-prem』,审计 / GDPR / DORA 三关都过。
workflow 一旦写成 Python 代码就不能让业务人员动了,怎么平衡?
Mistral 的解法是『开发者写、业务人员调用』。workflow 由开发者用 Python 写好、注册到 Studio;Studio 把它暴露为 Le Chat 里的一个『action』,业务人员通过自然语言触发(『帮我跑一遍 5 月发票审批流』)。如果业务规则要变(比如审批阈值从 1 万改成 5 万),可以在 workflow 里抽出 config 参数,业务人员在 Studio UI 里改,无需开发者改代码部署。更复杂的逻辑变更仍然走 Git PR + CI,但日常 99% 的运营调整都能不动代码完成。这是『低代码 + 强工程』的合理边界。
迁移现有 LangGraph / CrewAI 项目到 Workflows,工作量大概多大?
看 Agent 复杂度。三条经验:(1) 简单线性流(fetch→summarize→write_db)改造 1-2 天,把每个 node 改成 @activity 即可,控制流不变;(2) 包含子 Agent 调用的多层流程要 3-7 天,因为 LangGraph 的 ConditionalEdge / Send 需要重写为 Temporal 的 workflow.execute_child_workflow;(3) 重度依赖 LangGraph checkpoint 中间状态的项目要 1-2 周,因为 Temporal 的状态语义是『每次 replay 必须 deterministic』,任何非确定性操作(datetime.now、random、外部 IO)必须包成 activity。建议新项目直接上 Workflows,老项目按价值优先级渐进迁移,先把『跑挂代价高』的关键流程迁过去。
// next.txt ›

Some outbound links in this post are affiliate links — see disclosure.