Skip to content

MaxSilenceEvaluator

autogen.beta.network.hub.expectations.MaxSilenceEvaluator #

Fire when the session has had no content envelope for T.

Session-wide — violator_ids is empty. Anchors on the latest content envelope's timestamp, falling back to session creation when no content has been posted.

name class-attribute instance-attribute #

name = 'max_silence'

evaluate #

evaluate(expectation, context)
Source code in autogen/beta/network/hub/expectations.py
def evaluate(
    self,
    expectation: Expectation,
    context: ExpectationContext,
) -> Violation | None:
    if context.metadata.state != SessionState.ACTIVE:
        return None
    seconds = float(expectation.params.get("seconds", 3600))
    anchor_iso = context.metadata.created_at
    for env in reversed(context.wal):
        if _is_content_event(env.event_type):
            anchor_iso = env.created_at
            break
    elapsed = context.now_seconds - _parse_iso_seconds(anchor_iso)
    if elapsed < seconds:
        return None
    return Violation(
        expectation=expectation,
        violator_ids=[],
        detail={
            "elapsed_seconds": elapsed,
            "threshold_seconds": seconds,
        },
    )