2.4
Part II / Ship · From laptop to cloud

部署(Deployment):智能体不适合普通的 Web API。

你的智能体(agent)在 REPL 中运行正常。现在它需要为真实用户提供服务、在崩溃后恢复运行、在不均匀的负载下弹性扩容,并在出现问题时优雅降级。智能体具有特殊的形态——运行时间长、有状态、每次请求成本(cost)高、容易出现意外——而针对普通 Web 服务的标准建议在与它们碰撞时会以特定的方式失效。本章讲解真正有效的模式:用于快速运行的同步端点配合 SSE 流式传输、用于长时运行的队列驱动工作进程、状态存储分离方案,以及防止账单在凌晨三点爆炸的运营护栏(guardrail)。

STEP 1

形态问题。

普通 Web 服务具有可预测的形态。一个请求进来,一些代码执行,一个响应发出。延迟(latency)通常在一秒以内;如果不是,那就是哪里出了问题。瓶颈是 CPU,会话是无状态的,水平扩展意味着"添加更多副本"。以上这些描述的都不是智能体。

智能体是 I/O 密集型的(等待模型 API),可以运行数分钟乃至数小时,在多轮对话中保持会话状态,每次请求花费真实金钱,并且表现出常规服务中不会出现的故障模式——模型服务中断、速率限制、工具调用(tool use)无限循环。如果你拿标准套路("把代码包进 Flask,放在 nginx 后面,按 CPU 扩容")直接套用,还没开始发布就已经在三个具体方面犯了错。

会出问题的三件事

问题一:长时运行期间 HTTP 超时。浏览器、负载均衡器和 CDN 都有请求超时——通常是浏览器 30 秒、ALB 60 秒、Cloudflare 免费套餐 100 秒。你的智能体在深度调研任务上可能需要运行 5 分钟。如果你在整个过程中保持 HTTP 连接开启,连接会在运行中途断开,用户什么都得不到。标准的"同步请求/响应"模式与需要运行数分钟的智能体不兼容。

问题二:WSGI 工作进程阻塞在模型调用上。Flask、Django、Rails——所有基于 WSGI 的框架——都为每个请求分配一个 OS 线程。在等待模型 API 时,该线程处于阻塞状态。四个工作进程,四个并发用户;第五个用户要在队列里等待。对于一个 95% 时间都在执行 await client.messages.create(...) 的典型智能体而言,这意味着你 95% 的工作进程容量处于空闲状态,同时队列不断增长。标准的"增加更多工作进程"方案并无帮助,因为每个工作进程仍然在网络调用上阻塞。

问题三:基于 CPU 的自动扩容从不触发。智能体服务器上的 CPU 利用率通常低于 10%。瓶颈不是计算——而是并发中的模型调用数量、延迟预算消耗和队列深度。标准的基于 CPU 的自动扩容永远不会触发。你要么手动调整固定的副本数(并为峰值过度配置),要么找到不同的扩容信号。

决策树

解决这三个问题的方案是同一个思路:让你的部署形态匹配智能体的预期运行时长。三种模式几乎覆盖了所有情况。

┌────────────────────────────────────────────────────────────────┐ │ WHICH PATTERN DO I NEED? │ │ │ │ Q1: How long does a typical agent run take? │ │ │ │ < 30 seconds │ │ ↓ │ │ Pattern A: SYNCHRONOUS WITH STREAMING │ │ Single FastAPI endpoint, async/await, stream tokens │ │ back over SSE. HTTP stays open the whole time. │ │ Q2 below. │ │ │ │ 30 seconds – 5 minutes │ │ ↓ │ │ Still works synchronously — IF you control the whole │ │ network path (no Cloudflare, no aggressive timeouts) and │ │ tune your reverse proxy. Otherwise → Pattern B. │ │ │ │ > 5 minutes (deep research, multi-stage workflows) │ │ ↓ │ │ Pattern B: SUBMIT / POLL with a QUEUE │ │ POST /runs returns immediately with a run_id. │ │ Worker pulls from queue, executes, streams updates to │ │ Redis/Postgres. Client subscribes to updates by run_id. │ │ │ │ Q2: Do I need server-side state across multiple requests? │ │ (Conversation history, user preferences, mid-run memory) │ │ │ │ Stateless (one-shot questions) │ │ ↓ │ │ Done. Each request is self-contained. │ │ │ │ Stateful (conversations, sessions) │ │ ↓ │ │ Pattern C: STATE STORE SPLIT │ │ Postgres for durability (the truth). │ │ Redis for hot path (sub-millisecond reads on each turn). │ │ See Step 4 for the schema. │ └────────────────────────────────────────────────────────────────┘

大多数生产环境中的智能体最终会组合使用——模式 A 用于短时交互运行,模式 B 用于长时后台运行,共享同一套模式 C 的状态存储。本章将依次讲解每种模式。

从一开始就采用异步的承诺

在讨论任何模式之前,先说一点:选 FastAPI(或 Starlette,或任何 ASGI 框架),而不是 Flask。对于智能体而言,这是不可商量的。原因在于 I/O 密集型的本质:FastAPI 工作进程可以在单个进程中并发保持数千个处于飞行中的 await client.messages.create(...) 调用,因为每个调用在等待时都会让出控制权。同样的工作负载在 Flask 上则需要数千个 OS 线程,这根本不可行。

心智模型:使用 ASGI 时,你的并发限制由你的 API 提供商的速率限制决定,而不是由你的服务器工作进程数决定。这才是正确的形态。

Question
我的智能体目前在一个 Flask 应用中运行,低流量下工作正常。我真的需要迁移吗?

如果你的并发用户少于 5 个且运行时间在几秒以内,Flask 没问题。当以下任一条件成立时,迁移就变得紧迫了:并发用户数超过约 20 人;你的运行偶尔需要超过 30 秒;你开始向客户端流式传输令牌(token);你需要高效地并行执行工具调用。低于这个阈值时,迁移的工程成本超过收益。

说实话:大多数团队发现自己比预期更早需要 ASGI,因为流量在增长,或者运行时间在增长,或者两者兼有。在 5 个用户时完成一次迁移,比在 50 个用户时焦头烂额地迁移要便宜得多。

