Skip to content

EpisodicMemoryPolicy

autogen.beta.policies.episodic_memory.EpisodicMemoryPolicy #

EpisodicMemoryPolicy(max_episodes=5, transparent=True)

Injects past conversation summaries from the knowledge store.

Reads /memory/conversations/ and injects the most recent summaries into the system prompt. This gives the actor context about past episodes.

Source code in autogen/beta/policies/episodic_memory.py
def __init__(self, max_episodes: int = 5, transparent: bool = True) -> None:
    self._max = max_episodes
    self._transparent = transparent

name class-attribute instance-attribute #

name = 'episodic_memory'

apply async #

apply(prompts, events, context)
Source code in autogen/beta/policies/episodic_memory.py
async def apply(
    self,
    prompts: list[str],
    events: list[BaseEvent],
    context: Context,
) -> tuple[list[str], list[BaseEvent]]:
    store = context.dependencies.get(KnowledgeStore)
    if not store:
        return prompts, events

    entries = await store.list(CONVERSATIONS_PREFIX)
    if not entries:
        return prompts, events

    # Read most recent summaries
    recent = entries[-self._max :]
    summaries: list[str] = []
    for entry in recent:
        content = await store.read(f"{CONVERSATIONS_PREFIX}{entry}")
        if content:
            summaries.append(content)

    if summaries:
        block = "## Past Conversations\n\n" + "\n\n---\n\n".join(summaries)
        prompts = prompts + [block]
        if self._transparent:
            prompts = prompts + [f"[{self.name}] Injected {len(summaries)} past conversation summaries."]

    return prompts, events