def observer(
condition: ClassInfo | Condition,
callback: Callable[..., Any] | None = None,
*,
interrupt: bool = False,
sync_to_thread: bool = True,
) -> StreamObserver | Callable[[Callable[..., Any]], StreamObserver]:
cond = _ensure_condition(condition)
if callback is not None:
return StreamObserver(condition=cond, callback=callback, interrupt=interrupt, sync_to_thread=sync_to_thread)
def decorator(func: Callable[..., Any]) -> StreamObserver:
return StreamObserver(condition=cond, callback=func, interrupt=interrupt, sync_to_thread=sync_to_thread)
return decorator