Skip to content

observer

autogen.beta.observer.observer

observer(condition: ClassInfo | Condition, callback: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True) -> StreamObserver
observer(condition: ClassInfo | Condition, callback: None = None, *, interrupt: bool = False, sync_to_thread: bool = True) -> Callable[[Callable[..., Any]], StreamObserver]
observer(condition, callback=None, *, interrupt=False, sync_to_thread=True)
Source code in autogen/beta/observer.py
def observer(
    condition: ClassInfo | Condition,
    callback: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
) -> StreamObserver | Callable[[Callable[..., Any]], StreamObserver]:
    """Create a ``StreamObserver``, either directly or as a decorator.

    Two call forms are supported:

    * ``observer(condition, callback, ...)`` returns a ``StreamObserver``
      immediately.
    * ``observer(condition, ...)`` (no callback) returns a one-argument
      decorator; applying it to a function yields the ``StreamObserver``.

    Args:
        condition: A ``ClassInfo`` or ``Condition``; it is passed through
            ``_ensure_condition`` before being handed to ``StreamObserver``.
        callback: The callable to attach. When ``None``, a decorator is
            returned instead of an observer.
        interrupt: Forwarded unchanged to ``StreamObserver``.
        sync_to_thread: Forwarded unchanged to ``StreamObserver``.

    Returns:
        A ``StreamObserver`` when ``callback`` is given, otherwise a decorator
        that produces one.
    """
    # Resolve once, up front: this runs at call time even in decorator form,
    # and the same resolved object is shared by whatever observer gets built.
    resolved = _ensure_condition(condition)

    def _attach(target: Callable[..., Any]) -> StreamObserver:
        """Wrap ``target`` in a ``StreamObserver`` using the captured options."""
        return StreamObserver(
            condition=resolved,
            callback=target,
            interrupt=interrupt,
            sync_to_thread=sync_to_thread,
        )

    if callback is None:
        # Decorator form: defer construction until the function is supplied.
        return _attach

    return _attach(callback)