Observers

Observers let you attach lightweight, read-only event listeners directly to an Agent. Under the hood, each observer is a regular Stream subscriber — but you register it on the Agent instead of managing the stream yourself.

Use observers when you want to monitor agent behavior (logging, metrics, debugging) without writing a full Middleware class.

Creating an Observer

Use the observer() function to pair an event condition with a callback. The first argument is the event type (or condition) to match; the second is an optional callback. When the callback is omitted, observer() acts as a decorator:

from autogen.beta import observer
from autogen.beta.events import ModelResponse

@observer(ModelResponse)
async def log_response(event: ModelResponse) -> None:
    print(f"Model said: {event.content}")

Registering Observers

On the Agent constructor

Pass observers when creating the agent. These observers are active for every agent.ask() call:

from autogen.beta import Agent, observer
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelResponse, ToolCallEvent

@observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
    observers=[on_response],
)

reply = await agent.ask("Hello!")

With the decorator method

Use @agent.observer() to register an observer after agent creation. This mirrors the @agent.tool() and @agent.prompt() patterns:

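from autogen.beta import Agent
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelResponse

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
)

# Registered after construction; active for every subsequent agent.ask() call
@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")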

Per-call observers

Pass observers to a specific ask() call. These are scoped to that call only and are automatically cleaned up when the call finishes:

reply = await agent.ask(
    "What is 2+2?",
    observers=[on_response],
)

Constructor-level and per-call observers can be combined — both will fire:

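from autogen.beta import Agent, observer
from autogen.beta.config import OpenAIConfig
from autogen.beta.events import ModelResponse

# Placeholder callbacks for illustration; substitute your own.
def log_to_file(event: ModelResponse) -> None:
    print(f"[file] {event.content}")

def send_metric(event: ModelResponse) -> None:
    print(f"[metric] {event.content}")

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
    observers=[observer(ModelResponse, log_to_file)],
)

# log_to_file fires AND send_metric fires for this call
reply = await agent.ask(
    "Hello!",
    observers=[observer(ModelResponse, send_metric)],
)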
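
The scoping rule described above can be pictured with a small stand-alone model. This is only a sketch: ToyAgent and its echo reply are hypothetical stand-ins written for illustration, not the AG2 implementation, and the real library may manage subscriptions differently. It shows agent-level observers persisting across calls while per-call observers are removed in a finally block.

```python
from typing import Callable, Optional

Callback = Callable[[str], None]


class ToyAgent:
    """Hypothetical stand-in for an agent; not the AG2 implementation."""

    def __init__(self, observers: Optional[list] = None) -> None:
        # Agent-level observers stay registered for the agent's lifetime.
        self._observers = list(observers or [])

    def ask(self, prompt: str, observers: Optional[list] = None) -> str:
        per_call = list(observers or [])
        self._observers.extend(per_call)
        try:
            reply = f"echo: {prompt}"
            # Iterate over a snapshot so callbacks cannot disturb the loop.
            for callback in list(self._observers):
                callback(reply)
            return reply
        finally:
            # Remove exactly the per-call observers, even if a callback raised.
            for callback in per_call:
                self._observers.remove(callback)


seen = []
toy_agent = ToyAgent(observers=[lambda r: seen.append(("agent", r))])
toy_agent.ask("one", observers=[lambda r: seen.append(("call", r))])
toy_agent.ask("two")  # only the agent-level observer fires here
```

After both calls, seen holds two "agent" entries and a single "call" entry, since the per-call observer was dropped once the first ask() returned.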

Event Filtering

Observers support the same condition system as stream subscriptions. You can filter by event type, combine types, or match on field values:

from autogen.beta.events import ModelRequest, ModelResponse, ToolCallEvent

# Single event type
@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

# Multiple event types with OR
@agent.observer(ModelRequest | ModelResponse)
def on_any(event: ModelRequest | ModelResponse) -> None:
    print(f"Event: {event}")

# Field-based filtering
@agent.observer(ToolCallEvent.name == "search")
def on_search(event: ToolCallEvent) -> None:
    print(f"Search: {event.name}")

# Negation — everything except a specific type
@agent.observer(~ToolCallEvent)
def on_non_tool(event) -> None:
    print(f"Non-tool event: {event}")
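
To make the composition rules concrete, here is a toy model of how conditions built with | and ~ could be evaluated. Everything in it (Condition, of_type, field_equals, and the three event dataclasses) is hypothetical and written for this sketch; it does not reproduce AG2's condition classes, only the idea that each condition is a predicate that can be combined with others.

```python
from dataclasses import dataclass
from typing import Any, Callable


class Condition:
    """Toy predicate wrapper that composes with | and ~ (illustrative only)."""

    def __init__(self, predicate: Callable[[Any], bool]) -> None:
        self._predicate = predicate

    def __call__(self, event: Any) -> bool:
        return self._predicate(event)

    def __or__(self, other: "Condition") -> "Condition":
        # Matches when either side matches.
        return Condition(lambda e: self(e) or other(e))

    def __invert__(self) -> "Condition":
        # Matches everything the wrapped condition rejects.
        return Condition(lambda e: not self(e))


def of_type(event_type: type) -> Condition:
    return Condition(lambda e: isinstance(e, event_type))


def field_equals(event_type: type, field: str, value: Any) -> Condition:
    # The type check runs first, so other event types never hit getattr.
    return Condition(lambda e: isinstance(e, event_type) and getattr(e, field) == value)


@dataclass
class Request:
    content: str


@dataclass
class Response:
    content: str


@dataclass
class ToolCall:
    name: str


either = of_type(Request) | of_type(Response)
is_search = field_equals(ToolCall, "name", "search")
not_tool = ~of_type(ToolCall)
```

In this model a dispatcher would call the condition on each event and invoke the callback only when it returns True.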

Dependency Injection

Observer callbacks support the same dependency injection features as Agent Tools and stream subscribers (Context, Inject, Depends, and Variable):

from autogen.beta import Context, observer
from autogen.beta.events import ModelResponse

@observer(ModelResponse)
async def track(event: ModelResponse, ctx: Context) -> None:
    print(f"Stream: {ctx.stream.id}, Response: {event.content}")

Advanced Options

Since observers are stream subscribers, they support the same interrupt and sync_to_thread parameters as stream.subscribe():

from autogen.beta import observer
from autogen.beta.events import ModelResponse

# Async observer: disable sync_to_thread since it's already async.
# `metrics` stands in for your own async metrics client.
@observer(ModelResponse, sync_to_thread=False)
async def async_tracker(event: ModelResponse) -> None:
    await metrics.record(event)

# Interrupter: processes before regular subscribers and can modify events
@observer(ModelResponse, interrupt=True)
def intercept(event: ModelResponse) -> ModelResponse:
    event.content = event.content.upper()
    return event
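
The ordering guarantee for interrupters can be sketched as a two-phase dispatch. ToyStream and Message below are hypothetical names used only for this illustration, and the handling of a None return (leave the event unchanged) is an assumption of the sketch; the real library may treat it differently.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Message:
    content: str


class ToyStream:
    """Hypothetical two-phase dispatcher; not the AG2 stream."""

    def __init__(self) -> None:
        self._interrupters = []
        self._subscribers = []

    def subscribe(self, callback: Callable, interrupt: bool = False) -> None:
        target = self._interrupters if interrupt else self._subscribers
        target.append(callback)

    def send(self, event: Message) -> None:
        # Phase 1: interrupters run first and may replace the event.
        for interrupter in self._interrupters:
            result = interrupter(event)
            if result is not None:  # assumption: None means "unchanged"
                event = result
        # Phase 2: regular subscribers see the possibly modified event.
        for subscriber in self._subscribers:
            subscriber(event)


received = []
toy_stream = ToyStream()
toy_stream.subscribe(lambda e: received.append(e.content))
toy_stream.subscribe(lambda e: Message(e.content.upper()), interrupt=True)
toy_stream.send(Message("hello"))
```

Although the regular subscriber was registered first, it only ever sees the upper-cased content produced by the interrupter.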

Observers vs Middleware vs Stream Subscribers

| Feature | Observer | Middleware | Stream Subscriber |
| --- | --- | --- | --- |
| Registration | On Agent | On Agent | On Stream |
| Lifecycle | Scoped to execution | Scoped to execution | Manual |
| Boilerplate | Minimal: one function | Class with factory | Low: one function |
| Can modify events | Only with interrupt=True | Yes (wraps execution) | Only with interrupt=True |
| DI support | Yes | Yes | Yes |
| Use case | Monitoring, logging, metrics | Cross-cutting logic (retry, auth, rate limiting) | Low-level event wiring |

Trigger-Driven Observers (BaseObserver)

The observer() factory above is perfect for one-off event hooks. But when you need stateful monitoring — detecting repeated tool calls, tracking cumulative token usage, rolling time-window metrics — subclass BaseObserver.

A BaseObserver is an ABC that pairs a Watch with a process() method. The Watch decides when to fire; process() decides what to do with the collected events. If process() returns an ObserverAlert, the base class emits it back onto the stream for other subscribers to consume.
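
The split between "when to fire" and "what to do" can be modelled in a few lines. ToyCadenceWatch and ToyObserver are hypothetical classes written for this sketch, and the batching rule (release a batch after every n matching events) is an assumption about cadence-style triggers rather than a description of AG2's Watch classes.

```python
from typing import Any, Callable, Optional


class ToyCadenceWatch:
    """Buffers matching events and releases them in batches of n."""

    def __init__(self, n: int, condition: Callable[[Any], bool]) -> None:
        self._n = n
        self._condition = condition
        self._buffer = []

    def offer(self, event: Any) -> Optional[list]:
        if not self._condition(event):
            return None  # non-matching events are ignored
        self._buffer.append(event)
        if len(self._buffer) < self._n:
            return None  # not enough events collected yet
        batch, self._buffer = self._buffer, []
        return batch


class ToyObserver:
    """Pairs a watch (when) with a process function (what)."""

    def __init__(self, watch: ToyCadenceWatch, process: Callable[[list], Optional[str]]) -> None:
        self._watch = watch
        self._process = process
        self.alerts = []

    def on_event(self, event: Any) -> None:
        batch = self._watch.offer(event)
        if batch is None:
            return
        alert = self._process(batch)
        if alert is not None:
            # A real base class would emit this back onto the stream.
            self.alerts.append(alert)


toy_observer = ToyObserver(
    ToyCadenceWatch(n=2, condition=lambda e: isinstance(e, int)),
    lambda batch: f"avg={sum(batch) / len(batch):.0f}",
)
for item in [10, "skip", 20, 30, 50]:
    toy_observer.on_event(item)
```

Here the string is filtered out by the condition, and process runs twice: once on [10, 20] and once on [30, 50].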

BaseObserver vs @observer

| | @observer (StreamObserver) | BaseObserver |
| --- | --- | --- |
| Shape | Function | Class |
| State | Stateless | Instance state (counters, history, etc.) |
| Trigger | Per matching event | Any Watch (event / batch / time / composite) |
| Output | Whatever your callback does | Optional ObserverAlert auto-emitted on the stream |
| Good for | Logging, metrics, one-offs | Thresholds, rate limits, loop detection, rolling stats |

Built-in observers

Two ready-to-use BaseObserver subclasses ship with AG2 Beta:

LoopDetector

Detects repetitive tool-call patterns. Maintains a sliding window of recent tool calls and emits a WARNING alert when repeat_threshold consecutive identical calls (same tool name and arguments) are observed.

from autogen.beta import Agent
from autogen.beta.observer import LoopDetector
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "poller",
    config=OpenAIConfig(model="gpt-5"),
    observers=[LoopDetector(window_size=10, repeat_threshold=3)],
)
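
For intuition, the detection rule can be approximated with a bounded deque. ToyLoopDetector is a hypothetical re-implementation of the idea described above, not the library's code: it assumes tool arguments are JSON-serialisable and returns a plain string where the real observer would emit an ObserverAlert.

```python
import json
from collections import deque
from typing import Any, Optional


class ToyLoopDetector:
    """Flags runs of identical tool calls within a sliding window."""

    def __init__(self, window_size: int = 10, repeat_threshold: int = 3) -> None:
        # Old calls fall off the left once the window is full.
        self._recent = deque(maxlen=window_size)
        self._threshold = repeat_threshold

    def record(self, name: str, arguments: dict) -> Optional[str]:
        # Canonical key: same tool name and same arguments compare equal,
        # regardless of the order in which the arguments were given.
        key = json.dumps([name, arguments], sort_keys=True, default=str)
        self._recent.append(key)
        # Count how many of the most recent calls are identical to this one.
        streak = 0
        for past in reversed(self._recent):
            if past != key:
                break
            streak += 1
        if streak >= self._threshold:
            return f"WARNING: {name} repeated {streak} times in a row"
        return None


detector = ToyLoopDetector(window_size=10, repeat_threshold=3)
results = [
    detector.record("search", {"q": "ag2", "page": 1}),
    detector.record("search", {"page": 1, "q": "ag2"}),  # same call, reordered
    detector.record("search", {"q": "ag2", "page": 1}),  # third in a row
    detector.record("fetch", {"url": "x"}),              # breaks the streak
    detector.record("search", {"q": "ag2", "page": 1}),
]
```

Only the third call produces a warning; the intervening fetch resets the streak, so the final search counts as a fresh first occurrence.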

TokenMonitor

Tracks cumulative token usage across ModelResponse and TaskCompleted events. Emits WARNING / CRITICAL alerts as thresholds are crossed.

from autogen.beta import Agent
from autogen.beta.observer import TokenMonitor
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "assistant",
    config=OpenAIConfig(model="gpt-5"),
    observers=[TokenMonitor(warn_threshold=50_000, alert_threshold=100_000)],
)
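
The threshold behaviour can be sketched as a running total with a one-way severity latch. ToyTokenMonitor is hypothetical, and two details are assumptions of this sketch rather than documented behaviour: that warn_threshold maps to WARNING and alert_threshold to CRITICAL, and that each level is reported only once, when it is first crossed.

```python
from typing import Optional


class ToyTokenMonitor:
    """Accumulates token counts and reports each threshold crossing once."""

    def __init__(self, warn_threshold: int, alert_threshold: int) -> None:
        if warn_threshold > alert_threshold:
            raise ValueError("warn_threshold must not exceed alert_threshold")
        self._warn = warn_threshold
        self._alert = alert_threshold
        self._total = 0
        self._level = 0  # 0 = quiet, 1 = WARNING sent, 2 = CRITICAL sent

    def add(self, tokens: int) -> Optional[str]:
        self._total += tokens
        # Check the higher threshold first so a large jump reports CRITICAL.
        if self._total >= self._alert and self._level < 2:
            self._level = 2
            return "CRITICAL"
        if self._total >= self._warn and self._level < 1:
            self._level = 1
            return "WARNING"
        return None


monitor = ToyTokenMonitor(warn_threshold=50, alert_threshold=100)
levels = [monitor.add(n) for n in (30, 30, 10, 40, 5)]
```

The running totals are 30, 60, 70, 110, and 115, so the second addition crosses the warning threshold and the fourth crosses the critical one.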

ObserverAlert

Both built-ins (and any BaseObserver you write) emit ObserverAlert events on the stream. Subscribe to them like any other event:

from autogen.beta import MemoryStream
from autogen.beta.events import ObserverAlert

stream = MemoryStream()

@stream.where(ObserverAlert).subscribe
def surface_alerts(event: ObserverAlert) -> None:
    print(f"[{event.severity}] {event.source}: {event.message}")

Note

ObserverAlert is emitted to the stream and persisted in history, but is not rendered back to the LLM by the default provider mappers. If you want the agent itself to react to alerts, write a Middleware that converts alerts into follow-up messages.

Severity levels live in autogen.beta.events.Severity: INFO, WARNING, CRITICAL, FATAL.

Building a custom observer

Subclass BaseObserver, pick a Watch, and implement process():

from autogen.beta import Context
from autogen.beta.observer import BaseObserver
from autogen.beta.watch import CadenceWatch
from autogen.beta.events import BaseEvent, ModelResponse, ObserverAlert, Severity

class AvgCompletionObserver(BaseObserver):
    """Every N responses, emit an INFO alert with the average completion-token count."""

    def __init__(self, window: int = 5) -> None:
        super().__init__("avg-completion", watch=CadenceWatch(n=window, condition=ModelResponse))
        self._window = window

    async def process(self, events: list[BaseEvent], ctx: Context) -> ObserverAlert | None:
        tokens = [e.usage.completion_tokens for e in events if isinstance(e, ModelResponse) and e.usage]
        if not tokens:
            return None
        return ObserverAlert(
            source=self.name,
            severity=Severity.INFO,
            message=f"Avg completion tokens over last {self._window} responses: {sum(tokens) / len(tokens):.0f}",
        )

Register it like any other observer:

from autogen.beta import Agent
from autogen.beta.config import OpenAIConfig

agent = Agent(
    "assistant",
    config=OpenAIConfig(model="gpt-5"),
    observers=[AvgCompletionObserver(window=5)],
)

process() may also emit events directly via await ctx.send(...) — returning an ObserverAlert is just the common case.