2.1
第二部分 / 上线 · 让后续三章成为可能的章节

可观测性(Observability):每次运行都留下可读的追踪。

一个在生产环境中因不明原因失败的智能体(agent),就是一个无法上线的智能体。本章将每次模型调用、工具调用(tool calling)和检索(retrieval)接入一棵跨度树(span tree),遵循 OpenTelemetry GenAI 规范;搭建能在 asyncio.gather 中正常工作的结构化日志;并提供一个回放 CLI,接受追踪(trace)ID,用完全相同的工具结果在本地重现智能体运行过程。第一次在五分钟而不是五小时内定位一个生产故障时,你就会明白这一章的价值。

STEP 1

为什么 print() 会失效。

你一直在用散落于循环各处的 print() 语句调试智能体。在你是唯一用户、一次只跑一个请求、盯着终端滚屏的时候,这没问题。但一旦上述任何条件改变,它就会立刻失效。

让我带你过一个具体的故障模式,因为动机是实实在在的。你的智能体已部署。某用户提交了一个工单:"我问的是 Q3 发票,它却给了我 Q1 的内容。"你去查日志,看到的是:

[2026-03-14 10:42:13] INFO  request received: "show me Q3 invoices"
[2026-03-14 10:42:13] INFO  calling search_docs
[2026-03-14 10:42:14] INFO  search_docs returned 5 results
[2026-03-14 10:42:14] INFO  calling search_docs
[2026-03-14 10:42:15] INFO  search_docs returned 3 results
[2026-03-14 10:42:16] INFO  calling answer
[2026-03-14 10:42:18] INFO  request complete (5.2s)
[2026-03-14 10:42:13] INFO  request received: "where do I update billing"
[2026-03-14 10:42:14] INFO  calling fetch_doc
[2026-03-14 10:42:14] INFO  fetch_doc returned
[2026-03-14 10:42:15] INFO  calling answer
[2026-03-14 10:42:16] INFO  request complete (3.1s)

仔细读这些行。至少有两个并发请求交织在一起。你无法分辨哪次 search_docs 调用属于哪个用户,看不到工具实际收到了什么查询、返回了什么结果、模型接下来做出了什么决策,也看不到用户最终收到了什么回答。有的只是时间戳和工具名称。仅此而已。

这就是生产调试的悬崖。单用户、顺序执行、REPL 驱动的开发方式让你靠看滚屏就能得到所需的一切信息。一旦面对并发请求、异步工具层,以及一个指向具体故障的愤怒用户,你就需要 print() 给不了你的三样东西:

  • 关联性。 每条日志都必须标明它属于哪个请求。
  • 结构性。 "Calling search_docs" 毫无价值;"search_docs(query='Q3 invoices', filter='date>=2025-07-01')" 才有价值。
  • 层次性。 调用发生在调用之内。智能体循环包含模型调用,模型调用决定发起工具调用,工具调用又触发数据库查询。这种嵌套关系是不可或缺的上下文。

跨度树作为正确的数据模型

能同时给你这三样东西的数据结构是跨度树(span tree)。一个跨度(span)代表一次操作——一次模型调用、一次工具调用、一次检索、一次智能体运行。它有开始时间、结束时间、唯一 ID、父跨度 ID(若为根节点则为空)、名称,以及一袋描述发生了什么的属性。跨度可以嵌套:子跨度在父跨度之后开始,在父跨度之前结束。

对于一次智能体运行,这棵树看起来是这样的:

invoke_agent "user asks about Q3 invoices" ┐ ├─ chat claude-sonnet-4-5 3 turns │ parent span │ input: 2,341 tok out: 412 tok │ │ ├─ execute_tool search_docs │ │ │ args: {query: "Q3 invoices"} │ │ │ result: 5 chunks │ │ ├─ execute_tool search_docs │ │ │ args: {query: "Q3 2025"} │ │ │ result: 3 chunks │ │ └─ chat claude-sonnet-4-5 synthesis │ │ output: "Your Q3 invoices..." │ └─ duration: 5.2s tokens: 2,753 cost: $0.034 ┘

