API-key adapter — constant-time token compare.
Expects the claim to carry {"token": "<secret>"}. The expected token is resolved by passport.name:
- keys: static Mapping[name -> token] for in-config keys.
- resolver: optional async callable for dynamic lookups (database, secret manager, …). Returns None to signal an unknown name.

keys is consulted first; resolver is the fallback.
When constructed with no entries (no keys and no resolver), every name is unknown and every validation fails. This is useful as a strict-mode default for tests.
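The lookup order described above can be sketched in isolation. This is a minimal illustration, not the adapter itself: `resolve_expected` and `fake_secret_store` are hypothetical names introduced here, and the helper only mirrors the "keys first, resolver as fallback" rule.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Mapping


async def resolve_expected(
    name: str,
    keys: Mapping[str, str] | None = None,
    resolver: Callable[[str], Awaitable[str | None]] | None = None,
) -> str | None:
    """Hypothetical helper mirroring the documented lookup order."""
    static = dict(keys) if keys is not None else {}
    expected = static.get(name)
    # The resolver is only awaited when the static mapping has no entry.
    if expected is None and resolver is not None:
        expected = await resolver(name)
    return expected


async def fake_secret_store(name: str) -> str | None:
    # Stand-in for a database or secret-manager lookup (assumption).
    return {"worker-b": "dynamic-token"}.get(name)


async def demo() -> list[str | None]:
    keys = {"worker-a": "static-token"}
    return [
        await resolve_expected("worker-a", keys, fake_secret_store),  # static hit
        await resolve_expected("worker-b", keys, fake_secret_store),  # resolver hit
        await resolve_expected("worker-c", keys, fake_secret_store),  # unknown name
        await resolve_expected("worker-a"),  # no entries at all: always unknown
    ]


results = asyncio.run(demo())
print(results)
```

The last call shows the empty-construction case: with neither keys nor a resolver, no name resolves, so validation would reject every claim.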
Source code in autogen/beta/network/auth.py
def __init__(
    self,
    keys: Mapping[str, str] | None = None,
    *,
    resolver: Callable[[str], Awaitable[str | None]] | None = None,
) -> None:
    # __init__ stores params; no side effects.
    self._keys: dict[str, str] = dict(keys) if keys is not None else {}
    self._resolver = resolver
scheme class-attribute instance-attribute
validate async
validate(passport, claim)
Source code in autogen/beta/network/auth.py
async def validate(self, passport: Passport, claim: dict[str, Any]) -> None:
    token = claim.get("token")
    if not isinstance(token, str) or not token:
        raise AuthError("api_key claim missing required string field 'token'")
    expected = self._keys.get(passport.name)
    if expected is None and self._resolver is not None:
        expected = await self._resolver(passport.name)
    if expected is None:
        raise AuthError(f"no api_key registered for {passport.name!r}")
    # Constant-time compare so token rejection latency does not leak
    # which prefix matched. ``compare_digest`` accepts ``str`` only when
    # both arguments are ASCII, so encode both sides as UTF-8 bytes.
    # Inputs of different lengths simply compare unequal.
    if not hmac.compare_digest(expected.encode("utf-8"), token.encode("utf-8")):
        raise AuthError(f"api_key mismatch for {passport.name!r}")
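The comparison step in validate can be tried on its own with the standard library. The sketch below uses made-up token values and shows why both sides are encoded before the call: `hmac.compare_digest` rejects `str` arguments that contain non-ASCII characters, while bytes of any content and length are accepted.

```python
import hmac

# Illustrative token values only; not taken from any real configuration.
expected = "s3cr3t-tøken"
presented = "s3cr3t-tøken"

# Equal byte strings compare as True.
same = hmac.compare_digest(expected.encode("utf-8"), presented.encode("utf-8"))

# Different lengths are allowed and simply compare as False.
different = hmac.compare_digest(expected.encode("utf-8"), b"short")

# Passing a non-ASCII str directly raises TypeError, hence the encoding step.
try:
    hmac.compare_digest(expected, presented)
    str_accepted = True
except TypeError:
    str_accepted = False

print(same, different, str_accepted)
```

Per the Python documentation, a length mismatch may in theory reveal the lengths of the inputs through timing, but not their values.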