Skip to content

RuleBasedArbiter

autogen.beta.network.hub.arbiter.RuleBasedArbiter #

Default arbiter — enforces the per-agent `Rule` exactly as the hub did before the arbiter seam existed.

Tenants extend or replace it via `Hub.register_arbiter`. It is a pure function of its inputs and holds no state.

authorize_send async #

authorize_send(envelope, sender, sender_rule, recipients)
Source code in autogen/beta/network/hub/arbiter.py
async def authorize_send(
    self,
    envelope: "Envelope",
    sender: "Passport",
    sender_rule: "Rule",
    recipients: list["Passport"],
) -> Decision:
    """Decide whether ``sender`` may send ``envelope`` to ``recipients``.

    Returns ``Deny`` for the first recipient (in list order) whose name is
    not matched by ``sender_rule.access.outbound_to``, or when the
    envelope's depth exceeds a positive ``delegation_depth`` limit.
    Otherwise returns ``Allow``.
    """
    # Recipients sharing the sender's agent_id are exempt: self-routing is
    # always allowed, because protocol broadcasts (``EV_CHANNEL_OPENED`` /
    # ``EV_CHANNEL_CLOSED``) include the creator in their own audience.
    blocked = next(
        (
            target
            for target in recipients
            if target.agent_id != sender.agent_id
            and not _match_any(target.name, sender_rule.access.outbound_to)
        ),
        None,
    )
    if blocked is not None:
        return Deny(
            reason=f"sender {sender.name!r} not permitted to send to {blocked.name!r}",
        )

    # Delegation-depth check. A limit of ``0`` disables it.
    limit = sender_rule.limits.delegation_depth
    if limit <= 0 or envelope.depth <= limit:
        return Allow()
    return Deny(
        reason=f"sender {sender.name!r} exceeded delegation_depth ({envelope.depth} > {limit})",
    )

authorize_inbox async #

authorize_inbox(envelope, recipient, recipient_rule, current_pending)
Source code in autogen/beta/network/hub/arbiter.py
async def authorize_inbox(
    self,
    envelope: "Envelope",
    recipient: "Passport",
    recipient_rule: "Rule",
    current_pending: int,
) -> Decision:
    """Decide whether the recipient's inbox can take one more envelope.

    Returns ``Deny`` carrying ``InboxFull`` when
    ``recipient_rule.limits.inbox.max_pending`` is positive and
    ``current_pending`` has reached it. Otherwise returns ``Allow``.
    ``envelope`` is not inspected.
    """
    capacity = recipient_rule.limits.inbox.max_pending
    # A non-positive capacity means no cap is enforced.
    if capacity <= 0 or current_pending < capacity:
        return Allow()

    assert recipient.agent_id is not None
    return Deny(
        reason=f"recipient {recipient.agent_id!r} inbox at capacity ({current_pending} >= {capacity})",
        error=InboxFull,
    )

authorize_dispatch async #

authorize_dispatch(envelope, sender, recipient, recipient_rule)
Source code in autogen/beta/network/hub/arbiter.py
async def authorize_dispatch(
    self,
    envelope: "Envelope",
    sender: "Passport",
    recipient: "Passport",
    recipient_rule: "Rule",
) -> Decision:
    """Decide whether ``recipient`` accepts delivery from ``sender``.

    Returns ``Allow`` when the sender's name is matched by
    ``recipient_rule.access.inbound_from``, and ``Deny`` otherwise.
    ``envelope`` is not inspected.
    """
    # NOTE(review): unlike authorize_send, there is no exemption here for
    # sender == recipient. Confirm the caller handles self-delivery.
    if _match_any(sender.name, recipient_rule.access.inbound_from):
        return Allow()
    return Deny(reason=f"recipient {recipient.name!r} blocks inbound from {sender.name!r}")

authorize_channel_open async #

authorize_channel_open(manifest, creator, creator_rule, invitees, invitee_rules, active_creator_channels)
Source code in autogen/beta/network/hub/arbiter.py
async def authorize_channel_open(
    self,
    manifest: "ChannelManifest",
    creator: "Passport",
    creator_rule: "Rule",
    invitees: list["Passport"],
    invitee_rules: list["Rule"],
    active_creator_channels: int,
) -> Decision:
    """Decide whether ``creator`` may open a channel with ``invitees``.

    ``invitees`` and ``invitee_rules`` are paired positionally and must
    have equal length (``zip(..., strict=True)`` raises ``ValueError``
    otherwise). Returns ``Deny`` for the first invitee whose
    ``access.inbound_from`` does not match the creator's name, or when the
    creator has reached a positive ``max_concurrent_channels`` limit.
    Otherwise returns ``Allow``. ``manifest`` is not inspected.
    """
    # Check each invitee's inbound whitelist up front. The dispatch path
    # silently filters envelopes from senders the recipient does not
    # whitelist, so without this pre-flight an invite to a recipient who
    # blocks the creator would be dropped and the creator would hang on
    # the ack-timeout. The creator is skipped when listed as an invitee.
    for guest, guest_rule in zip(invitees, invitee_rules, strict=True):
        if guest.agent_id != creator.agent_id and not _match_any(
            creator.name, guest_rule.access.inbound_from
        ):
            return Deny(reason=f"invitee {guest.name!r} does not accept inbound from {creator.name!r}")

    # Concurrency cap on the creator. A non-positive cap disables it.
    cap = creator_rule.limits.max_concurrent_channels
    if cap <= 0 or active_creator_channels < cap:
        return Allow()

    assert creator.agent_id is not None
    return Deny(
        reason=f"creator {creator.agent_id!r} exceeded max_concurrent_channels "
        f"({active_creator_channels} >= {cap})",
    )

authorize_register async #

authorize_register(passport, resume, rule)
Source code in autogen/beta/network/hub/arbiter.py
async def authorize_register(
    self,
    passport: "Passport",
    resume: "Resume",
    rule: "Rule",
) -> Decision:
    """Allow every registration.

    This arbiter applies no registration-time checks: ``passport``,
    ``resume`` and ``rule`` are accepted but not inspected.
    """
    return Allow()

resolve_unknown_audience async #

resolve_unknown_audience(envelope, unknown_ids)
Source code in autogen/beta/network/hub/arbiter.py
async def resolve_unknown_audience(
    self,
    envelope: "Envelope",
    unknown_ids: list[str],
) -> list[str] | None:
    # Single-hub behavior: drop unknown ids silently.
    return None