有了这样一棵跨度树,你可以回答:智能体用了哪些工具?传入了什么参数?按什么顺序执行?有没有失败?每次各花了多长时间?每次模型调用消耗了多少令牌(token)?最终回答是什么?这些问题你都能在几秒内回答,只需按追踪 ID 查询任何历史请求。

OpenTelemetry GenAI 规范

这不是什么新发明。OpenTelemetry——已经作为大多数生产追踪基础的可观测性标准——设有专门针对 LLM 和智能体语义的工作组,并发布了一套词汇表。截至 2026 年,这些规范仍标记为实验性,但字段名称已趋于稳定,各大厂商(Datadog、Grafana、Langfuse、Braintrust)均已支持。使用这些名称而不是自造一套,意味着你的追踪数据能直接接入现成工具。

本章其余部分将用到的核心跨度名称和属性:

跨度名称
何时发出
关键属性
invoke_agent
每次智能体运行一次(顶层)
gen_ai.agent.namegen_ai.conversation.iduser.id
chat
每次模型调用
gen_ai.request.modelgen_ai.usage.input_tokensgen_ai.usage.output_tokensgen_ai.response.finish_reasons
execute_tool
每次工具分发
gen_ai.tool.namegen_ai.tool.call.id,以及你自己的 args/result

当你选择启用时,跨度还会携带对话内容:gen_ai.system_instructionsgen_ai.input.messagesgen_ai.output.messages。这些字段体量较大且包含用户数据,因此按条件记录——详见下文。

你不需要 OpenTelemetry 收集器或厂商账号才能上手。数据模型才是关键所在——一旦你的智能体能产出带有这些名称和属性的跨度,之后可以随时将其发送到任何地方:JSONL 文件、Langfuse、Datadog、你自己的 Postgres。先定数据模型,再定目的地。

Question
我的智能体是同步运行的。单用户,本地 CLI。真的需要跨度吗?

如果你永远不会并发运行、永远不会部署、也永远不需要调试十分钟前发生的故障——那不需要,print 就够了。但这是一个很短的清单。一旦你在智能体前面加了一个 FastAPI 处理器(第 2.4 章),或者跑一个同时触发 50 条轨迹的评估套件(第 1.4 章),交织的日志就会变得不可读。现在加跨度没有任何成本;等到你有了一堆没有考虑关联性的代码路径再加,代价就很高了。

Question
为什么是"跨度",而不是普通的结构化 JSON 日志?

跨度就是结构化 JSON 日志,只是多了普通日志没有的两个结构:时长(开始/结束时间对)和父指针。正是这两个字段让你能渲染出上面的树状视图。没有它们,你只有一条扁平的事件流;有了它们,你就有了一个可以导航的层次结构。现代可观测性工具(Langfuse、Braintrust、Datadog 等)需要这种结构,因为它们的 UI 本质上就是树状视图。

STEP 2

为循环插桩。

现在进入工程实现。我们将直接使用 OpenTelemetry SDK,为第 1.1 章的智能体循环添加追踪。无论跨度最终去向何处,模式都是一样的——本步骤结束时,我们先把它们指向一个本地开发用的 JSONL 文件;章节末尾,你可以将导出器(exporter)换成 Langfuse、Braintrust 或任何兼容 OTLP 的后端,代码零改动。

最小化配置

三个组件:追踪器(给你跨度)、导出器(将跨度发送到某处)、处理器(高效批处理)。在应用启动时一次性配置:

# obs/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource

