Bases: ABC
Trigger-driven observer. Subclasses implement :meth:process.
The :class:Watch handles stream subscription and event buffering. When the watch fires, :meth:process is called with the collected events. If process returns an :class:ObserverAlert, it is emitted on the stream.
Parameters
name: Observer display name (used in alert source field). watch: Watch strategy that determines when process is called.
Source code in autogen/beta/observers/observer.py
| def __init__(self, name: str, watch: Watch) -> None:
self.name = name
self._watch = watch
self._ctx: Context | None = None
|
register
Source code in autogen/beta/observers/observer.py
| def register(self, stack: ExitStack | AsyncExitStack, context: Context) -> None:
if self._watch.is_armed:
self._watch.disarm()
self._ctx = context
self._watch.arm(context.stream, self._on_watch)
stack.callback(self._disarm)
|
process abstractmethod async
Analyze events and optionally return an alert.
Source code in autogen/beta/observers/observer.py
| @abstractmethod
async def process(self, events: list[BaseEvent], ctx: Context) -> ObserverAlert | None:
"""Analyze events and optionally return an alert."""
...
|