0.4
Part 0 / Foundations · The async patterns the rest of the guide assumes

异步 Python:每个代理循环中都会出现的模式。

本指南中的所有代码示例均使用异步 Python。async defawaitasyncio.gather——这些并非偶然。它们是智能体(agent)代码的形态,因为智能体的大部分时间都花在等待 I/O 上(模型调用、工具调用(tool calling)、网络请求),而异步让你无需线程即可获得并发能力。本章假定你能读懂异步 Python。其目标是提炼出对智能体代码尤为重要的模式——并行工具调度、超时、结构化取消、错误隔离、背压——以及那些在代码规模扩展时会让原本良好的代码踩坑的陷阱。本章有意比本部分其他章节更短:你已在示例中吸收了大多数模式。这里只是给它们命名。

STEP 1

为何智能体采用异步:I/O 等待模式。

一个真正在工作的智能体,其绝大部分挂钟时间都花在等待上。模型调用耗时 1–5 秒。工具调用耗时 100ms–2s。数据库查询、网页抓取、文件读取——都在等待。智能体的 CPU 工作(解析 JSON、校验模式(schema)、决定下一步做什么)只需毫秒级。运行时成本几乎全是 I/O 等待。

这正是异步所针对的场景。当一个操作在等待 I/O 时,事件循环可以运行其他操作。你并不能完成更多 CPU 工作——但你不再需要逐个阻塞等待。一个多工具智能体轮次,若顺序执行 5 次各 400ms 的工具调用需要 2 秒;同一智能体并发调度则只需约 400ms。总工作量相同,挂钟延迟(latency)却截然不同。

并发与并行的区别

有一个值得先厘清的混淆点。异步 Python 给你的是并发(多个任务互不阻塞地推进)而非并行(多个任务同时在多个 CPU 上执行)。GIL 依然存在;Python 代码在任意时刻只运行在一个线程上。

对于智能体来说,这完全没问题——你想要重叠的时间几乎全是 I/O 等待时间,实际工作发生在别人的服务器上(模型提供商的 GPU、数据库、远程 API)。你的 Python 进程只是在等待响应。等待期间,事件循环运行其他任务。并发模型与工作负载完美匹配。

并行(真正的多 CPU 执行)对智能体来说真正重要的场景很少:在进程内运行大型嵌入模型等繁重的本地 CPU 工作,或批量本地推理。遇到这些情况,才需要使用多进程或加速库。其他所有场景(工具调度、模型调用、检索(retrieval)、并行子智能体),异步并发都是正确的选择。

异步在具体数字上带来的收益

一个在单次轮次中执行以下操作的智能体:

  • 3 次网页搜索(各 400ms,可并行)
  • 5 次对返回 URL 的网页抓取(各 600ms,可并行)
  • 1 次合成模型调用(3 秒)

串行执行:0.4×3 + 0.6×5 + 3 = 7.2 秒。

使用异步并发执行:max(0.4, 0.4, 0.4) + max(0.6, 0.6, 0.6, 0.6, 0.6) + 3 = 0.4 + 0.6 + 3 = 4 秒。

这是在不改变任何底层工作、零额外计算成本的情况下,仅通过并发运行独立操作,就实现了 45% 的延迟降低。对于延迟直接影响用户体验的交互式智能体来说,这至关重要;对于运行数分钟的自主智能体来说,这种效果会在多次轮次中不断叠加。

本指南使用的三种异步模式

三种模式在本指南后续内容中反复出现。明确命名它们:

模式 1:并行工具调度。当模型在一次轮次中发出多个工具调用(tool calling)块时,通过 asyncio.gather 并发调度它们。第 0.3 章引入了此模式;第 2.2 章将其作为延迟杠杆;第 4.4 章将其推广到多智能体(multi-agent)协调。

模式 2:有界子智能体执行。当协调器生成子智能体时(第 4.3、4.4 章),每个子智能体并发运行并设有挂钟超时。asyncio.wait_for 限制单个运行时长;asyncio.gather 统一调度它们。

