Skip to content

NotifySessionHandler

autogen.beta.network.hub.expectations.NotifySessionHandler #

Audit + broadcast EV_EXPECTATION_VIOLATED to every participant.

name class-attribute instance-attribute #

name = 'notify_session'

handle async #

handle(hub, session_id, violation)
Source code in autogen/beta/network/hub/expectations.py
async def handle(
    self,
    hub: "Hub",
    session_id: str,
    violation: Violation,
) -> None:
    await _audit_violation(hub, session_id, violation)
    metadata = hub._sessions.get(session_id)
    if metadata is None or metadata.is_terminal():
        return
    envelope = Envelope(
        session_id=session_id,
        sender_id=metadata.creator_id,
        audience=None,
        event_type=EV_EXPECTATION_VIOLATED,
        event_data={
            "expectation": violation.expectation.name,
            "violators": list(violation.violator_ids),
            "detail": dict(violation.detail),
        },
    )
    # Posting violations is best-effort — a closed/closing
    # session shouldn't crash the sweeper.
    with contextlib.suppress(Exception):
        await hub.post_envelope(envelope)