Skip to content

WorkflowAdapter

autogen.beta.network.adapters.workflow.WorkflowAdapter #

WorkflowAdapter()

Generic orchestrated multi-party session.

Knobs: {"graph": <TransitionGraph.to_dict()>}. Participants: 2+. Default view: :class:WindowedSummary(recent_n=N*2) with N = participant count.

Source code in autogen/beta/network/adapters/workflow.py
def __init__(self) -> None:
    self.manifest = SessionManifest(
        type=WORKFLOW_TYPE,
        version=1,
        participants=ParticipantSchema(min=2),
        knobs_schema={"graph": "TransitionGraph"},
        default_view_policy=WindowedSummary.name,
        expectations=[
            Expectation(
                name="turn_within",
                on_violation="warn",
                params={"seconds": 120},
            ),
            Expectation(
                name="turn_within",
                on_violation="auto_close",
                params={"seconds": 600},
            ),
        ],
    )

manifest instance-attribute #

manifest = SessionManifest(type=WORKFLOW_TYPE, version=1, participants=ParticipantSchema(min=2), knobs_schema={'graph': 'TransitionGraph'}, default_view_policy=name, expectations=[Expectation(name='turn_within', on_violation='warn', params={'seconds': 120}), Expectation(name='turn_within', on_violation='auto_close', params={'seconds': 600})])

initial_state #

initial_state(metadata)
Source code in autogen/beta/network/adapters/workflow.py
def initial_state(self, metadata: SessionMetadata) -> WorkflowState:
    graph_data = metadata.knobs.get("graph")
    if not isinstance(graph_data, dict):
        raise ProtocolError(
            "workflow requires knobs['graph'] as a dict — call TransitionGraph.to_dict() before passing"
        )
    try:
        graph = TransitionGraph.loads(graph_data)
    except WorkflowGraphError as exc:
        raise ProtocolError(f"invalid workflow graph: {exc}") from exc

    order = [p.agent_id for p in sorted(metadata.participants, key=lambda p: p.order)]
    if graph.initial_speaker not in order:
        raise ProtocolError(f"workflow initial_speaker {graph.initial_speaker!r} not in participants {order!r}")
    initial_context = metadata.knobs.get("context_vars", {})
    if not isinstance(initial_context, dict):
        raise ProtocolError("workflow knobs['context_vars'] must be a dict if provided")
    return WorkflowState(
        participant_order=order,
        expected_next_speaker=graph.initial_speaker,
        creator_id=metadata.creator_id,
        graph_data=graph_data,
        context_vars=dict(initial_context),
    )

fold #

