def observer(
condition: ClassInfo | Condition | None = None,
callback: Callable[..., Any] | None = None,
*,
interrupt: bool = False,
sync_to_thread: bool = True,
) -> StreamObserver | Callable[[Callable[..., Any]], StreamObserver]:
if condition is None:
cond: Condition | None = None
elif isinstance(condition, Condition):
cond = condition
else:
cond = TypeCondition(condition)
def decorator(func: Callable[..., Any]) -> StreamObserver:
if cond is None:
return SimpleObserver(
callback=func,
interrupt=interrupt,
sync_to_thread=sync_to_thread,
)
return StreamObserver(
condition=cond,
callback=func,
interrupt=interrupt,
sync_to_thread=sync_to_thread,
)
if callback is not None:
return decorator(callback)
return decorator