Skip to content

Stream

autogen.beta.context.Stream #

Bases: Protocol

id instance-attribute #

id

pending_messages instance-attribute #

pending_messages

Inbox of follow-up turns produced asynchronously (e.g. by background tasks). The agent loop drains this before each model call; whatever lands here while no ask is running is consumed by the next ask on this stream and merged into its initial request.

send async #

send(event, context)
Source code in autogen/beta/context.py
async def send(
    self,
    event: BaseEvent,
    context: "ConversationContext",
) -> None:
    """Asynchronously send ``event`` on this stream with its ``context``.

    This is a protocol stub; only the signature is defined here.

    NOTE(review): delivery semantics (ordering, which subscribers see the
    event) are decided by the implementing class; confirm there.
    """
    ...

enqueue #

enqueue(*content)

Append a follow-up turn to this stream's inbox.

Source code in autogen/beta/context.py
def enqueue(
    self,
    *content: "SendableMessage | Input",
) -> None:
    """Queue a follow-up turn on the inbox of this stream.

    ``content`` is accepted as any number of positional items.

    NOTE(review): how several items are combined into one turn is left to
    the implementing class; confirm there.
    """
    ...

spawn_background #

spawn_background(coro)

Start a fire-and-forget task in this stream's scope.

The task is not awaited by the agent loop. Tasks deliver their results via self.enqueue(...) — anything enqueued while an ask is live feeds the next model call; anything enqueued after ask has returned sits in pending_messages and is consumed by the next ask on the same stream.

Source code in autogen/beta/context.py
def spawn_background(
    self,
    coro: Coroutine[Any, Any, None],
) -> asyncio.Task[None]:
    """Launch ``coro`` as a fire-and-forget task scoped to this stream.

    The agent loop never awaits the task. A task reports back by calling
    ``self.enqueue(...)``:

    * content enqueued while an ``ask`` is still live is fed into the next
      model call;
    * content enqueued after ``ask`` has returned stays in
      ``pending_messages`` until the next ``ask`` on the same stream
      consumes it.
    """
    ...

where #

where(condition)
Source code in autogen/beta/context.py
def where(
    self,
    condition: ClassInfo | Condition,
) -> "Stream":
    """Return a ``Stream`` derived from this one for ``condition``.

    ``condition`` may be either a ``ClassInfo`` or a ``Condition``.

    NOTE(review): presumably the result is a view restricted to events
    matching ``condition``; confirm against a concrete implementation.
    """
    ...

join #

join(*, max_events=None)
Source code in autogen/beta/context.py
def join(self, *, max_events: int | None = None) -> AbstractContextManager[AsyncIterator[BaseEvent]]:
    """Return a context manager that yields an async iterator of events.

    ``max_events`` is keyword-only and defaults to ``None``.

    NOTE(review): presumably ``max_events`` caps how many events the
    iterator produces, with ``None`` meaning no cap; confirm against a
    concrete implementation.
    """
    ...

subscribe #

subscribe(func: Callable[..., Any], *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> SubId
subscribe(func: None = None, *, interrupt: bool = False, sync_to_thread: bool = True, condition: Condition | None = None) -> Callable[[Callable[..., Any]], SubId]
subscribe(func=None, *, interrupt=False, sync_to_thread=True, condition=None)
Source code in autogen/beta/context.py
def subscribe(
    self,
    func: Callable[..., Any] | None = None,
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
    condition: Condition | None = None,
) -> Callable[[Callable[..., Any]], SubId] | SubId:
    """Register ``func`` as a subscriber, directly or as a decorator.

    Two call forms are supported:

    * ``subscribe(func, ...)`` returns a ``SubId``;
    * ``subscribe(...)`` with ``func`` omitted returns a callable that takes
      the function and returns its ``SubId``.

    ``interrupt``, ``sync_to_thread`` and ``condition`` are keyword-only.

    NOTE(review): the effect of each option is defined by the implementing
    class. ``sync_to_thread`` presumably controls whether synchronous
    callbacks run in a worker thread; confirm there.
    """
    ...

unsubscribe #

unsubscribe(sub_id)
Source code in autogen/beta/context.py
def unsubscribe(
    self,
    sub_id: SubId,
) -> None:
    """Remove the subscription identified by ``sub_id``.

    NOTE(review): ``sub_id`` is presumably a value previously returned by
    ``subscribe``; behaviour for an unknown id is not specified here.
    """
    ...

sub_scope #

sub_scope(func, *, interrupt=False, sync_to_thread=True)
Source code in autogen/beta/context.py
def sub_scope(
    self,
    func: Callable[..., Any],
    *,
    interrupt: bool = False,
    sync_to_thread: bool = True,
) -> AbstractContextManager[None]:
    """Return a context manager tied to ``func``; it yields no value.

    ``interrupt`` and ``sync_to_thread`` are keyword-only.

    NOTE(review): presumably ``func`` is subscribed when the context is
    entered and unsubscribed when it exits; confirm against a concrete
    implementation.
    """
    ...

get #

get(condition)
Source code in autogen/beta/context.py
def get(self, condition: ClassInfo | Condition) -> AbstractAsyncContextManager[asyncio.Future[BaseEvent]]:
    """Return an async context manager yielding a future of a ``BaseEvent``.

    ``condition`` may be either a ``ClassInfo`` or a ``Condition``.

    NOTE(review): presumably the future resolves with the first event that
    matches ``condition``; confirm against a concrete implementation.
    """
    ...