Redis State Management
A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.
When to Use This Skill
Use this skill when:
- Implementing high-performance caching layers for web applications
- Managing user sessions in distributed environments
- Building real-time messaging and event distribution systems
- Coordinating distributed processes with locks and synchronization
- Storing and querying structured data with Redis data structures
- Optimizing application performance with Redis
- Scaling applications horizontally with shared state
- Implementing rate limiting, counters, and analytics
- Building microservices with Redis as a communication layer
- Managing temporary data with automatic expiration (TTL)
- Implementing leaderboards, queues, and real-time features
Core Concepts
Redis Fundamentals
Redis (Remote Dictionary Server) is an in-memory data structure store used as:
- Database: Key-value storage with optional persistence
- Cache: High-speed data layer
- Message Broker: Pub/sub and stream messaging
- Session Store: Distributed session management
Key Characteristics:
- In-memory storage (most commands complete in microseconds on the server; end-to-end latency is typically sub-millisecond on a local network)
- Optional persistence (RDB snapshots, AOF logs)
- Rich data structures beyond key-value
- Atomic operations on complex data types
- Built-in replication and clustering
- Pub/sub messaging support
- Lua scripting for complex operations
- Pipelining for batch operations
Redis Data Structures
Redis provides multiple data types for different use cases:
Strings: Simple key-value pairs, binary safe
- Use for: Cache values, counters, flags, JSON objects
- Max size: 512 MB
- Commands: SET, GET, INCR, APPEND
Hashes: Field-value maps (objects)
- Use for: User profiles, configuration objects, small entities
- Efficient for storing objects with multiple fields
- Commands: HSET, HGET, HMGET, HINCRBY
Lists: Ordered collections (linked lists)
- Use for: Queues, activity feeds, recent items
- Operations at head/tail are O(1)
- Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE
Sets: Unordered unique collections
- Use for: Tags, unique visitors, relationships
- Set operations: union, intersection, difference
- Commands: SADD, SMEMBERS, SISMEMBER, SINTER
Sorted Sets: Ordered sets with scores
- Use for: Leaderboards, time-series, priority queues
- Range queries by score or rank
- Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK
Streams: Append-only logs with consumer groups
- Use for: Event sourcing, activity logs, message queues
- Built-in consumer group support
- Commands: XADD, XREAD, XREADGROUP
Connection Management
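Connection Pools: Opening a new TCP connection for every command is expensive. Each redis.Redis client already manages a connection pool internally. The important practice is to create one client (or one explicit ConnectionPool) when the application starts and share it, rather than creating clients per request:
import redis

# Explicit, shared connection pool (recommended)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Client with an implicit pool: fine if created once and reused,
# wasteful if constructed for every request
r = redis.Redis(host='localhost', port=6379, db=0)
Best Practices:
- Create the client or pool once and share it across the application
- Set max_connections based on the expected concurrency of the workload
- Enable decode_responses=True for string data (leave it off for binary values)
- Configure socket_timeout and socket_keepalive
- Handle connection errors with retries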
Data Persistence
Redis offers two persistence mechanisms:
RDB (Redis Database): Point-in-time snapshots
- Compact binary format
- Fast restart times
- Lower disk I/O
- Potential data loss between snapshots
AOF (Append-Only File): Log of write operations
- Better durability (configurable fsync policies: always, everysec, no)
- Larger files, slower restarts
- Can be automatically rewritten/compacted
- Minimal data loss (with the default everysec policy, typically at most about one second of writes)
Hybrid Approach: Enable both RDB and AOF to combine fast restarts with stronger durability
RESP3 Protocol
Redis Serialization Protocol version 3 (available since Redis 6.0) offers:
- Client-side caching support
- Richer reply types (maps, sets, doubles, booleans)
- Push notifications
- Performance improvements
import redis
from redis.cache import CacheConfig

# Enable RESP3 with client-side caching
# (requires a recent redis-py release; the client-side cache only works with protocol=3)
r = redis.Redis(host='localhost', port=6379, protocol=3, cache_config=CacheConfig())
Caching Strategies
Cache-Aside (Lazy Loading)
Pattern: Application checks cache first, loads from database on miss
import json
from typing import Any, Dict, Optional

import redis

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data."""
    cache_key = f"user:{user_id}"

    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data is not None:
        return json.loads(cached_data)

    # Cache miss - load from database
    user_data = database.get_user(user_id)  # placeholder for your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))
    return user_data
Advantages:
- Only requested data is cached (efficient memory usage)
- A cache outage need not break the application, provided Redis errors are caught and reads fall back to the database
- Simple to implement
Disadvantages:
- Cache miss penalty (latency spike)
- Thundering herd on popular items
- Stale data until cache expiration
Write-Through Cache
Pattern: Write to the database, then update the cache in the same request so that subsequent reads see the new data
def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern for user updates."""
    cache_key = f"user:{user_id}"

    # Write to database first (source of truth)
    success = database.update_user(user_id, user_data)  # placeholder for your DB call
    if success:
        # Update cache immediately
        r.setex(cache_key, 3600, json.dumps(user_data))
    return success
Advantages:
- Cache stays in step with the database for writes that go through this code path
- No read penalty for recently written data
Disadvantages:
- Write latency increases
- Unused data may be cached
- Extra cache write overhead
Write-Behind (Write-Back) Cache
Pattern: Write to cache immediately, sync to database asynchronously
import json
from queue import Queue
from threading import Thread
from typing import Any, Dict

import redis

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)  # placeholder for your DB call
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()

# Start background writer
Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes."""
    cache_key = f"user:{user_id}"

    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))

    # Queue database write (async)
    write_queue.put((user_id, user_data))
Advantages:
- Minimal write latency
- Can batch database writes
- Handles write spikes
Disadvantages:
- Risk of data loss: pending writes live only in process memory (here, an in-process Queue) until they reach the database
- Complex error handling
- Consistency challenges
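The "can batch database writes" advantage usually comes from draining the queue in bursts and keeping only the newest update for each key. The sketch below assumes the same `(user_id, user_data)` tuples that `update_user_fast` puts on `write_queue`. `drain_and_coalesce` is a hypothetical helper, and the batch it returns would be written with whatever bulk database call you use.

```python
import queue
from typing import Any, Dict

def drain_and_coalesce(q: queue.Queue, max_items: int = 500) -> Dict[int, Dict[str, Any]]:
    """Drain up to max_items pending writes, keeping only the latest one per user.

    Hypothetical helper: later updates for a user overwrite earlier ones, so the
    database receives one write per user per batch.
    """
    batch: Dict[int, Dict[str, Any]] = {}
    for _ in range(max_items):
        try:
            user_id, user_data = q.get_nowait()
        except queue.Empty:
            break
        batch[user_id] = user_data  # newer snapshot replaces the older one
        q.task_done()
    return batch

# Usage: three queued writes, two of them for user 1
pending = queue.Queue()
pending.put((1, {"name": "Ann"}))
pending.put((2, {"name": "Bo"}))
pending.put((1, {"name": "Ann B."}))
batch = drain_and_coalesce(pending)
print(batch)  # {1: {'name': 'Ann B.'}, 2: {'name': 'Bo'}}
```

Coalescing is only safe because each queued item is a complete user snapshot. If items were partial field updates, they would need to be merged instead. A batching worker would call a helper like this in its loop in place of the one-item `get()` in `async_writer`.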
Cache Invalidation Strategies
Time-based Expiration (TTL):
# Set key with expiration
r.setex("session:abc123", 1800, session_data)  # 30 minutes

# Or set TTL on existing key
r.expire("user:profile:123", 3600)  # 1 hour

# Check remaining TTL in seconds (-1 = key has no expiry, -2 = key does not exist)
ttl = r.ttl("user:profile:123")
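The TTLs in this document are raw second counts: 1800 and 3600 here, and 86400, 604800 and 2592000 in later sections. Deriving them from `datetime.timedelta` makes the intent explicit. The constant names below are illustrative only.

```python
from datetime import timedelta

# Illustrative TTL constants, in whole seconds as Redis EX/EXPIRE expect
SESSION_TTL = int(timedelta(minutes=30).total_seconds())
PROFILE_TTL = int(timedelta(hours=1).total_seconds())
FEED_TTL = int(timedelta(days=7).total_seconds())
ACTIVITY_TTL = int(timedelta(days=30).total_seconds())

print(SESSION_TTL, PROFILE_TTL, FEED_TTL, ACTIVITY_TTL)  # 1800 3600 604800 2592000
```

redis-py's `setex` and `expire` also accept a `timedelta` directly. For example, `r.setex(key, timedelta(minutes=30), value)` is equivalent to passing 1800.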
Event-based Invalidation:
def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Update database
    database.update_product(product_id, product_data)  # placeholder for your DB call

    # Invalidate related caches (DEL accepts several keys in one call)
    r.delete(
        f"product:{product_id}",
        f"product_list:category:{product_data['category']}",
        "products:featured",
    )
Pattern-based Invalidation:
# Delete all keys matching a pattern
def invalidate_user_cache(user_id: int):
    """Invalidate all cache entries for a user."""
    pattern = f"user:{user_id}:*"

    # SCAN iterates incrementally (unlike KEYS, it does not block the server)
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)
Cache Stampede Prevention
Problem: Multiple requests simultaneously miss cache and query database
Solution 1: Probabilistic Early Expiration
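import math
import random
import time

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation (XFetch).

    The value, its recompute time (delta) and its absolute expiry are stored
    together in a hash; compute_value is a placeholder for your expensive call.
    """
    cached = r.hgetall(key)
    if cached:
        delta = float(cached["delta"])
        expiry = float(cached["expiry"])
        # -log(U) is exponentially distributed, so readers recompute early with a
        # probability that rises as expiry approaches (and with larger delta * beta)
        if time.time() - delta * beta * math.log(1.0 - random.random()) < expiry:
            return cached["value"]

    # Cache miss, or this reader was chosen to recompute early (synchronously)
    start = time.time()
    value = compute_value(key)
    delta = time.time() - start

    pipe = r.pipeline()
    pipe.hset(key, mapping={"value": value, "delta": delta, "expiry": time.time() + ttl})
    pipe.expire(key, ttl)
    pipe.execute()
    return value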
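It helps to see how early such recomputation actually happens. Under the XFetch rule for probabilistic early expiration, a reader recomputes when `now - delta * beta * ln(U) >= expiry`. Here U is uniform on (0, 1] and delta is how long the last computation took. Because `-ln(U)` is exponentially distributed, the chance that a single read triggers recomputation is `exp(-remaining / (delta * beta))`. The sketch below assumes that formulation; the function names are hypothetical.

```python
import math
import random

def should_recompute(now: float, expiry: float, delta: float,
                     beta: float = 1.0, rng=random) -> bool:
    """XFetch decision: recompute early with a probability that rises near expiry."""
    u = 1.0 - rng.random()  # uniform on (0, 1], avoids log(0)
    return now - delta * beta * math.log(u) >= expiry

def early_recompute_probability(remaining: float, delta: float, beta: float = 1.0) -> float:
    """Closed form for one read: exp(-remaining / (delta * beta))."""
    if remaining <= 0:
        return 1.0
    return math.exp(-remaining / (delta * beta))

# A value that took 0.5 s to compute, read 1 s before expiry, beta = 1
p = early_recompute_probability(remaining=1.0, delta=0.5)
print(round(p, 3))  # 0.135
```

Raising beta above 1 makes early recomputation more likely. Because delta scales the window, expensive values start refreshing further ahead of expiry than cheap ones.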
Solution 2: Locking
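import time
import uuid
from contextlib import contextmanager

# Atomic compare-and-delete so we never release a lock another client now holds
release_if_owner = r.register_script("""
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
""")

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire lock for cache computation."""
    lock_key = f"{key}:lock"
    identifier = str(uuid.uuid4())  # unique per caller, unlike a timestamp

    # Try to acquire lock (NX = only if absent, EX = auto-expire if the holder crashes)
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            release_if_owner(keys=[lock_key], args=[identifier])
    else:
        yield False

def get_with_lock(key: str):
    """Use lock to prevent stampede."""
    value = r.get(key)
    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # Re-check: another process may have filled the cache meanwhile
                value = r.get(key)
                if value is None:
                    value = compute_value(key)  # placeholder for your computation
                    r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait briefly and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)
    return value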
Session Management
Distributed Session Storage
Basic Session Management:
import json
import uuid
from datetime import datetime
from typing import Optional

import redis

r = redis.Redis(decode_responses=True)

class SessionManager:
    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl

    def create_session(self, user_id: int, data: Optional[dict] = None) -> str:
        """Create new session and return session ID."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "data": data or {}
        }

        r.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Retrieve session data and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = r.get(session_key)

        if session_data:
            # Refresh TTL on access (sliding expiration)
            r.expire(session_key, self.ttl)
            return json.loads(session_data)
        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Update session data (merged into the existing "data" dict)."""
        session_key = f"session:{session_id}"
        session_data = self.get_session(session_id)

        if session_data:
            session_data["data"].update(data)
            r.setex(session_key, self.ttl, json.dumps(session_data))
            return True
        return False

    def delete_session(self, session_id: str) -> bool:
        """Delete session (logout)."""
        session_key = f"session:{session_id}"
        return r.delete(session_key) > 0
Session with Hash Storage
Hashes let you read or update a single session field without deserializing and rewriting the whole JSON document:
class HashSessionManager:
    """Session manager using Redis hashes for per-field access."""

    def __init__(self, ttl: int = 1800):
        self.ttl = ttl

    def create_session(self, user_id: int, **kwargs) -> str:
        """Create session using hash."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        # Store as hash for efficient field access (hash values are strings)
        session_fields = {
            "user_id": str(user_id),
            "created_at": datetime.utcnow().isoformat(),
            **{k: str(v) for k, v in kwargs.items()}
        }

        r.hset(session_key, mapping=session_fields)
        r.expire(session_key, self.ttl)
        return session_id

    def get_field(self, session_id: str, field: str) -> Optional[str]:
        """Get single session field efficiently."""
        session_key = f"session:{session_id}"
        value = r.hget(session_key, field)

        if value:
            r.expire(session_key, self.ttl)  # Refresh TTL
        return value

    def set_field(self, session_id: str, field: str, value: str) -> bool:
        """Update single session field."""
        session_key = f"session:{session_id}"

        if r.exists(session_key):
            r.hset(session_key, field, value)
            r.expire(session_key, self.ttl)
            return True
        return False

    def get_all(self, session_id: str) -> dict:
        """Get all session fields ({} if the session does not exist)."""
        session_key = f"session:{session_id}"
        data = r.hgetall(session_key)

        if data:
            r.expire(session_key, self.ttl)
        return data
User Activity Tracking
def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration."""
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()

    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))

    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)

    # Set expiration (30 days)
    r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)

    return [json.loads(a) for a in activities]
Pub/Sub Patterns
Basic Publisher/Subscriber
Publisher:
import json
from datetime import datetime

import redis

r = redis.Redis(decode_responses=True)

def publish_event(channel: str, message: dict) -> int:
    """Publish event to channel; returns the number of subscribers that received it."""
    return r.publish(channel, json.dumps(message))

# Example usage
publish_event("notifications", {
    "type": "user_signup",
    "user_id": 12345,
    "timestamp": datetime.utcnow().isoformat(),
})
Subscriber:
import json

import redis

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")

# Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()

# Subscribe to channels
p.subscribe('notifications', 'alerts')

# Listen for messages (blocks; subscribe confirmations are skipped by the type check)
for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)
Pattern-Based Subscriptions
# Subscribe to multiple channels with glob-style patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

# Get messages from pattern subscriptions
for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")
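PSUBSCRIBE patterns are glob-style, and `*` also matches the `:` separator. So `user:*` receives `user:42` as well as `user:42:login`. Python's `fnmatch.fnmatchcase` is a close stand-in for checking which channels a pattern will catch. It is an approximation that holds when only `*`, `?` and simple `[abc]` classes are used: Redis negates a class with `[^...]`, while `fnmatch` uses `[!...]`. `matching_patterns` is a hypothetical helper.

```python
from fnmatch import fnmatchcase
from typing import List

def matching_patterns(channel: str, patterns: List[str]) -> List[str]:
    """Patterns that would deliver `channel` (approximates Redis glob matching)."""
    return [p for p in patterns if fnmatchcase(channel, p)]

patterns = ["user:*", "notification:*", "order:?"]
print(matching_patterns("user:42:login", patterns))  # ['user:*']
print(matching_patterns("order:7", patterns))        # ['order:?']
```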
Async Pub/Sub with Background Thread
import time

import redis

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

# Subscribe with handler
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

# Run in background thread
thread = p.run_in_thread(sleep_time=0.001)

# Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')

time.sleep(1)

# Stop background thread
thread.stop()
Async Pub/Sub with asyncio
import asyncio

import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader."""
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")

            # Stop on specific message (data is bytes because decode_responses is off)
            if message["data"].decode() == "STOP":
                break

async def pubsub_example():
    """Async pub/sub example."""
    r = await redis.from_url("redis://localhost")

    async with r.pubsub() as pubsub:
        # Subscribe to channels
        await pubsub.subscribe("channel:1", "channel:2")

        # Create reader task
        reader_task = asyncio.create_task(reader(pubsub))

        # Publish messages
        await r.publish("channel:1", "Hello")
        await r.publish("channel:2", "World")
        await r.publish("channel:1", "STOP")

        # Wait for reader to finish
        await reader_task

    await r.aclose()  # redis-py 5.0.1+; older versions use close()

# Run async example
asyncio.run(pubsub_example())
Sharded Pub/Sub (Redis 7.0+)
from redis.cluster import ClusterNode, RedisCluster

# Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380),
])

# Create sharded pubsub (SSUBSCRIBE)
p = rc.pubsub()
p.ssubscribe('foo')

# Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
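In Redis Cluster, each sharded channel belongs to one of 16384 hash slots, just as each key does. The slot is CRC16(name) mod 16384. When the name contains a non-empty `{...}` hash tag, only the text inside the first such tag is hashed. This is why sharded messages are read from the node that owns the channel's slot. The self-contained sketch below follows the slot calculation in the Redis Cluster specification and can be used to check that related names land on the same slot.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM variant): polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def hash_slot(name: str) -> int:
    """Cluster hash slot for a key or sharded channel name."""
    data = name.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty {tag} is used; otherwise the whole name is hashed
        if end > start + 1:
            data = data[start + 1:end]
    return crc16_xmodem(data) % 16384

print(hash_slot("123456789"))                                   # 12739
print(hash_slot("{user:42}:events") == hash_slot("{user:42}:alerts"))  # True
```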
Distributed Locks
Simple Lock Implementation
import time
import uuid
from typing import Optional

import redis

class RedisLock:
    """Simple distributed lock using Redis."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout  # lock TTL in seconds
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
        """Acquire lock; if blocking, wait up to `timeout` seconds (forever if None)."""
        end_time = None if timeout is None else time.time() + timeout

        while True:
            # Try to set lock with NX (only if not exists) and EX (expiration)
            if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
                return True
            if not blocking:
                return False
            if end_time is not None and time.time() > end_time:
                return False
            # Wait before retry
            time.sleep(0.01)

    def release(self) -> bool:
        """Release lock only if we own it."""
        # Use Lua script for atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

    def __enter__(self):
        """Context manager support (blocks until the lock is acquired)."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()

# Usage example
r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()  # placeholder for your work
Advanced Lock with Auto-Renewal
import threading

class RenewableLock:
    """Distributed lock with automatic renewal."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self.stop_renewal = threading.Event()

    def _renew_lock(self):
        """Background task to renew lock."""
        # Extend the TTL only if we still own the lock
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        # Renew at 1/3 of timeout; wait() returns True as soon as release() sets the event
        while not self.stop_renewal.wait(self.timeout / 3):
            result = self.redis.eval(lua_script, 1, self.key,
                                     self.identifier, self.timeout)
            if result == 0:
                # We lost the lock
                self.stop_renewal.set()

    def acquire(self) -> bool:
        """Try once (non-blocking) to acquire the lock and start auto-renewal."""
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            # Start renewal thread
            self.stop_renewal.clear()
            self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
            self.renewal_thread.start()
            return True
        return False

    def release(self) -> bool:
        """Release lock and stop renewal."""
        self.stop_renewal.set()

        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1
Redlock Algorithm (Multiple Redis Instances)
class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""

    def __init__(self, redis_instances: list):
        """
        Args:
            redis_instances: List of Redis client connections (independent masters)
        """
        self.instances = redis_instances
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource: str, ttl: int = 10000) -> tuple:
        """
        Acquire lock across multiple Redis instances.

        Args:
            ttl: Lock time-to-live in milliseconds

        Returns:
            (success: bool, lock_identifier: str or None)
        """
        identifier = str(uuid.uuid4())
        start_time = int(time.time() * 1000)

        # Try to acquire lock on all instances
        acquired = 0
        for instance in self.instances:
            try:
                if instance.set(f"lock:{resource}", identifier, nx=True, px=ttl):
                    acquired += 1
            except Exception:
                pass  # an unreachable instance counts as not acquired

        # Calculate elapsed time
        elapsed = int(time.time() * 1000) - start_time
        validity_time = ttl - elapsed - 100  # drift compensation

        # Check if we got quorum
        if acquired >= self.quorum and validity_time > 0:
            return True, identifier

        # Release locks if we didn't get quorum
        self._release_all(resource, identifier)
        return False, None

    def release(self, resource: str, identifier: str):
        """Release a lock obtained from acquire()."""
        self._release_all(resource, identifier)

    def _release_all(self, resource: str, identifier: str):
        """Release lock on all instances."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        for instance in self.instances:
            try:
                instance.eval(lua_script, 1, f"lock:{resource}", identifier)
            except Exception:
                pass
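The timing check in `Redlock.acquire` is the core of the algorithm. A lock counts only if a majority of instances accepted it and some validity time remains after subtracting both the time spent acquiring and an allowance for clock drift. The sketch below assumes a drift allowance of 1% of the TTL plus 2 ms, a choice common in client libraries but not the only valid one. It is pure arithmetic with hypothetical function names, so it needs no server.

```python
def quorum(n_instances: int) -> int:
    """Majority needed for Redlock: floor(N/2) + 1."""
    return n_instances // 2 + 1

def lock_validity_ms(ttl_ms: int, elapsed_ms: int, acquired: int, n_instances: int,
                     drift_factor: float = 0.01) -> int:
    """Remaining validity in ms, or 0 if the lock must be treated as not acquired.

    Assumption: drift allowance = ttl * drift_factor + 2 ms.
    """
    drift = int(ttl_ms * drift_factor) + 2
    validity = ttl_ms - elapsed_ms - drift
    if acquired >= quorum(n_instances) and validity > 0:
        return validity
    return 0

# 5 instances, 10 s TTL, 3 instances accepted the lock within 35 ms
print(quorum(5), lock_validity_ms(10_000, 35, acquired=3, n_instances=5))  # 3 9863
```

Work guarded by the lock should finish well within the returned validity time. Otherwise the keys expire and another client may legitimately acquire the lock.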
Data Structures and Operations
Working with Hashes
# User profile storage
def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

# Example usage
save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30",
})

# Atomic increment
r.hincrby("user:profile:123", "login_count", 1)
Working with Lists
# Job queue implementation
def enqueue_job(queue_name: str, job_data: dict):
    """Add job to queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> Optional[dict]:
    """Get job from queue (blocking if timeout > 0, otherwise non-blocking)."""
    key = f"queue:{queue_name}"

    if timeout > 0:
        # Blocking pop with timeout (seconds)
        result = r.blpop(key, timeout=timeout)
        if result:
            _, job_data = result
            return json.loads(job_data)
    else:
        # Non-blocking pop
        job_data = r.lpop(key)
        if job_data:
            return json.loads(job_data)

    return None

# Activity feed
def add_to_feed(user_id: int, activity: dict):
    """Add activity to user feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)  # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get user feed with pagination (start and end are inclusive indexes)."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]
Working with Sets
# Tags and relationships
def add_tags(item_id: int, tags: list):
    """Add tags to item and maintain a reverse index (tag -> items)."""
    pipe = r.pipeline()
    pipe.sadd(f"item:{item_id}:tags", *tags)
    for tag in tags:
        pipe.sadd(f"tag:{tag}:items", item_id)
    pipe.execute()

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags (intersection of the reverse index)."""
    return r.sinter(*[f"tag:{tag}:items" for tag in tags])

# Online users tracking
def user_online(user_id: int):
    """Mark user as online and refresh a 60-second heartbeat key."""
    r.sadd("users:online", user_id)
    r.set(f"user:{user_id}:heartbeat", 1, ex=60)

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users (members stay until user_offline removes them)."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")
Working with Sorted Sets
# Leaderboard implementation
def update_score(leaderboard: str, user_id: int, score: float):
    """Update user score in leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)

# Time-based sorted set (activity stream)
def add_activity(user_id: int, activity: str):
    """Add timestamped activity."""
    key = f"user:{user_id}:activities"
    timestamp = time.time()
    r.zadd(key, {activity: timestamp})

    # Keep only last 24 hours
    cutoff = timestamp - 86400
    r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get most recent (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)
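Ranks from `ZREVRANK` and `ZREVRANGE` are 0-based. Members with equal scores are ordered lexicographically, in descending order for the REV variants. Ties are therefore broken by member name, not by who reached the score first. The pure-Python model below reproduces that ordering so leaderboard logic can be reasoned about or unit-tested without a server. It is a sketch, and the function names are hypothetical; members are compared as UTF-8 bytes, which is how Redis compares them.

```python
from typing import Dict, List, Tuple

def rev_range(scores: Dict[str, float], start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
    """Model of ZREVRANGE ... WITHSCORES: score desc, ties by member bytes desc."""
    ordered = sorted(scores.items(), key=lambda kv: (kv[1], kv[0].encode()), reverse=True)
    stop = None if end == -1 else end + 1  # Redis end index is inclusive
    return ordered[start:stop]

def rev_rank(scores: Dict[str, float], member: str) -> int:
    """Model of ZREVRANK: 0-based rank, -1 if absent (as get_user_rank returns)."""
    if member not in scores:
        return -1
    return [m for m, _ in rev_range(scores)].index(member)

board = {"alice": 120, "bob": 150, "carol": 120}
print(rev_range(board, 0, 2))    # [('bob', 150), ('carol', 120), ('alice', 120)]
print(rev_rank(board, "alice"))  # 2
```

If ties should favor whoever scored first, a common approach is to fold a time component into the score itself.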
Working with Streams
# Event stream
def add_event(stream_key: str, event_data: dict) -> str:
    """Add event to stream (event_data must be a flat dict of str/int/float values)."""
    # Returns auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read events with IDs greater than start_id ('0' reads from the beginning)."""
    events = r.xread({stream_key: start_id}, count=count)

    # events format: [(stream_name, [(id, data), (id, data), ...])]
    if events:
        _, event_list = events[0]
        return event_list
    return []

# Consumer groups
def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream (mkstream creates the stream if missing)."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0', mkstream=True)
    except redis.ResponseError as e:
        # BUSYGROUP means the group already exists
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str, consumer_name: str,
                    count: int = 10) -> list:
    """Read events as consumer in group."""
    # Read new messages with '>'
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000,  # 5 second timeout
    )

    if events:
        _, event_list = events[0]
        return event_list
    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event."""
    r.xack(stream_key, group_name, event_id)

# Example: Processing events with consumer group
def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from stream."""
    create_consumer_group(stream_key, group_name)

    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)

        for event_id, event_data in events:
            try:
                # Process event
                process_event(event_data)  # placeholder for your handler

                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # Event stays in the group's pending list (see XPENDING / XCLAIM) for retry
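Entry IDs returned by `XADD` have the form `<milliseconds>-<sequence>`. They are ordered by the millisecond part first, then by the sequence. Because XREAD returns only entries after the ID it is given, passing the last processed ID as `start_id` to `read_events` resumes where you left off. The small hypothetical helpers below parse and compare IDs, for example when checkpointing progress. They assume `decode_responses=True`, so IDs are `str`.

```python
from typing import Iterable, Tuple

def parse_stream_id(entry_id: str) -> Tuple[int, int]:
    """Split '1526919030474-55' into (1526919030474, 55); a bare '0' means sequence 0."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)

def newest_id(ids: Iterable[str]) -> str:
    """Pick the largest ID by (milliseconds, sequence), not by string order."""
    return max(ids, key=parse_stream_id)

ids = ["1526919030474-55", "1526919030474-9", "1526919030473-100"]
print(newest_id(ids))  # 1526919030474-55
print(max(ids))        # 1526919030474-9  (plain string comparison picks the wrong one)
```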
Performance Optimization
Pipelining for Batch Operations
# Without pipelining (slow - one network round trip per command)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# With pipelining (fast - commands are sent as one batch and replies read together)
pipe = r.pipeline()  # wraps commands in MULTI/EXEC; pass transaction=False for plain batching
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

# Pipelining with reads
pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()

# Builder pattern with pipeline
class DataLoader:
    def __init__(self):
        self.pipeline = r.pipeline()

    def add_user(self, user_id: int, user_data: dict):
        """Add user data."""
        self.pipeline.hset(f"user:{user_id}", mapping=user_data)
        return self

    def add_to_set(self, set_name: str, value: str):
        """Add to set."""
        self.pipeline.sadd(set_name, value)
        return self

    def execute(self):
        """Execute all pipelined commands."""
        return self.pipeline.execute()

# Usage
loader = DataLoader()
results = (loader
           .add_user(1, {"name": "Alice", "email": "alice@example.com"})
           .add_user(2, {"name": "Bob", "email": "bob@example.com"})
           .add_to_set("active_users", "1")
           .add_to_set("active_users", "2")
           .execute())
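A pipeline holds every queued command and every reply in client memory until `execute()` runs, so very large batches are usually sent in fixed-size chunks. The chunking itself is plain Python. `bulk_set` below accepts any object with redis-py's `pipeline(transaction=...)`, `set` and `execute` methods. `FakeClient` and `FakePipeline` are hypothetical test doubles that only make the example runnable without a server; they are not part of redis-py.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

def chunked(items: Iterable, size: int) -> Iterator[List]:
    """Yield lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch

def bulk_set(client, pairs: Iterable[Tuple[str, str]], chunk_size: int = 500) -> int:
    """SET many keys using one pipeline round trip per chunk; returns commands sent."""
    sent = 0
    for batch in chunked(pairs, chunk_size):
        pipe = client.pipeline(transaction=False)  # plain batching, no MULTI/EXEC
        for key, value in batch:
            pipe.set(key, value)
        pipe.execute()
        sent += len(batch)
    return sent

class FakePipeline:
    """Test double recording how many commands each execute() flushed."""
    def __init__(self, log):
        self.log, self.count = log, 0
    def set(self, key, value):
        self.count += 1
    def execute(self):
        self.log.append(self.count)

class FakeClient:
    def __init__(self):
        self.flushes = []
    def pipeline(self, transaction=True):
        return FakePipeline(self.flushes)

client = FakeClient()
bulk_set(client, ((f"key:{i}", f"value:{i}") for i in range(1200)), chunk_size=500)
print(client.flushes)  # [500, 500, 200]
```

With a real client you would call `bulk_set(r, pairs)`. The best chunk size depends on value sizes and network latency, so the 500 here is only a placeholder.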
Transactions with WATCH
# Optimistic locking with WATCH
def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking."""

    with r.pipeline() as pipe:
        while True:
            try:
                # Watch the keys we're going to modify
                pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")

                # While watching, the pipeline runs commands immediately, so reads return values
                from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
                to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)

                # Check if transfer is possible
                if from_credits < amount:
                    pipe.unwatch()
                    return False

                # Start transaction (commands are buffered until execute)
                pipe.multi()
                pipe.set(f"user:{from_user}:credits", from_credits - amount)
                pipe.set(f"user:{to_user}:credits", to_credits + amount)

                # Execute transaction; raises WatchError if a watched key changed
                pipe.execute()
                return True

            except redis.WatchError:
                # Key was modified by another client - retry
                continue
# Lua scripts for atomic operations (illustrative; the built-in INCRBY command does this natively)
increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
    current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

# Register and use Lua script
increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])
Lua Scripts for Complex Operations
# Rate limiting with Lua (fixed window: the counter resets when the key expires)
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)

if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1
# Get-or-set pattern with Lua: returns {1, placeholder} if this call set the key, {0, value} otherwise
get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
    return {0, value}
end
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
return {1, ARGV[1]}
"""

get_or_set = r.register_script(get_or_set_script)

PLACEHOLDER = "__COMPUTING__"

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get value from cache or compute and cache it."""
    created, value = get_or_set(keys=[key], args=[PLACEHOLDER, ttl])

    if created == 1:
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed

    if value == PLACEHOLDER:
        # Another caller is computing; fall back to computing directly
        return compute_fn()
    return value
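The Lua rate limiter (`rate_limit_script`) is a fixed-window counter. INCR creates the key at 1, EXPIRE starts a `window`-second clock, and the count resets only when the key expires. One consequence is that a client can get nearly twice the limit in a short burst that straddles a window boundary. `FixedWindowModel` is a hypothetical pure-Python model of the script's logic. It uses an explicit clock in place of Redis key expiry, which is an assumption made for illustration.

```python
from typing import Optional

class FixedWindowModel:
    """Mirrors the Lua script: INCR, set EXPIRE on the first hit, deny above the limit."""

    def __init__(self, limit: int, window: float):
        self.limit, self.window = limit, window
        self.count = 0
        self.expires_at: Optional[float] = None

    def is_allowed(self, now: float) -> bool:
        if self.expires_at is not None and now >= self.expires_at:
            self.count, self.expires_at = 0, None  # the key expired in Redis
        self.count += 1                            # INCR
        if self.count == 1:
            self.expires_at = now + self.window    # EXPIRE key window
        return self.count <= self.limit

limiter = FixedWindowModel(limit=100, window=60)
limiter.is_allowed(0.0)                                    # opens the window [0, 60)
late = sum(limiter.is_allowed(59.9) for _ in range(99))    # fills the first window
early = sum(limiter.is_allowed(60.0) for _ in range(100))  # a new window begins
print(late + early)  # 199 requests allowed within about 0.1 s
```

If that burst matters, a sliding-window or token-bucket limiter trades a little more state for smoother enforcement.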
Production Patterns
High Availability with Sentinel
from redis.sentinel import Sentinel

# Connect to Sentinel
sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379),
], socket_timeout=0.5)

# Get master connection
master = sentinel.master_for('mymaster', socket_timeout=0.5)

# Get replica connection (for read-only operations)
replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

# Use master for writes
master.set('key', 'value')

# Use replica for reads (optional, for load distribution); replication is asynchronous,
# so a read immediately after a write may return stale data
value = replica.get('key')
Async Redis with asyncio
import asyncio

import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create async connection
    r = await redis.from_url("redis://localhost")

    try:
        # Async operations
        await r.set("async_key", "async_value")
        value = await r.get("async_key")
        print(f"Value: {value}")

        # Async pipeline
        async with r.pipeline(transaction=True) as pipe:
            await pipe.set("key1", "value1")
            await pipe.set("key2", "value2")
            await pipe.get("key1")
            results = await pipe.execute()
            print(f"Pipeline results: {results}")

    finally:
        await r.aclose()  # redis-py 5.0.1+; older versions use close()

# Run async operations
asyncio.run(async_redis_operations())
Connection Pool Configuration
import socket

import redis

# Production-ready connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,          # Max pool size
    socket_timeout=5,            # Socket read/write timeout (seconds)
    socket_connect_timeout=5,    # Connection timeout (seconds)
    socket_keepalive=True,       # Keep TCP connection alive
    socket_keepalive_options={   # TCP_KEEP* constants may be unavailable on some platforms
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3,
    },
    retry_on_timeout=True,       # Retry on timeout
    health_check_interval=30,    # Health check every 30s
    decode_responses=True,       # Auto-decode bytes to strings
)

r = redis.Redis(connection_pool=pool)
Error Handling and Resilience
import time
from typing import Optional

import redis
from redis.exceptions import ConnectionError, TimeoutError

class ResilientRedisClient:
    """Redis client with retry logic and exponential backoff."""

    def __init__(self, max_retries: int = 3, backoff: float = 0.1):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.max_retries = max_retries
        self.backoff = backoff

    def get_with_retry(self, key: str, default=None):
        """Get value with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                value = self.redis.get(key)
                # Only a missing key falls back to the default; an empty string is a real value
                return default if value is None else value
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    # Log error and return default
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return default
                # Exponential backoff: 0.1s, 0.2s, 0.4s, ...
                time.sleep(self.backoff * (2 ** attempt))
        return default

    def set_with_retry(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Set value (optionally with a TTL in seconds) with retry logic."""
        for attempt in range(self.max_retries):
            try:
                if ttl:
                    return bool(self.redis.setex(key, ttl, value))
                return bool(self.redis.set(key, value))
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return False
                time.sleep(self.backoff * (2 ** attempt))
        return False
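ResilientRedisClient retries with backoff, but during a prolonged outage every caller still waits through all of its retries. A circuit breaker stops calling Redis for a cooldown period after repeated failures, then lets one trial call through. The following is a minimal sketch, assuming a consecutive-failure threshold and a single half-open trial. `CircuitBreaker`, `failure_threshold`, `reset_timeout` and the injectable `clock` are hypothetical names and are not part of redis-py.

```python
import time
from typing import Any, Callable, Optional


class CircuitBreaker:
    """Closed -> open after N consecutive failures -> half-open after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # cooldown over: allow a trial call
        return "open"

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # close the circuit

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half-open" or self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)open the circuit

    def call(self, fn: Callable[[], Any], default: Any = None,
             errors: tuple = (ConnectionError, TimeoutError)) -> Any:
        """Run fn unless the circuit is open; return default on open circuit or error."""
        if self.state == "open":
            return default  # fail fast without touching Redis
        try:
            result = fn()
        except errors:
            self.record_failure()
            return default
        self.record_success()
        return result
```

When wrapping redis-py calls, pass the library's exception classes explicitly, for example `breaker.call(lambda: r.get("key"), None, errors=(redis.exceptions.ConnectionError, redis.exceptions.TimeoutError))`. The redis-py exceptions do not inherit from Python's built-in `ConnectionError` and `TimeoutError`.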
Monitoring and Metrics
from typing import Optional

# These helpers use a module-level client `r` (a redis.Redis instance)
def get_redis_info(section: Optional[str] = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage."""
    info = r.info('memory')
    used_memory = info['used_memory_human']
    peak_memory = info['used_memory_peak_human']
    memory_fragmentation = info['mem_fragmentation_ratio']
    print(f"Used Memory: {used_memory}")
    print(f"Peak Memory: {peak_memory}")
    print(f"Fragmentation Ratio: {memory_fragmentation}")
    return info

def monitor_stats():
    """Monitor Redis statistics."""
    info = r.info('stats')
    total_connections = info['total_connections_received']
    total_commands = info['total_commands_processed']
    ops_per_sec = info['instantaneous_ops_per_sec']
    print(f"Total Connections: {total_connections}")
    print(f"Total Commands: {total_commands}")
    print(f"Ops/sec: {ops_per_sec}")
    return info

def get_slow_log(count: int = 10):
    """Get the `count` most recent slow log entries."""
    slow_log = r.slowlog_get(count)
    for entry in slow_log:
        print(f"Command: {entry['command']}")
        print(f"Duration: {entry['duration']} microseconds")
        print(f"Time: {entry['start_time']}")  # Unix timestamp
        print("---")
    return slow_log
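Raw INFO fields become more useful once they are turned into alerts. The sketch below takes the dict returned by `r.info('memory')` and flags two common conditions. The function name `memory_warnings` is hypothetical. The thresholds (a fragmentation ratio of 1.5, and 90% of `maxmemory`) are assumptions you should tune; they are not Redis defaults.

```python
from typing import Dict, List


def memory_warnings(info: Dict[str, object], max_fragmentation: float = 1.5,
                    max_used_ratio: float = 0.9) -> List[str]:
    """Return human-readable warnings from the dict returned by r.info('memory')."""
    warnings = []
    ratio = float(info.get("mem_fragmentation_ratio", 1.0))
    if ratio > max_fragmentation:
        warnings.append(f"high fragmentation: {ratio:.2f}")
    used = int(info.get("used_memory", 0))
    maxmemory = int(info.get("maxmemory", 0))  # 0 means no memory limit is configured
    if maxmemory and used / maxmemory > max_used_ratio:
        warnings.append(f"memory at {used / maxmemory:.0%} of maxmemory")
    return warnings
```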
Best Practices
Key Naming Conventions
Use consistent, hierarchical naming:
# Good naming patterns
user:123:profile            # User profile data
user:123:sessions:abc       # User session
cache:product:456           # Cached product
queue:emails:pending        # Email queue
lock:resource:789           # Resource lock
counter:api:requests:daily  # Daily API request counter
leaderboard:global:score    # Global leaderboard

# Avoid
u123                        # Too cryptic
user_profile_123            # No hierarchy; underscores break the ":" convention
123:user                    # Wrong hierarchy (ID before type)
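Keeping key names consistent across a codebase is easier when they are built in one place rather than with ad-hoc f-strings. Here is a minimal sketch using a hypothetical helper, `make_key`. It rejects empty segments, and segments that contain the separator, so an ID cannot silently add extra hierarchy levels. Whether that check is too strict depends on your IDs.

```python
def make_key(*parts: object, sep: str = ":") -> str:
    """Join key segments hierarchically, e.g. make_key("user", 123, "profile")."""
    segments = [str(p) for p in parts]
    # Reject empty segments and segments containing the separator
    if not segments or any(s == "" or sep in s for s in segments):
        raise ValueError(f"invalid key segments: {parts!r}")
    return sep.join(segments)
```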
Memory Management
# Set TTL on all temporary data
r.setex("temp:data", 3600, value)  # Expires in 1 hour

# Limit collection sizes: push and trim in one round trip
pipe = r.pipeline()
pipe.lpush("activity_log", entry)
pipe.ltrim("activity_log", 0, 999)  # Keep only the newest 1000 items
pipe.execute()

# Use appropriate data structures
# One small hash is usually more memory-efficient than many top-level keys,
# because small hashes use a compact encoding (listpack, formerly ziplist)
r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})
# vs
r.set("user:123:name", "Alice")
r.set("user:123:email", "alice@example.com")

# Monitor memory usage (threshold and cleanup_old_data are application-defined)
if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()
Security
# Use authentication
r = redis.Redis(
    host='localhost',
    port=6379,
    username='your-username',  # ACL usernames require Redis 6+
    password='your-secure-password',
)

# Use SSL/TLS for production
pool = redis.ConnectionPool(
    host='redis.example.com',
    port=6380,
    connection_class=redis.SSLConnection,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-cert.pem',
)

# Credential provider pattern
from redis import UsernamePasswordCredentialProvider

creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider,
)
Testing
import fakeredis
import pytest

@pytest.fixture
def redis_client():
    """Provide fake Redis client for testing."""
    return fakeredis.FakeRedis(decode_responses=True)

def test_caching(redis_client):
    """Test caching logic."""
    # Test cache miss
    assert redis_client.get("test_key") is None
    # Test cache set
    redis_client.setex("test_key", 60, "test_value")
    assert redis_client.get("test_key") == "test_value"
    # Test expiration: a TTL is set and is at most 60 seconds
    assert 0 < redis_client.ttl("test_key") <= 60

def test_session_management(redis_client):
    """Test session operations."""
    # UserSessionManager is the class defined in Example 1
    session_manager = UserSessionManager(redis_client)
    # Create session
    session_id = session_manager.create_session(user_id=123)
    assert session_id is not None
    # Get session
    session = session_manager.get_session(session_id)
    assert session['user_id'] == 123
    # Delete session
    assert session_manager.delete_session(session_id) is True
    assert session_manager.get_session(session_id) is None
Examples
Example 1: User Session Management with Redis
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import redis

class UserSessionManager:
    """Complete user session management with Redis."""

    def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
        self.redis = redis_client
        self.ttl = ttl  # Seconds of inactivity before a session expires

    @staticmethod
    def _now() -> str:
        return datetime.now(timezone.utc).isoformat()

    def create_session(self, user_id: int, user_data: Optional[dict] = None) -> str:
        """Create new user session."""
        session_id = str(uuid.uuid4())
        now = self._now()
        session_data = {
            "user_id": user_id,
            "created_at": now,
            "last_accessed": now,
            "data": user_data or {}
        }
        # Store session with TTL
        self.redis.setex(f"session:{session_id}", self.ttl, json.dumps(session_data))
        # Track user's active sessions
        self.redis.sadd(f"user:{user_id}:sessions", session_id)
        return session_id

    def _load(self, session_id: str) -> Optional[dict]:
        """Read a session without refreshing its TTL."""
        raw = self.redis.get(f"session:{session_id}")
        return json.loads(raw) if raw else None

    def get_session(self, session_id: str) -> Optional[dict]:
        """Get session and refresh TTL (sliding expiration)."""
        session = self._load(session_id)
        if session is None:
            return None
        session['last_accessed'] = self._now()
        # Refresh TTL
        self.redis.setex(f"session:{session_id}", self.ttl, json.dumps(session))
        return session

    def delete_session(self, session_id: str) -> bool:
        """Delete session."""
        session = self._load(session_id)
        if not session:
            return False
        # Remove session and drop it from the user's session set
        self.redis.delete(f"session:{session_id}")
        self.redis.srem(f"user:{session['user_id']}:sessions", session_id)
        return True

    def delete_all_user_sessions(self, user_id: int):
        """Delete all sessions for a user."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)
        if session_ids:
            self.redis.delete(*[f"session:{sid}" for sid in session_ids])
        self.redis.delete(sessions_key)

    def get_user_sessions(self, user_id: int) -> list:
        """Get all active sessions for a user (listing does not extend them)."""
        sessions_key = f"user:{user_id}:sessions"
        sessions = []
        for session_id in self.redis.smembers(sessions_key):
            session = self._load(session_id)
            if session:
                session['session_id'] = session_id
                sessions.append(session)
            else:
                # The session key already expired: prune the stale ID
                self.redis.srem(sessions_key, session_id)
        return sessions

# Usage (decode_responses=True so session IDs come back as str)
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)

# Create session
session_id = session_mgr.create_session(
    user_id=123,
    user_data={"role": "admin", "permissions": ["read", "write"]}
)

# Get session
session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")

# List all user sessions
sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")

# Logout (delete session)
session_mgr.delete_session(session_id)
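`get_session` rewrites the session with SETEX on every read. This gives sliding expiration: a session expires only after `ttl` seconds pass without any access. The in-memory model below makes that rule testable without a server. `SlidingTTLStore` and its injectable `clock` are hypothetical and are not a Redis API. The model assumes, like Redis, that a key with no time left is treated as expired.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class SlidingTTLStore:
    """In-memory model of the SETEX-on-read pattern: each read pushes expiry out by ttl."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._data: Dict[str, Tuple[object, float]] = {}  # key -> (value, expires_at)

    def set(self, key: str, value: object) -> None:
        self._data[key] = (value, self.clock() + self.ttl)

    def get(self, key: str) -> Optional[object]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self._data[key]  # expired, as Redis would have removed it
            return None
        self.set(key, value)  # refresh the TTL on access (sliding expiration)
        return value
```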
Example 2: Real-Time Leaderboard
import redis

class Leaderboard:
    """Real-time leaderboard using Redis sorted sets."""

    def __init__(self, redis_client: redis.Redis, name: str):
        self.redis = redis_client
        self.key = f"leaderboard:{name}"

    def add_score(self, player_id: str, score: float):
        """Add or update player score."""
        self.redis.zadd(self.key, {player_id: score})

    def increment_score(self, player_id: str, increment: float):
        """Increment player score."""
        self.redis.zincrby(self.key, increment, player_id)

    def get_top(self, count: int = 10) -> list:
        """Get top players."""
        # ZREVRANGE for highest scores first
        players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)
        return [
            {
                "rank": idx + 1,
                "player_id": player_id,
                "score": score
            }
            for idx, (player_id, score) in enumerate(players)
        ]

    def get_rank(self, player_id: str) -> dict:
        """Get player rank and score."""
        score = self.redis.zscore(self.key, player_id)
        if score is None:
            return None
        # ZREVRANK for rank (0-indexed, highest first)
        rank = self.redis.zrevrank(self.key, player_id)
        return {
            "player_id": player_id,
            "rank": rank + 1 if rank is not None else None,
            "score": score
        }

    def get_around(self, player_id: str, count: int = 5) -> list:
        """Get players around a specific player."""
        rank = self.redis.zrevrank(self.key, player_id)
        if rank is None:
            return []
        # Get players before and after
        start = max(0, rank - count)
        end = rank + count
        players = self.redis.zrevrange(self.key, start, end, withscores=True)
        return [
            {
                "rank": start + idx + 1,
                "player_id": pid,
                "score": score,
                "is_current": pid == player_id
            }
            for idx, (pid, score) in enumerate(players)
        ]

    def get_total_players(self) -> int:
        """Get total number of players."""
        return self.redis.zcard(self.key)

    def remove_player(self, player_id: str) -> bool:
        """Remove player from leaderboard."""
        return self.redis.zrem(self.key, player_id) > 0

# Usage
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")

# Add scores
leaderboard.add_score("alice", 1500)
leaderboard.add_score("bob", 2000)
leaderboard.add_score("charlie", 1800)
leaderboard.increment_score("alice", 200)  # alice now at 1700

# Get top 10
top_players = leaderboard.get_top(10)
for player in top_players:
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}")

# Get player rank
alice_stats = leaderboard.get_rank("alice")
print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")

# Get players around alice
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
    marker = " <-- YOU" if player['is_current'] else ""
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")
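The leaderboard relies on two details of sorted-set ordering that are easy to get wrong in tests:

- ZREVRANGE sorts equal scores in descending lexicographic member order.
- Its end index is inclusive.

The pure-Python reference model below reproduces the ordering of `get_top` and the window of `get_around`. `rank_players` and `around` are hypothetical helpers. The model assumes ASCII player IDs, so that Python string order matches Redis byte order.

```python
from typing import Dict, List, Tuple


def rank_players(scores: Dict[str, float]) -> List[Tuple[int, str, float]]:
    """Order like ZREVRANGE: score descending, ties by member in descending lexicographic order."""
    ordered = sorted(scores.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
    return [(idx + 1, member, score) for idx, (member, score) in enumerate(ordered)]


def around(scores: Dict[str, float], player_id: str, count: int) -> List[Tuple[int, str, float]]:
    """Same window as Leaderboard.get_around: up to `count` players on each side."""
    ranked = rank_players(scores)
    pos = next((i for i, (_rank, member, _score) in enumerate(ranked)
                if member == player_id), None)
    if pos is None:
        return []  # unknown player, like ZREVRANK returning None
    start = max(0, pos - count)
    return ranked[start:pos + count + 1]  # +1 because the ZREVRANGE end index is inclusive
```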
Example 3: Distributed Rate Limiter
import time
import uuid

import redis

class RateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # Lua script for atomic fixed-window limiting: INCR and EXPIRE run as one unit
        self.rate_limit_script = self.redis.register_script("""
            local key = KEYS[1]
            local limit = tonumber(ARGV[1])
            local window = tonumber(ARGV[2])
            local current = redis.call('INCR', key)
            if current == 1 then
                redis.call('EXPIRE', key, window)
            end
            if current > limit then
                return {0, limit, current}
            else
                return {1, limit, current}
            end
        """)

    def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
        """
        Check if request is within rate limit (fixed window).

        Args:
            identifier: User ID, IP address, or API key
            limit: Maximum requests allowed per window
            window: Time window in seconds

        Returns:
            dict with allowed (bool), limit, current, remaining, reset_at
        """
        # Read the clock once so the key and reset_at refer to the same window
        window_index = int(time.time() // window)
        key = f"rate_limit:{identifier}:{window_index}"
        allowed, max_limit, current = self.rate_limit_script(
            keys=[key],
            args=[limit, window]
        )
        return {
            "allowed": bool(allowed),
            "limit": max_limit,
            "current": current,  # requests seen in this window, including rejected ones
            "remaining": max(0, max_limit - current),
            "reset_at": (window_index + 1) * window  # Unix timestamp of the next window
        }

    def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
        """
        Sliding window rate limiter using sorted sets.
        More accurate but slightly more expensive.
        The count-then-add steps are not atomic, so concurrent requests can briefly
        exceed the limit; move the logic into a Lua script for a strict limit.
        """
        key = f"rate_limit:sliding:{identifier}"
        now = time.time()
        window_start = now - window
        # Remove entries older than the window
        self.redis.zremrangebyscore(key, 0, window_start)
        # Count current requests
        current = self.redis.zcard(key)
        if current < limit:
            # Add new request; a unique member keeps simultaneous requests from colliding
            self.redis.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
            self.redis.expire(key, window)
            return {
                "allowed": True,
                "limit": limit,
                "current": current + 1,
                "remaining": limit - current - 1
            }
        return {
            "allowed": False,
            "limit": limit,
            "current": current,
            "remaining": 0
        }

# Usage
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)

# API rate limiting: 100 requests per minute
user_id = "user_123"
result = limiter.check_rate_limit(user_id, limit=100, window=60)
if result["allowed"]:
    print(f"Request allowed. {result['remaining']} requests remaining.")
    # Process request
else:
    print(f"Rate limit exceeded. Try again at {result['reset_at']}")
    # Return 429 Too Many Requests

# More accurate sliding window
result = limiter.sliding_window_check(user_id, limit=100, window=60)
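The sliding window is "more accurate" because a fixed-window counter can admit up to twice the limit in a short burst that straddles a window boundary: for example, 100 requests at 0:59 and another 100 at 1:00. The sliding log counts only requests from the last `window` seconds. The sketch below is an in-memory equivalent of `sliding_window_check`, useful for reasoning about limits in unit tests. It uses the same inclusive cut-off as `ZREMRANGEBYSCORE key 0 (now - window)` and records only accepted requests. `SlidingWindowLog` and the injectable `clock` are hypothetical names.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowLog:
    """In-memory sliding-window log limiter mirroring sliding_window_check."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.time):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._log: Dict[str, Deque[float]] = {}

    def allow(self, identifier: str) -> bool:
        now = self.clock()
        q = self._log.setdefault(identifier, deque())
        # Drop timestamps at or before now - window (inclusive, like ZREMRANGEBYSCORE)
        while q and q[0] <= now - self.window:
            q.popleft()
        if len(q) < self.limit:
            q.append(now)  # only accepted requests are recorded
            return True
        return False
```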
Example 4: Distributed Job Queue
import json
import time
import uuid
from typing import Callable, Dict, Optional

import redis

class JobQueue:
    """Distributed job queue with Redis (a sorted set used as a priority queue)."""

    def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
        self.redis = redis_client
        self.queue_name = queue_name
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"queue:{queue_name}:processing"

    def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
        """
        Add job to queue.

        Args:
            job_type: Type of job (for routing to workers)
            payload: Job data
            priority: Higher priority = processed first (0 = normal)

        Returns:
            job_id
        """
        job_id = str(uuid.uuid4())
        job_data = {
            "id": job_id,
            "type": job_type,
            "payload": payload,
            "priority": priority,
            "enqueued_at": time.time(),
            "attempts": 0
        }
        # Lowest score pops first, so negate priority. Jobs with equal priority are
        # ordered by member string (lexicographically), not by enqueue time.
        self.redis.zadd(self.queue_key, {json.dumps(job_data): -priority})
        return job_id

    def dequeue(self, timeout: int = 0) -> Optional[dict]:
        """
        Get next job from queue.

        Args:
            timeout: Block for up to this many seconds (0 = no blocking)

        Returns:
            Job data or None
        """
        # ZPOPMIN/BZPOPMIN remove the lowest-score member atomically,
        # so two workers can never receive the same job
        if timeout > 0:
            popped = self.redis.bzpopmin(self.queue_key, timeout=timeout)
            if popped is None:
                return None
            _, job_json, _ = popped
        else:
            popped = self.redis.zpopmin(self.queue_key)
            if not popped:
                return None
            job_json, _ = popped[0]
        # Track in-flight jobs (score = start time). The pop and this ZADD are
        # separate commands, so a crash in between drops the job.
        self.redis.zadd(self.processing_key, {job_json: time.time()})
        job_data = json.loads(job_json)
        job_data['attempts'] += 1
        job_data['_raw'] = job_json  # exact member string, needed to remove it later
        return job_data

    def complete(self, job_data: dict):
        """Mark job as completed."""
        # Remove from processing using the exact stored member
        self.redis.zrem(self.processing_key, job_data['_raw'])

    def retry(self, job_data: dict, delay: int = 0):
        """Retry failed job, keeping its attempt count and priority."""
        # Remove from processing
        self.redis.zrem(self.processing_key, job_data['_raw'])
        job = {k: v for k, v in job_data.items() if k != '_raw'}
        # Simple delay: this blocks the worker (a delayed-job set would avoid that)
        if delay > 0:
            time.sleep(delay)
        self.redis.zadd(self.queue_key, {json.dumps(job): -job.get('priority', 0)})

    def get_stats(self) -> dict:
        """Get queue statistics."""
        return {
            "queued": self.redis.zcard(self.queue_key),
            "processing": self.redis.zcard(self.processing_key)
        }

# Worker example
class Worker:
    """Job worker."""

    def __init__(self, queue: JobQueue, handlers: Dict[str, Callable[[dict], None]]):
        self.queue = queue
        self.handlers = handlers

    def process_jobs(self):
        """Process jobs from queue."""
        print("Worker started. Waiting for jobs...")
        while True:
            job = self.queue.dequeue(timeout=5)
            if not job:
                continue
            print(f"Processing job {job['id']} (type: {job['type']})")
            try:
                # Get handler for job type
                handler = self.handlers.get(job['type'])
                if handler:
                    handler(job['payload'])
                    print(f"Job {job['id']} completed")
                else:
                    print(f"No handler for job type: {job['type']}")
                self.queue.complete(job)
            except Exception as e:
                print(f"Job {job['id']} failed: {e}")
                if job['attempts'] < 3:
                    # Retry with exponential backoff
                    delay = 2 ** job['attempts']
                    print(f"Retrying in {delay}s...")
                    self.queue.retry(job, delay=delay)
                else:
                    print(f"Job {job['id']} failed permanently")
                    self.queue.complete(job)

# Usage
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")

# Enqueue jobs
job_id = queue.enqueue("send_email", {
    "to": "user@example.com",
    "subject": "Welcome!",
    "body": "Thanks for signing up."
}, priority=1)

# Define handlers
def send_email_handler(payload):
    print(f"Sending email to {payload['to']}")
    # Email sending logic here
    time.sleep(1)  # Simulate work

handlers = {
    "send_email": send_email_handler
}

# Start worker
worker = Worker(queue, handlers)
worker.process_jobs()  # This blocks - run in separate process
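Using `-priority` as the score means jobs with the same priority pop in lexicographic order of their JSON member, not first-in-first-out. One common fix is a composite score: priority in the high part and a monotonically increasing sequence number in the low part. The sketch below assumes the sequence comes from something like `INCR` on a counter key, and that the result stays below 2**53 so the score remains exact in Redis's double-precision scores. `priority_score` and `SEQ_SPACE` are hypothetical names.

```python
SEQ_SPACE = 2 ** 32       # assumption: fewer than ~4.3 billion enqueues per queue
MAX_PRIORITY = 2 ** 20    # keeps |score| < 2**53, i.e. exactly representable as a double


def priority_score(priority: int, seq: int) -> float:
    """Lower score pops first: higher priority wins, then lower sequence number (FIFO)."""
    if not 0 <= seq < SEQ_SPACE:
        raise ValueError("sequence number out of range")
    if abs(priority) >= MAX_PRIORITY:
        raise ValueError("priority out of range")
    return float(-priority * SEQ_SPACE + seq)
```

In `JobQueue.enqueue`, the score could then be `priority_score(priority, self.redis.incr(f"{self.queue_key}:seq"))`. The `:seq` counter key is a hypothetical addition, and `retry` would need to reuse or regenerate the score the same way.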
Example 5: Real-Time Event Streaming
import json
import time

import redis

class EventStream:
    """Real-time event streaming with Redis Streams.

    Expects a client created WITHOUT decode_responses, so field names arrive as bytes.
    """

    def __init__(self, redis_client: redis.Redis, stream_name: str):
        self.redis = redis_client
        self.stream_name = stream_name

    def publish(self, event_type: str, data: dict) -> str:
        """Publish event to stream."""
        event = {
            "type": event_type,
            "data": json.dumps(data),
            "timestamp": time.time()
        }
        # Add to stream (returns auto-generated ID); maxlen trims to roughly 10,000 entries
        event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
        return event_id

    @staticmethod
    def _parse(event_list) -> list:
        """Decode raw (id, fields) pairs into event dicts."""
        return [
            {
                "id": event_id,
                "type": fields[b'type'].decode(),
                "data": json.loads(fields[b'data'].decode()),
                "timestamp": float(fields[b'timestamp'])
            }
            for event_id, fields in event_list
        ]

    def read_events(self, last_id: str = '0', count: int = 10) -> list:
        """Read events newer than last_id, blocking for up to 1 second."""
        events = self.redis.xread(
            {self.stream_name: last_id},
            count=count,
            block=1000  # 1 second timeout
        )
        if not events:
            return []
        _, event_list = events[0]
        return self._parse(event_list)

    def create_consumer_group(self, group_name: str):
        """Create consumer group for parallel processing."""
        try:
            self.redis.xgroup_create(
                name=self.stream_name,
                groupname=group_name,
                id='0',
                mkstream=True
            )
        except redis.ResponseError as e:
            # BUSYGROUP means the group already exists
            if "BUSYGROUP" not in str(e):
                raise

    def consume_events(self, group_name: str, consumer_name: str,
                       count: int = 10) -> list:
        """Consume events as part of consumer group."""
        events = self.redis.xreadgroup(
            groupname=group_name,
            consumername=consumer_name,
            streams={self.stream_name: '>'},  # '>' = entries never delivered to this group
            count=count,
            block=5000
        )
        if not events:
            return []
        _, event_list = events[0]
        return self._parse(event_list)

    def acknowledge(self, group_name: str, event_id: str):
        """Acknowledge processed event."""
        self.redis.xack(self.stream_name, group_name, event_id)

    def get_pending(self, group_name: str) -> list:
        """Get pending (delivered but unacknowledged) events."""
        return self.redis.xpending_range(
            name=self.stream_name,
            groupname=group_name,
            min='-',
            max='+',
            count=100
        )

# Usage Example: Activity Feed
r = redis.Redis()  # no decode_responses: EventStream expects bytes field names
activity_stream = EventStream(r, "user_activity")

# Publish events
activity_stream.publish("user_signup", {
    "user_id": 123,
    "email": "alice@example.com"
})
activity_stream.publish("post_created", {
    "user_id": 123,
    "post_id": 456,
    "title": "My First Post"
})

# Read events (simple consumer)
last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)
    for event in events:
        print(f"Event: {event['type']}")
        print(f"Data: {event['data']}")
        last_id = event['id']
    if not events:
        break

# Consumer group example
activity_stream.create_consumer_group("processors")

# Worker consuming events (process_event is your application's handler)
while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )
    for event in events:
        try:
            # Process event
            process_event(event)
            # Acknowledge
            activity_stream.acknowledge("processors", event['id'])
        except Exception as e:
            print(f"Failed to process event {event['id']}: {e}")
            # Event remains unacknowledged (pending) for retry
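Stream entry IDs have the form `<milliseconds>-<sequence>`. The consumers above carry them around as `last_id`, but comparing them as plain strings gives wrong answers: `"9-0"` sorts after `"10-0"`. The sketch below parses an ID into a tuple that compares correctly. It accepts the `bytes` IDs returned by a client without `decode_responses`. `parse_stream_id` and `is_newer` are hypothetical helpers.

```python
from typing import Tuple, Union


def parse_stream_id(entry_id: Union[str, bytes]) -> Tuple[int, int]:
    """Split a stream entry ID "<milliseconds>-<sequence>" into a comparable tuple."""
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode()
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)  # a bare "0" (read from the start) means (0, 0)


def is_newer(a: Union[str, bytes], b: Union[str, bytes]) -> bool:
    """True if entry ID a comes after entry ID b in the stream."""
    return parse_stream_id(a) > parse_stream_id(b)
```

For IDs auto-generated by XADD, `parse_stream_id(event['id'])[0]` is the entry's creation time in Unix milliseconds.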
Example 6: Cache-Aside Pattern with Multi-Level Caching
import json
import time
from typing import Any, Callable, Optional

import redis

class MultiLevelCache:
    """Multi-level caching: an in-process dict (L1) in front of Redis (L2)."""

    def __init__(self, redis_client: redis.Redis,
                 local_cache_size: int = 100,
                 local_ttl: int = 60,
                 redis_ttl: int = 3600):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size
        self.local_ttl = local_ttl
        self.redis_ttl = redis_ttl

    def _make_key(self, namespace: str, key: str) -> str:
        """Generate cache key."""
        return f"cache:{namespace}:{key}"

    def get(self, namespace: str, key: str,
            compute_fn: Optional[Callable[[], Any]] = None) -> Optional[Any]:
        """
        Get value from cache with fallback to compute function.
        Lookup order: Local cache → Redis → Compute function
        """
        cache_key = self._make_key(namespace, key)
        # Level 1: Local cache
        entry = self.local_cache.get(cache_key)
        if entry is not None:
            if time.time() < entry['expires_at']:
                return entry['value']
            del self.local_cache[cache_key]
        # Level 2: Redis cache
        redis_value = self.redis.get(cache_key)
        if redis_value is not None:
            value = json.loads(redis_value)
            # Populate local cache
            self._set_local(cache_key, value)
            return value
        # Level 3: Compute function (cache-aside: load on miss, then populate both levels)
        if compute_fn:
            value = compute_fn()
            if value is not None:
                self.set(namespace, key, value)
            return value
        return None

    def set(self, namespace: str, key: str, value: Any):
        """Set value in both cache levels."""
        cache_key = self._make_key(namespace, key)
        # Set in Redis
        self.redis.setex(cache_key, self.redis_ttl, json.dumps(value))
        # Set in local cache
        self._set_local(cache_key, value)

    def _set_local(self, key: str, value: Any):
        """Set value in local cache, evicting the oldest entry when full."""
        # All entries share one TTL, so the earliest expires_at is the oldest insert.
        # This is FIFO-style eviction, not true LRU: reads do not refresh an entry.
        if key not in self.local_cache and len(self.local_cache) >= self.local_cache_size:
            oldest_key = min(
                self.local_cache,
                key=lambda k: self.local_cache[k]['expires_at']
            )
            del self.local_cache[oldest_key]
        self.local_cache[key] = {
            'value': value,
            'expires_at': time.time() + self.local_ttl
        }

    def delete(self, namespace: str, key: str):
        """Delete from all cache levels."""
        cache_key = self._make_key(namespace, key)
        # Delete from Redis
        self.redis.delete(cache_key)
        # Delete from this process's local cache; other processes keep their
        # copy until local_ttl expires
        self.local_cache.pop(cache_key, None)

    def invalidate_namespace(self, namespace: str):
        """Invalidate all keys in namespace."""
        prefix = f"cache:{namespace}:"
        # Delete from Redis (SCAN iterates incrementally instead of blocking like KEYS)
        for key in self.redis.scan_iter(match=prefix + "*", count=100):
            self.redis.delete(key)
        # Delete from local cache
        for k in [k for k in self.local_cache if k.startswith(prefix)]:
            del self.local_cache[k]

# Usage (`database` stands in for your data-access layer)
r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )

# First call: Queries database, caches result
user = get_user(123)
# Second call: Returns from local cache (fastest)
user = get_user(123)

# Update user
def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)
    # Invalidate cache
    cache.delete("users", str(user_id))

# Invalidate all user caches
cache.invalidate_namespace("users")
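The local tier in `MultiLevelCache` evicts by earliest expiry, which amounts to insertion order rather than recency. If true LRU is wanted for the L1 tier, `collections.OrderedDict` provides it cheaply. The sketch below assumes a single-threaded caller and uses one TTL for all entries. `LocalLRUCache` is a hypothetical name. Like the original, it cannot distinguish a cached `None` from a miss.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class LocalLRUCache:
    """Process-local LRU cache with a per-entry TTL, usable as the L1 tier."""

    def __init__(self, max_size: int = 100, ttl: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl = ttl
        self.clock = clock
        self._entries: OrderedDict = OrderedDict()  # key -> (value, expires_at)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # expired
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (value, self.clock() + self.ttl)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry

    def delete(self, key: str) -> None:
        self._entries.pop(key, None)
```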
Example 7: Geo-Location with Redis
from typing import Optional

import redis

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""

    def __init__(self, redis_client: redis.Redis, index_name: str):
        self.redis = redis_client
        self.key = f"geo:{index_name}"

    def add_location(self, location_id: str, longitude: float, latitude: float):
        """Add location to geo index."""
        # redis-py 4+ takes one flat (longitude, latitude, member, ...) sequence
        self.redis.geoadd(self.key, (longitude, latitude, location_id))

    def add_locations(self, locations: list):
        """Batch add locations.

        Args:
            locations: List of (location_id, longitude, latitude) tuples
        """
        values = []
        for location_id, longitude, latitude in locations:
            values.extend((longitude, latitude, location_id))
        self.redis.geoadd(self.key, values)

    def get_position(self, location_id: str) -> Optional[tuple]:
        """Get coordinates of a location."""
        result = self.redis.geopos(self.key, location_id)
        if result and result[0]:
            return result[0]  # (longitude, latitude)
        return None

    def find_nearby(self, longitude: float, latitude: float,
                    radius: float, unit: str = 'km', count: Optional[int] = None) -> list:
        """
        Find locations within radius.

        Args:
            longitude: Center longitude
            latitude: Center latitude
            radius: Search radius
            unit: Distance unit ('m', 'km', 'mi', 'ft')
            count: Maximum results
        """
        # GEORADIUS is deprecated since Redis 6.2 in favor of GEOSEARCH, but still supported
        results = self.redis.georadius(
            self.key, longitude, latitude, radius,
            unit=unit, withdist=True, withcoord=True, count=count, sort='ASC'
        )
        return [
            {
                'location_id': location_id,
                'distance': distance,
                'coordinates': (lon, lat)
            }
            for location_id, distance, (lon, lat) in results
        ]

    def find_nearby_member(self, location_id: str, radius: float,
                           unit: str = 'km', count: Optional[int] = None) -> list:
        """Find locations near an existing member (excluding the member itself)."""
        # Ask for one extra result, because the member itself is always returned
        results = self.redis.georadiusbymember(
            self.key, location_id, radius,
            unit=unit, withdist=True, sort='ASC',
            count=count + 1 if count else None
        )
        return [
            {
                'location_id': loc_id,
                'distance': distance
            }
            for loc_id, distance in results
            if loc_id != location_id  # Exclude self
        ]

    def distance_between(self, location_id1: str, location_id2: str,
                         unit: str = 'km') -> Optional[float]:
        """Calculate distance between two locations (None if either is missing)."""
        return self.redis.geodist(self.key, location_id1, location_id2, unit)

# Usage Example: Restaurant finder
r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")

# Add restaurants
restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])

# Find restaurants near coordinates
nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)
for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")

# Find restaurants near a specific restaurant
similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')

# Get distance between two restaurants
distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")
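Redis computes geo distances with the haversine formula on a sphere of radius 6372797.560856 m. A local implementation is therefore handy for sanity-checking GEODIST results, or for filtering coordinates that never go into Redis. Expect close but not bit-identical agreement, because Redis stores positions as 52-bit geohashes. `haversine_km` is a hypothetical helper that takes (longitude, latitude) argument pairs, in the same order as GEOADD.

```python
import math

EARTH_RADIUS_M = 6372797.560856  # sphere radius used by Redis geo commands


def haversine_km(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in kilometres between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a)) / 1000
```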
Quick Reference
Common Operations
# Connection
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Strings
r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')

# Hashes
r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')

# Lists
r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)

# Sets
r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')

# Sorted Sets
r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')

# Expiration
r.expire('key', 3600)
r.ttl('key')

# Pipelining
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
Time Complexity
-
GET, SET: O(1)
-
HGET, HSET: O(1)
-
LPUSH, RPUSH, LPOP, RPOP: O(1)
-
SADD, SREM, SISMEMBER: O(1)
-
ZADD, ZREM: O(log(N))
-
ZRANGE, ZREVRANGE: O(log(N)+M) where M is result size
-
SCAN, SSCAN, HSCAN, ZSCAN: O(1) per call; O(N) for a complete iteration
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: State Management, Distributed Systems, Performance Optimization
Compatible With: redis-py, Redis 6.0+, Redis 7.0+