Skip to content

AgentClient

autogen.beta.network.client.agent_client.AgentClient #

AgentClient(*, agent, passport, resume, rule, hub_client, attach_default_handler=True)

Tenant-side handle for one (Agent, identity, hub) registration.

Source code in autogen/beta/network/client/agent_client.py
def __init__(
    self,
    *,
    agent: Agent,
    passport: Passport,
    resume: Resume,
    rule: Rule,
    hub_client: "HubClient",
    attach_default_handler: bool = True,
) -> None:
    # __init__ stores params; no side effects. The hub is reached
    # only through ``hub_client`` — the single seam that is direct
    # in-process and RPC-backed cross-process.
    self._agent = agent
    self._passport = passport
    self._resume = resume
    self._rule = rule
    self._hub_client = hub_client
    self._on_envelope: EnvelopeHandler | None = self._run_default_handler if attach_default_handler else None
    self._disconnected = False

    # Per-channel inbox queues for ``wait_for_channel_event``
    # (used by the ``delegate`` tool to await consulting replies).
    self._channel_inboxes: dict[str, asyncio.Queue[Envelope]] = {}

    # Channels where the default notify handler should NOT run —
    # used by ``delegate`` while it owns the channel lifecycle.
    self._handler_suppressed_channels: set[str] = set()

    # Stack of envelopes currently being handled. The top of the
    # stack is the envelope this agent is processing right now;
    # ``delegate`` reads its ``depth`` to stamp the outgoing prompt
    # for delegation-depth enforcement (Rule.limits.delegation_depth).
    self._handling_envelope_stack: list[Envelope] = []

agent property #

agent

passport property #

passport

resume property #

resume

rule property #

rule

agent_id property #

agent_id

current_handling_depth property #

current_handling_depth

Depth of the envelope this agent is currently handling.

Returns 0 when no handler is on the stack (i.e. the agent initiated the call from outside any inbound delivery). Used by delegate to stamp Envelope.depth = current + 1.

receive async #

receive(envelope)

Hub delivery → fan out to inbox + (suppressible) handler.

Source code in autogen/beta/network/client/agent_client.py
async def receive(self, envelope: Envelope) -> None:
    """Hub delivery → fan out to inbox + (suppressible) handler."""
    inbox = self.ensure_channel_inbox(envelope.channel_id)
    await inbox.put(envelope)
    if envelope.channel_id in self._handler_suppressed_channels:
        return
    if self._on_envelope is not None:
        await self._on_envelope(envelope)

on_envelope #

on_envelope(callback)

Override the default notify handler with a custom callback.

Calling with the default handler restores it: pass self._run_default_handler (or simply construct without attach_default_handler=False).

Source code in autogen/beta/network/client/agent_client.py
def on_envelope(self, callback: EnvelopeHandler) -> None:
    """Override the default notify handler with a custom callback.

    Calling with the default handler restores it: pass
    ``self._run_default_handler`` (or simply construct without
    ``attach_default_handler=False``).
    """
    self._on_envelope = callback

disconnect async #

disconnect()
Source code in autogen/beta/network/client/agent_client.py
async def disconnect(self) -> None:
    self._disconnected = True
    self._on_envelope = None

resume_pending_turns async #

resume_pending_turns()

Re-fire the notify handler against every turn the protocol currently expects from this agent.

Asks the hub for :class:PendingTurn entries (:meth:Hub.pending_turns_for), fetches each turn's triggering envelope from the WAL, and feeds it back through :meth:receive. Returns the number of turns re-fired. Entries without a triggering envelope are skipped — e.g. a freshly opened channel where the creator has nothing to react to yet.

Idempotent under at-least-once delivery: if a prior reply already landed, the agent's handler can short-circuit via :meth:Hub.find_envelope_by_causation so the same logical turn is not posted twice.

Source code in autogen/beta/network/client/agent_client.py
async def resume_pending_turns(self) -> int:
    """Re-fire the notify handler against every turn the protocol
    currently expects from this agent.

    Asks the hub for :class:`PendingTurn` entries
    (:meth:`Hub.pending_turns_for`), fetches each turn's triggering
    envelope from the WAL, and feeds it back through
    :meth:`receive`. Returns the number of turns re-fired. Entries
    without a triggering envelope are skipped — e.g. a freshly
    opened channel where the creator has nothing to react to yet.

    Idempotent under at-least-once delivery: if a prior reply
    already landed, the agent's handler can short-circuit via
    :meth:`Hub.find_envelope_by_causation` so the same logical
    turn is not posted twice.
    """
    pending = await self._hub_client.pending_turns_for(self.agent_id)
    triggered = 0
    for turn in pending:
        if turn.triggering_envelope_id is None:
            continue
        wal = await self._hub_client.read_wal(turn.channel_id)
        envelope = next(
            (e for e in wal if e.envelope_id == turn.triggering_envelope_id),
            None,
        )
        if envelope is None:
            continue
        await self.receive(envelope)
        triggered += 1
    return triggered

open async #

open(*, type, target, ttl=None, knobs=None, intent=None, labels=None)

Open a channel via the hub and return its :class:Channel handle.

target accepts peer names or agent_ids; resolution goes through the bound :class:HubClient.

