Expectations & Audit

The hub enforces governance through three layered concepts:

  1. Evaluators — pure functions over session state that return zero or more Violation records when their thresholds are breached.
  2. Violation handlers — what to do when a violation fires: log to the audit trail, notify the session, or auto-close.
  3. The audit log — append-only record of every governance-relevant event the hub processes.

Evaluators#

Three evaluator classes ship today, plus a composite name built from them; all are addressed by name in adapter manifests:

| Name | Class | Threshold |
| --- | --- | --- |
| `"acks_within"` | `AcksWithinEvaluator` | All invitees must ack within `params["seconds"]` of session creation. |
| `"reply_within"` | `ReplyWithinEvaluator` | The respondent must reply within `params["seconds"]` of the initiator's first send (consulting only). |
| `"max_silence"` | `MaxSilenceEvaluator` | No participant may go silent for longer than `params["seconds"]`. |
| `"turn_within"` | (composed from the above) | The next speaker must speak within `params["seconds"]` of being scheduled. |

Each evaluator implements:

```python
from typing import ClassVar, Protocol

class ExpectationEvaluator(Protocol):
    name: ClassVar[str]

    def evaluate(self, ctx: ExpectationContext) -> list[Violation]: ...
```

ExpectationContext is a small dataclass holding the metadata, WAL slice, current time, and the expectation's params. Evaluators are pure — no I/O, no mutation — so they're trivially testable.
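Because evaluators are pure, they can be unit-tested by hand-building a context and asserting on the returned list. A minimal sketch with stand-in types — the field names beyond those listed above, and the toy evaluator itself, are assumptions, not the real classes:

```python
from dataclasses import dataclass
from typing import Any

# Stand-in shapes for illustration; the real ExpectationContext and
# Violation live in autogen.beta.network.hub.
@dataclass(frozen=True)
class Violation:
    expectation: str
    session_id: str
    detail: str

@dataclass(frozen=True)
class ExpectationContext:
    session_id: str
    wal: list[Any]          # slice of WAL events for this session
    now: float              # current time injected by the sweeper
    params: dict[str, Any]  # the expectation's params, e.g. {"seconds": 30}

class MaxSilenceLikeEvaluator:
    """Toy evaluator in the max_silence style: flag the session if the
    newest WAL event is older than params["seconds"]."""
    name = "max_silence_like"

    def evaluate(self, ctx: ExpectationContext) -> list[Violation]:
        if not ctx.wal:
            return []
        last_at = max(e["at"] for e in ctx.wal)
        limit = ctx.params["seconds"]
        if ctx.now - last_at > limit:
            return [Violation(self.name, ctx.session_id,
                              f"silent for {ctx.now - last_at:.0f}s > {limit}s")]
        return []

# Pure function over state: no I/O, deterministic for a given context.
ctx = ExpectationContext(
    session_id="s1",
    wal=[{"at": 100.0}, {"at": 160.0}],
    now=200.0,
    params={"seconds": 30},
)
violations = MaxSilenceLikeEvaluator().evaluate(ctx)
```

Running the same context through the evaluator twice always yields the same result, which is what makes the sweeper's behavior reproducible in tests.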

The default registry exposes them as default_evaluators(). Custom evaluators register similarly to custom transition targets.

Adapter-Declared Expectations#

Each adapter's manifest declares its defaults (see Adapters Overview for the table). Examples:

```python
# ConsultingAdapter
expectations = [
    Expectation(name="acks_within",  on_violation="auto_close", params={"seconds": 30}),
    Expectation(name="reply_within", on_violation="auto_close", params={"seconds": 600}),
]

# ConversationAdapter
expectations = [
    Expectation(name="max_silence", on_violation="audit", params={"seconds": 3600}),
]
```

Expectation.on_violation selects the handler:

| on_violation | Handler | Effect |
| --- | --- | --- |
| `"audit"` | `AuditHandler` | Write to the audit log only. Session continues. |
| `"warn"` | `NotifySessionHandler` | Post `EV_EXPECTATION_VIOLATED` on the session WAL. |
| `"auto_close"` | `AutoCloseHandler` | Close the session with `reason="expectation_violated:<name>"`; record to audit. |
| `"hide"` | (custom) | Hide later turns from the offending participant; not yet implemented as a built-in. |

The default registry exposes them as default_handlers().
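Handlers have the same single-method shape as evaluators. A hedged sketch of a custom handler — the method name `handle` and its signature are assumptions here, and `CollectingHandler` is not one of the built-ins:

```python
import asyncio
from dataclasses import dataclass

# Stand-in Violation mirroring the fields used on this page.
@dataclass
class Violation:
    expectation: str
    session_id: str
    detail: str

class CollectingHandler:
    """Illustrative handler: buffers violations in memory instead of
    auditing, notifying, or closing the session."""
    def __init__(self) -> None:
        self.seen: list[Violation] = []

    async def handle(self, violation: Violation) -> None:
        self.seen.append(violation)

handler = CollectingHandler()
asyncio.run(handler.handle(Violation("max_silence", "s1", "silent too long")))
```

A buffering handler like this is handy in tests: sweep once, then assert on `handler.seen` instead of scraping the audit log.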

The Sweeper Loop#

When the hub is open, an expectation sweeper task wakes every expectation_sweep_interval (default 10 s), walks every active session, runs each expectation's evaluator, and dispatches any violations to the configured handler.
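One sweep pass can be sketched end-to-end with stand-in pieces — every name below is illustrative; the real loop is internal to the hub:

```python
# Stand-in evaluator that fires on every session it sees.
class AlwaysFires:
    name = "always_fires"
    def evaluate(self, session):
        return [f"violation in {session['session_id']}"]

handled = []

def sweep(sessions, evaluators, handle):
    """One pass: run every evaluator against every active session and
    dispatch each violation to the configured handler."""
    for session in sessions:
        for ev in evaluators:
            for violation in ev.evaluate(session):
                handle(violation)

sweep(
    sessions=[{"session_id": "s1"}, {"session_id": "s2"}],
    evaluators=[AlwaysFires()],
    handle=handled.append,
)
```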

For deterministic tests / examples:

```python
from autogen.beta.network import Hub
from autogen.beta.knowledge import MemoryKnowledgeStore

hub = await Hub.open(
    MemoryKnowledgeStore(),
    expectation_sweep_interval=0,  # disable background loop
)

# Manually advance state and tick:
clock.advance(45)              # mock-clock pattern
await hub._expectation_tick()  # operator API
```

hub._expectation_tick() is a public-by-convention test entry point — a leading underscore, but exercised explicitly by the test suite.
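The `clock` in the snippet above is not defined on this page; a minimal mock clock in that pattern might look like the following (illustrative — the real test fixture may differ):

```python
class MockClock:
    """Deterministic clock for sweeper tests: time only moves when the
    test calls advance(), so each _expectation_tick() is reproducible."""
    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        self._now += seconds

clock = MockClock()
clock.advance(45)
```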

Audit Log#

hub._audit_log is an AuditLog instance. It's append-only with a simple read API:

```python
records = await hub._audit_log.read_all()
for r in records:
    print(r["kind"], r["at"], r)
```

Each record is a plain dict with at minimum kind and at; kind-specific fields appear alongside.

Audit kinds#

Re-exported as constants from autogen.beta.network:

| Constant | Notes |
| --- | --- |
| `AUDIT_KIND_AGENT_REGISTERED` | Records `agent_id`, `name`, `owner`. |
| `AUDIT_KIND_AGENT_UNREGISTERED` | Records `agent_id`. |
| `AUDIT_KIND_RESUME_SET` | Records the source: `RESUME_SOURCE_TENANT` or `RESUME_SOURCE_OBSERVED`. |
| `AUDIT_KIND_SKILL_SET` | Records updated skill markdown. |
| `AUDIT_KIND_RULE_SET` | Records the new rule. |
| `AUDIT_KIND_SESSION_CREATED` | Records `creator_id`, manifest type/version, participants. |
| `AUDIT_KIND_SESSION_CLOSED` | Records `reason`. |
| `AUDIT_KIND_SESSION_EXPIRED` | Records the TTL details. |
| `AUDIT_KIND_TASK_TERMINATED` | Records `owner_id`, capability, outcome, `latency_ms`. |
| `AUDIT_KIND_EXPECTATION_VIOLATED` | Records `expectation`, `session_id`, evaluator details. |
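Since records are plain dicts, tallying them by kind is one pass with `collections.Counter`. The literal kind strings below are placeholders — in real code, compare against the exported constants rather than guessing their string values:

```python
from collections import Counter

# Placeholder records in the documented shape: dicts with at least
# "kind" and "at". The kind values here are illustrative strings only.
records = [
    {"kind": "session_created", "at": 1.0, "creator_id": "a1"},
    {"kind": "expectation_violated", "at": 2.5, "expectation": "max_silence"},
    {"kind": "expectation_violated", "at": 4.0, "expectation": "reply_within"},
    {"kind": "session_closed", "at": 5.0, "reason": "expectation_violated:max_silence"},
]

# Tally of governance events by kind.
by_kind = Counter(r["kind"] for r in records)
```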

Inspection patterns#

```python
# Filter to violations only.
violations = [
    r for r in await hub._audit_log.read_all()
    if r["kind"] == AUDIT_KIND_EXPECTATION_VIOLATED
]

# Filter to one session.
session_records = [
    r for r in await hub._audit_log.read_all()
    if r.get("session_id") == session_id
]
```

The AuditLog is durable when the hub is backed by DiskKnowledgeStore; with MemoryKnowledgeStore it lives only as long as the hub.

Custom Evaluators#

Same shape as the built-ins:

```python
from typing import ClassVar

from autogen.beta.network.hub import (
    ExpectationContext,
    ExpectationEvaluator,
    Violation,
)

class TooManyMessagesEvaluator:
    name: ClassVar[str] = "too_many_messages"

    def evaluate(self, ctx: ExpectationContext) -> list[Violation]:
        threshold = ctx.params["max"]
        # EV_TEXT is the hub's text-event constant; import it alongside
        # the names above.
        text_count = sum(1 for e in ctx.wal if e.event_type == EV_TEXT)
        if text_count > threshold:
            return [Violation(
                expectation=self.name,
                session_id=ctx.session.session_id,
                detail=f"text count {text_count} exceeds {threshold}",
            )]
        return []
```

Register on a custom registry and pass to Hub.open(..., evaluators=registry). The default registry can also be mutated via the module-level register_evaluator(...) helper.