Part XI · Building Agents and Agent Infrastructure
Chapter 139 ~26 min read

-- Long-running and background agents: daemons, checkpoints, and async execution

"A truly useful agent is one you can walk away from---and come back hours later to find the job done, the costs controlled, and a clear trail of what happened while you were gone"

Every agent chapter so far assumed something convenient: the human waits while the agent works. That assumption collapses the moment a task takes thirty minutes---or thirty hours. Research agents that trawl hundreds of papers, coding agents that refactor an entire repository, monitoring agents that watch a production system overnight---none of these fit the synchronous request-response loop we built in Chapters 131—138.

This chapter tackles the infrastructure that turns a short-lived chat agent into a long-running background process: checkpointing so it can survive crashes, budgeting so it does not drain your cloud account, progress reporting so you know it is still alive, scheduling so it can fire on a cron, and the concurrency primitives that let dozens of runs coexist safely.

By the end you will have a mental model---and production code---for every point on the spectrum from interactive chat to autonomous daemon.


139.1 The shift from synchronous to asynchronous agents

A synchronous agent is conceptually simple. The user sends a message, the agent enters a tool-use loop, and eventually a final response streams back---all within a single HTTP connection or CLI session. The entire state lives in one process’s memory.

The problems are well-known:

Constraint         | Synchronous agent                        | Asynchronous agent
-------------------|------------------------------------------|-----------------------------------
Maximum run time   | Bounded by connection timeout (minutes)  | Hours, days, or indefinite
Fault tolerance    | Process crash = total loss               | Checkpoint + resume
User availability  | Must keep session open                   | Submit-and-collect
Cost visibility    | Discovered after the fact                | Enforced via budget caps
Concurrency        | One run at a time per session            | Many runs, queued and prioritised

The shift to async is not a single design decision---it is a cascade of infrastructure choices. You need a task store (where does the run’s state live?), a checkpoint mechanism (how do you serialise mid-run state?), a scheduler (who starts and restarts runs?), and a reporting channel (how does the user learn about progress?).

The rest of this chapter addresses each in turn.


139.2 The spectrum: interactive, background, autonomous, daemon

Not every agent needs every piece of that infrastructure. It helps to place your agent on a four-point spectrum and build only what its position demands.

[Figure: The agent autonomy spectrum.
   Interactive (user waits; seconds):              streaming
   Background  (submit + poll; minutes--hours):    + checkpoints
   Autonomous  (self-directed goals; hours--days): + budgets, kill-switch
   Daemon      (always-on loop; indefinite):       + scheduler, heartbeat
 Each level inherits the infrastructure of the level before it.]

Interactive agents are what you have already built. A chat model with tool access, streaming responses, and a human in the loop after every step.

Background agents accept a task, run without a live connection, and notify on completion. Think of GitHub’s Copilot Workspace or Anthropic’s Claude Code background tasks: you describe the change, close your laptop, and come back to a pull request.

Autonomous agents extend the background pattern with open-ended goals. A research agent tasked with “survey the state of RLHF techniques and write a report” might run for hours, spawning sub-tasks, revising its own plan, and producing intermediate artefacts---all without human steering.

Daemon agents never terminate. A monitoring agent watches a production system, a compliance agent continuously scans new commits, a triage agent processes incoming support tickets around the clock. Daemons need heartbeats, automatic restarts, and very careful cost controls.

The key insight: every level on the spectrum inherits the infrastructure of the level before it. A daemon needs everything a background agent needs, plus scheduling and lifecycle management. Build from the bottom up.


139.3 Background agents: submit-and-poll, webhooks, and the Claude Code / Codex model

The simplest long-running pattern is submit-and-poll. A client submits a task description, receives a run ID, and periodically checks status.

# background_agent_api.py — FastAPI skeleton for a submit-and-poll agent service.
from __future__ import annotations

import asyncio
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class RunStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class RunRecord(BaseModel):
    run_id: str
    status: RunStatus
    created_at: datetime
    prompt: str
    result: Any | None = None
    error: str | None = None
    tokens_used: int = 0


# In production, replace with Redis / Postgres.
_runs: dict[str, RunRecord] = {}


class SubmitRequest(BaseModel):
    prompt: str
    max_tokens: int = 100_000
    timeout_seconds: int = 3600


@app.post("/runs", status_code=202)
async def submit_run(req: SubmitRequest) -> dict[str, str]:
    """Accept a task and return a run ID immediately."""
    run_id = str(uuid.uuid4())
    record = RunRecord(
        run_id=run_id,
        status=RunStatus.QUEUED,
        created_at=datetime.now(timezone.utc),
        prompt=req.prompt,
    )
    _runs[run_id] = record
    # Demo shortcut: run in-process.  In production a worker pool picks
    # queued runs off the store; also keep a reference to the task ---
    # fire-and-forget asyncio tasks can be garbage-collected.
    asyncio.create_task(_execute_run(run_id, req))
    return {"run_id": run_id}


@app.get("/runs/{run_id}")
async def poll_run(run_id: str) -> RunRecord:
    """Poll for the current status of a run."""
    if run_id not in _runs:
        raise HTTPException(404, "Run not found")
    return _runs[run_id]


@app.post("/runs/{run_id}/cancel")
async def cancel_run(run_id: str) -> dict[str, str]:
    """Request cancellation of a running task."""
    if run_id not in _runs:
        raise HTTPException(404, "Run not found")
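    # Setting the status is only a request: the worker loop must observe it
    # between steps and wind down gracefully (see the kill switch in 139.7).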
    _runs[run_id].status = RunStatus.CANCELLED
    return {"status": "cancellation_requested"}


async def _execute_run(run_id: str, req: SubmitRequest) -> None:
    """Placeholder agent loop — replace with your real agent."""
    record = _runs[run_id]
    record.status = RunStatus.RUNNING
    try:
        # --- your agent loop here ---
        await asyncio.sleep(2)  # simulate work
        record.result = {"answer": "done"}
        record.status = RunStatus.COMPLETED
    except Exception as exc:
        record.error = str(exc)
        record.status = RunStatus.FAILED

Real products layer more on top of this skeleton:

  • Webhooks. Instead of polling, the client registers a callback URL. When the run completes, the server POSTs the result. This is how GitHub notifies CI systems---and it is the natural fit for agent-to-agent workflows (Chapter 138). A minimal delivery sketch follows this list.
  • Server-Sent Events (SSE). The client opens a streaming connection and receives incremental progress events. This is the model used by the Anthropic Messages API and by MCP’s streamable HTTP transport.
  • Queuing. In Claude Code’s background mode and OpenAI’s Codex, the submitted task enters a queue. A pool of workers picks tasks off the queue, one at a time, each in an isolated sandbox (container, VM, or Firecracker microVM). The sandbox provides both security isolation (Chapter 136) and reproducibility.
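
Here is a minimal delivery sketch for the webhook pattern. Hedged: notify_webhook and the callback URL are hypothetical extensions (the SubmitRequest above carries no callback field), and httpx is assumed as the client.

# webhook_notify.py — sketch: POST a completed RunRecord to a callback URL.
from __future__ import annotations

import httpx

from background_agent_api import RunRecord


async def notify_webhook(callback_url: str, record: RunRecord) -> None:
    """Deliver the final run state, retrying a few times on transient errors."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        for _ in range(3):
            try:
                resp = await client.post(
                    callback_url,
                    json=record.model_dump(mode="json"),  # pydantic v2 serialisation
                )
                resp.raise_for_status()
                return
            except httpx.HTTPError:
                continue  # real code should back off and eventually dead-letter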

The Claude Code / Codex model deserves special attention because it is becoming the template for coding agents everywhere:

  1. User submits a natural-language task plus a Git ref.
  2. The system clones the repo into a sandbox.
  3. An agent loop runs: read files, edit code, run tests, iterate.
  4. On completion, the agent opens a pull request (or pushes a branch).
  5. The user reviews asynchronously.

The critical design choice is that the unit of output is a Git commit, not a chat message. This gives the user a familiar review interface (diffs, CI checks, code review) and makes the agent’s work auditable.


139.4 Checkpointing: saving state, checkpoint stores, and resume-on-failure

Any run lasting more than a few minutes needs checkpointing---periodic snapshots of the agent’s state that allow resumption after a crash, a deployment, or a preemption.

A checkpoint must capture:

  1. Conversation history --- the messages exchanged so far (system, user, assistant, tool results).
  2. Tool state --- open files, database cursors, partial downloads.
  3. Plan state --- the agent’s current sub-goals and progress.
  4. Accumulated artefacts --- partial outputs, intermediate files, generated code.

# checkpoint.py — A minimal checkpoint store backed by the filesystem.
from __future__ import annotations

import json
import os
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any


@dataclass
class Checkpoint:
    run_id: str
    step: int
    messages: list[dict[str, Any]]
    plan: dict[str, Any] = field(default_factory=dict)
    artefacts: dict[str, str] = field(default_factory=dict)
    tokens_used: int = 0
    wall_seconds: float = 0.0
    timestamp: float = field(default_factory=time.time)


class CheckpointStore:
    """Persist checkpoints to disk.  Swap for S3 / GCS / Redis in production."""

    def __init__(self, base_dir: str = "/tmp/agent_checkpoints") -> None:
        self._base = Path(base_dir)
        self._base.mkdir(parents=True, exist_ok=True)

    def _path(self, run_id: str, step: int) -> Path:
        run_dir = self._base / run_id
        run_dir.mkdir(exist_ok=True)
        return run_dir / f"step_{step:06d}.json"

    def save(self, cp: Checkpoint) -> Path:
        path = self._path(cp.run_id, cp.step)
        path.write_text(json.dumps(asdict(cp), default=str))
        return path

    def latest(self, run_id: str) -> Checkpoint | None:
        run_dir = self._base / run_id
        if not run_dir.exists():
            return None
        files = sorted(run_dir.glob("step_*.json"))
        if not files:
            return None
        data = json.loads(files[-1].read_text())
        return Checkpoint(**data)

    def list_steps(self, run_id: str) -> list[int]:
        run_dir = self._base / run_id
        if not run_dir.exists():
            return []
        return sorted(
            int(f.stem.split("_")[1]) for f in run_dir.glob("step_*.json")
        )


def resumable_agent_loop(
    run_id: str,
    initial_prompt: str,
    store: CheckpointStore,
    max_steps: int = 200,
) -> str:
    """Agent loop with checkpoint-based resume."""
    existing = store.latest(run_id)
    if existing:
        messages = existing.messages
        start_step = existing.step + 1
        tokens = existing.tokens_used
        print(f"Resuming run {run_id} from step {start_step}")
    else:
        messages = [{"role": "user", "content": initial_prompt}]
        start_step = 0
        tokens = 0

    run_started = time.time()  # baseline for elapsed wall-clock accounting

    for step in range(start_step, max_steps):
        # --- call LLM, execute tools, append to messages ---
        response = _call_llm(messages)          # placeholder
        messages.append(response)
        tokens += response.get("usage", {}).get("total_tokens", 0)

        # Checkpoint every 5 steps (tune to your cost/safety trade-off).
        if step % 5 == 0:
            cp = Checkpoint(
                run_id=run_id,
                step=step,
                messages=messages,
                tokens_used=tokens,
                wall_seconds=time.time() - run_started,
            )
            store.save(cp)

        if _is_done(response):
            return response["content"]

    return "Max steps reached."


def _call_llm(messages: list) -> dict:
    """Stub — replace with your model client."""
    return {"role": "assistant", "content": "...", "usage": {"total_tokens": 500}}


def _is_done(response: dict) -> bool:
    return "DONE" in response.get("content", "")

Checkpoint frequency is a trade-off. Checkpoint after every LLM call and you pay serialisation overhead; checkpoint every 50 steps and you lose up to 50 steps of work on a crash. A good default is every 3—5 tool-use cycles, or whenever the agent completes a logical sub-task.

For coding agents, there is a natural checkpoint boundary: every successful test run. If the agent edits three files and then the tests pass, that is the moment to snapshot.

In production, store checkpoints in a durable object store (S3, GCS) and keep a pointer in a relational database. The pointer table gives you a fast “list all checkpoints for run X” query; the object store holds the (potentially large) message history.
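
A sketch of that split, assuming boto3 for the object store and a pre-existing checkpoint_pointers table (the SQL here is schematic, not a migration):

# s3_checkpoint_store.py — sketch: checkpoints in S3, pointers in a database.
from __future__ import annotations

import json
from dataclasses import asdict

import boto3

from checkpoint import Checkpoint


class S3CheckpointStore:
    def __init__(self, bucket: str, db) -> None:
        self._s3 = boto3.client("s3")
        self._bucket = bucket
        self._db = db  # any DB-API connection

    def save(self, cp: Checkpoint) -> str:
        key = f"checkpoints/{cp.run_id}/step_{cp.step:06d}.json"
        self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=json.dumps(asdict(cp), default=str).encode(),
        )
        # Fast-lookup pointer: one row per checkpoint.
        self._db.execute(
            "INSERT INTO checkpoint_pointers (run_id, step, s3_key) VALUES (?, ?, ?)",
            (cp.run_id, cp.step, key),
        )
        return key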


139.5 Temporal as backbone: agent runs as workflows, durable execution

Writing your own checkpoint-and-resume logic works for simple agents, but as complexity grows---retries, timeouts, sub-workflows, human-in-the-loop approvals---you are reinventing a workflow engine.

Temporal (and its open-source predecessor Cadence) provides durable execution: the runtime automatically persists the state of your workflow after every step, and replays it from history on failure. This is a near-perfect fit for agent orchestration.

# temporal_agent_workflow.py — Agent run modelled as a Temporal workflow.
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.common import RetryPolicy

# ── Activities (the actual work) ─────────────────────────────

@activity.defn
async def call_llm(messages: list[dict]) -> dict:
    """Call the LLM.  Temporal will retry on transient failures."""
    import anthropic

    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
    )
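    # Sketch: assumes the first content block is text.  A production activity
    # must also surface tool_use blocks so the workflow can dispatch tools.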
    return {
        "role": "assistant",
        "content": response.content[0].text,
        "stop_reason": response.stop_reason,
        "usage": {
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        },
    }


@activity.defn
async def execute_tool(tool_name: str, tool_input: dict) -> dict:
    """Run a tool inside the sandbox.  Temporal records the result."""
    # dispatch to your tool registry
    return {"result": f"Executed {tool_name}"}


@activity.defn
async def post_result(run_id: str, result: str, channel: str) -> None:
    """Notify the user --- Slack, webhook, email, etc."""
    print(f"[{channel}] Run {run_id} completed: {result[:80]}")


# ── Workflow (the orchestration) ─────────────────────────────

@workflow.defn
class AgentRunWorkflow:
    """A single agent run, modelled as a Temporal workflow.

    Temporal automatically checkpoints after each activity.
    On crash, the workflow replays from event history --- no
    manual checkpoint code required.
    """

    @workflow.run
    async def run(self, run_id: str, prompt: str, max_steps: int = 100) -> str:
        messages: list[dict] = [{"role": "user", "content": prompt}]
        total_tokens = 0

        retry = RetryPolicy(
            initial_interval=timedelta(seconds=2),
            maximum_interval=timedelta(seconds=30),
            maximum_attempts=5,
        )

        for step in range(max_steps):
            # 1. Call the LLM (persisted as an activity result).
            response = await workflow.execute_activity(
                call_llm,
                messages,
                start_to_close_timeout=timedelta(minutes=5),
                retry_policy=retry,
            )
            messages.append(response)
            total_tokens += response["usage"]["input_tokens"]
            total_tokens += response["usage"]["output_tokens"]

            # 2. Check for tool use.
            if response["stop_reason"] == "tool_use":
                tool_result = await workflow.execute_activity(
                    execute_tool,
                    args=["file_read", {}],
                    start_to_close_timeout=timedelta(minutes=10),
                    retry_policy=retry,
                )
                messages.append({"role": "user", "content": str(tool_result)})
                continue

            # 3. Agent signalled completion.
            if response["stop_reason"] == "end_turn":
                await workflow.execute_activity(
                    post_result,
                    args=[run_id, response["content"], "slack"],
                    start_to_close_timeout=timedelta(seconds=30),
                )
                return response["content"]

        return "Max steps reached"

Why Temporal fits agents:

  • Automatic checkpointing. Every activity result is persisted in Temporal’s event history. A worker crash triggers replay---the LLM is not called again; the recorded response is reused.
  • Timeouts at every level. You set start_to_close_timeout per activity, run_timeout per workflow, and Temporal enforces them.
  • Retries with backoff. Transient API errors (429, 503) are retried automatically.
  • Human-in-the-loop via signals. Temporal workflows can wait for an external signal---perfect for a “pause and ask the user” step.
  • Visibility. The Temporal Web UI shows the full event history for every run, making debugging trivial.
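
Wiring is modest: one process registers the workflow and activities as a worker, and any client can start runs. A minimal sketch, assuming a local Temporal server on the default localhost:7233 and a task queue named agent-runs:

# run_worker.py — sketch: worker + client for AgentRunWorkflow.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from temporal_agent_workflow import (
    AgentRunWorkflow, call_llm, execute_tool, post_result,
)


async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="agent-runs",
        workflows=[AgentRunWorkflow],
        activities=[call_llm, execute_tool, post_result],
    )
    async with worker:  # kill this process mid-run to watch replay in action
        handle = await client.start_workflow(
            AgentRunWorkflow.run,
            args=["run-001", "Survey recent RLHF techniques and write a report"],
            id="agent-run-001",
            task_queue="agent-runs",
        )
        print(await handle.result())


if __name__ == "__main__":
    asyncio.run(main())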

The cost is operational complexity: you need a Temporal cluster (or Temporal Cloud), workers, and a queue. For teams already running Temporal for backend orchestration, this is zero marginal cost. For teams that are not, the simpler checkpoint store from 139.4 may be the right starting point.


139.6 Context window management for long runs

A short interactive agent might use 10k tokens. A background coding agent that reads 50 files, runs tests, and iterates can easily burn through 200k tokens of context---and that is before you count the tokens generated.

Naively appending every message to the conversation will eventually exceed even the largest context window. Three strategies manage this.

Summarisation checkpoints. After every N steps (or when context usage exceeds a threshold), ask the model to summarise the conversation so far. Replace the full history with the summary plus a small window of the most recent messages.

# context_management.py — Summarise-and-compact for long agent runs.
from __future__ import annotations
from typing import Any

MAX_CONTEXT_TOKENS = 180_000   # leave headroom below the hard limit
SUMMARY_THRESHOLD  = 150_000   # trigger compaction here


def estimate_tokens(messages: list[dict[str, Any]]) -> int:
    """Rough estimator — 1 token ≈ 4 chars.  Use tiktoken for precision."""
    return sum(len(str(m.get("content", ""))) // 4 for m in messages)


async def maybe_compact(
    messages: list[dict[str, Any]],
    llm_call,
) -> list[dict[str, Any]]:
    """If context is getting large, summarise older messages."""
    current_tokens = estimate_tokens(messages)
    if current_tokens < SUMMARY_THRESHOLD:
        return messages

    # Keep the system prompt and the last 10 messages verbatim.
    system = [m for m in messages if m["role"] == "system"]
    recent = messages[-10:]
    to_summarise = messages[len(system):-10]

    summary_prompt = [
        {"role": "system", "content": "Summarise the following agent conversation history. "
         "Preserve all key decisions, tool outputs, file paths, and error messages. "
         "Be concise but do not drop actionable details."},
        {"role": "user", "content": _format_for_summary(to_summarise)},
    ]
    summary_response = await llm_call(summary_prompt)

    compacted = (
        system
        + [{"role": "assistant", "content": f"[Summary of steps 1-{len(to_summarise)}]\n"
            + summary_response["content"]}]
        + recent
    )
    print(f"Compacted context: {current_tokens}{estimate_tokens(compacted)} tokens")
    return compacted


def _format_for_summary(messages: list[dict]) -> str:
    lines = []
    for i, m in enumerate(messages):
        role = m.get("role", "?")
        content = str(m.get("content", ""))[:2000]
        lines.append(f"[{i}] {role}: {content}")
    return "\n".join(lines)

Sliding window. Keep only the last K messages (plus the system prompt). This is the crudest approach but works surprisingly well for mechanical tasks where each step is nearly independent---like batch file processing.

Memory refresh. Store important facts (file paths, variable names, design decisions) in an external memory store (Chapter 132). Before each LLM call, retrieve the relevant memories and inject them into the system prompt. This lets you keep a very short conversation window while retaining long-term context.

In practice, production agents combine all three: a sliding window of recent messages, periodic summarisation of older history, and a key-value memory store for persistent facts.
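
A sketch of the combination, reusing maybe_compact from above. The memory object and its retrieve method are hypothetical stand-ins for the Chapter 132 store:

# combined_context.py — sketch: memory refresh + sliding window + compaction.
from __future__ import annotations

from typing import Any

from context_management import maybe_compact


async def prepare_context(
    messages: list[dict[str, Any]],
    memory,              # hypothetical external memory store (Chapter 132)
    llm_call,
    window: int = 30,
) -> list[dict[str, Any]]:
    """Assemble the context for the next LLM call."""
    # 1. Memory refresh: inject persistent facts into the system prompt.
    facts = memory.retrieve(query=str(messages[-1].get("content", ""))[:500])
    system = {"role": "system", "content": "Relevant memory:\n" + "\n".join(facts)}

    # 2. Sliding window: keep only the most recent messages verbatim.
    recent = messages[-window:]

    # 3. Summarisation fallback: compact if the window is still too large.
    return await maybe_compact([system] + recent, llm_call)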


139.7 Resource management: token budgets, cost caps, time limits, and the kill switch

Long-running agents can be expensive. A research agent that calls Claude 200 times at 100k input tokens each burns through 20 million tokens---hundreds of dollars in API costs. Without guardrails, a bug in the agent loop can turn this into a runaway spend.

Token budget. Track cumulative input + output tokens across the run. Before each LLM call, check whether the budget allows it.

# budget.py — Token and cost budgeting for agent runs.
from __future__ import annotations
from dataclasses import dataclass


@dataclass
class RunBudget:
    max_input_tokens: int = 2_000_000
    max_output_tokens: int = 500_000
    max_cost_usd: float = 50.0
    max_wall_seconds: float = 7200.0        # 2 hours
    max_steps: int = 300

    # Accumulators
    input_tokens_used: int = 0
    output_tokens_used: int = 0
    cost_usd: float = 0.0
    wall_seconds_elapsed: float = 0.0
    steps_completed: int = 0

    def record_usage(
        self,
        input_tokens: int,
        output_tokens: int,
        input_price_per_m: float = 3.0,
        output_price_per_m: float = 15.0,
    ) -> None:
        self.input_tokens_used += input_tokens
        self.output_tokens_used += output_tokens
        self.cost_usd += (input_tokens / 1_000_000) * input_price_per_m
        self.cost_usd += (output_tokens / 1_000_000) * output_price_per_m
        self.steps_completed += 1

    def check(self) -> str | None:
        """Return a reason string if any limit is exceeded, else None."""
        if self.input_tokens_used >= self.max_input_tokens:
            return f"Input token limit reached ({self.input_tokens_used:,})"
        if self.output_tokens_used >= self.max_output_tokens:
            return f"Output token limit reached ({self.output_tokens_used:,})"
        if self.cost_usd >= self.max_cost_usd:
            return f"Cost cap reached (${self.cost_usd:.2f})"
        if self.wall_seconds_elapsed >= self.max_wall_seconds:
            return f"Time limit reached ({self.wall_seconds_elapsed:.0f}s)"
        if self.steps_completed >= self.max_steps:
            return f"Step limit reached ({self.steps_completed})"
        return None

Cost caps are the most important guardrail. Set a dollar limit per run, per user, and per day. When the cap is hit, the agent must gracefully wind down: summarise what it has accomplished, save a checkpoint, and report to the user. Do not simply kill the process---that wastes all the work done so far.

Time limits serve a different purpose. A coding agent that has been running for six hours on a “rename this variable” task is almost certainly stuck in a loop. Time limits catch these pathological cases.

The kill switch. Every long-running agent must expose a cancellation endpoint (as we showed in 139.3). The kill switch should do three things, sketched in code after this list:

  1. Set a cancellation flag that the agent loop checks after every step.
  2. Allow a grace period (30—60 seconds) for the agent to checkpoint and clean up.
  3. Force-terminate if the grace period expires.
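
A sketch of those three steps with asyncio. The agent_step and save_checkpoint coroutines are hypothetical hooks supplied by your agent loop:

# kill_switch.py — sketch: cancellation flag, grace period, forced stop.
from __future__ import annotations

import asyncio


async def run_cancellable(
    agent_step,             # hypothetical coroutine: one LLM + tool cycle, True when done
    save_checkpoint,        # hypothetical coroutine: snapshot state for resume
    cancel: asyncio.Event,  # set by the /cancel endpoint
    grace_seconds: float = 45.0,
) -> str:
    while not cancel.is_set():      # 1. check the flag after every step
        if await agent_step():
            return "completed"
    try:                            # 2. grace period for checkpoint + cleanup
        await asyncio.wait_for(save_checkpoint(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        pass                        # 3. grace expired --- force-terminate path
    return "cancelled"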

In daemon agents, the kill switch also needs an escalation path: if a monitoring agent starts taking harmful actions (modifying production data, sending emails), a human must be able to stop it immediately, not after the current step finishes.


139.8 Progress reporting: streaming to the user

A background agent that runs for an hour in silence is a black box. Good progress reporting turns it into a glass box.

[Figure: Progress reporting architecture. The agent loop emits an event after
 each step into an append-only event log (step, status, tokens, tool calls,
 artefacts). The log fans out to an SSE / WebSocket real-time stream, a webhook
 on completion or error, and a dashboard / CLI (progress bar, log tail) that
 can also poll as a fallback. The event log is the source of truth --- channels
 are projections.]

The architecture has three layers:

1. Event log. An append-only store (database table, Redis stream, or Kafka topic) that records every significant event during the run.

# events.py — Structured event logging for agent runs.
from __future__ import annotations

import time
import json
from dataclasses import dataclass, asdict, field
from enum import Enum
from typing import Any


class EventType(str, Enum):
    STEP_START = "step_start"
    STEP_END = "step_end"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    CHECKPOINT = "checkpoint"
    ERROR = "error"
    BUDGET_WARNING = "budget_warning"
    SUMMARY = "summary"
    COMPLETED = "completed"


@dataclass
class AgentEvent:
    run_id: str
    event_type: EventType
    step: int
    timestamp: float = field(default_factory=time.time)
    data: dict[str, Any] = field(default_factory=dict)
    tokens_cumulative: int = 0


class EventLog:
    """Append-only event log.  Back with Redis Streams in production."""

    def __init__(self) -> None:
        self._events: list[AgentEvent] = []

    def emit(self, event: AgentEvent) -> None:
        self._events.append(event)
        # In production, also publish to SSE / WebSocket channel here.
        print(json.dumps(asdict(event), default=str))

    def since(self, run_id: str, after_step: int) -> list[AgentEvent]:
        return [
            e for e in self._events
            if e.run_id == run_id and e.step > after_step
        ]

    def tail(self, run_id: str, n: int = 20) -> list[AgentEvent]:
        matching = [e for e in self._events if e.run_id == run_id]
        return matching[-n:]

2. Real-time channel. For users who keep a tab open, push events via Server-Sent Events (SSE) or WebSocket. SSE is simpler (unidirectional, works over HTTP/2, auto-reconnects) and is usually the right default.
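
A minimal SSE projection of the event log, assuming the EventLog above is the same instance the agent loop writes to (in production the two sides would share Redis):

# sse_stream.py — sketch: stream AgentEvents over Server-Sent Events.
from __future__ import annotations

import asyncio
import json
from dataclasses import asdict

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from events import EventLog

app = FastAPI()
log = EventLog()  # shared with the agent loop in this sketch


@app.get("/runs/{run_id}/events")
async def stream_events(run_id: str) -> StreamingResponse:
    async def generate():
        last_step = -1
        while True:
            for event in log.since(run_id, last_step):
                last_step = max(last_step, event.step)
                yield f"data: {json.dumps(asdict(event), default=str)}\n\n"
            await asyncio.sleep(1.0)  # poll the log; swap for pub/sub in production
    return StreamingResponse(generate(), media_type="text/event-stream")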

3. Poll endpoint. The /runs/{run_id} endpoint from 139.3 doubles as a progress poll. Include a last_event_step query parameter so the client only fetches new events.

What to report:

  • Step number and total estimate (“Step 34 / ~80”).
  • Current action (“Reading src/auth/handler.py”, “Running pytest -x”).
  • Token usage and cost (“42k tokens, $0.83 so far”).
  • Artefacts produced (“3 files modified, 1 test added”).
  • Errors and retries (“API 429 — retrying in 4s”).

139.9 Scheduling: cron-triggered, event-triggered, and monitoring agents

A daemon agent does not wait for a human to submit a task. It wakes up on a schedule or in response to an event.

Cron-triggered agents. The simplest pattern: a cron job (or Kubernetes CronJob, or Temporal schedule) fires the agent at regular intervals.

# scheduled_agent.py — Cron-triggered monitoring agent.
from __future__ import annotations

import json
from datetime import datetime, timezone

# In production, this is invoked by a scheduler (cron, K8s CronJob, Temporal).

def run_daily_review_agent() -> dict:
    """
    Example: nightly code-quality agent.
    Runs every day at 02:00 UTC, reviews the day's merged PRs,
    and posts a summary to Slack.
    """
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    prompt = f"""You are a code-quality reviewer.
    
Today is {today}.  Review all pull requests merged in the last 24 hours.
For each PR:
1. Check for missing tests.
2. Check for security anti-patterns (hardcoded secrets, SQL injection, etc.).
3. Check for performance regressions (N+1 queries, unbounded loops).

Output a JSON report with:
- "summary": one-paragraph overview
- "issues": list of {{"pr_number", "severity", "description", "suggestion"}}
- "stats": {{"prs_reviewed", "issues_found", "critical_count"}}
"""
    # Submit to the background agent service (139.3).
    from background_agent_api import submit_run, SubmitRequest
    import asyncio

    result = asyncio.run(submit_run(SubmitRequest(
        prompt=prompt,
        max_tokens=200_000,
        timeout_seconds=1800,
    )))
    return result


# ── Crontab entry (for reference) ───────────────────────────
# 0 2 * * * /usr/bin/python3 /app/scheduled_agent.py

Event-triggered agents. Instead of a timer, the agent fires in response to an external event---a GitHub webhook (new PR opened), a PagerDuty alert, an S3 object creation, or a message on a Kafka topic.

The pattern is: event source → message queue → agent worker.

# event_trigger.py — Skeleton for event-triggered agent dispatch.
from __future__ import annotations

from fastapi import FastAPI, Request
import asyncio

app = FastAPI()

TRIGGER_MAP = {
    "pull_request.opened": "review_pr",
    "issues.opened": "triage_issue",
    "alert.firing": "investigate_alert",
}


@app.post("/webhooks/github")
async def github_webhook(request: Request) -> dict:
    payload = await request.json()
    event_type = request.headers.get("X-GitHub-Event", "")
    action = payload.get("action", "")
    key = f"{event_type}.{action}"

    agent_type = TRIGGER_MAP.get(key)
    if not agent_type:
        return {"status": "ignored", "event": key}

    # Enqueue an agent run.
    run_id = await enqueue_agent_run(
        agent_type=agent_type,
        context=payload,
    )
    return {"status": "queued", "run_id": run_id}


async def enqueue_agent_run(agent_type: str, context: dict) -> str:
    """Push to your job queue — Redis, SQS, Temporal, etc."""
    # placeholder
    return "run-abc-123"

Monitoring agents are the canonical daemon use case. A monitoring agent runs on a loop:

  1. Check a data source (logs, metrics, alerts, database).
  2. If a condition is met, investigate (query more data, correlate events).
  3. If the investigation warrants it, act (page on-call, open a ticket, run a remediation script).
  4. Sleep until the next check interval.

The key design challenge is preventing runaway costs when the monitoring agent encounters a storm of events. Use a rate limiter on agent invocations and a deduplication window to avoid investigating the same alert twice within N minutes.
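
A sketch of both guards in one class (the name AlertGuard and the thresholds are illustrative):

# alert_guard.py — sketch: rate limit + dedup window for a monitoring daemon.
from __future__ import annotations

import time
from collections import deque


class AlertGuard:
    def __init__(self, max_per_hour: int = 10, dedup_minutes: int = 15) -> None:
        self._invocations: deque[float] = deque()  # timestamps of recent runs
        self._seen: dict[str, float] = {}          # alert key → last investigated
        self._max_per_hour = max_per_hour
        self._dedup_seconds = dedup_minutes * 60

    def should_investigate(self, alert_key: str) -> bool:
        now = time.time()
        # Dedup: skip alerts investigated within the window.
        if now - self._seen.get(alert_key, 0.0) < self._dedup_seconds:
            return False
        # Rate limit: drop entries older than an hour, then check the cap.
        while self._invocations and now - self._invocations[0] > 3600:
            self._invocations.popleft()
        if len(self._invocations) >= self._max_per_hour:
            return False
        self._invocations.append(now)
        self._seen[alert_key] = now
        return True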


139.10 Concurrency: multiple runs per user, queues, priority, and fair scheduling

A production agent platform serves many users, each potentially running multiple agents simultaneously. You need concurrency control.

Per-user concurrency limits. Prevent a single user from monopolising the worker pool. A typical limit: 3—5 concurrent runs per user, with additional runs queued.

Priority queues. Not all runs are equal. An agent investigating a production outage should jump ahead of a nightly code-review agent.

# queue.py — Priority queue with per-user concurrency limits.
from __future__ import annotations

import heapq
import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any


@dataclass(order=True)
class QueuedRun:
    priority: int                         # lower = higher priority
    submitted_at: float = field(compare=True)
    run_id: str = field(compare=False)
    user_id: str = field(compare=False)
    payload: Any = field(compare=False, repr=False)


class AgentQueue:
    """Priority queue with per-user concurrency control."""

    def __init__(
        self,
        max_concurrent_per_user: int = 3,
        max_concurrent_total: int = 20,
    ) -> None:
        self._heap: list[QueuedRun] = []
        self._running: dict[str, set[str]] = defaultdict(set)  # user_id → {run_ids}
        self._max_per_user = max_concurrent_per_user
        self._max_total = max_concurrent_total
        self._total_running = 0

    def enqueue(
        self,
        run_id: str,
        user_id: str,
        payload: Any,
        priority: int = 10,
    ) -> int:
        """Add a run to the queue.  Returns current queue position."""
        item = QueuedRun(
            priority=priority,
            submitted_at=time.time(),
            run_id=run_id,
            user_id=user_id,
            payload=payload,
        )
        heapq.heappush(self._heap, item)
        return len(self._heap)

    def try_dequeue(self) -> QueuedRun | None:
        """Pop the highest-priority run that satisfies concurrency limits."""
        if self._total_running >= self._max_total:
            return None

        # Scan the heap for the first run whose user is under the limit.
        skipped: list[QueuedRun] = []
        result: QueuedRun | None = None

        while self._heap:
            candidate = heapq.heappop(self._heap)
            if len(self._running[candidate.user_id]) < self._max_per_user:
                result = candidate
                break
            skipped.append(candidate)

        # Push skipped items back.
        for item in skipped:
            heapq.heappush(self._heap, item)

        if result:
            self._running[result.user_id].add(result.run_id)
            self._total_running += 1

        return result

    def mark_complete(self, run_id: str, user_id: str) -> None:
        self._running[user_id].discard(run_id)
        self._total_running -= 1

    @property
    def pending(self) -> int:
        return len(self._heap)

Fair scheduling. Without fairness, a user who submits 100 low-priority runs can still starve other users. Fair scheduling assigns each user a weight and cycles through users in proportion to their weight. The same principle underlies Linux’s CFS (Completely Fair Scheduler) and Kubernetes resource quotas.

In practice, the simplest fair approach is: round-robin dequeue across users, then sort by priority within each user’s sub-queue.
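
A sketch of that approach; it trades the single global heap of AgentQueue for one small priority heap per user, rotating users on every successful dequeue:

# fair_queue.py — sketch: round-robin across users, priority within each user.
from __future__ import annotations

import heapq
from collections import defaultdict


class FairQueue:
    def __init__(self) -> None:
        # user_id → heap of (priority, run_id); dict order is the rotation order.
        self._per_user: dict[str, list[tuple[int, str]]] = defaultdict(list)

    def enqueue(self, user_id: str, priority: int, run_id: str) -> None:
        heapq.heappush(self._per_user[user_id], (priority, run_id))

    def dequeue(self) -> tuple[str, str] | None:
        for user_id in list(self._per_user):
            heap = self._per_user[user_id]
            if heap:
                priority, run_id = heapq.heappop(heap)
                # Rotate: re-insert this user at the back of the dict order.
                self._per_user[user_id] = self._per_user.pop(user_id)
                return user_id, run_id
        return None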

Queue persistence. If the queue lives only in memory, a server restart loses all pending runs. Use Redis sorted sets (score = priority + timestamp) or a database-backed queue. Temporal handles this naturally---each workflow is a durable queue entry.
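
A sketch with redis-py; the score scales priority so it dominates, with the submission timestamp breaking ties within a priority level:

# durable_queue.py — sketch: Redis sorted set as a persistent priority queue.
from __future__ import annotations

import time

import redis

r = redis.Redis()


def enqueue(run_id: str, priority: int) -> None:
    # Lower score pops first; the epoch timestamp orders runs within a priority.
    score = priority * 1e10 + time.time()
    r.zadd("agent:queue", {run_id: score})


def dequeue() -> str | None:
    popped = r.zpopmin("agent:queue", count=1)  # atomic pop of the lowest score
    return popped[0][0].decode() if popped else None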


139.11 Production examples

Coding agents (background). Claude Code’s background mode and OpenAI’s Codex CLI both follow the pattern from 139.3. The user submits a task (“add pagination to the /users endpoint”), the agent clones the repo into a sandbox, runs an edit-test loop, and produces a Git branch. Key infrastructure: container-based isolation, Git as the checkpoint mechanism (every commit is a recoverable state), token budgets (Codex defaults to a per-task cap), and PR-based output for human review.

Research agents (autonomous). A research agent tasked with “write a literature review on protein folding methods since 2022” might:

  1. Search for relevant papers (tool: Semantic Scholar API).
  2. Download and read abstracts (tool: PDF reader).
  3. Cluster papers by method (internal reasoning).
  4. Draft sections of the review (generation).
  5. Self-critique and revise (reflection loop).

This can take 100+ LLM calls over 1—2 hours. Key infrastructure: summarisation checkpoints (the context window fills fast when reading papers), cost caps ($20—50 per run), and artefact tracking (intermediate drafts stored as checkpoint artefacts).

Monitoring agents (daemon). A production monitoring daemon:

  • Runs on a 5-minute loop.
  • Queries Prometheus for anomalous metrics.
  • If anomalies are detected, pulls recent logs from Elasticsearch.
  • Correlates metrics + logs to form a hypothesis.
  • Posts the hypothesis to Slack (or pages on-call if severity is critical).

Key infrastructure: heartbeat monitoring (if the daemon itself dies, an alert fires), rate limiting (no more than 10 investigations per hour), deduplication (same root cause does not get investigated twice), and a kill switch accessible to the on-call engineer.
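
A heartbeat can be as small as a self-expiring key. A sketch with redis-py, where beat runs at the top of every daemon loop and a separate watchdog alerts when is_alive goes false:

# heartbeat.py — sketch: daemon liveness via a self-expiring Redis key.
from __future__ import annotations

import time

import redis

r = redis.Redis()


def beat(agent_id: str, ttl_seconds: int = 120) -> None:
    # Refreshed every loop; the key expires (and the watchdog fires) if the daemon dies.
    r.set(f"heartbeat:{agent_id}", time.time(), ex=ttl_seconds)


def is_alive(agent_id: str) -> bool:
    return r.exists(f"heartbeat:{agent_id}") == 1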

Multi-agent pipelines (composition). Complex systems compose the above. A “ship this feature” meta-agent might:

  1. Spawn a coding agent to implement the feature.
  2. Spawn a review agent to review the PR.
  3. On approval, trigger a deployment agent to merge and deploy.
  4. After deployment, activate a monitoring agent to watch error rates.

Each sub-agent is a separate run with its own budget, checkpoint store, and event log. The meta-agent orchestrates them via the same submit-and-poll interface, making agent composition recursive.


139.12 Mental model: eight takeaway points

  1. The spectrum is real. Do not build daemon infrastructure for an interactive agent. Place your agent on the spectrum first, then build only the infrastructure that level demands.

  2. Checkpoints are non-negotiable for any run over five minutes. The question is not whether to checkpoint but how often and what to include.

  3. The unit of output should be reviewable. For coding agents it is a Git commit. For research agents it is a document. For monitoring agents it is a structured report. Design the output format before the agent loop.

  4. Budgets are guardrails, not suggestions. Set hard limits on tokens, cost, time, and steps. Enforce them in the agent loop, not in a dashboard you check after the fact.

  5. The event log is the source of truth. SSE, WebSocket, and polling are projections of the event log. If you build the log right, you can add any delivery channel later.

  6. Context window management is the silent killer. Long runs that naively accumulate messages will hit the context limit and either fail or degrade. Summarisation checkpoints, sliding windows, and external memory are not optional.

  7. Temporal (or a similar durable execution engine) eliminates an entire class of infrastructure. If your organisation already runs Temporal, use it. If not, start with the simple checkpoint store and graduate when complexity demands it.

  8. Concurrency is a platform problem. If you are running more than one agent at a time, you need queues, per-user limits, and fair scheduling. Bolt these on early---retrofitting concurrency control is painful.


Read it yourself

  • Temporal documentation on durable execution and workflow replay: docs.temporal.io.
  • Anthropic’s documentation on Claude Code background tasks and the Messages API streaming protocol.
  • OpenAI’s Codex CLI architecture post, detailing sandbox isolation and token budgeting.
  • The A2A protocol specification (Chapter 138) for agent-to-agent coordination in long-running multi-agent systems.
  • Martin Kleppmann, Designing Data-Intensive Applications --- Chapter 11 on stream processing is directly applicable to event logs and progress reporting.
  • The Linux CFS (Completely Fair Scheduler) design document --- the same fairness principles apply to agent queue scheduling.

Practice

  1. Checkpoint round-trip. Using the CheckpointStore from 139.4, write a test that (a) runs an agent for 10 steps, (b) kills the process, (c) resumes from the latest checkpoint, and (d) verifies that no work is repeated.

  2. Context compaction. Simulate a 200-message conversation that exceeds 150k tokens. Implement maybe_compact and verify that the compacted history fits under the token limit while preserving key facts from the original history.

  3. Budget enforcement. Extend the RunBudget class to support a warn_at threshold (e.g., warn at 80% of budget). Integrate it into an agent loop that emits a budget_warning event when the threshold is crossed and gracefully terminates at 100%.

  4. SSE progress stream. Build a FastAPI endpoint that accepts an SSE connection and streams AgentEvent objects from the event log in real time. Write a client (using httpx or aiohttp) that prints events as they arrive.

  5. Priority queue fairness. Write a simulation with 5 users, each submitting 20 runs with random priorities. Measure the average wait time per user under (a) a simple priority queue and (b) the fair-scheduling approach from 139.10. Plot the difference.

  6. Temporal workflow. Deploy a local Temporal server (via Docker) and implement the AgentRunWorkflow from 139.5. Kill a worker mid-run and verify that the workflow resumes without re-calling the LLM for already-completed steps.

  7. Stretch: Build a complete daemon monitoring agent that (a) runs on a 60-second loop, (b) checks a mock metrics endpoint for anomalies, (c) investigates anomalies by calling an LLM with the metrics context, (d) posts findings to a webhook, (e) respects a per-hour invocation rate limit, (f) deduplicates repeated alerts within a 15-minute window, and (g) exposes a kill-switch endpoint that gracefully shuts it down. Include checkpointing so the daemon can resume its deduplication state after a restart.