模式 3:流式传输与并发工作。智能体向用户流式传输输出(第 2.4 章)的同时继续执行后台工作——在生成响应的同时记录追踪(trace)、更新仪表盘、排队后续工具调用。异步让这些工作并行进行,而不会阻塞流式传输。

三者使用的是同一种底层技术(带适当错误处理的 asyncio.gather);区别在于它们协调的对象不同。

Question
如果异步只是并发而非并行,为何要用它而不是线程?

对于智能体代码来说有三个重要原因。第一,异步每个"任务"的开销极低——以协程方式运行 100 个并发操作比运行 5 个占用的额外内存微乎其微,而线程有真实的操作系统开销。第二,异步的可调试性是线程代码无法比拟的——事件循环在任意时刻都是顺序执行的,没有线程交错的不确定性,堆栈跟踪和日志读起来很自然。第三,异步与 Python 生态中用于 HTTP、数据库和 AI SDK 的库无缝集成——你在智能体中会用到的所有库(anthropic、openai、httpx、asyncpg)都有原生异步 API。

线程的适用场景主要是历史遗留代码或 CPU 密集型工作。对于代理循环(agent loop),异步是当之无愧的默认选择。

Question
同一个智能体中能否混用同步和异步代码?

可以,但有摩擦。异步函数内部的同步调用会阻塞事件循环——同步调用运行期间,其他协程无法推进。对于同时处理大量并发事务的智能体来说,这是严重问题;一次 200ms 的同步数据库调用会让你所有 10 个并发子智能体冻结 200ms。

解决方法:将同步调用包装在 asyncio.to_thread 中,让它们在线程池中运行,而不阻塞事件循环。await asyncio.to_thread(slow_sync_func, args) 会返回一个可等待对象。对于无法替换的同步库以及真正 CPU 密集型的工作,使用此方法。

不要随意将同步调用混入异步代码而不做此包装——症状很隐蔽(神秘的延迟,平时正常、高负载时才出问题),但一旦知道问题所在,修复就是机械性的操作。

STEP 2

每个智能体中都会出现的四种模式。

四种异步模式几乎出现在所有生产级智能体代码库中。每种模式都很简短;每种模式都有在实际发布时不可忽视的微妙之处。

模式 1:使用 asyncio.gather 并行调度

主力模式。asyncio.gather 接收 N 个协程并发运行,在所有协程完成(或某个协程抛出异常)时返回。

# The pattern from chapter 0.3, expanded

async def dispatch_tools(tool_use_blocks):
    # Build a list of coroutines (not running them yet)
    coros = [run_one_tool(block) for block in tool_use_blocks]
    # gather runs them concurrently; returns when all finish
    results = await asyncio.gather(*coros)
    return results

async def run_one_tool(block):
    return await HANDLERS[block.name](**block.input)

三个微妙之处:

列表推导式创建的是协程,不是结果。[run_one_tool(b) for b in blocks] 对每个 run_one_tool(b) 表达式求值,返回协程对象但不执行。执行发生在 gather 内部。这是正确的;也是有人误以为"列表推导式已经并行运行了它们"的困惑来源。

默认情况下,gather 在第一个异常时抛出。如果某个工具失败,其他工具会被取消,异常向上传播。这通常不是你想要的——一般你希望所有工具都完成(或失败),这样智能体才能逐个处理结果。

return_exceptions=True 标志改变了这一行为。asyncio.gather(*coros, return_exceptions=True) 在同一个列表中返回结果或异常对象。异常不再传播;你按每个结果单独处理。对于工具调度来说,这几乎总是你想要的——一个工具失败不应该杀死其他工具。

# The production version

async def dispatch_tools_safe(tool_use_blocks):
    coros = [run_one_tool(block) for block in tool_use_blocks]
    results = await asyncio.gather(*coros, return_exceptions=True)

    # results is a list where some elements may be Exception objects
    tool_results = []
    for block, result in zip(tool_use_blocks, results):
        if isinstance(result, Exception):
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": f"Error: {result}",
                "is_error": True,
            })
        else:
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
    return tool_results