fold(envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def fold(self, envelope: Envelope, state: WorkflowState) -> WorkflowState:
    # Context-variable mutations don't advance turn bookkeeping —
    # they're auxiliary metadata, applied before the substantive
    # gate so the new value is visible to the next fold.
    if envelope.event_type == EV_CONTEXT_SET:
        new_vars = dict(state.context_vars)
        for key in envelope.event_data.get("delete", []) or []:
            new_vars.pop(key, None)
        new_vars.update(envelope.event_data.get("set", {}) or {})
        return WorkflowState(
            participant_order=state.participant_order,
            expected_next_speaker=state.expected_next_speaker,
            last_speaker_id=state.last_speaker_id,
            last_envelope_id=state.last_envelope_id,
            turn_count=state.turn_count,
            pending_close_reason=state.pending_close_reason,
            creator_id=state.creator_id,
            graph_data=state.graph_data,
            context_vars=new_vars,
        )

    if not _is_substantive(envelope):
        return state

    graph = TransitionGraph.loads(state.graph_data)

    # Apply context mutations carried on the packet BEFORE
    # ``select_next`` so a ``ContextEquals`` rule can match values
    # the same packet just set (atomic state-update + speaker-
    # advance).
    new_context = dict(state.context_vars)
    if envelope.event_type == EV_PACKET:
        updates = envelope.event_data.get("context_updates", {}) or {}
        for key in updates.get("delete", []) or []:
            new_context.pop(key, None)
        new_context.update(updates.get("set", {}) or {})

    # Build the post-fold state with bookkeeping advanced; speaker
    # selection happens against this state so transitions see the
    # turn count and last speaker that include this envelope.
    new_state = WorkflowState(
        participant_order=state.participant_order,
        expected_next_speaker=state.expected_next_speaker,
        last_speaker_id=envelope.sender_id,
        last_envelope_id=envelope.envelope_id,
        turn_count=state.turn_count + 1,
        pending_close_reason="",
        creator_id=state.creator_id,
        graph_data=state.graph_data,
        context_vars=new_context,
    )

    # Routing resolution.
    # If the packet carries a pre-resolved ``routing.target`` (set
    # by the framework when a tool returned a typed ``Handoff``),
    # trust it directly — dynamic Handoff supersedes graph rules.
    # Otherwise run ``select_next`` so static rules (``ToolCalled``,
    # ``ContextEquals``, ``FromSpeaker``) decide.
    pre_resolved = None
    if envelope.event_type == EV_PACKET:
        routing = envelope.event_data.get("routing", {}) or {}
        if routing.get("kind") == "handoff":
            pre_resolved = routing.get("target")

    if pre_resolved:
        new_state.expected_next_speaker = pre_resolved
        new_state.pending_close_reason = ""
    else:
        decision = self._select(graph, new_state, envelope)
        new_state.expected_next_speaker = decision.next_speaker
        new_state.pending_close_reason = decision.close_reason
    return new_state

validate_create #

validate_create(metadata)
Source code in autogen/beta/network/adapters/workflow.py
def validate_create(self, metadata: SessionMetadata) -> None:
    if len(metadata.participants) < 2:
        raise ProtocolError(f"workflow requires at least 2 participants, got {len(metadata.participants)}")
    graph_data = metadata.knobs.get("graph")
    if not isinstance(graph_data, dict):
        raise ProtocolError(
            "workflow requires knobs['graph'] as a dict — call TransitionGraph.to_dict() before passing"
        )
    try:
        graph = TransitionGraph.loads(graph_data)
    except WorkflowGraphError as exc:
        raise ProtocolError(f"invalid workflow graph: {exc}") from exc
    order = {p.agent_id for p in metadata.participants}
    if graph.initial_speaker not in order:
        raise ProtocolError(
            f"workflow initial_speaker {graph.initial_speaker!r} not in participants {sorted(order)!r}"
        )

validate_send #

validate_send(metadata, envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def validate_send(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: WorkflowState,
) -> None:
    if envelope.event_type == EV_CONTEXT_SET:
        # Context variables can be set by any valid participant at any time
        participant_ids = {p.agent_id for p in metadata.participants}
        if envelope.sender_id not in participant_ids:
            raise ProtocolError(
                f"workflow {metadata.session_id!r} only accepts EV_CONTEXT_SET "
                f"from participants, got {envelope.sender_id!r}"
            )
        return
    if not _is_substantive(envelope):
        return
    if state.expected_next_speaker and envelope.sender_id != state.expected_next_speaker:
        raise ProtocolError(
            f"workflow {metadata.session_id!r} expects "
            f"{state.expected_next_speaker!r} to speak, got "
            f"{envelope.sender_id!r}"
        )

on_accepted #

on_accepted(metadata, envelope, state)
Source code in autogen/beta/network/adapters/workflow.py
def on_accepted(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: WorkflowState,
) -> AdapterResult:
    if not _is_substantive(envelope):
        return AdapterResult()

    # Prefer graph-emitted termination reason over max_turns
    if state.expected_next_speaker is None:
        reason = state.pending_close_reason or "workflow_terminated"
        return AdapterResult(
            next_state=SessionState.CLOSED,
            auto_close_reason=reason,
        )

    graph = TransitionGraph.loads(state.graph_data)
    if graph.max_turns is not None and state.turn_count >= graph.max_turns:
        return AdapterResult(
            next_state=SessionState.CLOSED,
            auto_close_reason="max_turns",
        )
    return AdapterResult()

default_view_policy #

default_view_policy(metadata, participant_id)
Source code in autogen/beta/network/adapters/workflow.py
def default_view_policy(
    self,
    metadata: SessionMetadata,
    participant_id: str,
) -> ViewPolicy:
    recent_n = max(len(metadata.participants) * 2, 4)
    return WindowedSummary(recent_n=recent_n)

extract_turn_input #

extract_turn_input(envelope)

Decode an inbound substantive envelope into the next speaker's prompt. Workflow handles EV_TEXT and EV_PACKET (concatenates the routing-handoff line with the body).

Source code in autogen/beta/network/adapters/workflow.py
def extract_turn_input(self, envelope: Envelope) -> str | None:
    """Decode an inbound substantive envelope into the next
    speaker's prompt. Workflow handles ``EV_TEXT`` and
    ``EV_PACKET`` (concatenates the routing-handoff line with
    the body)."""
    if envelope.event_type == EV_TEXT:
        text = envelope.event_data.get("text", "")
        return text if isinstance(text, str) else None
    if envelope.event_type == EV_PACKET:
        return _packet_turn_text(envelope) or None
    return None

build_round_envelope #

build_round_envelope(metadata, sender_id, reply, events, state, hub)

Build the EV_PACKET envelope capturing this round.

Walks the agent's local-stream events to determine routing intent (Handoff result first, then ToolCalled match, else text). select_next resolves the target at fold time for static routing; dynamic Handoff carries its resolved target on the packet's routing.target field.

Returns None for silent rounds (no body and no routing tool fired) — matches pre-packet "no envelope" behaviour.

Source code in autogen/beta/network/adapters/workflow.py
def build_round_envelope(
    self,
    metadata: SessionMetadata,
    sender_id: str,
    reply: "AgentReply",
    events: list[BaseEvent],
    state: "WorkflowState | None",
    hub: "Hub",
) -> Envelope | None:
    """Build the ``EV_PACKET`` envelope capturing this round.

    Walks the agent's local-stream events to determine routing
    intent (Handoff result first, then ToolCalled match, else
    text). ``select_next`` resolves the target at fold time for
    static routing; dynamic Handoff carries its resolved target
    on the packet's ``routing.target`` field.

    Returns ``None`` for silent rounds (no body and no routing
    tool fired) — matches pre-packet "no envelope" behaviour.
    """
    graph: TransitionGraph | None = None
    if state is not None and state.graph_data:
        try:
            graph = TransitionGraph.loads(state.graph_data)
        except WorkflowGraphError:
            graph = None

    routing = _resolve_routing(events, graph, hub._name_to_id)
    body = reply.body or ""

    if routing["kind"] == "text" and not body:
        return None

    return Envelope(
        session_id=metadata.session_id,
        sender_id=sender_id,
        audience=None,
        event_type=EV_PACKET,
        event_data={
            "routing": routing,
            "context_updates": {"set": {}, "delete": []},
            "body": body,
        },
    )

render_envelope #

render_envelope(envelope)

Project EV_PACKET via :func:_packet_text; defer to :func:default_render_envelope for everything else (notably EV_TEXT for participant text emitted outside the workflow's round-end packet).

Source code in autogen/beta/network/adapters/workflow.py
def render_envelope(self, envelope):
    """Project ``EV_PACKET`` via :func:`_packet_text`; defer to
    :func:`default_render_envelope` for everything else (notably
    ``EV_TEXT`` for participant text emitted outside the
    workflow's round-end packet)."""
    if envelope.event_type == EV_PACKET:
        return _packet_text(envelope)
    return default_render_envelope(envelope)