Compaction

Compaction reduces a stream's event history to respect runtime constraints — event count or token budget. It is the constraint-respecting counterpart to Aggregation.

Compaction removes. Aggregation creates. They are separate concerns.

When to use it

Long-running conversations accumulate events faster than the model's context window can absorb. Use compaction to cap the size of history that flows into the next LLM call.

| Symptom | Use |
| --- | --- |
| History getting close to provider token limit | TailWindowCompact or SummarizeCompact |
| Need to keep recent events and forget old ones cheaply | TailWindowCompact |
| Want a short summary of old events to preserve context | SummarizeCompact |

CompactStrategy protocol

Every strategy implements the same shape:

from typing import Protocol
from ag2 import Context
from ag2.events import BaseEvent
from ag2.knowledge import KnowledgeStore

class CompactStrategy(Protocol):
    async def compact(
        self,
        events: list[BaseEvent],
        context: Context,
        store: KnowledgeStore | None,
    ) -> list[BaseEvent]:
        ...

Returns a new event list that replaces the current history. Strategies must preserve the causal ordering of retained events — no reshuffling.
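
The ordering requirement can be checked in a test harness. The following is a minimal sketch, not part of ag2: preserves_order is a hypothetical helper that tests whether the retained list is a subsequence of the original, comparing events by identity. Synthetic events that a strategy inserts (such as a CompactionSummary) are not in the original list and would need to be filtered out before the check.

```python
from typing import Any, Sequence


def preserves_order(original: Sequence[Any], retained: Sequence[Any]) -> bool:
    """Return True if `retained` is a subsequence of `original` (by identity).

    Hypothetical helper for tests; not an ag2 API.
    """
    remaining = iter(original)
    # For each retained event, consume `original` until that event is found.
    # The shared iterator never rewinds, so a reordered event is never found.
    return all(any(r is o for o in remaining) for r in retained)


a, b, c = object(), object(), object()
in_order = preserves_order([a, b, c], [a, c])
reshuffled = preserves_order([a, b, c], [c, a])
```

Here in_order is True because a and c keep their relative positions, while reshuffled is False because a appears after c.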

CompactTrigger

A dataclass describing when compaction should fire. Any configured threshold that is exceeded triggers compaction.

from ag2.compact import CompactTrigger

trigger = CompactTrigger(
    max_events=200,           # fire when history exceeds 200 events
    max_tokens=32_000,        # fire when estimated tokens exceed 32k
    chars_per_token=4,        # estimation constant (default 4)
)

Leaving a field at 0 disables that threshold. CompactTrigger() alone does nothing — you must opt into at least one condition.

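CompactTrigger is a plain data object: it records when you want compaction to fire, but does not fire it. When the trigger is wired onto an Agent (see Wiring onto an Agent), the Agent's middleware checks it after each turn. Outside an Agent, you check the thresholds and call await strategy.compact(...) yourself.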

Built-in strategies

Both built-ins are importable from ag2.compact.

TailWindowCompact

Keeps the last N events, drops the rest. Zero LLM cost. Suitable when old context has diminishing value and recency is what matters.

from ag2.compact import TailWindowCompact
from ag2.knowledge import MemoryKnowledgeStore

store = MemoryKnowledgeStore()
compact = TailWindowCompact(target=50)

retained = await compact.compact(events, ctx, store)
# retained is the last 50 events; older ones are dropped
# (and persisted to /log/{stream_id}.dropped-{n}.jsonl if store is passed)

Passing a KnowledgeStore is optional. If provided, dropped events are persisted to /log/ as a numbered segment — see the KnowledgeStore docs — so they can be replayed later via EventLogWriter.load(). If omitted, dropped events are discarded.

SummarizeCompact

Summarizes the dropped portion via one LLM call and inserts a CompactionSummary event at the head of the retained history. Use it when you want to keep some sense of the old conversation instead of just forgetting it.

from ag2.compact import SummarizeCompact
from ag2.config import OpenAIConfig
from ag2.knowledge import MemoryKnowledgeStore

store = MemoryKnowledgeStore()
compact = SummarizeCompact(
    target=50,
    config=OpenAIConfig(model="gpt-5-mini"),  # cheap model recommended for summaries
)

retained = await compact.compact(events, ctx, store)
# retained[0] is a CompactionSummary event; retained[1:] are the last 50 originals

The summarization model is independent of the agent's main model, so pick a smaller, cheaper one. Token usage is recorded on the strategy instance as last_usage (compact.last_usage in the example above).

Note

Both built-ins snap the retained boundary to whole turns: a tool call and its result are never split. A tool cycle straddling the boundary is compacted as a unit, so the retained window can be slightly smaller than target. This keeps the retained history valid for providers that reject a tool result with no preceding call.
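
The boundary snapping described in this note can be illustrated with a minimal sketch. It is not the library's implementation: Ev is a stand-in event type, and kind and call_id are assumed field names used only for illustration. The sketch starts from a plain tail window and moves the cut forward until no retained tool result refers to a dropped tool call.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Ev:
    """Stand-in event (hypothetical); real ag2 events have their own types."""
    kind: str                      # "message", "tool_call", or "tool_result"
    call_id: Optional[str] = None  # links a tool result to its tool call


def snap_tail_window(events: List[Ev], target: int) -> List[Ev]:
    """Keep at most `target` trailing events without orphaning a tool result."""
    cut = max(len(events) - target, 0)
    while cut < len(events):
        kept = events[cut:]
        kept_calls = {e.call_id for e in kept if e.kind == "tool_call"}
        # First retained tool result whose call fell on the dropped side.
        orphan = next(
            (i for i, e in enumerate(kept)
             if e.kind == "tool_result" and e.call_id not in kept_calls),
            None,
        )
        if orphan is None:
            break
        cut += orphan + 1  # drop everything up to and including the orphan
    return events[cut:]


history = [
    Ev("message"),
    Ev("tool_call", "a"),
    Ev("tool_result", "a"),
    Ev("message"),
    Ev("message"),
]
split_cycle = snap_tail_window(history, target=3)  # boundary falls inside the cycle
whole_cycle = snap_tail_window(history, target=4)  # boundary falls before the cycle
```

With target=3 the naive window would begin at the tool result, so the cut advances past it and two events are retained. With target=4 the whole cycle fits and all four events are kept.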

CompactionSummary

The synthetic event inserted by SummarizeCompact at the head of history.

from ag2.compact import CompactionSummary

summary = CompactionSummary(
    summary="User asked about gardening and sourdough; decisions made about ...",
    event_count=42,
)

CompactionSummary is on the allowlist of ConversationPolicy, so it survives the assembly chain. Each provider mapper then renders it as a user turn, so the summary reaches the LLM as visible context at the head of history.

Wiring onto an Agent

Pass the strategy + trigger through KnowledgeConfig. The Agent wires a _CompactionMiddleware that fires the strategy automatically after each turn when the trigger threshold is crossed.

from ag2 import Agent, KnowledgeConfig
from ag2.compact import CompactTrigger, TailWindowCompact
from ag2.config import OpenAIConfig
from ag2.knowledge import MemoryKnowledgeStore

store = MemoryKnowledgeStore()
agent = Agent(
    "assistant",
    config=OpenAIConfig(model="gpt-5"),
    knowledge=KnowledgeConfig(
        store=store,
        compact=TailWindowCompact(target=100),
        compact_trigger=CompactTrigger(max_events=200),
    ),
)

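Compaction reports progress through a triple of events on the agent's stream. Every attempt emits CompactionStarted, followed by either CompactionCompleted or CompactionFailed: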

| Event | When | Use it to |
| --- | --- | --- |
| CompactionStarted | Just before compact() runs | Mark the start of work; carries strategy / event_count |
| CompactionCompleted | compact() returned and history was replaced | Read events_before / events_after / usage |
| CompactionFailed | compact() raised | Inspect error_type + error; the history is left untouched and the agent turn is not interrupted |

The failure path is the one that matters: the strategy exception is also logged via the module logger, but the stream event is the durable signal — subscribe to CompactionFailed if you want failed compactions to surface in your application's UI or alerting. (Aggregation emits the symmetric AggregationStarted / AggregationCompleted / AggregationFailed triple — see Aggregation › Wiring onto an Agent.)

Driving a strategy directly

If you're not using Agent (custom harness, tests, one-off scripts), call await strategy.compact(...) yourself:

from ag2.compact import CompactTrigger, TailWindowCompact

trigger = CompactTrigger(max_events=200)
compact = TailWindowCompact(target=100)

async def after_turn(events, ctx, store):
    # max_events == 0 means the event-count threshold is disabled
    should = trigger.max_events > 0 and len(events) > trigger.max_events
    if should:
        events = await compact.compact(events, ctx, store)
    return events

For the token-based threshold, estimate with sum(len(str(e)) for e in events) / trigger.chars_per_token.
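
Both thresholds can be combined into one check. The sketch below is self-contained and makes some assumptions: TriggerSketch is a stand-in dataclass that mirrors the three CompactTrigger fields shown earlier, and should_compact and estimate_tokens are hypothetical helper names, not ag2 functions. It follows the rules stated above: a field left at 0 is disabled, and any configured threshold that is exceeded fires.

```python
from dataclasses import dataclass
from typing import Any, Sequence


@dataclass
class TriggerSketch:
    """Stand-in mirroring the CompactTrigger fields (hypothetical class)."""
    max_events: int = 0       # 0 disables the event-count threshold
    max_tokens: int = 0       # 0 disables the token threshold
    chars_per_token: int = 4  # estimation constant


def estimate_tokens(events: Sequence[Any], chars_per_token: int) -> float:
    """Rough token estimate: total characters divided by chars_per_token."""
    return sum(len(str(e)) for e in events) / chars_per_token


def should_compact(events: Sequence[Any], trigger: TriggerSketch) -> bool:
    """True if any enabled threshold is exceeded."""
    over_events = trigger.max_events > 0 and len(events) > trigger.max_events
    over_tokens = (
        trigger.max_tokens > 0
        and estimate_tokens(events, trigger.chars_per_token) > trigger.max_tokens
    )
    return over_events or over_tokens


# Three 40-character events: 120 characters, about 30 estimated tokens.
sample = ["x" * 40] * 3
fires_on_tokens = should_compact(sample, TriggerSketch(max_tokens=20))
fires_on_events = should_compact(sample, TriggerSketch(max_events=2))
never_fires = should_compact(sample, TriggerSketch())
```

The character-based estimate is only as good as str(e) is representative of what the provider actually tokenizes, so treat max_tokens as a soft budget with some headroom.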

Writing a custom strategy

Any object with an async compact(events, ctx, store) method satisfies the protocol. A few ideas:

  • Drop tool noise. Keep ModelRequest / ModelResponse, drop ToolCallEvent / ToolResultEvent older than some boundary.
  • Priority retention. Score events (e.g. keep every ModelResponse but decimate ToolCallEvents).
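  • Segmented summarization. Run SummarizeCompact in chunks to produce multiple CompactionSummary events over time rather than one big one.

from ag2 import Context
from ag2.events import BaseEvent, ToolCallEvent, ToolResultEvent
from ag2.knowledge import KnowledgeStore

class DropOldToolEvents:
    """Keep conversation events; drop all but the last K tool events."""

    def __init__(self, keep_last_k: int = 20) -> None:
        self._k = keep_last_k

    async def compact(
        self,
        events: list[BaseEvent],
        context: Context,
        store: KnowledgeStore | None,
    ) -> list[BaseEvent]:
        tool_types = (ToolCallEvent, ToolResultEvent)
        # Positions of every tool call / tool result, in stream order.
        tool_indices = [i for i, e in enumerate(events) if isinstance(e, tool_types)]
        if len(tool_indices) <= self._k:
            return events  # nothing old enough to drop
        # Drop all but the last K tool events. Slicing by an explicit end index
        # (rather than [: -self._k]) also handles keep_last_k == 0 correctly.
        # Caveat: the cutoff can separate a call from its result; snap it to a
        # whole tool cycle if your provider rejects orphaned tool results.
        drop_set = set(tool_indices[: len(tool_indices) - self._k])
        # Filtering by index keeps retained events in their original order.
        return [e for i, e in enumerate(events) if i not in drop_set]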
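
The priority-retention idea from the list above can be sketched in the same shape. This is a minimal illustration under stated assumptions: ModelResponse and ToolCallEvent here are local stand-in dataclasses so the example runs without ag2, and DecimateToolCalls and its keep_every parameter are hypothetical names. It keeps every non-tool event and every keep_every-th tool call, preserving order.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class ModelResponse:   # stand-in for the real event type (assumption)
    text: str


@dataclass
class ToolCallEvent:   # stand-in for the real event type (assumption)
    name: str


class DecimateToolCalls:
    """Keep all non-tool events; keep only every `keep_every`-th tool call."""

    def __init__(self, keep_every: int = 2) -> None:
        if keep_every < 1:
            raise ValueError("keep_every must be >= 1")
        self._keep_every = keep_every

    async def compact(self, events: List[Any], context: Any, store: Any) -> List[Any]:
        retained: List[Any] = []
        seen_tool_calls = 0
        for event in events:
            if isinstance(event, ToolCallEvent):
                seen_tool_calls += 1
                # Keep the 1st, (1 + keep_every)-th, ... tool call; skip the rest.
                if (seen_tool_calls - 1) % self._keep_every != 0:
                    continue
            retained.append(event)  # appended in stream order, so order is preserved
        return retained


events = [
    ModelResponse("a"),
    ToolCallEvent("t1"),
    ToolCallEvent("t2"),
    ToolCallEvent("t3"),
    ModelResponse("b"),
]
# context and store are unused in this sketch, so None is passed for both.
retained = asyncio.run(DecimateToolCalls(keep_every=2).compact(events, None, None))
```

In this run the second tool call is dropped and both ModelResponse events survive. A real strategy would also need to drop the matching tool results so that no result is left without its call.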