Skip to content

AGUIStream

autogen.beta.ag_ui.stream.AGUIStream #

AGUIStream(agent)
Source code in autogen/beta/ag_ui/stream.py
def __init__(self, agent: Agent) -> None:
    self.__agent = agent

build_asgi #

build_asgi()

Build an ASGI endpoint for the AGUIStream.

Source code in autogen/beta/ag_ui/stream.py
def build_asgi(self) -> "type[HTTPEndpoint]":
    """Build an ASGI endpoint for the AGUIStream."""
    # import here to avoid Starlette requirements in the main package
    from .asgi import build_asgi

    return build_asgi(self)

dispatch async #

dispatch(incoming, *, variables=None, prompt=(), dependencies=None, config=None, tools=(), middleware=(), observers=(), hitl_hook=None, accept=None)
Source code in autogen/beta/ag_ui/stream.py
async def dispatch(
    self,
    incoming: RunAgentInput,
    *,
    variables: dict[str, Any] | None = None,
    prompt: Iterable[str] = (),
    dependencies: dict[Any, Any] | None = None,
    config: ModelConfig | None = None,
    tools: Iterable[Tool] = (),
    middleware: Iterable[MiddlewareFactory] = (),
    observers: Iterable[Observer] = (),
    hitl_hook: HumanHook | None = None,
    accept: str | None = None,
) -> AsyncIterator[str]:
    write_events_stream, read_events_stream = create_memory_object_stream[BaseEvent]()

    async with create_task_group() as tg:
        tg.start_soon(
            run_stream,
            AGStreamInput(
                incoming=incoming,
                variables=variables or {},
                prompt=list(prompt),
                dependencies=dependencies,
                config=config,
                tools=list(tools),
                middleware=list(middleware),
                observers=list(observers),
                hitl_hook=hitl_hook,
            ),
            self.__agent,
            write_events_stream,
        )

        # EventEncoder typed incompletely, so we need to ignore the type error
        encoder = EventEncoder(accept=accept)  # type: ignore[arg-type]

        async with read_events_stream:
            async for event in read_events_stream:
                yield encoder.encode(event)