Strict 1Q1R: the initiator sends exactly one envelope, the respondent sends exactly one reply, and the session then auto-closes.
Knobs: none. Participants: exactly 2 (one initiator and one respondent).
Default view: `FullTranscript` (transcripts are short).
Source code in autogen/beta/network/adapters/consulting.py
def __init__(self) -> None:
    """Build the consulting session manifest and store it on the instance.

    The manifest declares exactly two participants (initiator and
    respondent roles), an empty knobs schema, ``FullTranscript`` as the
    default view policy, and two expectations -- ``acks_within`` (30
    seconds) and ``reply_within`` (600 seconds) -- both configured with
    ``on_violation="auto_close"``.
    """
    # min == max == 2: one initiator plus one respondent, nothing else.
    participant_schema = ParticipantSchema(
        min=2,
        max=2,
        roles=[ParticipantRole.INITIATOR.value, ParticipantRole.RESPONDENT.value],
    )
    acks_within = Expectation(
        name="acks_within",
        on_violation="auto_close",
        params={"seconds": 30},
    )
    reply_within = Expectation(
        name="reply_within",
        on_violation="auto_close",
        params={"seconds": 600},
    )
    # NOTE: the manifest is assigned to ``self`` here, so it is an instance
    # attribute rebuilt on every construction, not a class-level constant.
    self.manifest = SessionManifest(
        type=CONSULTING_TYPE,
        version=1,
        participants=participant_schema,
        knobs_schema={},
        default_view_policy=FullTranscript.name,
        expectations=[acks_within, reply_within],
    )
|
manifest instance-attribute
manifest = SessionManifest(type=CONSULTING_TYPE, version=1, participants=ParticipantSchema(min=2, max=2, roles=[INITIATOR.value, RESPONDENT.value]), knobs_schema={}, default_view_policy=name, expectations=[Expectation(name='acks_within', on_violation='auto_close', params={'seconds': 30}), Expectation(name='reply_within', on_violation='auto_close', params={'seconds': 600})])
initial_state
Source code in autogen/beta/network/adapters/consulting.py
| def initial_state(self, metadata: SessionMetadata) -> ConsultingState:
# Start every session from a default-constructed ConsultingState;
# ``metadata`` is accepted but not read here.
return ConsultingState()
|
fold
Source code in autogen/beta/network/adapters/consulting.py
def fold(self, envelope: Envelope, state: ConsultingState) -> ConsultingState:
    """Advance the 1Q1R state by one envelope and return the resulting state.

    Session-protocol events, task events, and any envelope whose
    ``event_type`` is not ``EV_TEXT`` leave ``state`` untouched. The first
    text envelope marks the initiator as having sent; the second marks the
    respondent as having replied. Once both flags are set, further text
    envelopes return ``state`` unchanged.

    Args:
        envelope: The envelope being folded into the state.
        state: The state before this envelope.

    Returns:
        The same ``state`` object when nothing changes, otherwise a new
        ``ConsultingState`` whose ``last_envelope_id`` is the id of
        ``envelope``.
    """
    bypasses_rule = _is_session_protocol_event(envelope) or _is_task_event(envelope)
    if bypasses_rule or envelope.event_type != EV_TEXT:
        return state

    if not state.initiator_sent:
        # First text envelope: the question has now been sent.
        reply_seen = False
    elif not state.respondent_replied:
        # Second text envelope: the reply has now arrived.
        reply_seen = True
    else:
        # Exchange already complete; nothing left to record.
        return state

    return ConsultingState(
        initiator_sent=True,
        respondent_replied=reply_seen,
        last_envelope_id=envelope.envelope_id,
    )
|
validate_create
validate_create(metadata)
Source code in autogen/beta/network/adapters/consulting.py
def validate_create(self, metadata: SessionMetadata) -> None:
    """Reject session metadata that does not describe a valid consulting pair.

    Both the initiator and respondent roles must be present among the
    participants, and there must be exactly two participants in total.

    Args:
        metadata: Session metadata whose ``participants`` are inspected.

    Raises:
        ProtocolError: If the initiator role is missing, if the respondent
            role is missing, or if the participant count is not 2
            (checked in that order).
    """
    participants = metadata.participants
    present_roles = {member.role for member in participants}

    # Checked in this order so the initiator error takes precedence.
    required_roles = (
        (ParticipantRole.INITIATOR, "consulting requires exactly one initiator"),
        (ParticipantRole.RESPONDENT, "consulting requires exactly one respondent"),
    )
    for role, message in required_roles:
        if role not in present_roles:
            raise ProtocolError(message)

    # Both roles present plus exactly two participants implies one of each.
    if len(participants) != 2:
        raise ProtocolError(f"consulting requires exactly 2 participants, got {len(participants)}")
