redis-state-management

Redis State Management

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "redis-state-management" with this command: npx skills add manutej/luxor-claude-marketplace/manutej-luxor-claude-marketplace-redis-state-management

Redis State Management

A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.

When to Use This Skill

Use this skill when:

  • Implementing high-performance caching layers for web applications

  • Managing user sessions in distributed environments

  • Building real-time messaging and event distribution systems

  • Coordinating distributed processes with locks and synchronization

  • Storing and querying structured data with Redis data structures

  • Optimizing application performance with Redis

  • Scaling applications horizontally with shared state

  • Implementing rate limiting, counters, and analytics

  • Building microservices with Redis as a communication layer

  • Managing temporary data with automatic expiration (TTL)

  • Implementing leaderboards, queues, and real-time features

Core Concepts

Redis Fundamentals

Redis (Remote Dictionary Server) is an in-memory data structure store used as:

  • Database: Persistent key-value storage

  • Cache: High-speed data layer

  • Message Broker: Pub/sub and stream messaging

  • Session Store: Distributed session management

Key Characteristics:

  • In-memory storage (microsecond latency)

  • Optional persistence (RDB snapshots, AOF logs)

  • Rich data structures beyond key-value

  • Atomic operations on complex data types

  • Built-in replication and clustering

  • Pub/sub messaging support

  • Lua scripting for complex operations

  • Pipelining for batch operations
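
To make two of these characteristics concrete, here is a minimal redis-py sketch of an atomic increment and a pipelined batch (assumes a local Redis on the default port; the key names are illustrative):

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# INCR runs atomically on the server, so concurrent clients cannot lose updates
r.incr('counter:page_views')

# Pipelining sends several commands in one round trip and returns their replies together
pipe = r.pipeline()
pipe.set('greeting', 'hello')
pipe.incr('counter:page_views')
pipe.expire('greeting', 60)
print(pipe.execute())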

Redis Data Structures

Redis provides multiple data types for different use cases:

Strings: Simple key-value pairs, binary safe

  • Use for: Cache values, counters, flags, JSON objects

  • Max size: 512 MB

  • Commands: SET, GET, INCR, APPEND

Hashes: Field-value maps (objects)

  • Use for: User profiles, configuration objects, small entities

  • Efficient for storing objects with multiple fields

  • Commands: HSET, HGET, HMGET, HINCRBY

Lists: Ordered collections (linked lists)

  • Use for: Queues, activity feeds, recent items

  • Operations at head/tail are O(1)

  • Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE

Sets: Unordered unique collections

  • Use for: Tags, unique visitors, relationships

  • Set operations: union, intersection, difference

  • Commands: SADD, SMEMBERS, SISMEMBER, SINTER

Sorted Sets: Ordered sets with scores

  • Use for: Leaderboards, time-series, priority queues

  • Range queries by score or rank

  • Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK

Streams: Append-only logs with consumer groups

  • Use for: Event sourcing, activity logs, message queues

  • Built-in consumer group support

  • Commands: XADD, XREAD, XREADGROUP
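
As a quick, minimal illustration of the types above (assuming a local Redis; keys and values are placeholders):

import redis

r = redis.Redis(decode_responses=True)

r.set("page:home:hits", 0)                  # String
r.incr("page:home:hits")
r.hset("user:1", mapping={"name": "Ada"})   # Hash
r.lpush("recent:items", "item-42")          # List
r.sadd("post:7:tags", "redis", "caching")   # Set
r.zadd("scores:demo", {"ada": 1500})        # Sorted set
r.xadd("events:demo", {"type": "signup"})   # Stream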

Connection Management

Connection Pools: Redis connections are expensive to create. Always use connection pools:

import redis

Connection pool (recommended)

pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

Direct connection (avoid in production)

r = redis.Redis(host='localhost', port=6379, db=0)

Best Practices:

  • Use connection pools for all applications

  • Set appropriate max_connections based on workload

  • Enable decode_responses=True for string data

  • Configure socket_timeout and socket_keepalive

  • Handle connection errors with retries

Data Persistence

Redis offers two persistence mechanisms:

RDB (Redis Database): Point-in-time snapshots

  • Compact binary format

  • Fast restart times

  • Lower disk I/O

  • Potential data loss between snapshots

AOF (Append-Only File): Log of write operations

  • Better durability (fsync policies)

  • Larger files, slower restarts

  • Can be automatically rewritten/compacted

  • Minimal data loss potential

Hybrid Approach: RDB + AOF for best of both worlds
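
To see what a running server is configured to do, you can inspect the persistence settings from redis-py; a small sketch (the returned values depend entirely on your redis.conf):

import redis

r = redis.Redis(decode_responses=True)

print(r.config_get('save'))         # RDB snapshot schedule
print(r.config_get('appendonly'))   # 'yes' when AOF is enabled
print(r.config_get('appendfsync'))  # AOF fsync policy: always / everysec / no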

RESP 3 Protocol

Redis Serialization Protocol version 3 offers:

  • Client-side caching support

  • Better data type support

  • Push notifications

  • Performance improvements

import redis
from redis.cache import CacheConfig

Enable RESP3 with client-side caching

r = redis.Redis(host='localhost', port=6379, protocol=3, cache_config=CacheConfig())

Caching Strategies

Cache-Aside (Lazy Loading)

Pattern: Application checks cache first, loads from database on miss

import redis
import json
from typing import Optional, Dict, Any

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
"""Cache-aside pattern for user data."""
cache_key = f"user:{user_id}"

# Try cache first
cached_data = r.get(cache_key)
if cached_data:
    return json.loads(cached_data)

# Cache miss - load from database
user_data = database.get_user(user_id)  # Your DB query
if user_data:
    # Store in cache with 1 hour TTL
    r.setex(cache_key, 3600, json.dumps(user_data))

return user_data

Advantages:

  • Only requested data is cached (efficient memory usage)

  • Cache failures don't break the application

  • Simple to implement

Disadvantages:

  • Cache miss penalty (latency spike)

  • Thundering herd on popular items

  • Stale data until cache expiration

Write-Through Cache

Pattern: Write to cache and database simultaneously

def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
"""Write-through pattern for user updates."""
cache_key = f"user:{user_id}"

# Write to database first
success = database.update_user(user_id, user_data)

if success:
    # Update cache immediately
    r.setex(cache_key, 3600, json.dumps(user_data))

return success

Advantages:

  • Cache always consistent with database

  • No read penalty for recently written data

Disadvantages:

  • Write latency increases

  • Unused data may be cached

  • Extra cache write overhead

Write-Behind (Write-Back) Cache

Pattern: Write to cache immediately, sync to database asynchronously

import redis
import json
from queue import Queue
from threading import Thread
from typing import Dict, Any

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()

Start background writer

Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
"""Write-behind pattern for fast writes."""
cache_key = f"user:{user_id}"

# Write to cache immediately (fast)
r.setex(cache_key, 3600, json.dumps(user_data))

# Queue database write (async)
write_queue.put((user_id, user_data))

Advantages:

  • Minimal write latency

  • Can batch database writes

  • Handles write spikes

Disadvantages:

  • Risk of data loss if cache fails

  • Complex error handling

  • Consistency challenges

Cache Invalidation Strategies

Time-based Expiration (TTL):

Set key with expiration

r.setex("session:abc123", 1800, session_data) # 30 minutes

Or set TTL on existing key

r.expire("user:profile:123", 3600) # 1 hour

Check remaining TTL

ttl = r.ttl("user:profile:123")

Event-based Invalidation:

def update_product(product_id: int, product_data: dict):
"""Invalidate cache on update."""
# Update database
database.update_product(product_id, product_data)

# Invalidate related caches
r.delete(f"product:{product_id}")
r.delete(f"product_list:category:{product_data['category']}")
r.delete("products:featured")

Pattern-based Invalidation:

Delete all keys matching pattern

def invalidate_user_cache(user_id: int):
"""Invalidate all cache entries for a user."""
pattern = f"user:{user_id}:*"

# Find and delete matching keys
for key in r.scan_iter(match=pattern, count=100):
    r.delete(key)

Cache Stampede Prevention

Problem: Many requests miss the cache at the same time and all query the database for the same key

Solution 1: Probabilistic Early Expiration

import time
import random

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
"""Prevent stampede with probabilistic early recomputation."""
value = r.get(key)

if value is None:
    # Cache miss - compute and cache, and record when it was computed
    value = compute_value(key)
    r.setex(key, ttl, value)
    r.set(f"{key}:timestamp", time.time())
    return value

# Check if we should recompute early
current_time = time.time()
delta = current_time - float(r.get(f"{key}:timestamp") or 0)
expiry = ttl * random.random() * beta

if delta > expiry:
    # Recompute in background
    value = compute_value(key)
    r.setex(key, ttl, value)
    r.set(f"{key}:timestamp", current_time)

return value

Solution 2: Locking

from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
"""Acquire lock for cache computation."""
lock_key = f"{key}:lock"
identifier = str(time.time())

# Try to acquire lock
if r.set(lock_key, identifier, nx=True, ex=timeout):
    try:
        yield True
    finally:
        # Release lock
        if r.get(lock_key) == identifier:
            r.delete(lock_key)
else:
    yield False

def get_with_lock(key: str):
"""Use lock to prevent stampede."""
value = r.get(key)

if value is None:
    with cache_lock(key) as acquired:
        if acquired:
            # We got the lock - compute value
            value = compute_value(key)
            r.setex(key, 3600, value)
        else:
            # Someone else is computing - wait and retry
            time.sleep(0.1)
            value = r.get(key) or compute_value(key)

return value

Session Management

Distributed Session Storage

Basic Session Management:

import redis
import json
import uuid
from datetime import datetime, timedelta

r = redis.Redis(decode_responses=True)

class SessionManager:
def __init__(self, ttl: int = 1800):
    """Session manager with Redis backend.

    Args:
        ttl: Session timeout in seconds (default 30 minutes)
    """
    self.ttl = ttl

def create_session(self, user_id: int, data: dict = None) -> str:
    """Create new session and return session ID."""
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"

    session_data = {
        "user_id": user_id,
        "created_at": datetime.utcnow().isoformat(),
        "data": data or {}
    }

    r.setex(session_key, self.ttl, json.dumps(session_data))
    return session_id

def get_session(self, session_id: str) -> dict:
    """Retrieve session data and refresh TTL."""
    session_key = f"session:{session_id}"
    session_data = r.get(session_key)

    if session_data:
        # Refresh TTL on access (sliding expiration)
        r.expire(session_key, self.ttl)
        return json.loads(session_data)

    return None

def update_session(self, session_id: str, data: dict) -> bool:
    """Update session data."""
    session_key = f"session:{session_id}"
    session_data = self.get_session(session_id)

    if session_data:
        session_data["data"].update(data)
        r.setex(session_key, self.ttl, json.dumps(session_data))
        return True

    return False

def delete_session(self, session_id: str) -> bool:
    """Delete session (logout)."""
    session_key = f"session:{session_id}"
    return r.delete(session_key) > 0
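
A brief usage sketch for the class above (it relies on the module-level r client defined earlier):

sessions = SessionManager(ttl=1800)
session_id = sessions.create_session(user_id=7, data={"cart": []})
print(sessions.get_session(session_id)["user_id"])   # 7
sessions.update_session(session_id, {"cart": ["sku-1"]})
sessions.delete_session(session_id)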

Session with Hash Storage

Storing the session as a Redis hash is more efficient when individual fields are read or updated:

class HashSessionManager:
"""Session manager using Redis hashes for better performance."""

def __init__(self, ttl: int = 1800):
    self.ttl = ttl

def create_session(self, user_id: int, **kwargs) -> str:
    """Create session using hash."""
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"

    # Store as hash for efficient field access
    session_fields = {
        "user_id": str(user_id),
        "created_at": datetime.utcnow().isoformat(),
        **{k: str(v) for k, v in kwargs.items()}
    }

    r.hset(session_key, mapping=session_fields)
    r.expire(session_key, self.ttl)

    return session_id

def get_field(self, session_id: str, field: str) -> str:
    """Get single session field efficiently."""
    session_key = f"session:{session_id}"
    value = r.hget(session_key, field)

    if value:
        r.expire(session_key, self.ttl)  # Refresh TTL

    return value

def set_field(self, session_id: str, field: str, value: str) -> bool:
    """Update single session field."""
    session_key = f"session:{session_id}"

    if r.exists(session_key):
        r.hset(session_key, field, value)
        r.expire(session_key, self.ttl)
        return True

    return False

def get_all(self, session_id: str) -> dict:
    """Get all session fields."""
    session_key = f"session:{session_id}"
    data = r.hgetall(session_key)

    if data:
        r.expire(session_key, self.ttl)

    return data
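
A short usage sketch for the hash-based manager (again using the module-level r client):

hash_sessions = HashSessionManager(ttl=1800)
sid = hash_sessions.create_session(user_id=42, theme="dark")
print(hash_sessions.get_field(sid, "theme"))    # "dark"
hash_sessions.set_field(sid, "locale", "en-US")
print(hash_sessions.get_all(sid))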

User Activity Tracking

def track_user_activity(user_id: int, action: str):
"""Track user activity with automatic expiration."""
activity_key = f"user:{user_id}:activity"
timestamp = datetime.utcnow().isoformat()

# Add activity to list
r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))

# Keep only last 100 activities
r.ltrim(activity_key, 0, 99)

# Set expiration (30 days)
r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
"""Get recent user activities."""
activity_key = f"user:{user_id}:activity"
activities = r.lrange(activity_key, 0, limit - 1)

return [json.loads(a) for a in activities]

Pub/Sub Patterns

Basic Publisher/Subscriber

Publisher:

import redis

r = redis.Redis(decode_responses=True)

import json
from datetime import datetime

def publish_event(channel: str, message: dict):
    """Publish event to channel."""
    r.publish(channel, json.dumps(message))

Example usage

publish_event("notifications", { "type": "user_signup", "user_id": 12345, "timestamp": datetime.utcnow().isoformat() })

Subscriber:

import redis
import json

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")

Initialize pubsub

r = redis.Redis(decode_responses=True)
p = r.pubsub()

Subscribe to channels

p.subscribe('notifications', 'alerts')

Listen for messages

for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)

Pattern-Based Subscriptions

Subscribe to multiple channels with patterns

p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

Get messages from pattern subscriptions

for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")

Async Pub/Sub with Background Thread

import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

Subscribe with handler

p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

Run in background thread

thread = p.run_in_thread(sleep_time=0.001)

Publish some messages

r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')

time.sleep(1)

Stop background thread

thread.stop()

Async Pub/Sub with asyncio

import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
"""Async message reader."""
while True:
    message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
    if message is not None:
        print(f"Received: {message}")

        # Stop on specific message
        if message["data"].decode() == "STOP":
            break

async def pubsub_example():
"""Async pub/sub example."""
r = await redis.from_url("redis://localhost")

async with r.pubsub() as pubsub:
    # Subscribe to channels
    await pubsub.subscribe("channel:1", "channel:2")

    # Create reader task
    reader_task = asyncio.create_task(reader(pubsub))

    # Publish messages
    await r.publish("channel:1", "Hello")
    await r.publish("channel:2", "World")
    await r.publish("channel:1", "STOP")

    # Wait for reader to finish
    await reader_task

await r.close()

Run async example

asyncio.run(pubsub_example())

Sharded Pub/Sub (Redis 7.0+)

from redis.cluster import RedisCluster, ClusterNode

Connect to cluster

rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])

Create sharded pubsub

p = rc.pubsub()
p.ssubscribe('foo')

Get message from specific node

message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
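
Publishing to a shard channel goes through SPUBLISH; a minimal sketch, assuming your redis-py version exposes spublish on the cluster client (Redis 7.0+ sharded pub/sub):

# Only subscribers attached to the shard that owns 'foo' receive this message
rc.spublish('foo', 'hello from the owning shard')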

Distributed Locks

Simple Lock Implementation

import redis
import time
import uuid

class RedisLock:
"""Simple distributed lock using Redis."""

def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
    self.redis = redis_client
    self.key = f"lock:{key}"
    self.timeout = timeout
    self.identifier = str(uuid.uuid4())

def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
    """Acquire lock."""
    end_time = time.time() + (timeout or self.timeout)

    while True:
        # Try to set lock with NX (only if not exists) and EX (expiration)
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            return True

        if not blocking:
            return False

        if timeout and time.time() > end_time:
            return False

        # Wait before retry
        time.sleep(0.01)

def release(self) -> bool:
    """Release lock only if we own it."""
    # Use Lua script for atomic check-and-delete
    lua_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    result = self.redis.eval(lua_script, 1, self.key, self.identifier)
    return result == 1

def __enter__(self):
    """Context manager support."""
    self.acquire()
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager cleanup."""
    self.release()

Usage example

r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()

Advanced Lock with Auto-Renewal

import threading

class RenewableLock:
"""Distributed lock with automatic renewal."""

def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
    self.redis = redis_client
    self.key = f"lock:{key}"
    self.timeout = timeout
    self.identifier = str(uuid.uuid4())
    self.renewal_thread = None
    self.stop_renewal = threading.Event()

def _renew_lock(self):
    """Background task to renew lock."""
    while not self.stop_renewal.is_set():
        time.sleep(self.timeout / 3)  # Renew at 1/3 of timeout

        # Renew only if we still own the lock
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, self.key,
                               self.identifier, self.timeout)

        if result == 0:
            # We lost the lock
            self.stop_renewal.set()

def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
    """Acquire lock and start auto-renewal."""
    if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
        # Start renewal thread
        self.stop_renewal.clear()
        self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
        self.renewal_thread.start()
        return True

    return False

def release(self) -> bool:
    """Release lock and stop renewal."""
    self.stop_renewal.set()

    lua_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    result = self.redis.eval(lua_script, 1, self.key, self.identifier)
    return result == 1
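
A brief usage sketch for the renewable lock (run_long_job is a placeholder for your own work):

r = redis.Redis(decode_responses=True)
lock = RenewableLock(r, "reports:rebuild", timeout=10)

if lock.acquire():
    try:
        run_long_job()   # the lock is renewed in the background while this runs
    finally:
        lock.release()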

Redlock Algorithm (Multiple Redis Instances)

class Redlock:
"""Redlock algorithm for distributed locking across multiple Redis instances."""

def __init__(self, redis_instances: list):
    """
    Args:
        redis_instances: List of Redis client connections
    """
    self.instances = redis_instances
    self.quorum = len(redis_instances) // 2 + 1

def acquire(self, resource: str, ttl: int = 10000) -> tuple:
    """
    Acquire lock across multiple Redis instances.

    Returns:
        (success: bool, lock_identifier: str)
    """
    identifier = str(uuid.uuid4())
    start_time = int(time.time() * 1000)

    # Try to acquire lock on all instances
    acquired = 0
    for instance in self.instances:
        try:
            if instance.set(f"lock:{resource}", identifier,
                          nx=True, px=ttl):
                acquired += 1
        except Exception:
            pass

    # Calculate elapsed time
    elapsed = int(time.time() * 1000) - start_time
    validity_time = ttl - elapsed - 100  # drift compensation

    # Check if we got quorum
    if acquired >= self.quorum and validity_time > 0:
        return True, identifier
    else:
        # Release locks if we didn't get quorum
        self._release_all(resource, identifier)
        return False, None

def _release_all(self, resource: str, identifier: str):
    """Release lock on all instances."""
    lua_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    for instance in self.instances:
        try:
            instance.eval(lua_script, 1, f"lock:{resource}", identifier)
        except Exception:
            pass
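
A usage sketch for the Redlock class as written above (the three instances are assumed to be independent Redis servers; note that this class only exposes _release_all for unlocking):

nodes = [redis.Redis(host='localhost', port=p) for p in (6379, 6380, 6381)]
redlock = Redlock(nodes)

acquired, token = redlock.acquire("orders:rebuild", ttl=10000)
if acquired:
    try:
        do_critical_work()   # placeholder for the protected operation
    finally:
        redlock._release_all("orders:rebuild", token)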

Data Structures and Operations

Working with Hashes

User profile storage

def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

Example usage

save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30"
})

Atomic increment

r.hincrby("user:profile:123", "login_count", 1)

Working with Lists

Job queue implementation

def enqueue_job(queue_name: str, job_data: dict):
    """Add job to queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
"""Get job from queue (blocking)."""
key = f"queue:{queue_name}"

if timeout > 0:
    # Blocking pop with timeout
    result = r.blpop(key, timeout=timeout)
    if result:
        _, job_data = result
        return json.loads(job_data)
else:
    # Non-blocking pop
    job_data = r.lpop(key)
    if job_data:
        return json.loads(job_data)

return None

Activity feed

def add_to_feed(user_id: int, activity: dict):
    """Add activity to user feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)    # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get user feed with pagination."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]

Working with Sets

Tags and relationships

def add_tags(item_id: int, tags: list):
    """Add tags to item."""
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags."""
    # Assumes a reverse index is also maintained, e.g. add_tags() doing
    # r.sadd(f"tag:{tag}:items", item_id) for each tag.
    keys = [f"tag:{tag}:items" for tag in tags]
    return r.sinter(*keys)

Online users tracking

def user_online(user_id: int):
    """Mark user as online."""
    r.sadd("users:online", user_id)
    r.setex(f"user:{user_id}:heartbeat", 60, 1)  # heartbeat expires unless refreshed

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")

Working with Sorted Sets

Leaderboard implementation

def update_score(leaderboard: str, user_id: int, score: float):
    """Update user score in leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)

Time-based sorted set (activity stream)

def add_activity(user_id: int, activity: str):
"""Add timestamped activity."""
key = f"user:{user_id}:activities"
timestamp = time.time()
r.zadd(key, {activity: timestamp})

# Keep only last 24 hours
cutoff = timestamp - 86400
r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get most recent (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)

Working with Streams

Event stream

def add_event(stream_key: str, event_data: dict) -> str:
    """Add event to stream."""
    # Returns auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
"""Read events from stream."""
events = r.xread({stream_key: start_id}, count=count)

# events format: [(stream_name, [(id, data), (id, data), ...])]
if events:
    _, event_list = events[0]
    return event_list

return []

Consumer groups

def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str, consumer_name: str, count: int = 10) -> list:
"""Read events as consumer in group."""
# Read new messages with '>'
events = r.xreadgroup(
    groupname=group_name,
    consumername=consumer_name,
    streams={stream_key: '>'},
    count=count,
    block=5000  # 5 second timeout
)

if events:
    _, event_list = events[0]
    return event_list

return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event."""
    r.xack(stream_key, group_name, event_id)

Example: Processing events with consumer group

def process_events(stream_key: str, group_name: str, consumer_name: str):
"""Process events from stream."""
create_consumer_group(stream_key, group_name)

while True:
    events = read_from_group(stream_key, group_name, consumer_name, count=10)

    for event_id, event_data in events:
        try:
            # Process event
            process_event(event_data)

            # Acknowledge successful processing
            acknowledge_event(stream_key, group_name, event_id)
        except Exception as e:
            print(f"Failed to process event {event_id}: {e}")
            # Event remains unacknowledged for retry

Performance Optimization

Pipelining for Batch Operations

Without pipelining (slow - multiple round trips)

for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

With pipelining (fast - single round trip)

pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

Pipelining with reads

pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()

Builder pattern with pipeline

class DataLoader:
def __init__(self):
    self.pipeline = r.pipeline()

def add_user(self, user_id: int, user_data: dict):
    """Add user data."""
    self.pipeline.hset(f"user:{user_id}", mapping=user_data)
    return self

def add_to_set(self, set_name: str, value: str):
    """Add to set."""
    self.pipeline.sadd(set_name, value)
    return self

def execute(self):
    """Execute all pipelined commands."""
    return self.pipeline.execute()

Usage

loader = DataLoader()
results = (loader
    .add_user(1, {"name": "Alice", "email": "alice@example.com"})
    .add_user(2, {"name": "Bob", "email": "bob@example.com"})
    .add_to_set("active_users", "1")
    .add_to_set("active_users", "2")
    .execute())

Transactions with WATCH

Optimistic locking with WATCH

def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
"""Transfer credits between users with optimistic locking."""

with r.pipeline() as pipe:
    while True:
        try:
            # Watch the keys we're going to modify
            pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")

            # Get current values
            from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
            to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)

            # Check if transfer is possible
            if from_credits < amount:
                pipe.unwatch()
                return False

            # Start transaction
            pipe.multi()
            pipe.set(f"user:{from_user}:credits", from_credits - amount)
            pipe.set(f"user:{to_user}:credits", to_credits + amount)

            # Execute transaction
            pipe.execute()
            return True

        except redis.WatchError:
            # Key was modified by another client - retry
            continue

Lua scripts for atomic operations

increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
    current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

Register and use Lua script

increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])

Lua Scripts for Complex Operations

Rate limiting with Lua

rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)

if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1

Get-or-set pattern with Lua

get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
    return value
else
    redis.call('SET', KEYS[1], ARGV[1])
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return ARGV[1]
end
"""

get_or_set = r.register_script(get_or_set_script)

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
"""Get value from cache or compute and cache it."""
value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])

if value == "__COMPUTING__":
    # We set the placeholder - compute the real value
    computed = compute_fn()
    r.setex(key, ttl, computed)
    return computed

return value

Production Patterns

High Availability with Sentinel

from redis.sentinel import Sentinel

Connect to Sentinel

sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)

Get master connection

master = sentinel.master_for('mymaster', socket_timeout=0.5)

Get replica connection (for read-only operations)

replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

Use master for writes

master.set('key', 'value')

Use replica for reads (optional, for load distribution)

value = replica.get('key')

Async Redis with asyncio

import asyncio
import redis.asyncio as redis

async def async_redis_operations():
"""Async Redis operations example."""
# Create async connection
r = await redis.from_url("redis://localhost")

try:
    # Async operations
    await r.set("async_key", "async_value")
    value = await r.get("async_key")
    print(f"Value: {value}")

    # Async pipeline
    async with r.pipeline(transaction=True) as pipe:
        await pipe.set("key1", "value1")
        await pipe.set("key2", "value2")
        await pipe.get("key1")
        results = await pipe.execute()

    print(f"Pipeline results: {results}")

finally:
    await r.close()

Run async operations

asyncio.run(async_redis_operations())

Connection Pool Configuration

Production-ready connection pool

import socket

pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,         # Max pool size
    socket_timeout=5,           # Socket timeout
    socket_connect_timeout=5,   # Connection timeout
    socket_keepalive=True,      # Keep TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3
    },
    retry_on_timeout=True,      # Retry on timeout
    health_check_interval=30,   # Health check every 30s
    decode_responses=True       # Auto-decode bytes to strings
)

r = redis.Redis(connection_pool=pool)

Error Handling and Resilience

import redis
from redis.exceptions import ConnectionError, TimeoutError
import time

class ResilientRedisClient:
"""Redis client with retry logic and exponential backoff."""

def __init__(self, max_retries: int = 3, backoff: float = 0.1):
    self.redis = redis.Redis(
        host='localhost',
        port=6379,
        socket_timeout=5,
        retry_on_timeout=True
    )
    self.max_retries = max_retries
    self.backoff = backoff

def get_with_retry(self, key: str, default=None):
    """Get value with exponential backoff retry."""
    for attempt in range(self.max_retries):
        try:
            return self.redis.get(key) or default
        except (ConnectionError, TimeoutError) as e:
            if attempt == self.max_retries - 1:
                # Log error and return default
                print(f"Redis error after {self.max_retries} attempts: {e}")
                return default

            # Exponential backoff
            wait_time = self.backoff * (2 ** attempt)
            time.sleep(wait_time)

def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
    """Set value with retry logic."""
    for attempt in range(self.max_retries):
        try:
            if ttl:
                return self.redis.setex(key, ttl, value)
            else:
                return self.redis.set(key, value)
        except (ConnectionError, TimeoutError) as e:
            if attempt == self.max_retries - 1:
                print(f"Redis error after {self.max_retries} attempts: {e}")
                return False

            wait_time = self.backoff * (2 ** attempt)
            time.sleep(wait_time)
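
A brief usage sketch for the resilient client above:

client = ResilientRedisClient(max_retries=3, backoff=0.1)
client.set_with_retry("greeting", "hello", ttl=60)
print(client.get_with_retry("greeting", default="(unavailable)"))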

Monitoring and Metrics

def get_redis_info(section: str = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
"""Monitor Redis memory usage."""
info = r.info('memory')

used_memory = info['used_memory_human']
peak_memory = info['used_memory_peak_human']
memory_fragmentation = info['mem_fragmentation_ratio']

print(f"Used Memory: {used_memory}")
print(f"Peak Memory: {peak_memory}")
print(f"Fragmentation Ratio: {memory_fragmentation}")

return info

def monitor_stats():
"""Monitor Redis statistics."""
info = r.info('stats')

total_connections = info['total_connections_received']
total_commands = info['total_commands_processed']
ops_per_sec = info['instantaneous_ops_per_sec']

print(f"Total Connections: {total_connections}")
print(f"Total Commands: {total_commands}")
print(f"Ops/sec: {ops_per_sec}")

return info

def get_slow_log(count: int = 10):
"""Get slow query log."""
slow_log = r.slowlog_get(count)

for entry in slow_log:
    print(f"Command: {entry['command']}")
    print(f"Duration: {entry['duration']} microseconds")
    print(f"Time: {entry['start_time']}")
    print("---")

return slow_log

Best Practices

Key Naming Conventions

Use consistent, hierarchical naming:

Good naming patterns

user:123:profile             # User profile data
user:123:sessions:abc        # User session
cache:product:456            # Cached product
queue:emails:pending         # Email queue
lock:resource:789            # Resource lock
counter:api:requests:daily   # Daily API request counter
leaderboard:global:score     # Global leaderboard

Avoid

u123               # Too cryptic
user_profile_123   # Underscores less common
123:user           # Wrong hierarchy
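
One lightweight way to keep keys consistent is a tiny helper; a sketch (make_key is illustrative, not part of redis-py):

def make_key(*parts) -> str:
    """Join key segments with ':' following the hierarchy above."""
    return ":".join(str(p) for p in parts)

# make_key("user", 123, "profile")  -> "user:123:profile"
# make_key("cache", "product", 456) -> "cache:product:456"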

Memory Management

Set TTL on all temporary data

r.setex("temp:data", 3600, value) # Expires in 1 hour

Limit collection sizes

r.lpush("activity_log", entry) r.ltrim("activity_log", 0, 999) # Keep only 1000 items

Use appropriate data structures

Hash is more memory-efficient than multiple keys

r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})

vs

r.set("user:123:name", "Alice") r.set("user:123:email", "alice@example.com")

Monitor memory usage

if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()

Security

Use authentication

r = redis.Redis(
    host='localhost',
    port=6379,
    password='your-secure-password',
    username='your-username'  # Redis 6+
)

Use SSL/TLS for production

pool = redis.ConnectionPool(
    host='redis.example.com',
    port=6380,
    connection_class=redis.SSLConnection,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-cert.pem'
)

Credential provider pattern

from redis import UsernamePasswordCredentialProvider

creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider
)

Testing

import fakeredis
import pytest

@pytest.fixture
def redis_client():
    """Provide fake Redis client for testing."""
    return fakeredis.FakeRedis(decode_responses=True)

def test_caching(redis_client):
"""Test caching logic."""
# Test cache miss
assert redis_client.get("test_key") is None

# Test cache set
redis_client.setex("test_key", 60, "test_value")
assert redis_client.get("test_key") == "test_value"

# Test expiration
assert redis_client.ttl("test_key") <= 60

def test_session_management(redis_client):
"""Test session operations."""
session_manager = SessionManager(redis_client)

# Create session
session_id = session_manager.create_session(user_id=123)
assert session_id is not None

# Get session
session = session_manager.get_session(session_id)
assert session['user_id'] == 123

# Delete session
assert session_manager.delete_session(session_id) is True
assert session_manager.get_session(session_id) is None

Examples

Example 1: User Session Management with Redis

import redis
import json
import uuid
from datetime import datetime, timedelta

class UserSessionManager:
"""Complete user session management with Redis."""

def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
    self.redis = redis_client
    self.ttl = ttl

def create_session(self, user_id: int, user_data: dict = None) -> str:
    """Create new user session."""
    session_id = str(uuid.uuid4())
    session_key = f"session:{session_id}"

    session_data = {
        "user_id": user_id,
        "created_at": datetime.utcnow().isoformat(),
        "last_accessed": datetime.utcnow().isoformat(),
        "data": user_data or {}
    }

    # Store session with TTL
    self.redis.setex(session_key, self.ttl, json.dumps(session_data))

    # Track user's active sessions
    self.redis.sadd(f"user:{user_id}:sessions", session_id)

    return session_id

def get_session(self, session_id: str) -> dict:
    """Get session and refresh TTL."""
    session_key = f"session:{session_id}"
    session_data = self.redis.get(session_key)

    if session_data:
        session = json.loads(session_data)
        session['last_accessed'] = datetime.utcnow().isoformat()

        # Refresh TTL
        self.redis.setex(session_key, self.ttl, json.dumps(session))

        return session

    return None

def delete_session(self, session_id: str) -> bool:
    """Delete session."""
    session = self.get_session(session_id)
    if not session:
        return False

    user_id = session['user_id']

    # Remove session
    self.redis.delete(f"session:{session_id}")

    # Remove from user's session set
    self.redis.srem(f"user:{user_id}:sessions", session_id)

    return True

def delete_all_user_sessions(self, user_id: int):
    """Delete all sessions for a user."""
    sessions_key = f"user:{user_id}:sessions"
    session_ids = self.redis.smembers(sessions_key)

    for session_id in session_ids:
        self.redis.delete(f"session:{session_id}")

    self.redis.delete(sessions_key)

def get_user_sessions(self, user_id: int) -> list:
    """Get all active sessions for a user."""
    sessions_key = f"user:{user_id}:sessions"
    session_ids = self.redis.smembers(sessions_key)

    sessions = []
    for session_id in session_ids:
        session = self.get_session(session_id)
        if session:
            session['session_id'] = session_id
            sessions.append(session)

    return sessions

Usage

r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)

Create session

session_id = session_mgr.create_session(
    user_id=123,
    user_data={"role": "admin", "permissions": ["read", "write"]}
)

Get session

session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")

List all user sessions

sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")

Logout (delete session)

session_mgr.delete_session(session_id)

Example 2: Real-Time Leaderboard

import redis
import time

class Leaderboard:
"""Real-time leaderboard using Redis sorted sets."""

def __init__(self, redis_client: redis.Redis, name: str):
    self.redis = redis_client
    self.key = f"leaderboard:{name}"

def add_score(self, player_id: str, score: float):
    """Add or update player score."""
    self.redis.zadd(self.key, {player_id: score})

def increment_score(self, player_id: str, increment: float):
    """Increment player score."""
    self.redis.zincrby(self.key, increment, player_id)

def get_top(self, count: int = 10) -> list:
    """Get top players."""
    # ZREVRANGE for highest scores first
    players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)

    return [
        {
            "rank": idx + 1,
            "player_id": player_id,
            "score": score
        }
        for idx, (player_id, score) in enumerate(players)
    ]

def get_rank(self, player_id: str) -> dict:
    """Get player rank and score."""
    score = self.redis.zscore(self.key, player_id)
    if score is None:
        return None

    # ZREVRANK for rank (0-indexed, highest first)
    rank = self.redis.zrevrank(self.key, player_id)

    return {
        "player_id": player_id,
        "rank": rank + 1 if rank is not None else None,
        "score": score
    }

def get_around(self, player_id: str, count: int = 5) -> list:
    """Get players around a specific player."""
    rank = self.redis.zrevrank(self.key, player_id)
    if rank is None:
        return []

    # Get players before and after
    start = max(0, rank - count)
    end = rank + count

    players = self.redis.zrevrange(self.key, start, end, withscores=True)

    return [
        {
            "rank": start + idx + 1,
            "player_id": pid,
            "score": score,
            "is_current": pid == player_id
        }
        for idx, (pid, score) in enumerate(players)
    ]

def get_total_players(self) -> int:
    """Get total number of players."""
    return self.redis.zcard(self.key)

def remove_player(self, player_id: str) -> bool:
    """Remove player from leaderboard."""
    return self.redis.zrem(self.key, player_id) > 0

Usage

r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")

Add scores

leaderboard.add_score("alice", 1500) leaderboard.add_score("bob", 2000) leaderboard.add_score("charlie", 1800) leaderboard.increment_score("alice", 200) # alice now at 1700

Get top 10

top_players = leaderboard.get_top(10)
for player in top_players:
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}")

Get player rank

alice_stats = leaderboard.get_rank("alice") print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")

Get players around alice

nearby = leaderboard.get_around("alice", count=2) for player in nearby: marker = " <-- YOU" if player['is_current'] else "" print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")

Example 3: Distributed Rate Limiter

import redis
import time

class RateLimiter:
"""Distributed rate limiter using Redis."""

def __init__(self, redis_client: redis.Redis):
    self.redis = redis_client

    # Lua script for atomic rate limiting
    self.rate_limit_script = self.redis.register_script("""
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])

        local current = redis.call('INCR', key)

        if current == 1 then
            redis.call('EXPIRE', key, window)
        end

        if current > limit then
            return {0, limit, current - 1}
        else
            return {1, limit, current}
        end
    """)

def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
    """
    Check if request is within rate limit.

    Args:
        identifier: User ID, IP address, or API key
        limit: Maximum requests allowed
        window: Time window in seconds

    Returns:
        dict with allowed (bool), limit, current, remaining
    """
    key = f"rate_limit:{identifier}:{int(time.time() // window)}"

    allowed, max_limit, current = self.rate_limit_script(
        keys=[key],
        args=[limit, window]
    )

    return {
        "allowed": bool(allowed),
        "limit": max_limit,
        "current": current,
        "remaining": max(0, max_limit - current),
        "reset_at": (int(time.time() // window) + 1) * window
    }

def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
    """
    Sliding window rate limiter using sorted sets.
    More accurate but slightly more expensive.
    """
    key = f"rate_limit:sliding:{identifier}"
    now = time.time()
    window_start = now - window

    # Remove old entries
    self.redis.zremrangebyscore(key, 0, window_start)

    # Count current requests
    current = self.redis.zcard(key)

    if current < limit:
        # Add new request
        self.redis.zadd(key, {str(now): now})
        self.redis.expire(key, window)

        return {
            "allowed": True,
            "limit": limit,
            "current": current + 1,
            "remaining": limit - current - 1
        }
    else:
        return {
            "allowed": False,
            "limit": limit,
            "current": current,
            "remaining": 0
        }

Usage

r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)

API rate limiting: 100 requests per minute

user_id = "user_123" result = limiter.check_rate_limit(user_id, limit=100, window=60)

if result["allowed"]: print(f"Request allowed. {result['remaining']} requests remaining.") # Process request else: print(f"Rate limit exceeded. Try again at {result['reset_at']}") # Return 429 Too Many Requests

More accurate sliding window

result = limiter.sliding_window_check(user_id, limit=100, window=60)

Example 4: Distributed Job Queue

import redis
import json
import time
import uuid
from typing import Optional, Callable

class JobQueue:
"""Distributed job queue with Redis."""

def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
    self.redis = redis_client
    self.queue_name = queue_name
    self.queue_key = f"queue:{queue_name}"
    self.processing_key = f"queue:{queue_name}:processing"

def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
    """
    Add job to queue.

    Args:
        job_type: Type of job (for routing to workers)
        payload: Job data
        priority: Higher priority = processed first (0 = normal)

    Returns:
        job_id
    """
    job_id = str(uuid.uuid4())

    job_data = {
        "id": job_id,
        "type": job_type,
        "payload": payload,
        "enqueued_at": time.time(),
        "attempts": 0
    }

    # Add to queue (use ZADD for priority queue)
    score = -priority  # Negative for higher priority first
    self.redis.zadd(self.queue_key, {json.dumps(job_data): score})

    return job_id

def dequeue(self, timeout: int = 0) -> Optional[dict]:
    """
    Get next job from queue.

    Args:
        timeout: Block for this many seconds (0 = no blocking)

    Returns:
        Job data or None
    """
    # Get highest priority job (lowest score)
    jobs = self.redis.zrange(self.queue_key, 0, 0)

    if not jobs:
        if timeout > 0:
            time.sleep(min(timeout, 1))
            return self.dequeue(timeout - 1)
        return None

    job_json = jobs[0]

    # Move to processing set atomically
    pipe = self.redis.pipeline()
    pipe.zrem(self.queue_key, job_json)
    pipe.zadd(self.processing_key, {job_json: time.time()})
    pipe.execute()

    job_data = json.loads(job_json)
    job_data['attempts'] += 1

    return job_data

def complete(self, job_data: dict):
    """Mark job as completed."""
    # Re-serialize the job exactly as it is stored in the processing set
    # (dequeue incremented 'attempts' after reading, so undo that here)
    stored = dict(job_data)
    stored['attempts'] -= 1
    job_json = json.dumps(stored)

    # Remove from processing
    self.redis.zrem(self.processing_key, job_json)

def retry(self, job_data: dict, delay: int = 0):
    """Retry failed job."""
    # Remove the stored entry (attempts was incremented after dequeue)
    stored = dict(job_data)
    stored['attempts'] -= 1
    self.redis.zrem(self.processing_key, json.dumps(stored))

    # Re-enqueue with delay, keeping the incremented attempt count
    if delay > 0:
        time.sleep(delay)

    self.redis.zadd(self.queue_key, {json.dumps(job_data): 0})

def get_stats(self) -> dict:
    """Get queue statistics."""
    return {
        "queued": self.redis.zcard(self.queue_key),
        "processing": self.redis.zcard(self.processing_key)
    }

Worker example

class Worker:
"""Job worker."""

def __init__(self, queue: JobQueue, handlers: dict):
    self.queue = queue
    self.handlers = handlers

def process_jobs(self):
    """Process jobs from queue."""
    print("Worker started. Waiting for jobs...")

    while True:
        job = self.queue.dequeue(timeout=5)

        if job:
            print(f"Processing job {job['id']} (type: {job['type']})")

            try:
                # Get handler for job type
                handler = self.handlers.get(job['type'])

                if handler:
                    handler(job['payload'])
                    self.queue.complete(job)
                    print(f"Job {job['id']} completed")
                else:
                    print(f"No handler for job type: {job['type']}")
                    self.queue.complete(job)

            except Exception as e:
                print(f"Job {job['id']} failed: {e}")

                if job['attempts'] < 3:
                    # Retry with exponential backoff
                    delay = 2 ** job['attempts']
                    print(f"Retrying in {delay}s...")
                    self.queue.retry(job, delay=delay)
                else:
                    print(f"Job {job['id']} failed permanently")
                    self.queue.complete(job)

Usage

r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")

Enqueue jobs

job_id = queue.enqueue("send_email", { "to": "user@example.com", "subject": "Welcome!", "body": "Thanks for signing up." }, priority=1)

Define handlers

def send_email_handler(payload):
    print(f"Sending email to {payload['to']}")
    # Email sending logic here
    time.sleep(1)  # Simulate work

handlers = { "send_email": send_email_handler }

Start worker

worker = Worker(queue, handlers)

worker.process_jobs() # This blocks - run in separate process

Example 5: Real-Time Event Streaming

import redis
import json
import time
from typing import Callable, Optional

class EventStream:
"""Real-time event streaming with Redis Streams."""

def __init__(self, redis_client: redis.Redis, stream_name: str):
    self.redis = redis_client
    self.stream_name = stream_name

def publish(self, event_type: str, data: dict) -> str:
    """Publish event to stream."""
    event = {
        "type": event_type,
        "data": json.dumps(data),
        "timestamp": time.time()
    }

    # Add to stream (returns auto-generated ID)
    event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
    return event_id

def read_events(self, last_id: str = '0', count: int = 10) -> list:
    """Read events from stream."""
    events = self.redis.xread(
        {self.stream_name: last_id},
        count=count,
        block=1000  # 1 second timeout
    )

    if not events:
        return []

    _, event_list = events[0]

    return [
        {
            "id": event_id,
            "type": event_data[b'type'].decode(),
            "data": json.loads(event_data[b'data'].decode()),
            "timestamp": float(event_data[b'timestamp'])
        }
        for event_id, event_data in event_list
    ]

def create_consumer_group(self, group_name: str):
    """Create consumer group for parallel processing."""
    try:
        self.redis.xgroup_create(
            name=self.stream_name,
            groupname=group_name,
            id='0',
            mkstream=True
        )
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def consume_events(self, group_name: str, consumer_name: str,
                  count: int = 10) -> list:
    """Consume events as part of consumer group."""
    events = self.redis.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={self.stream_name: '>'},
        count=count,
        block=5000
    )

    if not events:
        return []

    _, event_list = events[0]

    return [
        {
            "id": event_id,
            "type": event_data[b'type'].decode(),
            "data": json.loads(event_data[b'data'].decode()),
            "timestamp": float(event_data[b'timestamp'])
        }
        for event_id, event_data in event_list
    ]

def acknowledge(self, group_name: str, event_id: str):
    """Acknowledge processed event."""
    self.redis.xack(self.stream_name, group_name, event_id)

def get_pending(self, group_name: str) -> list:
    """Get pending (unacknowledged) events."""
    pending = self.redis.xpending_range(
        name=self.stream_name,
        groupname=group_name,
        min='-',
        max='+',
        count=100
    )

    return pending

Usage Example: Activity Feed

r = redis.Redis()
activity_stream = EventStream(r, "user_activity")

Publish events

activity_stream.publish("user_signup", { "user_id": 123, "email": "alice@example.com" })

activity_stream.publish("post_created", { "user_id": 123, "post_id": 456, "title": "My First Post" })

Read events (simple consumer)

last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)

    for event in events:
        print(f"Event: {event['type']}")
        print(f"Data: {event['data']}")
        last_id = event['id']

    if not events:
        break

Consumer group example

activity_stream.create_consumer_group("processors")

Worker consuming events

while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )

    for event in events:
        try:
            # Process event
            process_event(event)

            # Acknowledge
            activity_stream.acknowledge("processors", event['id'])
        except Exception as e:
            print(f"Failed to process event {event['id']}: {e}")
            # Event remains unacknowledged for retry

Example 6: Cache-Aside Pattern with Multi-Level Caching

import redis
import json
import time
import hashlib
from typing import Optional, Any, Callable

class MultiLevelCache:
    """Multi-level caching with Redis and local cache."""

def __init__(self, redis_client: redis.Redis,
             local_cache_size: int = 100,
             local_ttl: int = 60,
             redis_ttl: int = 3600):
    self.redis = redis_client
    self.local_cache = {}
    self.local_cache_size = local_cache_size
    self.local_ttl = local_ttl
    self.redis_ttl = redis_ttl

def _make_key(self, namespace: str, key: str) -> str:
    """Generate cache key."""
    return f"cache:{namespace}:{key}"

def get(self, namespace: str, key: str,
        compute_fn: Optional[Callable] = None) -> Optional[Any]:
    """
    Get value from cache with fallback to compute function.

    Lookup order: Local cache → Redis → Compute function
    """
    cache_key = self._make_key(namespace, key)

    # Level 1: Local cache
    if cache_key in self.local_cache:
        entry = self.local_cache[cache_key]
        if time.time() < entry['expires_at']:
            return entry['value']
        else:
            del self.local_cache[cache_key]

    # Level 2: Redis cache
    redis_value = self.redis.get(cache_key)
    if redis_value:
        value = json.loads(redis_value)

        # Populate local cache
        self._set_local(cache_key, value)

        return value

    # Level 3: Compute function
    if compute_fn:
        value = compute_fn()
        if value is not None:
            self.set(namespace, key, value)
        return value

    return None

def set(self, namespace: str, key: str, value: Any):
    """Set value in both cache levels."""
    cache_key = self._make_key(namespace, key)
    serialized = json.dumps(value)

    # Set in Redis
    self.redis.setex(cache_key, self.redis_ttl, serialized)

    # Set in local cache
    self._set_local(cache_key, value)

def _set_local(self, key: str, value: Any):
    """Set value in local cache with LRU eviction."""
    # Simple LRU: remove oldest if at capacity
    if len(self.local_cache) >= self.local_cache_size:
        # Remove oldest entry
        oldest_key = min(
            self.local_cache.keys(),
            key=lambda k: self.local_cache[k]['expires_at']
        )
        del self.local_cache[oldest_key]

    self.local_cache[key] = {
        'value': value,
        'expires_at': time.time() + self.local_ttl
    }

def delete(self, namespace: str, key: str):
    """Delete from all cache levels."""
    cache_key = self._make_key(namespace, key)

    # Delete from Redis
    self.redis.delete(cache_key)

    # Delete from local cache
    if cache_key in self.local_cache:
        del self.local_cache[cache_key]

def invalidate_namespace(self, namespace: str):
    """Invalidate all keys in namespace."""
    pattern = f"cache:{namespace}:*"

    # Delete from Redis
    for key in self.redis.scan_iter(match=pattern, count=100):
        self.redis.delete(key)

    # Delete from local cache
    to_delete = [
        k for k in self.local_cache.keys()
        if k.startswith(f"cache:{namespace}:")
    ]
    for k in to_delete:
        del self.local_cache[k]

Usage

r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )

First call: Queries database, caches result

user = get_user(123)

Second call: Returns from local cache (fastest)

user = get_user(123)

Update user

def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)

# Invalidate cache
cache.delete("users", str(user_id))

Invalidate all user caches

cache.invalidate_namespace("users")
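
The same wiring can be wrapped in a decorator so call sites don't repeat the namespace and key construction. A minimal sketch built on the MultiLevelCache above; the cached helper and its key layout are illustrative, not part of the skill's API:

import functools

def cached(namespace: str, level_cache: MultiLevelCache):
    """Cache-aside decorator: the key is derived from positional arguments."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args):
            key = ":".join(map(str, args)) or "default"
            # compute_fn only runs when both the local dict and Redis miss
            return level_cache.get(namespace, key, compute_fn=lambda: fn(*args))
        return wrapper
    return decorator

@cached("users", cache)
def load_user(user_id: int) -> dict:
    return database.query_user(user_id)

Keyword arguments are ignored here for brevity; a production version would fold them into the cache key as well.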

Example 7: Geo-Location with Redis

import redis

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""

def __init__(self, redis_client: redis.Redis, index_name: str):
    self.redis = redis_client
    self.key = f"geo:{index_name}"

def add_location(self, location_id: str, longitude: float, latitude: float):
    """Add location to geo index."""
    # redis-py 4.x expects GEOADD values as a flat sequence of lon, lat, member
    self.redis.geoadd(self.key, (longitude, latitude, location_id))

def add_locations(self, locations: list):
    """Batch add locations.

    Args:
        locations: List of (location_id, longitude, latitude) tuples
    """
    self.redis.geoadd(self.key, [
        item for loc in locations
        for item in (loc[1], loc[2], loc[0])
    ])

def get_position(self, location_id: str) -> tuple:
    """Get coordinates of a location."""
    result = self.redis.geopos(self.key, location_id)
    if result and result[0]:
        return result[0]  # (longitude, latitude)
    return None

def find_nearby(self, longitude: float, latitude: float,
               radius: float, unit: str = 'km', count: int = None) -> list:
    """
    Find locations within radius.

    Args:
        longitude: Center longitude
        latitude: Center latitude
        radius: Search radius
        unit: Distance unit ('m', 'km', 'mi', 'ft')
        count: Maximum results
    """
    args = {
        'longitude': longitude,
        'latitude': latitude,
        'radius': radius,
        'unit': unit,
        'withdist': True,
        'withcoord': True,
        'sort': 'ASC'
    }

    if count:
        args['count'] = count

    results = self.redis.georadius(self.key, **args)

    return [
        {
            'location_id': location_id,
            'distance': distance,
            'coordinates': (longitude, latitude)
        }
        for location_id, distance, (longitude, latitude) in results
    ]

def find_nearby_member(self, location_id: str, radius: float,
                      unit: str = 'km', count: int = None) -> list:
    """Find locations near an existing member."""
    args = {
        'member': location_id,
        'radius': radius,
        'unit': unit,
        'withdist': True,
        'sort': 'ASC'
    }

    if count:
        args['count'] = count

    results = self.redis.georadiusbymember(self.key, **args)

    return [
        {
            'location_id': loc_id,
            'distance': distance
        }
        for loc_id, distance in results
        if loc_id != location_id  # Exclude self
    ]

def distance_between(self, location_id1: str, location_id2: str,
                    unit: str = 'km') -> float:
    """Calculate distance between two locations."""
    return self.redis.geodist(self.key, location_id1, location_id2, unit)

Usage Example: Restaurant finder

r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")

Add restaurants

restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])

Find restaurants near coordinates

nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)

for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")

Find restaurants near a specific restaurant

similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')

Get distance between two restaurants

distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")
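
Note that GEORADIUS and GEORADIUSBYMEMBER are deprecated since Redis 6.2 in favor of GEOSEARCH. A minimal sketch of the same nearby query via redis-py's geosearch, assuming redis-py 4.x against Redis 6.2+ (nearby_v2 is an illustrative name):

nearby_v2 = r.geosearch(
    "geo:restaurants",       # same key layout the GeoLocation class uses
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    sort='ASC',
    count=10,
    withdist=True,
    withcoord=True
)

for name, dist, coords in nearby_v2:
    print(f"{name}: {dist:.2f} km at {coords}")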

Quick Reference

Common Operations

Connection

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

Strings

r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')

Hashes

r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')

Lists

r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)

Sets

r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')

Sorted Sets

r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')

Expiration

r.expire('key', 3600)
r.ttl('key')

Pipelining

pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
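
A pipeline also doubles as a MULTI/EXEC transaction, and WATCH adds optimistic locking on top. A minimal sketch of the standard redis-py retry loop; the balance:123 key is illustrative:

with r.pipeline() as pipe:
    while True:
        try:
            pipe.watch('balance:123')             # optimistic lock on the key
            current = int(pipe.get('balance:123') or 0)
            pipe.multi()                          # start buffering the transaction
            pipe.set('balance:123', current + 10)
            pipe.execute()                        # raises if the watched key changed
            break
        except redis.WatchError:
            continue                              # retry on concurrent modification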

Time Complexity

  • GET, SET: O(1)

  • HGET, HSET: O(1)

  • LPUSH, RPUSH, LPOP, RPOP: O(1)

  • SADD, SREM, SISMEMBER: O(1)

  • ZADD, ZREM: O(log(N))

  • ZRANGE, ZREVRANGE: O(log(N)+M) where M is result size

  • SCAN, SSCAN, HSCAN, ZSCAN: O(1) per call; a complete iteration over the collection is O(N)

Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: State Management, Distributed Systems, Performance Optimization
Compatible With: redis-py, Redis 6.0+, Redis 7.0+
