Skip to content

Hub

autogen.beta.network.hub.core.Hub #

Hub(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0)

In-process registry, dispatcher, session state-machine, persistence root.

Construct with the `open()` classmethod for production (it hydrates from disk and spawns the sweepers); the synchronous `__init__` is for tests that need fine-grained control.

Source code in autogen/beta/network/hub/core.py
def __init__(
    self,
    store: KnowledgeStore,
    *,
    auth: AuthRegistry | None = None,
    clock: Callable[[], str] | None = None,
    ttl_sweep_interval: float = 30.0,
    expectation_sweep_interval: float = 10.0,
    invite_ack_timeout: float = 30.0,
) -> None:
    """Record configuration and build empty caches; performs no I/O.

    Production code should use :meth:`open`, which additionally
    hydrates state from ``store`` and starts the background
    sweepers. Direct construction leaves every cache empty and no
    sweepers running — useful for tests needing fine-grained control.

    Args:
        store: Persistence root for identities, sessions, and tasks.
        auth: Auth-scheme registry; defaults to ``AuthRegistry.default()``.
        clock: Zero-arg callable producing timestamp strings;
            defaults to ``_utc_now_iso``.
        ttl_sweep_interval: Seconds between TTL sweeps (0 disables
            the sweeper — see :meth:`start`).
        expectation_sweep_interval: Seconds between expectation
            sweeps (0 disables).
        invite_ack_timeout: Seconds :meth:`create_session` waits for
            invite acks before failing the handshake.
    """
    # __init__ stores params; side effects deferred to start()/hydrate().
    self._store = store
    self._auth = auth if auth is not None else AuthRegistry.default()
    self._clock = clock if clock is not None else _utc_now_iso
    self._ttl_sweep_interval = ttl_sweep_interval
    self._expectation_sweep_interval = expectation_sweep_interval
    self._invite_ack_timeout = invite_ack_timeout

    # Audit log + expectation registries.
    self._audit_log = AuditLog(store)
    self._expectation_evaluators: dict[str, ExpectationEvaluator] = {}
    self._violation_handlers: dict[str, ViolationHandler] = {}
    # session_id → set of (expectation_index, expectation_name, violator_id) fired.
    # The position-based index disambiguates same-name expectations
    # (e.g. two ``turn_within`` entries with different ``on_violation``
    # handlers — without the index the first to fire would suppress
    # the second). Empty violator_id ("") = session-wide violations.
    self._fired_violations: dict[str, set[tuple[int, str, str]]] = {}

    # Identity caches.
    self._passports: dict[str, Passport] = {}
    self._resumes: dict[str, Resume] = {}
    self._rules: dict[str, Rule] = {}
    self._skills: dict[str, str] = {}
    self._name_to_id: dict[str, str] = {}
    # capability name → set of agent_ids that claim or have observed it.
    # Persisted as registry/by_capability.json on every mutation
    # (rebuilt from resumes on hydrate — the file is a derived cache).
    self._capability_index: dict[str, set[str]] = {}

    # Adapter registry.
    self._adapters: dict[tuple[str, int], SessionAdapter] = {}

    # Session caches.
    self._sessions: dict[str, SessionMetadata] = {}
    self._active_sessions: dict[str, SessionMetadata] = {}
    self._adapter_states: dict[str, object] = {}
    self._session_open_waiters: dict[str, asyncio.Future[SessionMetadata]] = {}

    # Task caches (observed; not owned).
    self._tasks: dict[str, TaskMetadata] = {}
    self._session_tasks: dict[str, set[str]] = {}
    # task_ids whose terminal observation has been recorded into
    # the owner's ``Resume.observed`` already. Prevents double-counting
    # when the same task receives multiple terminal events (e.g. a
    # session-cascade EXPIRED followed by an owner-emitted COMPLETED).
    self._observed_task_ids: set[str] = set()

    # Per-recipient outstanding-envelope counter for ``InboxBlock.max_pending``
    # enforcement. Incremented on dispatch to that recipient,
    # decremented when the recipient posts any envelope (treating
    # any outbound activity as "I'm processing my inbox"). A
    # best-effort approximation; per-session ack semantics require
    # a transport with ack frames.
    self._inbox_pending: dict[str, int] = {}

    # Transport-side state.
    self._endpoints_by_id: dict[str, LinkEndpoint] = {}
    self._agent_to_endpoint: dict[str, str] = {}
    self._endpoint_to_agents: dict[str, set[str]] = {}
    self._endpoint_tasks: set[asyncio.Task[None]] = set()

    # Per-session locks for WAL append + dispatch ordering.
    self._session_locks: dict[str, asyncio.Lock] = {}
    self._registration_lock = asyncio.Lock()

    self._ttl_sweeper: _IntervalSweeper | None = None
    self._expectation_sweeper: _IntervalSweeper | None = None
    self._closed = False

open async classmethod #

open(store, *, auth=None, clock=None, ttl_sweep_interval=30.0, expectation_sweep_interval=10.0, invite_ack_timeout=30.0, register_default_adapters=True)

Construct + hydrate from disk + start sweepers. Production entry point.

register_default_adapters=True (default) registers the built-in adapters (consulting@v1, conversation@v1, discussion@v1, workflow@v1) and the built-in expectation evaluators / violation handlers (acks_within / reply_within / max_silence, audit / notify_session / auto_close) so simple test setups don't need explicit registration calls.

Set expectation_sweep_interval=0 to disable the expectation sweeper entirely (tests usually do this to avoid background timer noise).

Source code in autogen/beta/network/hub/core.py
@classmethod
async def open(
    cls,
    store: KnowledgeStore,
    *,
    auth: AuthRegistry | None = None,
    clock: Callable[[], str] | None = None,
    ttl_sweep_interval: float = 30.0,
    expectation_sweep_interval: float = 10.0,
    invite_ack_timeout: float = 30.0,
    register_default_adapters: bool = True,
) -> "Hub":
    """Construct + hydrate from disk + start sweepers. Production entry point.

    ``register_default_adapters=True`` (default) registers the
    built-in adapters (``consulting@v1``, ``conversation@v1``,
    ``discussion@v1``, ``workflow@v1``) and the built-in
    expectation evaluators / violation handlers (``acks_within`` /
    ``reply_within`` / ``max_silence``, ``audit`` /
    ``notify_session`` / ``auto_close``) so simple test setups
    don't need explicit registration calls.

    Set ``expectation_sweep_interval=0`` to disable the expectation
    sweeper entirely (tests usually do this to avoid background
    timer noise).
    """
    hub = cls(
        store,
        auth=auth,
        clock=clock,
        ttl_sweep_interval=ttl_sweep_interval,
        expectation_sweep_interval=expectation_sweep_interval,
        invite_ack_timeout=invite_ack_timeout,
    )
    if register_default_adapters:
        hub.register_adapter(ConsultingAdapter())
        hub.register_adapter(ConversationAdapter())
        hub.register_adapter(DiscussionAdapter())
        hub.register_adapter(WorkflowAdapter())
        for evaluator in default_evaluators():
            hub.register_expectation_evaluator(evaluator)
        for handler in default_handlers():
            hub.register_violation_handler(handler)
    await hub.hydrate()
    await hub.start()
    return hub

