Skip to content

TempoTraceSource

autogen.beta.eval.sources.tempo.TempoTraceSource #

TempoTraceSource(base_url, *, query='{}', lookback_seconds=3600, limit=20, headers=None, task_id_attribute='ag2.eval.task_id', conventions=None, client=None)

A `autogen.beta.eval.TraceSource` implementation backed by the Tempo HTTP API.

PARAMETER DESCRIPTION
base_url

Tempo query base, e.g. "http://localhost:3200".

TYPE: str

query

TraceQL query passed to /api/search. Defaults to "{}", which matches all traces.

TYPE: str DEFAULT: '{}'

lookback_seconds

Length, in seconds, of the search window, which ends at the current time. Defaults to 3600 (1 hour).

TYPE: int DEFAULT: 3600

limit

Maximum number of traces returned by the list() method.

TYPE: int DEFAULT: 20

headers

Extra headers (e.g. auth) merged into every request.

TYPE: Mapping[str, str] | None DEFAULT: None

task_id_attribute

Span/search attribute holding the eval task id, if the producer stamped one (else refs carry task_id=None).

TYPE: str DEFAULT: 'ag2.eval.task_id'

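conventions

Optional span conventions forwarded to spans_to_trace when a trace is loaded.

TYPE: Sequence[SpanConvention] | None DEFAULT: None

client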

Optional injected httpx.AsyncClient (for tests); when given, it is used as-is and not closed.

TYPE: AsyncClient | None DEFAULT: None

Source code in autogen/beta/eval/sources/tempo.py
def __init__(
    self,
    base_url: str,
    *,
    query: str = "{}",
    lookback_seconds: int = 3600,
    limit: int = 20,
    headers: Mapping[str, str] | None = None,
    task_id_attribute: str = "ag2.eval.task_id",
    conventions: Sequence[SpanConvention] | None = None,
    client: httpx.AsyncClient | None = None,
) -> None:
    """Record the connection and search settings for a Tempo-backed source.

    Nothing is contacted here; the arguments are only stored on the instance.

    Args:
        base_url: Tempo query base URL. Trailing slashes are stripped.
        query: Value sent as the ``q`` parameter of ``/api/search``.
        lookback_seconds: Size of the search window, counted back from the
            moment a search is issued.
        limit: Value sent as the ``limit`` parameter of ``/api/search``.
        headers: Extra headers layered over the default
            ``Accept: application/json``; a caller-supplied ``Accept``
            replaces the default.
        task_id_attribute: Attribute name looked up in each search summary
            to populate ``TraceRef.task_id``.
        conventions: Passed through to ``spans_to_trace`` when a trace is
            loaded.
        client: Optional pre-built ``httpx.AsyncClient``; stored as given.
    """
    # Normalise the base URL first so later path joins never double the slash.
    self._base_url = base_url.rstrip("/")

    # Start from the default header and let caller-provided entries win.
    merged_headers: dict[str, str] = {"Accept": "application/json"}
    if headers:
        merged_headers.update(headers)
    self._headers = merged_headers

    # Search settings, consumed when listing traces.
    self._query = query
    self._lookback = lookback_seconds
    self._limit = limit
    self._task_id_attribute = task_id_attribute

    # Collaborators used when fetching and decoding traces.
    self._conventions = conventions
    self._client = client

list async #

list()
Source code in autogen/beta/eval/sources/tempo.py
async def list(self) -> AsyncIterator[TraceRef]:
    """Search Tempo and yield one ``TraceRef`` per returned trace summary.

    The search covers the last ``lookback_seconds`` seconds up to the current
    time and is sent to ``/api/search`` with the configured query and limit.
    Summaries that lack a ``traceID`` are skipped.

    Yields:
        A ``TraceRef`` carrying the trace id, the task id read from the
        summary via the configured task-id attribute, and metadata holding
        the root service and root span names reported by the search.
    """
    # Window is [end - lookback, end] in whole seconds since the epoch.
    window_end = int(time.time())
    window_start = window_end - self._lookback
    search_params = {
        "q": self._query,
        "limit": self._limit,
        "start": window_start,
        "end": window_end,
    }
    payload = await self._get("/api/search", params=search_params)

    for entry in payload.get("traces", []):
        trace_id = entry.get("traceID")
        if trace_id:
            task_id = _summary_attribute(entry, self._task_id_attribute)
            info = {
                "root_service": entry.get("rootServiceName"),
                "root_span": entry.get("rootTraceName"),
            }
            yield TraceRef(trace_id=trace_id, task_id=task_id, metadata=info)

load async #

load(ref)
Source code in autogen/beta/eval/sources/tempo.py
async def load(self, ref: TraceRef) -> Trace:
    """Fetch the trace named by ``ref`` from Tempo and convert it to a ``Trace``.

    Args:
        ref: Reference whose ``trace_id`` selects the trace to fetch.

    Returns:
        The result of ``spans_to_trace`` applied to the spans decoded from
        the ``/api/traces/<trace_id>`` response, using the conventions given
        at construction time.
    """
    trace_path = f"/api/traces/{ref.trace_id}"
    payload = await self._get(trace_path)
    # Presumably an OTLP-JSON document, given the decoder it is handed to.
    spans = otlp_json_to_spans(payload)
    return spans_to_trace(spans, conventions=self._conventions)