Advanced

Topics that aren't part of the day-to-day A2A path: human-in-the-loop, transparent reconnects on streaming drops, plugging in a custom executor, and the error hierarchy.

Human-in-the-Loop via input_required

When a server-side executor raises requires_input(...) mid-task, the A2A server transitions the task to TASK_STATE_INPUT_REQUIRED and surfaces the prompt to the client. The AG2 client invokes the local agent's hitl_hook with the prompt and continues the same turn with the reply — the server prompt does not leak into the final response text.

from autogen.beta import Agent
from autogen.beta.a2a import A2AConfig

async def hitl_hook() -> str:
    return input("server asks input> ")

remote = Agent(
    "remote",
    config=A2AConfig(card_url="http://127.0.0.1:8000"),
    hitl_hook=hitl_hook,
)
reply = await remote.ask("start")  # server may request input multiple times
print(reply.response.content)

A2AConfig.input_required_timeout caps how long the client waits on the hook. None (default) waits indefinitely — which matches the behaviour of ConversationContext.input in regular agents.
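The capping behaviour described above can be pictured with a small standard-library sketch. This is not AG2's implementation. `call_hook_with_timeout`, `slow_hook` and `demo` are hypothetical names, and the sketch assumes the timeout is expressed in seconds, which the text above does not state.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def call_hook_with_timeout(
    hook: Callable[[], Awaitable[str]],
    timeout: Optional[float],
) -> str:
    """Await the hook, giving up after `timeout` seconds (None waits forever)."""
    # asyncio.wait_for treats timeout=None as "no limit".
    return await asyncio.wait_for(hook(), timeout=timeout)


async def slow_hook() -> str:
    # Hypothetical stand-in for a human who takes a moment to answer.
    await asyncio.sleep(0.2)
    return "approved"


async def demo() -> tuple[str, str]:
    # No cap: the reply always arrives.
    unlimited = await call_hook_with_timeout(slow_hook, None)
    # Cap shorter than the hook's delay: the wait is abandoned.
    try:
        capped = await call_hook_with_timeout(slow_hook, 0.05)
    except asyncio.TimeoutError:
        capped = "timed out"
    return unlimited, capped
```

Running `asyncio.run(demo())` exercises both paths: the uncapped call returns the hook's reply, while the capped call is abandoned once the limit passes.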

See Human in the Loop for the underlying hitl_hook contract.

Streaming Reconnect

Streaming connections drop. Network blips, load balancer recycles, idle timeouts — the SSE channel breaks, and the client needs to recover without losing partially-streamed artifacts.

The A2A client keeps internal drive state across drops: on A2AClientError mid-stream it issues a fresh subscribe against the same task_id, deduplicates artifacts already seen by artifact_id (and messages by message_id), and continues from where it failed. Application code sees a single uninterrupted reply.

remote = Agent(
    "remote",
    config=A2AConfig(
        card_url="http://127.0.0.1:8000",
        streaming=True,
        max_reconnects=3,
        reconnect_backoff=0.5,
    ),
)
reply = await remote.ask("a long answer")

| Field | Default | Purpose |
| --- | --- | --- |
| max_reconnects | 3 | Total reconnect attempts before giving up with A2AReconnectError |
| reconnect_backoff | 0.5 | Seconds to wait between attempts (constant, with no jitter or exponential growth) |

When attempts are exhausted the client raises A2AReconnectError(attempts=N). Catch it to fall back to a polling re-fetch via get_task if you need recovery beyond the streaming budget.
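To make the resubscribe-and-deduplicate idea concrete, here is a minimal standard-library sketch of such a loop. It is an illustration under assumptions, not the client's actual code. `StreamDropped`, `ReconnectExhausted`, `collect_with_reconnect` and `make_flaky_stream` are hypothetical names, and the fake stream replays every artifact from the start after each drop.

```python
import asyncio
from typing import AsyncIterator, Callable


class StreamDropped(Exception):
    """Hypothetical stand-in for a mid-stream transport error."""


class ReconnectExhausted(Exception):
    """Hypothetical stand-in for A2AReconnectError."""

    def __init__(self, attempts: int) -> None:
        super().__init__(f"gave up after {attempts} reconnect attempts")
        self.attempts = attempts


async def collect_with_reconnect(
    subscribe: Callable[[], AsyncIterator[tuple[str, str]]],
    max_reconnects: int = 3,
    reconnect_backoff: float = 0.5,
) -> list[str]:
    """Drain (artifact_id, text) pairs, resubscribing after each drop."""
    seen: set[str] = set()
    chunks: list[str] = []
    attempts = 0
    while True:
        try:
            async for artifact_id, text in subscribe():
                if artifact_id in seen:
                    continue  # replayed after a resubscribe, already delivered
                seen.add(artifact_id)
                chunks.append(text)
            return chunks
        except StreamDropped:
            if attempts >= max_reconnects:
                raise ReconnectExhausted(attempts) from None
            attempts += 1
            await asyncio.sleep(reconnect_backoff)  # constant delay, no jitter


def make_flaky_stream(drops: int) -> Callable[[], AsyncIterator[tuple[str, str]]]:
    """Build a subscribe() whose first `drops` calls fail before the last artifact."""
    artifacts = [("a1", "Hello"), ("a2", ", "), ("a3", "world")]
    state = {"calls": 0}

    async def subscribe() -> AsyncIterator[tuple[str, str]]:
        state["calls"] += 1
        call_number = state["calls"]
        for index, item in enumerate(artifacts):
            if call_number <= drops and index == len(artifacts) - 1:
                raise StreamDropped()
            yield item

    return subscribe
```

With two simulated drops the caller still receives each chunk exactly once. With more drops than `max_reconnects` allows, the loop raises and reports how many attempts were made.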

Custom AgentExecutor

A2AServer defaults to wrapping the supplied Agent in AG2's standard AgentExecutor. When you need behaviour that doesn't fit Agent.ask — a HITL-first turn, a multi-agent pipeline, a non-standard task lifecycle — drop in your own executor:

from a2a.server.agent_execution import AgentExecutor as A2AAgentExecutorBase

from autogen.beta.a2a import A2AServer

class MyExecutor(A2AAgentExecutorBase):
    async def execute(self, request_context, event_queue) -> None:
        ...

    async def cancel(self, request_context, event_queue) -> None:
        ...

# agent_stub is an Agent defined elsewhere; with a custom executor it only supplies card metadata.
server = A2AServer(agent_stub, executor=MyExecutor())

The executor owns the entire request lifecycle: parsing the inbound message, emitting status updates and artifacts to the event_queue, and signalling terminal state. The wrapped agent passed to A2AServer(agent_stub, ...) is still used for card metadata (name, description, etc.) — make it a stub if a real agent doesn't fit your design.

Errors

All A2A errors live in autogen.beta.a2a.errors and inherit from A2AError. Catch A2AError for everything; catch the specifics when you want to react differently.

| Exception | Raised when |
| --- | --- |
| A2AError | Base class; catch this for any A2A failure |
| A2AInvalidCardError | The card is missing data required to connect (no supported_interfaces, no usable URL) |
| A2AClientToolsNotSupportedError | Client passed tools= but the server card doesn't advertise the urn:ag2:client-tools:v1 extension |
| A2AReconnectError | Streaming reconnect attempts exhausted; err.attempts holds the count |
| A2ATaskTerminalError | Base for the three terminal-state errors below; carries err.task with the final Task (status, history, artifacts) |
| A2ATaskFailedError | Task ended in TASK_STATE_FAILED |
| A2ATaskRejectedError | Task ended in TASK_STATE_REJECTED |
| A2ATaskAuthRequiredError | Task ended in TASK_STATE_AUTH_REQUIRED. Per A2A spec §7.6 the agent expects credentials out-of-band; apply them and retry |

Catch A2ATaskTerminalError to handle any terminal failure uniformly; switch on the concrete subclass when the recovery path differs (e.g. retry with auth on AuthRequired, surface the error to the operator on Failed / Rejected).
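The ordering of `except` clauses matters when mixing the base class with its subclasses. The sketch below uses local stand-in classes that mirror the documented names so it runs on its own. In real code these would be imported from autogen.beta.a2a.errors. The constructor signature and the `run_with_recovery` helper are assumptions made for illustration.

```python
from typing import Any, Callable


# Stand-ins mirroring the documented hierarchy (not the real classes).
class A2AError(Exception):
    pass


class A2AReconnectError(A2AError):
    pass


class A2ATaskTerminalError(A2AError):
    def __init__(self, task: Any = None) -> None:  # hypothetical signature
        super().__init__("task reached a terminal state")
        self.task = task


class A2ATaskFailedError(A2ATaskTerminalError):
    pass


class A2ATaskRejectedError(A2ATaskTerminalError):
    pass


class A2ATaskAuthRequiredError(A2ATaskTerminalError):
    pass


def run_with_recovery(call: Callable[[], str]) -> str:
    """Run `call` and map A2A failures to a recovery decision."""
    try:
        return call()
    except A2ATaskAuthRequiredError:
        # Most specific first: credentials can be applied and the call retried.
        return "retry-with-credentials"
    except A2ATaskTerminalError as err:
        # Failed and Rejected share one path; err.task carries the final Task.
        return f"report-to-operator:{type(err).__name__}"
    except A2AError:
        # Any other A2A failure, such as exhausted reconnects or a bad card.
        return "generic-a2a-failure"
```

Listing the subclass before its base keeps the auth-specific branch reachable. Reversing the order would route every terminal error through the generic handler.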

Testing Helpers

autogen.beta.a2a.testing provides utilities for in-process A2A tests — no real socket, no port binding.

from autogen.beta import Agent
from autogen.beta.a2a import A2AConfig, A2AServer
from autogen.beta.a2a.testing import make_test_client_factory

server = A2AServer(agent)  # agent: an Agent defined elsewhere
factory = make_test_client_factory(server, url="http://test")
remote = Agent("remote", config=A2AConfig(card_url="http://test", httpx_client_factory=factory))
await remote.ask("ping")

| Helper | Purpose |
| --- | --- |
| make_test_client_factory(server, url=..., timeout=...) | httpx.AsyncClient factory dispatching JSON-RPC into the server's ASGI app via httpx.ASGITransport |
| make_test_rest_client_factory(server, url=..., timeout=...) | Same idea for REST: builds the REST app and a card declaring only the REST interface |
| pick_free_port(host="127.0.0.1") | Probe a free TCP port. Used by gRPC tests since gRPC has no in-process transport equivalent |

from autogen.beta.a2a.testing import pick_free_port

port = pick_free_port()
grpc_server = server.build_grpc(bind=f"127.0.0.1:{port}", grpc_url=f"grpc://127.0.0.1:{port}")
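For readers curious how a free-port probe typically works, the sketch below shows the common bind-to-port-0 technique using only the standard library. It is an assumption about the general approach, not the source of pick_free_port, and `probe_free_port` is a hypothetical name.

```python
import socket


def probe_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the OS choose a free port
        return sock.getsockname()[1]
```

The port is released when the socket closes, so another process could claim it before the test server binds. That race is usually acceptable in tests but worth knowing about.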