kafka

Apache Kafka Core Knowledge

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "kafka" with this command: npx skills add claude-dev-suite/claude-dev-suite/claude-dev-suite-claude-dev-suite-kafka

Apache Kafka Core Knowledge

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: kafka for comprehensive documentation.

Quick Start (Docker)

docker-compose.yml

services: kafka: image: bitnami/kafka:latest ports: - "9092:9092" environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

Start

docker-compose up -d

Create topic

docker exec kafka kafka-topics.sh --create --topic my-topic
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

Core Concepts

Concept Description

Topic Named stream of records, append-only log

Partition Ordered, immutable sequence within topic

Offset Unique ID for record within partition

Consumer Group Set of consumers sharing topic consumption

Broker Kafka server handling storage and requests

Replication Factor Number of partition copies across brokers

Architecture

┌─────────────────────────────────────────────────────────┐ │ Kafka Cluster │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │Broker 0 │ │Broker 1 │ │Broker 2 │ │ │ │ P0(L) │ │ P0(F) │ │ P1(L) │ │ │ │ P1(F) │ │ P1(F) │ │ P0(F) │ │ │ └─────────┘ └─────────┘ └─────────┘ │ └─────────────────────────────────────────────────────────┘ ▲ │ │ ▼ ┌──────────┐ ┌──────────────┐ │ Producer │ │Consumer Group│ └──────────┘ │ C1 C2 C3 │ └──────────────┘

Producer Patterns

Node.js (kafkajs)

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'], });

const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner, idempotent: true, // Enable exactly-once });

await producer.connect();

// Send single message await producer.send({ topic: 'orders', messages: [ { key: orderId, // Partition key value: JSON.stringify(order), headers: { 'correlation-id': correlationId, 'source': 'order-service', }, }, ], });

// Batch send await producer.sendBatch({ topicMessages: [ { topic: 'orders', messages: orders.map(o => ({ key: o.id, value: JSON.stringify(o), })), }, ], });

await producer.disconnect();

Java (Spring Kafka)

@Configuration public class KafkaConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); config.put(ProducerConfig.ACKS_CONFIG, "all"); return new DefaultKafkaProducerFactory<>(config); }

@Bean
public KafkaTemplate&#x3C;String, String> kafkaTemplate() {
    return new KafkaTemplate&#x3C;>(producerFactory());
}

}

@Service public class OrderProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate;

public void sendOrder(Order order) {
    kafkaTemplate.send("orders", order.getId(), objectMapper.writeValueAsString(order))
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send order", ex);
            }
        });
}

}

Python (confluent-kafka)

from confluent_kafka import Producer import json

conf = { 'bootstrap.servers': 'localhost:9092', 'client.id': 'my-app', 'acks': 'all', 'enable.idempotence': True, }

producer = Producer(conf)

def delivery_callback(err, msg): if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to {msg.topic()}[{msg.partition()}]')

Send message

producer.produce( topic='orders', key=order_id.encode('utf-8'), value=json.dumps(order).encode('utf-8'), callback=delivery_callback, headers={'correlation-id': correlation_id} )

producer.flush() # Wait for delivery

Go (segmentio/kafka-go)

package main

import ( "context" "encoding/json" "github.com/segmentio/kafka-go" )

func main() { writer := &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "orders", Balancer: &kafka.LeastBytes{}, RequiredAcks: kafka.RequireAll, }

defer writer.Close()

order := Order{ID: "123", Amount: 100}
value, _ := json.Marshal(order)

err := writer.WriteMessages(context.Background(),
    kafka.Message{
        Key:   []byte(order.ID),
        Value: value,
        Headers: []kafka.Header{
            {Key: "correlation-id", Value: []byte("abc123")},
        },
    },
)

}

Consumer Patterns

Node.js (kafkajs)

const consumer = kafka.consumer({ groupId: 'order-processor', sessionTimeout: 30000, heartbeatInterval: 3000, });

await consumer.connect(); await consumer.subscribe({ topics: ['orders'], fromBeginning: false });

await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const order = JSON.parse(message.value.toString()); const correlationId = message.headers['correlation-id']?.toString();

try {
  await processOrder(order);
  // Auto-commit on success
} catch (error) {
  // Handle error - message will be redelivered
  throw error;
}

}, });

// Manual commit await consumer.run({ autoCommit: false, eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => { for (const message of batch.messages) { await processMessage(message); resolveOffset(message.offset); } await commitOffsetsIfNecessary(); }, });

Java (Spring Kafka)

@Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return new DefaultKafkaConsumerFactory<>(config); }

@Bean
public ConcurrentKafkaListenerContainerFactory&#x3C;String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory&#x3C;String, String> factory =
        new ConcurrentKafkaListenerContainerFactory&#x3C;>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
}

}

@Service public class OrderConsumer { @KafkaListener(topics = "orders", groupId = "order-processor") public void consume( @Payload String message, @Header(KafkaHeaders.RECEIVED_KEY) String key, @Header("correlation-id") String correlationId, Acknowledgment ack) {

