Hub & Identity

The hub is the network's single source of truth: registry, audit log, channel table, write-ahead logs, expectation evaluators, sweepers. Every send, observation, and rule check goes through it.

This page covers the registry-side primitives — what you stand up before any agent connects.

Hub.open

from autogen.beta.knowledge import MemoryKnowledgeStore
from autogen.beta.network import Hub

hub = await Hub.open(
    MemoryKnowledgeStore(),
    ttl_sweep_interval=30.0,          # default: 30s
    expectation_sweep_interval=10.0,  # default: 10s
    invite_ack_timeout=30.0,          # default: 30s
)

| Parameter | Type | Description |
| --- | --- | --- |
| store | KnowledgeStore | Persistent backing for the audit log, registry, and WAL. MemoryKnowledgeStore for in-process; DiskKnowledgeStore(path) for durability across restarts. |
| auth | AuthRegistry \| None | Authentication adapters. Defaults to AuthRegistry.default() (which has NoAuth only). |
| clock | Callable[[], str] \| None | ISO-8601 clock. Tests override with a MockClock; production uses the default UTC clock. |
| ttl_sweep_interval | float | How often, in seconds, the TTL sweeper walks active channels. Set to 0 to disable. |
| expectation_sweep_interval | float | How often, in seconds, the expectation sweeper checks declared expectations. Set to 0 to disable. |
| invite_ack_timeout | float | Per-channel ack window, in seconds, before the hub auto-closes the channel. |

Hub.open(...) is the factory; the constructor Hub(...) exists but doesn't start sweepers. Use open() unless you have a reason not to.

Note

All side effects (sweeper tasks, store initialisation) happen inside Hub.open(), never in the constructor — this matches the AGENTS.md "no side effects in __init__" rule. The same pattern applies throughout the package.
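The split between a side-effect-free constructor and an async factory can be sketched in plain asyncio. MiniHub below is a hypothetical stand-in, not the real Hub; it only illustrates, under that assumption, why background tasks start in open() rather than in __init__.

```python
import asyncio
from typing import Optional


class MiniHub:
    """Hypothetical stand-in for Hub: the constructor only stores configuration."""

    def __init__(self, sweep_interval: float = 30.0) -> None:
        self.sweep_interval = sweep_interval
        self.sweeper: Optional[asyncio.Task] = None  # nothing is started here

    @classmethod
    async def open(cls, sweep_interval: float = 30.0) -> "MiniHub":
        hub = cls(sweep_interval)
        await hub.start()  # all side effects live here, not in __init__
        return hub

    async def start(self) -> None:
        # An interval of 0 disables the sweeper, as in the parameter table.
        if self.sweep_interval > 0 and self.sweeper is None:
            self.sweeper = asyncio.create_task(self._sweep_loop())

    async def _sweep_loop(self) -> None:
        while True:
            await asyncio.sleep(self.sweep_interval)

    async def close(self) -> None:
        if self.sweeper is not None:
            self.sweeper.cancel()
            # Wait for the cancellation to finish without re-raising it.
            await asyncio.gather(self.sweeper, return_exceptions=True)
            self.sweeper = None


async def demo() -> tuple[bool, bool]:
    cold = MiniHub()             # constructed only: no task running
    warm = await MiniHub.open()  # factory: sweeper task is running
    try:
        return cold.sweeper is None, warm.sweeper is not None
    finally:
        await warm.close()
```

Constructing the object stays cheap and synchronous, which is what makes it safe to build in tests or at import time.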

Identity Model

Three small dataclasses describe an agent on the network:

from autogen.beta.network import Passport, Resume, ResumeExample

passport = Passport(
    name="alice",
    owner="acme",
    model="claude-sonnet-4-6",
)

resume = Resume(
    claimed_capabilities=["analysis", "policy"],
    domains=["finance"],
    summary="Senior policy analyst — scenario synthesis and rebuttal review.",
    examples=[ResumeExample(title="Q3 risk brief", note="…")],
)

Passport: stable identity

| Field | Required | Notes |
| --- | --- | --- |
| name | required | Unique within the hub. The human-readable handle. |
| agent_id | hub-stamped | Issued by the hub at registration; use this for routing. |
| owner | optional | Tenant id for multi-tenant deployments. |
| model | optional | Free-form model identifier; surfaces on peer-lookup results when peer discovery ships. |
| kind | optional | One of "agent", "human", "remote_agent", or None. None is the back-compat alias for "agent". "human" is set automatically by hc.register_human(...); "remote_agent" is reserved for a future federation / A2A bridge. __post_init__ rejects any other value. Exported as PassportKind / PASSPORT_KINDS. |
| created_at | hub-stamped | ISO-Z timestamp. |

hub.list_agents(kind=...) filters the registry by participant kind — kind="human" returns only humans, kind="agent" returns agents (and matches kind=None passports), kind=None returns everything. Non-LLM participants are registered via hc.register_human(...) — see HumanClient (HITL).
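The kind rules above (validation at construction, None as an alias for "agent", and kind=None returning everything) can be sketched without the library. MiniPassport, KINDS, and list_by_kind are hypothetical stand-ins, and raising ValueError on an unknown kind is an assumption; the text only says that __post_init__ rejects other values.

```python
from dataclasses import dataclass
from typing import Optional

# Values listed in the table above. The tuple is a local stand-in for the
# exported PASSPORT_KINDS, whose exact shape is not shown on this page.
KINDS = ("agent", "human", "remote_agent")


@dataclass
class MiniPassport:
    """Hypothetical stand-in for Passport with only the fields needed here."""

    name: str
    kind: Optional[str] = None

    def __post_init__(self) -> None:
        # Assumption: an unknown kind raises ValueError.
        if self.kind is not None and self.kind not in KINDS:
            raise ValueError(f"unknown passport kind: {self.kind!r}")


def list_by_kind(
    passports: list[MiniPassport], kind: Optional[str] = None
) -> list[MiniPassport]:
    """kind=None returns everything; "agent" also matches kind=None passports."""
    if kind is None:
        return list(passports)
    return [p for p in passports if (p.kind or "agent") == kind]


registry = [
    MiniPassport("alice"),                # kind=None, treated as "agent"
    MiniPassport("bob", kind="agent"),
    MiniPassport("carol", kind="human"),
]
```

Here list_by_kind(registry, "agent") yields alice and bob, while list_by_kind(registry, "human") yields only carol.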

Resume: capability claims and observed track record

| Field | Source | Notes |
| --- | --- | --- |
| claimed_capabilities | tenant | Free-form capability strings (e.g. "research", "summarisation"). |
| domains | tenant | Coarse subject-matter areas. |
| summary | tenant | One-line description, indexed for peer lookup. |
| examples | tenant | Optional ResumeExample records (exemplar past work). |
| observed | hub-mutated | Per-capability ObservedStat: counts of completed/failed/expired tasks plus a p50_latency_ms. Updated automatically when an agent runs a capability-tagged agent.task(...) inside a network turn; see Task Observation. |
| last_updated | hub-stamped | ISO-Z, refreshed on every mutation. |

Resume is the agent's network "CV" — both what it claims to do and what the hub has observed it actually doing.
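To make the observed column concrete, here is a minimal sketch of per-capability bookkeeping. MiniObservedStat, its field names, and the choice to keep every latency sample and take a plain median are all assumptions made for illustration; the real ObservedStat may store and compute p50_latency_ms differently.

```python
from dataclasses import dataclass, field
from statistics import median
from typing import Optional


@dataclass
class MiniObservedStat:
    """Hypothetical stand-in for ObservedStat; field names are illustrative."""

    completed: int = 0
    failed: int = 0
    expired: int = 0
    latencies_ms: list[float] = field(default_factory=list)

    def record(self, outcome: str, latency_ms: float) -> None:
        if outcome not in ("completed", "failed", "expired"):
            raise ValueError(f"unknown outcome: {outcome!r}")
        # Bump the counter that matches the terminal state.
        setattr(self, outcome, getattr(self, outcome) + 1)
        self.latencies_ms.append(latency_ms)

    @property
    def p50_latency_ms(self) -> Optional[float]:
        # Median of all recorded samples; None until something is observed.
        return median(self.latencies_ms) if self.latencies_ms else None


# One stat per capability string, as in the "observed" field.
observed: dict[str, MiniObservedStat] = {}


def observe(capability: str, outcome: str, latency_ms: float) -> None:
    observed.setdefault(capability, MiniObservedStat()).record(outcome, latency_ms)
```

Recording two completed tasks and one failed task under "analysis" leaves that capability with counts of 2, 1, and 0 and a median over the three latencies.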

Rule: per-agent governance

A Rule is an optional per-agent policy. Pass it to hc.register(agent, passport, resume, rule=...).

from autogen.beta.network import Rule, AccessBlock, LimitsBlock, RateBlock, InboxBlock

rule = Rule(
    access=AccessBlock(
        outbound_to=["bob", "carol"],   # whitelist who this agent can address
        channel_types_allowed=["consulting", "discussion"],
    ),
    limits=LimitsBlock(
        channel_ttl_default="4h",
        delegation_depth=2,
    ),
    rate=RateBlock(
        envelopes_per_minute=60,
    ),
    inbox=InboxBlock(
        max_pending=100,
    ),
)

| Block | Controls |
| --- | --- |
| AccessBlock | Who this agent can talk to; which channel types it can create or join. |
| LimitsBlock | Default TTLs for channels this agent creates; max delegation depth. |
| RateBlock | Rate limits on outbound envelopes. |
| InboxBlock | Inbound queue cap, which protects an agent from being flooded. |

When a rule's check fails, the hub raises AccessDeniedError (deny by access) or InboxFull (queue cap) instead of letting the envelope through.
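The two failure modes named above can be illustrated with a toy check. Everything here is a local stand-in: MiniRule flattens the access and inbox blocks into two fields, check_send is a hypothetical helper, and the exception classes only borrow the names from the text. How the hub actually orders and evaluates these checks is not specified on this page.

```python
from dataclasses import dataclass
from typing import Optional


class AccessDeniedError(Exception):
    """Local stand-in for the hub's access-denial error."""


class InboxFull(Exception):
    """Local stand-in for the hub's inbox-cap error."""


@dataclass
class MiniRule:
    """Hypothetical, flattened stand-in for Rule (access and inbox only)."""

    outbound_to: Optional[list[str]] = None  # None means no allowlist
    max_pending: Optional[int] = None        # None means no cap


def check_send(
    sender_rule: MiniRule,
    recipient: str,
    recipient_rule: MiniRule,
    recipient_pending: int,
) -> None:
    """Raise instead of letting the envelope through; return None if allowed."""
    allowed = sender_rule.outbound_to
    if allowed is not None and recipient not in allowed:
        raise AccessDeniedError(f"sender may not address {recipient!r}")
    cap = recipient_rule.max_pending
    if cap is not None and recipient_pending >= cap:
        raise InboxFull(f"{recipient!r} already has {recipient_pending} pending")


alice_rule = MiniRule(outbound_to=["bob", "carol"])
bob_rule = MiniRule(max_pending=2)
```

With these rules, alice can reach bob while his inbox holds fewer than two envelopes, is denied when addressing anyone outside her allowlist, and hits InboxFull once bob's queue reaches the cap.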

Authentication

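from autogen.beta.knowledge import MemoryKnowledgeStore
from autogen.beta.network import AuthAdapter, AuthRegistry, Hub, NoAuth

store = MemoryKnowledgeStore()

# Default: NoAuth is registered for the empty scheme.
hub = await Hub.open(store)

# Explicit: register your own adapter under a scheme name.
# MyHMACAdapter is a placeholder for a class you write against the AuthAdapter Protocol.
auth = AuthRegistry()
auth.register("hmac", MyHMACAdapter())
hub = await Hub.open(store, auth=auth)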

AuthAdapter is a Protocol:

class AuthAdapter(Protocol):
    async def verify(self, passport: Passport, credentials: AuthBlock) -> None: ...

The hub calls verify(...) at registration. Raise AuthError from verify(...) to reject the registration. The default registry registers NoAuth for the empty scheme so the simple in-process flow works with no setup.
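As an illustration of the verify(...) contract, the sketch below checks an HMAC-SHA256 token over the passport name. It is self-contained on purpose: MiniPassport, MiniAuthBlock (including its scheme and token fields), and the local AuthError are hypothetical stand-ins, because the real AuthBlock fields are not shown on this page. The token scheme itself is also just an example, not the package's method.

```python
import asyncio
import hashlib
import hmac
from dataclasses import dataclass


class AuthError(Exception):
    """Local stand-in for the package's AuthError."""


@dataclass
class MiniPassport:
    """Hypothetical stand-in for Passport."""

    name: str


@dataclass
class MiniAuthBlock:
    """Hypothetical credentials shape; the real AuthBlock may differ."""

    scheme: str
    token: str  # hex HMAC-SHA256 of the passport name


class HMACAdapter:
    """Matches the verify(passport, credentials) shape of the Protocol above."""

    def __init__(self, secret: bytes) -> None:
        self._secret = secret

    def sign(self, name: str) -> str:
        digest = hmac.new(self._secret, name.encode("utf-8"), hashlib.sha256)
        return digest.hexdigest()

    async def verify(self, passport: MiniPassport, credentials: MiniAuthBlock) -> None:
        expected = self.sign(passport.name)
        # compare_digest runs in constant time to avoid timing leaks.
        if not hmac.compare_digest(expected, credentials.token):
            raise AuthError(f"bad credentials for {passport.name!r}")


adapter = HMACAdapter(b"shared-secret")
good = MiniAuthBlock(scheme="hmac", token=adapter.sign("alice"))
bad = MiniAuthBlock(scheme="hmac", token="00")
```

Returning None signals acceptance and raising signals rejection, so an adapter never needs to return a boolean.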

Audit Log

Every governance-relevant event the hub processes lands in hub.audit_log (an AuditLog, which is itself a registered HubListener):

records = await hub.audit_log.read_all()
for r in records:
    print(r["kind"], r["at"], r)

Audit kinds (re-exported from autogen.beta.network):

| Kind | When |
| --- | --- |
| AUDIT_KIND_AGENT_REGISTERED | hc.register(...) |
| AUDIT_KIND_AGENT_UNREGISTERED | hc.unregister(...) |
| AUDIT_KIND_RESUME_SET | hub.set_resume(...) |
| AUDIT_KIND_SKILL_SET | hub.set_skill(...) |
| AUDIT_KIND_RULE_SET | hub.set_rule(...) |
| AUDIT_KIND_CHANNEL_CREATED | alice.open(...) |
| AUDIT_KIND_CHANNEL_CLOSED | adapter close, explicit close, or auto_close violation |
| AUDIT_KIND_CHANNEL_EXPIRED | TTL sweep |
| AUDIT_KIND_TASK_TERMINATED | A task observed via TaskMirror reached a terminal state |
| AUDIT_KIND_EXPECTATION_VIOLATED | An expectation evaluator's threshold elapsed |
| AUDIT_KIND_TURN_FAILED | A notify handler crashed while processing an inbound envelope (the channel stays alive; see resilience note below) |

The kind set is open — tenants and hub subclasses can write their own kinds via hub.audit_log.append(record). The audit log is the single trail for compliance and debugging — see Governance, Audit & Observability.
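Since each record carries at least "kind" and "at" keys (as the read loop above prints), simple post-processing needs no hub at all. The helpers below are a sketch over plain dicts; the kind strings in sample are made-up placeholders, since the actual values behind the AUDIT_KIND_* constants are not listed here, and the string comparison assumes every timestamp uses the same fixed-width ISO-Z format.

```python
from collections import Counter
from typing import Any, Iterable


def count_by_kind(records: Iterable[dict[str, Any]]) -> Counter:
    """Tally audit records per kind."""
    return Counter(r["kind"] for r in records)


def since(records: Iterable[dict[str, Any]], cutoff: str) -> list[dict[str, Any]]:
    """Keep records stamped at or after cutoff.

    Assumes all "at" values share one fixed-width ISO-Z format, so plain
    string comparison matches chronological order.
    """
    return [r for r in records if r["at"] >= cutoff]


# Placeholder records; the kind strings are hypothetical.
sample = [
    {"kind": "agent_registered", "at": "2025-01-01T00:00:00Z"},
    {"kind": "channel_created", "at": "2025-01-01T00:05:00Z"},
    {"kind": "channel_created", "at": "2025-01-01T00:06:00Z"},
]
```

In practice the input would be the list returned by await hub.audit_log.read_all().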

Reading Hub State

| Call | Returns |
| --- | --- |
| await hub.get_channel(channel_id) | ChannelMetadata snapshot. |
| await hub.get_resume(agent_id) | Current Resume. |
| await hub.get_passport(agent_id) | Current Passport. |
| await hub.read_wal(channel_id) | Ordered list of Envelopes in that channel. |
| hub.audit_log | The AuditLog instance: await hub.audit_log.read_all() for every record, hub.audit_log.subscribe(cb) to live-tail. |
| hub.health() | Cheap in-memory operational snapshot (active_channels, registered_agents, pending_inbox_total, max_pending_inbox_depth, registered_listeners, adapters_loaded, audit_log_bytes). |

Hub TTL & Sweepers

Two background tasks run as long as the hub is open:

  • TTL sweeper — walks active channels and expires those past created_at + ttl. Default TTL comes from Rule.limits.channel_ttl_default or the adapter's manifest.
  • Expectation sweeper — walks the expectation evaluators registered for each open channel, fires ViolationHandlers on threshold breach.

Both intervals are tunable via ttl_sweep_interval and expectation_sweep_interval. Setting an interval to 0 disables that sweeper, which is useful in tests: call await hub._ttl_tick() or await hub._expectation_tick() manually for deterministic timing.
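The idea behind a manual tick with an injected clock can be shown in isolation. FakeClock and ttl_tick are hypothetical stand-ins for MockClock and the hub's internal tick; the channel dict layout (created_at, ttl_seconds) is likewise an assumption. The sketch treats a channel as expired once the clock is strictly past created_at + ttl.

```python
from datetime import datetime, timedelta, timezone


class FakeClock:
    """Hypothetical test clock: returns ISO-8601 UTC strings, advances on demand."""

    def __init__(self, start: datetime) -> None:
        self._now = start

    def __call__(self) -> str:
        return self._now.isoformat()

    def advance(self, seconds: float) -> None:
        self._now += timedelta(seconds=seconds)


def ttl_tick(channels: dict[str, dict], clock) -> list[str]:
    """Drop every channel that is past created_at + ttl; return the expired ids."""
    now = datetime.fromisoformat(clock())
    expired = [
        cid
        for cid, ch in channels.items()
        if datetime.fromisoformat(ch["created_at"])
        + timedelta(seconds=ch["ttl_seconds"])
        < now
    ]
    for cid in expired:
        del channels[cid]
    return expired


clock = FakeClock(datetime(2025, 1, 1, tzinfo=timezone.utc))
channels = {"c1": {"created_at": clock(), "ttl_seconds": 60}}
```

Because time only moves when the test calls clock.advance(...), the outcome of each tick is fully determined and no sleeps are needed.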

Custom sweepers

Attach your own periodic worker — protocol-specific background work like polling a chat platform's presence list or refreshing an auth token:

async def heartbeat() -> None:
    ...  # runs every 30s

hub.register_sweeper("heartbeat", 30.0, heartbeat)
# later, e.g. on a config reload:
await hub.unregister_sweeper("heartbeat")
  • register_sweeper(name, interval_seconds, fn) is synchronous (it only updates bookkeeping). If the hub has already started (Hub.open(...) calls start() for you), the sweeper begins immediately; if you registered it before start(), it begins then.
  • unregister_sweeper(name) is async — it awaits clean cancellation of the running task. Unknown names are a no-op.
  • A duplicate name raises ValueError (call unregister_sweeper first to replace); a non-positive interval raises ValueError.
  • hub.close() stops every custom sweeper automatically — you don't need to track them yourself.
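The four rules above can be mirrored in a small asyncio registry. SweeperRegistry is a hypothetical sketch written from the bullet list, not the hub's implementation; it assumes each sweeper sleeps for its interval before the first run.

```python
import asyncio
from typing import Awaitable, Callable

SweepFn = Callable[[], Awaitable[None]]


class SweeperRegistry:
    """Hypothetical sketch of the register/unregister semantics listed above."""

    def __init__(self) -> None:
        self._specs: dict[str, tuple[float, SweepFn]] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._started = False

    def register_sweeper(self, name: str, interval_seconds: float, fn: SweepFn) -> None:
        # Synchronous: only bookkeeping, plus a task spawn if already started.
        if interval_seconds <= 0:
            raise ValueError("interval must be positive")
        if name in self._specs:
            raise ValueError(f"sweeper {name!r} is already registered")
        self._specs[name] = (interval_seconds, fn)
        if self._started:
            self._spawn(name)

    def start(self) -> None:
        # Must run inside an event loop; begins every sweeper registered so far.
        self._started = True
        for name in self._specs:
            if name not in self._tasks:
                self._spawn(name)

    def _spawn(self, name: str) -> None:
        interval, fn = self._specs[name]

        async def loop() -> None:
            while True:
                await asyncio.sleep(interval)
                await fn()

        self._tasks[name] = asyncio.create_task(loop())

    async def unregister_sweeper(self, name: str) -> None:
        # Unknown names are a no-op; known ones are cancelled and awaited.
        self._specs.pop(name, None)
        task = self._tasks.pop(name, None)
        if task is not None:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)

    async def close(self) -> None:
        for name in list(self._specs):
            await self.unregister_sweeper(name)


async def demo() -> int:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        ticks += 1

    registry = SweeperRegistry()
    registry.register_sweeper("heartbeat", 0.01, heartbeat)  # deferred until start()
    registry.start()
    await asyncio.sleep(0.1)
    await registry.unregister_sweeper("heartbeat")
    await registry.unregister_sweeper("missing")  # no-op
    return ticks
```

Keeping registration synchronous means it can be called from setup code that is not itself a coroutine, while the async unregister guarantees the task has actually stopped before a replacement is registered.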

Closing Down

await hub.close()

hub.close() cancels all sweeper tasks, closes the underlying store, and drains pending I/O. Always pair Hub.open(...) with hub.close(), typically in a try/finally.
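One way to make the open/close pairing hard to forget is a small async context manager. The opened helper below is a hypothetical convenience, not part of the package; FakeHub exists only so the sketch runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable


@asynccontextmanager
async def opened(factory: Callable[[], Awaitable[Any]]) -> AsyncIterator[Any]:
    """Await factory(), yield the result, and always await its close()."""
    resource = await factory()
    try:
        yield resource
    finally:
        await resource.close()


class FakeHub:
    """Stand-in with the same open()/close() shape as described above."""

    def __init__(self) -> None:
        self.closed = False

    @classmethod
    async def open(cls) -> "FakeHub":
        return cls()

    async def close(self) -> None:
        self.closed = True


async def demo() -> bool:
    seen = None
    try:
        async with opened(FakeHub.open) as hub:
            seen = hub
            raise RuntimeError("boom")  # close() still runs
    except RuntimeError:
        pass
    return seen.closed
```

With the real hub, the factory argument would be something like lambda: Hub.open(store), assuming Hub.open is awaited as in the snippets on this page.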