Skip to content

observer

autogen.beta.observers.observer.observer

observer(condition: ClassInfo | Condition | None = None, callback: None = None, *, interrupt: bool = False, sync_to_thread: bool = True) -> Callable[[Callable[..., Any]], StreamObserver]
observer(condition: ClassInfo | Condition | None, callback: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True) -> StreamObserver
observer(condition=None, callback=None, *, interrupt=False, sync_to_thread=True)
Source code in `autogen/beta/observers/observer.py`:
def observer(
    condition: ClassInfo | Condition | None = None,
    callback: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
) -> StreamObserver | Callable[[Callable[..., Any]], StreamObserver]:
    """Build an observer from a callback, or return a decorator that does so.

    Args:
        condition: Selects which observer is built. ``None`` produces a
            ``SimpleObserver`` with no condition; a ``Condition`` instance is
            used as-is; any other value is wrapped in ``TypeCondition``.
        callback: Function to wrap. When given, the observer is built and
            returned immediately; when omitted, a decorator is returned.
        interrupt: Forwarded unchanged to the observer constructor.
        sync_to_thread: Forwarded unchanged to the observer constructor.

    Returns:
        The constructed observer if ``callback`` was supplied, otherwise a
        one-argument decorator that constructs it from the decorated function.
    """
    # Normalise up front so TypeCondition is built once, at call time,
    # rather than each time the decorator is applied.
    resolved: Condition | None
    if condition is None or isinstance(condition, Condition):
        resolved = condition
    else:
        resolved = TypeCondition(condition)

    def wrap(func: Callable[..., Any]) -> StreamObserver:
        if resolved is not None:
            return StreamObserver(
                condition=resolved,
                callback=func,
                interrupt=interrupt,
                sync_to_thread=sync_to_thread,
            )
        # No condition supplied: build the condition-less observer.
        return SimpleObserver(
            callback=func,
            interrupt=interrupt,
            sync_to_thread=sync_to_thread,
        )

    # Direct-call form builds immediately; decorator form defers to `wrap`.
    return wrap if callback is None else wrap(callback)