Skip to content

WorkflowAdapter

autogen.beta.network.adapters.workflow.WorkflowAdapter #

WorkflowAdapter()

Generic orchestrated multi-party channel.

Knobs: {"graph": <TransitionGraph.to_dict()>, "context_vars": <optional dict of initial context variables>}. Participants: 2+. Default view: `NamedWindowedSummary(recent_n=max(N*2, 4))` with N = participant count — bounded prompt size plus sender labels on non-self projection lines, so the orchestrator / next speaker can tell its peers apart in a 3+ party chat (the assistant/user role bit alone collapses every "other" into one indistinguishable stream).

Source code in autogen/beta/network/adapters/workflow.py
def __init__(self) -> None:
    """Publish the workflow channel manifest on ``self.manifest``.

    The manifest declares the channel type and version, a minimum of two
    participants, the ``graph`` knob, the default view policy name, and
    two ``turn_within`` expectations with escalating consequences.
    """
    # Same expectation twice with different thresholds: a warning after
    # 120 seconds, then an automatic close after 600 seconds.
    turn_deadlines = [
        Expectation(
            name="turn_within",
            on_violation=consequence,
            params={"seconds": limit},
        )
        for consequence, limit in (("warn", 120), ("auto_close", 600))
    ]
    self.manifest = ChannelManifest(
        type=WORKFLOW_TYPE,
        version=1,
        participants=ParticipantSchema(min=2),
        knobs_schema={"graph": "TransitionGraph"},
        default_view_policy=NamedWindowedSummary.name,
        expectations=turn_deadlines,
    )

manifest instance-attribute #

manifest = ChannelManifest(type=WORKFLOW_TYPE, version=1, participants=ParticipantSchema(min=2), knobs_schema={'graph': 'TransitionGraph'}, default_view_policy=NamedWindowedSummary.name, expectations=[Expectation(name='turn_within', on_violation='warn', params={'seconds': 120}), Expectation(name='turn_within', on_violation='auto_close', params={'seconds': 600})])

initial_state #

initial_state(metadata)
Source code in autogen/beta/network/adapters/workflow.py
def initial_state(self, metadata: ChannelMetadata) -> WorkflowState:
    """Derive the starting ``WorkflowState`` from channel metadata.

    Reads the ``graph`` knob (required, a dict) and the optional
    ``context_vars`` knob (a dict when present). Participants are ordered
    by their ``order`` attribute, and the graph's initial speaker becomes
    the first expected speaker.

    Raises:
        ProtocolError: if ``knobs['graph']`` is not a dict, the graph
            fails to load, the initial speaker is not a participant, or
            ``knobs['context_vars']`` is present but not a dict.
    """
    knobs = metadata.knobs
    raw_graph = knobs.get("graph")
    if not isinstance(raw_graph, dict):
        raise ProtocolError(
            "workflow requires knobs['graph'] as a dict — call TransitionGraph.to_dict() before passing"
        )
    try:
        parsed = TransitionGraph.loads(raw_graph)
    except WorkflowGraphError as exc:
        raise ProtocolError(f"invalid workflow graph: {exc}") from exc

    seated = sorted(metadata.participants, key=lambda member: member.order)
    speaker_ids = [member.agent_id for member in seated]
    opener = parsed.initial_speaker
    if opener not in speaker_ids:
        raise ProtocolError(f"workflow initial_speaker {opener!r} not in participants {speaker_ids!r}")

    seed_context = knobs.get("context_vars", {})
    if not isinstance(seed_context, dict):
        raise ProtocolError("workflow knobs['context_vars'] must be a dict if provided")

    # The context is copied so later folds never mutate the caller's knob
    # dict; the raw graph dict is stored as given.
    return WorkflowState(
        participant_order=speaker_ids,
        expected_next_speaker=opener,
        creator_id=metadata.creator_id,
        graph_data=raw_graph,
        context_vars=dict(seed_context),
    )

fold #

