Audit + broadcast EV_EXPECTATION_VIOLATED to every participant.
name class-attribute instance-attribute
handle async
handle(hub, session_id, violation)
Source code in autogen/beta/network/hub/expectations.py
| async def handle(
self,
hub: "Hub",
session_id: str,
violation: Violation,
) -> None:
await _audit_violation(hub, session_id, violation)
metadata = hub._sessions.get(session_id)
if metadata is None or metadata.is_terminal():
return
envelope = Envelope(
session_id=session_id,
sender_id=metadata.creator_id,
audience=None,
event_type=EV_EXPECTATION_VIOLATED,
event_data={
"expectation": violation.expectation.name,
"violators": list(violation.violator_ids),
"detail": dict(violation.detail),
},
)
# Posting violations is best-effort — a closed/closing
# session shouldn't crash the sweeper.
with contextlib.suppress(Exception):
await hub.post_envelope(envelope)
|