spring-kafka

Spring Kafka - Quick Reference

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "spring-kafka" with this command: npx skills add claude-dev-suite/claude-dev-suite/claude-dev-suite-claude-dev-suite-spring-kafka

Spring Kafka - Quick Reference

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: kafka for comprehensive documentation.

Dependencies

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

Configuration

application.yml

spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.example.dto" producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all properties: enable.idempotence: true listener: ack-mode: manual concurrency: 3

Producer Pattern

KafkaTemplate

@Service @RequiredArgsConstructor public class OrderProducer {

private final KafkaTemplate&#x3C;String, OrderEvent> kafkaTemplate;

public void sendOrder(OrderEvent event) {
    kafkaTemplate.send("orders", event.getOrderId(), event)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send order: {}", event.getOrderId(), ex);
            } else {
                log.info("Order sent: {} to partition {}",
                    event.getOrderId(),
                    result.getRecordMetadata().partition());
            }
        });
}

// With headers
public void sendWithHeaders(OrderEvent event, String correlationId) {
    ProducerRecord&#x3C;String, OrderEvent> record = new ProducerRecord&#x3C;>(
        "orders", event.getOrderId(), event);
    record.headers()
        .add("correlation-id", correlationId.getBytes())
        .add("source", "order-service".getBytes());

    kafkaTemplate.send(record);
}

}

Transactional Producer

@Configuration public class KafkaConfig {

@Bean
public ProducerFactory&#x3C;String, Object> producerFactory() {
    Map&#x3C;String, Object> config = new HashMap&#x3C;>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory&#x3C;>(config);
}

@Bean
public KafkaTemplate&#x3C;String, Object> kafkaTemplate() {
    return new KafkaTemplate&#x3C;>(producerFactory());
}

@Bean
public KafkaTransactionManager&#x3C;String, Object> kafkaTransactionManager() {
    return new KafkaTransactionManager&#x3C;>(producerFactory());
}

}

@Service @Transactional("kafkaTransactionManager") public class TransactionalProducer {

public void sendMultiple(List&#x3C;OrderEvent> events) {
    events.forEach(e -> kafkaTemplate.send("orders", e.getId(), e));
}

}

Consumer Patterns

Basic @KafkaListener

@Service @RequiredArgsConstructor public class OrderConsumer {

@KafkaListener(topics = "orders", groupId = "order-processor")
public void consume(
        @Payload OrderEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        Acknowledgment ack) {

    log.info("Received order: {} from partition {} offset {}",
        event.getOrderId(), partition, offset);

    try {
        processOrder(event);
        ack.acknowledge();
    } catch (Exception e) {
        log.error("Failed to process order: {}", event.getOrderId(), e);
        throw e; // Will trigger retry
    }
}

}

Batch Consumer

@KafkaListener( topics = "orders", groupId = "batch-processor", containerFactory = "batchKafkaListenerContainerFactory" ) public void consumeBatch( List<OrderEvent> events, @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions, Acknowledgment ack) {

log.info("Received batch of {} orders", events.size());
processBatch(events);
ack.acknowledge();

}

@Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(AckMode.MANUAL); return factory; }

Class-Level Listener

@KafkaListener(topics = "orders", groupId = "order-handler") @Service public class OrderHandler {

@KafkaHandler
public void handleCreated(OrderCreatedEvent event) {
    // Handle order created
}

@KafkaHandler
public void handleUpdated(OrderUpdatedEvent event) {
    // Handle order updated
}

@KafkaHandler(isDefault = true)
public void handleDefault(Object event) {
    log.warn("Unknown event type: {}", event.getClass());
}

}

Retry Topics (Spring Kafka 3.x)

@RetryableTopic

@RetryableTopic( attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 10000), dltStrategy = DltStrategy.FAIL_ON_ERROR, autoCreateTopics = "true", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE ) @KafkaListener(topics = "orders", groupId = "retry-consumer") public void consumeWithRetry(OrderEvent event, Acknowledgment ack) { processOrder(event); ack.acknowledge(); }

@DltHandler public void handleDlt(OrderEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {

log.error("DLT received: {} from {} - error: {}",
    event.getOrderId(), topic, errorMessage);
// Store in database for manual review
failedOrderRepository.save(new FailedOrder(event, errorMessage));

}

Manual Retry Configuration

@Configuration @EnableKafka public class KafkaRetryConfig {

@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate&#x3C;String, Object> template) {
    return RetryTopicConfigurationBuilder
        .newInstance()
        .maxAttempts(4)
        .fixedBackOff(3000)
        .includeTopic("orders")
        .doNotAutoCreateRetryTopics()
        .create(template);
}

}

Error Handling

Custom Error Handler

