kafka-producer-implementation

Implement type-safe Kafka producers for event streaming with msgspec serialization. Use when building async/await producers that publish domain events (orders, transactions, etc.) with schema validation, error handling, retry logic, and distributed tracing. Handles producer configuration, idempotent writes, and graceful shutdown.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "kafka-producer-implementation" with this command: npx skills add dawiddutoit/custom-claude/dawiddutoit-custom-claude-kafka-producer-implementation

Kafka Producer Implementation

Purpose

Implement production-grade Kafka producers that reliably publish domain events with high performance, type safety, and comprehensive error handling. Covers msgspec serialization, confluent-kafka configuration, OpenTelemetry tracing, and anti-corruption layer patterns for translating domain models to message schemas.

When to Use This Skill

Use when building event publishers that send domain events to Kafka topics with "implement Kafka producer", "publish events to Kafka", "send order events", or "create event publisher".

Do NOT use for consuming events (use kafka-consumer-implementation), testing with testcontainers (use kafka-integration-testing), or designing schemas (use kafka-schema-management).

Quick Start

Create a high-performance Kafka producer in 3 steps:

  1. Define message schema:
import msgspec

class OrderEventMessage(msgspec.Struct, frozen=True):
    order_id: str
    created_at: str  # ISO 8601
    customer_name: str
    total_price: float
  1. Implement producer:
from confluent_kafka import Producer
import msgspec

class OrderEventPublisher:
    def __init__(self, brokers: list[str], topic: str) -> None:
        config = {
            "bootstrap.servers": ",".join(brokers),
            "acks": "all",
            "enable.idempotence": True,
            "compression.type": "snappy",
        }
        self.producer = Producer(config)
        self.topic = topic
        self.encoder = msgspec.json.Encoder()

    def publish(self, event: OrderEventMessage) -> None:
        payload = self.encoder.encode(event)
        self.producer.produce(
            topic=self.topic,
            key=event.order_id.encode("utf-8"),
            value=payload,
        )
        self.producer.poll(0)

    def close(self) -> None:
        self.producer.flush(10.0)
  1. Use in application:
publisher = OrderEventPublisher(["localhost:9092"], "orders")
publisher.publish(order_event)
publisher.close()

Implementation Steps

1. Message Schema with msgspec

Define immutable schemas using msgspec.Struct for 10-20x faster serialization:

import msgspec

class LineItemEventMessage(msgspec.Struct, frozen=True):
    line_item_id: str
    product_id: str
    product_title: str
    quantity: int
    price: float

class OrderEventMessage(msgspec.Struct, frozen=True):
    order_id: str
    created_at: str  # ISO 8601 format string
    customer_name: str
    line_items: list[LineItemEventMessage]
    total_price: float

Key Points:

  • Use frozen=True for immutability
  • Use primitive types (str, float, int) not custom objects
  • Store timestamps as ISO 8601 strings
  • msgspec produces JSON bytes automatically

2. Producer Adapter

Implement producer with error handling and tracing:

import msgspec
from confluent_kafka import Producer, KafkaException
from opentelemetry import trace
from structlog import get_logger

class OrderEventPublisher:
    """Publishes order events with high performance and reliability.

    Features:
    - msgspec serialization (10-20x faster than Pydantic)
    - OpenTelemetry distributed tracing
    - Idempotent exactly-once semantics
    - Message ordering guarantees (by order_id key)

    Configuration:
    - acks=all: Wait for all in-sync replicas
    - enable.idempotence=True: Exactly-once-per-send
    - max.in.flight.requests.per.connection=1: Preserve order
    - compression.type=snappy: Balance CPU/network
    """

    def __init__(self, brokers: list[str], topic: str) -> None:
        self.topic = topic
        self.logger = get_logger(__name__)
        self.tracer = trace.get_tracer(__name__)
        self.encoder = msgspec.json.Encoder()

        config = {
            "bootstrap.servers": ",".join(brokers),
            "acks": "all",
            "retries": 5,
            "max.in.flight.requests.per.connection": 1,
            "compression.type": "snappy",
            "enable.idempotence": True,
        }

        self.producer = Producer(config)

    def publish_order(self, event: OrderEventMessage) -> None:
        """Publish order event with order_id as partition key."""
        with self.tracer.start_as_current_span("publish_order") as span:
            span.set_attribute("order_id", event.order_id)

            payload = self.encoder.encode(event)
            self.producer.produce(
                topic=self.topic,
                key=event.order_id.encode("utf-8"),
                value=payload,
                on_delivery=self._delivery_callback,
            )
            self.producer.poll(0)

    def _delivery_callback(self, err, msg):
        """Handle delivery callback."""
        if err:
            self.logger.error("delivery_failed", error=str(err))
        else:
            self.logger.debug("message_delivered", partition=msg.partition(), offset=msg.offset())

    def flush(self, timeout: float = 10.0) -> None:
        """Flush all pending messages."""
        remaining = self.producer.flush(timeout)
        if remaining > 0:
            raise KafkaProducerError(f"Failed to flush {remaining} messages")

    def close(self) -> None:
        """Close producer and release resources."""
        self.flush()

