
Data Products Skill

Overview

Data products are self-contained, discoverable data assets with clear ownership, SLAs, and contracts. This skill provides patterns for designing, implementing, and governing data products using Data Mesh principles.

Key Benefits:

  • Clear ownership and accountability

  • Explicit contracts and SLAs

  • Discoverable and self-serve

  • Version-controlled interfaces

  • Quality guarantees

  • Consumer-oriented design

When to Use This Skill

Use data product patterns when you need to:

  • Build self-serve data platforms

  • Implement Data Mesh architecture

  • Define clear data contracts between teams

  • Establish data ownership and accountability

  • Provide discoverable data assets

  • Guarantee data quality and freshness

  • Enable cross-domain data sharing

Core Concepts

  1. Data Product Definition

Product Specification:

product:
  name: customer-360
  version: 2.1.0
  description: Complete customer view with demographics, behavior, and preferences
  owner:
    team: customer-data-platform
    email: cdp-team@company.com
  domain: customer
  subdomain: customer-intelligence

interfaces:
  - name: customer_profile
    type: delta_table
    location: catalog.customer.profile_v2
    format: delta
    access_pattern: batch_query
  - name: customer_events_stream
    type: kafka_topic
    location: customer.events.v2
    format: avro
    access_pattern: streaming

contracts:
  schema:
    version: 2.1.0
    evolution_policy: backward_compatible

sla:
  freshness:
    profile: 1_hour
    events: real_time
  availability: 99.9%
  completeness: 99.5%
  accuracy: 99%

quality_rules:
  - name: primary_key_uniqueness
    rule: customer_id IS NOT NULL AND UNIQUE
    severity: critical
  - name: email_format
    rule: email matches RFC5322 pattern
    severity: high

governance:
  classification: PII
  retention_days: 2555  # 7 years
  access_controls:
    - role: data_analyst
      permissions: [SELECT]
    - role: data_engineer
      permissions: [SELECT, INSERT]
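
The spec can also double as deployment configuration. A minimal sketch, assuming the spec is saved as product.yaml next to the pipeline code and the profile interface table already exists (the file name and property-stamping logic are illustrative, not part of the skill):

import yaml

# Hypothetical: load the product spec and stamp key fields onto the
# interface table as table properties for discoverability.
with open("product.yaml") as f:
    spec = yaml.safe_load(f)

product = spec["product"]
spark.sql(f"""
    ALTER TABLE catalog.customer.profile_v2 SET TBLPROPERTIES (
        'product.name'    = '{product['name']}',
        'product.version' = '{product['version']}',
        'product.owner'   = '{product['owner']['team']}'
    )
""")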

  2. Data Contracts

Contract Schema Definition:

""" Data contract implementation. """ from dataclasses import dataclass from typing import List, Dict, Any from enum import Enum

class DataType(Enum): """Supported data types.""" STRING = "string" INTEGER = "integer" DOUBLE = "double" TIMESTAMP = "timestamp" BOOLEAN = "boolean"

@dataclass class FieldContract: """Contract for a single field.""" name: str type: DataType required: bool description: str constraints: Dict[str, Any] = None pii: bool = False

@dataclass class DataContract: """Complete data contract specification.""" product_name: str version: str fields: List[FieldContract] primary_keys: List[str] partitioning: List[str] quality_rules: List[Dict[str, Any]] sla: Dict[str, Any]

def validate_schema(self, df) -> bool:
    """Validate DataFrame against contract."""
    # Check all required fields present
    contract_fields = {f.name for f in self.fields}
    df_fields = set(df.columns)

    if not contract_fields.issubset(df_fields):
        missing = contract_fields - df_fields
        raise ValueError(f"Missing required fields: {missing}")

    # Check field types
    for field in self.fields:
        if field.required and df.filter(df[field.name].isNull()).count() > 0:
            raise ValueError(f"Required field {field.name} has null values")

    return True

# Example usage
customer_contract = DataContract(
    product_name="customer-360",
    version="2.1.0",
    fields=[
        FieldContract(
            name="customer_id",
            type=DataType.STRING,
            required=True,
            description="Unique customer identifier",
            pii=False
        ),
        FieldContract(
            name="email",
            type=DataType.STRING,
            required=True,
            description="Customer email address",
            constraints={"format": "email"},
            pii=True
        )
    ],
    primary_keys=["customer_id"],
    partitioning=["registration_date"],
    quality_rules=[
        {"name": "uniqueness", "column": "customer_id", "rule": "unique"},
        {"name": "email_format", "column": "email", "rule": "matches_regex",
         "pattern": r"^[A-Za-z0-9._%+-]+@"}
    ],
    sla={
        "freshness_hours": 1,
        "availability_percent": 99.9,
        "completeness_percent": 99.5
    }
)

  3. SLA Management

SLA Monitoring:

""" SLA monitoring and enforcement. """ from datetime import datetime, timedelta from typing import Dict, Any

class SLAMonitor: """Monitor and enforce data product SLAs."""

def __init__(self, spark):
    self.spark = spark

def check_freshness(
    self,
    table_name: str,
    timestamp_column: str,
    max_age_hours: int
) -> Dict[str, Any]:
    """Check if data meets freshness SLA."""
    df = self.spark.table(table_name)

    latest_timestamp = df.agg(
        {timestamp_column: "max"}
    ).collect()[0][0]

    age_hours = (
        datetime.now() - latest_timestamp
    ).total_seconds() / 3600

    return {
        "met": age_hours <= max_age_hours,
        "latest_timestamp": latest_timestamp,
        "age_hours": age_hours,
        "sla_hours": max_age_hours
    }

def check_completeness(
    self,
    table_name: str,
    required_columns: List[str],
    threshold_percent: float = 99.0
) -> Dict[str, Any]:
    """Check if data meets completeness SLA."""
    df = self.spark.table(table_name)
    total_records = df.count()

    results = {}
    for column in required_columns:
        non_null_count = df.filter(df[column].isNotNull()).count()
        completeness = (non_null_count / total_records) * 100

        results[column] = {
            "completeness_percent": completeness,
            "met": completeness >= threshold_percent,
            "missing_count": total_records - non_null_count
        }

    overall_met = all(r["met"] for r in results.values())

    return {
        "met": overall_met,
        "by_column": results
    }

def check_availability(
    self,
    table_name: str,
    lookback_days: int = 7
) -> Dict[str, Any]:
    """Check if data product meets availability SLA."""
    # Query system tables for availability metrics
    query = f"""
    SELECT
        COUNT(*) as total_checks,
        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_checks
    FROM system.monitoring.table_health_checks
    WHERE table_name = '{table_name}'
      AND check_time >= current_date() - INTERVAL {lookback_days} DAYS
    """

    result = self.spark.sql(query).first()

    availability_percent = (
        result["successful_checks"] / result["total_checks"] * 100
        if result["total_checks"] > 0 else 0
    )

    return {
        "availability_percent": availability_percent,
        "total_checks": result["total_checks"],
        "successful_checks": result["successful_checks"],
        "lookback_days": lookback_days
    }
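
A minimal usage sketch, assuming a notebook or job where spark is in scope; the table and thresholds come from the customer-360 spec above, and the alerting strategy is only a placeholder:

monitor = SLAMonitor(spark)

freshness = monitor.check_freshness(
    table_name="catalog.customer.profile_v2",
    timestamp_column="updated_at",
    max_age_hours=1
)
completeness = monitor.check_completeness(
    table_name="catalog.customer.profile_v2",
    required_columns=["customer_id", "email"],
    threshold_percent=99.5
)

if not (freshness["met"] and completeness["met"]):
    # Placeholder: fail the job, page the owning team, or emit a metric
    raise RuntimeError(f"SLA breach: freshness={freshness}, completeness={completeness}")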

  4. Product Metadata

Metadata Management:

""" Data product metadata and discovery. """ from dataclasses import dataclass, asdict from typing import List, Dict, Any from datetime import datetime import json

@dataclass class ProductMetadata: """Metadata for data product discovery.""" name: str version: str description: str owner_team: str owner_email: str domain: str tags: List[str] interfaces: List[Dict[str, Any]] documentation_url: str created_at: datetime updated_at: datetime

def to_delta(self, spark, catalog_table: str):
    """Persist metadata to Unity Catalog."""
    metadata_dict = asdict(self)
    metadata_dict["created_at"] = self.created_at.isoformat()
    metadata_dict["updated_at"] = self.updated_at.isoformat()
    metadata_dict["interfaces"] = json.dumps(self.interfaces)
    metadata_dict["tags"] = json.dumps(self.tags)

    df = spark.createDataFrame([metadata_dict])
    df.write.format("delta").mode("append").saveAsTable(catalog_table)

class ProductCatalog: """Centralized product catalog for discovery."""

def __init__(self, spark, catalog_table: str):
    self.spark = spark
    self.catalog_table = catalog_table

def register_product(self, metadata: ProductMetadata):
    """Register new data product."""
    metadata.to_delta(self.spark, self.catalog_table)

def search_products(
    self,
    domain: str = None,
    tags: List[str] = None
) -> List[Dict[str, Any]]:
    """Search for data products."""
    query = f"SELECT * FROM {self.catalog_table} WHERE 1=1"

    if domain:
        query += f" AND domain = '{domain}'"

    if tags:
        for tag in tags:
            query += f" AND array_contains(tags, '{tag}')"

    return self.spark.sql(query).collect()

def get_product(self, name: str, version: str = None) -> Dict[str, Any]:
    """Get specific product metadata."""
    query = f"SELECT * FROM {self.catalog_table} WHERE name = '{name}'"

    if version:
        query += f" AND version = '{version}'"
    else:
        query += " ORDER BY updated_at DESC LIMIT 1"

    result = self.spark.sql(query).first()
    return result.asDict() if result else None
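
A short registration and search sketch, assuming a registry table such as governance.catalog.data_products has already been created; the table name, tags, and documentation URL are illustrative:

from datetime import datetime

catalog = ProductCatalog(spark, catalog_table="governance.catalog.data_products")

catalog.register_product(ProductMetadata(
    name="customer-360",
    version="2.1.0",
    description="Complete customer view with demographics, behavior, and preferences",
    owner_team="customer-data-platform",
    owner_email="cdp-team@company.com",
    domain="customer",
    tags=["customer", "gold", "pii"],
    interfaces=[{"name": "customer_profile", "type": "delta_table",
                 "location": "catalog.customer.profile_v2"}],
    documentation_url="https://wiki.example.com/customer-360",  # placeholder
    created_at=datetime.now(),
    updated_at=datetime.now()
))

# Consumers discover products by domain and tag
gold_customer_products = catalog.search_products(domain="customer", tags=["gold"])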

Implementation Patterns

Pattern 1: Building a Data Product

Complete Product Implementation:

""" Customer 360 Data Product Implementation """ import dlt from pyspark.sql.functions import *

Product metadata

PRODUCT_NAME = "customer-360" PRODUCT_VERSION = "2.1.0" PRODUCT_OWNER = "customer-data-platform"

@dlt.table( name="customer_360_profile", comment="Customer 360 profile data product", table_properties={ "product.name": PRODUCT_NAME, "product.version": PRODUCT_VERSION, "product.owner": PRODUCT_OWNER, "product.interface": "customer_profile", "quality.tier": "gold", "pii.contains": "true" } ) @dlt.expect_or_fail("pk_not_null", "customer_id IS NOT NULL") @dlt.expect_or_fail("pk_unique", "customer_id IS UNIQUE") @dlt.expect_or_drop("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@'") @dlt.expect("complete_profile", "phone IS NOT NULL AND address IS NOT NULL") def customer_360_profile(): """ Build customer 360 profile data product.

SLA:
- Freshness: 1 hour
- Availability: 99.9%
- Completeness: 99.5%
"""
# Source data
demographics = dlt.read("silver_customer_demographics")
behavior = dlt.read("silver_customer_behavior")
preferences = dlt.read("silver_customer_preferences")

# Join sources
profile = (
    demographics
    .join(behavior, "customer_id", "left")
    .join(preferences, "customer_id", "left")
    .select(
        "customer_id",
        "email",
        "phone",
        "address",
        "registration_date",
        "segment",
        "lifetime_value",
        "last_purchase_date",
        "total_orders",
        "preferred_category",
        "communication_preference"
    )
    .withColumn("product_version", lit(PRODUCT_VERSION))
    .withColumn("updated_at", current_timestamp())
)

return profile

Product quality monitoring

@dlt.table( name="customer_360_quality_metrics", comment="Quality metrics for customer 360 product" ) def quality_metrics(): """Monitor product quality against SLAs.""" profile = dlt.read("customer_360_profile")

return spark.sql(f"""
    SELECT
        current_timestamp() as metric_timestamp,
        '{PRODUCT_NAME}' as product_name,
        '{PRODUCT_VERSION}' as product_version,
        COUNT(*) as total_records,
        COUNT_IF(customer_id IS NOT NULL) * 100.0 / COUNT(*) as id_completeness,
        COUNT_IF(email IS NOT NULL) * 100.0 / COUNT(*) as email_completeness,
        COUNT_IF(phone IS NOT NULL) * 100.0 / COUNT(*) as phone_completeness,
        MAX(updated_at) as last_update_time,
        (unix_timestamp(current_timestamp()) - unix_timestamp(MAX(updated_at))) / 3600 as data_age_hours
    FROM LIVE.customer_360_profile
""")

Pattern 2: Contract Enforcement

Automated Contract Validation:

""" Contract validation in data pipelines. """ from typing import Dict, Any from pyspark.sql import DataFrame

class ContractValidator: """Validate data against contracts."""

def __init__(self, contract: DataContract):
    self.contract = contract

def validate(self, df: DataFrame) -> Dict[str, Any]:
    """
    Validate DataFrame against contract.

    Returns validation results with pass/fail status.
    """
    results = {
        "product": self.contract.product_name,
        "version": self.contract.version,
        "timestamp": datetime.now().isoformat(),
        "validations": []
    }

    # Schema validation
    try:
        self.contract.validate_schema(df)
        results["validations"].append({
            "check": "schema",
            "status": "PASS"
        })
    except ValueError as e:
        results["validations"].append({
            "check": "schema",
            "status": "FAIL",
            "error": str(e)
        })

    # Primary key validation
    pk_violations = (
        df.groupBy(self.contract.primary_keys)
        .count()
        .filter(col("count") > 1)
        .count()
    )

    results["validations"].append({
        "check": "primary_key_uniqueness",
        "status": "PASS" if pk_violations == 0 else "FAIL",
        "violations": pk_violations
    })

    # Quality rules validation
    for rule in self.contract.quality_rules:
        violations = self._check_quality_rule(df, rule)
        results["validations"].append({
            "check": rule["name"],
            "status": "PASS" if violations == 0 else "FAIL",
            "violations": violations
        })

    results["overall_status"] = (
        "PASS" if all(v["status"] == "PASS" for v in results["validations"])
        else "FAIL"
    )

    return results

def _check_quality_rule(self, df: DataFrame, rule: Dict[str, Any]) -> int:
    """Check individual quality rule."""
    if rule["rule"] == "unique":
        total = df.count()
        unique = df.select(rule["column"]).distinct().count()
        return total - unique

    elif rule["rule"] == "matches_regex":
        return df.filter(
            ~col(rule["column"]).rlike(rule["pattern"])
        ).count()

    return 0
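
A usage sketch that wires the validator into a batch check, reusing the customer_contract defined earlier; the table name comes from the product spec, and the failure handling is a placeholder:

validator = ContractValidator(customer_contract)

df = spark.table("catalog.customer.profile_v2")
report = validator.validate(df)

if report["overall_status"] != "PASS":
    failures = [v for v in report["validations"] if v["status"] == "FAIL"]
    # Placeholder: quarantine the batch or fail the pipeline run
    raise ValueError(f"Contract validation failed: {failures}")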

Pattern 3: Product Versioning

Semantic Versioning for Data:

""" Data product versioning strategy. """

class ProductVersion: """Manage data product versions."""

def __init__(self, major: int, minor: int, patch: int):
    self.major = major
    self.minor = minor
    self.patch = patch

def __str__(self):
    return f"{self.major}.{self.minor}.{self.patch}"

def bump_major(self):
    """
    Increment major version (breaking changes).

    Examples:
    - Removing columns
    - Changing column types
    - Changing primary keys
    """
    return ProductVersion(self.major + 1, 0, 0)

def bump_minor(self):
    """
    Increment minor version (backward compatible changes).

    Examples:
    - Adding new columns
    - Adding new quality rules
    - Improving data quality
    """
    return ProductVersion(self.major, self.minor + 1, 0)

def bump_patch(self):
    """
    Increment patch version (bug fixes).

    Examples:
    - Fixing data quality issues
    - Correcting transformations
    - Performance improvements
    """
    return ProductVersion(self.major, self.minor, self.patch + 1)

def is_compatible(self, other: 'ProductVersion') -> bool:
    """
    Check if versions are compatible.

    Compatibility: same major version.
    """
    return self.major == other.major
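
A small sketch of how the versioning rules play out in practice:

current = ProductVersion(2, 1, 0)

added_column = current.bump_minor()    # 2.2.0 - e.g. a new nullable column
fixed_bug = current.bump_patch()       # 2.1.1 - e.g. corrected transformation
dropped_column = current.bump_major()  # 3.0.0 - breaking interface change

# Consumers pinned to 2.x keep working across minor and patch releases
assert current.is_compatible(added_column)
assert current.is_compatible(fixed_bug)
assert not current.is_compatible(dropped_column)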

Pattern 4: Self-Service Discovery

Product Discovery Interface:

""" Self-service data product discovery. """

class ProductDiscovery: """Enable self-service product discovery."""

def __init__(self, spark, catalog_table: str):
    self.spark = spark
    self.catalog_table = catalog_table

def list_products_by_domain(self, domain: str) -> DataFrame:
    """List all products in a domain."""
    return self.spark.sql(f"""
        SELECT
            name,
            version,
            description,
            owner_team,
            tags,
            documentation_url
        FROM {self.catalog_table}
        WHERE domain = '{domain}'
        ORDER BY name, version DESC
    """)

def get_product_lineage(self, product_name: str) -> Dict[str, Any]:
    """Get upstream and downstream dependencies."""
    lineage_query = f"""
    SELECT
        upstream_table,
        downstream_table,
        transformation_logic
    FROM system.access.table_lineage
    WHERE downstream_table LIKE '%{product_name}%'
       OR upstream_table LIKE '%{product_name}%'
    """

    lineage = self.spark.sql(lineage_query).collect()

    return {
        "product": product_name,
        "upstream": [row.upstream_table for row in lineage],
        "downstream": [row.downstream_table for row in lineage]
    }

def get_product_usage(
    self,
    product_name: str,
    days: int = 30
) -> DataFrame:
    """Get product usage statistics."""
    return self.spark.sql(f"""
    SELECT
        date_trunc('day', request_time) as date,
        user_name,
        COUNT(*) as query_count
    FROM system.access.audit
    WHERE table_name LIKE '%{product_name}%'
      AND request_time >= current_date() - INTERVAL {days} DAYS
    GROUP BY 1, 2
    ORDER BY 1 DESC, 3 DESC
    """)

Best Practices

  1. Product Organization

products/
├── customer-360/
│   ├── product.yaml
│   ├── contract.json
│   ├── pipelines/
│   │   └── build_product.py
│   ├── tests/
│   │   └── test_product.py
│   └── docs/
│       └── README.md

  2. Contract Evolution
  • Major version: Breaking changes

  • Minor version: Backward compatible additions

  • Patch version: Bug fixes and improvements

  • Always maintain backward compatibility within a major version (see the sketch after this list)

  • Deprecate fields before removing

  • Provide migration guides for major versions
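
A minimal sketch of a backward compatible (minor) change versus a breaking (major) change on the Delta interface from the spec above; the column name is illustrative:

# Minor (2.1.0 -> 2.2.0): add a nullable column; existing consumers that
# select explicit columns are unaffected.
spark.sql("""
    ALTER TABLE catalog.customer.profile_v2
    ADD COLUMNS (preferred_channel STRING COMMENT 'added in 2.2.0')
""")

# Major (2.x -> 3.0.0): dropping or retyping a column breaks consumers.
# Publish a new interface (e.g. a v3 table), deprecate the old one, and
# provide a migration guide rather than altering v2 in place.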

  3. SLA Definition

Define realistic, measurable SLAs:

  • Freshness: Maximum data age

  • Availability: Uptime percentage

  • Completeness: Minimum non-null percentage

  • Accuracy: Data correctness threshold

  4. Ownership Model
  • Clear team ownership

  • On-call rotation for incidents

  • Documented escalation paths

  • Regular SLA reviews

Common Pitfalls to Avoid

Don't:

  • Create products without clear consumers

  • Skip contract definition

  • Ignore versioning strategy

  • Overcomplicate products

  • Neglect monitoring

Do:

  • Start with consumer needs

  • Define explicit contracts

  • Version semantically

  • Keep products focused

  • Monitor SLAs continuously

Complete Examples

See /examples/ directory for:

  • customer360_product.py : Complete data product

  • sales_analytics_product.py : Analytics product example

Related Skills

  • data-quality : Quality guarantees

  • delta-live-tables : Product pipelines

  • delta-sharing : Product distribution

  • cicd-workflows : Product deployment

References

  • Data Mesh Principles

  • Data Contracts

  • Data Product Design
