部署(Deployment):智能体不适合普通的 Web API。
你的智能体(agent)在 REPL 中运行正常。现在它需要为真实用户提供服务、在崩溃后恢复运行、在不均匀的负载下弹性扩容,并在出现问题时优雅降级。智能体具有特殊的形态——运行时间长、有状态、每次请求成本(cost)高、容易出现意外——而针对普通 Web 服务的标准建议在与它们碰撞时会以特定的方式失效。本章讲解真正有效的模式:用于快速运行的同步端点配合 SSE 流式传输、用于长时运行的队列驱动工作进程、状态存储分离方案,以及防止账单在凌晨三点爆炸的运营护栏(guardrail)。
形态问题。
普通 Web 服务具有可预测的形态。一个请求进来,一些代码执行,一个响应发出。延迟(latency)通常在一秒以内;如果不是,那就是哪里出了问题。瓶颈是 CPU,会话是无状态的,水平扩展意味着"添加更多副本"。以上这些描述的都不是智能体。
智能体是 I/O 密集型的(等待模型 API),可以运行数分钟乃至数小时,在多轮对话中保持会话状态,每次请求花费真实金钱,并且表现出常规服务中不会出现的故障模式——模型服务中断、速率限制、工具调用(tool use)无限循环。如果你拿标准套路("把代码包进 Flask,放在 nginx 后面,按 CPU 扩容")直接套用,还没开始发布就已经在三个具体方面犯了错。
会出问题的三件事
问题一:长时运行期间 HTTP 超时。浏览器、负载均衡器和 CDN 都有请求超时——通常是浏览器 30 秒、ALB 60 秒、Cloudflare 免费套餐 100 秒。你的智能体在深度调研任务上可能需要运行 5 分钟。如果你在整个过程中保持 HTTP 连接开启,连接会在运行中途断开,用户什么都得不到。标准的"同步请求/响应"模式与需要运行数分钟的智能体不兼容。
问题二:WSGI 工作进程阻塞在模型调用上。Flask、Django、Rails——所有基于 WSGI 的框架——都为每个请求分配一个 OS 线程。在等待模型 API 时,该线程处于阻塞状态。四个工作进程,四个并发用户;第五个用户要在队列里等待。对于一个 95% 时间都在执行 await client.messages.create(...) 的典型智能体而言,这意味着你 95% 的工作进程容量处于空闲状态,同时队列不断增长。标准的"增加更多工作进程"方案并无帮助,因为每个工作进程仍然在网络调用上阻塞。
问题三:基于 CPU 的自动扩容从不触发。智能体服务器上的 CPU 利用率通常低于 10%。瓶颈不是计算——而是并发中的模型调用数量、延迟预算消耗和队列深度。标准的基于 CPU 的自动扩容永远不会触发。你要么手动调整固定的副本数(并为峰值过度配置),要么找到不同的扩容信号。
决策树
解决这三个问题的方案是同一个思路:让你的部署形态匹配智能体的预期运行时长。三种模式几乎覆盖了所有情况。
大多数生产环境中的智能体最终会组合使用——模式 A 用于短时交互运行,模式 B 用于长时后台运行,共享同一套模式 C 的状态存储。本章将依次讲解每种模式。
从一开始就采用异步的承诺
在讨论任何模式之前,先说一点:选 FastAPI(或 Starlette,或任何 ASGI 框架),而不是 Flask。对于智能体而言,这是不可商量的。原因在于 I/O 密集型的本质:FastAPI 工作进程可以在单个进程中并发保持数千个处于飞行中的 await client.messages.create(...) 调用,因为每个调用在等待时都会让出控制权。同样的工作负载在 Flask 上则需要数千个 OS 线程,这根本不可行。
心智模型:使用 ASGI 时,你的并发限制由你的 API 提供商的速率限制决定,而不是由你的服务器工作进程数决定。这才是正确的形态。
如果你的并发用户少于 5 个且运行时间在几秒以内,Flask 没问题。当以下任一条件成立时,迁移就变得紧迫了:并发用户数超过约 20 人;你的运行偶尔需要超过 30 秒;你开始向客户端流式传输令牌(token);你需要高效地并行执行工具调用。低于这个阈值时,迁移的工程成本超过收益。
说实话:大多数团队发现自己比预期更早需要 ASGI,因为流量在增长,或者运行时间在增长,或者两者兼有。在 5 个用户时完成一次迁移,比在 50 个用户时焦头烂额地迁移要便宜得多。
对于初始部署,几乎肯定不需要。Fly.io、Railway、Render、Modal、AWS App Runner——这些平台都允许你用一条命令部署容器化的 FastAPI 应用,并且处理好那些无聊的部分(HTTPS、负载均衡、基本自动扩容)。对于一个为每天数千用户提供服务的单团队智能体而言,这是合适的抽象层级。
当你真正需要以下功能时再考虑 Kubernetes:多个共享基础设施的智能体服务、对服务间网络的控制、复杂的部署拓扑(金丝雀、蓝绿),或者公司本来就跑在 Kubernetes 上。不要仅仅因为它是规模化公司的默认答案就去用它。
模式 A:带 SSE 流式传输的同步端点。
对于在 30 秒内完成的运行,最简单的生产级部署是一个 FastAPI 端点:运行智能体,并通过服务器发送事件(Server-Sent Events,SSE)将更新流式传回。客户端在内容生成时即可看到输出;HTTP 连接保持开启直到智能体完成;无需队列、无需工作进程、无需额外的活动部件。
为什么选 SSE 而不是 WebSocket:智能体输出是单向的——服务器推送给客户端。SSE 以更低的复杂度处理单向流式传输,无需特殊配置即可穿透企业代理和 HTTP/1.1 基础设施,并且通过 Last-Event-ID 头在协议层面支持自动重连。WebSocket 给你双向通信,而你几乎从不需要它来处理智能体输出。把它留给"停止"按钮——那可以是一个独立的小端点。
完整端点
这是生产级版本——全程异步、信号量限制并发、请求级追踪(trace)、结构化错误、优雅取消。这是你实际会部署的单文件。
# app/main.py import asyncio, json, uuid from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Request from sse_starlette.sse import EventSourceResponse from pydantic import BaseModel from anthropic import AsyncAnthropic # Cap concurrent in-flight agent runs. Don't let a traffic spike # issue 10,000 simultaneous model calls and blow your rate limit. AGENT_SEMAPHORE = asyncio.Semaphore(50) @asynccontextmanager async def lifespan(app: FastAPI): # Share one AsyncAnthropic client across all requests. # Has internal connection pool; creating per-request is wasteful. app.state.anthropic = AsyncAnthropic() yield await app.state.anthropic.close() app = FastAPI(lifespan=lifespan) class ChatRequest(BaseModel): message: str user_id: str @app.post("/chat/stream") async def chat_stream(req: ChatRequest, request: Request): run_id = str(uuid.uuid4()) async def event_stream(): # Hold the semaphore for the duration of the run. async with AGENT_SEMAPHORE: try: yield {"event": "start", "data": json.dumps({"run_id": run_id})} # Drive the agent loop; emit events as the run progresses. async for event in run_agent_streaming( request.app.state.anthropic, req.message, req.user_id, ): # Client disconnected? Stop generating tokens (and stop paying). if await request.is_disconnected(): break yield {"event": event["type"], "data": json.dumps(event["payload"])} yield {"event": "done", "data": ""} except asyncio.CancelledError: # Client disconnected mid-run; clean up and don't re-raise the cancellation # as an error event (the connection is gone anyway). return except Exception as e: yield {"event": "error", "data": json.dumps({"type": type(e).__name__, "message": str(e)})} return EventSourceResponse(event_stream()) @app.get("/health") async def health(): return {"ok": True, "in_flight": 50 - AGENT_SEMAPHORE._value}
# app/main.py import asyncio, json, uuid from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Request from sse_starlette.sse import EventSourceResponse from pydantic import BaseModel from openai import AsyncOpenAI AGENT_SEMAPHORE = asyncio.Semaphore(50) @asynccontextmanager async def lifespan(app: FastAPI): app.state.openai = AsyncOpenAI() yield await app.state.openai.close() app = FastAPI(lifespan=lifespan) class ChatRequest(BaseModel): message: str user_id: str @app.post("/chat/stream") async def chat_stream(req: ChatRequest, request: Request): run_id = str(uuid.uuid4()) async def event_stream(): async with AGENT_SEMAPHORE: try: yield {"event": "start", "data": json.dumps({"run_id": run_id})} async for event in run_agent_streaming( request.app.state.openai, req.message, req.user_id, ): if await request.is_disconnected(): break yield {"event": event["type"], "data": json.dumps(event["payload"])} yield {"event": "done", "data": ""} except asyncio.CancelledError: return except Exception as e: yield {"event": "error", "data": json.dumps({"type": type(e).__name__, "message": str(e)})} return EventSourceResponse(event_stream()) @app.get("/health") async def health(): return {"ok": True, "in_flight": 50 - AGENT_SEMAPHORE._value}
这段代码中有四处不显眼但值得细说的设计。让我逐一讲解。
信号量
AGENT_SEMAPHORE = asyncio.Semaphore(50) 是这个文件里最重要的一行。它将每个副本的并发智能体运行数上限设为 50。为什么重要:如果没有它,一次流量峰值会同时向模型提供商发出 10,000 个并发调用,你会突破速率限制,每个调用都返回 429,级联效应会把整个服务打垮。有了它,第 51 个请求会(优雅地)等待前 50 个中的某一个完成。
合适的数字:用你的提供商速率限制除以你每次运行的平均令牌数,再留出 30% 的余量用于重试和偶尔的长时运行。对于 Anthropic Tier 4,每分钟每组织 8k 令牌,平均每次运行 3k 令牌共 4 轮 = 每次运行 12k 令牌,你大约可以维持……实际上请根据你的具体情况做这道数学题;关键原则是要有一个数字,而不是照抄我的。
客户端断开检测
if await request.is_disconnected(): break 是那条能帮你省钱的代码。没有它,如果用户在运行途中关闭了浏览器标签页,你的智能体会继续生成令牌、继续调用工具、继续花钱——却没有任何人在接收。有了它,循环在下一次迭代时中断,运行终止。检查发生在事件之间,所以它在正常路径上不增加任何延迟。
注意:is_disconnected() 在某些反向代理上有时不稳定——可能需要几秒钟才能检测到断开。这通常没问题;你只是多付了几个令牌的钱。不要尝试比这更积极地检测断开;那会引入更多 bug。
asynccontextmanager 生命周期
Anthropic/OpenAI 客户端有内部连接池。每次请求都创建一个会很浪费。创建一个全局实例是正确做法——但它必须在关闭时被清理。lifespan 上下文管理器是 FastAPI 惯用的方式:它为你提供与应用生命周期绑定的启动和关闭钩子。对任何共享资源(HTTP 客户端、数据库连接池、模型客户端)都使用它。
反模式:在模块导入时创建客户端,却从不关闭它。连接池在关闭时会成为孤儿,文件描述符泄漏,你的容器优雅关闭探针超时。第一次踩到这个坑要花十分钟调试。
智能体作为异步生成器
这是让流式传输感觉自然的设计。run_agent_streaming 是一个 async def 函数,随着运行进展 yield 出事件。端点消费这些事件并将其转发给客户端。智能体不知道自己在 HTTP 端点后面;它只是 yield 事件。这种分离使智能体可以独立测试,使端点成为一个薄薄的适配器。
# agent/streaming.py async def run_agent_streaming(client, user_msg: str, user_id: str): # Initial: emit a status event so the client knows we got the message yield {"type": "status", "payload": {"message": "thinking..."}} messages = [{"role": "user", "content": user_msg}] for step in range(20): # step budget from chapter 1.1 # Stream the model's response token by token (or block by block) async with client.messages.stream( model="claude-sonnet-4-5", max_tokens=4096, tools=TOOLS, messages=messages, ) as stream: async for text in stream.text_stream: yield {"type": "token", "payload": {"text": text}} response = await stream.get_final_message() messages.append({"role": "assistant", "content": response.content}) if response.stop_reason == "end_turn": return # Tool calls: emit per-call events so client can show "Searching docs..." results = [] for block in response.content: if block.type == "tool_use": yield {"type": "tool_start", "payload": {"name": block.name, "args": block.input}} try: result = await HANDLERS[block.name](**block.input) yield {"type": "tool_end", "payload": {"name": block.name, "ok": True}} results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(result)}) except Exception as e: yield {"type": "tool_end", "payload": {"name": block.name, "ok": False, "error": str(e)}} results.append({"type": "tool_result", "tool_use_id": block.id, "content": f"Error: {e}", "is_error": True}) messages.append({"role": "user", "content": results})
客户端收到的内容
浏览器观察到的 SSE 流的形态。这是真实的线上格式,便于调试:
event: start
data: {"run_id": "8c1e..."}
event: status
data: {"message": "thinking..."}
event: token
data: {"text": "I'll search"}
event: token
data: {"text": " the documentation"}
event: tool_start
data: {"name": "search_docs", "args": {"query": "autovacuum"}}
event: tool_end
data: {"name": "search_docs", "ok": true}
event: token
data: {"text": "Autovacuum naptime is..."}
event: done
data:
客户端(浏览器、移动应用、CLI)使用标准 SSE API 消费这些事件。在 JavaScript 中,用一个 EventSource 加上每个事件名的事件监听器。显示逻辑——在 tool_start 期间显示加载动画、动态展示令牌、在 error 时显示错误——由前端决定,但这些事件的设计使得构建一个精致的 UI 成本很低。
部署它
两行 Dockerfile,部署到任何支持容器的地方。生产环境调优只需合理配置 uvicorn:
# Dockerfile FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4", # one per CPU core "--timeout-keep-alive", "120"] # long enough for SSE
注意 --workers 4——即使每个工作进程可以并发保持数千个处于飞行中的请求,你仍然需要多个 OS 级进程,这样一个崩溃或卡住的进程不会拖垮整个服务。大约每个 CPU 核一个工作进程是经验法则。--timeout-keep-alive 120 使 HTTP/1.1 连接保持足够长的时间,以支持长时流式运行,而不会被激进地关闭。
在 Cloudflare、ALB、nginx 或任何反向代理后面:关闭代理缓冲。大多数反向代理默认缓冲响应以优化吞吐量(throughput),这意味着 SSE 事件会被积压并以块的形式传递,而不是流式传输。在响应头中设置 X-Accel-Buffering: no(sse-starlette 会自动为你设置),并配置你的反向代理来遵守它。跳过这步,你的"流式"UI 会每 30 秒才更新一次。
线程/进程池是为 CPU 密集型工作设计的。智能体运行是 I/O 密集型的——它们主要在 await 等待模型 API。对异步代码使用信号量才是正确的原语:它在不创建线程的情况下限制并发中的任务数。效果相同(有界并发),开销低得多。
线程的用武之地:如果你有一个无法规避的同步库(例如,某个供应商 SDK 没有提供异步版本),用 asyncio.to_thread(...) 包装它的调用,使其不阻塞事件循环。信号量仍然在其上方的智能体运行层级发挥作用。
这说明你正在触碰模式 A 的范围上限,应该考虑模式 B。对于负载峰值期间几十秒的信号量等待是可以接受的;如果等待时间经常超过 10 秒,就会成为用户体验问题。三种应对方式,按顺序:
- 为"长"运行添加一个单独的信号量(通过步骤数或已用时间检测),避免它们饿死"短"运行。
- 将长时运行路径迁移到模式 B(队列 + 工作进程)。
- 水平扩展——更多副本,每个副本有自己的信号量。数学公式:总并发容量 = 副本数 × 每副本信号量值。
模式 B:用于长时运行的队列驱动工作进程。
模式 A 适合智能体运行数秒的对话式交互。但对于那些真正展现智能体 AI 价值的场景——耗时 10 分钟的深度调研、对 100 份文档的批量处理、需要暂停等待用户审阅的多阶段工作流——它就不管用了。对于这些情况,HTTP 请求-响应模型从根本上就是错的:你没办法保持连接开启那么久,而且即便能,用户也不应该一直开着浏览器标签页等待。
解决方案是提交/轮询架构。客户端发起 POST 请求;服务器立即返回一个 run_id 并将任务入队;工作进程从队列中取出任务并执行;客户端通过单独的 SSE 连接按 run_id 订阅更新。用户可以关闭浏览器,之后再回来查看结果——运行在服务器端持续进行,更新从上次中断的地方继续流式传输。
架构形态
需要独立搭建三个部分:立即返回的提交端点、从队列消费并发出事件的工作进程,以及按 run_id 订阅的流式传输端点。我们将使用 Redis Streams 同时作为队列和事件日志——这是正确的原语(持久、有序,支持带最后 ID 恢复的发布/订阅)。
提交端点
# app/routes/runs.py import uuid, json from fastapi import APIRouter, Request from redis.asyncio import Redis router = APIRouter() @router.post("/runs", status_code=202) async def submit_run(req: ChatRequest, request: Request): redis: Redis = request.app.state.redis run_id = str(uuid.uuid4()) # Persist run metadata so client can poll status at any time await redis.hset(f"run:{run_id}", mapping={ "status": "queued", "user_id": req.user_id, "message": req.message, "created_at": int(time.time()), }) await redis.expire(f"run:{run_id}", 86400) # 24h TTL # Enqueue. XADD is atomic; the worker will pick it up. await redis.xadd("queue:runs", { "run_id": run_id, "user_id": req.user_id, "message": req.message, }) return {"run_id": run_id, "status": "queued"}
该端点在毫秒内返回。202 状态码告诉客户端"已接受但尚未完成"——它们应该通过轮询或订阅来获取结果。
工作进程
一个独立的进程(独立的 Docker 容器、独立的 Kubernetes pod、独立的 Fly.io 机器——取决于你的部署基础设施的叫法)。它永远循环,从队列中拉取任务。
# worker/main.py import asyncio, json, time, signal from redis.asyncio import Redis from anthropic import AsyncAnthropic async def worker_loop(name: str): redis = Redis(host="redis", decode_responses=True) anthropic = AsyncAnthropic() shutdown = asyncio.Event() signal.signal(signal.SIGTERM, lambda *_: shutdown.set()) # Consumer group: multiple workers can share one queue with # at-least-once delivery and automatic re-claim on crash. try: await redis.xgroup_create("queue:runs", "workers", id="0", mkstream=True) except: pass # already exists while not shutdown.is_set(): # Block for up to 5 seconds waiting for new work msgs = await redis.xreadgroup( "workers", name, {"queue:runs": ">"}, count=1, block=5000, ) if not msgs: continue for _, items in msgs: for msg_id, data in items: run_id = data["run_id"] try: await execute_run(redis, anthropic, run_id, data) await redis.xack("queue:runs", "workers", msg_id) except Exception as e: # Mark run as failed but ack so we don't infinite-loop await mark_failed(redis, run_id, e) await redis.xack("queue:runs", "workers", msg_id) async def execute_run(redis, anthropic, run_id: str, data: dict): """Run the agent. Each event also gets appended to a Redis Stream that streaming clients can subscribe to.""" events_key = f"events:{run_id}" await redis.hset(f"run:{run_id}", "status", "running") await redis.xadd(events_key, {"event": "start", "data": ""}) async for event in run_agent_streaming(anthropic, data["message"], data["user_id"]): await redis.xadd(events_key, { "event": event["type"], "data": json.dumps(event["payload"]), }) await redis.xadd(events_key, {"event": "done", "data": ""}) await redis.hset(f"run:{run_id}", "status", "complete") await redis.expire(events_key, 3600) # 1h TTL on event log if __name__ == "__main__": asyncio.run(worker_loop(name=os.environ.get("WORKER_NAME", "worker-1")))
有三点值得停下来细看。消费者组(XREADGROUP)在多个工作进程间提供至少一次(at-least-once)交付:如果某个工作进程在运行中途崩溃,消息未被确认,另一个工作进程可以通过 XPENDING + XCLAIM 认领它。这就是让队列具备弹性的属性。SIGTERM 处理对于优雅关闭至关重要——当平台通知工作进程终止时,先完成当前运行再退出。events:{run_id} 流是流式端点将要订阅的对象;它同时也是每次运行的完整审计日志,以 1 小时 TTL 持久化保存。
流式端点
这是客户端提交运行后连接的端点。它订阅给定 run_id 的事件流,并将事件以 SSE 形式转发。
@router.get("/runs/{run_id}/events") async def stream_events(run_id: str, request: Request, last_event_id: str = Header(default="0")): """Subscribe to a run's event stream. Supports resumption via Last-Event-ID — drop the connection and reconnect, no events lost.""" redis = request.app.state.redis events_key = f"events:{run_id}" async def subscribe(): last_id = last_event_id or "0" while True: if await request.is_disconnected(): break # Read events newer than last_id. Block up to 5s. msgs = await redis.xread({events_key: last_id}, count=20, block=5000) if not msgs: # Periodic keepalive comment so proxies don't time out yield {"comment": "keepalive"} continue for _, items in msgs: for msg_id, data in items: last_id = msg_id yield { "id": msg_id, # client tracks this for resume "event": data["event"], "data": data["data"], } if data["event"] == "done": return return EventSourceResponse(subscribe())
Last-Event-ID 头是 SSE 内置的恢复机制。当客户端在连接断开后重连时,浏览器会自动发回它收到的最后一个事件 ID。我们以此作为 Redis 流中的起始点——客户端从上次中断的地方精确恢复,无重复、无遗漏。没有这个机制,每次不稳定的网络或浏览器标签页切换都会迫使用户重新开始。
完整的生命周期,端到端
t=0 client POSTs /runs
server enqueues, returns 202 {run_id: "8c1e..."}
t=0.05 worker picks up message from queue
t=0.06 client opens /runs/8c1e.../events SSE connection
t=0.10 worker emits "start" event → Redis stream
t=0.10 server forwards "start" to client over SSE
...
t=180 worker emits "tool_start: search_web (query=Q3 GDP)"
t=183 worker emits "tool_end" + tokens
t=240 CLIENT'S WIFI DROPS — SSE connection closes
but the worker keeps running. Events keep landing
in Redis stream.
t=247 client reconnects, sends Last-Event-ID: 240-3
t=247.1 server resumes stream from event 240-3+1
client receives all events from t=240 onward
...
t=412 worker emits "done"
server closes connection
run record persists in Redis for 24h
这是用户期望从智能体获得的体验:提交一个长任务,合上笔记本,回来看结果。提交/轮询架构正是使这成为可能的关键。
对于初始部署,使用 supervisord 等进程管理器将 API 服务和工作进程作为独立进程运行在同一个 Docker 容器中。你获得了架构分离,而无需承担两个服务的运维开销。当流量模式要求时再拆分为独立部署(不同的扩容配置,或工作进程需要不同的资源)。
以上这些都可以用。选择 Redis Streams 的条件是:(a) 你已经在运行 Redis 用于缓存/会话,因此不需要引入新的基础设施;(b) 你希望事件日志同时作为流式订阅后端(一种机制而不是两种);(c) 消费者组提供了你所需要的持久性和重认领语义。
Celery 更重量级,功能也更多(定时任务、复杂工作流、带退避的重试)。如果你需要那些功能就选它。RQ 比 Celery 更简单。SQS 是你在 AWS 上且想要托管基础设施时的正确选择。这些方案的模式都类似——提交端点、工作循环、确认/重试——只是 API 不同。
这是至少一次交付的问题。三种方案,按工作量排序:
- 让工具具备幂等性。如果
send_email有去重键(run_id + tool_call_idx),重试就是安全的。这是最干净的修复,也是长期正确的答案。 - 对进度做检查点。每次工具调用后,将结果以
(run_id, step)为键持久化到 Redis。恢复时,跳过已有结果的步骤。这就是 LangGraph 等智能体运行时替你做的事情。 - 对非幂等工具直接禁用重认领。如果工作进程崩溃,将运行标记为失败,让用户重新提交。最简单,但意味着崩溃对用户可见。对某些产品来说可以接受。
对于首次部署:方案三。对于生产环境:方案一和方案二的组合。
决定你能否持续运行的四个运营关注点。
你已经发布了智能体,它在正常工作。现在运营现实开始了。本步骤涵盖四件决定你的服务能否随负载增长保持可用、经济可行和可调试的事情:状态的存放位置、应该按什么信号自动扩容、如何优雅降级,以及如何防止成本失控。每个都是一小节,但跳过任何一个,就是"部署了一个智能体"和"真正发布了一个智能体"之间的差距。
1. 状态存储:Postgres 存真相,Redis 走热路径
对话历史存放在某处。错误的地方会产生问题。两个存储,两项职责:
Postgres 持有真相。每次对话、每条消息、每次工具调用结果的持久记录。模式(schema)大致如下:
CREATE TABLE conversations ( id UUID PRIMARY KEY, user_id TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), title TEXT, metadata JSONB DEFAULT '{}'::JSONB ); CREATE TABLE messages ( id BIGSERIAL PRIMARY KEY, conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE, role TEXT NOT NULL, -- user / assistant / tool content JSONB NOT NULL, -- the full message blocks token_count INTEGER, cost_usd NUMERIC(10, 6), created_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX messages_conv_idx ON messages(conversation_id, id);
content 列是包含完整消息结构的 JSONB——包括 tool_use 和 tool_result 块。用 JSONB 是为了以后可以查询它("上个月智能体调用了多少次 search_docs?"),而无需解析字符串。
Redis 走热路径。在一次活跃对话中,每一轮都需要读取消息历史。每次都查询 Postgres 会增加 5–10 毫秒的延迟,并造成 N+1 查询问题。正确的缓存做法:将近期消息序列化为 JSON,以 conv:{id}:messages 为键存储在 Redis 中,设置 1 小时 TTL,每次新消息时写穿更新。
# app/state.py async def get_messages(redis, db, conversation_id: str) -> list[dict]: # Hot path: Redis cached = await redis.get(f"conv:{conversation_id}:messages") if cached: return json.loads(cached) # Cold path: Postgres rows = await db.fetch( "SELECT content FROM messages WHERE conversation_id = $1 ORDER BY id", conversation_id, ) messages = [r["content"] for r in rows] # Warm the cache for next time await redis.set( f"conv:{conversation_id}:messages", json.dumps(messages), ex=3600, ) return messages async def append_message(redis, db, conversation_id: str, msg: dict): # Write to truth first await db.execute( "INSERT INTO messages (conversation_id, role, content, ...) VALUES ...", conversation_id, msg["role"], json.dumps(msg["content"]), ) # Invalidate cache; will be repopulated on next read await redis.delete(f"conv:{conversation_id}:messages")
不变式:Postgres 是真相;Redis 是可能出错但能自我修复的缓存。写入时使缓存失效是最简单有效的方案——除非有充分理由,否则不要尝试写穿缓存更新。缓存会在下次读取时重新填充,这完全没问题。
2. 按正确的信号自动扩容
如步骤 1 所述,基于 CPU 的自动扩容对智能体永远不会触发。真正与"我需要更多容量"相关的信号:
- 信号量饱和度(模式 A)。
/health端点上的in_flight计数。当其超过上限的 80% 持续超过 1 分钟时扩容。 - 队列深度(模式 B)。
XLEN queue:runs。当深度增长时扩容工作进程;当其持续为零超过 5 分钟时缩容。 - 提交端点的 P95 延迟。即使是应该在毫秒内返回的提交端点,在 Redis 或 Postgres 饱和时也会变慢。监测这个指标以感知上游压力。
- 模型提供商的错误率。429 错误激增意味着你已到达速率限制上限;添加更多副本无济于事,你需要降低每副本的并发数或联系提供商。
大多数托管平台(Fly.io、Modal、Railway)允许你通过接入 Prometheus 或轮询 HTTP 端点来基于自定义指标自动扩容。要暴露的指标:API 副本的 in_flight,工作进程副本的 queue_depth。将它们接入你的平台自动扩容器,设定阈值,发布上线。
3. 优雅降级:你会遇到的三种故障
三种生产环境故障模式足够常见,值得提前计划应对。
模型提供商中断或 5xx 风暴。Anthropic 或 OpenAI 持续 10 分钟返回错误。你的智能体无法运行。正确的行为:向用户提供清晰的错误信息("模型暂时不可用,请稍后重试"),而不是让请求超时。实现方式:一个熔断器(circuit breaker),当错误率在 60 秒内超过 50% 时触发,之后 5 分钟内快速失败,并定期发送探测请求以检测恢复。
# app/circuit.py class CircuitBreaker: def __init__(self, threshold=0.5, window=60, cooldown=300): self.threshold = threshold self.window = window self.cooldown = cooldown self.results: list[tuple[float, bool]] = [] # (timestamp, ok) self.tripped_until = 0 def record(self, ok: bool): now = time.time() self.results.append((now, ok)) self.results = [r for r in self.results if r[0] > now - self.window] if len(self.results) >= 10: error_rate = 1 - sum(ok for _, ok in self.results) / len(self.results) if error_rate > self.threshold: self.tripped_until = now + self.cooldown def is_open(self) -> bool: return time.time() < self.tripped_until
回退到更便宜或不同的模型。当主模型的熔断器触发时,有时可以回退到备用模型——Sonnet → Haiku,GPT-5.5 → GPT-5-mini,或完全不同的提供商。备用模型通常更慢或能力稍弱,但给出一个真实的答案总好过报错。两点注意:备用模型需要自己的熔断器(你不希望两个提供商都宕机时让你永远无法恢复),并且你的评估(eval)(第 1.4 章、第 3.1 章)应该涵盖备用路径,以便你了解在那里的质量表现。
失控的成本。一次有缺陷的提示词(prompt)变更导致智能体每次运行调用 search_docs 50 次,而不是 2 次。你在第二天早上从成本仪表板发现了这个问题,此时已花费 4,000 美元。防御手段:在循环本身中强制执行每用户、每次运行的成本上限。
# In the agent loop, after each model call: run_cost += response.usage.input_tokens * INPUT_PRICE / 1e6 run_cost += response.usage.output_tokens * OUTPUT_PRICE / 1e6 if run_cost > MAX_COST_PER_RUN: log.warn("run_cost_exceeded", run_id=run_id, cost=run_cost) yield {"type": "error", "payload": {"reason": "cost_budget_exceeded"}} return # Also check daily user budget daily = await redis.incrbyfloat(f"cost:user:{user_id}:{today}", run_cost) if daily > MAX_COST_PER_USER_PER_DAY: yield {"type": "error", "payload": {"reason": "daily_budget_exceeded"}} return
合理的默认值:面向终端用户的智能体每次运行 1 美元,每用户每天 20 美元,可信账户允许显式覆盖。这些上限与第 3.1 章中的预算互为补充——那些在回归(regression)发布时把关,这些在悄悄溜进来的回归消费时把关。
4. 运行手册
最后一块,也是最常被跳过的:写下出问题时该怎么做。一两页纯文本,和你的部署配置放在一起。有用的最低要求:
- 如何部署。具体的命令。同时包括正常路径和紧急回滚路径。
- 如何查看正在发生什么。仪表板在哪里。日志在哪里。被呼叫时运行的三条查询。
- 如何让它停下来。紧急开关(第 2.3 章——那个能暂停所有运行的标志)。成本上限开关。模型提供商切换。
- 最可能的五种故障模式以及每种的第一检查项。与第 2.1 章的可观测性(observability)仪表板关联。"延迟 P95 飙升 → 检查 /health 上的信号量饱和度;如果 >90%,扩容。""成本激增 → 检查哪个工具的调用次数增加了;检查过去 24 小时的 PR 中是否有提示词回归。"
运行手册是让另一位工程师(或凌晨三点的未来的你)能够胜任地处理故障的东西。在发布后的第二天写它,而不是在需要它的时候。
三种选项。并发连接数(大多数平台暴露此指标)是运行中请求数的不错代理,前提是你没有做奇怪的事情。1 分钟窗口内的 HTTP 请求率是粗略但可用的信号。外部驱动的扩容:一个小的定时任务,轮询你的指标端点并调用平台的"设置副本数"API。不如内置自动扩容优雅,但在任何平台上都能用。
如果你发现自己在构建复杂的变通方案:这是一个信号,说明你已经超出了当前平台的能力范围。考虑迁移到支持自定义指标一等公民自动扩容的平台(Modal、Kubernetes with HPA、Fly.io 的自动扩容 beta)。
对于初始部署:平台原生 secrets(Fly Secrets、Railway 环境变量、Kubernetes Secrets)。不要把 API 密钥放进 Docker 镜像、Git 仓库或运行时命令行。平台的机制已经足够好。
在以下情况下考虑 HashiCorp Vault、AWS Secrets Manager 或类似工具:(a) 你有多个环境且有密钥轮换要求;(b) 你需要谁访问了哪个 secret 的审计日志;(c) 合规要求。对于大多数发布首个智能体的团队而言,平台原生方案没问题,而且少一件需要搭建的东西。
当它们的扩容配置出现显著差异时。具体来说:API 服务器按连接数扩容(处理大量短请求),工作进程按队列深度扩容(处理较少的长任务)。如果你经常需要扩容其中一个而不需要另一个,拆分可以避免过度配置闲置的那一侧。监测的指标:你是否曾经因为需要工作进程容量而运行了比实际需要更多的 API 副本(反之亦然)?如果是,就拆分。
一个服务的头 6 个月:一个容器包含两者。之后:通常拆分。拆分的成本很低(两个 Dockerfile 而不是一个,两个部署配置);不拆分的代价是过度配置。
30 分钟完成你的首次部署:Fly.io 全程实操。
模式在你真正部署过一次之后更容易内化。本实操演练取步骤 2 中的模式 A 端点,将其部署到挂载了 Redis 实例的 Fly.io,最终得到一个可以 curl 的可用 URL。选择 Fly.io 是因为它是从本地到近似生产的最低开销路径:无需 Kubernetes,无需 AWS 控制台,两条命令搞定。这些原则可以干净地映射到 Railway、Render、Modal 或 AWS App Runner——如果你偏好这些平台,区别仅在于语法。
前提条件:你已将步骤 2 中的模式 A 代码放在某个目录中,有一个可用的 requirements.txt,以及一个 ANTHROPIC_API_KEY(或 OPENAI_API_KEY)在你的环境中。
第 0–5 分钟:项目结构
如果你还没有这种目录结构,先建起来:
my-agent/
├── app/
│ ├── __init__.py
│ ├── main.py # the FastAPI app from Step 2
│ └── agent/
│ ├── __init__.py
│ └── streaming.py # run_agent_streaming() from Step 2
├── requirements.txt
├── Dockerfile
└── fly.toml # we'll create this next
最小可行部署的 requirements.txt:
fastapi==0.115.0 uvicorn[standard]==0.32.0 sse-starlette==2.1.3 anthropic==0.40.0 pydantic==2.9.0
步骤 2 中的 Dockerfile 加一个改动——添加健康检查二进制路径,避免 Fly 的探测器在慢启动期间误报:
# Dockerfile FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . ENV PYTHONUNBUFFERED=1 EXPOSE 8000 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2", "--timeout-keep-alive", "120"]
第 5–10 分钟:Fly 设置和首次启动
如果还没有安装 CLI,先安装:curl -L https://fly.io/install.sh | sh。然后注册并登录:
$ fly auth signup # or: fly auth login
$ fly launch --no-deploy # creates fly.toml interactively
CLI 会问几个问题。首次部署的合理回答:
- 应用名称:全局唯一的名称,例如
my-agent-yourname。 - 区域:最靠近你用户的区域。
- Postgres / Redis / Sentry:暂时全部选否——我们稍后会显式添加 Redis。
- 立即部署:否(我们想先设置 secrets)。
结果是一个 fly.toml 文件。打开它,添加内部端口和健康检查部分——默认值已很接近,这些调整是让它在流式传输下正常工作的关键:
# fly.toml app = "my-agent-yourname" primary_region = "sjc" [build] [http_service] internal_port = 8000 force_https = true auto_stop_machines = "stop" # save $ when idle auto_start_machines = true min_machines_running = 1 # at least one warm replica [[http_service.checks]] interval = "30s" timeout = "5s" grace_period = "10s" method = "GET" path = "/health" [[vm]] size = "shared-cpu-1x" memory = "512mb" # enough for agent + SDK + asyncio
第 10–15 分钟:secrets 和 Redis
将你的 API 密钥设置为 Fly secret(静态加密,运行时以环境变量形式注入):
$ fly secrets set ANTHROPIC_API_KEY=sk-ant-...
$ fly secrets set OPENAI_API_KEY=sk-... # if also using OpenAI
通过 Upstash 添加 Redis(托管服务,起步免费套餐已足够):
$ fly redis create
# CLI walks you through region selection, naming
# On success it sets REDIS_URL as a secret automatically
$ fly redis status <your-redis-name>
# Verify it's running
你的应用代码读取 os.environ["REDIS_URL"]——Fly 已将其作为 secret 注入。无需其他配置。
第 15–20 分钟:部署
$ fly deploy
==> Verifying app config
==> Building image
Image: registry.fly.io/my-agent-yourname:deployment-...
Image size: 187 MB
==> Creating release
==> Monitoring deployment
1 desired, 1 placed, 1 healthy, 0 unhealthy
--> v1 deployed successfully
Visit your newly deployed app at https://my-agent-yourname.fly.dev/
你的应用现已公开在 https://my-agent-yourname.fly.dev/。先检查健康状态:
$ curl https://my-agent-yourname.fly.dev/health
{"ok": true, "in_flight": 0}
现在是真正的测试——流式端点:
$ curl -N -X POST https://my-agent-yourname.fly.dev/chat/stream \
-H "Content-Type: application/json" \
-d '{"message": "what is 2 + 2?", "user_id": "test"}'
event: start
data: {"run_id": "8c1e..."}
event: status
data: {"message": "thinking..."}
event: token
data: {"text": "2"}
event: token
data: {"text": " + 2"}
event: token
data: {"text": " equals 4"}
event: done
data:
curl 上的 -N 标志禁用缓冲,让你实时看到令牌流式传输。看着事件一条一条到达,是流式传输真正端到端工作的直观确认。
第 20–25 分钟:可观测性健康检查
Fly 内置了日志和基本指标。你最常用的命令:
$ fly logs # tail logs from all replicas
$ fly logs --instance <machine-id> # tail one replica
$ fly status # replica health, memory, CPU
$ fly dashboard # opens web UI in browser
如果你已接入第 2.1 章的可观测性(OpenTelemetry → Langfuse),现在就是确认追踪是否到达你的仪表板的时候了。发几个请求,打开 Langfuse,看它们以正确的跨度(span)树形式出现。如果没有,OTLP 导出器 URL 或认证头有问题——查看 fly logs 中的错误。
第 25–30 分钟:扩容和锁定
你已上线,但还是单副本、无扩容、无速率限制。三条命令就能让它更接近生产就绪:
# Autoscale to 1-3 replicas based on connection count.
$ fly autoscale set min=1 max=3 \
--metric concurrent_connections=20
# Add a simple rate limit at the Fly edge (10 req/min per IP).
# Edit fly.toml:
[http_service.concurrency]
type = "requests"
hard_limit = 25
soft_limit = 20
# Cost ceiling at the platform level — set a monthly budget alert.
# In the web dashboard: Organization → Billing → Spend Alert.
# Set to $50/mo so an accidental loop doesn't bankrupt you.
用 fly deploy 重新部署。你现在拥有一个智能体,它:
- 通过 HTTPS 在稳定的 URL 提供服务,使用托管证书。
- 实时通过 SSE 将令牌流式传输给客户端。
- 有健康检查,崩溃时自动重启。
- 根据负载在 1 到 3 个副本之间弹性扩容。
- 在闲置时段停止空闲机器以节省费用。
- 附加了 Redis,为你需要时的模式 B 升级做好准备。
- 日志可实时查看和查询。
- 有消费告警,失控循环不会让你措手不及。
这还不是什么
诚实地说你发布了什么:这是"近似生产",而不是"生产就绪"。具体而言,你还没有接入模式 B(没有工作进程;长时运行仍然会遇到 HTTP 超时)、状态存储分离(没有 Postgres;对话在重启后不持久),也没有熔断器(如果 Anthropic 出现 5xx 风暴,你的用户会看到错误,没有优雅降级)。那些是下一个部署里程碑;30 分钟版本是地板,不是天花板。
你确实拥有的是基础:一个服务真实流量的真实 URL,具备正确的形态来持续演进。模式 B 就是"在 fly.toml 中将工作进程添加为第二个机器组"。状态存储分离就是"添加 Fly Postgres,编写模式迁移"。熔断器只需在智能体代码中增加几百行。这些都不需要重新架构;它们会叠加在你现有的基础上。
映射到其他平台
如果你不用 Fly,同样的工作流可以干净地迁移:
- Railway:推送到 GitHub,点击"从仓库部署",在控制台中设置环境变量。添加 Redis 作为插件。同样的 Dockerfile 可用。
- Render:从你的仓库创建一个 Web Service。添加 Redis 实例。设置环境变量。同样的 Dockerfile。
- Modal:用
@app.asgi_app()包装你的 FastAPI 应用,用modal deploy部署。Redis 通过 Upstash。 - AWS App Runner:将你的镜像推送到 ECR,创建一个指向它的 App Runner 服务。Redis 通过 ElastiCache。
- Kubernetes:如果你有集群,一个基础的 Deployment + Service + ConfigMap 就够了。不要仅仅为这个就用 k8s;除非你有多个服务或特定的网络需求,否则它过于复杂。
任何平台的经济基准:在 Fly 上,中等流量的小型智能体每月计算成本 5–20 美元(免费套餐覆盖大部分),Railway 或 Render 上是 10–30 美元,再加上你向模型提供商支付的实际使用费。模型账单通常是平台成本的 10–100 倍。不要过度优化平台支出。
交付物
在你选择的云平台上部署的智能体,采用真正重要的生产模式为真实流量提供服务:用于短时运行的 SSE 流式端点、用于长时运行的队列加工作进程组合、Postgres 加 Redis 的状态分离、基于真正有意义的信号的自动扩容、应对实际会遇到的故障模式的熔断器和每用户成本上限,以及你未来的自己在凌晨三点能看懂的运行手册。在接入第 2.1 章的可观测性和第 2.3 章的安全措施之后,这便是智能体发布所依托的基础设施。
- FastAPI + sse-starlette 流式端点,具备信号量限制的并发控制
- 带令牌流式传输和 tool_start/tool_end 事件的异步智能体循环
- 客户端断开检测(request.is_disconnected),防止失控运行
- 基于 Redis Streams 和消费者组的提交/轮询端点对
- 带 SIGTERM 处理和至少一次交付的工作进程
- 通过 Last-Event-ID 实现 SSE 恢复,增强对不稳定网络的韧性
- Postgres 存储对话真相;Redis 缓存配合写入时失效
- 基于信号量饱和度(API)和队列深度(工作进程)的自动扩容
- 应对模型提供商中断的熔断器;备用模型已接入
- 在循环中强制执行的每次运行和每用户每天成本上限
- 涵盖部署、观测、紧急开关和前五大故障模式的运行手册