
Data Quality Frameworks

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "data-quality-frameworks" with this command: npx skills add wshobson/agents/wshobson-agents-data-quality-frameworks


Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

When to Use This Skill

  • Implementing data quality checks in pipelines

  • Setting up Great Expectations validation

  • Building comprehensive dbt test suites

  • Establishing data contracts between teams

  • Monitoring data quality metrics

  • Automating data validation in CI/CD

Core Concepts

  1. Data Quality Dimensions

Dimension    | Description              | Example Check
-------------|--------------------------|-------------------------------------------------
Completeness | No missing values        | expect_column_values_to_not_be_null
Uniqueness   | No duplicates            | expect_column_values_to_be_unique
Validity     | Values in expected range | expect_column_values_to_be_in_set
Accuracy     | Data matches reality     | Cross-reference validation
Consistency  | No contradictions        | expect_column_pair_values_A_to_be_greater_than_B
Timeliness   | Data is recent           | expect_column_max_to_be_between
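Several of these dimensions need nothing beyond the standard library to check. A minimal sketch in plain Python (the sample rows and allowed status set are invented for illustration):

```python
from typing import Any, Dict, List

def check_completeness(rows: List[Dict[str, Any]], column: str) -> bool:
    """Completeness: no missing values in the column."""
    return all(row.get(column) is not None for row in rows)

def check_uniqueness(rows: List[Dict[str, Any]], column: str) -> bool:
    """Uniqueness: no duplicate values in the column."""
    values = [row[column] for row in rows]
    return len(values) == len(set(values))

def check_validity(rows: List[Dict[str, Any]], column: str, allowed: set) -> bool:
    """Validity: every value belongs to the allowed set."""
    return all(row[column] in allowed for row in rows)

orders = [
    {"order_id": 1, "status": "pending"},
    {"order_id": 2, "status": "shipped"},
    {"order_id": 2, "status": "unknown"},  # duplicate id, invalid status
]

print(check_completeness(orders, "order_id"))                    # True
print(check_uniqueness(orders, "order_id"))                      # False
print(check_validity(orders, "status", {"pending", "shipped"}))  # False
```

Great Expectations packages exactly these checks, plus reporting and scheduling, behind declarative expectation names.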

  2. Testing Pyramid for Data

          /\
         /  \       Integration Tests (cross-table)
        /────\
       /      \     Unit Tests (single column)
      /────────\
     /          \   Schema Tests (structure)
    /────────────\
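The three layers can be made concrete with plain-Python checks (all names and sample data here are hypothetical): a schema test inspects structure only, a unit test inspects one column, and an integration test crosses tables.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

def schema_test(rows: List[Row], expected_columns: set) -> bool:
    """Schema level: every row carries at least the expected columns."""
    return all(expected_columns <= set(row) for row in rows)

def unit_test_non_negative(rows: List[Row], column: str) -> bool:
    """Unit level: a single-column rule (value >= 0)."""
    return all(row[column] >= 0 for row in rows)

def integration_test_fk(orders: List[Row], customers: List[Row]) -> bool:
    """Integration level: every order references an existing customer."""
    known = {c["customer_id"] for c in customers}
    return all(o["customer_id"] in known for o in orders)

customers = [{"customer_id": "c1"}, {"customer_id": "c2"}]
orders = [{"order_id": 1, "customer_id": "c1", "amount": 42.0}]

print(schema_test(orders, {"order_id", "customer_id", "amount"}))  # True
print(unit_test_non_negative(orders, "amount"))                    # True
print(integration_test_fk(orders, customers))                      # True
```

As with the code pyramid, the cheap, numerous checks sit at the base; the expensive cross-table checks sit at the top.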

Quick Start

Great Expectations Setup

Install

pip install great_expectations

Initialize project

great_expectations init

Create datasource

great_expectations datasource new

great_expectations/checkpoints/daily_validation.yml

import great_expectations as gx

Create context

context = gx.get_context()

Create expectation suite

suite = context.add_expectation_suite("orders_suite")

Add expectations

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

Validate

results = context.run_checkpoint(checkpoint_name="daily_orders")

Patterns

Pattern 1: Great Expectations Suite

expectations/orders_suite.py

import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration

def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite."""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Allow additional columns
        }
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"}
        }
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000
        }
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite

Pattern 2: Great Expectations Checkpoint

great_expectations/checkpoints/orders_checkpoint.yml

name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer

Run checkpoint

import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values() if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")

Pattern 3: dbt Data Tests

models/marts/core/_core__models.yml

version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'"

Pattern 4: Custom dbt Tests

-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt
    from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }}
   or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000

-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}

-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule

with orders_customers as (
    select distinct customer_id
    from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id
    from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders  -- Test passes if this returns 0 rows

Pattern 5: Data Contracts

contracts/orders_contract.yaml

apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false

    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect

    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD

    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true

    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
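The schema and range rules in such a contract can also be enforced directly in pipeline code. Below is a hand-rolled sketch, not the official datacontract tooling; the CONTRACT dict simply mirrors a few fields from the YAML above:

```python
from typing import Any, Dict, List

# Illustrative subset of the contract's schema rules
CONTRACT = {
    "order_id":     {"required": True},
    "total_amount": {"minimum": 0, "maximum": 100000},
    "status":       {"enum": ["pending", "processing", "shipped", "delivered", "cancelled"]},
}

def validate_record(record: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations for one record (empty = valid)."""
    errors = []
    for field, rules in CONTRACT.items():
        value = record.get(field)
        if rules.get("required") and value is None:
            errors.append(f"{field}: required field is missing")
            continue
        if value is None:
            continue
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{field}: {value} below minimum {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{field}: {value} above maximum {rules['maximum']}")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{field}: {value!r} not in allowed values")
    return errors

good = {"order_id": "a1", "total_amount": 99.5, "status": "shipped"}
bad = {"total_amount": -5, "status": "lost"}

print(validate_record(good))       # []
print(len(validate_record(bad)))   # 3
```

In practice the producing team would run a check like this in CI, so a schema drift breaks their build rather than a consumer's dashboard.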

Pattern 6: Automated Quality Pipeline

quality_pipeline.py

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx

@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime

class DataQualityPipeline:
    """Orchestrate data quality checks across tables."""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against an expectation suite."""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results

        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables."""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report."""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)

        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)

Usage

context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")

Best Practices

Do's

  • Test early - Validate source data before transformations

  • Test incrementally - Add tests as you find issues

  • Document expectations - Clear descriptions for each test

  • Alert on failures - Integrate with monitoring

  • Version contracts - Track schema changes

Don'ts

  • Don't test everything - Focus on critical columns

  • Don't ignore warnings - They often precede failures

  • Don't skip freshness - Stale data is bad data

  • Don't hardcode thresholds - Use dynamic baselines

  • Don't test in isolation - Test relationships too
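The "dynamic baselines" advice can be sketched with the standard statistics module: derive today's acceptable row-count band from recent history rather than a hardcoded constant (the history values below are invented):

```python
import statistics

def dynamic_row_count_range(history, sigmas=3.0):
    """Derive an acceptable row-count band from recent daily counts."""
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    return (mean - sigmas * stdev, mean + sigmas * stdev)

# Hypothetical daily row counts for the last week
history = [10120, 9980, 10340, 10050, 9890, 10210, 10100]
low, high = dynamic_row_count_range(history)

todays_count = 10240
print(low <= todays_count <= high)  # True
```

The computed band can then feed min_value/max_value of a row-count expectation, so thresholds adjust as the table grows instead of going stale.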

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
