Skip to content

Workflow

workflow is the orchestrated multi-party adapter. A declarative TransitionGraph describes who speaks first, what conditions fire, and when the session terminates. It's the modern replacement for the classic GroupChat + handoffs pattern — see Migrating from Group Chat for the side-by-side translation.

Shape#

Participants 2+
Turn order Whatever TransitionGraph says
Auto-close Yes — when graph emits a TerminateTarget decision or max_turns is hit
Termination Auto-close, explicit close, TTL, or expectation violation
Default view WindowedSummary(recent_n=N*2)
Default expectations turn_within(120s, warn), turn_within(600s, auto_close)
Required knob {"graph": <TransitionGraph.to_dict()>}

How a TransitionGraph Resolves the Next Speaker#

flowchart LR
    Env[Accepted EV_TEXT or EV_PACKET] --> Fold[adapter.fold:<br/>bookkeeping advanced]
    Fold --> Iter{For each transition<br/>in priority order}
    Iter -->|when.evaluate true| Then[then.resolve]
    Iter -->|when.evaluate false| Iter
    Iter -->|none match| Default[default_target.resolve]
    Then --> Decision[TransitionDecision:<br/>next_speaker / close_reason]
    Default --> Decision
    Decision -->|next_speaker is a participant| Continue[expected_next_speaker ← agent_id]
    Decision -->|next_speaker is None| Terminate[on_accepted returns CLOSING<br/>EV_SESSION_CLOSED with close_reason]

Each accepted substantive envelope walks the transitions list, finds the first matching condition, and resolves a target. TerminateTarget (with next_speaker=None) ends the session.

Lifecycle: Sequence (Pipeline) Example#

sequenceDiagram
    participant A as alice (creator)
    participant H as Hub + WorkflowAdapter
    participant B as bob
    participant C as carol

    A->>H: open(type="workflow", target=[bob, carol],<br/>knobs.graph = sequence([alice, bob, carol]))
    H->>B: EV_SESSION_INVITE
    H->>C: EV_SESSION_INVITE
    B->>H: EV_SESSION_INVITE_ACK
    C->>H: EV_SESSION_INVITE_ACK
    H->>A: EV_SESSION_OPENED
    Note over H: graph.initial_speaker = alice

    A->>H: EV_TEXT (turn 1)
    Note over H: FromSpeaker(alice) → AgentTarget(bob)
    H->>B: deliver
    B->>H: EV_TEXT (turn 2)
    Note over H: FromSpeaker(bob) → AgentTarget(carol)
    H->>C: deliver
    C->>H: EV_TEXT (turn 3)
    Note over H: no transition matches → default_target<br/>= TerminateTarget("sequence_complete")
    H-->>A: EV_SESSION_CLOSED (sequence_complete)
    H-->>B: EV_SESSION_CLOSED
    H-->>C: EV_SESSION_CLOSED

Building the Graph#

TransitionGraph is the orchestrator script:

@dataclass(slots=True)
class TransitionGraph:
    initial_speaker: str                   # agent_id of the first speaker
    transitions: list[Transition]          # ordered list, evaluated by priority
    default_target: TransitionTarget       # what happens if no transition matches
    max_turns: int | None = None           # hard turn cap

Each Transition pairs a condition with a target:

@dataclass(slots=True)
class Transition:
    when: TransitionCondition   # evaluated against the just-accepted envelope
    then: TransitionTarget      # if when() returns True, this resolves the next speaker
    priority: int = 0           # higher priority runs first; ties break by insertion order

Built-in Targets#

Target Decision
AgentTarget(agent_id) Hand off to a specific named agent.
RoundRobinTarget() Advance through the participant order.
StayTarget() Same speaker continues (rare; for "let me elaborate" patterns).
RevertToInitiatorTarget() Hand back to whoever opened the session.
TerminateTarget(reason="…") End the session; reason flows on EV_SESSION_CLOSED.

Built-in Conditions#

