Skip to content

ApiKeyAuth

autogen.beta.network.auth.ApiKeyAuth #

ApiKeyAuth(keys=None, *, resolver=None)

API-key adapter — constant-time token compare.

Expects the claim to carry {"token": "<secret>"}. The expected token is resolved by passport.name:

  • keys — static Mapping[name -> token] for in-config keys.
  • resolver — optional async callable for dynamic lookups (database, secret manager, …). Returns None to signal an unknown name. keys is consulted first; resolver is the fallback.

Constructed with no entries means every name is unknown, every validation fails — useful as a strict-mode default for tests.

Source code in autogen/beta/network/auth.py
def __init__(
    self,
    keys: Mapping[str, str] | None = None,
    *,
    resolver: Callable[[str], Awaitable[str | None]] | None = None,
) -> None:
    # __init__ stores params; no side effects.
    self._keys: dict[str, str] = dict(keys) if keys is not None else {}
    self._resolver = resolver

scheme class-attribute instance-attribute #

scheme = 'api_key'

validate async #

validate(passport, claim)
Source code in autogen/beta/network/auth.py
async def validate(self, passport: Passport, claim: dict[str, Any]) -> None:
    token = claim.get("token")
    if not isinstance(token, str) or not token:
        raise AuthError("api_key claim missing required string field 'token'")

    expected = self._keys.get(passport.name)
    if expected is None and self._resolver is not None:
        expected = await self._resolver(passport.name)
    if expected is None:
        raise AuthError(f"no api_key registered for {passport.name!r}")

    # Constant-time compare so token rejection latency does not leak
    # which prefix matched. ``compare_digest`` requires equal-length
    # bytes; encode both sides as UTF-8.
    if not hmac.compare_digest(expected.encode("utf-8"), token.encode("utf-8")):
        raise AuthError(f"api_key mismatch for {passport.name!r}")