FastAPI Caching with Redis
This skill provides production-ready caching patterns using Redis for FastAPI applications.
Redis Client Setup
Connection Configuration
app/infrastructure/cache.py
import redis.asyncio as redis from typing import Optional, Any import json from app.config import get_settings
settings = get_settings()
class RedisCache: def init(self): self._pool: Optional[redis.ConnectionPool] = None self._client: Optional[redis.Redis] = None
async def connect(self):
self._pool = redis.ConnectionPool.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=20
)
self._client = redis.Redis(connection_pool=self._pool)
async def disconnect(self):
if self._pool:
await self._pool.disconnect()
@property
def client(self) -> redis.Redis:
if not self._client:
raise RuntimeError("Redis not connected")
return self._client
async def get(self, key: str) -> Optional[Any]:
value = await self.client.get(key)
if value:
return json.loads(value)
return None
async def set(
self,
key: str,
value: Any,
expire: int = 3600
):
await self.client.setex(
key,
expire,
json.dumps(value, default=str)
)
async def delete(self, key: str):
await self.client.delete(key)
async def delete_pattern(self, pattern: str):
"""Delete all keys matching pattern."""
keys = []
async for key in self.client.scan_iter(match=pattern):
keys.append(key)
if keys:
await self.client.delete(*keys)
async def exists(self, key: str) -> bool:
return await self.client.exists(key) > 0
cache = RedisCache()
Lifespan Integration
from contextlib import asynccontextmanager from fastapi import FastAPI
@asynccontextmanager async def lifespan(app: FastAPI): await cache.connect() yield await cache.disconnect()
app = FastAPI(lifespan=lifespan)
Cache Decorator
app/core/cache.py
from functools import wraps from typing import Callable, Optional import hashlib import json
def cached( prefix: str, expire: int = 3600, key_builder: Optional[Callable] = None ): """ Cache decorator for async functions.
Args:
prefix: Cache key prefix
expire: TTL in seconds
key_builder: Custom function to build cache key
"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# Build cache key
if key_builder:
cache_key = f"{prefix}:{key_builder(*args, **kwargs)}"
else:
# Default: hash all arguments
key_data = json.dumps(
{"args": args[1:], "kwargs": kwargs},
sort_keys=True,
default=str
)
key_hash = hashlib.md5(key_data.encode()).hexdigest()
cache_key = f"{prefix}:{key_hash}"
# Try cache
cached_value = await cache.get(cache_key)
if cached_value is not None:
return cached_value
# Execute and cache
result = await func(*args, **kwargs)
await cache.set(cache_key, result, expire)
return result
return wrapper
return decorator
Usage Examples
class UserService: @cached(prefix="user", expire=300) async def get_by_id(self, user_id: str) -> Optional[dict]: user = await User.get(user_id) return user.model_dump() if user else None
@cached(
prefix="users_list",
expire=60,
key_builder=lambda self, skip, limit, **kw: f"{skip}:{limit}"
)
async def get_all(self, skip: int = 0, limit: int = 100) -> list:
users = await User.find_all().skip(skip).limit(limit).to_list()
return [u.model_dump() for u in users]
async def create(self, data: UserCreate) -> User:
user = await User(**data.model_dump()).insert()
# Invalidate list cache
await cache.delete_pattern("users_list:*")
return user
async def update(self, user_id: str, data: UserUpdate) -> User:
user = await self.get_by_id_uncached(user_id)
await user.set(data.model_dump(exclude_unset=True))
# Invalidate caches
await cache.delete(f"user:{user_id}")
await cache.delete_pattern("users_list:*")
return user
Response Caching Middleware
app/middleware/cache.py
from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response import hashlib
class ResponseCacheMiddleware(BaseHTTPMiddleware): def init(self, app, cache_ttl: int = 60, cache_methods: list = None): super().init(app) self.cache_ttl = cache_ttl self.cache_methods = cache_methods or ["GET"]
async def dispatch(self, request: Request, call_next):
# Only cache specified methods
if request.method not in self.cache_methods:
return await call_next(request)
# Skip if cache-control: no-cache
if request.headers.get("cache-control") == "no-cache":
return await call_next(request)
# Build cache key
cache_key = self._build_key(request)
# Try cache
cached = await cache.get(cache_key)
if cached:
return Response(
content=cached["body"],
status_code=cached["status"],
headers={**cached["headers"], "X-Cache": "HIT"}
)
# Execute request
response = await call_next(request)
# Cache successful responses
if 200 <= response.status_code < 300:
body = b""
async for chunk in response.body_iterator:
body += chunk
await cache.set(cache_key, {
"body": body.decode(),
"status": response.status_code,
"headers": dict(response.headers)
}, self.cache_ttl)
return Response(
content=body,
status_code=response.status_code,
headers={**response.headers, "X-Cache": "MISS"}
)
return response
def _build_key(self, request: Request) -> str:
key_data = f"{request.method}:{request.url.path}:{request.query_params}"
return f"response:{hashlib.md5(key_data.encode()).hexdigest()}"
Cache-Aside Pattern
class CachedUserRepository: def init(self, cache: RedisCache, db: Database): self.cache = cache self.db = db
async def get(self, user_id: str) -> Optional[User]:
# 1. Check cache
cache_key = f"user:{user_id}"
cached = await self.cache.get(cache_key)
if cached:
return User(**cached)
# 2. Query database
user = await self.db.users.find_one({"_id": user_id})
if not user:
return None
# 3. Store in cache
await self.cache.set(cache_key, user.model_dump(), expire=300)
return user
async def save(self, user: User) -> User:
# 1. Save to database
await self.db.users.update_one(
{"_id": user.id},
{"$set": user.model_dump()},
upsert=True
)
# 2. Invalidate cache
await self.cache.delete(f"user:{user.id}")
return user
Write-Through Pattern
class WriteThroughUserRepository: async def save(self, user: User) -> User: # 1. Write to database await self.db.users.update_one( {"_id": user.id}, {"$set": user.model_dump()}, upsert=True )
# 2. Update cache immediately
await self.cache.set(
f"user:{user.id}",
user.model_dump(),
expire=300
)
return user
Cache Invalidation Patterns
Event-based invalidation
class CacheInvalidator: def init(self, cache: RedisCache): self.cache = cache
async def on_user_updated(self, user_id: str):
await self.cache.delete(f"user:{user_id}")
await self.cache.delete_pattern("users_list:*")
await self.cache.delete_pattern(f"user_orders:{user_id}:*")
async def on_product_updated(self, product_id: str):
await self.cache.delete(f"product:{product_id}")
await self.cache.delete_pattern("products_list:*")
await self.cache.delete_pattern("category_products:*")
Additional Resources
Reference Files
For detailed patterns:
-
references/patterns.md
-
Cache patterns (write-behind, read-through)
-
references/distributed.md
-
Distributed caching, cache stampede
-
references/monitoring.md
-
Cache hit rates, memory usage
Example Files
Working examples in examples/ :
-
examples/cache_service.py
-
Complete cache service
-
examples/cached_repository.py
-
Repository with caching
-
examples/cache_middleware.py
-
Response caching middleware