Aggregation

Aggregation extracts structured knowledge from raw events and writes it to the KnowledgeStore. It is the knowledge-organizing counterpart to Compaction.

Compaction removes. Aggregation creates. They are separate concerns.

Aggregation is how an agent builds up persistent state between conversations — the material that WorkingMemoryPolicy and EpisodicMemoryPolicy read back in on subsequent runs.

When to use it

| You want the agent to | Use |
| --- | --- |
| Carry stable facts across conversations (user prefs, role, timezone) | WorkingMemoryAggregate + WorkingMemoryPolicy |
| Remember summaries of what happened in past sessions | ConversationSummaryAggregate + EpisodicMemoryPolicy |
| Index artifacts for later retrieval | Write a custom strategy |

Each built-in is one half of a producer/consumer pair: the aggregate writes a file, the assembly policy reads it back on the next turn.

AggregateStrategy protocol

Every strategy implements the same shape:

from typing import Protocol
from autogen.beta import Context
from autogen.beta.events import BaseEvent
from autogen.beta.knowledge import KnowledgeStore

class AggregateStrategy(Protocol):
    async def aggregate(
        self,
        events: list[BaseEvent],
        context: Context,
        store: KnowledgeStore,
    ) -> None:
        ...

Aggregation returns nothing; its output lives in the knowledge store. Unlike in CompactStrategy, the store argument is required, not optional.
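Because the contract is a Protocol, conformance is structural: a class matches by having the right method, not by inheriting from anything. The sketch below illustrates this with a local stand-in protocol. `AggregateStrategyLike`, `CountEvents`, and the dict used as a store are all hypothetical names introduced here, not part of the library, and whether the real AggregateStrategy is decorated with `runtime_checkable` is not stated above.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AggregateStrategyLike(Protocol):
    """Local stand-in mirroring the shape of AggregateStrategy (not the library class)."""

    async def aggregate(self, events: list[Any], context: Any, store: Any) -> None: ...


class CountEvents:
    """Hypothetical strategy: records how many events it saw."""

    async def aggregate(self, events: list[Any], context: Any, store: Any) -> None:
        # The store is a plain dict here, purely for illustration.
        store["/memory/event_count.txt"] = str(len(events))


class NotAStrategy:
    """Has no aggregate() method, so it does not match the protocol."""


store: dict[str, str] = {}
strategy = CountEvents()
asyncio.run(strategy.aggregate(["e1", "e2", "e3"], None, store))

# isinstance() against a runtime_checkable Protocol only checks that the
# method exists; it does not verify the signature or that it is async.
is_strategy = isinstance(strategy, AggregateStrategyLike)
is_not_strategy = isinstance(NotAStrategy(), AggregateStrategyLike)
```

A static type checker gives the stronger guarantee, since it also compares parameter and return types.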

AggregateTrigger

A dataclass describing when aggregation should fire.

from autogen.beta.aggregate import AggregateTrigger

trigger = AggregateTrigger(
    every_n_turns=10,      # aggregate every 10 LLM turns
    every_n_events=100,    # aggregate every 100 new events
    on_end=True,           # aggregate when the conversation ends
)

Each condition is independent, and every condition is opt-in: setting a counter to 0 disables it, and AggregateTrigger() with no arguments never fires. on_end defaults to False because each strategy costs one LLM call per fire. A typical setup pairs ConversationSummaryAggregate with WorkingMemoryAggregate, so enabling on_end there adds two LLM calls at the end of every conversation.

Like CompactTrigger, this is a plain data object — it records when you want aggregation to fire, but does not fire it. Strategies are invoked explicitly via await strategy.aggregate(...).

Built-in strategies

Both built-ins are importable from autogen.beta.aggregate, and both take a ModelConfig for the summarization LLM call. Prefer a smaller, cheaper model than the agent's main model.

ConversationSummaryAggregate

Writes a timestamped summary of the conversation to /memory/conversations/. The companion to EpisodicMemoryPolicy, which reads from that directory.

from autogen.beta.aggregate import ConversationSummaryAggregate
from autogen.beta.config import OpenAIConfig
from autogen.beta.knowledge import MemoryKnowledgeStore

store = MemoryKnowledgeStore()
strategy = ConversationSummaryAggregate(config=OpenAIConfig(model="gpt-5-mini"))

# After the conversation:
await strategy.aggregate(events, ctx, store)

# Produces a file like:
# /memory/conversations/20260420T091530_<stream_id>.md

Filenames are {ISO timestamp}_{stream id}.md, so lexicographic sort matches chronological sort — which is why EpisodicMemoryPolicy(max_episodes=N) can simply take the trailing N entries.
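The sorting property can be checked without the library. The sketch below is a standalone illustration: `summary_filename` and `latest_episodes` are hypothetical helpers, and the timestamp format is inferred from the example filename above rather than taken from the implementation.

```python
from datetime import datetime, timedelta


def summary_filename(ts: datetime, stream_id: str) -> str:
    # Fixed-width, most-significant-field-first timestamp (compact ISO 8601).
    return f"{ts.strftime('%Y%m%dT%H%M%S')}_{stream_id}.md"


def latest_episodes(names: list[str], max_episodes: int) -> list[str]:
    # A plain string sort is chronological because of the timestamp prefix.
    # Guard against max_episodes == 0: names[-0:] would return everything.
    return sorted(names)[-max_episodes:] if max_episodes > 0 else []


base = datetime(2026, 4, 20, 9, 15, 30)
names = [
    summary_filename(base + timedelta(days=2), "stream-c"),
    summary_filename(base, "stream-a"),
    summary_filename(base + timedelta(hours=1), "stream-b"),
]
recent = latest_episodes(names, max_episodes=2)
# recent == ["20260420T101530_stream-b.md", "20260422T091530_stream-c.md"]
```

The property holds only while every timestamp has the same width and refers to the same time zone; mixing local times from different zones would break the ordering.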

Token usage is recorded on the strategy instance as strategy.last_usage.

WorkingMemoryAggregate

Updates /memory/working.md — the actor's single persistent state document. Reads the existing file, merges in context from recent events, writes the updated version back. The companion to WorkingMemoryPolicy.

from autogen.beta.aggregate import WorkingMemoryAggregate
from autogen.beta.config import OpenAIConfig
from autogen.beta.knowledge import MemoryKnowledgeStore

store = MemoryKnowledgeStore()
strategy = WorkingMemoryAggregate(config=OpenAIConfig(model="gpt-5-mini"))

# Refresh working memory at the end of a session:
await strategy.aggregate(events, ctx, store)
# /memory/working.md now reflects the latest context.

Unlike ConversationSummaryAggregate, this one is destructive toward its own prior output — each call overwrites /memory/working.md with the merged version. That is the point: working memory is a rolling single-file state, not an append log.
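The read, merge, overwrite cycle can be sketched in a few lines. Everything here is an assumption made for illustration: `DictStore` is a hypothetical in-memory stand-in (its `read` method is not confirmed by the text above), and `merge_facts` replaces the LLM merge with a simple key-wise update so the example runs offline.

```python
import asyncio

WORKING_PATH = "/memory/working.md"  # path taken from the text above


class DictStore:
    """Hypothetical in-memory store; not the library's KnowledgeStore."""

    def __init__(self) -> None:
        self.files: dict[str, str] = {}

    async def read(self, path: str) -> str:
        return self.files.get(path, "")

    async def write(self, path: str, content: str) -> None:
        self.files[path] = content  # overwrite, never append


def merge_facts(existing: str, updates: dict[str, str]) -> str:
    """Stand-in for the LLM merge: newer values replace older ones per key."""
    facts = dict(line.split(": ", 1) for line in existing.splitlines() if ": " in line)
    facts.update(updates)
    return "\n".join(f"{key}: {value}" for key, value in sorted(facts.items()))


async def refresh(store: DictStore, updates: dict[str, str]) -> None:
    existing = await store.read(WORKING_PATH)  # 1. read the prior state
    merged = merge_facts(existing, updates)    # 2. merge in the new context
    await store.write(WORKING_PATH, merged)    # 3. overwrite the single file


async def main() -> DictStore:
    store = DictStore()
    await refresh(store, {"timezone": "UTC", "role": "analyst"})
    await refresh(store, {"timezone": "Europe/Berlin"})
    return store


store = asyncio.run(main())
```

After two refreshes the store still holds exactly one file, with the newer timezone replacing the older one and the untouched role carried forward.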

Pairing with assembly policies

The intended pattern: aggregate at the end of a conversation (or on a cadence), then read the result back in on the next turn via the matching assembly policy.

flowchart LR
    A[Conversation events] --> B[Aggregate]
    B --> C[/KnowledgeStore/]
    C --> D[Policy]
    D --> E[Next LLM turn]

| Aggregate | File | Policy |
| --- | --- | --- |
| ConversationSummaryAggregate | /memory/conversations/{ts}_{id}.md | EpisodicMemoryPolicy |
| WorkingMemoryAggregate | /memory/working.md | WorkingMemoryPolicy |

The path constants (WORKING_MEMORY_PATH, CONVERSATIONS_PREFIX) are defined in autogen.beta.knowledge. Both sides of each pair use those constants, so the producer/consumer contract rests on one shared definition rather than on magic strings duplicated in two places.

Driving aggregation

Pattern for invoking a strategy against an AggregateTrigger:

from autogen.beta.aggregate import AggregateTrigger, ConversationSummaryAggregate
from autogen.beta.config import OpenAIConfig

trigger = AggregateTrigger(every_n_turns=10, on_end=True)
strategy = ConversationSummaryAggregate(config=OpenAIConfig(model="gpt-5-mini"))

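_turn_count = 0  # module-level counter; enough for a single conversation

async def after_turn(events, ctx, store, *, is_end: bool) -> None:
    """Call once after every LLM turn; aggregates when the trigger says so."""
    global _turn_count
    _turn_count += 1
    should = bool(
        # A counter of 0 is falsy, so a disabled condition never fires.
        (trigger.every_n_turns and _turn_count % trigger.every_n_turns == 0)
        or (is_end and trigger.on_end)
    )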
    if should:
        await strategy.aggregate(events, ctx, store)
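The pattern above keeps its counter in a module-level global and does not check every_n_events. One possible variant, sketched below, moves the counters into a small object and covers all three conditions. `TriggerLike` is a local stand-in so the example runs without the library (its defaults of 0 and False are inferred from the description of AggregateTrigger), `AggregationDriver` is a hypothetical helper, and counting events since the last fire is only one reading of "every 100 new events".

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class TriggerLike:
    """Stand-in for AggregateTrigger; defaults are assumed, not confirmed."""
    every_n_turns: int = 0
    every_n_events: int = 0
    on_end: bool = False


class AggregationDriver:
    """Hypothetical helper that owns the counters instead of a global."""

    def __init__(self, trigger: TriggerLike, run: Callable[[list[Any]], Awaitable[None]]) -> None:
        self.trigger = trigger
        self.run = run
        self.turns = 0
        self.events_since_fire = 0

    async def after_turn(self, events: list[Any], new_events: int, *, is_end: bool = False) -> bool:
        self.turns += 1
        self.events_since_fire += new_events
        t = self.trigger
        due = bool(
            (t.every_n_turns and self.turns % t.every_n_turns == 0)
            or (t.every_n_events and self.events_since_fire >= t.every_n_events)
            or (is_end and t.on_end)
        )
        if due:
            await self.run(events)
            self.events_since_fire = 0  # start counting "new" events again
        return due


fired: list[int] = []


async def fake_aggregate(events: list[Any]) -> None:
    # Stands in for: await strategy.aggregate(events, ctx, store)
    fired.append(len(events))


async def main() -> list[bool]:
    driver = AggregationDriver(TriggerLike(every_n_turns=2, on_end=True), fake_aggregate)
    log: list[str] = []
    results = []
    for turn in range(3):
        log.append(f"event-{turn}")
        results.append(await driver.after_turn(log, new_events=1, is_end=(turn == 2)))
    return results


results = asyncio.run(main())
```

In this run the second turn fires on the turn counter and the third fires on on_end, so the stand-in aggregate is called twice.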

Writing a custom strategy

Any object with an async aggregate(events, ctx, store) method satisfies the protocol. Use it to extract domain-specific knowledge:

  • Extract facts. Scan events for entity mentions, write /memory/facts/{entity}.md.
  • Build an index. On each aggregation, append a row to /memory/index.jsonl for later RAG lookup.
  • Classify and tag. Read events, ask the LLM for a category, store under /memory/tags/{tag}/{timestamp}.md.

from autogen.beta.events import BaseEvent, ModelResponse
from autogen.beta.knowledge import KnowledgeStore

class ResponseLengthAggregate:
    """Track the length of each model response for later analysis."""

    async def aggregate(
        self,
        events: list[BaseEvent],
        context,
        store: KnowledgeStore,
    ) -> None:
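        lengths = [
            len(e.message.content)
            for e in events
            # Skip responses that carry no message or no content,
            # so len() never receives None.
            if isinstance(e, ModelResponse) and e.message and e.message.content
        ]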
        if not lengths:
            return
        stream_id = context.stream.id
        path = f"/memory/metrics/{stream_id}.txt"
        await store.write(path, "\n".join(str(n) for n in lengths))

Tip

Aggregation strategies and assembly policies often come in pairs: the strategy writes a path, the policy reads that same path. If you add a new aggregate, consider also adding the reader policy; otherwise the data sits in the store with no way back into the prompt.
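A minimal sketch of such a pair follows, assuming a hypothetical in-memory store. `DictStore`, its `read` and `list_paths` methods, `ResponseLengthWriter`, and `read_metrics` are all illustrative names. The assembly policy interface is not described on this page, so the reader is shown as a plain function that a real policy would wrap, and events are reduced to plain strings with the stream id passed in place of a context.

```python
import asyncio

METRICS_PREFIX = "/memory/metrics/"  # the one path shared by writer and reader


class DictStore:
    """Hypothetical in-memory store; read/list_paths are assumed method names."""

    def __init__(self) -> None:
        self.files: dict[str, str] = {}

    async def write(self, path: str, content: str) -> None:
        self.files[path] = content

    async def read(self, path: str) -> str:
        return self.files[path]

    async def list_paths(self, prefix: str) -> list[str]:
        return sorted(p for p in self.files if p.startswith(prefix))


class ResponseLengthWriter:
    """Producer: writes one metrics file per stream."""

    async def aggregate(self, events: list[str], context: str, store: DictStore) -> None:
        lengths = [len(text) for text in events if text]
        if lengths:
            body = "\n".join(str(n) for n in lengths)
            await store.write(f"{METRICS_PREFIX}{context}.txt", body)


async def read_metrics(store: DictStore) -> str:
    """Consumer: renders the stored metrics as text suitable for a prompt."""
    lines = []
    for path in await store.list_paths(METRICS_PREFIX):
        raw = await store.read(path)
        values = [int(v) for v in raw.splitlines()]
        mean = sum(values) / len(values)
        lines.append(f"{path}: {len(values)} responses, mean length {mean:.1f}")
    return "\n".join(lines)


async def main() -> str:
    store = DictStore()
    await ResponseLengthWriter().aggregate(["hello", "", "hi there"], "s1", store)
    return await read_metrics(store)


summary = asyncio.run(main())
```

Keeping the prefix in a single constant means that renaming the path cannot silently disconnect the reader from the writer.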