Advanced
Topics that aren't part of the day-to-day A2A path: human-in-the-loop, transparent reconnects on streaming drops, plugging in a custom executor, and the error hierarchy.
Human-in-the-Loop via input_required
When a server-side executor raises requires_input(...) mid-task, the A2A server transitions the task to TASK_STATE_INPUT_REQUIRED and surfaces the prompt to the client. The AG2 client invokes the local agent's hitl_hook with the prompt and continues the same turn with the reply — the server prompt does not leak into the final response text.
A2AConfig.input_required_timeout caps how long the client waits on the hook. None (default) waits indefinitely — which matches the behaviour of ConversationContext.input in regular agents.
See Human in the Loop for the underlying hitl_hook contract.
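The timeout semantics above can be illustrated with plain asyncio. This is a minimal sketch, not the AG2 client's code: the hook signature (prompt in, reply out), the helper name ask_with_timeout, and the two demo hooks are hypothetical, and the exception the real client raises on timeout is not stated here.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical hook shape: receives the server prompt, returns the human's reply.
HitlHook = Callable[[str], Awaitable[str]]


async def ask_with_timeout(hook: HitlHook, prompt: str, timeout: Optional[float]) -> str:
    """Await the hook; timeout=None waits indefinitely, a number caps the wait."""
    # asyncio.wait_for treats timeout=None as "no limit".
    return await asyncio.wait_for(hook(prompt), timeout=timeout)


async def fast_hook(prompt: str) -> str:
    return f"approved: {prompt}"


async def slow_hook(prompt: str) -> str:
    await asyncio.sleep(1.0)  # simulates a human who takes too long
    return "too late"


async def demo() -> tuple[str, bool]:
    # No cap: the reply arrives whenever the hook finishes.
    reply = await ask_with_timeout(fast_hook, "Deploy to prod?", timeout=None)
    # Capped wait: the slow hook is cancelled once the budget is spent.
    try:
        await ask_with_timeout(slow_hook, "Deploy to prod?", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out


reply, timed_out = asyncio.run(demo())
```

With a finite cap, the caller has to decide what a missing answer means (abort the task, or fall back to a default reply); with None, that decision never arises because the turn simply blocks.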
Streaming Reconnect
Streaming connections drop. Network blips, load balancer recycles, idle timeouts — the SSE channel breaks, and the client needs to recover without losing partially-streamed artifacts.
The A2A client keeps internal drive state across drops: on A2AClientError mid-stream it issues a fresh subscribe against the same task_id, deduplicates artifacts already seen by artifact_id (and messages by message_id), and continues from where it failed. Application code sees a single uninterrupted reply.
| Field | Default | Purpose |
|---|---|---|
| max_reconnects | 3 | Total reconnect attempts before giving up with A2AReconnectError |
| reconnect_backoff | 0.5 | Seconds to wait between attempts (constant; no jitter or exponential growth) |
When attempts are exhausted the client raises A2AReconnectError(attempts=N). Catch it to fall back to a polling re-fetch via get_task if you need recovery beyond the streaming budget.
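The reconnect behaviour described in this section (resubscribe, deduplicate by artifact_id, constant backoff, give up after max_reconnects) can be sketched as a small loop. This is an illustration under assumptions, not the client's implementation: the two exception classes are local stand-ins, and drive, make_flaky_stream, and the fake server that replays the stream from the start on every subscribe are hypothetical.

```python
import time
from typing import Callable, Iterator


class A2AClientError(Exception):
    """Local stand-in for the error raised when the stream drops."""


class A2AReconnectError(Exception):
    """Local stand-in carrying the number of reconnect attempts made."""

    def __init__(self, attempts: int) -> None:
        super().__init__(f"gave up after {attempts} reconnect attempts")
        self.attempts = attempts


Artifact = tuple[str, str]  # (artifact_id, payload)


def drive(
    subscribe: Callable[[], Iterator[Artifact]],
    max_reconnects: int = 3,
    reconnect_backoff: float = 0.5,
) -> list[Artifact]:
    """Consume a stream, resubscribing on drops and skipping artifacts already seen."""
    seen: set[str] = set()
    received: list[Artifact] = []
    attempts = 0
    while True:
        try:
            for artifact_id, payload in subscribe():
                if artifact_id in seen:
                    continue  # replayed after a resubscribe
                seen.add(artifact_id)
                received.append((artifact_id, payload))
            return received  # stream ended normally
        except A2AClientError as exc:
            if attempts >= max_reconnects:
                raise A2AReconnectError(attempts=attempts) from exc
            attempts += 1
            time.sleep(reconnect_backoff)  # constant delay, no jitter


def make_flaky_stream(drops: int) -> Callable[[], Iterator[Artifact]]:
    """Fake server: replays from the start; the first `drops` subscribes fail midway."""
    calls = {"n": 0}
    data = [("a1", "hello"), ("a2", "world"), ("a3", "!")]

    def subscribe() -> Iterator[Artifact]:
        calls["n"] += 1
        for index, item in enumerate(data):
            if calls["n"] <= drops and index == 2:
                raise A2AClientError("stream dropped")
            yield item

    return subscribe


# Two drops fit inside the default budget of three reconnects.
result = drive(make_flaky_stream(drops=2), reconnect_backoff=0.0)

# A stream that never recovers exhausts the budget.
try:
    drive(make_flaky_stream(drops=10), reconnect_backoff=0.0)
    exhausted_after = None
except A2AReconnectError as err:
    exhausted_after = err.attempts
```

The seen set is what lets the caller observe each artifact exactly once even though the fake server resends earlier ones after every resubscribe.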
Custom AgentExecutor
A2AServer defaults to wrapping the supplied Agent in AG2's standard AgentExecutor. When you need behaviour that doesn't fit Agent.ask (a HITL-first turn, a multi-agent pipeline, a non-standard task lifecycle), drop in your own executor.
The executor owns the entire request lifecycle: parsing the inbound message, emitting status updates and artifacts to the event_queue, and signalling terminal state. The wrapped agent passed to A2AServer(agent_stub, ...) is still used for card metadata (name, description, etc.) — make it a stub if a real agent doesn't fit your design.
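The three responsibilities listed above (parse the inbound message, emit status updates and artifacts, signal terminal state) can be sketched without any A2A dependency. Everything here is a stand-in: the Event dataclass, the plain asyncio.Queue used as the event queue, the string state names, and the execute(message, event_queue) signature are assumptions for illustration, and how the executor is handed to A2AServer is not shown.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for the status-update and artifact events a real executor emits."""

    kind: str  # "status" or "artifact"
    payload: str


class UppercaseExecutor:
    """Hypothetical executor that owns the whole turn instead of delegating to an agent."""

    async def execute(self, message: str, event_queue: asyncio.Queue) -> None:
        # 1. Parse the inbound message.
        text = message.strip()
        if not text:
            # Nothing to work on: signal a terminal failure and stop.
            await event_queue.put(Event("status", "failed"))
            return
        # 2. Emit a progress update, then the artifact.
        await event_queue.put(Event("status", "working"))
        await event_queue.put(Event("artifact", text.upper()))
        # 3. Signal the terminal state.
        await event_queue.put(Event("status", "completed"))


async def run_turn(message: str) -> list[Event]:
    """Run one turn and drain whatever the executor put on the queue."""
    queue: asyncio.Queue = asyncio.Queue()
    await UppercaseExecutor().execute(message, queue)
    events = []
    while not queue.empty():
        events.append(queue.get_nowait())
    return events


events = asyncio.run(run_turn("  hello a2a  "))
empty_events = asyncio.run(run_turn("   "))
```

Every code path ends by emitting exactly one terminal status; an executor that returns without doing so would leave the task without a final state.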
Errors
All A2A errors live in autogen.beta.a2a.errors and inherit from A2AError. Catch A2AError for everything; catch the specifics when you want to react differently.
| Exception | Raised when |
|---|---|
| A2AError | Base class; catch this for any A2A failure |
| A2AInvalidCardError | The card is missing data required to connect (no supported_interfaces, no usable URL) |
| A2AClientToolsNotSupportedError | Client passed tools= but the server card doesn't advertise the urn:ag2:client-tools:v1 extension |
| A2AReconnectError | Streaming reconnect attempts exhausted; err.attempts holds the count |
| A2ATaskTerminalError | Base for the three terminal-state errors below; carries err.task with the final Task (status, history, artifacts) |
| A2ATaskFailedError | Task ended in TASK_STATE_FAILED |
| A2ATaskRejectedError | Task ended in TASK_STATE_REJECTED |
| A2ATaskAuthRequiredError | Task ended in TASK_STATE_AUTH_REQUIRED. Per A2A spec §7.6 the agent expects credentials out-of-band; apply them and retry |
Catch A2ATaskTerminalError to handle any terminal failure uniformly; switch on the concrete subclass when the recovery path differs (e.g. retry with auth on AuthRequired, surface the error to the operator on Failed / Rejected).
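The catch order matters: the most specific class has to come first, otherwise the base handler swallows it. The sketch below mirrors part of the hierarchy with local stand-in classes so it runs on its own; in real code these would be imported from autogen.beta.a2a.errors. The dict used for err.task, the run and raiser helpers, and the returned action strings are hypothetical.

```python
class A2AError(Exception):
    """Stand-in for the base class."""


class A2AReconnectError(A2AError):
    def __init__(self, attempts: int) -> None:
        super().__init__(f"{attempts} reconnect attempts exhausted")
        self.attempts = attempts


class A2ATaskTerminalError(A2AError):
    def __init__(self, task: dict) -> None:
        super().__init__(f"task ended in {task['state']}")
        self.task = task  # the real error carries the final Task object


class A2ATaskFailedError(A2ATaskTerminalError):
    pass


class A2ATaskAuthRequiredError(A2ATaskTerminalError):
    pass


def run(call) -> str:
    """Invoke call() and map any A2A failure to a recovery action."""
    try:
        call()
        return "ok"
    except A2ATaskAuthRequiredError:
        # Most specific first: credentials are expected out-of-band.
        return "apply credentials and retry"
    except A2ATaskTerminalError as err:
        # Failed and Rejected share one path here.
        return f"surface to operator: {err.task['state']}"
    except A2AReconnectError as err:
        return f"poll get_task after {err.attempts} reconnect attempts"
    except A2AError:
        # Catch-all for the rest of the hierarchy.
        return "abort"


def raiser(exc: Exception):
    """Build a zero-argument callable that raises exc."""
    def call() -> None:
        raise exc
    return call


outcomes = [
    run(raiser(A2ATaskAuthRequiredError({"state": "TASK_STATE_AUTH_REQUIRED"}))),
    run(raiser(A2ATaskFailedError({"state": "TASK_STATE_FAILED"}))),
    run(raiser(A2AReconnectError(attempts=3))),
    run(raiser(A2AError("invalid card"))),
]
```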
Testing Helpers
autogen.beta.a2a.testing provides utilities for in-process A2A tests — no real socket, no port binding.
| Helper | Purpose |
|---|---|
| make_test_client_factory(server, url=..., timeout=...) | httpx.AsyncClient factory dispatching JSON-RPC into the server's ASGI app via httpx.ASGITransport |
| make_test_rest_client_factory(server, url=..., timeout=...) | Same idea for REST: builds the REST app and a card declaring only the REST interface |
| pick_free_port(host="127.0.0.1") | Probe a free TCP port. Used by gRPC tests since gRPC has no in-process transport equivalent |
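For readers unfamiliar with port probing, one common way to do it is to bind to port 0 and let the operating system choose. The sketch below shows that general technique with the standard library; the function name probe_free_port is hypothetical, and this is not necessarily how pick_free_port is implemented.

```python
import socket


def probe_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 means "pick any free port"
        return sock.getsockname()[1]  # the port the OS assigned


port = probe_free_port()
```

The socket is closed before the function returns, so another process could claim the port before the test server binds it. Tests that use this pattern should start the server immediately after probing.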