event-store-design

Comprehensive guide to designing event stores for event-sourced applications.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "event-store-design" with this command: npx skills add wshobson/agents/wshobson-agents-event-store-design

Event Store Design

Comprehensive guide to designing event stores for event-sourced applications.

When to Use This Skill

  • Designing event sourcing infrastructure

  • Choosing between event store technologies

  • Implementing custom event stores

  • Optimizing event storage and retrieval

  • Setting up event store schemas

  • Planning for event store scaling

Core Concepts

  1. Event Store Architecture

┌─────────────────────────────────────────────────────┐ │ Event Store │ ├─────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │ │ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │ │ ├─────────────┤ ├─────────────┤ ├─────────────┤ │ │ │ Event 1 │ │ Event 1 │ │ Event 1 │ │ │ │ Event 2 │ │ Event 2 │ │ Event 2 │ │ │ │ Event 3 │ │ ... │ │ Event 3 │ │ │ │ ... │ │ │ │ Event 4 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ ├─────────────────────────────────────────────────────┤ │ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │ └─────────────────────────────────────────────────────┘

  1. Event Store Requirements

Requirement Description

Append-only Events are immutable, only appends

Ordered Per-stream and global ordering

Versioned Optimistic concurrency control

Subscriptions Real-time event notifications

Idempotent Handle duplicate writes safely

Technology Comparison

Technology Best For Limitations

EventStoreDB Pure event sourcing Single-purpose

PostgreSQL Existing Postgres stack Manual implementation

Kafka High-throughput streaming Not ideal for per-stream queries

DynamoDB Serverless, AWS-native Query limitations

Marten .NET ecosystems .NET specific

Templates

Template 1: PostgreSQL Event Store Schema

-- Events table CREATE TABLE events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), stream_id VARCHAR(255) NOT NULL, stream_type VARCHAR(255) NOT NULL, event_type VARCHAR(255) NOT NULL, event_data JSONB NOT NULL, metadata JSONB DEFAULT '{}', version BIGINT NOT NULL, global_position BIGSERIAL, created_at TIMESTAMPTZ DEFAULT NOW(),

CONSTRAINT unique_stream_version UNIQUE (stream_id, version)

);

-- Index for stream queries CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index for global subscription CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table CREATE TABLE snapshots ( stream_id VARCHAR(255) PRIMARY KEY, stream_type VARCHAR(255) NOT NULL, snapshot_data JSONB NOT NULL, version BIGINT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW() );

-- Subscriptions checkpoint table CREATE TABLE subscription_checkpoints ( subscription_id VARCHAR(255) PRIMARY KEY, last_position BIGINT NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ DEFAULT NOW() );

Template 2: Python Event Store Implementation

from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional, List from uuid import UUID, uuid4 import json import asyncpg

@dataclass class Event: stream_id: str event_type: str data: dict metadata: dict = field(default_factory=dict) event_id: UUID = field(default_factory=uuid4) version: Optional[int] = None global_position: Optional[int] = None created_at: datetime = field(default_factory=datetime.utcnow)

class EventStore: def init(self, pool: asyncpg.Pool): self.pool = pool

async def append_events(
    self,
    stream_id: str,
    stream_type: str,
    events: List[Event],
    expected_version: Optional[int] = None
) -> List[Event]:
    """Append events to a stream with optimistic concurrency."""
    async with self.pool.acquire() as conn:
        async with conn.transaction():
            # Check expected version
            if expected_version is not None:
                current = await conn.fetchval(
                    "SELECT MAX(version) FROM events WHERE stream_id = $1",
                    stream_id
                )
                current = current or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current}"
                    )

            # Get starting version
            start_version = await conn.fetchval(
                "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                stream_id
            )

            # Insert events
            saved_events = []
            for i, event in enumerate(events):
                event.version = start_version + i
                row = await conn.fetchrow(
                    """
                    INSERT INTO events (id, stream_id, stream_type, event_type,
                                      event_data, metadata, version, created_at)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    RETURNING global_position
                    """,
                    event.event_id,
                    stream_id,
                    stream_type,
                    event.event_type,
                    json.dumps(event.data),
                    json.dumps(event.metadata),
                    event.version,
                    event.created_at
                )
                event.global_position = row['global_position']
                saved_events.append(event)

            return saved_events

async def read_stream(
    self,
    stream_id: str,
    from_version: int = 0,
    limit: int = 1000
) -> List[Event]:
    """Read events from a stream."""
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id, stream_id, event_type, event_data, metadata,
                   version, global_position, created_at
            FROM events
            WHERE stream_id = $1 AND version >= $2
            ORDER BY version
            LIMIT $3
            """,
            stream_id, from_version, limit
        )
        return [self._row_to_event(row) for row in rows]

async def read_all(
    self,
    from_position: int = 0,
    limit: int = 1000
) -> List[Event]:
    """Read all events globally."""
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id, stream_id, event_type, event_data, metadata,
                   version, global_position, created_at
            FROM events
            WHERE global_position > $1
            ORDER BY global_position
            LIMIT $2
            """,
            from_position, limit
        )
        return [self._row_to_event(row) for row in rows]

