Agent Clients

This page covers the agent side of the network. A HubClient represents one process's connection to the hub; it produces AgentClients, one per registered Agent. Each AgentClient carries a notify handler that decides what the agent does when an envelope arrives.

HubClient: one per process

from autogen.beta.network import HubClient, LocalLink

link = LocalLink(hub)            # transport factory bound to this hub
hc = HubClient(link, hub=hub)

A HubClient is the per-process registration broker. In a single-process script you may construct one per agent (each gets its own duplex link); in a real deployment you'd typically have one HubClient per process and register all of that process's agents on it.

| Method | Notes |
| --- | --- |
| await hc.register(agent, passport, resume, ...) | Registers an Agent with the hub. Returns an AgentClient. |
| await hc.unregister(agent_id) | Tears down the registration; emits AUDIT_KIND_AGENT_UNREGISTERED. |
| await hc.close() | Closes this process's link to the hub. |
| hc.read_wal(session_id) | Reads the WAL for any session this process can see. |
| hc.can_send(session_id, agent_id) | Probe: would the adapter accept a send from this agent right now? |
| hc.default_view_policy(session_id, agent_id) | The view policy this participant should use when projecting history. |

The probe and view-policy methods exist so custom handlers (next section) don't have to reach into hub internals.

Registering an Agent

agent_client = await hc.register(
    agent,                     # autogen.beta.Agent
    passport,                  # autogen.beta.network.Passport
    resume,                    # autogen.beta.network.Resume
    skill_md=None,             # optional — markdown describing this agent's skill
    rule=None,                 # optional — Rule(...) for governance
    attach_plugin=True,        # whether to install the default notify handler
)

attach_plugin=True (the default) installs the default_handler on this client. Pass False if you want full control over inbound envelope handling — typical for headless workers, gateways, or custom orchestration logic.

AgentClient

| Attribute / Method | Notes |
| --- | --- |
| agent_client.agent_id | Hub-stamped id; use for routing. |
| agent_client.agent | The wrapped Agent. |
| agent_client.passport / .resume | Snapshot at registration time. |
| await agent_client.open(type=..., target=..., knobs=...) | Open a new session. |
| await agent_client.send_envelope(envelope) | Direct envelope send (for custom event types). |
| await agent_client.wait_for_session_event(session_id, predicate, timeout=...) | Block until a matching envelope lands in this client's inbox. |
| agent_client.on_envelope(callback) | Replace the handler. |

session = await agent_client.open(...) returns a Session handle scoped to one session id. Use session.send(text, audience=...) for ordinary text sends, session.close() for explicit termination, session.info() for the latest metadata.

The Default Handler

When you register with attach_plugin=True, the client installs default_handler. It routes inbound envelopes:

| Inbound event | Behaviour |
| --- | --- |
| EV_SESSION_INVITE | Auto-ack with EV_SESSION_INVITE_ACK. |
| EV_TEXT / EV_HANDOFF | If hc.can_send(...) says it's our turn: read the WAL up to this envelope, project it through this participant's view policy, pre-populate a fresh MemoryStream, attach a TaskMirror, run agent.ask(text, stream=stream, dependencies=...), and send any non-empty reply via session.send(...). |
| EV_SESSION_* (other) | No-op; bookkeeping is reflected in the next session.info(). |
| ag2.task.* | No-op at the handler level; TaskMirror handles these separately when attached. |
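
The routing table above can be read as a small dispatch function. The following is a minimal sketch rather than the library's implementation: the event-type strings and the FakeEnvelope class are hypothetical stand-ins (the real constants and Envelope live in autogen.beta.network), and the "not-our-turn" branch is an assumption, since this page does not say what happens when hc.can_send(...) returns false.

```python
from dataclasses import dataclass, field

# Stand-in event-type strings. The real constants are exported by
# autogen.beta.network; their actual values are not shown on this page.
EV_SESSION_INVITE = "session.invite"
EV_TEXT = "text"
EV_HANDOFF = "handoff"


@dataclass
class FakeEnvelope:
    """Hypothetical stand-in for Envelope with only the fields used here."""
    event_type: str
    event_data: dict = field(default_factory=dict)


def route(envelope: FakeEnvelope, our_turn: bool) -> str:
    """Return the name of the branch the routing table would take."""
    kind = envelope.event_type
    if kind == EV_SESSION_INVITE:
        return "ack-invite"
    if kind in (EV_TEXT, EV_HANDOFF):
        # Assumption: nothing happens when it is not this agent's turn.
        return "run-turn" if our_turn else "not-our-turn"
    # Other session events and ag2.task.* events are no-ops at this level.
    return "no-op"
```

Here our_turn plays the role of the hc.can_send(...) probe, so the branch taken for text and handoff events depends only on that one boolean.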

The handler is decomposed into public hooks so you can override only the parts you care about:

from autogen.beta.network import (
    read_wal_until,
    resolve_view_policy,
    stamp_dependencies,
)

| Hook | Purpose |
| --- | --- |
| read_wal_until(client, envelope) | Slice the WAL up to but excluding the given envelope. |
| resolve_view_policy(client, metadata) | The ViewPolicy this participant should use. |
| stamp_dependencies(client, session) | The context.dependencies dict the LLM turn will see (SESSION_DEP, AGENT_CLIENT_DEP, HUB_DEP). |

Custom Handlers

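import asyncio

from autogen.beta.network import Envelope, EV_TEXT

# Placeholder for your own external system; any object with an async put() works.
my_external_queue: asyncio.Queue = asyncio.Queue()

async def gateway_handler(envelope: Envelope) -> None:
    # Ignore everything except plain text events.
    if envelope.event_type != EV_TEXT:
        return
    # Forward to your own external system instead of running an LLM.
    text = envelope.event_data.get("text", "")
    await my_external_queue.put({"from": envelope.sender_id, "text": text})

# agent_client is the AgentClient returned by hc.register(...).
agent_client.on_envelope(gateway_handler)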

Common patterns:

  • Headless worker — register with attach_plugin=False and install a handler that pulls work directly off the hub without running an LLM.
  • Selective override — install a handler that handles only one event type (e.g. custom invite policy) and falls back to default_handler for everything else.
  • Filtered forwarding — wrap default_handler with pre/post hooks for logging, rate limiting, or routing.
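
The selective-override and filtered-forwarding patterns are both plain handler composition. The sketch below shows the shape with stand-in types only: FakeEnvelope, override_one, and with_logging are hypothetical names, and it assumes a handler is an async callable that takes one envelope, as in the gateway example above. Whether the real default_handler can be called with just an envelope is not confirmed by this page.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Tuple


@dataclass
class FakeEnvelope:
    """Hypothetical stand-in for Envelope; only the fields used here."""
    event_type: str
    sender_id: str = ""
    event_data: dict = field(default_factory=dict)


Handler = Callable[[FakeEnvelope], Awaitable[None]]


def override_one(event_type: str, special: Handler, fallback: Handler) -> Handler:
    """Selective override: one event type goes to `special`, the rest fall back."""
    async def dispatch(envelope: FakeEnvelope) -> None:
        if envelope.event_type == event_type:
            await special(envelope)
        else:
            await fallback(envelope)
    return dispatch


def with_logging(inner: Handler, log: List[Tuple[str, str]]) -> Handler:
    """Filtered forwarding: run a pre hook and a post hook around `inner`."""
    async def wrapped(envelope: FakeEnvelope) -> None:
        log.append(("before", envelope.event_type))
        await inner(envelope)
        log.append(("after", envelope.event_type))
    return wrapped


async def demo():
    seen, log = [], []

    async def fallback(envelope: FakeEnvelope) -> None:
        seen.append(("fallback", envelope.event_type))

    async def special(envelope: FakeEnvelope) -> None:
        seen.append(("special", envelope.event_type))

    handler = with_logging(override_one("invite", special, fallback), log)
    await handler(FakeEnvelope("invite"))
    await handler(FakeEnvelope("text"))
    return seen, log
```

With the real library, the composed handler would be the callback passed to agent_client.on_envelope(...), and fallback would be whatever wraps default_handler.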
from autogen.beta.network import LocalLink, LinkClient, LinkEndpoint

link = LocalLink(hub)
client_link = link.client()  # produces a fresh LocalLinkClient bound to a fresh LocalLinkEndpoint

LocalLink is the in-process transport. Each HubClient(link, hub=hub) lazily creates one LocalLinkClient/LocalLinkEndpoint pair on first use:

  • LocalLinkClient — the agent-process side; sends frames toward the hub, receives notify frames from it.
  • LocalLinkEndpoint — the hub-process side; the inverse.

Both sides exchange Frame records via async queues. Frame types are re-exported at the package level: HelloFrame, WelcomeFrame, SendFrame, ReceiptFrame, NotifyFrame, AcceptFrame, ErrorFrame, PingFrame, PongFrame, SubscribeFrame, UnsubscribeFrame.
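
To make the "two sides, async queues" idea concrete, here is a toy duplex link built from two asyncio.Queue objects, one per direction. It is a sketch under stated assumptions, not LocalLink itself: ToyFrame and ToyDuplex are hypothetical names, and the hello-then-welcome ordering is only a guess based on the frame names listed above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyFrame:
    """Hypothetical stand-in for a Frame record."""
    kind: str
    payload: str = ""


class ToyDuplex:
    """Two queues, one per direction, shared by the client and endpoint sides."""

    def __init__(self) -> None:
        self.to_hub: asyncio.Queue = asyncio.Queue()    # agent side writes, hub side reads
        self.to_agent: asyncio.Queue = asyncio.Queue()  # hub side writes, agent side reads


async def demo() -> list:
    link = ToyDuplex()
    # Agent side sends a frame toward the hub.
    await link.to_hub.put(ToyFrame("hello"))
    # Hub side receives it and answers in the other direction.
    first = await link.to_hub.get()
    await link.to_agent.put(ToyFrame("welcome"))
    reply = await link.to_agent.get()
    return [first.kind, reply.kind]
```

Because each direction has its own queue, neither side can consume frames it wrote itself, which is what makes the pair a duplex channel.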

The transport layer is a Protocol:

from typing import AsyncIterator, Protocol

class LinkClient(Protocol):
    async def open(self) -> None: ...
    async def send_frame(self, frame: Frame) -> None: ...
    def frames(self) -> AsyncIterator[Frame]: ...
    async def close(self) -> None: ...

Cross-process or cross-host transports plug in here. LocalLink is the only built-in, but the abstraction allows for future Redis/WebSocket/gRPC implementations without changing any client code.
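
Because the transport is a Protocol, any class with matching methods qualifies structurally, with no inheritance required. The sketch below illustrates that with a loopback transport that echoes sent frames back. LinkClientLike mirrors the method names shown above but is a local copy for illustration, frames are typed as Any, and LoopbackLink is a hypothetical class; a real transport would also need the matching hub-side endpoint, which is not covered here.

```python
import asyncio
from typing import Any, AsyncIterator, Protocol, runtime_checkable


@runtime_checkable
class LinkClientLike(Protocol):
    """Local copy of the method shape shown above, for illustration only."""
    async def open(self) -> None: ...
    async def send_frame(self, frame: Any) -> None: ...
    def frames(self) -> AsyncIterator[Any]: ...
    async def close(self) -> None: ...


class LoopbackLink:
    """Hypothetical transport that echoes every sent frame back to the caller."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._is_open = False

    async def open(self) -> None:
        self._is_open = True

    async def send_frame(self, frame: Any) -> None:
        if not self._is_open:
            raise RuntimeError("link is not open")
        await self._queue.put(frame)

    async def frames(self) -> AsyncIterator[Any]:
        # Yield whatever is queued right now, then stop.
        while not self._queue.empty():
            yield self._queue.get_nowait()

    async def close(self) -> None:
        self._is_open = False


async def demo():
    link = LoopbackLink()
    # runtime_checkable only verifies that the methods exist, not their signatures.
    conforms = isinstance(link, LinkClientLike)
    await link.open()
    await link.send_frame("ping")
    await link.send_frame("pong")
    received = [frame async for frame in link.frames()]
    await link.close()
    return conforms, received
```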

Inbox & Backpressure

Every AgentClient has an inbox whose capacity is set by Rule.inbox.max_pending; by default it is unbounded. When a bounded inbox fills, sends to that agent fail with InboxFull. Both wait_for_session_event and the default handler drain the inbox in order. Custom handlers should do the same and must never block indefinitely in a callback.
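
The backpressure rule can be illustrated with a toy bounded inbox. This is a sketch of the behaviour described above, not the library's inbox: ToyInbox and the local InboxFull class are hypothetical stand-ins, and max_pending=None models the unbounded default.

```python
from collections import deque
from typing import Any, List, Optional


class InboxFull(Exception):
    """Stand-in for the library's InboxFull error."""


class ToyInbox:
    """Hypothetical FIFO inbox with an optional capacity limit."""

    def __init__(self, max_pending: Optional[int] = None) -> None:
        self.max_pending = max_pending  # None means unbounded
        self._pending: deque = deque()

    def deliver(self, envelope: Any) -> None:
        """Reject the send instead of blocking when the inbox is at capacity."""
        if self.max_pending is not None and len(self._pending) >= self.max_pending:
            raise InboxFull(f"inbox holds {len(self._pending)} pending envelopes")
        self._pending.append(envelope)

    def drain(self) -> List[Any]:
        """Remove and return all pending envelopes in arrival order."""
        drained = list(self._pending)
        self._pending.clear()
        return drained
```

A handler that drains promptly frees capacity, so a sender that hit InboxFull can succeed on a later attempt.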

Closing Down

await alice_hc.close()
await bob_hc.close()
await hub.close()

HubClient.close() cancels the link's listening task and unsubscribes all of its clients. Close every HubClient you registered agents on; otherwise the hub keeps those registrations in its registry.
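
One way to guarantee that every close() runs, even when the work in between raises, is contextlib.AsyncExitStack from the standard library. The sketch below uses a hypothetical ToyCloseable stand-in for the hub and the HubClients. Callbacks run in reverse registration order, so registering the hub first reproduces the clients-then-hub order shown above; whether the library requires that order is not stated on this page.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import List


class ToyCloseable:
    """Hypothetical stand-in for anything with an async close()."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def close(self) -> None:
        self.log.append(self.name)


async def demo() -> List[str]:
    log: List[str] = []
    hub = ToyCloseable("hub", log)
    alice_hc = ToyCloseable("alice_hc", log)
    bob_hc = ToyCloseable("bob_hc", log)

    async with AsyncExitStack() as stack:
        # Registered first, so it is closed last.
        stack.push_async_callback(hub.close)
        stack.push_async_callback(bob_hc.close)
        stack.push_async_callback(alice_hc.close)
        # ... register agents and run sessions here ...
    return log
```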