    Order order = objectMapper.readValue(message, Order.class);
    processOrder(order);
    ack.acknowledge();  // Manual commit
}

}

Python (confluent-kafka)

from confluent_kafka import Consumer

conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'order-processor', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, }

consumer = Consumer(conf) consumer.subscribe(['orders'])

try: while True: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): print(f"Consumer error: {msg.error()}") continue

    order = json.loads(msg.value().decode('utf-8'))
    process_order(order)
    consumer.commit(msg)  # Manual commit

finally: consumer.close()

Topic Configuration

Create topic with configuration

kafka-topics.sh --create --topic orders
--bootstrap-server localhost:9092
--partitions 12
--replication-factor 3
--config retention.ms=604800000
--config cleanup.policy=delete
--config min.insync.replicas=2

Alter topic config

kafka-configs.sh --alter --topic orders
--bootstrap-server localhost:9092
--add-config retention.ms=172800000

Config Description Production Value

partitions

Parallelism level 3x consumer instances

replication.factor

Durability 3 (minimum)

min.insync.replicas

Write guarantee 2

retention.ms

Message retention 7 days (604800000)

cleanup.policy

delete or compact Depends on use case

When NOT to Use This Skill

Use alternative messaging solutions when:

  • Simple request/reply patterns - RabbitMQ or ActiveMQ are better suited

  • Low message volume (< 1000 msg/s) - Simpler brokers have less operational overhead

  • Strict message ordering across all messages - Use single partition or different broker

  • Serverless/managed services preferred - Use cloud-native options (SQS, Pub/Sub, Service Bus)

  • JMS compliance required - Use ActiveMQ

  • Lightweight microservices - NATS provides simpler operations

  • Primarily caching with messaging - Redis Pub/Sub may be sufficient

Anti-Patterns

Anti-Pattern Why It's Bad Solution

Single partition for scale Limits throughput to one consumer Use multiple partitions (3x consumer count)

No replication factor Data loss on broker failure Set replication.factor >= 3

acks=1 in production Messages can be lost Use acks=all with min.insync.replicas=2

Large messages (>1MB) Broker memory pressure Use external storage, send reference

Auto-commit offsets Duplicate/lost messages on crash Manual commit after processing

No consumer group Can't scale consumers Always use consumer groups

Synchronous send Poor throughput Use async with callbacks

No dead letter topic Failed messages lost Configure DLT for poison messages

Topic per message type Topic explosion Use fewer topics with message headers

No monitoring Invisible consumer lag Monitor lag, throughput, errors

Quick Troubleshooting

Issue Likely Cause Fix

Consumer lag growing Slow processing or insufficient consumers Add consumers, optimize processing

Messages not arriving Topic doesn't exist or wrong name Verify topic with kafka-topics --list

Duplicate messages Consumer crash before commit Implement idempotent processing

Out of order messages Multiple partitions Use single partition or partition key

"Leader not available" Broker down or partition reassignment Check broker health, wait for leader election

High latency Network issues or under-replicated Check UnderReplicatedPartitions metric

Producer timeout Broker overload or network Increase request.timeout.ms , check broker load

Offset commit failed Rebalance in progress Increase session.timeout.ms

Serialization errors Schema mismatch Use schema registry, validate messages

Disk full Retention too long or high volume Adjust retention, add disk, compact logs

Production Readiness

Security Configuration

Server (server.properties)

listeners=SASL_SSL://0.0.0.0:9093 security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN

ssl.keystore.location=/path/to/keystore.jks ssl.keystore.password=password ssl.key.password=password ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=password

ACLs

authorizer.class.name=kafka.security.authorizer.AclAuthorizer super.users=User:admin

// Client with SASL/SSL const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka:9093'], ssl: { rejectUnauthorized: true, ca: [fs.readFileSync('/path/to/ca.pem')], }, sasl: { mechanism: 'plain', username: 'user', password: 'password', }, });

Monitoring Metrics

Metric Alert Threshold

Consumer lag

10000 messages

Under-replicated partitions

0

Request latency p99

100ms

Broker disk usage

80%

Active controller count != 1

Producer Best Practices

const producer = kafka.producer({ idempotent: true, // Exactly-once maxInFlightRequests: 5, // Ordering with idempotence retry: { retries: 5, initialRetryTime: 100, maxRetryTime: 30000, }, });

Consumer Best Practices

const consumer = kafka.consumer({ groupId: 'order-processor', sessionTimeout: 30000, // Failure detection heartbeatInterval: 3000, // Session keepalive maxBytesPerPartition: 1048576, // 1MB per partition retry: { retries: 5, }, });

Checklist

  • TLS/SSL encryption enabled

  • SASL authentication configured

  • ACLs defined for topics

  • Replication factor >= 3

  • min.insync.replicas = 2

  • Consumer lag monitoring

  • Dead letter topic configured

  • Schema registry for evolution

  • Idempotent producers enabled

  • Proper partition key strategy

Reference Documentation

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: kafka for comprehensive documentation.

Available topics: basics , producers , consumers , streams , connect , configuration , production

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

cron-scheduling

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

token-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

react-19

No summary provided by upstream source.

Repository SourceNeeds Review