2.4
Part II / Ship · From laptop to cloud

Deployment: agents don't fit normal web APIs.

Your agent works in a REPL. Now it needs to serve real users, survive crashes, scale under uneven load, and degrade gracefully when things go wrong. Agents have an unusual shape (long-running, stateful, expensive per request, prone to surprise), and the standard advice for normal web services breaks against them in specific ways. This chapter teaches the patterns that work: synchronous endpoints with SSE streaming for fast runs, queue-backed workers for long runs, the state store split, and the operational guardrails that keep your bill from blowing up at 3am.

STEP 1

The shape problem.

Normal web services have predictable shape. A request comes in. Some code runs. A response goes out. Latency is usually under a second; if it isn't, you've done something wrong. CPU is the bottleneck. Sessions are stateless. Horizontal scaling means "add more replicas." None of this describes an agent.

Agents are I/O-bound (waiting on model APIs), can run for minutes or hours, hold conversation state across turns, cost real money per request, and exhibit failure modes — model outages, rate limits, runaway tool loops — that don't show up in conventional services. If you take the standard playbook ("wrap your code in Flask, put it behind nginx, scale on CPU") and apply it directly, you get a stack that's wrong in three specific ways before you've even shipped.

Three things that break

Break 1: HTTP timeouts during long runs. Browsers, load balancers, and CDNs all have request timeouts — typically 30 seconds for browsers, 60 seconds for ALB, 100 seconds for Cloudflare's free tier. Your agent might run for 5 minutes on a deep research task. If you hold an HTTP connection open that whole time, the connection drops mid-run and the user gets nothing. The standard "synchronous request/response" pattern is not compatible with multi-minute agent runs.

Break 2: Sync workers blocked on model calls. Flask and Django under WSGI (and Rails under Rack, the Ruby equivalent) tie up one worker thread or process per request. That worker sits blocked while you wait for the model API. Four workers, four concurrent users; user number five waits in a queue. For a typical agent that spends 95% of its time waiting on client.messages.create(...), this means 95% of your worker capacity is idle while the queue grows. Adding workers helps only linearly: each new worker buys one more concurrent user, and each one still spends most of its life blocked on a network call.
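
To make the contrast concrete, here is a minimal sketch of what an event loop does with the same workload. fake_model_call is a hypothetical stand-in that sleeps instead of calling a provider, and the 50 ms latency is an arbitrary illustration, not a measurement.

```python
import asyncio
import time


async def fake_model_call(latency_s: float = 0.05) -> str:
    """Hypothetical stand-in for an awaited model API call (no network)."""
    await asyncio.sleep(latency_s)  # yields to the event loop while "waiting"
    return "ok"


async def serve_concurrently(n_requests: int) -> float:
    """Run n simulated requests on one event loop; return elapsed seconds."""
    started = time.perf_counter()
    await asyncio.gather(*(fake_model_call() for _ in range(n_requests)))
    return time.perf_counter() - started


if __name__ == "__main__":
    elapsed = asyncio.run(serve_concurrently(200))
    # Four blocking workers would need roughly 200 / 4 * 0.05 = 2.5 s for the
    # same 200 calls, because each worker handles one call at a time.
    print(f"200 overlapping waits finished in {elapsed:.2f}s")
```

All 200 waits overlap in a single process, so the total time stays close to the latency of one call rather than growing with the number of requests.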

Break 3: Autoscaling on CPU never fires. CPU utilization on an agent server is typically under 10%. The bottleneck isn't compute — it's concurrent in-flight model calls, latency-budget consumption, and queue depth. Standard CPU-based autoscaling never triggers. You either hand-tune fixed replica counts (and over-provision to handle peaks) or you find a different scaling signal.

The decision tree

The fix for all three is the same idea: match your deployment shape to your agent's expected duration. Three patterns cover almost every case.

┌────────────────────────────────────────────────────────────────┐
│  WHICH PATTERN DO I NEED?                                      │
│                                                                │
│  Q1: How long does a typical agent run take?                   │
│                                                                │
│    < 30 seconds                                                │
│      ↓                                                         │
│      Pattern A: SYNCHRONOUS WITH STREAMING                     │
│      Single FastAPI endpoint, async/await, stream tokens       │
│      back over SSE. HTTP stays open the whole time.            │
│      Q2 below.                                                 │
│                                                                │
│    30 seconds – 5 minutes                                      │
│      ↓                                                         │
│      Still works synchronously, IF you control the whole       │
│      network path (no Cloudflare, no aggressive timeouts) and  │
│      tune your reverse proxy. Otherwise → Pattern B.           │
│                                                                │
│    > 5 minutes (deep research, multi-stage workflows)          │
│      ↓                                                         │
│      Pattern B: SUBMIT / POLL with a QUEUE                     │
│      POST /runs returns immediately with a run_id.             │
│      Worker pulls from queue, executes, streams updates to     │
│      Redis/Postgres. Client subscribes to updates by run_id.   │
│                                                                │
│  Q2: Do I need server-side state across multiple requests?     │
│      (Conversation history, user preferences, mid-run memory)  │
│                                                                │
│    Stateless (one-shot questions)                              │
│      ↓                                                         │
│      Done. Each request is self-contained.                     │
│                                                                │
│    Stateful (conversations, sessions)                          │
│      ↓                                                         │
│      Pattern C: STATE STORE SPLIT                              │
│      Postgres for durability (the truth).                      │
│      Redis for hot path (sub-millisecond reads on each turn).  │
│      See Step 4 for the schema.                                │
└────────────────────────────────────────────────────────────────┘

Most production agents end up with some combination — Pattern A for short interactive runs and Pattern B for long background runs, sharing the same Pattern C state store. The chapter walks each in order.

The async-from-the-start commitment

Before any of the patterns: pick FastAPI (or Starlette, or any ASGI framework), not Flask. This is non-negotiable for agents. The reason is the I/O-bound nature: a FastAPI worker can hold thousands of in-flight await client.messages.create(...) calls concurrently in a single process, because each one yields control while waiting. The same workload on Flask would require thousands of OS threads, which is not viable.

The mental model: with ASGI, your concurrency limit is set by your API provider's rate limit, not by your server's worker count. That's the correct shape.

Question
My agent is currently in a Flask app that's working fine for low traffic. Do I really need to migrate?

If you have <5 concurrent users and runs are under a few seconds, Flask is fine. The migration becomes urgent at the point where any one of these is true: you exceed ~20 concurrent users; your runs occasionally take >30 seconds; you start streaming tokens back to clients; you need to do parallel tool calls efficiently. Below that threshold, the engineering cost of migrating exceeds the gain.

The honest version: most teams discover they need ASGI sooner than they expected, because traffic grows or run length grows or both. Doing the migration once at 5 users is cheaper than doing it under fire at 50.

Question
Do I need Kubernetes for any of this?

Almost certainly not for a starting deployment. Fly.io, Railway, Render, Modal, AWS App Runner — all of these let you deploy a containerized FastAPI app with one command, and they handle the boring parts (HTTPS, load balancing, basic autoscaling). For a single-team agent serving up to several thousand users a day, this is the right level of abstraction.

Reach for Kubernetes when you genuinely need: multiple agent services that share infrastructure, control over networking between services, complex deployment topologies (canary, blue/green), or your company already runs on it. Don't reach for it because it's the default answer at scale-up companies.

STEP 2

Pattern A: synchronous endpoint with SSE streaming.

For runs that complete in under 30 seconds, the simplest production-shaped deployment is a FastAPI endpoint that runs the agent and streams updates back over Server-Sent Events. The client sees output as it's generated; the HTTP connection stays open until the agent finishes; no queues, no workers, no extra moving parts.

Why SSE and not WebSockets: agent output is unidirectional — server pushes to client. SSE handles unidirectional streaming with less complexity, works through corporate proxies and HTTP/1.1 infrastructure without special configuration, and supports automatic reconnection at the protocol level via the Last-Event-ID header. WebSockets buy you bidirectional communication you almost never need for agent output. Save them for the "stop" button, which can be its own tiny endpoint.

The full endpoint

This is the production-shaped version — async throughout, semaphore-limited concurrency, request-scoped tracing, structured errors, clean cancellation. The single file you'd actually deploy.

# app/main.py
import asyncio, json, uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from anthropic import AsyncAnthropic

from agent.streaming import run_agent_streaming  # defined later in this step

# Cap concurrent in-flight agent runs.  Don't let a traffic spike
# issue 10,000 simultaneous model calls and blow your rate limit.
AGENT_SEMAPHORE = asyncio.Semaphore(50)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Share one AsyncAnthropic client across all requests.
    # Has internal connection pool; creating per-request is wasteful.
    app.state.anthropic = AsyncAnthropic()
    yield
    await app.state.anthropic.close()

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    message: str
    user_id: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, request: Request):
    run_id = str(uuid.uuid4())

    async def event_stream():
        # Hold the semaphore for the duration of the run.
        async with AGENT_SEMAPHORE:
            try:
                yield {"event": "start",
                       "data": json.dumps({"run_id": run_id})}

                # Drive the agent loop; emit events as the run progresses.
                async for event in run_agent_streaming(
                    request.app.state.anthropic,
                    req.message,
                    req.user_id,
                ):
                    # Client disconnected?  Stop generating tokens (and stop paying).
                    if await request.is_disconnected():
                        break
                    yield {"event": event["type"],
                           "data": json.dumps(event["payload"])}

                yield {"event": "done", "data": ""}

            except asyncio.CancelledError:
                # Client disconnected mid-run.  Do any cleanup here, then re-raise:
                # swallowing the cancellation can leave the task half-cancelled.
                # No error event is sent (the connection is gone anyway).
                raise
            except Exception as e:
                yield {"event": "error",
                       "data": json.dumps({"type": type(e).__name__,
                                          "message": str(e)})}

    return EventSourceResponse(event_stream())

@app.get("/health")
async def health():
    return {"ok": True,
            "in_flight": 50 - AGENT_SEMAPHORE._value}
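
The same endpoint written against the OpenAI client. It assumes a run_agent_streaming built on the OpenAI SDK; the version shown later in this step uses the Anthropic SDK.

# app/main.py (OpenAI variant)
import asyncio, json, uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from openai import AsyncOpenAI

from agent.streaming import run_agent_streaming  # must target the OpenAI API here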

AGENT_SEMAPHORE = asyncio.Semaphore(50)

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai = AsyncOpenAI()
    yield
    await app.state.openai.close()

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    message: str
    user_id: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, request: Request):
    run_id = str(uuid.uuid4())

    async def event_stream():
        async with AGENT_SEMAPHORE:
            try:
                yield {"event": "start",
                       "data": json.dumps({"run_id": run_id})}

                async for event in run_agent_streaming(
                    request.app.state.openai,
                    req.message,
                    req.user_id,
                ):
                    if await request.is_disconnected():
                        break
                    yield {"event": event["type"],
                           "data": json.dumps(event["payload"])}

                yield {"event": "done", "data": ""}

            except asyncio.CancelledError:
                # Clean up if needed, then let the cancellation propagate.
                raise
            except Exception as e:
                yield {"event": "error",
                       "data": json.dumps({"type": type(e).__name__,
                                          "message": str(e)})}

    return EventSourceResponse(event_stream())

@app.get("/health")
async def health():
    return {"ok": True,
            "in_flight": 50 - AGENT_SEMAPHORE._value}

Four things in this code are non-obvious and earn their place. Let me walk through them.

The semaphore

AGENT_SEMAPHORE = asyncio.Semaphore(50) is the most important line in the file. It caps concurrent agent runs at 50 per replica. Why this matters: without it, a traffic spike issues 10,000 simultaneous calls to the model provider, you blow your rate limit, every call returns 429, and the cascade takes the whole service down. With it, the 51st request waits (gracefully) for one of the first 50 to finish.
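
The waiting behaviour is easy to check in isolation. The following is a small sketch, not the production endpoint: run_with_cap is a hypothetical helper that launches simulated runs behind a semaphore and records the highest number in flight at once.

```python
import asyncio


async def run_with_cap(n_requests: int, cap: int) -> int:
    """Launch n simulated runs behind a semaphore; return the peak in-flight count."""
    semaphore = asyncio.Semaphore(cap)
    in_flight = 0
    peak = 0

    async def one_run() -> None:
        nonlocal in_flight, peak
        async with semaphore:          # suspends here once `cap` runs are active
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the model call
            in_flight -= 1

    await asyncio.gather(*(one_run() for _ in range(n_requests)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_with_cap(n_requests=500, cap=50)))
```

With 500 requests and a cap of 50, the peak never exceeds 50; the other 450 runs wait inside the process instead of reaching the provider.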

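The right number: divide your provider's tokens-per-minute limit by your average tokens per run to get sustainable runs per minute, multiply by the average run duration in minutes to get concurrent runs, then leave 30% headroom for retries and the occasional long run. With illustrative numbers (not any provider's actual tier): a 400k tokens/minute limit and an average run of 4 turns at 3k tokens each (12k tokens per run) sustains about 33 runs per minute; at 30 seconds per run that is about 16 runs in flight, or roughly 11 after headroom. If you run several replicas, that budget is shared across all of them. Do the math for your specific setup; the principle is to have a number, not to copy mine.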
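
One way to turn a rate limit into a semaphore size is sketched below. The function name, its parameters, and the sample numbers are all assumptions made for illustration; substitute the limits from your own provider account.

```python
import math


def semaphore_size(tokens_per_minute: int, tokens_per_run: int,
                   avg_run_seconds: float, headroom: float = 0.30) -> int:
    """Estimate a cap on concurrent runs from a token rate limit."""
    runs_per_minute = tokens_per_minute / tokens_per_run
    # Little's law: concurrency = arrival rate x time each run spends in flight.
    concurrent = runs_per_minute * (avg_run_seconds / 60)
    # Reserve headroom for retries and long runs; never return less than 1.
    return max(1, math.floor(concurrent * (1 - headroom)))


if __name__ == "__main__":
    # Hypothetical inputs: 400k tokens/min, 12k tokens per run, 30 s per run.
    print(semaphore_size(400_000, 12_000, 30))
```

The result is a budget for the whole deployment, so divide it by the replica count before hard-coding a per-replica semaphore.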

Client disconnect detection

if await request.is_disconnected(): break is the line that saves you money. Without it, if the user closes their browser tab mid-run, your agent keeps generating tokens, keeps calling tools, keeps spending money — to nobody. With it, the loop breaks at the next iteration and the run terminates. The check happens between events, so it adds zero latency to the normal path.

Be careful: is_disconnected() is sometimes flaky on certain reverse proxies — it may not detect the disconnect for a few seconds. That's usually fine; you'll just pay for a few extra tokens. Don't try to detect disconnects more aggressively than this; you'll create more bugs than you fix.

The asynccontextmanager lifespan

The Anthropic/OpenAI clients have internal connection pools. Creating one per request would be wasteful. Creating one global instance is the right move — but it has to be cleaned up on shutdown. The lifespan context manager is FastAPI's idiomatic way to do this: it gives you setup and teardown hooks tied to the app's lifecycle. Use it for any shared resource (HTTP clients, DB pools, model clients).

Anti-pattern: putting the client creation at module import time and never closing it. The connection pool will be orphaned on shutdown, file descriptors leak, and your container's graceful-shutdown probe times out. Costs you ten minutes of debugging the first time it bites.

The agent-as-async-generator

This is the design that makes streaming feel natural. run_agent_streaming is an async def function that yields events as the run progresses. The endpoint consumes those events and forwards them to the client. The agent doesn't know it's behind an HTTP endpoint; it just yields events. This separation makes the agent testable in isolation and makes the endpoint a thin adapter.

# agent/streaming.py
async def run_agent_streaming(client, user_msg: str, user_id: str):
    # Initial: emit a status event so the client knows we got the message
    yield {"type": "status", "payload": {"message": "thinking..."}}

    messages = [{"role": "user", "content": user_msg}]
    for step in range(20):  # step budget from chapter 1.1
        # Stream the model's response token by token (or block by block)
        async with client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                yield {"type": "token", "payload": {"text": text}}
            response = await stream.get_final_message()

        messages.append({"role": "assistant", "content": response.content})

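        # Anything other than a tool request (end_turn, max_tokens, ...) ends
        # the run; continuing would append an empty tool-result message.
        if response.stop_reason != "tool_use":
            return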

        # Tool calls: emit per-call events so client can show "Searching docs..."
        results = []
        for block in response.content:
            if block.type == "tool_use":
                yield {"type": "tool_start",
                       "payload": {"name": block.name, "args": block.input}}
                try:
                    result = await HANDLERS[block.name](**block.input)
                    yield {"type": "tool_end",
                           "payload": {"name": block.name, "ok": True}}
                    results.append({"type": "tool_result",
                                    "tool_use_id": block.id, "content": str(result)})
                except Exception as e:
                    yield {"type": "tool_end",
                           "payload": {"name": block.name, "ok": False, "error": str(e)}}
                    results.append({"type": "tool_result",
                                    "tool_use_id": block.id, "content": f"Error: {e}",
                                    "is_error": True})
        messages.append({"role": "user", "content": results})
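
Because the agent is just an async generator, it can be exercised with no HTTP layer at all. The sketch below uses fake_agent, a hypothetical stand-in that emits the same event shape as run_agent_streaming, plus two small helpers (collect and to_sse) that are assumptions of this example rather than part of the chapter's code.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_agent(user_msg: str) -> AsyncIterator[dict]:
    """Hypothetical stand-in with the same event shape as run_agent_streaming."""
    yield {"type": "status", "payload": {"message": "thinking..."}}
    for word in user_msg.split():
        yield {"type": "token", "payload": {"text": word}}


async def collect(events: AsyncIterator[dict]) -> list[dict]:
    """Drain an async generator into a list so a test can assert on it."""
    return [event async for event in events]


def to_sse(event: dict) -> dict:
    """The adapter step: agent event -> the dict shape the endpoint yields."""
    return {"event": event["type"], "data": json.dumps(event["payload"])}


if __name__ == "__main__":
    for event in asyncio.run(collect(fake_agent("hello world"))):
        print(to_sse(event))
```

A unit test can assert on the collected list directly, and the endpoint stays a thin loop that applies the adapter to each event.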

What the client sees

The shape of the SSE stream as observed by the browser. Real wire format, useful for debugging:

event: start
data: {"run_id": "8c1e..."}

event: status
data: {"message": "thinking..."}

event: token
data: {"text": "I'll search"}

event: token
data: {"text": " the documentation"}

event: tool_start
data: {"name": "search_docs", "args": {"query": "autovacuum"}}

event: tool_end
data: {"name": "search_docs", "ok": true}

event: token
data: {"text": "Autovacuum naptime is..."}

event: done
data: 

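The client (browser, mobile app, CLI) consumes these events with any SSE parser. One caveat for browsers: the built-in EventSource API only issues GET requests, so it cannot call the POST /chat/stream endpoint above directly. Either read the response body with fetch() and parse the event frames yourself, or expose a GET variant and attach one EventSource listener per event name. The display logic (show a spinner during tool_start, animate tokens, show errors on error) is up to the front-end, but the events are designed to make a polished UI cheap.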
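
A client that is not a browser EventSource (a CLI, a test, or a fetch-style reader) has to split the stream into frames itself. The following is a simplified sketch of that parsing for a complete payload: parse_sse is a hypothetical helper that handles the event, data, and id fields and skips comment lines, and it deliberately ignores retry and incremental chunking.

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Parse a complete SSE text payload into {event, data, id} dicts."""
    events = []
    for frame in raw.replace("\r\n", "\n").split("\n\n"):
        event = {"event": "message", "data": [], "id": None}
        has_data = False
        for line in frame.split("\n"):
            if not line or line.startswith(":"):  # blank line or comment/keepalive
                continue
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]                 # one leading space is stripped
            if field == "data":
                event["data"].append(value)
                has_data = True
            elif field in ("event", "id"):
                event[field] = value
        if has_data:                              # frames without data are not dispatched
            event["data"] = "\n".join(event["data"])
            events.append(event)
    return events


if __name__ == "__main__":
    sample = 'event: start\ndata: {"run_id": "8c1e..."}\n\nevent: done\ndata: \n\n'
    for ev in parse_sse(sample):
        payload = json.loads(ev["data"]) if ev["data"] else None
        print(ev["event"], payload)
```

A real client would feed network chunks into a buffer and parse only completed frames; the field handling stays the same.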

Deploying it

A short Dockerfile, deployable anywhere that runs containers. Production tuning is just configuring uvicorn appropriately:

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
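# --workers 4: roughly one per CPU core.
# --timeout-keep-alive 120: hold idle connections longer than the proxy does.
# The exec-form CMD is a JSON array, so it cannot carry inline comments,
# and each continued line needs a trailing backslash.
CMD ["uvicorn", "app.main:app", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--workers", "4", \
     "--timeout-keep-alive", "120"]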

Note --workers 4: even though each worker can hold thousands of concurrent in-flight requests, you still want multiple OS-level processes so that one crashing or getting stuck doesn't take down the whole service. Roughly one worker per CPU core is the rule of thumb. --timeout-keep-alive 120 controls how long uvicorn holds an idle keep-alive connection between requests; it does not cut off a response that is actively streaming. Setting it above your load balancer's idle timeout avoids the proxy reusing a connection the server has already closed.

Behind Cloudflare, ALB, nginx, or any reverse proxy: turn off proxy buffering. Most reverse proxies buffer responses by default to optimize throughput, which means SSE events get held up and delivered in chunks instead of streaming. Set X-Accel-Buffering: no in response headers (sse-starlette does this for you), and configure your reverse proxy to honor it. Skip this and your "streaming" UI updates in 30-second bursts.

Question
Why a semaphore and not a thread pool / process pool?

Thread/process pools are for CPU-bound work. Agent runs are I/O-bound — they're mostly awaiting on the model API. A semaphore on async code is the right primitive: it limits concurrent in-flight tasks without spawning threads. Same effect (bounded concurrency), much lower overhead.

Where threads come back: if you have a synchronous library you can't avoid (e.g., a vendor SDK that doesn't ship an async version), wrap its calls in asyncio.to_thread(...) so they don't block the event loop. The semaphore still applies at the agent-run level above it.

Question
What if a single run blocks the semaphore for 60 seconds and others wait?

That's a sign you're hitting the upper end of Pattern A's range and should consider Pattern B. The semaphore-wait is acceptable for tens of seconds of queueing under load spikes; it becomes a UX problem if the wait routinely exceeds 10 seconds. Three responses, in order:

  • Add a separate semaphore for "long" runs (detected by step count or elapsed time) so they don't starve "short" runs.
  • Move long-running paths to Pattern B (queue + worker).
  • Scale horizontally — more replicas, each with its own semaphore. The math: total concurrent capacity = replicas × per-replica semaphore.

STEP 3

Pattern B: queue-backed worker for long runs.

Pattern A is right for chat-style interactions where the agent runs for seconds. It breaks for the cases that actually showcase agentic AI — deep research that takes 10 minutes, batch processing of 100 documents, multi-stage workflows that pause for user review. For these, the HTTP request-response model is wrong on its face: you can't hold a connection open for that long, and even if you could, the user shouldn't have to keep their browser tab open the whole time.

The fix is a submit/poll architecture. The client POSTs a request; the server immediately returns a run_id and queues the work; a worker picks up the queued job and executes it; the client subscribes to updates by run_id over a separate SSE connection. The user can close their browser and come back later — the run keeps going server-side, and updates are streamed from where they left off.

The shape

┌─────────────────────────────────────────────────────────────────┐
│                                                                 │
│ client                  fastapi              redis queue        │
│   │                        │                      │             │
│   │  POST /runs            │                      │             │
│   │ ──────────────────────►│                      │             │
│   │                        │  XADD runs           │             │
│   │                        │ ────────────────────►│             │
│   │  202 {run_id}          │                      │             │
│   │ ◄──────────────────────│                      │             │
│   │                        │                      │  XREAD      │
│   │                        │                      │◄──────────┐ │
│   │                        │                      │           │ │
│   │  GET /runs/{id}/events │                      │     worker│ │
│   │ ──────────────────────►│                      │XADD events│ │
│   │  (SSE connection)      │  XREAD events:{id}   │◄──────────│ │
│   │                        │ ◄────────────────────│           │ │
│   │  event: token …        │                      │    ...    │ │
│   │ ◄──────────────────────│                      │           │ │
│   │                        │                      │  XADD done│ │
│   │  event: done           │                      │◄──────────┘ │
│   │ ◄──────────────────────│                      │             │
│                                                                 │
│  Two-process architecture: API server (fast, never blocks) +    │
│  worker process (slow, runs agents). Connected by Redis         │
│  streams. Client subscribes by run_id, can reconnect at any     │
│  time using Last-Event-ID.                                      │
└─────────────────────────────────────────────────────────────────┘

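Three independent things to set up: the submission endpoint that returns immediately, the worker that consumes from the queue and emits events, and the streaming endpoint that subscribes by run_id. We'll use Redis Streams for both the queue and the event log. It's the right primitive: an ordered, append-only log with consumer groups for the queue side and blocking reads from any entry ID for the subscription side, which is what makes last-id resumption possible. (It is only as durable as your Redis persistence settings.)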

The submission endpoint

# app/routes/runs.py
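import json, time, uuid
from fastapi import APIRouter, Request
from redis.asyncio import Redis

# Hypothetical module path: import the ChatRequest model from Pattern A
# from wherever you define it.
from app.schemas import ChatRequest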

router = APIRouter()

@router.post("/runs", status_code=202)
async def submit_run(req: ChatRequest, request: Request):
    redis: Redis = request.app.state.redis
    run_id = str(uuid.uuid4())

    # Persist run metadata so client can poll status at any time
    await redis.hset(f"run:{run_id}", mapping={
        "status": "queued",
        "user_id": req.user_id,
        "message": req.message,
        "created_at": int(time.time()),
    })
    await redis.expire(f"run:{run_id}", 86400)  # 24h TTL

    # Enqueue.  XADD is atomic; the worker will pick it up.
    await redis.xadd("queue:runs", {
        "run_id": run_id,
        "user_id": req.user_id,
        "message": req.message,
    })

    return {"run_id": run_id, "status": "queued"}

The endpoint returns in milliseconds. The 202 status code tells clients "accepted but not finished" — they should expect to poll or subscribe for results.

The worker

A separate process (separate Docker container, separate Kubernetes pod, separate Fly.io machine — whatever your deployment substrate calls it). It loops forever, pulling from the queue.

# worker/main.py
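import asyncio, json, os, signal, time
from redis.asyncio import Redis
from redis.exceptions import ResponseError
from anthropic import AsyncAnthropic

from agent.streaming import run_agent_streaming

async def worker_loop(name: str):
    redis = Redis(host="redis", decode_responses=True)
    anthropic = AsyncAnthropic()
    shutdown = asyncio.Event()
    # Register through the event loop (Unix only) so the handler runs as a
    # normal loop callback instead of interrupting arbitrary code.
    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, shutdown.set)

    # Consumer group: multiple workers can share one queue with
    # at-least-once delivery.  Unacked messages stay pending, so another
    # worker can re-claim them after a crash (XAUTOCLAIM or XCLAIM).
    try:
        await redis.xgroup_create("queue:runs", "workers", id="0", mkstream=True)
    except ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise  # anything other than "group already exists" is a real error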

    while not shutdown.is_set():
        # Block for up to 5 seconds waiting for new work
        msgs = await redis.xreadgroup(
            "workers", name, {"queue:runs": ">"},
            count=1, block=5000,
        )
        if not msgs:
            continue

        for _, items in msgs:
            for msg_id, data in items:
                run_id = data["run_id"]
                try:
                    await execute_run(redis, anthropic, run_id, data)
                    await redis.xack("queue:runs", "workers", msg_id)
                except Exception as e:
                    # Mark run as failed but ack so we don't infinite-loop
                    await mark_failed(redis, run_id, e)
                    await redis.xack("queue:runs", "workers", msg_id)

async def execute_run(redis, anthropic, run_id: str, data: dict):
    """Run the agent.  Each event also gets appended to a Redis
    Stream that streaming clients can subscribe to."""
    events_key = f"events:{run_id}"
    await redis.hset(f"run:{run_id}", "status", "running")
    await redis.xadd(events_key,
                       {"event": "start", "data": ""})

    async for event in run_agent_streaming(anthropic, data["message"], data["user_id"]):
        await redis.xadd(events_key, {
            "event": event["type"],
            "data": json.dumps(event["payload"]),
        })

    await redis.xadd(events_key, {"event": "done", "data": ""})
    await redis.hset(f"run:{run_id}", "status", "complete")
    await redis.expire(events_key, 3600)  # 1h TTL on event log

if __name__ == "__main__":
    asyncio.run(worker_loop(name=os.environ.get("WORKER_NAME", "worker-1")))

Three things worth pausing on. Consumer groups (XREADGROUP) give you at-least-once delivery across multiple workers: if a worker crashes mid-run, the message stays unacked in the group's pending list, and another worker can claim it via XPENDING + XCLAIM (or XAUTOCLAIM). The loop above only reads new messages (the ">" ID), so re-claiming needs its own periodic pass; it does not happen automatically. SIGTERM handling matters for graceful shutdown: when your platform tells the worker to terminate, the loop finishes the current run before exiting, provided the run fits inside the platform's shutdown grace period. The events:{run_id} stream is what the streaming endpoint will subscribe to; it's also a complete log of the run's events, kept for one hour after the run completes.
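
A periodic re-claim pass might look like the sketch below. It assumes the redis argument is a redis.asyncio.Redis client talking to Redis 6.2 or newer (where XAUTOCLAIM exists); reclaim_stale and the 60-second idle threshold are hypothetical choices for illustration, not part of the worker shown above.

```python
import asyncio


async def reclaim_stale(redis, consumer: str, min_idle_ms: int = 60_000,
                        stream: str = "queue:runs", group: str = "workers") -> list:
    """Claim messages that another consumer read but never acked.

    Returns the claimed (msg_id, fields) pairs so the caller can push them
    through the same execute-then-ack path as freshly read messages.
    """
    claimed = []
    cursor = "0-0"
    while True:
        reply = await redis.xautoclaim(stream, group, consumer,
                                       min_idle_time=min_idle_ms,
                                       start_id=cursor, count=10)
        # reply[0] is the next cursor, reply[1] the claimed entries; newer
        # Redis versions append a third element with deleted IDs.
        cursor, messages = reply[0], reply[1]
        claimed.extend(messages)
        if cursor in ("0-0", b"0-0"):  # the scan has covered the pending list
            return claimed
```

The idle threshold has to exceed your longest legitimate run; otherwise a healthy worker's in-progress message gets claimed and executed a second time.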

The streaming endpoint

This is what the client connects to after submitting a run. It subscribes to the events stream for the given run_id and forwards events as SSE.

from fastapi import Header

@router.get("/runs/{run_id}/events")
async def stream_events(run_id: str, request: Request,
                         last_event_id: str = Header(default="0")):
    """Subscribe to a run's event stream.  Supports resumption via
    Last-Event-ID — drop the connection and reconnect, no events lost."""
    redis = request.app.state.redis
    events_key = f"events:{run_id}"

    async def subscribe():
        last_id = last_event_id or "0"
        while True:
            if await request.is_disconnected():
                break

            # Read events newer than last_id.  Block up to 5s.
            msgs = await redis.xread({events_key: last_id}, count=20, block=5000)
            if not msgs:
                # Periodic keepalive comment so proxies don't time out
                yield {"comment": "keepalive"}
                continue

            for _, items in msgs:
                for msg_id, data in items:
                    last_id = msg_id
                    yield {
                        "id": msg_id,           # client tracks this for resume
                        "event": data["event"],
                        "data": data["data"],
                    }
                    if data["event"] == "done":
                        return

    return EventSourceResponse(subscribe())

The Last-Event-ID header is SSE's built-in resumption mechanism. When a client reconnects after a dropped connection, the browser automatically sends back the last event ID it received. We use that as the starting point in the Redis stream — the client picks up exactly where it left off, no duplicates, no gaps. Without this, every flaky network or browser tab change would force the user to start over.
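
The resumption rule is simple enough to model without Redis. In this sketch, events_after is a hypothetical pure-Python stand-in for the XREAD call: it returns only entries whose ID is strictly greater than the one the client last saw, which is why a reconnect produces neither duplicates nor gaps. The IDs in the sample log are made up.

```python
def parse_stream_id(stream_id: str) -> tuple[int, int]:
    """Split a Redis-style stream ID '<ms>-<seq>' into a sortable tuple."""
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)  # a bare "0" means the very beginning


def events_after(log: list[tuple[str, dict]],
                 last_event_id: str = "0") -> list[tuple[str, dict]]:
    """Return entries strictly newer than last_event_id, in log order."""
    cutoff = parse_stream_id(last_event_id)
    return [(eid, data) for eid, data in log if parse_stream_id(eid) > cutoff]


if __name__ == "__main__":
    log = [
        ("100-0", {"event": "start"}),
        ("240-3", {"event": "token"}),
        ("241-0", {"event": "token"}),
        ("412-0", {"event": "done"}),
    ]
    # First connection: no Last-Event-ID header, so replay from the start.
    print(len(events_after(log)))
    # Reconnect after the drop: only events newer than 240-3 are sent.
    print([eid for eid, _ in events_after(log, "240-3")])
```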

The lifecycle, end-to-end

  t=0       client POSTs /runs
            server enqueues, returns 202 {run_id: "8c1e..."}
  t=0.05    worker picks up message from queue
  t=0.06    client opens /runs/8c1e.../events SSE connection
  t=0.10    worker emits "start" event → Redis stream
  t=0.10    server forwards "start" to client over SSE
  ...
  t=180     worker emits "tool_start: search_web (query=Q3 GDP)"
  t=183     worker emits "tool_end" + tokens
  t=240     CLIENT'S WIFI DROPS — SSE connection closes
            but the worker keeps running.  Events keep landing
            in Redis stream.
  t=247     client reconnects, sends Last-Event-ID: 240-3
  t=247.1   server resumes stream from event 240-3+1
            client receives all events from t=240 onward
  ...
  t=412     worker emits "done"
            server closes connection
            run record persists in Redis for 24h

This is the experience users expect from an agent. Submit a long task, close your laptop, come back, see the result. The submit/poll architecture is what makes that possible.

For a starting deployment, run both the API server and the worker as separate processes in the same Docker container using a process manager like supervisord. You get the architectural separation without the operational overhead of two services. Split into separate deployments later if traffic patterns demand it (different scaling profiles, or the worker needs different resources).

Question
Why Redis Streams and not Celery / RQ / SQS?

All of these work. Redis Streams is the choice if (a) you're already running Redis for caching / sessions, so you don't add a new piece of infrastructure, (b) you want the event log to also serve as the streaming-subscription backend (one mechanism instead of two), and (c) consumer groups give you the durability and re-claim semantics you need.

Celery is heavier and has more features (scheduled jobs, complex workflows, retries with backoff). Pick it if you need those features. RQ is simpler than Celery. SQS is the right choice if you're on AWS and want managed infrastructure. The patterns are similar across all of them — submission endpoint, worker loop, ack/retry — only the API differs.

Question
What about idempotency? If the worker crashes mid-run and another picks it up, the agent runs twice.

This is the at-least-once problem. Three approaches, in order of effort:

  • Make tools idempotent. If send_email has a deduplication key (the run_id + a tool_call_idx), retrying is safe. This is the cleanest fix and the right long-term answer.
  • Checkpoint progress. After each tool call, persist the result to Redis keyed by (run_id, step). On resume, skip steps that already have results. This is what LangGraph and similar agent runtimes do for you.
  • Just disable re-claim for non-idempotent tools. If a worker crashes, mark the run failed and let the user resubmit. Simplest, but means crashes are visible to users. Acceptable for some products.

For a first deployment: option 3. For production: combination of 1 and 2.
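
Options 1 and 2 combine naturally into one wrapper around each tool call. The sketch below is an assumption-heavy illustration: CheckpointStore is an in-memory stand-in for Redis, call_tool_once and send_email are hypothetical names, and the dedup_key format (run_id plus step index) follows the idea described in the first bullet.

```python
import asyncio
from typing import Optional

SENT: list[tuple[str, str]] = []  # records side effects for the demo


class CheckpointStore:
    """In-memory stand-in for Redis, keyed by (run_id, step)."""

    def __init__(self) -> None:
        self._results: dict[tuple[str, int], str] = {}

    def get(self, run_id: str, step: int) -> Optional[str]:
        return self._results.get((run_id, step))

    def put(self, run_id: str, step: int, result: str) -> None:
        self._results[(run_id, step)] = result


async def call_tool_once(store: CheckpointStore, run_id: str, step: int,
                         tool, **args) -> str:
    """Run a tool at most once per (run_id, step); reuse the saved result on retry."""
    cached = store.get(run_id, step)
    if cached is not None:
        return cached                      # step finished before the crash
    # The key lets the tool (or its downstream API) drop a duplicate if the
    # crash happened after the side effect but before the checkpoint write.
    result = await tool(dedup_key=f"{run_id}:{step}", **args)
    store.put(run_id, step, result)
    return result


async def send_email(dedup_key: str, to: str) -> str:
    """Hypothetical tool with a visible side effect."""
    SENT.append((dedup_key, to))
    return f"sent to {to}"


async def demo() -> tuple[str, str]:
    store = CheckpointStore()
    first = await call_tool_once(store, "run-1", 0, send_email, to="a@example.com")
    retry = await call_tool_once(store, "run-1", 0, send_email, to="a@example.com")
    return first, retry


if __name__ == "__main__":
    print(asyncio.run(demo()), len(SENT))
```

The checkpoint covers the common case (the step already completed), and the deduplication key covers the narrow window between the side effect and the checkpoint write.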

STEP 4

The four operational concerns that determine whether you stay up.

You've shipped the agent. It works. Now the operational reality starts. This step covers the four things that determine whether your service stays available, affordable, and debuggable as load grows: where state lives, what signal to autoscale on, how to degrade gracefully, and how to keep cost from blowing up. Each is a small section, but skipping any of them is what separates "deployed an agent" from "shipped an agent."

1. State store: Postgres for truth, Redis for hot path

Conversation history lives somewhere. The wrong somewhere creates problems. Two stores, two jobs:

Postgres holds the truth. The durable record of every conversation, every message, every tool result. Schema is roughly:

CREATE TABLE conversations (
    id              UUID PRIMARY KEY,
    user_id         TEXT NOT NULL,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    title           TEXT,
    metadata        JSONB DEFAULT '{}'::JSONB
);

CREATE TABLE messages (
    id              BIGSERIAL PRIMARY KEY,
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role            TEXT NOT NULL,    -- user / assistant / tool
    content         JSONB NOT NULL,   -- the full message blocks
    token_count     INTEGER,
    cost_usd        NUMERIC(10, 6),
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX messages_conv_idx
    ON messages(conversation_id, id);

The content column is JSONB containing the full message structure — including tool_use and tool_result blocks. JSONB so you can query it later ("how many times did the agent call search_docs last month?") without parsing strings.

Redis holds the hot path. For an active conversation, every turn reads the message history. Hitting Postgres for that on every turn adds 5–10ms of latency and repeats the same full-history query against your primary database. The right cache: serialize the messages as JSON, stash them in Redis under conv:{id}:messages, set a 1-hour TTL, and invalidate the key on every new message.

# app/state.py
import json


async def get_messages(redis, db, conversation_id: str) -> list[dict]:
    key = f"conv:{conversation_id}:messages"

    # Hot path: Redis
    cached = await redis.get(key)
    if cached is not None:
        return json.loads(cached)

    # Cold path: Postgres
    rows = await db.fetch(
        "SELECT role, content FROM messages WHERE conversation_id = $1 ORDER BY id",
        conversation_id,
    )
    # Assumes an asyncpg-style driver that returns JSONB as text;
    # drop json.loads if your driver already decodes it.
    messages = [
        {"role": r["role"], "content": json.loads(r["content"])} for r in rows
    ]

    # Warm the cache for next time
    await redis.set(key, json.dumps(messages), ex=3600)
    return messages


async def append_message(redis, db, conversation_id: str, msg: dict):
    # Write to truth first (token_count and cost_usd left out of this sketch)
    await db.execute(
        "INSERT INTO messages (conversation_id, role, content) "
        "VALUES ($1, $2, $3::jsonb)",
        conversation_id, msg["role"], json.dumps(msg["content"]),
    )
    # Invalidate cache; will be repopulated on next read
    await redis.delete(f"conv:{conversation_id}:messages")

The invariant: Postgres is the truth; Redis is a cache that can be wrong and self-heal. Cache invalidation on writes is the simplest scheme that works — don't try to do write-through cache updates unless you have a reason. The cache is repopulated on the next read; that's fine.

2. Autoscale on the right signal

CPU-based autoscaling, as covered in Step 1, never triggers for agents. The signals that actually correlate with "I need more capacity":

  • Semaphore saturation (Pattern A). The in_flight count on the /health endpoint. Scale up when it's been >80% of cap for >1 minute.
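  • Queue depth (Pattern B). XLEN queue:runs, provided workers delete entries once they've acknowledged them. If they don't, XLEN also counts finished runs, and the consumer group's lag and pending counts from XINFO GROUPS are the better signal. Scale workers up when the depth grows; scale down when it's been zero for >5 minutes.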
  • P95 latency on submit. Even the submit endpoint (which should be milliseconds) gets slow if Redis or Postgres are saturated. Watch this for upstream pressure.
  • Error rate from the model provider. 429 spikes mean you're at the rate-limit ceiling. Adding replicas doesn't help; reduce per-replica concurrency or ask the provider for a higher limit.

Most managed platforms (Fly.io, Modal, Railway) let you autoscale on custom metrics by hooking up Prometheus or by polling an HTTP endpoint. The metric to expose: in_flight for API replicas, queue_depth for worker replicas. Wire them into your platform's autoscaler, set the thresholds, ship it.
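
If your platform wants a replica count rather than a raw metric, the arithmetic is small. A rough sketch, assuming you sum in_flight across replicas yourself and know the semaphore cap per replica. The function names, the 0.8 target, and the min/max bounds are illustrative assumptions, and the "sustained for >1 minute" gating is left to whatever calls this:

```python
import math


def desired_api_replicas(total_in_flight: int, cap_per_replica: int,
                         target: float = 0.8,
                         min_replicas: int = 1, max_replicas: int = 10) -> int:
    """Replicas needed to keep semaphore saturation at or below `target`."""
    needed = math.ceil(total_in_flight / (cap_per_replica * target))
    return max(min_replicas, min(max_replicas, needed))


def desired_workers(queue_depth: int, runs_per_worker: int,
                    min_workers: int = 0, max_workers: int = 10) -> int:
    """Workers needed to drain the queue at `runs_per_worker` runs each."""
    needed = math.ceil(queue_depth / runs_per_worker)
    return max(min_workers, min(max_workers, needed))
```

With a cap of 20 per replica and 36 runs in flight, the first function asks for 3 replicas; a queue of 12 with 5 runs per worker asks for 3 workers. The max bound matters as much as the formula: it's what stops a traffic spike from scaling you straight into the provider's rate limit.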

3. Graceful degradation: the three failures you'll see

Three production failure modes happen often enough that you should plan for them.

Model provider outage or 5xx storm. Anthropic or OpenAI returns errors for 10 minutes. Your agent can't run. The right behavior: serve a clear error to users ("model temporarily unavailable, retry in a moment") rather than letting requests time out. The implementation: a circuit breaker that trips when the error rate over the last 60 seconds exceeds 50%, then fails fast for the next 5 minutes. Periodic test pings during the cooldown let you detect recovery early; the class below leaves them out to stay short.

# app/circuit.py
import time


class CircuitBreaker:
    def __init__(self, threshold=0.5, window=60, cooldown=300):
        self.threshold = threshold    # error rate that trips the breaker
        self.window = window          # seconds of history to consider
        self.cooldown = cooldown      # seconds to fail fast once tripped
        self.results: list[tuple[float, bool]] = []   # (timestamp, ok)
        self.tripped_until = 0.0

    def record(self, ok: bool):
        now = time.time()
        self.results.append((now, ok))
        # Keep only results inside the sliding window
        self.results = [r for r in self.results if r[0] > now - self.window]
        if len(self.results) >= 10:   # need a minimum sample before judging
            successes = sum(1 for _, was_ok in self.results if was_ok)
            error_rate = 1 - successes / len(self.results)
            if error_rate > self.threshold:
                self.tripped_until = now + self.cooldown

    def is_open(self) -> bool:
        # True while tripped: callers should fail fast instead of calling the model
        return time.time() < self.tripped_until
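
The "periodic test pings" part can be sketched separately. This is one possible shape, not the chapter's implementation: ProbingBreaker, probe_every, and the injectable clock are all assumed names, and trip() stands in for wherever your breaker decides the error rate is too high.

```python
import time
from typing import Callable


class ProbingBreaker:
    """Fail fast while tripped, but admit one probe every `probe_every` seconds."""

    def __init__(self, cooldown: float = 300.0, probe_every: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown = cooldown
        self.probe_every = probe_every
        self.clock = clock                    # injectable so tests can fake time
        self.tripped_until = 0.0
        self.next_probe_at = 0.0

    def trip(self) -> None:
        now = self.clock()
        self.tripped_until = now + self.cooldown
        self.next_probe_at = now + self.probe_every

    def allow_request(self) -> bool:
        now = self.clock()
        if now >= self.tripped_until:
            return True                       # closed: normal traffic
        if now >= self.next_probe_at:
            self.next_probe_at = now + self.probe_every
            return True                       # tripped: admit a single probe
        return False                          # tripped: fail fast

    def record_probe(self, ok: bool) -> None:
        if ok:
            self.tripped_until = 0.0          # provider recovered: close early
```

The point of the probe is to shorten the outage for users: without it, a provider that recovers after 40 seconds still costs you the full 5-minute cooldown.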

Fallback to a cheaper or different model. When the circuit is tripped on your primary, you can sometimes fall back to a secondary: Sonnet → Haiku, GPT-5.5 → GPT-5-mini, or a different provider entirely. The fallback is usually slower or less capable, but it's a real answer instead of an error. Two cautions: the fallback model needs its own circuit breaker (otherwise an outage that hits both providers leaves you hammering a dead fallback), and your evals (chapter 1.4, chapter 3.1) should include the fallback paths so you know what quality looks like there.
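
One way the fallback chain might look, as a sketch under stated assumptions. SimpleBreaker is a deliberately tiny stand-in (it trips after N consecutive failures and has no cooldown), and call_with_fallback and AllProvidersDown are hypothetical names; in practice each provider would carry a real breaker like the CircuitBreaker class in this step.

```python
from typing import Callable, Sequence


class SimpleBreaker:
    """Tiny stand-in breaker: opens after `max_failures` consecutive failures."""

    def __init__(self, max_failures: int = 3):
        self.max_failures = max_failures
        self.failures = 0

    def is_open(self) -> bool:
        return self.failures >= self.max_failures

    def record(self, ok: bool) -> None:
        self.failures = 0 if ok else self.failures + 1


class AllProvidersDown(Exception):
    """Raised when every provider failed or had an open breaker."""


Provider = tuple[str, Callable[[str], str], SimpleBreaker]


def call_with_fallback(providers: Sequence[Provider], prompt: str) -> tuple[str, str]:
    """Try providers in order, skipping any whose breaker is open."""
    for name, call, breaker in providers:
        if breaker.is_open():
            continue                      # fail fast past a tripped provider
        try:
            reply = call(prompt)
        except Exception:
            breaker.record(False)         # count the failure, try the next one
            continue
        breaker.record(True)
        return name, reply                # report which model actually answered
    raise AllProvidersDown("every provider failed or is tripped")
```

Returning the provider name alongside the reply is a small choice that pays off later: it lets you tag traces and evals with the model that actually answered, so fallback quality shows up as its own line on the dashboard.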

Runaway costs. A buggy prompt change causes the agent to call search_docs 50 times per run instead of 2. You discover this from the cost dashboard the next morning, after $4,000 in spend. The defense: per-user, per-run cost ceilings enforced in the loop itself.

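# In the agent loop, after each model call:
step_cost = (response.usage.input_tokens * INPUT_PRICE
             + response.usage.output_tokens * OUTPUT_PRICE) / 1e6
run_cost += step_cost

# Add only this step's cost to the daily total; adding the cumulative
# run_cost on every iteration would double-count earlier steps.
daily_key = f"cost:user:{user_id}:{today}"
daily = await redis.incrbyfloat(daily_key, step_cost)
await redis.expire(daily_key, 2 * 86400)   # let old daily counters age out

if run_cost > MAX_COST_PER_RUN:
    # Keyword arguments assume a structlog-style logger
    log.warning("run_cost_exceeded", run_id=run_id, cost=run_cost)
    yield {"type": "error",
           "payload": {"reason": "cost_budget_exceeded"}}
    return

# Also check daily user budget
if daily > MAX_COST_PER_USER_PER_DAY:
    yield {"type": "error",
           "payload": {"reason": "daily_budget_exceeded"}}
    return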

Sensible defaults: $1 per run for end-user agents, $20 per user per day, with explicit overrides for trusted accounts. The ceilings are belt-and-suspenders to the budgets in chapter 3.1 — those gate shipping a regression, these gate spending on one that snuck through.

4. The runbook

Last piece, often skipped: write down what to do when things break. A page or two of plain text living next to your deploy config. The minimum that's useful:

  • How to deploy. The exact commands. Include both the normal path and the emergency rollback path.
  • How to see what's happening. Where the dashboards live. Where the logs are. The three queries you run when paged.
  • How to make it stop. The kill switch (chapter 2.3 — the flag that halts every run). The cost ceiling toggle. The model provider switch.
  • The five most likely failure modes and the first thing to check for each. Tied to your observability dashboards from chapter 2.1. "Latency p95 spiking → check semaphore saturation on /health; if >90%, scale up." "Cost spike → check which tool's call count grew; check for prompt regression in last 24h of PRs."

The runbook is the thing that lets a different engineer (or future-you at 3am) handle an incident competently. Write it the day after you ship, not when you need it.

Question
My platform doesn't expose custom autoscaling metrics. Workarounds?

Three options:

  • Concurrent connection count. Most platforms expose this, and it's a decent proxy for in-flight runs as long as each run holds exactly one connection.
  • HTTP request rate over a 1-minute window. Coarse, but usable.
  • External-driven scaling. A small cron job polls your metrics endpoint and calls the platform's "set replica count" API. Less elegant than built-in autoscaling, but it works on every platform.

If you find yourself building elaborate workarounds: that's a signal you've outgrown your platform. Consider moving to one with first-class autoscaling on custom metrics (Modal, Kubernetes with HPA, Fly.io with their autoscaling beta).

Question
What about secrets management for API keys?

For a starting deployment: platform-native secrets (Fly Secrets, Railway environment variables, Kubernetes Secrets). Don't put API keys in the Docker image, the Git repo, or the runtime command line. The platform's mechanism is good enough.

Reach for HashiCorp Vault, AWS Secrets Manager, or similar when (a) you have multiple environments with rotation requirements, (b) you need audit logs of who accessed what secret, or (c) compliance requires it. For most teams shipping their first agent, platform-native is fine and one fewer thing to set up.
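
Whichever store you pick, the secrets reach your process as environment variables, and it's worth failing at startup rather than on the first model call. A small sketch, assuming a hypothetical helper named load_required_secrets; the env parameter exists only so the function can be exercised without touching the real environment:

```python
import os
from typing import Mapping


def load_required_secrets(names: list[str],
                          env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the named secrets, raising at startup if any is missing or empty."""
    missing = [n for n in names if not env.get(n)]
    if missing:
        # Report names only; never log secret values.
        raise RuntimeError("missing required secrets: " + ", ".join(missing))
    return {n: env[n] for n in names}
```

Call it once at import time in app/main.py, for example with ["ANTHROPIC_API_KEY", "REDIS_URL"], so a misconfigured deploy fails its first health check instead of its first user request.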

Question
When do I split the API server and worker into separate services?

When their scaling profiles diverge meaningfully. Concretely: the API server scales on connection count (handles many short requests), the worker scales on queue depth (handles fewer long jobs). If you're routinely scaling one without the other, splitting them stops you from over-provisioning the unused side. The marker to watch: are you ever running more API replicas than you need just to get worker capacity (or vice versa)? If yes, split.

For the first 6 months of a service: one container with both. After that: usually split. The cost of splitting is low (two Dockerfiles instead of one, two deployment configs); the cost of not splitting is over-provisioning.

WORKED EXAMPLE

Your first deploy in 30 minutes: Fly.io, end to end.

Patterns are easier to internalize after you've shipped one. This walkthrough takes the Pattern A endpoint from Step 2, deploys it to Fly.io with a Redis instance attached, and ends with a working URL you can curl. Fly.io specifically because it's the lowest-overhead path from localhost to production-ish: no Kubernetes, no AWS console, two commands. The principles map cleanly to Railway, Render, Modal, or AWS App Runner if you prefer those — the only differences are syntax.

Assumptions: you have the Pattern A code from Step 2 in a directory, a working requirements.txt, and an ANTHROPIC_API_KEY (or OPENAI_API_KEY) in your environment.

Minute 0–5: project structure

If you don't already have this shape, set it up:

my-agent/
├── app/
│   ├── __init__.py
│   ├── main.py            # the FastAPI app from Step 2
│   └── agent/
│       ├── __init__.py
│       └── streaming.py   # run_agent_streaming() from Step 2
├── requirements.txt
├── Dockerfile
└── fly.toml               # we'll create this next

The requirements.txt for the minimum viable deploy:

fastapi==0.115.0
uvicorn[standard]==0.32.0
sse-starlette==2.1.3
anthropic==0.40.0
pydantic==2.9.0

The Dockerfile, carried over from Step 2. Health checks are configured in fly.toml in the next step, so nothing Fly-specific is needed here:

# Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

ENV PYTHONUNBUFFERED=1
EXPOSE 8000

CMD ["uvicorn", "app.main:app", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--workers", "2", \
     "--timeout-keep-alive", "120"]

Minute 5–10: Fly setup and first launch

Install the CLI if you don't have it: curl -L https://fly.io/install.sh | sh. Then sign up and log in:

$ fly auth signup           # or: fly auth login
$ fly launch --no-deploy    # creates fly.toml interactively

The CLI asks a few questions. Sensible answers for a first deploy:

  • App name: something globally unique like my-agent-yourname.
  • Region: closest to your users.
  • Postgres / Redis / Sentry: say No to all for now — we'll add Redis explicitly in a minute.
  • Deploy now: No (we want to set secrets first).

The result is a fly.toml file. Open it and add an internal port and a healthcheck section — the defaults are close, these tweaks are what make it work for streaming:

# fly.toml
app = "my-agent-yourname"
primary_region = "sjc"

[build]

[http_service]
  internal_port = 8000
  force_https = true
  auto_stop_machines = "stop"     # save $ when idle
  auto_start_machines = true
  min_machines_running = 1       # at least one warm replica

  [[http_service.checks]]
    interval = "30s"
    timeout = "5s"
    grace_period = "10s"
    method = "GET"
    path = "/health"

[[vm]]
  size = "shared-cpu-1x"
  memory = "512mb"           # enough for agent + SDK + asyncio

Minute 10–15: secrets and Redis

Set your API key as a Fly secret (encrypted at rest, injected as env var at runtime):

$ fly secrets set ANTHROPIC_API_KEY=sk-ant-...
$ fly secrets set OPENAI_API_KEY=sk-...           # if also using OpenAI

Add Redis via Upstash (managed, free tier sufficient for starting):

$ fly redis create
  # CLI walks you through region selection, naming
  # On success it prints the connection URL
$ fly redis status <your-redis-name>
  # Verify it's running; the private URL is listed here too
$ fly secrets set REDIS_URL=redis://...
  # Only needed if `fly secrets list` doesn't already show REDIS_URL

Your app code reads os.environ["REDIS_URL"], which Fly injects from the secret at runtime. Nothing else to configure.

Minute 15–20: deploy

$ fly deploy

==> Verifying app config
==> Building image
    Image: registry.fly.io/my-agent-yourname:deployment-...
    Image size: 187 MB
==> Creating release
==> Monitoring deployment
    1 desired, 1 placed, 1 healthy, 0 unhealthy
    --> v1 deployed successfully

Visit your newly deployed app at https://my-agent-yourname.fly.dev/

Your app is now public at https://my-agent-yourname.fly.dev/. Health check first:

$ curl https://my-agent-yourname.fly.dev/health
{"ok": true, "in_flight": 0}

Now the real test — the streaming endpoint:

$ curl -N -X POST https://my-agent-yourname.fly.dev/chat/stream \
    -H "Content-Type: application/json" \
    -d '{"message": "what is 2 + 2?", "user_id": "test"}'

event: start
data: {"run_id": "8c1e..."}

event: status
data: {"message": "thinking..."}

event: token
data: {"text": "2"}

event: token
data: {"text": " + 2"}

event: token
data: {"text": " equals 4"}

event: done
data: 

The -N flag on curl disables buffering so you can see tokens stream in real time. Watching the events arrive one at a time is the visceral confirmation that streaming is actually working end-to-end.
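
If you'd rather check the stream from Python than by eye, the wire format is simple enough to parse by hand. A simplified sketch (not a spec-complete SSE parser): parse_sse is a hypothetical helper, and it assumes every data payload is either JSON or empty, as in the output above.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, object]]:
    """Yield (event, data) pairs from SSE lines; a blank line ends each event."""
    event, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch whatever has accumulated, then reset.
            if data_lines or event != "message":
                text = "\n".join(data_lines)
                yield event, (json.loads(text) if text else None)
            event, data_lines = "message", []
        elif line.startswith(":"):
            continue                      # comment line, often a keep-alive ping
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())


sample = (
    'event: start\ndata: {"run_id": "abc"}\n\n'
    'event: token\ndata: {"text": "2"}\n\n'
    'event: done\ndata: \n\n'
)
events = list(parse_sse(sample.split("\n")))
```

Fed the lines of a real streaming response (for example from an HTTP client's line iterator), the same function gives you something to assert on in a smoke test: a start event first, at least one token, and done last.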

Minute 20–25: observability sanity check

Fly gives you logs and basic metrics built in. The commands you'll use most:

$ fly logs                          # tail logs from all replicas
$ fly logs --instance <machine-id>   # tail one replica
$ fly status                         # replica health, memory, CPU
$ fly dashboard                      # opens web UI in browser

If you wired up chapter 2.1 observability (OpenTelemetry → Langfuse), now is when you confirm traces are reaching your dashboard. Send a few requests, open Langfuse, see them appear with correct span trees. If they don't, the OTLP exporter URL or auth header is wrong — check fly logs for the error.

Minute 25–30: scale and lock it down

You're live, but on a single replica with no scaling and no concurrency limits. Three changes get you closer to production-ready:

# Allow up to 3 Machines. With auto_stop_machines / auto_start_machines
# set in fly.toml, the Fly proxy starts and stops them based on load.
$ fly scale count 3

# Cap concurrent requests per Machine. This is a concurrency limit,
# not a per-IP rate limit; the soft limit is also the proxy's load signal.
# Edit fly.toml:
[http_service.concurrency]
  type = "requests"
  hard_limit = 25
  soft_limit = 20

# Cost ceiling at the platform level — set a monthly budget alert.
# In the web dashboard: Organization → Billing → Spend Alert.
# Set to $50/mo so an accidental loop doesn't bankrupt you.

Redeploy with fly deploy. You now have an agent that:

  • Serves over HTTPS at a stable URL with a managed cert.
  • Streams tokens to clients via SSE in real time.
  • Has a healthcheck and auto-restarts on crash.
  • Scales between 1 and 3 replicas based on load.
  • Stops idle machines to save money during quiet hours.
  • Has Redis attached for the Pattern B upgrade when you need it.
  • Has logs you can tail and query.
  • Has a spend alert so a runaway loop won't surprise you.

What this isn't yet

To be honest about what you've shipped: this is "production-ish," not "production." Specifically, you haven't yet wired up Pattern B (no worker process; long runs would still hit HTTP timeouts), the state-store split (no Postgres; conversations don't persist across restarts), or the circuit breaker (if Anthropic has a 5xx storm, your users see errors with no graceful degradation). Those are the next deployment milestones; the 30-minute version is the floor, not the ceiling.

What you do have is the substrate: a real URL serving real traffic, with the right shape to grow. Pattern B becomes "add a worker to fly.toml as a second process group." The state-store split becomes "add Fly Postgres, write the schema migration." The circuit breaker is a few dozen lines added to the agent code. None of these require re-architecting; they layer onto what you have.

Mapping to other platforms

If you're not on Fly, the same workflow translates cleanly:

  • Railway: push to GitHub, click "Deploy from repo," set env vars in the dashboard. Add Redis as an addon. Same Dockerfile works.
  • Render: create a Web Service from your repo. Add a Redis instance. Set env vars. Same Dockerfile.
  • Modal: return your FastAPI app from a function decorated with @app.function() and @modal.asgi_app(), then deploy with modal deploy. Redis via Upstash.
  • AWS App Runner: push your image to ECR, create an App Runner service pointed at it. Redis via ElastiCache.
  • Kubernetes: if you have a cluster, a basic Deployment + Service + ConfigMap will do it. Don't reach for k8s just for this; it's overkill until you have multiple services or specific networking needs.

The economic baseline for any of these: a small agent with moderate traffic costs $5–20 per month in compute on Fly (free tier eats most of it), $10–30 on Railway or Render, and however much your model provider bills you for actual usage. The model bill dominates by 10–100×. Don't over-optimize platform spend.

End of chapter 2.4

Deliverable

A deployed agent on a cloud platform of your choice, serving real traffic with the production patterns that matter: SSE streaming endpoint for short runs, queue + worker pair for long runs, Postgres-and-Redis state split, autoscaling on signals that actually move, circuit breakers and per-user cost ceilings against the failure modes you'll actually hit, and a runbook your future self can read at 3am. With chapter 2.1 observability and chapter 2.3 safety in place, this is the substrate an agent ships on.

  • FastAPI + sse-starlette streaming endpoint with semaphore-limited concurrency
  • Async agent loop with token streaming and tool_start/tool_end events
  • Client disconnect detection (request.is_disconnected) to stop runaway runs
  • Submit/poll endpoint pair backed by Redis Streams + consumer groups
  • Worker process with SIGTERM handling and at-least-once delivery
  • SSE resumption via Last-Event-ID for flaky-network resilience
  • Postgres for conversation truth; Redis cache with invalidate-on-write
  • Autoscaling on semaphore saturation (API) and queue depth (workers)
  • Circuit breaker for model provider outages; fallback model wired up
  • Per-run and per-user-per-day cost ceilings enforced in the loop
  • Runbook covering deploy, observe, kill-switch, and top-5 failure modes