Skip to content

instrument_initiate_chat

autogen.opentelemetry.instrumentators.agent_instrumentators.chat.instrument_initiate_chat #

instrument_initiate_chat(agent, *, tracer)
Source code in autogen/opentelemetry/instrumentators/agent_instrumentators/chat.py
def instrument_initiate_chat(agent: Agent, *, tracer: Tracer) -> Agent:
    # Instrument `a_initiate_chat` as a conversation span
    if hasattr(agent, "a_initiate_chat") and not hasattr(agent.a_initiate_chat, "__otel_wrapped__"):
        old_a_initiate_chat = agent.a_initiate_chat

        async def a_initiate_traced_chat(
            *args: Any,
            max_turns: int | None = None,
            message: str | dict[str, Any] | None = None,
            **kwargs: Any,
        ) -> Any:
            with tracer.start_as_current_span(f"conversation {agent.name}") as span:
                # Set AG2 span type and OTEL GenAI semantic convention attributes
                span.set_attribute("ag2.span.type", SpanType.CONVERSATION.value)
                span.set_attribute("gen_ai.operation.name", "conversation")
                span.set_attribute("gen_ai.agent.name", agent.name)

                # Set provider and model from recipient's LLM config (first positional arg)
                if args:
                    recipient = args[0]
                    provider = get_provider_name(recipient)
                    if provider:
                        span.set_attribute("gen_ai.provider.name", provider)
                    model = get_model_name(recipient)
                    if model:
                        span.set_attribute("gen_ai.request.model", model)

                if max_turns:
                    span.set_attribute("gen_ai.conversation.max_turns", max_turns)

                # Capture input message
                if message is not None:
                    if isinstance(message, str):
                        input_msg = {"role": "user", "content": message}
                    elif isinstance(message, dict):
                        input_msg = {"role": message.get("role", "user"), **message}
                    else:
                        input_msg = None

                    if input_msg:
                        otel_input = messages_to_otel([input_msg])
                        span.set_attribute("gen_ai.input.messages", json.dumps(otel_input))

                result = await old_a_initiate_chat(*args, max_turns=max_turns, message=message, **kwargs)

                span.set_attribute("gen_ai.conversation.id", str(result.chat_id))
                span.set_attribute("gen_ai.conversation.turns", len(result.chat_history))

                # Capture output messages (full chat history)
                if result.chat_history:
                    otel_output = messages_to_otel(result.chat_history)
                    span.set_attribute("gen_ai.output.messages", json.dumps(otel_output))

                usage_including_cached_inference = result.cost["usage_including_cached_inference"]
                total_cost = usage_including_cached_inference.pop("total_cost")
                span.set_attribute("gen_ai.usage.cost", total_cost)

                usage = aggregate_usage(usage_including_cached_inference)
                if usage:
                    model, input_tokens, output_tokens = usage
                    span.set_attribute("gen_ai.response.model", model)
                    span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
                    span.set_attribute("gen_ai.usage.output_tokens", output_tokens)

                return result

        a_initiate_traced_chat.__otel_wrapped__ = True
        agent.a_initiate_chat = a_initiate_traced_chat

    # Instrument `initiate_chat` (sync) as a conversation span
    if hasattr(agent, "initiate_chat") and not hasattr(agent.initiate_chat, "__otel_wrapped__"):
        old_initiate_chat = agent.initiate_chat

        def initiate_traced_chat(
            *args: Any,
            max_turns: int | None = None,
            message: str | dict[str, Any] | None = None,
            **kwargs: Any,
        ) -> Any:
            with tracer.start_as_current_span(f"conversation {agent.name}") as span:
                span.set_attribute("ag2.span.type", SpanType.CONVERSATION.value)
                span.set_attribute("gen_ai.operation.name", "conversation")
                span.set_attribute("gen_ai.agent.name", agent.name)

                # Set provider and model from recipient's LLM config (first positional arg)
                if args:
                    recipient = args[0]
                    provider = get_provider_name(recipient)
                    if provider:
                        span.set_attribute("gen_ai.provider.name", provider)
                    model = get_model_name(recipient)
                    if model:
                        span.set_attribute("gen_ai.request.model", model)

                if max_turns:
                    span.set_attribute("gen_ai.conversation.max_turns", max_turns)

                if message is not None:
                    if isinstance(message, str):
                        input_msg = {"role": "user", "content": message}
                    elif isinstance(message, dict):
                        input_msg = {"role": message.get("role", "user"), **message}
                    else:
                        input_msg = None

                    if input_msg:
                        otel_input = messages_to_otel([input_msg])
                        span.set_attribute("gen_ai.input.messages", json.dumps(otel_input))

                result = old_initiate_chat(*args, max_turns=max_turns, message=message, **kwargs)

                span.set_attribute("gen_ai.conversation.id", str(result.chat_id))
                span.set_attribute("gen_ai.conversation.turns", len(result.chat_history))

                if result.chat_history:
                    otel_output = messages_to_otel(result.chat_history)
                    span.set_attribute("gen_ai.output.messages", json.dumps(otel_output))

                usage_including_cached_inference = result.cost["usage_including_cached_inference"]
                total_cost = usage_including_cached_inference.pop("total_cost")
                span.set_attribute("gen_ai.usage.cost", total_cost)

                usage = aggregate_usage(usage_including_cached_inference)
                if usage:
                    model, input_tokens, output_tokens = usage
                    span.set_attribute("gen_ai.response.model", model)
                    span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
                    span.set_attribute("gen_ai.usage.output_tokens", output_tokens)

                return result

        initiate_traced_chat.__otel_wrapped__ = True
        agent.initiate_chat = initiate_traced_chat

    return agent