# Kafka Producer Implementation

## Purpose
Implement production-grade Kafka producers that reliably publish domain events with high performance, type safety, and comprehensive error handling. Covers msgspec serialization, confluent-kafka configuration, OpenTelemetry tracing, and anti-corruption layer patterns for translating domain models to message schemas.
## When to Use This Skill

Use when building event publishers that send domain events to Kafka topics — triggered by requests such as "implement Kafka producer", "publish events to Kafka", "send order events", or "create event publisher".
Do NOT use for consuming events (use `kafka-consumer-implementation`), testing with testcontainers (use `kafka-integration-testing`), or designing schemas (use `kafka-schema-management`).
## Quick Start

Create a high-performance Kafka producer in three steps:

1. Define the message schema:

   ```python
   import msgspec

   class OrderEventMessage(msgspec.Struct, frozen=True):
       order_id: str
       created_at: str  # ISO 8601
       customer_name: str
       total_price: float
   ```

2. Implement the producer:

   ```python
   import msgspec
   from confluent_kafka import Producer

   class OrderEventPublisher:
       def __init__(self, brokers: list[str], topic: str) -> None:
           config = {
               "bootstrap.servers": ",".join(brokers),
               "acks": "all",
               "enable.idempotence": True,
               "compression.type": "snappy",
           }
           self.producer = Producer(config)
           self.topic = topic
           self.encoder = msgspec.json.Encoder()

       def publish(self, event: OrderEventMessage) -> None:
           payload = self.encoder.encode(event)
           self.producer.produce(
               topic=self.topic,
               key=event.order_id.encode("utf-8"),
               value=payload,
           )
           self.producer.poll(0)

       def close(self) -> None:
           self.producer.flush(10.0)
   ```

3. Use it in the application:

   ```python
   publisher = OrderEventPublisher(["localhost:9092"], "orders")
   publisher.publish(order_event)
   publisher.close()
   ```
## Implementation Steps

### 1. Message Schema with msgspec
Define immutable schemas using `msgspec.Struct` for 10-20x faster serialization:

```python
import msgspec

class LineItemEventMessage(msgspec.Struct, frozen=True):
    line_item_id: str
    product_id: str
    product_title: str
    quantity: int
    price: float

class OrderEventMessage(msgspec.Struct, frozen=True):
    order_id: str
    created_at: str  # ISO 8601 format string
    customer_name: str
    line_items: list[LineItemEventMessage]
    total_price: float
```

Key points:
- Use `frozen=True` for immutability
- Use primitive types (`str`, `float`, `int`), not custom objects
- Store timestamps as ISO 8601 strings
- msgspec produces JSON bytes automatically
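The ISO 8601 timestamp convention can be exercised with the standard library alone. This minimal sketch (msgspec itself is not required here) shows the round trip a consumer would perform on the `created_at` field:

```python
from datetime import datetime, timezone

# Produce side: serialize a timezone-aware timestamp to an ISO 8601 string.
created = datetime(2024, 5, 1, 12, 30, 0, tzinfo=timezone.utc)
created_at = created.isoformat()  # "2024-05-01T12:30:00+00:00"

# Consume side: parse the string back into an aware datetime.
parsed = datetime.fromisoformat(created_at)

assert parsed == created
```

Storing timestamps as strings keeps the schema free of custom encoders and makes the payload readable in any Kafka tooling.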
### 2. Producer Adapter
Implement the producer with error handling and tracing:

```python
import msgspec
from confluent_kafka import Producer
from opentelemetry import trace
from structlog import get_logger


class KafkaProducerError(Exception):
    """Raised when the producer cannot deliver pending messages in time."""


class OrderEventPublisher:
    """Publishes order events with high performance and reliability.

    Features:
    - msgspec serialization (10-20x faster than Pydantic)
    - OpenTelemetry distributed tracing
    - Idempotent exactly-once semantics
    - Message ordering guarantees (by order_id key)

    Configuration:
    - acks=all: Wait for all in-sync replicas
    - enable.idempotence=True: Exactly-once per send
    - max.in.flight.requests.per.connection=1: Preserve order
    - compression.type=snappy: Balance CPU/network
    """

    def __init__(self, brokers: list[str], topic: str) -> None:
        self.topic = topic
        self.logger = get_logger(__name__)
        self.tracer = trace.get_tracer(__name__)
        self.encoder = msgspec.json.Encoder()
        config = {
            "bootstrap.servers": ",".join(brokers),
            "acks": "all",
            "retries": 5,
            "max.in.flight.requests.per.connection": 1,
            "compression.type": "snappy",
            "enable.idempotence": True,
        }
        self.producer = Producer(config)

    def publish_order(self, event: OrderEventMessage) -> None:
        """Publish order event with order_id as partition key."""
        with self.tracer.start_as_current_span("publish_order") as span:
            span.set_attribute("order_id", event.order_id)
            payload = self.encoder.encode(event)
            self.producer.produce(
                topic=self.topic,
                key=event.order_id.encode("utf-8"),
                value=payload,
                on_delivery=self._delivery_callback,
            )
            self.producer.poll(0)

    def _delivery_callback(self, err, msg) -> None:
        """Log the outcome of each delivery report."""
        if err:
            self.logger.error("delivery_failed", error=str(err))
        else:
            self.logger.debug(
                "message_delivered", partition=msg.partition(), offset=msg.offset()
            )

    def flush(self, timeout: float = 10.0) -> None:
        """Flush all pending messages."""
        remaining = self.producer.flush(timeout)
        if remaining > 0:
            raise KafkaProducerError(f"Failed to flush {remaining} messages")

    def close(self) -> None:
        """Close producer and release resources."""
        self.flush()
```

See `references/detailed-implementation.md` for the complete producer adapter with full error handling.
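The produce/poll/callback contract above can be illustrated without a broker. The `FakeProducer` and `FakeDeliveryReport` below are hypothetical stand-ins (not part of confluent-kafka) that mimic how `poll()` drains queued delivery reports into the registered callback:

```python
class FakeDeliveryReport:
    """Hypothetical stand-in for the message object passed to a delivery callback."""

    def __init__(self, partition: int, offset: int) -> None:
        self._partition = partition
        self._offset = offset

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset


class FakeProducer:
    """Stand-in that queues delivery reports until poll() is called."""

    def __init__(self) -> None:
        self._pending = []

    def produce(self, topic, key, value, on_delivery) -> None:
        # A real producer enqueues the message; the report arrives asynchronously.
        self._pending.append((on_delivery, FakeDeliveryReport(0, len(self._pending))))

    def poll(self, timeout) -> None:
        # poll() is what actually invokes queued delivery callbacks.
        while self._pending:
            callback, msg = self._pending.pop(0)
            callback(None, msg)  # err=None signals success


delivered = []
producer = FakeProducer()
producer.produce(
    "orders", b"order-1", b"{}",
    on_delivery=lambda err, msg: delivered.append(msg.offset()),
)
producer.poll(0)
print(delivered)  # [0]
```

The key takeaway: if the application never calls `poll()` (or `flush()`), delivery callbacks never fire, which is why `publish_order` polls after every produce.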
### 3. Anti-Corruption Layer
Translate domain models to message schemas:

```python
class OrderEventTranslator:
    """Translates domain Order to message schema.

    Anti-corruption layer that:
    - Converts domain entities to message DTOs
    - Handles type conversions (OrderId -> str, Money -> float)
    - Preserves timestamp information
    """

    @staticmethod
    def to_event_message(order: Order) -> OrderEventMessage:
        line_items = [
            LineItemEventMessage(
                line_item_id=item.line_item_id,
                product_id=str(item.product_id),
                product_title=str(item.product_title),
                quantity=item.quantity,
                price=float(item.price.amount),
            )
            for item in order.line_items
        ]
        return OrderEventMessage(
            order_id=str(order.order_id),
            created_at=order.created_at.isoformat(),
            customer_name=order.customer_name,
            line_items=line_items,
            total_price=float(order.total_price.amount),
        )
```
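To see the type conversions in isolation, the sketch below uses hypothetical stand-ins for the domain types (`OrderId`, `Money`, and this `Order` shape are assumptions, not code from the actual domain layer):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal


@dataclass(frozen=True)
class Money:
    """Hypothetical value object; the translator flattens it to float."""
    amount: Decimal


@dataclass(frozen=True)
class OrderId:
    value: str

    def __str__(self) -> str:
        return self.value


@dataclass
class Order:
    order_id: OrderId
    created_at: datetime
    customer_name: str
    total_price: Money


def to_event_fields(order: Order) -> dict:
    # The same conversions the anti-corruption layer performs:
    # OrderId -> str, datetime -> ISO 8601 str, Money -> float.
    return {
        "order_id": str(order.order_id),
        "created_at": order.created_at.isoformat(),
        "customer_name": order.customer_name,
        "total_price": float(order.total_price.amount),
    }


order = Order(
    order_id=OrderId("ord-42"),
    created_at=datetime(2024, 5, 1, tzinfo=timezone.utc),
    customer_name="Ada",
    total_price=Money(Decimal("19.99")),
)
fields = to_event_fields(order)
```

The direction matters: the message schema never imports domain types, so schema changes and domain refactors stay decoupled.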
### 4. Use Case Integration
Integrate the producer in an extraction use case:

```python
class ExtractOrdersUseCase:
    def __init__(self, shopify_gateway, publisher):
        self.shopify_gateway = shopify_gateway
        self.publisher = publisher
        self.translator = OrderEventTranslator()

    async def execute(self) -> int:
        orders = await self.shopify_gateway.fetch_all_orders()
        for order in orders:
            event = self.translator.to_event_message(order)
            self.publisher.publish_order(event)
        self.publisher.flush()
        return len(orders)
```
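Because the use case depends only on the gateway and publisher interfaces, it can be exercised end to end with stand-ins. The `StubGateway` and `RecordingPublisher` below are hypothetical test doubles (translation is elided for brevity):

```python
import asyncio


class StubGateway:
    """Hypothetical gateway returning canned orders instead of calling Shopify."""

    async def fetch_all_orders(self):
        return ["order-1", "order-2", "order-3"]


class RecordingPublisher:
    """Stand-in publisher that records publishes instead of hitting Kafka."""

    def __init__(self) -> None:
        self.published = []
        self.flushed = False

    def publish_order(self, event) -> None:
        self.published.append(event)

    def flush(self) -> None:
        self.flushed = True


class ExtractOrdersUseCase:
    def __init__(self, shopify_gateway, publisher) -> None:
        self.shopify_gateway = shopify_gateway
        self.publisher = publisher

    async def execute(self) -> int:
        orders = await self.shopify_gateway.fetch_all_orders()
        for order in orders:
            self.publisher.publish_order(order)  # translation elided here
        self.publisher.flush()  # one flush per batch, not per message
        return len(orders)


publisher = RecordingPublisher()
count = asyncio.run(ExtractOrdersUseCase(StubGateway(), publisher).execute())
print(count)  # 3
```

Note the single `flush()` after the loop: flushing per message would serialize every send and defeat batching.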
### 5. Graceful Shutdown
Handle signals for a clean shutdown:

```python
import signal
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_publisher(brokers, topic):
    """Context manager for the producer lifecycle."""
    publisher = OrderEventPublisher(brokers, topic)

    def handle_shutdown(signum, frame):
        print(f"Received signal {signum}, shutting down...")
        publisher.close()

    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)
    try:
        yield publisher
    finally:
        publisher.close()
```
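The lifecycle guarantee itself — `close()` runs on normal exit and on exceptions alike — can be verified without a broker. The `StubPublisher` here is a hypothetical stand-in:

```python
import asyncio
from contextlib import asynccontextmanager


class StubPublisher:
    """Hypothetical stand-in so the lifecycle can be exercised without Kafka."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def managed_publisher():
    publisher = StubPublisher()
    try:
        yield publisher
    finally:
        # Runs on normal exit and when the body raises.
        publisher.close()


async def main() -> bool:
    async with managed_publisher() as publisher:
        pass  # publish events here
    return publisher.closed


print(asyncio.run(main()))  # True
```

Because `close()` delegates to `flush()`, this pattern ensures no buffered messages are silently dropped when the process exits.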
## Requirements

- `confluent-kafka>=2.3.0` - production-grade Kafka client (10-20x faster than kafka-python)
- `msgspec>=0.18.6` - ultra-fast serialization (10-20x faster than Pydantic)
- `structlog>=23.2.0` - structured logging
- `opentelemetry-api>=1.22.0` - distributed tracing
- Kafka/Redpanda broker (3.x or later for exactly-once semantics)
- Python 3.11+ with type checking
## Configuration Guidelines

For Low Latency (real-time):

```python
config = {
    "acks": "all",
    "linger.ms": 0,       # Send immediately
    "batch.size": 16384,  # Small batches
}
```

For High Throughput (batch processing):

```python
config = {
    "acks": "all",
    "linger.ms": 100,           # Wait up to 100ms to fill batches
    "batch.size": 131072,       # 128KB batches
    "compression.type": "lz4",  # Fast compression, pairs well with large batches
}
```
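One way to keep the two profiles from drifting apart is to derive both from a shared base. The helper below is a sketch (the profile names and structure are assumptions, not an established convention):

```python
# Settings every producer needs regardless of workload.
BASE_CONFIG = {
    "acks": "all",
    "enable.idempotence": True,
}

# Per-workload tuning overlays.
PROFILES = {
    "low_latency": {"linger.ms": 0, "batch.size": 16384},
    "high_throughput": {
        "linger.ms": 100,
        "batch.size": 131072,
        "compression.type": "lz4",
    },
}


def producer_config(brokers: list[str], profile: str) -> dict:
    """Merge the shared base with a named tuning profile."""
    config = {"bootstrap.servers": ",".join(brokers), **BASE_CONFIG}
    config.update(PROFILES[profile])
    return config


cfg = producer_config(["localhost:9092"], "high_throughput")
print(cfg["linger.ms"])  # 100
```

This keeps reliability settings (`acks`, idempotence) in one place so a latency tweak can never accidentally weaken delivery guarantees.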
## Error Handling

Transient errors (retried automatically by confluent-kafka):
- Network timeouts
- Broker temporarily unavailable
- Leader election

Permanent errors (fail fast):
- Invalid topic
- Permission denied
- Schema validation failed
See `references/error-handling.md` for comprehensive error handling strategies including retry with exponential backoff, dead letter queues, circuit breaker patterns, and monitoring metrics.
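As a taste of those strategies, here is a generic exponential-backoff sketch for application-level retries of transient errors. `ConnectionError` stands in for a transient broker failure; real code would instead inspect the Kafka error (confluent-kafka exposes whether an error is retriable on its error objects):

```python
import time


def publish_with_retry(send, max_attempts: int = 5, base_delay: float = 0.05) -> None:
    """Retry `send` with exponential backoff on transient errors.

    `send` is any zero-argument callable that raises on failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            send()
            return
        except ConnectionError:  # stand-in for a transient broker error
            if attempt == max_attempts:
                raise  # exhausted retries: surface the error
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.05, 0.1, 0.2, ...


attempts = []

def flaky_send() -> None:
    """Fails twice, then succeeds — simulates a recovering broker."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker temporarily unavailable")

publish_with_retry(flaky_send)
print(len(attempts))  # 3
```

Permanent errors should bypass this loop entirely: retrying an invalid topic or a permission failure only adds latency before the same failure.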
## Integration Examples

See `examples/examples.md` for 10 production-ready examples:
- Basic Order Publisher
- Multi-Topic Publisher
- Async Batch Publisher
- Monitored Publisher
- Context Manager Pattern
- Testing with Mocks
- Integration with Use Case
- Performance Tuning
- Low-Latency Configuration
- Graceful Shutdown
## See Also

- `kafka-consumer-implementation` skill - for consuming events
- `kafka-schema-management` skill - for schema design
- `kafka-integration-testing` skill - for testing
- `references/detailed-implementation.md` - complete implementation code
- `references/error-handling.md` - error handling strategies
- `examples/examples.md` - production-ready examples