Skip to content

Hub

autogen.beta.network.hub.core.Hub #

Hub(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0)

In-process registry, dispatcher, channel state-machine, persistence root.

Construct with `Hub.open()` for production (it hydrates from disk and spawns the sweepers); the synchronous `__init__` is for tests that need fine-grained control.

Source code in autogen/beta/network/hub/core.py
def __init__(
    self,
    store: KnowledgeStore,
    *,
    auth: AuthRegistry | None = None,
    clock: Callable[[], str] | None = None,
    ttl_sweep_interval: float = 30.0,
    expectation_sweep_interval: float = 10.0,
    invite_ack_timeout: float = 30.0,
) -> None:
    """Record configuration and set up empty in-memory hub state.

    This constructor neither walks the store nor spawns the sweepers:
    ``hydrate()`` reloads the caches from ``store`` and ``start()``
    spawns the sweepers. ``Hub.open()`` does both and is the production
    entry point; calling ``__init__`` directly is meant for tests that
    want fine-grained control.

    Args:
        store: Persistence root. Also passed to the built-in
            ``AuditLog`` created below.
        auth: Registry consulted for passport validation; falls back to
            ``AuthRegistry.default()`` when ``None``.
        clock: Zero-argument callable returning the current time as a
            string; falls back to ``_utc_now_iso`` when ``None``. The
            same clock is shared with the audit log.
        ttl_sweep_interval: Interval for the TTL sweeper; ``start()``
            only spawns it when this is ``> 0``.
        expectation_sweep_interval: Interval for the expectation
            sweeper; same ``> 0`` rule.
        invite_ack_timeout: Stored as ``self._invite_ack_timeout``; not
            used in this method (presumably seconds — TODO confirm
            against its consumer).
    """
    # __init__ stores params; side effects deferred to start()/hydrate().
    self._store = store
    self._auth = auth if auth is not None else AuthRegistry.default()
    self._clock = clock if clock is not None else _utc_now_iso
    self._ttl_sweep_interval = ttl_sweep_interval
    self._expectation_sweep_interval = expectation_sweep_interval
    self._invite_ack_timeout = invite_ack_timeout

    # Audit log + expectation registries. AuditLog is a HubListener;
    # it is installed as the first entry of ``self._listeners`` (see the
    # end of this method) so audit records are written before any
    # tenant listener observes the same event.
    self._audit_log = AuditLog(store, clock=self._clock)
    self._expectation_evaluators: dict[str, ExpectationEvaluator] = {}
    self._violation_handlers: dict[str, ViolationHandler] = {}
    # channel_id → set of (expectation_index, expectation_name, violator_id) fired.
    # The position-based index disambiguates same-name expectations
    # (e.g. two ``turn_within`` entries with different ``on_violation``
    # handlers — without the index the first to fire would suppress
    # the second). Empty violator_id ("") = channel-wide violations.
    self._fired_violations: dict[str, set[tuple[int, str, str]]] = {}

    # Identity caches.
    self._passports: dict[str, Passport] = {}
    self._resumes: dict[str, Resume] = {}
    self._rules: dict[str, Rule] = {}
    self._skills: dict[str, str] = {}
    self._name_to_id: dict[str, str] = {}
    # capability name → set of agent_ids that claim or have observed it.
    # Persisted as registry/by_capability.json on every mutation
    # (rebuilt from resumes on hydrate — the file is a derived cache).
    self._capability_index: dict[str, set[str]] = {}

    # Adapter registry: (manifest.type, manifest.version) -> adapter,
    # populated by register_adapter().
    self._adapters: dict[tuple[str, int], ChannelAdapter] = {}

    # Channel caches.
    self._channels: dict[str, ChannelMetadata] = {}
    self._active_channels: dict[str, ChannelMetadata] = {}
    # Rebuilt by hydrate() by re-folding active channel WALs.
    self._adapter_states: dict[str, object] = {}
    self._channel_open_waiters: dict[str, asyncio.Future[ChannelMetadata]] = {}

    # Task caches (observed; not owned).
    self._tasks: dict[str, TaskMetadata] = {}
    self._channel_tasks: dict[str, set[str]] = {}
    # task_ids whose terminal observation has been recorded into
    # the owner's ``Resume.observed`` already. Prevents double-counting
    # when the same task receives multiple terminal events (e.g. a
    # channel-cascade EXPIRED followed by an owner-emitted COMPLETED).
    self._observed_task_ids: set[str] = set()

    # Per-recipient outstanding-envelope counter for ``InboxBlock.max_pending``
    # enforcement. Incremented on dispatch to that recipient,
    # decremented when the recipient posts any envelope (treating
    # any outbound activity as "I'm processing my inbox"). A
    # best-effort approximation; per-channel ack semantics require
    # a transport with ack frames.
    self._inbox_pending: dict[str, int] = {}

    # Per-recipient, per-channel delivery cursor:
    # ``agent_id -> {channel_id -> last-acked envelope_id}``. Each
    # channel is an independently-ordered stream (its own WAL +
    # lock), so the cursor is scoped per channel: an ack in one
    # channel must not advance the high-water mark of another, or an
    # older unacked envelope elsewhere would never replay. On
    # reconnect with ``HelloFrame.since_envelope_id`` set, the hub
    # replays, per channel, every dispatched envelope whose id sorts
    # strictly above ``max(channel_cursor, since_envelope_id)``.
    # Envelope ids come from ``self._mint_envelope_id`` (strictly
    # monotonic, time-ordered), so lexicographic compare matches
    # dispatch ordering within a channel.
    self._inbox_cursors: dict[str, dict[str, str]] = {}

    # Strictly-monotonic source for envelope ids. The cursor and
    # replay above rely on per-channel WAL order == sort order, but
    # ``time.time_ns`` can repeat within a tick on coarse-resolution
    # clocks (Windows ~15 ms), so a plain ``make_id`` would sort two
    # same-tick envelopes by their random suffix — non-deterministically.
    # This clamps each id strictly above the last so sort order always
    # tracks mint order. State is per-hub (no shared global counter).
    self._mint_envelope_id = _MonotonicIds()

    # Federation dispatch registry keyed by ``proxy.scheme``. When
    # a recipient passport has ``effective_kind == "remote_agent"``
    # the hub looks up the proxy by ``recipient.auth.scheme`` and
    # hands the envelope to ``proxy.dispatch(...)`` instead of
    # sending a ``NotifyFrame`` to a local endpoint. No proxies
    # ship in the framework; tenants register their own.
    self._remote_proxies: dict[str, RemoteAgentProxy] = {}

    # ``(channel_id, sender_id, causation_id) -> envelope_id``.
    # Lets handlers short-circuit logically-duplicate work after an
    # at-least-once redelivery: a sender that retries with the same
    # ``causation_id`` looks up the prior envelope and skips the
    # repeated side effect. Populated on every WAL append, pruned
    # when a channel transitions to a terminal state, and rebuilt
    # from active-channel WALs on ``hydrate()``.
    self._causation_index: dict[tuple[str, str, str], str] = {}

    # Transport-side state.
    self._endpoints_by_id: dict[str, LinkEndpoint] = {}
    self._agent_to_endpoint: dict[str, str] = {}
    self._endpoint_to_agents: dict[str, set[str]] = {}
    # Cancelled and awaited by close().
    self._endpoint_tasks: set[asyncio.Task[None]] = set()

    # Per-channel locks for WAL append + dispatch ordering.
    self._channel_locks: dict[str, asyncio.Lock] = {}
    # Serialises register()'s name-collision check and persistence.
    self._registration_lock = asyncio.Lock()

    # Created by start() (only for positive intervals); stopped and
    # reset to None by close().
    self._ttl_sweeper: _IntervalSweeper | None = None
    self._expectation_sweeper: _IntervalSweeper | None = None
    # Subclass-registered periodic workers. Keyed by name so a
    # subclass can replace a sweeper at the same name (e.g. via a
    # config reload) by unregister + re-register.
    self._custom_sweepers: dict[str, _IntervalSweeper] = {}
    # Set True by ``start()`` so ``register_sweeper`` knows whether
    # to spawn the new sweeper immediately or queue it for ``start()``.
    self._started = False
    # Set True by close(); later close() calls return early.
    self._closed = False

    # Observability + decision-making seams. Audit log is registered
    # as the first listener so it sees every event a tenant
    # listener does, in the same order.
    self._listeners: list[HubListener] = [self._audit_log]
    self._arbiter: HubArbiter = RuleBasedArbiter()

audit_log property #

audit_log

Public access to the built-in audit log (a :class:HubListener).

Use this to call `AuditLog.read_all()` from tooling or to attach a live subscriber via `AuditLog.subscribe()`. Custom hub subclasses that want a different audit format can replace the instance via `replace_audit_log()`.

arbiter property #

arbiter

The currently active arbiter (read-only access for testing).

open async classmethod #

open(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0, register_default_adapters=True)

Construct + hydrate from disk + start sweepers. Production entry point.

register_default_adapters=True (default) registers the built-in adapters (consulting@v1, conversation@v1, discussion@v1, plus the WorkflowAdapter) and the built-in expectation evaluators / violation handlers (acks_within / reply_within / max_silence, audit / notify_channel / auto_close), so simple test setups don't need explicit registration calls.

Set expectation_sweep_interval=0 to disable the expectation sweeper entirely (tests usually do this to avoid background timer noise).

Source code in autogen/beta/network/hub/core.py
@classmethod
async def open(
    cls,
    store: KnowledgeStore,
    *,
    auth: AuthRegistry | None = None,
    clock: Callable[[], str] | None = None,
    ttl_sweep_interval: float = 30.0,
    expectation_sweep_interval: float = 10.0,
    invite_ack_timeout: float = 30.0,
    register_default_adapters: bool = True,
) -> "Hub":
    """Construct + hydrate from disk + start sweepers. Production entry point.

    ``register_default_adapters=True`` (default) registers the
    built-in adapters (``consulting@v1``, ``conversation@v1``,
    ``discussion@v1`` and the ``WorkflowAdapter``) plus every evaluator
    from ``default_evaluators()`` and every handler from
    ``default_handlers()`` (``acks_within`` / ``reply_within`` /
    ``max_silence``, ``audit`` / ``notify_channel`` / ``auto_close``),
    so simple test setups don't need explicit registration calls.

    Set ``expectation_sweep_interval=0`` to disable the expectation
    sweeper entirely (tests usually do this to avoid background
    timer noise).

    If ``hydrate()`` or ``start()`` fails (or the call is cancelled),
    the partially-initialised hub is closed before the exception
    propagates, so no sweeper task is left running unowned.
    """
    hub = cls(
        store,
        auth=auth,
        clock=clock,
        ttl_sweep_interval=ttl_sweep_interval,
        expectation_sweep_interval=expectation_sweep_interval,
        invite_ack_timeout=invite_ack_timeout,
    )
    if register_default_adapters:
        hub.register_adapter(ConsultingAdapter())
        hub.register_adapter(ConversationAdapter())
        hub.register_adapter(DiscussionAdapter())
        hub.register_adapter(WorkflowAdapter())
        for evaluator in default_evaluators():
            hub.register_expectation_evaluator(evaluator)
        for handler in default_handlers():
            hub.register_violation_handler(handler)
    try:
        await hub.hydrate()
        await hub.start()
    except BaseException:
        # start() may already have spawned some sweepers; the caller
        # never receives the hub, so nobody else could stop them.
        await hub.close()
        raise
    return hub

