Kafka Setup Skill
Quick Start
-
Read Phase 5 Constitution - constitution-prompt-phase-5.md
-
Choose deployment - Strimzi (local) or Redpanda Cloud (production)
-
Install Strimzi operator - For Kubernetes deployment
-
Create Kafka cluster - Using Strimzi CRDs
-
Create topics - task-events, reminder-events, audit-events
-
Configure Dapr component - Connect Dapr to Kafka
Deployment Options
Option Environment Use Case
Strimzi Minikube/DOKS Full Kubernetes-native Kafka
Redpanda Cloud Production Managed Kafka-compatible streaming
Docker Compose Local Dev Quick local development
Strimzi Installation (Kubernetes)
Install Strimzi Operator
Create namespace
kubectl create namespace kafka
Install Strimzi operator
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Wait for operator to be ready
kubectl wait deployment/strimzi-cluster-operator
--for=condition=available
--timeout=300s
-n kafka
Create Kafka Cluster
Create kafka/kafka-cluster.yaml :
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: todo-kafka namespace: kafka spec: kafka: version: 3.6.0 replicas: 3 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default.replication.factor: 3 min.insync.replicas: 2 inter.broker.protocol.version: "3.6" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 10Gi deleteClaim: false resources: requests: memory: 1Gi cpu: "500m" limits: memory: 2Gi cpu: "1" zookeeper: replicas: 3 storage: type: persistent-claim size: 5Gi deleteClaim: false resources: requests: memory: 512Mi cpu: "250m" limits: memory: 1Gi cpu: "500m" entityOperator: topicOperator: {} userOperator: {}
Minikube-Optimized Cluster (Single Node)
Create kafka/kafka-cluster-minikube.yaml :
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: todo-kafka namespace: kafka spec: kafka: version: 3.6.0 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 storage: type: ephemeral resources: requests: memory: 512Mi cpu: "250m" limits: memory: 1Gi cpu: "500m" zookeeper: replicas: 1 storage: type: ephemeral resources: requests: memory: 256Mi cpu: "100m" limits: memory: 512Mi cpu: "250m" entityOperator: topicOperator: {}
Create Kafka Topics
Create kafka/kafka-topics.yaml :
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: task-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 604800000 # 7 days cleanup.policy: delete
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: reminder-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 604800000 cleanup.policy: delete
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: audit-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 2592000000 # 30 days cleanup.policy: delete
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: task-updates namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 86400000 # 1 day cleanup.policy: delete
Docker Compose (Local Development)
Add to docker-compose.yaml :
services: zookeeper: image: confluentinc/cp-zookeeper:7.5.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" networks: - todo-network
kafka: image: confluentinc/cp-kafka:7.5.0 container_name: kafka depends_on: - zookeeper ports: - "9092:9092" - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 networks: - todo-network
kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - kafka ports: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 networks: - todo-network
Redpanda Cloud Setup (Production)
Create Redpanda Cloud Cluster
-
Sign up at https://cloud.redpanda.com
-
Create a new cluster (Dedicated or Serverless)
-
Get connection credentials
Configure Dapr for Redpanda
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: taskpubsub namespace: todo-app spec: type: pubsub.kafka version: v1 metadata: - name: brokers secretKeyRef: name: redpanda-secrets key: brokers - name: authType value: "password" - name: saslUsername secretKeyRef: name: redpanda-secrets key: username - name: saslPassword secretKeyRef: name: redpanda-secrets key: password - name: saslMechanism value: "SCRAM-SHA-256" - name: tls value: "true"
Dapr Pub/Sub Component
Create dapr-components/pubsub.yaml :
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: taskpubsub namespace: todo-app spec: type: pubsub.kafka version: v1 metadata: - name: brokers value: "todo-kafka-kafka-bootstrap.kafka.svc.cluster.local:9092" - name: consumerGroup value: "todo-consumer-group" - name: authType value: "none" - name: disableTls value: "true" - name: maxMessageBytes value: "1048576" - name: consumeRetryInterval value: "100ms" scopes:
- backend
- notification-service
- recurring-service
- audit-service
- websocket-service
Python Kafka Client (Direct)
Installation
uv add aiokafka
Producer
from aiokafka import AIOKafkaProducer import json
class KafkaEventProducer: def init(self, bootstrap_servers: str): self.producer = AIOKafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8') )
async def start(self):
await self.producer.start()
async def stop(self):
await self.producer.stop()
async def publish(self, topic: str, event: dict):
await self.producer.send_and_wait(topic, event)
Consumer
from aiokafka import AIOKafkaConsumer import json
class KafkaEventConsumer: def init(self, bootstrap_servers: str, topics: list[str], group_id: str): self.consumer = AIOKafkaConsumer( *topics, bootstrap_servers=bootstrap_servers, group_id=group_id, value_deserializer=lambda v: json.loads(v.decode('utf-8')) )
async def start(self):
await self.consumer.start()
async def stop(self):
await self.consumer.stop()
async def consume(self):
async for msg in self.consumer:
yield msg.topic, msg.value
Verification Checklist
-
Kafka cluster running (Strimzi or Docker)
-
All topics created (task-events, reminder-events, audit-events, task-updates)
-
Dapr component configured
-
Producer can publish messages
-
Consumer receives messages
-
Kafka UI accessible (optional)
-
Redpanda Cloud configured (for production)
Topic Schema
task-events
{ "event_type": "task.created | task.updated | task.deleted | task.completed", "task_id": "uuid", "user_id": "uuid", "task": { "id": "uuid", "title": "string", "description": "string", "priority": "low | medium | high", "status": "pending | in_progress | completed", "due_date": "ISO8601", "tags": ["string"] }, "timestamp": "ISO8601" }
reminder-events
{ "event_type": "reminder.triggered | reminder.created | reminder.deleted", "reminder_id": "uuid", "task_id": "uuid", "user_id": "uuid", "message": "string", "timestamp": "ISO8601" }
Troubleshooting
Issue Cause Solution
Broker not reachable Wrong address Use internal K8s DNS
Topic not found Not created Apply KafkaTopic CRD
Consumer lag Slow processing Scale consumers
Auth failed Wrong credentials Check secrets
References
-
Strimzi Documentation
-
Apache Kafka
-
Redpanda Cloud
-
Dapr Kafka Pub/Sub
-
Phase 5 Constitution