|
validate_send
validate_send(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def validate_send(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> None:
    """Enforce the one-question / one-reply ordering before an envelope is sent.

    Session-protocol events, task events, and envelopes whose
    ``event_type`` is not ``EV_TEXT`` are always allowed. For text
    envelopes, the first must come from the initiator and the second from
    the respondent; any text envelope after that is rejected.

    Args:
        metadata: Session metadata, used for the session id and to look up
            the initiator and respondent ids.
        envelope: The envelope about to be sent.
        state: The state before this envelope.

    Raises:
        ProtocolError: If the exchange is already complete, or if the
            sender is not the participant expected at this point.
    """
    # Protocol and task envelopes are exempt from the 1Q1R rule, and
    # non-text event types are accepted as informational data.
    exempt = _is_session_protocol_event(envelope) or _is_task_event(envelope)
    if exempt or envelope.event_type != EV_TEXT:
        return

    if state.initiator_sent:
        if state.respondent_replied:
            raise ProtocolError(f"consulting session {metadata.session_id!r} already complete")
        # Question is out; only the respondent may speak next.
        respondent_id = self._respondent_id(metadata)
        if envelope.sender_id != respondent_id:
            raise ProtocolError(
                f"consulting session {metadata.session_id!r} expects reply "
                f"from respondent {respondent_id!r}, got {envelope.sender_id!r}"
            )
        return

    # Nothing sent yet; the opening envelope must come from the initiator.
    initiator_id = self._initiator_id(metadata)
    if envelope.sender_id != initiator_id:
        raise ProtocolError(
            f"consulting session {metadata.session_id!r} expects first send "
            f"from initiator {initiator_id!r}, got {envelope.sender_id!r}"
        )
|
on_accepted
on_accepted(metadata, envelope, state)
Source code in autogen/beta/network/adapters/consulting.py
def on_accepted(
    self,
    metadata: SessionMetadata,
    envelope: Envelope,
    state: ConsultingState,
) -> AdapterResult:
    """Decide whether an accepted envelope completes the consulting session.

    Args:
        metadata: Session metadata (not read here).
        envelope: The accepted envelope.
        state: The session state supplied alongside the envelope.

    Returns:
        An ``AdapterResult`` that moves the session to
        ``SessionState.CLOSED`` with reason ``"consulting_complete"`` when
        the envelope is a text envelope and both the initiator and the
        respondent flags are set; otherwise an empty ``AdapterResult``.
    """
    exchange_complete = (
        envelope.event_type == EV_TEXT
        and state.initiator_sent
        and state.respondent_replied
    )
    if not exchange_complete:
        return AdapterResult()

    # Go straight to CLOSED: consulting has no async cleanup phase. The
    # transitional ``CLOSING`` state is reserved for adapters that need a
    # quiescence window (e.g., draining streamed chunks before close).
    return AdapterResult(
        next_state=SessionState.CLOSED,
        auto_close_reason="consulting_complete",
    )
|
default_view_policy
default_view_policy(metadata, participant_id)
Source code in autogen/beta/network/adapters/consulting.py
| def default_view_policy(
self,
metadata: SessionMetadata,
participant_id: str,
) -> ViewPolicy:
# Every call returns a new FullTranscript instance; neither ``metadata``
# nor ``participant_id`` is read, so the policy is the same for all
# participants.
return FullTranscript()
|
extract_turn_input
extract_turn_input(envelope)
Source code in autogen/beta/network/adapters/consulting.py
| def extract_turn_input(self, envelope):
# No consulting-specific handling: pass the envelope straight through to
# the shared default_extract_turn_input helper and return its result.
return default_extract_turn_input(envelope)
|
build_round_envelope
build_round_envelope(metadata, sender_id, reply, events, state, hub)
Source code in autogen/beta/network/adapters/consulting.py
| def build_round_envelope(self, metadata, sender_id, reply, events, state, hub):
# No consulting-specific handling: forward all arguments, in the same
# order, to the shared default_build_round_envelope helper.
return default_build_round_envelope(metadata, sender_id, reply, events, state, hub)
|
render_envelope
render_envelope(envelope)
Source code in autogen/beta/network/adapters/consulting.py
| def render_envelope(self, envelope):
# No consulting-specific handling: rendering is delegated unchanged to
# the shared default_render_envelope helper.
return default_render_envelope(envelope)
|