Skip to content

ConsultingAdapter

autogen.beta.network.adapters.consulting.ConsultingAdapter #

ConsultingAdapter()

Strict 1Q1R: initiator → 1 envelope, respondent → 1 reply, auto-close.

Knobs: none. Participants: exactly 2 (initiator + respondent).

Default view: :class:FullTranscript (transcripts are short).

Source code in autogen/beta/network/adapters/consulting.py
def __init__(self) -> None:
    # __init__ stores params; manifest is a class-level constant
    # constructed once.
    self.manifest = SessionManifest(
        type=CONSULTING_TYPE,
        version=1,
        participants=ParticipantSchema(
            min=2,
            max=2,
            roles=[ParticipantRole.INITIATOR.value, ParticipantRole.RESPONDENT.value],
        ),
        knobs_schema={},
        default_view_policy=FullTranscript.name,
        expectations=[
            Expectation(
                name="acks_within",
                on_violation="auto_close",
                params={"seconds": 30},
            ),
            Expectation(
                name="reply_within",
                on_violation="auto_close",
                params={"seconds": 600},
            ),
        ],
    )

manifest instance-attribute #

manifest = SessionManifest(type=CONSULTING_TYPE, version=1, participants=ParticipantSchema(min=2, max=2, roles=[INITIATOR.value, RESPONDENT.value]), knobs_schema={}, default_view_policy=name, expectations=[Expectation(name='acks_within', on_violation='auto_close', params={'seconds': 30}), Expectation(name='reply_within', on_violation='auto_close', params={'seconds': 600})])

initial_state #

initial_state(metadata)
Source code in autogen/beta/network/adapters/consulting.py
def initial_state(self, metadata: SessionMetadata) -> ConsultingState:
    return ConsultingState()

fold #

fold(envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def fold(self, envelope: Envelope, state: ConsultingState) -> ConsultingState:
    if _is_session_protocol_event(envelope) or _is_task_event(envelope):
        return state
    if envelope.event_type != EV_TEXT:
        return state
    if not state.initiator_sent:
        return ConsultingState(
            initiator_sent=True,
            respondent_replied=False,
            last_envelope_id=envelope.envelope_id,
        )
    if not state.respondent_replied:
        return ConsultingState(
            initiator_sent=True,
            respondent_replied=True,
            last_envelope_id=envelope.envelope_id,
        )
    return state

validate_create #

validate_create(metadata)
Source code in autogen/beta/network/adapters/consulting.py
def validate_create(self, metadata: SessionMetadata) -> None:
    roles = {p.role for p in metadata.participants}
    if ParticipantRole.INITIATOR not in roles:
        raise ProtocolError("consulting requires exactly one initiator")
    if ParticipantRole.RESPONDENT not in roles:
        raise ProtocolError("consulting requires exactly one respondent")
    if len(metadata.participants) != 2:
        raise ProtocolError(f"consulting requires exactly 2 participants, got {len(metadata.participants)}")

validate_send #

validate_send(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def validate_send(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> None:
    # Hub-emitted protocol events and task envelopes bypass the 1Q1R rule.
    if _is_session_protocol_event(envelope) or _is_task_event(envelope):
        return
    if envelope.event_type != EV_TEXT:
        # Unknown event types are accepted as informational data on the
        # session.
        return
    if state.initiator_sent and state.respondent_replied:
        raise ProtocolError(f"consulting session {metadata.session_id!r} already complete")
    if not state.initiator_sent:
        initiator_id = self._initiator_id(metadata)
        if envelope.sender_id != initiator_id:
            raise ProtocolError(
                f"consulting session {metadata.session_id!r} expects first send "
                f"from initiator {initiator_id!r}, got {envelope.sender_id!r}"
            )
    else:
        respondent_id = self._respondent_id(metadata)
        if envelope.sender_id != respondent_id:
            raise ProtocolError(
                f"consulting session {metadata.session_id!r} expects reply "
                f"from respondent {respondent_id!r}, got {envelope.sender_id!r}"
            )

on_accepted #

on_accepted(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def on_accepted(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> AdapterResult:
    if envelope.event_type != EV_TEXT:
        return AdapterResult()
    if state.initiator_sent and state.respondent_replied:
        # Direct to CLOSED — consulting has no async cleanup phase.
        # The transitional ``CLOSING`` state is reserved for adapters
        # that need a quiescence window (e.g., draining streamed
        # chunks before close).
        return AdapterResult(
            next_state=SessionState.CLOSED,
            auto_close_reason="consulting_complete",
        )
    return AdapterResult()

default_view_policy #

default_view_policy(metadata, participant_id)
Source code in autogen/beta/network/adapters/consulting.py
def default_view_policy(
    self,
    metadata: SessionMetadata,
    participant_id: str,
) -> ViewPolicy:
    return FullTranscript()

extract_turn_input #

extract_turn_input(envelope)
Source code in autogen/beta/network/adapters/consulting.py
def extract_turn_input(self, envelope):
    return default_extract_turn_input(envelope)

build_round_envelope #

build_round_envelope(metadata, sender_id, reply, events, state, hub)
Source code in autogen/beta/network/adapters/consulting.py
def build_round_envelope(self, metadata, sender_id, reply, events, state, hub):
    return default_build_round_envelope(metadata, sender_id, reply, events, state, hub)

render_envelope #

render_envelope(envelope)
Source code in autogen/beta/network/adapters/consulting.py
def render_envelope(self, envelope):
    return default_render_envelope(envelope)