# Message Queue Patterns
Asynchronous communication patterns for distributed systems using RabbitMQ, Redis Streams, Kafka, and FastStream.
## Overview
- Decoupling services in microservices architecture
- Implementing pub/sub and work queue patterns
- Building event-driven systems with reliable delivery
- Load leveling and buffering between services
- Task distribution across multiple workers
- High-throughput event streaming (Kafka)
## Quick Reference
### FastStream: Unified API (Recommended)
```bash
pip install "faststream[kafka,rabbit,redis]"
```
```python
from faststream import FastStream
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    total: float


@broker.subscriber("orders.created")
async def handle_order(event: OrderCreated):
    """Automatic Pydantic validation and deserialization."""
    print(f"Processing order {event.order_id}")
    await process_order(event)


@broker.publisher("orders.processed")
async def publish_processed(order_id: str) -> dict:
    return {"order_id": order_id, "status": "processed"}
```
Run with: `faststream run app:app`
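Handlers publish via the `@broker.publisher` decorator; to publish from outside a handler (a startup script, an HTTP endpoint), the broker can be called directly. A minimal sketch; `connect`/`close` follow recent FastStream releases, so treat the exact method names as an assumption:

```python
import asyncio


async def emit_test_event():
    """Publish one event from outside a subscriber (hypothetical helper)."""
    await broker.connect()
    try:
        # FastStream serializes the Pydantic model for us
        await broker.publish(
            OrderCreated(order_id="o-1", customer_id="c-1", total=9.99),
            topic="orders.created",
        )
    finally:
        await broker.close()


asyncio.run(emit_test_event())
```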
### Kafka Producer (aiokafka)
```python
import json

from aiokafka import AIOKafkaProducer


class KafkaPublisher:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self._producer: AIOKafkaProducer | None = None

    async def start(self):
        self._producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",  # Wait for all in-sync replicas
            enable_idempotence=True,  # Idempotent producer: no duplicates on retry
        )
        await self._producer.start()

    async def publish(
        self,
        topic: str,
        value: dict,
        key: str | None = None,
    ):
        await self._producer.send_and_wait(
            topic,
            value=value,
            key=key.encode() if key else None,
        )

    async def stop(self):
        await self._producer.stop()
```
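A usage sketch for `KafkaPublisher` (broker address, topic, and payload are illustrative). Keying by `order_id` routes every event for one order to the same partition, which is what preserves per-order ordering:

```python
import asyncio


async def main():
    publisher = KafkaPublisher("localhost:9092")
    await publisher.start()
    try:
        # Same key -> same partition -> per-order ordering preserved
        await publisher.publish(
            "orders.created",
            {"order_id": "o-1", "total": 9.99},
            key="o-1",
        )
    finally:
        await publisher.stop()


asyncio.run(main())
```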
### Kafka Consumer with Consumer Group
```python
import json

from aiokafka import AIOKafkaConsumer


class KafkaConsumer:
    def __init__(
        self,
        topic: str,
        group_id: str,
        bootstrap_servers: str,
    ):
        self.consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset="earliest",
            enable_auto_commit=False,  # Manual commit for reliability
            value_deserializer=lambda v: json.loads(v.decode()),
        )

    async def consume(self, handler):
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                try:
                    await handler(msg.value, msg.key, msg.partition)
                    await self.consumer.commit()
                except Exception as e:
                    # Handle or send to DLQ
                    await self.send_to_dlq(msg, e)
        finally:
            await self.consumer.stop()
```
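`send_to_dlq` is referenced above but not defined. A minimal sketch, assuming a separate already-started `AIOKafkaProducer` on `self.dlq_producer` and a `<topic>.dlq` naming convention (both are assumptions, not aiokafka features):

```python
async def send_to_dlq(self, msg, error: Exception):
    """Forward a poison message to a dead-letter topic, then commit past it."""
    await self.dlq_producer.send_and_wait(
        f"{msg.topic}.dlq",  # assumed naming convention
        value=json.dumps({
            "payload": msg.value,
            "error": str(error),
            "partition": msg.partition,
            "offset": msg.offset,
        }).encode(),
        key=msg.key,
    )
    # Commit so the failed message is not re-consumed forever
    await self.consumer.commit()
```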
### RabbitMQ Publisher
```python
import json

import aio_pika
from aio_pika import DeliveryMode, Message


class RabbitMQPublisher:
    def __init__(self, url: str):
        self.url = url
        self._connection = None
        self._channel = None

    async def connect(self):
        self._connection = await aio_pika.connect_robust(self.url)
        self._channel = await self._connection.channel()
        await self._channel.set_qos(prefetch_count=10)

    async def publish(self, exchange: str, routing_key: str, message: dict):
        exchange_obj = await self._channel.get_exchange(exchange)
        await exchange_obj.publish(
            Message(
                body=json.dumps(message).encode(),
                delivery_mode=DeliveryMode.PERSISTENT,
                content_type="application/json",
            ),
            routing_key=routing_key,
        )
```
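`get_exchange` assumes the exchange was already declared. A hedged setup sketch using aio-pika's declare calls (exchange, queue, and binding names here are illustrative):

```python
from aio_pika import ExchangeType


async def declare_topology(self):
    """Declare the exchange and a bound queue idempotently before publishing."""
    exchange = await self._channel.declare_exchange(
        "orders", ExchangeType.TOPIC, durable=True,
    )
    queue = await self._channel.declare_queue("orders.created", durable=True)
    # Routing keys like "orders.created.us" match this binding pattern
    await queue.bind(exchange, routing_key="orders.created.*")
```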
### RabbitMQ Consumer with Retry
```python
import json


class RabbitMQConsumer:
    async def consume(self, queue_name: str, handler, max_retries: int = 3):
        queue = await self._channel.get_queue(queue_name)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=False):
                    body = json.loads(message.body.decode())
                    try:
                        await handler(body)
                    except Exception as e:
                        retry_count = (message.headers or {}).get("x-retry-count", 0)
                        if retry_count < max_retries:
                            # Republish to the original exchange/routing key
                            await self.publish(
                                message.exchange,
                                message.routing_key,
                                body,
                                headers={"x-retry-count": retry_count + 1},
                            )
                        else:
                            # Exhausted retries: hand off to the dead-letter exchange
                            await self.publish(
                                "dlx",
                                "failed",
                                body,
                                headers={"x-error": str(e)},
                            )
```
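The `dlx` exchange used above has to exist. A minimal sketch of the dead-letter wiring, with illustrative names throughout; the `x-dead-letter-*` arguments make the broker route rejected messages automatically:

```python
from aio_pika import ExchangeType


async def declare_dead_lettering(channel):
    """Route messages rejected from the main queue into a 'failed' queue."""
    dlx = await channel.declare_exchange("dlx", ExchangeType.DIRECT, durable=True)
    failed = await channel.declare_queue("failed", durable=True)
    await failed.bind(dlx, routing_key="failed")

    # Rejected/expired messages from this queue are dead-lettered by the broker
    await channel.declare_queue(
        "orders",  # illustrative main queue name
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "failed",
        },
    )
```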
### Redis Streams Consumer Group
```python
import redis.asyncio as redis


class RedisStreamConsumer:
    def __init__(self, url: str, stream: str, group: str, consumer: str):
        self.redis = redis.from_url(url)
        self.stream, self.group, self.consumer = stream, group, consumer

    async def setup(self):
        try:
            await self.redis.xgroup_create(self.stream, self.group, "0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):  # group already existing is fine
                raise

    async def consume(self, handler):
        while True:
            messages = await self.redis.xreadgroup(
                groupname=self.group, consumername=self.consumer,
                streams={self.stream: ">"}, count=10, block=5000,
            )
            for stream, stream_messages in messages:
                for message_id, data in stream_messages:
                    try:
                        await handler(message_id, data)
                        await self.redis.xack(self.stream, self.group, message_id)
                    except Exception:
                        pass  # stays in the pending list until claimed (see below)
```
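An unacked message is not redelivered automatically; it stays in the group's pending entries list (PEL) until a consumer claims it. A hedged sketch using `XAUTOCLAIM` (redis-py >= 4.3 against Redis >= 6.2; the idle threshold is illustrative, and Redis 7 adds a third element to the reply):

```python
async def reclaim_stale(self, handler, min_idle_ms: int = 60_000):
    """Claim messages pending longer than min_idle_ms and retry them."""
    next_id = "0-0"
    while True:
        resp = await self.redis.xautoclaim(
            self.stream, self.group, self.consumer,
            min_idle_time=min_idle_ms, start_id=next_id, count=10,
        )
        next_id, claimed = resp[0], resp[1]
        for message_id, data in claimed:
            try:
                await handler(message_id, data)
                await self.redis.xack(self.stream, self.group, message_id)
            except Exception:
                pass  # stays pending; retried on the next sweep
        if next_id in ("0-0", b"0-0"):  # full pass over the PEL complete
            break
```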
"Just Use Postgres" Pattern
For simpler use cases: Postgres `LISTEN/NOTIFY` for pub/sub plus `FOR UPDATE SKIP LOCKED` for a job queue.
```python
import json

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class PostgresQueue:
    """Simple queue using Postgres - good for moderate throughput."""

    async def publish(self, db: AsyncSession, channel: str, payload: dict):
        await db.execute(
            text("SELECT pg_notify(:channel, :payload)"),
            {"channel": channel, "payload": json.dumps(payload)},
        )

    async def get_next_job(self, db: AsyncSession) -> dict | None:
        """Get the next pending job; SKIP LOCKED avoids contention between workers."""
        result = await db.execute(text("""
            SELECT id, payload FROM job_queue
            WHERE status = 'pending'
            ORDER BY created_at
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        """))
        return result.first()
```
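`FOR UPDATE` only holds for the life of the transaction, so the job should be marked in the same statement that claims it. A hedged sketch, assuming `job_queue` also has `status` and `updated_at` columns:

```python
async def claim_next_job(self, db: AsyncSession) -> dict | None:
    """Claim and mark one job atomically so the claim survives the commit."""
    result = await db.execute(text("""
        UPDATE job_queue
        SET status = 'running', updated_at = now()
        WHERE id = (
            SELECT id FROM job_queue
            WHERE status = 'pending'
            ORDER BY created_at
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING id, payload
    """))
    await db.commit()
    return result.first()
```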
## Key Decisions
| Technology | Best For | Throughput | Ordering | Persistence |
|---|---|---|---|---|
| Kafka | Event streaming, logs, high-volume | 100K+ msg/s | Partition-level | Excellent |
| RabbitMQ | Task queues, RPC, routing | ~50K msg/s | Queue-level | Good |
| Redis Streams | Real-time, simple streaming | ~100K msg/s | Stream-level | Good (AOF) |
| Postgres | Moderate volume, simplicity | ~10K msg/s | Query-defined | Excellent |
### When to Choose Each
```
┌──────────────────────────────────────────────────────────────┐
│ DECISION FLOWCHART                                           │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│ Need > 50K msg/s?                                            │
│   YES → Kafka (partitioned, replicated)                      │
│   NO ↓                                                       │
│                                                              │
│ Need complex routing (topic, headers)?                       │
│   YES → RabbitMQ (exchanges, bindings)                       │
│   NO ↓                                                       │
│                                                              │
│ Need real-time + simple?                                     │
│   YES → Redis Streams (XREAD, consumer groups)               │
│   NO ↓                                                       │
│                                                              │
│ Already using Postgres + < 10K msg/s?                        │
│   YES → Postgres (LISTEN/NOTIFY + FOR UPDATE SKIP LOCKED)    │
│   NO → Re-evaluate requirements                              │
│                                                              │
└──────────────────────────────────────────────────────────────┘
```
## Anti-Patterns (FORBIDDEN)
**NEVER process without acknowledgment**

```python
async for msg in consumer:
    process(msg)  # Message lost on failure!
```
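The corrected shape, as in the Kafka consumer above: acknowledge only after the handler succeeds.

```python
async for msg in consumer:
    await process(msg)
    await consumer.commit()  # ack only after successful processing
```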
**NEVER use sync calls in handlers**

```python
def handle(msg):
    requests.post(url, data=msg)  # Blocks event loop!
```
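The non-blocking fix, sketched with httpx (an assumption; any async HTTP client works):

```python
import httpx


async def handle(msg):
    async with httpx.AsyncClient() as client:
        await client.post(url, json=msg)  # awaits without blocking the loop
```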
**NEVER ignore ordering when required**

```python
await publish("orders", {"order_id": "123"})  # No partition key!
```
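The fix is to key every message whose order matters; with the `KafkaPublisher` above:

```python
# Same order_id -> same partition -> events replay in order
await publisher.publish("orders", {"order_id": "123"}, key="123")
```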
**NEVER store large payloads**

```python
await publish("files", {"content": large_bytes})  # Use URL reference!
```
**NEVER skip dead letter handling**

```python
except Exception:
    pass  # Failed messages vanish!
```
**NEVER choose Kafka for a simple task queue**

RabbitMQ or Redis is simpler for work distribution.
**NEVER use Redis Streams when strict delivery matters**

Use RabbitMQ or Kafka for guaranteed delivery.
## Related Skills
- `outbox-pattern` - Transactional outbox for reliable publishing
- `background-jobs` - Celery/ARQ task processing
- `streaming-api-patterns` - SSE/WebSocket real-time
- `observability-monitoring` - Queue metrics and alerting
- `event-sourcing` - Event store and CQRS patterns
## Capability Details
### kafka-streaming
**Keywords**: kafka, aiokafka, partition, consumer group, exactly-once, offset

**Solves**:

- How do I set up Kafka producers/consumers?
- Partition key selection for ordering
- Exactly-once semantics with idempotence
- Consumer group rebalancing
### rabbitmq-messaging
**Keywords**: rabbitmq, amqp, aio-pika, exchange, queue, topic, fanout, routing

**Solves**:

- How do I set up RabbitMQ pub/sub?
- Exchange types and queue binding
- Dead letter queue configuration
- Message persistence and acknowledgment
### redis-streams
**Keywords**: redis streams, xadd, xread, xreadgroup, consumer group, xack

**Solves**:

- How do I use Redis Streams?
- Consumer group setup and message claiming
- Stream trimming and retention
- At-least-once delivery patterns
### faststream-framework
**Keywords**: faststream, unified api, pydantic, asyncapi, broker

**Solves**:

- Unified API for Kafka/RabbitMQ/Redis
- Automatic Pydantic serialization
- AsyncAPI documentation generation
- Dependency injection for handlers
### postgres-queue
**Keywords**: postgres queue, listen notify, skip locked, simple queue

**Solves**:

- When to use Postgres instead of a dedicated queue
- LISTEN/NOTIFY for pub/sub
- FOR UPDATE SKIP LOCKED for a job queue