Skip to content

instrument_pattern

autogen.opentelemetry.instrument_pattern #

instrument_pattern(pattern, *, tracer_provider)

Instrument a Pattern with OpenTelemetry tracing.

Instruments the pattern's prepare_group_chat method to automatically instrument all agents and group chats created by the pattern.

PARAMETER DESCRIPTION
pattern

The pattern instance to instrument.

TYPE: Pattern

tracer_provider

The OpenTelemetry tracer provider to use for creating spans.

TYPE: TracerProvider

RETURNS DESCRIPTION
Pattern

The instrumented pattern instance (same object, modified in place).

Usage

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from autogen.opentelemetry import instrument_pattern

resource = Resource.create(attributes={"service.name": "my-service"})
tracer_provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:4317")
processor = BatchSpanProcessor(exporter)
tracer_provider.add_span_processor(processor)
trace.set_tracer_provider(tracer_provider)

pattern = SomePattern()
instrument_pattern(pattern, tracer_provider=tracer_provider)

Source code in autogen/opentelemetry/instrumentators/pattern.py
@export_module("autogen.opentelemetry")
def instrument_pattern(pattern: Pattern, *, tracer_provider: TracerProvider) -> Pattern:
    """Instrument a Pattern with OpenTelemetry tracing.

    Replaces the pattern's ``prepare_group_chat`` method with a wrapper that
    calls the original method and then instruments the agents of the resulting
    group chat, the group chat manager, and the group chat itself.

    Calling this function on a pattern whose ``prepare_group_chat`` is already
    wrapped is a no-op: the pattern is returned unchanged.

    Args:
        pattern: The pattern instance to instrument.
        tracer_provider: The OpenTelemetry tracer provider to use for creating spans.

    Returns:
        The instrumented pattern instance (same object, modified in place).

    Usage:
        from opentelemetry import trace
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from autogen.opentelemetry import instrument_pattern

        resource = Resource.create(attributes={"service.name": "my-service"})
        tracer_provider = TracerProvider(resource=resource)
        exporter = OTLPSpanExporter(endpoint="http://127.0.0.1:4317")
        processor = BatchSpanProcessor(exporter)
        tracer_provider.add_span_processor(processor)
        trace.set_tracer_provider(tracer_provider)

        pattern = SomePattern()
        instrument_pattern(pattern, tracer_provider=tracer_provider)
    """
    old_prepare_group_chat = pattern.prepare_group_chat
    # The marker attribute is set on the wrapper below; its presence means this
    # pattern has already been instrumented, so do not wrap it a second time.
    if hasattr(old_prepare_group_chat, "__otel_wrapped__"):
        return pattern

    def prepare_group_chat_traced(
        max_rounds: int,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[
        list["ConversableAgent"],
        list["ConversableAgent"],
        Optional["ConversableAgent"],
        ContextVariables,
        "ConversableAgent",
        TransitionTarget,
        "GroupToolExecutor",
        "GroupChat",
        "GroupChatManager",
        list[dict[str, Any]],
        "ConversableAgent",
        list[str],
        list["Agent"],
    ]:
        """Call the original ``prepare_group_chat`` and instrument what it created.

        All arguments are forwarded unchanged. The returned tuple has the same
        layout as the original method's result, with the manager and group chat
        replaced by the values returned from the instrumentation helpers.
        """
        (
            agents,
            wrapped_agents,
            user_agent,
            context_variables,
            initial_agent,
            group_after_work,
            tool_executor,
            groupchat,
            manager,
            processed_messages,
            last_agent,
            group_agent_names,
            temp_user_list,
        ) = old_prepare_group_chat(max_rounds, *args, **kwargs)

        groupchat.agents = [instrument_agent(agent, tracer_provider=tracer_provider) for agent in groupchat.agents]

        manager = instrument_agent(manager, tracer_provider=tracer_provider)
        groupchat = instrument_groupchat(groupchat, tracer_provider=tracer_provider)

        # IMPORTANT: register_reply() in GroupChatManager.__init__ creates a shallow copy of groupchat
        # (via copy.copy). We need to also instrument that copy which is stored in manager._reply_func_list
        # so that we can trace the "auto" speaker selection internal chats.
        for reply_func_entry in manager._reply_func_list:
            config = reply_func_entry.get("config")
            if isinstance(config, GroupChat) and config is not groupchat:
                # Instrument the copy for its side effect only. Do NOT rebind
                # ``groupchat`` here: the caller must receive the group chat
                # produced by the pattern, not one of the reply-function copies,
                # and the identity check above must keep comparing against it.
                instrument_groupchat(config, tracer_provider=tracer_provider)

        return (
            agents,
            wrapped_agents,
            user_agent,
            context_variables,
            initial_agent,
            group_after_work,
            tool_executor,
            groupchat,
            manager,
            processed_messages,
            last_agent,
            group_agent_names,
            temp_user_list,
        )

    # Mark the wrapper so a repeated instrument_pattern() call is detected above.
    prepare_group_chat_traced.__otel_wrapped__ = True
    pattern.prepare_group_chat = prepare_group_chat_traced

    return pattern