这是每个生产级智能体工具调度器的形态。变体是次要的——不同的错误格式化、不同的结果后处理——但核心模式(带异常捕获的 gather、逐结果处理)在各类智能体中保持稳定。

模式 2:使用 asyncio.wait_for 设置超时

任何 I/O 操作都应该有超时。模型调用、工具调用、网络请求——都可能挂起。没有超时,你的智能体会永远等待;有了超时,你得到干净的取消和清晰的错误。

# Wrap any awaitable with a timeout

async def run_with_timeout(coro, timeout_s: float):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"status": "timeout", "elapsed_s": timeout_s}

# Applied to a tool call
result = await run_with_timeout(
    HANDLERS["slow_web_fetch"](url=long_url),
    timeout_s=10.0,
)

选择超时时长的三条经验法则:

  • 模型调用:60–120 秒。一些具有扩展思考能力的模型在处理难题时可能运行超过一分钟。将超时设置为预期 P99 延迟的 2 倍。
  • 网页抓取和外部 API:10–30 秒。合理的请求很快就会响应;超过这个时间说明请求出了问题。
  • 数据库/内部服务:1–5 秒。内部基础设施应该很快。

超时会级联:智能体的总运行超时应大于所有单步超时之和(加上一些余量)。如果单次模型调用最多需要 60 秒,而你的智能体最多执行 10 次,那么总预算应至少为 700–800 秒,且需将重试因素纳入考虑。

超时触发时会发生什么:被等待的操作在执行中途被取消。对于纯异步代码(网络请求、asyncio.sleep),这是干净的——操作停止。对于将同步工作包装在 to_thread 中的操作,取消更复杂:线程继续运行,取消只在线程完成时才生效。请据此规划——不要依赖超时来真正停止同步工作,只能依靠它停止你的代码等待该工作。

模式 3:使用 TaskGroup 实现结构化并发

Python 3.11+ 新增了 asyncio.TaskGroup,作为许多场景下原始 gather 的现代替代品。它强制执行更严格的约束:在 async with 块退出前,组内所有任务都保证完成(或被取消)。

async def run_subagents(task_specs: list):
    results = {}
    async with asyncio.TaskGroup() as tg:
        for spec in task_specs:
            task = tg.create_task(run_subagent(spec))
            results[spec.id] = task

    # Outside the `async with`, every task is guaranteed done
    return {sid: task.result() for sid, task in results.items()}

TaskGroup 相对于 gather 的优势:异常会被聚合为 ExceptionGroup,而不是悄悄杀死兄弟任务。如果两个子智能体失败,你能看到两个异常,而不只是第一个。结构化并发约束也让生命周期更清晰——你不可能因为忘记等待而意外泄漏任务。

对于 Python 3.11+ 上的新智能体代码,优先使用 TaskGroup 而非原始 gather。对于需要兼容 3.10 的代码库,带 return_exceptions=True 的原始 gather 仍然是正确模式。

模式 4:流式传输与后台工作并发执行

第 2.4 章中的流式传输模式:在令牌(token)到达时将其推送给客户端,同时并发执行其他工作(日志记录、可观测性(observability)、后续调度)。

async def stream_with_logging(messages):
    # Start the model stream
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        full_text = []
        async for text in stream.text_stream:
            full_text.append(text)
            # Yield to client as tokens arrive
            yield {"type": "token", "text": text}

        # After stream completes, kick off background work
        final = await stream.get_final_message()

        # asyncio.create_task launches without awaiting — the background
        # work runs after the stream caller finishes consuming events
        asyncio.create_task(log_run_async(
            messages=messages,
            response=final,
            tokens=len("".join(full_text)),
        ))

        yield {"type": "done", "usage": final.usage}

asyncio.create_task 模式让你能够启动不需要阻塞响应的工作。常见用途:写入可观测性跨度(span)、更新仪表盘、排队后续操作。调用方无需等待日志写完;日志记录按事件循环自己的节奏进行。

