Skip to content

DirectoryTraceSource

autogen.beta.eval.sources.trace_source.DirectoryTraceSource #

DirectoryTraceSource(path, *, conventions=None)

A :class:TraceSource over a directory of one-trace-per-file JSON spans.

Each <trace_id>.json holds {"trace_id", "task_id"?, "metadata"?, "spans": [...]} where each span is a :func:span_data_to_dict object. Write the files with :func:save_trace. Format is provisional.

Source code in autogen/beta/eval/sources/trace_source.py
def __init__(self, path: str | os.PathLike[str], *, conventions: Sequence[SpanConvention] | None = None) -> None:
    self._path = Path(path)
    self._conventions = conventions

list async #

list()
Source code in autogen/beta/eval/sources/trace_source.py
async def list(self) -> AsyncIterator[TraceRef]:
    for file in sorted(self._path.glob("*.json")):
        doc = json.loads(file.read_text(encoding="utf-8"))
        yield TraceRef(
            trace_id=doc.get("trace_id", file.stem),
            task_id=doc.get("task_id"),
            metadata=dict(doc.get("metadata", {})),
        )

load async #

load(ref)
Source code in autogen/beta/eval/sources/trace_source.py
async def load(self, ref: TraceRef) -> Trace:
    doc = json.loads((self._path / f"{ref.trace_id}.json").read_text(encoding="utf-8"))
    spans = [span_data_from_dict(s) for s in doc.get("spans", [])]
    return spans_to_trace(spans, conventions=self._conventions)