# Data Quality and Testing

Data validation and testing frameworks for ensuring pipeline correctness and data quality: Great Expectations (enterprise-oriented) and Pandera (lightweight). Both integrate with orchestration tools for automated validation.
## Quick Comparison

| Feature | Great Expectations | Pandera |
|---|---|---|
| Approach | Declarative "expectations" | Schema definitions with checks |
| DataFrame support | Pandas, Spark, SQL, BigQuery | Pandas, Polars, PySpark, Dask |
| Validation output | JSON results with detailed diagnostics | Boolean or exception |
| Best for | Enterprise data platforms, comprehensive profiling | Python-centric pipelines, lightweight checks |
| Learning curve | Steeper (concepts: DataContext, Checkpoints) | Lower (Python decorators/classes) |
| Integration | CI/CD, Airflow, Prefect, Dagster | pytest, FastAPI, any Python code |
## When to Use Which?

- **Great Expectations**: You need comprehensive data documentation (data docs), profiling, and validation with rich reporting; typically organizations with dedicated data quality teams.
- **Pandera**: You're already in the Python/Pandas/Polars ecosystem and want simple schema validation with type hints, e.g. quick checks in ETL scripts or on API responses.
## Skill Dependencies

- `@data-engineering-core` - Polars, DuckDB, Pandas basics
- `@data-engineering-orchestration` - Integrate validation into workflows
## Great Expectations (GX)

### Installation

```bash
pip install great_expectations

# For specific backends
pip install "great_expectations[spark]"
```
### Quickstart

```python
import great_expectations as gx
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

# Load (or create) the project's Data Context
context = gx.get_context()

# Create an expectation suite
context.create_expectation_suite("my_suite")

# Sample data to validate
df = pd.DataFrame({
    "id": [1, 2, 3],
    "value": [100.0, 200.0, 150.0],
    "category": ["A", "B", "A"],
    "date": ["2024-01-01", "2024-01-02", "2024-01-03"],
})

# Get a validator for the in-memory DataFrame.
# Assumes a pandas datasource with a RuntimeDataConnector is configured
# (see "Data Sources & Connectors" below).
validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="pandas_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="my_data",
        runtime_parameters={"batch_data": df},
        batch_identifiers={"runtime_batch_identifier_name": "quickstart_batch"},
    ),
    expectation_suite_name="my_suite",
)

# Define expectations
validator.expect_column_values_to_not_be_null("id")
validator.expect_column_values_to_be_between("value", min_value=0, max_value=1000)
validator.expect_column_values_to_be_in_set("category", value_set=["A", "B", "C"])
validator.expect_column_values_to_match_strftime_format("date", strftime_format="%Y-%m-%d")

# Validate
result = validator.validate()
print(f"Success: {result.success}")
if not result.success:
    print(f"Failed expectations: {result.results}")
```
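Expectations added through the validator live only in memory until the suite is saved; persisting it lets the checkpoints below reuse it, and building data docs produces the HTML reporting mentioned in the comparison. A minimal sketch using standard context/validator calls from the same pre-1.0 API:

```python
# Persist the suite so checkpoints can reference it later
validator.save_expectation_suite(discard_failed_expectations=False)

# Build (and open) the HTML data docs, including validation results
context.build_data_docs()
context.open_data_docs()
```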
### Data Sources & Connectors

```yaml
# gx/contexts/<context>/datasources/pandas_datasource.yml
datasources:
  pandas_datasource:
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: PandasExecutionEngine
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - runtime_batch_identifier_name
```
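The same datasource can be registered from Python rather than YAML; a sketch, assuming the pre-1.0 block-config style where `context.add_datasource` accepts the equivalent keys:

```python
# Equivalent programmatic registration of the pandas datasource
datasource_config = {
    "name": "pandas_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["runtime_batch_identifier_name"],
        }
    },
}
context.add_datasource(**datasource_config)
```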
### Checkpoints (Validation Automation)

```python
# Create a checkpoint
checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "pandas_datasource",
                "data_connector_name": "default_runtime_data_connector_name",
                "data_asset_name": "my_data",
            },
            "expectation_suite_name": "my_suite",
        }
    ],
}
context.add_checkpoint(**checkpoint_config)

# Run the checkpoint
results = context.run_checkpoint(checkpoint_name="my_checkpoint")
```
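The returned checkpoint result carries an overall success flag, so a script or CI step can be gated on the outcome; a minimal sketch, reusing the `results` object from the run above:

```python
# Fail the surrounding job when the checkpoint does not pass
if not results.success:
    raise SystemExit("Checkpoint 'my_checkpoint' failed")
```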
### Integration with Orchestrators

**Prefect:**

```python
import great_expectations as gx
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest
from prefect import flow, task


@task
def validate_data(df: pd.DataFrame, suite_name: str) -> bool:
    context = gx.get_context()
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="default_runtime_data_connector_name",
            data_asset_name="validation_data",
            runtime_parameters={"batch_data": df},
            batch_identifiers={"runtime_batch_identifier_name": "batch_1"},
        ),
        expectation_suite_name=suite_name,
    )
    result = validator.validate()
    return result.success


@flow
def pipeline_with_validation():
    # extract(), transform(), and load() are the pipeline's own tasks
    df = extract()
    if validate_data(df, "my_suite"):
        transformed = transform(df)
        load(transformed)
    else:
        raise ValueError("Data validation failed")
```
**Dagster:**

```python
import great_expectations as gx
import pandas as pd
from dagster import asset
from great_expectations.core.batch import RuntimeBatchRequest


@asset
def validated_asset(df: pd.DataFrame) -> pd.DataFrame:
    context = gx.get_context()
    context.create_expectation_suite("asset_suite", overwrite_existing=True)
    validator = context.get_validator(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="default_runtime_data_connector_name",
            data_asset_name="validated_asset",
            runtime_parameters={"batch_data": df},
            batch_identifiers={"runtime_batch_identifier_name": "asset_batch"},
        ),
        expectation_suite_name="asset_suite",
    )
    # ... define expectations on the validator here

    result = validator.validate()
    if not result.success:
        raise Exception(f"Validation failed: {result}")
    return df
```
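**Airflow:** The comparison table lists Airflow as well. Without a dedicated operator, a plain `PythonOperator` that runs a checkpoint is enough; a sketch, with DAG and checkpoint names as placeholders (older Airflow releases call the `schedule` kwarg `schedule_interval`):

```python
from datetime import datetime

import great_expectations as gx
from airflow import DAG
from airflow.operators.python import PythonOperator


def run_validation_checkpoint():
    # Run the checkpoint defined above and fail the task if validation fails
    context = gx.get_context()
    results = context.run_checkpoint(checkpoint_name="my_checkpoint")
    if not results.success:
        raise ValueError("Checkpoint 'my_checkpoint' failed")


with DAG(
    dag_id="pipeline_with_validation",
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    validate = PythonOperator(
        task_id="validate_data",
        python_callable=run_validation_checkpoint,
    )
```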
## Pandera: Lightweight Schema Validation

### Installation

```bash
pip install "pandera[pandas]"    # for pandas
pip install "pandera[polars]"    # for Polars
pip install "pandera[pyspark]"   # for PySpark
```
### Basic Usage

```python
import pandas as pd
import pandera as pa

# Define schema
schema = pa.DataFrameSchema({
    "id": pa.Column(pa.Int, checks=pa.Check.gt(0)),
    "category": pa.Column(pa.String, checks=pa.Check.isin(["A", "B", "C"])),
    "value": pa.Column(pa.Float, checks=[
        pa.Check.gt(0),
        pa.Check.lt(10000),
    ]),
    "date": pa.Column(pa.DateTime),
})

# Validate a DataFrame
df = pd.DataFrame({
    "id": [1, 2, 3],
    "category": ["A", "B", "A"],
    "value": [100.0, 200.0, 150.0],
    "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
})

validated = schema.validate(df)  # raises SchemaError if invalid
print("Validation passed!")

# Decorator pattern: validate the input DataFrame of a function
@pa.check_input(schema)
def process_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.groupby("category")["value"].sum().reset_index()
```
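For the type-hint-driven style mentioned in the comparison above, Pandera also supports class-based schemas via `DataFrameModel`; a short sketch (the model and field names are illustrative):

```python
import pandera as pa
from pandera.typing import DataFrame, Series


class Transactions(pa.DataFrameModel):
    id: Series[int] = pa.Field(gt=0)
    category: Series[str] = pa.Field(isin=["A", "B", "C"])
    value: Series[float] = pa.Field(gt=0, lt=10_000)


@pa.check_types
def enrich(df: DataFrame[Transactions]) -> DataFrame[Transactions]:
    # Both the input and the returned DataFrame are validated against the model
    return df.assign(value=df["value"].round(2))
```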
### Custom Checks

```python
# Custom validation function
def custom_check(series: pd.Series) -> bool:
    return (series > 0).all()

schema = pa.DataFrameSchema({
    "value": pa.Column(pa.Float, checks=pa.Check(custom_check))
})

# Or with a lambda
schema = pa.DataFrameSchema({
    "value": pa.Column(pa.Float, checks=pa.Check(lambda x: x > 0))
})
```
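Checks can also be attached to the schema itself to express cross-column rules on the whole DataFrame; a sketch (the column names are illustrative):

```python
# DataFrame-level check comparing two columns
schema = pa.DataFrameSchema(
    columns={
        "low": pa.Column(pa.Float),
        "high": pa.Column(pa.Float),
    },
    checks=pa.Check(lambda df: df["high"] >= df["low"], error="high must be >= low"),
)
```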
### Polars Integration

```python
import pandera.polars as pa
import polars as pl

schema = pa.DataFrameSchema({
    "id": pa.Column(pl.Int64, pa.Check.gt(0)),
    "value": pa.Column(pl.Float64, pa.Check.in_range(0, 1000)),
})

df = pl.DataFrame({"id": [1, 2], "value": [100.0, 200.0]})
validated = schema.validate(df)
```
## Best Practices

- ✅ **Validate early** - Check data quality immediately after extraction
- ✅ **Fail fast** - Stop the pipeline on validation failure, or route bad records to quarantine (see the sketch after this list)
- ✅ **Version your schemas** - Store schema definitions in version control
- ✅ **Use both static and runtime checks** - Static schema (types, columns) plus dynamic checks (ranges, uniqueness)
- ✅ **Integrate with orchestration** - Use Prefect/Dagster task dependencies for validation steps
- ❌ **Don't validate only at the end** - Catch issues early
- ❌ **Don't use try/except to ignore validation errors** - Unless it is intentional quarantine routing
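One way to implement the quarantine route mentioned above is Pandera's lazy validation, which collects every failing row instead of stopping at the first error; a sketch, assuming a default integer index and an illustrative quarantine destination:

```python
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "id": pa.Column(pa.Int, pa.Check.gt(0)),
    "value": pa.Column(pa.Float, pa.Check.in_range(0, 1000)),
})


def validate_or_quarantine(df: pd.DataFrame) -> pd.DataFrame:
    try:
        return schema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as exc:
        # failure_cases lists each failing value with its check, column, and row index
        bad_rows = exc.failure_cases["index"].dropna().astype(int).unique()
        df.loc[bad_rows].to_parquet("quarantine/bad_rows.parquet")  # illustrative sink
        return df.drop(index=bad_rows)
```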
## Testing Patterns

### pytest Integration

```python
import pandas as pd
import pandera as pa
import pytest

schema = pa.DataFrameSchema({
    "id": pa.Column(pa.Int, pa.Check.gt(0)),
    "value": pa.Column(pa.Float),
})


def test_transformation_output():
    # transform_function and source_df come from the pipeline under test
    df = transform_function(source_df)
    schema.validate(df)  # raises SchemaError if invalid


@pytest.fixture
def sample_data():
    return pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})


def test_pipeline(sample_data):
    result = pipeline.run(sample_data)
    assert len(result) > 0
```
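It is also worth testing the negative path, i.e. that the schema really rejects bad data; a small sketch reusing the schema defined above:

```python
def test_rejects_invalid_ids():
    bad = pd.DataFrame({"id": [-1, 0], "value": [10.0, 20.0]})
    with pytest.raises(pa.errors.SchemaError):
        schema.validate(bad)
```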
## References

- Great Expectations Documentation
- Pandera Documentation
- pandera-polars
- `@data-engineering-core` - Pipeline patterns with validation