
AG2 Beta: Stronger Foundation for Real-World Agents

The original AutoGen and later AG2 architecture helped define the early agent ecosystem. It enabled real systems, shaped how many developers thought about agent orchestration, and gave us firsthand experience building and operating agent applications in practice.

That experience also made the limits of the original design clearer over time. As the agent ecosystem matured, expectations changed. Agents increasingly needed to fit into real application environments with concurrent users, explicit session boundaries, platform identities, persistence layers, and integration points that could not be treated as incidental details.

We found that some of these needs were hard to address cleanly inside the original framework model. In many cases, shipping more production-suitable behavior meant adding complexity around the edges instead of improving the core abstraction itself, which pulled developers' focus away from building agentic capabilities. That is a big part of why we created AG2 Beta: a new framework track built on the lessons we learned from the original AG2, designed to better support modern, real-world agentic systems.

You can read more about the motivation behind AG2 Beta in the AG2 Beta overview.

One of the clearest examples of that shift is conversation state.

The Problem with Stateful Agent Instances

In the previous implementation, an agent instance effectively carried the active conversation with it. That was convenient for small examples, but it became impractical in real applications:

  • one agent instance could not be cleanly reused across concurrent users
  • isolating conversations required extra coordination outside the agent API
  • integrating agents into chat platforms, web apps, and background workers was hard

These gaps appear when you move from demos to production.

In a real-world application, the agent definition should be reusable. The conversation should be isolated. The application should decide how sessions are created, persisted, resumed, and discarded.

The Core Idea: MemoryStream

AG2 Beta introduces MemoryStream as the conversation boundary.

Instead of storing the current conversation history on the agent instance, you create a fresh stream for each conversation. To continue that conversation, you pass the same stream into the next ask() call.

That provides a better model for real systems:

  • the agent definition is reusable
  • each conversation is isolated by its own stream
  • concurrent users can share the same agent safely while keeping separate histories
  • the application can decide when to persist or discard a conversation

If you want to explore the details of how streams work as the event bus behind a conversation, see Events Streaming.
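The stream-as-event-bus idea can be sketched in plain Python, independent of the AG2 Beta API. The `ToyStream`, `UserTurn`, and `AssistantTurn` names below are illustrative stand-ins, not the real classes; only the ownership pattern matches the framework:

```python
from dataclasses import dataclass, field

# Illustrative stand-ins only; these are NOT the real AG2 Beta classes.
@dataclass
class UserTurn:
    content: str

@dataclass
class AssistantTurn:
    content: str

@dataclass
class ToyStream:
    """The conversation boundary: an ordered event log owned by the app."""
    events: list = field(default_factory=list)

def toy_ask(stream: ToyStream, text: str) -> str:
    """A stand-in for agent.ask(): reads and appends events on the stream."""
    stream.events.append(UserTurn(text))
    turn = sum(isinstance(e, UserTurn) for e in stream.events)
    reply = f"turn {turn}: {text}"
    stream.events.append(AssistantTurn(reply))
    return reply

stream_a, stream_b = ToyStream(), ToyStream()
toy_ask(stream_a, "hello")
print(toy_ask(stream_a, "again"))  # → turn 2: again
print(toy_ask(stream_b, "hello"))  # → turn 1: hello
```

Notice that `toy_ask` itself holds no state at all: everything a turn needs lives on the stream it is handed.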

Basic Example: One Agent, Two Sequential Turns

Here is the simplest form of the new model:

from autogen.beta import Agent, MemoryStream
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "assistant",
    prompt="You are a concise assistant.",
    config=OpenAIConfig("gpt-5-nano"),
)

async def main() -> None:
    # Conversation A
    stream_a = MemoryStream()

    reply_1 = await agent.ask(
        "My name is Anna. Please remember it.",
        stream=stream_a,
    )
    print(reply_1.content)

    reply_2 = await agent.ask(
        "What is my name?",
        stream=stream_a,
    )
    print(reply_2.content)

    # Conversation B starts fresh because it uses a different stream
    stream_b = MemoryStream()

    reply_3 = await agent.ask(
        "What is my name?",
        stream=stream_b,
    )
    print(reply_3.content)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

The important detail here is not the model call; it is the ownership of state.

The same agent instance serves both conversations. The memory lives in stream_a and stream_b, not in the agent itself. That is the key property that makes the API significantly easier to embed into real applications.
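That property is easy to check with a toy stand-in for the agent. Here the real `agent.ask` is replaced by a fake async function (`fake_ask` and `histories` are illustrative names, not AG2 Beta API); because all conversation state arrives as an argument, concurrent users can safely share the one "agent":

```python
import asyncio

# One shared, stateless "agent" (a plain async function); one history per user.
histories: dict[str, list[str]] = {"anna": [], "ben": []}

async def fake_ask(history: list[str], text: str) -> str:
    """Stand-in for agent.ask(): all conversation state arrives via `history`."""
    await asyncio.sleep(0)  # yield control so concurrent turns can interleave
    history.append(f"user: {text}")
    seen = sum(1 for line in history if line.startswith("user:"))
    reply = f"seen {seen} user message(s)"
    history.append(f"assistant: {reply}")
    return reply

async def main() -> None:
    # Three turns from two users, running concurrently against the same "agent".
    await asyncio.gather(
        fake_ask(histories["anna"], "hi"),
        fake_ask(histories["ben"], "hi"),
        fake_ask(histories["anna"], "hi again"),
    )

asyncio.run(main())
print(len(histories["anna"]), len(histories["ben"]))  # → 4 2
```

Anna's two turns accumulate in her history while Ben's stays separate, even though both ran through the same function concurrently.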

From Sequential Turns to Real Applications

This architecture becomes even more useful when the agent is connected to a real communication channel.

To demonstrate that, here's a Telegram-based integration. The structure is intentionally simple:

  • keep one reusable agent definition
  • create one MemoryStream per Telegram chat
  • pass the same stream back into agent.ask(...) on each new message

That is enough to turn the same agent into a multi-user ambient application without mixing conversation histories.

This first, minimal version focuses on the session boundary and message flow. We pass user_id into the conversation variables so that the tools introduced in the next iteration of the example can use that identity.

import asyncio

from aiogram import Bot, Dispatcher, F
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message

from autogen.beta import Agent, MemoryStream
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "ambient-agent",
    config=OpenAIConfig("gpt-5-nano", reasoning_effort="low"),
)

dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}

def get_or_create_stream(chat_id: int) -> MemoryStream:
    if chat_id not in chat_state:
        chat_state[chat_id] = MemoryStream()
    return chat_state[chat_id]

@dp.message(CommandStart())
async def cmd_start(message: Message) -> None:
    get_or_create_stream(message.chat.id)
    await message.answer("Hi! Send me a message and I'll reply using AG2 Beta.")

@dp.message(F.text)
async def on_text(message: Message) -> None:
    chat_stream = get_or_create_stream(message.chat.id)

    status = await message.answer("Thinking…")
    try:
        reply = await agent.ask(
            message.text,
            stream=chat_stream,
            # pass the `user_id` as a variable to the conversation
            variables={"user_id": message.chat.id},
        )
        text = reply.content or "(no response)"

        await status.edit_text(text)
    except Exception as e:
        await status.edit_text(f"Error: {e}")

if __name__ == "__main__":
    bot = Bot(
        token="...",
        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
    )
    asyncio.run(dp.start_polling(bot))

The resulting interaction looks like this in Telegram: one chat, one stream, and each new message continues the same conversation context.

(Screenshot: Telegram conversation using one MemoryStream per chat)

Adding Long-Term Memory on Top

Short-term conversation state is handled by MemoryStream, but real assistants often need more than that. They also need a lightweight way to recall what happened earlier, even after the in-memory conversation is cleared.

The Telegram example shows one practical pattern:

  1. keep the live conversation in a MemoryStream
  2. save the completed conversation to a per-user file such as ./memory/<user_id>/YYYY-MM-DD.md
  3. expose a tool that can load that saved history back into context when needed

The next snippet builds on the previous one, showing the additional pieces needed for long-term memory: a tool that reads saved history, a /clear command that persists the current stream before dropping it, and a helper that writes the conversation transcript to disk.

import asyncio
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Annotated

from aiogram import Dispatcher
from aiogram.filters import Command
from aiogram.types import Message
from pydantic import Field

from autogen.beta import Agent, MemoryStream, Variable, tool
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelRequest, ModelResponse

MEMORY_DIR = Path("./memory")

@tool
def read_conversation_history(
    date: Annotated[
        date,
        Field(
            default_factory=lambda: datetime.now(timezone.utc).date(),
            description="The date to read the conversation history for",
        ),
    ],
    # get the `user_id` from the conversation context
    user_id: Annotated[int, Variable()],
) -> str:
    """Read current user's conversation history for the given date (summaries from previous sessions).
    Use this when the user asks what was discussed earlier on the given date or to recall prior context.
    """
    path = MEMORY_DIR / str(user_id) / date.strftime("%Y-%m-%d.md")
    if not path.exists():
        return "No conversation history saved for that date."
    return path.read_text()

agent = Agent(
    "ambient-agent",
    config=OpenAIConfig("gpt-5-nano", reasoning_effort="low"),
    tools=[read_conversation_history],
)

dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}

# add `/clear` command to clear the conversation and save the history to memory
@dp.message(Command("clear"))
async def cmd_clear(message: Message) -> None:
    # remove current conversation stream
    stream = chat_state.pop(message.chat.id, None)
    if stream is not None:
        # append it to `./memory/<user_id>/YYYY-MM-DD.md`
        await write_conversation_to_memory(message.chat.id, stream)
    await message.answer("Context dropped. The next message starts a new conversation.")

async def write_conversation_to_memory(user_id: int, stream: MemoryStream) -> None:
    # rebuild a transcript from the stream's recorded events
    events = await stream.history.get_events()
    lines: list[str] = []

    for event in events:
        if isinstance(event, ModelRequest):
            lines.append(f"**user:** {event.content}")
        elif isinstance(event, ModelResponse) and event.content:
            lines.append(f"**assistant:** {event.content}")

    if not lines:
        return

    # use `./memory/<user_id>/YYYY-MM-DD.md` as today's conversation history file
    now = datetime.now(timezone.utc)
    user_dir = MEMORY_DIR / str(user_id)
    user_dir.mkdir(parents=True, exist_ok=True)
    path = user_dir / f"{now:%Y-%m-%d}.md"

    def _write() -> None:
        # append current history to the file
        with path.open("a") as file:
            if file.tell() != 0:
                file.write("\n\n---\n\n")
            file.write(f"## {now:%H:%M:%S} UTC\n\n")
            file.write("\n\n".join(lines))

    await asyncio.to_thread(_write)

# The rest of Telegram bot code from above...
...

After calling /clear, the bot can still recover the saved context through the history-loading tool:

(Screenshot: Telegram conversation recovering saved context after /clear)

Taken together, the two snippets describe the full pattern: the first establishes per-chat conversation streams, and the second adds persistence and recall on top of that same structure. No complex orchestration layer is required to make it behave like a production system.

The application (Telegram Bot) owns session lifecycle. The agent owns reasoning and tool use. MemoryStream bridges the two.

`ambient.py` — full source:
import asyncio
import logging
import os
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Annotated

from aiogram import Bot, Dispatcher, F
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import Command, CommandStart
from aiogram.types import Message
from pydantic import Field

from autogen.beta import Agent, MemoryStream, Variable
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelRequest, ModelResponse
from autogen.beta.middleware import LoggingMiddleware

MEMORY_DIR = Path("./memory")

logging.basicConfig(level=logging.INFO)

agent = Agent(
    "ambient-agent",
    config=OpenAIConfig(
        "gpt-5-nano",
        reasoning_effort="low",
        streaming=True,
    ),
    middleware=[LoggingMiddleware()],
)

@agent.tool
def read_conversation_history(
    date: Annotated[
        date,
        Field(
            default_factory=lambda: datetime.now(timezone.utc).date(),
            description="The date to read the conversation history for",
        ),
    ],
    user_id: Annotated[int, Variable()],
) -> str:
    """Read current user's conversation history for the given date (summaries from previous sessions).
    Use this when the user asks what was discussed earlier on the given date or to recall prior context.
    """
    path = MEMORY_DIR / str(user_id) / date.strftime("%Y-%m-%d.md")
    if not path.exists():
        return "No conversation history saved for that date."
    return path.read_text(encoding="utf-8")

dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}

def _get_or_create_chat(chat_id: int) -> MemoryStream:
    if chat_id not in chat_state:
        chat_state[chat_id] = MemoryStream()
    return chat_state[chat_id]

@dp.message(CommandStart())
async def cmd_start(message: Message) -> None:
    _get_or_create_chat(message.chat.id)
    await message.answer("Hi there! Send me any message and I'll reply using the agent.")

@dp.message(Command("clear"))
async def cmd_clear(message: Message) -> None:
    stream = chat_state.pop(message.chat.id, None)
    if stream is not None:
        await _write_conversation_to_memory(message.chat.id, stream)
    await message.answer("Context dropped. Next message will start a new conversation.")

@dp.message(F.text)
async def on_text(message: Message) -> None:
    if not message.text:
        return
    chat_stream = _get_or_create_chat(message.chat.id)

    status = await message.answer("Thinking…")
    try:
        reply = await agent.ask(
            message.text,
            stream=chat_stream,
            variables={"user_id": message.chat.id},
        )
        text = reply.content or "(no response)"

        await status.edit_text(text)
    except Exception as e:
        await status.edit_text(f"Error: {e}")

async def _write_conversation_to_memory(user_id: int, stream: MemoryStream) -> None:
    """Write current conversation transcript to folder-based memory."""
    # rebuild a transcript from the stream's recorded events
    events = await stream.history.get_events()
    lines: list[str] = []
    for ev in events:
        if isinstance(ev, ModelRequest):
            lines.append(f"**user:** {ev.content}")
        elif isinstance(ev, ModelResponse) and ev.content:
            lines.append(f"**assistant:** {ev.content}")
    if not lines:
        return

    # use `./memory/<user_id>/YYYY-MM-DD.md` as today's conversation history file
    now = datetime.now(timezone.utc)
    user_dir = MEMORY_DIR / str(user_id)
    user_dir.mkdir(parents=True, exist_ok=True)
    path = user_dir / now.strftime("%Y-%m-%d.md")
    content = "\n\n".join(lines)

    def _write() -> None:
        # append current history to the file
        with path.open("a", encoding="utf-8") as f:
            if f.tell() != 0:
                f.write("\n\n---\n\n")
            f.write(f"## {now.strftime('%H:%M:%S')} UTC\n\n")
            f.write(content)

    await asyncio.to_thread(_write)

if __name__ == "__main__":
    token = os.environ.get("TELEGRAM_BOT_TOKEN")
    if not token:
        raise SystemExit("TELEGRAM_BOT_TOKEN is not set")
    bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))

    asyncio.run(dp.start_polling(bot))

Why This Matters

This design is a better fit for how real applications are actually built:

  • chat sessions, web sessions, and background jobs already have their own identity model
  • session state often needs to be persisted independently from the agent definition
  • developers need a simple way to resume, archive, or discard conversations

That is why MemoryStream is an important primitive in AG2 Beta. It moves conversation ownership to the same layer that manages users and sessions.
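Because the conversation lives outside the agent, persisting and resuming it can be as plain as serializing the transcript. Here is a framework-agnostic sketch; the `save_transcript`/`load_transcript` helpers and the JSON-lines layout are illustrative choices, not an AG2 Beta format:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

def save_transcript(path: Path, turns: list[dict]) -> None:
    """Persist a finished conversation as JSON lines, one turn per line."""
    path.write_text("\n".join(json.dumps(t) for t in turns), encoding="utf-8")

def load_transcript(path: Path) -> list[dict]:
    """Resume: rebuild the turn list a new session can continue from."""
    if not path.exists():
        return []
    return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]

with TemporaryDirectory() as tmp:
    session_file = Path(tmp) / "chat-42.jsonl"
    turns = [
        {"role": "user", "content": "My name is Anna."},
        {"role": "assistant", "content": "Nice to meet you, Anna."},
    ]
    save_transcript(session_file, turns)
    resumed = load_transcript(session_file)
    print(f"resumed {len(resumed)} turns")  # → resumed 2 turns
```

The agent definition never appears in the persistence code at all, which is exactly the separation the bullets above ask for.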

A Pattern You May Recognize from OpenClaw

If you have looked at OpenClaw, this architecture should feel familiar.

The shared idea is simple: keep the runtime session outside the reusable agent definition. The application manages per-user or per-channel conversation state, while the agent logic stays portable.

Start with Single-Agent Systems

Right now, AG2 Beta is especially strong for single-agent applications and integrations. That is deliberate.

We want to make the most common real-world cases easy first:

  • one reusable agent
  • one conversation stream per user or session
  • optional persistence and memory tools when the application needs them

That model is already enough to power ambient assistants, bots, web co-pilots, and background workers in a clean way.

Note

Ready for multi-agent workflows? They're on the way, but you can start experimenting with Beta agents in existing AG2 workflows right now. Beta agents can be converted to ConversableAgents using as_conversable(). This allows them to be used in existing group chats, sequential workflows, and with handoffs. See AG2 Compatibility for the current integration path.

You can introduce new agents from the Beta API right into your current AG2 chat topologies.

Try AG2 Beta

AG2 Beta is available today inside the ag2 package. Just `pip install ag2` as you already do, and check out the docs.

Here's a suggested mental model to get started:

  1. Define one reusable agent
  2. Create a new MemoryStream for each conversation
  3. Pass the same stream back into each new turn
  4. Add persistence only where your application actually needs it

It's a small API shift, but it unlocks a strong foundation for real-world systems.

If your current agent application needs isolated conversations, concurrent users, or lightweight long-term memory, give AG2 Beta a try!