Governance & Audit
The hub enforces governance and surfaces observability through five layered seams:
- Evaluators: pure functions over channel state that return zero or more Violation records when their thresholds are breached.
- Violation handlers: what to do when a violation fires (log to the audit trail, notify the channel, or auto-close).
- The audit log: an append-only record of every governance-relevant event the hub processes (itself a HubListener).
- HubListener: read-only observers the hub fans state transitions out to, after the fact.
- HubArbiter: the gatekeeper the hub consults before committing register / channel-open / send / dispatch decisions.
Evaluators
Three evaluator classes ship today, addressed by name in adapter manifests; a fourth name, turn_within, composes from them:
| Name | Class | Threshold |
|---|---|---|
| "acks_within" | AcksWithinEvaluator | All invitees must ack within params["seconds"] of channel creation. |
| "reply_within" | ReplyWithinEvaluator | The respondent must reply within params["seconds"] of the initiator's first send (consulting only). |
| "max_silence" | MaxSilenceEvaluator | No participant may go silent for longer than params["seconds"]. |
| "turn_within" | (composes from the above) | The next speaker must speak within params["seconds"] of being scheduled. |
Each evaluator implements:
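```python
from typing import ClassVar, Protocol

class ExpectationEvaluator(Protocol):
    name: ClassVar[str]  # the name manifests refer to, e.g. "acks_within"
    def evaluate(self, ctx: "ExpectationContext") -> list["Violation"]: ...
```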
ExpectationContext is a small dataclass holding the metadata, WAL slice, current time, and the expectation's params. Evaluators are pure — no I/O, no mutation — so they're trivially testable.
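To illustrate why purity makes evaluators easy to test, here is a minimal, self-contained sketch of a max_silence-style evaluator. The ExpectationContext and Violation classes below are hypothetical stand-ins whose fields may not match the library's; in particular, last_spoke_at (each participant's last activity time) is an assumed shortcut for what a real evaluator would derive from the WAL slice.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, ClassVar


@dataclass(frozen=True)
class ExpectationContext:
    # Hypothetical stand-in: the real context carries metadata, a WAL slice,
    # the current time, and the expectation's params.
    channel_id: str
    now: float
    params: dict[str, Any]
    last_spoke_at: dict[str, float] = field(default_factory=dict)  # assumption


@dataclass(frozen=True)
class Violation:
    # Hypothetical stand-in for the library's Violation record.
    expectation: str
    channel_id: str
    violator: str
    detail: str


class MaxSilenceSketch:
    name: ClassVar[str] = "max_silence"

    def evaluate(self, ctx: ExpectationContext) -> list[Violation]:
        # Pure: reads only ctx, performs no I/O, mutates nothing.
        limit = ctx.params["seconds"]
        return [
            Violation(
                expectation=self.name,
                channel_id=ctx.channel_id,
                violator=agent_id,
                detail=f"silent for {ctx.now - last:.0f}s (limit {limit}s)",
            )
            for agent_id, last in sorted(ctx.last_spoke_at.items())
            if ctx.now - last > limit
        ]
```

Because evaluate reads only its argument, a test is a constructed context plus an assertion: with now=5000.0, params={"seconds": 3600}, and last_spoke_at={"alice": 4990.0, "bob": 1000.0}, it returns a single violation, for bob.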
The default registry exposes them as default_evaluators(). Custom evaluators register similarly to custom transition targets.
Adapter-Declared Expectations
Each adapter's manifest declares its defaults (see Adapters Overview for the table). Examples:
```python
# ConsultingAdapter
expectations = [
    Expectation(name="acks_within", on_violation="auto_close", params={"seconds": 30}),
    Expectation(name="reply_within", on_violation="auto_close", params={"seconds": 600}),
]

# ConversationAdapter
expectations = [
    Expectation(name="max_silence", on_violation="audit", params={"seconds": 3600}),
]
```
Expectation.on_violation selects the handler:
| on_violation | Handler | Effect |
|---|---|---|
| "audit" | AuditHandler | Write to the audit log only. Channel continues. |
| "warn" | NotifyChannelHandler | Post EV_EXPECTATION_VIOLATED on the channel WAL. |
| "auto_close" | AutoCloseHandler | Close the channel with reason="expectation_violated:<name>"; record to audit. |
| "hide" | (custom) | Hide later turns from the offending participant; not yet implemented as a built-in. |
The default registry exposes them as default_handlers().
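The on_violation string works as a key into the handler registry. The sketch below models that lookup with hypothetical async stand-ins for AuditHandler, NotifyChannelHandler, and AutoCloseHandler; they only record the effect each built-in would have. Raising LookupError for a name with no handler (such as "hide") is a choice made for this sketch, not documented hub behavior.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical handler shape: (channel_id, violation) -> awaitable.
Handler = Callable[[str, dict], Awaitable[None]]
effects: list[tuple[str, ...]] = []  # what each stand-in handler "did"


async def audit(channel_id: str, violation: dict) -> None:
    effects.append(("audit", channel_id))  # audit log only; channel continues


async def warn(channel_id: str, violation: dict) -> None:
    effects.append(("post EV_EXPECTATION_VIOLATED", channel_id))


async def auto_close(channel_id: str, violation: dict) -> None:
    reason = f"expectation_violated:{violation['expectation']}"
    effects.append(("close", channel_id, reason))
    effects.append(("audit", channel_id))


HANDLERS: dict[str, Handler] = {"audit": audit, "warn": warn, "auto_close": auto_close}


async def dispatch(on_violation: str, channel_id: str, violation: dict) -> None:
    handler = HANDLERS.get(on_violation)
    if handler is None:
        # e.g. "hide", which has no built-in handler yet.
        raise LookupError(f"no handler for on_violation={on_violation!r}")
    await handler(channel_id, violation)
```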
The Sweeper Loop
When the hub is open, an expectation sweeper task wakes every expectation_sweep_interval (default 10 s), walks every active channel, runs each expectation's evaluator, and dispatches any violations to the configured handler.
For deterministic tests and examples, drive one sweep by hand with hub._expectation_tick() rather than waiting on the background interval. It is a public-by-convention test entry point: the name carries a leading underscore, but the test suite exercises it explicitly.
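A minimal asyncio sketch of the sweeper's structure, assuming a hypothetical SweeperSketch that holds the active-channel map and each channel's (evaluator, on_violation) pairs. One pass lives in tick(), and the background loop adds only the sleep, which is why calling a single tick by hand gives deterministic tests. These names are illustrative and are not the hub's internals.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical shapes: an evaluator maps a channel id to violations, and
# dispatch receives (on_violation, channel_id, violation).
Evaluate = Callable[[str], list[str]]
Dispatch = Callable[[str, str, str], Awaitable[None]]


class SweeperSketch:
    def __init__(self, channels: dict[str, bool],
                 expectations: dict[str, list[tuple[Evaluate, str]]],
                 dispatch: Dispatch, interval: float = 10.0) -> None:
        self.channels = channels          # channel_id -> is the channel active?
        self.expectations = expectations  # channel_id -> [(evaluate, on_violation)]
        self.dispatch = dispatch
        self.interval = interval          # mirrors expectation_sweep_interval

    async def tick(self) -> int:
        """One deterministic sweep; returns how many violations were dispatched."""
        fired = 0
        for channel_id, active in self.channels.items():
            if not active:
                continue
            for evaluate, on_violation in self.expectations.get(channel_id, []):
                for violation in evaluate(channel_id):
                    await self.dispatch(on_violation, channel_id, violation)
                    fired += 1
        return fired

    async def run(self) -> None:
        # Background form: tick, sleep, repeat until the task is cancelled.
        while True:
            await self.tick()
            await asyncio.sleep(self.interval)
```

A test constructs the sweeper, awaits tick() once, and asserts on what was dispatched; run() is only started as a background task in production.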
Audit Log
hub.audit_log is an AuditLog instance — append-only, and itself a registered HubListener (so every state transition the hub fans out also lands as one structured record). It writes a single audit.jsonl under the hub's KnowledgeStore.
Each record is a plain dict with at least two keys, kind and at; kind-specific fields appear alongside them.
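To make the record shape concrete, here is a synchronous sketch of an append-only JSON Lines log that stamps kind and at on every record. JsonlAuditSketch, its keyword-argument append signature, and the use of epoch seconds for at are assumptions; the real AuditLog is async, takes a prepared record, and writes through the KnowledgeStore rather than a local path.

```python
import json
import time
from pathlib import Path
from typing import Any


class JsonlAuditSketch:
    def __init__(self, path: Path) -> None:
        self.path = path
        self.bytes_written = 0  # process-local counter, like AuditLog.bytes_written

    def append(self, kind: str, **fields: Any) -> dict:
        record = {"kind": kind, "at": time.time(), **fields}
        line = json.dumps(record, sort_keys=True) + "\n"
        with self.path.open("a", encoding="utf-8") as fh:  # append-only
            fh.write(line)
        self.bytes_written += len(line.encode("utf-8"))
        return record

    def read_all(self) -> list[dict]:
        if not self.path.exists():
            return []  # an absent log reads as empty
        with self.path.open(encoding="utf-8") as fh:
            return [json.loads(line) for line in fh if line.strip()]
```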
| Member | Purpose |
|---|---|
| await hub.audit_log.read_all() | Read and parse the whole log; returns [] if absent. |
| await hub.audit_log.append(record) | Write one record. The kind set is open: tenants and subclasses append their own kind values here. |
| hub.audit_log.subscribe(cb) / unsubscribe(cb) | Live tail: cb(record) fires per appended record (no polling). Subscriber exceptions are logged and swallowed. |
| hub.audit_log.bytes_written | Process-local byte counter (resets on hub restart). Surfaced by hub.health() as audit_log_bytes. |
| hub.replace_audit_log(custom) | Swap in a tenant-provided AuditLog subclass (e.g. a different on-disk format). The replacement is registered as the first listener so audit writes still complete before tenant listeners observe the same event. |
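The live-tail contract in the table (each subscriber sees every appended record, and a raising subscriber is logged and swallowed) can be sketched in memory. TailSketch mirrors the member names above but is a hypothetical stand-in, not the AuditLog implementation.

```python
import logging
from typing import Callable

logger = logging.getLogger("audit_tail_sketch")
Subscriber = Callable[[dict], None]


class TailSketch:
    def __init__(self) -> None:
        self.records: list[dict] = []
        self._subscribers: list[Subscriber] = []

    def subscribe(self, cb: Subscriber) -> None:
        self._subscribers.append(cb)

    def unsubscribe(self, cb: Subscriber) -> None:
        if cb in self._subscribers:
            self._subscribers.remove(cb)

    def append(self, record: dict) -> None:
        self.records.append(record)  # the write lands before any fan-out
        for cb in list(self._subscribers):  # copy: a callback may unsubscribe
            try:
                cb(record)
            except Exception:
                # Logged and swallowed so one bad subscriber can't block the rest.
                logger.exception("audit subscriber %r failed", cb)
```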
Audit kinds
Re-exported as constants from autogen.beta.network:
| Constant | Notes |
|---|---|
| AUDIT_KIND_AGENT_REGISTERED | Records agent_id, name. |
| AUDIT_KIND_AGENT_UNREGISTERED | Records agent_id, name. |
| AUDIT_KIND_RESUME_SET | Records the source: RESUME_SOURCE_TENANT (a set_resume call) or RESUME_SOURCE_OBSERVED (a record_observation). |
| AUDIT_KIND_SKILL_SET | Records updated skill markdown. |
| AUDIT_KIND_RULE_SET | Records the new rule. |
| AUDIT_KIND_CHANNEL_CREATED | Records creator_id, manifest type/version, participants. |
| AUDIT_KIND_CHANNEL_CLOSED | Records reason. |
| AUDIT_KIND_CHANNEL_EXPIRED | Records the TTL details. |
| AUDIT_KIND_TASK_TERMINATED | Records owner_id, capability, outcome, latency_ms. |
| AUDIT_KIND_EXPECTATION_VIOLATED | Records expectation, channel_id, evaluator details. |
| AUDIT_KIND_TURN_FAILED | A notify handler crashed processing an inbound envelope. Records channel_id, agent_id, envelope_id, exc_type, exc_message. |
Inspection patterns
The AuditLog is durable when the hub is backed by DiskKnowledgeStore; with MemoryKnowledgeStore it lives only as long as the hub.
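A common inspection pattern is to read the whole log and filter or tally by kind. This sketch assumes only an object with the async read_all() method from the table above; records_of_kind and count_by are hypothetical helpers, and the kind is passed in rather than hard-coded because the constants' string values aren't listed here.

```python
import asyncio
from collections import Counter
from typing import Any, Protocol


class ReadableAuditLog(Protocol):
    async def read_all(self) -> list[dict[str, Any]]: ...


async def records_of_kind(audit_log: ReadableAuditLog, kind: str) -> list[dict]:
    """Every record of one kind, in append order."""
    return [r for r in await audit_log.read_all() if r.get("kind") == kind]


async def count_by(audit_log: ReadableAuditLog, kind: str, key: str) -> Counter:
    """Tally records of one kind by a field, e.g. turn failures per agent_id."""
    return Counter(r.get(key) for r in await records_of_kind(audit_log, kind))
```

Against a live hub this would read something like await count_by(hub.audit_log, AUDIT_KIND_TURN_FAILED, "agent_id"), tallying turn failures per agent.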
HubListener: observing state transitions
A HubListener is a read-only observer. Attach one with hub.register_listener(...); the hub awaits the matching method after the corresponding state change commits — listeners observe, they don't gate (that's HubArbiter, below). Registration itself is inert: there's no startup hook, and methods only fire on subsequent transitions. Each listener call is wrapped in its own try/except so a throwing listener can't stall dispatch — the exception is logged at ERROR and the next listener still runs.
BaseHubListener is a no-op base — subclass it and override only the events you care about (you don't have to implement the full Protocol surface). The built-in AuditLog is a HubListener, pre-registered on every hub, which is why a fresh hub already reports registered_listeners: 1.
| Method | Fires when |
|---|---|
| on_envelope_posted(envelope, metadata) | An envelope was validated, WAL-appended, folded, and dispatched. |
| on_envelope_rejected(envelope, reason) | An envelope was rejected before WAL append. reason is the typed NetworkError the sender saw. |
| on_dispatch_failed(envelope, recipient_id, reason) | Delivery of an accepted envelope to one recipient failed (the rest of the audience may have received it). |
| on_channel_event(channel_id, kind, payload) | kind ∈ opened / closed / expired / participant_removed / participant_hidden. |
| on_agent_event(agent_id, kind, payload) | kind ∈ registered / unregistered / resume_set / skill_set / rule_set / observation_recorded. |
| on_expectation_fired(channel_id, expectation, violation) | An evaluator emitted a violation (deduped per (channel, expectation, violator)). |
| on_turn_failed(channel_id, agent_id, envelope_id, exc) | A notify handler crashed processing an inbound envelope. |
| on_task_event(task_id, kind, payload) | kind ∈ started / progress / completed / failed / expired / cancelled / mirror_failed. |
| on_inbox_pressure(agent_id, pending, cap) | A recipient's pending count first crossed its inbox high-water mark (fires once per crossing). |
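The observe-only, isolated fan-out described above can be sketched with stand-ins: a no-op base whose subclasses override a single hook, and a fan-out that awaits each listener inside its own try/except. NoOpListener, fan_out, and the restriction to on_channel_event are hypothetical simplifications of BaseHubListener and the hub's dispatch.

```python
import asyncio
import logging
from typing import Any

log = logging.getLogger("listener_sketch")


class NoOpListener:
    """Stand-in for BaseHubListener: each hook is a no-op coroutine."""

    async def on_channel_event(self, channel_id: str, kind: str, payload: dict[str, Any]) -> None:
        return None


class ClosedChannelLog(NoOpListener):
    """Overrides one hook and ignores every other event."""

    def __init__(self) -> None:
        self.closed: list[str] = []

    async def on_channel_event(self, channel_id: str, kind: str, payload: dict[str, Any]) -> None:
        if kind == "closed":
            self.closed.append(channel_id)


class BuggyListener(NoOpListener):
    async def on_channel_event(self, channel_id: str, kind: str, payload: dict[str, Any]) -> None:
        raise RuntimeError("listener bug")


async def fan_out(listeners: list[NoOpListener], channel_id: str, kind: str,
                  payload: dict[str, Any]) -> None:
    # Runs after the state change has committed, so a listener cannot veto it.
    for listener in listeners:
        try:
            await listener.on_channel_event(channel_id, kind, payload)
        except Exception:
            # Logged at ERROR; the next listener still runs.
            log.error("listener %r failed", listener, exc_info=True)
```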
Subclassing the Hub instead of registering a listener
The same on_* methods are available on Hub itself (with no-op defaults), so if you're building a custom hub you can override them directly rather than registering the hub as a listener of itself.
Subclass overrides fire alongside any externally-registered listeners, with the same per-callee try/except isolation. Use a subclass when the observation logic belongs to your hub implementation; use register_listener(...) when it's a separate concern (metrics shipper, audit tap) you want to attach and detach independently.
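Both attachment styles can be modelled with a hypothetical MiniHub: a subclass override and an externally registered listener each receive the same event, each in its own try/except. Calling the hub's own hook before the registered listeners is an assumption of this sketch; the ordering isn't specified above.

```python
import asyncio
import logging
from typing import Any

log = logging.getLogger("hub_subclass_sketch")


class MiniHub:
    def __init__(self) -> None:
        self._listeners: list[Any] = []

    def register_listener(self, listener: Any) -> None:
        self._listeners.append(listener)

    async def on_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:
        """No-op default; subclasses may override."""

    async def _fire_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:
        # The hub's own hook fires alongside external listeners, each isolated.
        for callee in [self, *self._listeners]:
            try:
                await callee.on_agent_event(agent_id, kind, payload)
            except Exception:
                log.error("on_agent_event failed in %r", callee, exc_info=True)


class MetricsHub(MiniHub):
    """Observation logic that belongs to the hub implementation itself."""

    def __init__(self) -> None:
        super().__init__()
        self.registrations = 0

    async def on_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:
        if kind == "registered":
            self.registrations += 1


class Tap:
    """A separate concern, attached via register_listener."""

    def __init__(self) -> None:
        self.seen: list[tuple[str, str]] = []

    async def on_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:
        self.seen.append((agent_id, kind))
```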
on_task_event is the one listener hook that isn't purely hub-driven: TaskMirror (and other tenant code) emits "mirror_failed" and other kinds through it. The public way to fan one out is await hub_client.fire_task_event(task_id, kind, payload) from a registered tenant, or await hub.fire_task_event(...) directly; neither touches the hub's private fan-out.
Health snapshot
hub.health() is a cheap, in-memory operational snapshot, suitable for wiring to a /health endpoint or a dashboard.
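A framework-neutral sketch of the endpoint logic, using only the standard library: turn the snapshot into an HTTP status and a JSON body. It assumes health() is synchronous and returns a JSON-serialisable dict (this page mentions audit_log_bytes as one of its fields); health_response and the 503-on-exception rule are illustrative choices, not part of the hub.

```python
import json
from http import HTTPStatus
from typing import Any, Protocol


class HasHealth(Protocol):
    def health(self) -> dict[str, Any]: ...


def health_response(hub: HasHealth) -> tuple[int, str]:
    """Return (status_code, json_body) for a /health endpoint."""
    try:
        snapshot = hub.health()  # in-memory, so cheap enough to call per request
    except Exception as exc:
        body = {"error": type(exc).__name__}
        return HTTPStatus.SERVICE_UNAVAILABLE.value, json.dumps(body)
    return HTTPStatus.OK.value, json.dumps(snapshot, sort_keys=True, default=str)
```

Wire health_response into whichever web framework serves the endpoint; keeping it free of framework types makes it trivial to test.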
HubArbiter: the decision seam
Where a HubListener only observes, a HubArbiter decides. The hub consults the active arbiter inline before committing register / channel-open / send / dispatch decisions. Exactly one arbiter is active at a time; install yours with hub.register_arbiter(arbiter) (and read it back via hub.arbiter).
Each gate returns a Decision — Allow() or Deny(reason, error=...), where error selects which NetworkError subclass the hub raises back to the caller (defaults to AccessDeniedError):
| Gate | Called before | Default RuleBasedArbiter checks |
|---|---|---|
| authorize_send(envelope, sender, sender_rule, recipients) | post_envelope WAL append | access.outbound_to, limits.delegation_depth |
| authorize_inbox(envelope, recipient, recipient_rule, current_pending) | per-recipient, on post_envelope | limits.inbox.max_pending (denies with InboxFull) |
| authorize_dispatch(envelope, sender, recipient, recipient_rule) | each notify frame | access.inbound_from (deny ⇒ silently skip that recipient) |
| authorize_channel_open(manifest, creator, creator_rule, invitees, invitee_rules, active_creator_channels) | create_channel | each invitee's access.inbound_from, creator's limits.max_concurrent_channels |
| authorize_register(passport, resume, rule) | (reserved, not yet wired by the hub) | always Allow |
| resolve_unknown_audience(envelope, unknown_ids) | dispatch to ids the hub doesn't know | returns None (drop silently); the federation hook |
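The Decision contract can be modelled with two small dataclasses. In this sketch Allow, Deny, NetworkError, AccessDeniedError, and InboxFull are hypothetical stand-ins for the library's types; it shows how Deny.error picks the exception a raising gate surfaces, with AccessDeniedError as the default.

```python
from dataclasses import dataclass
from typing import Union


class NetworkError(Exception):
    """Stand-in for the library's NetworkError base class."""


class AccessDeniedError(NetworkError):
    pass


class InboxFull(NetworkError):
    pass


@dataclass(frozen=True)
class Allow:
    pass


@dataclass(frozen=True)
class Deny:
    reason: str
    error: type[NetworkError] = AccessDeniedError  # which error the caller sees


Decision = Union[Allow, Deny]


def enforce(decision: Decision) -> None:
    """What a raising gate does with a Decision: pass on Allow, raise on Deny."""
    if isinstance(decision, Deny):
        raise decision.error(decision.reason)
```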
The default RuleBasedArbiter enforces the per-agent Rule (access + limits) — exactly the behavior the hub had inline before this seam existed. Two ways to customise:
- Add policy on top: subclass RuleBasedArbiter and await super() in the gates you extend.
- Start from scratch: subclass BaseHubArbiter (all gates return Allow by default) and implement only the ones you need.
A Deny from authorize_send / authorize_inbox / authorize_channel_open surfaces to the caller as the chosen NetworkError (so channel.send(...) raises AccessDeniedError); a Deny from authorize_dispatch just drops that one recipient. resolve_unknown_audience is the seam a federated arbiter uses to re-route to a local proxy id instead of dropping.
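The "add policy on top" pattern and the two Deny behaviours can be sketched together. RuleArbiterSketch is a hypothetical stand-in for RuleBasedArbiter that checks only an outbound_to allow-list on send and an inbound_from allow-list on dispatch; MutedSenderArbiter layers one made-up rule over it via await super(). Gate signatures are trimmed to the arguments the sketch needs and do not match the real ones.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Union


class AccessDeniedError(Exception):
    pass


@dataclass(frozen=True)
class Allow:
    pass


@dataclass(frozen=True)
class Deny:
    reason: str


Decision = Union[Allow, Deny]
Acl = dict[str, set[str]]  # agent -> allowed peers; a missing entry means unrestricted


class RuleArbiterSketch:
    """Stand-in for RuleBasedArbiter: outbound_to on send, inbound_from on dispatch."""

    def __init__(self, outbound_to: Acl, inbound_from: Acl) -> None:
        self.outbound_to = outbound_to
        self.inbound_from = inbound_from

    async def authorize_send(self, sender: str, recipients: list[str]) -> Decision:
        allowed: Optional[set[str]] = self.outbound_to.get(sender)
        blocked = [] if allowed is None else [r for r in recipients if r not in allowed]
        return Deny(f"{sender} may not send to {blocked}") if blocked else Allow()

    async def authorize_dispatch(self, sender: str, recipient: str) -> Decision:
        allowed = self.inbound_from.get(recipient)
        return Allow() if allowed is None or sender in allowed else Deny("inbound_from")


class MutedSenderArbiter(RuleArbiterSketch):
    """Adds one policy on top: muted senders may not send at all."""

    def __init__(self, outbound_to: Acl, inbound_from: Acl, muted: set[str]) -> None:
        super().__init__(outbound_to, inbound_from)
        self.muted = muted

    async def authorize_send(self, sender: str, recipients: list[str]) -> Decision:
        base = await super().authorize_send(sender, recipients)
        if isinstance(base, Deny):
            return base  # keep the rule-based denial as-is
        return Deny(f"{sender} is muted") if sender in self.muted else base


async def post(arbiter: RuleArbiterSketch, sender: str, recipients: list[str]) -> list[str]:
    """A send-gate Deny raises to the caller; a dispatch-gate Deny drops one recipient."""
    decision = await arbiter.authorize_send(sender, recipients)
    if isinstance(decision, Deny):
        raise AccessDeniedError(decision.reason)
    delivered = []
    for recipient in recipients:
        if isinstance(await arbiter.authorize_dispatch(sender, recipient), Allow):
            delivered.append(recipient)
    return delivered
```

With an arbiter where c only accepts messages from some other agent, post(arb, "a", ["b", "c"]) returns ["b"]: the send gate passed and the dispatch gate quietly dropped c. A muted sender, by contrast, gets AccessDeniedError before anything is delivered.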
Turn-failure resilience
The default notify handler wraps its entire substantive path — channel resolve, view projection, adapter.extract_turn_input, agent.ask, round-envelope build, outbound send. If any step raises, the handler:
- routes the failure through HubClient.report_turn_failure → Hub.report_turn_failure,
- which fans on_turn_failed(channel_id, agent_id, envelope_id, exc) out to every HubListener (the built-in AuditLog writes an AUDIT_KIND_TURN_FAILED record),
- then returns cleanly. No reply envelope is posted, but the channel stays active and the next envelope flows normally, so a buggy turn no longer takes down the receive loop.
React however you like — retry, escalate, surface to a UI — by registering a listener that overrides on_turn_failed.
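The resilience pattern reduces to one try/except around the whole turn plus a failure report. In this sketch, envelopes are plain dicts with hypothetical id, channel_id, and to keys, run_turn stands in for the handler's substantive path, and the report callback stands in for HubClient.report_turn_failure; FailureCounter is a listener-style observer that overrides only on_turn_failed.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical shapes: run_turn does the substantive work; report_failure
# receives (channel_id, agent_id, envelope_id, exc).
RunTurn = Callable[[dict[str, Any]], Awaitable[None]]
ReportFailure = Callable[[str, str, str, BaseException], Awaitable[None]]


class FailureCounter:
    """Listener-style observer that overrides only on_turn_failed."""

    def __init__(self) -> None:
        self.failures: dict[str, int] = {}

    async def on_turn_failed(self, channel_id: str, agent_id: str,
                             envelope_id: str, exc: BaseException) -> None:
        self.failures[agent_id] = self.failures.get(agent_id, 0) + 1


async def handle_envelope(envelope: dict[str, Any], run_turn: RunTurn,
                          report_failure: ReportFailure) -> None:
    try:
        await run_turn(envelope)  # resolve, project, extract, ask, build, send
    except Exception as exc:
        # Report, then return cleanly: no reply is posted and the loop continues.
        await report_failure(envelope["channel_id"], envelope["to"], envelope["id"], exc)


async def receive_loop(envelopes: list[dict[str, Any]], run_turn: RunTurn,
                       report_failure: ReportFailure) -> None:
    for envelope in envelopes:
        await handle_envelope(envelope, run_turn, report_failure)
```

Catching Exception rather than BaseException lets asyncio.CancelledError propagate, so cancelling the receive loop still shuts it down.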
Custom Evaluators
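A custom evaluator has the same shape as the built-ins: a class with a name ClassVar and a pure evaluate(ctx) method that returns a list of Violation records.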
Register on a custom registry and pass to Hub.open(..., evaluators=registry). The default registry can also be mutated via the module-level register_evaluator(...) helper.
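Registration amounts to a name-to-evaluator mapping that manifests resolve through Expectation.name. The sketch below assumes a hypothetical EvaluatorRegistry that rejects duplicate names and a hypothetical max_turns evaluator over a trimmed-down context; it returns plain strings instead of Violation records to stay self-contained. The real registry passed to Hub.open(..., evaluators=registry), and register_evaluator(...), may have different signatures.

```python
from dataclasses import dataclass
from typing import Any, ClassVar, Protocol


@dataclass(frozen=True)
class Ctx:
    # Hypothetical, trimmed-down context: just what this evaluator needs.
    channel_id: str
    turns: int
    params: dict[str, Any]


class Evaluator(Protocol):
    name: ClassVar[str]

    def evaluate(self, ctx: Ctx) -> list[str]: ...


class MaxTurnsEvaluator:
    name: ClassVar[str] = "max_turns"  # hypothetical custom expectation

    def evaluate(self, ctx: Ctx) -> list[str]:
        limit = ctx.params["turns"]
        if ctx.turns > limit:
            return [f"{ctx.channel_id}: {ctx.turns} turns exceeds {limit}"]
        return []


class EvaluatorRegistry:
    def __init__(self) -> None:
        self._by_name: dict[str, Evaluator] = {}

    def register(self, evaluator: Evaluator) -> None:
        if evaluator.name in self._by_name:
            raise ValueError(f"evaluator {evaluator.name!r} already registered")
        self._by_name[evaluator.name] = evaluator

    def get(self, name: str) -> Evaluator:
        return self._by_name[name]  # manifests look evaluators up by name
```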