Source code in autogen/beta/network/client/agent_client.py
async def open(
    self,
    *,
    type: str,
    target: str | list[str],
    ttl: str | int | None = None,
    knobs: dict[str, object] | None = None,
    intent: str | None = None,
    labels: dict[str, str] | None = None,
) -> Channel:
    """Open a channel via the hub and return its :class:`Channel` handle.

    ``target`` accepts peer **names** or agent_ids; resolution goes
    through the bound :class:`HubClient`.
    """
    if self._disconnected:
        raise RuntimeError("AgentClient is disconnected")

    targets = [target] if isinstance(target, str) else list(target)
    target_ids: list[str] = []
    for t in targets:
        passport = await self._hub_client.get_agent(t)
        if passport.agent_id is None:
            raise RuntimeError(f"target {t!r} has no agent_id")
        target_ids.append(passport.agent_id)

    metadata = await self._hub_client.create_channel(
        creator_id=self.agent_id,
        manifest_type=type,
        participants=target_ids,
        ttl=ttl,
        knobs=knobs,
        intent=intent,
        labels=labels,
    )
    self.ensure_channel_inbox(metadata.channel_id)
    return Channel(metadata=metadata, client=self)

ensure_channel_inbox #

ensure_channel_inbox(channel_id)

Create (or fetch) the per-channel inbox queue.

Callers that send first and then wait_for_channel_event MUST call this BEFORE the send. Otherwise a fast reply (e.g. via LocalLink where dispatch happens on the same event-loop tick) can be delivered to receive before the wait creates the inbox — the envelope would then be dropped silently.

Idempotent: returns the existing queue if one is already bound.

Source code in autogen/beta/network/client/agent_client.py
def ensure_channel_inbox(self, channel_id: str) -> "asyncio.Queue[Envelope]":
    """Create (or fetch) the per-channel inbox queue.

    Callers that send first and then ``wait_for_channel_event`` MUST
    call this BEFORE the send. Otherwise a fast reply (e.g. via
    ``LocalLink`` where dispatch happens on the same event-loop tick)
    can be delivered to ``receive`` before the wait creates the
    inbox — the envelope would then be dropped silently.

    Idempotent: returns the existing queue if one is already bound.
    """
    inbox = self._channel_inboxes.get(channel_id)
    if inbox is None:
        inbox = asyncio.Queue()
        self._channel_inboxes[channel_id] = inbox
    return inbox

discard_channel_inbox #

discard_channel_inbox(channel_id)

Drop the per-channel inbox queue.

Callers should invoke this after they've finished waiting on a channel so the per-client memory footprint doesn't grow with every consulted channel.

Source code in autogen/beta/network/client/agent_client.py
def discard_channel_inbox(self, channel_id: str) -> None:
    """Drop the per-channel inbox queue.

    Callers should invoke this after they've finished waiting on a
    channel so the per-client memory footprint doesn't grow with
    every consulted channel.
    """
    self._channel_inboxes.pop(channel_id, None)

wait_for_channel_event async #

wait_for_channel_event(*, channel_id, predicate, timeout=300.0)

Block until an inbound envelope on channel_id matches.

Used by delegate to await the consulting respondent's reply. The inbox is created on demand and shared across waits; callers should not hold multiple concurrent waits on the same channel.

Raises asyncio.TimeoutError on timeout.

Source code in autogen/beta/network/client/agent_client.py
async def wait_for_channel_event(
    self,
    *,
    channel_id: str,
    predicate: EnvelopePredicate,
    timeout: float = 300.0,
) -> Envelope:
    """Block until an inbound envelope on ``channel_id`` matches.

    Used by ``delegate`` to await the consulting respondent's
    reply. The inbox is created on demand and shared across waits;
    callers should not hold multiple concurrent waits on the same
    channel.

    Raises ``asyncio.TimeoutError`` on timeout.
    """
    inbox = self.ensure_channel_inbox(channel_id)

    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError()
        envelope = await asyncio.wait_for(inbox.get(), timeout=remaining)
        if predicate(envelope):
            return envelope

send_envelope async #

send_envelope(envelope)

Post an envelope through the hub. Returns the stamped envelope_id.

Source code in autogen/beta/network/client/agent_client.py
async def send_envelope(self, envelope: Envelope) -> str:
    """Post an envelope through the hub. Returns the stamped envelope_id."""
    if self._disconnected:
        raise RuntimeError("AgentClient is disconnected")
    if envelope.sender_id == "":
        envelope.sender_id = self.agent_id
    return await self._hub_client.post_envelope(envelope)

set_resume async #

set_resume(resume)
Source code in autogen/beta/network/client/agent_client.py
async def set_resume(self, resume: Resume) -> None:
    await self._hub_client.set_resume(self.agent_id, resume)
    # Refresh local cache so subsequent reads see the bumped version.
    self._resume = await self._hub_client.get_resume(self.agent_id)

add_example async #

add_example(example)

Append a ResumeExample to this agent's resume.

Fetches the latest resume from the hub first so concurrent set_resume / record_observation updates don't get clobbered.

Source code in autogen/beta/network/client/agent_client.py
async def add_example(self, example: ResumeExample) -> None:
    """Append a ``ResumeExample`` to this agent's resume.

    Fetches the latest resume from the hub first so concurrent
    ``set_resume`` / ``record_observation`` updates don't get
    clobbered.
    """
    current = await self._hub_client.get_resume(self.agent_id)
    current.examples.append(example)
    await self.set_resume(current)

set_skill async #

set_skill(skill_md)
Source code in autogen/beta/network/client/agent_client.py
async def set_skill(self, skill_md: str | None) -> None:
    await self._hub_client.set_skill(self.agent_id, skill_md)

set_rule async #

set_rule(rule)
Source code in autogen/beta/network/client/agent_client.py
async def set_rule(self, rule: Rule) -> None:
    await self._hub_client.set_rule(self.agent_id, rule)
    self._rule = rule

unregister async #

unregister()
Source code in autogen/beta/network/client/agent_client.py
async def unregister(self) -> None:
    if not self._disconnected:
        await self._hub_client.unregister_agent(self.agent_id)
        self._disconnected = True