See references/detailed-implementation.md for complete producer adapter code with full error handling.

3. Anti-Corruption Layer

Translate domain models to message schemas:

class OrderEventTranslator:
    """Translates domain Order to message schema.

    Anti-corruption layer that:
    - Converts domain entities to message DTOs
    - Handles type conversions (OrderId -> str, Money -> float)
    - Preserves timestamp information
    """

    @staticmethod
    def to_event_message(order: Order) -> OrderEventMessage:
        line_items = [
            LineItemEventMessage(
                line_item_id=item.line_item_id,
                product_id=str(item.product_id),
                product_title=str(item.product_title),
                quantity=item.quantity,
                price=float(item.price.amount),
            )
            for item in order.line_items
        ]

        return OrderEventMessage(
            order_id=str(order.order_id),
            created_at=order.created_at.isoformat(),
            customer_name=order.customer_name,
            line_items=line_items,
            total_price=float(order.total_price.amount),
        )

4. Use Case Integration

Integrate producer in extraction use case:

class ExtractOrdersUseCase:
    def __init__(self, shopify_gateway, publisher):
        self.shopify_gateway = shopify_gateway
        self.publisher = publisher
        self.translator = OrderEventTranslator()

    async def execute(self) -> int:
        orders = await self.shopify_gateway.fetch_all_orders()

        for order in orders:
            event = self.translator.to_event_message(order)
            self.publisher.publish_order(event)

        self.publisher.flush()
        return len(orders)

5. Graceful Shutdown

Handle signals for clean shutdown:

from contextlib import asynccontextmanager
import signal

@asynccontextmanager
async def managed_publisher(brokers, topic):
    """Context manager for producer lifecycle."""
    publisher = OrderEventPublisher(brokers, topic)

    def handle_shutdown(signum, frame):
        print(f"Received signal {signum}, shutting down...")
        publisher.close()

    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    try:
        yield publisher
    finally:
        publisher.close()

Requirements

  • confluent-kafka>=2.3.0 - Production-grade Kafka client (10-20x faster than kafka-python)
  • msgspec>=0.18.6 - Ultra-fast serialization (10-20x faster than Pydantic)
  • structlog>=23.2.0 - Structured logging
  • opentelemetry-api>=1.22.0 - Distributed tracing
  • Kafka/Redpanda broker (3.x or later for exactly-once semantics)
  • Python 3.11+ with type checking

Configuration Guidelines

For Low Latency (real-time):

config = {
    "acks": "all",
    "linger.ms": 0,  # Send immediately
    "batch.size": 16384,  # Small batches
}

For High Throughput (batch processing):

config = {
    "acks": "all",
    "linger.ms": 100,  # Wait 100ms for batching
    "batch.size": 131072,  # 128KB batches
    "compression.type": "lz4",  # Better compression
}

Error Handling

Transient Errors (auto-retry by confluent-kafka):

  • Network timeouts
  • Broker temporarily unavailable
  • Leader election

Permanent Errors (fail fast):

  • Invalid topic
  • Permission denied
  • Schema validation failed

See references/error-handling.md for comprehensive error handling strategies including retry with exponential backoff, dead letter queues, circuit breaker patterns, and monitoring metrics.

Integration Examples

See examples/examples.md for 10 production-ready examples:

  1. Basic Order Publisher
  2. Multi-Topic Publisher
  3. Async Batch Publisher
  4. Monitored Publisher
  5. Context Manager Pattern
  6. Testing with Mocks
  7. Integration with Use Case
  8. Performance Tuning
  9. Low-Latency Configuration
  10. Graceful Shutdown

See Also

  • kafka-consumer-implementation skill - For consuming events
  • kafka-schema-management skill - For schema design
  • kafka-integration-testing skill - For testing
  • references/detailed-implementation.md - Complete implementation code
  • references/error-handling.md - Error handling strategies
  • examples/examples.md - Production-ready examples

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

playwright-web-scraper

No summary provided by upstream source.

Repository SourceNeeds Review
General

openscad-collision-detection

No summary provided by upstream source.

Repository SourceNeeds Review
General

java-test-generator

No summary provided by upstream source.

Repository SourceNeeds Review
General

playwright-network-analyzer

No summary provided by upstream source.

Repository SourceNeeds Review