Fire when a participant addressed by a message hasn't replied within T seconds (T = params.seconds).
For each participant, finds the most recent EV_TEXT envelope addressed to them (either a broadcast or one that lists them in its audience). If they haven't sent any EV_TEXT at or after that envelope's timestamp, and that timestamp is at least params.seconds old (default 600), they're a violator.
name class-attribute instance-attribute
evaluate
evaluate(expectation, context)
Source code in autogen/beta/network/hub/expectations.py
def evaluate(
    self,
    expectation: Expectation,
    context: ExpectationContext,
) -> Violation | None:
    """Report participants who were addressed but have not answered in time.

    Only ``EV_TEXT`` envelopes in ``context.wal`` are considered. An envelope
    addresses every participant other than its sender when ``audience`` is
    ``None``; otherwise it addresses only the participants listed in
    ``audience``.

    A participant is a violator when the last envelope addressed to them is
    at least ``params["seconds"]`` old (default 600) and their own last
    ``EV_TEXT`` is strictly older than that envelope.

    Args:
        expectation: Supplies ``params`` and is echoed back in the result.
        context: Supplies session ``metadata``, the ``wal`` to scan, and
            ``now_seconds`` as the reference time.

    Returns:
        ``None`` when the session is not ``ACTIVE`` or nobody is overdue;
        otherwise a ``Violation`` carrying the overdue participant ids and
        the threshold that was applied.
    """
    if context.metadata.state != SessionState.ACTIVE:
        return None

    threshold = float(expectation.params.get("seconds", 600))
    participant_ids = [member.agent_id for member in context.metadata.participants]

    # "Last" means last in WAL iteration order: later envelopes overwrite
    # earlier ones. NOTE(review): presumably the WAL is chronological - confirm.
    last_received: dict[str, Envelope] = {}
    last_sent: dict[str, Envelope] = {}
    for envelope in context.wal:
        if envelope.event_type != EV_TEXT:
            continue
        author = envelope.sender_id
        last_sent[author] = envelope
        audience = envelope.audience
        for agent_id in participant_ids:
            # A sender never addresses itself; a missing audience is a broadcast.
            if agent_id != author and (audience is None or agent_id in audience):
                last_received[agent_id] = envelope

    overdue: list[str] = []
    for agent_id in participant_ids:
        pending = last_received.get(agent_id)
        if pending is None:
            # Never addressed, so there is nothing to answer.
            continue
        addressed_at = _parse_iso_seconds(pending.created_at)
        reply = last_sent.get(agent_id)
        if reply is not None and _parse_iso_seconds(reply.created_at) >= addressed_at:
            # A message sent at or after the addressing time counts as a reply.
            continue
        if context.now_seconds - addressed_at >= threshold:
            overdue.append(agent_id)

    if overdue:
        return Violation(
            expectation=expectation,
            violator_ids=overdue,
            detail={"threshold_seconds": threshold},
        )
    return None
|