Skip to content

ConsultingAdapter

autogen.beta.network.adapters.consulting.ConsultingAdapter #

ConsultingAdapter()

Strict 1Q1R: initiator → 1 envelope, respondent → 1 reply, auto-close.

Knobs: none. Participants: exactly 2 (initiator + respondent).

Default view: :class:FullTranscript (transcripts are short).

Source code in autogen/beta/network/adapters/consulting.py
def __init__(self) -> None:
    # __init__ stores params; manifest is a class-level constant
    # constructed once.
    # Per-client say-tool cache: avoids paying the ``fast_depends``
    # JSON-schema build cost on every notify-handler turn. Keyed by
    # ``client.agent_id`` (stable across the client's lifetime). An
    # unregister + re-register yields a new agent_id, so the dead
    # entry from the prior generation is leaked memory bounded by
    # the total registration count — acceptable for V1.
    self._say_tool_cache: dict[str, object] = {}
    self.manifest = ChannelManifest(
        type=CONSULTING_TYPE,
        version=1,
        participants=ParticipantSchema(
            min=2,
            max=2,
            roles=[ParticipantRole.INITIATOR.value, ParticipantRole.RESPONDENT.value],
        ),
        knobs_schema={},
        default_view_policy=FullTranscript.name,
        expectations=[
            Expectation(
                name="acks_within",
                on_violation="auto_close",
                params={"seconds": 30},
            ),
            Expectation(
                name="reply_within",
                on_violation="auto_close",
                params={"seconds": 600},
            ),
        ],
    )

manifest instance-attribute #

manifest = ChannelManifest(type=CONSULTING_TYPE, version=1, participants=ParticipantSchema(min=2, max=2, roles=[INITIATOR.value, RESPONDENT.value]), knobs_schema={}, default_view_policy=name, expectations=[Expectation(name='acks_within', on_violation='auto_close', params={'seconds': 30}), Expectation(name='reply_within', on_violation='auto_close', params={'seconds': 600})])

initial_state #

initial_state(metadata)
Source code in autogen/beta/network/adapters/consulting.py
def initial_state(self, metadata: ChannelMetadata) -> ConsultingState:
    return ConsultingState()

fold #