async def subscribe(
    self,
    subscription_id: str,
    handler,
    from_position: int = 0,
    batch_size: int = 100
):
    """Subscribe to all events from a position."""
    # Get checkpoint
    async with self.pool.acquire() as conn:
        checkpoint = await conn.fetchval(
            """
            SELECT last_position FROM subscription_checkpoints
            WHERE subscription_id = $1
            """,
            subscription_id
        )
        position = checkpoint or from_position

    while True:
        events = await self.read_all(position, batch_size)
        if not events:
            await asyncio.sleep(1)  # Poll interval
            continue

        for event in events:
            await handler(event)
            position = event.global_position

        # Save checkpoint
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO subscription_checkpoints (subscription_id, last_position)
                VALUES ($1, $2)
                ON CONFLICT (subscription_id)
                DO UPDATE SET last_position = $2, updated_at = NOW()
                """,
                subscription_id, position
            )

def _row_to_event(self, row) -> Event:
    return Event(
        event_id=row['id'],
        stream_id=row['stream_id'],
        event_type=row['event_type'],
        data=json.loads(row['event_data']),
        metadata=json.loads(row['metadata']),
        version=row['version'],
        global_position=row['global_position'],
        created_at=row['created_at']
    )

class ConcurrencyError(Exception): """Raised when optimistic concurrency check fails.""" pass

Template 3: EventStoreDB Usage

from esdbclient import EventStoreDBClient, NewEvent, StreamState import json

Connect

client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

Append events

def append_events(stream_name: str, events: list, expected_revision=None): new_events = [ NewEvent( type=event['type'], data=json.dumps(event['data']).encode(), metadata=json.dumps(event.get('metadata', {})).encode() ) for event in events ]

if expected_revision is None:
    state = StreamState.ANY
elif expected_revision == -1:
    state = StreamState.NO_STREAM
else:
    state = expected_revision

return client.append_to_stream(
    stream_name=stream_name,
    events=new_events,
    current_version=state
)

Read stream

def read_stream(stream_name: str, from_revision: int = 0): events = client.get_stream( stream_name=stream_name, stream_position=from_revision ) return [ { 'type': event.type, 'data': json.loads(event.data), 'metadata': json.loads(event.metadata) if event.metadata else {}, 'stream_position': event.stream_position, 'commit_position': event.commit_position } for event in events ]

Subscribe to all

async def subscribe_to_all(handler, from_position: int = 0): subscription = client.subscribe_to_all(commit_position=from_position) async for event in subscription: await handler({ 'type': event.type, 'data': json.loads(event.data), 'stream_id': event.stream_name, 'position': event.commit_position })

Category projection ($ce-Category)

def read_category(category: str): """Read all events for a category using system projection.""" return read_stream(f"$ce-{category}")

Template 4: DynamoDB Event Store

import boto3 from boto3.dynamodb.conditions import Key from datetime import datetime import json import uuid

class DynamoEventStore: def init(self, table_name: str): self.dynamodb = boto3.resource('dynamodb') self.table = self.dynamodb.Table(table_name)

def append_events(self, stream_id: str, events: list, expected_version: int = None):
    """Append events with conditional write for concurrency."""
    with self.table.batch_writer() as batch:
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': datetime.utcnow().isoformat(),
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': datetime.utcnow().isoformat()
            }
            batch.put_item(Item=item)
    return events

def read_stream(self, stream_id: str, from_version: int = 0):
    """Read events from a stream."""
    response = self.table.query(
        KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                              Key('SK').gte(f"VERSION#{from_version:020d}")
    )
    return [
        {
            'event_type': item['event_type'],
            'data': json.loads(item['event_data']),
            'version': item['version']
        }
        for item in response['Items']
    ]

Table definition (CloudFormation/Terraform)

""" DynamoDB Table:

  • PK (Partition Key): String
  • SK (Sort Key): String
  • GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs """

Best Practices

Do's

  • Use stream IDs that include aggregate type - Order-{uuid}

  • Include correlation/causation IDs - For tracing

  • Version events from day one - Plan for schema evolution

  • Implement idempotency - Use event IDs for deduplication

  • Index appropriately - For your query patterns

Don'ts

  • Don't update or delete events - They're immutable facts

  • Don't store large payloads - Keep events small

  • Don't skip optimistic concurrency - Prevents data corruption

  • Don't ignore backpressure - Handle slow consumers

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

tailwind-design-system

Tailwind Design System (v4)

Repository Source
31.3K19K
wshobson
Automation

api-design-principles

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

nodejs-backend-patterns

No summary provided by upstream source.

Repository SourceNeeds Review