Skip to content

Stream

autogen.beta.context.Stream #

Bases: Protocol

id instance-attribute #

id

send async #

send(event, context)
Source code in autogen/beta/context.py
async def send(self, event: BaseEvent, context: "Context") -> None: ...

where #

where(condition)
Source code in autogen/beta/context.py
def where(self, condition: ClassInfo | Condition) -> "Stream": ...

join #

join(*, max_events=None)
Source code in autogen/beta/context.py
def join(
    self,
    *,
    max_events: int | None = None,
) -> AbstractContextManager[AsyncIterator[BaseEvent]]: ...

subscribe #

subscribe(func: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> SubId
subscribe(func: None = None, *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> Callable[[Callable[..., Any]], SubId]
subscribe(func=None, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/context.py
def subscribe(
    self,
    func: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Callable[[Callable[..., Any]], SubId] | SubId: ...

unsubscribe #

unsubscribe(sub_id)
Source code in autogen/beta/context.py
def unsubscribe(self, sub_id: SubId) -> None: ...

sub_scope #

sub_scope(func, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/context.py
def sub_scope(
    self,
    func: Callable[..., Any],
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> AbstractContextManager[None]: ...

get #

get(condition)
Source code in autogen/beta/context.py
def get(
    self,
    condition: ClassInfo | Condition,
) -> AbstractAsyncContextManager[asyncio.Future[BaseEvent]]: ...