data-engineering-quality

Data Quality and Testing

Install skill "data-engineering-quality" with this command: npx skills add legout/data-platform-agent-skills/legout-data-platform-agent-skills-data-engineering-quality

Data validation and testing frameworks for ensuring pipeline correctness and data quality: Great Expectations (enterprise) and Pandera (lightweight). Both integrate with orchestration tools for automated validation.

Quick Comparison

| Feature | Great Expectations | Pandera |
|---|---|---|
| Approach | Declarative "expectations" | Schema definitions with checks |
| DataFrame Support | Pandas, Spark, SQL, BigQuery | Pandas, Polars, PySpark, Dask |
| Validation Output | JSON results with detailed diagnostics | Validated DataFrame, or an exception on failure |
| Best For | Enterprise data platforms, comprehensive profiling | Python-centric pipelines, lightweight checks |
| Learning Curve | Steeper (concepts: DataContext, Checkpoints) | Lower (Python decorators/classes) |
| Integration | CI/CD, Airflow, Prefect, Dagster | pytest, FastAPI, any Python code |

When to Use Which?

Great Expectations: Choose it when you need comprehensive data documentation (Data Docs), profiling, and validation with rich reporting. It suits organizations with dedicated data quality teams.

Pandera: Choose it when you're already in the Python/Pandas/Polars ecosystem and want simple schema validation with type hints, for example quick checks in ETL scripts or on API responses.

Skill Dependencies

  • @data-engineering-core: Polars, DuckDB, Pandas basics

  • @data-engineering-orchestration: Integrate validation into workflows

Great Expectations (GX)

Installation

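pip install great_expectations

# For specific backends
pip install "great_expectations[spark]"

# The GX examples below use the legacy (pre-1.0) API; pin the version if needed
pip install "great_expectations<1.0"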

Quickstart

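import great_expectations as gx
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

# Legacy (pre-1.0) GX API. Assumes the "pandas_datasource" shown under
# "Data Sources & Connectors" is already registered in the context.
df = pd.read_csv("my_data.csv")  # hypothetical input file

# Initialize context (creates gx/ directory if first time)
context = gx.get_context()

# Create expectation suite (create_expectation_suite is deprecated)
context.add_or_update_expectation_suite("my_suite")

# Get validator for the in-memory DataFrame
validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="pandas_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="my_data",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"runtime_batch_identifier_name": "batch_1"},
    ),
    expectation_suite_name="my_suite",
)

# Define expectations
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_between("value", min_value=0, max_value=1000)
validator.expect_column_values_to_be_in_set("category", value_set=["A", "B", "C"])
validator.expect_column_values_to_match_strftime_format("date", strftime_format="%Y-%m-%d")

# Persist the suite so checkpoints can load it later
validator.save_expectation_suite(discard_failed_expectations=False)

# Validate
result = validator.validate()
print(f"Success: {result.success}")
if not result.success:
    failed = [r for r in result.results if not r.success]
    print(f"Failed expectations: {failed}")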
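Printing `result.results` dumps every result object, which is hard to scan. A minimal sketch of a summary helper follows. It assumes the result has first been converted with `result.to_json_dict()` and that each entry carries `success`, `expectation_config`, and `result.unexpected_count` keys. The helper name `summarize_failures` and the sample dictionary are hypothetical and hand-written for illustration, not real GX output.

```python
from typing import Any


def summarize_failures(result: dict[str, Any]) -> list[str]:
    """Return one readable line per failed expectation in a result dict."""
    lines = []
    for item in result.get("results", []):
        if item.get("success"):
            continue  # only report failures
        config = item.get("expectation_config", {})
        # Assumption: legacy releases use "expectation_type", newer ones "type".
        name = config.get("expectation_type") or config.get("type", "<unknown>")
        column = config.get("kwargs", {}).get("column", "<table>")
        unexpected = item.get("result", {}).get("unexpected_count")
        lines.append(f"{name} on {column}: {unexpected} unexpected value(s)")
    return lines


# Hand-written sample shaped like a validation result (illustration only).
sample = {
    "success": False,
    "results": [
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_be_between",
                "kwargs": {"column": "value", "min_value": 0, "max_value": 1000},
            },
            "result": {"unexpected_count": 0},
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "id"},
            },
            "result": {"unexpected_count": 2},
        },
    ],
}

for line in summarize_failures(sample):
    print(line)
```

In a pipeline, the same helper could feed a log message or an alert instead of `print`.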

Data Sources & Connectors

# gx/contexts/<context>/datasources/pandas_datasource.yml
datasources:
  pandas_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: PandasExecutionEngine
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - runtime_batch_identifier_name

Checkpoints (Validation Automation)

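# Create checkpoint (datasource and connector names match the YAML config above)
checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "pandas_datasource",
                "data_connector_name": "default_runtime_data_connector_name",
                "data_asset_name": "my_data",
            },
            "expectation_suite_name": "my_suite",
        }
    ],
}

context.add_checkpoint(**checkpoint_config)

# Run checkpoint. A RuntimeDataConnector needs the data and its batch
# identifiers at run time; df is the pandas DataFrame to validate.
results = context.run_checkpoint(
    checkpoint_name="my_checkpoint",
    batch_request={
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {"runtime_batch_identifier_name": "batch_1"},
    },
)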

Integration with Orchestrators

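Prefect:

from prefect import flow, task
import great_expectations as gx
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

@task
def validate_data(df: pd.DataFrame, suite_name: str) -> bool:
    context = gx.get_context()
    # Pass the in-memory DataFrame through a runtime batch request (legacy GX API)
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="default_runtime_data_connector_name",
            data_asset_name="validation_data",
            runtime_parameters={"batch_data": df},
            batch_identifiers={"runtime_batch_identifier_name": "batch_1"},
        ),
        expectation_suite_name=suite_name,
    )
    result = validator.validate()
    return result.success

@flow
def pipeline_with_validation():
    # extract(), transform() and load() are placeholders for your own tasks
    df = extract()
    if validate_data(df, "my_suite"):
        transformed = transform(df)
        load(transformed)
    else:
        raise ValueError("Data validation failed")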

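Dagster:

from dagster import asset
import great_expectations as gx
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

@asset
def validated_asset(df: pd.DataFrame) -> pd.DataFrame:
    # The "df" parameter refers to an upstream asset named "df"
    context = gx.get_context()
    context.add_or_update_expectation_suite("asset_suite")
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="default_runtime_data_connector_name",
            data_asset_name="asset_data",
            runtime_parameters={"batch_data": df},
            batch_identifiers={"runtime_batch_identifier_name": "batch_1"},
        ),
        expectation_suite_name="asset_suite",
    )
    # ... define expectations

    result = validator.validate()
    if not result.success:
        raise Exception(f"Validation failed: {result}")
    return df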

Pandera: Lightweight Schema Validation

Installation

pip install "pandera[pandas]"   # For pandas
pip install "pandera[polars]"   # For Polars
pip install "pandera[pyspark]"  # For PySpark

Basic Usage

import pandera as pa
import pandas as pd

# Define schema
schema = pa.DataFrameSchema({
    "id": pa.Column(pa.Int, checks=pa.Check.gt(0)),
    "category": pa.Column(pa.String, checks=pa.Check.isin(["A", "B", "C"])),
    "value": pa.Column(pa.Float, checks=[
        pa.Check.gt(0),
        pa.Check.lt(10000)
    ]),
    "date": pa.Column(pa.DateTime)
})

# Validate DataFrame
df = pd.DataFrame({
    "id": [1, 2, 3],
    "category": ["A", "B", "A"],
    "value": [100.0, 200.0, 150.0],
    "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"])
})

validated = schema.validate(df)  # Raises SchemaError if invalid
print("Validation passed!")

# Decorator pattern: validate the input DataFrame before the function runs
@pa.check_input(schema)
def process_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.groupby("category")["value"].sum().reset_index()

Custom Checks

# Custom validation function: return a boolean Series so Pandera can report failing rows
def custom_check(series: pd.Series) -> pd.Series:
    return series > 0

schema = pa.DataFrameSchema({
    "value": pa.Column(pa.Float, checks=pa.Check(custom_check))
})

# Or lambda
schema = pa.DataFrameSchema({
    "value": pa.Column(pa.Float, checks=pa.Check(lambda x: x > 0))
})

Polars Integration

import pandera.polars as pa
import polars as pl

schema = pa.DataFrameSchema({
    "id": pa.Column(pl.Int64, pa.Check.gt(0)),
    "value": pa.Column(pl.Float64, pa.Check.in_range(0, 1000))
})

df = pl.DataFrame({"id": [1, 2], "value": [100.0, 200.0]})
validated = schema.validate(df)

Best Practices

  • ✅ Validate early - Check data quality immediately after extraction

  • ✅ Fail fast - Stop pipeline on validation failure (or route to quarantine)

  • ✅ Version your schemas - Store schema definitions in version control

  • ✅ Use both static and runtime checks - Static schema + dynamic checks (ranges, uniqueness)

  • ✅ Integrate with orchestration - Use Prefect/Dagster task dependencies for validation steps

  • ❌ Don't validate only at the end - catch issues early

  • ❌ Don't use try/except to ignore validation errors (unless intentional quarantine)

Testing Patterns

pytest Integration

import pytest
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "id": pa.Column(pa.Int, pa.Check.gt(0)),
    "value": pa.Column(pa.Float)
})

@pytest.fixture
def sample_data():
    return pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})

# transform_function and pipeline stand in for your own code under test
def test_transformation_output(sample_data):
    df = transform_function(sample_data)
    schema.validate(df)  # Will raise if invalid

def test_pipeline(sample_data):
    result = pipeline.run(sample_data)
    assert len(result) > 0

References

  • Great Expectations Documentation

  • Pandera Documentation

  • pandera-polars

  • @data-engineering-core: Pipeline patterns with validation