hydrate async #

hydrate()

Walk the store; rebuild caches. Idempotent.

Loads identities, sessions, and tasks from disk. Active session WALs are re-folded through their adapter so the _adapter_states cache is rebuilt deterministically.

Source code in autogen/beta/network/hub/core.py
async def hydrate(self) -> None:
    """Walk the store; rebuild caches. Idempotent.

    Loads identities, sessions, and tasks from disk. Active session
    WALs are re-folded through their adapter so the
    ``_adapter_states`` cache is rebuilt deterministically.
    """
    # Reset every cache this method repopulates.
    for cache in (
        self._passports,
        self._resumes,
        self._rules,
        self._skills,
        self._name_to_id,
        self._capability_index,
        self._sessions,
        self._active_sessions,
        self._adapter_states,
        self._tasks,
        self._session_tasks,
    ):
        cache.clear()

    async def load_children(root, loader):
        # Store directories end with "/"; any other entry is skipped.
        for entry in await self._store.list(root):
            if entry.endswith("/"):
                await loader(entry.rstrip("/"))

    # Identities first — sessions and tasks refer back to agents.
    await load_children(agents_root(), self._load_agent)

    # Rebuild capability index from loaded resumes — by_capability.json
    # is a derived cache; the resumes are the authoritative source.
    for agent_id, resume in self._resumes.items():
        for cap in (*resume.claimed_capabilities, *resume.observed):
            self._capability_index.setdefault(cap, set()).add(agent_id)

    # Sessions (metadata first, WALs re-folded inside _load_session),
    # then observed tasks.
    await load_children(sessions_root(), self._load_session)
    await load_children(tasks_root(), self._load_task)

start async #

start()

Spawn the TTL + expectation sweepers. Idempotent.

ttl_sweep_interval=0 disables the TTL sweeper; expectation_sweep_interval=0 disables the expectation sweeper.

Source code in autogen/beta/network/hub/core.py
async def start(self) -> None:
    """Spawn the TTL + expectation sweepers. Idempotent.

    ``ttl_sweep_interval=0`` disables the TTL sweeper;
    ``expectation_sweep_interval=0`` disables the expectation sweeper.
    """
    # Each sweeper is created at most once; a second start() is a no-op.
    if self._ttl_sweeper is None and self._ttl_sweep_interval > 0:
        self._ttl_sweeper = _IntervalSweeper(
            name="ttl",
            interval=self._ttl_sweep_interval,
            fn=self.expire_due,
        )
        self._ttl_sweeper.start()
    if self._expectation_sweeper is None and self._expectation_sweep_interval > 0:
        self._expectation_sweeper = _IntervalSweeper(
            name="expectations",
            interval=self._expectation_sweep_interval,
            fn=self._expectation_tick,
        )
        self._expectation_sweeper.start()

close async #

close()

Cancel sweepers + endpoint tasks; drain queues. Idempotent.

Source code in autogen/beta/network/hub/core.py
async def close(self) -> None:
    """Cancel sweepers + endpoint tasks; drain queues. Idempotent."""
    if self._closed:
        return
    self._closed = True

    # Stop the background sweepers first so nothing new is scheduled.
    for attr in ("_ttl_sweeper", "_expectation_sweeper"):
        sweeper = getattr(self, attr)
        if sweeper is not None:
            await sweeper.stop()
            setattr(self, attr, None)

    # Cancel endpoint pumps and wait for every task to unwind;
    # return_exceptions swallows the CancelledErrors.
    pending = list(self._endpoint_tasks)
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    self._endpoint_tasks.clear()

register_adapter #

register_adapter(adapter)

Register a SessionAdapter keyed by (type, version).

Re-registering at the same key replaces the prior adapter; the old key's existing in-flight sessions keep their snapshotted manifest for life.

Source code in autogen/beta/network/hub/core.py
def register_adapter(self, adapter: SessionAdapter) -> None:
    """Register a ``SessionAdapter`` keyed by ``(type, version)``.

    Re-registering at the same key replaces the prior adapter; the
    old key's existing in-flight sessions keep their snapshotted
    manifest for life.
    """
    manifest = adapter.manifest
    self._adapters[(manifest.type, manifest.version)] = adapter

register_expectation_evaluator #

register_expectation_evaluator(evaluator)

Register an evaluator keyed by evaluator.name.

Re-registering the same name replaces the prior evaluator.

Source code in autogen/beta/network/hub/core.py
def register_expectation_evaluator(self, evaluator: ExpectationEvaluator) -> None:
    """Register ``evaluator`` under ``evaluator.name``.

    A later registration with the same name replaces the earlier one.
    """
    self._expectation_evaluators.update({evaluator.name: evaluator})

register_violation_handler #

register_violation_handler(handler)

Register a violation handler keyed by handler.name.

Re-registering the same name replaces the prior handler.

Source code in autogen/beta/network/hub/core.py
def register_violation_handler(self, handler: ViolationHandler) -> None:
    """Register ``handler`` under ``handler.name``.

    A later registration with the same name replaces the earlier one.
    """
    self._violation_handlers.update({handler.name: handler})

register async #

