Observers
Observers let you attach lightweight, read-only event listeners directly to an Agent. Under the hood, each observer is a regular Stream subscriber — but you register it on the Agent instead of managing the stream yourself.
Use observers when you want to monitor agent behavior (logging, metrics, debugging) without writing a full Middleware class.
Creating an Observer#
Use the observer() function to pair an event condition with a callback. The first argument is the event type (or condition) to match, the second is an optional callback:
Registering Observers#
On the Agent constructor#
Pass observers when creating the agent. These observers are active for every agent.ask() call:
With the decorator method#
Use @agent.observer() to register an observer after agent creation. This mirrors the @agent.tool() and @agent.prompt() patterns:
Per-call observers#
Pass observers to a specific ask() call. These are scoped to that call only and are automatically cleaned up when the call finishes:
Constructor-level and per-call observers can be combined — both will fire:
Event Filtering#
Observers support the same condition system as stream subscriptions. You can filter by event type, combine types, or match on field values:
Dependency Injection#
Observer callbacks support the same dependency injection features as Agent Tools and stream subscribers — Context, Inject, Depends, and Variable:
Advanced Options#
Since observers are stream subscribers, they support the same interrupt and sync_to_thread parameters as stream.subscribe():
Observers vs Middleware vs Stream Subscribers#
| Feature | Observer | Middleware | Stream Subscriber |
|---|---|---|---|
| Registration | On Agent | On Agent | On Stream |
| Lifecycle | Scoped to execution | Scoped to execution | Manual |
| Boilerplate | Minimal — one function | Class with factory | Low — one function |
| Can modify events | Only with interrupt=True | Yes (wraps execution) | Only with interrupt=True |
| DI support | Yes | Yes | Yes |
| Use case | Monitoring, logging, metrics | Cross-cutting logic (retry, auth, rate limiting) | Low-level event wiring |
Trigger-Driven Observers (BaseObserver)#
The observer() factory above is perfect for one-off event hooks. But when you need stateful monitoring — detecting repeated tool calls, tracking cumulative token usage, rolling time-window metrics — subclass BaseObserver.
A BaseObserver is an ABC that pairs a Watch with a process() method. The Watch decides when to fire; process() decides what to do with the collected events. If process() returns an ObserverAlert, the base class emits it back onto the stream for other subscribers to consume.
BaseObserver vs @observer#
@observer (StreamObserver) | BaseObserver | |
|---|---|---|
| Shape | Function | Class |
| State | Stateless | Instance state (counters, history, etc.) |
| Trigger | Per matching event | Any Watch (event / batch / time / composite) |
| Output | Whatever your callback does | Optional ObserverAlert auto-emitted on the stream |
| Good for | Logging, metrics, one-offs | Thresholds, rate limits, loop detection, rolling stats |
Built-in observers#
Two ready-to-use BaseObserver subclasses ship with AG2 Beta:
LoopDetector#
Detects repetitive tool-call patterns. Maintains a sliding window of recent tool calls and emits a WARNING alert when repeat_threshold consecutive identical calls (same tool name and arguments) are observed.
TokenMonitor#
Tracks cumulative token usage across ModelResponse and TaskCompleted events. Emits WARNING / CRITICAL alerts as thresholds are crossed.
ObserverAlert#
Both built-ins (and any BaseObserver you write) emit ObserverAlert events on the stream. Subscribe to them like any other event:
Note
ObserverAlert is emitted to the stream and persisted in history, but is not rendered back to the LLM by the default provider mappers. If you want the agent itself to react to alerts, write a Middleware that converts alerts into follow-up messages.
Severity levels live in autogen.beta.events.Severity: INFO, WARNING, CRITICAL, FATAL.
Building a custom observer#
Subclass BaseObserver, pick a Watch, implement process():
Register it like any other observer:
process() may also emit events directly via await ctx.send(...) — returning an ObserverAlert is just the common case.