Question
我需要 Kubernetes 才能做到这些吗?

对于初始部署,几乎肯定不需要。Fly.io、Railway、Render、Modal、AWS App Runner——这些平台都允许你用一条命令部署容器化的 FastAPI 应用,并且处理好那些无聊的部分(HTTPS、负载均衡、基本自动扩容)。对于一个为每天数千用户提供服务的单团队智能体而言,这是合适的抽象层级。

当你真正需要以下功能时再考虑 Kubernetes:多个共享基础设施的智能体服务、对服务间网络的控制、复杂的部署拓扑(金丝雀、蓝绿),或者公司本来就跑在 Kubernetes 上。不要仅仅因为它是规模化公司的默认答案就去用它。

STEP 2

模式 A:带 SSE 流式传输的同步端点。

对于在 30 秒内完成的运行,最简单的生产级部署是一个 FastAPI 端点:运行智能体,并通过服务器发送事件(Server-Sent Events,SSE)将更新流式传回。客户端在内容生成时即可看到输出;HTTP 连接保持开启直到智能体完成;无需队列、无需工作进程、无需额外的活动部件。

为什么选 SSE 而不是 WebSocket:智能体输出是单向的——服务器推送给客户端。SSE 以更低的复杂度处理单向流式传输,无需特殊配置即可穿透企业代理和 HTTP/1.1 基础设施,并且通过 Last-Event-ID 头在协议层面支持自动重连。WebSocket 给你双向通信,而你几乎从不需要它来处理智能体输出。把它留给"停止"按钮——那可以是一个独立的小端点。

完整端点

这是生产级版本——全程异步、信号量限制并发、请求级追踪(trace)、结构化错误、优雅取消。这是你实际会部署的单文件。

# app/main.py
import asyncio, json, uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from anthropic import AsyncAnthropic

# Cap concurrent in-flight agent runs.  Don't let a traffic spike
# issue 10,000 simultaneous model calls and blow your rate limit.
AGENT_SEMAPHORE = asyncio.Semaphore(50)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Share one AsyncAnthropic client across all requests.
    # Has internal connection pool; creating per-request is wasteful.
    app.state.anthropic = AsyncAnthropic()
    yield
    await app.state.anthropic.close()

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    message: str
    user_id: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, request: Request):
    run_id = str(uuid.uuid4())

    async def event_stream():
        # Hold the semaphore for the duration of the run.
        async with AGENT_SEMAPHORE:
            try:
                yield {"event": "start",
                       "data": json.dumps({"run_id": run_id})}

                # Drive the agent loop; emit events as the run progresses.
                async for event in run_agent_streaming(
                    request.app.state.anthropic,
                    req.message,
                    req.user_id,
                ):
                    # Client disconnected?  Stop generating tokens (and stop paying).
                    if await request.is_disconnected():
                        break
                    yield {"event": event["type"],
                           "data": json.dumps(event["payload"])}

                yield {"event": "done", "data": ""}

            except asyncio.CancelledError:
                # Client disconnected mid-run; clean up and don't re-raise the cancellation
                # as an error event (the connection is gone anyway).
                return
            except Exception as e:
                yield {"event": "error",
                       "data": json.dumps({"type": type(e).__name__,
                                          "message": str(e)})}

    return EventSourceResponse(event_stream())

@app.get("/health")
async def health():
    return {"ok": True,
            "in_flight": 50 - AGENT_SEMAPHORE._value}
# app/main.py
import asyncio, json, uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from openai import AsyncOpenAI

AGENT_SEMAPHORE = asyncio.Semaphore(50)

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai = AsyncOpenAI()
    yield
    await app.state.openai.close()

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    message: str
    user_id: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, request: Request):
    run_id = str(uuid.uuid4())

    async def event_stream():
        async with AGENT_SEMAPHORE:
            try:
                yield {"event": "start",
                       "data": json.dumps({"run_id": run_id})}

                async for event in run_agent_streaming(
                    request.app.state.openai,
                    req.message,
                    req.user_id,
                ):
                    if await request.is_disconnected():
                        break
                    yield {"event": event["type"],
                           "data": json.dumps(event["payload"])}

                yield {"event": "done", "data": ""}

            except asyncio.CancelledError:
                return
            except Exception as e:
                yield {"event": "error",
                       "data": json.dumps({"type": type(e).__name__,
                                          "message": str(e)})}

    return EventSourceResponse(event_stream())

@app.get("/health")
async def health():
    return {"ok": True,
            "in_flight": 50 - AGENT_SEMAPHORE._value}

这段代码中有四处不显眼但值得细说的设计。让我逐一讲解。

信号量

AGENT_SEMAPHORE = asyncio.Semaphore(50) 是这个文件里最重要的一行。它将每个副本的并发智能体运行数上限设为 50。为什么重要:如果没有它,一次流量峰值会同时向模型提供商发出 10,000 个并发调用,你会突破速率限制,每个调用都返回 429,级联效应会把整个服务打垮。有了它,第 51 个请求会(优雅地)等待前 50 个中的某一个完成。

合适的数字:用你的提供商速率限制除以你每次运行的平均令牌数,再留出 30% 的余量用于重试和偶尔的长时运行。对于 Anthropic Tier 4,每分钟每组织 8k 令牌,平均每次运行 3k 令牌共 4 轮 = 每次运行 12k 令牌,你大约可以维持……实际上请根据你的具体情况做这道数学题;关键原则是要有一个数字,而不是照抄我的。

客户端断开检测

if await request.is_disconnected(): break 是那条能帮你省钱的代码。没有它,如果用户在运行途中关闭了浏览器标签页,你的智能体会继续生成令牌、继续调用工具、继续花钱——却没有任何人在接收。有了它,循环在下一次迭代时中断,运行终止。检查发生在事件之间,所以它在正常路径上不增加任何延迟。

注意:is_disconnected() 在某些反向代理上有时不稳定——可能需要几秒钟才能检测到断开。这通常没问题;你只是多付了几个令牌的钱。不要尝试比这更积极地检测断开;那会引入更多 bug。

