async def run_agent(
    suite: Suite | str | os.PathLike[str] | list[dict[str, Any]],
    *,
    agent: Agent | Callable[..., Agent],
    scorers: Iterable[Scorer],
    store_dir: str | os.PathLike[str],
    model_config: ModelConfig | dict[str, ModelConfig] | None = None,
    repeats: int = 1,
    budgets: BudgetThresholds | None = None,
    concurrency: int = 4,
    run_id: str | None = None,
    label: str | None = None,
    stream: Stream | None = None,
    variant: str | None = None,
    span_attributes: dict[str, str] | None = None,
    span_processors: "Sequence[SpanProcessor] | None" = None,
) -> RunResult:
    """Execute an evaluation suite against an agent, grade it, and persist the run.

    The run has two phases. *Production*: every task is answered by an
    :class:`~autogen.beta.Agent` obtained from ``agent`` (a reused instance or
    a per-task factory), instrumented with
    :class:`~autogen.beta.middleware.builtin.telemetry.TelemetryMiddleware`
    exporting to an in-memory span exporter. A :class:`~autogen.beta.eval.Trace`
    is rebuilt from those spans (per-task duration is timed around the ``ask``)
    — the same span→Trace path the trace-based sources use. *Grading*: the
    traces go through the same core that
    :func:`~autogen.beta.eval.evaluate_traces` uses, and the outcome is saved
    as ``<store_dir>/<run_id>.json``.

    Args:
        suite: What to evaluate: a :class:`Suite`, a JSONL path (string or
            path-like, loaded via :meth:`Suite.from_jsonl`), or an inline
            list of dict task records (loaded via :meth:`Suite.from_list`).
        agent: The evaluation target. Pass an :class:`~autogen.beta.Agent`
            *instance* to reuse it for every task, or a *factory* callable
            that builds a fresh :class:`~autogen.beta.Agent` per task. A
            factory may declare a keyword-only ``config`` parameter so the
            runner can inject per-task or global model configs; per-task
            ``model_config`` therefore needs a factory, not an instance.
        scorers: Scorer instances (typically created with ``@scorer``).
            Every scorer is invoked once per task and its feedback is stored
            on that task's :class:`TaskResult`.
        store_dir: Directory that receives ``<store_dir>/<run_id>.json``.
            Mandatory, because evals are comparison artifacts and an
            unpersisted run has no shelf life. Use ``tmp_path`` in tests, a
            repo directory for CI, or whatever fits your retention story.
        model_config: ``None`` leaves the choice to the factory's default; a
            single ``ModelConfig`` applies everywhere; a
            ``dict[task_id, ModelConfig]`` selects a config per task (e.g.
            one ``TestConfig`` cassette per task).
        repeats: How many times each task is run (default ``1``), for
            measuring consistency. ``pass_rate`` / ``score_stats`` pool all
            runs; when ``repeats > 1`` each run receives a distinct
            ``task_id`` suffix (``"<id>#1"``, ``"<id>#2"``, …).
        budgets: Optional :class:`BudgetThresholds`. A violation is recorded
            on the task's ``budget_violation`` flag and never aborts the run.
        concurrency: Upper bound on tasks executed in parallel. Clamped to
            ``>= 1``.
        run_id: Explicit run id replacing the auto-generated UUID4 (unique
            per run).
        label: Optional user-defined identifier recorded on the run. Where
            ``run_id`` is unique per run, a ``label`` is meant to be *shared*
            across runs of the same eval so they can be grouped and trended
            over time. Stays ``None`` when unset; the framework never fills it.
        stream: Optional :class:`~autogen.beta.stream.Stream` that receives
            the eval lifecycle events (``EvalStarted`` / ``TaskEvaluated`` /
            ``EvalCompleted``), so a run can be observed like an agent.
            Subscribe your own observer to render progress / a live view.
        variant: Variant name attached to this run's ``TaskEvaluated``
            events. :func:`~autogen.beta.eval.run_variants` sets it for each
            variant in a sweep; keep ``None`` for a standalone run.
        span_attributes: Additional attributes stamped on **every** span the
            agent emits during production (forwarded to
            ``TelemetryMiddleware``). The run seeds ``ag2.eval.run_id`` and —
            when set — ``ag2.eval.variant`` / ``ag2.eval.label``; each task
            additionally gets ``ag2.eval.task_id``. Keys supplied by the
            caller win on conflict, which lets you scope spans for an
            external backend (e.g. ``{"ag2.org.id": org_id}``).
        span_processors: Optional OpenTelemetry ``SpanProcessor`` objects
            attached to each task's tracer provider **in addition to** the
            internal in-memory exporter that grading reads. Use them to ship
            the same spans to your own backend, e.g.
            ``[BatchSpanProcessor(OTLPSpanExporter(...))]``. Export is purely
            additive: the in-memory processor (the grading source) is never
            replaced, so grading output is the same with or without it.

    Returns:
        A :class:`RunResult` holding the per-task results and run metadata.
        It has already been written to disk when this function returns.
    """
    loaded_suite = _resolve_suite(suite)
    frozen_scorers = tuple(scorers)
    factory, accepts_config, target_path = _normalize_target(agent)

    expanded_tasks = _expand_repeats(loaded_suite, repeats)
    # Grade against the repeat-expanded task list while keeping the original
    # suite's identity (name / source).
    grading_suite = Suite(
        tuple(expanded_tasks),
        name=loaded_suite.name,
        source=loaded_suite.source,
    )

    # The run id is fixed before production so it can be stamped on the spans.
    effective_run_id = uuid4().hex if run_id is None else run_id

    # Framework-seeded attributes first; optional ones only when truthy.
    seeded_attributes = {"ag2.eval.run_id": effective_run_id}
    if variant:
        seeded_attributes["ag2.eval.variant"] = variant
    if label:
        seeded_attributes["ag2.eval.label"] = label
    # Caller-supplied keys are merged last so they win on conflict.
    caller_attributes = span_attributes if span_attributes else {}
    stamped_attributes = {**seeded_attributes, **caller_attributes}

    # Clock starts just before production; handed to grading as ``started_at``.
    clock_start = time.perf_counter()

    produced = await _produce(
        expanded_tasks,
        factory,
        accepts_config=accepts_config,
        model_config=model_config,
        concurrency=concurrency,
        span_attributes=stamped_attributes,
        span_processors=span_processors,
    )

    run_result = await _grade(
        produced,
        scorers=frozen_scorers,
        suite=grading_suite,
        store_dir=store_dir,
        budgets=budgets,
        concurrency=concurrency,
        run_id=effective_run_id,
        label=label,
        stream=stream,
        variant=variant,
        target_path=target_path,
        started_at=clock_start,
    )
    return run_result