
Stream

What is the Stream?#

The Stream in AG2 Beta is a central event bus that carries communication between agents and system components. It is event-driven: components publish events (subclasses of BaseEvent), and other components subscribe to the events they need.

The Stream manages the flow of messages, actions, and tool calls, allowing for decoupled and scalable architectures. You interact with the Stream by publishing events to it and subscribing to specific events you want to listen to.
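
To make the publish/subscribe idea concrete, here is a minimal sketch of an event bus in plain Python. It is not AG2's implementation: `ToyStream`, `ToyEvent`, and the `publish` method are hypothetical names chosen for illustration, and the real Stream adds filtering, async delivery, and context propagation on top of this basic pattern.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToyEvent:
    """Hypothetical stand-in for an event; AG2's real base class is BaseEvent."""

    text: str


class ToyStream:
    """Hypothetical, simplified event bus (not AG2's MemoryStream)."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[ToyEvent], None]] = []

    def subscribe(self, handler: Callable[[ToyEvent], None]) -> None:
        # Register a handler that will see every published event.
        self._subscribers.append(handler)

    def publish(self, event: ToyEvent) -> None:
        # Deliver the event to each subscriber in registration order.
        for handler in self._subscribers:
            handler(event)


received: list[str] = []
toy_stream = ToyStream()
toy_stream.subscribe(lambda event: received.append(event.text))
toy_stream.publish(ToyEvent(text="hello"))
print(received)  # ['hello']
```

The publisher never references the subscriber directly, which is what keeps the components decoupled.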

Passing a custom stream to an Agent#

By default, agents create a MemoryStream instance internally for each conversation.

However, you can pass a custom stream to an agent when calling its ask method. This allows you to set up subscribers on the stream before the conversation starts, letting you observe or intercept the agent's internal events.

import asyncio

from autogen.beta import Agent, MemoryStream
from autogen.beta.events import ModelRequest, ModelResponse

agent = Agent("my_agent")

my_stream = MemoryStream()

# Subscribe to see what the model requests and returns
@my_stream.where(ModelRequest | ModelResponse).subscribe()
def log_model_activity(event):
    print(f"[{event.__class__.__name__}] {event}")

async def main():
    # Pass the stream to the agent; await must run inside a coroutine
    response = await agent.ask("Hello!", stream=my_stream)
    return response

asyncio.run(main())

How to subscribe to stream events#

You can listen to events flowing through the stream using the subscribe method. To filter which events you receive, you can use the where method.

Note

All event subscribers and interrupters support the same execution context capabilities as Agent Tools.

By type-hinting the Context object (or using Depends, Inject, Variable), your subscribers can access injected dependencies, interact with human-in-the-loop flows, or access conversation variables.

For details on specific context features, see Dependency Injection, Context Variables, Depends, and Human-in-the-loop.

Subscribe to events of a specific type#

To subscribe to events of a specific type, pass the event class to stream.where().

from autogen.beta import MemoryStream
from autogen.beta.events import ToolCall

stream = MemoryStream()

def handle_tool_call(event: ToolCall):
    print(f"Tool called: {event.name}")

# Subscribe only to ToolCall events
stream.where(ToolCall).subscribe(handle_tool_call)

Subscribe to multiple event types#

You can subscribe to multiple event types using the bitwise OR operator (|).

from autogen.beta.events import ToolCall, ModelMessage

def handle_event(event):
    print(f"Received event: {event}")

# Subscribe to either ToolCall or ModelMessage events
stream.where(ToolCall | ModelMessage).subscribe(handle_event)
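
As a rough mental model for how filters can be combined with `|`, the sketch below builds a composable type filter in plain Python. `TypeFilter` is a hypothetical helper, not part of AG2, and the library's actual mechanism may differ.

```python
from typing import Any


class TypeFilter:
    """Hypothetical filter matching events by class; composable with |."""

    def __init__(self, *types: type) -> None:
        self.types = types

    def __or__(self, other: "TypeFilter") -> "TypeFilter":
        # Combining two filters matches events accepted by either one.
        return TypeFilter(*self.types, *other.types)

    def __call__(self, event: Any) -> bool:
        # isinstance accepts a tuple of classes and matches any of them.
        return isinstance(event, self.types)


numbers_or_text = TypeFilter(int) | TypeFilter(str)
print(numbers_or_text(3), numbers_or_text("x"), numbers_or_text(2.5))  # True True False
```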

Subscribe to events with a specific value#

You can filter events based on the value of their fields. The event classes define fields that support comparison operators.

# Subscribe only to ToolCall events where the name is "fetch_data"
stream.where(ToolCall.name == "fetch_data").subscribe(handle_event)
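
An expression such as `ToolCall.name == "fetch_data"` can only act as a filter if the comparison returns something other than a plain boolean. The sketch below shows one way to get that behavior in plain Python. It uses a descriptor rather than the metaclass AG2 relies on, and `Field` and `ToyToolCall` are hypothetical names, so treat it as an illustration of the idea only.

```python
from typing import Any, Callable


class Field:
    """Hypothetical descriptor: class-level access yields a predicate builder."""

    def __set_name__(self, owner: type, name: str) -> None:
        # Remember which class and attribute this field belongs to.
        self.owner = owner
        self.name = name

    def __get__(self, instance: Any, owner: Any = None) -> Any:
        if instance is None:
            return self  # class-level access, e.g. ToyToolCall.name
        return instance.__dict__[self.name]

    def __set__(self, instance: Any, value: Any) -> None:
        instance.__dict__[self.name] = value

    def __eq__(self, expected: Any) -> Callable[[Any], bool]:  # type: ignore[override]
        # Return a predicate over events instead of a bool.
        def predicate(event: Any) -> bool:
            return isinstance(event, self.owner) and getattr(event, self.name) == expected

        return predicate


class ToyToolCall:
    name = Field()

    def __init__(self, name: str) -> None:
        self.name = name


is_fetch = ToyToolCall.name == "fetch_data"
print(is_fetch(ToyToolCall("fetch_data")))  # True
print(is_fetch(ToyToolCall("other")))  # False
```

The predicate also checks the event class, so a filter on one event type never matches an unrelated event that happens to have the same field name.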

Static subscribers with decorators#

You can use subscribe as a decorator to register static handlers cleanly. This works well with where filters.

stream = MemoryStream()

@stream.where(ToolCall).subscribe()
def on_tool_call(event: ToolCall):
    print(f"Handling tool call: {event.name}")

@stream.subscribe()
def on_any_event(event):
    print(f"Global logger: {event}")

Dynamic subscribers with a context manager#

If you only need to listen to events for a specific duration or within a specific block of code, you can use sub_scope as a context manager. This dynamically subscribes the handler when entering the block and unsubscribes when exiting.

def temp_listener(event):
    print("Temporary event captured:", event)

# temp_listener is active only inside the with block
with stream.sub_scope(temp_listener):
    # Perform actions that might trigger events
    pass

You can also combine this with where:

with stream.where(ModelMessage).sub_scope(temp_listener):
    pass
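
The subscribe-on-enter, unsubscribe-on-exit behavior described above can be sketched with `contextlib.contextmanager`. `ScopedBus` is a hypothetical toy class, not AG2's stream; it only shows why a handler registered this way cannot leak past the `with` block.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


class ScopedBus:
    """Hypothetical toy bus used only to illustrate scoped subscriptions."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[Any], None]] = []

    def publish(self, event: Any) -> None:
        # Iterate over a copy so the handler list may change during delivery.
        for handler in list(self._handlers):
            handler(event)

    @contextmanager
    def sub_scope(self, handler: Callable[[Any], None]) -> Iterator[None]:
        self._handlers.append(handler)  # subscribe on entry
        try:
            yield
        finally:
            self._handlers.remove(handler)  # unsubscribe on exit, even on error


seen: list[str] = []
bus = ScopedBus()
bus.publish("before")  # no handler registered yet
with bus.sub_scope(seen.append):
    bus.publish("inside")
bus.publish("after")  # handler already removed
print(seen)  # ['inside']
```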

Get specific events#

To wait for and get the next occurrence of a specific event asynchronously, use the get async context manager. This yields a future that resolves to the matched event.

from autogen.beta import MemoryStream
from autogen.beta.events import HumanMessage

async def wait_for_human(stream: MemoryStream):
    async with stream.get(HumanMessage) as response:
        # Code here can trigger the event, or we just wait
        event = await response
        print(f"User said: {event.content}")
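
The "context manager that yields a future" pattern can be reproduced with the standard library alone. In the sketch below, `FutureBus` and its `publish` method are hypothetical; only the shape of `get` mirrors the usage above, and AG2's internals may work differently.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable


class FutureBus:
    """Hypothetical toy bus illustrating a one-shot, future-based wait."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[Any], None]] = []

    def publish(self, event: Any) -> None:
        for handler in list(self._handlers):
            handler(event)

    @asynccontextmanager
    async def get(self, event_type: type) -> AsyncIterator[asyncio.Future]:
        future = asyncio.get_running_loop().create_future()

        def resolve(event: Any) -> None:
            # Resolve only once, and only for the requested event type.
            if isinstance(event, event_type) and not future.done():
                future.set_result(event)

        self._handlers.append(resolve)  # start listening before the body runs
        try:
            yield future
        finally:
            self._handlers.remove(resolve)  # stop listening on exit


async def demo() -> str:
    bus = FutureBus()
    async with bus.get(str) as pending:
        bus.publish(42)  # ignored: not a str
        bus.publish("hello")  # resolves the future
        bus.publish("later")  # ignored: future already resolved
        return await pending


print(asyncio.run(demo()))  # hello
```

Registering the listener on entry, before the body runs, is what lets code inside the block trigger the event without racing the subscription.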

Raise events manually#

To raise or publish events to the stream, you should always use the Context object rather than sending them to the stream directly. The Context ensures that dependencies and the stream scope are properly propagated.

from autogen.beta import Context
from autogen.beta.events import ModelMessage

async def publish_message(context: Context):
    event = ModelMessage(content="Hello from the agent!")
    # Always use context.send() to raise events
    await context.send(event)

Event interrupters#

Interrupters allow you to intercept an event before it reaches regular subscribers. You can use an interrupter to modify the event, raise a completely different event in its place, or suppress it entirely.

To register an interrupter, pass interrupt=True to the subscribe method. If the interrupter returns an event, that event replaces the original one for subsequent interrupters and subscribers. If it returns None, the event is suppressed and propagation stops.

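from autogen.beta import Context
from autogen.beta.events import BaseEvent, ModelMessage

# AlertEvent is assumed to be a custom event defined elsewhere
# (see "Custom events" below); `stream` is an existing MemoryStream.

@stream.where(ModelMessage).subscribe(interrupt=True)
async def intercept_message(
    event: ModelMessage,
    context: Context,
) -> BaseEvent | None:
    if "secret" in event.content:
        # Suppress the event by returning None
        return None

    if "alert" in event.content:
        # Raise a different event via the context, then suppress the original
        await context.send(AlertEvent(message=event.content))
        return None

    # Otherwise, modify the original event and return it
    event.content = event.content.upper()
    return event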
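
The return-value rules for interrupters (a returned event replaces the original, `None` stops propagation) can be modelled as a simple chain. The `dispatch` function below is a hypothetical sketch in plain Python that uses strings in place of events; it is not how AG2 schedules interrupters internally.

```python
from typing import Any, Callable, Optional, Sequence


def dispatch(
    event: Any,
    interrupters: Sequence[Callable[[Any], Optional[Any]]],
    subscribers: Sequence[Callable[[Any], None]],
) -> bool:
    """Run interrupters in order, then deliver. Returns True if delivered."""
    current = event
    for interrupter in interrupters:
        current = interrupter(current)
        if current is None:
            # An interrupter suppressed the event: nothing downstream runs.
            return False
    for subscriber in subscribers:
        subscriber(current)
    return True


def drop_secrets(message: str) -> Optional[str]:
    # Suppress messages containing "secret", pass everything else through.
    return None if "secret" in message else message


def shout(message: str) -> Optional[str]:
    # Replace the message with a modified copy.
    return message.upper()


delivered: list[str] = []
dispatch("hello", [drop_secrets, shout], [delivered.append])
dispatch("a secret", [drop_secrets, shout], [delivered.append])
print(delivered)  # ['HELLO']
```

Because each interrupter receives the output of the previous one, the order of registration determines what later interrupters and subscribers see.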

Custom events#

You can create your own custom events by subclassing BaseEvent. Because of its metaclass, field definitions automatically support value-based filtering in where clauses.

from autogen.beta import Context
from autogen.beta.events import BaseEvent

class PaymentProcessed(BaseEvent):
    amount: float
    status: str

# You can now filter by fields on your custom event:
@stream.where(PaymentProcessed.status == "success").subscribe()
def handle_success(event: PaymentProcessed):
    print(f"Payment processed: {event.amount}")

# And raise them via Context
async def process_payment(context: Context):
    await context.send(PaymentProcessed(amount=100.50, status="success"))