messaging-testing-kafka

Kafka Integration Testing

Install skill "messaging-testing-kafka" with this command: npx skills add claude-dev-suite/claude-dev-suite/claude-dev-suite-claude-dev-suite-messaging-testing-kafka


Quick References: See quick-ref/embedded-kafka.md for @EmbeddedKafka details, quick-ref/testcontainers-kafka.md for Testcontainers patterns.

Testing Approach Selection

| Approach | Speed | Fidelity | Best For |
| --- | --- | --- | --- |
| @EmbeddedKafka | Fast (~2s startup) | High (real broker, in-process) | Spring Boot unit/integration tests |
| MockConsumer/MockProducer | Instant | Low (no broker) | Unit testing producer/consumer logic |
| Testcontainers KafkaContainer | Slow (~10s startup) | Highest (real Docker broker) | Full integration tests, CI pipelines |

Decision rule: Use @EmbeddedKafka for Spring tests by default. Use Testcontainers when you need specific Kafka versions, multi-broker clusters, or non-Spring projects. Use Mocks only for isolated unit tests.

Java/Spring: @EmbeddedKafka

Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Producer Test

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
class OrderProducerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private OrderProducer orderProducer;

    @Test
    void shouldSendOrderEvent() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka);
        // consumerProps() does not configure JSON deserialization, so set the
        // deserializers explicitly to match Consumer<String, OrderEvent>.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderEvent.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        DefaultKafkaConsumerFactory<String, OrderEvent> cf =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        // try-with-resources closes the consumer even if an assertion fails
        try (Consumer<String, OrderEvent> consumer = cf.createConsumer()) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders");

            orderProducer.sendOrder(new OrderEvent("123", "CREATED"));

            ConsumerRecord<String, OrderEvent> record =
                KafkaTestUtils.getSingleRecord(consumer, "orders", Duration.ofSeconds(10));
            assertThat(record.value().getOrderId()).isEqualTo("123");
            assertThat(record.value().getStatus()).isEqualTo("CREATED");
        }
    }
}

Consumer Test with CountDownLatch

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
class OrderConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @SpyBean
    private OrderConsumer orderConsumer;

    private final CountDownLatch latch = new CountDownLatch(1);

    @BeforeEach
    void setup() {
        // Run the real listener method, then release the latch so the test
        // thread knows the message has been processed.
        doAnswer(invocation -> {
            invocation.callRealMethod();
            latch.countDown();
            return null;
        }).when(orderConsumer).consume(any(), any());
    }

    @Test
    void shouldConsumeOrderEvent() throws Exception {
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        DefaultKafkaProducerFactory<String, String> pf =
            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

        template.send("orders", "{\"orderId\":\"456\",\"status\":\"CREATED\"}");

        // Fails if the listener was not invoked within 10 seconds
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(orderConsumer).consume(any(), any());
    }
}
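The latch handshake in the consumer test above can be hard to see through the Spring and Mockito wiring. The sketch below isolates it in plain Java. The class LatchedListenerSketch and its methods are hypothetical names chosen for illustration; an executor thread stands in for the Kafka listener container, so this shows only the waiting mechanism, not real message delivery.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a listener bean; not part of Spring Kafka.
final class LatchedListenerSketch {

    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> lastPayload = new AtomicReference<>();

    // Runs on the "listener" thread, as a @KafkaListener method would.
    void consume(String payload) {
        lastPayload.set(payload);
        latch.countDown(); // signal the waiting test thread
    }

    // Simulates asynchronous delivery on another thread, then blocks the
    // calling (test) thread until consume() has run or 5 seconds elapse.
    static String deliverAndWait(String payload) {
        LatchedListenerSketch listener = new LatchedListenerSketch();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> listener.consume(payload));
            if (!listener.latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener was not invoked in time");
            }
            return listener.lastPayload.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndWait("{\"orderId\":\"456\"}"));
    }
}
```

The test thread never sleeps for a fixed time: it resumes as soon as the listener counts down, and the timeout only bounds the failure case.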

Partition Assignment Wait

// Wait for the consumer to be assigned partitions before sending
ContainerTestUtils.waitForAssignment(
    listenerContainer, embeddedKafka.getPartitionsPerTopic());

Java/Spring: MockConsumer/MockProducer

MockProducer (Unit Testing)

class OrderProducerUnitTest {

    private MockProducer<String, String> mockProducer;
    private KafkaTemplate<String, String> kafkaTemplate;

    @BeforeEach
    void setup() {
        // autoComplete = true: every send() completes immediately and successfully
        mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        // MockProducerFactory takes a supplier of the producer, not the producer itself
        ProducerFactory<String, String> pf = new MockProducerFactory<>(() -> mockProducer);
        kafkaTemplate = new KafkaTemplate<>(pf);
    }

    @Test
    void shouldSendToCorrectTopic() {
        kafkaTemplate.send("orders", "key", "value");

        assertThat(mockProducer.history()).hasSize(1);
        assertThat(mockProducer.history().get(0).topic()).isEqualTo("orders");
        assertThat(mockProducer.history().get(0).key()).isEqualTo("key");
    }
}

MockConsumer (Unit Testing)

class OrderConsumerUnitTest {

    private MockConsumer<String, String> mockConsumer;

    @BeforeEach
    void setup() {
        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    }

    @Test
    void shouldProcessRecords() {
        mockConsumer.assign(List.of(new TopicPartition("orders", 0)));
        mockConsumer.updateBeginningOffsets(Map.of(new TopicPartition("orders", 0), 0L));

        mockConsumer.addRecord(new ConsumerRecord<>("orders", 0, 0L, "key", "{\"orderId\":\"1\"}"));

        ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
        assertThat(records.count()).isEqualTo(1);
    }
}

Java/Spring: Testcontainers KafkaContainer

With @ServiceConnection (Spring Boot 3.1+)

@SpringBootTest
@Testcontainers
class KafkaIntegrationTest {

    // apache/kafka-native images need org.testcontainers.kafka.KafkaContainer,
    // not the Confluent-oriented org.testcontainers.containers.KafkaContainer.
    @Container
    @ServiceConnection
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka-native:3.8.0"));

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    private OrderRepository orderRepository;

    @Test
    void shouldProduceAndConsumeOrder() throws Exception {
        OrderEvent event = new OrderEvent("789", "CREATED");
        // Block until the broker acknowledges the send
        kafkaTemplate.send("orders", event.getOrderId(), event).get(10, TimeUnit.SECONDS);

        // Poll until the listener has persisted the order
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                Optional<Order> order = orderRepository.findById("789");
                assertThat(order).isPresent();
                assertThat(order.get().getStatus()).isEqualTo("CREATED");
            });
    }
}

With @DynamicPropertySource (pre-3.1)

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}

Dependencies

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>

Node.js: kafkajs + Testcontainers

import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";
import { Kafka } from "kafkajs";

describe("Kafka Integration", () => {
  let container: StartedKafkaContainer;
  let kafka: Kafka;

  beforeAll(async () => {
    // Assumption: the Node module targets Confluent images and exposes the
    // broker listener on port 9093; check this against your module version.
    container = await new KafkaContainer("confluentinc/cp-kafka:7.6.0")
      .withExposedPorts(9093)
      .start();
    kafka = new Kafka({
      brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`],
    });
  }, 60_000);

  afterAll(async () => {
    await container.stop();
  });

  it("should produce and consume messages", async () => {
    const admin = kafka.admin();
    await admin.connect();
    await admin.createTopics({ topics: [{ topic: "test-topic", numPartitions: 1 }] });
    await admin.disconnect();

    const producer = kafka.producer();
    await producer.connect();
    await producer.send({
      topic: "test-topic",
      messages: [{ key: "key1", value: JSON.stringify({ orderId: "123" }) }],
    });
    await producer.disconnect();

    const messages: any[] = [];
    const consumer = kafka.consumer({ groupId: "test-group" });
    await consumer.connect();
    await consumer.subscribe({ topic: "test-topic", fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        messages.push(JSON.parse(message.value!.toString()));
      },
    });

    // Poll for the message rather than sleeping for a fixed interval
    const deadline = Date.now() + 10_000;
    while (messages.length === 0 && Date.now() < deadline) {
      await new Promise((r) => setTimeout(r, 100));
    }
    expect(messages).toHaveLength(1);
    expect(messages[0].orderId).toBe("123");

    await consumer.disconnect();
  }, 30_000);
});

Python: confluent-kafka + Testcontainers

import pytest
from confluent_kafka import Consumer, Producer
from testcontainers.kafka import KafkaContainer


@pytest.fixture(scope="module")
def kafka_container():
    with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
        yield kafka


@pytest.fixture
def bootstrap_servers(kafka_container):
    return kafka_container.get_bootstrap_server()


def test_produce_and_consume(bootstrap_servers):
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce("test-topic", key="key1", value=b'{"orderId": "123"}')
    producer.flush()

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "test-group",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["test-topic"])

    msg = consumer.poll(timeout=10.0)
    assert msg is not None
    assert msg.error() is None
    assert b"123" in msg.value()
    consumer.close()

Anti-Patterns

| Anti-Pattern | Problem | Solution |
| --- | --- | --- |
| Not waiting for partition assignment | Consumer misses messages | Use ContainerTestUtils.waitForAssignment() |
| Hardcoded timeouts too short | Flaky tests | Use await() with atMost() or generous timeouts |
| Shared topic names across tests | Tests interfere | Use unique topic names per test or @DirtiesContext |
| Not closing consumers in tests | Resource leaks, port exhaustion | Always close in @AfterEach or try-with-resources |
| Using auto.offset.reset=latest in tests | Consumer misses messages sent before subscription | Use earliest for test consumers |
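Two of the fixes above, unique topic names and bounded polling instead of fixed sleeps, need no Kafka dependency to illustrate. The following is a minimal sketch in plain Java; KafkaTestSupportSketch, uniqueTopic, and awaitUntil are hypothetical helper names, and awaitUntil is only a simplified stand-in for what a library such as Awaitility provides.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.function.BooleanSupplier;

// Hypothetical test helpers; the names are illustrative, not a library API.
final class KafkaTestSupportSketch {

    private KafkaTestSupportSketch() {
    }

    // Appends a random UUID so repeated or parallel tests do not share a topic.
    static String uniqueTopic(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    // Polls the condition until it holds or the timeout passes.
    // Returns true if the condition became true in time, false otherwise.
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        String topic = uniqueTopic("orders");
        long start = System.nanoTime();
        // Condition becomes true after roughly 50 ms; the 2 s timeout is only an upper bound.
        boolean ready = awaitUntil(
                () -> System.nanoTime() - start > Duration.ofMillis(50).toNanos(),
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(topic + " ready=" + ready);
    }
}
```

A generous timeout costs nothing when the condition is met early, because the loop returns as soon as the check passes. That is why polling with an upper bound tends to be less flaky than a short fixed sleep.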

Quick Troubleshooting

| Problem | Cause | Solution |
| --- | --- | --- |
| "No records found" | Consumer not assigned partitions | Wait for assignment, use earliest offset reset |
| EmbeddedKafka port conflict | Multiple test classes sharing broker | Use @DirtiesContext or EmbeddedKafkaHolder pattern |
| Deserialization errors | Mismatched serializer config | Set spring.json.trusted.packages in test properties |
| Testcontainers timeout | Docker not running or slow pull | Check Docker, increase startup timeout |
| Consumer lag in tests | Consumer not started before produce | Start consumer first, then produce |

Reference Documentation

  • Spring Kafka Testing

  • Testcontainers Kafka Module

  • kafkajs Testing

Cross-reference: For Spring Kafka producer/consumer patterns, see spring-kafka skill. For generic Testcontainers patterns, see testcontainers skill.
