Skip to content

HubListener

autogen.beta.network.hub.listener.HubListener #

Bases: Protocol

Observer Protocol for hub state transitions.

Register via :meth:Hub.register_listener. Implementations override only the methods they care about; defaults are no-ops.

on_envelope_posted async #

on_envelope_posted(envelope, metadata)

An envelope was validated, WAL-appended, folded, and dispatched.

Source code in autogen/beta/network/hub/listener.py
async def on_envelope_posted(
    self,
    envelope: "Envelope",
    metadata: "ChannelMetadata",
) -> None:
    """An envelope was validated, WAL-appended, folded, and dispatched."""

on_envelope_rejected async #

on_envelope_rejected(envelope, reason)

An envelope was rejected before WAL append.

reason is the typed error (AccessDeniedError, ProtocolError, InboxFull, RateLimited, …) the sender saw. Use this for tenant-side metrics / alerting on rejection rates.

Source code in autogen/beta/network/hub/listener.py
async def on_envelope_rejected(
    self,
    envelope: "Envelope",
    reason: "NetworkError",
) -> None:
    """An envelope was rejected before WAL append.

    ``reason`` is the typed error (``AccessDeniedError``,
    ``ProtocolError``, ``InboxFull``, ``RateLimited``, …) the
    sender saw. Use this for tenant-side metrics / alerting on
    rejection rates.
    """

on_dispatch_failed async #

on_dispatch_failed(envelope, recipient_id, reason)

Dispatch of an accepted envelope to a specific recipient failed.

Per-recipient — the rest of the audience may have received normally. Causes typically reflect a closed endpoint or a downstream link error. Sender still sees the post_envelope return as success because the WAL committed.

Source code in autogen/beta/network/hub/listener.py
async def on_dispatch_failed(
    self,
    envelope: "Envelope",
    recipient_id: str,
    reason: BaseException,
) -> None:
    """Dispatch of an accepted envelope to a specific recipient failed.

    Per-recipient — the rest of the audience may have received
    normally. Causes typically reflect a closed endpoint or a
    downstream link error. Sender still sees the
    ``post_envelope`` return as success because the WAL committed.
    """

on_channel_event async #

on_channel_event(channel_id, kind, payload)

A channel-lifecycle event fired.

kind is one of "opened", "closed", "expired", "participant_removed", "participant_hidden". payload carries event-specific fields (close reason, expired at, etc.).

Source code in autogen/beta/network/hub/listener.py
async def on_channel_event(
    self,
    channel_id: str,
    kind: str,
    payload: dict,
) -> None:
    """A channel-lifecycle event fired.

    ``kind`` is one of ``"opened"``, ``"closed"``, ``"expired"``,
    ``"participant_removed"``, ``"participant_hidden"``. ``payload``
    carries event-specific fields (close ``reason``, expired
    ``at``, etc.).
    """

on_agent_event async #

on_agent_event(agent_id, kind, payload)

An identity-lifecycle event fired.

kind is one of "registered", "unregistered", "resume_set", "skill_set", "rule_set", "observation_recorded". payload carries kind-specific fields (e.g. {"passport": Passport} for "registered").

Source code in autogen/beta/network/hub/listener.py
async def on_agent_event(
    self,
    agent_id: str,
    kind: str,
    payload: dict,
) -> None:
    """An identity-lifecycle event fired.

    ``kind`` is one of ``"registered"``, ``"unregistered"``,
    ``"resume_set"``, ``"skill_set"``, ``"rule_set"``,
    ``"observation_recorded"``. ``payload`` carries
    kind-specific fields (e.g. ``{"passport": Passport}`` for
    ``"registered"``).
    """

on_expectation_fired async #

on_expectation_fired(channel_id, expectation, violation)

An expectation evaluator emitted a violation.

Fires once per (channel, expectation, violator) per evaluator tick (the hub dedupes repeat fires of the same violation key).

Source code in autogen/beta/network/hub/listener.py
async def on_expectation_fired(
    self,
    channel_id: str,
    expectation: "Expectation",
    violation: "Violation",
) -> None:
    """An expectation evaluator emitted a violation.

    Fires once per ``(channel, expectation, violator)`` per
    evaluator tick (the hub dedupes repeat fires of the same
    violation key).
    """

on_turn_failed async #

on_turn_failed(channel_id, agent_id, envelope_id, exc)

A notify handler raised while processing an inbound envelope.

The default notify handler traps agent.ask and build_round_envelope exceptions and emits this event. The channel stays alive; no reply envelope is posted. The application chooses how to react (retry, escalate, surface to a UI).

Source code in autogen/beta/network/hub/listener.py
async def on_turn_failed(
    self,
    channel_id: str,
    agent_id: str,
    envelope_id: str,
    exc: BaseException,
) -> None:
    """A notify handler raised while processing an inbound envelope.

    The default notify handler traps ``agent.ask`` and
    ``build_round_envelope`` exceptions and emits this event. The
    channel stays alive; no reply envelope is posted. The
    application chooses how to react (retry, escalate, surface to
    a UI).
    """

on_task_event async #

on_task_event(task_id, kind, payload)

A task-lifecycle event fired.

kind is one of "started", "progress", "completed", "failed", "expired", "cancelled", "mirror_failed". The mirror-failed kind signals that TaskMirror could not forward an observation to the hub — the task itself may still be advancing locally.

Source code in autogen/beta/network/hub/listener.py
async def on_task_event(
    self,
    task_id: str,
    kind: str,
    payload: dict,
) -> None:
    """A task-lifecycle event fired.

    ``kind`` is one of ``"started"``, ``"progress"``,
    ``"completed"``, ``"failed"``, ``"expired"``, ``"cancelled"``,
    ``"mirror_failed"``. The mirror-failed kind signals that
    ``TaskMirror`` could not forward an observation to the hub —
    the task itself may still be advancing locally.
    """

on_inbox_pressure async #

on_inbox_pressure(agent_id, pending, cap)

A recipient's inbox crossed the high-water mark.

Fired when pending first crosses Rule.limits.inbox.high_water (default: 80% of max_pending). Fires at most once per crossing — does not re-fire on every subsequent envelope while above the mark. Operators wire this to a backpressure dashboard / alert.

Source code in autogen/beta/network/hub/listener.py
async def on_inbox_pressure(
    self,
    agent_id: str,
    pending: int,
    cap: int,
) -> None:
    """A recipient's inbox crossed the high-water mark.

    Fired when ``pending`` first crosses
    ``Rule.limits.inbox.high_water`` (default: 80% of
    ``max_pending``). Fires at most once per crossing — does not
    re-fire on every subsequent envelope while above the mark.
    Operators wire this to a backpressure dashboard / alert.
    """