陷阱:create_task 创建的任务,如果事件循环在其完成前结束,可能被悄悄丢弃。对于必须持久化的关键后台工作(如必须留存的审计日志),在退出智能体上下文前显式 gather 它们,或使用结构化并发模式。对于尽力而为的工作(比较方便有但非必须的仪表盘更新),即发即忘是可以的。

有一个每个团队迟早都会踩的具体 bug:任务必须持有引用才能在垃圾回收中存活。如果你写了 asyncio.create_task(my_coro()) 却不保存返回值,Python 可能在任务运行之前就将其垃圾回收,且不报任何错误。解决方法:将任务保存在你持有的列表或集合中。推荐模式是 background_tasks.add(task) 配合 task.add_done_callback(background_tasks.discard)

Question
我应该用信号量来限制并发,还是 gather 就够了?

对于单次轮次内的工具调度(5–10 个并发操作),原始 gather 就足够了——你下游服务的速率限制通常能轻松承受这种并发量。对于更大规模的扇出(针对有速率限制的 API 进行 50+ 个并发操作),你需要信号量来限制并发数量。

模式如下:

sem = asyncio.Semaphore(10)  # max 10 concurrent

async def bounded(coro):
    async with sem:
        return await coro

results = await asyncio.gather(*[bounded(c) for c in coros])

信号量限制任意时刻实际运行的操作数量;其余操作等待。当你的下游有速率限制且否则会被打穿时,使用此方法。

Question
asyncio.run 与手动创建事件循环,什么时候有区别?

对大多数智能体代码来说:在入口点使用 asyncio.run(main()),其他什么都不用做。它会创建事件循环、运行你的协程、清理资源。这是正确的默认选择。

手动管理循环(asyncio.get_event_loop()loop.run_until_complete)是旧版 API,有微妙的坑。只在将异步代码集成到拥有自己事件循环的同步框架(某些 Web 框架、某些测试环境)时才需要用到。否则,asyncio.run 就足够了。

STEP 3

每个团队都会踩的五个陷阱。

异步 Python 有一些众所周知的陷阱——那些能通过代码审查、在测试中正常运行、却在生产负载下崩溃的模式。其中五个对智能体代码的影响尤为频繁,值得明确命名。

陷阱 1:忘记 await

错误:调用异步函数却不等待其结果。

# Bug — calls the function but doesn't wait for the result
result = run_subagent(spec)        # result is a coroutine, not the answer
process(result)                    # processing a coroutine — bug

# Correct
result = await run_subagent(spec)  # result is the answer
process(result)

现代类型检查器(mypy、pyright)和 IDE 警告大多数时候都能捕获到这个问题。没有它们,失败表现为一个容易忽略的 RuntimeWarning,以及下游代码处理协程对象而非实际结果的问题。请启用类型检查——在异步代码中,这不是可选项。

陷阱 2:把本应并行的操作串行化

看起来并行实际上是串行:

# Bug — these run sequentially even though they look parallel
results = []
for spec in task_specs:
    result = await run_subagent(spec)   # awaits each before continuing
    results.append(result)

# Correct — actually concurrent
results = await asyncio.gather(*[
    run_subagent(spec) for spec in task_specs
])

第一个版本是异步但串行的——每次 await 都会在进入下一次迭代前阻塞。你承受了异步的所有开销,却没有得到任何并发收益。第二个版本并发调度所有工作。5 个子智能体各需 3 秒时的挂钟差异:15 秒 vs 约 3 秒。

需要内化的模式:如果工作是独立的,就批量放入 gather;如果是顺序的,就按顺序 await。不要因为在循环内 await 而意外串行化了独立的工作。

陷阱 3:用同步工作阻塞事件循环

异步函数内部的同步调用会阻塞整个事件循环:

async def process_document(doc_path: str):
    # Bug — open() and read() are sync; they block the event loop
    with open(doc_path) as f:
        content = f.read()    # blocks; nothing else makes progress
    return await process_content_async(content)