hydrate async #

hydrate()

Walk the store; rebuild caches. Idempotent.

Loads identities, channels, and tasks from disk. Active channel WALs are re-folded through their adapter so the _adapter_states cache is rebuilt deterministically.

Source code in autogen/beta/network/hub/core.py
async def hydrate(self) -> None:
    """Rebuild the in-memory caches from the store. Idempotent.

    Every call first empties the identity, channel, task, cursor and
    causation caches and then reloads, in order: agents, the capability
    index (derived from the freshly loaded resumes), channels (each
    active channel's WAL is re-folded through its adapter, so
    ``_adapter_states`` is rebuilt deterministically), and tasks.
    """
    for cache in (
        self._passports,
        self._resumes,
        self._rules,
        self._skills,
        self._name_to_id,
        self._capability_index,
        self._channels,
        self._active_channels,
        self._adapter_states,
        self._tasks,
        self._channel_tasks,
        self._inbox_cursors,
        self._causation_index,
    ):
        cache.clear()

    async def subdirectory_ids(root: str) -> list[str]:
        # Listings mark sub-directories with a trailing "/"; plain
        # files sitting directly under the root are skipped.
        entries = await self._store.list(root)
        return [entry.rstrip("/") for entry in entries if entry.endswith("/")]

    # Identities.
    for agent_id in await subdirectory_ids(agents_root()):
        await self._load_agent(agent_id)

    # by_capability.json is only a derived cache; the resumes just
    # loaded are the authoritative source, claimed entries first.
    for agent_id, resume in self._resumes.items():
        for capability in (*resume.claimed_capabilities, *resume.observed):
            self._capability_index.setdefault(capability, set()).add(agent_id)

    # Channels — metadata is loaded and the WAL re-folded per channel.
    for channel_id in await subdirectory_ids(channels_root()):
        await self._load_channel(channel_id)

    # Tasks.
    for task_id in await subdirectory_ids(tasks_root()):
        await self._load_task(task_id)

start async #

start()

Spawn the TTL + expectation + custom sweepers. Idempotent.

ttl_sweep_interval=0 disables the TTL sweeper; expectation_sweep_interval=0 disables the expectation sweeper. Custom sweepers attached via `register_sweeper()` start here (if registered before `start()`) or immediately at registration (if registered after `start()`).

Source code in autogen/beta/network/hub/core.py
async def start(self) -> None:
    """Spawn the TTL + expectation + custom sweepers. Idempotent.

    ``ttl_sweep_interval=0`` disables the TTL sweeper;
    ``expectation_sweep_interval=0`` disables the expectation
    sweeper (any non-positive value does). Custom sweepers attached via
    :meth:`register_sweeper` before the first ``start()`` are started
    here. Ones registered afterwards are started by ``register_sweeper``
    itself, so repeated ``start()`` calls never start a custom sweeper
    a second time.

    Raises:
        RuntimeError: if the hub has already been closed. Restarting a
            closed hub would spawn sweepers that a later ``close()`` (a
            no-op once closed) could never stop.
    """
    if self._closed:
        raise RuntimeError("cannot start a closed hub")
    if self._ttl_sweep_interval > 0 and self._ttl_sweeper is None:
        self._ttl_sweeper = _IntervalSweeper(
            name="ttl",
            interval=self._ttl_sweep_interval,
            fn=self.expire_due,
        )
        self._ttl_sweeper.start()
    if self._expectation_sweep_interval > 0 and self._expectation_sweeper is None:
        self._expectation_sweeper = _IntervalSweeper(
            name="expectations",
            interval=self._expectation_sweep_interval,
            fn=self._expectation_tick,
        )
        self._expectation_sweeper.start()
    if not self._started:
        # Only the first start() launches queued custom sweepers; after
        # that, register_sweeper() starts new ones immediately, so
        # re-starting them here would depend on _IntervalSweeper.start
        # being idempotent.
        for sweeper in self._custom_sweepers.values():
            sweeper.start()
    self._started = True

close async #

close()

Cancel sweepers + endpoint tasks; drain queues. Idempotent.

Source code in autogen/beta/network/hub/core.py
async def close(self) -> None:
    """Stop sweepers and cancel endpoint tasks. Idempotent.

    Every sweeper is stopped and every endpoint task cancelled (and
    awaited) even if an individual ``stop()`` raises. The first such
    error is re-raised only once shutdown has finished, so a failure is
    still reported without leaving other background tasks running.
    """
    if self._closed:
        return
    self._closed = True
    errors: list[Exception] = []

    async def stop_quietly(sweeper) -> None:
        # Keep shutting down on failure; the error is re-raised below.
        try:
            await sweeper.stop()
        except Exception as exc:
            errors.append(exc)

    if self._ttl_sweeper is not None:
        await stop_quietly(self._ttl_sweeper)
        self._ttl_sweeper = None
    if self._expectation_sweeper is not None:
        await stop_quietly(self._expectation_sweeper)
        self._expectation_sweeper = None
    for sweeper in list(self._custom_sweepers.values()):
        await stop_quietly(sweeper)
    self._custom_sweepers.clear()
    for task in list(self._endpoint_tasks):
        task.cancel()
    if self._endpoint_tasks:
        # return_exceptions=True so the expected CancelledErrors don't
        # abort the wait for the remaining tasks.
        await asyncio.gather(*self._endpoint_tasks, return_exceptions=True)
    self._endpoint_tasks.clear()
    if errors:
        raise errors[0]

register_sweeper #

register_sweeper(name, interval_seconds, fn)

Attach a custom periodic worker.

