One connection to a hub. Multiple AgentClients register through it.
Takes a link (currently LocalLink) and an optional explicit hub reference; when no hub is passed, the link's own hub is used. The link carries dispatched envelopes via NotifyFrame; the direct hub reference is used for register / discovery / mutation calls, which bypasses wire serialisation when the hub is in-process.
A single tenant process should hold one HubClient per hub it connects to.
Source code in autogen/beta/network/client/hub_client.py
def __init__(self, link: LocalLink, *, hub: "Hub | None" = None) -> None:
    """Record the link and hub reference without opening a connection.

    Args:
        link: Link this client connects through.
        hub: Hub to call directly. When ``None``, ``link.hub`` is used.
    """
    # Construction only stores state; connecting and tearing down are
    # left to ``register()`` and ``close()``.
    self._link = link
    self._hub = link.hub if hub is None else hub
    # Lifecycle state: not closed, nothing connected, no receive loop yet.
    self._closed = False
    self._client_link: LocalLinkClient | None = None
    self._receive_task: asyncio.Task[None] | None = None
    # Registered AgentClient handles, keyed by agent id.
    self._clients: dict[str, AgentClient] = {}
register async
register(agent, passport, resume, *, skill_md=None, rule=None, attach_plugin=True)
Register an agent and return its AgentClient handle.
Direct hub call for register (in-process); the resulting agent_id is bound to this connection's endpoint so dispatched NotifyFrames reach the right AgentClient. A cross-process transport binds via HelloFrame instead.
attach_plugin is accepted for forward compatibility but does nothing here — the LLM-facing tool surface that it attaches lives in a layer that is not part of this module.
Source code in autogen/beta/network/client/hub_client.py
async def register(
    self,
    agent: Agent,
    passport: Passport,
    resume: Resume,
    *,
    skill_md: str | None = None,
    rule: Rule | None = None,
    attach_plugin: bool = True,
) -> AgentClient:
    """Register an agent and return its ``AgentClient`` handle.

    Direct hub call for register (in-process); the resulting
    ``agent_id`` is bound to this connection's endpoint so
    dispatched ``NotifyFrame``s reach the right ``AgentClient``. A
    cross-process transport binds via ``HelloFrame`` instead.

    ``attach_plugin`` is accepted for forward compatibility but
    does nothing here — the LLM-facing tool surface that it
    attaches lives in a layer that is not part of this module.

    Args:
        agent: Agent object handed to the returned ``AgentClient``.
        passport: Passport submitted to the hub; the passport the hub
            returns is the one stored on the ``AgentClient``.
        resume: Resume submitted to the hub and stored on the client.
        skill_md: Optional skill text forwarded to the hub.
        rule: Rule forwarded to the hub; a default ``Rule()`` is used
            when ``None``.
        attach_plugin: Unused here (see above).

    Returns:
        The ``AgentClient`` created for the registered agent; it is also
        recorded in this client's registry under its ``agent_id``.

    Raises:
        RuntimeError: If this ``HubClient`` is closed, or if the hub
            returns a passport without an ``agent_id``.
    """
    if self._closed:
        raise RuntimeError("HubClient is closed")
    client_link = self._ensure_connected()
    effective_rule = rule if rule is not None else Rule()
    registered = await self._hub.register(passport, resume, skill_md=skill_md, rule=effective_rule)
    agent_id = registered.agent_id
    if agent_id is None:
        # Checked explicitly rather than with ``assert``: assertions are
        # stripped under ``python -O``, which would let a ``None`` id be
        # bound to the endpoint and used as a registry key.
        raise RuntimeError("Hub returned a passport without an agent_id")
    self._hub.bind_endpoint(client_link.endpoint_id, agent_id)
    client = AgentClient(
        agent=agent,
        passport=registered,
        resume=resume,
        rule=effective_rule,
        hub=self._hub,
        hub_client=self,
    )
    self._clients[agent_id] = client
    return client
get_agent async
Source code in autogen/beta/network/client/hub_client.py
async def get_agent(self, name_or_id: str) -> Passport:
    """Return the hub's ``get_agent`` result for ``name_or_id``."""
    hub = self._hub
    return await hub.get_agent(name_or_id)
get_resume async
Source code in autogen/beta/network/client/hub_client.py
async def get_resume(self, agent_id: str) -> Resume:
    """Return the hub's ``get_resume`` result for ``agent_id``."""
    hub = self._hub
    return await hub.get_resume(agent_id)
get_skill async
Source code in autogen/beta/network/client/hub_client.py
async def get_skill(self, agent_id: str) -> str | None:
    """Return the hub's ``get_skill`` result for ``agent_id``."""
    hub = self._hub
    return await hub.get_skill(agent_id)
