Skip to content

instrument_generate_oai_reply

autogen.opentelemetry.instrumentators.agent_instrumentators.reply.instrument_generate_oai_reply

instrument_generate_oai_reply(agent, *, tracer)
Source code in autogen/opentelemetry/instrumentators/agent_instrumentators/reply.py
def instrument_generate_oai_reply(agent: Agent, *, tracer: Tracer) -> Agent:
    """Patch ``agent.a_generate_oai_reply`` to carry the OpenTelemetry context into the executor.

    ``a_generate_oai_reply`` runs the synchronous ``generate_oai_reply`` via
    ``run_in_executor``. The executor thread does not inherit the caller's
    OpenTelemetry context, so spans started there would begin new traces
    instead of becoming children of the current span. The replacement
    installed here captures the current context (and the default ``IOStream``)
    before dispatching, and re-attaches both inside the worker thread.

    The patch is applied in place and at most once per agent: the replacement
    is tagged with ``__otel_wrapped__`` and an already tagged method is left
    alone. If the agent exposes a ``_reply_func_list``, the first entry whose
    ``reply_func`` is named ``a_generate_oai_reply`` is redirected to the
    patched method as well.

    Args:
        agent: Agent to instrument. Agents without ``a_generate_oai_reply``
            are returned untouched.
        tracer: Accepted for interface uniformity; not used by this function.

    Returns:
        The same ``agent`` object that was passed in.
    """
    # Guard clause: nothing to patch, or the patch was already applied.
    if not hasattr(agent, "a_generate_oai_reply") or hasattr(agent.a_generate_oai_reply, "__otel_wrapped__"):
        return agent

    async def a_generate_oai_reply_with_context(
        messages: list[dict[str, Any]] | None = None,
        sender: Agent | None = None,
        config: Any | None = None,
        **kwargs: Any,
    ) -> tuple[bool, str | dict[str, Any] | None]:
        # Capture both ambient values BEFORE leaving the event-loop thread.
        current_context = otel_context.get_current()
        iostream = IOStream.get_default()

        def _generate_oai_reply_with_context(
            self_agent: Any,
            captured_context: Context,
            iostream: IOStream,
            *args: Any,
            **kw: Any,
        ) -> tuple[bool, str | dict[str, Any] | None]:
            # Runs in the executor thread: restore the caller's context for
            # the duration of the synchronous call, then always detach it.
            token = otel_context.attach(captured_context)
            try:
                with IOStream.set_default(iostream):
                    return self_agent.generate_oai_reply(*args, **kw)
            finally:
                otel_context.detach(token)

        call_in_thread = functools.partial(
            _generate_oai_reply_with_context,
            self_agent=agent,
            captured_context=current_context,
            iostream=iostream,
            messages=messages,
            sender=sender,
            config=config,
            **kwargs,
        )
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the recommended accessor in that situation.
        return await asyncio.get_running_loop().run_in_executor(None, call_in_thread)

    a_generate_oai_reply_with_context.__otel_wrapped__ = True
    agent.a_generate_oai_reply = a_generate_oai_reply_with_context

    # Registered reply functions are invoked with the agent as first argument
    # (self, messages, sender, config), so they need an adapter that forwards
    # to the patched attribute on that agent.
    async def a_generate_oai_reply_func_with_context(
        self_agent: Any,
        messages: list[dict[str, Any]] | None = None,
        sender: Agent | None = None,
        config: Any | None = None,
    ) -> tuple[bool, str | dict[str, Any] | None]:
        return await self_agent.a_generate_oai_reply(messages, sender, config)

    # Also update the reply function in _reply_func_list. The attribute is
    # looked up defensively: an agent that provides a_generate_oai_reply but
    # no reply-function registry is still instrumented instead of raising
    # AttributeError after its method has already been replaced.
    for reply_func_entry in getattr(agent, "_reply_func_list", ()):
        func = reply_func_entry.get("reply_func")
        if getattr(func, "__name__", None) == "a_generate_oai_reply":
            reply_func_entry["reply_func"] = a_generate_oai_reply_func_with_context
            break  # only the first matching entry is redirected

    return agent