# Better — wrap sync I/O to run in a thread
async def process_document(doc_path: str):
    content = await asyncio.to_thread(
        lambda: open(doc_path).read()
    )
    return await process_content_async(content)

# Best — use an async-native library (aiofiles, etc.)
async def process_document(doc_path: str):
    async with aiofiles.open(doc_path) as f:
        content = await f.read()
    return await process_content_async(content)

意外阻塞的症状:智能体延迟随并发负载线性增长(因为每个智能体的同步调用都会阻塞其他所有智能体),或在流量峰值时出现神秘的慢速。诊断方法:在事件循环迭代中加日志,观察长时间间隔。一旦定位到问题,修复是机械性的。

需要留意的常见同步调用:不使用 aiofiles 的文件 I/O、同步数据库库(用 psycopg2 而非 asyncpg)、处理大型文档的 CPU 密集型 JSON 解析(使用 aiojson 或移至线程),以及任何不是为异步设计的第三方库。

陷阱 4:取消实际上并未取消

你用 wait_for 包装了一个带超时的操作。超时触发了。你预期操作会停止。但它没有:

# The operation wraps sync work in a thread
async def slow_op():
    await asyncio.to_thread(very_long_sync_function)

# Timeout fires after 5s, but the thread keeps running
try:
    await asyncio.wait_for(slow_op(), timeout=5.0)
except asyncio.TimeoutError:
    pass
# The sync function is still running in the thread pool!

取消只对真正的异步操作有效。asyncio.sleep、通过异步库进行的网络 I/O 以及其他可等待对象都能干净地取消。在线程中运行的同步工作不行——Python 线程不能从外部安全地终止,所以线程会继续运行直到完成。你的异步代码在超时后返回了控制权,但底层工作仍在继续。

实际影响:不要依赖超时来真正停止同步工作,只能依靠它停止你的代码等待该工作。如果操作有外部副作用(写文件、调用 API),这些在超时后仍会发生。请据此规划——要么接受"超时"意味着"我停止等待,工作仍在继续",要么将工作迁移到真正的异步实现。

陷阱 5:即发即忘任务中被悄悄吞掉的异常

create_task 创建的任务,若抛出异常:该异常被任务对象捕获,但除非有人 await 该任务,否则异常永远不会重新抛出。如果你从不 await,异常就悄悄消失了。

# Bug — exceptions in this task disappear silently
asyncio.create_task(log_async(message))

# Better — at minimum, log the exception when it happens
def _log_exception(task):
    if task.exception():
        logger.error("Background task failed", exc_info=task.exception())

t = asyncio.create_task(log_async(message))
t.add_done_callback(_log_exception)

add_done_callback 模式至少能将失败暴露到你的日志中。对于关键后台工作,更好的修复是在退出智能体上下文前显式 await 该任务(或使用 TaskGroup,它会从已完成的任务中抛出异常)。"后台任务静默失败"模式是生产环境中许多"仪表盘怎么不更新了"谜团的根源。

STEP 4

进阶模式:取消、背压、异步迭代。

三种超出基础之外的模式,出现在成熟的智能体代码中。先浏览了解;当基础模式不够用时再来使用它们。

协作式取消:保护关键工作

当你的智能体运行被取消(超时触发、用户中止),你通常希望一切都能干净地停止。但有时,有些关键的清理工作必须完成,即使取消信号正在传播——刷写最终日志条目、释放锁、提交部分事务。

asyncio.shield 保护协程免受外部取消:

async def run_with_cleanup(work_coro):
    try:
        return await work_coro
    finally:
        # This cleanup runs even if work_coro was cancelled,
        # and shield protects the cleanup from also being cancelled
        await asyncio.shield(write_final_audit_log())

谨慎使用 shield——如果滥用,它可能显著延长取消延迟。正确的使用场景是简短、有界的必须完成的清理工作;其他所有情况,正常取消才是正确行为。

背压:下游缓慢时的受控流量