@Bean public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) { // Send to DLT after 3 retries DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

DefaultErrorHandler handler = new DefaultErrorHandler(recoverer,
    new FixedBackOff(1000L, 3L));

// Don't retry for these exceptions
handler.addNotRetryableExceptions(
    ValidationException.class,
    DeserializationException.class
);

return handler;

}

@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setCommonErrorHandler(errorHandler(kafkaTemplate())); return factory; }

Testing

@EmbeddedKafka

@SpringBootTest @EmbeddedKafka(partitions = 1, topics = {"orders"}) class OrderProducerTest {

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Autowired
private OrderProducer orderProducer;

@Test
void shouldSendOrder() throws Exception {
    OrderEvent event = new OrderEvent("123", "CREATED");

    Map&#x3C;String, Object> consumerProps = KafkaTestUtils.consumerProps(
        "test-group", "true", embeddedKafka);
    ConsumerFactory&#x3C;String, OrderEvent> cf = new DefaultKafkaConsumerFactory&#x3C;>(consumerProps);
    Consumer&#x3C;String, OrderEvent> consumer = cf.createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders");

    orderProducer.sendOrder(event);

    ConsumerRecord&#x3C;String, OrderEvent> record = KafkaTestUtils.getSingleRecord(consumer, "orders");
    assertThat(record.value().getOrderId()).isEqualTo("123");
}

}

Consumer Test with CountDownLatch

@SpringBootTest @EmbeddedKafka(partitions = 1, topics = {"orders"}) class OrderConsumerTest {

@SpyBean
private OrderConsumer orderConsumer;

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Test
void shouldConsumeAndProcessOrder() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    doAnswer(inv -> { inv.callRealMethod(); latch.countDown(); return null; })
        .when(orderConsumer).consume(any(), any());

    Map&#x3C;String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    KafkaTemplate&#x3C;String, String> template = new KafkaTemplate&#x3C;>(
        new DefaultKafkaProducerFactory&#x3C;>(props));
    template.send("orders", "{\"orderId\":\"456\",\"status\":\"CREATED\"}");

    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    verify(orderConsumer).consume(any(), any());
}

}

Testcontainers

@SpringBootTest @Testcontainers class OrderIntegrationTest {

@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(
    DockerImageName.parse("apache/kafka-native:3.8.0"));

@Autowired
private KafkaTemplate&#x3C;String, OrderEvent> kafkaTemplate;

@Autowired
private OrderRepository orderRepository;

@Test
void shouldProcessOrderEndToEnd() throws Exception {
    kafkaTemplate.send("orders", "key-1",
        new OrderEvent("789", "CREATED")).get(10, TimeUnit.SECONDS);

    await().atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> {
            Optional&#x3C;Order> order = orderRepository.findById("789");
            assertThat(order).isPresent();
            assertThat(order.get().getStatus()).isEqualTo("CREATED");
        });
}

}

Deep dive: For MockConsumer/MockProducer, @EmbeddedKafka advanced patterns, and Node.js/Python Kafka testing, see the messaging-testing-kafka skill.

Best Practices

Do Don't

Use acks=all for durability Use acks=0 in production

Enable idempotence Ignore duplicate messages

Configure DLT for failures Silently drop failed messages

Use manual acknowledgment Auto-commit without processing

Set proper deserializer trust Trust all packages

Production Checklist

  • acks=all configured

  • Idempotence enabled

  • Consumer group ID set

  • Manual acknowledgment mode

  • Retry topics configured

  • DLT handler implemented

  • Error handler configured

  • Proper serializers set

  • Trusted packages configured

  • Monitoring metrics exposed

When NOT to Use This Skill

  • Raw Kafka - Use kafka skill for broker config

  • RabbitMQ - Use spring-amqp instead

  • Simple messaging - Consider Spring Events

  • Kafka Streams - May need additional skill

Anti-Patterns

Anti-Pattern Problem Solution

Auto commit Message loss Use manual ack

No error handler Silent failures Configure error handler

No DLT Lost failed messages Add dead letter topic

Blocking in listener Consumer lag Use async processing

Wrong deserializer Errors on consume Match producer serializer

No idempotency Duplicate processing Implement idempotent consumer

Quick Troubleshooting

Problem Diagnostic Fix

Consumer not receiving Check group.id Verify consumer group

Serialization error Check value type Configure correct deserializer

Rebalancing often Check session.timeout Increase timeout

Consumer lag Check processing time Optimize or scale consumers

Messages in DLT Check error logs Fix processing error

Reference Documentation

  • Spring Kafka Reference

  • Spring Boot Kafka Docs

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

cron-scheduling

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

token-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

react-19

No summary provided by upstream source.

Repository SourceNeeds Review