rate-limiting

Rate Limiting Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "rate-limiting" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-rate-limiting

Rate Limiting Patterns

Protect APIs with distributed rate limiting using Redis and modern algorithms.

Overview

  • Protecting public APIs from abuse

  • Implementing tiered rate limits (free/pro/enterprise)

  • Scaling rate limiting across multiple instances

  • Preventing brute force attacks on auth endpoints

  • Managing third-party API consumption

Algorithm Selection

Algorithm Use Case Burst Handling

Token Bucket General API, allows bursts Excellent

Sliding Window Precise, no burst spikes Good

Leaky Bucket Steady rate, queue excess None

Fixed Window Simple, some edge issues Moderate

SlowAPI + Redis (FastAPI)

Basic Setup

from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.middleware import SlowAPIMiddleware

limiter = Limiter( key_func=get_remote_address, storage_uri="redis://localhost:6379", strategy="moving-window", # sliding window )

app = FastAPI() app.state.limiter = limiter app.add_middleware(SlowAPIMiddleware)

Endpoint Limits

from slowapi import Limiter

@router.post("/api/v1/auth/login") @limiter.limit("10/minute") # Strict for auth async def login(request: Request, credentials: LoginRequest): ...

@router.get("/api/v1/analyses") @limiter.limit("100/minute") # Normal for reads async def list_analyses(request: Request): ...

@router.post("/api/v1/analyses") @limiter.limit("20/minute") # Moderate for writes async def create_analysis(request: Request, data: AnalysisCreate): ...

User-Based Limits

def get_user_identifier(request: Request) -> str: """Rate limit by user ID if authenticated, else IP.""" if hasattr(request.state, "user"): return f"user:{request.state.user.id}" return f"ip:{get_remote_address(request)}"

limiter = Limiter(key_func=get_user_identifier)

Token Bucket with Redis (Custom)

import redis.asyncio as redis from datetime import datetime, timezone

class TokenBucketLimiter: def init( self, redis_client: redis.Redis, capacity: int = 100, refill_rate: float = 10.0, # tokens per second ): self.redis = redis_client self.capacity = capacity self.refill_rate = refill_rate

async def is_allowed(self, key: str, tokens: int = 1) -> bool:
    """Check if request is allowed, consume tokens atomically."""
    lua_script = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local tokens_requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
    local current_tokens = tonumber(bucket[1]) or capacity
    local last_update = tonumber(bucket[2]) or now

    -- Calculate refill
    local elapsed = now - last_update
    local refill = elapsed * refill_rate
    current_tokens = math.min(capacity, current_tokens + refill)

    -- Check and consume
    if current_tokens >= tokens_requested then
        current_tokens = current_tokens - tokens_requested
        redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
        redis.call('EXPIRE', key, 3600)
        return 1
    else
        return 0
    end
    """
    now = datetime.now(timezone.utc).timestamp()
    result = await self.redis.eval(
        lua_script, 1, key,
        self.capacity, self.refill_rate, tokens, now
    )
    return result == 1

Sliding Window Counter

class SlidingWindowLimiter: def init(self, redis_client: redis.Redis, window_seconds: int = 60): self.redis = redis_client self.window = window_seconds

async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
    """Returns (allowed, remaining)."""
    now = datetime.now(timezone.utc).timestamp()
    window_start = now - self.window

    pipe = self.redis.pipeline()
    # Remove old entries
    pipe.zremrangebyscore(key, 0, window_start)
    # Count current window
    pipe.zcard(key)
    # Add this request
    pipe.zadd(key, {str(now): now})
    # Set expiry
    pipe.expire(key, self.window * 2)

    results = await pipe.execute()
    current_count = results[1]

    if current_count < limit:
        return True, limit - current_count - 1
    return False, 0

Tiered Rate Limits

from enum import Enum

class UserTier(Enum): FREE = "free" PRO = "pro" ENTERPRISE = "enterprise"

TIER_LIMITS = { UserTier.FREE: {"requests": 100, "window": 3600}, # 100/hour UserTier.PRO: {"requests": 1000, "window": 3600}, # 1000/hour UserTier.ENTERPRISE: {"requests": 10000, "window": 3600}, # 10000/hour }

async def get_rate_limit(user: User) -> str: limits = TIER_LIMITS[user.tier] return f"{limits['requests']}/{limits['window']}seconds"

@router.get("/api/v1/data") @limiter.limit(get_rate_limit) async def get_data(request: Request, user: User = Depends(get_current_user)): ...

Response Headers (RFC 6585)

from fastapi import Response

async def add_rate_limit_headers( response: Response, limit: int, remaining: int, reset_at: datetime, ): response.headers["X-RateLimit-Limit"] = str(limit) response.headers["X-RateLimit-Remaining"] = str(remaining) response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp())) response.headers["Retry-After"] = str(int((reset_at - datetime.now(timezone.utc)).seconds))

Error Response (429)

from fastapi import HTTPException from fastapi.responses import JSONResponse

def rate_limit_exceeded_handler(request: Request, exc: Exception): return JSONResponse( status_code=429, content={ "type": "https://api.example.com/errors/rate-limit-exceeded", "title": "Too Many Requests", "status": 429, "detail": "Rate limit exceeded. Please retry after the reset time.", "instance": str(request.url), }, headers={ "Retry-After": "60", "X-RateLimit-Limit": "100", "X-RateLimit-Remaining": "0", } )

Anti-Patterns (FORBIDDEN)

NEVER use in-memory counters in distributed systems

request_counts = {} # Lost on restart, not shared across instances

NEVER skip rate limiting on internal APIs (defense in depth)

@router.get("/internal/admin") async def admin_endpoint(): # No rate limit = vulnerable ...

NEVER use fixed window without considering edge spikes

A user can hit 100 at 0:59 and 100 at 1:01 = 200 in 2 seconds

Key Decisions

Decision Recommendation

Storage Redis (distributed, atomic)

Algorithm Token bucket for most APIs

Key User ID if auth, else IP + fingerprint

Auth endpoints 10/min (strict)

Read endpoints 100-1000/min (based on tier)

Write endpoints 20-100/min (moderate)

Related Skills

  • auth-patterns

  • Authentication integration

  • resilience-patterns

  • Circuit breakers

  • observability-monitoring

  • Rate limit metrics

Capability Details

token-bucket

Keywords: token bucket, rate limit, burst, capacity Solves:

  • How do I implement token bucket rate limiting?

  • Allow bursts while limiting rate

sliding-window

Keywords: sliding window, moving window, rate limit Solves:

  • How to implement precise rate limiting?

  • Avoid fixed window edge cases

slowapi-redis

Keywords: slowapi, fastapi rate limit, redis limiter Solves:

  • How to add rate limiting to FastAPI?

  • Distributed rate limiting

tiered-limits

Keywords: tiered, user tier, free pro enterprise Solves:

  • Different rate limits per subscription tier

  • User-based rate limiting

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

dashboard-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

rag-retrieval

No summary provided by upstream source.

Repository SourceNeeds Review
General

memory

No summary provided by upstream source.

Repository SourceNeeds Review
General

zustand-patterns

No summary provided by upstream source.

Repository SourceNeeds Review