register(passport, resume, *, skill_md=None, rule=None)
Source code in autogen/beta/network/hub/core.py
async def register(
    self,
    passport: Passport,
    resume: Resume,
    *,
    skill_md: str | None = None,
    rule: Rule | None = None,
) -> Passport:
    """Validate the auth claim, mint an ``agent_id``, persist identity, index capabilities.

    Mutates ``passport`` in place (``agent_id``, ``created_at``) and
    returns it. Raises ``ProtocolError`` on a ``name`` collision.
    Auth validation failures propagate from the scheme's adapter.
    Persistence (passport → resume → rule → skill) and cache updates
    run under the registration lock; the capability-index snapshot
    and audit entry are written after the lock is released.
    """
    adapter = self._auth.get(passport.auth.scheme)
    await adapter.validate(passport, passport.auth.claim)

    async with self._registration_lock:
        # Reject a re-register that collides on ``name``: the prior
        # registration's passport / resume / rule / SKILL.md would
        # be orphaned on disk under a now-unreachable agent_id.
        # Tenants must explicitly ``unregister`` first.
        if passport.name in self._name_to_id:
            raise ProtocolError(
                f"name {passport.name!r} already registered "
                f"(agent_id={self._name_to_id[passport.name]}); "
                "unregister it before re-registering."
            )
        agent_id = make_id()
        passport.agent_id = agent_id
        passport.created_at = self._clock()

        # A missing rule gets the defaults, so every agent has one on disk.
        effective_rule = rule if rule is not None else Rule()

        await self._persist_passport(passport)
        await self._persist_resume(agent_id, resume)
        await self._persist_rule(agent_id, effective_rule)
        if skill_md is not None:
            await self._persist_skill(agent_id, skill_md)

        self._passports[agent_id] = passport
        self._resumes[agent_id] = resume
        self._rules[agent_id] = effective_rule
        if skill_md is not None:
            self._skills[agent_id] = skill_md
        self._name_to_id[passport.name] = agent_id

        # Index both claimed and already-observed capabilities.
        for cap in resume.claimed_capabilities:
            self._capability_index.setdefault(cap, set()).add(agent_id)
        for cap in resume.observed:
            self._capability_index.setdefault(cap, set()).add(agent_id)

    await self._persist_capability_index()
    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_AGENT_REGISTERED,
        "agent_id": agent_id,
        "name": passport.name,
    })
    return passport

unregister async #

unregister(agent_id)
Source code in autogen/beta/network/hub/core.py
async def unregister(self, agent_id: str) -> None:
    """Remove the agent's identity from caches and disk.

    Raises ``NotFoundError`` if the agent is unknown. Sessions and
    tasks the agent participated in are retained for audit; only
    the per-agent identity files are deleted.

    NOTE(review): the existence check runs before the registration
    lock is taken, so two concurrent unregisters for the same id
    can both pass it; the ``pop(..., None)`` / tolerant deletes
    inside the lock appear to keep that race harmless — confirm
    ``store.delete`` is idempotent.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")

    async with self._registration_lock:
        passport = self._passports.pop(agent_id, None)
        self._resumes.pop(agent_id, None)
        self._rules.pop(agent_id, None)
        self._skills.pop(agent_id, None)
        # Only drop the name mapping if it still points at us (a
        # racing re-register could have claimed the name).
        if passport is not None and self._name_to_id.get(passport.name) == agent_id:
            self._name_to_id.pop(passport.name, None)

        # Unbind from the transport endpoint, pruning empty bindings.
        endpoint_id = self._agent_to_endpoint.pop(agent_id, None)
        if endpoint_id is not None:
            bound = self._endpoint_to_agents.get(endpoint_id)
            if bound is not None:
                bound.discard(agent_id)
                if not bound:
                    self._endpoint_to_agents.pop(endpoint_id, None)

        # Drop the agent from every capability bucket; clean empty
        # buckets so the index stays compact.
        empty_caps: list[str] = []
        for cap, ids in self._capability_index.items():
            ids.discard(agent_id)
            if not ids:
                empty_caps.append(cap)
        for cap in empty_caps:
            self._capability_index.pop(cap, None)

        # Delete on-disk identity files. Without this the next
        # ``hydrate()`` would re-load the unregistered agent from
        # disk. Sessions and tasks the agent participated in are
        # kept for audit / read; only the per-agent identity files
        # are removed.
        await self._store.delete(passport_path(agent_id))
        await self._store.delete(resume_path(agent_id))
        await self._store.delete(rule_path(agent_id))
        await self._store.delete(skill_path(agent_id))

        # Drop inbox accounting so a future re-register with a
        # different agent_id starts from zero.
        self._inbox_pending.pop(agent_id, None)

    await self._persist_capability_index()
    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_AGENT_UNREGISTERED,
        "agent_id": agent_id,
        "name": passport.name if passport is not None else None,
    })

get_agent async #

get_agent(name_or_id)
Source code in autogen/beta/network/hub/core.py
async def get_agent(self, name_or_id: str) -> Passport:
    """Resolve ``name_or_id`` to a ``Passport``.

    Names are tried first via the name index; anything unmatched is
    treated as a raw agent_id. Raises ``NotFoundError`` on a miss.
    """
    resolved = self._name_to_id.get(name_or_id, name_or_id)
    found = self._passports.get(resolved)
    if found is None:
        raise NotFoundError(f"agent not found: {name_or_id}")
    return found

get_resume async #

get_resume(agent_id)
Source code in autogen/beta/network/hub/core.py
async def get_resume(self, agent_id: str) -> Resume:
    """Return the cached ``Resume`` for ``agent_id``.

    Raises ``NotFoundError`` when no resume is cached for that id.
    """
    found = self._resumes.get(agent_id)
    if found is None:
        raise NotFoundError(f"resume not found: {agent_id}")
    return found

get_skill async #

get_skill(agent_id)
Source code in autogen/beta/network/hub/core.py
async def get_skill(self, agent_id: str) -> str | None:
    if agent_id in self._skills:
        return self._skills[agent_id]
    if agent_id not in self._passports:
        return None
    body = await self._store.read(skill_path(agent_id))
    if body is not None:
        self._skills[agent_id] = body
    return body

list_agents async #

list_agents(*, capability=None, query=None, sort_by=None, limit=50)
Source code in autogen/beta/network/hub/core.py
async def list_agents(
    self,
    *,
    capability: str | None = None,
    query: str | None = None,
    sort_by: str | None = None,
    limit: int = 50,
) -> list[Passport]:
    """Filter registered agents by capability and/or summary text.

    ``capability`` matches either claimed or observed capabilities;
    ``query`` is a case-insensitive substring match against the
    resume summary (agents without a resume match only an empty
    summary). ``sort_by="name"`` sorts ascending before the
    ``limit`` cut is applied.
    """
    needle = query.lower() if query else None
    matched: list[Passport] = []
    for agent_id, passport in self._passports.items():
        resume = self._resumes.get(agent_id)
        if capability is not None:
            if resume is None:
                continue
            if (capability not in resume.claimed_capabilities
                    and capability not in resume.observed):
                continue
        if needle is not None:
            summary = resume.summary.lower() if resume else ""
            if needle not in summary:
                continue
        matched.append(passport)

    if sort_by == "name":
        matched.sort(key=lambda p: p.name)

    return matched[:limit]

set_resume async #

set_resume(agent_id, resume)
Source code in autogen/beta/network/hub/core.py
async def set_resume(self, agent_id: str, resume: Resume) -> None:
    """Replace the agent's Resume (tenant-sourced) and bump ``version``.

    Stamps ``last_updated``, persists the new resume, swaps the
    cache entry, then diffs old vs. new capabilities so the
    capability index (and its on-disk snapshot) stays in sync.
    Statement order matters: the old capability set must be read
    from the cache *before* it is overwritten.
    Raises ``NotFoundError`` for an unregistered agent.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    resume.last_updated = self._clock()
    # Monotonic bump off the cached resume; a first-ever set keeps
    # whatever version the caller supplied.
    resume.version = (self._resumes[agent_id].version + 1) if agent_id in self._resumes else resume.version

    # Diff capabilities so the index stays in sync. Without this,
    # a tenant adding a new claim via ``set_resume`` would not
    # surface under ``peers(action="find", capability=...)`` until
    # the agent re-registered or recorded an observation.
    old_resume = self._resumes.get(agent_id)
    old_caps: set[str] = set()
    if old_resume is not None:
        old_caps.update(old_resume.claimed_capabilities)
        old_caps.update(old_resume.observed.keys())
    new_caps: set[str] = set(resume.claimed_capabilities) | set(resume.observed.keys())

    await self._persist_resume(agent_id, resume)
    self._resumes[agent_id] = resume

    added = new_caps - old_caps
    removed = old_caps - new_caps
    for cap in added:
        self._capability_index.setdefault(cap, set()).add(agent_id)
    for cap in removed:
        bucket = self._capability_index.get(cap)
        if bucket is None:
            continue
        bucket.discard(agent_id)
        if not bucket:
            self._capability_index.pop(cap, None)
    # Only rewrite the derived snapshot when the index actually changed.
    if added or removed:
        await self._persist_capability_index()

    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_RESUME_SET,
        "source": RESUME_SOURCE_TENANT,
        "agent_id": agent_id,
        "version": resume.version,
    })

