Stream
What is the Stream?
The Stream in AG2 Beta is a central event bus that facilitates communication between agents and system components. It operates on an event-driven architecture where components can publish events (derived from BaseEvent) and other components can subscribe to these events.
The Stream manages the flow of messages, actions, and tool calls, allowing for decoupled and scalable architectures. You interact with the Stream by publishing events to it and subscribing to specific events you want to listen to.
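The publish/subscribe idea described above can be illustrated with a small standalone sketch. `ToyEvent` and `ToyStream` are hypothetical stand-ins written for this example; they are not the AG2 classes and make no claim about how the real Stream is implemented.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyEvent:
    """Hypothetical event type, standing in for a BaseEvent subclass."""
    payload: str


class ToyStream:
    """Hypothetical synchronous event bus (not the AG2 Stream)."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[ToyEvent], None]] = []

    def subscribe(self, handler: Callable[[ToyEvent], None]) -> Callable[[ToyEvent], None]:
        # Returning the handler lets this method double as a decorator.
        self._subscribers.append(handler)
        return handler

    def publish(self, event: ToyEvent) -> None:
        # The publisher never references a subscriber directly;
        # the bus fans the event out to whoever registered.
        for handler in list(self._subscribers):
            handler(event)


def demo() -> List[str]:
    stream = ToyStream()
    log: List[str] = []
    stream.subscribe(lambda e: log.append(f"logger saw {e.payload}"))
    stream.subscribe(lambda e: log.append(f"metrics saw {e.payload}"))
    stream.publish(ToyEvent("tool call requested"))
    return log
```

Because the publisher only knows about the bus, a new subscriber can be added without changing any publishing code, which is the decoupling the section refers to.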
Passing a custom stream to an Agent
By default, agents create a MemoryStream instance internally for each conversation.
However, you can pass a custom stream to an agent when calling its ask method. This allows you to set up subscribers on the stream before the conversation starts, letting you observe or intercept the agent's internal events.
How to subscribe to stream events
You can listen to events flowing through the stream using the subscribe method. To filter which events you receive, you can use the where method.
Note
All event subscribers and interrupters support the same execution context capabilities as Agent Tools.
By type-hinting the Context object (or using Depends, Inject, or Variable), your subscribers can access injected dependencies, interact with human-in-the-loop flows, or access conversation variables.
For details on specific context features, see Dependency Injection, Context Variables, Depends, and Human-in-the-loop.
Subscribe to events of a specific type
To subscribe to events of a specific type, pass the event class to stream.where().
Subscribe to multiple event types
You can subscribe to multiple event types using the bitwise OR operator (|).
Subscribe to events with a specific value
You can filter events based on the value of their fields. The event classes define fields that support comparison operators.
# Subscribe only to ToolCall events where the name is "fetch_data"
stream.where(ToolCall.name == "fetch_data").subscribe(handle_event)
Static subscribers with decorators
You can use subscribe as a decorator to register static handlers cleanly. This works well with where filters.
Dynamic subscribers by context manager
If you only need to listen to events for a specific duration or within a specific block of code, you can use sub_scope as a context manager. This dynamically subscribes the handler when entering the block and unsubscribes when exiting.
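The subscribe-on-enter, unsubscribe-on-exit behavior can be modeled with `contextlib.contextmanager`. This is a minimal sketch on a hypothetical `ToyStream`; the method is named `sub_scope` only to mirror the text, and the real signature in AG2 may differ.

```python
from contextlib import contextmanager


class ToyStream:
    """Hypothetical stream with scoped subscriptions (not the AG2 Stream)."""

    def __init__(self):
        self._handlers = []

    def publish(self, event):
        for handler in list(self._handlers):
            handler(event)

    @contextmanager
    def sub_scope(self, handler):
        # Subscribe when the block is entered...
        self._handlers.append(handler)
        try:
            yield handler
        finally:
            # ...and always unsubscribe on exit, even if the block raises.
            self._handlers.remove(handler)


def demo():
    stream = ToyStream()
    seen = []
    stream.publish("before")       # no handler registered yet
    with stream.sub_scope(seen.append):
        stream.publish("inside")   # delivered
    stream.publish("after")        # handler already removed
    return seen
```

The `try`/`finally` is the important part of the design: a scoped handler should not outlive its block when an exception escapes it.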
You can also combine sub_scope with a where filter, so that the scoped handler receives only the matching events.
Get specific events
To wait for and get the next occurrence of a specific event asynchronously, use the get async context manager. This yields a future that resolves to the matched event.
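One way to picture an async context manager that yields a future is the sketch below. It assumes a hypothetical `ToyStream.get` that takes a plain predicate function; the real `get` presumably takes the same kind of filter as `where`, and its exact signature is not shown in this document.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyStream:
    """Hypothetical stream (not the AG2 Stream)."""

    def __init__(self):
        self._handlers = []

    def publish(self, event):
        for handler in list(self._handlers):
            handler(event)

    @asynccontextmanager
    async def get(self, predicate):
        future = asyncio.get_running_loop().create_future()

        def on_event(event):
            # Resolve only once, with the first matching event.
            if predicate(event) and not future.done():
                future.set_result(event)

        # Register before yielding, so events published inside the
        # block cannot be missed.
        self._handlers.append(on_event)
        try:
            yield future
        finally:
            self._handlers.remove(on_event)


async def main():
    stream = ToyStream()
    async with stream.get(lambda e: e.startswith("tool:")) as next_tool_event:
        stream.publish("message: hello")    # does not match
        stream.publish("tool: fetch_data")  # first match resolves the future
        stream.publish("tool: other")       # ignored, already resolved
        return await next_tool_event


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Registering the listener on entry and awaiting the future later means the action that triggers the event can run inside the block without racing the subscription.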
Raise events manually
To raise or publish events to the stream, you should always use the Context object rather than sending them to the stream directly. The Context ensures that dependencies and the stream scope are properly propagated.
from autogen.beta import Context
from autogen.beta.events import ModelMessage

async def publish_message(context: Context):
    event = ModelMessage(content="Hello from the agent!")
    # Always use context.send() to raise events
    await context.send(event)
Event Interrupters
Interrupters allow you to intercept an event before it reaches regular subscribers. You can use an interrupter to modify the event, raise a completely different event in its place, or suppress it entirely.
To register an interrupter, pass interrupt=True to the subscribe method. If the interrupter returns an event, that event replaces the original one for subsequent interrupters and subscribers. If it returns None, the event is suppressed and propagation stops.
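The replace-or-suppress rule can be sketched as a two-stage dispatch: interrupters run first and may rewrite or drop the event, then regular subscribers receive whatever survives. `ToyStream`, `ToyMessage`, `redact`, and `drop_empty` are hypothetical names for this illustration. Only the `interrupt=True` flag and the return-value semantics come from the text above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyMessage:
    content: str


class ToyStream:
    """Hypothetical stream with interrupters (not the AG2 Stream)."""

    def __init__(self):
        self._interrupters = []
        self._subscribers = []

    def subscribe(self, handler, interrupt=False):
        target = self._interrupters if interrupt else self._subscribers
        target.append(handler)
        return handler

    def publish(self, event):
        for interrupter in self._interrupters:
            # A returned event replaces the current one for later stages.
            event = interrupter(event)
            if event is None:
                return  # suppressed: propagation stops here
        for subscriber in self._subscribers:
            subscriber(event)


def redact(event):
    # Replace the event with a modified copy.
    return replace(event, content=event.content.replace("secret", "[redacted]"))


def drop_empty(event):
    # Suppress whitespace-only messages by returning None.
    return event if event.content.strip() else None


def demo():
    stream = ToyStream()
    delivered = []
    stream.subscribe(redact, interrupt=True)
    stream.subscribe(drop_empty, interrupt=True)
    stream.subscribe(lambda e: delivered.append(e.content))
    stream.publish(ToyMessage("the secret plan"))
    stream.publish(ToyMessage("   "))
    return delivered
```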
Custom events
You can create your own custom events by subclassing BaseEvent. Because BaseEvent uses a metaclass, the fields you define on a subclass automatically support value-based filtering in where clauses.
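To show how a metaclass can make class-level fields comparable, here is a standalone sketch. It is an assumption about the general technique, not a description of how BaseEvent is built: `ToyEventMeta`, `ToyEvent`, `Field`, `FieldRef`, `Condition`, and `OrderPlaced` are all hypothetical names.

```python
class Condition:
    """Predicate produced by comparing a class-level field to a value."""

    def __init__(self, event_type, field_name, expected):
        self.event_type = event_type
        self.field_name = field_name
        self.expected = expected

    def matches(self, event):
        return (
            isinstance(event, self.event_type)
            and getattr(event, self.field_name) == self.expected
        )


class FieldRef:
    """What `SomeEvent.field` evaluates to at class level."""

    def __init__(self, owner, name):
        self.owner = owner
        self.name = name

    def __eq__(self, expected):
        # `SomeEvent.field == value` builds a Condition instead of a bool.
        return Condition(self.owner, self.name, expected)


class Field:
    """Non-data descriptor installed for every annotated field."""

    def __init__(self, name):
        self.name = name

    def __get__(self, instance, owner):
        if instance is not None:
            # Instance values live in instance.__dict__ and shadow this
            # descriptor, so reaching here means the field was never set.
            raise AttributeError(self.name)
        return FieldRef(owner, self.name)


class ToyEventMeta(type):
    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        for field_name in getattr(cls, "__annotations__", {}):
            setattr(cls, field_name, Field(field_name))


class ToyEvent(metaclass=ToyEventMeta):
    def __init__(self, **values):
        for key, value in values.items():
            setattr(self, key, value)


class OrderPlaced(ToyEvent):
    # Declaring annotated fields is all a subclass needs to do.
    order_id: str
    region: str


def demo():
    condition = OrderPlaced.region == "eu"
    events = [
        OrderPlaced(order_id="a1", region="eu"),
        OrderPlaced(order_id="b2", region="us"),
    ]
    return [e.order_id for e in events if condition.matches(e)]
```

In this sketch the subclass author writes only annotations, and the metaclass supplies the comparison behavior, which matches the "automatic" filtering support the text describes.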