Observers

Observers let you attach lightweight, read-only event listeners directly to an Agent. Under the hood, each observer is a regular Stream subscriber — but you register it on the Agent instead of managing the stream yourself.

Use observers when you want to monitor agent behavior (logging, metrics, debugging) without writing a full Middleware class.

Creating an Observer#

Use the observer() function to pair an event condition with a callback. The first argument is the event type (or condition) to match; the second is an optional callback. When you omit the callback, observer() works as a decorator:

from ag2 import observer
from ag2.events import ModelResponse

@observer(ModelResponse)
async def log_response(event: ModelResponse) -> None:
    print(f"Model said: {event.content}")

Registering Observers#

On the Agent constructor#

Pass observers when creating the agent. These observers are active for every agent.ask() call:

from ag2 import Agent, observer
from ag2.config import OpenAIConfig
from ag2.events import ModelResponse

@observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
    observers=[on_response],
)

reply = await agent.ask("Hello!")

With the decorator method#

Use @agent.observer() to register an observer after agent creation. This mirrors the @agent.tool() and @agent.prompt() patterns:

from ag2 import Agent
from ag2.config import OpenAIConfig
from ag2.events import ModelResponse

agent = Agent(
    "assistant",
    config=OpenAIConfig("gpt-4o-mini"),
)

@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

Per-call observers#

Pass observers to a specific ask() call. These are scoped to that call only and are automatically cleaned up when the call finishes:

reply = await agent.ask(
    "What is 2+2?",
    observers=[on_response],
)

Constructor-level and per-call observers can be combined — both will fire:

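from ag2 import Agent, observer
from ag2.events import ModelResponse

# config, log_to_file, and send_metric are assumed to be defined elsewhere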

agent = Agent(
    "assistant",
    config=config,
    observers=[observer(ModelResponse, log_to_file)],
)

# log_to_file fires AND send_metric fires for this call
reply = await agent.ask(
    "Hello!",
    observers=[observer(ModelResponse, send_metric)],
)
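
To make the scoping rules concrete, here is a minimal sketch in plain Python. It is a toy model, not AG2's implementation: ToyStream and ToyAgent are hypothetical names, and the assumption that each ask() call gets a fresh stream is made only for illustration. It shows agent-level observers firing on every call while per-call observers disappear once their call returns.

```python
from typing import Callable, Optional

Event = dict  # stand-in for a real event object
Callback = Callable[[Event], None]


class ToyStream:
    """Hypothetical stream: holds subscribers and delivers events to them."""

    def __init__(self) -> None:
        self.subscribers: list[Callback] = []

    def send(self, event: Event) -> None:
        for callback in list(self.subscribers):
            callback(event)


class ToyAgent:
    """Hypothetical agent that registers observers as stream subscribers."""

    def __init__(self, observers: Optional[list[Callback]] = None) -> None:
        self.observers = list(observers or [])

    def ask(self, prompt: str, observers: Optional[list[Callback]] = None) -> str:
        stream = ToyStream()  # assumption: one stream per call
        # Agent-level observers first, then the ones scoped to this call.
        stream.subscribers.extend(self.observers)
        stream.subscribers.extend(observers or [])
        reply = f"echo: {prompt}"
        stream.send({"type": "ModelResponse", "content": reply})
        return reply  # the stream and its per-call subscribers are dropped here


seen: list[str] = []
agent = ToyAgent(observers=[lambda e: seen.append("log_to_file")])
agent.ask("Hello!", observers=[lambda e: seen.append("send_metric")])
agent.ask("Again")  # only the agent-level observer fires
print(seen)  # ['log_to_file', 'send_metric', 'log_to_file']
```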

Event Filtering#

Observers support the same condition system as stream subscriptions. You can filter by event type, combine types, or match on field values:

from ag2.events import ModelRequest, ModelResponse, ToolCallEvent

# Single event type
@agent.observer(ModelResponse)
def on_response(event: ModelResponse) -> None:
    print(f"Response: {event.content}")

# Multiple event types with OR
@agent.observer(ModelRequest | ModelResponse)
def on_any(event: ModelRequest | ModelResponse) -> None:
    print(f"Event: {event}")

# Field-based filtering
@agent.observer(ToolCallEvent.name == "search")
def on_search(event: ToolCallEvent) -> None:
    print(f"Search: {event.name}")

# Negation — everything except a specific type
@agent.observer(~ToolCallEvent)
def on_non_tool(event) -> None:
    print(f"Non-tool event: {event}")
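
If the operators look like magic, the following sketch shows one way such a condition system could work. It is an illustration only: Condition, is_type, and field_equals are hypothetical helpers, and in AG2 the operators apply to the event classes directly rather than to wrapper objects like these.

```python
from dataclasses import dataclass
from typing import Any, Callable


class Condition:
    """Hypothetical predicate wrapper supporting | (or), & (and), ~ (not)."""

    def __init__(self, check: Callable[[Any], bool]) -> None:
        self.check = check

    def __call__(self, event: Any) -> bool:
        return self.check(event)

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda e: self(e) or other(e))

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda e: self(e) and other(e))

    def __invert__(self) -> "Condition":
        return Condition(lambda e: not self(e))


def is_type(cls: type) -> Condition:
    """Match any event that is an instance of cls."""
    return Condition(lambda e: isinstance(e, cls))


def field_equals(cls: type, field: str, value: Any) -> Condition:
    """Match events of type cls whose field equals value."""
    return Condition(lambda e: isinstance(e, cls) and getattr(e, field) == value)


@dataclass
class ToolCall:  # toy stand-in for ToolCallEvent
    name: str


@dataclass
class Response:  # toy stand-in for ModelResponse
    content: str


events = [ToolCall("search"), ToolCall("fetch"), Response("hi")]

either = is_type(ToolCall) | is_type(Response)
only_search = field_equals(ToolCall, "name", "search")
non_tool = ~is_type(ToolCall)

matches_either = [either(e) for e in events]       # [True, True, True]
matches_search = [only_search(e) for e in events]  # [True, False, False]
matches_non_tool = [non_tool(e) for e in events]   # [False, False, True]
```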

Dependency Injection#

Observer callbacks support the same dependency injection features as Agent Tools and stream subscribers: Context, Inject, Depends, and Variable.

from ag2 import Context, observer
from ag2.events import ModelResponse

@observer(ModelResponse)
async def track(event: ModelResponse, ctx: Context) -> None:
    print(f"Stream: {ctx.stream.id}, Response: {event.content}")

Advanced Options#

Since observers are stream subscribers, they support the same interrupt and sync_to_thread parameters as stream.subscribe():

# Async observer: disable sync_to_thread since the callback is already async.
# `metrics` is a placeholder for your own async metrics client.
@observer(ModelResponse, sync_to_thread=False)
async def async_tracker(event: ModelResponse) -> None:
    await metrics.record(event)

# Interrupter — processes before regular subscribers and can modify events
@observer(ModelResponse, interrupt=True)
def intercept(event: ModelResponse) -> ModelResponse:
    event.content = event.content.upper()
    return event
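
The ordering guarantee behind interrupt=True can be pictured with a small two-tier dispatcher. This is a sketch under assumptions, not AG2's stream: ToyStream is a hypothetical class, and treating a None return as "keep the event unchanged" is a choice made for this example.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Response:  # toy stand-in for ModelResponse
    content: str


class ToyStream:
    """Hypothetical stream with two subscriber tiers."""

    def __init__(self) -> None:
        self.interrupters: list[Callable[[Response], Optional[Response]]] = []
        self.subscribers: list[Callable[[Response], None]] = []

    def subscribe(self, callback: Callable, interrupt: bool = False) -> None:
        (self.interrupters if interrupt else self.subscribers).append(callback)

    def send(self, event: Response) -> None:
        # Interrupters run first; each may return a replacement event.
        for interrupter in self.interrupters:
            result = interrupter(event)
            if result is not None:  # assumption: None leaves the event as-is
                event = result
        # Regular subscribers see the event only after interrupters finish.
        for subscriber in self.subscribers:
            subscriber(event)


stream = ToyStream()
received: list[str] = []
stream.subscribe(lambda e: received.append(e.content))
stream.subscribe(lambda e: Response(e.content.upper()), interrupt=True)
stream.send(Response("hello"))
print(received)  # ['HELLO']
```

The regular subscriber was registered first but still receives the modified event, because registration order does not override the tier order.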

Observers vs Middleware vs Stream Subscribers#

| Feature | Observer | Middleware | Stream Subscriber |
|---|---|---|---|
| Registration | On Agent | On Agent | On Stream |
| Lifecycle | Scoped to execution | Scoped to execution | Manual |
| Boilerplate | Minimal (one function) | Class with factory | Low (one function) |
| Can modify events | Only with interrupt=True | Yes (wraps execution) | Only with interrupt=True |
| DI support | Yes | Yes | Yes |
| Use case | Monitoring, logging, metrics | Cross-cutting logic (retry, auth, rate limiting) | Low-level event wiring |

Trigger-Driven Observers (BaseObserver)#

The observer() factory above is perfect for one-off event hooks. But when you need stateful monitoring — detecting repeated tool calls, tracking cumulative token usage, rolling time-window metrics — subclass BaseObserver.

A BaseObserver is an ABC that pairs a Watch with a process() method. The Watch decides when to fire; process() decides what to do with the collected events. If process() returns an ObserverAlert, the base class emits it back onto the stream for other subscribers to consume.

BaseObserver vs @observer#

| | @observer (StreamObserver) | BaseObserver |
|---|---|---|
| Shape | Function | Class |
| State | Stateless | Instance state (counters, history, etc.) |
| Trigger | Per matching event | Any Watch (event / batch / time / composite) |
| Output | Whatever your callback does | Optional ObserverAlert auto-emitted on the stream |
| Good for | Logging, metrics, one-offs | Thresholds, rate limits, loop detection, rolling stats |

Built-in observers#

Two ready-to-use BaseObserver subclasses ship with AG2:

LoopDetector#

Detects repetitive tool-call patterns. Maintains a sliding window of recent tool calls and emits a WARNING alert when repeat_threshold consecutive identical calls (same tool name and arguments) are observed.

from ag2 import Agent
from ag2.observer import LoopDetector
from ag2.config import OpenAIConfig

agent = Agent(
    "poller",
    config=OpenAIConfig(model="gpt-5"),
    observers=[LoopDetector(window_size=10, repeat_threshold=3)],
)
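
For intuition about what window_size and repeat_threshold control, here is a standalone sketch of the detection idea using only the standard library. ToyLoopDetector is a hypothetical class written for this example; the real LoopDetector may key calls, count streaks, or re-alert differently.

```python
import json
from collections import deque
from typing import Any, Optional


class ToyLoopDetector:
    """Hypothetical sliding-window detector for consecutive identical tool calls."""

    def __init__(self, window_size: int = 10, repeat_threshold: int = 3) -> None:
        self.window: deque = deque(maxlen=window_size)  # oldest calls fall off
        self.repeat_threshold = repeat_threshold

    def record(self, tool_name: str, arguments: dict[str, Any]) -> Optional[str]:
        # Canonical key so the same name + arguments compare equal
        # regardless of dictionary key order.
        key = json.dumps([tool_name, arguments], sort_keys=True)
        self.window.append(key)
        # Count how many calls at the tail of the window equal the newest one.
        streak = 0
        for past in reversed(self.window):
            if past != key:
                break
            streak += 1
        if streak >= self.repeat_threshold:
            return f"WARNING: {tool_name} repeated {streak} times in a row"
        return None


detector = ToyLoopDetector(window_size=10, repeat_threshold=3)
alerts = [detector.record("get_status", {"job": 1}) for _ in range(3)]
print(alerts)  # [None, None, 'WARNING: get_status repeated 3 times in a row']
```

A call with different arguments resets the streak, which is why the arguments are part of the key.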

TokenMonitor#

Tracks cumulative token usage across ModelResponse and TaskCompleted events. Emits WARNING / CRITICAL alerts as thresholds are crossed.

from ag2 import Agent
from ag2.observer import TokenMonitor
from ag2.config import OpenAIConfig

agent = Agent(
    "assistant",
    config=OpenAIConfig(model="gpt-5"),
    observers=[TokenMonitor(warn_threshold=50_000, alert_threshold=100_000)],
)
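
The threshold behavior can be sketched in a few lines of plain Python. This is an illustration, not the TokenMonitor source: ToyTokenMonitor is hypothetical, and two details are assumptions made for the example, namely that warn_threshold maps to WARNING and alert_threshold to CRITICAL, and that each level fires only once.

```python
from typing import Optional


class ToyTokenMonitor:
    """Hypothetical cumulative counter that reports each threshold once."""

    def __init__(self, warn_threshold: int, alert_threshold: int) -> None:
        self.warn_threshold = warn_threshold
        self.alert_threshold = alert_threshold
        self.total = 0
        self._warned = False
        self._alerted = False

    def add(self, tokens: int) -> Optional[str]:
        self.total += tokens
        # Check the higher threshold first so a large jump reports CRITICAL.
        if self.total >= self.alert_threshold and not self._alerted:
            self._alerted = True
            self._warned = True
            return f"CRITICAL: {self.total} tokens used"
        if self.total >= self.warn_threshold and not self._warned:
            self._warned = True
            return f"WARNING: {self.total} tokens used"
        return None


monitor = ToyTokenMonitor(warn_threshold=50_000, alert_threshold=100_000)
results = [monitor.add(n) for n in (30_000, 30_000, 30_000, 30_000)]
print(results)
# [None, 'WARNING: 60000 tokens used', None, 'CRITICAL: 120000 tokens used']
```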

ObserverAlert#

Both built-ins (and any BaseObserver you write) emit ObserverAlert events on the stream. Subscribe to them like any other event:

from ag2 import MemoryStream
from ag2.events import ObserverAlert

stream = MemoryStream()

@stream.where(ObserverAlert).subscribe
def surface_alerts(event: ObserverAlert) -> None:
    print(f"[{event.severity}] {event.source}: {event.message}")

Note

ObserverAlert is emitted to the stream and persisted in history, but is not rendered back to the LLM by the default provider mappers. If you want the agent itself to react to alerts, write a Middleware that converts alerts into follow-up messages.

Severity levels live in ag2.events.Severity: INFO, WARNING, CRITICAL, FATAL.

Building a custom observer#

Subclass BaseObserver, pick a Watch, implement process():

from ag2 import Context
from ag2.observer import BaseObserver
from ag2.watch import CadenceWatch
from ag2.events import BaseEvent, ModelResponse, ObserverAlert, Severity

class AvgCompletionObserver(BaseObserver):
    """Every N responses, emit an INFO alert with the average completion-token count."""

    def __init__(self, window: int = 5) -> None:
        super().__init__("avg-completion", watch=CadenceWatch(n=window, condition=ModelResponse))
        self._window = window

    async def process(self, events: list[BaseEvent], ctx: Context) -> ObserverAlert | None:
        tokens = [e.usage.completion_tokens for e in events if isinstance(e, ModelResponse) and e.usage]
        if not tokens:
            return None
        return ObserverAlert(
            source=self.name,
            severity=Severity.INFO,
            message=f"Avg completion tokens over last {self._window} responses: {sum(tokens) / len(tokens):.0f}",
        )

Register it like any other observer:

from ag2 import Agent
from ag2.config import OpenAIConfig

agent = Agent(
    "assistant",
    config=OpenAIConfig(model="gpt-5"),
    observers=[AvgCompletionObserver(window=5)],
)
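
To see why process() receives a list of events rather than a single one, it can help to model the cadence trigger on its own. The sketch below is a toy: ToyCadenceWatch and its offer() method are hypothetical names, and the assumption that the buffer resets after each batch is made for this example, not taken from CadenceWatch itself.

```python
from typing import Any, Callable, Optional


class ToyCadenceWatch:
    """Hypothetical watch: buffers matching events and releases batches of n."""

    def __init__(self, n: int, condition: Callable[[Any], bool]) -> None:
        self.n = n
        self.condition = condition
        self._buffer: list[Any] = []

    def offer(self, event: Any) -> Optional[list[Any]]:
        if not self.condition(event):
            return None  # non-matching events never count toward the cadence
        self._buffer.append(event)
        if len(self._buffer) < self.n:
            return None
        # Hand over the full batch and start collecting the next one.
        batch, self._buffer = self._buffer, []
        return batch


# Integers stand in for completion-token counts; the string is an unrelated event.
watch = ToyCadenceWatch(n=3, condition=lambda e: isinstance(e, int))
batches = [watch.offer(e) for e in (120, "tool call", 80, 100, 90)]
fired = [b for b in batches if b is not None]
average = sum(fired[0]) / len(fired[0])
print(fired, average)  # [[120, 80, 100]] 100.0
```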

process() may also emit events directly via await ctx.send(...) — returning an ObserverAlert is just the common case.