Quick Start

The smallest possible end-to-end network scenario: one in-process hub, two agents, and a consulting session that auto-closes after a single question and answer.

import asyncio

from autogen.beta import Agent
from autogen.beta.config import AnthropicConfig
from autogen.beta.knowledge import MemoryKnowledgeStore
from autogen.beta.network import (
    EV_SESSION_CLOSED,
    EV_TEXT,
    Hub,
    HubClient,
    LocalLink,
    Passport,
    Resume,
)

async def main() -> None:
    config = AnthropicConfig(model="claude-sonnet-4-6")

    # Hub: registry + WAL + audit log + adapters live here.
    hub = await Hub.open(MemoryKnowledgeStore(), ttl_sweep_interval=0)
    link = LocalLink(hub)  # in-process duplex transport

    # Each agent gets its own HubClient (its own duplex pair to the hub).
    alice_hc = HubClient(link, hub=hub)
    bob_hc = HubClient(link, hub=hub)

    alice = await alice_hc.register(
        Agent("alice", prompt="Ask one focused question and stop.", config=config),
        Passport(name="alice"),
        Resume(),
    )
    bob = await bob_hc.register(
        Agent("bob", prompt="Answer in one short sentence.", config=config),
        Passport(name="bob"),
        Resume(),
    )

    # Strict 1Q1R; the adapter auto-closes on bob's reply.
    session = await alice.open(type="consulting", target="bob")
    await session.send(
        "What's the single most important property of a distributed system?",
        audience=[bob.agent_id],
    )

    # Bob's default handler runs Agent.ask on the inbound EV_TEXT, sends the
    # reply, and ConsultingAdapter posts EV_SESSION_CLOSED.
    close_env = await alice.wait_for_session_event(
        session_id=session.session_id,
        predicate=lambda e: e.event_type == EV_SESSION_CLOSED,
        timeout=60.0,
    )
    print(f"closed: {close_env.event_data.get('reason')!r}")

    # Replay the conversation from the hub's write-ahead log.
    wal = await hub.read_wal(session.session_id)
    for env in wal:
        if env.event_type == EV_TEXT:
            speaker = "alice" if env.sender_id == alice.agent_id else "bob"
            print(f"{speaker}: {env.event_data['text']}")

    await alice_hc.close()
    await bob_hc.close()
    await hub.close()

asyncio.run(main())

Expected output (Sonnet's exact words will differ on each run):

closed: 'consulting_complete'
alice: What's the single most important property of a distributed system?
bob: Fault tolerance — because a system that can't survive partial failures defeats its entire purpose.
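
The replay loop at the end of the listing hardcodes the two speaker names. As a minimal sketch, the same loop can be factored into a helper that takes a lookup from agent_id to display name. The helper name format_transcript and its parameters are hypothetical, not part of autogen.beta; it only assumes that envelopes expose event_type, sender_id, and event_data["text"], as they do in the listing. The demo uses stand-in objects and made-up event-type strings so that it runs without a hub or a model.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Mapping


def format_transcript(
    envelopes: Iterable[Any],
    names_by_agent_id: Mapping[str, str],
    text_event_type: str,
) -> List[str]:
    """Return one 'speaker: text' line per text envelope, in log order.

    Hypothetical helper: not part of the library.
    """
    lines = []
    for env in envelopes:
        if env.event_type != text_event_type:
            continue  # skip opens, closes, and any other non-text events
        # Fall back to the raw agent_id when no display name is known.
        speaker = names_by_agent_id.get(env.sender_id, env.sender_id)
        lines.append(f"{speaker}: {env.event_data['text']}")
    return lines


# Stand-in envelopes; the ids and event-type strings below are invented.
demo_log = [
    SimpleNamespace(event_type="opened", sender_id="hub", event_data={}),
    SimpleNamespace(event_type="text", sender_id="a-1", event_data={"text": "Why?"}),
    SimpleNamespace(event_type="text", sender_id="b-2", event_data={"text": "Because."}),
    SimpleNamespace(event_type="closed", sender_id="hub", event_data={"reason": "done"}),
]
transcript = format_transcript(demo_log, {"a-1": "alice", "b-2": "bob"}, "text")
print("\n".join(transcript))
```

Inside main() from the listing, the equivalent call would presumably be format_transcript(wal, {alice.agent_id: "alice", bob.agent_id: "bob"}, EV_TEXT).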

What Just Happened

In order:

  1. Hub.open(MemoryKnowledgeStore()) — boots an in-process hub. The KnowledgeStore is where the hub persists its audit log, registry, and write-ahead logs (here in memory).
  2. LocalLink(hub) — a transport factory. Each HubClient constructed against the same link gets its own duplex queue pair to the hub.
  3. HubClient(link, hub=hub) — one per process boundary. In a real deployment alice and bob would each live in their own process, each with one HubClient. Here they share a process for clarity.
  4. hc.register(agent, passport, resume) — registers an Agent with the hub. Returns an AgentClient whose agent_id is hub-stamped.
  5. alice.open(type="consulting", target="bob") — alice creates a consulting session with bob as respondent. Internally: hub posts EV_SESSION_INVITE to bob → bob's default handler auto-acks → hub posts EV_SESSION_OPENED and alice.open(...) returns with session.state == ACTIVE.
  6. session.send(text, audience=...) — alice sends an EV_TEXT envelope.
  7. Bob's default handler — receives the EV_TEXT, probes whether the adapter would accept a reply right now (it would — bob hasn't replied yet), runs Agent.ask(text), and sends bob's reply back through bob's own session handle.
  8. ConsultingAdapter — sees both initiator_sent and respondent_replied are true, returns AdapterResult(next_state=CLOSED, auto_close_reason="consulting_complete"). Hub posts EV_SESSION_CLOSED.
  9. alice.wait_for_session_event(...) — alice's loop wakes when she receives the close envelope.
  10. hub.read_wal(session_id) — replays every envelope the hub recorded for the session. Each envelope is hub-stamped (id, timestamp, sender, audience, event_type, event_data).
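
Steps 7 and 8 describe a small state machine: a probe that says whether a reply would be accepted, and a rule that closes the session once both the question and the answer have been seen. The toy model below illustrates that logic in plain Python. It is not the library's ConsultingAdapter: the class names ToyConsultingAdapter and ToyResult, the would_accept and on_text methods, and the choice to reject an early reply from the respondent are all assumptions made for illustration. Only the two flags, the CLOSED state, and the "consulting_complete" reason come from the steps above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(Enum):
    ACTIVE = "active"
    CLOSED = "closed"


@dataclass
class ToyResult:
    accepted: bool
    next_state: State
    auto_close_reason: Optional[str] = None


@dataclass
class ToyConsultingAdapter:
    """Toy strict one-question, one-reply rule set (illustrative only)."""

    initiator: str
    respondent: str
    initiator_sent: bool = False
    respondent_replied: bool = False
    state: State = State.ACTIVE

    def would_accept(self, sender: str) -> bool:
        """Probe without mutating: may `sender` post text right now?"""
        if self.state is State.CLOSED:
            return False
        if sender == self.initiator:
            return not self.initiator_sent
        if sender == self.respondent:
            # Assumption: the respondent may only answer an asked question.
            return self.initiator_sent and not self.respondent_replied
        return False

    def on_text(self, sender: str) -> ToyResult:
        """Apply one text envelope and report the resulting state."""
        if not self.would_accept(sender):
            return ToyResult(False, self.state)
        if sender == self.initiator:
            self.initiator_sent = True
        else:
            self.respondent_replied = True
        if self.initiator_sent and self.respondent_replied:
            self.state = State.CLOSED
            return ToyResult(True, self.state, "consulting_complete")
        return ToyResult(True, self.state)


adapter = ToyConsultingAdapter(initiator="alice", respondent="bob")
early = adapter.on_text("bob")       # rejected: no question yet
question = adapter.on_text("alice")  # accepted, session stays ACTIVE
reply = adapter.on_text("bob")       # accepted, session auto-closes
late = adapter.on_text("alice")      # rejected: session is closed
print(early.accepted, question.next_state.name, reply.auto_close_reason, late.accepted)
# prints: False ACTIVE consulting_complete False
```

Keeping the probe free of side effects is what lets a handler check first and only then spend a model call on the reply.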

Mental Hooks

  • The Hub is the only authoritative state. Every send goes through it; every observer reads from it. Clients are thin.
  • A session_id is the unit of conversation. The hub's WAL is keyed by session id; expectation evaluators run per session; views project per session.
  • Each AgentClient carries a default_handler that auto-acks invites and runs Agent.ask on inbound text. You can replace it with agent_client.on_envelope(callback) when you need custom logic.
  • The hub assigns the agent_id at registration. Use it (alice.agent_id) for routing rather than the human-readable name, which may not be unique in a multi-tenant deployment.
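
Two of the hooks above, the session-keyed WAL and hub-assigned ids, can be made concrete with a toy in-memory hub. This is a sketch under stated assumptions, not how Hub is implemented: ToyHub, ToyEnvelope, and the register, post, and read_wal signatures here are invented for illustration, as are the use of uuid4 for ids and a simple counter for envelope ids. The envelope fields mirror the ones listed in step 10.

```python
import itertools
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class ToyEnvelope:
    id: int
    timestamp: float
    sender_id: str
    audience: Tuple[str, ...]
    event_type: str
    event_data: Dict[str, Any]


class ToyHub:
    """Toy hub: assigns agent ids and keeps one append-only log per session."""

    def __init__(self) -> None:
        self._names: Dict[str, str] = {}  # agent_id -> display name
        self._wal: Dict[str, List[ToyEnvelope]] = defaultdict(list)
        self._seq = itertools.count(1)

    def register(self, name: str) -> str:
        # The hub, not the caller, picks the id, so duplicate names are harmless.
        agent_id = uuid.uuid4().hex
        self._names[agent_id] = name
        return agent_id

    def post(
        self,
        session_id: str,
        sender_id: str,
        audience: Iterable[str],
        event_type: str,
        event_data: Dict[str, Any],
    ) -> ToyEnvelope:
        if sender_id not in self._names:
            raise KeyError(f"unknown agent_id: {sender_id}")
        # Stamp the envelope on the hub side before appending it to the log.
        env = ToyEnvelope(
            next(self._seq), time.time(), sender_id,
            tuple(audience), event_type, dict(event_data),
        )
        self._wal[session_id].append(env)
        return env

    def read_wal(self, session_id: str) -> List[ToyEnvelope]:
        # Return a copy so readers cannot mutate the log.
        return list(self._wal.get(session_id, []))


hub = ToyHub()
first_bob = hub.register("bob")
second_bob = hub.register("bob")  # same name, different agent_id

hub.post("s1", first_bob, [second_bob], "text", {"text": "ping"})
hub.post("s2", second_bob, [first_bob], "text", {"text": "unrelated"})
hub.post("s1", second_bob, [first_bob], "text", {"text": "pong"})

s1_log = hub.read_wal("s1")
print([env.event_data["text"] for env in s1_log])
# prints: ['ping', 'pong']
```

Because the two registrations share a name but not an id, routing by name would be ambiguous here, while routing by agent_id is not.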

Where to Next

  • Hub & Identity — the registry side: Hub.open, Passport, Resume, Rule, auth.
  • Agent Clients — the agent side: HubClient.register, default handler, custom handlers.
  • Session Adapters — pick the right one: free-form, 1Q1R, round-robin, or graph-driven.