Skip to content

AutoCloseHandler

autogen.beta.network.hub.expectations.AutoCloseHandler #

Audits the violation, then transitions the session to CLOSED (sessions that are unknown or already terminal are left untouched).

name class-attribute instance-attribute #

name = 'auto_close'

handle async #

handle(hub, session_id, violation)
Source code in autogen/beta/network/hub/expectations.py
async def handle(
    self,
    hub: "Hub",
    session_id: str,
    violation: Violation,
) -> None:
    """Audit the violation, then close the session on a best-effort basis.

    Steps:
      1. Await ``_audit_violation(hub, session_id, violation)``. This call
         is not guarded, so any exception it raises propagates to the caller.
      2. Look up the session in ``hub._sessions``. If there is no entry, or
         the entry reports ``is_terminal()``, return without closing.
      3. Otherwise await ``hub.close_session`` with the reason
         ``"expectation_violated:<expectation name>"``.

    Args:
        hub: Hub owning the session; must expose ``_sessions`` (a mapping
            with ``.get``) and an awaitable ``close_session``.
        session_id: Identifier of the session the violation belongs to.
        violation: The violation being handled; ``violation.expectation.name``
            is embedded in the close reason.

    Returns:
        None. A failure (any ``Exception``) while building the reason or
        closing the session is logged at WARNING level with its traceback
        and is not re-raised.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    await _audit_violation(hub, session_id, violation)
    metadata = hub._sessions.get(session_id)
    if metadata is None or metadata.is_terminal():
        return
    try:
        # The reason is built inside the guarded region on purpose: the
        # handler has always absorbed errors raised while formatting it.
        await hub.close_session(
            session_id,
            reason=f"expectation_violated:{violation.expectation.name}",
        )
    except Exception:
        # Still best-effort (never raises from here), but no longer silent:
        # a session that could not be closed is recorded for diagnosis.
        logging.getLogger(__name__).warning(
            "auto_close: failed to close session %r after expectation violation",
            session_id,
            exc_info=True,
        )