Agent-to-agent communication: A2A, Agent Cards, and the interoperability problem
"The hardest part of building a multi-agent system is not making one agent smart---it is making two agents cooperate without a human in the loop translating every request."
We spent Chapter 69 wiring tools to a single agent through Model Context Protocol (MCP). Chapter 68 arranged multiple agents into supervisor, pipeline, and debate topologies. But a question lingered: how do agents that belong to different teams, different organisations, or different vendors actually talk to each other?
MCP solves the agent-to-tool problem---connecting an LLM to structured capabilities. The agent-to-agent problem is fundamentally different. When the remote party is itself an opaque reasoning system that may negotiate, refuse, stream partial results, or ask clarifying questions, the protocol must support a richer interaction model than “call this function, get JSON back.”
Google’s Agent2Agent (A2A) protocol, published in April 2025, is the first serious open attempt at standardising this layer. This chapter dissects A2A end-to-end: discovery via Agent Cards, task lifecycle, streaming, push notifications, authentication, and the enterprise patterns that emerge when you connect dozens of agents into a mesh. We will also survey the broader protocol landscape and develop a decision framework for when A2A is---and is not---worth the integration cost.
138.1 MCP vs A2A: complementary, not competing
A common misconception is that A2A replaces MCP. It does not. The two protocols occupy adjacent but distinct layers:
| Concern | MCP | A2A |
|---|---|---|
| Primary relationship | Agent <-> Tool/resource | Agent <-> Agent |
| Remote party is… | Deterministic function | Opaque reasoning system |
| Interaction style | Request-response (single turn) | Multi-turn, negotiation, streaming |
| Discovery | Server capabilities list | Agent Card (JSON) |
| Output model | Structured tool result | Messages + Artifacts |
| Protocol surface | JSON-RPC over stdio/HTTP | JSON-RPC over HTTPS |
| State | Stateless per call | Stateful task with lifecycle |
Think of it this way: MCP is the USB port that lets an agent plug into databases, APIs, and file systems. A2A is the lingua franca that lets two agents collaborate on a shared task without either side knowing the other’s internal architecture.
In practice, a well-built agent will use both---MCP to access its own tools, and A2A to delegate subtasks to peer agents.
138.2 Agent Cards: self-describing agents
Before two agents can collaborate, the client agent must discover what the remote agent can do.
A2A solves this with the Agent Card---a JSON document served at a well-known URL (by convention /.well-known/agent.json) that describes the agent’s identity, capabilities, endpoint, and authentication requirements.
// GET https://travel.example.com/.well-known/agent.json
{
"name": "TravelAgent",
"description": "Books flights, hotels, and rental cars. Handles multi-city itineraries and loyalty programme integration.",
"url": "https://travel.example.com/a2a",
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true
},
"authentication": {
"schemes": ["OAuth2"],
"credentials": "Bearer token from https://auth.example.com/token"
},
"defaultInputModes": ["text/plain", "application/json"],
"defaultOutputModes": ["text/plain", "application/json", "image/png"],
"skills": [
{
"id": "flight-booking",
"name": "Flight Booking",
"description": "Search and book flights across 400+ airlines.",
"tags": ["travel", "flights", "booking"],
"examples": [
"Book a round-trip from SFO to NRT on March 15, returning March 22",
"Find the cheapest business class option LAX to LHR next Friday"
]
},
{
"id": "hotel-search",
"name": "Hotel Search",
"description": "Search hotels by location, date, price range, and amenities.",
"tags": ["travel", "hotels", "accommodation"],
"examples": [
"Find pet-friendly hotels in downtown Tokyo under $200/night"
]
}
]
}
Key design decisions in the Agent Card format:
- Skills, not tools. Unlike MCP's tool manifests, which describe function signatures with typed parameters, A2A skills are described in natural language. This is intentional---the remote agent is a reasoning system that interprets intent, not a function that requires exact parameter shapes.
- Input/output modalities. An agent can declare that it accepts images, audio, or structured JSON---and what it produces. A charting agent might accept `application/json` data and return `image/svg+xml`.
- Authentication up front. The card declares which auth schemes the agent requires, so the client agent (or its orchestrator) can obtain credentials before the first message.
- Capability negotiation. The `capabilities` object tells the client whether it can use streaming (SSE), push notifications (webhooks), or request state history. Clients degrade gracefully when a capability is absent.
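Graceful degradation is a few lines of client code. A sketch (the helper name and the preference order are illustrative, not part of the spec):

```python
def choose_interaction_mode(card: dict) -> str:
    """Pick the richest interaction mode the remote agent supports.

    Illustrative preference order: SSE streaming, then webhooks,
    then plain polling with tasks/get.
    """
    caps = card.get("capabilities", {})
    if caps.get("streaming"):
        return "sse"
    if caps.get("pushNotifications"):
        return "webhook"
    return "poll"

# A card without streaming support degrades to webhooks
card = {"capabilities": {"streaming": False, "pushNotifications": True}}
print(choose_interaction_mode(card))  # -> webhook
```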
Discovery patterns
In a small deployment, agents can be configured with each other’s card URLs statically. At enterprise scale, you need a registry:
# agent_registry.py -- lightweight A2A agent registry
from dataclasses import dataclass, field
from typing import Optional
import httpx
import asyncio
@dataclass
class RegisteredAgent:
name: str
card_url: str
card: dict = field(default_factory=dict)
tags: set[str] = field(default_factory=set)
last_healthy: Optional[float] = None
class AgentRegistry:
"""
Central registry that periodically fetches Agent Cards
and allows skill-based lookup.
"""
def __init__(self):
self._agents: dict[str, RegisteredAgent] = {}
self._client = httpx.AsyncClient(timeout=10)
async def register(self, name: str, card_url: str) -> RegisteredAgent:
card = await self._fetch_card(card_url)
tags = set()
for skill in card.get("skills", []):
tags.update(skill.get("tags", []))
agent = RegisteredAgent(
name=name, card_url=card_url, card=card, tags=tags
)
self._agents[name] = agent
return agent
async def _fetch_card(self, url: str) -> dict:
resp = await self._client.get(url)
resp.raise_for_status()
return resp.json()
def find_by_skill(self, query_tags: set[str]) -> list[RegisteredAgent]:
"""Return agents whose skills overlap with the query tags."""
results = []
for agent in self._agents.values():
overlap = agent.tags & query_tags
if overlap:
results.append((len(overlap), agent))
results.sort(key=lambda x: x[0], reverse=True)
return [agent for _, agent in results]
    async def health_check_all(self):
        """Refresh cards and mark unreachable agents."""
        tasks = [self._refresh(agent) for agent in self._agents.values()]
        await asyncio.gather(*tasks, return_exceptions=True)
async def _refresh(self, agent: RegisteredAgent):
import time
try:
agent.card = await self._fetch_card(agent.card_url)
agent.last_healthy = time.time()
except Exception:
pass # agent.last_healthy stays stale
A production registry would add semantic search over skill descriptions (embed the description, store in a vector index, query at delegation time), TTL-based caching, and circuit-breaking for flaky agents.
138.3 Task lifecycle: the heart of A2A
Every interaction between a client agent and a remote agent is modelled as a Task. Tasks have an explicit lifecycle with well-defined state transitions:
The `input-needed` state enables multi-turn negotiation without the client polling blindly. The six states:
| State | Meaning |
|---|---|
| submitted | Client has sent the task; remote agent has acknowledged receipt. |
| working | Remote agent is actively processing. May emit streaming events. |
| input-needed | Remote agent requires clarification or additional data from the client. |
| completed | Task finished successfully. Artifacts are available. |
| failed | Task terminated due to an error. An error message is attached. |
| canceled | Client (or server) explicitly canceled the task. |
The input-needed state is what separates A2A from a simple RPC.
It models the real-world pattern where a remote agent says “I found three flights---which one do you prefer?” and the client agent must respond before work continues.
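One way to enforce the lifecycle is a transition table. The policy below is a plausible reading of the state machine, not a verbatim copy of the spec's rules:

```python
# Allowed transitions between A2A task states (illustrative policy).
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "submitted": {"working", "canceled", "failed"},
    "working": {"input-needed", "completed", "failed", "canceled"},
    "input-needed": {"working", "canceled", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
    "canceled": set(),   # terminal
}

def validate_transition(current: str, new: str) -> None:
    """Raise if a task may not move from `current` to `new`."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")

validate_transition("working", "input-needed")  # ok
# validate_transition("completed", "working")   # would raise ValueError
```

Guarding `transition()` with a check like this catches protocol bugs (e.g. a server resurrecting a canceled task) at the boundary rather than deep in application logic.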
138.4 Messages, Parts, and Artifacts
A2A communication is structured around three primitives:
Messages flow between client and remote agent within a task. Each message has a role (user for the client agent, agent for the remote agent) and contains one or more Parts.
Parts are typed content units:
- `TextPart` --- plain text or markdown
- `FilePart` --- binary data (inline base64 or a URI reference)
- `DataPart` --- structured JSON (for passing parameters, form data, etc.)
Artifacts are the durable outputs of a completed task. They are distinct from messages because they represent deliverables, not conversation. A flight-booking agent’s artifact might be a JSON itinerary; a chart-generation agent’s artifact might be an SVG image.
# a2a_types.py -- core A2A data structures (simplified)
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import uuid
import time
class TaskState(str, Enum):
SUBMITTED = "submitted"
WORKING = "working"
INPUT_NEEDED = "input-needed"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
@dataclass
class TextPart:
type: str = "text"
text: str = ""
@dataclass
class DataPart:
type: str = "data"
data: dict[str, Any] = field(default_factory=dict)
metadata: Optional[dict] = None
@dataclass
class FilePart:
type: str = "file"
mime_type: str = "application/octet-stream"
uri: Optional[str] = None
data: Optional[str] = None # base64-encoded if inline
Part = TextPart | DataPart | FilePart
@dataclass
class Message:
role: str # "user" or "agent"
parts: list[Part]
metadata: Optional[dict] = None
@dataclass
class Artifact:
name: str
parts: list[Part]
description: Optional[str] = None
metadata: Optional[dict] = None
@dataclass
class Task:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
state: TaskState = TaskState.SUBMITTED
messages: list[Message] = field(default_factory=list)
artifacts: list[Artifact] = field(default_factory=list)
metadata: Optional[dict] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
def add_message(self, message: Message):
self.messages.append(message)
self.updated_at = time.time()
def transition(self, new_state: TaskState):
self.state = new_state
self.updated_at = time.time()
138.5 The JSON-RPC wire protocol
A2A uses JSON-RPC 2.0 over HTTPS. The core methods are:
| Method | Direction | Purpose |
|---|---|---|
| tasks/send | Client -> Server | Send a message, create or continue a task |
| tasks/get | Client -> Server | Poll current task state |
| tasks/cancel | Client -> Server | Request cancellation |
| tasks/sendSubscribe | Client -> Server | Send a message and open SSE stream |
| tasks/pushNotification/set | Client -> Server | Register a webhook for task updates |
| tasks/pushNotification/get | Client -> Server | Retrieve current webhook config |
A minimal request/response cycle:
// --- Client sends a task ---
// POST https://travel.example.com/a2a
{
"jsonrpc": "2.0",
"id": "req-001",
"method": "tasks/send",
"params": {
"id": "task-7f3a",
"message": {
"role": "user",
"parts": [
{
"type": "text",
"text": "Book the cheapest direct flight SFO to NRT, departing March 15, returning March 22. Economy class."
}
]
}
}
}
// --- Server responds (task is now working) ---
{
"jsonrpc": "2.0",
"id": "req-001",
"result": {
"id": "task-7f3a",
"state": "working",
"messages": [
{
"role": "agent",
"parts": [
{
"type": "text",
"text": "Searching flights from SFO to NRT. I'll have options shortly."
}
]
}
]
}
}
When the remote agent needs input:
// --- Client polls with tasks/get ---
{
"jsonrpc": "2.0",
"id": "req-002",
"method": "tasks/get",
"params": { "id": "task-7f3a" }
}
// --- Server: agent needs clarification ---
{
"jsonrpc": "2.0",
"id": "req-002",
"result": {
"id": "task-7f3a",
"state": "input-needed",
"messages": [
{
"role": "agent",
"parts": [
{
"type": "text",
"text": "I found 3 direct flights:\n1. JAL 1 -- $980 -- departs 11:15\n2. ANA 7 -- $1,020 -- departs 16:30\n3. United 837 -- $890 -- departs 23:55\nWhich would you like?"
},
{
"type": "data",
"data": {
"options": [
{"id": "jal-1", "price": 980, "departure": "11:15"},
{"id": "ana-7", "price": 1020, "departure": "16:30"},
{"id": "ua-837", "price": 890, "departure": "23:55"}
]
}
}
]
}
]
}
}
The client agent can now reason over the structured data in the DataPart, apply its own policies (“never book red-eye flights” or “always choose cheapest”), and reply:
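Such a policy is a few lines of code. The sketch below applies "cheapest, but never a red-eye" to the options carried in the DataPart (field names match the example above; the 22:00 cutoff is an arbitrary choice):

```python
def choose_flight(options: list[dict], latest_departure: str = "22:00") -> dict:
    """Pick the cheapest option departing before `latest_departure` (HH:MM).

    Zero-padded HH:MM strings compare correctly lexicographically.
    """
    eligible = [o for o in options if o["departure"] < latest_departure]
    if not eligible:
        raise ValueError("No option satisfies the departure-time policy")
    return min(eligible, key=lambda o: o["price"])

options = [
    {"id": "jal-1", "price": 980, "departure": "11:15"},
    {"id": "ana-7", "price": 1020, "departure": "16:30"},
    {"id": "ua-837", "price": 890, "departure": "23:55"},
]
print(choose_flight(options)["id"])  # -> jal-1 (ua-837 is cheaper but a red-eye)
```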
{
"jsonrpc": "2.0",
"id": "req-003",
"method": "tasks/send",
"params": {
"id": "task-7f3a",
"message": {
"role": "user",
"parts": [
{ "type": "text", "text": "Book option 1, JAL flight 1." },
{ "type": "data", "data": { "selected": "jal-1" } }
]
}
}
}
138.6 Streaming with Server-Sent Events
For long-running tasks---generating a report, executing a multi-step workflow---polling with tasks/get is wasteful.
The tasks/sendSubscribe method opens an SSE (Server-Sent Events) stream that pushes TaskStatusUpdateEvent and TaskArtifactUpdateEvent objects as they occur.
# a2a_streaming_client.py
import httpx
import json
from typing import AsyncIterator
async def stream_task(
server_url: str,
task_id: str,
message: dict,
auth_token: str,
) -> AsyncIterator[dict]:
"""
Send a message via tasks/sendSubscribe and yield
streaming events as they arrive.
"""
payload = {
"jsonrpc": "2.0",
"id": f"stream-{task_id}",
"method": "tasks/sendSubscribe",
"params": {
"id": task_id,
"message": message,
},
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {auth_token}",
"Accept": "text/event-stream",
}
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"POST", server_url, json=payload, headers=headers
) as response:
response.raise_for_status()
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n\n" in buffer:
event_text, buffer = buffer.split("\n\n", 1)
event = _parse_sse(event_text)
if event:
yield event
def _parse_sse(raw: str) -> dict | None:
    """Parse a single SSE event block into its JSON payload."""
    data_lines = []
    for line in raw.strip().split("\n"):
        if line.startswith("data: "):
            data_lines.append(line[6:])
    if data_lines:
        # Per the SSE spec, multi-line data fields are rejoined with newlines
        return json.loads("\n".join(data_lines))
    return None
# Usage
async def delegate_report_generation():
message = {
"role": "user",
"parts": [{"type": "text", "text": "Generate Q1 revenue report"}],
}
async for event in stream_task(
server_url="https://analytics.internal/a2a",
task_id="report-q1-2026",
message=message,
auth_token="eyJhbGciOi...",
):
if "status" in event.get("result", {}):
status = event["result"]["status"]
print(f"State: {status['state']}")
if status.get("message"):
for part in status["message"].get("parts", []):
if part.get("type") == "text":
print(f" > {part['text']}")
if "artifact" in event.get("result", {}):
artifact = event["result"]["artifact"]
print(f"Artifact received: {artifact.get('name', 'unnamed')}")
Streaming events carry an optional final flag.
When the client sees final: true on a TaskStatusUpdateEvent, the stream is complete and the connection can close.
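The consuming loop should therefore stop at the first event marked final. A sketch against a pre-recorded event list:

```python
def consume_until_final(events: list[dict]) -> list[str]:
    """Collect status states from a stream, stopping at the first final event."""
    seen = []
    for event in events:
        result = event.get("result", {})
        status = result.get("status")
        if status:
            seen.append(status["state"])
        if result.get("final"):
            break  # stream is complete; the connection can close
    return seen

events = [
    {"result": {"status": {"state": "working"}, "final": False}},
    {"result": {"artifact": {"name": "summary"}}},
    {"result": {"status": {"state": "completed"}, "final": True}},
    {"result": {"status": {"state": "working"}}},  # never reached
]
print(consume_until_final(events))  # -> ['working', 'completed']
```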
138.7 Push notifications
SSE works well when the client can hold a connection open. In serverless or event-driven architectures, you may prefer push notifications---the remote agent calls a webhook when the task state changes.
// Register a webhook for task updates
{
"jsonrpc": "2.0",
"id": "notif-setup",
"method": "tasks/pushNotification/set",
"params": {
"id": "task-7f3a",
"pushNotificationConfig": {
"url": "https://orchestrator.internal/hooks/a2a",
"token": "hmac-verification-token-xyz",
"authentication": {
"schemes": ["Bearer"],
"credentials": "callback-auth-token-abc"
}
}
}
}
The remote agent will POST to the webhook URL whenever the task transitions state.
The token field allows the client to verify the notification is authentic (the server includes it in a signature header).
This pattern is essential for fire-and-forget delegation: the orchestrator agent sends a task to five specialist agents, registers webhooks, and processes results as they arrive---no long-lived connections, no polling loops.
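The spec leaves the exact signature mechanism to implementations; one plausible scheme is HMAC-SHA256 over the request body, keyed with the registered token:

```python
import hashlib
import hmac

def sign_notification(token: str, body: bytes) -> str:
    """Server side: compute the signature sent in a header alongside the POST."""
    return hmac.new(token.encode(), body, hashlib.sha256).hexdigest()

def verify_notification(token: str, body: bytes, signature: str) -> bool:
    """Client side: recompute and compare in constant time."""
    expected = sign_notification(token, body)
    return hmac.compare_digest(expected, signature)

body = b'{"id": "task-7f3a", "state": "completed"}'
sig = sign_notification("hmac-verification-token-xyz", body)
assert verify_notification("hmac-verification-token-xyz", body, sig)
assert not verify_notification("hmac-verification-token-xyz", b"tampered", sig)
```

`compare_digest` matters here: a naive `==` comparison leaks timing information an attacker could use to forge signatures byte by byte.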
138.8 Trust and authentication between agents
Chapter 79 covered inter-service trust in traditional microservices. Agent-to-agent trust introduces additional challenges because agents make autonomous decisions about what to request and what to share.
Authentication schemes
A2A supports pluggable authentication declared in the Agent Card. Common schemes in production:
| Scheme | Use case | Notes |
|---|---|---|
| OAuth 2.0 + OIDC | Cross-org agents | Standard token exchange; supports scopes for fine-grained access |
| Mutual TLS (mTLS) | Internal agent mesh | Certificate-based identity; no tokens to manage |
| API keys | Dev/testing, simple integrations | Rotate frequently; never embed in agent prompts |
| JWT with claims | Delegation chains | Client agent passes its identity + delegated permissions |
The delegation chain problem
When Agent A delegates to Agent B, which delegates to Agent C, the question arises: what permissions does Agent C have?
A naive approach gives Agent C the same token as Agent A. This violates the principle of least privilege and creates a confused deputy risk---Agent C can now access anything Agent A can.
The robust pattern uses scoped delegation tokens:
# delegation_token.py -- scoped token minting for A2A chains
import jwt
import time
from typing import Optional
def mint_delegation_token(
issuer_agent: str,
target_agent: str,
task_id: str,
allowed_skills: list[str],
ttl_seconds: int = 300,
parent_token: Optional[str] = None,
signing_key: str = "",
) -> str:
"""
Mint a short-lived, narrowly-scoped JWT for agent-to-agent delegation.
The token encodes:
- Who issued it (the delegating agent)
- Who it's for (the target agent)
- Which task it's scoped to
- Which skills the target may invoke
- The delegation chain (if this is a re-delegation)
"""
now = time.time()
# Build delegation chain from parent token
chain = [issuer_agent]
if parent_token:
try:
parent_claims = jwt.decode(
parent_token, signing_key, algorithms=["HS256"]
)
chain = parent_claims.get("delegation_chain", []) + [issuer_agent]
except jwt.InvalidTokenError:
raise ValueError("Invalid parent delegation token")
claims = {
"iss": issuer_agent,
"aud": target_agent,
"sub": f"task:{task_id}",
"iat": now,
"exp": now + ttl_seconds,
"skills": allowed_skills,
"delegation_chain": chain,
"max_delegation_depth": 3, # prevent unbounded re-delegation
}
# Enforce depth limit
if len(chain) > claims["max_delegation_depth"]:
raise ValueError(
f"Delegation chain depth {len(chain)} exceeds maximum "
f"{claims['max_delegation_depth']}"
)
return jwt.encode(claims, signing_key, algorithm="HS256")
Input/output guardrails
Authentication tells you who is calling. But agents also need guardrails on what they share. A remote agent should never leak sensitive data just because the client agent asked nicely. Implement output filtering at the A2A boundary:
# a2a_guardrails.py
import re
from dataclasses import dataclass
PII_PATTERNS = [
(re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "SSN"),
(re.compile(r"\b\d{16}\b"), "credit card"),
(re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"), "email"),
]
@dataclass
class GuardrailResult:
passed: bool
violations: list[str]
sanitized_text: str
def check_outbound_message(text: str, allow_email: bool = False) -> GuardrailResult:
"""
Scan outbound A2A messages for PII before sending.
In production, combine with an LLM-based classifier.
"""
violations = []
sanitized = text
for pattern, label in PII_PATTERNS:
if label == "email" and allow_email:
continue
matches = pattern.findall(sanitized)
if matches:
violations.append(f"Found {len(matches)} potential {label}(s)")
sanitized = pattern.sub(f"[REDACTED-{label.upper()}]", sanitized)
return GuardrailResult(
passed=len(violations) == 0,
violations=violations,
sanitized_text=sanitized,
)
138.9 Building an A2A server
Let us build a minimal but complete A2A server that implements the core protocol. We will use FastAPI, but the pattern translates to any HTTP framework.
# a2a_server.py -- minimal A2A-compliant server
from __future__ import annotations

import asyncio
import json
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
# --- In-memory task store (use Redis/Postgres in production) ---
tasks: dict[str, dict] = {}
# --- Agent Card ---
AGENT_CARD = {
"name": "SummaryAgent",
"description": "Summarises documents and URLs into concise bullet points.",
"url": "https://summary.internal/a2a",
"version": "1.0.0",
"capabilities": {
"streaming": True,
"pushNotifications": False,
"stateTransitionHistory": False,
},
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["text/plain"],
"skills": [
{
"id": "summarize",
"name": "Document Summarization",
"description": "Produce a concise summary of the provided text.",
"tags": ["summarization", "text", "nlp"],
"examples": ["Summarize this quarterly earnings report"],
}
],
}
@app.get("/.well-known/agent.json")
async def agent_card():
return AGENT_CARD
@app.post("/a2a")
async def handle_jsonrpc(request: Request):
body = await request.json()
method = body.get("method")
params = body.get("params", {})
req_id = body.get("id")
if method == "tasks/send":
return _handle_send(req_id, params)
elif method == "tasks/get":
return _handle_get(req_id, params)
elif method == "tasks/cancel":
return _handle_cancel(req_id, params)
elif method == "tasks/sendSubscribe":
return await _handle_send_subscribe(req_id, params)
else:
return _error_response(req_id, -32601, f"Unknown method: {method}")
def _handle_send(req_id: str, params: dict) -> dict:
task_id = params.get("id", str(uuid.uuid4()))
message = params.get("message", {})
if task_id not in tasks:
tasks[task_id] = {
"id": task_id,
"state": "submitted",
"messages": [],
"artifacts": [],
}
task = tasks[task_id]
task["messages"].append(message)
task["state"] = "working"
# Simulate agent processing (in production, dispatch to LLM pipeline)
input_text = _extract_text(message)
summary = f"Summary: {input_text[:100]}..." # placeholder
agent_message = {
"role": "agent",
"parts": [{"type": "text", "text": summary}],
}
task["messages"].append(agent_message)
task["state"] = "completed"
task["artifacts"] = [
{
"name": "summary",
"parts": [{"type": "text", "text": summary}],
}
]
return {
"jsonrpc": "2.0",
"id": req_id,
"result": task,
}
def _handle_get(req_id: str, params: dict) -> dict:
task_id = params.get("id")
task = tasks.get(task_id)
if not task:
return _error_response(req_id, -32001, f"Task {task_id} not found")
return {"jsonrpc": "2.0", "id": req_id, "result": task}
def _handle_cancel(req_id: str, params: dict) -> dict:
task_id = params.get("id")
task = tasks.get(task_id)
if not task:
return _error_response(req_id, -32001, f"Task {task_id} not found")
task["state"] = "canceled"
return {"jsonrpc": "2.0", "id": req_id, "result": task}
async def _handle_send_subscribe(req_id: str, params: dict):
"""Stream task updates via SSE."""
task_id = params.get("id", str(uuid.uuid4()))
message = params.get("message", {})
if task_id not in tasks:
tasks[task_id] = {
"id": task_id,
"state": "submitted",
"messages": [],
"artifacts": [],
}
task = tasks[task_id]
task["messages"].append(message)
async def event_stream():
# Emit working status
task["state"] = "working"
yield _sse_event({
"jsonrpc": "2.0",
"id": req_id,
"result": {
"id": task_id,
"status": {"state": "working", "message": {
"role": "agent",
"parts": [{"type": "text", "text": "Processing..."}],
}},
"final": False,
},
})
await asyncio.sleep(0.5) # simulate work
# Emit artifact
input_text = _extract_text(message)
summary = f"Summary: {input_text[:200]}..."
yield _sse_event({
"jsonrpc": "2.0",
"id": req_id,
"result": {
"id": task_id,
"artifact": {
"name": "summary",
"parts": [{"type": "text", "text": summary}],
},
},
})
# Emit completed (final)
task["state"] = "completed"
yield _sse_event({
"jsonrpc": "2.0",
"id": req_id,
"result": {
"id": task_id,
"status": {"state": "completed"},
"final": True,
},
})
return StreamingResponse(
event_stream(), media_type="text/event-stream"
)
def _sse_event(data: dict) -> str:
return f"data: {json.dumps(data)}\n\n"
def _extract_text(message: dict) -> str:
parts = message.get("parts", [])
return " ".join(p.get("text", "") for p in parts if p.get("type") == "text")
def _error_response(req_id: str, code: int, message: str) -> dict:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": code, "message": message},
}
138.10 Enterprise agent mesh patterns
When you move beyond two agents talking to each other and into a network of dozens of specialised agents, new architectural patterns emerge.
Pattern 1: The orchestrator hub
A single orchestrator agent receives user requests, consults the agent registry, and delegates subtasks to specialist agents. This is the simplest topology and works well when you want centralised control over task decomposition and result assembly.
User -> Orchestrator -> [TravelAgent, ExpenseAgent, CalendarAgent]
<- assembled itinerary + expense report + calendar events
Advantages: single point of policy enforcement, simple auth model (orchestrator holds all tokens). Disadvantages: orchestrator is a bottleneck, single point of failure, must understand every agent’s capabilities.
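The fan-out step is naturally concurrent. A sketch with stub coroutines standing in for real A2A round-trips:

```python
import asyncio

async def call_agent(name: str, subtask: str) -> dict:
    """Stub for an A2A tasks/send round-trip to one specialist agent."""
    await asyncio.sleep(0)  # stand-in for network latency
    return {"agent": name, "result": f"{name} handled: {subtask}"}

async def orchestrate(request: str) -> list[dict]:
    """Decompose the request, delegate concurrently, assemble the results."""
    subtasks = {
        "TravelAgent": f"book travel for: {request}",
        "ExpenseAgent": f"file expenses for: {request}",
        "CalendarAgent": f"block calendar for: {request}",
    }
    # gather preserves the order of the awaitables passed in
    return await asyncio.gather(
        *(call_agent(name, sub) for name, sub in subtasks.items())
    )

results = asyncio.run(orchestrate("Tokyo offsite, March 15-22"))
print([r["agent"] for r in results])  # -> ['TravelAgent', 'ExpenseAgent', 'CalendarAgent']
```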
Pattern 2: Peer-to-peer delegation
Agents discover and delegate to each other directly, with no central orchestrator. The TravelAgent might delegate to a PaymentAgent, which delegates to a FraudDetectionAgent.
Advantages: no bottleneck, agents evolve independently. Disadvantages: harder to trace, harder to enforce global policies, delegation chains can grow unbounded (hence the depth limit in our token minting code).
Pattern 3: The agent gateway
Inspired by API gateways (Chapter 104), an agent gateway sits at the boundary of your agent mesh:
# agent_gateway.py -- A2A gateway with rate limiting and routing
from dataclasses import dataclass
from typing import Optional
import time
import httpx
@dataclass
class RouteRule:
skill_tags: set[str]
target_url: str
rate_limit_rpm: int = 60
requires_approval: bool = False
class AgentGateway:
"""
Routes A2A requests to the appropriate backend agent,
enforcing rate limits, auth, and approval policies.
"""
def __init__(self):
self._routes: list[RouteRule] = []
self._request_counts: dict[str, list[float]] = {}
self._client = httpx.AsyncClient(timeout=30)
def add_route(self, rule: RouteRule):
self._routes.append(rule)
async def route_task(
self,
task_request: dict,
caller_identity: str,
skill_hint: Optional[str] = None,
) -> dict:
# Find matching route
route = self._match_route(skill_hint)
if not route:
return {"error": "No agent available for this skill"}
# Rate limiting
if not self._check_rate_limit(caller_identity, route):
return {"error": "Rate limit exceeded", "retry_after_seconds": 60}
# Approval gate (for high-stakes actions)
if route.requires_approval:
return {
"status": "pending_approval",
"message": "This action requires human approval before proceeding",
}
# Forward to backend agent
response = await self._client.post(
route.target_url,
json=task_request,
headers={"Content-Type": "application/json"},
)
return response.json()
def _match_route(self, skill_hint: Optional[str]) -> Optional[RouteRule]:
if not skill_hint:
return self._routes[0] if self._routes else None
for route in self._routes:
if skill_hint in route.skill_tags:
return route
return None
def _check_rate_limit(self, caller: str, route: RouteRule) -> bool:
now = time.time()
window = 60.0
key = f"{caller}:{route.target_url}"
if key not in self._request_counts:
self._request_counts[key] = []
# Prune old entries
self._request_counts[key] = [
t for t in self._request_counts[key] if now - t < window
]
if len(self._request_counts[key]) >= route.rate_limit_rpm:
return False
self._request_counts[key].append(now)
return True
The gateway pattern gives you a single place to add observability, rate limiting, approval workflows, and audit logging---all without modifying the agents themselves.
Pattern 4: Hierarchical delegation with budget
In enterprise settings, you often want to limit the total cost or time an agent chain can consume. Pass a budget envelope through the delegation chain:
# budget_envelope.py
from dataclasses import dataclass
@dataclass
class BudgetEnvelope:
max_llm_calls: int
max_wall_time_seconds: float
max_delegation_depth: int
remaining_llm_calls: int
remaining_wall_time_seconds: float
current_depth: int
def can_delegate(self) -> bool:
return (
self.current_depth < self.max_delegation_depth
and self.remaining_llm_calls > 0
and self.remaining_wall_time_seconds > 0
)
def subdivide(self, fraction: float = 0.5) -> "BudgetEnvelope":
"""Create a child budget for a sub-delegation."""
if not self.can_delegate():
raise ValueError("Budget exhausted or max depth reached")
return BudgetEnvelope(
max_llm_calls=self.max_llm_calls,
max_wall_time_seconds=self.max_wall_time_seconds,
max_delegation_depth=self.max_delegation_depth,
remaining_llm_calls=int(self.remaining_llm_calls * fraction),
remaining_wall_time_seconds=self.remaining_wall_time_seconds * fraction,
current_depth=self.current_depth + 1,
)
Pass the budget as metadata in A2A messages (DataPart with a well-known schema), and have each agent honour it.
138.11 The broader protocol landscape
A2A is not the only contender. Understanding the landscape helps you evaluate trade-offs:
| Protocol / Standard | Origin | Focus | Status (2026) |
|---|---|---|---|
| A2A (Agent2Agent) | Google, 2025 | Agent-to-agent task delegation | Open spec, growing adoption |
| MCP | Anthropic, 2024 | Agent-to-tool integration | Widely adopted for tool use |
| OpenAPI + function calling | OpenAI ecosystem | LLM-to-API binding | De facto standard for single-turn tool use |
| OASF (Open Agent Schema Framework) | Linux Foundation | Agent metadata and interop | Early stage |
| AutoGen / CrewAI protocols | Microsoft / CrewAI | Multi-agent orchestration | Framework-specific, not wire protocols |
| ACL / FIPA | IEEE FIPA, 2000s | Agent communication language | Academic; influenced A2A’s message model |
Key observations:
- MCP and A2A are complementary. MCP connects an agent to its tools; A2A connects agents to each other. Most production systems will use both.
- Framework-specific protocols are traps. If your agents can only talk through AutoGen's internal message bus, you cannot integrate a CrewAI agent or a custom agent without an adapter layer. Wire-level protocols like A2A avoid this lock-in.
- The convergence is real. The industry is converging on JSON-RPC over HTTPS for transport, Agent Cards (or similar manifests) for discovery, and OAuth 2.0 for auth. Bet on these primitives even if the specific protocol names change.
138.12 When A2A is worth it (and when it is not)
A2A introduces real complexity: Agent Cards to maintain, task state to persist, auth tokens to manage, streaming infrastructure to operate. Do not adopt it reflexively.
A2A is worth it when:
- You have agents built by different teams or organisations that need to interoperate without sharing codebases.
- The remote agent is a reasoning system (not a deterministic API) and you need multi-turn negotiation, clarification, or streaming.
- You need a standard discovery mechanism so that new agents can be integrated without code changes to the orchestrator.
- You are building an agent marketplace or platform where third parties publish agents.
- You want to avoid framework lock-in---your agents are built with different stacks (LangGraph, CrewAI, custom) and need a common wire protocol.
A2A is overkill when:
- All agents live in the same process or monorepo---use direct function calls or an in-process message bus.
- The remote capability is a deterministic API with a fixed schema---MCP or plain HTTP is simpler.
- You have only two or three agents with stable interfaces---the ceremony of Agent Cards and task lifecycle management does not pay for itself yet.
- You need sub-millisecond latency---A2A’s HTTP + JSON-RPC overhead adds latency that matters for real-time systems.
Migration path
If you are unsure, start simple and evolve:
- Phase 1: Direct function calls between agents in the same process.
- Phase 2: Extract agents into services; use HTTP + simple JSON contracts.
- Phase 3: Add Agent Cards for discovery; adopt A2A task lifecycle for agents that need multi-turn interaction.
- Phase 4: Deploy an agent gateway; add auth, rate limiting, and observability.
You do not need to jump to Phase 4 on day one. The protocol is designed to be adoptable incrementally.
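As a sketch of the Phase 2 to Phase 3 step, the metadata you already have for a plain HTTP service can be repackaged as a minimal Agent Card. The `make_agent_card` helper and its inputs are hypothetical; only the `url` and `skills[].id` fields mirror what the A2A clients in this chapter rely on:

```python
# phase3_card.py -- wrap an existing Phase-2 HTTP service with an Agent Card.
# The helper and the service metadata below are illustrative, not part of A2A.
import json


def make_agent_card(name: str, base_url: str, endpoints: dict[str, str]) -> str:
    """Turn a plain HTTP service description into a minimal Agent Card.

    Each endpoint becomes a skill; serve the result at
    /.well-known/agent.json once you reach Phase 3.
    """
    card = {
        "name": name,
        "url": f"{base_url}/a2a",
        "skills": [
            {"id": skill_id, "description": desc}
            for skill_id, desc in endpoints.items()
        ],
    }
    return json.dumps(card, indent=2)
```

Nothing else about the service has to change on day one: the card simply advertises what was already there.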
138.13 Observability in agent meshes
When agents delegate to agents, debugging failures becomes exponentially harder. Every A2A interaction should emit structured telemetry:
```python
# a2a_telemetry.py -- tracing spans for A2A interactions
import time
import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger("a2a.telemetry")


@dataclass
class A2ASpan:
    """Represents a single A2A interaction for tracing."""

    task_id: str
    client_agent: str
    remote_agent: str
    skill: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    state: str = "submitted"
    error: Optional[str] = None
    message_count: int = 0
    artifact_count: int = 0
    parent_task_id: Optional[str] = None  # for delegation chains

    def finish(self, final_state: str, artifacts: int = 0):
        self.end_time = time.time()
        self.state = final_state
        self.artifact_count = artifacts
        self._emit()

    def _emit(self):
        duration_ms = (
            (self.end_time - self.start_time) * 1000
            if self.end_time
            else None
        )
        logger.info(
            "a2a_task_completed",
            extra={
                "task_id": self.task_id,
                "client_agent": self.client_agent,
                "remote_agent": self.remote_agent,
                "skill": self.skill,
                "state": self.state,
                "duration_ms": duration_ms,
                "messages": self.message_count,
                "artifacts": self.artifact_count,
                "error": self.error,
                "parent_task_id": self.parent_task_id,
            },
        )


class A2ATracer:
    """Collects spans across a delegation chain for distributed tracing."""

    def __init__(self):
        self._spans: dict[str, A2ASpan] = {}

    def start_span(
        self,
        task_id: str,
        client_agent: str,
        remote_agent: str,
        skill: Optional[str] = None,
        parent_task_id: Optional[str] = None,
    ) -> A2ASpan:
        span = A2ASpan(
            task_id=task_id,
            client_agent=client_agent,
            remote_agent=remote_agent,
            skill=skill,
            parent_task_id=parent_task_id,
        )
        self._spans[task_id] = span
        return span

    def get_chain(self, task_id: str) -> list[A2ASpan]:
        """Walk up the delegation chain from a leaf task."""
        chain = []
        current = task_id
        while current and current in self._spans:
            span = self._spans[current]
            chain.append(span)
            current = span.parent_task_id
        chain.reverse()  # root task first
        return chain
```
In production, plug these spans into OpenTelemetry and visualise delegation chains in Jaeger or Grafana Tempo. The `parent_task_id` field lets you reconstruct the full tree of agent interactions that resulted from a single user request.
138.14 Real-world integration: connecting a LangGraph agent to a CrewAI agent
The true test of A2A is cross-framework interop. Here is a sketch showing how a LangGraph-based orchestrator delegates to a CrewAI-based specialist, with A2A as the wire protocol:
```python
# langgraph_a2a_client.py -- A2A client node for LangGraph
from typing import TypedDict

import httpx


class AgentState(TypedDict):
    user_query: str
    travel_result: str | None
    task_id: str | None


async def delegate_to_travel_agent(state: AgentState) -> AgentState:
    """
    LangGraph node that delegates to a remote A2A agent.

    The remote agent could be CrewAI, AutoGen, or anything
    that speaks A2A -- we don't care.
    """
    async with httpx.AsyncClient(timeout=60) as client:
        # Step 1: Fetch Agent Card to verify capabilities
        card_resp = await client.get(
            "https://travel-crew.internal/.well-known/agent.json"
        )
        card = card_resp.json()
        assert "flight-booking" in [
            s["id"] for s in card.get("skills", [])
        ], "Remote agent lacks flight-booking skill"

        # Step 2: Send task
        task_id = f"lg-{state.get('task_id', 'new')}"
        response = await client.post(
            card["url"],
            json={
                "jsonrpc": "2.0",
                "id": f"req-{task_id}",
                "method": "tasks/send",
                "params": {
                    "id": task_id,
                    "message": {
                        "role": "user",
                        "parts": [
                            {"type": "text", "text": state["user_query"]}
                        ],
                    },
                },
            },
        )
        result = response.json().get("result", {})

        # Step 3: Handle multi-turn if needed
        while result.get("state") == "input-needed":
            # In a real system, the LLM would reason about the
            # agent's question and formulate a response.
            # Here we auto-select the first option.
            agent_msg = result["messages"][-1]  # the clarifying question
            response = await client.post(
                card["url"],
                json={
                    "jsonrpc": "2.0",
                    "id": f"req-{task_id}-reply",
                    "method": "tasks/send",
                    "params": {
                        "id": task_id,
                        "message": {
                            "role": "user",
                            "parts": [
                                {"type": "text", "text": "Select the first option."}
                            ],
                        },
                    },
                },
            )
            result = response.json().get("result", {})

        # Step 4: Extract artifacts
        artifacts = result.get("artifacts", [])
        travel_text = ""
        for artifact in artifacts:
            for part in artifact.get("parts", []):
                if part.get("type") == "text":
                    travel_text += part["text"] + "\n"

    return {
        **state,
        "travel_result": travel_text or "No result from travel agent",
        "task_id": task_id,
    }
```
The key insight: the LangGraph orchestrator does not import CrewAI, does not know what LLM the remote agent uses, and does not care about its internal tool configuration. A2A provides the abstraction boundary.
138.15 Security hardening checklist
Before deploying A2A agents in production, verify each of these:
- **Agent Card validation.** Fetch cards over HTTPS only. Pin certificates for internal agents. Validate the JSON schema before trusting any field.
- **Input sanitisation.** Treat every incoming A2A message as untrusted input. Remote agents can send prompt-injection payloads---run content through your injection-detection pipeline before feeding it to your LLM.
- **Output filtering.** Scan all outbound messages and artifacts for PII, secrets, and internal identifiers before sending them over A2A.
- **Token scoping.** Never pass your agent's full credentials to a remote agent. Mint delegation tokens with narrow scopes and short TTLs.
- **Rate limiting.** Apply per-caller rate limits at the A2A endpoint. A misbehaving remote agent can otherwise consume all your capacity.
- **Task timeouts.** Set maximum TTLs on tasks. A task stuck in the `working` state for 30 minutes is a resource leak.
- **Audit logging.** Log every A2A method call (`tasks/send`, `tasks/get`, etc.) with caller identity, task ID, and timestamp. You will need this for incident response.
- **Delegation depth limits.** Cap how many times a task can be re-delegated. Unbounded delegation chains are a denial-of-service vector.
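The last item is cheap to enforce. One minimal sketch: carry the current depth in a request header and reject anything over a cap. The header name `X-A2A-Delegation-Depth` is an assumption for illustration, not part of the A2A spec:

```python
# depth_limit.py -- reject over-deep delegation chains.
# X-A2A-Delegation-Depth is a hypothetical header, not defined by A2A.
MAX_DELEGATION_DEPTH = 3


def check_delegation_depth(headers: dict[str, str]) -> int:
    """Validate the incoming depth; return the value to forward downstream.

    An absent header means the request came straight from a user-facing
    agent (depth 0). The caller includes the returned value in the
    header of any task it re-delegates.
    """
    depth = int(headers.get("X-A2A-Delegation-Depth", "0"))
    if depth >= MAX_DELEGATION_DEPTH:
        raise PermissionError(
            f"delegation depth {depth} exceeds limit {MAX_DELEGATION_DEPTH}"
        )
    return depth + 1
```

Because each hop increments the counter, a runaway delegation loop dies after `MAX_DELEGATION_DEPTH` hops instead of consuming the whole mesh.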
Read it yourself
| Resource | Why it matters |
|---|---|
| Google A2A specification | The canonical protocol spec; read the specification.md for the full JSON-RPC schema |
| Anthropic MCP documentation | Understand the agent-to-tool complement to A2A |
| JSON-RPC 2.0 specification | A2A’s transport layer; essential for debugging wire-level issues |
| OAuth 2.0 for inter-service auth (RFC 6749) | The most common auth scheme for cross-org A2A |
| OpenTelemetry distributed tracing | Observability for agent delegation chains |
| FIPA ACL specification | Historical predecessor; illuminates why A2A’s message model looks the way it does |
Practice
- Write an Agent Card for a code-review agent that accepts Git diffs (`text/x-diff`) and returns structured review comments (`application/json`). Include at least two skills with realistic examples.
- Implement `tasks/get` polling with exponential backoff. Write a client function that polls a remote A2A agent every 2 seconds (doubling each time, max 30 seconds) until the task reaches a terminal state (`completed`, `failed`, or `canceled`).
- Add `input-needed` handling to the A2A server in Section 138.9. Modify the `/a2a` endpoint so that when the input text contains a question mark, the agent transitions to `input-needed` and asks the client to clarify.
- Build an agent registry with semantic search. Extend the `AgentRegistry` class from Section 138.2 to embed skill descriptions using a sentence transformer and retrieve the best-matching agent for a natural-language query (e.g., "I need to translate a document from English to Japanese").
- Implement delegation token verification. Write a middleware function for the A2A server that extracts a JWT from the `Authorization` header, verifies the delegation chain, checks that the requested skill is in the token's `skills` claim, and rejects the request if any check fails.
- Trace a delegation chain. Given three agents (Orchestrator, TravelAgent, PaymentAgent), instrument all three with the `A2ATracer` from Section 138.13. Send a booking request through the chain and print the full span tree showing task IDs, durations, and states.
- Stretch: Build a bidirectional A2A adapter that wraps an existing MCP server as an A2A agent. The adapter should: (a) translate the MCP server's tool manifest into an Agent Card with skills, (b) accept A2A `tasks/send` requests, (c) map the incoming message to the appropriate MCP tool call, (d) return the tool result as an A2A artifact. Test it with an MCP server that provides a calculator tool.