Skip to content

AutoCloseHandler

autogen.beta.network.hub.expectations.AutoCloseHandler #

Transition the channel to CLOSED.

Audit emission is owned by the AuditLog listener.

name class-attribute instance-attribute #

name = 'auto_close'

handle async #

handle(hub, channel_id, violation)
Source code in autogen/beta/network/hub/expectations.py
async def handle(
    self,
    hub: "Hub",
    channel_id: str,
    violation: Violation,
) -> None:
    metadata = hub._channels.get(channel_id)
    if metadata is None or metadata.is_terminal():
        return
    with contextlib.suppress(Exception):
        await hub.close_channel(
            channel_id,
            reason=f"expectation_violated:{violation.expectation.name}",
        )