Human Clients (HITL)

A HumanClient is a non-LLM participant on the network — the human-in-the-loop primitive. It is a client in the network, just like LLM agents, so the hub routes envelopes to it exactly the same way.

Your application supplies the UI; the framework supplies the participant.

Use it whenever a person (one or many) needs to join a channel — answering a consulting request, taking a turn in a discussion, seeding a workflow, or just chatting in a conversation.

Registering#

from autogen.beta.network import HubClient, LocalLink, Passport

hc = HubClient(LocalLink(hub), hub=hub)

human = await hc.register_human(
    Passport(name="operator"),
    resume=None,            # optional — Resume() defaults
    rule=None,              # optional — Rule(...) for governance, same as agents
    auto_ack_invites=True,  # auto-accept channel invites (see below)
)

register_human runs the same UUID-stamping and persistence path as hc.register(...), then forces passport.kind = "human" so the participant is discoverable as a human:

await hub.list_agents(kind="human")   # -> [Passport(name="operator", kind="human", ...)]
await hub.list_agents(kind="agent")   # agents only (also matches kind=None)

hc.register(...) rejects Passport(kind="human"); always use register_human to register a HumanClient.

Receiving — push or pull#

A HumanClient exposes inbound envelopes in two ways. Both see every inbound envelope; use whichever fits your UI (or both at once).

Push — on_envelope#

Register a coroutine; it fires once per inbound envelope. Multiple callbacks compose in registration order. An exception raised by a callback is logged and never propagates to the hub's dispatch path — a buggy UI cannot break the network.

async def on_inbound(envelope) -> None:
    await ui.push_event(envelope)   # forward to a websocket, queue, etc.

human.on_envelope(on_inbound)
human.remove_envelope_callback(on_inbound)   # detach later
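The dispatch contract above — registration order preserved, callback exceptions logged rather than propagated — can be pictured with a framework-free sketch (an illustrative stand-in, not the actual HumanClient internals):

```python
import asyncio
import logging

logger = logging.getLogger("sketch")

class CallbackFanout:
    """Sketch of on_envelope dispatch: fan each envelope out to every
    registered callback in registration order; swallow (and log) anything
    a callback raises so delivery to the remaining callbacks continues."""

    def __init__(self):
        self._callbacks = []  # registration order is preserved

    def on_envelope(self, cb):
        self._callbacks.append(cb)

    def remove_envelope_callback(self, cb):
        self._callbacks.remove(cb)

    async def _dispatch(self, envelope):
        for cb in list(self._callbacks):
            try:
                await cb(envelope)
            except Exception:
                # A buggy callback is logged, never re-raised.
                logger.exception("envelope callback failed")

async def main():
    client = CallbackFanout()
    seen = []

    async def first(env):
        seen.append(("first", env))

    async def buggy(env):
        raise RuntimeError("UI bug")  # isolated: does not stop dispatch

    async def last(env):
        seen.append(("last", env))

    for cb in (first, buggy, last):
        client.on_envelope(cb)

    await client._dispatch("env-1")
    return seen

print(asyncio.run(main()))  # [('first', 'env-1'), ('last', 'env-1')]
```

Note that the callback between `first` and `last` raises, yet both neighbors still observe the envelope.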

Pull — next_envelope / envelopes#

Block until the next matching envelope arrives, or iterate the inbound stream:

from autogen.beta.network import EV_TEXT

# Wait for the next text reply from a specific peer.
reply = await human.next_envelope(
    predicate=lambda e: e.event_type == EV_TEXT and e.sender_id == peer_id,
    timeout=60.0,   # raises asyncio.TimeoutError if exceeded
)

# Or stream everything until disconnect.
async for envelope in human.envelopes():
    ...

# Channel-scoped wait (symmetric with AgentClient.wait_for_channel_event):
env = await human.wait_for_channel_event(
    channel_id=channel.channel_id,
    predicate=lambda e: e.event_type == EV_TEXT,
    timeout=300.0,
)

Envelopes that don't match a next_envelope predicate are discarded — use on_envelope if you want to observe everything and await something specific.
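The discard semantics are easy to see in a minimal, framework-free sketch of a predicate wait (a hypothetical stand-in for the pull path, not the real implementation): envelopes pulled off the queue that fail the predicate are dropped, not requeued.

```python
import asyncio

class PullQueue:
    """Sketch of next_envelope semantics: while a consumer waits on a
    predicate, non-matching envelopes are discarded, not requeued."""

    def __init__(self):
        self._queue = asyncio.Queue()  # unbounded, like the real pull queue

    def deliver(self, envelope):
        self._queue.put_nowait(envelope)

    async def next_envelope(self, predicate, timeout=None):
        async def _wait():
            while True:
                envelope = await self._queue.get()
                if predicate(envelope):
                    return envelope
                # Non-matching envelope is dropped here, for good.
        return await asyncio.wait_for(_wait(), timeout)

async def main():
    q = PullQueue()
    for env in ("status", "text-1", "text-2"):
        q.deliver(env)
    # Skips past "status" (discarded) and returns "text-1".
    first = await q.next_envelope(lambda e: e.startswith("text"))
    # "text-2" is still queued; "status" is gone.
    second = await q.next_envelope(lambda e: e.startswith("text"), timeout=1.0)
    return first, second

print(asyncio.run(main()))  # ('text-1', 'text-2')
```

This is why the docs recommend on_envelope for observation: a push callback sees "status" too, while the predicate wait silently consumes it.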

Sending#

Outbound mirrors AgentClient. human.open(...) returns the same Channel handle, so multi-turn channel code is identical whether the initiator is a human or an agent.

from autogen.beta.network import CONVERSATION_TYPE

channel = await human.open(type=CONVERSATION_TYPE, target=expert.agent_id)
await channel.send("Hi — what's a good first ML concept to learn?")
await channel.close(reason="done")

# Convenience for an existing channel id:
await human.send(channel_id, "another message")

# Escape hatch for adapter-shaped envelopes (e.g. a workflow EV_PACKET seed):
await human.post_envelope(envelope)
| Call | Notes |
| --- | --- |
| `await human.open(type=, target=, ttl=, knobs=, intent=, labels=)` | Open a channel as the initiator → `Channel`. `target` accepts peer names or agent ids. |
| `await human.send(channel_id, text, audience=, causation_id=)` | Post an `EV_TEXT` envelope. |
| `await human.post_envelope(envelope)` | Post an arbitrary envelope (stamps `sender_id` if blank). |
| `await human.close_channel(channel_id, reason=)` | Close a channel this human is in. |
| `await human.disconnect()` | Stop accepting deliveries; wakes any blocked `next_envelope` / `envelopes` consumers. Idempotent — call it in your shutdown path. |

Channel invites — auto_ack_invites#

When an agent opens a channel to a human, the hub waits for the human's EV_CHANNEL_INVITE_ACK before the channel reaches ACTIVE. With auto_ack_invites=True (the default) the HumanClient acks automatically the moment the invite arrives — the channel handshake completes with no UI round-trip, exactly like the default agent handler.

Pass auto_ack_invites=False if you want a human to decide whether to join (an "accept invite?" prompt). If so, your UI is responsible for emitting the ack:

from autogen.beta.network import EV_CHANNEL_INVITE, EV_CHANNEL_INVITE_ACK, Envelope

human = await hc.register_human(Passport(name="operator"), auto_ack_invites=False)

async def gate_invites(envelope) -> None:
    if envelope.event_type != EV_CHANNEL_INVITE:
        return
    if await ui.confirm(f"Join channel {envelope.channel_id}?"):
        await human.post_envelope(Envelope(
            channel_id=envelope.channel_id,
            sender_id=human.agent_id,
            event_type=EV_CHANNEL_INVITE_ACK,
            event_data={"channel_id": envelope.channel_id},
            causation_id=envelope.envelope_id,
        ))
    # otherwise let the hub's invite-ack timeout close the channel

human.on_envelope(gate_invites)

Hooking up a UI#

The framework deliberately doesn't pick an input modality — you bridge the HumanClient to whatever UI you have. Two common shapes:

A web app / websocket bridge (push out, RPC in)#

Forward inbound envelopes to the client over a websocket; turn UI messages into sends.

async def serve(websocket, human):
    # outbound: hub -> browser
    async def to_browser(envelope) -> None:
        await websocket.send_json({
            "channel": envelope.channel_id,
            "from": envelope.sender_id,
            "type": envelope.event_type,
            "data": envelope.event_data,
        })
    human.on_envelope(to_browser)

    # inbound: browser -> hub
    try:
        async for msg in websocket:
            payload = msg.json()
            if payload["action"] == "send":
                await human.send(payload["channel"], payload["text"])
            elif payload["action"] == "open":
                await human.open(type=payload["type"], target=payload["target"])
            elif payload["action"] == "close":
                await human.close_channel(payload["channel"], reason="user_closed")
    finally:
        human.remove_envelope_callback(to_browser)
        await human.disconnect()

A console / CLI loop (pull)#

input() is blocking — run it off the event loop with asyncio.to_thread so the network stays responsive while the user types.

import asyncio
from autogen.beta.network import CONVERSATION_TYPE, EV_TEXT

channel = await human.open(type=CONVERSATION_TYPE, target=expert.agent_id)

while True:
    text = (await asyncio.to_thread(input, "you> ")).strip()
    if not text or text.lower() in {"quit", "exit"}:
        break
    await channel.send(text)
    try:
        reply = await human.next_envelope(
            predicate=lambda e: e.event_type == EV_TEXT and e.sender_id == expert.agent_id,
            timeout=60.0,
        )
        print(f"expert> {reply.event_data['text']}")
    except asyncio.TimeoutError:
        print("expert> (no reply within 60s)")

await channel.close(reason="operator_done")
await human.disconnect()

Drain rate is yours to manage

The pull queue is unbounded by design — the embedder controls how fast it's drained via the UI. If it grows pathologically, the application has a UI bug to fix. Always call human.disconnect() on shutdown so blocked consumers wake up instead of hanging.
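Why disconnect() matters for blocked consumers: an `async for` over an empty queue sleeps until something is pushed into it. One common way to model the wake-up (a sentinel-based sketch, not the real HumanClient internals) looks like this:

```python
import asyncio

_DISCONNECT = object()  # sentinel pushed by disconnect()

class Inbox:
    """Sketch of why disconnect() must wake blocked consumers: without
    the sentinel, envelopes() would block forever once deliveries stop."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    def deliver(self, envelope):
        if not self._closed:
            self._queue.put_nowait(envelope)

    async def disconnect(self):
        if not self._closed:  # idempotent, safe in any shutdown path
            self._closed = True
            self._queue.put_nowait(_DISCONNECT)

    async def envelopes(self):
        while True:
            item = await self._queue.get()
            if item is _DISCONNECT:
                return  # blocked consumer wakes and the stream ends
            yield item

async def main():
    inbox = Inbox()
    received = []

    async def consume():
        async for env in inbox.envelopes():
            received.append(env)

    task = asyncio.create_task(consume())
    inbox.deliver("hello")
    await asyncio.sleep(0)      # let the consumer drain and block
    await inbox.disconnect()    # wakes the blocked `async for`
    await task                  # completes instead of hanging
    return received

print(asyncio.run(main()))  # ['hello']
```

Skipping the disconnect() call in this sketch would leave `await task` hanging forever — the same failure mode the note above warns about.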

See also#