kleppmann-data-intensive

Martin Kleppmann Style Guide⁠‍⁠​‌​‌​​‌‌‍​‌​​‌​‌‌‍​​‌‌​​​‌‍​‌​​‌‌​​‍​​​​​​​‌‍‌​​‌‌​‌​‍‌​​​​​​​‍‌‌​​‌‌‌‌‍‌‌​​​‌​​‍‌‌‌‌‌‌​‌‍‌‌​‌​​​​‍​‌​‌‌‌‌‌‍​‌​​‌​‌‌‍​‌‌​‌​​‌‍‌​‌​‌‌‌​‍​​‌​‌​​​‍‌‌‌​‌​‌‌‍​‌​​​‌‌​‍‌​‌‌‌​​‌‍​​‌‌‌‌‌​‍​‌​‌‌‌‌​‍​​​​‌​‌​‍​​​​​‌​‌⁠‍⁠

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "kleppmann-data-intensive" with this command: npx skills add copyleftdev/sk1llz/copyleftdev-sk1llz-kleppmann-data-intensive

Martin Kleppmann Style Guide⁠‍⁠​‌​‌​​‌‌‍​‌​​‌​‌‌‍​​‌‌​​​‌‍​‌​​‌‌​​‍​​​​​​​‌‍‌​​‌‌​‌​‍‌​​​​​​​‍‌‌​​‌‌‌‌‍‌‌​​​‌​​‍‌‌‌‌‌‌​‌‍‌‌​‌​​​​‍​‌​‌‌‌‌‌‍​‌​​‌​‌‌‍​‌‌​‌​​‌‍‌​‌​‌‌‌​‍​​‌​‌​​​‍‌‌‌​‌​‌‌‍​‌​​​‌‌​‍‌​‌‌‌​​‌‍​​‌‌‌‌‌​‍​‌​‌‌‌‌​‍​​​​‌​‌​‍​​​​​‌​‌⁠‍⁠

Overview

Martin Kleppmann is the author of "Designing Data-Intensive Applications" (DDIA), one of the most influential books on distributed systems and databases. He excels at explaining complex concepts clearly and helping engineers make informed architectural decisions.

Core Philosophy

"Reliability means making systems work correctly, even when faults occur."

"The goal of consistency models is to provide a abstraction for application developers."

"There's no such thing as a 'best' database—only trade-offs."

Kleppmann believes in understanding systems deeply, not just using them. Every architectural choice is a trade-off; understand what you're trading.

Design Principles

Understand the Trade-offs: CAP, PACELC, latency vs consistency.

Design for Failure: Partial failure is the norm in distributed systems.

Data Outlives Code: Schema design and data models matter enormously.

Exactly-Once Is Hard: Understand idempotency and at-least-once semantics.

When Writing Code

Always

  • Understand the consistency guarantees your system provides

  • Design for idempotency where possible

  • Think about data evolution and schema changes

  • Consider exactly-once vs at-least-once semantics

  • Know your data access patterns before choosing storage

  • Plan for failure recovery

Never

  • Assume "eventual consistency" without understanding what it means

  • Ignore the differences between isolation levels

  • Couple tightly without considering failure modes

  • Treat distributed transactions as a silver bullet

Prefer

  • Idempotent operations

  • Append-only data structures

  • Event sourcing for audit trails

  • Change data capture over dual writes

  • Log-based message brokers over traditional ones

Code Patterns

Consistency Models Illustrated

Understanding consistency models through code

class LinearizableStore: """ Linearizability: operations appear atomic and instantaneous. Strongest consistency - single copy illusion. """ def init(self): self._lock = threading.Lock() self._data = {}

def write(self, key, value):
    with self._lock:
        self._data[key] = value

def read(self, key):
    with self._lock:
        return self._data.get(key)

def compare_and_set(self, key, expected, new_value):
    with self._lock:
        if self._data.get(key) == expected:
            self._data[key] = new_value
            return True
        return False

