Stream

What is the Stream?#

The Stream in AG2 is a central event bus that facilitates communication between agents and system components. It operates on an event-driven architecture where components can publish events (derived from BaseEvent) and other components can subscribe to these events.

The Stream manages the flow of messages, actions, and tool calls, allowing for decoupled and scalable architectures. You interact with the Stream by publishing events to it and subscribing to specific events you want to listen to.
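As a rough mental model only, the publish/subscribe idea can be sketched in plain Python. `ToyStream` and its methods are hypothetical names for this illustration and are not AG2's implementation, which also handles filtering, async handlers, and context propagation.

```python
class ToyStream:
    """Toy event bus: keeps a list of handlers and calls each one on publish."""

    def __init__(self):
        self._handlers = []

    def subscribe(self, handler):
        self._handlers.append(handler)

    def publish(self, event):
        # The publisher does not know (or care) who is listening.
        for handler in list(self._handlers):
            handler(event)


seen = []
bus = ToyStream()
bus.subscribe(lambda event: seen.append(f"logger saw {event}"))
bus.subscribe(lambda event: seen.append(f"metrics saw {event}"))
bus.publish("hello")
print(seen)
```

The publisher and the two subscribers never reference each other directly, which is the decoupling the Stream provides.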

Passing a custom stream to an Agent#

By default, agents create a MemoryStream instance internally for each conversation.

However, you can pass a custom stream to an agent when calling its ask method. This allows you to set up subscribers on the stream before the conversation starts, letting you observe or intercept the agent's internal events.

from ag2 import Agent, MemoryStream
from ag2.events import ModelRequest, ModelResponse

agent = Agent("my_agent")

my_stream = MemoryStream()

# Subscribe to see what the model requests and returns
@my_stream.where(ModelRequest | ModelResponse).subscribe()
def log_model_activity(event):
    print(f"[{event.__class__.__name__}] {event}")

# Pass the stream to the agent (await needs an async context, such as an async function or a notebook)
response = await agent.ask("Hello!", stream=my_stream)

How to subscribe to stream events#

You can listen to events flowing through the stream using the subscribe method. To filter which events you receive, you can use the where method.

Note

Event subscribers and interrupters support the same execution context capabilities as Agent Tools.

By type-hinting the Context object (or using Depends, Inject, Variable), your subscribers can access injected dependencies, interact with human-in-the-loop flows, or access conversation variables.

For details on each context feature, see Dependency Injection, Context Variables, Depends, and Human-in-the-loop.

Subscribe to events of a specific type#

To subscribe to events of a specific type, pass the event class to stream.where().

from ag2 import MemoryStream
from ag2.events import ToolCallEvent

stream = MemoryStream()

def handle_tool_call(event: ToolCallEvent):
    print(f"Tool called: {event.name}")

# Subscribe only to ToolCallEvent events
stream.where(ToolCallEvent).subscribe(handle_tool_call)

Subscribe to multiple event types#

You can subscribe to multiple event types using the bitwise OR operator (|).

from ag2.events import ToolCallEvent, ModelMessage

def handle_event(event):
    print(f"Received event: {event}")

# Subscribe to either ToolCallEvent or ModelMessage events
stream.where(ToolCallEvent | ModelMessage).subscribe(handle_event)

Exclude events of a specific type#

You can negate an event type using the bitwise NOT operator (~). This creates a condition that matches everything except the specified type.

# Subscribe to all events except ToolCallEvent
stream.where(~ToolCallEvent).subscribe(handle_event)

Subscribe to events with a specific value#

You can filter events based on the value of their fields. The event classes define fields that support comparison operators.

# Subscribe only to ToolCallEvent events where the name is "fetch_data"
stream.where(ToolCallEvent.name == "fetch_data").subscribe(handle_event)
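One way such filter expressions could be built is with operator overloading: a metaclass gives event classes `|` and `~`, and a descriptor makes `EventClass.field == value` return a predicate. The sketch below is an assumption about the general technique, not AG2's code; `Condition`, `FieldRef`, `Field`, `EventMeta`, `ToolCall`, and `Message` are all hypothetical names.

```python
from typing import Any, Callable


class Condition:
    """A predicate over events that composes with | and ~."""

    def __init__(self, test: Callable[[Any], bool]) -> None:
        self.test = test

    def __call__(self, event: Any) -> bool:
        return self.test(event)

    def __or__(self, other: Any) -> "Condition":
        other = as_condition(other)
        return Condition(lambda event: self(event) or other(event))

    def __invert__(self) -> "Condition":
        return Condition(lambda event: not self(event))


def as_condition(spec: Any) -> Condition:
    """Turn an event class into an isinstance check; pass conditions through."""
    if isinstance(spec, Condition):
        return spec
    return Condition(lambda event: isinstance(event, spec))


class FieldRef:
    """What `EventClass.field` evaluates to; `==` builds a Condition."""

    def __init__(self, owner: type, name: str) -> None:
        self.owner, self.name = owner, name

    def __eq__(self, value: Any) -> Condition:  # type: ignore[override]
        return Condition(
            lambda event: isinstance(event, self.owner)
            and getattr(event, self.name) == value
        )


class Field:
    """Descriptor: class-level access returns a FieldRef."""

    def __set_name__(self, owner: type, name: str) -> None:
        self.name = name

    def __get__(self, instance: Any, owner: type) -> FieldRef:
        if instance is None:
            return FieldRef(owner, self.name)
        # Instance values live in the instance __dict__ and are found first.
        raise AttributeError(self.name)


class EventMeta(type):
    """Metaclass so that the event classes themselves support | and ~."""

    def __or__(cls, other: Any) -> Condition:
        return as_condition(cls) | other

    def __invert__(cls) -> Condition:
        return ~as_condition(cls)


class Event(metaclass=EventMeta):
    def __init__(self, **values: Any) -> None:
        self.__dict__.update(values)


class ToolCall(Event):
    name = Field()


class Message(Event):
    content = Field()


events = [ToolCall(name="fetch_data"), ToolCall(name="other"), Message(content="hi")]
either = ToolCall | Message               # matches both types
not_tool = ~ToolCall                      # matches everything except ToolCall
by_name = ToolCall.name == "fetch_data"   # matches on a field value
print([by_name(event) for event in events])
```

Because every expression reduces to a callable predicate, a `where`-style method only needs to store the predicate and test each event against it before calling the handler.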

Static subscribers with decorators#

You can use subscribe as a decorator to register static handlers cleanly. This works well with where filters.

stream = MemoryStream()

@stream.where(ToolCallEvent).subscribe()
def on_tool_call(event: ToolCallEvent):
    print(f"Handling tool call: {event.name}")

@stream.subscribe()
def on_any_event(event):
    print(f"Global logger: {event}")

Dynamic subscribers by context manager#

If you only need to listen to events for a specific duration or within a specific block of code, you can use sub_scope as a context manager. This dynamically subscribes the handler when entering the block and unsubscribes when exiting.

def temp_listener(event):
    print("Temporary event captured:", event)

# temp_listener is active only inside the with block
with stream.sub_scope(temp_listener):
    # Perform actions that might trigger events
    pass

You can also combine this with where:

with stream.where(ModelMessage).sub_scope(temp_listener):
    pass
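The subscribe-on-enter, unsubscribe-on-exit behaviour can be illustrated with `contextlib.contextmanager`. This is a minimal sketch of the pattern under assumed names (`ToyStream`, `publish`), not how AG2 implements `sub_scope`.

```python
from contextlib import contextmanager


class ToyStream:
    """Toy event bus used only to illustrate scoped subscriptions."""

    def __init__(self):
        self._handlers = []

    def publish(self, event):
        for handler in list(self._handlers):
            handler(event)

    @contextmanager
    def sub_scope(self, handler):
        self._handlers.append(handler)
        try:
            yield handler
        finally:
            # Runs on normal exit and on exceptions, so the handler never leaks.
            self._handlers.remove(handler)


captured = []
toy = ToyStream()
toy.publish("before")            # no handler registered yet
with toy.sub_scope(captured.append):
    toy.publish("inside")        # captured
toy.publish("after")             # handler already removed
print(captured)
```

Only the event published inside the `with` block reaches the handler.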

Get specific events#

To wait for and get the next occurrence of a specific event asynchronously, use the get async context manager. This yields a future that resolves to the matched event.

from ag2 import MemoryStream
from ag2.events import HumanMessage

async def wait_for_human(stream: MemoryStream):
    async with stream.get(HumanMessage) as response:
        # Code here can trigger the event, or we just wait
        event = await response
        print(f"User said: {event.content}")
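The reason for a context manager that yields a future is that the subscription exists before the body runs, so an event triggered inside the block cannot be missed. A toy version of that idea, with hypothetical names and no claim to match AG2's internals, might look like this:

```python
import asyncio
from contextlib import asynccontextmanager


class ToyStream:
    """Toy event bus used only to illustrate waiting for one event."""

    def __init__(self):
        self._handlers = []

    def publish(self, event):
        for handler in list(self._handlers):
            handler(event)

    @asynccontextmanager
    async def get(self, event_type):
        future = asyncio.get_running_loop().create_future()

        def resolve(event):
            # Resolve only once, and only for the requested type.
            if isinstance(event, event_type) and not future.done():
                future.set_result(event)

        self._handlers.append(resolve)
        try:
            yield future
        finally:
            self._handlers.remove(resolve)


async def demo():
    toy = ToyStream()
    async with toy.get(str) as pending:
        toy.publish(42)         # ignored: wrong type
        toy.publish("hello")    # resolves the future
        toy.publish("again")    # ignored: future already resolved
        return await pending


result = asyncio.run(demo())
print(result)
```

The future resolves with the first matching event, and later matches are ignored.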

Iterate over stream events#

Use join() as a context manager when you want an async iterator of events for a bounded block of code. The iterator receives events published after the with block starts. Pass max_events to stop automatically after a fixed number of events.

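from ag2 import MemoryStream
from ag2.events import ToolResultEvent

async def print_next_result(stream: MemoryStream):
    # async for is only valid inside an async function
    with stream.where(ToolResultEvent).join(max_events=1) as results:
        async for event in results:
            print(f"Tool result: {event.result}")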

Do not time out individual join() pulls

Consume the iterator with a plain async for. Avoid wrapping individual pulls such as await asyncio.wait_for(events.__anext__(), timeout=...): cancelling one __anext__() closes the current iterator, so later events will not be yielded by that iterator. For bounded consumption, use join(max_events=N) or put one asyncio.timeout(...) around the whole loop.
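The failure mode can be reproduced with a plain Python async generator, assuming the `join()` iterator behaves like one. In the sketch below, `events` is a hypothetical stand-in for that iterator. The whole-loop variant uses `asyncio.wait_for` around the consuming coroutine, which has the same effect as one `asyncio.timeout(...)` block (available in Python 3.11+).

```python
import asyncio


async def events(queue):
    """Toy stand-in for a join() iterator: yields queued items forever."""
    while True:
        yield await queue.get()


async def per_pull_timeout():
    queue = asyncio.Queue()
    iterator = events(queue)
    received = []
    try:
        # Nothing is queued yet, so this pull times out and is cancelled.
        await asyncio.wait_for(iterator.__anext__(), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    queue.put_nowait("late event")
    try:
        received.append(await iterator.__anext__())
    except StopAsyncIteration:
        # The cancelled pull finished the generator, so nothing more is yielded.
        pass
    return received


async def whole_loop_timeout():
    queue = asyncio.Queue()
    received = []

    async def drain():
        async for event in events(queue):
            received.append(event)

    queue.put_nowait("first")
    queue.put_nowait("second")
    try:
        # One deadline for the whole loop instead of one per pull.
        await asyncio.wait_for(drain(), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    return received


lost = asyncio.run(per_pull_timeout())
kept = asyncio.run(whole_loop_timeout())
print(lost, kept)
```

In the first function the event queued after the timeout is never delivered. In the second, both queued events arrive before the single deadline ends the loop.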

Raise events manually#

To raise or publish events to the stream, you should always use the Context object rather than sending them to the stream directly. The Context ensures that dependencies and the stream scope are properly propagated.

from ag2 import Context
from ag2.events import ModelMessage

async def publish_message(context: Context):
    event = ModelMessage(content="Hello from the agent!")
    # Always use context.send() to raise events
    await context.send(event)

Event Interrupters#

Interrupters allow you to intercept an event before it reaches regular subscribers. You can use an interrupter to modify the event, raise a completely different event in its place, or suppress it entirely.

To register an interrupter, pass interrupt=True to the subscribe method. If the interrupter returns an event, that event replaces the original one for subsequent interrupters and subscribers. If it returns None, the event is suppressed and propagation stops.

from ag2 import Context
from ag2.events import BaseEvent, ModelMessage

# AlertEvent is a custom event defined for this example
class AlertEvent(BaseEvent):
    message: str

@stream.where(ModelMessage).subscribe(interrupt=True)
async def intercept_message(
    event: ModelMessage,
    context: Context,
) -> BaseEvent | None:
    if "secret" in event.content:
        # Suppress the event by returning None
        return None

    if "alert" in event.content:
        # Raise a different event, then suppress the original
        await context.send(AlertEvent(message=event.content))
        return None

    # Otherwise, modify and return the original event
    event.content = event.content.upper()
    return event
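The ordering rule (interrupters first, each one able to replace or suppress the event, then regular subscribers) can be sketched as a small synchronous pipeline. The names here are hypothetical and the sketch ignores async handlers and the Context. It only illustrates the control flow described above.

```python
class ToyStream:
    """Toy event bus illustrating interrupters that run before subscribers."""

    def __init__(self):
        self._interrupters = []
        self._subscribers = []

    def subscribe(self, handler, interrupt=False):
        target = self._interrupters if interrupt else self._subscribers
        target.append(handler)

    def publish(self, event):
        for interrupter in self._interrupters:
            # The returned value replaces the event for everything that follows.
            event = interrupter(event)
            if event is None:
                return  # suppressed: no subscriber sees it
        for subscriber in self._subscribers:
            subscriber(event)


def redact(text):
    """Interrupter: drop secrets, upper-case everything else."""
    return None if "secret" in text else text.upper()


delivered = []
toy = ToyStream()
toy.subscribe(redact, interrupt=True)
toy.subscribe(delivered.append)
toy.publish("hello")
toy.publish("my secret")
print(delivered)
```

The subscriber receives the modified first event and never sees the second.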

RedisStream — Persistent & Cross-Process Events#

RedisStream is a drop-in replacement for MemoryStream that adds persistent event history and cross-process pub/sub via Redis. Events are delivered to all subscribers — even across different processes or machines.

pip install "ag2[redis]"

Basic Usage#

from ag2 import Agent
from ag2.streams.redis import RedisStream
from ag2.config import OpenAIConfig

stream = RedisStream("redis://localhost:6379")

agent = Agent(
    "assistant",
    prompt="You are a helpful assistant.",
    config=OpenAIConfig("gpt-4o-mini"),
)

reply = await agent.ask("Hello!", stream=stream)

# History is persisted in Redis and survives restarts
history = list(await stream.history.get_events())

The same subscription patterns (subscribe, where, sub_scope, get, interrupters) work exactly as they do with MemoryStream.

Serialization Format#

By default, events are serialized as JSON for human readability. You can switch to pickle for full Python object fidelity:

from ag2.streams.redis import RedisStream, Serializer

# JSON (default) — readable in Redis tools
stream = RedisStream("redis://localhost:6379")

# Pickle — preserves exact Python types
stream = RedisStream("redis://localhost:6379", serializer=Serializer.PICKLE)
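The general trade-off between the two formats can be seen with the standard library alone. This sketch uses Python's `json` and `pickle` modules on a hypothetical payload. How AG2's JSON serializer encodes specific types is not shown here and may differ.

```python
import json
import pickle
from datetime import datetime

# Hypothetical event payload with types JSON cannot represent natively.
payload = {
    "name": "fetch_data",
    "args": (1, 2),
    "at": datetime(2024, 1, 1, 12, 0),
}

# JSON: readable text, but the tuple becomes a list and the datetime a string.
via_json = json.loads(json.dumps(payload, default=str))

# Pickle: opaque bytes, but the round trip restores the original types.
via_pickle = pickle.loads(pickle.dumps(payload))

print(via_json)
print(via_pickle == payload)
```

If subscribers rely on exact Python types, the pickle round trip keeps them, while the JSON round trip needs explicit conversion back.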

Cross-Process Communication#

Multiple RedisStream instances sharing the same id automatically receive each other's events via Redis Pub/Sub:

from uuid import UUID
from ag2.streams.redis import RedisStream

STREAM_ID = UUID("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")

# Process A
stream_a = RedisStream("redis://localhost:6379", id=STREAM_ID)

# Process B (separate Python process)
stream_b = RedisStream("redis://localhost:6379", id=STREAM_ID)

# Events sent on stream_a are received by subscribers on stream_b, and vice versa

Cleanup#

Always close the stream when done to release Redis connections:

await stream.close()

Custom events#

You can create your own custom events by subclassing BaseEvent. Because of BaseEvent's metaclass, fields declared on your subclass automatically support value-based filtering in where clauses.

from ag2 import Context, MemoryStream
from ag2.events import BaseEvent

stream = MemoryStream()

class PaymentProcessed(BaseEvent):
    amount: float
    status: str

# You can now filter by fields on your custom event:
@stream.where(PaymentProcessed.status == "success").subscribe()
def handle_success(event: PaymentProcessed):
    print(f"Payment processed: {event.amount}")

# And raise them via Context
async def process_payment(context: Context):
    await context.send(PaymentProcessed(amount=100.50, status="success"))