asynccontextmanager 生命周期

Anthropic/OpenAI 客户端有内部连接池。每次请求都创建一个会很浪费。创建一个全局实例是正确做法——但它必须在关闭时被清理。lifespan 上下文管理器是 FastAPI 惯用的方式:它为你提供与应用生命周期绑定的启动和关闭钩子。对任何共享资源(HTTP 客户端、数据库连接池、模型客户端)都使用它。

反模式:在模块导入时创建客户端,却从不关闭它。连接池在关闭时会成为孤儿,文件描述符泄漏,你的容器优雅关闭探针超时。第一次踩到这个坑要花十分钟调试。

智能体作为异步生成器

这是让流式传输感觉自然的设计。run_agent_streaming 是一个 async def 函数,随着运行进展 yield 出事件。端点消费这些事件并将其转发给客户端。智能体不知道自己在 HTTP 端点后面;它只是 yield 事件。这种分离使智能体可以独立测试,使端点成为一个薄薄的适配器。

# agent/streaming.py
async def run_agent_streaming(client, user_msg: str, user_id: str):
    # Initial: emit a status event so the client knows we got the message
    yield {"type": "status", "payload": {"message": "thinking..."}}

    messages = [{"role": "user", "content": user_msg}]
    for step in range(20):  # step budget from chapter 1.1
        # Stream the model's response token by token (or block by block)
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                yield {"type": "token", "payload": {"text": text}}
            response = await stream.get_final_message()

        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            return

        # Tool calls: emit per-call events so client can show "Searching docs..."
        results = []
        for block in response.content:
            if block.type == "tool_use":
                yield {"type": "tool_start",
                       "payload": {"name": block.name, "args": block.input}}
                try:
                    result = await HANDLERS[block.name](**block.input)
                    yield {"type": "tool_end",
                           "payload": {"name": block.name, "ok": True}}
                    results.append({"type": "tool_result",
                                    "tool_use_id": block.id, "content": str(result)})
                except Exception as e:
                    yield {"type": "tool_end",
                           "payload": {"name": block.name, "ok": False, "error": str(e)}}
                    results.append({"type": "tool_result",
                                    "tool_use_id": block.id, "content": f"Error: {e}",
                                    "is_error": True})
        messages.append({"role": "user", "content": results})

客户端收到的内容

浏览器观察到的 SSE 流的形态。这是真实的线上格式,便于调试:

event: start
data: {"run_id": "8c1e..."}

event: status
data: {"message": "thinking..."}

event: token
data: {"text": "I'll search"}

event: token
data: {"text": " the documentation"}

event: tool_start
data: {"name": "search_docs", "args": {"query": "autovacuum"}}

event: tool_end
data: {"name": "search_docs", "ok": true}

event: token
data: {"text": "Autovacuum naptime is..."}

event: done
data: 

客户端(浏览器、移动应用、CLI)使用标准 SSE API 消费这些事件。在 JavaScript 中,用一个 EventSource 加上每个事件名的事件监听器。显示逻辑——在 tool_start 期间显示加载动画、动态展示令牌、在 error 时显示错误——由前端决定,但这些事件的设计使得构建一个精致的 UI 成本很低。

部署它

两行 Dockerfile,部署到任何支持容器的地方。生产环境调优只需合理配置 uvicorn:

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app",
     "--host", "0.0.0.0",
     "--port", "8000",
     "--workers", "4",            # one per CPU core
     "--timeout-keep-alive", "120"]   # long enough for SSE

注意 --workers 4——即使每个工作进程可以并发保持数千个处于飞行中的请求,你仍然需要多个 OS 级进程,这样一个崩溃或卡住的进程不会拖垮整个服务。大约每个 CPU 核一个工作进程是经验法则。--timeout-keep-alive 120 使 HTTP/1.1 连接保持足够长的时间,以支持长时流式运行,而不会被激进地关闭。

在 Cloudflare、ALB、nginx 或任何反向代理后面:关闭代理缓冲。大多数反向代理默认缓冲响应以优化吞吐量(throughput),这意味着 SSE 事件会被积压并以块的形式传递,而不是流式传输。在响应头中设置 X-Accel-Buffering: no(sse-starlette 会自动为你设置),并配置你的反向代理来遵守它。跳过这步,你的"流式"UI 会每 30 秒才更新一次。

Question
为什么用信号量而不是线程池/进程池?

线程/进程池是为 CPU 密集型工作设计的。智能体运行是 I/O 密集型的——它们主要在 await 等待模型 API。对异步代码使用信号量才是正确的原语:它在不创建线程的情况下限制并发中的任务数。效果相同(有界并发),开销低得多。

线程的用武之地:如果你有一个无法规避的同步库(例如,某个供应商 SDK 没有提供异步版本),用 asyncio.to_thread(...) 包装它的调用,使其不阻塞事件循环。信号量仍然在其上方的智能体运行层级发挥作用。

Question
如果一次运行占用信号量 60 秒,其他请求要等待怎么办?

这说明你正在触碰模式 A 的范围上限,应该考虑模式 B。对于负载峰值期间几十秒的信号量等待是可以接受的;如果等待时间经常超过 10 秒,就会成为用户体验问题。三种应对方式,按顺序:

  • 为"长"运行添加一个单独的信号量(通过步骤数或已用时间检测),避免它们饿死"短"运行。
  • 将长时运行路径迁移到模式 B(队列 + 工作进程)。
  • 水平扩展——更多副本,每个副本有自己的信号量。数学公式:总并发容量 = 副本数 × 每副本信号量值。
STEP 3

模式 B:用于长时运行的队列驱动工作进程。

模式 A 适合智能体运行数秒的对话式交互。但对于那些真正展现智能体 AI 价值的场景——耗时 10 分钟的深度调研、对 100 份文档的批量处理、需要暂停等待用户审阅的多阶段工作流——它就不管用了。对于这些情况,HTTP 请求-响应模型从根本上就是错的:你没办法保持连接开启那么久,而且即便能,用户也不应该一直开着浏览器标签页等待。

