Fires when an active session has had no content envelope for at least T seconds, where T is the expectation's `seconds` parameter (default 3600).
Session-wide — violator_ids is empty. Anchors on the latest content envelope's timestamp, falling back to session creation when no content has been posted.
name class-attribute instance-attribute
evaluate
evaluate(expectation, context)
Source code in autogen/beta/network/hub/expectations.py
def evaluate(
    self,
    expectation: Expectation,
    context: ExpectationContext,
) -> Violation | None:
    """Report a violation when an active session has been idle too long.

    The idle clock starts at the ``created_at`` of the most recent WAL
    envelope whose event type passes ``_is_content_event``; when no such
    envelope exists, it starts at the session's own ``created_at``.

    Args:
        expectation: Expectation being checked. ``params["seconds"]`` is
            the idle threshold and defaults to 3600 when absent.
        context: Evaluation context supplying the session metadata, the
            WAL, and ``now_seconds``.
            NOTE(review): ``now_seconds`` is presumably on the same scale
            as the value returned by ``_parse_iso_seconds`` — confirm.

    Returns:
        ``None`` if the session is not ``SessionState.ACTIVE`` or the
        elapsed time is below the threshold; otherwise a ``Violation``
        with an empty ``violator_ids`` list and the elapsed/threshold
        values in ``detail``.
    """
    if context.metadata.state != SessionState.ACTIVE:
        return None

    threshold = float(expectation.params.get("seconds", 3600))

    # Read the fallback first, then scan newest-to-oldest and stop at the
    # first envelope that counts as content.
    session_start = context.metadata.created_at
    idle_since = next(
        (
            envelope.created_at
            for envelope in reversed(context.wal)
            if _is_content_event(envelope.event_type)
        ),
        session_start,
    )

    idle_for = context.now_seconds - _parse_iso_seconds(idle_since)
    if idle_for < threshold:
        return None

    detail = {
        "elapsed_seconds": idle_for,
        "threshold_seconds": threshold,
    }
    return Violation(expectation=expectation, violator_ids=[], detail=detail)