# instrument_generate_reply

`autogen.opentelemetry.instrumentators.agent_instrumentators.reply.instrument_generate_reply`

    instrument_generate_reply(agent, *, tracer)

Source code in `autogen/opentelemetry/instrumentators/agent_instrumentators/reply.py`:
def instrument_generate_reply(agent: Agent, *, tracer: Tracer) -> Agent:
    """Wrap an agent's reply methods so each call is traced as an ``invoke_agent`` span.

    Replaces ``a_generate_reply`` (async) and ``generate_reply`` (sync) on
    *agent* with wrappers that open an OpenTelemetry span, record the agent
    name, provider, model, input messages and the produced reply, then delegate
    to the original method. Instrumentation is idempotent: methods already
    marked with ``__otel_wrapped__`` are left untouched.

    Args:
        agent: The agent instance to instrument (mutated in place).
        tracer: Tracer used to create the ``invoke_agent`` spans.

    Returns:
        The same ``agent`` instance, for call chaining.
    """

    def _record_common_attributes(span: Any) -> None:
        # Attributes shared by the sync and async wrappers.
        span.set_attribute("ag2.span.type", SpanType.AGENT.value)
        span.set_attribute("gen_ai.operation.name", "invoke_agent")
        span.set_attribute("gen_ai.agent.name", agent.name)
        # Provider and model come from the agent's LLM config; either may be absent.
        provider = get_provider_name(agent)
        if provider:
            span.set_attribute("gen_ai.provider.name", provider)
        model = get_model_name(agent)
        if model:
            span.set_attribute("gen_ai.request.model", model)

    def _record_input(span: Any, messages: list[dict[str, Any]] | None) -> None:
        # Serialize the input messages (if any) in OTel GenAI message format.
        if messages:
            span.set_attribute("gen_ai.input.messages", json.dumps(messages_to_otel(messages)))

    def _record_output(span: Any, reply: Any) -> None:
        # Serialize the reply (if one was produced) in OTel GenAI message format.
        if reply is not None:
            span.set_attribute("gen_ai.output.messages", json.dumps(reply_to_otel_message(reply)))

    # Instrument `a_generate_reply` (async) as an invoke_agent span.
    # FIX: guard with hasattr like the sync branch below, so agents without an
    # async reply method do not raise AttributeError here.
    if hasattr(agent, "a_generate_reply") and not hasattr(agent.a_generate_reply, "__otel_wrapped__"):
        old_a_generate_reply = agent.a_generate_reply

        async def a_generate_traced_reply(
            messages: list[dict[str, Any]] | None = None,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            with tracer.start_as_current_span(f"invoke_agent {agent.name}") as span:
                _record_common_attributes(span)
                _record_input(span, messages)
                reply = await old_a_generate_reply(messages, *args, **kwargs)
                _record_output(span, reply)
                return reply

        # Marker so repeated instrumentation calls are no-ops.
        a_generate_traced_reply.__otel_wrapped__ = True
        agent.a_generate_reply = a_generate_traced_reply

    # Instrument `generate_reply` (sync) as an invoke_agent span.
    if hasattr(agent, "generate_reply") and not hasattr(agent.generate_reply, "__otel_wrapped__"):
        old_generate_reply = agent.generate_reply

        def generate_traced_reply(
            messages: list[dict[str, Any]] | None = None,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            with tracer.start_as_current_span(f"invoke_agent {agent.name}") as span:
                _record_common_attributes(span)
                _record_input(span, messages)
                reply = old_generate_reply(messages, *args, **kwargs)
                _record_output(span, reply)
                return reply

        # Marker so repeated instrumentation calls are no-ops.
        generate_traced_reply.__otel_wrapped__ = True
        agent.generate_reply = generate_traced_reply

    return agent