解决方案是提交/轮询架构。客户端发起 POST 请求;服务器立即返回一个 run_id 并将任务入队;工作进程从队列中取出任务并执行;客户端通过单独的 SSE 连接按 run_id 订阅更新。用户可以关闭浏览器,之后再回来查看结果——运行在服务器端持续进行,更新从上次中断的地方继续流式传输。

架构形态

┌─────────────────────────────────────────────────────────────────┐ │ │ │ client fastapi redis queue │ │ │ │ │ │ │ │ POST /runs │ │ │ │ │ ──────────────────────►│ │ │ │ │ │ XADD runs │ │ │ │ │ ────────────────────►│ │ │ │ 202 {run_id} │ │ │ │ │ ◄──────────────────────│ │ │ │ │ │ │ XREAD │ │ │ │ │◄─────────┐ │ │ │ │ │ │ │ │ │ GET /runs/{id}/events │ │ worker│ │ │ ──────────────────────►│ │ XADD events│ │ │ (SSE connection) │ XREAD events:{id} │◄─────────│ │ │ │ │ ◄────────────────────│ │ │ │ │ event: token … │ │ ... │ │ │ │ ◄──────────────────────│ │ │ │ │ │ │ │ XADD done│ │ │ │ event: done │ │◄─────────┘ │ │ │ ◄──────────────────────│ │ │ │ │ │ Two-process architecture: API server (fast, never blocks) + │ │ worker process (slow, runs agents). Connected by Redis │ │ streams. Client subscribes by run_id, can reconnect at any │ │ time using Last-Event-ID. │ └─────────────────────────────────────────────────────────────────┘

需要独立搭建三个部分:立即返回的提交端点、从队列消费并发出事件的工作进程,以及按 run_id 订阅的流式传输端点。我们将使用 Redis Streams 同时作为队列和事件日志——这是正确的原语(持久、有序,支持带最后 ID 恢复的发布/订阅)。

提交端点

# app/routes/runs.py
import uuid, json
from fastapi import APIRouter, Request
from redis.asyncio import Redis

router = APIRouter()

@router.post("/runs", status_code=202)
async def submit_run(req: ChatRequest, request: Request):
    redis: Redis = request.app.state.redis
    run_id = str(uuid.uuid4())

    # Persist run metadata so client can poll status at any time
    await redis.hset(f"run:{run_id}", mapping={
        "status": "queued",
        "user_id": req.user_id,
        "message": req.message,
        "created_at": int(time.time()),
    })
    await redis.expire(f"run:{run_id}", 86400)  # 24h TTL

    # Enqueue.  XADD is atomic; the worker will pick it up.
    await redis.xadd("queue:runs", {
        "run_id": run_id,
        "user_id": req.user_id,
        "message": req.message,
    })

    return {"run_id": run_id, "status": "queued"}

该端点在毫秒内返回。202 状态码告诉客户端"已接受但尚未完成"——它们应该通过轮询或订阅来获取结果。

工作进程

一个独立的进程(独立的 Docker 容器、独立的 Kubernetes pod、独立的 Fly.io 机器——取决于你的部署基础设施的叫法)。它永远循环,从队列中拉取任务。

# worker/main.py
import asyncio, json, time, signal
from redis.asyncio import Redis
from anthropic import AsyncAnthropic

async def worker_loop(name: str):
    redis = Redis(host="redis", decode_responses=True)
    anthropic = AsyncAnthropic()
    shutdown = asyncio.Event()
    signal.signal(signal.SIGTERM, lambda *_: shutdown.set())

    # Consumer group: multiple workers can share one queue with
    # at-least-once delivery and automatic re-claim on crash.
    try:
        await redis.xgroup_create("queue:runs", "workers", id="0", mkstream=True)
    except:
        pass  # already exists

    while not shutdown.is_set():
        # Block for up to 5 seconds waiting for new work
        msgs = await redis.xreadgroup(
            "workers", name, {"queue:runs": ">"},
            count=1, block=5000,
        )
        if not msgs:
            continue

        for _, items in msgs:
            for msg_id, data in items:
                run_id = data["run_id"]
                try:
                    await execute_run(redis, anthropic, run_id, data)
                    await redis.xack("queue:runs", "workers", msg_id)
                except Exception as e:
                    # Mark run as failed but ack so we don't infinite-loop
                    await mark_failed(redis, run_id, e)
                    await redis.xack("queue:runs", "workers", msg_id)

async def execute_run(redis, anthropic, run_id: str, data: dict):
    """Run the agent.  Each event also gets appended to a Redis
    Stream that streaming clients can subscribe to."""
    events_key = f"events:{run_id}"
    await redis.hset(f"run:{run_id}", "status", "running")
    await redis.xadd(events_key,
                       {"event": "start", "data": ""})

    async for event in run_agent_streaming(anthropic, data["message"], data["user_id"]):
        await redis.xadd(events_key, {
            "event": event["type"],
            "data": json.dumps(event["payload"]),
        })

    await redis.xadd(events_key, {"event": "done", "data": ""})
    await redis.hset(f"run:{run_id}", "status", "complete")
    await redis.expire(events_key, 3600)  # 1h TTL on event log

if __name__ == "__main__":
    asyncio.run(worker_loop(name=os.environ.get("WORKER_NAME", "worker-1")))

有三点值得停下来细看。消费者组XREADGROUP)在多个工作进程间提供至少一次(at-least-once)交付:如果某个工作进程在运行中途崩溃,消息未被确认,另一个工作进程可以通过 XPENDING + XCLAIM 认领它。这就是让队列具备弹性的属性。SIGTERM 处理对于优雅关闭至关重要——当平台通知工作进程终止时,先完成当前运行再退出。events:{run_id} 流是流式端点将要订阅的对象;它同时也是每次运行的完整审计日志,以 1 小时 TTL 持久化保存。

流式端点

这是客户端提交运行后连接的端点。它订阅给定 run_id 的事件流,并将事件以 SSE 形式转发。

