Skip to content

AGUIStream

autogen.ag_ui.AGUIStream

AGUIStream(agent, *, event_interceptors=())
Source code in autogen/ag_ui/adapter.py
def __init__(
    self,
    agent: ConversableAgent,
    *,
    event_interceptors: Iterable[Callable[[ServiceResponse], AsyncIterator[BaseEvent]]] = (),
) -> None:
    """Create a stream adapter around ``agent``.

    Args:
        agent: Agent wrapped in an ``AgentService``; its context variables
            are read again on every ``dispatch`` call.
        event_interceptors: Callables handed to ``run_stream`` on every
            ``dispatch`` call. Any iterable is accepted; it is copied once
            at construction time.
    """
    self.__agent = agent
    # Snapshot into a tuple: the parameter accepts any Iterable, and a
    # one-shot iterator (e.g. a generator) would otherwise be exhausted by
    # the first dispatch, leaving later dispatches without interceptors.
    self.__event_interceptors = tuple(event_interceptors)
    self.service = AgentService(agent)

service instance-attribute

service = AgentService(agent)

dispatch async

dispatch(incoming, *, context=None, accept=None)
Source code in autogen/ag_ui/adapter.py
async def dispatch(
    self,
    incoming: RunAgentInput,
    *,
    context: dict[str, Any] | None = None,
    accept: str | None = None,
) -> AsyncIterator[str]:
    """Run the agent service for ``incoming`` and yield each event encoded.

    Args:
        incoming: Run request; its ``state`` (when truthy) is merged into
            the run context.
        context: Optional extra context values, applied last.
        accept: Forwarded to ``EventEncoder`` as its ``accept`` argument.

    Yields:
        The result of ``EventEncoder.encode`` for every event read from the
        in-memory event stream.
    """
    # TODO: put it into ContextVariables if it supports unpickleable objects
    event_sink, event_source = create_memory_object_stream[BaseEvent]()

    # Layer the run context in a fixed order: the agent's initial context,
    # then frontend-passed state, then the manually passed context.
    run_context = ContextVariables()
    run_context.update(self.__agent.context_variables.to_dict())
    frontend_state = incoming.state or {}
    run_context.update(frontend_state)
    manual_context = context or {}
    run_context.update(manual_context)

    async with create_task_group() as task_group:
        # The producer runs in the background and is given the write end.
        stream_input = AGStreamInput(incoming=incoming, context=run_context)
        task_group.start_soon(
            run_stream,
            stream_input,
            self.service,
            event_sink,
            self.__event_interceptors,
        )

        # EventEncoder's type hints are incomplete, hence the ignore below
        encoder = EventEncoder(accept=accept)  # type: ignore[arg-type]

        async with event_source:
            async for event in event_source:
                encoded = encoder.encode(event)
                yield encoded

build_asgi

build_asgi()

Build an ASGI endpoint for the AGUIStream.

Source code in autogen/ag_ui/adapter.py
def build_asgi(self) -> "type[HTTPEndpoint]":
    """Build an ASGI endpoint for the AGUIStream.

    Delegates to ``build_asgi`` from the sibling ``asgi`` module, passing
    this stream, and returns its result.
    """
    # Deferred import: keeps Starlette out of the main package's requirements.
    from .asgi import build_asgi as make_endpoint

    endpoint = make_endpoint(self)
    return endpoint