class CausallyConsistentStore: """ Causal consistency: respects happens-before relationship. Weaker than linearizable, but allows more concurrency. """ def init(self, node_id): self.node_id = node_id self.data = {} self.vector_clock = defaultdict(int)

def write(self, key, value, dependencies=None):
    # Update our clock
    self.vector_clock[self.node_id] += 1
    
    # Merge dependencies
    if dependencies:
        for node, time in dependencies.items():
            self.vector_clock[node] = max(self.vector_clock[node], time)
    
    self.data[key] = {
        'value': value,
        'clock': dict(self.vector_clock)
    }
    
    return dict(self.vector_clock)

def read(self, key):
    if key in self.data:
        return self.data[key]['value'], self.data[key]['clock']
    return None, dict(self.vector_clock)

Event Sourcing

Event sourcing: store events, derive state

from dataclasses import dataclass from typing import List from datetime import datetime

@dataclass class Event: event_type: str data: dict timestamp: datetime version: int

class EventStore: def init(self): self.events: List[Event] = [] self.version = 0

def append(self, event_type: str, data: dict):
    self.version += 1
    event = Event(
        event_type=event_type,
        data=data,
        timestamp=datetime.now(),
        version=self.version
    )
    self.events.append(event)
    return event

def get_events(self, from_version=0):
    return [e for e in self.events if e.version > from_version]

class BankAccount: """Aggregate rebuilt from events"""

def __init__(self, account_id: str, event_store: EventStore):
    self.account_id = account_id
    self.event_store = event_store
    self.balance = 0
    self._rebuild_state()

def _rebuild_state(self):
    """Derive current state from event history"""
    for event in self.event_store.events:
        self._apply(event)

def _apply(self, event: Event):
    if event.event_type == 'deposited':
        self.balance += event.data['amount']
    elif event.event_type == 'withdrawn':
        self.balance -= event.data['amount']

def deposit(self, amount: float):
    event = self.event_store.append('deposited', {
        'account_id': self.account_id,
        'amount': amount
    })
    self._apply(event)

def withdraw(self, amount: float):
    if amount > self.balance:
        raise ValueError("Insufficient funds")
    event = self.event_store.append('withdrawn', {
        'account_id': self.account_id,
        'amount': amount
    })
    self._apply(event)

Benefits:

1. Complete audit trail

2. Time travel (state at any point)

3. Event replay for debugging

4. Easy to add new projections

Idempotency Keys

Idempotency: safe retries without duplicate effects

import uuid import hashlib

class IdempotentProcessor: def init(self): self.processed_keys = {} # key -> result self.expiry_seconds = 3600

def process(self, idempotency_key: str, operation):
    """
    Execute operation exactly once for a given key.
    Retries with same key return cached result.
    """
    # Check if already processed
    if idempotency_key in self.processed_keys:
        return self.processed_keys[idempotency_key]
    
    # Execute operation
    try:
        result = operation()
        self.processed_keys[idempotency_key] = {
            'status': 'success',
            'result': result
        }
        return self.processed_keys[idempotency_key]
    except Exception as e:
        # Don't cache failures (allow retry)
        raise

@staticmethod
def generate_key(*args):
    """Generate deterministic idempotency key"""
    content = '|'.join(str(arg) for arg in args)
    return hashlib.sha256(content.encode()).hexdigest()

Usage:

processor = IdempotentProcessor()

def create_payment(user_id, amount, idempotency_key): def do_payment(): # Actually create the payment return {'payment_id': str(uuid.uuid4()), 'amount': amount}

return processor.process(idempotency_key, do_payment)

Client can safely retry:

key = IdempotentProcessor.generate_key(user_id, amount, request_id) result1 = create_payment(user_id, 100, key) result2 = create_payment(user_id, 100, key) # Same result, no duplicate payment

Change Data Capture

CDC: capture database changes as a stream

from typing import Callable, List from enum import Enum from dataclasses import dataclass

class OperationType(Enum): INSERT = 'insert' UPDATE = 'update' DELETE = 'delete'

