Instrument an A2A server with OpenTelemetry tracing.
Adds OpenTelemetry middleware to the server to trace incoming requests and instruments the server's agent for full observability.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `server` | The A2A agent server to instrument. TYPE: `A2aAgentServer` |
| `tracer_provider` | The OpenTelemetry tracer provider to use for creating spans. TYPE: `TracerProvider` |
Usage
```python
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from autogen.opentelemetry import instrument_a2a_server

resource = Resource.create(attributes={"service.name": "my-service"})
tracer_provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:4317")
processor = BatchSpanProcessor(exporter)
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)

server = A2aAgentServer(agent)
instrument_a2a_server(server, tracer_provider=tracer_provider)
```
Source code in autogen/opentelemetry/instrumentators/a2a.py
| @export_module("autogen.opentelemetry")
def instrument_a2a_server(server: A2aAgentServer, *, tracer_provider: TracerProvider) -> A2aAgentServer:
    """Instrument an A2A server with OpenTelemetry tracing.

    Adds OpenTelemetry middleware to the server to trace incoming requests and
    instruments the server's agent for full observability.

    The call is idempotent: a server that already carries the
    ``__otel_instrumented__`` marker is returned unchanged, so the middleware
    is never registered twice.

    Only requests that carry a ``traceparent`` header are wrapped in an
    ``a2a-execution`` span; all other requests are passed through untouched.

    Args:
        server: The A2A agent server to instrument.
        tracer_provider: The OpenTelemetry tracer provider to use for creating spans.

    Returns:
        The instrumented server instance (the same object that was passed in).

    Usage:
        from opentelemetry import trace
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from autogen.opentelemetry import instrument_a2a_server

        resource = Resource.create(attributes={"service.name": "my-service"})
        tracer_provider = TracerProvider(resource=resource)
        exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:4317")
        processor = BatchSpanProcessor(exporter)
        tracer_provider.add_span_processor(processor)
        trace.set_tracer_provider(tracer_provider)

        server = A2aAgentServer(agent)
        instrument_a2a_server(server, tracer_provider=tracer_provider)
    """
    # Local import: only part of the module may be visible here, so the
    # dependency is scoped to this function instead of assuming a top-level one.
    import logging

    # Idempotency guard comes first so a repeated call does no work at all.
    if getattr(server, "__otel_instrumented__", False):
        return server

    logger = logging.getLogger(__name__)
    tracer = get_tracer(tracer_provider)

    class OTELMiddleware(BaseHTTPMiddleware):
        """Continue an upstream trace for requests that carry ``traceparent``."""

        async def dispatch(self, request: Request, call_next):
            # Always bound, so the check below is safe on every path.
            span_context = None

            if "traceparent" in request.headers:
                try:
                    span_context = TRACE_PROPAGATOR.extract(request.headers)
                except Exception:
                    # Tracing is best-effort: a broken header must never fail
                    # the request, but the failure should be visible.
                    logger.warning(
                        "Failed to extract trace context from request headers; "
                        "handling request without a parent span",
                        exc_info=True,
                    )

            if span_context is None:
                return await call_next(request)

            # Run the downstream handler inside a span parented to the
            # caller's trace.
            with tracer.start_as_current_span("a2a-execution", context=span_context):
                return await call_next(request)

    server.add_middleware(OTELMiddleware)
    server.agent = instrument_agent(server.agent, tracer_provider=tracer_provider)

    # Marker read by the guard above on any subsequent call.
    server.__otel_instrumented__ = True
    return server
|