Stream
What is the Stream?#
The Stream in AG2 is a central event bus that facilitates communication between agents and system components. It operates on an event-driven architecture where components can publish events (derived from BaseEvent) and other components can subscribe to these events.
The Stream manages the flow of messages, actions, and tool calls, allowing for decoupled and scalable architectures. You interact with the Stream by publishing events to it and subscribing to specific events you want to listen to.
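The publish/subscribe idea described above can be sketched in a few lines of plain Python. This is a toy model for illustration only, not AG2's implementation: `ToyStream`, its `send` method, and the stand-in `BaseEvent` and `ModelMessage` classes are hypothetical and do not import anything from the library.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class BaseEvent:
    """Stand-in for the library's BaseEvent (hypothetical)."""


@dataclass
class ModelMessage(BaseEvent):
    content: str


class ToyStream:
    """A toy in-memory event bus; not AG2's MemoryStream."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[BaseEvent], None]] = []

    def subscribe(self, handler: Callable[[BaseEvent], None]):
        self._subscribers.append(handler)
        return handler

    def send(self, event: BaseEvent) -> None:
        # The publisher does not know who is listening; it only hands
        # the event to the bus, which fans it out to every subscriber.
        for handler in list(self._subscribers):
            handler(event)


received = []
stream = ToyStream()
stream.subscribe(received.append)
stream.send(ModelMessage(content="hello"))
```

The publisher and the subscriber never reference each other, which is the decoupling the event bus is meant to provide.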
Passing a custom stream to an Agent#
By default, agents create a MemoryStream instance internally for each conversation.
However, you can pass a custom stream to an agent when calling its ask method. This allows you to set up subscribers on the stream before the conversation starts, letting you observe or intercept the agent's internal events.
How to subscribe to stream events#
You can listen to events flowing through the stream using the subscribe method. To filter which events you receive, you can use the where method.
Note
All event subscribers and interrupters support the same execution context capabilities as Agent Tools.
By type-hinting the Context object (or using Depends, Inject, Variable), your subscribers can access injected dependencies, interact with human-in-the-loop flows, or access conversation variables.
For more detailed information on specific context features, see Dependency Injection, Context Variables, Depends, and Human-in-the-loop.
Subscribe to events of a specific type#
To subscribe to events of a specific type, pass the event class to stream.where().
Subscribe to multiple event types#
You can subscribe to multiple event types using the bitwise OR operator (|).
Exclude events of a specific type#
You can negate an event type using the bitwise NOT operator (~). This creates a condition that matches everything except the specified type.
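One way to picture how type filters compose with `|` and `~` is a small condition object that overloads those operators. The sketch below is an assumption about the general technique, not the library's code: `Condition` and the `of_type` helper are hypothetical names, and the event classes are empty stand-ins. In the library the operators are applied to the event classes themselves.

```python
class Condition:
    """Wraps a predicate and supports | (or) and ~ (not). Hypothetical helper."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, event) -> bool:
        return self.predicate(event)

    def __or__(self, other):
        # Matches when either side matches.
        return Condition(lambda event: self(event) or other(event))

    def __invert__(self):
        # Matches everything the original condition rejects.
        return Condition(lambda event: not self(event))


def of_type(cls):
    """Build a condition that matches instances of cls (hypothetical helper)."""
    return Condition(lambda event: isinstance(event, cls))


# Empty stand-ins for real event classes.
class ToolCallEvent: ...
class ModelMessage: ...
class ModelResponse: ...


tool_or_message = of_type(ToolCallEvent) | of_type(ModelMessage)
not_tool = ~of_type(ToolCallEvent)
```

Here `tool_or_message` accepts `ToolCallEvent` and `ModelMessage` instances and rejects `ModelResponse`, while `not_tool` accepts anything that is not a `ToolCallEvent`.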
Subscribe to events with a specific value#
You can filter events based on the value of their fields. The event classes define fields that support comparison operators.
# Subscribe only to ToolCallEvent events where the name is "fetch_data"
stream.where(ToolCallEvent.name == "fetch_data").subscribe(handle_event)
Static subscribers with decorators#
You can use subscribe as a decorator to register static handlers cleanly. This works well with where filters.
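Decorator registration works whenever `subscribe` returns the handler it was given. The following toy model shows that mechanism together with a filtered view. `ToyStream` and `FilteredView` are hypothetical, and plain predicates stand in for the library's `where` conditions.

```python
class FilteredView:
    """A view of the stream that only forwards matching events (hypothetical)."""

    def __init__(self, stream, predicate):
        self._stream = stream
        self._predicate = predicate

    def subscribe(self, handler):
        self._stream._handlers.append((self._predicate, handler))
        # Returning the handler is what makes decorator usage possible.
        return handler


class ToyStream:
    def __init__(self):
        self._handlers = []  # list of (predicate, handler) pairs

    def where(self, predicate):
        return FilteredView(self, predicate)

    def subscribe(self, handler):
        return self.where(lambda event: True).subscribe(handler)

    def send(self, event):
        for predicate, handler in list(self._handlers):
            if predicate(event):
                handler(event)


stream = ToyStream()
seen_all, seen_ints = [], []


@stream.subscribe
def log_everything(event):
    seen_all.append(event)


ints_only = stream.where(lambda event: isinstance(event, int))


@ints_only.subscribe
def log_ints(event):
    seen_ints.append(event)


stream.send("a")
stream.send(1)
```

After the two `send` calls, `seen_all` holds both events and `seen_ints` holds only the integer.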
Dynamic subscribers by context manager#
If you only need to listen to events for a specific duration or within a specific block of code, you can use sub_scope as a context manager. This dynamically subscribes the handler when entering the block and unsubscribes when exiting.
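The subscribe-on-enter, unsubscribe-on-exit behaviour can be modelled with `contextlib.contextmanager`. This is a sketch of the pattern only: the `ToyStream` class is hypothetical, and whether the library's `sub_scope` is a synchronous or asynchronous context manager is not assumed here.

```python
from contextlib import contextmanager


class ToyStream:
    def __init__(self):
        self._handlers = []

    def send(self, event):
        for handler in list(self._handlers):
            handler(event)

    @contextmanager
    def sub_scope(self, handler):
        self._handlers.append(handler)  # subscribe on entering the block
        try:
            yield
        finally:
            # Unsubscribe on exit, whether the block finished or raised.
            self._handlers.remove(handler)


stream = ToyStream()
seen = []

stream.send("before")
with stream.sub_scope(seen.append):
    stream.send("inside")
stream.send("after")
```

Only the event sent inside the block reaches the handler, and no handler remains registered afterwards.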
You can also combine sub_scope with where so that only matching events reach the scoped handler.
Get specific events#
To wait for and get the next occurrence of a specific event asynchronously, use the get async context manager. This yields a future that resolves to the matched event.
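The "future that resolves to the matched event" pattern can be sketched with `asyncio` alone. The `ToyStream` below is a hypothetical stand-in, and its `get` takes a plain predicate where the library would take a `where`-style condition.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyStream:
    def __init__(self):
        self._handlers = []

    async def send(self, event):
        for handler in list(self._handlers):
            handler(event)

    @asynccontextmanager
    async def get(self, predicate):
        future = asyncio.get_running_loop().create_future()

        def resolve(event):
            # Only the first matching event resolves the future.
            if predicate(event) and not future.done():
                future.set_result(event)

        self._handlers.append(resolve)
        try:
            yield future
        finally:
            self._handlers.remove(resolve)


async def main():
    stream = ToyStream()
    async with stream.get(lambda e: isinstance(e, int)) as result:
        await stream.send("ignored")
        await stream.send(42)
        await stream.send(43)
        value = await result
    return value, len(stream._handlers)


outcome = asyncio.run(main())
```

The future resolves to the first matching event (`42`), later matches are ignored, and the temporary handler is removed when the block exits.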
Iterate over stream events#
Use join() as a context manager when you want an async iterator of events for a bounded block of code. The iterator receives events published after the with block starts. Pass max_events to stop automatically after a fixed number of events.
Do not time out individual join() pulls
Consume the iterator with a plain async for. Avoid wrapping individual pulls such as await asyncio.wait_for(events.__anext__(), timeout=...): cancelling one __anext__() closes the current iterator, so later events will not be yielded by that iterator. For bounded consumption, use join(max_events=N) or put one asyncio.timeout(...) around the whole loop.
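The warning above follows from how Python async generators react to cancellation, which can be reproduced without the library. The sketch assumes the iterator behaves like an async generator: `event_iter` is a hypothetical stand-in for what `join()` yields. It uses `asyncio.wait_for` around the whole consumer coroutine in place of `asyncio.timeout(...)` so that it also runs on Python versions older than 3.11.

```python
import asyncio


async def event_iter(queue):
    # Hypothetical stand-in for the iterator yielded by join().
    while True:
        yield await queue.get()


async def per_pull_timeout():
    queue = asyncio.Queue()
    events = event_iter(queue)
    try:
        await asyncio.wait_for(events.__anext__(), timeout=0.01)
    except asyncio.TimeoutError:
        pass  # the cancellation was thrown into the generator and finished it
    queue.put_nowait("late event")
    try:
        return await events.__anext__()
    except StopAsyncIteration:
        return "iterator closed"


async def whole_loop_timeout():
    queue = asyncio.Queue()
    for item in ("a", "b"):
        queue.put_nowait(item)
    collected = []

    async def consume():
        async for event in event_iter(queue):
            collected.append(event)

    try:
        # One timeout around the entire loop, not around each pull.
        await asyncio.wait_for(consume(), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    return collected


pitfall = asyncio.run(per_pull_timeout())
bounded = asyncio.run(whole_loop_timeout())
```

In the first coroutine the late event is never delivered because the generator is already finished. In the second, every queued event is consumed before the single outer timeout ends the loop.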
Raise events manually#
To raise or publish events to the stream, you should always use the Context object rather than sending them to the stream directly. The Context ensures that dependencies and the stream scope are properly propagated.
from ag2 import Context
from ag2.events import ModelMessage

async def publish_message(context: Context):
    event = ModelMessage(content="Hello from the agent!")
    # Always use context.send() to raise events
    await context.send(event)
Event Interrupters#
Interrupters allow you to intercept an event before it reaches regular subscribers. You can use an interrupter to modify the event, raise a completely different event in its place, or suppress it entirely.
To register an interrupter, pass interrupt=True to the subscribe method. If the interrupter returns an event, that event replaces the original one for subsequent interrupters and subscribers. If it returns None, the event is suppressed and propagation stops.
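The replace-or-suppress rules can be modelled as a chain that runs before regular subscribers. This is a toy illustration with hypothetical names (`ToyStream`, `redact`, `drop_noise`), using strings as events for brevity. Only the `interrupt=True` flag and the return-value semantics are taken from the description above.

```python
class ToyStream:
    def __init__(self):
        self._interrupters = []
        self._subscribers = []

    def subscribe(self, handler, interrupt=False):
        target = self._interrupters if interrupt else self._subscribers
        target.append(handler)
        return handler

    def send(self, event):
        for interrupter in self._interrupters:
            # The returned event replaces the original for everyone downstream.
            event = interrupter(event)
            if event is None:
                return  # suppressed: propagation stops here
        for handler in self._subscribers:
            handler(event)


def redact(event):
    return event.replace("secret", "***")


def drop_noise(event):
    return None if event.startswith("debug:") else event


delivered = []
stream = ToyStream()
stream.subscribe(redact, interrupt=True)
stream.subscribe(drop_noise, interrupt=True)
stream.subscribe(delivered.append)

stream.send("token=secret")
stream.send("debug: secret trace")
```

The first event reaches the subscriber in its redacted form. The second is dropped by `drop_noise`, so the subscriber never sees it.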
RedisStream — Persistent & Cross-Process Events#
RedisStream is a drop-in replacement for MemoryStream that adds persistent event history and cross-process pub/sub via Redis. Events are delivered to all subscribers — even across different processes or machines.
Basic Usage#
All the same subscription patterns (subscribe, where, sub_scope, get, interrupters) work exactly as with MemoryStream.
Serialization Format#
By default, events are serialized as JSON for human readability. You can switch to pickle for full Python object fidelity.
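The trade-off between the two formats can be seen with the standard library alone. The sketch below does not use RedisStream or its configuration. The `payload` dictionary is a made-up example, and `default=str` is just one possible way to make a `datetime` JSON-encodable.

```python
import json
import pickle
from datetime import datetime

payload = {
    "name": "fetch_data",
    "called_at": datetime(2024, 1, 1, 12, 0),
    "args": ("users", 10),
}

# JSON is readable, but the datetime becomes a string and the tuple a list.
json_roundtrip = json.loads(json.dumps(payload, default=str))

# Pickle restores the original Python objects exactly.
pickle_roundtrip = pickle.loads(pickle.dumps(payload))
```

Pickle keeps the `datetime` and the tuple intact, while JSON flattens them to a string and a list. Because unpickling can execute arbitrary code, pickle is only appropriate when every writer to the stream is trusted.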
Cross-Process Communication#
Multiple RedisStream instances sharing the same id automatically receive each other's events via Redis Pub/Sub.
Cleanup#
Always close the stream when done to release Redis connections.
Custom events#
You can create your own custom events by subclassing BaseEvent. Because of its metaclass, field definitions automatically support value-based filtering in where clauses.
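To give a feel for how a metaclass can make `EventClass.field == value` produce a filter, here is a self-contained sketch. It is an assumption about the general technique, not the library's actual mechanism: `EventMeta`, `FieldRef`, `Condition`, the stand-in `BaseEvent`, and the `OrderPlaced` event are all hypothetical.

```python
class Condition:
    """A callable filter built from a field comparison (hypothetical)."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, event) -> bool:
        return self.predicate(event)


class FieldRef:
    """Class-level handle for a field; comparing it builds a Condition."""

    def __init__(self, owner, name):
        self.owner = owner
        self.name = name

    def __eq__(self, value):
        return Condition(
            lambda event: isinstance(event, self.owner)
            and getattr(event, self.name) == value
        )


class EventMeta(type):
    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # Expose each annotated field as a FieldRef on the class itself.
        for field in getattr(cls, "__annotations__", {}):
            setattr(cls, field, FieldRef(cls, field))


class BaseEvent(metaclass=EventMeta):
    """Stand-in base class, not the library's BaseEvent."""

    def __init__(self, **fields):
        for key, value in fields.items():
            setattr(self, key, value)  # instance values shadow the FieldRefs


class OrderPlaced(BaseEvent):
    order_id: int
    region: str


is_eu = OrderPlaced.region == "eu"
events = [
    OrderPlaced(order_id=1, region="eu"),
    OrderPlaced(order_id=2, region="us"),
]
matched_ids = [event.order_id for event in events if is_eu(event)]
```

Accessing `OrderPlaced.region` on the class yields a `FieldRef`, so the comparison returns a reusable condition, not a boolean. Accessing `region` on an instance still returns the stored value.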