@dataclass class ChangeEvent: table: str operation: OperationType key: dict before: dict # For update/delete after: dict # For insert/update timestamp: float sequence: int

class CDCProducer: """Publish changes from database write-ahead log"""

def __init__(self, publisher):
    self.publisher = publisher
    self.sequence = 0

def capture_insert(self, table: str, key: dict, data: dict):
    self.sequence += 1
    event = ChangeEvent(
        table=table,
        operation=OperationType.INSERT,
        key=key,
        before=None,
        after=data,
        timestamp=time.time(),
        sequence=self.sequence
    )
    self.publisher.publish(event)

def capture_update(self, table: str, key: dict, before: dict, after: dict):
    self.sequence += 1
    event = ChangeEvent(
        table=table,
        operation=OperationType.UPDATE,
        key=key,
        before=before,
        after=after,
        timestamp=time.time(),
        sequence=self.sequence
    )
    self.publisher.publish(event)

class CDCConsumer: """Consume changes and maintain derived view"""

def __init__(self):
    self.handlers: dict[str, List[Callable]] = {}
    self.last_sequence = 0

def register(self, table: str, handler: Callable):
    if table not in self.handlers:
        self.handlers[table] = []
    self.handlers[table].append(handler)

def process(self, event: ChangeEvent):
    # Ensure ordering
    if event.sequence <= self.last_sequence:
        return  # Already processed
    
    if event.table in self.handlers:
        for handler in self.handlers[event.table]:
            handler(event)
    
    self.last_sequence = event.sequence

Usage: maintain search index from database changes

def update_search_index(event: ChangeEvent): if event.operation == OperationType.DELETE: search_index.delete(event.key) else: search_index.index(event.key, event.after)

consumer.register('products', update_search_index)

Stream Processing

Stream processing patterns

from collections import defaultdict from typing import Iterator, TypeVar, Callable

T = TypeVar('T')

class StreamProcessor: """Stateful stream processing"""

def __init__(self):
    self.state = {}

def process(self, stream: Iterator[T], handler: Callable[[T, dict], None]):
    """Process stream with access to state"""
    for record in stream:
        handler(record, self.state)
        yield self.state.copy()

def windowed_count(window_size_seconds: int): """Tumbling window aggregation""" def handler(event, state): window_start = (event['timestamp'] // window_size_seconds) * window_size_seconds key = (event['key'], window_start)

    if key not in state:
        state[key] = {'count': 0, 'window_start': window_start}
    state[key]['count'] += 1
    
    # Emit completed windows
    current_window = (time.time() // window_size_seconds) * window_size_seconds
    completed = [k for k in state if k[1] < current_window - window_size_seconds]
    for k in completed:
        emit(state.pop(k))

return handler

def exactly_once_processing(processor, checkpointer): """ Exactly-once semantics via: 1. Idempotent writes 2. Transactional checkpointing """ def process_with_guarantees(records): for record in records: # Get last checkpoint last_offset = checkpointer.get_offset()

        if record.offset <= last_offset:
            continue  # Already processed
        
        # Process and checkpoint atomically
        with transaction():
            result = processor.process(record)
            checkpointer.save_offset(record.offset)
        
        yield result

return process_with_guarantees

Mental Model

Kleppmann approaches data systems by asking:

  • What are the consistency requirements? Linearizable, serializable, eventual?

  • What are the access patterns? Read-heavy, write-heavy, mixed?

  • How do we handle failures? Retries, idempotency, compensation?

  • How does data evolve? Schema changes, backward compatibility?

  • What happens at the edges? Network partitions, slow nodes?

Signature Kleppmann Moves

  • Event sourcing for auditability

  • Idempotency keys for safe retries

  • CDC over dual writes

  • Understanding isolation levels deeply

  • Log-based messaging for durability

  • Making trade-offs explicit

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

renaissance-statistical-arbitrage

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

google-material-design

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

aqr-factor-investing

No summary provided by upstream source.

Repository SourceNeeds Review