Idempotency Patterns ()
Patterns for ensuring operations can be safely retried without unintended side effects.
Overview
-
Building payment or financial APIs
-
Implementing webhook handlers
-
Processing messages from queues
-
Creating mutation endpoints (POST, PUT, DELETE)
-
Building distributed systems with at-least-once delivery
Quick Reference
Idempotency Key Generation
import hashlib import json from typing import Any
def generate_idempotency_key( *, entity_id: str, action: str, params: dict[str, Any] | None = None, ) -> str: """ Generate deterministic idempotency key.
Args:
entity_id: Unique identifier of the entity
action: The action being performed
params: Optional parameters that affect the result
Returns:
32-character hex string
"""
content = f"{entity_id}:{action}"
if params:
# Sort keys for deterministic output
content += f":{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()[:32]
Examples
key1 = generate_idempotency_key( entity_id="order-123", action="create", params={"amount": 100, "currency": "USD"}, )
key2 = generate_idempotency_key( entity_id="payment-456", action="refund", )
FastAPI Idempotency Middleware
from fastapi import Request, Response, HTTPException from starlette.middleware.base import BaseHTTPMiddleware import redis.asyncio as redis import json
class IdempotencyMiddleware(BaseHTTPMiddleware): """Handle Idempotency-Key header for POST/PUT/PATCH."""
def __init__(self, app, redis_client: redis.Redis, ttl: int = 86400):
super().__init__(app)
self.redis = redis_client
self.ttl = ttl
async def dispatch(self, request: Request, call_next):
# Only apply to mutation methods
if request.method not in ("POST", "PUT", "PATCH"):
return await call_next(request)
# Check for idempotency key
idempotency_key = request.headers.get("Idempotency-Key")
if not idempotency_key:
return await call_next(request)
cache_key = f"idem:{request.url.path}:{idempotency_key}"
# Check for cached response
cached = await self.redis.get(cache_key)
if cached:
data = json.loads(cached)
return Response(
content=data["body"],
status_code=data["status"],
media_type="application/json",
headers={"X-Idempotent-Replayed": "true"},
)
# Process request
response = await call_next(request)
# Cache successful responses
if 200 <= response.status_code < 300:
body = b"".join([chunk async for chunk in response.body_iterator])
await self.redis.setex(
cache_key,
self.ttl,
json.dumps({
"body": body.decode(),
"status": response.status_code,
}),
)
return Response(
content=body,
status_code=response.status_code,
media_type=response.media_type,
)
return response
Database-Backed Idempotency
from sqlalchemy import Column, String, DateTime, Text from sqlalchemy.dialects.postgresql import JSONB, insert from datetime import UTC, datetime, timedelta
class ProcessedRequest(Base): """Track processed requests for idempotency.""" tablename = "processed_requests"
idempotency_key = Column(String(64), primary_key=True)
endpoint = Column(String(255), nullable=False)
status_code = Column(Integer, nullable=False)
response_body = Column(Text)
created_at = Column(DateTime, default=lambda: datetime.now(UTC))
expires_at = Column(DateTime)
async def idempotent_execute( db: AsyncSession, idempotency_key: str, endpoint: str, operation, ttl_hours: int = 24, ) -> tuple[Any, int, bool]: """ Execute operation idempotently.
Returns: (response, status_code, was_replayed)
"""
# Check for existing
existing = await db.get(ProcessedRequest, idempotency_key)
if existing and existing.expires_at > datetime.now(UTC):
return json.loads(existing.response_body), existing.status_code, True
# Execute operation
result, status_code = await operation()
# Store result (upsert to handle races)
stmt = insert(ProcessedRequest).values(
idempotency_key=idempotency_key,
endpoint=endpoint,
status_code=status_code,
response_body=json.dumps(result),
expires_at=datetime.now(UTC) + timedelta(hours=ttl_hours),
).on_conflict_do_nothing()
await db.execute(stmt)
return result, status_code, False
Event Consumer Idempotency
class IdempotentConsumer: """Process events exactly once using idempotency keys."""
def __init__(self, db: AsyncSession, redis: redis.Redis):
self.db = db
self.redis = redis
async def process(
self,
event: dict,
handler,
) -> tuple[Any, bool]:
"""
Process event idempotently.
Returns: (result, was_duplicate)
"""
idempotency_key = event.get("idempotency_key")
if not idempotency_key:
# No key = always process (risky)
return await handler(event), False
# Fast path: check Redis cache
cache_key = f"processed:{idempotency_key}"
if await self.redis.exists(cache_key):
return None, True
# Slow path: check database
existing = await self.db.execute(
select(ProcessedEvent)
.where(ProcessedEvent.idempotency_key == idempotency_key)
)
if existing.scalar_one_or_none():
# Backfill cache
await self.redis.setex(cache_key, 86400, "1")
return None, True
# Process with database lock to prevent races
try:
async with self.db.begin_nested():
# Insert first to claim the key
self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
await self.db.flush()
# Then process
result = await handler(event)
# Cache for fast future lookups
await self.redis.setex(cache_key, 86400, "1")
return result, False
except IntegrityError:
# Another process claimed it
return None, True
Key Decisions
Aspect Recommendation Rationale
Key generation Deterministic hash Same input = same key always
Storage Redis + DB Redis for speed, DB for durability
TTL 24-72 hours Balance storage vs replay window
Lock strategy DB unique constraint Handles race conditions
Response caching Status 2xx only Don't cache errors
Anti-Patterns (FORBIDDEN)
NEVER use non-deterministic keys
def bad_key(): return str(uuid.uuid4()) # Different every time!
NEVER include timestamps in keys
def bad_key(event): return f"{event.id}:{datetime.now(UTC)}" # Timestamp varies!
NEVER check-then-act without locking
async def bad_process(key): if not await exists(key): # Race condition! await process() await mark_processed(key)
NEVER skip idempotency for financial operations
@router.post("/payments") async def create_payment(data: PaymentCreate): return await process_payment(data) # No idempotency!
NEVER cache error responses
if response.status_code >= 400: await cache_response(key, response) # WRONG - errors should retry
Related Skills
-
outbox-pattern
-
Reliable event publishing
-
message-queues
-
At-least-once message delivery
-
caching-strategies
-
Redis caching patterns
-
auth-patterns
-
API key management
Capability Details
key-generation
Keywords: idempotency key, hash, deterministic, deduplication key Solves:
-
How do I generate idempotency keys?
-
Deterministic key generation
-
Key format best practices
api-idempotency
Keywords: idempotency header, POST idempotent, retry safe, middleware Solves:
-
How do I make POST endpoints idempotent?
-
Implement Idempotency-Key header
-
Cache and replay responses
consumer-idempotency
Keywords: exactly-once, event deduplication, message idempotency Solves:
-
How do I process events exactly once?
-
Deduplicate queue messages
-
Handle at-least-once delivery