fold(envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def fold(self, envelope: Envelope, state: WorkflowState) -> WorkflowState:
    """Fold one envelope into the workflow state and return the result.

    Three cases:

    * ``EV_CONTEXT_SET`` — only ``context_vars`` changes (deletes are
      applied before sets); turn bookkeeping is carried over untouched.
    * Non-substantive envelopes — ``state`` is returned unchanged.
    * Substantive envelopes — the turn count, last speaker and last
      envelope id advance, packet-carried context updates are applied,
      and the next speaker is resolved from the packet's routing or,
      failing that, from ``self._select``.
    """
    kind = envelope.event_type

    # Context-only mutation: auxiliary metadata, so no turn is consumed.
    # Handled ahead of the substantive gate so the next fold sees it.
    if kind == EV_CONTEXT_SET:
        merged = dict(state.context_vars)
        for stale_key in envelope.event_data.get("delete", []) or []:
            merged.pop(stale_key, None)
        merged.update(envelope.event_data.get("set", {}) or {})
        return WorkflowState(
            participant_order=state.participant_order,
            expected_next_speaker=state.expected_next_speaker,
            last_speaker_id=state.last_speaker_id,
            last_envelope_id=state.last_envelope_id,
            turn_count=state.turn_count,
            pending_close_reason=state.pending_close_reason,
            creator_id=state.creator_id,
            graph_data=state.graph_data,
            context_vars=merged,
        )

    if not _is_substantive(envelope):
        return state

    graph = TransitionGraph.loads(state.graph_data)
    is_packet = kind == EV_PACKET

    # Packet-carried context updates land BEFORE speaker selection, so a
    # ``ContextEquals`` rule can match a value this same packet just set
    # (state update and speaker advance happen atomically).
    context_after = dict(state.context_vars)
    if is_packet:
        mutations = envelope.event_data.get("context_updates", {}) or {}
        for stale_key in mutations.get("delete", []) or []:
            context_after.pop(stale_key, None)
        context_after.update(mutations.get("set", {}) or {})

    # Bookkeeping is advanced first; selection below runs against this
    # state so transitions see a turn count and last speaker that already
    # include the current envelope. ``expected_next_speaker`` still holds
    # the previous value until routing is resolved.
    advanced = WorkflowState(
        participant_order=state.participant_order,
        expected_next_speaker=state.expected_next_speaker,
        last_speaker_id=envelope.sender_id,
        last_envelope_id=envelope.envelope_id,
        turn_count=state.turn_count + 1,
        pending_close_reason="",
        creator_id=state.creator_id,
        graph_data=state.graph_data,
        context_vars=context_after,
    )

    # Routing carried on the packet, in priority order:
    #   ``kind: "finish"``  -> terminate (most explicit intent);
    #   ``kind: "handoff"`` -> trust the pre-resolved ``routing.target``;
    #   anything else       -> let the graph's static rules decide.
    close_reason = None
    handoff_target = None
    if is_packet:
        routing = envelope.event_data.get("routing", {}) or {}
        if routing.get("kind") == "finish":
            close_reason = routing.get("reason") or "finished"
        elif routing.get("kind") == "handoff":
            handoff_target = routing.get("target")

    if close_reason is not None:
        advanced.expected_next_speaker = None
        advanced.pending_close_reason = close_reason
        return advanced

    if handoff_target:
        # NOTE(review): the target is used as-is; nothing in this method
        # checks it against ``participant_order`` — confirm it is
        # validated upstream.
        advanced.expected_next_speaker = handoff_target
        advanced.pending_close_reason = ""
        return advanced

    decision = self._select(graph, advanced, envelope)
    advanced.expected_next_speaker = decision.next_speaker
    advanced.pending_close_reason = decision.close_reason
    return advanced

validate_create #

validate_create(metadata)
Source code in autogen/beta/network/adapters/workflow.py
def validate_create(self, metadata: ChannelMetadata) -> None:
    """Reject channel metadata that cannot start a workflow.

    Checks, in order: at least two participants; ``knobs['graph']`` is a
    dict that ``TransitionGraph.loads`` accepts; the graph's initial
    speaker is one of the participants; and ``knobs['context_vars']``,
    when provided, is a dict.

    Raises:
        ProtocolError: on the first check that fails.
    """
    if len(metadata.participants) < 2:
        raise ProtocolError(f"workflow requires at least 2 participants, got {len(metadata.participants)}")
    graph_data = metadata.knobs.get("graph")
    if not isinstance(graph_data, dict):
        raise ProtocolError(
            "workflow requires knobs['graph'] as a dict — call TransitionGraph.to_dict() before passing"
        )
    try:
        graph = TransitionGraph.loads(graph_data)
    except WorkflowGraphError as exc:
        raise ProtocolError(f"invalid workflow graph: {exc}") from exc
    order = {p.agent_id for p in metadata.participants}
    if graph.initial_speaker not in order:
        raise ProtocolError(
            f"workflow initial_speaker {graph.initial_speaker!r} not in participants {sorted(order)!r}"
        )
    # ``initial_state`` rejects a non-dict ``context_vars`` knob with this
    # same error. Checking it here keeps creation-time validation in step
    # with state construction, so bad metadata fails at the gate instead
    # of after it has been accepted.
    if not isinstance(metadata.knobs.get("context_vars", {}), dict):
        raise ProtocolError("workflow knobs['context_vars'] must be a dict if provided")

validate_send #

validate_send(metadata, envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def validate_send(
    self,
    metadata: ChannelMetadata,
    envelope: Envelope,
    state: WorkflowState,
) -> None:
    """Check that ``envelope`` may be sent on this channel right now.

    * ``EV_CONTEXT_SET`` is allowed from any participant, at any time.
    * Non-substantive envelopes are always allowed.
    * Substantive envelopes must come from the expected next speaker,
      when one is set.

    Raises:
        ProtocolError: if a context-set comes from a non-participant, or
            a substantive envelope comes from someone other than the
            expected next speaker.
    """
    if envelope.event_type == EV_CONTEXT_SET:
        # Context writes are not gated on turn order, only on membership.
        members = {member.agent_id for member in metadata.participants}
        if envelope.sender_id in members:
            return
        raise ProtocolError(
            f"workflow {metadata.channel_id!r} only accepts EV_CONTEXT_SET "
            f"from participants, got {envelope.sender_id!r}"
        )

    if not _is_substantive(envelope):
        return

    # With no expected speaker set (falsy), the turn check is skipped.
    if not state.expected_next_speaker or envelope.sender_id == state.expected_next_speaker:
        return
    raise ProtocolError(
        f"workflow {metadata.channel_id!r} expects "
        f"{state.expected_next_speaker!r} to speak, got "
        f"{envelope.sender_id!r}"
    )

on_accepted #

on_accepted(metadata, envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def on_accepted(
    self,
    metadata: ChannelMetadata,
    envelope: Envelope,
    state: WorkflowState,
) -> AdapterResult:
    """Decide whether the channel should close after an accepted envelope.

    Non-substantive envelopes never close the channel. When ``state`` has
    no expected next speaker the channel closes with the pending close
    reason (or ``"workflow_terminated"`` when that is empty). Otherwise it
    closes with ``"max_turns"`` once the graph's turn cap is reached.
    """
    if not _is_substantive(envelope):
        return AdapterResult()

    # A termination recorded on the state wins over the max_turns cap.
    if state.expected_next_speaker is None:
        return AdapterResult(
            next_state=ChannelState.CLOSED,
            auto_close_reason=state.pending_close_reason or "workflow_terminated",
        )

    turn_cap = TransitionGraph.loads(state.graph_data).max_turns
    cap_reached = turn_cap is not None and state.turn_count >= turn_cap
    if cap_reached:
        return AdapterResult(
            next_state=ChannelState.CLOSED,
            auto_close_reason="max_turns",
        )
    return AdapterResult()

expected_next #

expected_next(metadata, state)
Source code in autogen/beta/network/adapters/workflow.py
def expected_next(
    self,
    metadata: ChannelMetadata,
    state: WorkflowState,
) -> ExpectedTurn | None:
    """Report whose turn it is, or ``None`` when nobody is expected.

    The returned ``ExpectedTurn`` names ``state.expected_next_speaker``
    and carries ``state.last_envelope_id`` as the triggering envelope.
    ``metadata`` is not consulted.
    """
    speaker = state.expected_next_speaker
    if speaker is not None:
        return ExpectedTurn(
            agent_id=speaker,
            triggering_envelope_id=state.last_envelope_id,
        )
    return None

default_view_policy #

default_view_policy(metadata, participant_id)
Source code in autogen/beta/network/adapters/workflow.py
def default_view_policy(
    self,
    metadata: ChannelMetadata,
    participant_id: str,
) -> ViewPolicy:
    """Return a ``NamedWindowedSummary`` sized to the participant count.

    The window is twice the number of participants, never below 4.
    ``participant_id`` is not used.
    """
    window = len(metadata.participants) * 2
    if window < 4:
        window = 4
    return NamedWindowedSummary(recent_n=window)

extract_turn_input #

extract_turn_input(envelope)

Decode an inbound substantive envelope into the next speaker's prompt. Workflow handles EV_TEXT and EV_PACKET (concatenates the routing-handoff line with the body).

Source code in autogen/beta/network/adapters/workflow.py
def extract_turn_input(self, envelope: Envelope) -> str | None:
    """Turn an inbound envelope into the next speaker's prompt text.

    ``EV_PACKET`` is rendered through ``_packet_turn_text`` (an empty
    result becomes ``None``). ``EV_TEXT`` yields its ``text`` field when
    that field is a string. Every other event type yields ``None``.
    """
    kind = envelope.event_type
    if kind == EV_PACKET:
        return _packet_turn_text(envelope) or None
    if kind != EV_TEXT:
        return None

    text = envelope.event_data.get("text", "")
    if isinstance(text, str):
        return text
    return None

build_round_envelope #

build_round_envelope(metadata, sender_id, reply, events, state, hub)

Build the EV_PACKET envelope capturing this round.

Walks the agent's local-stream events to determine routing intent (Handoff result first, then ToolCalled match, else text). select_next resolves the target at fold time for static routing; dynamic Handoff carries its resolved target on the packet's routing.target field.

Returns None for silent rounds (no body and no routing tool fired) — matches pre-packet "no envelope" behaviour.

Source code in autogen/beta/network/adapters/workflow.py
def build_round_envelope(
    self,
    metadata: ChannelMetadata,
    sender_id: str,
    reply: "AgentReply",
    events: list[BaseEvent],
    state: "WorkflowState | None",
    hub: "Hub",
) -> Envelope | None:
    """Package the finished round as an ``EV_PACKET`` envelope.

    Routing intent is computed by ``_resolve_routing`` from the round's
    events, the workflow graph (when it can be loaded from ``state``) and
    the hub's name-to-id map. The packet carries that routing, an empty
    ``context_updates`` section, and the reply body.

    Returns ``None`` for a silent round: routing kind ``"text"`` with an
    empty body, i.e. nothing to say and no routing tool fired.
    """
    # Best effort: a missing state, empty graph data, or a graph that
    # fails to load all leave routing to be resolved without a graph.
    parsed_graph = None
    if state is not None and state.graph_data:
        try:
            parsed_graph = TransitionGraph.loads(state.graph_data)
        except WorkflowGraphError:
            pass

    routing = _resolve_routing(events, parsed_graph, hub.name_to_id_map())
    body = reply.body or ""

    silent_round = routing["kind"] == "text" and not body
    if silent_round:
        return None

    payload = {
        "routing": routing,
        "context_updates": {"set": {}, "delete": []},
        "body": body,
    }
    return Envelope(
        channel_id=metadata.channel_id,
        sender_id=sender_id,
        audience=None,
        event_type=EV_PACKET,
        event_data=payload,
    )

render_envelope #

render_envelope(envelope)

Project EV_PACKET via `_packet_text`; defer to `default_render_envelope` for everything else (notably EV_TEXT for participant text emitted outside the workflow's round-end packet).

Source code in autogen/beta/network/adapters/workflow.py
def render_envelope(self, envelope):
    """Render an envelope for display in a view.

    ``EV_PACKET`` envelopes go through ``_packet_text``; everything else
    (notably ``EV_TEXT`` emitted outside the round-end packet) falls back
    to ``default_render_envelope``.
    """
    if envelope.event_type != EV_PACKET:
        return default_render_envelope(envelope)
    return _packet_text(envelope)

tools_for #

tools_for(client, metadata, state, participant_id)

Workflow offers no adapter-level tools.

Handoff routing is encoded by user-authored @tool functions that return `Handoff(target=..., reason=...)`. The handler merges those tools (already on agent.tools) with the identity-level NetworkPlugin set; the workflow adapter itself contributes nothing.

Source code in autogen/beta/network/adapters/workflow.py
def tools_for(self, client, metadata, state, participant_id):
    """Return the tool set for ``participant_id``; workflow adds none.

    Handoff routing lives in user-authored ``@tool`` functions returning
    ``Handoff(target=, reason=)``, which are already on ``agent.tools``.
    The adapter therefore hands the request straight to
    ``default_tools_for`` and contributes nothing of its own.
    """
    inherited_tools = default_tools_for(client, metadata, state, participant_id)
    return inherited_tools

build_text_envelope #

build_text_envelope(channel_id, sender_id, text, *, audience=None, causation_id=None)

Workflow accepts text seeds (e.g. for an initiator's first turn) as plain EV_TEXT — the adapter folds them into the round-end packet downstream.

Source code in autogen/beta/network/adapters/workflow.py
def build_text_envelope(self, channel_id, sender_id, text, *, audience=None, causation_id=None):
    """Build a plain ``EV_TEXT`` envelope for a text seed.

    Text seeds (e.g. an initiator's first turn) need no workflow-specific
    shape, so construction is delegated to ``default_build_text_envelope``
    with the arguments passed through unchanged.
    """
    passthrough = {"audience": audience, "causation_id": causation_id}
    return default_build_text_envelope(channel_id, sender_id, text, **passthrough)

build_packet_envelope #

build_packet_envelope(channel_id, sender_id, body, *, handoff=None, context_set=None, audience=None, causation_id=None)

Workflow's native round-end shape — handoff + context_set live in routing / context fields.

Source code in autogen/beta/network/adapters/workflow.py
def build_packet_envelope(
    self,
    channel_id,
    sender_id,
    body,
    *,
    handoff=None,
    context_set=None,
    audience=None,
    causation_id=None,
):
    """Build a round-end packet envelope, the workflow's native shape.

    ``handoff`` and ``context_set`` are forwarded, together with the
    addressing arguments, to ``default_build_packet_envelope``, which
    produces the envelope.
    """
    passthrough = {
        "handoff": handoff,
        "context_set": context_set,
        "audience": audience,
        "causation_id": causation_id,
    }
    return default_build_packet_envelope(channel_id, sender_id, body, **passthrough)