@router.get("/runs/{run_id}/events")
async def stream_events(run_id: str, request: Request,
                         last_event_id: str = Header(default="0")):
    """Subscribe to a run's event stream.  Supports resumption via
    Last-Event-ID — drop the connection and reconnect, no events lost."""
    redis = request.app.state.redis
    events_key = f"events:{run_id}"

    async def subscribe():
        last_id = last_event_id or "0"
        while True:
            if await request.is_disconnected():
                break

            # Read events newer than last_id.  Block up to 5s.
            msgs = await redis.xread({events_key: last_id}, count=20, block=5000)
            if not msgs:
                # Periodic keepalive comment so proxies don't time out
                yield {"comment": "keepalive"}
                continue

            for _, items in msgs:
                for msg_id, data in items:
                    last_id = msg_id
                    yield {
                        "id": msg_id,           # client tracks this for resume
                        "event": data["event"],
                        "data": data["data"],
                    }
                    if data["event"] == "done":
                        return

    return EventSourceResponse(subscribe())

Last-Event-ID 头是 SSE 内置的恢复机制。当客户端在连接断开后重连时,浏览器会自动发回它收到的最后一个事件 ID。我们以此作为 Redis 流中的起始点——客户端从上次中断的地方精确恢复,无重复、无遗漏。没有这个机制,每次不稳定的网络或浏览器标签页切换都会迫使用户重新开始。

完整的生命周期,端到端

  t=0       client POSTs /runs
            server enqueues, returns 202 {run_id: "8c1e..."}
  t=0.05    worker picks up message from queue
  t=0.06    client opens /runs/8c1e.../events SSE connection
  t=0.10    worker emits "start" event → Redis stream
  t=0.10    server forwards "start" to client over SSE
  ...
  t=180     worker emits "tool_start: search_web (query=Q3 GDP)"
  t=183     worker emits "tool_end" + tokens
  t=240     CLIENT'S WIFI DROPS — SSE connection closes
            but the worker keeps running.  Events keep landing
            in Redis stream.
  t=247     client reconnects, sends Last-Event-ID: 240-3
  t=247.1   server resumes stream from event 240-3+1
            client receives all events from t=240 onward
  ...
  t=412     worker emits "done"
            server closes connection
            run record persists in Redis for 24h

这是用户期望从智能体获得的体验:提交一个长任务,合上笔记本,回来看结果。提交/轮询架构正是使这成为可能的关键。

对于初始部署,使用 supervisord 等进程管理器将 API 服务和工作进程作为独立进程运行在同一个 Docker 容器中。你获得了架构分离,而无需承担两个服务的运维开销。当流量模式要求时再拆分为独立部署(不同的扩容配置,或工作进程需要不同的资源)。

Question
为什么用 Redis Streams 而不是 Celery / RQ / SQS?

以上这些都可以用。选择 Redis Streams 的条件是:(a) 你已经在运行 Redis 用于缓存/会话,因此不需要引入新的基础设施;(b) 你希望事件日志同时作为流式订阅后端(一种机制而不是两种);(c) 消费者组提供了你所需要的持久性和重认领语义。

Celery 更重量级,功能也更多(定时任务、复杂工作流、带退避的重试)。如果你需要那些功能就选它。RQ 比 Celery 更简单。SQS 是你在 AWS 上且想要托管基础设施时的正确选择。这些方案的模式都类似——提交端点、工作循环、确认/重试——只是 API 不同。

Question
幂等性怎么处理?如果工作进程在运行中途崩溃,另一个接手后,智能体会运行两次。

这是至少一次交付的问题。三种方案,按工作量排序:

  • 让工具具备幂等性。如果 send_email 有去重键(run_id + tool_call_idx),重试就是安全的。这是最干净的修复,也是长期正确的答案。
  • 对进度做检查点。每次工具调用后,将结果以 (run_id, step) 为键持久化到 Redis。恢复时,跳过已有结果的步骤。这就是 LangGraph 等智能体运行时替你做的事情。
  • 对非幂等工具直接禁用重认领。如果工作进程崩溃,将运行标记为失败,让用户重新提交。最简单,但意味着崩溃对用户可见。对某些产品来说可以接受。

对于首次部署:方案三。对于生产环境:方案一和方案二的组合。

STEP 4

决定你能否持续运行的四个运营关注点。

你已经发布了智能体,它在正常工作。现在运营现实开始了。本步骤涵盖四件决定你的服务能否随负载增长保持可用、经济可行和可调试的事情:状态的存放位置、应该按什么信号自动扩容、如何优雅降级,以及如何防止成本失控。每个都是一小节,但跳过任何一个,就是"部署了一个智能体"和"真正发布了一个智能体"之间的差距。

1. 状态存储:Postgres 存真相,Redis 走热路径

对话历史存放在某处。错误的地方会产生问题。两个存储,两项职责:

Postgres 持有真相。每次对话、每条消息、每次工具调用结果的持久记录。模式(schema)大致如下:

CREATE TABLE conversations (
    id              UUID PRIMARY KEY,
    user_id         TEXT NOT NULL,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    title           TEXT,
    metadata        JSONB DEFAULT '{}'::JSONB
);