Condition Fires when
Always() Every accepted envelope.
FromSpeaker(agent_id) The just-accepted envelope was sent by this agent.
ToolCalled(tool_name) The previous turn called this tool by name (matched via the packet's routing.tool field).
ContextEquals(key, value) Session-scoped context_vars[key] equals value.

ContextEquals is the read side of the Context Variables primitive — most non-trivial routing in classic AG2 went through OnContextCondition and friends, and ContextEquals is its modern equivalent. The "Context-Driven Transitions" section below covers routing patterns, multi-branch dispatch, and the order-of-rules traps in detail.

Both TransitionTarget and TransitionCondition are Protocols with a name: ClassVar[str] registration key. Custom targets or conditions register via register_target(MyTarget) / register_condition(MyCondition) so they can round-trip through TransitionGraph.to_dict().

Convenience Factories#

Two are shipped:

# Cycle through participants for max_turns total turns.
graph = TransitionGraph.round_robin(
    participants=[alice.agent_id, bob.agent_id, carol.agent_id],
    max_turns=6,
)

# Pipeline: alice -> bob -> carol -> terminate.
graph = TransitionGraph.sequence([
    alice.agent_id, bob.agent_id, carol.agent_id,
])

round_robin(participants) uses Always() -> RoundRobinTarget(). sequence(steps) uses FromSpeaker(steps[i]) -> AgentTarget(steps[i+1]) for each pair, with TerminateTarget("sequence_complete") as the default.

Custom Graphs#

graph = TransitionGraph(
    initial_speaker=triage.agent_id,
    transitions=[
        Transition(
            when=ToolCalled("escalate_to_security"),
            then=AgentTarget(security.agent_id),
        ),
        Transition(
            when=FromSpeaker(security.agent_id),
            then=RevertToInitiatorTarget(),
        ),
    ],
    default_target=TerminateTarget(reason="triage_complete"),
    max_turns=20,
)

ToolCalled reads from the packet's routing.tool field. The flow is: the agent's Agent.ask round runs and one or more routing tools fire; when the round ends, the framework walks the agent's local-stream ToolCallEvents in emission order and records the first one matching a ToolCalled(name) rule into the packet's routing field; the workflow adapter folds the resulting EV_PACKET envelope and ToolCalled("escalate_to_security") matches.

For dynamic routing (target depends on runtime state), a tool can return a typed Handoff(target="<name>", reason="...") value instead — the framework reads it from the tool's result, resolves the participant name, and stamps routing.target on the packet. The matching ToolCalled rule is shadowed when a Handoff is returned from the same tool: the dynamic target wins.

Context-Driven Transitions#

Most non-trivial group-chat orchestrations in classic AG2 routed on context variables — OnContextCondition, StringContextCondition, ExpressionContextCondition. The beta equivalent is a tool that emits EV_CONTEXT_SET and a transition whose when is ContextEquals(key, value). The mutation primitive lives on the Context Variables page; this section is about using context to decide who speaks next.

Reading context in a transition#

ContextEquals compares state.context_vars.get(key) to value. Missing keys compare as None, so an unset key never matches a non-None value:

1
2
3
4
Transition(
    when=ContextEquals(key="route", value="security"),
    then=AgentTarget(security.agent_id),
)

The state is read on every fold of a substantive envelope (text or handoff). So as soon as a tool's EV_CONTEXT_SET lands on the WAL, the next fold sees the new value — typically the speaker's reply text, fired from the same Agent.ask call.

Routing on a flag#

The 1-bit case: a tool sets a boolean, the transition routes on it. This is the modern is_termination_msg analogue, but generalised to "is some condition met."

graph = TransitionGraph(
    initial_speaker=intake.agent_id,
    transitions=[
        Transition(when=FromSpeaker(intake.agent_id),     then=AgentTarget(triage.agent_id)),
        Transition(when=ContextEquals("urgent", value=True), then=AgentTarget(oncall.agent_id)),
        Transition(when=FromSpeaker(triage.agent_id),     then=AgentTarget(reviewer.agent_id)),
        Transition(when=FromSpeaker(oncall.agent_id),     then=TerminateTarget("paged")),
        Transition(when=FromSpeaker(reviewer.agent_id),   then=TerminateTarget("reviewed")),
    ],
    default_target=TerminateTarget("fall_through"),
    max_turns=10,
)

Triage's tool flips urgent=True when the ticket warrants paging. The condition fires on triage's reply fold and reroutes the next turn to oncall — whose FromSpeaker rule then terminates the session. Without the flag, triage hands off to the reviewer instead.

Multi-branch dispatch#

Three or more buckets, one ContextEquals per branch. Order matters: lower-priority transitions are checked first, and ties resolve in insertion order. Put the more specific rules first so they win:

graph = TransitionGraph(
    initial_speaker=triage.agent_id,
    transitions=[
        Transition(when=ContextEquals("domain", value="security"), then=AgentTarget(sec.agent_id)),
        Transition(when=ContextEquals("domain", value="legal"),    then=AgentTarget(legal.agent_id)),
        Transition(when=ContextEquals("domain", value="billing"),  then=AgentTarget(billing.agent_id)),
        # Default: fall through to the generic catch-all.
        Transition(when=FromSpeaker(triage.agent_id), then=AgentTarget(generic.agent_id)),
    ],
    default_target=TerminateTarget("done"),
)

If triage's tool calls set_context(domain="security") then the security row matches and the generic FromSpeaker(triage) row is never consulted. Note that ContextEquals(key, value=None) fires on missing keys — useful for "unset" branches.

Combining with FromSpeaker#

Most useful patterns combine the two: "if alice spoke AND the flag is set, do X." Beta's first-cut conditions don't ship a built-in AllOf composer, so you encode the conjunction as transition order — list the most specific rules first, with subsequent rules as fallbacks:

1
2
3
4
5
6
transitions=[
    # Specific: alice flagged escalation → security
    Transition(when=ContextEquals("escalate", value=True), then=AgentTarget(security.agent_id)),
    # Less specific: alice's normal reply → reviewer
    Transition(when=FromSpeaker(alice.agent_id), then=AgentTarget(reviewer.agent_id)),
]

When alice speaks and escalate==True, the first row wins. When alice speaks and the flag is unset, the first row falls through and the second row matches. Same evaluation order as classic OnCondition lists.

If you need a true AND of "from this speaker AND in this state," register a custom composer (the AllOf recipe in Context Variables is the typical shape).

Avoiding the stuck-routing trap#

ContextEquals is sticky. Once route="security" is in context_vars, every subsequent fold re-evaluates it. If the security agent speaks next and you have ContextEquals("route", "security") AgentTarget(security) near the top of the list, you'll bounce right back to security forever (or until max_turns).

Two fixes:

  1. List terminate rules before context-conditions. FromSpeaker(security) TerminateTarget(...) placed earlier in the list short-circuits the loop after security speaks.
1
2
3
4
5
6
transitions=[
    # Terminate FIRST so post-handoff speaker exits before re-matching.
    Transition(when=FromSpeaker(security.agent_id), then=TerminateTarget("security_done")),
    Transition(when=ContextEquals("route", value="security"), then=AgentTarget(security.agent_id)),
    Transition(when=FromSpeaker(triage.agent_id), then=AgentTarget(legal.agent_id)),
]
  1. Have the second agent clear the key. Security's tool emits EV_CONTEXT_SET with {"delete": ["route"]} when it's done. Subsequent folds see route unset and the routing transition stops firing.

The first fix is the more common pattern — terminate transitions are cheap, the speaker-rule check is just an == against the envelope's sender_id.

Order check

When a graph-driven session loops unexpectedly, the first thing to check is the transition list order. The shipped sequence and round_robin factories handle this for you; custom graphs need explicit attention.

Beyond ContextEquals#

ContextEquals is the only context-driven condition shipped today. For richer predicates, register your own — the Protocol is evaluate(state, envelope) -> bool:

from typing import ClassVar
from dataclasses import dataclass
from autogen.beta.network import register_condition

@dataclass(slots=True)
class ContextThreshold:
    """Fires when ``state.context_vars[key] >= threshold``."""

    key: str
    threshold: float
    name: ClassVar[str] = "context_threshold"

    def evaluate(self, state, envelope) -> bool:
        value = state.context_vars.get(self.key, 0)
        return isinstance(value, (int, float)) and value >= self.threshold

register_condition(ContextThreshold)

register_condition plugs the class into TransitionGraph.to_dict() round-tripping, so the graph still serialises cleanly through Hub.hydrate(). The AllOf / AnyOf / ContextIn / ContextThreshold recipes follow the same pattern. See Context Variables → Custom Conditions for the full set of recipes.

Opening a Workflow Session#

graph = TransitionGraph.round_robin(
    participants=[alice.agent_id, bob.agent_id, carol.agent_id],
    max_turns=6,
)

session = await alice.open(
    type="workflow",
    target=[bob.agent_id, carol.agent_id],
    knobs={"graph": graph.to_dict()},
)

initial_speaker must match a participant. The creator (alice) is automatically a participant; targets fill the rest. The graph's dict form is what gets stored on SessionMetadata.knobs["graph"], so it round-trips through Hub.hydrate() deterministically.

After opening, the creator's first send is treated as turn 1 by the adapter. The default handler then drives subsequent turns by probing can_send.

Termination#

The graph terminates the session when:

  1. A transition fires whose target is TerminateTarget(reason="…"), or
  2. max_turns is reached and no other transition fires (the default_target is consulted), or
  3. An agent explicitly calls session.close(reason="…"), or
  4. An expectation violation triggers AutoCloseHandler, or
  5. The session's TTL expires.

EV_SESSION_CLOSED carries the close reason on its event_data.

For the cross-adapter view (when to use the workflow graph vs. an app-side cap, an agent tool, or a sentinel adapter), see Closing Sessions.

State Object#

@dataclass(slots=True)
class WorkflowState:
    participant_order: list[str]
    expected_next_speaker: str | None
    last_speaker_id: str | None = None
    last_envelope_id: str | None = None
    turn_count: int = 0
    pending_close_reason: str = ""
    creator_id: str = ""
    graph_data: dict = field(default_factory=dict)

expected_next_speaker = None signals "session should terminate." The adapter's on_accepted reads this and returns AdapterResult(next_state=CLOSING, ...).

graph_data is the serialised graph — the adapter rebuilds TransitionGraph on every fold so it doesn't keep mutable graph state in memory between turns.

Custom Targets / Conditions#

Implement the Protocol, decorate with a unique name, register on the default registry:

from autogen.beta.network import (
    Envelope,
    TransitionTarget,
    TransitionDecision,
    register_target,
)

@dataclass(slots=True)
class HighestRankedReviewer(TransitionTarget):
    name: ClassVar[str] = "highest_ranked_reviewer"
    role_priority: list[str] = field(default_factory=list)

    def resolve(self, state, envelope: Envelope) -> TransitionDecision:
        # ...look up the next reviewer based on your domain logic...
        return TransitionDecision(next_speaker=chosen_id)

register_target(HighestRankedReviewer)

Then use it in a graph just like a built-in:

Transition(when=Always(), then=HighestRankedReviewer(role_priority=["security", "legal"]))

Custom targets and conditions persist via TransitionGraph.to_dict() — the name field is the key, and the dataclass fields become the args dict that loads(...) passes back to the constructor.

Packet execution model#

Each Agent.ask round on a workflow session commits to the WAL atomically as a single EV_PACKET envelope. The packet carries the agent's routing decision (routing.tool matched against ToolCalled rules, or a pre-resolved routing.target from a typed Handoff return), the round's body text, and a reserved context_updates slot. State mutations from tool calls (via set_context(session, ...)) land as separate EV_CONTEXT_SET envelopes during tool execution — they're folded before the packet, so a ContextEquals rule on the same fold sees the just-set value.

External side-effects and packet retry#

The packet model commits a round's effects atomically: if the agent crashes mid-packet, the session reverts to its pre-packet state and the original input is re-dispatched. Tool calls within that packet will execute again on retry.

If your tool calls an external system (HTTP API, database, payment gateway, queue), it must be idempotent under retry — calling it twice with the same arguments must produce the same outcome as calling it once.

Recommended patterns:

  • Use the external service's idempotency-key feature where available (Stripe, S3, well-designed REST APIs). Derive a stable key from (session_id, round_counter, tool_name) so retries within a packet reuse the same key.
  • For database writes, use upsert (INSERT ... ON CONFLICT) rather than blind insert.
  • For tools that genuinely cannot be made idempotent (rare), gate them behind a HITL confirmation step or run them in a single-tool round so the packet-rollback boundary is tighter.

HITL packet-boundary semantics#

Non-speaker substantive sends (e.g. a supervisor injecting a correction via session.send(EV_TEXT, ...)) are accepted at packet boundaries, not arbitrary instants. While an agent's packet is in flight (including any slow tool execution), validate_send keeps that agent as the expected speaker until the packet commits. A supervisor's mid-packet inject waits for the active packet to commit (≈ slow-tool latency) before being accepted.

Loose-semantics writes are unaffected

EV_CONTEXT_SET envelopes (emitted by set_context / delete_context from any participant) are non-substantive and land immediately, regardless of who's speaking — observer writes during another agent's packet are visible to the next packet's ContextEquals evaluation.

Working Examples#

For the canonical multi-agent patterns translated from classic AG2 (Pipeline, Star, Feedback Loop, Triage-with-Tasks, etc.), see the Pattern Cookbook.