def init_tracing(service_name: str = "my-agent"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # For now: dump spans to stdout as JSON. Swap for OTLPSpanExporter
    # pointed at Langfuse / Braintrust / Datadog later — zero other changes.
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent")

包裹智能体循环

现在我们包裹三个关键点:顶层运行、每次模型调用、每次工具分发。与第 1.1 章的裸循环相比,唯一的区别是 with tracer.start_as_current_span(...) 包裹器和属性设置。

# agent/loop.py — instrumented version
from opentelemetry import trace
from anthropic import Anthropic
from uuid import uuid4

tracer = trace.get_tracer("agent")
client = Anthropic()

async def run_agent(user_msg: str, user_id: str):
    with tracer.start_as_current_span("invoke_agent") as root:
        root.set_attribute("gen_ai.agent.name", "research-assistant")
        root.set_attribute("gen_ai.conversation.id", str(uuid4()))
        root.set_attribute("user.id", user_id)

        messages = [{"role": "user", "content": user_msg}]
        for step in range(20):  # budget
            response = await _chat(messages)
            messages.append({"role": "assistant",
                             "content": response.content})

            if response.stop_reason == "end_turn":
                root.set_attribute("gen_ai.response.finish_reasons",
                                   ["end_turn"])
                return response.content[0].text

            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = await _dispatch_tool(block)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            messages.append({"role": "user", "content": tool_results})

async def _chat(messages):
    with tracer.start_as_current_span("chat") as span:
        span.set_attribute("gen_ai.request.model", "claude-sonnet-4-5")
        span.set_attribute("gen_ai.provider.name", "anthropic")
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages,
        )
        span.set_attribute("gen_ai.usage.input_tokens",
                           response.usage.input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens",
                           response.usage.output_tokens)
        span.set_attribute("gen_ai.response.finish_reasons",
                           [response.stop_reason])
        return response

