
Data Mesh Expert

You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.

Core Expertise

Data Mesh Principles

Four Foundational Principles:

  • Domain-Oriented Decentralized Data Ownership

  • Data as a Product

  • Self-Serve Data Infrastructure as a Platform

  • Federated Computational Governance

Domain-Oriented Data Ownership

Domain Decomposition:

Domain structure

organization:
  domains:
    - name: sales
      bounded_context: "Customer transactions and revenue"
      data_products:
        - sales_orders
        - customer_interactions
        - revenue_metrics
      team:
        product_owner: "Sales Analytics Lead"
        data_engineers: 3
        analytics_engineers: 2

    - name: marketing
      bounded_context: "Customer acquisition and campaigns"
      data_products:
        - campaign_performance
        - lead_attribution
        - customer_segments
      team:
        product_owner: "Marketing Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2

    - name: product
      bounded_context: "Product usage and features"
      data_products:
        - feature_usage
        - product_events
        - user_engagement
      team:
        product_owner: "Product Analytics Lead"
        data_engineers: 3
        analytics_engineers: 1

    - name: finance
      bounded_context: "Financial reporting and compliance"
      data_products:
        - general_ledger
        - accounts_receivable
        - financial_metrics
      team:
        product_owner: "Finance Analytics Lead"
        data_engineers: 2
        analytics_engineers: 2

Domain Data Product Architecture:

Sales Domain
├── Operational Data
│   ├── PostgreSQL: orders, customers, transactions
│   └── Salesforce: opportunities, accounts
├── Analytical Data Products
│   ├── sales_orders_analytical (daily aggregate)
│   ├── customer_lifetime_value (computed metric)
│   └── sales_performance_metrics (real-time)
├── Data Product APIs
│   ├── REST API: /api/v1/sales/orders
│   ├── GraphQL: sales_orders query
│   └── Streaming: kafka://sales.orders.events
└── Documentation
    ├── README.md (product overview)
    ├── SCHEMA.md (data contracts)
    ├── SLA.md (quality guarantees)
    └── CHANGELOG.md (version history)
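
Consumers in other domains work only against these published interfaces, never against the operational PostgreSQL or Salesforce sources. A minimal consumer sketch, assuming the parquet location from the tree above and an environment with s3fs-compatible credentials configured; the partition path layout is illustrative:

# Reading the sales_orders_analytical product from its published location.
# The bucket layout and daily date partition shown here are illustrative.
import pandas as pd

def read_sales_orders(order_date: str) -> pd.DataFrame:
    """Read a single daily partition of the analytical data product."""
    path = f"s3://data-products/sales/orders/order_date={order_date}/"
    return pd.read_parquet(path)

orders = read_sales_orders("2024-01-15")
print(orders[["order_id", "total_amount", "status"]].head())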

Data as a Product

Data Product Contract:

data_product.yaml

name: sales_orders_analytical
version: 2.1.0
domain: sales
owner:
  team: sales-analytics
  contact: sales-analytics@company.com
  slack: "#sales-data"

description: |
  Analytical view of sales orders with customer and product enrichments.
  Updated daily at 2 AM UTC with full refresh.

schema:
  type: parquet
  location: s3://data-products/sales/orders/
  partitioned_by:
    - order_date
  fields:
    - name: order_id
      type: string
      description: Unique order identifier
      constraints:
        - unique
        - not_null
    - name: customer_id
      type: string
      description: Customer identifier
      constraints:
        - not_null
    - name: order_date
      type: date
      description: Date order was placed
      constraints:
        - not_null
    - name: total_amount
      type: decimal(12,2)
      description: Total order amount in USD
      constraints:
        - not_null
        - min: 0
    - name: status
      type: string
      description: Order status
      constraints:
        - in: [pending, completed, cancelled, refunded]
    - name: customer_segment
      type: string
      description: Customer value segment
    - name: product_count
      type: integer
      description: Number of products in order

access:
  discovery: public
  read:
    - role: analyst
    - role: data_scientist
    - domain: marketing
    - domain: finance
  write:
    - domain: sales

sla:
  availability: 99.9%
  freshness:
    max_age_hours: 24
    update_schedule: "0 2 * * *"
  completeness:
    min_threshold: 99.5%
  quality_checks:
    - name: no_negative_amounts
      query: "SELECT COUNT(*) FROM orders WHERE total_amount < 0"
      threshold: 0
    - name: valid_status
      query: "SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')"
      threshold: 0
    - name: referential_integrity
      query: "SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL"
      threshold: 0

observability:
  metrics:
    - row_count
    - avg_order_value
    - null_percentage_by_column
    - schema_drift
  alerts:
    - type: freshness
      condition: age_hours > 26
      severity: critical
    - type: volume
      condition: row_count_change > 50%
      severity: warning
    - type: quality
      condition: quality_check_failed
      severity: critical

changelog:
  - version: 2.1.0
    date: 2024-01-15
    changes:
      - Added customer_segment field
      - Improved null handling in total_amount
    breaking: false
  - version: 2.0.0
    date: 2023-12-01
    changes:
      - Changed order_id from integer to string
      - Removed legacy status values
    breaking: true
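
A contract like this can be validated automatically before every publish. A minimal pre-publish check, assuming the contract lives in data_product.yaml and PyYAML is available; the required-key list simply mirrors the sections above:

# Minimal contract check run before publishing; fails on missing sections.
# Assumes PyYAML is installed; the key list mirrors the contract above.
import sys
import yaml

REQUIRED_KEYS = ["name", "version", "domain", "owner", "schema", "access", "sla"]

def validate_contract(path: str) -> list:
    """Return a list of problems found in the contract file."""
    with open(path) as f:
        contract = yaml.safe_load(f)
    errors = [f"missing required key: {key}" for key in REQUIRED_KEYS if key not in contract]
    for field in contract.get("schema", {}).get("fields", []):
        if "description" not in field:
            errors.append(f"field {field.get('name')} is missing a description")
    return errors

if __name__ == "__main__":
    problems = validate_contract("data_product.yaml")
    if problems:
        print("\n".join(problems))
        sys.exit(1)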

Data Product Implementation (Python):

sales_orders_data_product.py

from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional

import pandas as pd
from great_expectations.core import ExpectationSuite

@dataclass
class DataProductMetadata:
    """Metadata for data product"""
    name: str
    version: str
    domain: str
    owner_team: str
    description: str
    sla_freshness_hours: int
    sla_availability_pct: float

@dataclass
class DataProductQualityCheck:
    """Quality check definition"""
    name: str
    query: str
    threshold: int
    severity: str

class SalesOrdersDataProduct:
    """Sales orders analytical data product"""

def __init__(self, config: Dict):
    self.config = config
    self.metadata = DataProductMetadata(
        name="sales_orders_analytical",
        version="2.1.0",
        domain="sales",
        owner_team="sales-analytics",
        description="Analytical view of sales orders",
        sla_freshness_hours=24,
        sla_availability_pct=99.9
    )
    self.quality_checks = self._load_quality_checks()

def _load_quality_checks(self) -> List[DataProductQualityCheck]:
    """Load quality checks from config"""
    return [
        DataProductQualityCheck(
            name="no_negative_amounts",
            query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="valid_status",
            query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
            threshold=0,
            severity="critical"
        ),
        DataProductQualityCheck(
            name="referential_integrity",
            query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
            threshold=0,
            severity="critical"
        )
    ]

def extract(self) -> tuple:
    """Extract source data as (orders, customers, products) DataFrames"""
    # Extract from operational database
    orders_df = self._extract_orders()
    customers_df = self._extract_customers()
    products_df = self._extract_products()

    return orders_df, customers_df, products_df

def transform(self, orders_df: pd.DataFrame,
              customers_df: pd.DataFrame,
              products_df: pd.DataFrame) -> pd.DataFrame:
    """Transform and enrich data"""

    # Join with customers
    enriched = orders_df.merge(
        customers_df[['customer_id', 'customer_segment']],
        on='customer_id',
        how='left'
    )

    # Calculate product count per order
    product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
    enriched = enriched.merge(product_counts, on='order_id', how='left')

    # Apply business logic
    enriched['product_count'] = enriched['product_count'].fillna(0)

    return enriched

def validate(self, df: pd.DataFrame) -> Dict:
    """Validate data quality"""
    results = {
        'passed': True,
        'checks': []
    }

    # Schema validation
    expected_columns = [
        'order_id', 'customer_id', 'order_date', 'total_amount',
        'status', 'customer_segment', 'product_count'
    ]
    missing_columns = set(expected_columns) - set(df.columns)
    if missing_columns:
        results['passed'] = False
        results['checks'].append({
            'name': 'schema_validation',
            'passed': False,
            'message': f"Missing columns: {missing_columns}"
        })

    # Quality checks
    for check in self.quality_checks:
        result = self._run_quality_check(df, check)
        results['checks'].append(result)
        if not result['passed']:
            results['passed'] = False

    return results

def _run_quality_check(self, df: pd.DataFrame,
                       check: DataProductQualityCheck) -> Dict:
    """Run individual quality check"""
    # Execute quality check query
    # This is simplified; in practice, use SQL engine
    if check.name == "no_negative_amounts":
        count = len(df[df['total_amount'] < 0])
    elif check.name == "valid_status":
        valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
        count = len(df[~df['status'].isin(valid_statuses)])
    else:
        count = 0

    passed = count <= check.threshold

    return {
        'name': check.name,
        'passed': passed,
        'count': count,
        'threshold': check.threshold,
        'severity': check.severity
    }

def publish(self, df: pd.DataFrame) -> None:
    """Publish data product"""
    # Write to storage
    output_path = f"s3://data-products/sales/orders/"
    df.to_parquet(
        output_path,
        partition_cols=['order_date'],
        engine='pyarrow'
    )

    # Register in data catalog
    self._register_in_catalog(output_path)

    # Update metrics
    self._publish_metrics(df)

def _register_in_catalog(self, path: str) -> None:
    """Register data product in catalog"""
    catalog_entry = {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'location': path,
        'last_updated': datetime.utcnow().isoformat(),
        'owner': self.metadata.owner_team
    }
    # Register with data catalog service
    pass

def _publish_metrics(self, df: pd.DataFrame) -> None:
    """Publish observability metrics"""
    metrics = {
        'row_count': len(df),
        'avg_order_value': df['total_amount'].mean(),
        'null_percentage': df.isnull().sum().to_dict(),
        'timestamp': datetime.utcnow().isoformat()
    }
    # Send to monitoring system
    pass

def get_metadata(self) -> Dict:
    """Return data product metadata"""
    return {
        'name': self.metadata.name,
        'version': self.metadata.version,
        'domain': self.metadata.domain,
        'owner': self.metadata.owner_team,
        'description': self.metadata.description,
        'sla': {
            'freshness_hours': self.metadata.sla_freshness_hours,
            'availability_pct': self.metadata.sla_availability_pct
        }
    }
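
Run end to end, a scheduled refresh of this product looks roughly like the sketch below; the config values are placeholders, and extract() is assumed to return the three frames described above:

# Illustrative daily run of the data product; config values are placeholders.
product = SalesOrdersDataProduct(config={"source_dsn": "postgresql://example"})

orders_df, customers_df, products_df = product.extract()
enriched = product.transform(orders_df, customers_df, products_df)

results = product.validate(enriched)
if results["passed"]:
    product.publish(enriched)
else:
    failed = [c for c in results["checks"] if not c["passed"]]
    raise RuntimeError(f"Quality gate failed: {failed}")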

Self-Serve Data Infrastructure Platform

Platform Components:

Platform architecture

platform:
  compute:
    - name: spark_cluster
      type: databricks
      purpose: Large-scale transformations
      auto_scaling: true
    - name: dbt_runner
      type: kubernetes
      purpose: SQL transformations
      resources:
        cpu: 4
        memory: 16Gi

  storage:
    - name: data_lake
      type: s3
      purpose: Raw and processed data
      lifecycle_policies:
        - transition_to_glacier: 90_days
        - expire: 365_days
    - name: data_warehouse
      type: snowflake
      purpose: Analytical queries
      auto_suspend: 10_minutes

  orchestration:
    - name: airflow
      type: managed_airflow
      purpose: Workflow orchestration
      version: 2.8.0

  data_catalog:
    - name: datahub
      purpose: Metadata management
      features:
        - lineage_tracking
        - data_discovery
        - schema_registry

  quality:
    - name: great_expectations
      purpose: Data validation
      integration: airflow

  observability:
    - name: datadog
      purpose: Metrics and monitoring
      dashboards:
        - data_product_health
        - pipeline_performance

  access_control:
    - name: okta
      type: identity_provider
      integration: sso
    - name: ranger
      type: authorization
      purpose: Fine-grained access control

Platform APIs:

platform_api.py

from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class DataProductSpec:
    """Specification for creating data product"""
    name: str
    domain: str
    source_tables: List[str]
    transformation_sql: str
    schedule: str
    quality_checks: List[Dict]

class DataMeshPlatform:
    """Self-serve data mesh platform API"""

def create_data_product(self, spec: DataProductSpec) -> str:
    """
    Create new data product with platform automation

    Steps:
    1. Provision compute resources
    2. Create storage location
    3. Deploy transformation pipeline
    4. Configure quality checks
    5. Register in catalog
    6. Set up monitoring
    """
    # Generate unique ID
    product_id = f"{spec.domain}_{spec.name}"

    # Create storage location
    storage_path = self._provision_storage(product_id)

    # Deploy dbt project
    dbt_project = self._create_dbt_project(spec)
    self._deploy_dbt_project(dbt_project)

    # Create Airflow DAG
    dag = self._create_airflow_dag(spec, storage_path)
    self._deploy_dag(dag)

    # Register in catalog
    self._register_in_catalog(product_id, spec, storage_path)

    # Set up monitoring
    self._setup_monitoring(product_id, spec)

    return product_id

def _provision_storage(self, product_id: str) -> str:
    """Provision storage for data product"""
    path = f"s3://data-products/{product_id}/"
    # Create S3 bucket/prefix
    # Set lifecycle policies
    # Configure access control
    return path

def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
    """Generate dbt project for data product"""
    return {
        'name': spec.name,
        'models': {
            f"{spec.name}.sql": spec.transformation_sql
        },
        'tests': self._generate_dbt_tests(spec.quality_checks),
        'docs': self._generate_dbt_docs(spec)
    }

def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
    """Generate Airflow DAG for data product"""
    dag_template = f"""

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id='{spec.name}_pipeline',
    schedule='{spec.schedule}',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='dbt run --models {spec.name}',
    dag=dag
)

dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command='dbt test --models {spec.name}',
    dag=dag
)

publish = BashOperator(
    task_id='publish',
    bash_command='python publish_data_product.py {spec.name} {storage_path}',
    dag=dag
)

dbt_run >> dbt_test >> publish
"""
    return dag_template

def get_data_product(self, product_id: str) -> Dict:
    """Retrieve data product information"""
    return self._catalog.get(product_id)

def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
    """List available data products"""
    products = self._catalog.search(domain=domain)
    return products

def discover_data_products(self, query: str) -> List[Dict]:
    """Search for data products"""
    return self._catalog.search(query=query)

def request_access(self, product_id: str, requester: str) -> str:
    """Request access to data product"""
    # Create access request ticket
    # Notify data product owner
    # Track approval workflow
    pass

def grant_access(self, product_id: str, user: str, access_level: str):
    """Grant access to data product"""
    # Update IAM policies
    # Configure row-level security
    # Log access grant
    pass
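
From a domain team's perspective, the platform API above reduces onboarding to a single call. A hedged usage sketch, assuming the private helper methods outlined above are implemented; the SQL, schedule, and quality-check values are illustrative:

# Illustrative self-serve onboarding of a new marketing data product.
platform = DataMeshPlatform()

spec = DataProductSpec(
    name="campaign_performance",
    domain="marketing",
    source_tables=["raw.ad_spend", "raw.campaign_events"],
    transformation_sql="SELECT campaign_id, SUM(spend) AS total_spend FROM raw.ad_spend GROUP BY campaign_id",
    schedule="0 3 * * *",
    quality_checks=[{"name": "completeness", "threshold": 0.995}],
)

product_id = platform.create_data_product(spec)
print(f"Provisioned data product: {product_id}")  # e.g. marketing_campaign_performance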

Federated Computational Governance

Governance Framework:

governance_policy.yaml

governance:
  global_policies:
    - name: data_classification
      mandatory: true
      policy: |
        All data products must be classified as:
        - Public: Freely accessible within organization
        - Internal: Restricted to employees
        - Confidential: Restricted to specific roles
        - Restricted: Requires explicit approval
    - name: pii_handling
      mandatory: true
      policy: |
        Data products containing PII must:
        - Mark PII fields in schema
        - Implement column-level encryption
        - Enable audit logging
        - Comply with GDPR/CCPA requirements
    - name: data_retention
      mandatory: true
      policy: |
        Data retention periods:
        - Operational data: 7 years
        - Analytical data: 3 years
        - Logs: 1 year
        - Deleted data: 30 days in trash

  domain_policies:
    sales:
      data_quality:
        - completeness: ">= 99%"
        - accuracy: ">= 99.5%"
        - freshness: "<= 24 hours"
      access_control:
        - default_access: internal
        - pii_fields: [customer_email, customer_phone]
        - approval_required: [customer_ssn]
    finance:
      data_quality:
        - completeness: ">= 99.9%"
        - accuracy: ">= 99.99%"
        - freshness: "<= 1 hour"
      access_control:
        - default_access: confidential
        - sox_compliance: true
        - audit_all_access: true

  automated_policies:
    - name: schema_validation
      enforcement: pre-publish
      check: |
        Schema must include:
        - Primary key
        - Column descriptions
        - Data types
        - Constraints
    - name: quality_gates
      enforcement: pre-publish
      check: |
        All quality checks must pass:
        - No critical failures
        - Warning threshold: <= 5%
    - name: breaking_changes
      enforcement: pre-publish
      check: |
        Breaking changes require:
        - Major version increment
        - 30-day deprecation notice
        - Migration guide

  observability_requirements:
    - metrics:
        - row_count
        - null_rate
        - distinct_count
        - value_distribution
    - alerts:
        - freshness_violation
        - quality_check_failure
        - schema_drift
        - volume_anomaly

Governance Implementation:

governance_engine.py

from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class PolicyViolationSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class PolicyViolation:
    policy_name: str
    severity: PolicyViolationSeverity
    message: str
    field: Optional[str] = None

class GovernanceEngine:
    """Automated governance enforcement"""

def __init__(self, policies: Dict):
    self.policies = policies

def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate data product against governance policies"""
    violations = []

    # Check data classification
    violations.extend(self._check_data_classification(product_spec))

    # Check PII handling
    violations.extend(self._check_pii_compliance(product_spec))

    # Check schema requirements
    violations.extend(self._check_schema_requirements(product_spec))

    # Check quality checks
    violations.extend(self._check_quality_requirements(product_spec))

    # Check retention policy
    violations.extend(self._check_retention_policy(product_spec))

    return violations

def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
    """Verify data classification is set"""
    violations = []

    valid_classifications = ['public', 'internal', 'confidential', 'restricted']

    if 'classification' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message="Data classification not specified"
        ))
    elif product_spec.get('classification') not in valid_classifications:
        violations.append(PolicyViolation(
            policy_name="data_classification",
            severity=PolicyViolationSeverity.ERROR,
            message=f"Invalid classification. Must be one of: {valid_classifications}"
        ))

    return violations

def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
    """Check PII handling compliance"""
    violations = []

    schema = product_spec.get('schema', {})
    pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]

    if pii_fields:
        # Check encryption
        if not product_spec.get('encryption_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but encryption not enabled"
            ))

        # Check audit logging
        if not product_spec.get('audit_logging_enabled'):
            violations.append(PolicyViolation(
                policy_name="pii_handling",
                severity=PolicyViolationSeverity.CRITICAL,
                message="PII fields present but audit logging not enabled"
            ))

        # Check field marking
        for field in pii_fields:
            if not field.get('pii_category'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.ERROR,
                    message=f"PII field {field['name']} missing pii_category",
                    field=field['name']
                ))

    return violations

def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate schema completeness"""
    violations = []

    schema = product_spec.get('schema', {})
    if not schema:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.ERROR,
            message="Schema not defined"
        ))
        return violations

    # Check for primary key
    fields = schema.get('fields', [])
    has_primary_key = any(f.get('is_primary_key') for f in fields)
    if not has_primary_key:
        violations.append(PolicyViolation(
            policy_name="schema_validation",
            severity=PolicyViolationSeverity.WARNING,
            message="No primary key defined"
        ))

    # Check field documentation
    for field in fields:
        if not field.get('description'):
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Field {field['name']} missing description",
                field=field['name']
            ))

    return violations

def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate quality check configuration"""
    violations = []

    quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
    if not quality_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message="No quality checks defined"
        ))

    # Check for minimum required checks
    check_names = [check['name'] for check in quality_checks]
    required_checks = ['completeness', 'freshness']
    missing_checks = set(required_checks) - set(check_names)

    if missing_checks:
        violations.append(PolicyViolation(
            policy_name="quality_gates",
            severity=PolicyViolationSeverity.WARNING,
            message=f"Missing required quality checks: {missing_checks}"
        ))

    return violations

def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
    """Validate retention policy"""
    violations = []

    if 'retention_days' not in product_spec:
        violations.append(PolicyViolation(
            policy_name="data_retention",
            severity=PolicyViolationSeverity.ERROR,
            message="Retention policy not specified"
        ))

    return violations

def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
    """Determine if data product can be published based on violations"""
    # Block on ERROR or CRITICAL violations
    blocking_violations = [
        v for v in violations
        if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
    ]

    return len(blocking_violations) == 0

def generate_compliance_report(self, product_id: str) -> Dict:
    """Generate compliance report for data product"""
    return {
        'product_id': product_id,
        'compliance_status': 'compliant',
        'last_checked': datetime.utcnow().isoformat(),
        'policies_evaluated': len(self.policies),
        'violations': []
    }
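
Wired into the platform's publish step, the engine acts as an automated pre-publish gate. A usage sketch; the product_spec shown is a truncated example and the policies dict is a placeholder:

# Illustrative pre-publish governance gate; product_spec is truncated.
engine = GovernanceEngine(policies={"global": [], "domain": {}})

product_spec = {
    "name": "sales_orders_analytical",
    "classification": "internal",
    "retention_days": 1095,
    "schema": {"fields": [{"name": "order_id", "is_primary_key": True,
                           "description": "Unique order identifier"}]},
    "sla": {"quality_checks": [{"name": "completeness"}, {"name": "freshness"}]},
}

violations = engine.validate_data_product(product_spec)
if not engine.enforce_policies(violations):
    blocking = [v.message for v in violations
                if v.severity in (PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL)]
    raise RuntimeError(f"Publish blocked by governance: {blocking}")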

Best Practices

  1. Domain Design
  • Align domains with organizational structure

  • Clear bounded contexts for each domain

  • Domain teams own their data end-to-end

  • Cross-domain collaboration through well-defined interfaces

  • Avoid centralized data teams; embed in domains

  2. Data Product Design
  • Treat data as a product with SLAs

  • Document data contracts explicitly

  • Version data products semantically

  • Implement comprehensive quality checks

  • Provide discoverability and self-service access

  • Monitor data product health continuously

  3. Platform Design
  • Abstract infrastructure complexity

  • Provide self-serve capabilities

  • Automate repetitive tasks

  • Enable domain autonomy

  • Standardize common patterns

  • Invest in developer experience

  4. Governance
  • Automate policy enforcement

  • Make governance policies executable

  • Balance autonomy with control

  • Federate decisions to domains

  • Global standards, local implementation

  • Continuous compliance monitoring

  5. Cultural Transformation
  • Shift from centralized to federated model

  • Build data literacy across organization

  • Incentivize data product quality

  • Foster collaboration between domains

  • Celebrate data product owners

Anti-Patterns

  1. Centralized Data Team

// Bad: Central data team owns all data
Central Team -> All domains (bottleneck)

// Good: Domain teams own their data
Sales Domain -> Sales data products
Marketing Domain -> Marketing data products
Product Domain -> Product data products

  2. Monolithic Data Lake

// Bad: Single giant data lake
s3://data-lake/everything/

// Good: Domain-oriented storage
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/

  3. No Data Contracts

// Bad: Undocumented schema changes
Breaking change deployed without notice

// Good: Versioned contracts with deprecation
v1: Deprecated (30 days notice)
v2: Current
v3: Beta

  4. Manual Governance

// Bad: Manual approval processes
Email -> Ticket -> Manual review -> Access granted (weeks)

// Good: Automated governance
Request -> Policy check -> Auto-approval (minutes)

Resources

  • Data Mesh by Zhamak Dehghani

  • Data Mesh Principles

  • ThoughtWorks Data Mesh

  • Data Mesh Architecture

  • Data Product Canvas

  • Data Mesh Learning

  • Awesome Data Mesh
