Part XI · Building Agents and Agent Infrastructure
Chapter 132 ~32 min read

Agent memory systems: context, retrieval, and persistence

"Every conversation with an LLM begins with amnesia. The model receives a prompt, generates a response, and forgets everything the moment the connection closes. To build agents that *learn*, *adapt*, and *accumulate expertise* over time, we must engineer memory from scratch — deciding what to remember, where to store it, how to retrieve it, and when to let it decay. This chapter lays out the full memory stack, from the ephemeral working memory of a context window to durable long-term stores backed by vector databases and relational tables."

132.1 — Why agents need memory (the stateless LLM problem)

A bare language model is a stateless function: given tokens in, produce tokens out. There is no hidden notebook, no scratch register that persists between calls. Every inference pass starts from the same blank slate.

This is perfectly fine for single-turn question answering. It is catastrophic for agents, which must:

  • Track multi-step plans across tool calls that may span minutes or hours.
  • Recall user preferences expressed three conversations ago.
  • Avoid repeating mistakes — an agent that retries a failed API call with the exact same malformed payload is wasting tokens and time.
  • Accumulate domain knowledge from past interactions so performance improves with use.

Without engineered memory, an agent is a goldfish with a PhD — brilliant in the moment, oblivious to everything that came before.

The core tension is simple: LLMs have fixed-size context windows, but real tasks generate unbounded information over time. Memory systems bridge that gap by selecting, compressing, and surfacing the right information at the right moment.

Stateless LLM:  prompt → response → (forgotten)
Agent with memory:  prompt + retrieved_context → response → (write to memory) → next turn

Every memory architecture answers three questions:

  1. What gets stored? (raw text, embeddings, structured facts, tool traces)
  2. Where does it live? (context window, external DB, file system)
  3. How is it retrieved? (recency, relevance, importance scoring)

The rest of this chapter builds the machinery to answer all three.
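The diagram above reduces to a few lines of code. In this sketch, `llm` and `memory` are hypothetical stand-ins (a completion callable and a store with `retrieve`/`store` methods, names chosen purely for illustration):

```python
def agent_turn(llm, memory, user_message: str) -> str:
    """One turn of a memory-augmented agent loop."""
    # Read: surface stored information relevant to this message
    # (answers "how is it retrieved?").
    retrieved = memory.retrieve(user_message)
    # Generate: the model sees the query plus retrieved context.
    prompt = f"Relevant memory:\n{retrieved}\n\nUser: {user_message}"
    response = llm(prompt)
    # Write: persist whatever this exchange produced that is worth
    # keeping (answers "what gets stored?" and "where does it live?").
    memory.store(user_message, response)
    return response
```

Everything that follows is machinery behind those `retrieve` and `store` calls.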


132.2 — Memory taxonomy: working, short-term, long-term, episodic, semantic, procedural

Cognitive science gives us a useful (if imperfect) analogy. Agent memory systems borrow the same categories that describe human memory, adapted for silicon:

| Memory type | Human analogy | Agent equivalent | Persistence |
|---|---|---|---|
| Working memory | Active thought, mental scratchpad | Context window contents | Single inference call |
| Short-term memory | Recent conversation recall | Conversation buffer / chat history | Single session |
| Long-term memory | Learned facts and experiences | External stores (vector DB, SQL) | Across sessions |
| Episodic memory | "I remember that meeting" | Logs of past agent runs with outcomes | Long-term |
| Semantic memory | "Paris is in France" | Extracted facts, knowledge graphs | Long-term |
| Procedural memory | "How to ride a bicycle" | Learned tool-use patterns, cached plans | Long-term |

These categories are not mutually exclusive. A single piece of information — “the user’s production database is PostgreSQL 16 on port 5433” — might live in working memory during the current turn, get stored in semantic long-term memory after the conversation, and inform procedural memory about which database driver to use in future tool calls.

Fig 132.1 — Memory types arranged by persistence and capacity. From top (volatile, small) to bottom (durable, unbounded): working memory (context window, single call); short-term memory (conversation history, single session); long-term memory, comprising episodic (past runs, outcomes, success/failure traces), semantic (facts, entities, knowledge graphs), and procedural (tool-use patterns, cached plans) stores.

The key insight: an agent’s memory architecture is a caching hierarchy, much like L1/L2/L3 caches in a CPU. Working memory is fast but tiny. Long-term memory is vast but requires explicit retrieval. The art is in the promotion and eviction policies that shuttle information between layers.


132.3 — Working memory: context window as RAM

The context window is the agent’s working memory — the only information the model can reason about during a single forward pass. Everything the agent “knows” at inference time must fit here.

The RAM analogy

| Property | CPU RAM | LLM context window |
|---|---|---|
| Capacity | 16–512 GB | 8K–2M tokens |
| Access pattern | Random access, O(1) | Full attention, O(n²) |
| Persistence | Until power off | Until call ends |
| Overflow strategy | Swap to disk | Truncate or summarize |

Sliding window

The simplest overflow strategy: keep the most recent N tokens and drop everything else.

from dataclasses import dataclass, field

@dataclass
class SlidingWindowMemory:
    """Maintains a fixed-size context window via FIFO eviction."""
    max_tokens: int = 8_000
    messages: list[dict] = field(default_factory=list)
    system_prompt: str = ""
    _system_tokens: int = 0

    def __post_init__(self):
        # Reserve space for system prompt (estimate ~4 chars per token)
        self._system_tokens = len(self.system_prompt) // 4

    def add(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})
        self._evict()

    def _estimate_tokens(self) -> int:
        return self._system_tokens + sum(
            len(m["content"]) // 4 for m in self.messages
        )

    def _evict(self) -> None:
        """Drop oldest non-system messages until under budget."""
        while self._estimate_tokens() > self.max_tokens and len(self.messages) > 1:
            self.messages.pop(0)

    def to_prompt(self) -> list[dict]:
        out = []
        if self.system_prompt:
            out.append({"role": "system", "content": self.system_prompt})
        out.extend(self.messages)
        return out

Sliding windows are cheap but brutal: they destroy potentially critical information from early in the conversation.

Summarization-based compression

A smarter approach: when the context grows too large, summarize older messages rather than dropping them.

import json
from typing import Protocol

class LLMClient(Protocol):
    def complete(self, messages: list[dict]) -> str: ...

class SummarizingMemory:
    """Compresses older messages into a running summary."""

    def __init__(
        self,
        client: LLMClient,
        max_tokens: int = 12_000,
        summary_threshold: float = 0.75,  # summarize when 75% full
    ):
        self.client = client
        self.max_tokens = max_tokens
        self.summary_threshold = summary_threshold
        self.running_summary: str = ""
        self.messages: list[dict] = []

    def _token_estimate(self) -> int:
        total = len(self.running_summary) // 4
        total += sum(len(m["content"]) // 4 for m in self.messages)
        return total

    def add(self, role: str, content: str) -> None:
        self.messages.append({"role": role, "content": content})
        if self._token_estimate() > self.max_tokens * self.summary_threshold:
            self._compress()

    def _compress(self) -> None:
        """Summarize the oldest half of messages into running_summary."""
        split = len(self.messages) // 2
        to_summarize = self.messages[:split]
        self.messages = self.messages[split:]

        prompt = [
            {"role": "system", "content": (
                "Condense the following conversation into a concise summary. "
                "Preserve all facts, decisions, user preferences, and action items. "
                "Omit pleasantries and redundant exchanges."
            )},
            {"role": "user", "content": (
                f"Previous summary:\n{self.running_summary}\n\n"
                f"New messages:\n{json.dumps(to_summarize, indent=2)}"
            )},
        ]
        self.running_summary = self.client.complete(prompt)

    def to_prompt(self) -> list[dict]:
        out = []
        if self.running_summary:
            out.append({
                "role": "system",
                "content": f"Conversation summary so far:\n{self.running_summary}",
            })
        out.extend(self.messages)
        return out

Trade-off: summarization costs one extra LLM call and may lose nuance, but preserves far more information than blind truncation.

Hierarchical compression

Production systems often combine both: keep the last K messages verbatim, maintain a summary of messages K+1 through K+M, and discard anything older. This gives the agent both precise recent context and broad historical awareness.
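A sketch of that three-tier layout; `summarize` is any callable from a message list to a string (an LLM call in practice), and the parameter names are illustrative:

```python
def hierarchical_context(messages: list[dict], summarize,
                         keep_verbatim: int = 6,
                         keep_summarized: int = 20) -> list[dict]:
    """Last K messages verbatim, a summary of the M before them,
    anything older discarded. `summarize` maps a message list to a string."""
    if len(messages) <= keep_verbatim:
        return list(messages)
    recent = messages[-keep_verbatim:]
    older = messages[:-keep_verbatim]
    middle = older[-keep_summarized:]  # the M messages just before `recent`
    out = [{
        "role": "system",
        "content": "Earlier context (summarized): " + summarize(middle),
    }]
    out.extend(recent)
    return out
```

The split points (K and M) are tuning knobs: larger K preserves more verbatim detail, larger M widens historical awareness at the cost of one bigger summarization call.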


132.4 — Short-term memory: conversation history and token budgets

Short-term memory spans a single session — typically one conversation thread. It holds the full exchange between user and agent, including tool calls, intermediate reasoning, and assistant responses.

Token budget management

Modern agents juggle multiple sources of context. A disciplined approach allocates a token budget across categories:

from dataclasses import dataclass

@dataclass
class TokenBudget:
    """Allocate context window space across memory sources."""
    total: int = 128_000

    # Allocation percentages
    system_prompt_pct: float = 0.05      # 6,400 tokens
    retrieved_context_pct: float = 0.20  # 25,600 tokens
    conversation_pct: float = 0.55       # 70,400 tokens
    tool_results_pct: float = 0.10       # 12,800 tokens
    output_reserve_pct: float = 0.10     # 12,800 tokens

    @property
    def system_prompt_budget(self) -> int:
        return int(self.total * self.system_prompt_pct)

    @property
    def retrieved_context_budget(self) -> int:
        return int(self.total * self.retrieved_context_pct)

    @property
    def conversation_budget(self) -> int:
        return int(self.total * self.conversation_pct)

    @property
    def tool_results_budget(self) -> int:
        return int(self.total * self.tool_results_pct)

    @property
    def output_reserve(self) -> int:
        return int(self.total * self.output_reserve_pct)
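Because budgets get tuned piecemeal, it is easy for the percentages to drift away from 1.0. A small guard catches this; a sketch (the helper name and dict keys are illustrative):

```python
def validate_budget(pcts: dict[str, float], tolerance: float = 1e-6) -> None:
    """Raise if the allocation percentages do not sum to 1.0."""
    total = sum(pcts.values())
    if abs(total - 1.0) > tolerance:
        raise ValueError(f"budget percentages sum to {total:.3f}, expected 1.0")

validate_budget({
    "system_prompt": 0.05, "retrieved_context": 0.20,
    "conversation": 0.55, "tool_results": 0.10, "output_reserve": 0.10,
})  # the allocation above checks out
```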

Truncation vs. summarization decision matrix

| Scenario | Strategy | Rationale |
|---|---|---|
| Casual chat, no commitments | Truncate oldest | Low-value early turns |
| Multi-step task in progress | Summarize | Preserve action plan and partial results |
| Code debugging session | Keep errors verbatim, summarize discussion | Exact error text matters; chit-chat does not |
| Compliance / audit use case | Persist full log externally, summarize for context | Legal record in DB, compressed version for LLM |

Conversation history with priority tagging

Not all messages are equal. A production system tags messages with importance:

from enum import IntEnum
from dataclasses import dataclass, field
from typing import Optional
import time

class Priority(IntEnum):
    LOW = 0       # pleasantries, acknowledgments
    NORMAL = 1    # regular conversation
    HIGH = 2      # user preferences, decisions
    CRITICAL = 3  # error traces, commitments, action items

@dataclass
class MemoryEntry:
    role: str
    content: str
    priority: Priority = Priority.NORMAL
    timestamp: float = field(default_factory=time.time)
    token_estimate: int = 0
    metadata: Optional[dict] = None

    def __post_init__(self):
        if not self.token_estimate:
            self.token_estimate = len(self.content) // 4

class PriorityConversationMemory:
    """Evicts low-priority messages first when over budget."""

    def __init__(self, token_budget: int = 70_000):
        self.budget = token_budget
        self.entries: list[MemoryEntry] = []

    def add(self, entry: MemoryEntry) -> None:
        self.entries.append(entry)
        self._enforce_budget()

    def _enforce_budget(self) -> None:
        total = sum(e.token_estimate for e in self.entries)
        if total <= self.budget:
            return

        # Sort candidates for eviction: lowest priority first, then oldest
        candidates = sorted(
            range(len(self.entries)),
            key=lambda i: (self.entries[i].priority, self.entries[i].timestamp),
        )

        to_remove = set()
        freed = 0
        for idx in candidates:
            if total - freed <= self.budget:
                break
            freed += self.entries[idx].token_estimate
            to_remove.add(idx)

        self.entries = [
            e for i, e in enumerate(self.entries) if i not in to_remove
        ]

The priority-aware eviction ensures that user preferences and error traces survive long after polite greetings have been discarded.
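The eviction order is easy to verify in isolation; this sketch reproduces the sort key on plain (priority, timestamp) tuples:

```python
# Each entry: (priority, timestamp, label). The eviction key sorts
# lowest priority first, then oldest first -- the same ordering
# used by the _enforce_budget candidates above.
entries = [
    (3, 100.0, "error trace"),      # CRITICAL, old
    (0, 200.0, "thanks!"),          # LOW, recent
    (1, 150.0, "regular message"),  # NORMAL
    (0, 120.0, "hi there"),         # LOW, old
]
eviction_order = [label for _, _, label in sorted(entries)]
print(eviction_order)
# ['hi there', 'thanks!', 'regular message', 'error trace']
```

Both low-priority messages go before the NORMAL one, and the CRITICAL error trace is evicted last, even though it is the oldest entry.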


132.5 — Long-term memory: vector stores, structured DBs, and the retrieval pattern

When information must survive beyond a single session, it moves to long-term memory — external storage systems the agent queries at runtime.

Vector store pattern

The dominant approach: embed text into dense vectors, store them in a vector database, and retrieve by semantic similarity at query time.

from dataclasses import dataclass, field
from typing import Optional
import hashlib
import time
import numpy as np

@dataclass
class MemoryRecord:
    text: str
    embedding: Optional[np.ndarray] = None
    metadata: dict = field(default_factory=dict)
    record_id: str = ""
    created_at: float = field(default_factory=time.time)
    access_count: int = 0
    last_accessed: float = 0.0

    def __post_init__(self):
        if not self.record_id:
            self.record_id = hashlib.sha256(
                self.text.encode()
            ).hexdigest()[:16]


class VectorMemoryStore:
    """Simple in-memory vector store for illustration.
    
    Production: replace with pgvector, Qdrant, Pinecone, Weaviate, etc.
    """

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn  # text -> np.ndarray
        self.records: dict[str, MemoryRecord] = {}

    def add(self, text: str, metadata: Optional[dict] = None) -> str:
        embedding = self.embed_fn(text)
        record = MemoryRecord(
            text=text,
            embedding=embedding,
            metadata=metadata or {},
        )
        self.records[record.record_id] = record
        return record.record_id

    def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.0,
    ) -> list[tuple[MemoryRecord, float]]:
        query_vec = self.embed_fn(query)
        scored = []
        for record in self.records.values():
            score = float(np.dot(query_vec, record.embedding) / (
                np.linalg.norm(query_vec) * np.linalg.norm(record.embedding)
                + 1e-9
            ))
            if score >= min_score:
                scored.append((record, score))

        scored.sort(key=lambda x: x[1], reverse=True)
        top = scored[:top_k]
        # Update access stats only for the records actually returned,
        # not for everything that merely cleared min_score.
        now = time.time()
        for record, _ in top:
            record.access_count += 1
            record.last_accessed = now
        return top

    def delete(self, record_id: str) -> bool:
        return self.records.pop(record_id, None) is not None
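The store is deliberately agnostic about `embed_fn`. For offline tests, a deterministic toy embedding is convenient; this sketch (a bag-of-words hash, emphatically not a real embedding model) is enough to exercise `add` and `search`:

```python
import zlib
import numpy as np

def toy_embed(text: str, dim: int = 256) -> np.ndarray:
    """Deterministic bag-of-words hash embedding. A test-only stand-in;
    production code would call a real embedding model here."""
    vec = np.zeros(dim)
    for word in text.lower().split():
        vec[zlib.crc32(word.encode()) % dim] += 1.0
    norm = np.linalg.norm(vec)
    return vec / norm if norm > 0 else vec

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-9))

# Shared vocabulary scores higher than disjoint vocabulary.
q = toy_embed("what port does the postgres database use")
near = toy_embed("the production postgres database listens on port 5433")
far = toy_embed("user prefers typescript over javascript")
assert cosine(q, near) > cosine(q, far)
```

Swapping in a real `embed_fn` (an embedding API call) changes nothing about the store's interface.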

Structured DB for facts

Not everything should be embedded. Structured facts — the user's name, project configuration, API key references — belong in relational or document stores where they can be queried exactly.

import sqlite3
import time
from typing import Optional

class StructuredMemory:
    """Key-value + relational fact store backed by SQLite."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS facts (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                category TEXT DEFAULT 'general',
                confidence REAL DEFAULT 1.0,
                source TEXT DEFAULT 'user',
                created_at REAL,
                updated_at REAL
            )
        """)
        self.conn.commit()

    def upsert(
        self,
        key: str,
        value: str,
        category: str = "general",
        confidence: float = 1.0,
        source: str = "user",
    ) -> None:
        now = time.time()
        self.conn.execute("""
            INSERT INTO facts (key, value, category, confidence, source, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(key) DO UPDATE SET
                value=excluded.value,
                confidence=excluded.confidence,
                updated_at=excluded.updated_at
        """, (key, value, category, confidence, source, now, now))
        self.conn.commit()

    def get(self, key: str) -> Optional[str]:
        row = self.conn.execute(
            "SELECT value FROM facts WHERE key = ?", (key,)
        ).fetchone()
        return row[0] if row else None

    def search_by_category(self, category: str) -> list[dict]:
        rows = self.conn.execute(
            "SELECT key, value, confidence FROM facts WHERE category = ? ORDER BY confidence DESC",
            (category,),
        ).fetchall()
        return [{"key": r[0], "value": r[1], "confidence": r[2]} for r in rows]

The retrieval pattern

The standard flow for incorporating long-term memory into agent responses:

  1. Extract query — derive a search query from the current user message or agent plan.
  2. Retrieve — query vector store and/or structured store.
  3. Rank and filter — score results by relevance, recency, and importance; discard low-quality hits.
  4. Inject — insert retrieved context into the prompt, typically as a system message or prefixed block.
  5. Generate — call the LLM with augmented context.

This is the same RAG (Retrieval-Augmented Generation) pattern from Chapter 119, but applied to the agent’s own memory rather than a static document corpus.
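The five steps compose into a single function. In this sketch, `memory_search` and `llm` are hypothetical stand-ins for the stores and client defined above (a retriever returning `(text, score)` pairs and a chat-completion callable):

```python
def answer_with_memory(user_message: str, memory_search, llm,
                       top_k: int = 5, min_score: float = 0.3):
    """RAG over the agent's own memory: extract, retrieve, filter, inject, generate."""
    # 1. Extract query -- here simply the user message; a planner might
    #    instead derive it from the current plan step.
    query = user_message
    # 2. Retrieve -- memory_search returns (text, score) pairs.
    candidates = memory_search(query, top_k)
    # 3. Rank and filter -- drop low-relevance hits, best first.
    hits = sorted((h for h in candidates if h[1] >= min_score),
                  key=lambda h: h[1], reverse=True)
    # 4. Inject -- retrieved context goes in as a system message.
    context = "\n".join(f"- {text}" for text, _ in hits)
    messages = [
        {"role": "system", "content": f"Relevant memories:\n{context}"},
        {"role": "user", "content": user_message},
    ]
    # 5. Generate with the augmented context.
    return llm(messages)
```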


132.6 — Episodic memory: recording past runs, learning from success and failure

Episodic memory records what happened — full traces of past agent executions, including the task, the plan, the tool calls made, the results obtained, and whether the overall outcome was successful.

Why it matters

An agent that can recall “the last time I tried to deploy to staging, the health check failed because the migration hadn’t run” can proactively run migrations before deploying next time. Without episodic memory, it will rediscover this lesson every time.

Episode schema

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time
import uuid
import json

class Outcome(str, Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    ABANDONED = "abandoned"

@dataclass
class ToolCall:
    tool_name: str
    arguments: dict
    result: str
    duration_ms: float
    success: bool

@dataclass
class Episode:
    """A complete record of one agent task execution."""
    episode_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    task_description: str = ""
    plan: list[str] = field(default_factory=list)
    tool_calls: list[ToolCall] = field(default_factory=list)
    outcome: Outcome = Outcome.ABANDONED
    outcome_reason: str = ""
    lessons_learned: list[str] = field(default_factory=list)
    started_at: float = field(default_factory=time.time)
    completed_at: Optional[float] = None
    total_tokens_used: int = 0
    metadata: dict = field(default_factory=dict)

    def to_summary(self) -> str:
        """Generate a compact summary for retrieval."""
        tool_names = list({tc.tool_name for tc in self.tool_calls})
        success_rate = (
            sum(1 for tc in self.tool_calls if tc.success)
            / max(len(self.tool_calls), 1)
        )
        return (
            f"Task: {self.task_description}\n"
            f"Outcome: {self.outcome.value} - {self.outcome_reason}\n"
            f"Tools used: {', '.join(tool_names)}\n"
            f"Tool success rate: {success_rate:.0%}\n"
            f"Lessons: {'; '.join(self.lessons_learned)}"
        )


class EpisodicMemory:
    """Stores and retrieves past agent episodes."""

    def __init__(self, vector_store: 'VectorMemoryStore'):
        self.vector_store = vector_store
        self.episodes: dict[str, Episode] = {}

    def record(self, episode: Episode) -> str:
        episode.completed_at = time.time()
        self.episodes[episode.episode_id] = episode

        # Store searchable summary in vector store
        self.vector_store.add(
            text=episode.to_summary(),
            metadata={
                "type": "episode",
                "episode_id": episode.episode_id,
                "outcome": episode.outcome.value,
            },
        )
        return episode.episode_id

    def recall_similar(
        self, task_description: str, top_k: int = 3
    ) -> list[Episode]:
        """Find episodes similar to a new task."""
        results = self.vector_store.search(task_description, top_k=top_k)
        episode_ids = [
            r.metadata["episode_id"]
            for r, score in results
            if r.metadata.get("type") == "episode"
        ]
        return [self.episodes[eid] for eid in episode_ids if eid in self.episodes]

    def get_lessons_for_task(self, task_description: str) -> list[str]:
        """Extract lessons learned from similar past tasks."""
        episodes = self.recall_similar(task_description)
        lessons = []
        for ep in episodes:
            for lesson in ep.lessons_learned:
                lessons.append(f"[{ep.outcome.value}] {lesson}")
        return lessons

Extracting lessons automatically

After each agent run, a reflection step asks the LLM to extract lessons:

REFLECTION_PROMPT = """You just completed a task. Analyze the execution trace and extract 1-3 
concise lessons learned. Focus on:
- What went wrong and how it was fixed
- What worked well and should be repeated
- Surprising findings about the environment or tools

Task: {task}
Tool calls and results: {trace}
Outcome: {outcome}

Return a JSON array of lesson strings."""

These lessons become the agent’s institutional knowledge — the accumulated wisdom of past runs.
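A sketch of the reflection step itself; `client` is any prompt-to-text callable, and malformed model output degrades to an empty lesson list rather than crashing the run:

```python
import json

def reflect(client, task: str, trace: str, outcome: str,
            prompt_template: str) -> list[str]:
    """Ask the model for lessons learned; tolerate malformed output."""
    prompt = prompt_template.format(task=task, trace=trace, outcome=outcome)
    raw = client(prompt)
    try:
        lessons = json.loads(raw)
        if isinstance(lessons, list):
            return [str(lesson) for lesson in lessons][:3]  # cap at 3, per the prompt
    except json.JSONDecodeError:
        pass
    return []  # unparseable or non-list output: record nothing
```

The defensive parsing matters: reflection runs after every episode, and a single malformed JSON response should never take down the write path.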


132.7 — Procedural memory: learned tool-use patterns and cached plans

Procedural memory captures how to do things — recurring patterns of tool use, successful multi-step plans, and effective prompt templates.

Tool-use patterns

from dataclasses import dataclass, field
from collections import defaultdict
import time

@dataclass
class ToolPattern:
    """A learned pattern for using a specific tool effectively."""
    tool_name: str
    typical_args: dict = field(default_factory=dict)
    common_sequences: list[list[str]] = field(default_factory=list)
    failure_modes: list[str] = field(default_factory=list)
    success_tips: list[str] = field(default_factory=list)
    avg_duration_ms: float = 0.0
    call_count: int = 0
    success_rate: float = 1.0

class ProceduralMemory:
    """Learns and recalls effective tool-use patterns and plans."""

    def __init__(self):
        self.tool_patterns: dict[str, ToolPattern] = {}
        self.plan_cache: dict[str, list[str]] = {}  # task_type -> plan steps
        self.sequence_counts: dict[tuple, int] = defaultdict(int)

    def update_from_episode(self, episode: 'Episode') -> None:
        """Extract procedural knowledge from a completed episode."""
        # Update tool patterns
        for tc in episode.tool_calls:
            if tc.tool_name not in self.tool_patterns:
                self.tool_patterns[tc.tool_name] = ToolPattern(
                    tool_name=tc.tool_name
                )
            pattern = self.tool_patterns[tc.tool_name]
            pattern.call_count += 1
            # Exponential moving average for duration
            alpha = 0.1
            pattern.avg_duration_ms = (
                alpha * tc.duration_ms + (1 - alpha) * pattern.avg_duration_ms
            )
            # Update success rate
            pattern.success_rate = (
                alpha * float(tc.success) + (1 - alpha) * pattern.success_rate
            )

        # Track tool sequences (bigrams)
        tool_seq = [tc.tool_name for tc in episode.tool_calls]
        for i in range(len(tool_seq) - 1):
            bigram = (tool_seq[i], tool_seq[i + 1])
            self.sequence_counts[bigram] += 1

        # Cache successful plans
        if episode.outcome == Outcome.SUCCESS and episode.plan:
            task_type = self._classify_task(episode.task_description)
            self.plan_cache[task_type] = episode.plan

    def suggest_plan(self, task_description: str) -> list[str]:
        """Suggest a plan based on similar past tasks."""
        task_type = self._classify_task(task_description)
        return self.plan_cache.get(task_type, [])

    def get_tool_tips(self, tool_name: str) -> dict:
        """Return learned tips for a specific tool."""
        pattern = self.tool_patterns.get(tool_name)
        if not pattern:
            return {}
        return {
            "success_rate": f"{pattern.success_rate:.0%}",
            "avg_duration_ms": pattern.avg_duration_ms,
            "failure_modes": pattern.failure_modes,
            "success_tips": pattern.success_tips,
        }

    def likely_next_tools(self, current_tool: str, top_k: int = 3) -> list[str]:
        """Predict which tool is likely needed next."""
        candidates = {}
        for (a, b), count in self.sequence_counts.items():
            if a == current_tool:
                candidates[b] = count
        ranked = sorted(candidates, key=candidates.get, reverse=True)
        return ranked[:top_k]

    def _classify_task(self, description: str) -> str:
        """Simple keyword-based task classification."""
        description = description.lower()
        for keyword in ["deploy", "debug", "test", "refactor", "analyze", "write", "search"]:
            if keyword in description:
                return keyword
        return "general"

Procedural memory lets an agent get faster and more reliable over time — it stops exploring dead ends and converges on patterns that work.


132.8 — Memory architecture for production agents (complete stack)

A production agent memory system composes all the layers above into a unified architecture. Here is the full stack:

Fig 132.2 — Complete production memory stack with read and write paths. User input flows into a Memory Router. Read path: Query Formulator → Parallel Retrieval → Rank + Rerank + Filter → Context Assembler, which feeds the LLM (system prompt + retrieved context + conversation + query). Write path: Importance Scorer → Fact Extractor → Embedder → Store Dispatcher. Storage layer: vector DB (pgvector / Qdrant) for semantic memories; relational DB (PostgreSQL / SQLite) for structured facts; episode store (JSON / object store) for episodic and procedural records; KV cache (Redis / DynamoDB) for session and hot data.

The Memory Router

The memory router is the central coordinator. On each agent turn, it:

  1. Determines whether the turn requires reading from memory (most turns do).
  2. Determines whether the turn produced information worth writing to memory.
  3. Dispatches to the appropriate read/write pipelines.

from dataclasses import dataclass
from typing import Optional

@dataclass
class MemoryConfig:
    enable_episodic: bool = True
    enable_semantic: bool = True
    enable_procedural: bool = True
    write_threshold: float = 0.5  # min importance to write
    max_retrieved_items: int = 10
    retrieval_min_score: float = 0.3

class MemoryRouter:
    """Orchestrates reads and writes across all memory subsystems."""

    def __init__(
        self,
        config: MemoryConfig,
        vector_store: VectorMemoryStore,
        structured_store: StructuredMemory,
        episodic_store: EpisodicMemory,
        procedural_store: ProceduralMemory,
        conversation: PriorityConversationMemory,
    ):
        self.config = config
        self.vector = vector_store
        self.structured = structured_store
        self.episodic = episodic_store
        self.procedural = procedural_store
        self.conversation = conversation

    def read(self, query: str, task_type: Optional[str] = None) -> dict:
        """Retrieve relevant memories from all stores."""
        context = {"semantic": [], "facts": [], "episodes": [], "plan_hint": []}

        # Semantic retrieval from vector store
        if self.config.enable_semantic:
            results = self.vector.search(
                query,
                top_k=self.config.max_retrieved_items,
                min_score=self.config.retrieval_min_score,
            )
            context["semantic"] = [
                {"text": r.text, "score": s} for r, s in results
                if r.metadata.get("type") != "episode"
            ]

        # Structured facts
        context["facts"] = self.structured.search_by_category("user_preference")

        # Episodic recall
        if self.config.enable_episodic:
            lessons = self.episodic.get_lessons_for_task(query)
            context["episodes"] = lessons

        # Procedural hints
        if self.config.enable_procedural and task_type:
            plan = self.procedural.suggest_plan(task_type)
            if plan:
                context["plan_hint"] = plan

        return context

    def write(self, content: str, importance: float, metadata: dict) -> None:
        """Persist information if it meets the importance threshold."""
        if importance < self.config.write_threshold:
            return

        # Always write to vector store for semantic search
        self.vector.add(content, metadata=metadata)

        # Extract and store structured facts if flagged
        if metadata.get("has_facts"):
            for fact in metadata.get("facts", []):
                self.structured.upsert(
                    key=fact["key"],
                    value=fact["value"],
                    category=fact.get("category", "general"),
                )

132.9 — Write path: what to memorize and when

Not everything deserves to be remembered. The write path must be selective — storing too much creates noise that degrades retrieval quality. Storing too little means the agent forgets important context.

What to memorize

| Category | Examples | Storage target |
|---|---|---|
| User preferences | "I prefer TypeScript over JavaScript" | Structured DB |
| Decisions made | "We chose PostgreSQL for the user service" | Vector + structured |
| Error patterns | "Deploy failed due to missing env var DATABASE_URL" | Episodic |
| Successful strategies | "Running lint before tests catches issues earlier" | Procedural |
| Factual corrections | "The API endpoint changed to /v2/users" | Structured DB |
| Task outcomes | Full episode trace | Episode store |

What NOT to memorize

  • Transient chit-chat and pleasantries
  • Information the agent can always re-derive from tools (e.g., current file contents)
  • Highly volatile state that changes every minute (e.g., real-time metrics — query live instead)
  • Sensitive data unless explicitly designed for it (PII, credentials)

Importance scoring

class ImportanceScorer:
    """Score how important a piece of information is for long-term storage."""

    # Keyword signals for high importance
    HIGH_SIGNALS = {
        "always", "never", "prefer", "require", "important", "remember",
        "don't forget", "key constraint", "must", "critical",
    }
    MEDIUM_SIGNALS = {
        "decided", "agreed", "chose", "learned", "found that", "turns out",
        "note that", "discovered", "configured",
    }

    def score(self, text: str, role: str = "user") -> float:
        """Return importance score in [0, 1]."""
        text_lower = text.lower()
        score = 0.3  # baseline

        # User statements are generally more important than assistant
        if role == "user":
            score += 0.1

        # Check signal words
        for signal in self.HIGH_SIGNALS:
            if signal in text_lower:
                score += 0.15
                break

        for signal in self.MEDIUM_SIGNALS:
            if signal in text_lower:
                score += 0.1
                break

        # Longer, more substantive content scores slightly higher
        word_count = len(text.split())
        if word_count > 20:
            score += 0.05
        if word_count > 50:
            score += 0.05

        # Presence of code or structured data
        if "```" in text or "{" in text:
            score += 0.05

        return min(score, 1.0)

Write timing

| Strategy | When to write | Trade-offs |
| --- | --- | --- |
| Synchronous | Immediately after each turn | Adds latency to every response |
| End-of-turn batch | After the assistant responds | Slight delay, no user-facing latency |
| End-of-session | When the conversation ends | Risk of data loss on crash; lower overhead |
| Async background | Via a queue/worker | Best latency; requires infrastructure |

Production systems typically use end-of-turn batch writes with an async fallback for expensive operations like embedding computation.
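A sketch of that pattern — enqueue at end of turn, do the expensive work (embedding, persistence) on a background worker. The store's `add` method is a hypothetical interface:

```python
import queue
import threading

class AsyncMemoryWriter:
    """End-of-turn batch writes; persistence happens off the hot path.

    `store` is anything with an `add(text)` method (hypothetical interface).
    """

    def __init__(self, store):
        self.store = store
        self._queue: queue.Queue = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def write_turn(self, messages: list[str]) -> None:
        """Called once after the assistant responds: enqueue and return."""
        for text in messages:
            self._queue.put(text)

    def _drain(self) -> None:
        while True:
            text = self._queue.get()
            try:
                self.store.add(text)  # embedding etc. happens here, not per-response
            finally:
                self._queue.task_done()

    def flush(self) -> None:
        """Block until queued writes are persisted (e.g. at shutdown)."""
        self._queue.join()

class ListStore:
    """Stand-in for a real memory store."""
    def __init__(self) -> None:
        self.items: list[str] = []
    def add(self, text: str) -> None:
        self.items.append(text)

store = ListStore()
writer = AsyncMemoryWriter(store)
writer.write_turn(["user: prefer dark mode", "assistant: noted"])
writer.flush()
```

The `flush` call is the crash-safety valve the end-of-session strategy lacks: call it on shutdown and nothing queued is lost.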


132.10 — Read path: retrieval strategies, relevance scoring, and freshness decay

The read path determines what the agent remembers at inference time. A poor read path means the agent either retrieves irrelevant noise or misses critical context.

Multi-strategy retrieval

A single retrieval method is rarely sufficient. Production agents combine:

  1. Semantic search — vector similarity over embedded memories.
  2. Recency search — most recent N items, regardless of content.
  3. Keyword/structured search — exact matches on known entities or categories.
  4. Graph traversal — follow relationships in a knowledge graph (if present).
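These strategies return separately ranked lists that must be merged before scoring. One common fusion heuristic is reciprocal rank fusion (RRF); a minimal sketch, with illustrative candidate lists:

```python
from collections import defaultdict

def reciprocal_rank_fusion(
    result_lists: list[list[str]], k: int = 60, top_k: int = 10
) -> list[str]:
    """Merge ranked lists from different retrieval strategies with RRF.

    Each inner list is ordered best-first; k dampens the impact of rank
    so that items appearing in multiple lists outrank a single top hit.
    """
    scores: dict[str, float] = defaultdict(float)
    for results in result_lists:
        for rank, item in enumerate(results):
            scores[item] += 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)[:top_k]

semantic = ["mem_a", "mem_b", "mem_c"]   # vector similarity order
recency = ["mem_b", "mem_d"]             # newest first
keyword = ["mem_b", "mem_a"]             # exact-match hits
merged = reciprocal_rank_fusion([semantic, recency, keyword], top_k=3)
```

Here `mem_b` wins because it appears in all three lists, even though it is not the top semantic hit. The merged candidates then flow into the composite ranker below.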
from dataclasses import dataclass
import time
import math

@dataclass
class ScoredMemory:
    text: str
    relevance: float    # semantic similarity
    recency: float      # time decay factor
    importance: float   # stored importance
    access_count: int
    final_score: float = 0.0


class RetrievalRanker:
    """Combine multiple signals into a single retrieval score."""

    def __init__(
        self,
        relevance_weight: float = 0.5,
        recency_weight: float = 0.25,
        importance_weight: float = 0.2,
        frequency_weight: float = 0.05,
        decay_half_life_hours: float = 168.0,  # 1 week
    ):
        self.w_rel = relevance_weight
        self.w_rec = recency_weight
        self.w_imp = importance_weight
        self.w_freq = frequency_weight
        self.half_life = decay_half_life_hours * 3600  # convert to seconds

    def freshness_decay(self, created_at: float) -> float:
        """Exponential decay based on age."""
        age_seconds = time.time() - created_at
        return math.exp(-0.693 * age_seconds / self.half_life)  # ln(2) ≈ 0.693

    def score(self, memory: ScoredMemory) -> float:
        """Compute composite retrieval score."""
        # Normalize access count with log dampening
        freq_score = math.log1p(memory.access_count) / 10.0
        freq_score = min(freq_score, 1.0)

        memory.final_score = (
            self.w_rel * memory.relevance
            + self.w_rec * memory.recency
            + self.w_imp * memory.importance
            + self.w_freq * freq_score
        )
        return memory.final_score

    def rank(self, memories: list[ScoredMemory], top_k: int = 10) -> list[ScoredMemory]:
        for m in memories:
            self.score(m)
        memories.sort(key=lambda m: m.final_score, reverse=True)
        return memories[:top_k]

Freshness decay

Freshness decay ensures that old memories gradually lose influence unless they are frequently accessed or marked as permanently important. The exponential decay formula:

freshness(t) = exp(-ln(2) * age / half_life)

With a half-life of one week, a memory from:

  • 1 hour ago has freshness 0.996
  • 1 day ago has freshness 0.906
  • 1 week ago has freshness 0.500
  • 4 weeks ago has freshness 0.063

This prevents stale memories from crowding out fresh, relevant context.
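The table above follows directly from the formula; a quick sketch to reproduce it (ages in hours, one week = 168 hours):

```python
import math

def freshness(age_hours: float, half_life_hours: float = 168.0) -> float:
    """freshness(t) = exp(-ln(2) * age / half_life)."""
    return math.exp(-math.log(2) * age_hours / half_life_hours)

one_week = freshness(168)    # exactly one half-life -> 0.5
four_weeks = freshness(672)  # four half-lives -> 0.5 ** 4 = 0.0625
```

Tuning is a single knob: shorten the half-life for fast-moving domains (ops incidents), lengthen it for stable ones (user preferences).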

Reranking with a cross-encoder

For high-stakes retrieval, a cross-encoder reranker scores each (query, candidate) pair jointly, improving precision over embedding-only similarity:

class CrossEncoderReranker:
    """Rerank candidates using a cross-encoder model."""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        candidates: list[str],
        top_k: int = 5,
    ) -> list[tuple[str, float]]:
        pairs = [(query, c) for c in candidates]
        scores = self.model.predict(pairs)
        ranked = sorted(
            zip(candidates, scores), key=lambda x: x[1], reverse=True
        )
        return ranked[:top_k]

Context assembly

After retrieval and ranking, the context assembler packs retrieved memories into the prompt within the token budget:

def assemble_context(
    retrieved: list[ScoredMemory],
    token_budget: int,
    header: str = "Relevant context from memory:",
) -> str:
    """Pack retrieved memories into a context block within budget."""
    lines = [header, ""]
    used_tokens = len(header) // 4  # rough heuristic: ~4 characters per token

    for mem in retrieved:
        entry = f"- [{mem.final_score:.2f}] {mem.text}"
        entry_tokens = len(entry) // 4
        if used_tokens + entry_tokens > token_budget:
            break
        lines.append(entry)
        used_tokens += entry_tokens

    return "\n".join(lines)

132.11 — Implementation: mem0, LangGraph persistence, and custom PostgreSQL-backed memory

Let us look at three practical approaches, ranging from off-the-shelf to fully custom.

Approach 1: mem0 — managed memory layer

mem0 provides a drop-in memory layer for LLM applications. It handles embedding, storage, and retrieval behind a simple API.

from mem0 import Memory

# Initialize with configuration
config = {
    "vector_store": {
        "provider": "qdrant",
        "config": {
            "host": "localhost",
            "port": 6333,
            "collection_name": "agent_memory",
        },
    },
    "embedder": {
        "provider": "openai",
        "config": {"model": "text-embedding-3-small"},
    },
    "llm": {
        "provider": "anthropic",
        "config": {"model": "claude-sonnet-4-20250514"},
    },
}

memory = Memory.from_config(config)

# Add memories scoped to a user
memory.add(
    "I prefer dark mode and vim keybindings in all editors.",
    user_id="user_42",
    metadata={"category": "preferences"},
)

memory.add(
    "Production database is PostgreSQL 16 on db.prod.internal:5432.",
    user_id="user_42",
    metadata={"category": "infrastructure"},
)

# Retrieve relevant memories
results = memory.search(
    "What database should I connect to?",
    user_id="user_42",
    limit=5,
)
for result in results:
    print(f"[{result['score']:.3f}] {result['memory']}")

# Memory is automatically deduplicated and updated
memory.add(
    "Production database migrated to PostgreSQL 17 on db2.prod.internal:5432.",
    user_id="user_42",
    metadata={"category": "infrastructure"},
)
# mem0 detects the conflict and updates the existing memory

Strengths: minimal setup, automatic deduplication, built-in LLM-based extraction.
Limitations: less control over scoring and retrieval logic; dependency on external service.

Approach 2: LangGraph persistence with checkpointing

LangGraph provides built-in persistence via checkpointers, giving agents durable state across runs.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list[dict], operator.add]
    memories: list[str]
    current_plan: list[str]
    facts: dict[str, str]

def retrieve_memories(state: AgentState) -> dict:
    """Node: retrieve relevant memories based on latest message."""
    latest = state["messages"][-1]["content"]
    # (Use your vector store here; assumed to return (memory, score) pairs)
    retrieved = vector_store.search(latest, top_k=5)
    return {
        "memories": [r.text for r, _ in retrieved],
    }

def generate_response(state: AgentState) -> dict:
    """Node: generate response with memory-augmented context."""
    memory_context = "\n".join(f"- {m}" for m in state["memories"])
    messages = [
        {"role": "system", "content": f"Relevant memories:\n{memory_context}"},
        *state["messages"],
    ]
    response = llm.complete(messages)
    return {"messages": [{"role": "assistant", "content": response}]}

def persist_memories(state: AgentState) -> dict:
    """Node: extract and store new memories from the conversation."""
    latest_exchange = state["messages"][-2:]  # user + assistant
    scorer = ImportanceScorer()
    for msg in latest_exchange:
        importance = scorer.score(msg["content"], msg["role"])
        if importance > 0.5:
            vector_store.add(msg["content"], metadata={"role": msg["role"]})
    return {}

# Build the graph
builder = StateGraph(AgentState)
builder.add_node("retrieve", retrieve_memories)
builder.add_node("generate", generate_response)
builder.add_node("persist", persist_memories)

builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", "persist")
builder.add_edge("persist", END)

# Compile with PostgreSQL checkpointer for durable state.
# Note: in recent langgraph versions, from_conn_string returns a context
# manager (`with PostgresSaver.from_conn_string(dsn) as checkpointer:`),
# and the checkpoint tables must be created once via checkpointer.setup().
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://agent:secret@localhost:5432/agent_memory"
)
graph = builder.compile(checkpointer=checkpointer)

# Run — state is automatically persisted between invocations
config = {"configurable": {"thread_id": "user_42_session_7"}}
result = graph.invoke(
    {"messages": [{"role": "user", "content": "Deploy the user service to staging"}]},
    config=config,
)

Approach 3: custom PostgreSQL-backed memory

For full control, build a custom memory system on PostgreSQL with pgvector:

import asyncpg
import numpy as np
import json
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class PgMemoryConfig:
    dsn: str = "postgresql://agent:secret@localhost:5432/agent_memory"
    embedding_dim: int = 1536
    table_name: str = "memories"

class PostgresMemoryStore:
    """Production-grade memory store using PostgreSQL + pgvector."""

    def __init__(self, config: PgMemoryConfig, embed_fn):
        self.config = config
        self.embed_fn = embed_fn
        self.pool: Optional[asyncpg.Pool] = None

    async def initialize(self) -> None:
        """Create connection pool and ensure schema exists."""
        self.pool = await asyncpg.create_pool(self.config.dsn, min_size=2, max_size=10)

        async with self.pool.acquire() as conn:
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.config.table_name} (
                    id          BIGSERIAL PRIMARY KEY,
                    content     TEXT NOT NULL,
                    embedding   vector({self.config.embedding_dim}),
                    user_id     TEXT NOT NULL,
                    category    TEXT DEFAULT 'general',
                    importance  REAL DEFAULT 0.5,
                    metadata    JSONB DEFAULT '{{}}'::jsonb,
                    created_at  TIMESTAMPTZ DEFAULT now(),
                    updated_at  TIMESTAMPTZ DEFAULT now(),
                    access_count INTEGER DEFAULT 0,
                    last_accessed TIMESTAMPTZ DEFAULT now()
                );
            """)
            # Create indexes
            await conn.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_{self.config.table_name}_embedding
                ON {self.config.table_name}
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100);
            """)
            await conn.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_{self.config.table_name}_user
                ON {self.config.table_name} (user_id, category);
            """)

    async def add(
        self,
        content: str,
        user_id: str,
        category: str = "general",
        importance: float = 0.5,
        metadata: Optional[dict] = None,
    ) -> int:
        """Store a new memory."""
        embedding = self.embed_fn(content)
        embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"

        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(f"""
                INSERT INTO {self.config.table_name}
                    (content, embedding, user_id, category, importance, metadata)
                VALUES ($1, $2::vector, $3, $4, $5, $6::jsonb)
                RETURNING id
            """, content, embedding_str, user_id, category, importance,
                json.dumps(metadata or {}))
            return row["id"]

    async def search(
        self,
        query: str,
        user_id: str,
        top_k: int = 10,
        category: Optional[str] = None,
        min_importance: float = 0.0,
        freshness_half_life_days: float = 7.0,
    ) -> list[dict]:
        """Retrieve memories ranked by composite score."""
        query_embedding = self.embed_fn(query)
        embedding_str = "[" + ",".join(str(x) for x in query_embedding) + "]"

        category_filter = "AND category = $4" if category else ""
        params = [embedding_str, user_id, min_importance]
        if category:
            params.append(category)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(f"""
                WITH scored AS (
                    SELECT
                        id,
                        content,
                        category,
                        importance,
                        metadata,
                        created_at,
                        access_count,
                        1 - (embedding <=> $1::vector) AS similarity,
                        EXP(
                            -0.693 * EXTRACT(EPOCH FROM (now() - created_at))
                            / ({freshness_half_life_days} * 86400)
                        ) AS freshness
                    FROM {self.config.table_name}
                    WHERE user_id = $2
                      AND importance >= $3
                      {category_filter}
                    ORDER BY embedding <=> $1::vector
                    LIMIT {top_k * 3}
                )
                SELECT *,
                    (0.50 * similarity
                     + 0.25 * freshness
                     + 0.20 * importance
                     + 0.05 * LEAST(LN(1 + access_count) / 10.0, 1.0)
                    ) AS final_score
                FROM scored
                ORDER BY final_score DESC
                LIMIT {top_k}
            """, *params)

            # Update access counts
            ids = [r["id"] for r in rows]
            if ids:
                await conn.execute(f"""
                    UPDATE {self.config.table_name}
                    SET access_count = access_count + 1,
                        last_accessed = now()
                    WHERE id = ANY($1::bigint[])
                """, ids)

            return [dict(r) for r in rows]

    async def forget(
        self,
        user_id: str,
        older_than_days: int = 90,
        min_access_count: int = 0,
        max_importance: float = 0.3,
    ) -> int:
        """Garbage-collect low-value old memories."""
        async with self.pool.acquire() as conn:
            result = await conn.execute(f"""
                DELETE FROM {self.config.table_name}
                WHERE user_id = $1
                  AND created_at < now() - INTERVAL '{older_than_days} days'
                  AND access_count <= $2
                  AND importance <= $3
            """, user_id, min_access_count, max_importance)
            return int(result.split()[-1])  # "DELETE N"

    async def close(self) -> None:
        if self.pool:
            await self.pool.close()

Key design decisions in this implementation:

  • IVFFlat index on the embedding column for fast approximate nearest neighbor search.
  • Composite scoring in SQL — the database does the heavy lifting, not the application.
  • Access tracking — every retrieval increments the access count, preventing useful memories from being garbage collected.
  • Garbage collection via the forget method — old, low-importance, rarely-accessed memories are pruned automatically.
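The forget method still needs a scheduler. A minimal asyncio sketch — the interval, user ID, and retention values are illustrative, and the commented usage assumes a PostgresMemoryStore-like `store` in scope:

```python
import asyncio

async def run_periodically(interval_seconds: float, job, max_runs=None):
    """Run an async job on a fixed interval, e.g. nightly memory GC.

    max_runs bounds the loop for testing; leave as None in production.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        await asyncio.sleep(interval_seconds)
        await job()
        runs += 1

# Usage sketch (hypothetical wiring):
# asyncio.create_task(
#     run_periodically(86_400, lambda: store.forget("user_42", older_than_days=30))
# )
```

Running GC inside the agent process keeps the stack simple; at larger scale the same call belongs in a cron job or worker so a crash loop in the agent cannot starve cleanup.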

Choosing your approach

| Criterion | mem0 | LangGraph | Custom PostgreSQL |
| --- | --- | --- | --- |
| Setup time | Minutes | Hours | Days |
| Control over scoring | Low | Medium | Full |
| Operational complexity | Low (managed) | Medium | High |
| Customizability | Limited | Good | Unlimited |
| Multi-agent support | Built-in | Via thread IDs | Custom |
| Cost at scale | Vendor pricing | Infrastructure | Infrastructure |

132.12 — Mental model

Eight takeaway points for agent memory systems:

  1. Memory is not optional. Any agent that runs more than one turn or serves a user more than once needs engineered memory. The LLM itself remembers nothing.

  2. Think in cache layers. Working memory (context window) is L1 cache — fast, small, volatile. Short-term memory (conversation history) is L2. Long-term stores (vector DB, SQL) are main memory. Design promotion and eviction policies deliberately.

  3. Not everything should be remembered. The write path needs an importance filter. Storing everything creates retrieval noise that degrades agent quality more than storing nothing.

  4. Retrieval quality determines memory quality. A vast memory store is worthless if the agent retrieves the wrong items. Invest in ranking: combine semantic similarity, recency decay, importance scores, and frequency signals.

  5. Freshness decay prevents staleness. Exponential decay with a configurable half-life ensures old memories fade gracefully unless they are reinforced by repeated access or high importance.

  6. Separate structure from semantics. Use vector stores for fuzzy semantic recall and relational databases for precise factual lookup. Most production systems need both.

  7. Episodic memory creates learning loops. Recording full execution traces and extracting lessons lets agents improve over time. This is the closest thing to “experience” a stateless LLM can have.

  8. Procedural memory accelerates execution. Caching successful tool-use patterns and plans means the agent converges on effective strategies rather than re-exploring from scratch every time.


Read it yourself

  • “Generative Agents: Interactive Simulacra of Human Behavior” (Park et al., 2023) — the Stanford paper that introduced memory streams, reflection, and planning for LLM agents, sparking the modern agent memory movement.
  • “MemGPT: Towards LLMs as Operating Systems” (Packer et al., 2023) — reframes context management as virtual memory with explicit paging between main context and external storage.
  • mem0 documentation (https://docs.mem0.ai) — practical reference for the managed memory layer.
  • LangGraph persistence guide (https://langchain-ai.github.io/langgraph/) — checkpointing, state management, and cross-session memory in graph-based agents.
  • pgvector documentation (https://github.com/pgvector/pgvector) — vector similarity search in PostgreSQL, the workhorse of many custom memory backends.
  • “Cognitive Architectures for Language Agents” (Sumers et al., 2024) — a systematic framework mapping cognitive science memory models to LLM agent designs.

Practice

  1. Implement a sliding window memory with a maximum of 4,000 tokens. Feed it a 20-turn conversation and verify that only the most recent messages survive. Confirm the system prompt is always preserved.

  2. Build a summarization-based memory using the SummarizingMemory class. Compare the information retained versus sliding-window truncation on a 50-turn debugging session. Which approach preserves the root cause diagnosis?

  3. Create a priority-aware conversation memory with PriorityConversationMemory. Tag messages with priorities manually and verify that HIGH-priority messages survive eviction while LOW-priority messages are dropped first.

  4. Stand up a vector memory store using the VectorMemoryStore class with a local sentence-transformer embedding model. Add 100 memories from different categories and measure retrieval precision at top-5 for 10 test queries.

  5. Implement the full RetrievalRanker with freshness decay. Create memories at different timestamps (mock time.time()) and verify that the composite scoring correctly balances relevance, recency, importance, and frequency.

  6. Build an episodic memory system that records agent runs and extracts lessons. Simulate five agent episodes (three successes, two failures), then query for lessons relevant to a new similar task. Verify that failure lessons appear and are actionable.

  7. Stretch: Deploy the full PostgresMemoryStore on a real PostgreSQL instance with pgvector. Load 10,000 synthetic memories, benchmark search latency at p50/p95/p99, tune the IVFFlat lists parameter, and implement the forget garbage collector with a 30-day retention policy. Measure how retrieval quality changes as the store grows from 1K to 10K memories.