StreamObserver

autogen.beta.observers.observer.StreamObserver dataclass

StreamObserver(*, callback, interrupt=False, sync_to_thread=True, condition)

Bases: SimpleObserver

Lightweight condition → callback stream subscription.

Produced by the observer function. Its register method enters a sub_scope on the stream filtered by condition and registers it on the given ExitStack (or AsyncExitStack), which cleans up the subscription when the agent finishes. When condition is None, the observer fires on every event.

condition instance-attribute

condition

callback instance-attribute

callback

interrupt class-attribute instance-attribute

interrupt = False

sync_to_thread class-attribute instance-attribute

sync_to_thread = True

register

register(stack, context)
Source code in autogen/beta/observers/observer.py
def register(self, stack: ExitStack | AsyncExitStack, context: Context) -> None:
    stack.enter_context(
        context.stream.where(self.condition).sub_scope(
            self.callback,
            interrupt=self.interrupt,
            sync_to_thread=self.sync_to_thread,
        )
    )
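
The subscription lifecycle in register can be illustrated with standard-library pieces only. The sketch below is not the autogen API: ToyStream, its where, sub_scope, and send methods, and run_demo are hypothetical stand-ins written for this example, and the interrupt and sync_to_thread options are left out. It only shows the general pattern of entering a filtered subscription on an ExitStack so that closing the stack removes it.

```python
from contextlib import ExitStack, contextmanager
from typing import Any, Callable, Iterator, List, Optional, Tuple

Condition = Optional[Callable[[Any], bool]]
Callback = Callable[[Any], None]


class ToyStream:
    """Hypothetical stand-in for an event stream; NOT the real autogen class."""

    def __init__(
        self,
        subscribers: Optional[List[Tuple[Condition, Callback]]] = None,
        condition: Condition = None,
    ) -> None:
        # Filtered views share the subscriber list of the stream they came from.
        self._subscribers = [] if subscribers is None else subscribers
        self._condition = condition

    def where(self, condition: Condition) -> "ToyStream":
        # Assumption for this sketch: a condition of None matches every event.
        return ToyStream(self._subscribers, condition)

    @contextmanager
    def sub_scope(self, callback: Callback) -> Iterator[None]:
        # Subscribe on enter, unsubscribe on exit.
        entry = (self._condition, callback)
        self._subscribers.append(entry)
        try:
            yield
        finally:
            self._subscribers.remove(entry)

    def send(self, event: Any) -> None:
        # Deliver the event to every subscriber whose condition accepts it.
        for condition, callback in list(self._subscribers):
            if condition is None or condition(event):
                callback(event)


def run_demo() -> Tuple[List[str], List[str]]:
    stream = ToyStream()
    errors: List[str] = []
    everything: List[str] = []
    with ExitStack() as stack:
        # Same shape as register: enter the scoped subscription on the stack.
        stack.enter_context(
            stream.where(lambda e: e.startswith("error")).sub_scope(errors.append)
        )
        stack.enter_context(stream.where(None).sub_scope(everything.append))
        stream.send("error: timeout")
        stream.send("info: started")
    # The stack has closed, so both subscriptions are gone and this is dropped.
    stream.send("error: after exit")
    return errors, everything
```

Calling run_demo() in this toy setup collects only the matching event in the first list and both in-scope events in the second; the event sent after the stack closes reaches neither callback.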