async def _dispatch_tool(block):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", block.name)
        span.set_attribute("gen_ai.tool.call.id", block.id)
        span.set_attribute("tool.args", json.dumps(block.input))
        try:
            result = await HANDLERS[block.name](**block.input)
            span.set_attribute("tool.result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("error.type", type(e).__name__)
            span.record_exception(e)
            raise
# agent/loop.py — instrumented version
from opentelemetry import trace
from openai import OpenAI
from uuid import uuid4

tracer = trace.get_tracer("agent")
client = OpenAI()

async def run_agent(user_msg: str, user_id: str):
    with tracer.start_as_current_span("invoke_agent") as root:
        root.set_attribute("gen_ai.agent.name", "research-assistant")
        root.set_attribute("gen_ai.conversation.id", str(uuid4()))
        root.set_attribute("user.id", user_id)

        # Responses API maintains state via previous_response_id
        prev_id = None
        current_input = user_msg
        for step in range(20):
            response = await _chat(current_input, prev_id)
            prev_id = response.id

            tool_calls = [item for item in response.output
                          if item.type == "function_call"]
            if not tool_calls:
                root.set_attribute("gen_ai.response.finish_reasons",
                                   ["stop"])
                return response.output_text

            results = []
            for call in tool_calls:
                result = await _dispatch_tool(call)
                results.append({
                    "type": "function_call_output",
                    "call_id": call.call_id,
                    "output": str(result),
                })
            current_input = results

async def _chat(input_data, previous_response_id):
    with tracer.start_as_current_span("chat") as span:
        span.set_attribute("gen_ai.request.model", "gpt-5.5")
        span.set_attribute("gen_ai.provider.name", "openai")
        response = client.responses.create(
            model="gpt-5.5",
            tools=TOOLS,
            input=input_data,
            previous_response_id=previous_response_id,
        )
        span.set_attribute("gen_ai.usage.input_tokens",
                           response.usage.input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens",
                           response.usage.output_tokens)
        return response

async def _dispatch_tool(call):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", call.name)
        span.set_attribute("gen_ai.tool.call.id", call.call_id)
        args = json.loads(call.arguments)
        span.set_attribute("tool.args", call.arguments)
        try:
            result = await HANDLERS[call.name](**args)
            span.set_attribute("tool.result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("error.type", type(e).__name__)
            span.record_exception(e)
            raise

你得到了什么

跑一个查询,你会在 stdout 得到一串 JSON 跨度流。按树状顺序整理并美化输出:

{
  "name": "invoke_agent",
  "trace_id": "8f3c2e1a9b4d5067...",
  "span_id": "a0b1c2d3...",
  "parent_span_id": null,
  "start_time": "2026-03-14T10:42:13.412Z",
  "end_time":   "2026-03-14T10:42:18.673Z",
  "attributes": {
    "gen_ai.agent.name": "research-assistant",
    "gen_ai.conversation.id": "4e7a-9c2f-...",
    "user.id": "u_8821",
    "gen_ai.response.finish_reasons": ["end_turn"]
  }
}
  {
    "name": "chat",
    "parent_span_id": "a0b1c2d3...",
    "start_time": "10:42:13.420Z",
    "end_time":   "10:42:14.108Z",
    "attributes": {
      "gen_ai.request.model": "claude-sonnet-4-5",
      "gen_ai.provider.name": "anthropic",
      "gen_ai.usage.input_tokens": 2341,
      "gen_ai.usage.output_tokens": 89,
      "gen_ai.response.finish_reasons": ["tool_use"]
    }
  }
    {
      "name": "execute_tool",
      "parent_span_id": "a0b1c2d3..." (sibling of chat),
      "attributes": {
        "gen_ai.tool.name": "search_docs",
        "gen_ai.tool.call.id": "toolu_01ABC...",
        "tool.args": "{\"query\": \"Q3 invoices\"}",
        "tool.result_size": 1842
      }
    }
    ...
相比 print 方式的改变

原来是 print 的每一行,现在都变成了跨度上的一个属性。跨度的父指针编码了调用层次。追踪 ID 将每个跨度都关联到其发起请求——并发请求不会相互混淆,因为每个请求在顶层都有自己的追踪 ID。

树状结构是关键所在。一旦拥有它,市面上所有可观测性工具——Langfuse、Braintrust、Datadog、Honeycomb、Grafana——都能接入并给你一个 UI。你也可以把它导出为 JSONL,用 jq 查询。数据是可移植的。

指向厂商

一旦你的代码能发出跨度,将其发送到厂商只需改一行:换掉导出器。使用 Langfuse(免费套餐,专为 LLM 应用设计):

# obs/tracing.py — Langfuse variant
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter
)

def init_tracing(service_name: str = "my-agent"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # Replace ConsoleSpanExporter with OTLP pointed at Langfuse
    exporter = OTLPSpanExporter(
        endpoint="https://us.cloud.langfuse.com/api/public/otel/v1/traces",
        headers={"Authorization": f"Bearer {os.environ['LANGFUSE_KEY']}"},
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

Braintrust、Datadog、Honeycomb 以及任何其他兼容 OTLP 的后端,区别仅在于 URL 和认证头。智能体代码无需改动。

内容怎么办?

你会注意到我没有把提示词(prompt)或补全内容放到跨度上。这是刻意为之——它们体量大,往往包含用户数据,而且你通常不希望把每一段检索出的文档文本都回写进可观测性存储。规范提供了 gen_ai.input.messagesgen_ai.output.messages,但建议按条件启用:

# Only record full content for sampled traces (1%) or for traces
# flagged for inspection. Cheap to add per-call.

CAPTURE_CONTENT = os.environ.get("CAPTURE_CONTENT") == "1"

def should_capture(trace_id: str) -> bool:
    if CAPTURE_CONTENT: return True
    # Sample 1% by trace_id hash for ambient observability
    return int(trace_id[:8], 16) % 100 == 0

if should_capture(span.context.trace_id):
    span.set_attribute("gen_ai.input.messages",
                       json.dumps(messages)[:10000])
    span.set_attribute("gen_ai.output.messages",
                       json.dumps([b.model_dump() for b in response.content]))

如果你处理的是受监管数据——医疗、金融、欧盟个人数据——请咨询你的隐私团队,再开启全内容捕获。追踪就是日志;日志就是存储;存储有合规影响。上述 1% 采样模式之所以是生产环境默认值,原因正在于此。

Question
这些追踪不会拖慢我的智能体吗?

实际上不会。OpenTelemetry SDK 使用异步批处理器——跨度在内存中排队,在后台批量刷入导出器。热路径的开销约为每个跨度几微秒;即使一个智能体每次运行发出 20 个跨度,也只需付出个位数毫秒的代价。与单次 LLM 调用 800ms+ 的延迟相比,追踪成本几乎不可见。

真正可能造成损耗的地方是:在每个跨度上序列化巨大的提示词/补全内容作为属性。这就是为什么内容捕获要按条件启用。结构性属性(模型名称、令牌数、工具名称)体量很小,可以始终开启。

Question
该选哪个厂商——Langfuse、Braintrust、Datadog,还是别的?

实话实说:没有你感觉的那么重要,因为 OpenTelemetry 数据模型让它们可以互换。我的粗略启发式原则:

  • Langfuse——免费套餐最好,LLM 原生 UI,上手最简单。如果你是小团队,技术栈就是"纯 LLM",选这个。
  • Braintrust——评估(eval)集成最佳,对提示词工程工作流有自己的主张。如果第 1.4 章的评估是你团队的日常工作,选这个。
  • Datadog / Honeycomb / Grafana——选公司已经在用的那个。针对 LLM 的专属功能正在快速追赶 LLM 原生工具;统一视图的价值很高。
  • 自建(仅 JSONL → SQLite)——个人项目完全可行,也是理解这些工具原理的正确答案。一旦有团队成员需要查看追踪,就不再可行了。

锁定风险很低,因为数据模型是共享的。从一个迁移到另一个只需改配置,不需要重写代码。

STEP 3

能在 asyncio.gather 中存活的日志。

跨度负责结构化的故事:哪些操作发生了,按什么顺序,带什么属性。你还需要普通日志来讲述非结构化的故事——错误信息、调试上下文、你认为不值得单独建属性的中间值。问题是,一旦你在 asyncio.gather 内部越过 await 边界,普通的 Python logging 就会丢失与追踪树的连接。

具体来说,有一个难以察觉的失效方式:

# Naive logging — what NOT to do
import logging
log = logging.getLogger("agent")

async def fan_out_retrieval(queries):
    # Logs from these concurrent calls interleave with no way
    # to tell which log belongs to which query.
    return await asyncio.gather(*[search(q) for q in queries])

async def search(query):
    log.info(f"searching for {query}")
    results = await bm25.search(query)
    log.info(f"found {len(results)} results")
    return results
# Log output during a concurrent run:
INFO:agent:searching for Q3 invoices
INFO:agent:searching for billing settings
INFO:agent:found 12 results
INFO:agent:searching for refund policy
INFO:agent:found 3 results
INFO:agent:found 8 results

哪个结果数属于哪个查询?从日志里看不出来。这是 Step 1 提到的同一个关联性问题,只是出现在单个追踪内部。解决方法是通过 contextvars 传播上下文——Python 中让变量跟随异步任务而非线程的机制——并将该上下文绑定到每条日志记录上。

模式

structlog + contextvars 给出最简洁的方案。大约十五行配置,之后每条日志行都自动携带追踪上下文:

# obs/logging.py
import structlog, logging, sys
from opentelemetry import trace

def _add_trace_context(logger, method_name, event_dict):
    """Pull current trace IDs from OpenTelemetry into log record."""
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pull bound vars
        _add_trace_context,                       # pull OTel IDs
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

log = structlog.get_logger("agent")

让这一切生效的两个关键:

  • merge_contextvars——拉入通过 structlog.contextvars.bind_contextvars(...) 绑定的所有变量。绑定的作用域是当前异步任务:asyncio.gather 为每个协程创建独立的上下文,因此绑定不会在并发调用之间泄漏。
  • _add_trace_context——在日志发出时读取当前 OpenTelemetry 跨度 ID。每条日志行自动获得日志触发时活跃跨度的追踪 ID 和跨度 ID。

使用方式

原来你写的是:

log.info(f"searching for {query}")  # f-string concatenation

现在你写:

log.info("searching", query=query, top_k=5)

在并发运行时,输出变成了:

{"timestamp":"2026-03-14T10:42:13.421Z","level":"info","event":"searching",
 "query":"Q3 invoices","top_k":5,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"a0b1c2d3e4f50678"}
{"timestamp":"2026-03-14T10:42:13.422Z","level":"info","event":"searching",
 "query":"billing settings","top_k":5,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"b1c2d3e4f5067890"}
{"timestamp":"2026-03-14T10:42:13.890Z","level":"info","event":"found",
 "query":"Q3 invoices","count":12,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"a0b1c2d3e4f50678"}
{"timestamp":"2026-03-14T10:42:13.894Z","level":"info","event":"found",
 "query":"billing settings","count":8,
 "trace_id":"8f3c2e1a9b4d50670c8...","span_id":"b1c2d3e4f5067890"}

现在你可以用两种方式回答"哪个结果数属于哪个查询":通过显式的 query 字段,或通过匹配 span_id。两种都有效。后者适用于碰巧没有重复查询字符串的日志。

绑定更高层次的上下文

在智能体运行的顶部,绑定 user_id 和任何你希望出现在后续每条日志行上的其他标识符:

from structlog.contextvars import bind_contextvars, clear_contextvars

async def run_agent(user_msg: str, user_id: str):
    clear_contextvars()
    bind_contextvars(user_id=user_id, agent="research-assistant")

    with tracer.start_as_current_span("invoke_agent"):
        log.info("agent_start", user_msg_len=len(user_msg))
        # ... agent loop ...
        log.info("agent_done", duration_s=elapsed)

run_agent 内部任何地方发出的每条日志——包括来自工具处理器、检索代码、任何你调用的东西——都自动获得绑定的 user_id,这要归功于 contextvars。而且由于 contextvars 是异步任务作用域的,两个并发的 run_agent 调用不会互相踩踏各自的绑定。

搜索与聚合

一旦日志是带有追踪 ID 的结构化 JSON,两件事就变得轻而易举:

  • 深入单个请求。 jq 'select(.trace_id=="8f3c...")' app.log 输出该请求的每一条日志行。结合上一步的跨度树,你就拥有了完整的重建信息。
  • 跨请求聚合。 jq -s 'group_by(.event) | map({event:.[0].event, count:length})' app.log 告诉你哪些事件触发频率最高。对于发现高频错误事件非常有用。

大多数可观测性厂商也支持接入结构化日志,并在追踪和日志共享同一追踪 ID 时自动关联——Langfuse、Datadog、Honeycomb 都可以做到。你不必在追踪和日志之间二选一;两者都用,它们会相互引用。

如果本步骤只做一件事:把所有 f"..." 风格的日志消息换成 log.info("event_name", field=value, ...)。"每条日志都是带有具名字段的有类型事件"这一纪律,会让你受益多年。异步关联只是额外的收获。

Question
我到处都在用标准 logging 模块。可以继续用吗?

可以。structlog 是对 logging 的包裹而非替换;它坐在标准处理器前面进行重新格式化。现有的 logging.getLogger("foo").info("bar") 调用继续有效——只是输出纯字符串而非结构化记录。你可以逐文件迁移。

最小有效步骤:不要迁移整个代码库。只迁移智能体循环和工具处理器——关联性最重要的地方——其余的留着不动。

Question
为什么用 contextvars 而不是线程本地存储?

线程本地存储是按操作系统线程隔离的;asyncio 在单个线程上运行多个任务。如果任务 A 中设置的绑定对共享同一线程的任务 B 可见,就会产生我们试图避免的关联性 bug。contextvars 是按异步任务隔离的,因此每个 asyncio.gather 的子任务都有自己独立的上下文。这正是 PEP 567 的设计初衷。

STEP 4

回放:在你的笔记本电脑上重现故障。

你现在有了每次生产运行的跨度树,以及与之关联的结构化日志。这已经能帮你搞定大多数调试工作的 80%:读追踪,看到问题工具调用,定位问题结果。但对于另外 20%——你需要逐步走过智能体行为、改动一个提示词(prompt)、看看会有什么不同结果的 bug——你需要回放(replay)

回放的含义是:取生产环境中的一个追踪 ID,用相同的输入重新运行智能体,但不真正调用工具,而是返回原始运行中记录的确切工具结果。模型看到的是与之前相同的上下文窗口(context window),以相同的方式走完循环,你可以在任何步骤暂停、变更或重新提示。这是智能体 AI 的调试器等价物。

两件套架构

要让回放生效,你需要在原始运行期间持久化工具 I/O,然后教会工具分发器在回放模式下查找已记录的结果。大约四十行代码。

# obs/replay.py
import json, contextvars
from pathlib import Path

REPLAY_DIR = Path("runs/replay")
REPLAY_DIR.mkdir(parents=True, exist_ok=True)

# When non-None, _dispatch_tool returns recorded results instead of
# calling handlers. Set by the replay CLI.
replay_mode: contextvars.ContextVar = contextvars.ContextVar(
    "replay_mode", default=None
)

def recorder_path(trace_id: str) -> Path:
    return REPLAY_DIR / f"{trace_id}.jsonl"

def record(trace_id: str, event: dict):
    """Append a tool I/O event to this trace's recording."""
    with recorder_path(trace_id).open("a") as f:
        f.write(json.dumps(event) + "\n")

def load_recording(trace_id: str) -> list[dict]:
    path = recorder_path(trace_id)
    if not path.exists():
        raise FileNotFoundError(f"no recording for trace {trace_id}")
    return [json.loads(line) for line in path.open()]

在正常运行时记录

修改 _dispatch_tool,记录每个输入/输出对。我们用工具调用 ID(已在跨度上)作为查找键。记录以追踪 ID 为索引,因此每个请求的工具调用都在自己的文件里。

# Updated _dispatch_tool, replaces the version from Step 2
async def _dispatch_tool(block):
    with tracer.start_as_current_span("execute_tool") as span:
        span.set_attribute("gen_ai.tool.name", block.name)
        span.set_attribute("gen_ai.tool.call.id", block.id)

        trace_id = format(span.get_span_context().trace_id, "032x")

        # REPLAY MODE: return recorded result, skip the real call
        recording = replay_mode.get()
        if recording is not None:
            recorded = next(
                (e for e in recording if e["call_id"] == block.id),
                None,
            )
            if recorded:
                span.set_attribute("replay.matched", True)
                return recorded["result"]
            # Tool call wasn't in recording — model deviated from
            # original trajectory. Flag it but continue with a real call.
            span.set_attribute("replay.matched", False)

        # NORMAL MODE: call handler and record
        try:
            result = await HANDLERS[block.name](**block.input)
            record(trace_id, {
                "call_id": block.id,
                "tool": block.name,
                "args": block.input,
                "result": result,
            })
            return result
        except Exception as e:
            record(trace_id, {
                "call_id": block.id,
                "tool": block.name,
                "args": block.input,
                "error": str(e),
            })
            raise

回放 CLI

一条命令。接受追踪 ID,加载记录,在回放模式下针对原始用户消息运行智能体。

# scripts/replay.py
import argparse, asyncio
from agent.loop import run_agent
from obs.replay import load_recording, replay_mode

parser = argparse.ArgumentParser()
parser.add_argument("trace_id")
parser.add_argument("--user-msg", required=True,
                    help="the original user message")
args = parser.parse_args()

recording = load_recording(args.trace_id)
print(f"loaded {len(recording)} tool events from trace {args.trace_id}")

async def main():
    token = replay_mode.set(recording)
    try:
        result = await run_agent(args.user_msg, user_id="replay")
        print("---")
        print(result)
    finally:
        replay_mode.reset(token)

asyncio.run(main())

看起来是什么样子

$ python scripts/replay.py 8f3c2e1a9b4d... \
    --user-msg "show me my Q3 invoices"

loaded 3 tool events from trace 8f3c2e1a9b4d...

[span: invoke_agent  agent=research-assistant  user=replay]
  [span: chat  model=claude-sonnet-4-5  in=2341 out=89  finish=tool_use]
  [span: execute_tool  tool=search_docs  replay.matched=True]
  [span: execute_tool  tool=search_docs  replay.matched=True]
  [span: chat  model=claude-sonnet-4-5  in=3104 out=271  finish=tool_use]
  [span: execute_tool  tool=fetch_doc  replay.matched=True]
  [span: chat  model=claude-sonnet-4-5  in=4892 out=412  finish=end_turn]
---
"Your Q3 2025 invoices are: INV-4421 ($2,300, paid),
 INV-4438 ($1,150, outstanding), INV-4502 ($875, paid)..."

你刚刚在笔记本电脑上重现了一次生产运行。从这里你可以:

  • 修改系统提示词并重新运行,看看新提示词是否会产生更好的回答(记录的工具结果仍然匹配,因为模型的工具调用不会改变形态)。
  • _dispatch_tool 内部加入 print 语句或断点,检查每一次记录的交互。
  • 用不同的模型回放(Sonnet → Opus),在真实生产案例上进行 A/B 测试。
  • 将追踪保存到评估(eval)集——它刚刚成为一个回归(regression)测试。

当回放偏离时

如果你的改动导致模型发起了原始运行中没有的工具调用,你的记录就没有匹配项。上面的代码会在跨度上打印 replay.matched=False,然后回退到调用真实的处理器。这通常是正确的行为——你想看看会发生什么——但可能产生副作用(向用户收费、发送邮件)。为了回放安全,在回放激活时用一个标志门控状态变更工具并将其打桩:

if replay_mode.get() is not None and POLICIES[block.name].scope != Scope.READ_ONLY:
    return {"error": "state-changing tool stubbed during replay"}

ScopePOLICIES 来自第 2.3 章安全。)只读工具始终可以安全回放;状态变更工具要么从记录中回放,要么返回一个打桩错误。

为什么这值得投入

第一次上线智能体,某用户提了一个"我问了 X,它说了 Y"的 bug,你原来的工作流是:试图在本地重现,失败,因为语料库已经变了或者检索随机性不同,最后只能靠猜。你的新工作流是:从审计日志复制追踪 ID,运行 replay.py,看着确切的故障在你的笔记本上发生,修复它,再次回放以确认。差距是每个 bug 节省几小时,有时是几天。

这也是让回归评估成为可能的基础。每一个可重现的生产故障都成为永久评估案例。每一个你修复的问题都会一直保持修复状态,因为下次有任何改动,你都会回放所有案例,立刻发现任何回归。跨度 + 记录 + 回放是第 1.4 章评估的工程底座,也是第三部分所构建扩展的底座。

Question
记录每个工具结果数据量会很大。保留策略怎么定?

不必全部保留。两个实用模式:

  • 默认按比例采样。 记录 100% 的失败运行(成本低,价值高)和 1% 的成功运行(成本低,对常规调试有用)。两者都以根跨度上可见的完成原因/错误状态为依据。
  • 按时间分层。 保留完整记录 7 天为热数据;归档到冷存储保留 90 天;之后删除。对于高流量智能体,这对应热数据几百 MB、冷数据几 GB。

例外情况:被人工标记为待检查的运行(关联了支持工单、关联了评估失败、关联了客户投诉)永久全量保留。它们是未来的评估案例。

Question
我可以同时回放模型调用,而不只是工具调用吗?

可以,有些配置确实这样做。Anthropic 和 OpenAI 都提供提示词缓存和基于 ID 的响应检索,让你能重新获取给定输入的确切响应。但更有用的模式是回放模型调用——这样你就可以改变提示词或模型,看看新模型在相同工具结果上会有什么行为。

"全部回放"更接近视频录制;"只回放工具 I/O"更接近调试器。调试器版本在日常工作中更有用。

Question
我的智能体使用并行工具调用。回放还能用吗?

可以,因为我们按 call_id 匹配,而不是按调用顺序。如果模型在第 2 轮并发触发三次 search_docs 调用,这三次调用都会用各自不同的 ID 记录下来,回放分发器会为每次调用返回正确的结果。唯一的注意事项是时序敏感的行为无法精确重现——但工具 I/O 是精确重现的,这对几乎所有 bug 来说才是关键。

End of chapter 2.1

交付物

一个智能体,每次运行都会产出遵循 OpenTelemetry GenAI 规范的跨度树、能在并发执行中正常工作的结构化日志,以及一个 replay <trace_id> CLI,能在几秒内在你的笔记本电脑上重现生产故障。这是让第二部分其余章节——成本(2.2)、安全(2.3)、部署(2.4)——真正可调试的底座,也是让第三部分的评估驱动开发在规模上可行的底座。

  • OpenTelemetry SDK 已接入,本地开发使用 ConsoleSpanExporter
  • invoke_agent、chat 和 execute_tool 跨度带有 gen_ai.* 属性
  • 一行切换厂商:OTLP 导出器 → Langfuse / Braintrust / Datadog
  • structlog 配合 contextvars + OTel 追踪 ID 注入
  • 按条件捕获内容(1% 采样或标志门控)
  • 按追踪记录工具 I/O,写入 runs/replay/<trace_id>.jsonl
  • replay.py CLI:加载记录,设置 replay_mode contextvar,运行智能体
  • 回放期间打桩状态变更工具(与 2.3 安全作用域集成)