如果你的智能体产出输出的速度超过消费速度(网络无法足够快地传输流式响应),朴素的代码会在内存中积压。带有界大小的 asyncio.Queue 让生产者在消费者落后时等待:

async def produce_with_backpressure(queue: asyncio.Queue):
    async for chunk in stream.text_stream:
        # If queue is full (consumer slow), put() blocks here
        await queue.put(chunk)
    await queue.put(None)   # sentinel for end-of-stream

async def consume_with_pacing(queue: asyncio.Queue):
    while True:
        chunk = await queue.get()
        if chunk is None: break
        await send_to_client(chunk)

# Bounded queue applies backpressure when consumer is slow
q = asyncio.Queue(maxsize=10)
await asyncio.gather(produce_with_backpressure(q), consume_with_pacing(q))

此模式对于服务慢速客户端的流式智能体至关重要。没有背压,慢速客户端(或卡住的网络)会导致内存增长,因为你的生产者不断超前。有了背压,生产者自然会按消费者的速度自我调节。

异步迭代:流式智能体循环

第 2.4 章中的流式智能体模式使用异步生成器(async def + yield)在事件发生时实时发出:

async def stream_agent_events(messages):
    for step in range(MAX_STEPS):
        # Stream the model response, yielding tokens as they arrive
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                yield {"type": "token", "text": text}

        final = await stream.get_final_message()
        if final.stop_reason != "tool_use":
            yield {"type": "done"}
            return

        # Tool dispatch, yielding progress events
        yield {"type": "tool_use", "blocks": get_tool_blocks(final)}
        results = await dispatch_tools_safe(get_tool_blocks(final))
        yield {"type": "tool_results", "results": results}

        messages.append(...)   # update conversation, loop

# Callers iterate
async for event in stream_agent_events(messages):
    handle_event(event)

异步生成器模式让流式智能体循环变得清晰。每次 yield 向调用方发出一个事件;调用方处理它;循环继续。生产者(智能体)和消费者(客户端渲染器)自然解耦,代码读起来是顺序的,尽管底层是异步运行的。

这是 Agent SDK 的 query() 函数所暴露的模式,是 Anthropic 流式端点所支持的模式,也是你自己的流式智能体应该采用的模式。一旦你需要管理任何面向用户的延迟,这就是对"返回最终结果"的有意义升级。

如果你从本章只记住一件事,记住在 asyncio.gather 上使用 return_exceptions=True。这是智能体中杠杆作用最高的单个异步模式——没有它,一个不稳定的工具可能会杀死整个轮次的已调度工作。有了它,你对每个结果都能独立处理。在所有调度独立工作的地方都应用它。

End of chapter 0.4

可交付成果

掌握每个智能体代码库中都会出现的异步 Python 模式的实用命令。使用 gather 进行并行调度(生产环境加上 return_exceptions=True),使用 wait_for 设置超时,在 3.11+ 上使用 TaskGroup 实现结构化并发,使用异步生成器实现流式传输。了解五个陷阱(缺少 await、意外串行化、阻塞事件循环、取消失效、任务静默失败)及其各自的诊断方法。将进阶模式(shielding、背压、异步迭代)作为基础模式不够用时的备选工具。

  • 工具调度使用 asyncio.gather(*coros, return_exceptions=True)
  • 每次模型调用、工具调用、网络请求都通过 asyncio.wait_for 设置超时
  • 单步超时逐层叠加,形成含重试余量的智能体总运行超时
  • Python 3.11+:新代码优先使用 TaskGroup;旧版本回退到 gather
  • 同步 I/O 包装在 asyncio.to_thread 中,或替换为原生异步库
  • 即发即忘任务在集合中持有引用以避免 GC;添加 done callback 以记录错误日志
  • 启用类型检查(mypy/pyright)以捕获缺少 await 等异步错误
  • 流式智能体循环使用 async def + yield;调用方使用 async for 迭代
  • 对消费者速度可变的流式管道使用有界队列实现背压
  • 了解超时取消的是你的等待,不一定是底层同步工作