Skip to content

AGUIStream

autogen.ag_ui.AGUIStream #

AGUIStream(agent)
Source code in autogen/ag_ui/adapter.py
def __init__(self, agent: ConversableAgent) -> None:
    """Bind this stream adapter to ``agent``.

    Args:
        agent: The agent to expose. It is stored privately and also wrapped
            in an ``AgentService`` that is published as ``self.service``.
    """
    self.service = AgentService(agent)
    self.__agent = agent

service instance-attribute #

service = AgentService(agent)

dispatch async #

dispatch(incoming, *, context=None, accept=None)
Source code in autogen/ag_ui/adapter.py
async def dispatch(
    self,
    incoming: RunAgentInput,
    *,
    context: dict[str, Any] | None = None,
    accept: str | None = None,
) -> AsyncIterator[str]:
    """Run the agent for ``incoming`` and yield each produced event, encoded.

    A ``run_stream`` task is started in a task group and handed the writing
    end of an in-memory object stream; this generator drains the reading end
    and yields every event after passing it through an ``EventEncoder``.

    Args:
        incoming: The run request. Its ``state`` (if truthy) is merged into
            the context handed to the run.
        context: Optional extra context values, merged last.
        accept: Forwarded to ``EventEncoder(accept=...)``.

    Yields:
        The result of ``encoder.encode(event)`` for every event received.
    """
    # TODO: put it into ContextVariables if it supports unpickleable objects
    event_sink, event_source = create_memory_object_stream[BaseEvent]()

    # Layers are applied in this exact order; presumably later layers
    # override earlier keys (depends on ContextVariables.update -- confirm).
    context_layers = (
        self.__agent.context_variables.to_dict(),  # agent initial context
        incoming.state or {},  # frontend-passed context
        context or {},  # manual-passed context
    )
    merged = ContextVariables()
    for layer in context_layers:
        merged.update(layer)

    async with create_task_group() as tg:
        stream_input = AGStreamInput(incoming=incoming, context=merged)
        tg.start_soon(run_stream, stream_input, self.service, event_sink)

        # EventEncoder typed incompletely, so we need to ignore the type error
        encoder = EventEncoder(accept=accept)  # type: ignore[arg-type]

        async with event_source:
            async for produced in event_source:
                yield encoder.encode(produced)

build_asgi #

build_asgi()

Build an ASGI endpoint for the AGUIStream.

Source code in autogen/ag_ui/adapter.py
def build_asgi(self) -> "type[HTTPEndpoint]":
    """Return an ASGI endpoint class that serves this AGUIStream.

    The work is delegated to ``build_asgi`` from the sibling ``asgi`` module.
    """
    # Deferred import: keeps Starlette from being required by the main package.
    from .asgi import build_asgi as _make_endpoint

    endpoint_cls = _make_endpoint(self)
    return endpoint_cls