AG2 Beta: Stronger Foundation for Real-World Agents
The original AutoGen and later AG2 architecture helped define the early agent ecosystem. It enabled real systems, shaped how many developers thought about agent orchestration, and gave us firsthand experience building and operating agent applications in practice.
That experience also made the limits of the original design clearer over time. As the agent ecosystem matured, expectations changed. Agents increasingly needed to fit into real application environments with concurrent users, explicit session boundaries, platform identities, persistence layers, and integration points that could not be treated as incidental details.
We found that some of these needs were hard to address cleanly inside the original framework model. In many cases, shipping more production-suitable behavior meant adding complexity around the edges rather than improving the core abstraction itself, which pulled developers' focus away from building agentic capabilities. That is a large part of why we created AG2 Beta: a new framework track, built on the lessons we learned from the original AG2, designed to better support modern, real-world agentic systems.
You can read more about the motivation behind AG2 Beta in the AG2 Beta overview.
One of the clearest examples of that shift is conversation state.
In the previous implementation, an agent instance effectively carried the active conversation with it. That was convenient for small examples, but it became impractical in real applications:
one agent instance could not be cleanly reused across concurrent users
isolating conversations required extra coordination outside the agent API
integrating agents into chat platforms, web apps, and background workers was hard
These gaps appear when you move from demos to production.
In a real-world application, the agent definition should be reusable. The conversation should be isolated. The application should decide how sessions are created, persisted, resumed, and discarded.
AG2 Beta introduces MemoryStream as the conversation boundary.
Instead of storing the current conversation history on the agent instance, you create a fresh stream for each conversation. To continue that conversation, you pass the same stream into the next ask() call.
That provides a better model for real systems:
the agent definition is reusable
each conversation is isolated by its own stream
concurrent users can share the same agent safely while keeping separate histories
the application can decide when to persist or discard a conversation
If you want to explore the details of how streams work as the event bus behind a conversation, see Events Streaming.
```python
from autogen.beta import Agent, MemoryStream
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "assistant",
    prompt="You are a concise assistant.",
    config=OpenAIConfig("gpt-5-nano"),
)


async def main() -> None:
    # Conversation A
    stream_a = MemoryStream()
    reply_1 = await agent.ask(
        "My name is Anna. Please remember it.",
        stream=stream_a,
    )
    print(reply_1.content)

    reply_2 = await agent.ask(
        "What is my name?",
        stream=stream_a,
    )
    print(reply_2.content)

    # Conversation B starts fresh because it uses a different stream
    stream_b = MemoryStream()
    reply_3 = await agent.ask(
        "What is my name?",
        stream=stream_b,
    )
    print(reply_3.content)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
The important detail is not the model call; it is the ownership of state.
The same agent instance serves both conversations. The memory lives in stream_a and stream_b, not in the agent itself. That is the key property that makes the API significantly easier to embed into real applications.
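That ownership model can be demonstrated with a small self-contained sketch. Note that `ToyAgent` and its plain-list "streams" are stand-ins invented for illustration, not the AG2 Beta API; the point is only where the state lives.

```python
import asyncio


class ToyAgent:
    """Stateless stand-in for an agent definition: it never stores history itself."""

    async def ask(self, text: str, stream: list[str]) -> str:
        # The caller-owned `stream` list plays the role of MemoryStream here.
        stream.append(f"user: {text}")
        reply = f"reply #{len(stream)}: {text}"
        stream.append(f"assistant: {reply}")
        return reply


async def demo() -> tuple[list[str], list[str]]:
    agent = ToyAgent()        # one reusable definition
    stream_a: list[str] = []  # conversation A's state
    stream_b: list[str] = []  # conversation B's state
    # Two "users" talk to the same agent instance concurrently...
    await asyncio.gather(
        agent.ask("hello from A", stream_a),
        agent.ask("hello from B", stream_b),
    )
    return stream_a, stream_b


stream_a, stream_b = asyncio.run(demo())
# ...and their histories stay isolated from each other.
print(stream_a[0])  # user: hello from A
print(stream_b[0])  # user: hello from B
```

Because the agent object holds no per-conversation state, sharing it between tasks requires no locking; isolation comes entirely from passing a different stream.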
This architecture becomes even more useful when the agent is connected to a real communication channel.
To demonstrate that, here's a Telegram-based integration. The structure is intentionally simple:
keep one reusable agent definition
create one MemoryStream per Telegram chat
pass the same stream back into agent.ask(...) on each new message
That is enough to turn the same agent into a multi-user ambient application without mixing conversation histories.
The first, minimal version focuses on the session boundary and message flow. We pass user_id into the conversation variables so that the next iteration of the example can use that identity inside tools.
```python
import asyncio

from aiogram import Bot, Dispatcher, F
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message

from autogen.beta import Agent, MemoryStream
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "ambient-agent",
    config=OpenAIConfig("gpt-5-nano", reasoning_effort="low"),
)

dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}


def get_or_create_stream(chat_id: int) -> MemoryStream:
    if chat_id not in chat_state:
        chat_state[chat_id] = MemoryStream()
    return chat_state[chat_id]


@dp.message(CommandStart())
async def cmd_start(message: Message) -> None:
    get_or_create_stream(message.chat.id)
    await message.answer("Hi! Send me a message and I'll reply using AG2 Beta.")


@dp.message(F.text)
async def on_text(message: Message) -> None:
    chat_stream = get_or_create_stream(message.chat.id)
    status = await message.answer("Thinking…")
    try:
        reply = await agent.ask(
            message.text,
            stream=chat_stream,
            # pass the `user_id` as a variable to the conversation
            variables={"user_id": message.chat.id},
        )
        text = reply.content or "(no response)"
        await status.edit_text(text)
    except Exception as e:
        await status.edit_text(f"Error: {e}")


if __name__ == "__main__":
    bot = Bot(
        token="...",
        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
    )
    asyncio.run(dp.start_polling(bot))
```
The resulting interaction looks like this in Telegram: one chat, one stream, and each new message continues the same conversation context.
Short-term conversation state is handled by MemoryStream, but real assistants often need more than that. They also need a lightweight way to recall what happened earlier, even after the in-memory conversation is cleared.
The Telegram example shows one practical pattern:
keep the live conversation in a MemoryStream
save the completed conversation to a per-user file such as ./memory/<user_id>/YYYY-MM-DD.md
expose a tool that can load that saved history back into context when needed
The next snippet builds on the previous one to do exactly this, adding the pieces needed for long-term memory: a tool that can read saved history, a /clear command that persists the current stream before dropping it, and a helper that writes the conversation transcript to disk.
```python
import asyncio
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Annotated

from aiogram import Dispatcher
from aiogram.filters import Command
from aiogram.types import Message
from pydantic import Field

from autogen.beta import Agent, MemoryStream, Variable, tool
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelRequest, ModelResponse

MEMORY_DIR = Path("./memory")


@tool
def read_conversation_history(
    date: Annotated[
        date,
        Field(
            default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            description="The date to read the conversation history for",
        ),
    ],
    # get the `user_id` from the conversation context
    user_id: Annotated[int, Variable()],
) -> str:
    """Read current user's conversation history for the given date
    (summaries from previous sessions).

    Use this when the user asks what was discussed earlier on the given date
    or to recall prior context.
    """
    path = MEMORY_DIR / str(user_id) / date.strftime("%Y-%m-%d.md")
    if not path.exists():
        return "No conversation history saved for today yet."
    return path.read_text()


agent = Agent(
    "ambient-agent",
    config=OpenAIConfig("gpt-5-nano", reasoning_effort="low"),
    tools=[read_conversation_history],
)

dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}


# add a `/clear` command that clears the conversation and saves its history to memory
@dp.message(Command("clear"))
async def cmd_clear(message: Message) -> None:
    # remove the current conversation stream
    stream = chat_state.pop(message.chat.id, None)
    if stream is not None:
        # append it to `./memory/<user_id>/YYYY-MM-DD.md`
        await write_conversation_to_memory(message.chat.id, stream)
    await message.answer("Context dropped.\nThe next message starts a new conversation.")


async def write_conversation_to_memory(user_id: int, stream: MemoryStream) -> None:
    # turn the conversation events into a transcript
    events = await stream.history.get_events()
    lines: list[str] = []
    for event in events:
        if isinstance(event, ModelRequest):
            lines.append(f"**user:** {event.content}")
        elif isinstance(event, ModelResponse) and event.content:
            lines.append(f"**assistant:** {event.content}")
    if not lines:
        return

    # use `./memory/<user_id>/YYYY-MM-DD.md` as today's conversation history file
    now = datetime.now(timezone.utc)
    user_dir = MEMORY_DIR / str(user_id)
    user_dir.mkdir(parents=True, exist_ok=True)
    path = user_dir / f"{now:%Y-%m-%d}.md"

    def _write() -> None:
        # append the current history to the file
        with path.open("a") as file:
            if file.tell() != 0:
                file.write("\n\n---\n\n")
            file.write(f"## {now:%H:%M:%S} UTC\n\n")
            file.write("\n\n".join(lines))

    await asyncio.to_thread(_write)


# The rest of the Telegram bot code from above
...
```
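Given the append logic above, a day with two cleared sessions produces a per-user file like the following (timestamps and dialogue here are purely illustrative):

```markdown
## 09:14:02 UTC

**user:** My name is Anna. Please remember it.

**assistant:** Got it, Anna.

---

## 17:40:55 UTC

**user:** What did we discuss this morning?

**assistant:** You asked me to remember your name, Anna.
```

Each `## HH:MM:SS UTC` heading marks one cleared session, and `---` separates sessions appended to the same day's file.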
After calling /clear, the bot can still recover the saved context through the history-loading tool.
Taken together, the two snippets describe the full pattern: the first establishes per-chat conversation streams, and the second adds persistence and recall on top of that same structure. None of this requires a complex orchestration layer to feel production-ready.
The application (Telegram Bot) owns session lifecycle. The agent owns reasoning and tool use. MemoryStream bridges the two.
```python
import asyncio
import logging
import os
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Annotated

from aiogram import Bot, Dispatcher, F
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import Command, CommandStart
from aiogram.types import Message
from pydantic import Field

from autogen.beta import Agent, MemoryStream, Variable
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelRequest, ModelResponse
from autogen.beta.middleware import LoggingMiddleware

MEMORY_DIR = Path("./memory")

logging.basicConfig(level=logging.INFO)

agent = Agent(
    "test-agent",
    config=OpenAIConfig(
        "gpt-5-nano",
        reasoning_effort="low",
        streaming=True,
    ),
    middleware=[LoggingMiddleware()],
)


@agent.tool
def read_conversation_history(
    date: Annotated[
        date,
        Field(
            default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            description="The date to read the conversation history for",
        ),
    ],
    user_id: Annotated[int, Variable()],
) -> str:
    """Read current user's conversation history for the given date
    (summaries from previous sessions).

    Use this when the user asks what was discussed earlier on the given date
    or to recall prior context.
    """
    path = MEMORY_DIR / str(user_id) / date.strftime("%Y-%m-%d.md")
    if not path.exists():
        return "No conversation history saved for today yet."
    return path.read_text(encoding="utf-8")


dp = Dispatcher()
chat_state: dict[int, MemoryStream] = {}


def _get_or_create_chat(chat_id: int) -> MemoryStream:
    if chat_id not in chat_state:
        chat_state[chat_id] = MemoryStream()
    return chat_state[chat_id]


@dp.message(CommandStart())
async def cmd_start(message: Message) -> None:
    _get_or_create_chat(message.chat.id)
    await message.answer("Hi, there! Send me any message and I'll reply using the agent.")


@dp.message(Command("clear"))
async def cmd_clear(message: Message) -> None:
    stream = chat_state.pop(message.chat.id, None)
    if stream is not None:
        await _write_conversation_to_memory(message.chat.id, stream)
    await message.answer("Context dropped.\nNext message will start a new conversation.")


@dp.message(F.text)
async def on_text(message: Message) -> None:
    if not message.text:
        return
    chat_stream = _get_or_create_chat(message.chat.id)
    status = await message.answer("Thinking…")
    try:
        reply = await agent.ask(
            message.text,
            stream=chat_stream,
            variables={"user_id": message.chat.id},
        )
        text = reply.content or "(no response)"
        await status.edit_text(text)
    except Exception as e:
        await status.edit_text(f"Error: {e}")


async def _write_conversation_to_memory(user_id: int, stream: MemoryStream) -> None:
    """Write current conversation transcript to folder-based memory."""
    # turn the conversation events into a transcript
    events = await stream.history.get_events()
    lines: list[str] = []
    for ev in events:
        if isinstance(ev, ModelRequest):
            lines.append(f"**user:** {ev.content}")
        elif isinstance(ev, ModelResponse) and ev.content:
            lines.append(f"**assistant:** {ev.content}")
    if not lines:
        return

    # use `./memory/<user_id>/YYYY-MM-DD.md` as today's conversation history file
    now = datetime.now(timezone.utc)
    user_dir = MEMORY_DIR / str(user_id)
    user_dir.mkdir(parents=True, exist_ok=True)
    path = user_dir / now.strftime("%Y-%m-%d.md")
    content = "\n\n".join(lines)

    def _write() -> None:
        # append the current history to the file
        with path.open("a", encoding="utf-8") as f:
            if f.tell() != 0:
                f.write("\n\n---\n\n")
            f.write(f"## {now.strftime('%H:%M:%S')} UTC\n\n")
            f.write(content)

    await asyncio.to_thread(_write)


if __name__ == "__main__":
    token = os.environ.get("TELEGRAM_BOT_TOKEN")
    if not token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN is not set")
    bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
    asyncio.run(dp.start_polling(bot))
```
If you have looked at OpenClaw, this architecture should feel familiar.
The shared idea is simple: keep the runtime session outside the reusable agent definition. The application manages per-user or per-channel conversation state, while the agent logic stays portable.
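One way to express that boundary in application code is a small session store keyed by whatever identity the channel provides. This is a plain-Python sketch; `SessionStore` is a hypothetical helper invented for this post, not part of AG2 Beta, and in the Telegram example its factory would be `MemoryStream`.

```python
from typing import Callable, Generic, Hashable, Optional, TypeVar

S = TypeVar("S")


class SessionStore(Generic[S]):
    """Maps a channel-provided identity (chat id, user id, ...) to session state.

    The agent definition never sees this mapping; the application owns it.
    """

    def __init__(self, make_session: Callable[[], S]) -> None:
        self._make_session = make_session
        self._sessions: dict[Hashable, S] = {}

    def get_or_create(self, key: Hashable) -> S:
        if key not in self._sessions:
            self._sessions[key] = self._make_session()
        return self._sessions[key]

    def drop(self, key: Hashable) -> Optional[S]:
        # e.g. on /clear: hand the session back so the caller can persist it
        return self._sessions.pop(key, None)


# `list` stands in for MemoryStream in this self-contained sketch
store: SessionStore[list] = SessionStore(make_session=list)
s1 = store.get_or_create("chat-1")
s2 = store.get_or_create("chat-1")
assert s1 is s2                              # same chat -> same session
assert store.get_or_create("chat-2") is not s1  # different chat -> isolated session
```

Swapping the dict for a database-backed mapping changes persistence without touching the agent definition, which is the portability the pattern is after.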
Right now, AG2 Beta is especially strong for single-agent applications and integrations. That is deliberate.
We want to make the most common real-world cases easy first:
one reusable agent
one conversation stream per user or session
optional persistence and memory tools when the application needs them
That model is already enough to power ambient assistants, bots, web co-pilots, and background workers in a clean way.
Note
Ready for multi-agent workflows? They're on the way, but you can start experimenting with Beta agents in existing AG2 workflows right now. Beta agents can be converted to ConversableAgents using as_conversable(). This allows them to be used in existing group chats, sequential workflows, and with handoffs. See AG2 Compatibility for the current integration path.
You can introduce new agents from the Beta API right into your current AG2 chat topologies.