Skip to content

HubClient

autogen.beta.network.client.hub_client.HubClient #

HubClient(link, *, hub=None)

One connection to a hub. Multiple AgentClients register through it.

Takes a link (currently LocalLink) and an explicit hub reference. The link carries dispatched envelopes via NotifyFrame; the direct hub reference is used for register / discovery / mutation calls (cuts through wire serialisation when we're in-process).

A single tenant process should hold one HubClient per hub it connects to.

Source code in autogen/beta/network/client/hub_client.py
def __init__(self, link: LocalLink, *, hub: "Hub | None" = None) -> None:
    # __init__ stores params; side effects deferred to register()/close().
    self._link = link
    self._hub = hub if hub is not None else link.hub
    self._client_link: LocalLinkClient | None = None
    self._receive_task: asyncio.Task[None] | None = None
    self._clients: dict[str, AgentClient] = {}
    self._closed = False

register async #

register(agent, passport, resume, *, skill_md=None, rule=None, attach_plugin=True)

Register an agent and return its AgentClient handle.

Direct hub call for register (in-process); the resulting agent_id is bound to this connection's endpoint so dispatched NotifyFrames reach the right AgentClient. A cross-process transport binds via HelloFrame instead.

attach_plugin is accepted for forward compatibility but does nothing here — the LLM-facing tool surface that it attaches lives in a layer that is not part of this module.

Source code in autogen/beta/network/client/hub_client.py
async def register(
    self,
    agent: Agent,
    passport: Passport,
    resume: Resume,
    *,
    skill_md: str | None = None,
    rule: Rule | None = None,
    attach_plugin: bool = True,
) -> AgentClient:
    """Register an agent and return its ``AgentClient`` handle.

    Direct hub call for register (in-process); the resulting
    ``agent_id`` is bound to this connection's endpoint so
    dispatched ``NotifyFrame``s reach the right ``AgentClient``. A
    cross-process transport binds via ``HelloFrame`` instead.

    ``attach_plugin`` is accepted for forward compatibility but
    does nothing here — the LLM-facing tool surface that it
    attaches lives in a layer that is not part of this module.
    """
    if self._closed:
        raise RuntimeError("HubClient is closed")

    client_link = self._ensure_connected()

    effective_rule = rule if rule is not None else Rule()
    passport = await self._hub.register(passport, resume, skill_md=skill_md, rule=effective_rule)
    assert passport.agent_id is not None
    self._hub.bind_endpoint(client_link.endpoint_id, passport.agent_id)

    client = AgentClient(
        agent=agent,
        passport=passport,
        resume=resume,
        rule=effective_rule,
        hub=self._hub,
        hub_client=self,
    )
    self._clients[passport.agent_id] = client

    return client

get_agent async #

get_agent(name_or_id)
Source code in autogen/beta/network/client/hub_client.py
async def get_agent(self, name_or_id: str) -> Passport:
    return await self._hub.get_agent(name_or_id)

get_resume async #

get_resume(agent_id)
Source code in autogen/beta/network/client/hub_client.py
async def get_resume(self, agent_id: str) -> Resume:
    return await self._hub.get_resume(agent_id)

get_skill async #

get_skill(agent_id)
Source code in autogen/beta/network/client/hub_client.py
async def get_skill(self, agent_id: str) -> str | None:
    return await self._hub.get_skill(agent_id)

list_agents async #

list_agents(*, capability=None, query=None, sort_by=None, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_agents(
    self,
    *,
    capability: str | None = None,
    query: str | None = None,
    sort_by: str | None = None,
    limit: int = 50,
) -> list[Passport]:
    return await self._hub.list_agents(
        capability=capability,
        query=query,
        sort_by=sort_by,
        limit=limit,
    )

set_resume async #

set_resume(agent_id, resume)
Source code in autogen/beta/network/client/hub_client.py
async def set_resume(self, agent_id: str, resume: Resume) -> None:
    await self._hub.set_resume(agent_id, resume)

set_skill async #

set_skill(agent_id, skill_md)
Source code in autogen/beta/network/client/hub_client.py
async def set_skill(self, agent_id: str, skill_md: str | None) -> None:
    await self._hub.set_skill(agent_id, skill_md)

set_rule async #

set_rule(agent_id, rule)
Source code in autogen/beta/network/client/hub_client.py
async def set_rule(self, agent_id: str, rule: Rule) -> None:
    await self._hub.set_rule(agent_id, rule)

unregister_agent async #

unregister_agent(agent_id)
Source code in autogen/beta/network/client/hub_client.py
async def unregister_agent(self, agent_id: str) -> None:
    await self._hub.unregister(agent_id)

create_session async #

create_session(*, creator_id, manifest_type, manifest_version=1, participants, required_acks=None, ttl=None, knobs=None, intent=None, labels=None)
Source code in autogen/beta/network/client/hub_client.py
async def create_session(
    self,
    *,
    creator_id: str,
    manifest_type: str,
    manifest_version: int = 1,
    participants: list[str],
    required_acks: int | None = None,
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> SessionMetadata:
    return await self._hub.create_session(
        creator_id=creator_id,
        manifest_type=manifest_type,
        manifest_version=manifest_version,
        participants=participants,
        required_acks=required_acks,
        ttl=ttl,
        knobs=knobs,
        intent=intent,
        labels=labels,
    )

get_session async #

get_session(session_id)
Source code in autogen/beta/network/client/hub_client.py
async def get_session(self, session_id: str) -> SessionMetadata:
    return await self._hub.get_session(session_id)

list_sessions async #

list_sessions(*, agent_id=None, include_terminal=False, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_sessions(
    self,
    *,
    agent_id: str | None = None,
    include_terminal: bool = False,
    limit: int = 50,
) -> list[SessionMetadata]:
    results = await self._hub.list_sessions(agent_id=agent_id, limit=limit * 4)
    if not include_terminal:
        results = [m for m in results if m.state not in (SessionState.CLOSED, SessionState.EXPIRED)]
    return results[:limit]

close_session async #

close_session(session_id, *, reason='')
Source code in autogen/beta/network/client/hub_client.py
async def close_session(self, session_id: str, *, reason: str = "") -> SessionMetadata:
    return await self._hub.close_session(session_id, reason=reason)

post_envelope async #

post_envelope(envelope)
Source code in autogen/beta/network/client/hub_client.py
async def post_envelope(self, envelope: Envelope) -> str:
    return await self._hub.post_envelope(envelope)

read_wal async #

read_wal(session_id, *, since=0, until=None)
Source code in autogen/beta/network/client/hub_client.py
async def read_wal(self, session_id: str, *, since: int = 0, until: int | None = None) -> list[Envelope]:
    return await self._hub.read_wal(session_id, since=since, until=until)

can_send #

can_send(session_id, sender_id, *, event_type=None)
Source code in autogen/beta/network/client/hub_client.py
def can_send(
    self,
    session_id: str,
    sender_id: str,
    *,
    event_type: str | None = None,
) -> bool:
    return self._hub.can_send(session_id, sender_id, event_type=event_type)

default_view_policy #

default_view_policy(session_id, participant_id)
Source code in autogen/beta/network/client/hub_client.py
def default_view_policy(self, session_id: str, participant_id: str) -> ViewPolicy:
    return self._hub.default_view_policy(session_id, participant_id)

get_task async #

get_task(task_id)
Source code in autogen/beta/network/client/hub_client.py
async def get_task(self, task_id: str) -> TaskMetadata:
    return await self._hub.get_task(task_id)

list_tasks async #

list_tasks(*, agent_id=None, session_id=None, state=None, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_tasks(
    self,
    *,
    agent_id: str | None = None,
    session_id: str | None = None,
    state: TaskState | None = None,
    limit: int = 50,
) -> list[TaskMetadata]:
    return await self._hub.list_tasks(
        agent_id=agent_id,
        session_id=session_id,
        state=state,
        limit=limit,
    )

observe_task async #

observe_task(metadata)
Source code in autogen/beta/network/client/hub_client.py
async def observe_task(self, metadata: TaskMetadata) -> None:
    await self._hub.observe_task(metadata)

update_task async #

update_task(task_id, *, state=None, progress=None, result=None, error=None)
Source code in autogen/beta/network/client/hub_client.py
async def update_task(
    self,
    task_id: str,
    *,
    state: TaskState | None = None,
    progress: dict[str, object] | None = None,
    result: object | None = None,
    error: str | None = None,
) -> None:
    await self._hub.update_task(
        task_id,
        state=state,
        progress=progress,
        result=result,
        error=error,
    )

record_observation async #

record_observation(*, owner_id, capability, outcome, latency_ms=None, task_id=None)
Source code in autogen/beta/network/client/hub_client.py
async def record_observation(
    self,
    *,
    owner_id: str,
    capability: str,
    outcome: TaskState,
    latency_ms: int | None = None,
    task_id: str | None = None,
) -> None:
    await self._hub.record_observation(
        owner_id=owner_id,
        capability=capability,
        outcome=outcome,
        latency_ms=latency_ms,
        task_id=task_id,
    )

close async #

close()

Close the connection and stop the receive loop. Idempotent.

Source code in autogen/beta/network/client/hub_client.py
async def close(self) -> None:
    """Close the connection and stop the receive loop. Idempotent."""
    if self._closed:
        return
    self._closed = True
    if self._client_link is not None:
        await self._client_link.close()
    if self._receive_task is not None:
        self._receive_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await self._receive_task

shutdown async #

shutdown()

Unregister every AgentClient then close().

Source code in autogen/beta/network/client/hub_client.py
async def shutdown(self) -> None:
    """Unregister every ``AgentClient`` then ``close()``."""
    for client in list(self._clients.values()):
        with contextlib.suppress(Exception):
            await client.unregister()
    self._clients.clear()
    await self.close()