Skip to content

A2UIAgentExecutor

autogen.agents.experimental.a2ui.a2a_executor.A2UIAgentExecutor #

A2UIAgentExecutor(agent, delimiter=A2UI_DEFAULT_DELIMITER, version_string='v0.9')

Bases: AgentExecutor

A2A executor that preserves A2UI content as DataParts.

When an agent's response contains A2UI JSON (separated by a delimiter), this executor splits it into: - A TextPart for the conversational text - A DataPart with MIME type application/json+a2ui for the A2UI operations

Also handles A2UI extension negotiation via try_activate_a2ui_extension().

Source code in autogen/agents/experimental/a2ui/a2a_executor.py
def __init__(
    self,
    agent: ConversableAgent,
    delimiter: str = A2UI_DEFAULT_DELIMITER,
    version_string: str = "v0.9",
) -> None:
    self._agent = agent
    self._agent_service = AgentService(agent)
    self._parser = A2UIResponseParser(version_string=version_string, delimiter=delimiter)

execute async #

execute(context, event_queue)
Source code in autogen/agents/experimental/a2ui/a2a_executor.py
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    assert context.message

    # Negotiate A2UI extension
    use_a2ui = try_activate_a2ui_extension(context)
    if use_a2ui:
        logger.info("A2UI extension activated for this request.")

    # Check for incoming A2UI actions
    incoming_action = self._extract_incoming_action(context)
    if incoming_action:
        logger.info("Received A2UI action: %s", incoming_action.get("name", "?"))

    task, updater = await self._setup_task(context, event_queue)

    # Handle incoming A2UI action if present
    if incoming_action:
        handled = await self._handle_action(incoming_action, task, updater, context)
        if handled:
            return

    try:
        full_response_text, streaming_started = await self._stream_agent_response(context, task, updater)
    except Exception as e:
        raise ServerError(error=InternalError()) from e

    # input_required was handled inside _stream_agent_response
    if not full_response_text and not streaming_started:
        return

    final_parts = self._build_final_parts(full_response_text, use_a2ui)
    if not final_parts:
        final_parts = [Part(root=TextPart(text=full_response_text))] if full_response_text else []

    # Send final response via status update so genui_a2a connector can process it
    message = Message(role="agent", message_id=str(uuid4()), parts=final_parts)
    await updater.update_status(state=TaskState.completed, message=message, final=True)

cancel async #

cancel(context, event_queue)
Source code in autogen/agents/experimental/a2ui/a2a_executor.py
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    pass