set_skill async #

set_skill(agent_id, skill_md)
Source code in autogen/beta/network/hub/core.py
async def set_skill(self, agent_id: str, skill_md: str | None) -> None:
    """Attach — or, with ``skill_md=None``, remove — the agent's SKILL.md.

    Persists (or deletes) on disk, mirrors the cache, and records an
    audit entry. Raises ``NotFoundError`` for an unregistered agent.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    removed = skill_md is None
    if removed:
        await self._store.delete(skill_path(agent_id))
        self._skills.pop(agent_id, None)
    else:
        await self._persist_skill(agent_id, skill_md)
        self._skills[agent_id] = skill_md
    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_SKILL_SET,
        "agent_id": agent_id,
        "removed": removed,
    })

set_rule async #

set_rule(agent_id, rule)
Source code in autogen/beta/network/hub/core.py
async def set_rule(self, agent_id: str, rule: Rule) -> None:
    """Replace the agent's Rule, bumping ``version`` off the cached one.

    A first-ever set keeps the caller-supplied version. Persists,
    caches, and records an audit entry. Raises ``NotFoundError``
    for an unregistered agent.
    """
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    prior = self._rules.get(agent_id)
    if prior is not None:
        rule.version = prior.version + 1
    await self._persist_rule(agent_id, rule)
    self._rules[agent_id] = rule
    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_RULE_SET,
        "agent_id": agent_id,
        "version": rule.version,
    })

record_observation async #

record_observation(*, owner_id, capability, outcome, latency_ms=None, task_id=None)

Update Resume.observed[capability] from a terminal task event.

Called by TaskMirror when an owner's task ends with a capability tag set on its TaskSpec. Updates the capability index so the agent appears under that capability even if it wasn't in their original claimed_capabilities.

Outcome must be one of the terminal task states (COMPLETED / FAILED / EXPIRED); other states are ignored. latency_ms, when provided, replaces the prior p50_latency_ms (single-sample stand-in for a future reservoir).

task_id (when provided) is used to dedup: a single task contributing twice to Resume.observed.n (e.g. cascade EXPIRED + owner-emitted COMPLETED) is recorded only once.

Source code in autogen/beta/network/hub/core.py
async def record_observation(
    self,
    *,
    owner_id: str,
    capability: str,
    outcome: TaskState,
    latency_ms: int | None = None,
    task_id: str | None = None,
) -> None:
    """Update ``Resume.observed[capability]`` from a terminal task event.

    Called by ``TaskMirror`` when an owner's task ends with a
    ``capability`` tag set on its ``TaskSpec``. Updates the
    capability index so the agent appears under that capability
    even if it wasn't in their original ``claimed_capabilities``.

    Outcome must be one of the terminal task states
    (``COMPLETED`` / ``FAILED`` / ``EXPIRED``); other states are
    ignored. ``latency_ms``, when provided, replaces the prior
    ``p50_latency_ms`` (single-sample stand-in for a future
    reservoir).

    ``task_id`` (when provided) is used to dedup: a single task
    contributing twice to ``Resume.observed.n`` (e.g. cascade
    EXPIRED + owner-emitted COMPLETED) is recorded only once.
    """
    # Guards, in order: non-terminal events carry no signal; an
    # already-counted task is skipped; an unknown owner has no
    # resume to update.
    if outcome not in TERMINAL_TASK_STATES:
        return
    if task_id is not None and task_id in self._observed_task_ids:
        return
    resume = self._resumes.get(owner_id)
    if resume is None:
        return
    # First observation for this capability starts a fresh stat.
    stat = resume.observed.get(capability) or ObservedStat()
    stat.n += 1
    if outcome == TaskState.COMPLETED:
        stat.completed += 1
    elif outcome == TaskState.FAILED:
        stat.failed += 1
    elif outcome == TaskState.EXPIRED:
        stat.expired += 1
    if latency_ms is not None:
        stat.p50_latency_ms = latency_ms
    resume.observed[capability] = stat
    resume.last_updated = self._clock()
    # Observed-source bump mirrors set_resume's tenant-source bump.
    resume.version += 1
    await self._persist_resume(owner_id, resume)

    # Index the capability; only rewrite the derived snapshot when
    # the agent is newly added to this bucket.
    bucket = self._capability_index.setdefault(capability, set())
    if owner_id not in bucket:
        bucket.add(owner_id)
        await self._persist_capability_index()

    # Mark counted only after the update succeeded, so a failed
    # persist doesn't permanently suppress the observation.
    if task_id is not None:
        self._observed_task_ids.add(task_id)

    await self._audit_log.append({
        "at": self._clock(),
        "kind": AUDIT_KIND_RESUME_SET,
        "source": RESUME_SOURCE_OBSERVED,
        "agent_id": owner_id,
        "version": resume.version,
        "capability": capability,
        "outcome": outcome.value,
    })

agents_with_capability #

agents_with_capability(capability)

Return agent_ids matching capability (claimed or observed).

Source code in autogen/beta/network/hub/core.py
def agents_with_capability(self, capability: str) -> list[str]:
    """Return sorted agent_ids that claim or have observed ``capability``."""
    bucket = self._capability_index.get(capability)
    return sorted(bucket) if bucket else []

create_session async #

create_session(*, creator_id, manifest_type, manifest_version=1, participants, required_acks=None, ttl=None, knobs=None, intent=None, labels=None)

Allocate session_id, post invites, await acks, return metadata.

Posts EV_SESSION_INVITE to every invitee, awaits an EV_SESSION_INVITE_ACK from each (the handshake is all-or-nothing — any reject fails creation), transitions to ACTIVE, and broadcasts EV_SESSION_OPENED. Times out after invite_ack_timeout if the acks do not arrive.

Source code in autogen/beta/network/hub/core.py
async def create_session(
    self,
    *,
    creator_id: str,
    manifest_type: str,
    manifest_version: int = 1,
    participants: list[str],
    required_acks: int | None = None,
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> SessionMetadata:
    """Allocate ``session_id``, post invites, await acks, return metadata.

    Posts ``EV_SESSION_INVITE`` to every invitee, awaits an
    ``EV_SESSION_INVITE_ACK`` from each (the handshake is
    all-or-nothing — any reject fails creation), transitions to
    ``ACTIVE``, and broadcasts ``EV_SESSION_OPENED``. Times out
    after ``invite_ack_timeout`` if the acks do not arrive.

    Raises ``NotFoundError`` for unregistered creator/participants,
    ``ProtocolError`` for an empty or duplicated participant list
    (or on ack timeout), and ``AccessDeniedError`` when an invitee's
    inbound whitelist blocks the creator or the creator's
    ``max_concurrent_sessions`` cap is hit.
    """
    if creator_id not in self._passports:
        raise NotFoundError(f"creator not registered: {creator_id}")
    if not participants:
        raise ProtocolError("session requires at least one participant")
    seen: set[str] = set()
    for p_id in participants:
        if p_id in seen:
            raise ProtocolError(f"participant listed twice: {p_id!r}")
        seen.add(p_id)
        if p_id != creator_id and p_id not in self._passports:
            raise NotFoundError(f"participant not registered: {p_id}")
    if creator_id not in participants:
        participants = [creator_id, *participants]

    adapter = self._adapter_for(manifest_type, manifest_version)

    creator_rule = self._rules.get(creator_id, Rule())
    creator_name = self._passports[creator_id].name

    # Pre-flight invitee inbound-access check. The dispatch path
    # silently filters envelopes whose sender is not in the
    # recipient's ``inbound_from`` whitelist; without this
    # pre-check, an invite to a recipient who blocks the creator
    # would be dropped on the floor and the creator would hang on
    # the ack waiter until ``invite_ack_timeout``. Surface the
    # access denial synchronously instead.
    for p_id in participants:
        if p_id == creator_id:
            continue
        invitee_rule = self._rules.get(p_id)
        if invitee_rule is None:
            continue
        if not _match_any(creator_name, invitee_rule.access.inbound_from):
            invitee_name = self._passports[p_id].name
            raise AccessDeniedError(f"invitee {invitee_name!r} does not accept inbound from {creator_name!r}")

    # Concurrency cap: count active sessions where this agent is
    # the creator. ``0`` disables the cap. Hub rejects before any
    # WAL or persistence work so the caller sees the limit
    # synchronously and on-disk state stays clean.
    max_sessions = creator_rule.limits.max_concurrent_sessions
    if max_sessions > 0:
        active = sum(1 for m in self._active_sessions.values() if m.creator_id == creator_id)
        if active >= max_sessions:
            raise AccessDeniedError(
                f"creator {creator_id!r} exceeded max_concurrent_sessions ({active} >= {max_sessions})"
            )

    session_id = make_id()
    now = self._clock()

    ttl_value: str | int = ttl if ttl is not None else creator_rule.limits.session_ttl_default
    ttl_seconds = parse_duration(ttl_value)
    expires_at = _expires_at(now, ttl_seconds) or None

    # Roles: creator is INITIATOR; the sole other party in a 1:1
    # session is RESPONDENT; everyone else is PARTICIPANT.
    metadata_participants: list[Participant] = []
    for index, p_id in enumerate(participants):
        if p_id == creator_id:
            role = ParticipantRole.INITIATOR
        elif len(participants) == 2:
            role = ParticipantRole.RESPONDENT
        else:
            role = ParticipantRole.PARTICIPANT
        metadata_participants.append(Participant(agent_id=p_id, role=role, order=index, joined_at=now))

    final_labels: dict[str, str] = dict(labels) if labels else {}
    if intent:
        final_labels["intent"] = intent

    invitees = [p_id for p_id in participants if p_id != creator_id]

    metadata = SessionMetadata(
        session_id=session_id,
        manifest=adapter.manifest,
        creator_id=creator_id,
        participants=metadata_participants,
        state=SessionState.PENDING,
        created_at=now,
        expires_at=expires_at,
        knobs=dict(knobs) if knobs else {},
        labels=final_labels,
        required_acks=required_acks,
        pending_acks=list(invitees),
    )

    adapter.validate_create(metadata)

    # Activate caches before persistence so the post_envelope path
    # finds the metadata when the invite is dispatched.
    self._sessions[session_id] = metadata
    self._active_sessions[session_id] = metadata
    self._adapter_states[session_id] = adapter.initial_state(metadata)

    await self._persist_session_metadata(metadata)
    await self._audit_log.append({
        "at": now,
        "kind": AUDIT_KIND_SESSION_CREATED,
        "session_id": session_id,
        "manifest_type": manifest_type,
        "manifest_version": manifest_version,
        "creator_id": creator_id,
        "participants": [p.agent_id for p in metadata_participants],
    })

    if not invitees:
        # Self-only session — already complete; transition to ACTIVE.
        await self._activate_session(session_id)
        return metadata

    # BUGFIX: use get_running_loop(), not get_event_loop() — the
    # latter is a deprecated alias inside a coroutine (emits
    # DeprecationWarning on modern Python) and can misbehave when
    # no loop policy is set. We are always in a running loop here.
    waiter: asyncio.Future[SessionMetadata] = asyncio.get_running_loop().create_future()
    self._session_open_waiters[session_id] = waiter

    # Post invites — each goes to one invitee via post_envelope.
    invite_data: dict[str, object] = {
        "session_id": session_id,
        "manifest_type": manifest_type,
        "manifest_version": manifest_version,
        "creator_id": creator_id,
        "knobs": metadata.knobs,
        "labels": metadata.labels,
    }
    try:
        for invitee_id in invitees:
            envelope = Envelope(
                session_id=session_id,
                sender_id=creator_id,
                audience=[invitee_id],
                event_type=EV_SESSION_INVITE,
                event_data=invite_data,
            )
            await self.post_envelope(envelope)
    except Exception:
        # A failed invite aborts the whole handshake: drop the
        # waiter and close the half-created session.
        self._session_open_waiters.pop(session_id, None)
        await self._transition_session(session_id, SessionState.CLOSED, "invite_failed")
        raise

    try:
        return await asyncio.wait_for(waiter, timeout=self._invite_ack_timeout)
    except asyncio.TimeoutError as exc:
        await self._transition_session(session_id, SessionState.CLOSED, "invite_timeout")
        raise ProtocolError(f"session {session_id!r} ack timeout") from exc
    finally:
        self._session_open_waiters.pop(session_id, None)

close_session async #

close_session(session_id, *, reason='')
Source code in autogen/beta/network/hub/core.py
async def close_session(self, session_id: str, *, reason: str = "") -> SessionMetadata:
    """Force *session_id* into ``CLOSED`` and return its metadata.

    An empty *reason* is recorded as ``"explicit_close"``.
    """
    close_reason = reason if reason else "explicit_close"
    await self._transition_session(session_id, SessionState.CLOSED, close_reason)
    return self._sessions[session_id]

get_session async #

get_session(session_id)
Source code in autogen/beta/network/hub/core.py
async def get_session(self, session_id: str) -> SessionMetadata:
    """Return the metadata for *session_id*; raise ``NotFoundError`` if unknown."""
    try:
        return self._sessions[session_id]
    except KeyError:
        raise NotFoundError(f"session not found: {session_id}") from None

can_send #

can_send(session_id, sender_id, *, event_type=None)

True if the adapter would accept a substantive send from sender_id against the current state.

Wraps adapter.validate_send with a probe envelope so the default notify handler doesn't need to reach into private hub state to figure out whether it's the agent's turn.

Source code in autogen/beta/network/hub/core.py
def can_send(
    self,
    session_id: str,
    sender_id: str,
    *,
    event_type: str | None = None,
) -> bool:
    """True if the adapter would accept a substantive send from
    ``sender_id`` against the current state.

    Wraps ``adapter.validate_send`` with a probe envelope so the
    default notify handler doesn't need to reach into private hub
    state to figure out whether it's the agent's turn.
    """
    metadata = self._sessions.get(session_id)
    if metadata is None or metadata.is_terminal():
        return False
    state = self._adapter_states.get(session_id)
    if state is None:
        return False
    adapter = self._adapter_for(metadata.manifest.type, metadata.manifest.version)
    probe = Envelope(
        session_id=session_id,
        sender_id=sender_id,
        audience=None,
        event_type=event_type or EV_TEXT,
        event_data={"text": ""},
    )
    try:
        adapter.validate_send(metadata, probe, state)
    except Exception:
        return False
    return True

default_view_policy #

default_view_policy(session_id, participant_id)

Return the adapter-declared default view policy for this participant on this session. Wraps adapter.default_view_policy so callers don't need adapter registry access.

Source code in autogen/beta/network/hub/core.py
def default_view_policy(
    self,
    session_id: str,
    participant_id: str,
) -> "ViewPolicy":
    """Look up the adapter's default view policy for this participant.

    Thin convenience wrapper over ``adapter.default_view_policy`` so
    callers never need adapter-registry access.  Raises
    ``NotFoundError`` for an unknown session.
    """
    try:
        metadata = self._sessions[session_id]
    except KeyError:
        raise NotFoundError(f"session not found: {session_id}") from None
    manifest = metadata.manifest
    adapter = self._adapter_for(manifest.type, manifest.version)
    return adapter.default_view_policy(metadata, participant_id)

list_sessions async #

list_sessions(*, agent_id=None, state=None, limit=50)
Source code in autogen/beta/network/hub/core.py
async def list_sessions(
    self,
    *,
    agent_id: str | None = None,
    state: SessionState | None = None,
    limit: int = 50,
) -> list[SessionMetadata]:
    """Return up to *limit* sessions matching the optional filters.

    ``state`` filters by exact session state; ``agent_id`` keeps only
    sessions where that agent is a participant.
    """
    def _keep(md: SessionMetadata) -> bool:
        if state is not None and md.state != state:
            return False
        return agent_id is None or agent_id in md.participant_ids()

    matched = [md for md in self._sessions.values() if _keep(md)]
    return matched[:limit]

read_wal async #

read_wal(session_id, *, since=0, until=None)
Source code in autogen/beta/network/hub/core.py
async def read_wal(
    self,
    session_id: str,
    *,
    since: int = 0,
    until: int | None = None,
) -> list[Envelope]:
    """Read the session's write-ahead log and return envelopes ``[since:until)``.

    Blank lines in the stored JSONL body are skipped; an empty body
    yields an empty list.  ``until=None`` means "to the end".
    """
    body = await self._store.read(wal_path(session_id))
    if not body:
        return []
    stripped = (raw.strip() for raw in body.splitlines())
    envelopes = [Envelope.from_json(line) for line in stripped if line]
    upper = until if until is not None else len(envelopes)
    return envelopes[since:upper]

observe_task async #

observe_task(metadata)

Register a task observed via the agent's stream.

Hub does not create, assign, or cancel — it stores TaskMetadata, persists it, and starts TTL accounting.

On first observation, enforces the owner's Rule.limits.max_concurrent_tasks cap (0 disables).

Source code in autogen/beta/network/hub/core.py
async def observe_task(self, metadata: TaskMetadata) -> None:
    """Register a task observed via the agent's stream.

    Hub does not create, assign, or cancel — it stores
    ``TaskMetadata``, persists it, and starts TTL accounting.

    On first observation, enforces the owner's
    ``Rule.limits.max_concurrent_tasks`` cap (``0`` disables).
    """
    if metadata.task_id in self._tasks:
        # Update in place — owner re-emitting TaskStarted on retry, etc.
        existing = self._tasks[metadata.task_id]
        existing.state = metadata.state
        existing.started_at = metadata.started_at or existing.started_at
        existing.expires_at = metadata.expires_at or existing.expires_at
        existing.session_id = metadata.session_id or existing.session_id
        existing.progress.update(metadata.progress)
        # Keep the session→task index in sync: a re-observation may be
        # the first time the task carries a session_id. Without this,
        # session close/expire cascades would miss the task (the index
        # was previously only maintained on first observation).
        if existing.session_id:
            self._session_tasks.setdefault(existing.session_id, set()).add(existing.task_id)
        await self._persist_task_metadata(existing)
        return

    # First observation — enforce the owner's concurrency cap before
    # admitting the task (0 means "no cap").
    owner_rule = self._rules.get(metadata.owner_id, Rule())
    max_tasks = owner_rule.limits.max_concurrent_tasks
    if max_tasks > 0:
        active = sum(
            1
            for t in self._tasks.values()
            if t.owner_id == metadata.owner_id and t.state not in TERMINAL_TASK_STATES
        )
        if active >= max_tasks:
            raise AccessDeniedError(
                f"owner {metadata.owner_id!r} exceeded max_concurrent_tasks ({active} >= {max_tasks})"
            )

    self._tasks[metadata.task_id] = metadata
    if metadata.session_id:
        self._session_tasks.setdefault(metadata.session_id, set()).add(metadata.task_id)
    await self._persist_task_metadata(metadata)

get_task async #

get_task(task_id)
Source code in autogen/beta/network/hub/core.py
async def get_task(self, task_id: str) -> TaskMetadata:
    """Return metadata for *task_id*; raise ``NotFoundError`` if never observed."""
    try:
        return self._tasks[task_id]
    except KeyError:
        raise NotFoundError(f"task not found: {task_id}") from None

update_task async #

update_task(task_id, *, state=None, progress=None, result=None, error=None)

Update an observed task's lifecycle. Used by task_mirror.

Terminal-state transitions stamp completed_at. Idempotent — terminal-on-terminal is a no-op (further events ignored).

Source code in autogen/beta/network/hub/core.py
async def update_task(
    self,
    task_id: str,
    *,
    state: TaskState | None = None,
    progress: dict[str, object] | None = None,
    result: object | None = None,
    error: str | None = None,
) -> None:
    """Apply a lifecycle update to an observed task (used by ``task_mirror``).

    Entering a terminal state stamps ``completed_at``; once a task is
    terminal, every further update is silently ignored (idempotent).
    """
    try:
        task = self._tasks[task_id]
    except KeyError:
        raise NotFoundError(f"task not found: {task_id}") from None
    if task.state in TERMINAL_TASK_STATES:
        # Late event for an already-finished task — drop it.
        return
    if progress:
        task.progress.update(progress)
        task.last_progress_at = self._clock()
    if result is not None:
        task.result = result
    if error:
        task.error = error
    if state is not None:
        task.state = state
        if state in TERMINAL_TASK_STATES:
            # First (and only) transition into a terminal state.
            task.completed_at = self._clock()
    await self._persist_task_metadata(task)

list_tasks async #

list_tasks(*, agent_id=None, session_id=None, state=None, limit=50)
Source code in autogen/beta/network/hub/core.py
async def list_tasks(
    self,
    *,
    agent_id: str | None = None,
    session_id: str | None = None,
    state: TaskState | None = None,
    limit: int = 50,
) -> list[TaskMetadata]:
    """Return up to *limit* observed tasks matching all supplied filters.

    ``agent_id`` filters by task owner; ``session_id`` and ``state``
    filter by exact match. Omitted filters match everything.
    """
    def _matches(task: TaskMetadata) -> bool:
        if agent_id is not None and task.owner_id != agent_id:
            return False
        if session_id is not None and task.session_id != session_id:
            return False
        return state is None or task.state == state

    selected = [task for task in self._tasks.values() if _matches(task)]
    return selected[:limit]

expire_due async #

expire_due()

Walk active sessions and tasks; expire ones past their TTL.

Cascades non-terminal tasks under closing sessions (via :meth:_transition_session).

Source code in autogen/beta/network/hub/core.py
async def expire_due(self) -> None:
    """Walk active sessions and tasks; expire ones past their TTL.

    Cascades non-terminal tasks under closing sessions (via
    :meth:`_transition_session`).
    """
    now = self._clock()

    # Snapshot ids first — _transition_session mutates the registries
    # we would otherwise be iterating.
    due_sessions = [
        sid
        for sid, md in list(self._active_sessions.items())
        if md.expires_at and md.expires_at <= now
    ]
    for sid in due_sessions:
        await self._transition_session(sid, SessionState.EXPIRED, "ttl_expired")

    # Then standalone tasks not already swept by a session cascade.
    due_tasks = [
        tid
        for tid, md in list(self._tasks.items())
        if md.state not in TERMINAL_TASK_STATES and md.expires_at and md.expires_at <= now
    ]
    for tid in due_tasks:
        await self._transition_task(tid, TaskState.EXPIRED, "ttl_expired")

post_envelope async #

post_envelope(envelope)

Validate sender + adapter + WAL append + dispatch.

Per-session lock makes validate_send / fold / on_accepted see a consistent state. Dispatch and post-accept transitions happen outside the lock so the broadcast of EV_SESSION_CLOSED does not deadlock on the same lock.

Source code in autogen/beta/network/hub/core.py
async def post_envelope(self, envelope: Envelope) -> str:
    """Validate sender + adapter + WAL append + dispatch.

    Per-session lock makes ``validate_send`` / ``fold`` /
    ``on_accepted`` see a consistent state. Dispatch and post-accept
    transitions happen outside the lock so the broadcast of
    ``EV_SESSION_CLOSED`` does not deadlock on the same lock.

    Returns:
        The ``envelope_id`` assigned during the append.

    Raises:
        NotFoundError: unknown sender or session.
        AccessDeniedError: outbound-access or delegation-depth limit hit.
        ProtocolError: session terminal / not active, or its manifest's
            adapter (or adapter state) is not registered.
        InboxFull: a recipient's ``max_pending`` inbox cap is reached.
        Exception: anything ``adapter.validate_send`` raises propagates.
    """
    # Sender must hold a registered passport before anything else.
    sender = self._passports.get(envelope.sender_id)
    if sender is None:
        raise NotFoundError(f"sender not registered: {envelope.sender_id}")

    sender_rule = self._rules.get(envelope.sender_id, Rule())

    # Outbound access check. Self-routing is always allowed —
    # protocol broadcasts (``EV_SESSION_OPENED`` / ``EV_SESSION_CLOSED``)
    # include the creator in their own audience so the creator's
    # ``Session`` handle receives the lifecycle notification, and the
    # sender's ``outbound_to`` should never block their own
    # state-sync envelopes.
    if envelope.audience is not None:
        for recipient_id in envelope.audience:
            if recipient_id == envelope.sender_id:
                continue
            # Recipients without a passport are skipped here rather
            # than rejected — only known recipients are access-checked.
            recipient = self._passports.get(recipient_id)
            if recipient is None:
                continue
            if not _match_any(recipient.name, sender_rule.access.outbound_to):
                raise AccessDeniedError(f"sender {sender.name!r} not permitted to send to {recipient.name!r}")

    # Delegation-depth check. ``0`` disables the cap. Hub rejects
    # before the WAL append so the outer caller sees the limit
    # synchronously and the WAL stays clean.
    depth_cap = sender_rule.limits.delegation_depth
    if depth_cap > 0 and envelope.depth > depth_cap:
        raise AccessDeniedError(
            f"sender {sender.name!r} exceeded delegation_depth ({envelope.depth} > {depth_cap})"
        )

    # Session must exist and accept this class of event: protocol
    # events may flow while the session is still forming; substantive
    # events require ACTIVE.
    metadata = self._sessions.get(envelope.session_id)
    if metadata is None:
        raise NotFoundError(f"session not found: {envelope.session_id}")
    if metadata.is_terminal():
        raise ProtocolError(f"session {envelope.session_id!r} is {metadata.state.value}")
    if not _is_protocol_event(envelope.event_type) and metadata.state != SessionState.ACTIVE:
        raise ProtocolError(f"session {envelope.session_id!r} not active (state={metadata.state.value})")

    # Adapter must be registered to dispatch on this session.
    # Distinct from create_session's NotFoundError (where the user
    # is asking for an unknown manifest at session-creation time):
    # here the session exists but its manifest's adapter is no
    # longer loaded — typically a hydrate where the manifest type
    # wasn't re-registered before ``hub.start()``. Surface as a
    # ProtocolError so callers can distinguish "session is down"
    # from "session never existed."
    if (metadata.manifest.type, metadata.manifest.version) not in self._adapters:
        raise ProtocolError(
            f"session {envelope.session_id!r} has no registered adapter "
            f"(manifest {metadata.manifest.type!r}@v{metadata.manifest.version})"
        )

    # Inbox capacity check (substantive events only — protocol
    # invites / acks / opens / closes must always reach
    # participants for the session machine to advance).
    if not _is_protocol_event(envelope.event_type):
        if envelope.audience is not None:
            inbox_audience: list[str] = list(envelope.audience)
        else:
            # No explicit audience → implicit broadcast to every
            # participant except the sender.
            inbox_audience = [p.agent_id for p in metadata.participants if p.agent_id != envelope.sender_id]
        for recipient_id in inbox_audience:
            if recipient_id == envelope.sender_id:
                continue
            # No rule on file for this recipient → no inbox cap.
            recipient_rule = self._rules.get(recipient_id)
            if recipient_rule is None:
                continue
            max_pending = recipient_rule.limits.inbox.max_pending
            if max_pending > 0:
                current = self._inbox_pending.get(recipient_id, 0)
                if current >= max_pending:
                    raise InboxFull(f"recipient {recipient_id!r} inbox at capacity ({current} >= {max_pending})")

    adapter = self._adapter_for(metadata.manifest.type, metadata.manifest.version)

    # Critical section: validate, append, fold, on_accepted under lock.
    async with self._wal_lock(envelope.session_id):
        state = self._adapter_states.get(envelope.session_id)
        if state is None:
            # Session metadata exists but its adapter state was
            # never folded — typically a hydrate where the manifest's
            # adapter was not registered. Surface as a protocol
            # error rather than a bare KeyError.
            raise ProtocolError(
                f"session {envelope.session_id!r} has no adapter state "
                f"(manifest {metadata.manifest.type!r}@v{metadata.manifest.version} "
                "may not be registered)"
            )
        adapter.validate_send(metadata, envelope, state)

        # Identity and timestamp are stamped only after validation
        # succeeds, so rejected envelopes never consume an id or
        # touch the WAL.
        envelope.envelope_id = make_id()
        envelope.created_at = self._clock()

        await self._wal_append(envelope)
        new_state = adapter.fold(envelope, state)
        self._adapter_states[envelope.session_id] = new_state
        result = adapter.on_accepted(metadata, envelope, new_state)

    # Sender showed they're processing their inbox — decrement
    # their outstanding count. Substantive events only; protocol
    # acks and opens shouldn't drain inbox accounting.
    if not _is_protocol_event(envelope.event_type):
        current = self._inbox_pending.get(envelope.sender_id, 0)
        if current > 0:
            self._inbox_pending[envelope.sender_id] = current - 1

    # Outside lock: dispatch + post-accept handling.
    # Acks/rejects are absorbed by the hub — they aren't dispatched.
    if envelope.event_type == EV_SESSION_INVITE_ACK:
        await self._handle_invite_ack(envelope, metadata)
        return envelope.envelope_id
    if envelope.event_type == EV_SESSION_INVITE_REJECT:
        await self._handle_invite_reject(envelope, metadata)
        return envelope.envelope_id

    await self._dispatch(envelope, metadata)

    # Adapter-requested transition (see ``result.auto_close_reason``)
    # runs after dispatch, outside the WAL lock.
    if result.next_state is not None:
        await self._transition_session(
            envelope.session_id,
            result.next_state,
            result.auto_close_reason,
        )

    return envelope.envelope_id

attach_endpoint #

attach_endpoint(endpoint)
Source code in autogen/beta/network/hub/core.py
def attach_endpoint(self, endpoint: LinkEndpoint) -> None:
    """Register *endpoint* and spawn its reader task.

    No-op once the hub has been closed.  The task handle is retained
    in ``_endpoint_tasks`` (and discarded when the task completes) so
    it is not garbage-collected mid-flight.
    """
    if self._closed:
        return
    self._endpoints_by_id[endpoint.endpoint_id] = endpoint
    reader = asyncio.create_task(self._handle_endpoint(endpoint))
    self._endpoint_tasks.add(reader)
    reader.add_done_callback(self._endpoint_tasks.discard)

bind_endpoint #

bind_endpoint(endpoint_id, agent_id)
Source code in autogen/beta/network/hub/core.py
def bind_endpoint(self, endpoint_id: str, agent_id: str) -> None:
    """Associate a previously attached endpoint with a registered agent.

    Raises:
        NotFoundError: if the endpoint was never attached, or the
            agent holds no passport.
    """
    if endpoint_id not in self._endpoints_by_id:
        raise NotFoundError(f"endpoint not attached: {endpoint_id}")
    if agent_id not in self._passports:
        raise NotFoundError(f"agent not registered: {agent_id}")
    # Forward + reverse indices: an endpoint may serve several agents
    # (set in _endpoint_to_agents), while each agent maps to a single
    # endpoint in _agent_to_endpoint; re-binding overwrites the
    # endpoint's agent_id with the latest agent.
    self._endpoints_by_id[endpoint_id].agent_id = agent_id
    self._agent_to_endpoint[agent_id] = endpoint_id
    self._endpoint_to_agents.setdefault(endpoint_id, set()).add(agent_id)