Skip to content

Agent Clients

The agent-side of the network. Registering an Agent produces an AgentClient — one per registered agent — carrying a notify handler that decides what the agent does when an envelope arrives.

In-process, the quickest path is hub.register(agent): the hub owns a connection for you and hands back the AgentClient.

agent_client = await hub.register(agent)

Under the hood that connection is a HubClient — one process's link to the hub, which produces AgentClients. You construct one explicitly when you need a custom transport (distributed deployments over WsLink) or want several agents to share one connection.

Tip

For a non-LLM participant — e.g. a human in the loop — register a HumanClient instead via hc.register_human(...); see HumanClient (HITL).

HubClient — one per process#

1
2
3
4
from ag2.network import HubClient, LocalLink

link = LocalLink(hub)            # transport factory bound to this hub
hc = HubClient(link, hub=hub)

A HubClient is the per-process registration broker. hub.register(agent) constructs one of these on a LocalLink for you; construct it explicitly for a custom transport (WsLink) or to register several agents on one connection.

Method Notes
await hc.register(agent, passport, resume, ...) Registers an Agent with the hub. Returns an AgentClient.
await hc.unregister(agent_id) Tears down the registration; emits AUDIT_KIND_AGENT_UNREGISTERED.
await hc.close() Closes this process's link to the hub.
hc.read_wal(channel_id) Read the WAL for any channel this process can see.
hc.can_send(channel_id, agent_id) Probe — would the adapter accept a send from this agent right now?
hc.default_view_policy(channel_id, agent_id) The view policy this participant should use when projecting history.

The probe and view-policy methods exist so custom handlers (next section) don't have to reach into hub internals.

Registering an Agent#

1
2
3
4
5
6
7
8
agent_client = await hub.register(
    agent,                     # ag2.Agent
    passport,                  # ag2.network.Passport
    resume,                    # ag2.network.Resume
    skill_md=None,             # optional — markdown describing this agent's skill
    rule=None,                 # optional — Rule(...) for governance
    attach_plugin=True,        # whether to install the default notify handler
)

hub.register(...) and the explicit hc.register(...) take the identical signature and both return an AgentClient — the former owns the connection, the latter reuses the HubClient you built.

attach_plugin=True (the default) installs the default_handler on this client. Pass False if you want full control over inbound envelope handling — typical for headless workers, gateways, or custom orchestration logic.

AgentClient#

Attribute / Method Notes
agent_client.agent_id Hub-stamped id; use for routing.
agent_client.agent The wrapped Agent.
agent_client.passport / .resume Snapshot at registration time.
await agent_client.open(type=..., target=..., knobs=...) Open a new channel.
await agent_client.send_envelope(envelope) Direct envelope send (for custom event types).
await agent_client.wait_for_channel_event(channel_id, predicate, timeout=...) Block until a matching envelope lands in this client's inbox.
agent_client.on_envelope(callback) Replace the handler.

channel = await agent_client.open(...) returns a Channel handle scoped to one channel id. Use channel.send(text, audience=...) for ordinary text sends, channel.close() for explicit termination, channel.info() for the latest metadata.

The Default Handler#

When you register with attach_plugin=True, the client installs default_handler. It routes inbound envelopes:

Inbound event Behaviour
EV_CHANNEL_INVITE Auto-ack with EV_CHANNEL_INVITE_ACK.
EV_TEXT / EV_HANDOFF If hc.can_send(...) says it's our turn: read the WAL up to this envelope, project it through this participant's view policy, pre-populate a fresh MemoryStream, attach a TaskMirror, run agent.ask(text, stream=stream, dependencies=...), and send any non-empty reply via channel.send(...).
EV_CHANNEL_* (other) No-op — bookkeeping is reflected in the next channel.info().
ag2.task.* No-op at the handler level — TaskMirror handles these separately when attached.

The handler wraps the entire turn path (WAL slice, view projection, extract_turn_input, agent.ask, round-envelope build, outbound send) in a single trap: a crash is routed through HubClient.report_turn_failureHub.report_turn_failure, which fans on_turn_failed out to every HubListener (the built-in AuditLog records AUDIT_KIND_TURN_FAILED). No reply is posted, but the channel stays active and the next envelope flows normally — see Turn-failure resilience.

The handler is decomposed into public hooks so you can override only the parts you care about:

1
2
3
4
5
from ag2.network import (
    read_wal_until,
    resolve_view_policy,
    stamp_dependencies,
)
Hook Purpose
read_wal_until(client, envelope) Slice the WAL up to but excluding the given envelope.
resolve_view_policy(client, metadata) The ViewPolicy this participant should use.
stamp_dependencies(client, channel) The context.dependencies dict the LLM turn will see (CHANNEL_DEP, AGENT_CLIENT_DEP, HUB_DEP).

Custom Handlers#

from ag2.network import Envelope, EV_TEXT

async def gateway_handler(envelope: Envelope) -> None:
    if envelope.event_type != EV_TEXT:
        return
    # forward to your own external system instead of running an LLM
    text = envelope.event_data.get("text", "")
    await my_external_queue.put({"from": envelope.sender_id, "text": text})

agent_client.on_envelope(gateway_handler)

Common patterns:

  • Headless worker — register with attach_plugin=False and install a handler that pulls work directly off the hub without running an LLM.
  • Selective override — install a handler that handles only one event type (e.g. custom invite policy) and falls back to default_handler for everything else.
  • Filtered forwarding — wrap default_handler with pre/post hooks for logging, rate limiting, or routing.
1
2
3
4
from ag2.network import LocalLink, LinkClient, LinkEndpoint

link = LocalLink(hub)
client_link = link.client()  # produces a fresh LocalLinkClient bound to a fresh LocalLinkEndpoint

LocalLink is the in-process transport. Each HubClient(link, hub=hub) lazily creates one LocalLinkClient/LocalLinkEndpoint pair on first use:

  • LocalLinkClient — the agent-process side; sends frames toward the hub, receives notify frames from it.
  • LocalLinkEndpoint — the hub-process side; the inverse.

Both sides exchange Frame records via async queues. Frame types are re-exported at the package level: HelloFrame, WelcomeFrame, SendFrame, ReceiptFrame, NotifyFrame, AcceptFrame, ErrorFrame, PingFrame, PongFrame, SubscribeFrame, UnsubscribeFrame.

The transport layer is a Protocol:

class LinkClient(Protocol):
    async def open(self) -> None: ...
    async def send_frame(self, frame: Frame) -> None: ...
    def frames(self) -> AsyncIterator[Frame]: ...
    async def close(self) -> None: ...

Cross-process or cross-host transports plug in here. LocalLink is the only built-in, but the abstraction allows for future Redis/WebSocket/gRPC implementations without changing any client code.

Inbox & Backpressure#

Every AgentClient has an inbox bounded by its Rule.inbox.max_pending (default unbounded). When the inbox fills, sends to that agent fail with InboxFull. The wait_for_channel_event and the default handler drain the inbox in order; custom handlers should do the same — never block forever in a callback.

Rule.inbox.high_water is a soft threshold below the hard cap: when a dispatch first pushes a recipient over it, the hub fires on_inbox_pressure(agent_id, pending, cap) on every listener (and on a Hub subclass override) — once per crossing, not on every subsequent envelope. It defaults to None, which resolves to 80% of max_pending; set 0 to disable the signal. See HubListener.

Closing Down#

await hub.close()

For agents registered via hub.register(...), hub.close() is enough — it tears down every connection the hub created. Close an individual agent early with await agent_client.close() (unregisters it and closes its connection). In the explicit flow, await hc.close() cancels the HubClient's link and unsubscribes its clients.