list_agents async
list_agents(*, capability=None, query=None, sort_by=None, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_agents(
    self,
    *,
    capability: str | None = None,
    query: str | None = None,
    sort_by: str | None = None,
    limit: int = 50,
) -> list[Passport]:
    """Forward the given filters to the hub's ``list_agents`` and return its result."""
    filters = {
        "capability": capability,
        "query": query,
        "sort_by": sort_by,
        "limit": limit,
    }
    return await self._hub.list_agents(**filters)
set_resume async
set_resume(agent_id, resume)
Source code in autogen/beta/network/client/hub_client.py
async def set_resume(self, agent_id: str, resume: Resume) -> None:
    """Pass ``agent_id`` and ``resume`` to the hub's ``set_resume``."""
    hub = self._hub
    await hub.set_resume(agent_id, resume)
set_skill async
set_skill(agent_id, skill_md)
Source code in autogen/beta/network/client/hub_client.py
async def set_skill(self, agent_id: str, skill_md: str | None) -> None:
    """Pass ``agent_id`` and ``skill_md`` to the hub's ``set_skill``."""
    hub = self._hub
    await hub.set_skill(agent_id, skill_md)
set_rule async
Source code in autogen/beta/network/client/hub_client.py
async def set_rule(self, agent_id: str, rule: Rule) -> None:
    """Pass ``agent_id`` and ``rule`` to the hub's ``set_rule``."""
    hub = self._hub
    await hub.set_rule(agent_id, rule)
unregister_agent async
unregister_agent(agent_id)
Source code in autogen/beta/network/client/hub_client.py
async def unregister_agent(self, agent_id: str) -> None:
    """Call the hub's ``unregister`` for ``agent_id``."""
    # NOTE(review): this does not touch ``self._clients`` — confirm whether
    # the local registry entry is expected to be removed elsewhere.
    hub = self._hub
    await hub.unregister(agent_id)
create_session async
create_session(*, creator_id, manifest_type, manifest_version=1, participants, required_acks=None, ttl=None, knobs=None, intent=None, labels=None)
Source code in autogen/beta/network/client/hub_client.py
async def create_session(
    self,
    *,
    creator_id: str,
    manifest_type: str,
    manifest_version: int = 1,
    participants: list[str],
    required_acks: int | None = None,
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> SessionMetadata:
    """Forward every argument unchanged to the hub's ``create_session`` and return its result."""
    request = {
        "creator_id": creator_id,
        "manifest_type": manifest_type,
        "manifest_version": manifest_version,
        "participants": participants,
        "required_acks": required_acks,
        "ttl": ttl,
        "knobs": knobs,
        "intent": intent,
        "labels": labels,
    }
    return await self._hub.create_session(**request)
get_session async
Source code in autogen/beta/network/client/hub_client.py
async def get_session(self, session_id: str) -> SessionMetadata:
    """Return the hub's ``get_session`` result for ``session_id``."""
    hub = self._hub
    return await hub.get_session(session_id)
list_sessions async
list_sessions(*, agent_id=None, include_terminal=False, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_sessions(
    self,
    *,
    agent_id: str | None = None,
    include_terminal: bool = False,
    limit: int = 50,
) -> list[SessionMetadata]:
    """List sessions from the hub, optionally hiding terminal ones.

    Args:
        agent_id: Forwarded to the hub's ``list_sessions``.
        include_terminal: When ``False`` (the default), sessions whose
            ``state`` is ``SessionState.CLOSED`` or ``SessionState.EXPIRED``
            are dropped client-side.
        limit: Maximum number of sessions returned.

    Returns:
        At most ``limit`` sessions, in the order the hub returned them.
    """
    if include_terminal:
        # Nothing is filtered out, so there is no reason to over-fetch.
        # NOTE(review): assumes the hub returns a stable prefix for
        # smaller limits — confirm against the hub implementation.
        everything = await self._hub.list_sessions(agent_id=agent_id, limit=limit)
        return everything[:limit]

    # Filtering happens after the fetch, so request extra rows to leave
    # headroom for the terminal sessions that get dropped. The result can
    # still fall short of ``limit`` if most fetched rows are terminal.
    fetched = await self._hub.list_sessions(agent_id=agent_id, limit=limit * 4)
    terminal_states = (SessionState.CLOSED, SessionState.EXPIRED)
    live = [meta for meta in fetched if meta.state not in terminal_states]
    return live[:limit]
close_session async
close_session(session_id, *, reason='')
Source code in autogen/beta/network/client/hub_client.py
async def close_session(self, session_id: str, *, reason: str = "") -> SessionMetadata:
    """Ask the hub to close ``session_id`` with ``reason`` and return the hub's result."""
    hub = self._hub
    return await hub.close_session(session_id, reason=reason)
post_envelope async
Source code in autogen/beta/network/client/hub_client.py
async def post_envelope(self, envelope: Envelope) -> str:
    """Hand ``envelope`` to the hub's ``post_envelope`` and return its result."""
    hub = self._hub
    return await hub.post_envelope(envelope)
read_wal async
read_wal(session_id, *, since=0, until=None)
Source code in autogen/beta/network/client/hub_client.py
async def read_wal(self, session_id: str, *, since: int = 0, until: int | None = None) -> list[Envelope]:
    """Return the hub's ``read_wal`` result for ``session_id``, forwarding ``since`` and ``until``."""
    window = {"since": since, "until": until}
    return await self._hub.read_wal(session_id, **window)
can_send
can_send(session_id, sender_id, *, event_type=None)
Source code in autogen/beta/network/client/hub_client.py
def can_send(
    self,
    session_id: str,
    sender_id: str,
    *,
    event_type: str | None = None,
) -> bool:
    """Return the hub's ``can_send`` answer for this session, sender and event type."""
    verdict = self._hub.can_send(session_id, sender_id, event_type=event_type)
    return verdict
default_view_policy
default_view_policy(session_id, participant_id)
Source code in autogen/beta/network/client/hub_client.py
def default_view_policy(self, session_id: str, participant_id: str) -> ViewPolicy:
    """Return the hub's ``default_view_policy`` result for this session and participant."""
    policy = self._hub.default_view_policy(session_id, participant_id)
    return policy
get_task async
Source code in autogen/beta/network/client/hub_client.py
async def get_task(self, task_id: str) -> TaskMetadata:
    """Return the hub's ``get_task`` result for ``task_id``."""
    hub = self._hub
    return await hub.get_task(task_id)
list_tasks async
list_tasks(*, agent_id=None, session_id=None, state=None, limit=50)
Source code in autogen/beta/network/client/hub_client.py
async def list_tasks(
    self,
    *,
    agent_id: str | None = None,
    session_id: str | None = None,
    state: TaskState | None = None,
    limit: int = 50,
) -> list[TaskMetadata]:
    """Forward the given filters to the hub's ``list_tasks`` and return its result."""
    filters = {
        "agent_id": agent_id,
        "session_id": session_id,
        "state": state,
        "limit": limit,
    }
    return await self._hub.list_tasks(**filters)
observe_task async
Source code in autogen/beta/network/client/hub_client.py
async def observe_task(self, metadata: TaskMetadata) -> None:
    """Pass ``metadata`` to the hub's ``observe_task``."""
    hub = self._hub
    await hub.observe_task(metadata)
update_task async
update_task(task_id, *, state=None, progress=None, result=None, error=None)
Source code in autogen/beta/network/client/hub_client.py
async def update_task(
    self,
    task_id: str,
    *,
    state: TaskState | None = None,
    progress: dict[str, object] | None = None,
    result: object | None = None,
    error: str | None = None,
) -> None:
    """Forward the given fields for ``task_id`` to the hub's ``update_task``."""
    changes = {
        "state": state,
        "progress": progress,
        "result": result,
        "error": error,
    }
    await self._hub.update_task(task_id, **changes)
record_observation async
record_observation(*, owner_id, capability, outcome, latency_ms=None, task_id=None)
Source code in autogen/beta/network/client/hub_client.py
async def record_observation(
    self,
    *,
    owner_id: str,
    capability: str,
    outcome: TaskState,
    latency_ms: int | None = None,
    task_id: str | None = None,
) -> None:
    """Forward every argument unchanged to the hub's ``record_observation``."""
    observation = {
        "owner_id": owner_id,
        "capability": capability,
        "outcome": outcome,
        "latency_ms": latency_ms,
        "task_id": task_id,
    }
    await self._hub.record_observation(**observation)
close async
Close the connection and stop the receive loop. Idempotent.
Source code in autogen/beta/network/client/hub_client.py
async def close(self) -> None:
    """Close the connection and stop the receive loop. Idempotent.

    The closed flag is set before any teardown, so a second call returns
    immediately. The receive task is cancelled even if closing the client
    link raises; in that case the link's exception propagates after the
    task has been cancelled and awaited.
    """
    if self._closed:
        return
    self._closed = True
    try:
        if self._client_link is not None:
            await self._client_link.close()
    finally:
        # Runs even when the link close fails: because ``_closed`` is
        # already set, a retry would be a no-op and the task would leak.
        receive_task = self._receive_task
        if receive_task is not None:
            receive_task.cancel()
            # The task's own outcome (cancellation or an error raised by
            # the loop) is irrelevant during teardown.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await receive_task
shutdown async
Unregister every AgentClient then close().
Source code in autogen/beta/network/client/hub_client.py
async def shutdown(self) -> None:
    """Unregister every ``AgentClient`` then ``close()``.

    Unregistration is best-effort: an ``Exception`` raised by one client's
    ``unregister()`` is suppressed so the remaining clients are still
    processed. The registry is cleared and ``close()`` is awaited even if
    the loop is interrupted by something that is not an ``Exception``
    (for example task cancellation), which then propagates afterwards.
    """
    try:
        # Iterate over a snapshot so the registry may change during the
        # awaits without breaking the loop.
        for client in list(self._clients.values()):
            with contextlib.suppress(Exception):
                await client.unregister()
    finally:
        # Without this, cancellation during an unregister would skip
        # ``close()`` and leave the connection open.
        self._clients.clear()
        await self.close()