def instrument_generate_oai_reply(agent: Agent, *, tracer: Tracer) -> Agent:
    """Patch ``agent.a_generate_oai_reply`` so the OpenTelemetry context reaches the executor thread.

    ``a_generate_oai_reply`` hands the synchronous ``generate_oai_reply`` call to
    ``run_in_executor``. The worker thread does not inherit the caller's
    OpenTelemetry context, so spans created there would start new traces instead
    of becoming children of the current span. The replacement installed here
    captures the context (and the default ``IOStream``) before dispatching and
    re-attaches it inside the worker thread.

    The function is idempotent: a wrapper that already carries the
    ``__otel_wrapped__`` marker is left untouched.

    Args:
        agent: Agent to instrument. It is modified in place.
        tracer: Accepted for signature parity; not used by this function.

    Returns:
        The same ``agent`` object that was passed in.
    """
    # Nothing to do when the agent has no async reply method, or when it has
    # already been wrapped by a previous call.
    if not hasattr(agent, "a_generate_oai_reply") or hasattr(agent.a_generate_oai_reply, "__otel_wrapped__"):
        return agent

    async def a_generate_oai_reply_with_context(
        messages: list[dict[str, Any]] | None = None,
        sender: Agent | None = None,
        config: Any | None = None,
        **kwargs: Any,
    ) -> tuple[bool, str | dict[str, Any] | None]:
        # Capture the current OpenTelemetry context and IOStream BEFORE
        # run_in_executor, while still on the calling thread.
        current_context = otel_context.get_current()
        iostream = IOStream.get_default()

        def _generate_oai_reply_in_thread() -> tuple[bool, str | dict[str, Any] | None]:
            # The captured values are closed over rather than passed as keyword
            # arguments, so caller-supplied **kwargs can never collide with the
            # helper's own parameter names.
            token = otel_context.attach(current_context)
            try:
                with IOStream.set_default(iostream):
                    return agent.generate_oai_reply(
                        messages=messages,
                        sender=sender,
                        config=config,
                        **kwargs,
                    )
            finally:
                # Always restore the worker thread's previous context.
                otel_context.detach(token)

        # Inside a coroutine a loop is always running; get_running_loop() is
        # the documented way to obtain it.
        return await asyncio.get_running_loop().run_in_executor(None, _generate_oai_reply_in_thread)

    # Marker consulted by the guard above to avoid double wrapping.
    a_generate_oai_reply_with_context.__otel_wrapped__ = True
    agent.a_generate_oai_reply = a_generate_oai_reply_with_context

    # Also update the reply function in _reply_func_list so the registered
    # entry routes through the wrapped method as well.
    for i, reply_func_entry in enumerate(agent._reply_func_list):
        func = reply_func_entry.get("reply_func")
        if getattr(func, "__name__", None) != "a_generate_oai_reply":
            continue

        # Create a wrapper that matches the expected signature
        # (self, messages, sender, config).
        async def a_generate_oai_reply_func_with_context(
            self_agent: Any,
            messages: list[dict[str, Any]] | None = None,
            sender: Agent | None = None,
            config: Any | None = None,
        ) -> tuple[bool, str | dict[str, Any] | None]:
            return await self_agent.a_generate_oai_reply(messages, sender, config)

        agent._reply_func_list[i]["reply_func"] = a_generate_oai_reply_func_with_context
        # Only the first matching entry is replaced.
        break

    return agent