Skip to content

AgentClient

autogen.beta.network.client.agent_client.AgentClient #

AgentClient(*, agent, passport, resume, rule, hub, hub_client, attach_default_handler=True)

Tenant-side handle for one (Agent, identity, hub) registration.

Source code in autogen/beta/network/client/agent_client.py
def __init__(
    self,
    *,
    agent: Agent,
    passport: Passport,
    resume: Resume,
    rule: Rule,
    hub: "Hub",
    hub_client: "HubClient",
    attach_default_handler: bool = True,
) -> None:
    # __init__ stores params; no side effects.
    self._agent = agent
    self._passport = passport
    self._resume = resume
    self._rule = rule
    self._hub = hub
    self._hub_client = hub_client
    self._on_envelope: EnvelopeHandler | None = self._run_default_handler if attach_default_handler else None
    self._disconnected = False

    # Per-session inbox queues for ``wait_for_session_event``
    # (used by the ``delegate`` tool to await consulting replies).
    self._session_inboxes: dict[str, asyncio.Queue[Envelope]] = {}

    # Sessions where the default notify handler should NOT run —
    # used by ``delegate`` while it owns the session lifecycle.
    self._handler_suppressed_sessions: set[str] = set()

    # Stack of envelopes currently being handled. The top of the
    # stack is the envelope this agent is processing right now;
    # ``delegate`` reads its ``depth`` to stamp the outgoing prompt
    # for delegation-depth enforcement (Rule.limits.delegation_depth).
    self._handling_envelope_stack: list[Envelope] = []

agent property #

agent

passport property #

passport

resume property #

resume

rule property #

rule

agent_id property #

agent_id

current_handling_depth property #

current_handling_depth

Depth of the envelope this agent is currently handling.

Returns 0 when no handler is on the stack (i.e. the agent initiated the call from outside any inbound delivery). Used by delegate to stamp Envelope.depth = current + 1.

receive async #

receive(envelope)

Hub delivery → fan out to inbox + (suppressible) handler.

Source code in autogen/beta/network/client/agent_client.py
async def receive(self, envelope: Envelope) -> None:
    """Hub delivery → fan out to inbox + (suppressible) handler."""
    inbox = self.ensure_session_inbox(envelope.session_id)
    await inbox.put(envelope)
    if envelope.session_id in self._handler_suppressed_sessions:
        return
    if self._on_envelope is not None:
        await self._on_envelope(envelope)

on_envelope #

on_envelope(callback)

Override the default notify handler with a custom callback.

Calling with the default handler restores it: pass self._run_default_handler (or simply construct without attach_default_handler=False).

Source code in autogen/beta/network/client/agent_client.py
def on_envelope(self, callback: EnvelopeHandler) -> None:
    """Override the default notify handler with a custom callback.

    Calling with the default handler restores it: pass
    ``self._run_default_handler`` (or simply construct without
    ``attach_default_handler=False``).
    """
    self._on_envelope = callback

disconnect async #

disconnect()
Source code in autogen/beta/network/client/agent_client.py
async def disconnect(self) -> None:
    self._disconnected = True
    self._on_envelope = None

open async #

open(*, type, target, ttl=None, knobs=None, intent=None, labels=None)

Open a session via the hub and return its :class:Session handle.

target accepts peer names or agent_ids; resolution goes through the bound :class:HubClient so in-process and any future cross-process transport take the same code path.

