Part XI · Building Agents and Agent Infrastructure
Chapter 138 ~28 min read

-- Agent-to-agent communication: A2A, agent cards, and the interoperability problem

"The hardest part of building a multi-agent system is not making one agent smart---it is making two agents cooperate without a human in the loop translating every request."

We spent Chapter 69 wiring tools to a single agent through Model Context Protocol (MCP). Chapter 68 arranged multiple agents into supervisor, pipeline, and debate topologies. But a question lingered: how do agents that belong to different teams, different organisations, or different vendors actually talk to each other?

MCP solves the agent-to-tool problem---connecting an LLM to structured capabilities. The agent-to-agent problem is fundamentally different. When the remote party is itself an opaque reasoning system that may negotiate, refuse, stream partial results, or ask clarifying questions, the protocol must support a richer interaction model than “call this function, get JSON back.”

Google’s Agent2Agent (A2A) protocol, published in April 2025, is the first serious open attempt at standardising this layer. This chapter dissects A2A end-to-end: discovery via Agent Cards, task lifecycle, streaming, push notifications, authentication, and the enterprise patterns that emerge when you connect dozens of agents into a mesh. We will also survey the broader protocol landscape and develop a decision framework for when A2A is---and is not---worth the integration cost.


138.1 MCP vs A2A: complementary, not competing

A common misconception is that A2A replaces MCP. It does not. The two protocols occupy adjacent but distinct layers:

Concern              | MCP                            | A2A
---------------------|--------------------------------|------------------------------------
Primary relationship | Agent <-> Tool/resource        | Agent <-> Agent
Remote party is...   | Deterministic function         | Opaque reasoning system
Interaction style    | Request-response (single turn) | Multi-turn, negotiation, streaming
Discovery            | Server capabilities list       | Agent Card (JSON)
Output model         | Structured tool result         | Messages + Artifacts
Protocol surface     | JSON-RPC over stdio/HTTP       | JSON-RPC over HTTPS
State                | Stateless per call             | Stateful task with lifecycle

Think of it this way: MCP is the USB port that lets an agent plug into databases, APIs, and file systems. A2A is the lingua franca that lets two agents collaborate on a shared task without either side knowing the other’s internal architecture.

In practice, a well-built agent will use both---MCP to access its own tools, and A2A to delegate subtasks to peer agents.

[Figure: Agent A and Agent B, each with an LLM connected to its own tools (DB/API for A, files/SaaS for B) via MCP; the two agents connected to each other via A2A.]

Figure 138.1 --- MCP operates vertically (agent to its tools); A2A operates horizontally (agent to peer agent).

138.2 Agent Cards: self-describing agents

Before two agents can collaborate, the client agent must discover what the remote agent can do. A2A solves this with the Agent Card---a JSON document served at a well-known URL (by convention /.well-known/agent.json) that describes the agent’s identity, capabilities, endpoint, and authentication requirements.

// GET https://travel.example.com/.well-known/agent.json
{
  "name": "TravelAgent",
  "description": "Books flights, hotels, and rental cars. Handles multi-city itineraries and loyalty programme integration.",
  "url": "https://travel.example.com/a2a",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["OAuth2"],
    "credentials": "Bearer token from https://auth.example.com/token"
  },
  "defaultInputModes": ["text/plain", "application/json"],
  "defaultOutputModes": ["text/plain", "application/json", "image/png"],
  "skills": [
    {
      "id": "flight-booking",
      "name": "Flight Booking",
      "description": "Search and book flights across 400+ airlines.",
      "tags": ["travel", "flights", "booking"],
      "examples": [
        "Book a round-trip from SFO to NRT on March 15, returning March 22",
        "Find the cheapest business class option LAX to LHR next Friday"
      ]
    },
    {
      "id": "hotel-search",
      "name": "Hotel Search",
      "description": "Search hotels by location, date, price range, and amenities.",
      "tags": ["travel", "hotels", "accommodation"],
      "examples": [
        "Find pet-friendly hotels in downtown Tokyo under $200/night"
      ]
    }
  ]
}

Key design decisions in the Agent Card format:

  1. Skills, not tools. Unlike MCP's tool manifests, which describe function signatures with typed parameters, A2A skills are described in natural language. This is intentional---the remote agent is a reasoning system that interprets intent, not a function that requires exact parameter shapes.

  2. Input/output modalities. An agent can declare that it accepts images, audio, or structured JSON---and what it produces. A charting agent might accept application/json data and return image/svg+xml.

  3. Authentication up front. The card declares which auth schemes the agent requires, so the client agent (or its orchestrator) can obtain credentials before the first message.

  4. Capability negotiation. The capabilities object tells the client whether it can use streaming (SSE), push notifications (webhooks), or request state history. Clients degrade gracefully when a capability is absent.
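A client can act on the capabilities object with a few lines of branching. The sketch below picks the richest transport the remote agent declares and degrades gracefully otherwise; `select_transport` is a hypothetical helper, but the capability names match the Agent Card shown above.

```python
def select_transport(card: dict) -> str:
    """Pick the richest interaction mode the remote agent supports."""
    caps = card.get("capabilities", {})
    if caps.get("streaming"):
        return "sse"        # use tasks/sendSubscribe
    if caps.get("pushNotifications"):
        return "webhook"    # use tasks/pushNotification/set
    return "polling"        # fall back to tasks/send + tasks/get
```

A client that only implements polling can ignore the result entirely---absence of a capability never makes a card invalid.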

Discovery patterns

In a small deployment, agents can be configured with each other’s card URLs statically. At enterprise scale, you need a registry:

# agent_registry.py -- lightweight A2A agent registry
from dataclasses import dataclass, field
from typing import Optional
import httpx
import asyncio


@dataclass
class RegisteredAgent:
    name: str
    card_url: str
    card: dict = field(default_factory=dict)
    tags: set[str] = field(default_factory=set)
    last_healthy: Optional[float] = None


class AgentRegistry:
    """
    Central registry that periodically fetches Agent Cards
    and allows skill-based lookup.
    """

    def __init__(self):
        self._agents: dict[str, RegisteredAgent] = {}
        self._client = httpx.AsyncClient(timeout=10)

    async def register(self, name: str, card_url: str) -> RegisteredAgent:
        card = await self._fetch_card(card_url)
        tags = set()
        for skill in card.get("skills", []):
            tags.update(skill.get("tags", []))
        agent = RegisteredAgent(
            name=name, card_url=card_url, card=card, tags=tags
        )
        self._agents[name] = agent
        return agent

    async def _fetch_card(self, url: str) -> dict:
        resp = await self._client.get(url)
        resp.raise_for_status()
        return resp.json()

    def find_by_skill(self, query_tags: set[str]) -> list[RegisteredAgent]:
        """Return agents whose skills overlap with the query tags."""
        results = []
        for agent in self._agents.values():
            overlap = agent.tags & query_tags
            if overlap:
                results.append((len(overlap), agent))
        results.sort(key=lambda x: x[0], reverse=True)
        return [agent for _, agent in results]

    async def health_check_all(self):
        """Refresh cards and mark unreachable agents."""
        tasks = [self._refresh(agent) for agent in self._agents.values()]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _refresh(self, agent: RegisteredAgent):
        import time
        try:
            agent.card = await self._fetch_card(agent.card_url)
            agent.last_healthy = time.time()
        except Exception:
            pass  # agent.last_healthy stays stale

A production registry would add semantic search over skill descriptions (embed the description, store in a vector index, query at delegation time), TTL-based caching, and circuit-breaking for flaky agents.
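The TTL caching mentioned above is only a few lines on its own. A minimal sketch, assuming cards are safe to serve slightly stale (the `CardCache` name and five-minute default are illustrative):

```python
from __future__ import annotations
import time


class CardCache:
    """TTL cache for Agent Cards so every delegation does not refetch."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    def get(self, url: str) -> dict | None:
        entry = self._entries.get(url)
        if entry is None:
            return None
        fetched_at, card = entry
        if time.time() - fetched_at > self.ttl:
            del self._entries[url]  # expired; force a refetch
            return None
        return card

    def put(self, url: str, card: dict) -> None:
        self._entries[url] = (time.time(), card)
```

The registry's `_fetch_card` would consult the cache first and fall through to HTTP on a miss.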


138.3 Task lifecycle: the heart of A2A

Every interaction between a client agent and a remote agent is modelled as a Task. Tasks have an explicit lifecycle with well-defined state transitions:

[Figure: the A2A task state machine. States: submitted, working, input-needed, completed, failed, canceled. Key transitions: submitted -> working; working -> input-needed (need info); input-needed -> working (reply); working -> completed | failed (error); any -> canceled.]

Figure 138.2 --- A2A task states. The input-needed state enables multi-turn negotiation without the client polling blindly.

The six states:

State        | Meaning
-------------|--------------------------------------------------------------------------
submitted    | Client has sent the task; remote agent has acknowledged receipt.
working      | Remote agent is actively processing. May emit streaming events.
input-needed | Remote agent requires clarification or additional data from the client.
completed    | Task finished successfully. Artifacts are available.
failed       | Task terminated due to an error. An error message is attached.
canceled     | Client (or server) explicitly canceled the task.

The input-needed state is what separates A2A from a simple RPC. It models the real-world pattern where a remote agent says “I found three flights---which one do you prefer?” and the client agent must respond before work continues.
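A server can enforce the lifecycle with a small guard table. The sketch below encodes the transitions from Figure 138.2; treating completed, failed, and canceled as terminal (no cancelling an already-finished task) is our reading, not something the table above spells out.

```python
# Allowed transitions per Figure 138.2; terminal states allow none.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "submitted": {"working", "canceled"},
    "working": {"input-needed", "completed", "failed", "canceled"},
    "input-needed": {"working", "canceled"},
    "completed": set(),
    "failed": set(),
    "canceled": set(),
}


def validate_transition(current: str, target: str) -> None:
    """Raise if a task may not move from `current` to `target`."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {target}")
```

Calling this inside `Task.transition` turns silent state corruption into a loud error.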


138.4 Messages, Parts, and Artifacts

A2A communication is structured around three primitives:

Messages flow between client and remote agent within a task. Each message has a role (user for the client agent, agent for the remote agent) and contains one or more Parts.

Parts are typed content units:

  • TextPart --- plain text or markdown
  • FilePart --- binary data (inline base64 or a URI reference)
  • DataPart --- structured JSON (for passing parameters, form data, etc.)

Artifacts are the durable outputs of a completed task. They are distinct from messages because they represent deliverables, not conversation. A flight-booking agent’s artifact might be a JSON itinerary; a chart-generation agent’s artifact might be an SVG image.

# a2a_types.py -- core A2A data structures (simplified)
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import uuid
import time


class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_NEEDED = "input-needed"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


@dataclass
class TextPart:
    type: str = "text"
    text: str = ""


@dataclass
class DataPart:
    type: str = "data"
    data: dict[str, Any] = field(default_factory=dict)
    metadata: Optional[dict] = None


@dataclass
class FilePart:
    type: str = "file"
    mime_type: str = "application/octet-stream"
    uri: Optional[str] = None
    data: Optional[str] = None  # base64-encoded if inline


Part = TextPart | DataPart | FilePart


@dataclass
class Message:
    role: str  # "user" or "agent"
    parts: list[Part]
    metadata: Optional[dict] = None


@dataclass
class Artifact:
    name: str
    parts: list[Part]
    description: Optional[str] = None
    metadata: Optional[dict] = None


@dataclass
class Task:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    state: TaskState = TaskState.SUBMITTED
    messages: list[Message] = field(default_factory=list)
    artifacts: list[Artifact] = field(default_factory=list)
    metadata: Optional[dict] = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

    def add_message(self, message: Message):
        self.messages.append(message)
        self.updated_at = time.time()

    def transition(self, new_state: TaskState):
        self.state = new_state
        self.updated_at = time.time()

138.5 The JSON-RPC wire protocol

A2A uses JSON-RPC 2.0 over HTTPS. The core methods are:

Method                     | Direction        | Purpose
---------------------------|------------------|-------------------------------------------
tasks/send                 | Client -> Server | Send a message, create or continue a task
tasks/get                  | Client -> Server | Poll current task state
tasks/cancel               | Client -> Server | Request cancellation
tasks/sendSubscribe        | Client -> Server | Send a message and open SSE stream
tasks/pushNotification/set | Client -> Server | Register a webhook for task updates
tasks/pushNotification/get | Client -> Server | Retrieve current webhook config

A minimal request/response cycle:

// --- Client sends a task ---
// POST https://travel.example.com/a2a
{
  "jsonrpc": "2.0",
  "id": "req-001",
  "method": "tasks/send",
  "params": {
    "id": "task-7f3a",
    "message": {
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Book the cheapest direct flight SFO to NRT, departing March 15, returning March 22. Economy class."
        }
      ]
    }
  }
}

// --- Server responds (task is now working) ---
{
  "jsonrpc": "2.0",
  "id": "req-001",
  "result": {
    "id": "task-7f3a",
    "state": "working",
    "messages": [
      {
        "role": "agent",
        "parts": [
          {
            "type": "text",
            "text": "Searching flights from SFO to NRT. I'll have options shortly."
          }
        ]
      }
    ]
  }
}

When the remote agent needs input:

// --- Client polls with tasks/get ---
{
  "jsonrpc": "2.0",
  "id": "req-002",
  "method": "tasks/get",
  "params": { "id": "task-7f3a" }
}

// --- Server: agent needs clarification ---
{
  "jsonrpc": "2.0",
  "id": "req-002",
  "result": {
    "id": "task-7f3a",
    "state": "input-needed",
    "messages": [
      {
        "role": "agent",
        "parts": [
          {
            "type": "text",
            "text": "I found 3 direct flights:\n1. JAL 1 -- $980 -- departs 11:15\n2. ANA 7 -- $1,020 -- departs 16:30\n3. United 837 -- $890 -- departs 23:55\nWhich would you like?"
          },
          {
            "type": "data",
            "data": {
              "options": [
                {"id": "jal-1", "price": 980, "departure": "11:15"},
                {"id": "ana-7", "price": 1020, "departure": "16:30"},
                {"id": "ua-837", "price": 890, "departure": "23:55"}
              ]
            }
          }
        ]
      }
    ]
  }
}

The client agent can now reason over the structured data in the DataPart, apply its own policies (“never book red-eye flights” or “always choose cheapest”), and reply:

{
  "jsonrpc": "2.0",
  "id": "req-003",
  "method": "tasks/send",
  "params": {
    "id": "task-7f3a",
    "message": {
      "role": "user",
      "parts": [
        { "type": "text", "text": "Book option 1, JAL flight 1." },
        { "type": "data", "data": { "selected": "jal-1" } }
      ]
    }
  }
}
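The "apply its own policies" step is ordinary code over the structured DataPart rather than more prompting. A minimal sketch (`pick_option` and the policy names are illustrative, not part of A2A):

```python
def pick_option(data_part: dict, policy: str = "cheapest") -> dict:
    """Apply a simple client-side policy to an options DataPart."""
    options = data_part["data"]["options"]
    if policy == "cheapest":
        return min(options, key=lambda o: o["price"])
    if policy == "earliest":
        # departure times are HH:MM strings, so string order works
        return min(options, key=lambda o: o["departure"])
    raise ValueError(f"Unknown policy: {policy}")
```

Running it over the flight options above, `pick_option(part)` selects United 837 at $890, and the client agent can embed the chosen id in its reply's DataPart.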

138.6 Streaming with Server-Sent Events

For long-running tasks---generating a report, executing a multi-step workflow---polling with tasks/get is wasteful. The tasks/sendSubscribe method opens an SSE (Server-Sent Events) stream that pushes TaskStatusUpdateEvent and TaskArtifactUpdateEvent objects as they occur.

# a2a_streaming_client.py
import httpx
import json
from typing import AsyncIterator


async def stream_task(
    server_url: str,
    task_id: str,
    message: dict,
    auth_token: str,
) -> AsyncIterator[dict]:
    """
    Send a message via tasks/sendSubscribe and yield
    streaming events as they arrive.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": f"stream-{task_id}",
        "method": "tasks/sendSubscribe",
        "params": {
            "id": task_id,
            "message": message,
        },
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {auth_token}",
        "Accept": "text/event-stream",
    }

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", server_url, json=payload, headers=headers
        ) as response:
            response.raise_for_status()
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_text, buffer = buffer.split("\n\n", 1)
                    event = _parse_sse(event_text)
                    if event:
                        yield event


def _parse_sse(raw: str) -> dict | None:
    """Parse a single SSE event block."""
    data_lines = []
    for line in raw.strip().split("\n"):
        if line.startswith("data: "):
            data_lines.append(line[6:])
        elif line.startswith("data:"):  # SSE allows omitting the space
            data_lines.append(line[5:])
    if data_lines:
        # Multi-line data fields are rejoined with newlines per the SSE spec
        return json.loads("\n".join(data_lines))
    return None


# Usage
async def delegate_report_generation():
    message = {
        "role": "user",
        "parts": [{"type": "text", "text": "Generate Q1 revenue report"}],
    }

    async for event in stream_task(
        server_url="https://analytics.internal/a2a",
        task_id="report-q1-2026",
        message=message,
        auth_token="eyJhbGciOi...",
    ):
        result = event.get("result", {})
        if "status" in result:
            status = result["status"]
            print(f"State: {status['state']}")
            if status.get("message"):
                for part in status["message"].get("parts", []):
                    if part.get("type") == "text":
                        print(f"  > {part['text']}")
            if result.get("final"):
                break  # final status event: the stream is complete

        if "artifact" in result:
            artifact = result["artifact"]
            print(f"Artifact received: {artifact.get('name', 'unnamed')}")

Streaming events carry an optional final flag. When the client sees final: true on a TaskStatusUpdateEvent, the stream is complete and the connection can close.


138.7 Push notifications

SSE works well when the client can hold a connection open. In serverless or event-driven architectures, you may prefer push notifications---the remote agent calls a webhook when the task state changes.

// Register a webhook for task updates
{
  "jsonrpc": "2.0",
  "id": "notif-setup",
  "method": "tasks/pushNotification/set",
  "params": {
    "id": "task-7f3a",
    "pushNotificationConfig": {
      "url": "https://orchestrator.internal/hooks/a2a",
      "token": "hmac-verification-token-xyz",
      "authentication": {
        "schemes": ["Bearer"],
        "credentials": "callback-auth-token-abc"
      }
    }
  }
}

The remote agent will POST to the webhook URL whenever the task transitions state. The token field allows the client to verify the notification is authentic (the server includes it in a signature header).

This pattern is essential for fire-and-forget delegation: the orchestrator agent sends a task to five specialist agents, registers webhooks, and processes results as they arrive---no long-lived connections, no polling loops.
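On the receiving side, the webhook handler should verify the signature before trusting the payload. A sketch assuming an HMAC-SHA256 signature computed over the raw request body with the registered token as key---the exact signing scheme is deployment-specific, and `verify_notification` is a hypothetical helper:

```python
import hashlib
import hmac


def verify_notification(body: bytes, signature_header: str, token: str) -> bool:
    """Check the server's signature header against the shared token.

    Assumes HMAC-SHA256 over the raw body (an assumption, not mandated
    by A2A). Uses compare_digest to avoid timing side channels.
    """
    expected = hmac.new(token.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)
```

Reject anything that fails verification before parsing the JSON---an unauthenticated webhook endpoint is an open invitation to inject fake task results.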


138.8 Trust and authentication between agents

Chapter 79 covered inter-service trust in traditional microservices. Agent-to-agent trust introduces additional challenges because agents make autonomous decisions about what to request and what to share.

Authentication schemes

A2A supports pluggable authentication declared in the Agent Card. Common schemes in production:

Scheme            | Use case                         | Notes
------------------|----------------------------------|-----------------------------------------------------------------
OAuth 2.0 + OIDC  | Cross-org agents                 | Standard token exchange; supports scopes for fine-grained access
Mutual TLS (mTLS) | Internal agent mesh              | Certificate-based identity; no tokens to manage
API keys          | Dev/testing, simple integrations | Rotate frequently; never embed in agent prompts
JWT with claims   | Delegation chains                | Client agent passes its identity + delegated permissions

The delegation chain problem

When Agent A delegates to Agent B, which delegates to Agent C, the question arises: what permissions does Agent C have?

A naive approach gives Agent C the same token as Agent A. This violates the principle of least privilege and creates a confused deputy risk---Agent C can now access anything Agent A can.

The robust pattern uses scoped delegation tokens:

# delegation_token.py -- scoped token minting for A2A chains
import jwt
import time
from typing import Optional


def mint_delegation_token(
    issuer_agent: str,
    target_agent: str,
    task_id: str,
    allowed_skills: list[str],
    ttl_seconds: int = 300,
    parent_token: Optional[str] = None,
    signing_key: str = "",
) -> str:
    """
    Mint a short-lived, narrowly-scoped JWT for agent-to-agent delegation.
    
    The token encodes:
    - Who issued it (the delegating agent)
    - Who it's for (the target agent)
    - Which task it's scoped to
    - Which skills the target may invoke
    - The delegation chain (if this is a re-delegation)
    """
    now = time.time()

    # Build delegation chain from parent token
    chain = [issuer_agent]
    if parent_token:
        try:
            # The parent token's "aud" is the agent now re-delegating,
            # so PyJWT must be told which audience to accept.
            parent_claims = jwt.decode(
                parent_token,
                signing_key,
                algorithms=["HS256"],
                audience=issuer_agent,
            )
            chain = parent_claims.get("delegation_chain", []) + [issuer_agent]
        except jwt.InvalidTokenError:
            raise ValueError("Invalid parent delegation token")

    claims = {
        "iss": issuer_agent,
        "aud": target_agent,
        "sub": f"task:{task_id}",
        "iat": now,
        "exp": now + ttl_seconds,
        "skills": allowed_skills,
        "delegation_chain": chain,
        "max_delegation_depth": 3,  # prevent unbounded re-delegation
    }

    # Enforce depth limit
    if len(chain) > claims["max_delegation_depth"]:
        raise ValueError(
            f"Delegation chain depth {len(chain)} exceeds maximum "
            f"{claims['max_delegation_depth']}"
        )

    return jwt.encode(claims, signing_key, algorithm="HS256")

Input/output guardrails

Authentication tells you who is calling. But agents also need guardrails on what they share. A remote agent should never leak sensitive data just because the client agent asked nicely. Implement output filtering at the A2A boundary:

# a2a_guardrails.py
import re
from dataclasses import dataclass

PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "SSN"),
    (re.compile(r"\b\d{16}\b"), "credit card"),
    (re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"), "email"),
]


@dataclass
class GuardrailResult:
    passed: bool
    violations: list[str]
    sanitized_text: str


def check_outbound_message(text: str, allow_email: bool = False) -> GuardrailResult:
    """
    Scan outbound A2A messages for PII before sending.
    In production, combine with an LLM-based classifier.
    """
    violations = []
    sanitized = text

    for pattern, label in PII_PATTERNS:
        if label == "email" and allow_email:
            continue
        matches = pattern.findall(sanitized)
        if matches:
            violations.append(f"Found {len(matches)} potential {label}(s)")
            sanitized = pattern.sub(f"[REDACTED-{label.upper()}]", sanitized)

    return GuardrailResult(
        passed=len(violations) == 0,
        violations=violations,
        sanitized_text=sanitized,
    )

138.9 Building an A2A server

Let us build a minimal but working A2A server that implements the core methods: tasks/send, tasks/get, tasks/cancel, and SSE streaming via tasks/sendSubscribe. We will use FastAPI, but the pattern translates to any HTTP framework.

# a2a_server.py -- minimal A2A-compliant server
from __future__ import annotations
import asyncio
import json
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse


app = FastAPI()

# --- In-memory task store (use Redis/Postgres in production) ---
tasks: dict[str, dict] = {}


# --- Agent Card ---
AGENT_CARD = {
    "name": "SummaryAgent",
    "description": "Summarises documents and URLs into concise bullet points.",
    "url": "https://summary.internal/a2a",
    "version": "1.0.0",
    "capabilities": {
        "streaming": True,
        "pushNotifications": False,
        "stateTransitionHistory": False,
    },
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/plain"],
    "skills": [
        {
            "id": "summarize",
            "name": "Document Summarization",
            "description": "Produce a concise summary of the provided text.",
            "tags": ["summarization", "text", "nlp"],
            "examples": ["Summarize this quarterly earnings report"],
        }
    ],
}


@app.get("/.well-known/agent.json")
async def agent_card():
    return AGENT_CARD


@app.post("/a2a")
async def handle_jsonrpc(request: Request):
    body = await request.json()
    method = body.get("method")
    params = body.get("params", {})
    req_id = body.get("id")

    if method == "tasks/send":
        return _handle_send(req_id, params)
    elif method == "tasks/get":
        return _handle_get(req_id, params)
    elif method == "tasks/cancel":
        return _handle_cancel(req_id, params)
    elif method == "tasks/sendSubscribe":
        return await _handle_send_subscribe(req_id, params)
    else:
        return _error_response(req_id, -32601, f"Unknown method: {method}")


def _handle_send(req_id: str, params: dict) -> dict:
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})

    if task_id not in tasks:
        tasks[task_id] = {
            "id": task_id,
            "state": "submitted",
            "messages": [],
            "artifacts": [],
        }

    task = tasks[task_id]
    task["messages"].append(message)
    task["state"] = "working"

    # Simulate agent processing (in production, dispatch to LLM pipeline)
    input_text = _extract_text(message)
    summary = f"Summary: {input_text[:100]}..."  # placeholder
    
    agent_message = {
        "role": "agent",
        "parts": [{"type": "text", "text": summary}],
    }
    task["messages"].append(agent_message)
    task["state"] = "completed"
    task["artifacts"] = [
        {
            "name": "summary",
            "parts": [{"type": "text", "text": summary}],
        }
    ]

    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": task,
    }


def _handle_get(req_id: str, params: dict) -> dict:
    task_id = params.get("id")
    task = tasks.get(task_id)
    if not task:
        return _error_response(req_id, -32001, f"Task {task_id} not found")
    return {"jsonrpc": "2.0", "id": req_id, "result": task}


def _handle_cancel(req_id: str, params: dict) -> dict:
    task_id = params.get("id")
    task = tasks.get(task_id)
    if not task:
        return _error_response(req_id, -32001, f"Task {task_id} not found")
    task["state"] = "canceled"
    return {"jsonrpc": "2.0", "id": req_id, "result": task}


async def _handle_send_subscribe(req_id: str, params: dict):
    """Stream task updates via SSE."""
    task_id = params.get("id", str(uuid.uuid4()))
    message = params.get("message", {})

    if task_id not in tasks:
        tasks[task_id] = {
            "id": task_id,
            "state": "submitted",
            "messages": [],
            "artifacts": [],
        }

    task = tasks[task_id]
    task["messages"].append(message)

    async def event_stream():
        # Emit working status
        task["state"] = "working"
        yield _sse_event({
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "id": task_id,
                "status": {"state": "working", "message": {
                    "role": "agent",
                    "parts": [{"type": "text", "text": "Processing..."}],
                }},
                "final": False,
            },
        })

        await asyncio.sleep(0.5)  # simulate work

        # Emit artifact
        input_text = _extract_text(message)
        summary = f"Summary: {input_text[:200]}..."
        yield _sse_event({
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "id": task_id,
                "artifact": {
                    "name": "summary",
                    "parts": [{"type": "text", "text": summary}],
                },
            },
        })

        # Emit completed (final)
        task["state"] = "completed"
        yield _sse_event({
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "id": task_id,
                "status": {"state": "completed"},
                "final": True,
            },
        })

    return StreamingResponse(
        event_stream(), media_type="text/event-stream"
    )


def _sse_event(data: dict) -> str:
    return f"data: {json.dumps(data)}\n\n"


def _extract_text(message: dict) -> str:
    parts = message.get("parts", [])
    return " ".join(p.get("text", "") for p in parts if p.get("type") == "text")


def _error_response(req_id: str, code: int, message: str) -> dict:
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "error": {"code": code, "message": message},
    }

138.10 Enterprise agent mesh patterns

When you move beyond two agents talking to each other and into a network of dozens of specialised agents, new architectural patterns emerge.

Pattern 1: The orchestrator hub

A single orchestrator agent receives user requests, consults the agent registry, and delegates subtasks to specialist agents. This is the simplest topology and works well when you want centralised control over task decomposition and result assembly.

User -> Orchestrator -> [TravelAgent, ExpenseAgent, CalendarAgent]
                     <- assembled itinerary + expense report + calendar events

Advantages: single point of policy enforcement, simple auth model (orchestrator holds all tokens). Disadvantages: orchestrator is a bottleneck, single point of failure, must understand every agent’s capabilities.
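The hub's fan-out-and-assemble loop is short to sketch. Here `find_by_skill` plays the role of the registry lookup from section 138.2, and `send_task` stands in for a hypothetical async A2A client call (e.g. a wrapper around tasks/send); both are passed in so the orchestration logic stays testable.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def orchestrate(
    find_by_skill: Callable[[set[str]], list[Any]],
    send_task: Callable[[Any, str], Awaitable[dict]],
    subtasks: dict[str, set[str]],
) -> dict[str, dict]:
    """Fan subtasks out to specialist agents and assemble the results."""

    async def run_one(task_text: str, tags: set[str]) -> tuple[str, dict]:
        candidates = find_by_skill(tags)
        if not candidates:
            return task_text, {"error": f"no agent for tags {sorted(tags)}"}
        # Delegate to the best-matching agent; run concurrently via gather
        return task_text, await send_task(candidates[0], task_text)

    pairs = await asyncio.gather(
        *(run_one(text, tags) for text, tags in subtasks.items())
    )
    return dict(pairs)
```

Because all delegation flows through this one coroutine, it is also the natural place to hang the policy checks and token minting discussed in section 138.8.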

Pattern 2: Peer-to-peer delegation

Agents discover and delegate to each other directly, with no central orchestrator. The TravelAgent might delegate to a PaymentAgent, which delegates to a FraudDetectionAgent.

Advantages: no bottleneck, agents evolve independently. Disadvantages: harder to trace, harder to enforce global policies, delegation chains can grow unbounded (hence the depth limit in our token minting code).

Pattern 3: The agent gateway

Inspired by API gateways (Chapter 104), an agent gateway sits at the boundary of your agent mesh:

# agent_gateway.py -- A2A gateway with rate limiting and routing
from dataclasses import dataclass
from typing import Optional
import time
import httpx


@dataclass
class RouteRule:
    skill_tags: set[str]
    target_url: str
    rate_limit_rpm: int = 60
    requires_approval: bool = False


class AgentGateway:
    """
    Routes A2A requests to the appropriate backend agent,
    enforcing rate limits, auth, and approval policies.
    """

    def __init__(self):
        self._routes: list[RouteRule] = []
        self._request_counts: dict[str, list[float]] = {}
        self._client = httpx.AsyncClient(timeout=30)

    def add_route(self, rule: RouteRule):
        self._routes.append(rule)

    async def route_task(
        self,
        task_request: dict,
        caller_identity: str,
        skill_hint: Optional[str] = None,
    ) -> dict:
        # Find matching route
        route = self._match_route(skill_hint)
        if not route:
            return {"error": "No agent available for this skill"}

        # Rate limiting
        if not self._check_rate_limit(caller_identity, route):
            return {"error": "Rate limit exceeded", "retry_after_seconds": 60}

        # Approval gate (for high-stakes actions)
        if route.requires_approval:
            return {
                "status": "pending_approval",
                "message": "This action requires human approval before proceeding",
            }

        # Forward to backend agent
        response = await self._client.post(
            route.target_url,
            json=task_request,
            headers={"Content-Type": "application/json"},
        )
        return response.json()

    def _match_route(self, skill_hint: Optional[str]) -> Optional[RouteRule]:
        if not skill_hint:
            return self._routes[0] if self._routes else None
        for route in self._routes:
            if skill_hint in route.skill_tags:
                return route
        return None

    def _check_rate_limit(self, caller: str, route: RouteRule) -> bool:
        now = time.time()
        window = 60.0
        key = f"{caller}:{route.target_url}"

        if key not in self._request_counts:
            self._request_counts[key] = []

        # Prune old entries
        self._request_counts[key] = [
            t for t in self._request_counts[key] if now - t < window
        ]

        if len(self._request_counts[key]) >= route.rate_limit_rpm:
            return False

        self._request_counts[key].append(now)
        return True

The gateway pattern gives you a single place to add observability, rate limiting, approval workflows, and audit logging---all without modifying the agents themselves.

Pattern 4: Hierarchical delegation with budget

In enterprise settings, you often want to limit the total cost or time an agent chain can consume. Pass a budget envelope through the delegation chain:

# budget_envelope.py
from dataclasses import dataclass


@dataclass
class BudgetEnvelope:
    max_llm_calls: int
    max_wall_time_seconds: float
    max_delegation_depth: int
    remaining_llm_calls: int
    remaining_wall_time_seconds: float
    current_depth: int

    def can_delegate(self) -> bool:
        return (
            self.current_depth < self.max_delegation_depth
            and self.remaining_llm_calls > 0
            and self.remaining_wall_time_seconds > 0
        )

    def subdivide(self, fraction: float = 0.5) -> "BudgetEnvelope":
        """Create a child budget for a sub-delegation."""
        if not self.can_delegate():
            raise ValueError("Budget exhausted or max depth reached")
        return BudgetEnvelope(
            max_llm_calls=self.max_llm_calls,
            max_wall_time_seconds=self.max_wall_time_seconds,
            max_delegation_depth=self.max_delegation_depth,
            remaining_llm_calls=int(self.remaining_llm_calls * fraction),
            remaining_wall_time_seconds=self.remaining_wall_time_seconds * fraction,
            current_depth=self.current_depth + 1,
        )

Pass the budget as metadata in A2A messages (DataPart with a well-known schema), and have each agent honour it.


138.11 The broader protocol landscape

A2A is not the only contender. Understanding the landscape helps you evaluate trade-offs:

Protocol / Standard | Origin | Focus | Status (2026)
A2A (Agent2Agent) | Google, 2025 | Agent-to-agent task delegation | Open spec, growing adoption
MCP | Anthropic, 2024 | Agent-to-tool integration | Widely adopted for tool use
OpenAPI + function calling | OpenAI ecosystem | LLM-to-API binding | De facto standard for single-turn tool use
OASF (Open Agent Schema Framework) | Linux Foundation | Agent metadata and interop | Early stage
AutoGen / CrewAI protocols | Microsoft / CrewAI | Multi-agent orchestration | Framework-specific, not wire protocols
ACL / FIPA | IEEE FIPA, 2000s | Agent communication language | Academic; influenced A2A’s message model

Key observations:

  1. MCP and A2A are complementary. MCP connects an agent to its tools; A2A connects agents to each other. Most production systems will use both.

  2. Framework-specific protocols are traps. If your agents can only talk through AutoGen’s internal message bus, you cannot integrate a CrewAI agent or a custom agent without an adapter layer. Wire-level protocols like A2A avoid this lock-in.

  3. The convergence is real. The industry is converging on JSON-RPC over HTTPS for the transport, Agent Cards (or similar manifests) for discovery, and OAuth 2.0 for auth. Bet on these primitives even if the specific protocol names change.
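That JSON-RPC 2.0 envelope is simple enough to build and validate by hand. A minimal sketch using only the standard library; the method name and params mirror the tasks/send calls used elsewhere in this chapter:

```python
import json


def jsonrpc_request(method: str, params: dict, req_id: str) -> str:
    """Serialise a JSON-RPC 2.0 request envelope."""
    return json.dumps(
        {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
    )


def parse_jsonrpc_response(raw: str) -> dict:
    """Return the result of a JSON-RPC 2.0 response, raising on protocol errors."""
    payload = json.loads(raw)
    if payload.get("jsonrpc") != "2.0":
        raise ValueError("Not a JSON-RPC 2.0 response")
    if "error" in payload:
        err = payload["error"]
        raise RuntimeError(f"JSON-RPC error {err.get('code')}: {err.get('message')}")
    return payload.get("result", {})
```

Whether the method names stay tasks/send or change in a future revision, the envelope discipline (version tag, request ID, structured error object) is the part worth standardising on.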


138.12 When A2A is worth it (and when it is not)

A2A introduces real complexity: Agent Cards to maintain, task state to persist, auth tokens to manage, streaming infrastructure to operate. Do not adopt it reflexively.

A2A is worth it when:

  • You have agents built by different teams or organisations that need to interoperate without sharing codebases.
  • The remote agent is a reasoning system (not a deterministic API) and you need multi-turn negotiation, clarification, or streaming.
  • You need a standard discovery mechanism so that new agents can be integrated without code changes to the orchestrator.
  • You are building an agent marketplace or platform where third parties publish agents.
  • You want to avoid framework lock-in---your agents are built with different stacks (LangGraph, CrewAI, custom) and need a common wire protocol.

A2A is overkill when:

  • All agents live in the same process or monorepo---use direct function calls or an in-process message bus.
  • The remote capability is a deterministic API with a fixed schema---MCP or plain HTTP is simpler.
  • You have only two or three agents with stable interfaces---the ceremony of Agent Cards and task lifecycle management does not pay for itself yet.
  • You need sub-millisecond latency---A2A’s HTTP + JSON-RPC overhead adds latency that matters for real-time systems.

Migration path

If you are unsure, start simple and evolve:

  1. Phase 1: Direct function calls between agents in the same process.
  2. Phase 2: Extract agents into services; use HTTP + simple JSON contracts.
  3. Phase 3: Add Agent Cards for discovery; adopt A2A task lifecycle for agents that need multi-turn interaction.
  4. Phase 4: Deploy an agent gateway; add auth, rate limiting, and observability.

You do not need to jump to Phase 4 on day one. The protocol is designed to be adoptable incrementally.


138.13 Observability in agent meshes

When agents delegate to agents, debugging becomes dramatically harder: a root cause three levels deep in the delegation chain surfaces as a vague error at the top. Every A2A interaction should therefore emit structured telemetry:

# a2a_telemetry.py -- OpenTelemetry integration for A2A
import time
import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger("a2a.telemetry")


@dataclass
class A2ASpan:
    """Represents a single A2A interaction for tracing."""
    task_id: str
    client_agent: str
    remote_agent: str
    skill: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    state: str = "submitted"
    error: Optional[str] = None
    message_count: int = 0
    artifact_count: int = 0
    parent_task_id: Optional[str] = None  # for delegation chains

    def finish(self, final_state: str, artifacts: int = 0):
        self.end_time = time.time()
        self.state = final_state
        self.artifact_count = artifacts
        self._emit()

    def _emit(self):
        duration_ms = (
            (self.end_time - self.start_time) * 1000
            if self.end_time
            else None
        )
        logger.info(
            "a2a_task_completed",
            extra={
                "task_id": self.task_id,
                "client_agent": self.client_agent,
                "remote_agent": self.remote_agent,
                "skill": self.skill,
                "state": self.state,
                "duration_ms": duration_ms,
                "messages": self.message_count,
                "artifacts": self.artifact_count,
                "error": self.error,
                "parent_task_id": self.parent_task_id,
            },
        )


class A2ATracer:
    """Collects spans across a delegation chain for distributed tracing."""

    def __init__(self):
        self._spans: dict[str, A2ASpan] = {}

    def start_span(
        self,
        task_id: str,
        client_agent: str,
        remote_agent: str,
        skill: Optional[str] = None,
        parent_task_id: Optional[str] = None,
    ) -> A2ASpan:
        span = A2ASpan(
            task_id=task_id,
            client_agent=client_agent,
            remote_agent=remote_agent,
            skill=skill,
            parent_task_id=parent_task_id,
        )
        self._spans[task_id] = span
        return span

    def get_chain(self, task_id: str) -> list[A2ASpan]:
        """Walk up the delegation chain from a leaf task."""
        chain = []
        current = task_id
        while current and current in self._spans:
            span = self._spans[current]
            chain.append(span)
            current = span.parent_task_id
        chain.reverse()
        return chain

In production, plug these spans into OpenTelemetry and visualise delegation chains in Jaeger or Grafana Tempo. The parent_task_id field lets you reconstruct the full tree of agent interactions that resulted from a single user request.
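Reconstructing that tree from recorded spans takes only a few lines of standard-library Python. A sketch that operates on plain dicts carrying the same task_id / parent_task_id / state fields as A2ASpan:

```python
from collections import defaultdict


def span_tree(spans: list[dict]) -> list[str]:
    """Render a delegation tree as indented lines, roots first."""
    children: dict[str, list[dict]] = defaultdict(list)
    known_ids = {s["task_id"] for s in spans}
    roots = []
    for span in spans:
        parent = span.get("parent_task_id")
        if parent in known_ids:
            children[parent].append(span)
        else:
            roots.append(span)  # orphan or true root

    lines: list[str] = []

    def walk(span: dict, depth: int) -> None:
        lines.append("  " * depth + f"{span['task_id']} [{span.get('state', '?')}]")
        for child in children[span["task_id"]]:
            walk(child, depth + 1)

    for root in roots:
        walk(root, 0)
    return lines
```

Given a user request that fanned out to a travel agent which in turn delegated to a payment agent, this prints one indented line per task, making it obvious where in the chain a failed state first appeared.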


138.14 Real-world integration: connecting a LangGraph agent to a CrewAI agent

The true test of A2A is cross-framework interop. Here is a sketch showing how a LangGraph-based orchestrator delegates to a CrewAI-based specialist, with A2A as the wire protocol:

# langgraph_a2a_client.py -- A2A client node for LangGraph
from typing import TypedDict
import httpx


class AgentState(TypedDict):
    user_query: str
    travel_result: str | None
    task_id: str | None


async def delegate_to_travel_agent(state: AgentState) -> AgentState:
    """
    LangGraph node that delegates to a remote A2A agent.
    The remote agent could be CrewAI, AutoGen, or anything
    that speaks A2A -- we don't care.
    """
    async with httpx.AsyncClient(timeout=60) as client:
        # Step 1: Fetch Agent Card to verify capabilities
        card_resp = await client.get(
            "https://travel-crew.internal/.well-known/agent.json"
        )
        card = card_resp.json()
        assert "flight-booking" in [
            s["id"] for s in card.get("skills", [])
        ], "Remote agent lacks flight-booking skill"

        # Step 2: Send task
        task_id = f"lg-{state.get('task_id', 'new')}"
        response = await client.post(
            card["url"],
            json={
                "jsonrpc": "2.0",
                "id": f"req-{task_id}",
                "method": "tasks/send",
                "params": {
                    "id": task_id,
                    "message": {
                        "role": "user",
                        "parts": [
                            {"type": "text", "text": state["user_query"]}
                        ],
                    },
                },
            },
        )
        result = response.json().get("result", {})

        # Step 3: Handle multi-turn if needed (bounded, so a chatty
        # remote agent cannot trap us in an infinite loop)
        turns = 0
        while result.get("state") == "input-needed" and turns < 5:
            turns += 1
            # In a real system, the LLM would read the agent's latest
            # message (result["messages"][-1]) and formulate a response.
            # Here we auto-select the first option.
            response = await client.post(
                card["url"],
                json={
                    "jsonrpc": "2.0",
                    "id": f"req-{task_id}-reply",
                    "method": "tasks/send",
                    "params": {
                        "id": task_id,
                        "message": {
                            "role": "user",
                            "parts": [
                                {"type": "text", "text": "Select the first option."}
                            ],
                        },
                    },
                },
            )
            result = response.json().get("result", {})

        # Step 4: Extract artifacts
        artifacts = result.get("artifacts", [])
        travel_text = ""
        for artifact in artifacts:
            for part in artifact.get("parts", []):
                if part.get("type") == "text":
                    travel_text += part["text"] + "\n"

        return {
            **state,
            "travel_result": travel_text or "No result from travel agent",
            "task_id": task_id,
        }

The key insight: the LangGraph orchestrator does not import CrewAI, does not know what LLM the remote agent uses, and does not care about its internal tool configuration. A2A provides the abstraction boundary.


138.15 Security hardening checklist

Before deploying A2A agents in production, verify each of these:

  1. Agent Card validation. Fetch cards over HTTPS only. Pin certificates for internal agents. Validate the JSON schema before trusting any field.

  2. Input sanitisation. Treat every incoming A2A message as untrusted input. Remote agents can send prompt-injection payloads---run content through your injection-detection pipeline before feeding it to your LLM.

  3. Output filtering. Scan all outbound messages and artifacts for PII, secrets, and internal identifiers before sending them over A2A.

  4. Token scoping. Never pass your agent’s full credentials to a remote agent. Mint delegation tokens with narrow scopes and short TTLs.

  5. Rate limiting. Apply per-caller rate limits at the A2A endpoint. A misbehaving remote agent can otherwise consume all your capacity.

  6. Task timeout. Set maximum TTLs on tasks. A task stuck in working state for 30 minutes is a resource leak.

  7. Audit logging. Log every A2A method call (tasks/send, tasks/get, etc.) with caller identity, task ID, and timestamp. You will need this for incident response.

  8. Delegation depth limits. Cap how many times a task can be re-delegated. Unbounded delegation chains are a denial-of-service vector.


Read it yourself

Resource | Why it matters
Google A2A specification | The canonical protocol spec; read the specification.md for the full JSON-RPC schema
Anthropic MCP documentation | Understand the agent-to-tool complement to A2A
JSON-RPC 2.0 specification | A2A’s transport layer; essential for debugging wire-level issues
OAuth 2.0 for inter-service auth (RFC 6749) | The most common auth scheme for cross-org A2A
OpenTelemetry distributed tracing | Observability for agent delegation chains
FIPA ACL specification | Historical predecessor; illuminates why A2A’s message model looks the way it does

Practice

  1. Write an Agent Card for a code-review agent that accepts Git diffs (text/x-diff) and returns structured review comments (application/json). Include at least two skills with realistic examples.

  2. Implement tasks/get polling with exponential backoff. Write a client function that polls a remote A2A agent every 2 seconds (doubling each time, max 30 seconds) until the task reaches a terminal state (completed, failed, or canceled).

  3. Add input-needed handling to the A2A server in Section 138.9. Modify the /a2a endpoint so that when the input text contains a question mark, the agent transitions to input-needed and asks the client to clarify.

  4. Build an agent registry with semantic search. Extend the AgentRegistry class from Section 138.2 to embed skill descriptions using a sentence transformer and retrieve the best-matching agent for a natural-language query (e.g., “I need to translate a document from English to Japanese”).

  5. Implement delegation token verification. Write a middleware function for the A2A server that extracts a JWT from the Authorization header, verifies the delegation chain, checks that the requested skill is in the token’s skills claim, and rejects the request if any check fails.

  6. Trace a delegation chain. Given three agents (Orchestrator, TravelAgent, PaymentAgent), instrument all three with the A2ATracer from Section 138.13. Send a booking request through the chain and print the full span tree showing task IDs, durations, and states.

  7. Stretch: Build a bidirectional A2A adapter that wraps an existing MCP server as an A2A agent. The adapter should: (a) translate the MCP server’s tool manifest into an Agent Card with skills, (b) accept A2A tasks/send requests, (c) map the incoming message to the appropriate MCP tool call, (d) return the tool result as an A2A artifact. Test it with an MCP server that provides a calculator tool.