让它可控、可并行、可中断。
现在添加在第一阶段刻意跳过的功能。每一项都针对你在追踪(trace)中观察到的具体失败模式——而非假设性问题。并行工具调用(parallel tool calls)、用于上下文隔离(context isolation)的子智能体(subagent)、规划、以及中断。
启用并行工具调用。
回顾你第一阶段的追踪记录。智能体(智能体)有多少次在搜索明显独立的内容时仍按顺序执行?"查找关于 X 的文档"→ 等待 → "查找关于 Y 的文档"→ 等待。每次等待都是一次到模型的往返加上一次到工具的往返。并行工具调用(parallel tool calls)将总延迟(latency)压缩为一次往返。
两套 API 都支持每轮多次工具调用——模型在单次响应中返回多个 tool_use(或 function_call)块。你的循环只需并发执行这些调用,并将每个结果与正确的调用 ID 配对。
变更:异步执行
我们使用 Python 的 asyncio 并发运行工具处理程序。处理程序本身不必是异步的——我们用 asyncio.to_thread 包装同步函数。
# agent/loop.py — parallel-capable import asyncio from anthropic import AsyncAnthropic client = AsyncAnthropic() async def run_one_tool(block): try: result = await asyncio.to_thread( HANDLERS[block.name], **block.input ) return { "type": "tool_result", "tool_use_id": block.id, "content": str(result), } except Exception as e: return { "type": "tool_result", "tool_use_id": block.id, "content": f"error: {e}", "is_error": True, } async def run_agent(user_query, max_steps=10): messages = [{"role": "user", "content": user_query}] for step in range(max_steps): response = await client.messages.create( model="claude-sonnet-4-5", max_tokens=4096, system=SYSTEM_PROMPT, tools=TOOLS, messages=messages, ) messages.append({"role": "assistant", "content": response.content}) tool_blocks = [b for b in response.content if b.type == "tool_use"] if not tool_blocks: return {"status": "halted"} # Check for submit_answer first (don't run others) for b in tool_blocks: if b.name == "submit_answer": return {"status": "answered", **b.input} # PARALLEL: run all tool blocks at once results = await asyncio.gather( *[run_one_tool(b) for b in tool_blocks] ) messages.append({"role": "user", "content": results}) return {"status": "step_limit"}
# agent/loop.py — parallel-capable import asyncio, json from openai import AsyncOpenAI client = AsyncOpenAI() async def run_one_tool(call): args = json.loads(call.arguments) try: result = await asyncio.to_thread( HANDLERS[call.name], **args ) output = str(result) except Exception as e: output = f"error: {e}" return { "type": "function_call_output", "call_id": call.call_id, "output": output, } async def run_agent(user_query, max_steps=10): input_items = [{"role": "user", "content": user_query}] for step in range(max_steps): response = await client.responses.create( model="gpt-5.5", instructions=SYSTEM_PROMPT, tools=TOOLS, input=input_items, ) for item in response.output: input_items.append(item.model_dump()) calls = [i for i in response.output if i.type == "function_call"] if not calls: return {"status": "halted"} for call in calls: if call.name == "submit_answer": return {"status": "answered", **json.loads(call.arguments)} # PARALLEL: run all calls at once results = await asyncio.gather( *[run_one_tool(c) for c in calls] ) input_items.extend(results) return {"status": "step_limit"}
观察行为变化
用第一阶段的多跳问题重新运行。模型现在知道它可以扇出(API 会告知它并行调用可用),因此它往往会自动利用这一能力。
$ python scripts/run.py "How does PgBouncer transaction pooling
interact with prepared statements?"
──────────────────── Step 0 ────────────────────
┌─ thinking ─────────────────────────────────────┐
│ I need info on two separate topics. I'll │
│ search for both in parallel. │
└────────────────────────────────────────────────┘
→ search_docs({'query': 'PgBouncer transaction pooling'})
→ search_docs({'query': 'prepared statements PostgreSQL'})
[both run concurrently, ~180ms total]
──────────────────── Step 1 ────────────────────
→ fetch_doc({'chunk_id': 'pgbouncer-modes::1'})
→ fetch_doc({'chunk_id': 'sql-prepare::0'})
[both run concurrently, ~40ms total]
──────────────────── Step 2 ────────────────────
→ submit_answer({...})
status: answered (3 steps)
5 步降至 3 步。完成了相同的工作,却只用了一半的往返次数。对于每次调查要进行 20–50 次工具调用的生产级智能体而言,并行化能大幅压缩运行时间——对于检索(retrieval)密集型工作流,往往能提速 3–5 倍。
代价:处理程序必须可以安全地并发执行。search_docs 和 fetch_doc 都是纯读操作——没问题。如果你有一个会写入共享文件的 save_note 工具,则需要加锁。
两个模型在调用明显独立(不同主题的不同搜索)时都会并行化。对于模棱两可的情况,你可以在系统提示词中加入这样的引导:"当你需要查询多个不相关主题的信息时,请并行搜索,而非顺序执行。"
不要过度提示并行化——有时顺序执行才是正确的。"先搜索 X,再根据结果搜索 Y。"是一个合理的计划,并行化反而会破坏它。
asyncio.to_thread 用错了吗?能用,但不是最优解。to_thread 将同步函数移到工作线程执行,对于阻塞 I/O 来说是可行的,但每个调用都会消耗一个线程。如果你有原生异步处理程序(如 httpx.AsyncClient),应直接用 await 调用它们。
对于第一阶段的子字符串扫描处理程序,to_thread 是正确的,因为它们是受 CPU 限制的单线程 Python 代码。
添加用于上下文隔离的子智能体。
这是现代智能体设计中最重要的模式,也是最难做好的。问题在于:主智能体的上下文窗口(context window)十分宝贵。一旦填满原始搜索结果、抓取的文档和冗长的工具输出,质量就会崩溃。模型会丢失原始问题的线索,难以厘清事实,并开始重复自身。
解决方案:将研究任务委托给子智能体。子智能体有自己独立的上下文窗口。它针对一项具体任务进行深度研究,积累所需的中间状态,然后将压缩摘要返回给父智能体。父智能体永远不会接触原始工作内容。
这一模式正是 Claude 的 Research 功能、Perplexity 的深度研究以及 OpenAI 研究智能体底层的工作方式。
实现:子智能体只是递归
添加一个新工具:spawn_subagent(task, focus)。处理程序以该任务为查询递归调用 run_agent,但使用不同的系统提示词,并移除 spawn 工具(以防止无限递归)。
# agent/tools.py — add to TOOLS list { "name": "spawn_subagent", "description": ( "Delegate a focused research task to a subagent. " "The subagent has its own context window, runs its " "own retrieval, and returns a concise summary with " "citations. Use when researching multiple topics " "that would each require many sources." ), "input_schema": { # or "parameters" for OpenAI "type": "object", "properties": { "task": {"type": "string", "description": "the question to research"}, "focus": {"type": "string", "description": "what aspect matters most"}, }, "required": ["task"], }, }
# agent/subagent.py SUBAGENT_PROMPT = """You are a research subagent. Your task: {task} Focus on: {focus} You have search_docs, fetch_doc, and submit_answer. You cannot spawn further subagents. Return a summary under 1500 tokens with citations. Be dense — the parent agent only sees what you submit.""" async def spawn_subagent(task: str, focus: str = "") -> dict: # Same loop, different prompt, restricted tools sub_tools = [t for t in TOOLS if t.get("name") != "spawn_subagent"] sub_prompt = SUBAGENT_PROMPT.format(task=task, focus=focus) result = await run_agent( user_query=task, system_prompt=sub_prompt, tools=sub_tools, max_steps=15, ) return { "summary": result.get("answer", ""), "citations": result.get("citations", []), "steps_used": result.get("steps_used", 0), }
来看看比较查询的效果
$ python scripts/run.py "Compare PgBouncer modes for use with
prepared statements and ORM features"
──────────────────── Step 0 ────────────────────
┌─ thinking ─────────────────────────────────────┐
│ This needs deep research on (1) PgBouncer │
│ modes and (2) prepared statements / ORM │
│ implications. I'll spawn two subagents in │
│ parallel to keep my context lean. │
└────────────────────────────────────────────────┘
→ spawn_subagent(task='Explain all three PgBouncer
pool modes in detail',
focus='which features each mode breaks')
→ spawn_subagent(task='How do prepared statements
and ORM features interact with connection pooling',
focus='session vs transaction scoping')
[each subagent runs its own ~10-step investigation
in its own context window, in parallel]
──────────────────── Step 1 ────────────────────
subagent A returned: 1400-token summary citing
pgbouncer-modes::0, pgbouncer-modes::1,
pgbouncer-modes::2, pgbouncer-features::3
subagent B returned: 1200-token summary citing
sql-prepare::0, libpq-prepare::1,
pgbouncer-features::4
┌─ thinking ─────────────────────────────────────┐
│ I have both research summaries. Synthesizing. │
└────────────────────────────────────────────────┘
→ submit_answer({
'answer': 'PgBouncer offers three pool modes...
Session pooling is safe for all features but
limits concurrency. Transaction pooling breaks
prepared statements, SET LOCAL, advisory locks...
ORMs that rely on prepared statement caches
(SQLAlchemy, Django ORM) require session mode
or PREPARE workarounds...',
'citations': ['pgbouncer-modes::0', ...]
})
status: answered (2 steps, but subagents ran 18 internal steps)
主智能体只进行了 2 个可见步骤。但在底层,两个子智能体各自执行了 9 个步骤。总工作量是 20 次模型调用——然而主上下文窗口只接收了约 3000 个令牌(token)的摘要,而非 80,000 个令牌的原始研究内容。
如果没有子智能体,同样的调查要么会触碰上下文限制,要么因为模型被大量中间结果淹没而给出质量下降的答案。有了子智能体,主智能体始终保持敏锐,因为它只读取最终答案,而非工作草稿。
限制递归深度。子智能体的工具列表明确排除了 spawn_subagent。如果不这样做,一个行为异常的模型可能会生成生成子智能体的子智能体,你会在凌晨 3 点发现 API 额度耗尽时才意识到这一点。深度=1 对几乎所有场景都足够。
对于简单查询,生成子智能体只会增加开销——你为一次子智能体的编排和额外的模型调用付出成本,而单次搜索本可直接回答。智能体只应在任务具有能从隔离中受益的子结构时才生成子智能体:
- 比较两件或多件事物(每件事物对应一个子智能体)
- 需要围绕某一窄域主题进行大量工具调用的调查
- 中间结果体量庞大、需要压缩摘要的任务
你可以在系统提示词中明确这一点:"仅在研究各自需要 4 次以上工具调用的主题时使用子智能体。对于简单查询,直接搜索。"
并行工具调用让智能体在工具调用层面扇出。子智能体让它在调查层面扇出。这是两个不同层次的并行性,可以组合使用:
- 并行工具调用:一个模型决定"同时搜索 X、Y、Z",并自行读取全部三个结果。
- 子智能体:主模型将"研究 X"委托给自身的一个副本,该副本运行自己的多步骤调查,并进行自己的并行搜索。
两者都要用。子智能体内部使用并行工具调用。主智能体同时使用子智能体和并行工具调用。这种组合正是让深度研究智能体快速运转的关键。
添加规划步骤(用户可编辑)。
对于复杂查询——多跳、比较型、模糊型——在执行前生成一个计划会很有帮助。原因有二:其一,模型在预先确定调查方向后,往往能给出更好的结果;其二,用户可以在智能体消耗大量令牌走错方向之前编辑计划。这个编辑步骤正是"可控"在实践中的真正含义。
规划器:一次专注的模型调用
PLANNER_PROMPT = """You are planning an investigation.
Given a user's question, produce a numbered plan of
3-5 steps describing how an agent should research
the answer.
Each step should be one of:
- SEARCH: a specific keyword search
- COMPARE: comparing two things found via search
- VERIFY: confirming a claim with a second source
- SYNTHESIZE: combining findings into the answer
For simple lookups (one fact, one doc), output
exactly the word: SIMPLE
Output the plan or SIMPLE, nothing else."""
# agent/planner.py async def make_plan(query: str) -> list[str] | None: response = await client.messages.create( model="claude-sonnet-4-5", max_tokens=512, system=PLANNER_PROMPT, messages=[{"role": "user", "content": query}], ) text = response.content[0].text.strip() if text == "SIMPLE": return None return [line for line in text.split("\n") if line.strip()]
# agent/planner.py async def make_plan(query: str) -> list[str] | None: response = await client.responses.create( model="gpt-5.5", instructions=PLANNER_PROMPT, input=query, ) text = response.output_text.strip() if text == "SIMPLE": return None return [line for line in text.split("\n") if line.strip()]
用户编辑循环
async def run_with_plan(query: str): plan = await make_plan(query) if plan is None: # Simple query — skip planning return await run_agent(query) console.print("\n[bold]Proposed plan:[/]") for i, step in enumerate(plan, 1): console.print(f" {i}. {step}") console.print("\n[Enter] approve [e] edit [s] skip plan") choice = input("> ").strip().lower() if choice == "e": plan = edit_plan_interactive(plan) elif choice == "s": return await run_agent(query) plan_block = "\n".join(f"{i+1}. {s}" for i, s in enumerate(plan)) enriched = f"{query}\n\nFollow this plan:\n{plan_block}" return await run_agent(enriched)
实际效果
$ python scripts/run.py "Compare auth approaches in Postgres
for a web app, focused on changes since version 14"
Proposed plan:
1. SEARCH: authentication methods PostgreSQL pg_hba
2. SEARCH: scram-sha-256 vs md5 password auth
3. SEARCH: PostgreSQL 14 15 16 release notes auth
4. COMPARE: trade-offs of each method for web apps
5. SYNTHESIZE: recommendation with version caveats
[Enter] approve [e] edit [s] skip plan
> e
Editing plan. Current step 1: SEARCH: authentication...
[Enter] keep [r] replace [d] delete [a] add after
> r
New step 1: SEARCH: pg_hba.conf authentication methods
[Enter] keep [r] replace [d] delete [a] add after
> [Enter] (keeps step 2)
...
Approved plan:
1. SEARCH: pg_hba.conf authentication methods
2. SEARCH: scram-sha-256 vs md5 password auth
3. SEARCH: PostgreSQL 14 15 16 release notes auth
4. COMPARE: trade-offs for web apps
5. SYNTHESIZE: recommendation with version caveats
──────────────── starting agent ────────────────
→ search_docs({'query': 'pg_hba.conf authentication
methods'})
...
模型的初始计划是合理的,但真正熟悉 Postgres 的用户可能知道,搜索的具体术语应该是"pg_hba.conf",而非"authentication methods"。编辑第 1 步省去了智能体本来需要的 1–2 次后续搜索。
这就是可控性的调节把手。没有它,智能体的计划是个黑箱,你只能在它消耗 10 次模型调用之后才能批评。有了它,你可以在任何昂贵的工作发生之前修正方向。
这正是规划器对不需要计划的查询输出 SIMPLE 的原因。"Postgres 用哪个端口?"会得到 SIMPLE;"比较三个版本间的变化"则会生成计划。对于简单问题,用户永远不会看到计划。
对于非简单查询,计划审批步骤大约需要 10 秒,但在捕获到错误方向的调查时,能节省数十次 API 调用。除了最简单的情况(规划器会自动跳过)之外,这对所有人都是净收益。
让运行可中断、可恢复。
漫长的调查可能需要数分钟。你希望能够暂停运行、查看智能体目前的决策、编辑看起来有误的工具结果,然后从断点处恢复。这正是真正的智能体 SDK 所提供的能力。自己构建它,你才能真正理解哪些状态是关键的。
诀窍:在每次迭代后将消息持久化到磁盘。消息历史就是智能体的状态。只要有历史记录,就可以恢复。
# agent/state.py import json, uuid from pathlib import Path class RunState: def __init__(self, run_id: str = None): self.run_id = run_id or str(uuid.uuid4())[:8] self.path = Path(f"runs/{self.run_id}.jsonl") self.path.parent.mkdir(exist_ok=True) def checkpoint(self, step: int, history: list): with self.path.open("a") as f: f.write(json.dumps({ "step": step, "history": history, }, default=str) + "\n") def resume(self) -> list: if not self.path.exists(): return [] last = None with self.path.open() as f: for line in f: last = json.loads(line) return last["history"] if last else []
接入循环
async def run_agent(query, max_steps=10, run_id=None): state = RunState(run_id) history = state.resume() or [ {"role": "user", "content": query} ] start_step = (len(state.resume()) // 2) if state.resume() else 0 for step in range(start_step, max_steps): response = await client.messages.create(...) history.append({"role": "assistant", ...}) # ... dispatch tools, append results ... state.checkpoint(step, history) # persist every step return {"run_id": state.run_id, ...}
现在你可以做到的事
在运行途中停止(Ctrl-C)。稍后恢复:
$ python scripts/run.py "Compare auth methods..." # ... runs for a few steps, you hit Ctrl-C ... ^C Interrupted at step 3. Run ID: a4f7c2e1 $ python scripts/run.py --resume a4f7c2e1 # resumes from step 4, history intact
或者检查运行记录并编辑某个工具结果:
$ python scripts/inspect.py a4f7c2e1
Step 2: search_docs({'query': 'auth methods'})
→ returned 5 chunks, but the top one is wrong
→ press [e] to edit the result before resuming
> e
Editing tool_result at step 2...
[edit in your editor]
Resumed. Re-running from step 3 with edited result.
这是智能体"时间旅行调试"的基础。当第四阶段的评估(评估)发现某个失败查询时,你会想要检查智能体究竟在哪一步走错了方向,并尝试修复——不同的提示词、不同的工具结果、不同的参数——而无需重新运行失败点之前的昂贵步骤。
是的。Anthropic 的 Agent SDK 和 OpenAI 的 Agents SDK 都内置了运行状态持久化以及从任意检查点恢复的能力。你现在已经自己构建了该机制的最小化版本。
这样一来,当你日后采用某个 SDK 时,你会清楚地知道它的抽象层究竟在做什么——也会知道当它的设计选择不符合你的需求时,你该在哪里自行构建定制方案。
JSONL 是追加写入的,人类可读,并且能在各种崩溃场景中存活。你可以 cat 一个运行文件,grep 特定的工具调用,或者对比两次运行。SQLite 也可行,还能提供更方便的查询——当你有数百次运行需要跨运行分析时可以切换。
在当前规模下,JSONL 是正确的起点。
交付物
一个能够在复杂查询前进行规划、并行扇出搜索、在子智能体中隔离深度研究、并支持暂停与恢复的智能体。第一阶段相同的 10 个问题,应当呈现出质性不同(更快、更简洁)的行为。
- 异步并行工具执行
- 带深度限制的子智能体生成
- 带用户可编辑计划的规划器
- JSONL 检查点 + 从任意步骤恢复
- 追踪对比:测试集上第二阶段 vs 第三阶段