keycloak fastapi integration

Keycloak Integration for FastAPI

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "keycloak fastapi integration" with this command: npx skills add lobbi-docs/claude/lobbi-docs-claude-keycloak-fastapi-integration

Keycloak Integration for FastAPI

This skill provides patterns for integrating Keycloak as an identity provider with FastAPI applications using OIDC/OAuth2.

Configuration

Settings

from pydantic_settings import BaseSettings

class KeycloakSettings(BaseSettings): keycloak_url: str = "https://auth.example.com" keycloak_realm: str = "my-realm" keycloak_client_id: str = "my-api" keycloak_client_secret: str = ""

@property
def openid_config_url(self) -> str:
    return f"{self.keycloak_url}/realms/{self.keycloak_realm}/.well-known/openid-configuration"

@property
def jwks_url(self) -> str:
    return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/certs"

@property
def token_url(self) -> str:
    return f"{self.keycloak_url}/realms/{self.keycloak_realm}/protocol/openid-connect/token"

class Config:
    env_file = ".env"

JWT Token Validation

Token Validator

import httpx from jose import jwt, JWTError from jose.jwk import construct from functools import lru_cache from typing import Optional, Dict, Any

class KeycloakTokenValidator: def init(self, settings: KeycloakSettings): self.settings = settings self._jwks: Optional[Dict] = None

async def get_jwks(self) -> Dict:
    if self._jwks is None:
        async with httpx.AsyncClient() as client:
            response = await client.get(self.settings.jwks_url)
            response.raise_for_status()
            self._jwks = response.json()
    return self._jwks

async def validate_token(self, token: str) -> Dict[str, Any]:
    try:
        jwks = await self.get_jwks()

        # Get key ID from token header
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")

        # Find matching key
        key = None
        for jwk in jwks.get("keys", []):
            if jwk.get("kid") == kid:
                key = jwk
                break

        if not key:
            raise JWTError("Key not found")

        # Decode and validate
        payload = jwt.decode(
            token,
            key,
            algorithms=["RS256"],
            audience=self.settings.keycloak_client_id,
            issuer=f"{self.settings.keycloak_url}/realms/{self.settings.keycloak_realm}"
        )

        return payload

    except JWTError as e:
        raise ValueError(f"Invalid token: {str(e)}")

FastAPI Dependencies

Current User Dependency

from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional

security = HTTPBearer(auto_error=False)

class TokenUser: def init(self, payload: Dict[str, Any]): self.sub: str = payload.get("sub", "") self.email: str = payload.get("email", "") self.name: str = payload.get("name", "") self.preferred_username: str = payload.get("preferred_username", "") self.roles: list = self._extract_roles(payload) self.raw_payload = payload

def _extract_roles(self, payload: Dict) -> list:
    # Realm roles
    realm_roles = payload.get("realm_access", {}).get("roles", [])

    # Client roles
    resource_access = payload.get("resource_access", {})
    client_roles = resource_access.get(
        settings.keycloak_client_id, {}
    ).get("roles", [])

    return list(set(realm_roles + client_roles))

async def get_token_validator() -> KeycloakTokenValidator: return KeycloakTokenValidator(get_settings())

async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), validator: KeycloakTokenValidator = Depends(get_token_validator) ) -> TokenUser: if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer"} )

try:
    payload = await validator.validate_token(credentials.credentials)
    return TokenUser(payload)
except ValueError as e:
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=str(e),
        headers={"WWW-Authenticate": "Bearer"}
    )

async def get_current_user_optional( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), validator: KeycloakTokenValidator = Depends(get_token_validator) ) -> Optional[TokenUser]: if not credentials: return None try: payload = await validator.validate_token(credentials.credentials) return TokenUser(payload) except ValueError: return None

Role-Based Access Control

from functools import wraps from typing import List

def require_roles(*required_roles: str): """Dependency that checks for required roles.""" async def role_checker( user: TokenUser = Depends(get_current_user) ) -> TokenUser: if not any(role in user.roles for role in required_roles): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Required roles: {', '.join(required_roles)}" ) return user return role_checker

def require_all_roles(*required_roles: str): """Dependency that checks user has ALL required roles.""" async def role_checker( user: TokenUser = Depends(get_current_user) ) -> TokenUser: missing = [r for r in required_roles if r not in user.roles] if missing: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing roles: {', '.join(missing)}" ) return user return role_checker

Usage in routes

@router.get("/admin/users") async def list_users(user: TokenUser = Depends(require_roles("admin", "user-manager"))): """Only admins or user-managers can access.""" return {"users": []}

@router.delete("/admin/system") async def system_action(user: TokenUser = Depends(require_all_roles("admin", "super-admin"))): """Requires BOTH admin AND super-admin roles.""" return {"status": "ok"}

Protected Routes

from fastapi import APIRouter, Depends

router = APIRouter(prefix="/api/v1", tags=["Protected"])

@router.get("/profile") async def get_profile(user: TokenUser = Depends(get_current_user)): """Get current user's profile.""" return { "sub": user.sub, "email": user.email, "name": user.name, "roles": user.roles }

@router.get("/public") async def public_endpoint(): """Public endpoint - no auth required.""" return {"message": "Public data"}

@router.get("/optional-auth") async def optional_auth(user: Optional[TokenUser] = Depends(get_current_user_optional)): """Returns different data based on auth status.""" if user: return {"message": f"Hello, {user.name}!", "authenticated": True} return {"message": "Hello, guest!", "authenticated": False}

Token Refresh Flow

import httpx from typing import Tuple

class KeycloakAuthService: def init(self, settings: KeycloakSettings): self.settings = settings

async def refresh_token(self, refresh_token: str) -> Tuple[str, str]:
    """Exchange refresh token for new access token."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            self.settings.token_url,
            data={
                "grant_type": "refresh_token",
                "client_id": self.settings.keycloak_client_id,
                "client_secret": self.settings.keycloak_client_secret,
                "refresh_token": refresh_token
            }
        )

        if response.status_code != 200:
            raise ValueError("Failed to refresh token")

        data = response.json()
        return data["access_token"], data["refresh_token"]

async def exchange_code(self, code: str, redirect_uri: str) -> dict:
    """Exchange authorization code for tokens."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            self.settings.token_url,
            data={
                "grant_type": "authorization_code",
                "client_id": self.settings.keycloak_client_id,
                "client_secret": self.settings.keycloak_client_secret,
                "code": code,
                "redirect_uri": redirect_uri
            }
        )

        if response.status_code != 200:
            raise ValueError("Failed to exchange code")

        return response.json()

Middleware for Token Refresh

from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response

class TokenRefreshMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request)

    # Check if token is about to expire (from custom header)
    token_exp = request.state.get("token_exp")
    if token_exp and token_exp - time.time() < 300:  # 5 min
        # Token expires soon - add header to signal frontend
        response.headers["X-Token-Expiring"] = "true"

    return response

Additional Resources

Reference Files

For detailed configuration and advanced patterns:

  • references/keycloak-setup.md

  • Keycloak realm/client configuration

  • references/multi-tenant.md

  • Multi-tenant authentication patterns

  • references/testing.md

  • Testing authenticated endpoints

Example Files

Working examples in examples/ :

  • examples/auth_dependencies.py

  • Complete auth dependencies

  • examples/protected_router.py

  • Protected route examples

  • examples/keycloak_service.py

  • Full Keycloak service

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

vision-multimodal

No summary provided by upstream source.

Repository SourceNeeds Review
General

design-system

No summary provided by upstream source.

Repository SourceNeeds Review
General

kanban

No summary provided by upstream source.

Repository SourceNeeds Review