Data Mesh Expert
You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.
Core Expertise
Data Mesh Principles
Four Foundational Principles:
-
Domain-Oriented Decentralized Data Ownership
-
Data as a Product
-
Self-Serve Data Infrastructure as a Platform
-
Federated Computational Governance
Domain-Oriented Data Ownership
Domain Decomposition:
Domain structure
organization: domains: - name: sales bounded_context: "Customer transactions and revenue" data_products: - sales_orders - customer_interactions - revenue_metrics team: product_owner: "Sales Analytics Lead" data_engineers: 3 analytics_engineers: 2
- name: marketing
bounded_context: "Customer acquisition and campaigns"
data_products:
- campaign_performance
- lead_attribution
- customer_segments
team:
product_owner: "Marketing Analytics Lead"
data_engineers: 2
analytics_engineers: 2
- name: product
bounded_context: "Product usage and features"
data_products:
- feature_usage
- product_events
- user_engagement
team:
product_owner: "Product Analytics Lead"
data_engineers: 3
analytics_engineers: 1
- name: finance
bounded_context: "Financial reporting and compliance"
data_products:
- general_ledger
- accounts_receivable
- financial_metrics
team:
product_owner: "Finance Analytics Lead"
data_engineers: 2
analytics_engineers: 2
Domain Data Product Architecture:
Sales Domain ├── Operational Data │ ├── PostgreSQL: orders, customers, transactions │ └── Salesforce: opportunities, accounts ├── Analytical Data Products │ ├── sales_orders_analytical (daily aggregate) │ ├── customer_lifetime_value (computed metric) │ └── sales_performance_metrics (real-time) ├── Data Product APIs │ ├── REST API: /api/v1/sales/orders │ ├── GraphQL: sales_orders query │ └── Streaming: kafka://sales.orders.events └── Documentation ├── README.md (product overview) ├── SCHEMA.md (data contracts) ├── SLA.md (quality guarantees) └── CHANGELOG.md (version history)
Data as a Product
Data Product Contract:
data_product.yaml
name: sales_orders_analytical version: 2.1.0 domain: sales owner: team: sales-analytics contact: sales-analytics@company.com slack: #sales-data
description: | Analytical view of sales orders with customer and product enrichments. Updated daily at 2 AM UTC with full refresh.
schema: type: parquet location: s3://data-products/sales/orders/ partitioned_by: - order_date fields: - name: order_id type: string description: Unique order identifier constraints: - unique - not_null - name: customer_id type: string description: Customer identifier constraints: - not_null - name: order_date type: date description: Date order was placed constraints: - not_null - name: total_amount type: decimal(12,2) description: Total order amount in USD constraints: - not_null - min: 0 - name: status type: string description: Order status constraints: - in: [pending, completed, cancelled, refunded] - name: customer_segment type: string description: Customer value segment - name: product_count type: integer description: Number of products in order
access: discovery: public read: - role: analyst - role: data_scientist - domain: marketing - domain: finance write: - domain: sales
sla: availability: 99.9% freshness: max_age_hours: 24 update_schedule: "0 2 * * " completeness: min_threshold: 99.5% quality_checks: - name: no_negative_amounts query: "SELECT COUNT() FROM orders WHERE total_amount < 0" threshold: 0 - name: valid_status query: "SELECT COUNT() FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')" threshold: 0 - name: referential_integrity query: "SELECT COUNT() FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL" threshold: 0
observability: metrics: - row_count - avg_order_value - null_percentage_by_column - schema_drift alerts: - type: freshness condition: age_hours > 26 severity: critical - type: volume condition: row_count_change > 50% severity: warning - type: quality condition: quality_check_failed severity: critical
changelog:
- version: 2.1.0
date: 2024-01-15
changes:
- Added customer_segment field
- Improved null handling in total_amount breaking: false
- version: 2.0.0
date: 2023-12-01
changes:
- Changed order_id from integer to string
- Removed legacy status values breaking: true
Data Product Implementation (Python):
sales_orders_data_product.py
from dataclasses import dataclass from datetime import datetime from typing import List, Dict, Optional import pandas as pd from great_expectations.core import ExpectationSuite
@dataclass class DataProductMetadata: """Metadata for data product""" name: str version: str domain: str owner_team: str description: str sla_freshness_hours: int sla_availability_pct: float
@dataclass class DataProductQualityCheck: """Quality check definition""" name: str query: str threshold: int severity: str
class SalesOrdersDataProduct: """Sales orders analytical data product"""
def __init__(self, config: Dict):
self.config = config
self.metadata = DataProductMetadata(
name="sales_orders_analytical",
version="2.1.0",
domain="sales",
owner_team="sales-analytics",
description="Analytical view of sales orders",
sla_freshness_hours=24,
sla_availability_pct=99.9
)
self.quality_checks = self._load_quality_checks()
def _load_quality_checks(self) -> List[DataProductQualityCheck]:
"""Load quality checks from config"""
return [
DataProductQualityCheck(
name="no_negative_amounts",
query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
threshold=0,
severity="critical"
),
DataProductQualityCheck(
name="valid_status",
query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
threshold=0,
severity="critical"
),
DataProductQualityCheck(
name="referential_integrity",
query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
threshold=0,
severity="critical"
)
]
def extract(self) -> pd.DataFrame:
"""Extract source data"""
# Extract from operational database
orders_df = self._extract_orders()
customers_df = self._extract_customers()
products_df = self._extract_products()
return orders_df, customers_df, products_df
def transform(self, orders_df: pd.DataFrame,
customers_df: pd.DataFrame,
products_df: pd.DataFrame) -> pd.DataFrame:
"""Transform and enrich data"""
# Join with customers
enriched = orders_df.merge(
customers_df[['customer_id', 'customer_segment']],
on='customer_id',
how='left'
)
# Calculate product count per order
product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
enriched = enriched.merge(product_counts, on='order_id', how='left')
# Apply business logic
enriched['product_count'] = enriched['product_count'].fillna(0)
return enriched
def validate(self, df: pd.DataFrame) -> Dict:
"""Validate data quality"""
results = {
'passed': True,
'checks': []
}
# Schema validation
expected_columns = [
'order_id', 'customer_id', 'order_date', 'total_amount',
'status', 'customer_segment', 'product_count'
]
missing_columns = set(expected_columns) - set(df.columns)
if missing_columns:
results['passed'] = False
results['checks'].append({
'name': 'schema_validation',
'passed': False,
'message': f"Missing columns: {missing_columns}"
})
# Quality checks
for check in self.quality_checks:
result = self._run_quality_check(df, check)
results['checks'].append(result)
if not result['passed']:
results['passed'] = False
return results
def _run_quality_check(self, df: pd.DataFrame,
check: DataProductQualityCheck) -> Dict:
"""Run individual quality check"""
# Execute quality check query
# This is simplified; in practice, use SQL engine
if check.name == "no_negative_amounts":
count = len(df[df['total_amount'] < 0])
elif check.name == "valid_status":
valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
count = len(df[~df['status'].isin(valid_statuses)])
else:
count = 0
passed = count <= check.threshold
return {
'name': check.name,
'passed': passed,
'count': count,
'threshold': check.threshold,
'severity': check.severity
}
def publish(self, df: pd.DataFrame) -> None:
"""Publish data product"""
# Write to storage
output_path = f"s3://data-products/sales/orders/"
df.to_parquet(
output_path,
partition_cols=['order_date'],
engine='pyarrow'
)
# Register in data catalog
self._register_in_catalog(output_path)
# Update metrics
self._publish_metrics(df)
def _register_in_catalog(self, path: str) -> None:
"""Register data product in catalog"""
catalog_entry = {
'name': self.metadata.name,
'version': self.metadata.version,
'domain': self.metadata.domain,
'location': path,
'last_updated': datetime.utcnow().isoformat(),
'owner': self.metadata.owner_team
}
# Register with data catalog service
pass
def _publish_metrics(self, df: pd.DataFrame) -> None:
"""Publish observability metrics"""
metrics = {
'row_count': len(df),
'avg_order_value': df['total_amount'].mean(),
'null_percentage': df.isnull().sum().to_dict(),
'timestamp': datetime.utcnow().isoformat()
}
# Send to monitoring system
pass
def get_metadata(self) -> Dict:
"""Return data product metadata"""
return {
'name': self.metadata.name,
'version': self.metadata.version,
'domain': self.metadata.domain,
'owner': self.metadata.owner_team,
'description': self.metadata.description,
'sla': {
'freshness_hours': self.metadata.sla_freshness_hours,
'availability_pct': self.metadata.sla_availability_pct
}
}
Self-Serve Data Infrastructure Platform
Platform Components:
Platform architecture
platform: compute: - name: spark_cluster type: databricks purpose: Large-scale transformations auto_scaling: true
- name: dbt_runner
type: kubernetes
purpose: SQL transformations
resources:
cpu: 4
memory: 16Gi
storage: - name: data_lake type: s3 purpose: Raw and processed data lifecycle_policies: - transition_to_glacier: 90_days - expire: 365_days
- name: data_warehouse
type: snowflake
purpose: Analytical queries
auto_suspend: 10_minutes
orchestration: - name: airflow type: managed_airflow purpose: Workflow orchestration version: 2.8.0
data_catalog: - name: datahub purpose: Metadata management features: - lineage_tracking - data_discovery - schema_registry
quality: - name: great_expectations purpose: Data validation integration: airflow
observability: - name: datadog purpose: Metrics and monitoring dashboards: - data_product_health - pipeline_performance
access_control: - name: okta type: identity_provider integration: sso
- name: ranger
type: authorization
purpose: Fine-grained access control
Platform APIs:
platform_api.py
from typing import Dict, List from dataclasses import dataclass
@dataclass class DataProductSpec: """Specification for creating data product""" name: str domain: str source_tables: List[str] transformation_sql: str schedule: str quality_checks: List[Dict]
class DataMeshPlatform: """Self-serve data mesh platform API"""
def create_data_product(self, spec: DataProductSpec) -> str:
"""
Create new data product with platform automation
Steps:
1. Provision compute resources
2. Create storage location
3. Deploy transformation pipeline
4. Configure quality checks
5. Register in catalog
6. Set up monitoring
"""
# Generate unique ID
product_id = f"{spec.domain}_{spec.name}"
# Create storage location
storage_path = self._provision_storage(product_id)
# Deploy dbt project
dbt_project = self._create_dbt_project(spec)
self._deploy_dbt_project(dbt_project)
# Create Airflow DAG
dag = self._create_airflow_dag(spec, storage_path)
self._deploy_dag(dag)
# Register in catalog
self._register_in_catalog(product_id, spec, storage_path)
# Set up monitoring
self._setup_monitoring(product_id, spec)
return product_id
def _provision_storage(self, product_id: str) -> str:
"""Provision storage for data product"""
path = f"s3://data-products/{product_id}/"
# Create S3 bucket/prefix
# Set lifecycle policies
# Configure access control
return path
def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
"""Generate dbt project for data product"""
return {
'name': spec.name,
'models': {
f"{spec.name}.sql": spec.transformation_sql
},
'tests': self._generate_dbt_tests(spec.quality_checks),
'docs': self._generate_dbt_docs(spec)
}
def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
"""Generate Airflow DAG for data product"""
dag_template = f"""
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime
dag = DAG( dag_id='{spec.name}_pipeline', schedule='{spec.schedule}', start_date=datetime(2024, 1, 1), catchup=False )
dbt_run = BashOperator( task_id='dbt_run', bash_command='dbt run --models {spec.name}', dag=dag )
dbt_test = BashOperator( task_id='dbt_test', bash_command='dbt test --models {spec.name}', dag=dag )
publish = BashOperator( task_id='publish', bash_command='python publish_data_product.py {spec.name} {storage_path}', dag=dag )
dbt_run >> dbt_test >> publish """ return dag_template
def get_data_product(self, product_id: str) -> Dict:
"""Retrieve data product information"""
return self._catalog.get(product_id)
def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
"""List available data products"""
products = self._catalog.search(domain=domain)
return products
def discover_data_products(self, query: str) -> List[Dict]:
"""Search for data products"""
return self._catalog.search(query=query)
def request_access(self, product_id: str, requester: str) -> str:
"""Request access to data product"""
# Create access request ticket
# Notify data product owner
# Track approval workflow
pass
def grant_access(self, product_id: str, user: str, access_level: str):
"""Grant access to data product"""
# Update IAM policies
# Configure row-level security
# Log access grant
pass
Federated Computational Governance
Governance Framework:
governance_policy.yaml
governance: global_policies: - name: data_classification mandatory: true policy: | All data products must be classified as: - Public: Freely accessible within organization - Internal: Restricted to employees - Confidential: Restricted to specific roles - Restricted: Requires explicit approval
- name: pii_handling
mandatory: true
policy: |
Data products containing PII must:
- Mark PII fields in schema
- Implement column-level encryption
- Enable audit logging
- Comply with GDPR/CCPA requirements
- name: data_retention
mandatory: true
policy: |
Data retention periods:
- Operational data: 7 years
- Analytical data: 3 years
- Logs: 1 year
- Deleted data: 30 days in trash
domain_policies: sales: data_quality: - completeness: ">= 99%" - accuracy: ">= 99.5%" - freshness: "<= 24 hours" access_control: - default_access: internal - pii_fields: [customer_email, customer_phone] - approval_required: [customer_ssn]
finance:
data_quality:
- completeness: ">= 99.9%"
- accuracy: ">= 99.99%"
- freshness: "<= 1 hour"
access_control:
- default_access: confidential
- sox_compliance: true
- audit_all_access: true
automated_policies: - name: schema_validation enforcement: pre-publish check: | Schema must include: - Primary key - Column descriptions - Data types - Constraints
- name: quality_gates
enforcement: pre-publish
check: |
All quality checks must pass:
- No critical failures
- Warning threshold: <= 5%
- name: breaking_changes
enforcement: pre-publish
check: |
Breaking changes require:
- Major version increment
- 30-day deprecation notice
- Migration guide
observability_requirements: - metrics: - row_count - null_rate - distinct_count - value_distribution - alerts: - freshness_violation - quality_check_failure - schema_drift - volume_anomaly
Governance Implementation:
governance_engine.py
from typing import Dict, List, Optional from dataclasses import dataclass from enum import Enum
class PolicyViolationSeverity(Enum): INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical"
@dataclass class PolicyViolation: policy_name: str severity: PolicyViolationSeverity message: str field: Optional[str] = None
class GovernanceEngine: """Automated governance enforcement"""
def __init__(self, policies: Dict):
self.policies = policies
def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
"""Validate data product against governance policies"""
violations = []
# Check data classification
violations.extend(self._check_data_classification(product_spec))
# Check PII handling
violations.extend(self._check_pii_compliance(product_spec))
# Check schema requirements
violations.extend(self._check_schema_requirements(product_spec))
# Check quality checks
violations.extend(self._check_quality_requirements(product_spec))
# Check retention policy
violations.extend(self._check_retention_policy(product_spec))
return violations
def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
"""Verify data classification is set"""
violations = []
if 'classification' not in product_spec:
violations.append(PolicyViolation(
policy_name="data_classification",
severity=PolicyViolationSeverity.ERROR,
message="Data classification not specified"
))
valid_classifications = ['public', 'internal', 'confidential', 'restricted']
if product_spec.get('classification') not in valid_classifications:
violations.append(PolicyViolation(
policy_name="data_classification",
severity=PolicyViolationSeverity.ERROR,
message=f"Invalid classification. Must be one of: {valid_classifications}"
))
return violations
def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
"""Check PII handling compliance"""
violations = []
schema = product_spec.get('schema', {})
pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]
if pii_fields:
# Check encryption
if not product_spec.get('encryption_enabled'):
violations.append(PolicyViolation(
policy_name="pii_handling",
severity=PolicyViolationSeverity.CRITICAL,
message="PII fields present but encryption not enabled"
))
# Check audit logging
if not product_spec.get('audit_logging_enabled'):
violations.append(PolicyViolation(
policy_name="pii_handling",
severity=PolicyViolationSeverity.CRITICAL,
message="PII fields present but audit logging not enabled"
))
# Check field marking
for field in pii_fields:
if not field.get('pii_category'):
violations.append(PolicyViolation(
policy_name="pii_handling",
severity=PolicyViolationSeverity.ERROR,
message=f"PII field {field['name']} missing pii_category",
field=field['name']
))
return violations
def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
"""Validate schema completeness"""
violations = []
schema = product_spec.get('schema', {})
if not schema:
violations.append(PolicyViolation(
policy_name="schema_validation",
severity=PolicyViolationSeverity.ERROR,
message="Schema not defined"
))
return violations
# Check for primary key
fields = schema.get('fields', [])
has_primary_key = any(f.get('is_primary_key') for f in fields)
if not has_primary_key:
violations.append(PolicyViolation(
policy_name="schema_validation",
severity=PolicyViolationSeverity.WARNING,
message="No primary key defined"
))
# Check field documentation
for field in fields:
if not field.get('description'):
violations.append(PolicyViolation(
policy_name="schema_validation",
severity=PolicyViolationSeverity.WARNING,
message=f"Field {field['name']} missing description",
field=field['name']
))
return violations
def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
"""Validate quality check configuration"""
violations = []
quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
if not quality_checks:
violations.append(PolicyViolation(
policy_name="quality_gates",
severity=PolicyViolationSeverity.WARNING,
message="No quality checks defined"
))
# Check for minimum required checks
check_names = [check['name'] for check in quality_checks]
required_checks = ['completeness', 'freshness']
missing_checks = set(required_checks) - set(check_names)
if missing_checks:
violations.append(PolicyViolation(
policy_name="quality_gates",
severity=PolicyViolationSeverity.WARNING,
message=f"Missing required quality checks: {missing_checks}"
))
return violations
def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
"""Validate retention policy"""
violations = []
if 'retention_days' not in product_spec:
violations.append(PolicyViolation(
policy_name="data_retention",
severity=PolicyViolationSeverity.ERROR,
message="Retention policy not specified"
))
return violations
def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
"""Determine if data product can be published based on violations"""
# Block on ERROR or CRITICAL violations
blocking_violations = [
v for v in violations
if v.severity in [PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL]
]
return len(blocking_violations) == 0
def generate_compliance_report(self, product_id: str) -> Dict:
"""Generate compliance report for data product"""
return {
'product_id': product_id,
'compliance_status': 'compliant',
'last_checked': datetime.utcnow().isoformat(),
'policies_evaluated': len(self.policies),
'violations': []
}
Best Practices
- Domain Design
-
Align domains with organizational structure
-
Clear bounded contexts for each domain
-
Domain teams own their data end-to-end
-
Cross-domain collaboration through well-defined interfaces
-
Avoid centralized data teams; embed in domains
- Data Product Design
-
Treat data as a product with SLAs
-
Document data contracts explicitly
-
Version data products semantically
-
Implement comprehensive quality checks
-
Provide discoverability and self-service access
-
Monitor data product health continuously
- Platform Design
-
Abstract infrastructure complexity
-
Provide self-serve capabilities
-
Automate repetitive tasks
-
Enable domain autonomy
-
Standardize common patterns
-
Invest in developer experience
- Governance
-
Automate policy enforcement
-
Make governance policies executable
-
Balance autonomy with control
-
Federate decisions to domains
-
Global standards, local implementation
-
Continuous compliance monitoring
- Cultural Transformation
-
Shift from centralized to federated model
-
Build data literacy across organization
-
Incentivize data product quality
-
Foster collaboration between domains
-
Celebrate data product owners
Anti-Patterns
- Centralized Data Team
// Bad: Central data team owns all data Central Team -> All domains (bottleneck)
// Good: Domain teams own their data Sales Domain -> Sales data products Marketing Domain -> Marketing data products Product Domain -> Product data products
- Monolithic Data Lake
// Bad: Single giant data lake s3://data-lake/everything/
// Good: Domain-oriented storage s3://data-products/sales/ s3://data-products/marketing/ s3://data-products/product/
- No Data Contracts
// Bad: Undocumented schema changes Breaking change deployed without notice
// Good: Versioned contracts with deprecation v1: Deprecated (30 days notice) v2: Current v3: Beta
- Manual Governance
// Bad: Manual approval processes Email -> Ticket -> Manual review -> Access granted (weeks)
// Good: Automated governance Request -> Policy check -> Auto-approval (minutes)
Resources
-
Data Mesh by Zhamak Dehghani
-
Data Mesh Principles
-
ThoughtWorks Data Mesh
-
Data Mesh Architecture
-
Data Product Canvas
-
Data Mesh Learning
-
Awesome Data Mesh