Skip to content

AutogenAgentExecutor

autogen.a2a.AutogenAgentExecutor #

AutogenAgentExecutor(agent)

Bases: AgentExecutor

An agent executor that bridges Autogen ConversableAgents with A2A protocols.

This class wraps an Autogen ConversableAgent to enable it to be executed within the A2A framework, handling message processing, task management, and event publishing.

Source code in autogen/a2a/agent_executor.py
def __init__(self, agent: ConversableAgent) -> None:
    self.agent = AgentService(agent)

agent instance-attribute #

agent = AgentService(agent)

execute async #

execute(context, event_queue)
Source code in autogen/a2a/agent_executor.py
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    assert context.message
    # The 1.0 SDK gives us protobuf objects on the context; the rest of this
    # module operates on v0.3-pydantic, so translate at the entry boundary.
    request = _v03_conversions.to_compat_message(context.message)

    if context.current_task is None:
        # build task object manually to allow empty messages
        task = Task(
            status=TaskStatus(
                state=TaskState.submitted,
                timestamp=datetime.now(timezone.utc).isoformat(),
            ),
            id=request.task_id or str(uuid4()),
            context_id=request.context_id or str(uuid4()),
            history=[request],
        )
        # publish the task status submitted event (proto on the wire)
        await event_queue.enqueue_event(_v03_conversions.to_core_task(task))
    else:
        task = _v03_conversions.to_compat_task(context.current_task)

    updater = TaskUpdater(event_queue, task.id, task.context_id)
    await updater.start_work()

    artifact = make_artifact(message=None)

    streaming_started = False
    try:
        async for response in self.agent(request_message_from_a2a(request)):
            if response.input_required:
                await updater.requires_input(
                    message=to_core_message(
                        make_input_required_message(
                            context_id=task.context_id,
                            task_id=task.id,
                            text=response.input_required,
                            context=response.context,
                        )
                    ),
                )
                return

            if response.streaming_text:
                artifact = copy_artifact(
                    artifact=artifact,
                    message={"content": response.streaming_text},
                    context=response.context,
                )

                await updater.add_artifact(
                    parts=to_core_parts(artifact.parts),
                    artifact_id=artifact.artifact_id,
                    name=artifact.name,
                    append=streaming_started,
                    last_chunk=False,
                )

                streaming_started = True

            elif response.message:
                artifact = copy_artifact(
                    artifact=artifact,
                    message=response.message,
                    context=response.context,
                )

    except Exception as e:
        raise InternalError(repr(e)) from e

    await updater.add_artifact(
        artifact_id=artifact.artifact_id,
        name=artifact.name,
        parts=to_core_parts(artifact.parts),
        metadata=artifact.metadata,
        extensions=artifact.extensions,
        append=streaming_started,
        last_chunk=True,
    )

    await updater.complete()

cancel async #

cancel(context, event_queue)
Source code in autogen/a2a/agent_executor.py
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    pass