CREATE TABLE messages (
    id              BIGSERIAL PRIMARY KEY,
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role            TEXT NOT NULL,    -- user / assistant / tool
    content         JSONB NOT NULL,   -- the full message blocks
    token_count     INTEGER,
    cost_usd        NUMERIC(10, 6),
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX messages_conv_idx
    ON messages(conversation_id, id);

content 列是包含完整消息结构的 JSONB——包括 tool_use 和 tool_result 块。用 JSONB 是为了以后可以查询它("上个月智能体调用了多少次 search_docs?"),而无需解析字符串。

Redis 走热路径。在一次活跃对话中,每一轮都需要读取消息历史。每次都查询 Postgres 会增加 5–10 毫秒的延迟,并造成 N+1 查询问题。正确的缓存做法:将近期消息序列化为 JSON,以 conv:{id}:messages 为键存储在 Redis 中,设置 1 小时 TTL,每次新消息时写穿更新。

# app/state.py
async def get_messages(redis, db, conversation_id: str) -> list[dict]:
    # Hot path: Redis
    cached = await redis.get(f"conv:{conversation_id}:messages")
    if cached:
        return json.loads(cached)

    # Cold path: Postgres
    rows = await db.fetch(
        "SELECT content FROM messages WHERE conversation_id = $1 ORDER BY id",
        conversation_id,
    )
    messages = [r["content"] for r in rows]

    # Warm the cache for next time
    await redis.set(
        f"conv:{conversation_id}:messages",
        json.dumps(messages),
        ex=3600,
    )
    return messages

async def append_message(redis, db, conversation_id: str, msg: dict):
    # Write to truth first
    await db.execute(
        "INSERT INTO messages (conversation_id, role, content, ...) VALUES ...",
        conversation_id, msg["role"], json.dumps(msg["content"]),
    )
    # Invalidate cache; will be repopulated on next read
    await redis.delete(f"conv:{conversation_id}:messages")

不变式:Postgres 是真相;Redis 是可能出错但能自我修复的缓存。写入时使缓存失效是最简单有效的方案——除非有充分理由,否则不要尝试写穿缓存更新。缓存会在下次读取时重新填充,这完全没问题。

2. 按正确的信号自动扩容

如步骤 1 所述,基于 CPU 的自动扩容对智能体永远不会触发。真正与"我需要更多容量"相关的信号:

  • 信号量饱和度(模式 A)。/health 端点上的 in_flight 计数。当其超过上限的 80% 持续超过 1 分钟时扩容。
  • 队列深度(模式 B)。XLEN queue:runs。当深度增长时扩容工作进程;当其持续为零超过 5 分钟时缩容。
  • 提交端点的 P95 延迟。即使是应该在毫秒内返回的提交端点,在 Redis 或 Postgres 饱和时也会变慢。监测这个指标以感知上游压力。
  • 模型提供商的错误率。429 错误激增意味着你已到达速率限制上限;添加更多副本无济于事,你需要降低每副本的并发数或联系提供商。

大多数托管平台(Fly.io、Modal、Railway)允许你通过接入 Prometheus 或轮询 HTTP 端点来基于自定义指标自动扩容。要暴露的指标:API 副本的 in_flight,工作进程副本的 queue_depth。将它们接入你的平台自动扩容器,设定阈值,发布上线。

3. 优雅降级:你会遇到的三种故障

三种生产环境故障模式足够常见,值得提前计划应对。

模型提供商中断或 5xx 风暴。Anthropic 或 OpenAI 持续 10 分钟返回错误。你的智能体无法运行。正确的行为:向用户提供清晰的错误信息("模型暂时不可用,请稍后重试"),而不是让请求超时。实现方式:一个熔断器(circuit breaker),当错误率在 60 秒内超过 50% 时触发,之后 5 分钟内快速失败,并定期发送探测请求以检测恢复。

# app/circuit.py
class CircuitBreaker:
    def __init__(self, threshold=0.5, window=60, cooldown=300):
        self.threshold = threshold
        self.window = window
        self.cooldown = cooldown
        self.results: list[tuple[float, bool]] = []   # (timestamp, ok)
        self.tripped_until = 0

    def record(self, ok: bool):
        now = time.time()
        self.results.append((now, ok))
        self.results = [r for r in self.results if r[0] > now - self.window]
        if len(self.results) >= 10:
            error_rate = 1 - sum(ok for _, ok in self.results) / len(self.results)
            if error_rate > self.threshold:
                self.tripped_until = now + self.cooldown

    def is_open(self) -> bool:
        return time.time() < self.tripped_until

回退到更便宜或不同的模型。当主模型的熔断器触发时,有时可以回退到备用模型——Sonnet → Haiku,GPT-5.5 → GPT-5-mini,或完全不同的提供商。备用模型通常更慢或能力稍弱,但给出一个真实的答案总好过报错。两点注意:备用模型需要自己的熔断器(你不希望两个提供商都宕机时让你永远无法恢复),并且你的评估(eval)(第 1.4 章、第 3.1 章)应该涵盖备用路径,以便你了解在那里的质量表现。

失控的成本。一次有缺陷的提示词(prompt)变更导致智能体每次运行调用 search_docs 50 次,而不是 2 次。你在第二天早上从成本仪表板发现了这个问题,此时已花费 4,000 美元。防御手段:在循环本身中强制执行每用户、每次运行的成本上限。

# In the agent loop, after each model call:
run_cost += response.usage.input_tokens * INPUT_PRICE / 1e6
run_cost += response.usage.output_tokens * OUTPUT_PRICE / 1e6

if run_cost > MAX_COST_PER_RUN:
    log.warn("run_cost_exceeded", run_id=run_id, cost=run_cost)
    yield {"type": "error",
           "payload": {"reason": "cost_budget_exceeded"}}
    return

# Also check daily user budget
daily = await redis.incrbyfloat(f"cost:user:{user_id}:{today}", run_cost)
if daily > MAX_COST_PER_USER_PER_DAY:
    yield {"type": "error",
           "payload": {"reason": "daily_budget_exceeded"}}
    return

合理的默认值:面向终端用户的智能体每次运行 1 美元,每用户每天 20 美元,可信账户允许显式覆盖。这些上限与第 3.1 章中的预算互为补充——那些在回归(regression)发布时把关,这些在悄悄溜进来的回归消费时把关。

4. 运行手册

最后一块,也是最常被跳过的:写下出问题时该怎么做。一两页纯文本,和你的部署配置放在一起。有用的最低要求:

  • 如何部署。具体的命令。同时包括正常路径和紧急回滚路径。
  • 如何查看正在发生什么。仪表板在哪里。日志在哪里。被呼叫时运行的三条查询。
  • 如何让它停下来。紧急开关(第 2.3 章——那个能暂停所有运行的标志)。成本上限开关。模型提供商切换。
  • 最可能的五种故障模式以及每种的第一检查项。与第 2.1 章的可观测性(observability)仪表板关联。"延迟 P95 飙升 → 检查 /health 上的信号量饱和度;如果 >90%,扩容。""成本激增 → 检查哪个工具的调用次数增加了;检查过去 24 小时的 PR 中是否有提示词回归。"

运行手册是让另一位工程师(或凌晨三点的未来的你)能够胜任地处理故障的东西。在发布后的第二天写它,而不是在需要它的时候。

Question
我的平台不暴露自定义自动扩容指标。有什么变通方法吗?

三种选项。并发连接数(大多数平台暴露此指标)是运行中请求数的不错代理,前提是你没有做奇怪的事情。1 分钟窗口内的 HTTP 请求率是粗略但可用的信号。外部驱动的扩容:一个小的定时任务,轮询你的指标端点并调用平台的"设置副本数"API。不如内置自动扩容优雅,但在任何平台上都能用。

如果你发现自己在构建复杂的变通方案:这是一个信号,说明你已经超出了当前平台的能力范围。考虑迁移到支持自定义指标一等公民自动扩容的平台(Modal、Kubernetes with HPA、Fly.io 的自动扩容 beta)。

Question
API 密钥的 secrets 管理怎么做?

对于初始部署:平台原生 secrets(Fly Secrets、Railway 环境变量、Kubernetes Secrets)。不要把 API 密钥放进 Docker 镜像、Git 仓库或运行时命令行。平台的机制已经足够好。

在以下情况下考虑 HashiCorp Vault、AWS Secrets Manager 或类似工具:(a) 你有多个环境且有密钥轮换要求;(b) 你需要谁访问了哪个 secret 的审计日志;(c) 合规要求。对于大多数发布首个智能体的团队而言,平台原生方案没问题,而且少一件需要搭建的东西。

Question
什么时候应该把 API 服务和工作进程拆分为独立服务?

当它们的扩容配置出现显著差异时。具体来说:API 服务器按连接数扩容(处理大量短请求),工作进程按队列深度扩容(处理较少的长任务)。如果你经常需要扩容其中一个而不需要另一个,拆分可以避免过度配置闲置的那一侧。监测的指标:你是否曾经因为需要工作进程容量而运行了比实际需要更多的 API 副本(反之亦然)?如果是,就拆分。

一个服务的头 6 个月:一个容器包含两者。之后:通常拆分。拆分的成本很低(两个 Dockerfile 而不是一个,两个部署配置);拆分的代价是过度配置。

WORKED EXAMPLE

30 分钟完成你的首次部署:Fly.io 全程实操。

模式在你真正部署过一次之后更容易内化。本实操演练取步骤 2 中的模式 A 端点,将其部署到挂载了 Redis 实例的 Fly.io,最终得到一个可以 curl 的可用 URL。选择 Fly.io 是因为它是从本地近似生产的最低开销路径:无需 Kubernetes,无需 AWS 控制台,两条命令搞定。这些原则可以干净地映射到 Railway、Render、Modal 或 AWS App Runner——如果你偏好这些平台,区别仅在于语法。

前提条件:你已将步骤 2 中的模式 A 代码放在某个目录中,有一个可用的 requirements.txt,以及一个 ANTHROPIC_API_KEY(或 OPENAI_API_KEY)在你的环境中。

第 0–5 分钟:项目结构

如果你还没有这种目录结构,先建起来:

my-agent/
├── app/
│   ├── __init__.py
│   ├── main.py            # the FastAPI app from Step 2
│   └── agent/
│       ├── __init__.py
│       └── streaming.py   # run_agent_streaming() from Step 2
├── requirements.txt
├── Dockerfile
└── fly.toml               # we'll create this next

最小可行部署的 requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.32.0
sse-starlette==2.1.3
anthropic==0.40.0
pydantic==2.9.0

步骤 2 中的 Dockerfile 加一个改动——添加健康检查二进制路径,避免 Fly 的探测器在慢启动期间误报:

# Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

ENV PYTHONUNBUFFERED=1
EXPOSE 8000

CMD ["uvicorn", "app.main:app",
     "--host", "0.0.0.0",
     "--port", "8000",
     "--workers", "2",
     "--timeout-keep-alive", "120"]

第 5–10 分钟:Fly 设置和首次启动

如果还没有安装 CLI,先安装:curl -L https://fly.io/install.sh | sh。然后注册并登录:

$ fly auth signup           # or: fly auth login
$ fly launch --no-deploy    # creates fly.toml interactively

CLI 会问几个问题。首次部署的合理回答:

  • 应用名称:全局唯一的名称,例如 my-agent-yourname
  • 区域:最靠近你用户的区域。
  • Postgres / Redis / Sentry:暂时全部选否——我们稍后会显式添加 Redis。
  • 立即部署:否(我们想先设置 secrets)。

结果是一个 fly.toml 文件。打开它,添加内部端口和健康检查部分——默认值已很接近,这些调整是让它在流式传输下正常工作的关键:

# fly.toml
app = "my-agent-yourname"
primary_region = "sjc"

[build]

[http_service]
  internal_port = 8000
  force_https = true
  auto_stop_machines = "stop"     # save $ when idle
  auto_start_machines = true
  min_machines_running = 1       # at least one warm replica

  [[http_service.checks]]
    interval = "30s"
    timeout = "5s"
    grace_period = "10s"
    method = "GET"
    path = "/health"

[[vm]]
  size = "shared-cpu-1x"
  memory = "512mb"           # enough for agent + SDK + asyncio

第 10–15 分钟:secrets 和 Redis

将你的 API 密钥设置为 Fly secret(静态加密,运行时以环境变量形式注入):

$ fly secrets set ANTHROPIC_API_KEY=sk-ant-...
$ fly secrets set OPENAI_API_KEY=sk-...           # if also using OpenAI

通过 Upstash 添加 Redis(托管服务,起步免费套餐已足够):

$ fly redis create
  # CLI walks you through region selection, naming
  # On success it sets REDIS_URL as a secret automatically
$ fly redis status <your-redis-name>
  # Verify it's running

你的应用代码读取 os.environ["REDIS_URL"]——Fly 已将其作为 secret 注入。无需其他配置。

第 15–20 分钟:部署

$ fly deploy

==> Verifying app config
==> Building image
    Image: registry.fly.io/my-agent-yourname:deployment-...
    Image size: 187 MB
==> Creating release
==> Monitoring deployment
    1 desired, 1 placed, 1 healthy, 0 unhealthy
    --> v1 deployed successfully

Visit your newly deployed app at https://my-agent-yourname.fly.dev/

你的应用现已公开在 https://my-agent-yourname.fly.dev/。先检查健康状态:

$ curl https://my-agent-yourname.fly.dev/health
{"ok": true, "in_flight": 0}

现在是真正的测试——流式端点:

$ curl -N -X POST https://my-agent-yourname.fly.dev/chat/stream \
    -H "Content-Type: application/json" \
    -d '{"message": "what is 2 + 2?", "user_id": "test"}'

event: start
data: {"run_id": "8c1e..."}

event: status
data: {"message": "thinking..."}

event: token
data: {"text": "2"}

event: token
data: {"text": " + 2"}

event: token
data: {"text": " equals 4"}

event: done
data: 

curl 上的 -N 标志禁用缓冲,让你实时看到令牌流式传输。看着事件一条一条到达,是流式传输真正端到端工作的直观确认。

第 20–25 分钟:可观测性健康检查

Fly 内置了日志和基本指标。你最常用的命令:

$ fly logs                          # tail logs from all replicas
$ fly logs --instance <machine-id>   # tail one replica
$ fly status                         # replica health, memory, CPU
$ fly dashboard                      # opens web UI in browser

如果你已接入第 2.1 章的可观测性(OpenTelemetry → Langfuse),现在就是确认追踪是否到达你的仪表板的时候了。发几个请求,打开 Langfuse,看它们以正确的跨度(span)树形式出现。如果没有,OTLP 导出器 URL 或认证头有问题——查看 fly logs 中的错误。

第 25–30 分钟:扩容和锁定

你已上线,但还是单副本、无扩容、无速率限制。三条命令就能让它更接近生产就绪:

# Autoscale to 1-3 replicas based on connection count.
$ fly autoscale set min=1 max=3 \
    --metric concurrent_connections=20

# Add a simple rate limit at the Fly edge (10 req/min per IP).
# Edit fly.toml:
[http_service.concurrency]
  type = "requests"
  hard_limit = 25
  soft_limit = 20

# Cost ceiling at the platform level — set a monthly budget alert.
# In the web dashboard: Organization → Billing → Spend Alert.
# Set to $50/mo so an accidental loop doesn't bankrupt you.

fly deploy 重新部署。你现在拥有一个智能体,它:

  • 通过 HTTPS 在稳定的 URL 提供服务,使用托管证书。
  • 实时通过 SSE 将令牌流式传输给客户端。
  • 有健康检查,崩溃时自动重启。
  • 根据负载在 1 到 3 个副本之间弹性扩容。
  • 在闲置时段停止空闲机器以节省费用。
  • 附加了 Redis,为你需要时的模式 B 升级做好准备。
  • 日志可实时查看和查询。
  • 有消费告警,失控循环不会让你措手不及。

这还不是什么

诚实地说你发布了什么:这是"近似生产",而不是"生产就绪"。具体而言,你还没有接入模式 B(没有工作进程;长时运行仍然会遇到 HTTP 超时)、状态存储分离(没有 Postgres;对话在重启后不持久),也没有熔断器(如果 Anthropic 出现 5xx 风暴,你的用户会看到错误,没有优雅降级)。那些是下一个部署里程碑;30 分钟版本是地板,不是天花板。

确实拥有的是基础:一个服务真实流量的真实 URL,具备正确的形态来持续演进。模式 B 就是"在 fly.toml 中将工作进程添加为第二个机器组"。状态存储分离就是"添加 Fly Postgres,编写模式迁移"。熔断器只需在智能体代码中增加几百行。这些都不需要重新架构;它们会叠加在你现有的基础上。

映射到其他平台

如果你不用 Fly,同样的工作流可以干净地迁移:

  • Railway:推送到 GitHub,点击"从仓库部署",在控制台中设置环境变量。添加 Redis 作为插件。同样的 Dockerfile 可用。
  • Render:从你的仓库创建一个 Web Service。添加 Redis 实例。设置环境变量。同样的 Dockerfile。
  • Modal:@app.asgi_app() 包装你的 FastAPI 应用,用 modal deploy 部署。Redis 通过 Upstash。
  • AWS App Runner:将你的镜像推送到 ECR,创建一个指向它的 App Runner 服务。Redis 通过 ElastiCache。
  • Kubernetes:如果你有集群,一个基础的 Deployment + Service + ConfigMap 就够了。不要仅仅为这个就用 k8s;除非你有多个服务或特定的网络需求,否则它过于复杂。

任何平台的经济基准:在 Fly 上,中等流量的小型智能体每月计算成本 5–20 美元(免费套餐覆盖大部分),Railway 或 Render 上是 10–30 美元,再加上你向模型提供商支付的实际使用费。模型账单通常是平台成本的 10–100 倍。不要过度优化平台支出。

End of chapter 2.4

交付物

在你选择的云平台上部署的智能体,采用真正重要的生产模式为真实流量提供服务:用于短时运行的 SSE 流式端点、用于长时运行的队列加工作进程组合、Postgres 加 Redis 的状态分离、基于真正有意义的信号的自动扩容、应对实际会遇到的故障模式的熔断器和每用户成本上限,以及你未来的自己在凌晨三点能看懂的运行手册。在接入第 2.1 章的可观测性和第 2.3 章的安全措施之后,这便是智能体发布所依托的基础设施。

  • FastAPI + sse-starlette 流式端点,具备信号量限制的并发控制
  • 带令牌流式传输和 tool_start/tool_end 事件的异步智能体循环
  • 客户端断开检测(request.is_disconnected),防止失控运行
  • 基于 Redis Streams 和消费者组的提交/轮询端点对
  • 带 SIGTERM 处理和至少一次交付的工作进程
  • 通过 Last-Event-ID 实现 SSE 恢复,增强对不稳定网络的韧性
  • Postgres 存储对话真相;Redis 缓存配合写入时失效
  • 基于信号量饱和度(API)和队列深度(工作进程)的自动扩容
  • 应对模型提供商中断的熔断器;备用模型已接入
  • 在循环中强制执行的每次运行和每用户每天成本上限
  • 涵盖部署、观测、紧急开关和前五大故障模式的运行手册