Source code in autogen/beta/network/client/agent_client.py
async def open(
    self,
    *,
    type: str,
    target: str | list[str],
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> Session:
    """Open a session via the hub and return its :class:`Session` handle.

    ``target`` accepts peer **names** or agent_ids; resolution goes
    through the bound :class:`HubClient` so in-process and any
    future cross-process transport take the same code path.
    """
    if self._disconnected:
        raise RuntimeError("AgentClient is disconnected")

    targets = [target] if isinstance(target, str) else list(target)
    target_ids: list[str] = []
    for t in targets:
        passport = await self._hub_client.get_agent(t)
        if passport.agent_id is None:
            raise RuntimeError(f"target {t!r} has no agent_id")
        target_ids.append(passport.agent_id)

    metadata = await self._hub_client.create_session(
        creator_id=self.agent_id,
        manifest_type=type,
        participants=target_ids,
        ttl=ttl,
        knobs=knobs,
        intent=intent,
        labels=labels,
    )
    self.ensure_session_inbox(metadata.session_id)
    return Session(metadata=metadata, client=self)

ensure_session_inbox #

ensure_session_inbox(session_id)

Create (or fetch) the per-session inbox queue.

Callers that send first and then wait_for_session_event MUST call this BEFORE the send. Otherwise a fast reply (e.g. via LocalLink where dispatch happens on the same event-loop tick) can be delivered to receive before the wait creates the inbox — the envelope would then be dropped silently.

Idempotent: returns the existing queue if one is already bound.

Source code in autogen/beta/network/client/agent_client.py
def ensure_session_inbox(self, session_id: str) -> "asyncio.Queue[Envelope]":
    """Create (or fetch) the per-session inbox queue.

    Callers that send first and then ``wait_for_session_event`` MUST
    call this BEFORE the send. Otherwise a fast reply (e.g. via
    ``LocalLink`` where dispatch happens on the same event-loop tick)
    can be delivered to ``receive`` before the wait creates the
    inbox — the envelope would then be dropped silently.

    Idempotent: returns the existing queue if one is already bound.
    """
    inbox = self._session_inboxes.get(session_id)
    if inbox is None:
        inbox = asyncio.Queue()
        self._session_inboxes[session_id] = inbox
    return inbox

discard_session_inbox #

discard_session_inbox(session_id)

Drop the per-session inbox queue.

Callers should invoke this after they've finished waiting on a session so the per-client memory footprint doesn't grow with every consulted session.

Source code in autogen/beta/network/client/agent_client.py
def discard_session_inbox(self, session_id: str) -> None:
    """Drop the per-session inbox queue.

    Callers should invoke this after they've finished waiting on a
    session so the per-client memory footprint doesn't grow with
    every consulted session.
    """
    self._session_inboxes.pop(session_id, None)

wait_for_session_event async #

wait_for_session_event(*, session_id, predicate, timeout=300.0)

Block until an inbound envelope on session_id matches.

Used by delegate to await the consulting respondent's reply. The inbox is created on demand and shared across waits; callers should not hold multiple concurrent waits on the same session.

Raises asyncio.TimeoutError on timeout.

Source code in autogen/beta/network/client/agent_client.py
async def wait_for_session_event(
    self,
    *,
    session_id: str,
    predicate: EnvelopePredicate,
    timeout: float = 300.0,
) -> Envelope:
    """Block until an inbound envelope on ``session_id`` matches.

    Used by ``delegate`` to await the consulting respondent's
    reply. The inbox is created on demand and shared across waits;
    callers should not hold multiple concurrent waits on the same
    session.

    Raises ``asyncio.TimeoutError`` on timeout.
    """
    inbox = self.ensure_session_inbox(session_id)

    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError()
        envelope = await asyncio.wait_for(inbox.get(), timeout=remaining)
        if predicate(envelope):
            return envelope

send_envelope async #

send_envelope(envelope)

Post an envelope through the hub. Returns the stamped envelope_id.

Source code in autogen/beta/network/client/agent_client.py
async def send_envelope(self, envelope: Envelope) -> str:
    """Post an envelope through the hub. Returns the stamped envelope_id."""
    if self._disconnected:
        raise RuntimeError("AgentClient is disconnected")
    if envelope.sender_id == "":
        envelope.sender_id = self.agent_id
    return await self._hub_client.post_envelope(envelope)

set_resume async #

set_resume(resume)
Source code in autogen/beta/network/client/agent_client.py
async def set_resume(self, resume: Resume) -> None:
    await self._hub_client.set_resume(self.agent_id, resume)
    # Refresh local cache so subsequent reads see the bumped version.
    self._resume = await self._hub_client.get_resume(self.agent_id)

add_example async #

add_example(example)

Append a ResumeExample to this agent's resume.

Fetches the latest resume from the hub first so concurrent set_resume / record_observation updates don't get clobbered.

Source code in autogen/beta/network/client/agent_client.py
async def add_example(self, example: ResumeExample) -> None:
    """Append a ``ResumeExample`` to this agent's resume.

    Fetches the latest resume from the hub first so concurrent
    ``set_resume`` / ``record_observation`` updates don't get
    clobbered.
    """
    current = await self._hub_client.get_resume(self.agent_id)
    current.examples.append(example)
    await self.set_resume(current)

set_skill async #

set_skill(skill_md)
Source code in autogen/beta/network/client/agent_client.py
async def set_skill(self, skill_md: str | None) -> None:
    await self._hub_client.set_skill(self.agent_id, skill_md)

set_rule async #

set_rule(rule)
Source code in autogen/beta/network/client/agent_client.py
async def set_rule(self, rule: Rule) -> None:
    await self._hub_client.set_rule(self.agent_id, rule)
    self._rule = rule

unregister async #

unregister()
Source code in autogen/beta/network/client/agent_client.py
async def unregister(self) -> None:
    if not self._disconnected:
        await self._hub_client.unregister_agent(self.agent_id)
        self._disconnected = True