Bases: AgentExecutor
A2A executor that preserves A2UI content as DataParts.
When an agent's response contains A2UI JSON (separated by a delimiter), this executor splits it into:

- A `TextPart` for the conversational text
- A `DataPart` with MIME type `application/json+a2ui` for the A2UI operations
Also handles A2UI extension negotiation via try_activate_a2ui_extension().
Source code in autogen/agents/experimental/a2ui/a2a_executor.py
def __init__(
    self,
    agent: ConversableAgent,
    delimiter: str = A2UI_DEFAULT_DELIMITER,
    version_string: str = "v0.9",
) -> None:
    """Build the executor around a conversable agent.

    Args:
        agent: The agent this executor serves; it is stored as-is and
            also wrapped in an ``AgentService``.
        delimiter: Delimiter forwarded to the ``A2UIResponseParser``.
        version_string: Version string forwarded to the
            ``A2UIResponseParser``.
    """
    self._agent = agent
    self._agent_service = AgentService(agent)
    # The parser receives both settings by keyword, so their order here is free.
    self._parser = A2UIResponseParser(
        delimiter=delimiter,
        version_string=version_string,
    )
|
execute async
execute(context, event_queue)
Source code in autogen/agents/experimental/a2ui/a2a_executor.py
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Handle one request and publish the agent's final reply.

    The method negotiates the A2UI extension, looks for an incoming A2UI
    action, sets up the task, and gives the action handler a chance to
    finish the request. Otherwise it streams the agent response and sends
    the resulting parts as a completed, final status update.

    Args:
        context: Request context; ``context.message`` is asserted to be set.
        event_queue: Event queue handed to ``_setup_task``.

    Raises:
        AssertionError: If ``context.message`` is falsy.
        ServerError: Wrapping ``InternalError`` when streaming the agent
            response raises any exception.
    """
    assert context.message

    # Negotiate the A2UI extension for this request.
    a2ui_active = try_activate_a2ui_extension(context)
    if a2ui_active:
        logger.info("A2UI extension activated for this request.")

    # Detect an incoming A2UI action before the task is set up.
    action = self._extract_incoming_action(context)
    if action:
        logger.info("Received A2UI action: %s", action.get("name", "?"))

    task, updater = await self._setup_task(context, event_queue)

    # Short-circuit: the handler only runs when an action is present, and
    # a truthy result means the request is already fully handled.
    if action and await self._handle_action(action, task, updater, context):
        return

    try:
        reply_text, streamed = await self._stream_agent_response(context, task, updater)
    except Exception as exc:
        raise ServerError(error=InternalError()) from exc

    # No text and no streaming: input_required was handled inside
    # _stream_agent_response, so there is nothing left to publish.
    if not (reply_text or streamed):
        return

    parts = self._build_final_parts(reply_text, a2ui_active)
    if not parts:
        # Fall back to a plain text part, or to no parts when there is no text.
        if reply_text:
            parts = [Part(root=TextPart(text=reply_text))]
        else:
            parts = []

    # Send final response via status update so genui_a2a connector can process it.
    await updater.update_status(
        state=TaskState.completed,
        message=Message(role="agent", message_id=str(uuid4()), parts=parts),
        final=True,
    )
|
cancel async
cancel(context, event_queue)
Source code in autogen/agents/experimental/a2ui/a2a_executor.py
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Accept a cancel request without acting on it.

    The body performs no work: neither ``context`` nor ``event_queue`` is
    read, and nothing is published.

    Args:
        context: Request context of the cancel call (unused).
        event_queue: Event queue of the cancel call (unused).
    """
    return None
|