Agent memory systems: context, retrieval, and persistence
"Every conversation with an LLM begins with amnesia. The model receives a prompt, generates a response, and forgets everything the moment the connection closes. To build agents that *learn*, *adapt*, and *accumulate expertise* over time, we must engineer memory from scratch — deciding what to remember, where to store it, how to retrieve it, and when to let it decay. This chapter lays out the full memory stack, from the ephemeral working memory of a context window to durable long-term stores backed by vector databases and relational tables"
132.1 — Why agents need memory (the stateless LLM problem)
A bare language model is a stateless function: given tokens in, produce tokens out. There is no hidden notebook, no scratch register that persists between calls. Every inference pass starts from the same blank slate.
This is perfectly fine for single-turn question answering. It is catastrophic for agents, which must:
- Track multi-step plans across tool calls that may span minutes or hours.
- Recall user preferences expressed three conversations ago.
- Avoid repeating mistakes — an agent that retries a failed API call with the exact same malformed payload is wasting tokens and time.
- Accumulate domain knowledge from past interactions so performance improves with use.
Without engineered memory, an agent is a goldfish with a PhD — brilliant in the moment, oblivious to everything that came before.
The core tension is simple: LLMs have fixed-size context windows, but real tasks generate unbounded information over time. Memory systems bridge that gap by selecting, compressing, and surfacing the right information at the right moment.
Stateless LLM: prompt → response → (forgotten)
Agent with memory: prompt + retrieved_context → response → (write to memory) → next turn
Every memory architecture answers three questions:
- What gets stored? (raw text, embeddings, structured facts, tool traces)
- Where does it live? (context window, external DB, file system)
- How is it retrieved? (recency, relevance, importance scoring)
The rest of this chapter builds the machinery to answer all three.
132.2 — Memory taxonomy: working, short-term, long-term, episodic, semantic, procedural
Cognitive science gives us a useful (if imperfect) analogy. Agent memory systems borrow the same categories that describe human memory, adapted for silicon:
| Memory type | Human analogy | Agent equivalent | Persistence |
|---|---|---|---|
| Working memory | Active thought, mental scratchpad | Context window contents | Single inference call |
| Short-term memory | Recent conversation recall | Conversation buffer / chat history | Single session |
| Long-term memory | Learned facts and experiences | External stores (vector DB, SQL) | Across sessions |
| Episodic memory | "I remember that meeting" | Logs of past agent runs with outcomes | Long-term |
| Semantic memory | "Paris is in France" | Extracted facts, knowledge graphs | Long-term |
| Procedural memory | "How to ride a bicycle" | Learned tool-use patterns, cached plans | Long-term |
These categories are not mutually exclusive. A single piece of information — “the user’s production database is PostgreSQL 16 on port 5433” — might live in working memory during the current turn, get stored in semantic long-term memory after the conversation, and inform procedural memory about which database driver to use in future tool calls.
The key insight: an agent’s memory architecture is a caching hierarchy, much like L1/L2/L3 caches in a CPU. Working memory is fast but tiny. Long-term memory is vast but requires explicit retrieval. The art is in the promotion and eviction policies that shuttle information between layers.
132.3 — Working memory: context window as RAM
The context window is the agent’s working memory — the only information the model can reason about during a single forward pass. Everything the agent “knows” at inference time must fit here.
The RAM analogy
| Property | CPU RAM | LLM Context Window |
|---|---|---|
| Capacity | 16–512 GB | 8K–2M tokens |
| Access pattern | Random access O(1) | Full attention O(n²) |
| Persistence | Until power off | Until call ends |
| Overflow strategy | Swap to disk | Truncate or summarize |
Sliding window
The simplest overflow strategy: keep the most recent N tokens and drop everything else.
from dataclasses import dataclass, field
@dataclass
class SlidingWindowMemory:
"""Maintains a fixed-size context window via FIFO eviction."""
max_tokens: int = 8_000
messages: list[dict] = field(default_factory=list)
system_prompt: str = ""
_system_tokens: int = 0
def __post_init__(self):
# Reserve space for system prompt (estimate ~4 chars per token)
self._system_tokens = len(self.system_prompt) // 4
def add(self, role: str, content: str) -> None:
self.messages.append({"role": role, "content": content})
self._evict()
def _estimate_tokens(self) -> int:
return self._system_tokens + sum(
len(m["content"]) // 4 for m in self.messages
)
def _evict(self) -> None:
"""Drop oldest non-system messages until under budget."""
while self._estimate_tokens() > self.max_tokens and len(self.messages) > 1:
self.messages.pop(0)
def to_prompt(self) -> list[dict]:
out = []
if self.system_prompt:
out.append({"role": "system", "content": self.system_prompt})
out.extend(self.messages)
return out
Sliding windows are cheap but brutal: they destroy potentially critical information from early in the conversation.
Summarization-based compression
A smarter approach: when the context grows too large, summarize older messages rather than dropping them.
import json
from typing import Protocol
class LLMClient(Protocol):
def complete(self, messages: list[dict]) -> str: ...
class SummarizingMemory:
"""Compresses older messages into a running summary."""
def __init__(
self,
client: LLMClient,
max_tokens: int = 12_000,
summary_threshold: float = 0.75, # summarize when 75% full
):
self.client = client
self.max_tokens = max_tokens
self.summary_threshold = summary_threshold
self.running_summary: str = ""
self.messages: list[dict] = []
def _token_estimate(self) -> int:
total = len(self.running_summary) // 4
total += sum(len(m["content"]) // 4 for m in self.messages)
return total
def add(self, role: str, content: str) -> None:
self.messages.append({"role": role, "content": content})
if self._token_estimate() > self.max_tokens * self.summary_threshold:
self._compress()
def _compress(self) -> None:
"""Summarize the oldest half of messages into running_summary."""
split = len(self.messages) // 2
to_summarize = self.messages[:split]
self.messages = self.messages[split:]
prompt = [
{"role": "system", "content": (
"Condense the following conversation into a concise summary. "
"Preserve all facts, decisions, user preferences, and action items. "
"Omit pleasantries and redundant exchanges."
)},
{"role": "user", "content": (
f"Previous summary:\n{self.running_summary}\n\n"
f"New messages:\n{json.dumps(to_summarize, indent=2)}"
)},
]
self.running_summary = self.client.complete(prompt)
def to_prompt(self) -> list[dict]:
out = []
if self.running_summary:
out.append({
"role": "system",
"content": f"Conversation summary so far:\n{self.running_summary}",
})
out.extend(self.messages)
return out
Trade-off: summarization costs one extra LLM call and may lose nuance, but preserves far more information than blind truncation.
Hierarchical compression
Production systems often combine both: keep the last K messages verbatim, maintain a summary of messages K+1 through K+M, and discard anything older. This gives the agent both precise recent context and broad historical awareness.
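One way to wire this up, reusing the LLMClient protocol defined above — the class name, thresholds, and summarization prompt here are illustrative, and this sketch folds older messages into the running summary rather than discarding them outright:

class HierarchicalMemory:
    """Last K messages verbatim, a running summary of everything older."""
    def __init__(self, client: LLMClient, keep_verbatim: int = 10,
                 compress_batch: int = 40):
        self.client = client
        self.keep_verbatim = keep_verbatim    # tail kept word-for-word
        self.compress_batch = compress_batch  # old messages per compression
        self.summary: str = ""
        self.messages: list[dict] = []

    def add(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})
        overflow = len(self.messages) - self.keep_verbatim
        if overflow >= self.compress_batch:
            # Fold everything except the verbatim tail into the summary.
            old = self.messages[:overflow]
            self.messages = self.messages[overflow:]
            self.summary = self.client.complete([
                {"role": "system", "content": (
                    "Update the summary with the new messages. "
                    "Preserve facts, decisions, and action items."
                )},
                {"role": "user", "content": (
                    f"Summary so far:\n{self.summary}\n\nNew messages:\n{old}"
                )},
            ])

The summary stays bounded because each compression rewrites it, while the verbatim tail gives the model exact recent wording to work with.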
132.4 — Short-term memory: conversation history and token budgets
Short-term memory spans a single session — typically one conversation thread. It holds the full exchange between user and agent, including tool calls, intermediate reasoning, and assistant responses.
Token budget management
Modern agents juggle multiple sources of context. A disciplined approach allocates a token budget across categories:
from dataclasses import dataclass
@dataclass
class TokenBudget:
"""Allocate context window space across memory sources."""
total: int = 128_000
# Allocation percentages
system_prompt_pct: float = 0.05 # 6,400 tokens
retrieved_context_pct: float = 0.20 # 25,600 tokens
conversation_pct: float = 0.55 # 70,400 tokens
tool_results_pct: float = 0.10 # 12,800 tokens
output_reserve_pct: float = 0.10 # 12,800 tokens
@property
def system_prompt_budget(self) -> int:
return int(self.total * self.system_prompt_pct)
@property
def retrieved_context_budget(self) -> int:
return int(self.total * self.retrieved_context_pct)
@property
def conversation_budget(self) -> int:
return int(self.total * self.conversation_pct)
@property
def tool_results_budget(self) -> int:
return int(self.total * self.tool_results_pct)
@property
def output_reserve(self) -> int:
return int(self.total * self.output_reserve_pct)
Truncation vs. summarization decision matrix
| Scenario | Strategy | Rationale |
|---|---|---|
| Casual chat, no commitments | Truncate oldest | Low-value early turns |
| Multi-step task in progress | Summarize | Preserve action plan and partial results |
| Code debugging session | Keep errors verbatim, summarize discussion | Exact error text matters; chit-chat does not |
| Compliance / audit use case | Persist full log externally, summarize for context | Legal record in DB, compressed version for LLM |
Conversation history with priority tagging
Not all messages are equal. A production system tags messages with importance:
from enum import IntEnum
from dataclasses import dataclass, field
from typing import Optional
import time
class Priority(IntEnum):
LOW = 0 # pleasantries, acknowledgments
NORMAL = 1 # regular conversation
HIGH = 2 # user preferences, decisions
CRITICAL = 3 # error traces, commitments, action items
@dataclass
class MemoryEntry:
role: str
content: str
priority: Priority = Priority.NORMAL
timestamp: float = field(default_factory=time.time)
token_estimate: int = 0
metadata: Optional[dict] = None
def __post_init__(self):
if not self.token_estimate:
self.token_estimate = len(self.content) // 4
class PriorityConversationMemory:
"""Evicts low-priority messages first when over budget."""
def __init__(self, token_budget: int = 70_000):
self.budget = token_budget
self.entries: list[MemoryEntry] = []
def add(self, entry: MemoryEntry) -> None:
self.entries.append(entry)
self._enforce_budget()
def _enforce_budget(self) -> None:
total = sum(e.token_estimate for e in self.entries)
if total <= self.budget:
return
# Sort candidates for eviction: lowest priority first, then oldest
candidates = sorted(
range(len(self.entries)),
key=lambda i: (self.entries[i].priority, self.entries[i].timestamp),
)
to_remove = set()
freed = 0
for idx in candidates:
if total - freed <= self.budget:
break
freed += self.entries[idx].token_estimate
to_remove.add(idx)
self.entries = [
e for i, e in enumerate(self.entries) if i not in to_remove
]
The priority-aware eviction ensures that user preferences and error traces survive long after polite greetings have been discarded.
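A quick demonstration of that eviction order — the tiny budget and messages are contrived to force eviction:

mem = PriorityConversationMemory(token_budget=20)
mem.add(MemoryEntry("user", "Hi there!", Priority.LOW))
mem.add(MemoryEntry("user", "Always use TypeScript strict mode.", Priority.HIGH))
mem.add(MemoryEntry("assistant", "TypeError: cannot read property 'id'", Priority.CRITICAL))
mem.add(MemoryEntry("assistant", "Sure, happy to help!", Priority.NORMAL))
# The LOW greeting and NORMAL acknowledgment are evicted first;
# the HIGH preference and the CRITICAL error trace survive.
print([e.priority.name for e in mem.entries])  # ['HIGH', 'CRITICAL']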
132.5 — Long-term memory: vector stores, structured DBs, and the retrieval pattern
When information must survive beyond a single session, it moves to long-term memory — external storage systems the agent queries at runtime.
Vector store pattern
The dominant approach: embed text into dense vectors, store them in a vector database, and retrieve by semantic similarity at query time.
from dataclasses import dataclass, field
from typing import Optional
import hashlib
import time
import numpy as np
@dataclass
class MemoryRecord:
text: str
embedding: Optional[np.ndarray] = None
metadata: dict = field(default_factory=dict)
record_id: str = ""
created_at: float = field(default_factory=time.time)
access_count: int = 0
last_accessed: float = 0.0
def __post_init__(self):
if not self.record_id:
self.record_id = hashlib.sha256(
self.text.encode()
).hexdigest()[:16]
class VectorMemoryStore:
"""Simple in-memory vector store for illustration.
Production: replace with pgvector, Qdrant, Pinecone, Weaviate, etc.
"""
def __init__(self, embed_fn):
self.embed_fn = embed_fn # text -> np.ndarray
self.records: dict[str, MemoryRecord] = {}
def add(self, text: str, metadata: Optional[dict] = None) -> str:
embedding = self.embed_fn(text)
record = MemoryRecord(
text=text,
embedding=embedding,
metadata=metadata or {},
)
self.records[record.record_id] = record
return record.record_id
    def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.0,
    ) -> list[tuple[MemoryRecord, float]]:
        query_vec = self.embed_fn(query)
        scored = []
        for record in self.records.values():
            # Cosine similarity with an epsilon guard against zero vectors
            score = float(np.dot(query_vec, record.embedding) / (
                np.linalg.norm(query_vec) * np.linalg.norm(record.embedding)
                + 1e-9
            ))
            if score >= min_score:
                scored.append((record, score))
        scored.sort(key=lambda x: x[1], reverse=True)
        top = scored[:top_k]
        # Track access only for records actually returned, so frequency
        # signals are not inflated by candidates that never reach the prompt.
        now = time.time()
        for record, _ in top:
            record.access_count += 1
            record.last_accessed = now
        return top
def delete(self, record_id: str) -> bool:
return self.records.pop(record_id, None) is not None
Structured DB for facts
Not everything should be embedded. Structured facts — user name, project configuration, API key references — belong in relational or document stores where they can be queried exactly.
import sqlite3
import time
from typing import Optional
class StructuredMemory:
"""Key-value + relational fact store backed by SQLite."""
def __init__(self, db_path: str = ":memory:"):
self.conn = sqlite3.connect(db_path)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS facts (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
category TEXT DEFAULT 'general',
confidence REAL DEFAULT 1.0,
source TEXT DEFAULT 'user',
created_at REAL,
updated_at REAL
)
""")
self.conn.commit()
def upsert(
self,
key: str,
value: str,
category: str = "general",
confidence: float = 1.0,
source: str = "user",
) -> None:
        now = time.time()
self.conn.execute("""
INSERT INTO facts (key, value, category, confidence, source, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value=excluded.value,
confidence=excluded.confidence,
updated_at=excluded.updated_at
""", (key, value, category, confidence, source, now, now))
self.conn.commit()
def get(self, key: str) -> Optional[str]:
row = self.conn.execute(
"SELECT value FROM facts WHERE key = ?", (key,)
).fetchone()
return row[0] if row else None
def search_by_category(self, category: str) -> list[dict]:
rows = self.conn.execute(
"SELECT key, value, confidence FROM facts WHERE category = ? ORDER BY confidence DESC",
(category,),
).fetchall()
return [{"key": r[0], "value": r[1], "confidence": r[2]} for r in rows]
The retrieval pattern
The standard flow for incorporating long-term memory into agent responses:
- Extract query — derive a search query from the current user message or agent plan.
- Retrieve — query vector store and/or structured store.
- Rank and filter — score results by relevance, recency, and importance; discard low-quality hits.
- Inject — insert retrieved context into the prompt, typically as a system message or prefixed block.
- Generate — call the LLM with augmented context.
This is the same RAG (Retrieval-Augmented Generation) pattern from Chapter 119, but applied to the agent’s own memory rather than a static document corpus.
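Condensed into code, the five steps look roughly like this, reusing LLMClient, VectorMemoryStore, and StructuredMemory from earlier — the prompt layout and the 0.3 score floor are illustrative choices:

def answer_with_memory(
    llm: LLMClient,
    vectors: VectorMemoryStore,
    facts: StructuredMemory,
    user_message: str,
) -> str:
    # 1. Extract query: the raw user message is often good enough to start.
    # 2. Retrieve from both stores.
    hits = vectors.search(user_message, top_k=5, min_score=0.3)
    prefs = facts.search_by_category("user_preference")
    # 3. Rank and filter: vector hits arrive scored and sorted already.
    memory_lines = [f"- {record.text}" for record, _score in hits]
    memory_lines += [f"- {p['key']}: {p['value']}" for p in prefs]
    # 4. Inject retrieved context as a system message.
    messages = [
        {"role": "system",
         "content": "Relevant context from memory:\n" + "\n".join(memory_lines)},
        {"role": "user", "content": user_message},
    ]
    # 5. Generate with the augmented context.
    return llm.complete(messages)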
132.6 — Episodic memory: recording past runs, learning from success and failure
Episodic memory records what happened — full traces of past agent executions, including the task, the plan, the tool calls made, the results obtained, and whether the overall outcome was successful.
Why it matters
An agent that can recall “the last time I tried to deploy to staging, the health check failed because the migration hadn’t run” can proactively run migrations before deploying next time. Without episodic memory, it will rediscover this lesson every time.
Episode schema
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time
import uuid
import json
class Outcome(str, Enum):
SUCCESS = "success"
FAILURE = "failure"
PARTIAL = "partial"
ABANDONED = "abandoned"
@dataclass
class ToolCall:
tool_name: str
arguments: dict
result: str
duration_ms: float
success: bool
@dataclass
class Episode:
"""A complete record of one agent task execution."""
episode_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
task_description: str = ""
plan: list[str] = field(default_factory=list)
tool_calls: list[ToolCall] = field(default_factory=list)
outcome: Outcome = Outcome.ABANDONED
outcome_reason: str = ""
lessons_learned: list[str] = field(default_factory=list)
started_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
total_tokens_used: int = 0
metadata: dict = field(default_factory=dict)
def to_summary(self) -> str:
"""Generate a compact summary for retrieval."""
tool_names = list({tc.tool_name for tc in self.tool_calls})
success_rate = (
sum(1 for tc in self.tool_calls if tc.success)
/ max(len(self.tool_calls), 1)
)
return (
f"Task: {self.task_description}\n"
f"Outcome: {self.outcome.value} — {self.outcome_reason}\n"
f"Tools used: {', '.join(tool_names)}\n"
f"Tool success rate: {success_rate:.0%}\n"
f"Lessons: {'; '.join(self.lessons_learned)}"
)
class EpisodicMemory:
"""Stores and retrieves past agent episodes."""
def __init__(self, vector_store: 'VectorMemoryStore'):
self.vector_store = vector_store
self.episodes: dict[str, Episode] = {}
def record(self, episode: Episode) -> str:
episode.completed_at = time.time()
self.episodes[episode.episode_id] = episode
# Store searchable summary in vector store
self.vector_store.add(
text=episode.to_summary(),
metadata={
"type": "episode",
"episode_id": episode.episode_id,
"outcome": episode.outcome.value,
},
)
return episode.episode_id
def recall_similar(
self, task_description: str, top_k: int = 3
) -> list[Episode]:
"""Find episodes similar to a new task."""
results = self.vector_store.search(task_description, top_k=top_k)
episode_ids = [
r.metadata["episode_id"]
for r, score in results
if r.metadata.get("type") == "episode"
]
return [self.episodes[eid] for eid in episode_ids if eid in self.episodes]
def get_lessons_for_task(self, task_description: str) -> list[str]:
"""Extract lessons learned from similar past tasks."""
episodes = self.recall_similar(task_description)
lessons = []
for ep in episodes:
for lesson in ep.lessons_learned:
lessons.append(f"[{ep.outcome.value}] {lesson}")
return lessons
Extracting lessons automatically
After each agent run, a reflection step asks the LLM to extract lessons:
REFLECTION_PROMPT = """You just completed a task. Analyze the execution trace and extract 1-3
concise lessons learned. Focus on:
- What went wrong and how it was fixed
- What worked well and should be repeated
- Surprising findings about the environment or tools
Task: {task}
Tool calls and results: {trace}
Outcome: {outcome}
Return a JSON array of lesson strings."""
These lessons become the agent’s institutional knowledge — the accumulated wisdom of past runs.
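A minimal reflection step built on the LLMClient protocol from earlier; the defensive JSON parsing is an illustrative choice, since models sometimes answer in prose:

def reflect_on_episode(client: LLMClient, episode: Episode) -> list[str]:
    """Ask the LLM to distill lessons from a finished episode."""
    trace = "\n".join(
        f"{tc.tool_name}({tc.arguments}) -> "
        f"{'ok' if tc.success else 'FAILED'}: {tc.result[:200]}"
        for tc in episode.tool_calls
    )
    raw = client.complete([{
        "role": "user",
        "content": REFLECTION_PROMPT.format(
            task=episode.task_description,
            trace=trace,
            outcome=episode.outcome.value,
        ),
    }])
    try:
        lessons = json.loads(raw)
    except json.JSONDecodeError:
        lessons = []  # model returned prose; store nothing rather than noise
    episode.lessons_learned.extend(str(lesson) for lesson in lessons)
    return episode.lessons_learned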
132.7 — Procedural memory: learned tool-use patterns and cached plans
Procedural memory captures how to do things — recurring patterns of tool use, successful multi-step plans, and effective prompt templates.
Tool-use patterns
from dataclasses import dataclass, field
from collections import defaultdict
import time
@dataclass
class ToolPattern:
"""A learned pattern for using a specific tool effectively."""
tool_name: str
typical_args: dict = field(default_factory=dict)
common_sequences: list[list[str]] = field(default_factory=list)
failure_modes: list[str] = field(default_factory=list)
success_tips: list[str] = field(default_factory=list)
avg_duration_ms: float = 0.0
call_count: int = 0
success_rate: float = 1.0
class ProceduralMemory:
"""Learns and recalls effective tool-use patterns and plans."""
def __init__(self):
self.tool_patterns: dict[str, ToolPattern] = {}
self.plan_cache: dict[str, list[str]] = {} # task_type -> plan steps
self.sequence_counts: dict[tuple, int] = defaultdict(int)
def update_from_episode(self, episode: 'Episode') -> None:
"""Extract procedural knowledge from a completed episode."""
# Update tool patterns
for tc in episode.tool_calls:
if tc.tool_name not in self.tool_patterns:
self.tool_patterns[tc.tool_name] = ToolPattern(
tool_name=tc.tool_name
)
pattern = self.tool_patterns[tc.tool_name]
pattern.call_count += 1
# Exponential moving average for duration
alpha = 0.1
pattern.avg_duration_ms = (
alpha * tc.duration_ms + (1 - alpha) * pattern.avg_duration_ms
)
# Update success rate
pattern.success_rate = (
alpha * float(tc.success) + (1 - alpha) * pattern.success_rate
)
# Track tool sequences (bigrams)
tool_seq = [tc.tool_name for tc in episode.tool_calls]
for i in range(len(tool_seq) - 1):
bigram = (tool_seq[i], tool_seq[i + 1])
self.sequence_counts[bigram] += 1
# Cache successful plans
if episode.outcome == Outcome.SUCCESS and episode.plan:
task_type = self._classify_task(episode.task_description)
self.plan_cache[task_type] = episode.plan
def suggest_plan(self, task_description: str) -> list[str]:
"""Suggest a plan based on similar past tasks."""
task_type = self._classify_task(task_description)
return self.plan_cache.get(task_type, [])
def get_tool_tips(self, tool_name: str) -> dict:
"""Return learned tips for a specific tool."""
pattern = self.tool_patterns.get(tool_name)
if not pattern:
return {}
return {
"success_rate": f"{pattern.success_rate:.0%}",
"avg_duration_ms": pattern.avg_duration_ms,
"failure_modes": pattern.failure_modes,
"success_tips": pattern.success_tips,
}
def likely_next_tools(self, current_tool: str, top_k: int = 3) -> list[str]:
"""Predict which tool is likely needed next."""
candidates = {}
for (a, b), count in self.sequence_counts.items():
if a == current_tool:
candidates[b] = count
ranked = sorted(candidates, key=candidates.get, reverse=True)
return ranked[:top_k]
def _classify_task(self, description: str) -> str:
"""Simple keyword-based task classification."""
description = description.lower()
for keyword in ["deploy", "debug", "test", "refactor", "analyze", "write", "search"]:
if keyword in description:
return keyword
return "general"
Procedural memory lets an agent get faster and more reliable over time — it stops exploring dead ends and converges on patterns that work.
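A small demonstration of that loop in action (the episode contents are made up):

proc = ProceduralMemory()
ep = Episode(
    task_description="Deploy the billing service to staging",
    plan=["run migrations", "deploy to staging", "verify health checks"],
    tool_calls=[
        ToolCall("run_migration", {"env": "staging"}, "ok", 840.0, True),
        ToolCall("deploy", {"service": "billing"}, "ok", 12_300.0, True),
        ToolCall("health_check", {"service": "billing"}, "200 OK", 150.0, True),
    ],
    outcome=Outcome.SUCCESS,
)
proc.update_from_episode(ep)
# A later, similar task reuses the cached plan and learned tool sequence.
print(proc.suggest_plan("deploy the auth service"))
# ['run migrations', 'deploy to staging', 'verify health checks']
print(proc.likely_next_tools("deploy"))  # ['health_check']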
132.8 — Memory architecture for production agents (complete stack)
A production agent memory system composes all the layers above into a unified stack, coordinated by one central component: the memory router.
The Memory Router
The memory router is the central coordinator. On each agent turn, it:
- Determines whether the turn requires reading from memory (most turns do).
- Determines whether the turn produced information worth writing to memory.
- Dispatches to the appropriate read/write pipelines.
from dataclasses import dataclass
from typing import Optional
@dataclass
class MemoryConfig:
enable_episodic: bool = True
enable_semantic: bool = True
enable_procedural: bool = True
write_threshold: float = 0.5 # min importance to write
max_retrieved_items: int = 10
retrieval_min_score: float = 0.3
class MemoryRouter:
"""Orchestrates reads and writes across all memory subsystems."""
def __init__(
self,
config: MemoryConfig,
vector_store: VectorMemoryStore,
structured_store: StructuredMemory,
episodic_store: EpisodicMemory,
procedural_store: ProceduralMemory,
conversation: PriorityConversationMemory,
):
self.config = config
self.vector = vector_store
self.structured = structured_store
self.episodic = episodic_store
self.procedural = procedural_store
self.conversation = conversation
def read(self, query: str, task_type: Optional[str] = None) -> dict:
"""Retrieve relevant memories from all stores."""
context = {"semantic": [], "facts": [], "episodes": [], "plan_hint": []}
# Semantic retrieval from vector store
if self.config.enable_semantic:
results = self.vector.search(
query,
top_k=self.config.max_retrieved_items,
min_score=self.config.retrieval_min_score,
)
context["semantic"] = [
{"text": r.text, "score": s} for r, s in results
if r.metadata.get("type") != "episode"
]
# Structured facts
context["facts"] = self.structured.search_by_category("user_preference")
# Episodic recall
if self.config.enable_episodic:
lessons = self.episodic.get_lessons_for_task(query)
context["episodes"] = lessons
# Procedural hints
if self.config.enable_procedural and task_type:
plan = self.procedural.suggest_plan(task_type)
if plan:
context["plan_hint"] = plan
return context
def write(self, content: str, importance: float, metadata: dict) -> None:
"""Persist information if it meets the importance threshold."""
if importance < self.config.write_threshold:
return
# Always write to vector store for semantic search
self.vector.add(content, metadata=metadata)
# Extract and store structured facts if flagged
if metadata.get("has_facts"):
for fact in metadata.get("facts", []):
self.structured.upsert(
key=fact["key"],
value=fact["value"],
category=fact.get("category", "general"),
)
132.9 — Write path: what to memorize and when
Not everything deserves to be remembered. The write path must be selective — storing too much creates noise that degrades retrieval quality. Storing too little means the agent forgets important context.
What to memorize
| Category | Examples | Storage target |
|---|---|---|
| User preferences | "I prefer TypeScript over JavaScript" | Structured DB |
| Decisions made | "We chose PostgreSQL for the user service" | Vector + Structured |
| Error patterns | "Deploy failed due to missing env var DATABASE_URL" | Episodic |
| Successful strategies | "Running lint before tests catches issues earlier" | Procedural |
| Factual corrections | "The API endpoint changed to /v2/users" | Structured DB |
| Task outcomes | Full episode trace | Episode store |
What NOT to memorize
- Transient chit-chat and pleasantries
- Information the agent can always re-derive from tools (e.g., current file contents)
- Highly volatile state that changes every minute (e.g., real-time metrics — query live instead)
- Sensitive data unless explicitly designed for it (PII, credentials)
Importance scoring
class ImportanceScorer:
"""Score how important a piece of information is for long-term storage."""
# Keyword signals for high importance
HIGH_SIGNALS = {
"always", "never", "prefer", "require", "important", "remember",
"don't forget", "key constraint", "must", "critical",
}
MEDIUM_SIGNALS = {
"decided", "agreed", "chose", "learned", "found that", "turns out",
"note that", "discovered", "configured",
}
def score(self, text: str, role: str = "user") -> float:
"""Return importance score in [0, 1]."""
text_lower = text.lower()
score = 0.3 # baseline
# User statements are generally more important than assistant
if role == "user":
score += 0.1
# Check signal words
for signal in self.HIGH_SIGNALS:
if signal in text_lower:
score += 0.15
break
for signal in self.MEDIUM_SIGNALS:
if signal in text_lower:
score += 0.1
break
# Longer, more substantive content scores slightly higher
word_count = len(text.split())
if word_count > 20:
score += 0.05
if word_count > 50:
score += 0.05
# Presence of code or structured data
if "```" in text or "{" in text:
score += 0.05
return min(score, 1.0)
Write timing
| Strategy | When to write | Trade-offs |
|---|---|---|
| Synchronous | Immediately after each turn | Adds latency to every response |
| End-of-turn batch | After assistant responds | Slight delay, no user-facing latency |
| End-of-session | When conversation ends | Risk of data loss on crash; lower overhead |
| Async background | Via queue/worker | Best latency; requires infrastructure |
Production systems typically use end-of-turn batch writes with an async fallback for expensive operations like embedding computation.
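A sketch of that combination using asyncio — the queue-based worker and its API are illustrative, not a standard interface:

import asyncio

class AsyncMemoryWriter:
    """End-of-turn batch writes, with expensive work done off the request path."""
    def __init__(self, router: MemoryRouter, scorer: ImportanceScorer):
        self.router = router
        self.scorer = scorer
        self.queue: asyncio.Queue = asyncio.Queue()

    def enqueue_turn(self, messages: list[dict]) -> None:
        # Called once after the assistant responds; adds no user-facing latency.
        for msg in messages:
            importance = self.scorer.score(msg["content"], msg["role"])
            self.queue.put_nowait(
                (msg["content"], importance, {"role": msg["role"]})
            )

    async def worker(self) -> None:
        # Background task: embedding and storage happen here, asynchronously.
        while True:
            content, importance, metadata = await self.queue.get()
            try:
                self.router.write(content, importance, metadata)
            finally:
                self.queue.task_done()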
132.10 — Read path: retrieval strategies, relevance scoring, and freshness decay
The read path determines what the agent remembers at inference time. A poor read path means the agent either retrieves irrelevant noise or misses critical context.
Multi-strategy retrieval
A single retrieval method is rarely sufficient. Production agents combine:
- Semantic search — vector similarity over embedded memories.
- Recency search — most recent N items, regardless of content.
- Keyword/structured search — exact matches on known entities or categories.
- Graph traversal — follow relationships in a knowledge graph (if present).
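The result sets from these strategies must then be merged and deduplicated before ranking. A minimal sketch over the MemoryRecord type from earlier (the strategy inputs are placeholders):

def merge_candidates(
    semantic: list[tuple[MemoryRecord, float]],
    recent: list[MemoryRecord],
    keyword: list[MemoryRecord],
) -> list[MemoryRecord]:
    """Union the result sets, deduplicating by record_id."""
    seen: dict[str, MemoryRecord] = {}
    for record, _score in semantic:
        seen.setdefault(record.record_id, record)
    for record in (*recent, *keyword):
        seen.setdefault(record.record_id, record)
    return list(seen.values())

The merged pool then flows into a composite ranker: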
from dataclasses import dataclass
import time
import math
@dataclass
class ScoredMemory:
text: str
relevance: float # semantic similarity
recency: float # time decay factor
importance: float # stored importance
access_count: int
final_score: float = 0.0
class RetrievalRanker:
"""Combine multiple signals into a single retrieval score."""
def __init__(
self,
relevance_weight: float = 0.5,
recency_weight: float = 0.25,
importance_weight: float = 0.2,
frequency_weight: float = 0.05,
decay_half_life_hours: float = 168.0, # 1 week
):
self.w_rel = relevance_weight
self.w_rec = recency_weight
self.w_imp = importance_weight
self.w_freq = frequency_weight
self.half_life = decay_half_life_hours * 3600 # convert to seconds
def freshness_decay(self, created_at: float) -> float:
"""Exponential decay based on age."""
age_seconds = time.time() - created_at
return math.exp(-0.693 * age_seconds / self.half_life) # ln(2) ≈ 0.693
def score(self, memory: ScoredMemory) -> float:
"""Compute composite retrieval score."""
# Normalize access count with log dampening
freq_score = math.log1p(memory.access_count) / 10.0
freq_score = min(freq_score, 1.0)
memory.final_score = (
self.w_rel * memory.relevance
+ self.w_rec * memory.recency
+ self.w_imp * memory.importance
+ self.w_freq * freq_score
)
return memory.final_score
def rank(self, memories: list[ScoredMemory], top_k: int = 10) -> list[ScoredMemory]:
for m in memories:
self.score(m)
memories.sort(key=lambda m: m.final_score, reverse=True)
return memories[:top_k]
Freshness decay
Freshness decay ensures that old memories gradually lose influence unless they are frequently accessed or marked as permanently important. The exponential decay formula:
freshness(t) = exp(-ln(2) * age / half_life)
With a half-life of one week, a memory from:
- 1 hour ago has freshness 0.996
- 1 day ago has freshness 0.906
- 1 week ago has freshness 0.500
- 4 weeks ago has freshness 0.063
This prevents stale memories from crowding out fresh, relevant context.
Reranking with a cross-encoder
For high-stakes retrieval, a cross-encoder reranker scores each (query, candidate) pair jointly, improving precision over embedding-only similarity:
class CrossEncoderReranker:
"""Rerank candidates using a cross-encoder model."""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
candidates: list[str],
top_k: int = 5,
) -> list[tuple[str, float]]:
pairs = [(query, c) for c in candidates]
scores = self.model.predict(pairs)
ranked = sorted(
zip(candidates, scores), key=lambda x: x[1], reverse=True
)
return ranked[:top_k]
Context assembly
After retrieval and ranking, the context assembler packs retrieved memories into the prompt within the token budget:
def assemble_context(
retrieved: list[ScoredMemory],
token_budget: int,
header: str = "Relevant context from memory:",
) -> str:
"""Pack retrieved memories into a context block within budget."""
lines = [header, ""]
used_tokens = len(header) // 4
for mem in retrieved:
entry = f"- [{mem.final_score:.2f}] {mem.text}"
entry_tokens = len(entry) // 4
if used_tokens + entry_tokens > token_budget:
break
lines.append(entry)
used_tokens += entry_tokens
return "\n".join(lines)
132.11 — Implementation: mem0, LangGraph persistence, and custom PostgreSQL-backed memory
Let us look at three practical approaches, ranging from off-the-shelf to fully custom.
Approach 1: mem0 — managed memory layer
mem0 provides a drop-in memory layer for LLM applications. It handles embedding, storage, and retrieval behind a simple API.
from mem0 import Memory
# Initialize with configuration
config = {
"vector_store": {
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
"collection_name": "agent_memory",
},
},
"embedder": {
"provider": "openai",
"config": {"model": "text-embedding-3-small"},
},
"llm": {
"provider": "anthropic",
"config": {"model": "claude-sonnet-4-20250514"},
},
}
memory = Memory.from_config(config)
# Add memories scoped to a user
memory.add(
"I prefer dark mode and vim keybindings in all editors.",
user_id="user_42",
metadata={"category": "preferences"},
)
memory.add(
"Production database is PostgreSQL 16 on db.prod.internal:5432.",
user_id="user_42",
metadata={"category": "infrastructure"},
)
# Retrieve relevant memories
results = memory.search(
"What database should I connect to?",
user_id="user_42",
limit=5,
)
for result in results:
print(f"[{result['score']:.3f}] {result['memory']}")
# Memory is automatically deduplicated and updated
memory.add(
"Production database migrated to PostgreSQL 17 on db2.prod.internal:5432.",
user_id="user_42",
metadata={"category": "infrastructure"},
)
# mem0 detects the conflict and updates the existing memory
Strengths: minimal setup, automatic deduplication, built-in LLM-based extraction.
Limitations: less control over scoring and retrieval logic; dependency on external service.
Approach 2: LangGraph persistence with checkpointing
LangGraph provides built-in persistence via checkpointers, giving agents durable state across runs.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list[dict], operator.add]
memories: list[str]
current_plan: list[str]
facts: dict[str, str]
def retrieve_memories(state: AgentState) -> dict:
"""Node: retrieve relevant memories based on latest message."""
latest = state["messages"][-1]["content"]
# (Use your vector store here)
retrieved = vector_store.search(latest, top_k=5)
return {
"memories": [r.text for r, _ in retrieved],
}
def generate_response(state: AgentState) -> dict:
"""Node: generate response with memory-augmented context."""
memory_context = "\n".join(f"- {m}" for m in state["memories"])
messages = [
{"role": "system", "content": f"Relevant memories:\n{memory_context}"},
*state["messages"],
]
response = llm.complete(messages)
return {"messages": [{"role": "assistant", "content": response}]}
def persist_memories(state: AgentState) -> dict:
"""Node: extract and store new memories from the conversation."""
latest_exchange = state["messages"][-2:] # user + assistant
scorer = ImportanceScorer()
for msg in latest_exchange:
importance = scorer.score(msg["content"], msg["role"])
if importance > 0.5:
vector_store.add(msg["content"], metadata={"role": msg["role"]})
return {}
# Build the graph
builder = StateGraph(AgentState)
builder.add_node("retrieve", retrieve_memories)
builder.add_node("generate", generate_response)
builder.add_node("persist", persist_memories)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", "persist")
builder.add_edge("persist", END)
# Compile with a PostgreSQL checkpointer for durable state.
# PostgresSaver.from_conn_string returns a context manager; the saver is
# only valid inside the with-block, and setup() creates its tables once.
with PostgresSaver.from_conn_string(
    "postgresql://agent:secret@localhost:5432/agent_memory"
) as checkpointer:
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
    # Run — state is automatically persisted between invocations
    config = {"configurable": {"thread_id": "user_42_session_7"}}
    result = graph.invoke(
        {"messages": [{"role": "user", "content": "Deploy the user service to staging"}]},
        config=config,
    )
Approach 3: custom PostgreSQL-backed memory
For full control, build a custom memory system on PostgreSQL with pgvector:
import asyncpg
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class PgMemoryConfig:
dsn: str = "postgresql://agent:secret@localhost:5432/agent_memory"
embedding_dim: int = 1536
table_name: str = "memories"
class PostgresMemoryStore:
"""Production-grade memory store using PostgreSQL + pgvector."""
def __init__(self, config: PgMemoryConfig, embed_fn):
self.config = config
self.embed_fn = embed_fn
self.pool: Optional[asyncpg.Pool] = None
async def initialize(self) -> None:
"""Create connection pool and ensure schema exists."""
self.pool = await asyncpg.create_pool(self.config.dsn, min_size=2, max_size=10)
async with self.pool.acquire() as conn:
await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
await conn.execute(f"""
CREATE TABLE IF NOT EXISTS {self.config.table_name} (
id BIGSERIAL PRIMARY KEY,
content TEXT NOT NULL,
embedding vector({self.config.embedding_dim}),
user_id TEXT NOT NULL,
category TEXT DEFAULT 'general',
importance REAL DEFAULT 0.5,
metadata JSONB DEFAULT '{{}}'::jsonb,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
access_count INTEGER DEFAULT 0,
last_accessed TIMESTAMPTZ DEFAULT now()
);
""")
# Create indexes
await conn.execute(f"""
CREATE INDEX IF NOT EXISTS idx_{self.config.table_name}_embedding
ON {self.config.table_name}
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
""")
await conn.execute(f"""
CREATE INDEX IF NOT EXISTS idx_{self.config.table_name}_user
ON {self.config.table_name} (user_id, category);
""")
async def add(
self,
content: str,
user_id: str,
category: str = "general",
importance: float = 0.5,
metadata: Optional[dict] = None,
) -> int:
"""Store a new memory."""
embedding = self.embed_fn(content)
embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
async with self.pool.acquire() as conn:
row = await conn.fetchrow(f"""
INSERT INTO {self.config.table_name}
(content, embedding, user_id, category, importance, metadata)
VALUES ($1, $2::vector, $3, $4, $5, $6::jsonb)
RETURNING id
""", content, embedding_str, user_id, category, importance,
json.dumps(metadata or {}))
return row["id"]
async def search(
self,
query: str,
user_id: str,
top_k: int = 10,
category: Optional[str] = None,
min_importance: float = 0.0,
freshness_half_life_days: float = 7.0,
) -> list[dict]:
"""Retrieve memories ranked by composite score."""
query_embedding = self.embed_fn(query)
embedding_str = "[" + ",".join(str(x) for x in query_embedding) + "]"
category_filter = "AND category = $4" if category else ""
params = [embedding_str, user_id, min_importance]
if category:
params.append(category)
async with self.pool.acquire() as conn:
rows = await conn.fetch(f"""
WITH scored AS (
SELECT
id,
content,
category,
importance,
metadata,
created_at,
access_count,
1 - (embedding <=> $1::vector) AS similarity,
EXP(
-0.693 * EXTRACT(EPOCH FROM (now() - created_at))
/ ({freshness_half_life_days} * 86400)
) AS freshness
FROM {self.config.table_name}
WHERE user_id = $2
AND importance >= $3
{category_filter}
ORDER BY embedding <=> $1::vector
LIMIT {top_k * 3}
)
SELECT *,
(0.50 * similarity
+ 0.25 * freshness
+ 0.20 * importance
+ 0.05 * LEAST(LN(1 + access_count) / 10.0, 1.0)
) AS final_score
FROM scored
ORDER BY final_score DESC
LIMIT {top_k}
""", *params)
# Update access counts
ids = [r["id"] for r in rows]
if ids:
await conn.execute(f"""
UPDATE {self.config.table_name}
SET access_count = access_count + 1,
last_accessed = now()
WHERE id = ANY($1::bigint[])
""", ids)
return [dict(r) for r in rows]
async def forget(
self,
user_id: str,
older_than_days: int = 90,
min_access_count: int = 0,
max_importance: float = 0.3,
) -> int:
"""Garbage-collect low-value old memories."""
async with self.pool.acquire() as conn:
result = await conn.execute(f"""
DELETE FROM {self.config.table_name}
WHERE user_id = $1
AND created_at < now() - INTERVAL '{older_than_days} days'
AND access_count <= $2
AND importance <= $3
""", user_id, min_access_count, max_importance)
return int(result.split()[-1]) # "DELETE N"
async def close(self) -> None:
if self.pool:
await self.pool.close()
Key design decisions in this implementation:
- IVFFlat index on the embedding column for fast approximate nearest neighbor search.
- Composite scoring in SQL — the database does the heavy lifting, not the application.
- Access tracking — every retrieval increments the access count, preventing useful memories from being garbage collected.
- Garbage collection via the `forget` method — old, low-importance, rarely-accessed memories are pruned automatically.
Choosing your approach
| Criterion | mem0 | LangGraph | Custom PostgreSQL |
|---|---|---|---|
| Setup time | Minutes | Hours | Days |
| Control over scoring | Low | Medium | Full |
| Operational complexity | Low (managed) | Medium | High |
| Customizability | Limited | Good | Unlimited |
| Multi-agent support | Built-in | Via thread IDs | Custom |
| Cost at scale | Vendor pricing | Infrastructure | Infrastructure |
132.12 — Mental model
Eight takeaway points for agent memory systems:
- Memory is not optional. Any agent that runs more than one turn or serves a user more than once needs engineered memory. The LLM itself remembers nothing.
- Think in cache layers. Working memory (context window) is L1 cache — fast, small, volatile. Short-term memory (conversation history) is L2. Long-term stores (vector DB, SQL) are main memory. Design promotion and eviction policies deliberately.
- Not everything should be remembered. The write path needs an importance filter. Storing everything creates retrieval noise that degrades agent quality more than storing nothing.
- Retrieval quality determines memory quality. A vast memory store is worthless if the agent retrieves the wrong items. Invest in ranking: combine semantic similarity, recency decay, importance scores, and frequency signals.
- Freshness decay prevents staleness. Exponential decay with a configurable half-life ensures old memories fade gracefully unless they are reinforced by repeated access or high importance.
- Separate structure from semantics. Use vector stores for fuzzy semantic recall and relational databases for precise factual lookup. Most production systems need both.
- Episodic memory creates learning loops. Recording full execution traces and extracting lessons lets agents improve over time. This is the closest thing to “experience” a stateless LLM can have.
- Procedural memory accelerates execution. Caching successful tool-use patterns and plans means the agent converges on effective strategies rather than re-exploring from scratch every time.
Read it yourself
- “Generative Agents: Interactive Simulacra of Human Behavior” (Park et al., 2023) — the Stanford paper that introduced memory streams, reflection, and planning for LLM agents, sparking the modern agent memory movement.
- “MemGPT: Towards LLMs as Operating Systems” (Packer et al., 2023) — reframes context management as virtual memory with explicit paging between main context and external storage.
- mem0 documentation (https://docs.mem0.ai) — practical reference for the managed memory layer.
- LangGraph persistence guide (https://langchain-ai.github.io/langgraph/) — checkpointing, state management, and cross-session memory in graph-based agents.
- pgvector documentation (https://github.com/pgvector/pgvector) — vector similarity search in PostgreSQL, the workhorse of many custom memory backends.
- “Cognitive Architectures for Language Agents” (Sumers et al., 2024) — a systematic framework mapping cognitive science memory models to LLM agent designs.
Practice
- Implement a sliding window memory with a maximum of 4,000 tokens. Feed it a 20-turn conversation and verify that only the most recent messages survive. Confirm the system prompt is always preserved.
- Build a summarization-based memory using the `SummarizingMemory` class. Compare the information retained versus sliding-window truncation on a 50-turn debugging session. Which approach preserves the root cause diagnosis?
- Create a priority-aware conversation memory with `PriorityConversationMemory`. Tag messages with priorities manually and verify that HIGH-priority messages survive eviction while LOW-priority messages are dropped first.
- Stand up a vector memory store using the `VectorMemoryStore` class with a local sentence-transformer embedding model. Add 100 memories from different categories and measure retrieval precision at top-5 for 10 test queries.
- Implement the full `RetrievalRanker` with freshness decay. Create memories at different timestamps (mock `time.time()`) and verify that the composite scoring correctly balances relevance, recency, importance, and frequency.
- Build an episodic memory system that records agent runs and extracts lessons. Simulate five agent episodes (three successes, two failures), then query for lessons relevant to a new similar task. Verify that failure lessons appear and are actionable.
- Stretch: Deploy the full `PostgresMemoryStore` on a real PostgreSQL instance with pgvector. Load 10,000 synthetic memories, benchmark search latency at p50/p95/p99, tune the IVFFlat `lists` parameter, and implement the `forget` garbage collector with a 30-day retention policy. Measure how retrieval quality changes as the store grows from 1K to 10K memories.