
Context-Aware Routing

The Context-Aware Routing pattern uses a router agent to read the user's request, classify it into a category, and dispatch it to the specialist whose domain matches.

Classic primitives: DefaultPattern, StringLLMCondition (LLM-evaluated routing inside the framework), or ExpressionContextCondition over a domain field that a router tool sets.

Key Characteristics

  • Router agent is thin. The router's only job is to classify the request and call the matching classify_as_<category> tool.
  • Specialists answer. Each specialist is a separate agent with a domain-specific prompt; the graph routes the request to the right one via ContextEquals on the just-set category.
  • Specialist's reply terminates. A FromSpeaker(<specialist>) TerminateTarget rule sits ABOVE the ContextEquals rules so the workflow closes after the specialist speaks (rather than looping back on the sticky category flag — see warning below).

Routing Mechanics

Each classify_as_<category> tool is a plain @tool that calls set_context(session, "category", <category>). The helper emits a non-substantive EV_CONTEXT_SET envelope that lands in the WAL before the round's EV_PACKET commits, so when the router's packet folds, ContextEquals("category", X) already sees the just-set value and routes the next turn to the matching specialist.
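The ordering guarantee can be pictured as a simple log replay. What follows is a minimal sketch, not the hub's implementation: the `Event` class, the `"context_set"` / `"packet"` kind strings, and the `fold` function are hypothetical stand-ins for EV_CONTEXT_SET, EV_PACKET, and the hub's fold step.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical log entry; `kind` is "context_set" or "packet"."""
    kind: str
    data: dict = field(default_factory=dict)


def fold(events):
    """Replay events in log order and record the context each packet sees."""
    context = {}
    seen_at_packet = []
    for ev in events:
        if ev.kind == "context_set":
            context[ev.data["key"]] = ev.data["value"]
        elif ev.kind == "packet":
            # Snapshot the context as it stands when the packet folds.
            seen_at_packet.append(dict(context))
    return seen_at_packet


# Context write lands before the router's packet: routing sees the category.
in_order = [
    Event("context_set", {"key": "category", "value": "technical"}),
    Event("packet", {"sender": "router"}),
]
print(fold(in_order))  # [{'category': 'technical'}]

# If the write landed after the packet, routing would see an empty context.
out_of_order = list(reversed(in_order))
print(fold(out_of_order))  # [{}]
```

In this toy model the second case is the failure the real ordering avoids: the packet would be evaluated before the category exists, so no ContextEquals rule could match.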

Warning: Sticky routing

Note the transition ordering: FromSpeaker(<specialist>) TerminateTarget rules sit ABOVE the ContextEquals rules. Without that ordering, ContextEquals("category", X) would re-match after the specialist speaks (the category var is never cleared) and the graph would loop back to the same specialist. This is the canonical "sticky routing" trap; the fix is always to list terminate / FromSpeaker rules before any sticky ContextEquals.
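The effect of rule order can be reproduced without the framework. The sketch below assumes first-match evaluation, which is what the ordering advice implies. The helpers `first_match`, `from_speaker`, and `context_equals`, and the string targets, are hypothetical stand-ins for the graph evaluator, FromSpeaker, ContextEquals, and the target classes.

```python
def from_speaker(name):
    """Predicate: the last speaker was `name`."""
    return lambda speaker, context: speaker == name


def context_equals(key, value):
    """Predicate: context[key] == value (never cleared, hence sticky)."""
    return lambda speaker, context: context.get(key) == value


def first_match(rules, speaker, context):
    """Return the target of the first rule whose predicate holds."""
    for predicate, target in rules:
        if predicate(speaker, context):
            return target
    return "terminate:fall_through"


terminate_first = [
    (from_speaker("billing"), "terminate:billing_resolved"),
    (context_equals("category", "billing"), "agent:billing"),
    (from_speaker("user"), "agent:router"),
]
sticky_first = [
    (context_equals("category", "billing"), "agent:billing"),
    (from_speaker("billing"), "terminate:billing_resolved"),
    (from_speaker("user"), "agent:router"),
]

context = {"category": "billing"}  # set by the router, still set afterwards

# After the billing specialist speaks:
print(first_match(terminate_first, "billing", context))  # terminate:billing_resolved
print(first_match(sticky_first, "billing", context))     # agent:billing
```

With the sticky rule listed first, the specialist's own reply routes straight back to the specialist, and in this model only a turn limit would stop the loop.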

Agent Flow

sequenceDiagram
    participant User as user
    participant Router as router
    participant Billing as billing
    participant Technical as technical
    participant General as general

    User->>Router: question
    Router->>Router: classify_as_<category>(reason); set_context("category", <category>)
    Note over Router: ContextEquals("category", X) routes to specialist
    alt category=billing
        Router->>Billing: AgentTarget(billing)
        Billing->>User: answer; FromSpeaker(billing) → TerminateTarget("billing_resolved")
    else category=technical
        Router->>Technical: AgentTarget(technical)
        Technical->>User: answer; FromSpeaker(technical) → TerminateTarget("technical_resolved")
    else category=general
        Router->>General: AgentTarget(general)
        General->>User: answer; FromSpeaker(general) → TerminateTarget("general_resolved")
    end

Migration Notes

| Classic | Beta |
| --- | --- |
| StringLLMCondition (framework asks the LLM at the transition) | The router agent's LLM turn calls a classify_as_<category> tool; the declarative graph reads the resulting state |
| ReplyResult(context_variables={"category": "billing"}, target=AgentTarget(billing_specialist)) | await set_context(session, "category", "billing") from inside a plain @tool, plus a ContextEquals("category", "billing") → AgentTarget(billing) graph rule |
| ExpressionContextCondition(...) per category | ContextEquals("category", X) per category |

Gaps & Workarounds

  • No LLMCondition. Classic StringLLMCondition allowed the framework to ask an LLM "should this transition fire?" — purely declarative routing. Beta requires the router agent to make the decision inside its own LLM turn and record it via a tool call. Functionally equivalent but conceptually different: the routing intelligence lives in the agent, not the graph. Workaround: keep the router agent thin (system prompt = "classify into one of {billing, technical, general} then call classify_as_<category>").
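One way to keep the router thin is to generate its system prompt from the same category table that names the tools, so the prompt and the classify_as_<category> tools cannot drift apart. This is an illustrative sketch only: `CATEGORIES` and `build_router_prompt` are hypothetical names, not part of the framework, and the category descriptions are taken from the cookbook prompt.

```python
# Hypothetical category table: name -> what belongs in that category.
CATEGORIES = {
    "billing": "payment, refund, invoice, subscription tier, pricing",
    "technical": "bug, error, integration, API, setup, connectivity",
    "general": "account info, policy, FAQ, anything not billing or technical",
}


def build_router_prompt(categories):
    """Build a classify-only system prompt with one tool line per category."""
    lines = [
        "You are the routing agent. Classify the user's request into ONE "
        "category and call the matching tool. Don't write any extra text.",
        "",
        "Categories:",
    ]
    for name, scope in categories.items():
        lines.append(f"* `classify_as_{name}`: {scope}.")
    lines += ["", "Call exactly ONE tool with a short `reason` argument."]
    return "\n".join(lines)


prompt = build_router_prompt(CATEGORIES)
print(prompt)
```

The resulting string could be passed as the router's `prompt` argument. Adding a category then means adding one table entry, one tool, and one ContextEquals rule.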

Code

Tip

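The router and every specialist run on a live Sonnet model (AnthropicConfig(model="claude-sonnet-4-6")): the router because classification is the LLM-driven step, the specialists so that each reply is domain-flavoured. Only the user agent uses TestConfig.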

"""Cookbook 07 — Context-Aware Routing pattern.

A router agent reads the user's request, classifies it into a
category, and the graph's ``ContextEquals`` transitions route to the
matching specialist.
"""

import asyncio

from dotenv import load_dotenv

from autogen.beta import Agent
from autogen.beta.config import AnthropicConfig
from autogen.beta.knowledge import MemoryKnowledgeStore
from autogen.beta.network import (
    EV_PACKET,
    EV_SESSION_CLOSED,
    EV_TEXT,
    WORKFLOW_TYPE,
    AgentTarget,
    ContextEquals,
    FromSpeaker,
    Hub,
    HubClient,
    LocalLink,
    Passport,
    Resume,
    SessionInject,
    TerminateTarget,
    Transition,
    TransitionGraph,
)
from autogen.beta.network.workflow_helpers import set_context
from autogen.beta.testing import TestConfig

load_dotenv()

async def classify_as_billing(reason: str, session: SessionInject) -> str:
    """Classify the request as billing. Sets
    context_vars['category'] = 'billing' so the graph's
    ContextEquals('category', 'billing') rule routes to the
    billing specialist."""
    if session is None:
        return "no session"
    print(f"  [tool] classify_as_billing({reason!r})")
    await set_context(session, "category", "billing")
    return f"classified as billing: {reason}"

async def classify_as_technical(reason: str, session: SessionInject) -> str:
    """Classify as technical."""
    if session is None:
        return "no session"
    print(f"  [tool] classify_as_technical({reason!r})")
    await set_context(session, "category", "technical")
    return f"classified as technical: {reason}"

async def classify_as_general(reason: str, session: SessionInject) -> str:
    """Classify as general support."""
    if session is None:
        return "no session"
    print(f"  [tool] classify_as_general({reason!r})")
    await set_context(session, "category", "general")
    return f"classified as general: {reason}"

async def main() -> None:
    config = AnthropicConfig(model="claude-sonnet-4-6")

    hub_obj = await Hub.open(MemoryKnowledgeStore(), ttl_sweep_interval=0)
    link = LocalLink(hub_obj)

    user_hc = HubClient(link, hub=hub_obj)
    router_hc = HubClient(link, hub=hub_obj)
    billing_hc = HubClient(link, hub=hub_obj)
    technical_hc = HubClient(link, hub=hub_obj)
    general_hc = HubClient(link, hub=hub_obj)

    user_agent = Agent("user", config=TestConfig())

    router_agent = Agent(
        "router",
        prompt=(
            "You are the routing agent. Classify the user's request "
            "into ONE of three categories and call the matching tool. "
            "Don't write any extra text — the tool advances the workflow.\n"
            "\n"
            "Categories:\n"
            "* `classify_as_billing` — payment, refund, invoice, "
            "subscription tier, pricing.\n"
            "* `classify_as_technical` — bug, error, integration, "
            "API, setup, connectivity. Anything technical.\n"
            "* `classify_as_general` — account info, policy, FAQ, "
            "anything not billing or technical.\n"
            "\n"
            "Call exactly ONE tool with a short `reason` argument."
        ),
        config=config,
    )
    router_agent.tool(classify_as_billing)
    router_agent.tool(classify_as_technical)
    router_agent.tool(classify_as_general)

    billing_agent = Agent(
        "billing",
        prompt=(
            "You are the billing specialist. Reply in 1-2 sentences "
            "with concrete next steps for billing/payment/subscription "
            "issues. Don't escalate — just answer."
        ),
        config=config,
    )
    technical_agent = Agent(
        "technical",
        prompt=(
            "You are the technical specialist. Reply in 1-2 sentences "
            "with concrete diagnostic next steps for bugs, API errors, "
            "or integration problems. Don't escalate — just answer."
        ),
        config=config,
    )
    general_agent = Agent(
        "general",
        prompt=(
            "You are the general support specialist. Reply in 1-2 "
            "sentences answering account, policy, or FAQ questions. "
            "Don't escalate — just answer."
        ),
        config=config,
    )

    user = await user_hc.register(user_agent, Passport(name="user"), Resume())
    router = await router_hc.register(router_agent, Passport(name="router"), Resume())
    billing = await billing_hc.register(billing_agent, Passport(name="billing"), Resume())
    technical = await technical_hc.register(technical_agent, Passport(name="technical"), Resume())
    general = await general_hc.register(general_agent, Passport(name="general"), Resume())

    graph = TransitionGraph(
        initial_speaker=user.agent_id,
        transitions=[
            # Specialist's reply terminates — listed BEFORE the
            # ContextEquals routing rules so we don't loop on the
            # sticky `category` var.
            Transition(when=FromSpeaker(billing.agent_id),   then=TerminateTarget("billing_resolved")),
            Transition(when=FromSpeaker(technical.agent_id), then=TerminateTarget("technical_resolved")),
            Transition(when=FromSpeaker(general.agent_id),   then=TerminateTarget("general_resolved")),
            # Route based on router-set category.
            Transition(when=ContextEquals("category", "billing"),   then=AgentTarget(billing.agent_id)),
            Transition(when=ContextEquals("category", "technical"), then=AgentTarget(technical.agent_id)),
            Transition(when=ContextEquals("category", "general"),   then=AgentTarget(general.agent_id)),
            # User's question → router.
            Transition(when=FromSpeaker(user.agent_id), then=AgentTarget(router.agent_id)),
        ],
        default_target=TerminateTarget("fall_through"),
        max_turns=10,
    )

    session = await user.open(
        type=WORKFLOW_TYPE,
        target=[router.agent_id, billing.agent_id, technical.agent_id, general.agent_id],
        knobs={"graph": graph.to_dict()},
    )
    print(f"session: {session.session_id}\n")

    name_by_id = {
        user.agent_id: "user",
        router.agent_id: "router",
        billing.agent_id: "billing",
        technical.agent_id: "technical",
        general.agent_id: "general",
    }

    await session.send(
        "I tried to upgrade my subscription but the API is returning a "
        "500 error. The status page says everything is green. Help?"
    )

    # Wait for the workflow to terminate (any of the five close routes
    # documented in /docs/beta/network/termination — this demo uses
    # FromSpeaker(<specialist>) → TerminateTarget("<category>_resolved")).
    close_env = await user_hc.wait_for_session_event(
        session_id=session.session_id,
        predicate=lambda e: e.event_type == EV_SESSION_CLOSED,
        timeout=120.0,
    )

    # Print the transcript from the WAL after close.
    for env in await hub_obj.read_wal(session.session_id):
        speaker = name_by_id.get(env.sender_id, env.sender_id[:8])
        if env.event_type == EV_TEXT:
            print(f"{speaker:>14}: {env.event_data['text']}")
        elif env.event_type == EV_PACKET:
            routing = env.event_data.get("routing", {}) or {}
            if routing.get("kind") == "handoff":
                line = f"[Handed off via {routing.get('tool', '')}] {routing.get('reason', '')}"
                print(f"{speaker:>14}: {line.rstrip()}")
            body = env.event_data.get("body", "")
            if body:
                print(f"{speaker:>14}: {body}")

    print(f"\nclosed: reason={close_env.event_data.get('reason')!r}")

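    # Demo only: peek at the hub's private per-session state to show that
    # `category` is still set after the session closes (the sticky var).
    state = hub_obj._adapter_states[session.session_id]
    print(f"final context_vars: {state.context_vars!r}")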

    await user_hc.close()
    await router_hc.close()
    await billing_hc.close()
    await technical_hc.close()
    await general_hc.close()
    await hub_obj.close()

if __name__ == "__main__":
    asyncio.run(main())

Output

session: 8b4d...

           user: I tried to upgrade my subscription but the API is returning a 500 error. The status page says everything is green. Help?
  [tool] classify_as_technical('User is reporting a 500 error from the API during a subscription upgrade — this is a technical/integration issue, not a billing dispute or general policy question.')
         router:
      technical: Capture the full request (endpoint, payload, response headers, request-id) and re-try the upgrade — if the 500 persists, share the request-id with us so we can trace it server-side; in the meantime, check whether the failure correlates with a specific plan, payment method, or coupon as those code paths are the most common 500 sources during upgrade.

closed: reason='technical_resolved'
final context_vars: {'category': 'technical'}