Skip to content

Observers

Observers let you attach lightweight, read-only event listeners directly to an Agent. Under the hood, each observer is a regular Stream subscriber — but you register it on the Agent instead of managing the stream yourself.

Use observers when you want to monitor agent behavior (logging, metrics, debugging) without writing a full Middleware class.

Creating an Observer#

Use the observer() function to pair an event condition with a callback. The first argument is the event type (or condition) to match, the second is an optional callback:

1
2
3
4
5
6
from autogen.beta import observer
from autogen.beta.events import ModelResponse

@observer(ModelResponse)
async def log_response(event: ModelResponse) -> None:
    print(f"Model said: {event.content}")

Registering Observers#

On the Agent constructor#

Pass observers when creating the agent. These observers are active for every agent.ask() call:

from autogen.beta import Agent, observer
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelResponse, ToolCallEvent

@observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
    observers=[on_response],
)

reply = await agent.ask("Hello!")

With the decorator method#

Use @agent.observer() to register an observer after agent creation. This mirrors the @agent.tool() and @agent.prompt() patterns:

from autogen.beta import Agent

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
)

@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

Per-call observers#

Pass observers to a specific ask() call. These are scoped to that call only and are automatically cleaned up when the call finishes:

1
2
3
4
reply = await agent.ask(
    "What is 2+2?",
    observers=[on_response],
)

Constructor-level and per-call observers can be combined — both will fire:

from autogen.beta import Agent

agent = Agent(
    "assistant",
    config=config,
    observers=[observer(ModelResponse, log_to_file)],
)

# log_to_file fires AND send_metric fires for this call
reply = await agent.ask(
    "Hello!",
    observers=[observer(ModelResponse, send_metric)],
)

Event Filtering#

Observers support the same condition system as stream subscriptions. You can filter by event type, combine types, or match on field values:

from autogen.beta.events import ModelRequest, ModelResponse, ToolCallEvent

# Single event type
@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

# Multiple event types with OR
@agent.observer(ModelRequest | ModelResponse)
def on_any(event: ModelRequest | ModelResponse) -> None:
    print(f"Event: {event}")

# Field-based filtering
@agent.observer(ToolCallEvent.name == "search")
def on_search(event: ToolCallEvent) -> None:
    print(f"Search: {event.name}")

Dependency Injection#

Observer callbacks support the same dependency injection features as Agent Tools and stream subscribersContext, Inject, Depends, and Variable:

1
2
3
4
5
6
from autogen.beta import Context, observer
from autogen.beta.events import ModelResponse

@observer(ModelResponse)
async def track(event: ModelResponse, ctx: Context) -> None:
    print(f"Stream: {ctx.stream.id}, Response: {event.content}")

Advanced Options#

Since observers are stream subscribers, they support the same interrupt and sync_to_thread parameters as stream.subscribe():

# Async observer — disable sync_to_thread since it's already async
@observer(ModelResponse, sync_to_thread=False)
async def async_tracker(event: ModelResponse) -> None:
    await metrics.record(event)

# Interrupter — processes before regular subscribers and can modify events
@observer(ModelResponse, interrupt=True)
def intercept(event: ModelResponse) -> ModelResponse:
    event.content = event.content.upper()
    return event

Observers vs Middleware vs Stream Subscribers#

Feature Observer Middleware Stream Subscriber
Registration On Agent On Agent On Stream
Lifecycle Scoped to execution Scoped to execution Manual
Boilerplate Minimal — one function Class with factory Low — one function
Can modify events Only with interrupt=True Yes (wraps execution) Only with interrupt=True
DI support Yes Yes Yes
Use case Monitoring, logging, metrics Cross-cutting logic (retry, auth, rate limiting) Low-level event wiring