Agent observability and debugging: traces, cost attribution, and the non-determinism problem
"A deterministic pipeline fails the same way every time, so you grep the logs and move on. An agent fails differently on Tuesday than it did on Monday — with different tool calls, different branching decisions, and a different number of LLM round-trips — and your grep returns nothing useful. Observability for agents is not an upgrade of traditional monitoring; it is a fundamentally different discipline, one where the trace itself is variable-length, the "correct" behavior is semantic rather than structural, and the cost of a single bad run can be orders of magnitude higher than the median. This chapter builds the instrumentation, dashboards, alerting, and debugging workflows you need to keep agents trustworthy in production"
137.1 Why agent observability is harder
Traditional software observability rests on three pillars: logs, metrics, and traces. Each assumes a broadly deterministic execution path — a request enters, traverses a known call graph, and exits. Agents violate every one of those assumptions.
Non-deterministic execution paths. Given the same user prompt, an agent may take three steps on one run and eleven on the next. It may invoke tool A before tool B, or skip tool A entirely. The call graph is not fixed at deploy time; it is generated at inference time by the LLM.
Variable-length traces. A single “run” can span seconds or minutes, with anywhere from one to dozens of LLM calls. Traditional span-based tracing (Jaeger, Zipkin) expects a DAG of bounded depth. Agent traces are closer to unbounded recursive trees.
Semantic correctness. A web server either returns 200 or it does not. An agent can return 200, produce syntactically valid JSON, and still be semantically wrong — it answered the wrong question, hallucinated a tool output, or silently dropped a constraint. Observability must therefore include evaluation signals, not just status codes.
Cost amplification. A stuck agent that loops 40 times before hitting a token limit can burn $5 on a single request. Without per-run cost attribution, you will not notice until the monthly bill arrives.
Multi-model, multi-tool fan-out. Production agents often call different models for different sub-tasks (a cheap model for classification, an expensive one for generation) and invoke external APIs with their own latency and failure modes. The observability surface is combinatorially larger.
These five properties mean you cannot bolt Datadog onto an agent and call it done. You need an agent-native observability stack.
137.2 Agent trace model: sessions, runs, steps, LLM calls, tool calls
The foundational data model for agent observability is a hierarchy of five levels:
Session — groups all runs belonging to a single user conversation or task thread. It carries the user_id, session start time, and any persistent memory references.
Run — one complete request-to-response cycle. The user says “book me a flight to Tokyo,” and the agent works until it produces a final answer or fails. A run has a run_id, status (success / failure / timeout), total_cost, and total_latency.
Step — one iteration of the agent loop: think (LLM decides what to do), act (tool call or final answer), observe (tool result fed back). Steps are numbered sequentially within a run.
LLM call — the actual API call to the model. Records model_id, prompt_tokens, completion_tokens, latency_ms, temperature, stop_reason, and the raw messages.
Tool call — the invocation of an external tool or function. Records tool_name, arguments, result (or error), duration_ms, and http_status if applicable.
This hierarchy is the schema of your trace store. Everything else — dashboards, alerts, cost attribution — derives from it.
from dataclasses import dataclass, field
from typing import Optional
import time
import uuid
@dataclass
class LLMCall:
call_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
model: str = ""
prompt_tokens: int = 0
completion_tokens: int = 0
latency_ms: float = 0.0
temperature: float = 0.0
stop_reason: str = ""
cost_usd: float = 0.0
timestamp: float = field(default_factory=time.time)
@dataclass
class ToolCall:
call_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
tool_name: str = ""
arguments: dict = field(default_factory=dict)
result: Optional[str] = None
error: Optional[str] = None
duration_ms: float = 0.0
timestamp: float = field(default_factory=time.time)
@dataclass
class Step:
step_number: int = 0
llm_calls: list[LLMCall] = field(default_factory=list)
tool_calls: list[ToolCall] = field(default_factory=list)
reasoning: str = "" # chain-of-thought summary
timestamp: float = field(default_factory=time.time)
@dataclass
class Run:
run_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
session_id: str = ""
user_id: str = ""
status: str = "running" # running | success | failure | timeout
steps: list[Step] = field(default_factory=list)
total_cost_usd: float = 0.0
total_latency_ms: float = 0.0
error_message: Optional[str] = None
created_at: float = field(default_factory=time.time)
def add_step(self, step: Step) -> None:
step.step_number = len(self.steps)
self.steps.append(step)
# Accumulate cost
for lc in step.llm_calls:
self.total_cost_usd += lc.cost_usd
@property
def step_count(self) -> int:
return len(self.steps)
137.3 Instrumenting the loop: what to capture at each step
Instrumentation must be low-friction — ideally a decorator or context manager that wraps the agent loop without polluting business logic.
What to capture
| Phase | Fields | Why |
|---|---|---|
| Pre-LLM | Full prompt (or hash), model id, temperature, tool definitions | Enables replay and trace diffing |
| Post-LLM | Raw completion, token counts, latency, stop reason, cost | Cost attribution, latency profiling |
| Pre-tool | Tool name, serialized arguments, trace id | Correlate tool calls across services |
| Post-tool | Result (truncated), error, duration, HTTP status | Tool reliability metrics |
| Step boundary | Step number, cumulative cost, cumulative tokens | Loop detection, budget enforcement |
| Run boundary | Final status, total cost, total steps, final answer | Top-level KPIs |
Decorator-based instrumentation
import functools
import time
import logging
from contextlib import contextmanager
from typing import Callable, Optional
logger = logging.getLogger("agent.trace")
class TraceCollector:
"""Collects trace data for a single agent run."""
def __init__(self, run: Run):
self.run = run
self._current_step: Optional[Step] = None
@contextmanager
def step(self):
"""Context manager for a single agent step."""
s = Step(timestamp=time.time())
self._current_step = s
try:
yield s
finally:
self.run.add_step(s)
self._current_step = None
logger.info(
"step.complete",
extra={
"run_id": self.run.run_id,
"step": s.step_number,
"llm_calls": len(s.llm_calls),
"tool_calls": len(s.tool_calls),
"cumulative_cost": self.run.total_cost_usd,
},
)
def trace_llm_call(self, func: Callable) -> Callable:
"""Decorator for LLM API calls."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
lc = LLMCall(model=kwargs.get("model", "unknown"))
t0 = time.perf_counter()
try:
result = await func(*args, **kwargs)
lc.prompt_tokens = result.usage.prompt_tokens
lc.completion_tokens = result.usage.completion_tokens
lc.stop_reason = result.stop_reason
lc.cost_usd = _estimate_cost(
lc.model, lc.prompt_tokens, lc.completion_tokens
)
return result
finally:
lc.latency_ms = (time.perf_counter() - t0) * 1000
if self._current_step:
self._current_step.llm_calls.append(lc)
return wrapper
def trace_tool_call(self, func: Callable) -> Callable:
"""Decorator for tool invocations."""
@functools.wraps(func)
async def wrapper(tool_name: str, arguments: dict, **kwargs):
tc = ToolCall(tool_name=tool_name, arguments=arguments)
t0 = time.perf_counter()
try:
result = await func(tool_name, arguments, **kwargs)
tc.result = str(result)[:2000] # truncate for storage
return result
except Exception as e:
tc.error = str(e)
raise
finally:
tc.duration_ms = (time.perf_counter() - t0) * 1000
if self._current_step:
self._current_step.tool_calls.append(tc)
return wrapper
# ---------- cost lookup (simplified) ----------
_COST_PER_1K = {
"gpt-4o": {"prompt": 0.0025, "completion": 0.01},
"gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006},
"claude-sonnet-4-20250514": {"prompt": 0.003, "completion": 0.015},
}
def _estimate_cost(model: str, prompt_tok: int, completion_tok: int) -> float:
rates = _COST_PER_1K.get(model, {"prompt": 0.002, "completion": 0.008})
return (prompt_tok / 1000) * rates["prompt"] + (
completion_tok / 1000
) * rates["completion"]
The key principle: capture everything at the boundary, decide what to store later. It is far cheaper to drop fields at write time than to re-instrument code to add them.
137.4 Platforms: LangSmith, Braintrust, Arize Phoenix, LangFuse, Helicone
The agent observability ecosystem is maturing fast. Here is a field-tested comparison as of early 2026.
| Platform | Deployment | Trace model | Eval built-in | Cost tracking | OSS | Best for |
|---|---|---|---|---|---|---|
| LangSmith | SaaS + self-host | Run/chain/step hierarchy | Yes (datasets, scoring) | Per-run | No | LangChain-native teams |
| Braintrust | SaaS | Experiment/log/span | Yes (scoring, human review) | Per-span | No | Eval-heavy workflows |
| Arize Phoenix | Self-host (OSS) | OpenTelemetry spans | Yes (LLM evals) | Manual | Yes | Teams wanting OTel compatibility |
| LangFuse | SaaS + self-host (OSS) | Trace/generation/span | Yes (scoring API) | Per-generation | Yes | Cost-conscious teams, self-host |
| Helicone | SaaS proxy | Request/response | Partial | Excellent | Partial | API-level visibility, caching |
Build vs. buy decision matrix
Buy (use a platform) when:
- You need dashboards and alerting in days, not months.
- Your team is < 10 engineers and cannot maintain infra.
- You want built-in eval/scoring workflows.
Build (custom) when:
- You have strict data residency requirements (no SaaS).
- Your agent architecture does not fit the platform’s trace model.
- You need sub-millisecond instrumentation overhead (rare).
- You already run an OpenTelemetry stack and want to extend it.
Hybrid (most common): use a platform for the UI and eval workflows, but own the data pipeline. Export traces to your own data warehouse for long-term analytics and cost modeling.
# Example: LangFuse integration (simplified)
from langfuse import Langfuse
langfuse = Langfuse(
public_key="pk-...",
secret_key="sk-...",
host="https://cloud.langfuse.com", # or self-hosted
)
def create_traced_run(user_id: str, session_id: str):
"""Create a LangFuse trace for an agent run."""
trace = langfuse.trace(
name="agent-run",
user_id=user_id,
session_id=session_id,
metadata={"agent_version": "2.3.1", "environment": "production"},
)
return trace
def log_step(trace, step_num: int, model: str, prompt: str, completion: str):
"""Log a single agent step as a generation."""
generation = trace.generation(
name=f"step-{step_num}",
model=model,
input=prompt,
output=completion,
metadata={"step_number": step_num},
)
generation.end()
return generation
def log_tool(trace, tool_name: str, args: dict, result: str, duration_ms: float):
"""Log a tool call as a span."""
span = trace.span(
name=f"tool:{tool_name}",
input=args,
output=result,
metadata={"duration_ms": duration_ms},
)
span.end()
return span
137.5 Cost attribution: $/run, $/step, $/user
Cost attribution is the single most under-invested area of agent operations. Without it, you cannot answer: “Which user is costing us $200/day?” or “Did last week’s prompt change save money or waste it?”
Token accounting
Every LLM call produces three numbers: prompt tokens, completion tokens, and (for some providers) cached tokens. The cost formula:
cost = (prompt_tokens * price_per_prompt_token)
+ (completion_tokens * price_per_completion_token)
- (cached_tokens * cache_discount_per_token)
For agents, prompt tokens grow across steps because the context window accumulates prior tool results. This means later steps are disproportionately expensive. A 10-step run does not cost 10x a 1-step run; it can cost 30-50x due to context accumulation.
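To see why, consider a loop that folds each tool result back into the context. The sketch below uses illustrative numbers — the base prompt size, tokens added per step, and price per 1K tokens are assumptions, not provider rates — to show how billed prompt tokens compound across steps.
# Illustrative only: assumed sizes and an assumed price, not provider rates.
BASE_PROMPT_TOKENS = 2_000       # system prompt + user request
TOKENS_ADDED_PER_STEP = 1_500    # tool output folded back into context each step
PRICE_PER_1K_PROMPT_TOKENS = 0.0025

def cumulative_prompt_cost(num_steps: int) -> float:
    """Prompt-token cost billed across all steps of one run."""
    total_prompt_tokens = sum(
        BASE_PROMPT_TOKENS + step * TOKENS_ADDED_PER_STEP
        for step in range(num_steps)
    )
    return (total_prompt_tokens / 1000) * PRICE_PER_1K_PROMPT_TOKENS

print(f"1-step run:  ${cumulative_prompt_cost(1):.3f}")   # ~$0.005
print(f"10-step run: ${cumulative_prompt_cost(10):.3f}")  # ~$0.219 -- roughly 44x, not 10x
Completion tokens are billed on top of this, so the real multiple is usually higher still.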
Attribution hierarchy
Organization
└── Team / Project
└── User
└── Session
└── Run
└── Step
└── LLM Call (actual cost)
└── Tool Call (external API cost)
Each level aggregates costs from its children. This lets you slice: “What is our $/user/day?” or “What is the average cost of step 5+ across all runs?”
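Rolling costs up the hierarchy can be a plain group-by over completed runs. A minimal sketch against the Run dataclass from Section 137.2 — the UTC day bucketing and the two helper names are choices for illustration, not a prescribed API:
from collections import defaultdict
from datetime import datetime, timezone

def cost_by_user_day(runs: list[Run]) -> dict[tuple[str, str], float]:
    """Roll run cost up to (user_id, UTC day) buckets."""
    totals: dict[tuple[str, str], float] = defaultdict(float)
    for run in runs:
        day = datetime.fromtimestamp(run.created_at, tz=timezone.utc).strftime("%Y-%m-%d")
        totals[(run.user_id, day)] += run.total_cost_usd
    return dict(totals)

def mean_cost_by_step_index(runs: list[Run]) -> dict[int, float]:
    """Average LLM cost at each step index -- answers 'what does step 5+ cost?'."""
    sums: dict[int, float] = defaultdict(float)
    counts: dict[int, int] = defaultdict(int)
    for run in runs:
        for step in run.steps:
            sums[step.step_number] += sum(lc.cost_usd for lc in step.llm_calls)
            counts[step.step_number] += 1
    return {i: sums[i] / counts[i] for i in sums}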
Budget enforcement
from dataclasses import dataclass
@dataclass
class BudgetPolicy:
max_cost_per_run: float = 1.00 # hard stop
max_cost_per_user_day: float = 10.00 # daily cap
max_steps_per_run: int = 25 # loop guard
warning_threshold: float = 0.7 # fraction of budget
class BudgetEnforcer:
"""Checks cost against policy before each LLM call."""
def __init__(self, policy: BudgetPolicy, cost_store: dict):
self.policy = policy
self.cost_store = cost_store # user_id -> daily_cost
def check_run_budget(self, run: Run) -> bool:
if run.total_cost_usd >= self.policy.max_cost_per_run:
run.status = "failure"
run.error_message = (
f"Budget exceeded: ${run.total_cost_usd:.3f} "
f">= ${self.policy.max_cost_per_run:.2f}"
)
return False
return True
def check_user_budget(self, user_id: str, run: Run) -> bool:
daily = self.cost_store.get(user_id, 0.0) + run.total_cost_usd
if daily >= self.policy.max_cost_per_user_day:
run.status = "failure"
run.error_message = f"Daily user budget exceeded: ${daily:.2f}"
return False
return True
def check_step_limit(self, run: Run) -> bool:
if run.step_count >= self.policy.max_steps_per_run:
run.status = "failure"
run.error_message = (
f"Step limit reached: {run.step_count} "
f">= {self.policy.max_steps_per_run}"
)
return False
return True
def pre_step_check(self, run: Run, user_id: str) -> bool:
"""Call before every agent step. Returns False to halt."""
return (
self.check_run_budget(run)
and self.check_user_budget(user_id, run)
and self.check_step_limit(run)
)
Tip: emit a warning event at 70% of any budget threshold. This gives the agent (or a meta-controller) a chance to wrap up gracefully rather than being hard-killed.
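A minimal sketch of that warning hook, reusing the otherwise-unused warning_threshold field on BudgetPolicy above; the logger name and event fields are illustrative:
import logging

budget_logger = logging.getLogger("agent.budget")

def emit_budget_warnings(policy: BudgetPolicy, run: Run) -> None:
    """Log soft-limit warnings so the agent can wrap up before a hard stop."""
    if run.total_cost_usd >= policy.max_cost_per_run * policy.warning_threshold:
        budget_logger.warning(
            "budget.cost_warning",
            extra={
                "run_id": run.run_id,
                "cost_usd": round(run.total_cost_usd, 4),
                "hard_limit_usd": policy.max_cost_per_run,
            },
        )
    if run.step_count >= int(policy.max_steps_per_run * policy.warning_threshold):
        budget_logger.warning(
            "budget.step_warning",
            extra={
                "run_id": run.run_id,
                "steps": run.step_count,
                "hard_limit": policy.max_steps_per_run,
            },
        )
Call it from pre_step_check alongside the hard checks, or feed the warning back into the agent's context as a "finish up now" hint.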
Cost dashboards should answer five questions
- What is our total spend this hour / day / week?
- Which users are in the top decile of cost?
- Which tools drive the most expensive runs (by triggering extra steps)?
- Is cost per successful run trending up or down after a prompt change?
- What fraction of spend goes to failed runs (wasted cost)?
137.6 Metrics: success rate, steps/task, cost/task, tool error rate, loop detection
Define your agent’s metric taxonomy before you build dashboards. Here is the production-tested set:
Primary metrics
| Metric | Definition | Target |
|---|---|---|
| Success rate | successful_runs / total_runs | > 90% (task-dependent) |
| Steps per task | Distribution of step counts per run | Low median, bounded p99 |
| Cost per task | USD per completed run | Budget-dependent |
| Latency (e2e) | Wall-clock time from request to final answer | p50 < 10s, p99 < 60s |
| Token efficiency | prompt_tokens / completion_tokens ratio | Trending down (less wasted context) |
Tool-level metrics
| Metric | Definition | Why it matters |
|---|---|---|
| Tool error rate | tool_errors / tool_calls per tool | Identifies flaky integrations |
| Tool latency | p50 / p99 duration per tool | Bottleneck detection |
| Tool call frequency | Calls per run, per tool | Usage patterns, deprecation signals |
| Tool retry rate | Retries / first attempts | Reliability signal |
Agent-specific metrics
| Metric | Definition | Why it matters |
|---|---|---|
| Loop detection rate | Runs where agent repeated same tool+args > 2x | Stuck agent behavior |
| Context utilization | tokens_used / context_window_size at final step | Context exhaustion risk |
| Fallback rate | Runs that triggered model fallback | Primary model reliability |
| Self-correction rate | Steps where agent revised prior output | Agent robustness signal |
from collections import Counter, defaultdict
import statistics
class AgentMetrics:
"""Compute agent metrics from a batch of completed runs."""
def __init__(self, runs: list[Run]):
self.runs = runs
def success_rate(self) -> float:
if not self.runs:
return 0.0
ok = sum(1 for r in self.runs if r.status == "success")
return ok / len(self.runs)
def steps_per_task(self) -> dict:
counts = [r.step_count for r in self.runs]
if not counts:
return {}
return {
"p50": statistics.median(counts),
"p99": sorted(counts)[int(len(counts) * 0.99)],
"mean": statistics.mean(counts),
"max": max(counts),
}
def cost_per_task(self) -> dict:
costs = [r.total_cost_usd for r in self.runs]
if not costs:
return {}
return {
"p50": statistics.median(costs),
"p99": sorted(costs)[int(len(costs) * 0.99)],
"mean": statistics.mean(costs),
"total": sum(costs),
}
def tool_error_rate(self) -> dict[str, float]:
tool_ok: dict[str, int] = defaultdict(int)
tool_err: dict[str, int] = defaultdict(int)
for r in self.runs:
for s in r.steps:
for tc in s.tool_calls:
if tc.error:
tool_err[tc.tool_name] += 1
else:
tool_ok[tc.tool_name] += 1
result = {}
for name in set(tool_ok) | set(tool_err):
total = tool_ok[name] + tool_err[name]
result[name] = tool_err[name] / total if total else 0.0
return result
def detect_loops(self, threshold: int = 3) -> list[str]:
"""Return run_ids where the agent called the same tool+args
at least `threshold` times."""
looping = []
for r in self.runs:
call_counter: Counter = Counter()
for s in r.steps:
for tc in s.tool_calls:
key = (tc.tool_name, str(sorted(tc.arguments.items())))
call_counter[key] += 1
if any(v >= threshold for v in call_counter.values()):
looping.append(r.run_id)
return looping
137.7 Dashboards: active runs, error rate, cost burn, model distribution, tool call heatmap
A production agent dashboard has three layers: real-time operations, daily analytics, and long-term trends.
Layer 1: Real-time operations (refresh every 10s)
- Active runs — count of currently executing agent runs.
- Error rate — 5-minute rolling window, broken down by error type (tool failure, budget exceeded, timeout, LLM error).
- Cost burn rate — $/minute, with a projected daily spend line.
- P50 / P99 latency — end-to-end run latency.
Layer 2: Daily analytics (refresh hourly)
- Success rate by task type — not all tasks are equal; a “summarize” task failing is different from an “execute trade” task failing.
- Steps distribution — histogram of steps per run. A bimodal distribution often indicates two populations: simple tasks and complex/stuck tasks.
- Cost per run distribution — box plot. Watch the right tail.
- Tool call heatmap — tools on the y-axis, time on the x-axis, color by call volume. Reveals patterns: “code_exec spikes at 2pm when batch jobs trigger.”
Layer 3: Long-term trends (daily aggregation)
- Cost per successful run — the unit economics metric. If this rises 20% after a prompt change, the change was expensive.
- Model mix over time — are you migrating traffic correctly?
- Failure mode taxonomy — categorize failures (hallucination, tool error, timeout, budget) and track proportions.
# Grafana-compatible metric emission using StatsD
import statsd
c = statsd.StatsClient("localhost", 8125, prefix="agent")
def emit_run_metrics(run: Run) -> None:
"""Emit metrics at run completion."""
c.incr(f"run.status.{run.status}")
c.timing("run.latency_ms", run.total_latency_ms)
c.gauge("run.cost_usd", run.total_cost_usd)
c.gauge("run.steps", run.step_count)
for step in run.steps:
for tc in step.tool_calls:
c.incr(f"tool.call.{tc.tool_name}")
c.timing(f"tool.latency.{tc.tool_name}", tc.duration_ms)
if tc.error:
c.incr(f"tool.error.{tc.tool_name}")
for lc in step.llm_calls:
c.incr(f"llm.call.{lc.model}")
c.gauge(f"llm.tokens.prompt.{lc.model}", lc.prompt_tokens)
c.gauge(f"llm.tokens.completion.{lc.model}", lc.completion_tokens)
137.8 Debugging non-deterministic failures
The hardest debugging problem in agent systems: a run fails on Tuesday but succeeds on Wednesday with the same input. You cannot reproduce it. Traditional debugging assumes reproducibility; agent debugging must work without it.
Strategy 1: Trace replay
Record the full trace — every LLM call’s prompt and completion, every tool call’s arguments and result — and replay it deterministically. This does not re-call the LLM; it replays the recorded decisions.
from typing import Any
class TraceReplayer:
"""Replay a recorded trace to reproduce agent behavior
without making live LLM/tool calls."""
def __init__(self, recorded_run: Run):
self.run = recorded_run
self._step_idx = 0
self._llm_idx = 0
self._tool_idx = 0
def replay_llm_call(self, prompt: str) -> dict[str, Any]:
"""Return the recorded LLM response instead of calling the API."""
step = self.run.steps[self._step_idx]
lc = step.llm_calls[self._llm_idx]
self._llm_idx += 1
return {
"model": lc.model,
"prompt_tokens": lc.prompt_tokens,
"completion_tokens": lc.completion_tokens,
"stop_reason": lc.stop_reason,
"latency_ms": lc.latency_ms,
# In production, you'd also store/return the actual completion text
}
def replay_tool_call(self, tool_name: str, args: dict) -> dict[str, Any]:
"""Return the recorded tool result."""
step = self.run.steps[self._step_idx]
tc = step.tool_calls[self._tool_idx]
self._tool_idx += 1
assert tc.tool_name == tool_name, (
f"Trace divergence: expected {tc.tool_name}, got {tool_name}"
)
return {"result": tc.result, "error": tc.error, "duration_ms": tc.duration_ms}
def advance_step(self) -> None:
self._step_idx += 1
self._llm_idx = 0
self._tool_idx = 0
Strategy 2: Seed reproducibility
Some providers support a seed parameter. When set, the model attempts (but does not guarantee) deterministic outputs for identical inputs. Use seeds during debugging sessions:
# Use a fixed seed during debug replay
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
seed=42,
temperature=0.0, # combine with temp=0 for best reproducibility
)
Warning: seed reproducibility is best-effort. Provider-side model updates, batching changes, or hardware differences can still produce different outputs. Never rely on seeds for correctness guarantees.
Strategy 3: Trace diffing
Compare a failing trace against a succeeding trace for the same input. The diff reveals the divergence point — the first step where behavior changed.
from dataclasses import dataclass
@dataclass
class TraceDiff:
divergence_step: int
field: str
expected: str
actual: str
def diff_traces(good: Run, bad: Run) -> list[TraceDiff]:
"""Find where two traces diverge."""
diffs = []
max_steps = min(len(good.steps), len(bad.steps))
for i in range(max_steps):
gs, bs = good.steps[i], bad.steps[i]
# Compare tool calls
for j, (gtc, btc) in enumerate(
zip(gs.tool_calls, bs.tool_calls)
):
if gtc.tool_name != btc.tool_name:
diffs.append(TraceDiff(i, "tool_name", gtc.tool_name, btc.tool_name))
if gtc.error != btc.error:
diffs.append(TraceDiff(i, "tool_error", str(gtc.error), str(btc.error)))
# Compare LLM stop reasons
for j, (glc, blc) in enumerate(
zip(gs.llm_calls, bs.llm_calls)
):
if glc.stop_reason != blc.stop_reason:
diffs.append(
TraceDiff(i, "stop_reason", glc.stop_reason, blc.stop_reason)
)
if len(good.steps) != len(bad.steps):
diffs.append(
TraceDiff(
max_steps,
"step_count",
str(len(good.steps)),
str(len(bad.steps)),
)
)
return diffs
Strategy 4: Bisection debugging
If the agent took 12 steps and failed at step 12, the question is: which earlier step introduced the poison? Feed the agent’s context from step N directly into a fresh LLM call and check if the output is reasonable. Binary-search across steps to find the first step where the agent’s reasoning goes off track.
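A sketch of that search, assuming you can supply a looks_reasonable judge (a human reviewer, an eval model, or a heuristic over the recorded reasoning) and that once a run goes off track it stays off track:
from typing import Callable

def bisect_divergence(run: Run, looks_reasonable: Callable[[Step], bool]) -> int:
    """Binary-search for the first step whose output goes off track.

    Assumes monotonicity: steps before the poisoned step look reasonable,
    steps at and after it do not. Returns the index of the first bad step,
    or len(run.steps) if every step looks fine."""
    lo, hi = 0, len(run.steps) - 1
    first_bad = len(run.steps)    # sentinel: no bad step found
    while lo <= hi:
        mid = (lo + hi) // 2
        if looks_reasonable(run.steps[mid]):
            lo = mid + 1          # still fine here; the poison is later
        else:
            first_bad = mid       # candidate; keep searching earlier
            hi = mid - 1
    return first_bad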
The non-determinism playbook
- Capture everything — you cannot debug what you did not record.
- Replay first — confirm you can reproduce the exact trace offline.
- Diff second — compare against a known-good trace.
- Bisect third — narrow down the divergence to a single step.
- Check tool outputs — 70% of non-deterministic failures trace back to tool outputs that changed between runs (stale data, API changes, rate limits).
137.9 Alerting: error spikes, cost anomaly, loop detection, context exhaustion, tool timeout
Alerting for agents must be semantically aware. A 500 error from a tool is obvious; an agent that confidently returns a wrong answer is not. Define alerts across five categories:
Category 1: Error spikes
# Alert: error rate exceeds threshold
- alert: AgentErrorRateHigh
expr: |
rate(agent_run_status_total{status="failure"}[5m])
/ rate(agent_run_status_total[5m]) > 0.15
for: 3m
labels:
severity: critical
annotations:
summary: "Agent error rate above 15% for 3 minutes"
Category 2: Cost anomaly
Detect when hourly spend exceeds 2x the trailing 7-day average for that hour-of-day. This catches both absolute spikes and relative anomalies (e.g., a prompt change that silently doubled cost).
def detect_cost_anomaly(
current_hourly_cost: float,
historical_hourly_costs: list[float], # same hour, past 7 days
threshold_multiplier: float = 2.0,
) -> bool:
if not historical_hourly_costs:
return False
avg = sum(historical_hourly_costs) / len(historical_hourly_costs)
return current_hourly_cost > avg * threshold_multiplier
Category 3: Loop detection
An agent calling web_search("python datetime format") five times in a row is stuck. Alert when any run exceeds the repeated-action threshold.
def check_loop_alert(run: Run, max_repeats: int = 3) -> Optional[str]:
"""Return alert message if a loop is detected, else None."""
from collections import Counter
counter: Counter = Counter()
for step in run.steps:
for tc in step.tool_calls:
key = f"{tc.tool_name}:{hash(str(sorted(tc.arguments.items())))}"
counter[key] += 1
if counter[key] >= max_repeats:
return (
f"Loop detected in run {run.run_id}: "
f"{tc.tool_name} called {counter[key]} times "
f"with same arguments"
)
return None
Category 4: Context exhaustion
Alert when a run’s cumulative token count approaches the model’s context window. At 80% utilization, the agent is at risk of losing early context or hitting a hard limit.
CONTEXT_WINDOWS = {
"gpt-4o": 128_000,
"claude-sonnet-4-20250514": 200_000,
"gpt-4o-mini": 128_000,
}
def check_context_exhaustion(
run: Run, model: str, threshold: float = 0.80
) -> Optional[str]:
window = CONTEXT_WINDOWS.get(model, 128_000)
# The last LLM call's prompt_tokens is the best proxy for current usage
if run.steps and run.steps[-1].llm_calls:
last_prompt = run.steps[-1].llm_calls[-1].prompt_tokens
if last_prompt > window * threshold:
return (
f"Context exhaustion warning: {last_prompt}/{window} "
f"tokens ({last_prompt / window:.0%}) in run {run.run_id}"
)
return None
Category 5: Tool timeout
Individual tool calls should have SLAs. Alert when a tool’s p99 latency exceeds its budget.
- alert: ToolLatencyHigh
expr: |
histogram_quantile(0.99,
rate(agent_tool_duration_seconds_bucket[10m])
) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} p99 latency > 30s"
Alert routing
| Severity | Channel | Response time |
|---|---|---|
| Critical (error spike, budget blow) | PagerDuty / on-call | < 15 min |
| Warning (loop detected, context high) | Slack channel | < 1 hour |
| Info (tool degraded, cost trending up) | Daily digest email | Next business day |
137.10 Logging: structured logs, trace-id propagation
Agent logs must be structured (JSON), correlated (trace-id propagation), and level-appropriate (not everything is ERROR).
Structured log schema
import json
import logging
import sys
class AgentLogFormatter(logging.Formatter):
"""JSON formatter with trace context."""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"message": record.getMessage(),
"logger": record.name,
}
# Inject trace context if available
for key in ("run_id", "session_id", "user_id", "step", "tool_name"):
val = getattr(record, key, None)
if val is not None:
log_entry[key] = val
# Include exception info
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)
def configure_agent_logging() -> logging.Logger:
logger = logging.getLogger("agent")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(AgentLogFormatter())
logger.addHandler(handler)
return logger
Trace-id propagation through tool calls
When an agent invokes a tool that itself calls external services, the trace id must propagate so you can correlate the agent’s step with the downstream service’s logs.
import contextvars
from typing import Optional
# Context variable for trace propagation
_trace_ctx: contextvars.ContextVar[dict] = contextvars.ContextVar(
"agent_trace_ctx", default={}
)
def set_trace_context(run_id: str, step: int, session_id: str) -> None:
_trace_ctx.set({
"run_id": run_id,
"step": step,
"session_id": session_id,
})
def get_trace_headers() -> dict[str, str]:
"""Generate HTTP headers for trace propagation to tool services."""
ctx = _trace_ctx.get()
if not ctx:
return {}
return {
"X-Agent-Run-Id": ctx.get("run_id", ""),
"X-Agent-Step": str(ctx.get("step", "")),
"X-Agent-Session-Id": ctx.get("session_id", ""),
# If you run OpenTelemetry, also emit a W3C traceparent; it requires a
# 32-hex trace-id and a 16-hex span-id, so pad the agent ids accordingly
"traceparent": f"00-{ctx.get('run_id', '').ljust(32, '0')}-{str(ctx.get('step', 1)).zfill(16)}-01",
}
async def call_tool_with_tracing(
tool_name: str, args: dict, http_client, endpoint: str
) -> dict:
"""Invoke a tool's HTTP endpoint with trace headers."""
headers = get_trace_headers()
headers["Content-Type"] = "application/json"
response = await http_client.post(
endpoint, json={"tool": tool_name, "arguments": args}, headers=headers
)
return response.json()
Log levels for agents
| Level | When to use | Example |
|---|---|---|
| DEBUG | LLM raw prompt/completion (verbose, off in prod) | Full message array |
| INFO | Step boundaries, tool calls, run start/end | "step.complete" run_id=abc step=3 |
| WARNING | Budget threshold, retry, degraded tool | "budget.warning" run_id=abc pct=72% |
| ERROR | Tool failure, LLM error, budget exceeded | "tool.error" tool=sql_query err="timeout" |
| CRITICAL | Loop detected, safety violation, data breach | "loop.detected" run_id=abc repeats=5 |
Rule of thumb: in production, set the agent logger to INFO and the LLM-detail logger to WARNING. Enable DEBUG only when actively debugging a specific run (use the run_id to filter).
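Applied to the formatter setup above, that rule of thumb is a two-line override; the "agent.llm" logger name is an assumption about how you namespace the verbose prompt/completion logs:
import logging

# Production defaults: run/step/tool events at INFO, raw prompt and
# completion detail (assumed to live under "agent.llm") suppressed.
logging.getLogger("agent").setLevel(logging.INFO)
logging.getLogger("agent.llm").setLevel(logging.WARNING)

# Flip to DEBUG only while investigating a specific run, then filter by run_id:
# logging.getLogger("agent.llm").setLevel(logging.DEBUG)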
137.11 Privacy/compliance: PII in traces, redaction, retention
Agent traces are a compliance minefield. They contain user prompts (PII), tool outputs (potentially sensitive data), and LLM completions (which may hallucinate sensitive information). You must design for privacy from the start, not bolt it on.
Where PII appears in agent traces
| Trace field | PII risk | Example |
|---|---|---|
| User prompt | High | “My SSN is 123-45-6789, can you…” |
| LLM completion | High | Model repeats or hallucinates PII |
| Tool arguments | Medium | sql_query("SELECT * FROM users WHERE email='...'") |
| Tool results | High | Database rows with names, emails, addresses |
| Session metadata | Low | user_id (pseudonymized) |
Redaction pipeline
Redact PII before writing to the trace store, not after. Once PII hits your logging pipeline, it is extremely difficult to purge retroactively.
import re
from typing import Callable
class PIIRedactor:
"""Redact common PII patterns from trace data."""
PATTERNS: list[tuple[str, str]] = [
# SSN
(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
# Email
(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b", "[EMAIL_REDACTED]"),
# Phone (US)
(r"\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b", "[PHONE_REDACTED]"),
# Credit card (basic)
(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "[CC_REDACTED]"),
# IP address
(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", "[IP_REDACTED]"),
]
def __init__(self, extra_patterns: list[tuple[str, str]] | None = None):
self.patterns = self.PATTERNS.copy()
if extra_patterns:
self.patterns.extend(extra_patterns)
self._compiled = [(re.compile(p), r) for p, r in self.patterns]
def redact(self, text: str) -> str:
for pattern, replacement in self._compiled:
text = pattern.sub(replacement, text)
return text
def redact_trace(self, run: Run) -> Run:
"""Redact PII from all text fields in a run. Mutates in place."""
for step in run.steps:
step.reasoning = self.redact(step.reasoning)
for tc in step.tool_calls:
tc.arguments = {
k: self.redact(str(v)) for k, v in tc.arguments.items()
}
if tc.result:
tc.result = self.redact(tc.result)
if tc.error:
tc.error = self.redact(tc.error)
return run
# Usage
redactor = PIIRedactor()
redacted_run = redactor.redact_trace(completed_run)
# Now safe to write to trace store
Retention policy
| Data tier | Retention | Storage |
|---|---|---|
| Full traces (with prompts/completions) | 7-30 days | Hot storage (Postgres, ClickHouse) |
| Aggregated metrics (no PII) | 1 year | Time-series DB (Prometheus, InfluxDB) |
| Redacted summaries | 90 days | Trace store |
| Raw LLM prompts/completions | 48 hours or per-policy | Encrypted, access-controlled |
Compliance checklist
- GDPR / CCPA: users can request deletion of their traces. Your trace store must support deletion by user_id across all tables (see the deletion sketch after this list).
- SOC 2: access to trace data must be audited. Use role-based access control (RBAC) for the trace UI.
- HIPAA: if traces may contain protected health information (PHI), they must be encrypted at rest and in transit, and access logged.
- Data residency: if you use a SaaS observability platform, confirm where trace data is stored. Some platforms do not yet offer EU-region hosting.
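A minimal sketch of deletion by user_id, assuming the trace store is a relational database with one table per level of the Section 137.2 hierarchy; the table and column names here are illustrative:
import sqlite3

def delete_user_traces(conn: sqlite3.Connection, user_id: str) -> int:
    """Erase every trace row tied to a user (GDPR/CCPA deletion request)."""
    cur = conn.cursor()
    run_ids = [
        row[0] for row in
        cur.execute("SELECT run_id FROM runs WHERE user_id = ?", (user_id,))
    ]
    for table in ("llm_calls", "tool_calls", "steps"):  # child tables keyed by run_id
        cur.executemany(
            f"DELETE FROM {table} WHERE run_id = ?",
            [(rid,) for rid in run_ids],
        )
    cur.execute("DELETE FROM runs WHERE user_id = ?", (user_id,))
    cur.execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
    conn.commit()
    return len(run_ids)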
Principle: treat agent traces with the same sensitivity as database query logs. They contain user data, and they will be subpoenaed if something goes wrong.
137.12 Mental model
- Agent traces are trees, not chains. The five-level hierarchy (session, run, step, LLM call, tool call) is your foundational schema. Every metric, dashboard, and alert derives from it.
- Non-determinism is the default. Do not design for reproducibility; design for explainability. Record enough state to understand any run after the fact, even if you cannot reproduce it.
- Cost attribution is not optional. Agents can burn 100x their median cost on a single bad run. Track $/run, $/user, and $/step. Enforce budgets at the step level, not just the run level.
- Instrument at the boundary. Capture data at the LLM call boundary and the tool call boundary. These two points give you 95% of the observability you need.
- Diff traces, do not grep logs. When debugging a non-deterministic failure, compare a failing trace against a succeeding trace. The divergence point is almost always a changed tool output or an unlucky sample from the LLM.
- Alert on semantics, not just syntax. A 200 status does not mean correctness. Build alerts for loops, context exhaustion, and cost anomalies — the failure modes specific to agents.
- Redact before you store. PII will appear in agent traces. Build the redaction pipeline before you build the dashboard, not after your first compliance incident.
- Buy the UI, own the data. Use a platform (LangFuse, LangSmith, Braintrust) for dashboards and eval workflows, but export traces to your own warehouse for long-term analytics, cost modeling, and compliance control.
Read it yourself
- LangFuse documentation — open-source tracing platform with excellent trace model documentation: langfuse.com/docs
- Arize Phoenix — open-source LLM observability built on OpenTelemetry: docs.arize.com/phoenix
- OpenTelemetry Semantic Conventions for GenAI — the emerging standard for LLM call instrumentation: opentelemetry.io/docs/specs/semconv/gen-ai/
- Lilian Weng, “LLM Powered Autonomous Agents” (2023) — foundational blog post on agent architectures, useful context for understanding what to observe.
- Braintrust AI — eval and observability platform with strong cost-tracking features: braintrust.dev/docs
- Chip Huyen, AI Engineering (O’Reilly, 2025) — Chapter on evaluation and monitoring of LLM applications covers foundational observability concepts.
Practice
- Implement the five-level trace model from Section 137.2 in your preferred language. Ingest 100 synthetic agent runs into a SQLite database and write queries for: average steps per run, p99 cost, and top-3 most-called tools.
- Build a budget enforcer that halts an agent run when cumulative cost exceeds $0.50 or step count exceeds 15. Test it by mocking an agent loop that calls a cheap model for planning and an expensive model for generation.
- Instrument an existing agent (LangChain, CrewAI, or a custom loop) with the TraceCollector from Section 137.3. Run the agent on 10 different prompts and compare traces: how much does step count vary? What is the cost distribution?
- Build a trace differ. Record two traces of the same agent on the same prompt (different runs). Use the diff_traces function from Section 137.8 to find divergence points. What caused the divergence — LLM non-determinism or tool output changes?
- Implement the PII redactor from Section 137.11 and test it against 20 synthetic prompts containing SSNs, emails, and phone numbers. Measure: what is the false-negative rate? What patterns does the regex miss? How would you layer an NER model on top?
- Set up alerting using Prometheus and Alertmanager (or your preferred stack). Implement the three alerts from Section 137.9: error rate spike, loop detection, and context exhaustion. Trigger each alert with synthetic data and verify the notification arrives.
- Stretch: Design a trace-based regression test suite. Record 50 “golden” traces from a working agent. After a prompt or model change, re-run the same inputs and compare new traces against golden traces using the diffing approach from Section 137.8. Define a “regression” as: (a) step count increased by > 50%, (b) a new tool was called that was not in the golden trace, or (c) cost increased by > 2x. Implement the comparison pipeline and report which runs regressed. How would you handle legitimate behavioral changes (e.g., the agent found a better path)?