Skip to content

run_variants

autogen.beta.eval.runtime.variants.run_variants (async)

run_variants(suite, *, variants, scorers, store_dir, repeats=1, concurrency=4, run_id=None, label=None, stream=None)

Run each variant over suite and return a ranked VariantRunResult.

Variants run sequentially; within each, tasks run up to concurrency in parallel (and repeats times each). Every variant's run is persisted as its own schema-0.1 JSON under store_dir (<run_id>-<variant>.json).

Pass stream to observe the sweep: VariantStarted / VariantCompleted wrap each variant, and that variant's own EvalStarted / TaskEvaluated (tagged with the variant name) / EvalCompleted flow through the same stream — so a single observer sees both the sweep and the per-task detail.

Source code in autogen/beta/eval/runtime/variants.py
async def run_variants(
    suite: Suite | str | os.PathLike[str] | list[dict[str, Any]],
    *,
    variants: Variants,
    scorers: Iterable[Scorer],
    store_dir: str | os.PathLike[str],
    repeats: int = 1,
    concurrency: int = 4,
    run_id: str | None = None,
    label: str | None = None,
    stream: Stream | None = None,
) -> VariantRunResult:
    """Evaluate every variant against ``suite`` and return a ranked :class:`VariantRunResult`.

    The variants in ``variants.builds`` are processed one after another, each
    through its own ``run_agent`` call that receives ``repeats``,
    ``concurrency`` and ``store_dir`` unchanged. Each call gets a run id of
    the form ``<run_id>-<slugged variant name>``, so every variant's run is
    stored as a separate schema-0.1 JSON file under ``store_dir``.

    When ``run_id`` is omitted a random hex id is generated for the sweep.

    If ``stream`` is given, a ``VariantStarted`` event is sent before each
    variant and a ``VariantCompleted`` event after it. The same stream is
    handed to ``run_agent``, so that variant's ``EvalStarted`` /
    ``TaskEvaluated`` / ``EvalCompleted`` events (tagged with the variant
    name) reach the same observer as the sweep-level events.
    """
    # Materialise once: ``scorers`` may be a one-shot iterable, but it is
    # reused for every variant.
    frozen_scorers = tuple(scorers)
    sweep_id = uuid4().hex if run_id is None else run_id
    sweep_created_at = datetime.now(timezone.utc).isoformat()
    clock_start = time.perf_counter()

    # One shared context for all sweep-level events; only built when observed.
    sweep_ctx = None if stream is None else ConversationContext(stream=stream)

    variant_count = len(variants.builds)
    per_variant: dict[str, RunResult] = {}
    for position, (variant_name, agent_build) in enumerate(variants.builds.items(), start=1):
        if stream is not None:
            opening = VariantStarted(
                run_id=sweep_id,
                label=label,
                variant=variant_name,
                index=position,
                total=variant_count,
            )
            await stream.send(opening, sweep_ctx)

        outcome = await run_agent(
            suite,
            agent=agent_build,
            scorers=frozen_scorers,
            store_dir=store_dir,
            repeats=repeats,
            concurrency=concurrency,
            run_id=f"{sweep_id}-{_slug(variant_name)}",
            label=label,
            stream=stream,
            variant=variant_name,
        )
        per_variant[variant_name] = outcome

        if stream is not None:
            closing = VariantCompleted(
                run_id=sweep_id,
                label=label,
                variant=variant_name,
                result=outcome,
            )
            await stream.send(closing, sweep_ctx)

    # Wall-clock duration of the whole sweep, truncated to whole milliseconds.
    elapsed_ms = int((time.perf_counter() - clock_start) * 1000)
    return VariantRunResult(
        run_id=sweep_id,
        axis=variants.axis,
        results=per_variant,
        created_at=sweep_created_at,
        duration_ms=elapsed_ms,
    )