fn is called every interval_seconds. Subclasses use this for protocol-specific background work (e.g. polling a chat platform's presence list, refreshing an auth token).

If Hub.start() has already run, the sweeper starts immediately. Otherwise it's queued and starts when start() runs.

Re-registering at the same name raises ValueError — use :meth:unregister_sweeper first if you mean to replace.

Sync vs. async: register_sweeper is synchronous because it only updates internal bookkeeping (and may call the underlying _IntervalSweeper.start, which schedules a fire-and-forget task). `unregister_sweeper()` is async because it awaits the cancellation of the sweeper's task to ensure a clean shutdown.

Source code in autogen/beta/network/hub/core.py
def register_sweeper(
    self,
    name: str,
    interval_seconds: float,
    fn: Callable[[], "Awaitable[None]"],
) -> None:
    """Attach a custom periodic worker.

    ``fn`` is called every ``interval_seconds``. Subclasses use this
    for protocol-specific background work (e.g. polling a chat
    platform's presence list, refreshing an auth token).

    If ``Hub.start()`` has already run, the sweeper starts immediately.
    Otherwise it's queued and starts when ``start()`` runs.

    Re-registering at the same ``name`` raises ``ValueError`` — use
    :meth:`unregister_sweeper` first if you mean to replace.

    Sync vs. async: ``register_sweeper`` is synchronous because it
    only updates internal bookkeeping (and may call the underlying
    ``_IntervalSweeper.start`` which schedules a fire-and-forget
    task). :meth:`unregister_sweeper` is async because it awaits the
    sweeper's task cancellation to ensure clean shutdown.

    Raises:
        ValueError: ``name`` is already registered, or
            ``interval_seconds`` is not positive.
        RuntimeError: the hub has been closed. ``_started`` stays True
            after ``close()``, so without this check the sweeper would
            be started immediately and never stopped.
    """
    if name in self._custom_sweepers:
        raise ValueError(f"sweeper already registered: {name!r}")
    if interval_seconds <= 0:
        raise ValueError(f"interval_seconds must be positive: {interval_seconds}")
    if self._closed:
        raise RuntimeError(f"cannot register sweeper {name!r} on a closed hub")
    sweeper = _IntervalSweeper(name=name, interval=interval_seconds, fn=fn)
    self._custom_sweepers[name] = sweeper
    if self._started:
        # ``start()`` has already run — start this sweeper immediately.
        sweeper.start()

unregister_sweeper async #

unregister_sweeper(name)

Stop and remove a custom sweeper. No-op if absent.

Async to mirror `_IntervalSweeper.stop()`, which awaits cancellation of the running task. Subclasses don't need to track their sweepers for shutdown: `close()` stops and removes every custom sweeper itself, whether it was registered before or after `start()`.

Source code in autogen/beta/network/hub/core.py
async def unregister_sweeper(self, name: str) -> None:
    """Stop and remove a custom sweeper. No-op if absent.

    Async to mirror :meth:`_IntervalSweeper.stop`, which awaits
    cancellation of the running task. The sweeper is removed from the
    registry before it is stopped, so the name is free for
    re-registration even if ``stop()`` raises.

    Subclasses don't need to call this for shutdown: :meth:`close`
    stops and clears every custom sweeper itself.
    """
    sweeper = self._custom_sweepers.pop(name, None)
    if sweeper is None:
        return
    await sweeper.stop()

register_adapter #

register_adapter(adapter)

Register a ChannelAdapter keyed by (type, version).

Re-registering at the same key replaces the prior adapter; the old key's existing in-flight channels keep their snapshotted manifest for life.

Source code in autogen/beta/network/hub/core.py
def register_adapter(self, adapter: ChannelAdapter) -> None:
    """Make ``adapter`` available under its manifest's ``(type, version)``.

    A later registration with the same ``(type, version)`` overwrites the
    earlier one. Channels already in flight under that key keep the
    manifest they snapshotted for their whole lifetime.
    """
    manifest = adapter.manifest
    self._adapters[manifest.type, manifest.version] = adapter

register_listener #

register_listener(listener)

Attach a `HubListener` to receive state-transition events.

Listeners receive events in registration order. Each is wrapped in try/except so one buggy listener cannot stall dispatch — its exception is logged at ERROR and the next listener still runs.

Source code in autogen/beta/network/hub/core.py
def register_listener(self, listener: HubListener) -> None:
    """Add ``listener`` to the end of the hub's listener chain.

    Events reach listeners in the order they were registered (the
    built-in audit log starts out first). Each listener call is isolated
    by the dispatcher: an exception is logged at ``ERROR`` and delivery
    continues with the next listener, so one faulty listener cannot
    stall dispatch.
    """
    chain = self._listeners
    chain.extend([listener])

unregister_listener #

unregister_listener(listener)

Detach a previously-registered listener. No-op if absent.

Source code in autogen/beta/network/hub/core.py
def unregister_listener(self, listener: HubListener) -> None:
    """Remove ``listener`` from the chain; unknown listeners are ignored."""
    try:
        self._listeners.remove(listener)
    except ValueError:
        # Not registered: documented as a no-op.
        pass

report_turn_failure async #

report_turn_failure(*, channel_id, agent_id, envelope_id, exc)

Fan out an on_turn_failed event to every listener.

Public entry point so client-side notify handlers can route substantive-turn crashes through the observability surface without touching hub privates. AuditLog (the built-in listener) records the failure; tenant listeners react however they choose.

Source code in autogen/beta/network/hub/core.py
async def report_turn_failure(
    self,
    *,
    channel_id: str,
    agent_id: str,
    envelope_id: str,
    exc: BaseException,
) -> None:
    """Broadcast a failed substantive turn as an ``on_turn_failed`` event.

    Intended for client-side notify handlers: they can surface a turn
    crash through the hub's observability surface without reaching into
    hub internals. Every listener receives
    ``(channel_id, agent_id, envelope_id, exc)``; the built-in
    ``AuditLog`` records it, and tenant listeners react as they choose.
    """
    event_args = (channel_id, agent_id, envelope_id, exc)
    await self._fan_out("on_turn_failed", *event_args)

fire_task_event async #

fire_task_event(task_id, kind, payload)

Fan out an on_task_event to every listener.

Public entry point so :class:TaskMirror (and other tenant observers) can route task-side notifications through the hub's listener surface without touching _fan_out. kind is free-form — the built-in listener Protocol documents "started" / "progress" / "completed" / "failed" / "expired" / "cancelled" / "mirror_failed" as the recognised values, but tenants may emit additional kinds.

Source code in autogen/beta/network/hub/core.py
async def fire_task_event(
    self,
    task_id: str,
    kind: str,
    payload: dict,
) -> None:
    """Broadcast ``(task_id, kind, payload)`` as an ``on_task_event``.

    This is the public route for :class:`TaskMirror` and other tenant
    observers to push task-side notifications through the listener
    surface without calling ``_fan_out`` directly. ``kind`` is not
    validated: the listener Protocol documents ``"started"``,
    ``"progress"``, ``"completed"``, ``"failed"``, ``"expired"``,
    ``"cancelled"`` and ``"mirror_failed"``, but tenants may emit
    their own kinds as well.
    """
    event_name = "on_task_event"
    await self._fan_out(event_name, task_id, kind, payload)

replace_audit_log #

replace_audit_log(audit_log)

Swap the built-in :class:AuditLog for a tenant-provided one.

Unregisters the prior audit log from the listener chain, registers the replacement as the first listener (preserving the convention that audit writes complete before tenant listeners observe the same event), and updates `audit_log` to point at it.

Source code in autogen/beta/network/hub/core.py
def replace_audit_log(self, audit_log: AuditLog) -> None:
    """Swap the built-in :class:`AuditLog` for a tenant-provided one.

    Unregisters the prior audit log from the listener chain,
    registers the replacement as the first listener (preserving
    the convention that audit writes complete before tenant
    listeners observe the same event), and updates
    :attr:`audit_log` to point at it.

    The replacement ends up in the chain exactly once, even if it had
    already been attached via :meth:`register_listener` or is the
    current audit log, so it never receives an event twice.
    """
    previous = self._audit_log
    # Filter by identity and mutate in place so any holder of the list
    # object sees the change.
    self._listeners[:] = [
        listener
        for listener in self._listeners
        if listener is not previous and listener is not audit_log
    ]
    self._audit_log = audit_log
    self._listeners.insert(0, audit_log)

register_arbiter #

register_arbiter(arbiter)

Replace the active :class:HubArbiter instance.

The default `RuleBasedArbiter` is installed automatically and enforces per-agent `Rule` (access + limits) — the same behavior the hub had inline before this seam existed. Tenants replace it to layer custom permission protocols (JWT scope, federation routing, etc.) on top of (or in place of) the rule-based defaults.

Only one arbiter is active at a time; calling this with a new instance replaces the prior arbiter outright.

Source code in autogen/beta/network/hub/core.py
def register_arbiter(self, arbiter: HubArbiter) -> None:
    """Install ``arbiter`` as the hub's single active :class:`HubArbiter`.

    Out of the box the hub runs a :class:`RuleBasedArbiter`, which
    enforces each agent's :class:`Rule` (access + limits). Tenants swap
    it out to layer their own permission protocols (JWT scopes,
    federation routing, ...) over, or instead of, the rule-based checks.

    Arbiters do not stack: whatever was active before is discarded.
    """
    self._arbiter = arbiter

register_remote_proxy #

register_remote_proxy(proxy)

Register a federation proxy keyed by proxy.scheme.

When the hub dispatches an envelope to a recipient whose passport has effective_kind == "remote_agent", it looks up the proxy by the recipient's auth.scheme and calls proxy.dispatch(envelope, recipient) instead of sending a NotifyFrame to a local endpoint. Re-registering at the same scheme replaces the prior proxy.

Source code in autogen/beta/network/hub/core.py
def register_remote_proxy(self, proxy: RemoteAgentProxy) -> None:
    """Register a federation proxy under its ``scheme``.

    Envelopes for recipients whose passport has
    ``effective_kind == "remote_agent"`` are routed through the proxy
    whose scheme matches the recipient's ``auth.scheme``: the hub calls
    ``proxy.dispatch(envelope, recipient)`` instead of sending a
    ``NotifyFrame`` to a local endpoint. Registering another proxy for
    the same scheme overwrites the earlier one.
    """
    scheme = proxy.scheme
    self._remote_proxies[scheme] = proxy

unregister_remote_proxy #

unregister_remote_proxy(scheme)

Remove the proxy registered for scheme and return it.

Returns None if no proxy was registered for scheme. The caller is responsible for awaiting proxy.close() — the hub leaves lifecycle decisions to whoever owns the proxy instance.

Source code in autogen/beta/network/hub/core.py
def unregister_remote_proxy(self, scheme: str) -> RemoteAgentProxy | None:
    """Detach and return the proxy registered for ``scheme``.

    Gives back ``None`` when nothing is registered under ``scheme``.
    The hub does not close the detached proxy; whoever owns it is
    responsible for awaiting ``proxy.close()``.
    """
    proxies = self._remote_proxies
    if scheme not in proxies:
        return None
    return proxies.pop(scheme)

remote_proxy_for #

remote_proxy_for(scheme)

Read-only lookup against the proxy registry.

Source code in autogen/beta/network/hub/core.py
def remote_proxy_for(self, scheme: str) -> RemoteAgentProxy | None:
    """Return the proxy registered for ``scheme``, or ``None``; never mutates."""
    try:
        return self._remote_proxies[scheme]
    except KeyError:
        return None

health #

health()

Return an operational snapshot of hub state.

Cheap to compute (in-memory only). Wire to a /health endpoint or operational dashboard. The shape is intentionally small — operators want a handful of indicative numbers, not the full state.

Fields:

  • active_channels: number of channels in OPENED/PENDING state.
  • registered_agents: total registered identities (agents + humans).
  • pending_inbox_total: sum of per-recipient outstanding envelope counters (best-effort approximation).
  • max_pending_inbox_depth: maximum per-recipient counter value, or None when no recipient has a counter yet (counters that have drained back to zero report 0). Indicative of the "stuck agent" case.
  • registered_listeners: number of attached `HubListener` instances (the built-in `AuditLog` counts).
  • adapters_loaded: number of registered `ChannelAdapter` instances.
  • audit_log_bytes: the current audit log's `bytes_written` counter.
Source code in autogen/beta/network/hub/core.py
def health(self) -> dict:
    """Return an operational snapshot of hub state.

    Cheap to compute (in-memory only). Wire to a ``/health``
    endpoint or operational dashboard. The shape is intentionally
    small — operators want a handful of indicative numbers, not
    the full state.

    Fields:

    * ``active_channels``: number of channels in OPENED/PENDING state.
    * ``registered_agents``: total registered identities (agents +
      humans).
    * ``pending_inbox_total``: sum of per-recipient outstanding
      envelope counters (best-effort approximation).
    * ``max_pending_inbox_depth``: largest per-recipient counter, or
      ``None`` when no recipient has a counter yet. Counters that have
      drained back to zero still report ``0``. Indicative of the
      "stuck agent" case.
    * ``registered_listeners``: number of attached
      :class:`HubListener` instances (the built-in
      :class:`AuditLog` counts).
    * ``adapters_loaded``: number of registered
      :class:`ChannelAdapter` instances.
    * ``audit_log_bytes``: the current audit log's ``bytes_written``
      counter.
    """
    pending = self._inbox_pending
    return {
        "active_channels": len(self._active_channels),
        "registered_agents": len(self._passports),
        "pending_inbox_total": sum(pending.values()),
        "max_pending_inbox_depth": max(pending.values(), default=None),
        "registered_listeners": len(self._listeners),
        "adapters_loaded": len(self._adapters),
        "audit_log_bytes": self._audit_log.bytes_written,
    }

on_envelope_posted async #

on_envelope_posted(envelope, metadata)
Source code in autogen/beta/network/hub/core.py
async def on_envelope_posted(self, envelope: Envelope, metadata: ChannelMetadata) -> None:  # noqa: ARG002
    """Default no-op for the ``on_envelope_posted`` hook; subclasses may override."""

on_envelope_rejected async #

on_envelope_rejected(envelope, reason)
Source code in autogen/beta/network/hub/core.py
async def on_envelope_rejected(self, envelope: Envelope, reason: NetworkError) -> None:  # noqa: ARG002
    """Default no-op for the ``on_envelope_rejected`` hook; subclasses may override."""

on_dispatch_failed async #

on_dispatch_failed(envelope, recipient_id, reason)
Source code in autogen/beta/network/hub/core.py
async def on_dispatch_failed(
    self,
    envelope: Envelope,  # noqa: ARG002
    recipient_id: str,  # noqa: ARG002
    reason: BaseException,  # noqa: ARG002
) -> None:
    """Default no-op for the ``on_dispatch_failed`` hook; subclasses may override.

    The ``noqa`` markers sit on the parameter lines because ruff reports
    ARG002 at each unused argument's own line; a marker on the closing
    ``) -> None:`` line does not suppress it.
    """
    return None

on_channel_event async #

on_channel_event(channel_id, kind, payload)
Source code in autogen/beta/network/hub/core.py
async def on_channel_event(self, channel_id: str, kind: str, payload: dict) -> None:  # noqa: ARG002
    """Default no-op for the ``on_channel_event`` hook; subclasses may override."""

on_agent_event async #

on_agent_event(agent_id, kind, payload)
Source code in autogen/beta/network/hub/core.py
async def on_agent_event(self, agent_id: str, kind: str, payload: dict) -> None:  # noqa: ARG002
    """Default no-op for the ``on_agent_event`` hook; subclasses may override."""

on_expectation_fired async #

on_expectation_fired(channel_id, expectation, violation)
Source code in autogen/beta/network/hub/core.py
async def on_expectation_fired(self, channel_id: str, expectation: object, violation: object) -> None:  # noqa: ARG002
    """Default no-op for the ``on_expectation_fired`` hook; subclasses may override."""

on_turn_failed async #

on_turn_failed(channel_id, agent_id, envelope_id, exc)
Source code in autogen/beta/network/hub/core.py
async def on_turn_failed(
    self,
    channel_id: str,  # noqa: ARG002
    agent_id: str,  # noqa: ARG002
    envelope_id: str,  # noqa: ARG002
    exc: BaseException,  # noqa: ARG002
) -> None:
    """Default no-op for the ``on_turn_failed`` hook; subclasses may override.

    The ``noqa`` markers sit on the parameter lines because ruff reports
    ARG002 at each unused argument's own line; a marker on the closing
    ``) -> None:`` line does not suppress it.
    """
    return None

on_task_event async #

on_task_event(task_id, kind, payload)
Source code in autogen/beta/network/hub/core.py
async def on_task_event(self, task_id: str, kind: str, payload: dict) -> None:  # noqa: ARG002
    """Default no-op for the ``on_task_event`` hook; subclasses may override."""

on_inbox_pressure async #

on_inbox_pressure(agent_id, pending, cap)
Source code in autogen/beta/network/hub/core.py
async def on_inbox_pressure(self, agent_id: str, pending: int, cap: int) -> None:  # noqa: ARG002
    """Default no-op for the ``on_inbox_pressure`` hook; subclasses may override."""

register_expectation_evaluator #

register_expectation_evaluator(evaluator)

Register an evaluator keyed by evaluator.name.

Re-registering the same name replaces the prior evaluator.

Source code in autogen/beta/network/hub/core.py
def register_expectation_evaluator(self, evaluator: ExpectationEvaluator) -> None:
    """Index ``evaluator`` by its ``name``; a later one with the same name wins."""
    key = evaluator.name
    self._expectation_evaluators[key] = evaluator

register_violation_handler #

register_violation_handler(handler)

Register a violation handler keyed by handler.name.

Re-registering the same name replaces the prior handler.

Source code in autogen/beta/network/hub/core.py
def register_violation_handler(self, handler: ViolationHandler) -> None:
    """Index ``handler`` by its ``name``; a later one with the same name wins."""
    key = handler.name
    self._violation_handlers[key] = handler

register async #

register(passport, resume, *, skill_md=None, rule=None)
Source code in autogen/beta/network/hub/core.py
async def register(
    self,
    passport: Passport,
    resume: Resume,
    *,
    skill_md: str | None = None,
    rule: Rule | None = None,
) -> Passport:
    """Admit a new participant and return its passport stamped with an id.

    For non-remote passports, the auth adapter for
    ``passport.auth.scheme`` validates ``passport.auth.claim`` first.
    Under the registration lock, the name is checked for collisions.
    A fresh ``agent_id`` and ``created_at`` are then written onto
    ``passport`` in place. The passport, resume, rule (a default
    :class:`Rule` when ``rule`` is ``None``) and optional SKILL.md are
    persisted and then cached. Claimed and observed capabilities join
    the capability index, which is persisted after the lock is
    released. Finally a ``"registered"`` agent event is fanned out.

    Raises:
        ProtocolError: ``passport.name`` is already registered.
        Anything the auth adapter's ``validate`` raises.
    """
    # A remote-agent passport's ``auth.scheme`` is a routing label for
    # its ``RemoteAgentProxy``, not a local credential; the originating
    # hub authenticates it, so only local kinds are validated here.
    needs_local_auth = passport.effective_kind != "remote_agent"
    if needs_local_auth:
        auth_adapter = self._auth.get(passport.auth.scheme)
        await auth_adapter.validate(passport, passport.auth.claim)

    async with self._registration_lock:
        # A name collision would orphan the earlier registration's
        # files under an unreachable agent_id, so callers must
        # ``unregister`` explicitly first.
        if passport.name in self._name_to_id:
            raise ProtocolError(
                f"name {passport.name!r} already registered "
                f"(agent_id={self._name_to_id[passport.name]}); "
                "unregister it before re-registering."
            )
        new_id = make_id()
        passport.agent_id = new_id
        passport.created_at = self._clock()

        stored_rule = Rule() if rule is None else rule
        has_skill = skill_md is not None

        # Persist first, then populate the in-memory caches.
        await self._persist_passport(passport)
        await self._persist_resume(new_id, resume)
        await self._persist_rule(new_id, stored_rule)
        if has_skill:
            await self._persist_skill(new_id, skill_md)

        self._passports[new_id] = passport
        self._resumes[new_id] = resume
        self._rules[new_id] = stored_rule
        if has_skill:
            self._skills[new_id] = skill_md
        self._name_to_id[passport.name] = new_id

        # Claimed capabilities first, then observed ones (dict keys), so
        # index insertion order matches the two-pass original.
        for capability in [*resume.claimed_capabilities, *resume.observed]:
            self._capability_index.setdefault(capability, set()).add(new_id)

    await self._persist_capability_index()
    logger.info("agent registered: name=%s agent_id=%s", passport.name, new_id)
    await self._fan_out(
        "on_agent_event",
        new_id,
        "registered",
        {"passport": passport, "at": self._clock()},
    )
    return passport

unregister async #

unregister(agent_id)
Source code in autogen/beta/network/hub/core.py
async def unregister(self, agent_id: str) -> None:
    """Remove ``agent_id`` from the registry and delete its identity files.

    Drops the passport, resume, rule, SKILL.md, name mapping, endpoint
    binding, capability-index entries and inbox accounting. It then
    persists the capability index and fans out an ``"unregistered"``
    agent event. Channels and tasks the agent took part in are kept for
    audit / read.

    Raises:
        NotFoundError: ``agent_id`` is not registered.
    """
    async with self._registration_lock:
        # Check membership under the lock. With the check outside
        # ``async with``, two concurrent calls could both pass it. The
        # loser would then re-run the teardown and fan out a second
        # "unregistered" event with no name.
        passport = self._passports.pop(agent_id, None)
        if passport is None:
            raise NotFoundError(f"agent not registered: {agent_id}")
        self._resumes.pop(agent_id, None)
        self._rules.pop(agent_id, None)
        self._skills.pop(agent_id, None)
        # Only drop the name mapping if it still points at this agent.
        if self._name_to_id.get(passport.name) == agent_id:
            self._name_to_id.pop(passport.name, None)

        endpoint_id = self._agent_to_endpoint.pop(agent_id, None)
        if endpoint_id is not None:
            bound = self._endpoint_to_agents.get(endpoint_id)
            if bound is not None:
                bound.discard(agent_id)
                if not bound:
                    self._endpoint_to_agents.pop(endpoint_id, None)

        # Drop the agent from every capability bucket; clean empty
        # buckets so the index stays compact.
        empty_caps: list[str] = []
        for cap, ids in self._capability_index.items():
            ids.discard(agent_id)
            if not ids:
                empty_caps.append(cap)
        for cap in empty_caps:
            self._capability_index.pop(cap, None)

        # Delete on-disk identity files. Without this the next
        # ``hydrate()`` would re-load the unregistered agent from
        # disk. Channels and tasks the agent participated in are
        # kept for audit / read; only the per-agent identity files
        # are removed.
        await self._store.delete(passport_path(agent_id))
        await self._store.delete(resume_path(agent_id))
        await self._store.delete(rule_path(agent_id))
        await self._store.delete(skill_path(agent_id))

        # Drop inbox accounting so a future re-register with a
        # different agent_id starts from zero.
        self._inbox_pending.pop(agent_id, None)
        self._inbox_cursors.pop(agent_id, None)
        await self._store.delete(inbox_cursor_path(agent_id))

    await self._persist_capability_index()
    logger.info("agent unregistered: agent_id=%s", agent_id)
    await self._fan_out(
        "on_agent_event",
        agent_id,
        "unregistered",
        {"name": passport.name, "at": self._clock()},
    )

name_for #

name_for(agent_id, *, default=None)

Resolve agent_id to its registered Passport.name.

Reads the in-memory passport directory. Returns default when the id is unknown (or agent_id itself if default is None), so callers can use this as a safe NameResolver for view projection without having to handle ids that were never registered or were unregistered mid-turn.

Source code in autogen/beta/network/hub/core.py
def name_for(self, agent_id: str, *, default: str | None = None) -> str:
    """Resolve ``agent_id`` to its registered ``Passport.name``.

    Reads the in-memory passport directory.
    Returns ``default`` when the id is unknown (or ``agent_id`` itself
    if ``default`` is ``None``), so callers can use this as a safe
    ``NameResolver`` for view projection without needing to handle
    the unregistered / unregistered-mid-turn case.
    """
    passport = self._passports.get(agent_id)
    if passport is not None:
        return passport.name
    return default if default is not None else agent_id

get_agent async #

get_agent(name_or_id)
Source code in autogen/beta/network/hub/core.py
async def get_agent(self, name_or_id: str) -> Passport:
    """Return the passport for a registered name or agent id.

    ``name_or_id`` is first looked up as a name. If it is not a known
    name, it is used as the agent id directly.

    Raises:
        NotFoundError: no passport is found either way.
    """
    resolved_id = self._name_to_id.get(name_or_id, name_or_id)
    if (passport := self._passports.get(resolved_id)) is None:
        raise NotFoundError(f"agent not found: {name_or_id}")
    return passport

get_resume async #

get_resume(agent_id)
Source code in autogen/beta/network/hub/core.py
async def get_resume(self, agent_id: str) -> Resume:
    """Return the cached resume for ``agent_id`` (the stored object itself).

    Raises:
        NotFoundError: no resume is cached for ``agent_id``.
    """
    if (resume := self._resumes.get(agent_id)) is not None:
        return resume
    raise NotFoundError(f"resume not found: {agent_id}")

find_agent_id #

find_agent_id(name)

Resolve name to its registered agent_id, or None.

Non-raising peer to :meth:get_agent — callers that need to branch on "is this name registered?" without catching an exception use this directly. Returns None when name has no current registration.

Source code in autogen/beta/network/hub/core.py
def find_agent_id(self, name: str) -> str | None:
    """Resolve ``name`` to its registered ``agent_id``, or ``None``.

    Non-raising peer to :meth:`get_agent` — callers that need to
    branch on "is this name registered?" without catching an
    exception use this directly. Returns ``None`` when ``name``
    has no current registration.
    """
    return self._name_to_id.get(name)

name_to_id_map #

name_to_id_map()

Snapshot of the name → agent_id directory.

Public read surface so callers that need reverse name resolution (e.g. WorkflowAdapter resolving handoff target names to ids) don't reach into the private index. Returns a copy so mutation can't corrupt the registry.

Source code in autogen/beta/network/hub/core.py
def name_to_id_map(self) -> dict[str, str]:
    """Return a shallow copy of the ``name → agent_id`` directory.

    This lets callers resolve names to ids (e.g. ``WorkflowAdapter``
    resolving handoff targets) without touching the private index.
    Mutating the returned dict has no effect on the registry.
    """
    return self._name_to_id.copy()

get_rule async #

get_rule(agent_id)

Return the rule attached to agent_id.

Raises :class:NotFoundError if no rule is registered — the registration path stamps a default :class:Rule for every agent, so a missing entry indicates the agent itself is unregistered.

Source code in autogen/beta/network/hub/core.py
async def get_rule(self, agent_id: str) -> Rule:
    """Look up the :class:`Rule` stored for ``agent_id``.

    Registration always stores a rule (a default :class:`Rule` when the
    caller supplies none), so a miss means the agent itself is not
    registered.

    Raises:
        NotFoundError: no rule is stored for ``agent_id``.
    """
    if (rule := self._rules.get(agent_id)) is not None:
        return rule
    raise NotFoundError(f"rule not found: {agent_id}")

get_skill async #

get_skill(agent_id)
Source code in autogen/beta/network/hub/core.py
async def get_skill(self, agent_id: str) -> str | None:
    if agent_id in self._skills:
        return self._skills[agent_id]
    if agent_id not in self._passports:
        return None
    body = await self._store.read(skill_path(agent_id))
    if body is not None:
        self._skills[agent_id] = body
    return body

list_agents async #

list_agents(*, capability=None, query=None, kind=None, sort_by=None, limit=50)

Enumerate registered participants, optionally filtered.

kind filters by Passport.kind ("agent" / "human" / "remote_agent"). None returns all kinds (current default behavior); passing "agent" also matches passports with kind=None since None is the back-compat alias.

Source code in autogen/beta/network/hub/core.py
async def list_agents(
    self,
    *,
    capability: str | None = None,
    query: str | None = None,
    kind: str | None = None,
    sort_by: str | None = None,
    limit: int = 50,
) -> list[Passport]:
    """List registered participants, optionally filtered and sorted.

    ``kind`` matches ``Passport.kind`` (``"agent"`` / ``"human"`` /
    ``"remote_agent"``). A passport whose ``kind`` is ``None`` counts as
    ``"agent"``, its back-compat alias. ``None`` disables the filter.
    ``capability`` keeps agents whose resume claims or has observed it.
    ``query`` is a case-insensitive substring match on the resume
    summary. ``sort_by="name"`` sorts by passport name; any other value
    keeps registration order. At most ``limit`` passports are returned.
    """
    needle = query.lower() if query else None
    matched: list[Passport] = []
    for agent_id, passport in self._passports.items():
        if kind is not None and (passport.kind or "agent") != kind:
            continue
        resume = self._resumes.get(agent_id)
        if capability is not None:
            if resume is None:
                continue
            if capability not in resume.claimed_capabilities and capability not in resume.observed:
                continue
        if needle is not None:
            summary_text = resume.summary.lower() if resume else ""
            if needle not in summary_text:
                continue
        matched.append(passport)

    if sort_by == "name":
        matched.sort(key=lambda p: p.name)

    return matched[:limit]

set_resume async #

set_resume(agent_id, resume)
Source code in autogen/beta/network/hub/core.py
async def set_resume(self, agent_id: str, resume: Resume) -> None:
    """Replace ``agent_id``'s resume and resync the capability index.

    Stamps ``resume.last_updated``. If a resume is already stored,
    ``resume.version`` is set to the stored version plus one. The
    resume is persisted and cached. The capability index is then
    diffed, so newly claimed or observed capabilities show up under
    ``peers(action="find", capability=...)`` without a re-register, and
    dropped ones disappear. Finally a ``"resume_set"`` agent event is
    fanned out.

    Raises:
        NotFoundError: ``agent_id`` is not registered.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    resume.last_updated = self._clock()
    resume.version = (self._resumes[agent_id].version + 1) if agent_id in self._resumes else resume.version

    # Take the "before" capability set from the index, not from the
    # stored resume. ``get_resume`` hands out the live stored object,
    # so a caller that mutates it in place and passes it back would
    # make old and new look identical. The diff would then be empty
    # and the index would go stale.
    old_caps: set[str] = {cap for cap, ids in self._capability_index.items() if agent_id in ids}
    new_caps: set[str] = set(resume.claimed_capabilities) | set(resume.observed.keys())

    await self._persist_resume(agent_id, resume)
    self._resumes[agent_id] = resume

    added = new_caps - old_caps
    removed = old_caps - new_caps
    for cap in added:
        self._capability_index.setdefault(cap, set()).add(agent_id)
    for cap in removed:
        bucket = self._capability_index.get(cap)
        if bucket is None:
            continue
        bucket.discard(agent_id)
        if not bucket:
            self._capability_index.pop(cap, None)
    if added or removed:
        await self._persist_capability_index()

    await self._fan_out(
        "on_agent_event",
        agent_id,
        "resume_set",
        {"resume": resume, "version": resume.version, "at": self._clock()},
    )

set_skill async #

set_skill(agent_id, skill_md)
Source code in autogen/beta/network/hub/core.py
async def set_skill(self, agent_id: str, skill_md: str | None) -> None:
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    if skill_md is None:
        await self._store.delete(skill_path(agent_id))
        self._skills.pop(agent_id, None)
    else:
        await self._persist_skill(agent_id, skill_md)
        self._skills[agent_id] = skill_md
    await self._fan_out(
        "on_agent_event",
        agent_id,
        "skill_set",
        {"removed": skill_md is None, "at": self._clock()},
    )

set_rule async #

set_rule(agent_id, rule)
Source code in autogen/beta/network/hub/core.py
async def set_rule(self, agent_id: str, rule: Rule) -> None:
    """Replace ``agent_id``'s rule, bumping its version past the stored one.

    If a rule is already stored, ``rule.version`` becomes the stored
    version plus one; otherwise the incoming version is kept. The rule
    is then persisted, cached, and announced as a ``"rule_set"`` agent
    event.

    Raises:
        NotFoundError: ``agent_id`` is not registered.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    next_version = self._rules[agent_id].version + 1 if agent_id in self._rules else rule.version
    rule.version = next_version
    await self._persist_rule(agent_id, rule)
    self._rules[agent_id] = rule
    payload = {"rule": rule, "version": rule.version, "at": self._clock()}
    await self._fan_out("on_agent_event", agent_id, "rule_set", payload)

record_observation async #

record_observation(*, owner_id, capability, outcome, latency_ms=None, task_id=None)

Update Resume.observed[capability] from a terminal task event.

Called by TaskMirror when an owner's task ends with a capability tag set on its TaskSpec. Updates the capability index so the agent appears under that capability even if it wasn't in its original claimed_capabilities.

Outcome must be one of the terminal task states (COMPLETED / FAILED / EXPIRED); other states are ignored. latency_ms, when provided, replaces the prior p50_latency_ms (single-sample stand-in for a future reservoir).

task_id (when provided) is used to dedup: a single task contributing twice to Resume.observed.n (e.g. cascade EXPIRED + owner-emitted COMPLETED) is recorded only once.

Source code in autogen/beta/network/hub/core.py
async def record_observation(
    self,
    *,
    owner_id: str,
    capability: str,
    outcome: TaskState,
    latency_ms: int | None = None,
    task_id: str | None = None,
) -> None:
    """Update ``Resume.observed[capability]`` from a terminal task event.

    Called by ``TaskMirror`` when an owner's task ends with a
    ``capability`` tag set on its ``TaskSpec``. Updates the
    capability index so the agent appears under that capability
    even if it wasn't in its original ``claimed_capabilities``.

    Outcome must be one of the terminal task states
    (``COMPLETED`` / ``FAILED`` / ``EXPIRED``); other states are
    ignored. ``latency_ms``, when provided, replaces the prior
    ``p50_latency_ms`` (single-sample stand-in for a future
    reservoir). Unknown owners are ignored.

    ``task_id`` (when provided) is used to dedup: a single task
    contributing twice to ``Resume.observed.n`` (e.g. cascade
    EXPIRED + owner-emitted COMPLETED) is recorded only once, even
    when the two calls interleave at ``await`` points.
    """
    if outcome not in TERMINAL_TASK_STATES:
        return
    if task_id is not None and task_id in self._observed_task_ids:
        return
    resume = self._resumes.get(owner_id)
    if resume is None:
        return
    # Claim the dedup slot before the first ``await``. Previously the id
    # was only added after persistence, so a concurrent call for the
    # same task could pass the check above while this one was
    # suspended, and count the task twice.
    if task_id is not None:
        self._observed_task_ids.add(task_id)

    stat = resume.observed.get(capability) or ObservedStat()
    stat.n += 1
    if outcome == TaskState.COMPLETED:
        stat.completed += 1
    elif outcome == TaskState.FAILED:
        stat.failed += 1
    elif outcome == TaskState.EXPIRED:
        stat.expired += 1
    if latency_ms is not None:
        stat.p50_latency_ms = latency_ms
    resume.observed[capability] = stat
    resume.last_updated = self._clock()
    resume.version += 1
    await self._persist_resume(owner_id, resume)

    bucket = self._capability_index.setdefault(capability, set())
    if owner_id not in bucket:
        bucket.add(owner_id)
        await self._persist_capability_index()

    await self._fan_out(
        "on_agent_event",
        owner_id,
        "observation_recorded",
        {
            "resume": resume,
            "version": resume.version,
            "capability": capability,
            "outcome": outcome.value,
            "at": self._clock(),
        },
    )

agents_with_capability #

agents_with_capability(capability)

Return agent_ids matching capability (claimed or observed).

Source code in autogen/beta/network/hub/core.py
def agents_with_capability(self, capability: str) -> list[str]:
    """Sorted agent_ids indexed under ``capability`` (claimed or observed); ``[]`` if none."""
    bucket = self._capability_index.get(capability)
    return sorted(bucket) if bucket else []

create_channel async #

create_channel(*, creator_id, manifest_type, manifest_version=1, participants, required_acks=None, ttl=None, knobs=None, intent=None, labels=None)

Allocate channel_id, post invites, await acks, return metadata.

Posts EV_CHANNEL_INVITE to every invitee, awaits an EV_CHANNEL_INVITE_ACK from each (the handshake is all-or-nothing — any reject fails creation), transitions to ACTIVE, and broadcasts EV_CHANNEL_OPENED. Times out after invite_ack_timeout if the acks do not arrive.

Source code in autogen/beta/network/hub/core.py
async def create_channel(
    self,
    *,
    creator_id: str,
    manifest_type: str,
    manifest_version: int = 1,
    participants: list[str],
    required_acks: int | None = None,
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> ChannelMetadata:
    """Allocate ``channel_id``, post invites, await acks, return metadata.

    Posts ``EV_CHANNEL_INVITE`` to every invitee, awaits an
    ``EV_CHANNEL_INVITE_ACK`` from each (the handshake is
    all-or-nothing — any reject fails creation), transitions to
    ``ACTIVE``, and broadcasts ``EV_CHANNEL_OPENED``. Times out
    after ``invite_ack_timeout`` if the acks do not arrive.

    The creator is prepended to ``participants`` when absent. A
    channel with no invitees skips the handshake and is activated
    immediately. ``ttl`` falls back to the creator rule's
    ``limits.channel_ttl_default``; a truthy ``intent`` is stored as
    the ``"intent"`` label.

    Raises:
        NotFoundError: the creator or a listed participant is not
            registered.
        ProtocolError: ``participants`` is empty or repeats an id, or
            the invite acks time out (the channel is closed with
            ``"invite_timeout"``).
        Whatever ``decision.error`` builds when the arbiter denies the
        open, and any exception raised while posting invites (the
        channel is closed with ``"invite_failed"`` before re-raising).
    """
    if creator_id not in self._passports:
        raise NotFoundError(f"creator not registered: {creator_id}")
    if not participants:
        raise ProtocolError("channel requires at least one participant")
    seen: set[str] = set()
    for p_id in participants:
        if p_id in seen:
            raise ProtocolError(f"participant listed twice: {p_id!r}")
        seen.add(p_id)
        if p_id != creator_id and p_id not in self._passports:
            raise NotFoundError(f"participant not registered: {p_id}")
    if creator_id not in participants:
        participants = [creator_id, *participants]

    adapter = self._adapter_for(manifest_type, manifest_version)

    creator_passport = self._passports[creator_id]
    creator_rule = self._rules.get(creator_id, Rule())

    # ── Authorize channel open (arbiter) ────────────────────────────
    # Pre-flight invitee inbound-access check + creator concurrency
    # cap. The dispatch path silently filters envelopes whose
    # sender is not in the recipient's whitelist; without a
    # pre-check, an invite to a blocking recipient would be
    # dropped and the creator would hang on the ack waiter.
    invitee_passports: list[Passport] = []
    invitee_rules: list[Rule] = []
    for p_id in participants:
        if p_id == creator_id:
            continue
        invitee_passports.append(self._passports[p_id])
        invitee_rules.append(self._rules.get(p_id, Rule()))
    active_creator_channels = sum(1 for m in self._active_channels.values() if m.creator_id == creator_id)
    decision = await self._arbiter.authorize_channel_open(
        adapter.manifest,
        creator_passport,
        creator_rule,
        invitee_passports,
        invitee_rules,
        active_creator_channels,
    )
    if isinstance(decision, Deny):
        raise decision.error(decision.reason)

    channel_id = make_id()
    now = self._clock()

    ttl_value: str | int = ttl if ttl is not None else creator_rule.limits.channel_ttl_default
    ttl_seconds = parse_duration(ttl_value)
    expires_at = _expires_at(now, ttl_seconds) or None

    metadata_participants: list[Participant] = []
    for index, p_id in enumerate(participants):
        if p_id == creator_id:
            role = ParticipantRole.INITIATOR
        elif len(participants) == 2:
            role = ParticipantRole.RESPONDENT
        else:
            role = ParticipantRole.PARTICIPANT
        metadata_participants.append(Participant(agent_id=p_id, role=role, order=index, joined_at=now))

    final_labels: dict[str, str] = dict(labels) if labels else {}
    if intent:
        final_labels["intent"] = intent

    invitees = [p_id for p_id in participants if p_id != creator_id]

    metadata = ChannelMetadata(
        channel_id=channel_id,
        manifest=adapter.manifest,
        creator_id=creator_id,
        participants=metadata_participants,
        state=ChannelState.PENDING,
        created_at=now,
        expires_at=expires_at,
        knobs=dict(knobs) if knobs else {},
        labels=final_labels,
        required_acks=required_acks,
        pending_acks=list(invitees),
    )

    adapter.validate_create(metadata)

    # Activate caches before persistence so the post_envelope path
    # finds the metadata when the invite is dispatched.
    self._channels[channel_id] = metadata
    self._active_channels[channel_id] = metadata
    self._adapter_states[channel_id] = adapter.initial_state(metadata)

    await self._persist_channel_metadata(metadata)
    logger.info(
        "channel created: id=%s type=%s creator=%s participants=%d",
        channel_id,
        manifest_type,
        creator_id,
        len(metadata_participants),
    )
    await self._fan_out(
        "on_channel_event",
        channel_id,
        "created",
        {
            "metadata": metadata,
            "participants": [p.agent_id for p in metadata_participants],
            "at": now,
        },
    )

    if not invitees:
        # Self-only channel — already complete; transition to ACTIVE.
        await self._activate_channel(channel_id)
        return metadata

    # ``get_running_loop`` is the documented way to reach the current
    # loop from inside a coroutine; ``get_event_loop`` is deprecated for
    # this use.
    waiter: asyncio.Future[ChannelMetadata] = asyncio.get_running_loop().create_future()
    self._channel_open_waiters[channel_id] = waiter

    # Post invites — each goes to one invitee via post_envelope.
    invite_data: dict[str, object] = {
        "channel_id": channel_id,
        "manifest_type": manifest_type,
        "manifest_version": manifest_version,
        "creator_id": creator_id,
        "knobs": metadata.knobs,
        "labels": metadata.labels,
    }
    try:
        for invitee_id in invitees:
            envelope = Envelope(
                channel_id=channel_id,
                sender_id=creator_id,
                audience=[invitee_id],
                event_type=EV_CHANNEL_INVITE,
                event_data=invite_data,
            )
            await self.post_envelope(envelope)
    except Exception:
        self._channel_open_waiters.pop(channel_id, None)
        await self._transition_channel(channel_id, ChannelState.CLOSED, "invite_failed")
        raise

    # NOTE(review): CancelledError is a BaseException, so neither handler
    # here (nor the one above) catches it. A cancelled caller therefore
    # leaves the channel PENDING in the caches. Confirm whether a
    # sweeper reclaims it.
    try:
        return await asyncio.wait_for(waiter, timeout=self._invite_ack_timeout)
    except asyncio.TimeoutError as exc:
        await self._transition_channel(channel_id, ChannelState.CLOSED, "invite_timeout")
        raise ProtocolError(f"channel {channel_id!r} ack timeout") from exc
    finally:
        self._channel_open_waiters.pop(channel_id, None)

close_channel async #

close_channel(channel_id, *, reason='')
Source code in autogen/beta/network/hub/core.py
async def close_channel(self, channel_id: str, *, reason: str = "") -> ChannelMetadata:
    """Move ``channel_id`` to ``CLOSED`` and return its metadata.

    An empty ``reason`` is recorded as ``"explicit_close"``.
    """
    effective_reason = reason if reason else "explicit_close"
    await self._transition_channel(channel_id, ChannelState.CLOSED, effective_reason)
    return self._channels[channel_id]

get_channel async #

get_channel(channel_id)
Source code in autogen/beta/network/hub/core.py
async def get_channel(self, channel_id: str) -> ChannelMetadata:
    """Return cached metadata for ``channel_id``.

    Raises:
        NotFoundError: the channel is unknown.
    """
    if (metadata := self._channels.get(channel_id)) is not None:
        return metadata
    raise NotFoundError(f"channel not found: {channel_id}")

can_send #

can_send(channel_id, sender_id, *, event_type=None)

True if the adapter would accept a substantive send from sender_id against the current state.

Wraps adapter.validate_send with a probe envelope so the default notify handler doesn't need to reach into private hub state to figure out whether it's the agent's turn.

Source code in autogen/beta/network/hub/core.py
def can_send(
    self,
    channel_id: str,
    sender_id: str,
    *,
    event_type: str | None = None,
) -> bool:
    """True if the adapter would accept a substantive send from
    ``sender_id`` against the current state.

    Wraps ``adapter.validate_send`` with a probe envelope so the
    default notify handler doesn't need to reach into private hub
    state to figure out whether it's the agent's turn.
    """
    metadata = self._channels.get(channel_id)
    if metadata is None or metadata.is_terminal():
        return False
    state = self._adapter_states.get(channel_id)
    if state is None:
        return False
    adapter = self._adapter_for(metadata.manifest.type, metadata.manifest.version)
    probe = Envelope(
        channel_id=channel_id,
        sender_id=sender_id,
        audience=None,
        event_type=event_type or EV_TEXT,
        event_data={"text": ""},
    )
    try:
        adapter.validate_send(metadata, probe, state)
    except Exception:
        return False
    return True

default_view_policy #

default_view_policy(channel_id, participant_id)

Return the adapter-declared default view policy for this participant on this channel. Wraps adapter.default_view_policy so callers don't need adapter registry access.

Source code in autogen/beta/network/hub/core.py
def default_view_policy(
    self,
    channel_id: str,
    participant_id: str,
) -> "ViewPolicy":
    """Return the adapter's default view policy for ``participant_id`` on ``channel_id``.

    Resolves the adapter from the channel's manifest and delegates to
    ``adapter.default_view_policy``, so callers need no access to the
    adapter registry.

    Raises:
        NotFoundError: the channel is unknown.
    """
    if (metadata := self._channels.get(channel_id)) is None:
        raise NotFoundError(f"channel not found: {channel_id}")
    manifest = metadata.manifest
    return self._adapter_for(manifest.type, manifest.version).default_view_policy(metadata, participant_id)

adapter_for #

adapter_for(channel_id)

Return the adapter resolved from channel_id's manifest.

Public surface so callers (notably the default notify handler) don't need to reach into _adapter_for(type, version) or the _channels map directly.

Source code in autogen/beta/network/hub/core.py
def adapter_for(self, channel_id: str) -> ChannelAdapter:
    """Resolve the adapter declared by ``channel_id``'s manifest.

    Public wrapper so callers (notably the default notify handler)
    need neither ``_adapter_for(type, version)`` nor the private
    ``_channels`` map.

    Raises:
        NotFoundError: the channel is unknown.
    """
    if (metadata := self._channels.get(channel_id)) is None:
        raise NotFoundError(f"channel not found: {channel_id}")
    manifest = metadata.manifest
    return self._adapter_for(manifest.type, manifest.version)

adapter_state #

adapter_state(channel_id)

Return channel_id's current folded adapter state, or None if the channel has none cached.

Public surface so callers don't need to reach into _adapter_states.

Source code in autogen/beta/network/hub/core.py
def adapter_state(self, channel_id: str) -> object | None:
    """Return ``channel_id``'s current folded adapter state, or
    ``None`` if the channel has none cached.

    Public surface so callers don't need to reach into
    ``_adapter_states``.
    """
    return self._adapter_states.get(channel_id)

list_channels async #

list_channels(*, agent_id=None, state=None, limit=50)
Source code in autogen/beta/network/hub/core.py
async def list_channels(
    self,
    *,
    agent_id: str | None = None,
    state: ChannelState | None = None,
    limit: int = 50,
) -> list[ChannelMetadata]:
    """List known channels, optionally filtered by state and participant.

    Channels come back in cache order, capped at ``limit``.
    """
    matching = [
        metadata
        for metadata in self._channels.values()
        if (state is None or metadata.state == state)
        and (agent_id is None or agent_id in metadata.participant_ids())
    ]
    return matching[:limit]

read_wal async #

read_wal(channel_id, *, since=0, until=None)
Source code in autogen/beta/network/hub/core.py
async def read_wal(
    self,
    channel_id: str,
    *,
    since: int = 0,
    until: int | None = None,
) -> list[Envelope]:
    """Load ``channel_id``'s write-ahead log as envelopes, sliced ``[since:until]``.

    Each non-blank line of the WAL body is parsed with
    ``Envelope.from_json``. Indices refer to envelopes, not raw lines.
    A missing or empty log yields ``[]``.
    """
    raw = await self._store.read(wal_path(channel_id))
    if not raw:
        return []
    stripped = (row.strip() for row in raw.splitlines())
    parsed = [Envelope.from_json(row) for row in stripped if row]
    return parsed[since:until]

observe_task async #

observe_task(metadata)

Register a task observed via the agent's stream.

Hub does not create, assign, or cancel — it stores TaskMetadata, persists it, and starts TTL accounting.

On first observation, enforces the owner's Rule.limits.max_concurrent_tasks cap (0 disables).

Source code in autogen/beta/network/hub/core.py
async def observe_task(self, metadata: TaskMetadata) -> None:
    """Register (or refresh) a task observed via the agent's stream.

    Hub does not create, assign, or cancel — it stores
    ``TaskMetadata``, persists it, and starts TTL accounting.

    Re-observing a known ``task_id`` merges into the stored record.
    ``state`` is overwritten. ``started_at`` / ``expires_at`` /
    ``channel_id`` are overwritten only by truthy new values.
    ``progress`` is merged. The ``channel → task`` index follows any
    change of ``channel_id``, so it stays consistent with the record.

    On first observation, enforces the owner's
    ``Rule.limits.max_concurrent_tasks`` cap (``0`` disables).

    Raises:
        AccessDeniedError: a first observation would exceed the
            owner's concurrent-task cap.
    """
    existing = self._tasks.get(metadata.task_id)
    if existing is not None:
        # Update in place — owner re-emitting TaskStarted on retry, etc.
        previous_channel = existing.channel_id
        existing.state = metadata.state
        existing.started_at = metadata.started_at or existing.started_at
        existing.expires_at = metadata.expires_at or existing.expires_at
        existing.channel_id = metadata.channel_id or existing.channel_id
        existing.progress.update(metadata.progress)
        # Keep ``_channel_tasks`` in sync when the task gains or changes
        # a channel. A task first seen without a channel would otherwise
        # never be indexed under the channel it later reports.
        if existing.channel_id and existing.channel_id != previous_channel:
            if previous_channel:
                old_bucket = self._channel_tasks.get(previous_channel)
                if old_bucket is not None:
                    old_bucket.discard(existing.task_id)
                    if not old_bucket:
                        self._channel_tasks.pop(previous_channel, None)
            self._channel_tasks.setdefault(existing.channel_id, set()).add(existing.task_id)
        await self._persist_task_metadata(existing)
        return

    owner_rule = self._rules.get(metadata.owner_id, Rule())
    max_tasks = owner_rule.limits.max_concurrent_tasks
    if max_tasks > 0:
        active = sum(
            1
            for t in self._tasks.values()
            if t.owner_id == metadata.owner_id and t.state not in TERMINAL_TASK_STATES
        )
        if active >= max_tasks:
            raise AccessDeniedError(
                f"owner {metadata.owner_id!r} exceeded max_concurrent_tasks ({active} >= {max_tasks})"
            )

    self._tasks[metadata.task_id] = metadata
    if metadata.channel_id:
        self._channel_tasks.setdefault(metadata.channel_id, set()).add(metadata.task_id)
    await self._persist_task_metadata(metadata)

get_task async #

get_task(task_id)
Source code in autogen/beta/network/hub/core.py
async def get_task(self, task_id: str) -> TaskMetadata:
    """Return the stored metadata for an observed task.

    Raises:
        NotFoundError: ``task_id`` has not been observed.
    """
    if (metadata := self._tasks.get(task_id)) is not None:
        return metadata
    raise NotFoundError(f"task not found: {task_id}")

update_task async #

update_task(task_id, *, state=None, progress=None, result=None, error=None)

Update an observed task's lifecycle. Used by task_mirror.

Terminal-state transitions stamp completed_at. Idempotent once terminal: after a task reaches a terminal state, every further update (not only another terminal one) is ignored.

Source code in autogen/beta/network/hub/core.py
async def update_task(
    self,
    task_id: str,
    *,
    state: TaskState | None = None,
    progress: dict[str, object] | None = None,
    result: object | None = None,
    error: str | None = None,
) -> None:
    """Apply a lifecycle update to an observed task. Used by ``task_mirror``.

    Only the fields that are supplied are touched:

    * ``progress`` (non-empty) is merged into the task's progress dict and
      refreshes ``last_progress_at``;
    * ``result`` (not ``None``) and ``error`` (non-empty) overwrite the
      stored values;
    * ``state`` (not ``None``) replaces the state; moving into a terminal
      state also stamps ``completed_at``.

    The updated metadata is persisted afterwards. A task that is already
    terminal is left untouched, so late events are ignored.

    Raises:
        NotFoundError: if ``task_id`` is not tracked.
    """
    task = self._tasks.get(task_id)
    if task is None:
        raise NotFoundError(f"task not found: {task_id}")
    # Terminal records are frozen: drop any further events silently.
    if task.state in TERMINAL_TASK_STATES:
        return

    if progress:
        task.progress.update(progress)
        task.last_progress_at = self._clock()
    if result is not None:
        task.result = result
    if error:
        task.error = error
    if state is not None:
        task.state = state
        # Completion time is stamped only on the transition into a terminal state.
        if state in TERMINAL_TASK_STATES:
            task.completed_at = self._clock()

    await self._persist_task_metadata(task)

list_tasks async #

list_tasks(*, agent_id=None, channel_id=None, state=None, limit=50)
Source code in autogen/beta/network/hub/core.py
async def list_tasks(
    self,
    *,
    agent_id: str | None = None,
    channel_id: str | None = None,
    state: TaskState | None = None,
    limit: int | None = 50,
) -> list[TaskMetadata]:
    """Return up to ``limit`` tracked tasks matching every supplied filter.

    Filters left as ``None`` are not applied. ``agent_id`` is matched
    against each task's ``owner_id``; ``channel_id`` and ``state`` are
    matched against the fields of the same name. Results follow the
    iteration order of the hub's task registry.

    ``limit`` caps the number of results. Scanning stops as soon as the
    cap is reached. A non-positive ``limit`` yields an empty list, and
    ``None`` means "no cap".
    """
    # A negative slice bound would drop matches from the end rather than
    # cap the page, so non-positive limits are treated as "nothing".
    if limit is not None and limit <= 0:
        return []
    results: list[TaskMetadata] = []
    for metadata in self._tasks.values():
        if agent_id is not None and metadata.owner_id != agent_id:
            continue
        if channel_id is not None and metadata.channel_id != channel_id:
            continue
        if state is not None and metadata.state != state:
            continue
        results.append(metadata)
        # The page is full; no need to keep walking the registry.
        if limit is not None and len(results) >= limit:
            break
    return results

checkpoint_task async #

checkpoint_task(task_id, state)

Persist an owner-supplied resume snapshot for task_id.

Writes a single JSON blob at tasks/{task_id}/checkpoint.json; the last write wins and no history is kept. The framework treats the payload as opaque — owners choose what to store and how to interpret it on resume. Pairs with read_task_checkpoint() for the read side; the canonical entry point is HubBackedCheckpointStore on the client.

Source code in autogen/beta/network/hub/core.py
async def checkpoint_task(self, task_id: str, state: dict[str, object]) -> None:
    """Store ``state`` as the resume snapshot for ``task_id``.

    The snapshot is serialized with ``json.dumps`` and written to
    ``task_checkpoint_path(task_id)`` (``tasks/{task_id}/checkpoint.json``),
    replacing any earlier snapshot; no history is kept. The payload is
    opaque to the framework, and the owner decides what it means on resume.
    :meth:`read_task_checkpoint` is the matching reader; clients normally
    go through ``HubBackedCheckpointStore``.
    """
    # Resolve the destination before serializing, matching argument order.
    target = task_checkpoint_path(task_id)
    payload = json.dumps(state)
    await self._store.write(target, payload)

read_task_checkpoint async #

read_task_checkpoint(task_id)

Read the resume snapshot for task_id, or None if absent.

The returned dict is the value the owner most recently passed to checkpoint_task(). Malformed JSON on disk surfaces as an exception — the framework does not silently swallow corruption.

Source code in autogen/beta/network/hub/core.py
async def read_task_checkpoint(self, task_id: str) -> dict[str, object] | None:
    """Load the resume snapshot last stored by :meth:`checkpoint_task`.

    Returns ``None`` when the store has nothing at the task's checkpoint
    path. Stored text is decoded with ``json.loads``; malformed JSON
    propagates as an exception rather than being hidden.
    """
    blob = await self._store.read(task_checkpoint_path(task_id))
    return None if blob is None else json.loads(blob)

expire_due async #

expire_due()

Walk active channels and tasks; expire ones past their TTL.

Cascades to non-terminal tasks under closing channels (via _transition_channel()).

Source code in autogen/beta/network/hub/core.py
async def expire_due(self) -> None:
    """Expire every active channel and non-terminal task past its TTL.

    A single ``now`` is read from the hub clock and compared against each
    ``expires_at``. The comparison is on the raw values, so they presumably
    share the clock's ISO-8601 string format — confirm. Empty ``expires_at``
    values never expire.

    Channels are handled first via :meth:`_transition_channel`, which
    cascades to non-terminal tasks under the closing channels. Tasks are
    selected only afterwards, so any task already made terminal by that
    cascade is skipped.
    """
    now = self._clock()

    # Snapshot the due channels before transitioning any of them.
    due_channels = [
        channel_id
        for channel_id, channel in list(self._active_channels.items())
        if channel.expires_at and channel.expires_at <= now
    ]
    for channel_id in due_channels:
        await self._transition_channel(channel_id, ChannelState.EXPIRED, "ttl_expired")

    # Evaluated after the channel pass so cascaded tasks are not expired twice.
    due_tasks = [
        task_id
        for task_id, task in list(self._tasks.items())
        if task.state not in TERMINAL_TASK_STATES
        and task.expires_at
        and task.expires_at <= now
    ]
    for task_id in due_tasks:
        await self._transition_task(task_id, TaskState.EXPIRED, "ttl_expired")

post_envelope async #

post_envelope(envelope)

Validate sender + adapter + WAL append + dispatch.

Per-channel lock makes validate_send / fold / on_accepted see a consistent state. Dispatch and post-accept transitions happen outside the lock so the broadcast of EV_CHANNEL_CLOSED does not deadlock on the same lock.

Access and limits decisions go through the hub's arbiter attribute, so federation or custom permission protocols can replace the default rule-based behavior without forking the hub. For every attempt, the hub fires HubListener.on_envelope_posted (on success) or HubListener.on_envelope_rejected (on any pre-WAL failure).

Source code in autogen/beta/network/hub/core.py
async def post_envelope(self, envelope: Envelope) -> str:
    """Validate, WAL-append, and dispatch ``envelope``; return its id.

    The work happens in ``_post_envelope_impl``. Per its design, a
    per-channel lock keeps ``validate_send`` / ``fold`` / ``on_accepted``
    consistent, while dispatch and post-accept transitions run outside the
    lock so broadcasting ``EV_CHANNEL_CLOSED`` cannot deadlock on it.
    Access and limits decisions go through :attr:`arbiter`, so federation
    or custom permission protocols can replace the default rule-based
    behavior.

    If the implementation raises a ``NetworkError``, the listeners'
    ``on_envelope_rejected`` hook is fanned out first. The rejection is
    then logged at WARNING, and the original error is re-raised.
    :meth:`HubListener.on_envelope_posted` covers the success side.
    """
    try:
        posted_id = await self._post_envelope_impl(envelope)
    except NetworkError as err:
        # Listeners observe the rejection before it is logged and re-raised.
        await self._fan_out("on_envelope_rejected", envelope, err)
        logger.warning(
            "post_envelope rejected: channel=%s sender=%s event=%s reason=%s",
            envelope.channel_id,
            envelope.sender_id,
            envelope.event_type,
            err,
        )
        raise
    else:
        return posted_id

attach_endpoint #

attach_endpoint(endpoint)
Source code in autogen/beta/network/hub/core.py
def attach_endpoint(self, endpoint: LinkEndpoint) -> None:
    """Register ``endpoint`` and start serving it in a background task.

    This does nothing once the hub is closed. Otherwise the endpoint is
    indexed by ``endpoint_id`` and ``_handle_endpoint`` is scheduled with
    ``asyncio.create_task``. The task is held in ``_endpoint_tasks`` and
    discards itself from that set when it finishes.

    Must be called while an asyncio event loop is running
    (``create_task`` requires one).
    """
    if not self._closed:
        self._endpoints_by_id[endpoint.endpoint_id] = endpoint
        runner = asyncio.create_task(self._handle_endpoint(endpoint))
        # Keep a reference while running; drop it on completion.
        self._endpoint_tasks.add(runner)
        runner.add_done_callback(self._endpoint_tasks.discard)

bind_endpoint #

bind_endpoint(endpoint_id, agent_id)
Source code in autogen/beta/network/hub/core.py
def bind_endpoint(self, endpoint_id: str, agent_id: str) -> None:
    """Route envelopes addressed to ``agent_id`` through ``endpoint_id``.

    The endpoint must already be attached and the agent registered.
    If the agent is currently mapped to a *different* endpoint (e.g. it
    reconnected on a new connection), it is first removed from that
    endpoint's bookkeeping. The old endpoint stays attached, so other
    agents bound to it keep working.

    Raises:
        NotFoundError: if the endpoint is not attached or the agent is
            not registered.
    """
    if endpoint_id not in self._endpoints_by_id:
        raise NotFoundError(f"endpoint not attached: {endpoint_id}")
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")

    previous_id = self._agent_to_endpoint.get(agent_id)
    rebinding = previous_id is not None and previous_id != endpoint_id
    if rebinding:
        # Remove the agent from the old endpoint's reverse index, pruning
        # the entry once no agents remain on it.
        siblings = self._endpoint_to_agents.get(previous_id)
        if siblings is not None:
            siblings.discard(agent_id)
            if not siblings:
                self._endpoint_to_agents.pop(previous_id, None)
        # Clear the old endpoint's agent_id only if it still names this agent.
        previous = self._endpoints_by_id.get(previous_id)
        if previous is not None and previous.agent_id == agent_id:
            previous.agent_id = None

    self._endpoints_by_id[endpoint_id].agent_id = agent_id
    self._agent_to_endpoint[agent_id] = endpoint_id
    self._endpoint_to_agents.setdefault(endpoint_id, set()).add(agent_id)

pending_turns_for async #

pending_turns_for(agent_id)

Return turns in active channels where the protocol expects this agent.

Walks every active channel the agent participates in and asks the registered adapter via ChannelAdapter.expected_next(). It returns a PendingTurn for each channel where the agent is named. Channels with no specific expected speaker (free-form conversations) or where another participant is expected are skipped. The triggering envelope's created_at is read from the WAL; if the triggering envelope cannot be located, the current hub clock is used as a fallback.

Source code in autogen/beta/network/hub/core.py
async def pending_turns_for(self, agent_id: str) -> "list[PendingTurn]":
    """Return turns in active channels where the protocol expects this agent.

    For each active channel that lists ``agent_id`` among its participants,
    the channel's registered adapter is asked via
    :meth:`ChannelAdapter.expected_next`. A :class:`PendingTurn` is produced
    for every channel whose expectation names ``agent_id``. A channel is
    skipped when any of the following holds:

    * it has no registered adapter or adapter state;
    * it has no specific expected speaker (free-form conversations);
    * it expects another participant.

    Unknown agents get an empty list.

    ``expected_at`` is the ``created_at`` of the triggering envelope as read
    from the channel WAL. The current hub clock is used instead when:

    * there is no triggering envelope id;
    * the envelope is not found in the WAL;
    * the envelope has an empty ``created_at``.

    The active-channel map is snapshotted before iterating. The loop awaits
    :meth:`read_wal`, and if another coroutine adds or removes an active
    channel while this one is suspended, iterating the live dict would
    raise ``RuntimeError: dictionary changed size during iteration``.
    """
    if agent_id not in self._passports:
        return []
    results: list[PendingTurn] = []
    # Snapshot: the body awaits, so _active_channels may change under us.
    for channel_id, metadata in list(self._active_channels.items()):
        if not any(p.agent_id == agent_id for p in metadata.participants):
            continue
        adapter = self._adapters.get((metadata.manifest.type, metadata.manifest.version))
        state = self._adapter_states.get(channel_id)
        if adapter is None or state is None:
            continue
        expected = adapter.expected_next(metadata, state)
        if expected is None or expected.agent_id != agent_id:
            continue
        expected_at = ""
        if expected.triggering_envelope_id:
            wal = await self.read_wal(channel_id)
            # First WAL entry with the trigger id; "" if absent or unstamped.
            expected_at = next(
                (
                    env.created_at or ""
                    for env in wal
                    if env.envelope_id == expected.triggering_envelope_id
                ),
                "",
            )
        results.append(
            PendingTurn(
                channel_id=channel_id,
                triggering_envelope_id=expected.triggering_envelope_id,
                expected_at=expected_at or self._clock(),
            )
        )
    return results

inbox_cursor #

inbox_cursor(agent_id, channel_id)

Return the last envelope_id that agent_id has acked in channel_id.

Empty string when the agent has acked nothing in that channel (or is unknown). Read-only view of the delivery high-water mark a reconnect would replay past.

Source code in autogen/beta/network/hub/core.py
def inbox_cursor(self, agent_id: str, channel_id: str) -> str:
    """Return the last envelope_id ``agent_id`` has acked in ``channel_id``.

    Returns ``""`` when the agent is unknown or has acked nothing in that
    channel. This is a read-only view of the delivery high-water mark that
    a reconnect would replay past.
    """
    cursors = self._inbox_cursors
    if agent_id not in cursors:
        return ""
    return cursors[agent_id].get(channel_id, "")

find_envelope_by_causation async #

find_envelope_by_causation(channel_id, *, sender_id, causation_id)

Look up an envelope previously accepted under this causation key.

Handlers use this to short-circuit duplicate work after an at-least-once redelivery: when the same sender re-posts an envelope with the same causation_id (typical on retry), the prior accepted envelope is returned so the handler can skip the side effect. Returns None if no envelope is recorded for the key — either it was never accepted or its channel has already closed (terminal-channel pruning clears the index).

Source code in autogen/beta/network/hub/core.py
async def find_envelope_by_causation(
    self,
    channel_id: str,
    *,
    sender_id: str,
    causation_id: str,
) -> Envelope | None:
    """Return the envelope previously accepted under a causation key.

    The key is ``(channel_id, sender_id, causation_id)``. Handlers use this
    to skip duplicate side effects after an at-least-once redelivery, where
    the same sender re-posts with the same ``causation_id``.

    Returns ``None`` in two cases:

    * nothing is indexed for the key — it was never accepted, or its
      channel has closed and terminal-channel pruning cleared the index;
    * the indexed envelope id cannot be found in the channel's WAL.
    """
    key = (channel_id, sender_id, causation_id)
    envelope_id = self._causation_index.get(key)
    if envelope_id is None:
        return None
    wal = await self.read_wal(channel_id)
    return next((env for env in wal if env.envelope_id == envelope_id), None)