Agent Clients
This page covers the agent side of the network. A HubClient represents one process's connection to the hub; it produces AgentClients, one per registered Agent. Each AgentClient carries a notify handler that decides what the agent does when an envelope arrives.
Tip
For a non-LLM participant — e.g. a human in the loop — register a HumanClient instead via hc.register_human(...); see HumanClient (HITL).
HubClient: one per process
A HubClient is the per-process registration broker. In a single-process script you may construct one per agent (each gets its own duplex link); in a real deployment you'd typically have one HubClient per process and register all of that process's agents on it.
| Method | Notes |
|---|---|
| await hc.register(agent, passport, resume, ...) | Registers an Agent with the hub. Returns an AgentClient. |
| await hc.unregister(agent_id) | Tears down the registration; emits AUDIT_KIND_AGENT_UNREGISTERED. |
| await hc.close() | Closes this process's link to the hub. |
| hc.read_wal(channel_id) | Reads the WAL for any channel this process can see. |
| hc.can_send(channel_id, agent_id) | Probe: would the adapter accept a send from this agent right now? |
| hc.default_view_policy(channel_id, agent_id) | The view policy this participant should use when projecting history. |
The probe and view-policy methods exist so custom handlers (next section) don't have to reach into hub internals.
Registering an Agent
attach_plugin=True (the default) installs the default_handler on this client. Pass False if you want full control over inbound envelope handling — typical for headless workers, gateways, or custom orchestration logic.
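The effect of the flag can be sketched without the real library. In the block below, FakeHubClient, FakeAgentClient, the generated agent ids, and the placeholder passport/resume values are all hypothetical stand-ins; only the register(...) call shape and the attach_plugin semantics are taken from the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[Any], Awaitable[None]]


async def default_handler(envelope: Any) -> None:
    """Stand-in for the library's default_handler; does nothing here."""


@dataclass
class FakeAgentClient:
    """Hypothetical stand-in for AgentClient: an id plus the installed handler."""
    agent_id: str
    handler: Optional[Handler]


class FakeHubClient:
    """Hypothetical stand-in for HubClient; mimics only register(...)."""

    def __init__(self) -> None:
        self._count = 0

    async def register(self, agent: Any, passport: Any, resume: Any,
                       attach_plugin: bool = True) -> FakeAgentClient:
        self._count += 1
        # attach_plugin=True installs the default handler; False leaves it empty.
        handler = default_handler if attach_plugin else None
        return FakeAgentClient(agent_id=f"agent-{self._count}", handler=handler)


async def demo():
    hc = FakeHubClient()
    # Placeholder values: the real Agent, passport and resume types are not shown here.
    chatty = await hc.register("writer", passport={}, resume={})
    worker = await hc.register("indexer", passport={}, resume={}, attach_plugin=False)
    return chatty, worker


chatty, worker = asyncio.run(demo())
```

With the real classes, the second registration is the one that would later install its own callback via agent_client.on_envelope(...).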
AgentClient
| Attribute / Method | Notes |
|---|---|
| agent_client.agent_id | Hub-stamped id; use for routing. |
| agent_client.agent | The wrapped Agent. |
| agent_client.passport / .resume | Snapshot at registration time. |
| await agent_client.open(type=..., target=..., knobs=...) | Open a new channel. |
| await agent_client.send_envelope(envelope) | Direct envelope send (for custom event types). |
| await agent_client.wait_for_channel_event(channel_id, predicate, timeout=...) | Block until a matching envelope lands in this client's inbox. |
| agent_client.on_envelope(callback) | Replace the handler. |
channel = await agent_client.open(...) returns a Channel handle scoped to one channel id. Use channel.send(text, audience=...) for ordinary text sends, channel.close() for explicit termination, channel.info() for the latest metadata.
The Default Handler
When you register with attach_plugin=True, the client installs default_handler. It routes inbound envelopes:
| Inbound event | Behaviour |
|---|---|
| EV_CHANNEL_INVITE | Auto-ack with EV_CHANNEL_INVITE_ACK. |
| EV_TEXT / EV_HANDOFF | If hc.can_send(...) says it's our turn: read the WAL up to this envelope, project it through this participant's view policy, pre-populate a fresh MemoryStream, attach a TaskMirror, run agent.ask(text, stream=stream, dependencies=...), and send any non-empty reply via channel.send(...). |
| EV_CHANNEL_* (other) | No-op; bookkeeping is reflected in the next channel.info(). |
| ag2.task.* | No-op at the handler level; TaskMirror handles these separately when attached. |
The handler wraps the entire turn path (WAL slice, view projection, extract_turn_input, agent.ask, round-envelope build, outbound send) in a single trap: a crash is routed through HubClient.report_turn_failure → Hub.report_turn_failure, which fans on_turn_failed out to every HubListener (the built-in AuditLog records AUDIT_KIND_TURN_FAILED). No reply is posted, but the channel stays active and the next envelope flows normally — see Turn-failure resilience.
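The trap can be pictured as one try/except around the whole turn. The sketch below is not the library's implementation: run_turn_safely, RecordingListener, and the on_turn_failed(agent_id, error) signature are assumptions made for illustration, and the ask/send callables stand in for agent.ask and channel.send.

```python
import asyncio
from typing import Awaitable, Callable, List


class RecordingListener:
    """Hypothetical stand-in for a HubListener; it only records failures."""

    def __init__(self) -> None:
        self.failures: List[str] = []

    def on_turn_failed(self, agent_id: str, error: BaseException) -> None:
        # Assumed signature; the real listener hook may take other arguments.
        self.failures.append(f"{agent_id}: {error}")


async def run_turn_safely(agent_id: str, text: str,
                          ask: Callable[[str], Awaitable[str]],
                          send: Callable[[str], Awaitable[None]],
                          listeners: List[RecordingListener]) -> None:
    """Run one turn; on a crash, notify listeners and post no reply."""
    try:
        reply = await ask(text)
        if reply:                      # only non-empty replies are sent
            await send(reply)
    except Exception as exc:
        for listener in listeners:     # fan the failure out to every listener
            listener.on_turn_failed(agent_id, exc)


async def demo():
    sent: List[str] = []
    listener = RecordingListener()

    async def flaky_ask(text: str) -> str:
        if text == "boom":
            raise RuntimeError("model call failed")
        return f"echo: {text}"

    async def send(reply: str) -> None:
        sent.append(reply)

    # The second turn crashes; the third still runs normally.
    for text in ["hello", "boom", "again"]:
        await run_turn_safely("agent-1", text, flaky_ask, send, [listener])
    return sent, listener.failures


sent, failures = asyncio.run(demo())
```

The point of the single trap is that a failed turn produces a listener event instead of an exception that would stop envelope processing for the channel.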
The handler is decomposed into public hooks so you can override only the parts you care about:
| Hook | Purpose |
|---|---|
| read_wal_until(client, envelope) | Slice the WAL up to but excluding the given envelope. |
| resolve_view_policy(client, metadata) | The ViewPolicy this participant should use. |
| stamp_dependencies(client, channel) | The context.dependencies dict the LLM turn will see (CHANNEL_DEP, AGENT_CLIENT_DEP, HUB_DEP). |
Custom Handlers
Common patterns:
- Headless worker: register with attach_plugin=False and install a handler that pulls work directly off the hub without running an LLM.
- Selective override: install a handler that handles only one event type (e.g. a custom invite policy) and falls back to default_handler for everything else.
- Filtered forwarding: wrap default_handler with pre/post hooks for logging, rate limiting, or routing.
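The selective-override and filtered-forwarding patterns are both plain function composition. The sketch below assumes a handler is an async callable taking a single envelope; the real handler signature may differ (for instance by also receiving the client). Envelope, selective, with_logging, and invite_policy are hypothetical names, and default_handler here is a stand-in for the library's.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class Envelope:
    """Hypothetical minimal envelope: an event type and a payload."""
    event: str
    payload: str


Handler = Callable[[Envelope], Awaitable[None]]
log: List[str] = []


async def default_handler(envelope: Envelope) -> None:
    """Stand-in for the library's default_handler."""
    log.append(f"default:{envelope.event}")


def selective(event: str, custom: Handler, fallback: Handler) -> Handler:
    """Route one event type to `custom`; everything else goes to `fallback`."""
    async def handler(envelope: Envelope) -> None:
        if envelope.event == event:
            await custom(envelope)
        else:
            await fallback(envelope)
    return handler


def with_logging(inner: Handler) -> Handler:
    """Wrap a handler with pre/post hooks."""
    async def handler(envelope: Envelope) -> None:
        log.append(f"pre:{envelope.event}")
        await inner(envelope)
        log.append(f"post:{envelope.event}")
    return handler


async def invite_policy(envelope: Envelope) -> None:
    log.append(f"custom-invite:{envelope.payload}")


handler = with_logging(selective("EV_CHANNEL_INVITE", invite_policy, default_handler))


async def demo() -> None:
    await handler(Envelope("EV_CHANNEL_INVITE", "room-1"))
    await handler(Envelope("EV_TEXT", "hi"))


asyncio.run(demo())
```

With the real library, the composed callable would be the one passed to agent_client.on_envelope(...).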
Transport: LocalLink
LocalLink is the in-process transport. Each HubClient(link, hub=hub) lazily creates one LocalLinkClient/LocalLinkEndpoint pair on first use:
- LocalLinkClient: the agent-process side; sends frames toward the hub, receives notify frames from it.
- LocalLinkEndpoint: the hub-process side; the inverse.
Both sides exchange Frame records via async queues. Frame types are re-exported at the package level: HelloFrame, WelcomeFrame, SendFrame, ReceiptFrame, NotifyFrame, AcceptFrame, ErrorFrame, PingFrame, PongFrame, SubscribeFrame, UnsubscribeFrame.
The transport layer is a Protocol:
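```python
from typing import AsyncIterator, Protocol

# "Frame" is the package-level frame type; it is quoted here because its
# import path is not shown on this page.
class LinkClient(Protocol):
    async def open(self) -> None: ...
    async def send_frame(self, frame: "Frame") -> None: ...
    def frames(self) -> AsyncIterator["Frame"]: ...
    async def close(self) -> None: ...
```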
Cross-process or cross-host transports plug in here. LocalLink is the only built-in, but the abstraction allows for future Redis/WebSocket/gRPC implementations without changing any client code.
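To make the protocol shape concrete, here is a minimal in-memory class that satisfies the four methods using two asyncio queues. It is a sketch, not LocalLink itself: QueueLinkClient, the simplified Frame dataclass, and the None sentinel used to end the frame stream are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Frame:
    """Hypothetical stand-in for the library's Frame record."""
    kind: str
    body: str = ""


class QueueLinkClient:
    """Matches the LinkClient method shapes using two asyncio queues."""

    def __init__(self, outbound: asyncio.Queue, inbound: asyncio.Queue) -> None:
        self._outbound = outbound      # frames travelling toward the hub
        self._inbound = inbound        # frames arriving from the hub
        self._open = False

    async def open(self) -> None:
        self._open = True

    async def send_frame(self, frame: Frame) -> None:
        if not self._open:
            raise RuntimeError("link is not open")
        await self._outbound.put(frame)

    async def frames(self) -> AsyncIterator[Frame]:
        # Async generator: calling it returns an async iterator immediately.
        while True:
            frame = await self._inbound.get()
            if frame is None:          # sentinel queued by close()
                return
            yield frame

    async def close(self) -> None:
        self._open = False
        await self._inbound.put(None)  # ends iteration after pending frames


async def demo():
    to_hub: asyncio.Queue = asyncio.Queue()
    to_agent: asyncio.Queue = asyncio.Queue()
    link = QueueLinkClient(outbound=to_hub, inbound=to_agent)
    await link.open()
    await link.send_frame(Frame("hello"))
    # Play the hub's role by hand: read the hello, queue two frames back.
    hello = await to_hub.get()
    await to_agent.put(Frame("welcome"))
    await to_agent.put(Frame("notify", "envelope-1"))
    await link.close()
    received = [frame.kind async for frame in link.frames()]
    return hello.kind, received


hello_kind, received = asyncio.run(demo())
```

A cross-process transport would keep the same four methods and replace the queues with a socket or message-broker connection.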
Inbox & Backpressure
Every AgentClient has an inbox bounded by its Rule.inbox.max_pending (unbounded by default). When the inbox fills, sends to that agent fail with InboxFull. Both wait_for_channel_event and the default handler drain the inbox in order; custom handlers should do the same and must never block forever in a callback.
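The hard cap behaves like a bounded FIFO queue that rejects instead of blocking. The sketch below illustrates that behaviour only; BoundedInbox and its push/drain methods are hypothetical, and InboxFull is redefined locally as a stand-in for the library's exception.

```python
from collections import deque
from typing import Deque, Iterator, Optional


class InboxFull(Exception):
    """Stand-in for the library's InboxFull error."""


class BoundedInbox:
    """Hypothetical FIFO inbox with an optional hard cap (None means unbounded)."""

    def __init__(self, max_pending: Optional[int] = None) -> None:
        self._max_pending = max_pending
        self._items: Deque[str] = deque()

    def push(self, envelope: str) -> None:
        # Reject rather than block once the cap is reached.
        if self._max_pending is not None and len(self._items) >= self._max_pending:
            raise InboxFull(f"inbox already holds {len(self._items)} envelopes")
        self._items.append(envelope)

    def drain(self) -> Iterator[str]:
        """Yield pending envelopes oldest first, emptying the inbox."""
        while self._items:
            yield self._items.popleft()


inbox = BoundedInbox(max_pending=2)
inbox.push("e1")
inbox.push("e2")
try:
    inbox.push("e3")               # third envelope exceeds the cap
    rejected = False
except InboxFull:
    rejected = True

drained = list(inbox.drain())      # in-order drain frees both slots
inbox.push("e3")                   # accepted now that the inbox has room
after = list(inbox.drain())
```

This is why a handler that never returns is harmful: nothing drains, the inbox stays full, and every later send to that agent is rejected.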
Rule.inbox.high_water is a soft threshold below the hard cap: when a dispatch first pushes a recipient over it, the hub fires on_inbox_pressure(agent_id, pending, cap) on every listener (and on a Hub subclass override) — once per crossing, not on every subsequent envelope. It defaults to None, which resolves to 80% of max_pending; set 0 to disable the signal. See HubListener.
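The once-per-crossing rule is easiest to see in a small model. The sketch below rests on several assumptions not stated above: that "over" means strictly greater than the threshold, that 80% is rounded down, that the signal re-arms once the inbox falls back to the threshold or below, and that an unbounded inbox with no explicit threshold produces no signal. PressureTracker and resolve_high_water are hypothetical names.

```python
from typing import List, Optional, Tuple


def resolve_high_water(max_pending: Optional[int],
                       high_water: Optional[int]) -> Optional[int]:
    """Return the effective soft threshold, or None when the signal is off."""
    if high_water == 0:
        return None                    # 0 disables the signal
    if high_water is not None:
        return high_water              # explicit threshold
    if max_pending is None:
        return None                    # assumption: no cap, nothing to take 80% of
    return max_pending * 8 // 10       # None resolves to 80% of the cap (rounded down here)


class PressureTracker:
    """Hypothetical tracker that fires once per upward crossing of the threshold."""

    def __init__(self, max_pending: Optional[int],
                 high_water: Optional[int] = None) -> None:
        self.cap = max_pending
        self.threshold = resolve_high_water(max_pending, high_water)
        self.pending = 0
        self._over = False
        self.signals: List[Tuple[int, Optional[int]]] = []

    def dispatched(self) -> None:
        self.pending += 1
        if self.threshold is not None and self.pending > self.threshold and not self._over:
            self._over = True
            # Plays the role of on_inbox_pressure(agent_id, pending, cap).
            self.signals.append((self.pending, self.cap))

    def drained(self) -> None:
        self.pending -= 1
        if self.threshold is not None and self.pending <= self.threshold:
            self._over = False         # assumption: re-arm at or below the threshold


tracker = PressureTracker(max_pending=10)   # threshold resolves to 8
for _ in range(10):
    tracker.dispatched()                     # fires once, when pending reaches 9
first_burst = list(tracker.signals)
for _ in range(3):
    tracker.drained()                        # pending drops to 7; tracker re-arms
for _ in range(2):
    tracker.dispatched()                     # pending 8, then 9: a second crossing

quiet = PressureTracker(max_pending=10, high_water=0)
for _ in range(10):
    quiet.dispatched()                       # signal disabled, nothing recorded
```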
Closing Down
HubClient.close() cancels the link's listening task and unsubscribes all clients. Make sure every process that calls register eventually calls close(); otherwise the hub keeps the registrations in its registry.
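One way to guarantee the pairing is a try/finally around everything that happens after the first register call. The sketch below uses a hypothetical FakeHubClient whose close() removes its own entries from a shared registry dict; that removal is an assumption about what close() does on the hub side, made here for illustration only.

```python
import asyncio
from typing import Dict, List


class FakeHubClient:
    """Hypothetical stand-in: tracks its registrations in a shared registry."""

    def __init__(self, registry: Dict[str, str]) -> None:
        self._registry = registry
        self._mine: List[str] = []

    async def register(self, name: str) -> str:
        agent_id = f"agent-{len(self._registry) + 1}"
        self._registry[agent_id] = name
        self._mine.append(agent_id)
        return agent_id

    async def close(self) -> None:
        # Assumed behaviour: closing drops every registration made on this client.
        for agent_id in self._mine:
            self._registry.pop(agent_id, None)
        self._mine.clear()


async def run_agents(registry: Dict[str, str]) -> None:
    hc = FakeHubClient(registry)
    try:
        await hc.register("writer")
        await hc.register("reviewer")
        raise RuntimeError("simulated crash in application code")
    finally:
        await hc.close()               # runs even though the body raised


async def demo() -> Dict[str, str]:
    registry: Dict[str, str] = {}
    try:
        await run_agents(registry)
    except RuntimeError:
        pass                           # the crash is expected in this demo
    return registry


leftover = asyncio.run(demo())
```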