fold(envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def fold(self, envelope: Envelope, state: ConsultingState) -> ConsultingState:
    if _is_channel_protocol_event(envelope) or _is_task_event(envelope):
        return state
    if envelope.event_type != EV_TEXT:
        return state
    if not state.initiator_sent:
        return ConsultingState(
            initiator_sent=True,
            respondent_replied=False,
            last_envelope_id=envelope.envelope_id,
        )
    if not state.respondent_replied:
        return ConsultingState(
            initiator_sent=True,
            respondent_replied=True,
            last_envelope_id=envelope.envelope_id,
        )
    return state

validate_create #

validate_create(metadata)
Source code in autogen/beta/network/adapters/consulting.py
def validate_create(self, metadata: ChannelMetadata) -> None:
    roles = {p.role for p in metadata.participants}
    if ParticipantRole.INITIATOR not in roles:
        raise ProtocolError("consulting requires exactly one initiator")
    if ParticipantRole.RESPONDENT not in roles:
        raise ProtocolError("consulting requires exactly one respondent")
    if len(metadata.participants) != 2:
        raise ProtocolError(f"consulting requires exactly 2 participants, got {len(metadata.participants)}")

validate_send #

validate_send(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def validate_send(
    self,
    metadata: ChannelMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> None:
    # Hub-emitted protocol events and task envelopes bypass the 1Q1R rule.
    if _is_channel_protocol_event(envelope) or _is_task_event(envelope):
        return
    if envelope.event_type != EV_TEXT:
        # Unknown event types are accepted as informational data on the
        # channel.
        return
    if state.initiator_sent and state.respondent_replied:
        raise ProtocolError(f"consulting channel {metadata.channel_id!r} already complete")
    if not state.initiator_sent:
        initiator_id = self._initiator_id(metadata)
        if envelope.sender_id != initiator_id:
            raise ProtocolError(
                f"consulting channel {metadata.channel_id!r} expects first send "
                f"from initiator {initiator_id!r}, got {envelope.sender_id!r}"
            )
    else:
        respondent_id = self._respondent_id(metadata)
        if envelope.sender_id != respondent_id:
            raise ProtocolError(
                f"consulting channel {metadata.channel_id!r} expects reply "
                f"from respondent {respondent_id!r}, got {envelope.sender_id!r}"
            )

on_accepted #

on_accepted(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def on_accepted(
    self,
    metadata: ChannelMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> AdapterResult:
    if envelope.event_type != EV_TEXT:
        return AdapterResult()
    if state.initiator_sent and state.respondent_replied:
        # Direct to CLOSED — consulting has no async cleanup phase.
        # The transitional ``CLOSING`` state is reserved for adapters
        # that need a quiescence window (e.g., draining streamed
        # chunks before close).
        return AdapterResult(
            next_state=ChannelState.CLOSED,
            auto_close_reason="consulting_complete",
        )
    return AdapterResult()

expected_next #

expected_next(metadata, state)
Source code in autogen/beta/network/adapters/consulting.py
def expected_next(
    self,
    metadata: ChannelMetadata,
    state: ConsultingState,
) -> ExpectedTurn | None:
    # 1Q1R cycle: initiator first, then respondent. Once both have
    # spoken the cycle is complete and no participant is expected.
    if not state.initiator_sent:
        return ExpectedTurn(
            agent_id=self._initiator_id(metadata),
            triggering_envelope_id=state.last_envelope_id,
        )
    if not state.respondent_replied:
        return ExpectedTurn(
            agent_id=self._respondent_id(metadata),
            triggering_envelope_id=state.last_envelope_id,
        )
    return None

default_view_policy #

default_view_policy(metadata, participant_id)
Source code in autogen/beta/network/adapters/consulting.py
def default_view_policy(
    self,
    metadata: ChannelMetadata,
    participant_id: str,
) -> ViewPolicy:
    return FullTranscript()

extract_turn_input #

extract_turn_input(envelope)
Source code in autogen/beta/network/adapters/consulting.py
def extract_turn_input(self, envelope):
    return default_extract_turn_input(envelope)

build_round_envelope #

build_round_envelope(metadata, sender_id, reply, events, state, hub)
Source code in autogen/beta/network/adapters/consulting.py
def build_round_envelope(self, metadata, sender_id, reply, events, state, hub):
    return default_build_round_envelope(metadata, sender_id, reply, events, state, hub)

render_envelope #

render_envelope(envelope)
Source code in autogen/beta/network/adapters/consulting.py
def render_envelope(self, envelope):
    return default_render_envelope(envelope)

tools_for #

tools_for(client, metadata, state, participant_id)

Consulting offers say to the participant whose turn it is.

State gating: the initiator has the floor until they send the prompt; the respondent has the floor after the prompt lands and before they reply. Once the respondent replies the channel auto-closes — no further turns.

The resolved tool is memoized per-client on the adapter (see :meth:_cached_say_tool) so the per-turn fast_depends schema build cost is paid once.

Source code in autogen/beta/network/adapters/consulting.py
def tools_for(self, client, metadata, state, participant_id):
    """Consulting offers ``say`` to the participant whose turn it is.

    State gating: the initiator has the floor until they send the
    prompt; the respondent has the floor after the prompt lands and
    before they reply. Once the respondent replies the channel
    auto-closes — no further turns.

    The resolved tool is memoized per-client on the adapter (see
    :meth:`_cached_say_tool`) so the per-turn ``fast_depends`` schema
    build cost is paid once.
    """
    initiator_id = next(
        (p.agent_id for p in metadata.participants if p.role == ParticipantRole.INITIATOR),
        None,
    )
    if participant_id == initiator_id and not state.initiator_sent:
        return [self._cached_say_tool(client)]
    if participant_id != initiator_id and state.initiator_sent and not state.respondent_replied:
        return [self._cached_say_tool(client)]
    return []

build_text_envelope #

build_text_envelope(channel_id, sender_id, text, *, audience=None, causation_id=None)
Source code in autogen/beta/network/adapters/consulting.py
def build_text_envelope(self, channel_id, sender_id, text, *, audience=None, causation_id=None):
    return default_build_text_envelope(channel_id, sender_id, text, audience=audience, causation_id=causation_id)

build_packet_envelope #

build_packet_envelope(channel_id, sender_id, body, *, handoff=None, context_set=None, audience=None, causation_id=None)
Source code in autogen/beta/network/adapters/consulting.py
def build_packet_envelope(
    self,
    channel_id,
    sender_id,
    body,
    *,
    handoff=None,
    context_set=None,
    audience=None,
    causation_id=None,
):
    return default_build_packet_envelope(
        channel_id,
        sender_id,
        body,
        handoff=handoff,
        context_set=context_set,
        audience=audience,
        causation_id=causation_id,
    )