Agent Clients
This page covers the agent side of the network. A HubClient represents one process's connection to the hub and produces AgentClients, one per registered Agent. Each AgentClient carries a notify handler that decides what the agent does when an envelope arrives.
HubClient: one per process
A HubClient is the per-process registration broker. In a single-process script you may construct one per agent (each gets its own duplex link); in a real deployment you'd typically have one HubClient per process and register all of that process's agents on it.
| Method | Notes |
|---|---|
| `await hc.register(agent, passport, resume, ...)` | Registers an Agent with the hub. Returns an AgentClient. |
| `await hc.unregister(agent_id)` | Tears down the registration; emits `AUDIT_KIND_AGENT_UNREGISTERED`. |
| `await hc.close()` | Closes this process's link to the hub. |
| `hc.read_wal(session_id)` | Reads the WAL for any session this process can see. |
| `hc.can_send(session_id, agent_id)` | Probe: would the adapter accept a send from this agent right now? |
| `hc.default_view_policy(session_id, agent_id)` | The view policy this participant should use when projecting history. |
The probe and view-policy methods exist so custom handlers (next section) don't have to reach into hub internals.
Registering an Agent
attach_plugin=True (the default) installs the default_handler on this client. Pass False if you want full control over inbound envelope handling — typical for headless workers, gateways, or custom orchestration logic.
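A minimal sketch of the two registration modes described above. `StubHubClient`, `StubAgentClient`, and the stand-in `default_handler` are hypothetical and exist only so the example runs on its own; the real classes come from the library, and treating `attach_plugin` as a keyword-only argument is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional


def default_handler(envelope: Any) -> str:
    """Stand-in for the library's default handler (hypothetical body)."""
    return "default"


@dataclass
class StubAgentClient:
    """Hypothetical stand-in exposing only a few documented attributes."""
    agent_id: str
    agent: Any
    handler: Optional[Callable[[Any], Any]] = None

    def on_envelope(self, callback: Callable[[Any], Any]) -> None:
        # Replace the handler, mirroring the documented on_envelope().
        self.handler = callback


class StubHubClient:
    """Hypothetical stand-in for HubClient; not the real implementation."""

    def __init__(self) -> None:
        self._count = 0

    async def register(self, agent, passport=None, resume=None, *, attach_plugin=True):
        self._count += 1
        client = StubAgentClient(agent_id=f"agent-{self._count}", agent=agent)
        if attach_plugin:
            # Default mode: the default handler is installed for you.
            client.on_envelope(default_handler)
        return client


async def main():
    hc = StubHubClient()
    chatty = await hc.register("chatty-agent")  # attach_plugin defaults to True
    worker = await hc.register("headless-worker", attach_plugin=False)
    return chatty, worker


if __name__ == "__main__":
    chatty, worker = asyncio.run(main())
    print(chatty.handler is default_handler, worker.handler is None)
```

With `attach_plugin=False` the client starts without a handler in this sketch, so the caller is expected to install one through `on_envelope` before envelopes arrive.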
AgentClient
| Attribute / Method | Notes |
|---|---|
| `agent_client.agent_id` | Hub-stamped id; use for routing. |
| `agent_client.agent` | The wrapped Agent. |
| `agent_client.passport` / `.resume` | Snapshot at registration time. |
| `await agent_client.open(type=..., target=..., knobs=...)` | Opens a new session. |
| `await agent_client.send_envelope(envelope)` | Direct envelope send (for custom event types). |
| `await agent_client.wait_for_session_event(session_id, predicate, timeout=...)` | Blocks until a matching envelope lands in this client's inbox. |
| `agent_client.on_envelope(callback)` | Replaces the handler. |
session = await agent_client.open(...) returns a Session handle scoped to one session id. Use session.send(text, audience=...) for ordinary text sends, session.close() for explicit termination, session.info() for the latest metadata.
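The open, send, close, and info lifecycle can be sketched with stand-in classes. `StubAgentClient` and `StubSession` are hypothetical; the argument values passed to `open` are placeholders, and modelling `send`, `close`, and `info` as coroutines is an assumption rather than something the text above confirms.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StubSession:
    """Hypothetical stand-in for the Session handle."""
    session_id: str
    sent: list = field(default_factory=list)
    closed: bool = False

    async def send(self, text: str, audience: Optional[list] = None) -> None:
        if self.closed:
            raise RuntimeError("session is closed")
        self.sent.append((text, audience))

    async def close(self) -> None:
        self.closed = True

    async def info(self) -> dict:
        # Latest metadata for this session (shape is illustrative only).
        return {"session_id": self.session_id, "sent": len(self.sent), "closed": self.closed}


class StubAgentClient:
    """Hypothetical stand-in that only knows how to open sessions."""

    def __init__(self) -> None:
        self._next = 0

    async def open(self, type: str, target: list, knobs: Optional[dict] = None) -> StubSession:
        self._next += 1
        return StubSession(session_id=f"session-{self._next}")


async def main() -> dict:
    agent_client = StubAgentClient()
    # "example" and "agent-2" are placeholder values, not library constants.
    session = await agent_client.open(type="example", target=["agent-2"], knobs={})
    await session.send("hello", audience=["agent-2"])
    await session.send("anyone else there?")
    await session.close()
    return await session.info()


if __name__ == "__main__":
    print(asyncio.run(main()))
```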
The Default Handler
When you register with attach_plugin=True, the client installs default_handler. It routes inbound envelopes:
| Inbound event | Behaviour |
|---|---|
| `EV_SESSION_INVITE` | Auto-ack with `EV_SESSION_INVITE_ACK`. |
| `EV_TEXT` / `EV_HANDOFF` | If `hc.can_send(...)` says it is our turn: read the WAL up to this envelope, project it through this participant's view policy, pre-populate a fresh `MemoryStream`, attach a `TaskMirror`, run `agent.ask(text, stream=stream, dependencies=...)`, and send any non-empty reply via `session.send(...)`. |
| `EV_SESSION_*` (other) | No-op; bookkeeping is reflected in the next `session.info()`. |
| `ag2.task.*` | No-op at the handler level; `TaskMirror` handles these separately when attached. |
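The `EV_TEXT` / `EV_HANDOFF` row packs several steps into one cell, so a rough sketch of that flow may help. Everything here is a stand-in: `StubHub`, `handle_text`, envelopes as dicts, and a view policy modelled as a simple predicate are all assumptions, and the `MemoryStream` and `TaskMirror` steps are left out.

```python
import asyncio
from itertools import takewhile


class StubHub:
    """Hypothetical stand-in offering only can_send() and read_wal()."""

    def __init__(self, wal, turn_holder):
        self._wal = wal
        self._turn_holder = turn_holder

    def can_send(self, session_id, agent_id):
        return agent_id == self._turn_holder

    def read_wal(self, session_id):
        return list(self._wal)


async def handle_text(hub, agent_id, session_id, envelope, policy, ask, send):
    # 1. Probe: only act when it is this agent's turn.
    if not hub.can_send(session_id, agent_id):
        return False
    # 2. Read the WAL up to, but excluding, the triggering envelope.
    wal = hub.read_wal(session_id)
    history = list(takewhile(lambda e: e["id"] != envelope["id"], wal))
    # 3. Project the history through the participant's view policy.
    visible = [entry for entry in history if policy(entry)]
    # 4. Run the agent turn and send any non-empty reply.
    reply = await ask(envelope["text"], visible)
    if reply:
        await send(reply)
        return True
    return False


async def demo():
    wal = [
        {"id": 1, "sender": "alice", "text": "hello"},
        {"id": 2, "sender": "bob", "text": "private note", "private": True},
        {"id": 3, "sender": "alice", "text": "your turn"},
    ]
    hub = StubHub(wal, turn_holder="carol")
    sent = []

    async def ask(text, history):
        return f"seen {len(history)} earlier message(s); replying to {text!r}"

    async def send(reply):
        sent.append(reply)

    def policy(entry):
        return not entry.get("private", False)

    replied = await handle_text(hub, "carol", "s1", wal[-1], policy, ask, send)
    skipped = await handle_text(hub, "dave", "s1", wal[-1], policy, ask, send)
    return replied, skipped, sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo, `carol` holds the turn and replies after seeing one visible earlier message, while `dave` is stopped at the probe and sends nothing.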
The handler is decomposed into public hooks so you can override only the parts you care about:
| Hook | Purpose |
|---|---|
| `read_wal_until(client, envelope)` | Slices the WAL up to but excluding the given envelope. |
| `resolve_view_policy(client, metadata)` | The ViewPolicy this participant should use. |
| `stamp_dependencies(client, session)` | The `context.dependencies` dict the LLM turn will see (`SESSION_DEP`, `AGENT_CLIENT_DEP`, `HUB_DEP`). |
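To make "up to but excluding" concrete, here is a small sketch of that slicing rule on a plain list. `slice_wal_until` and the `Envelope` dataclass are hypothetical names; the real hook takes a client rather than a list, and returning the whole WAL when the envelope is absent is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Envelope:
    """Hypothetical minimal envelope; the real type carries more fields."""
    envelope_id: str
    event: str
    text: str = ""


def slice_wal_until(wal: List[Envelope], envelope: Envelope) -> List[Envelope]:
    """Return the entries that precede `envelope`, excluding it."""
    history = []
    for entry in wal:
        if entry.envelope_id == envelope.envelope_id:
            break
        history.append(entry)
    return history


wal = [
    Envelope("e1", "EV_TEXT", "hi"),
    Envelope("e2", "EV_TEXT", "how are you?"),
    Envelope("e3", "EV_HANDOFF"),
]
history = slice_wal_until(wal, wal[2])
print([e.envelope_id for e in history])  # ['e1', 'e2']
```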
Custom Handlers
Common patterns:
- Headless worker: register with `attach_plugin=False` and install a handler that pulls work directly off the hub without running an LLM.
- Selective override: install a handler that handles only one event type (e.g. a custom invite policy) and falls back to `default_handler` for everything else.
- Filtered forwarding: wrap `default_handler` with pre/post hooks for logging, rate limiting, or routing.
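The selective-override and filtered-forwarding patterns compose naturally as wrappers. The sketch below assumes a handler signature of `(client, envelope)` and dict-shaped envelopes, neither of which is confirmed above; `default_handler` here is a stand-in, and `EV_SESSION_INVITE` is given a placeholder value.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Set

Envelope = Dict[str, Any]
Handler = Callable[[Any, Envelope], Awaitable[Any]]

EV_SESSION_INVITE = "EV_SESSION_INVITE"  # placeholder; the real constant comes from the library


async def default_handler(client: Any, envelope: Envelope) -> str:
    """Stand-in for the library's default handler."""
    return f"default:{envelope['event']}"


def make_invite_policy_handler(allowed: Set[str], fallback: Handler = default_handler) -> Handler:
    """Selective override: custom invite policy, fallback for everything else."""

    async def handler(client: Any, envelope: Envelope) -> Any:
        if envelope["event"] == EV_SESSION_INVITE:
            return "ack" if envelope["sender"] in allowed else "ignored"
        return await fallback(client, envelope)

    return handler


def with_logging(inner: Handler, log: List[str]) -> Handler:
    """Filtered forwarding: run pre/post hooks around another handler."""

    async def handler(client: Any, envelope: Envelope) -> Any:
        log.append(f"in:{envelope['event']}")
        result = await inner(client, envelope)
        log.append(f"out:{result}")
        return result

    return handler


if __name__ == "__main__":
    log: List[str] = []
    handler = with_logging(make_invite_policy_handler({"alice"}), log)
    print(asyncio.run(handler(None, {"event": EV_SESSION_INVITE, "sender": "alice"})))
    print(log)
```

Under these assumptions, the composed handler would be installed with `agent_client.on_envelope(handler)` on a client registered with `attach_plugin=False`.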
Transport: LocalLink
LocalLink is the in-process transport. Each HubClient(link, hub=hub) lazily creates one LocalLinkClient/LocalLinkEndpoint pair on first use:
- `LocalLinkClient`: the agent-process side; sends frames toward the hub and receives notify frames from it.
- `LocalLinkEndpoint`: the hub-process side; the inverse.
Both sides exchange Frame records via async queues. Frame types are re-exported at the package level: HelloFrame, WelcomeFrame, SendFrame, ReceiptFrame, NotifyFrame, AcceptFrame, ErrorFrame, PingFrame, PongFrame, SubscribeFrame, UnsubscribeFrame.
The transport layer is a Protocol:
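    from typing import AsyncIterator, Protocol

    # Frame is the package's frame record type (HelloFrame, SendFrame, ...).
    class LinkClient(Protocol):
        async def open(self) -> None: ...
        async def send_frame(self, frame: Frame) -> None: ...
        def frames(self) -> AsyncIterator[Frame]: ...
        async def close(self) -> None: ...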
Cross-process or cross-host transports plug in here. LocalLink is the only built-in, but the abstraction allows for future Redis/WebSocket/gRPC implementations without changing any client code.
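As an illustration of what plugging in a transport might look like, here is a queue-backed class that satisfies the four-method shape of `LinkClient`. `QueueLinkClient` and the simplified `Frame` dataclass are hypothetical, and the `None` sentinel used to end the frame stream is a design choice of this sketch, not the library's behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Frame:
    """Simplified stand-in for the package's frame records."""
    kind: str
    payload: dict


class QueueLinkClient:
    """Hypothetical in-memory transport matching the LinkClient shape."""

    def __init__(self, outbound: asyncio.Queue, inbound: asyncio.Queue) -> None:
        self._outbound = outbound  # frames travelling toward the hub
        self._inbound = inbound    # frames arriving from the hub
        self._open = False

    async def open(self) -> None:
        self._open = True

    async def send_frame(self, frame: Frame) -> None:
        if not self._open:
            raise RuntimeError("link is not open")
        await self._outbound.put(frame)

    async def frames(self) -> AsyncIterator[Frame]:
        # Async generator: calling frames() returns an async iterator.
        while True:
            frame = await self._inbound.get()
            if frame is None:  # sentinel pushed by close()
                return
            yield frame

    async def close(self) -> None:
        self._open = False
        await self._inbound.put(None)


async def demo():
    # Queues are created inside the running loop to stay portable across Python versions.
    to_hub, from_hub = asyncio.Queue(), asyncio.Queue()
    link = QueueLinkClient(outbound=to_hub, inbound=from_hub)
    await link.open()
    await link.send_frame(Frame("send", {"text": "hi"}))
    await from_hub.put(Frame("notify", {"text": "hello back"}))  # simulate the hub side
    await link.close()
    received = [frame async for frame in link.frames()]
    return to_hub.get_nowait(), received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `LinkClient` is a `Protocol`, a class like this needs no inheritance; matching the method signatures is enough for structural typing.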
Inbox & Backpressure
Every AgentClient has an inbox bounded by its Rule.inbox.max_pending (unbounded by default). When the inbox is full, sends to that agent fail with InboxFull. Both wait_for_session_event and the default handler drain the inbox in order; custom handlers should do the same, and a callback should never block indefinitely.
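The bounded-inbox behaviour can be approximated with `asyncio.Queue`. This is a sketch of the idea only: `BoundedInbox` is a hypothetical class, the `InboxFull` defined here is a local stand-in for the library's error, and mapping "unbounded" to `maxsize=0` is how `asyncio.Queue` works rather than a statement about the real inbox.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class InboxFull(Exception):
    """Local stand-in for the library's InboxFull error."""


class BoundedInbox:
    """Hypothetical inbox with an optional max_pending bound."""

    def __init__(self, max_pending: Optional[int] = None) -> None:
        # asyncio.Queue treats maxsize <= 0 as unbounded.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending or 0)

    def deliver(self, envelope: Any) -> None:
        try:
            self._queue.put_nowait(envelope)
        except asyncio.QueueFull:
            raise InboxFull("inbox is at max_pending") from None

    async def drain(self, handler: Callable[[Any], Awaitable[None]]) -> int:
        """Hand pending envelopes to the handler in arrival order."""
        handled = 0
        while not self._queue.empty():
            await handler(self._queue.get_nowait())
            handled += 1
        return handled


async def demo():
    inbox = BoundedInbox(max_pending=2)
    inbox.deliver("a")
    inbox.deliver("b")
    try:
        inbox.deliver("c")
        rejected = False
    except InboxFull:
        rejected = True

    seen = []

    async def handler(envelope):
        seen.append(envelope)

    handled = await inbox.drain(handler)
    inbox.deliver("c")  # there is room again after draining
    return rejected, seen, handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```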
Closing Down
HubClient.close() cancels the link's listening task and unsubscribes all clients. Always pair it with the matching register calls; otherwise the hub keeps the registration in its registry.
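One way to make sure teardown always runs is a `try`/`finally` around the work. `StubHubClient` and `run_agents` are hypothetical; the sketch calls both `unregister` and `close` as a defensive choice, which is an assumption rather than a documented requirement.

```python
import asyncio


class StubHubClient:
    """Hypothetical stand-in that tracks registrations in a set."""

    def __init__(self) -> None:
        self.registry = set()
        self.closed = False

    async def register(self, agent: str) -> str:
        agent_id = f"id-{agent}"
        self.registry.add(agent_id)
        return agent_id

    async def unregister(self, agent_id: str) -> None:
        self.registry.discard(agent_id)

    async def close(self) -> None:
        self.closed = True


async def run_agents(hc: StubHubClient, agents, work) -> None:
    registered = []
    try:
        for agent in agents:
            registered.append(await hc.register(agent))
        await work(registered)
    finally:
        # Teardown runs even if registration or the work itself raised.
        for agent_id in registered:
            await hc.unregister(agent_id)
        await hc.close()


async def failing_work(agent_ids):
    raise ValueError("simulated failure during work")


if __name__ == "__main__":
    hc = StubHubClient()
    try:
        asyncio.run(run_agents(hc, ["a", "b"], failing_work))
    except ValueError:
        pass
    print(hc.registry, hc.closed)
```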