Streaming Data Systems
Real-time data ingestion and stream processing with Apache Kafka, MQTT, and NATS JetStream. Covers producers, consumers, and stream processing patterns for data engineering pipelines.
Quick Comparison
| Feature | Apache Kafka | MQTT | NATS JetStream |
|---|---|---|---|
| Use Case | High-throughput event streaming | IoT, mobile, constrained devices | Cloud-native, microservices |
| Throughput | Millions/sec | Thousands/sec | Hundreds of thousands/sec |
| Durability | Disk-based log, replayable | Ephemeral (configurable) | Disk-based persistence |
| Ordering | Per-partition | N/A (topic-based) | Per-subject |
| Python Client | confluent-kafka | paho-mqtt | nats-py |
| Best For | Event sourcing, CDC, log aggregation | Sensor data, telemetry | Service-to-service messaging |
When to Use Which?
- Kafka: High-volume event streams, log aggregation, CDC, data lake ingestion
- MQTT: IoT devices, mobile push, constrained networks
- NATS JetStream: Microservices, request-reply, cloud-native, simpler ops than Kafka
Skill Dependencies
- @data-engineering-core: Process stream data with Polars/DuckDB (see the sketch after this list)
- @data-engineering-orchestration: Orchestrate stream processing jobs
- @data-engineering-quality: Validate streaming data
- @data-engineering-storage-lakehouse: Persist streams to Delta/Iceberg
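As a rough illustration of how this skill hands off to @data-engineering-core and @data-engineering-storage-lakehouse, the sketch below micro-batches consumed messages into Polars and appends them to a Delta table. The helper name, table path, and use of Polars' Delta writer (which requires the deltalake package) are assumptions, not part of this skill.

```python
import json
import polars as pl

def flush_batch(raw_messages: list[bytes]) -> None:
    """Turn a batch of consumed JSON messages into a Polars frame
    and append it to a Delta table (path is illustrative)."""
    records = [json.loads(m) for m in raw_messages]
    df = pl.DataFrame(records)
    df.write_delta("s3://lake/raw/user_activity", mode="append")
```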
Detailed Guides
Apache Kafka
Install: `pip install confluent-kafka`
Producer
```python
from confluent_kafka import Producer
import socket
import json

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': socket.gethostname(),
    'acks': 'all'  # Wait for all replicas
}

producer = Producer(conf)

# Send messages asynchronously
for i in range(100):
    data = {'id': i, 'event': 'user_activity', 'value': i * 10}
    producer.produce(
        topic='user_activity_events',
        key=str(i),
        value=json.dumps(data).encode('utf-8'),
        callback=delivery_report
    )
    producer.poll(0)  # Trigger delivery callbacks

producer.flush()  # Wait for outstanding messages to be delivered
```
With Schema Registry (Avro)
```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Minimal Avro schema matching the producer payload above (example)
schema_str = """
{"type": "record", "name": "UserActivity",
 "fields": [
   {"name": "id", "type": "int"},
   {"name": "event", "type": "string"},
   {"name": "value", "type": "int"}]}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer,
})
```
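Publishing through the serializing producer then passes the plain dict and lets the Avro serializer handle encoding; a brief sketch reusing the topic and `delivery_report` callback from the JSON example above:

```python
record = {'id': 1, 'event': 'user_activity', 'value': 10}
producer.produce(
    topic='user_activity_events',
    key='1',                      # StringSerializer handles the key
    value=record,                 # AvroSerializer encodes the dict
    on_delivery=delivery_report
)
producer.flush()
```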
Consumer
```python
from confluent_kafka import Consumer, KafkaError, KafkaException
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit
    'max.poll.interval.ms': 300000
}

consumer = Consumer(conf)
consumer.subscribe(['user_activity_events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        # Process message
        data = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {data}")

        # Manual commit after processing
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Stream Processing (ksqlDB pattern)
For complex transformations, use ksqlDB or Kafka Streams. REST API example:
```python
import requests

# Push query: EMIT CHANGES streams results indefinitely, so the statement
# ends with ';' and the HTTP response is consumed as a stream
ksql_query = {
    "ksql": """
        SELECT id, COUNT(*) AS event_count, SUM(value) AS total_value
        FROM user_activity_events
        WINDOW TUMBLING (SIZE 1 MINUTE)
        GROUP BY id
        EMIT CHANGES;
    """
}

response = requests.post(
    "http://localhost:8088/query",
    json=ksql_query,
    stream=True  # Don't buffer the never-ending push-query response
)
for line in response.iter_lines():
    print(line)
```
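The push query assumes `user_activity_events` has already been registered as a ksqlDB stream over the Kafka topic. A hedged sketch of that one-time registration against the `/ksql` endpoint; the column types are inferred from the producer payload and are assumptions:

```python
create_stream = {
    "ksql": """
        CREATE STREAM user_activity_events (
            id INT,
            event VARCHAR,
            value INT
        ) WITH (KAFKA_TOPIC='user_activity_events', VALUE_FORMAT='JSON');
    """,
    "streamsProperties": {}
}
requests.post("http://localhost:8088/ksql", json=create_stream)
```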
MQTT for IoT
Install: `pip install paho-mqtt`
Publisher
```python
import paho.mqtt.client as mqtt
import json
import time

broker = "broker.emqx.io"
topic = "iot/sensors/temperature"

client = mqtt.Client(client_id="publisher_1", protocol=mqtt.MQTTv5)

def on_connect(client, userdata, flags, rc, properties=None):
    # MQTT v5 connections also pass a `properties` argument to callbacks
    if rc == 0:
        print("Connected to broker")
    else:
        print(f"Connection failed: {rc}")

client.on_connect = on_connect
client.connect(broker)
client.loop_start()

for i in range(10):
    payload = {
        "sensor_id": "temp_sensor_1",
        "temperature": 20 + i * 0.5,
        "humidity": 45 + i,
        "timestamp": time.time()
    }
    client.publish(
        topic=topic,
        payload=json.dumps(payload),
        qos=1  # At-least-once delivery
    )
    time.sleep(5)

client.loop_stop()
client.disconnect()
```
Subscriber
```python
def on_connect(client, userdata, flags, rc, properties=None):
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload.decode("utf-8"))
    print(f"[{msg.topic}] {payload}")

# Separate client for the subscriber (the publisher above already disconnected)
client = mqtt.Client(client_id="subscriber_1", protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker)
client.loop_forever()
```
NATS JetStream
Install: `pip install nats-py`
Producer/Consumer
```python
import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create stream
    await js.add_stream(
        name="events",
        subjects=["events.*"],
        storage="file",
        max_msgs=10000,
        max_age=3600  # seconds
    )

    # Publish
    await js.publish("events.page_loaded", b'{"page": "/home"}')

    # Push consumer
    sub = await js.subscribe("events.*", durable="worker-1")
    async for msg in sub.messages:
        print(f"Received: {msg.data.decode()}")
        await msg.ack()

    await nc.close()

asyncio.run(main())
```
Work Queue Pattern
```python
async def worker(name: str):
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    # Workers in the same queue group share (load-balance) the subjects
    sub = await js.subscribe("jobs.*", durable="workers", queue="processing")

    async for msg in sub.messages:
        job_data = msg.data.decode()
        print(f"Worker {name} processing: {job_data}")
        await msg.ack()

    await nc.close()
```
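A usage sketch: start several competing workers so JetStream distributes jobs across the queue group (the worker names are arbitrary):

```python
async def run_workers():
    # Each worker joins the "processing" queue group defined above
    await asyncio.gather(worker("A"), worker("B"), worker("C"))

asyncio.run(run_workers())
```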
Production Patterns
Idempotent Processing
Stream processors may receive duplicates. Design idempotent consumers:
```python
processed_ids = load_checkpoint()  # From DB/Redis

if msg.id in processed_ids:
    ack()  # Skip duplicate
else:
    process(msg)
    save_checkpoint(msg.id)
    ack()
```
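A minimal sketch of the same idea backed by Redis, assuming the Kafka consumer from above and a local Redis instance; the key prefix and 24-hour TTL are assumptions:

```python
import json
import redis

r = redis.Redis(host="localhost", port=6379)

def handle(msg):
    data = json.loads(msg.value().decode("utf-8"))
    # SET NX returns True only the first time this id is stored, so the
    # duplicate check and the checkpoint happen in a single atomic step
    first_time = r.set(f"processed:{data['id']}", 1, nx=True, ex=86400)
    if first_time:
        process(msg)  # process() stays abstract, as in the pattern above
    consumer.commit(message=msg, asynchronous=False)  # commit either way
```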
Batch Processing
```python
import time

# Accumulate messages before writing to reduce DB load
batch = []
last_flush = time.monotonic()

while True:
    msg = consumer.poll(timeout=0.1)
    if msg and not msg.error():
        batch.append(msg.value())

    timeout_reached = time.monotonic() - last_flush >= FLUSH_INTERVAL
    if batch and (len(batch) >= BATCH_SIZE or timeout_reached):
        write_to_db(batch)
        consumer.commit()  # Commit only after the batch is written
        batch.clear()
        last_flush = time.monotonic()
```
Error Handling (Dead Letter Queue)
```python
try:
    process(msg)
except Exception as e:
    if is_retryable(e):
        nack(requeue=True)  # Retry
    else:
        produce_to_dlq(msg, str(e))  # Send to dead letter queue
        ack()
```
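For Kafka, `produce_to_dlq` might look like the sketch below; the `.dlq` topic suffix, the separate `dlq_producer` (a plain confluent-kafka Producer), and the error header name are conventions assumed here, not fixed by Kafka:

```python
def produce_to_dlq(msg, error: str):
    # Forward the original key/value untouched and attach the failure
    # reason as a header so the DLQ record can be triaged later
    dlq_producer.produce(
        topic=f"{msg.topic()}.dlq",
        key=msg.key(),
        value=msg.value(),
        headers=[("error", error.encode("utf-8"))]
    )
    dlq_producer.flush()
```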
Schema Evolution
- Use Avro/Protobuf with Schema Registry for compatibility
- Evolve schemas additively: new fields optional, old fields preserved (see the example after this list)
- Register schemas per topic
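For example, an additive Avro change keeps every existing field and gives the new field a default, so readers on either version can still decode both; the field names follow the producer example and the schema itself is illustrative:

```python
schema_v1 = """
{"type": "record", "name": "UserActivity",
 "fields": [
   {"name": "id", "type": "int"},
   {"name": "event", "type": "string"}]}
"""

# v2 only adds an optional field with a default: v1 readers ignore it,
# v2 readers fill in the default when decoding old records
schema_v2 = """
{"type": "record", "name": "UserActivity",
 "fields": [
   {"name": "id", "type": "int"},
   {"name": "event", "type": "string"},
   {"name": "source", "type": ["null", "string"], "default": null}]}
"""
```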
References
- Apache Kafka Documentation
- confluent-kafka Python Client
- MQTT 5.0 Specification
- NATS JetStream Documentation
- @data-engineering-orchestration: Orchestrate consumers/producers as flows