Skip to content

AutogenAgentExecutor

autogen.a2a.AutogenAgentExecutor #

AutogenAgentExecutor(agent)

Bases: AgentExecutor

An agent executor that bridges AutoGen ConversableAgents with the A2A protocol.

This class wraps an Autogen ConversableAgent to enable it to be executed within the A2A framework, handling message processing, task management, and event publishing.

Source code in autogen/a2a/agent_executor.py
def __init__(self, agent: ConversableAgent) -> None:
    """Wrap ``agent`` in an ``AgentService`` and keep it on the executor.

    Args:
        agent: The conversable agent handed to ``AgentService``; the
            resulting service object is stored as ``self.agent``.
    """
    service = AgentService(agent)
    self.agent = service

agent instance-attribute #

agent = AgentService(agent)

execute async #

execute(context, event_queue)
Source code in autogen/a2a/agent_executor.py
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Run the wrapped agent on the incoming message and publish the outcome.

    Events are enqueued on ``event_queue`` in this order: the new task (only
    when the context carries no current task), then either a single final
    ``failed`` status if the agent call raises, or one non-final ``working``
    status per message returned by ``response_message_to_a2a``, followed by
    the result artifact and a final ``completed`` status.

    Only exceptions raised by the agent call itself are turned into a
    ``failed`` status; anything raised afterwards propagates to the caller.

    Args:
        context: Request context supplying the incoming message, the current
            task (if any) and the context id.
        event_queue: Queue on which every task, status and artifact event is
            enqueued.

    Raises:
        AssertionError: If ``context.message`` is falsy.
    """
    assert context.message

    def utc_timestamp() -> str:
        # Current moment in UTC, rendered with datetime.isoformat().
        return datetime.now(timezone.utc).isoformat()

    async def publish_status(status: TaskStatus, *, final: bool) -> None:
        # Wrap a status in an update event bound to this task and enqueue it.
        update = TaskStatusUpdateEvent(
            task_id=task.id,
            status=status,
            context_id=context.context_id,
            final=final,
        )
        await event_queue.enqueue_event(update)

    task = context.current_task
    if not task:
        # No task attached to the request yet: create one from the message,
        # stamp its status and publish the task itself as the first event.
        task = new_task(context.message)
        task.status.timestamp = utc_timestamp()
        await event_queue.enqueue_event(task)

    try:
        reply = await self.agent(request_message_from_a2a(context.message))
    except Exception as exc:
        # Report the failure as the final status, carrying the error text.
        failure_message = new_agent_text_message(
            str(exc),
            task_id=task.id,
            context_id=context.context_id,
        )
        await publish_status(
            TaskStatus(
                state=TaskState.failed,
                message=failure_message,
                timestamp=utc_timestamp(),
            ),
            final=True,
        )
        return

    result_artifact, history = response_message_to_a2a(reply, context.context_id, task.id)

    # Replay the local chat history as intermediate "working" updates.
    for past_message in history:
        await publish_status(
            TaskStatus(
                state=TaskState.working,
                message=past_message,
                timestamp=utc_timestamp(),
            ),
            final=False,
        )

    # Publish the result artifact as a single, last chunk.
    artifact_event = TaskArtifactUpdateEvent(
        task_id=task.id,
        last_chunk=True,
        context_id=context.context_id,
        artifact=result_artifact,
    )
    await event_queue.enqueue_event(artifact_event)

    # Close the task with a final "completed" status (no message attached).
    await publish_status(
        TaskStatus(state=TaskState.completed, timestamp=utc_timestamp()),
        final=True,
    )

cancel async #

cancel(context, event_queue)
Source code in autogen/a2a/agent_executor.py
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Accept a cancellation request without acting on it.

    Neither ``context`` nor ``event_queue`` is used, and no event is
    enqueued.
    """
    return None