Data Pipeline Processor

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "data-pipeline-processor" with this command: npx skills add vamseeachanta/workspace-hub/vamseeachanta-workspace-hub-data-pipeline-processor

Data Pipeline Processor

Version: 1.1.0 Category: Development Last Updated: 2026-01-02

Process data files through transformation pipelines with validation, encoding detection, and multi-format export capabilities.

Quick Start

import pandas as pd
from pathlib import Path

# Simple pipeline: Load -> Transform -> Export
df = pd.read_csv("data/raw/source.csv")

# Transform
df = df[df['value'] > 0]                  # Filter
df['date'] = pd.to_datetime(df['date'])   # Convert types
df = df.sort_values('date')               # Sort

# Export
Path("data/processed").mkdir(parents=True, exist_ok=True)
df.to_csv("data/processed/cleaned.csv", index=False)

print(f"Processed {len(df)} rows")

When to Use

  • Processing CSV/Excel/JSON files with validation

  • Data cleaning and transformation workflows

  • Batch file processing with aggregation

  • Handling encoding issues (UTF-8, Latin-1 fallback)

  • ETL (Extract, Transform, Load) operations

  • Data quality checks and reporting

Core Pattern

Input (CSV/Excel/JSON) -> Validate -> Transform -> Analyze -> Export

Implementation

Data Reader with Encoding Detection

import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import chardet
import pandas as pd

logger = logging.getLogger(__name__)

class DataReader:
    """Read data files with automatic encoding detection."""

SUPPORTED_FORMATS = ['csv', 'xlsx', 'xls', 'json', 'parquet']

def __init__(self, encoding_fallback: Optional[List[str]] = None):
    """
    Initialize data reader.

    Args:
        encoding_fallback: List of encodings to try in order
    """
    self.encoding_fallback = encoding_fallback or ['utf-8', 'latin-1', 'cp1252']

def read(self, file_path: str, **kwargs) -> pd.DataFrame:
    """
    Read data file with automatic format and encoding detection.

    Args:
        file_path: Path to data file
        **kwargs: Additional arguments for pandas readers

    Returns:
        DataFrame with loaded data
    """
    path = Path(file_path)
    suffix = path.suffix.lower().lstrip('.')

    if suffix == 'csv':
        return self._read_csv(path, **kwargs)
    elif suffix in ['xlsx', 'xls']:
        return self._read_excel(path, **kwargs)
    elif suffix == 'json':
        return pd.read_json(path, **kwargs)
    elif suffix == 'parquet':
        return pd.read_parquet(path, **kwargs)
    else:
        raise ValueError(f"Unsupported format: {suffix}")

def _read_csv(self, path: Path, **kwargs) -> pd.DataFrame:
    """Read CSV with encoding fallback."""
    # Try to detect encoding
    with open(path, 'rb') as f:
        raw = f.read(10000)
        detected = chardet.detect(raw)
        detected_encoding = detected.get('encoding') or 'utf-8'

    # Try detected encoding first, then fallbacks
    encodings_to_try = [detected_encoding] + self.encoding_fallback

    for encoding in encodings_to_try:
        try:
            df = pd.read_csv(path, encoding=encoding, **kwargs)
            logger.info(f"Successfully read {path} with encoding: {encoding}")
            return df
        except UnicodeDecodeError:
            continue

    raise ValueError(f"Could not decode {path} with any encoding")

def _read_excel(self, path: Path, **kwargs) -> pd.DataFrame:
    """Read Excel file."""
    return pd.read_excel(path, **kwargs)
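
A quick usage sketch (the file name and delimiter below are illustrative, not part of the skill):

reader = DataReader(encoding_fallback=['utf-8', 'latin-1', 'cp1252'])
df = reader.read("data/raw/legacy_export.csv", sep=";")   # extra kwargs are passed to pandas
print(f"Loaded {len(df)} rows, columns: {list(df.columns)}")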

Data Validator

from dataclasses import dataclass, field
from typing import Callable, List, Dict, Any

@dataclass
class ValidationResult:
    """Result of data validation."""
    is_valid: bool
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    stats: Dict[str, Any] = field(default_factory=dict)

class DataValidator:
    """Validate data against configurable rules."""

def __init__(self):
    self.rules: List[Callable] = []

def add_rule(self, rule: Callable[[pd.DataFrame], ValidationResult]):
    """Add a validation rule."""
    self.rules.append(rule)

def validate(self, df: pd.DataFrame) -> ValidationResult:
    """Run all validation rules."""
    all_errors = []
    all_warnings = []
    all_stats = {}

    for rule in self.rules:
        result = rule(df)
        all_errors.extend(result.errors)
        all_warnings.extend(result.warnings)
        all_stats.update(result.stats)

    return ValidationResult(
        is_valid=len(all_errors) == 0,
        errors=all_errors,
        warnings=all_warnings,
        stats=all_stats
    )

Common validation rules

def required_columns_rule(required: List[str]) -> Callable:
    """Validate required columns exist."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        missing = [col for col in required if col not in df.columns]
        return ValidationResult(
            is_valid=len(missing) == 0,
            errors=[f"Missing required column: {col}" for col in missing],
            stats={'columns_found': len(df.columns)}
        )
    return rule

def no_duplicates_rule(subset: List[str] = None) -> Callable:
    """Validate no duplicate rows."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        duplicates = df.duplicated(subset=subset).sum()
        return ValidationResult(
            is_valid=duplicates == 0,
            warnings=[f"Found {duplicates} duplicate rows"] if duplicates > 0 else [],
            stats={'duplicate_count': duplicates}
        )
    return rule

def non_null_rule(columns: List[str]) -> Callable:
    """Validate specified columns have no null values."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        errors = []
        stats = {}
        for col in columns:
            if col in df.columns:
                null_count = df[col].isnull().sum()
                stats[f'{col}_nulls'] = null_count
                if null_count > 0:
                    errors.append(f"Column '{col}' has {null_count} null values")
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            stats=stats
        )
    return rule
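
Putting the rules together; a minimal sketch assuming a DataFrame df with illustrative columns id and value:

validator = DataValidator()
validator.add_rule(required_columns_rule(['id', 'value']))
validator.add_rule(non_null_rule(['value']))
validator.add_rule(no_duplicates_rule(subset=['id']))

result = validator.validate(df)
if not result.is_valid:
    for error in result.errors:
        print(f"ERROR: {error}")
print(result.stats)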

Data Transformer

class DataTransformer:
    """Apply transformations to data."""

def __init__(self, df: pd.DataFrame):
    self.df = df.copy()

def rename_columns(self, mapping: Dict[str, str]) -> 'DataTransformer':
    """Rename columns."""
    self.df = self.df.rename(columns=mapping)
    return self

def filter_rows(self, expression: str) -> 'DataTransformer':
    """Filter rows using query expression."""
    self.df = self.df.query(expression)
    return self

def select_columns(self, columns: List[str]) -> 'DataTransformer':
    """Select specific columns."""
    self.df = self.df[columns]
    return self

def drop_columns(self, columns: List[str]) -> 'DataTransformer':
    """Drop specified columns."""
    self.df = self.df.drop(columns=columns, errors='ignore')
    return self

def fill_nulls(self, value: Any = None, method: str = None) -> 'DataTransformer':
    """Fill null values with a constant, or forward/backward fill."""
    if method in ('ffill', 'pad'):
        self.df = self.df.ffill()
    elif method in ('bfill', 'backfill'):
        self.df = self.df.bfill()
    else:
        self.df = self.df.fillna(value)
    return self

def convert_types(self, type_mapping: Dict[str, str]) -> 'DataTransformer':
    """Convert column types."""
    for col, dtype in type_mapping.items():
        if col in self.df.columns:
            if dtype == 'datetime':
                self.df[col] = pd.to_datetime(self.df[col])
            elif dtype == 'numeric':
                self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
            else:
                self.df[col] = self.df[col].astype(dtype)
    return self

def add_column(self, name: str, expression: Callable) -> 'DataTransformer':
    """Add computed column."""
    self.df[name] = expression(self.df)
    return self

def aggregate(self, group_by: List[str], agg_spec: Dict[str, Any]) -> 'DataTransformer':
    """Aggregate data by groups."""
    grouped = self.df.groupby(group_by).agg(agg_spec)
    # Flatten MultiIndex columns such as ('amount', 'sum') into 'amount_sum'
    if isinstance(grouped.columns, pd.MultiIndex):
        grouped.columns = ['_'.join(col).rstrip('_') for col in grouped.columns]
    self.df = grouped.reset_index()
    return self

def sort(self, by: List[str], ascending: bool = True) -> 'DataTransformer':
    """Sort data."""
    self.df = self.df.sort_values(by=by, ascending=ascending)
    return self

def get_result(self) -> pd.DataFrame:
    """Get transformed DataFrame."""
    return self.df
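
A short chaining sketch (the column names here are illustrative):

df_clean = (DataTransformer(df)
    .rename_columns({'ts': 'timestamp'})
    .convert_types({'timestamp': 'datetime', 'value': 'numeric'})
    .filter_rows('value > 0')
    .fill_nulls(value=0)
    .sort(['timestamp'])
    .get_result())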

Data Exporter

class DataExporter:
    """Export data to various formats."""

@staticmethod
def to_csv(df: pd.DataFrame, path: str, **kwargs) -> str:
    """Export to CSV."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, index=False, **kwargs)
    return path

@staticmethod
def to_excel(df: pd.DataFrame, path: str, sheet_name: str = 'Sheet1', **kwargs) -> str:
    """Export to Excel."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_excel(path, sheet_name=sheet_name, index=False, **kwargs)
    return path

@staticmethod
def to_json(df: pd.DataFrame, path: str, orient: str = 'records', **kwargs) -> str:
    """Export to JSON."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_json(path, orient=orient, **kwargs)
    return path

@staticmethod
def to_parquet(df: pd.DataFrame, path: str, **kwargs) -> str:
    """Export to Parquet."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path, **kwargs)
    return path
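
A brief usage sketch (paths are illustrative; Parquet export requires pyarrow or fastparquet, and Excel export requires openpyxl):

exporter = DataExporter()
exporter.to_parquet(df, "data/processed/events.parquet", compression="snappy")
exporter.to_excel(df, "data/processed/events.xlsx", sheet_name="Events")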

Pipeline Orchestrator

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional

@dataclass
class PipelineConfig:
    """Configuration for data pipeline."""
    input_path: str
    output_path: str
    input_options: Dict[str, Any] = field(default_factory=dict)
    validation: Dict[str, Any] = field(default_factory=dict)
    transformations: List[Dict[str, Any]] = field(default_factory=list)
    output_format: str = 'csv'
    output_options: Dict[str, Any] = field(default_factory=dict)

class DataPipeline:
    """Orchestrate data processing pipeline."""

def __init__(self, config: PipelineConfig):
    self.config = config
    self.reader = DataReader()
    self.validator = DataValidator()
    self.exporter = DataExporter()

def _setup_validation(self):
    """Configure validation rules from config."""
    validation = self.config.validation

    if 'required_columns' in validation:
        self.validator.add_rule(
            required_columns_rule(validation['required_columns'])
        )

    if 'no_duplicates' in validation:
        self.validator.add_rule(
            no_duplicates_rule(validation.get('no_duplicates_subset'))
        )

    if 'non_null_columns' in validation:
        self.validator.add_rule(
            non_null_rule(validation['non_null_columns'])
        )

def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
    """Apply configured transformations."""
    transformer = DataTransformer(df)

    for transform in self.config.transformations:
        op = transform['operation']

        if op == 'rename':
            transformer.rename_columns(transform['mapping'])
        elif op == 'filter':
            transformer.filter_rows(transform['expression'])
        elif op == 'select':
            transformer.select_columns(transform['columns'])
        elif op == 'drop':
            transformer.drop_columns(transform['columns'])
        elif op == 'fill_nulls':
            transformer.fill_nulls(
                value=transform.get('value'),
                method=transform.get('method')
            )
        elif op == 'convert_types':
            transformer.convert_types(transform['types'])
        elif op == 'aggregate':
            transformer.aggregate(
                group_by=transform['group_by'],
                agg_spec=transform['aggregations']
            )
        elif op == 'sort':
            transformer.sort(
                by=transform['by'],
                ascending=transform.get('ascending', True)
            )

    return transformer.get_result()

def run(self) -> Dict[str, Any]:
    """Execute the pipeline."""
    logger.info(f"Starting pipeline: {self.config.input_path}")

    # Read input
    df = self.reader.read(self.config.input_path, **self.config.input_options)
    input_rows = len(df)
    logger.info(f"Loaded {input_rows} rows")

    # Validate
    self._setup_validation()
    validation_result = self.validator.validate(df)

    if not validation_result.is_valid:
        logger.error(f"Validation failed: {validation_result.errors}")
        return {
            'status': 'failed',
            'stage': 'validation',
            'errors': validation_result.errors,
            'warnings': validation_result.warnings
        }

    if validation_result.warnings:
        logger.warning(f"Validation warnings: {validation_result.warnings}")

    # Transform
    df = self._apply_transformations(df)
    logger.info(f"Transformed to {len(df)} rows")

    # Export
    export_method = getattr(self.exporter, f'to_{self.config.output_format}')
    output_path = export_method(df, self.config.output_path, **self.config.output_options)
    logger.info(f"Exported to {output_path}")

    return {
        'status': 'success',
        'input_rows': input_rows,
        'output_rows': len(df),
        'output_path': output_path,
        'validation_stats': validation_result.stats,
        'warnings': validation_result.warnings
    }

YAML Configuration Format

Basic Pipeline Config

config/pipelines/data_clean.yaml

input:
  path: data/raw/source.csv
  options:
    delimiter: ","
    skiprows: 1

validation:
  required_columns:
    - id
    - timestamp
    - value
  non_null_columns:
    - id
    - value
  no_duplicates: true
  no_duplicates_subset:
    - id

transformations:
  - operation: rename
    mapping:
      old_name: new_name
      date_col: timestamp

  - operation: filter
    expression: "value > 0 and status != 'invalid'"

  - operation: convert_types
    types:
      timestamp: datetime
      value: numeric

  - operation: fill_nulls
    value: 0

  - operation: sort
    by: [timestamp]
    ascending: true

output:
  path: data/processed/cleaned.csv
  format: csv
  options:
    index: false
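
The skill does not show how this nested YAML maps onto the flat PipelineConfig fields; here is a minimal loader sketch, assuming PyYAML is available (load_pipeline_config is a hypothetical helper name):

import yaml

def load_pipeline_config(path: str) -> PipelineConfig:
    """Map the nested YAML layout onto PipelineConfig (hypothetical loader)."""
    with open(path) as f:
        raw = yaml.safe_load(f)
    return PipelineConfig(
        input_path=raw['input']['path'],
        input_options=raw['input'].get('options', {}),
        validation=raw.get('validation', {}),
        transformations=raw.get('transformations', []),
        output_path=raw['output']['path'],
        output_format=raw['output'].get('format', 'csv'),
        # Note: DataExporter.to_csv already passes index=False, so an 'index'
        # entry under output options would collide if forwarded unchanged.
        output_options=raw['output'].get('options', {}),
    )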

Aggregation Pipeline

config/pipelines/monthly_summary.yaml

input:
  path: data/processed/daily_data.csv

validation:
  required_columns:
    - date
    - category
    - amount

transformations:
  - operation: convert_types
    types:
      date: datetime

  - operation: aggregate
    group_by: [category]
    aggregations:
      amount:
        - sum
        - mean
        - count

  - operation: rename
    mapping:
      amount_sum: total_amount
      amount_mean: average_amount
      amount_count: transaction_count

  - operation: sort
    by: [total_amount]
    ascending: false

output:
  path: data/results/monthly_summary.csv
  format: csv
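
For reference, the aggregate and rename steps above correspond roughly to the following pandas code (column names follow the flattening convention used by DataTransformer.aggregate):

summary = df.groupby('category').agg({'amount': ['sum', 'mean', 'count']})
summary.columns = ['_'.join(col) for col in summary.columns]   # amount_sum, amount_mean, amount_count
summary = (summary.reset_index()
                  .rename(columns={'amount_sum': 'total_amount',
                                   'amount_mean': 'average_amount',
                                   'amount_count': 'transaction_count'})
                  .sort_values('total_amount', ascending=False))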

Usage Examples

Example 1: Simple CSV Processing

Process CSV with config

python -m data_pipeline config/pipelines/clean_data.yaml

Override input/output

python -m data_pipeline config/pipelines/clean_data.yaml \
    --input data/custom_input.csv \
    --output data/custom_output.csv

Dry run (validate only)

python -m data_pipeline config/pipelines/clean_data.yaml --dry-run
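
The module entry point itself is not shown in this skill; a rough argparse-based sketch of a __main__.py that could support the flags above (load_pipeline_config is the hypothetical loader sketched earlier, and the dry-run behavior is an assumption):

import argparse

def main():
    parser = argparse.ArgumentParser(description="Run a data pipeline from a YAML config")
    parser.add_argument("config", help="Path to pipeline YAML")
    parser.add_argument("--input", help="Override input path")
    parser.add_argument("--output", help="Override output path")
    parser.add_argument("--dry-run", action="store_true", help="Validate only, skip transform/export")
    args = parser.parse_args()

    config = load_pipeline_config(args.config)
    if args.input:
        config.input_path = args.input
    if args.output:
        config.output_path = args.output

    if args.dry_run:
        # Assumed dry-run semantics: read and validate, then stop
        pipeline = DataPipeline(config)
        pipeline._setup_validation()
        df = pipeline.reader.read(config.input_path, **config.input_options)
        result = pipeline.validator.validate(df)
        print("OK" if result.is_valid else f"Validation errors: {result.errors}")
    else:
        print(DataPipeline(config).run())

if __name__ == "__main__":
    main()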

Example 2: Programmatic Usage

from data_pipeline import DataPipeline, PipelineConfig

config = PipelineConfig(
    input_path='data/raw/sales.csv',
    output_path='data/processed/sales_clean.csv',
    validation={
        'required_columns': ['date', 'product', 'amount'],
        'non_null_columns': ['amount']
    },
    transformations=[
        {'operation': 'filter', 'expression': 'amount > 0'},
        {'operation': 'sort', 'by': ['date']}
    ]
)

pipeline = DataPipeline(config)
result = pipeline.run()
print(f"Processed {result['output_rows']} rows")

Example 3: Batch Processing

from pathlib import Path

from data_pipeline import DataReader, DataTransformer, DataExporter

reader = DataReader()
exporter = DataExporter()

Process all CSV files in directory

input_dir = Path('data/raw/')
output_dir = Path('data/processed/')

for csv_file in input_dir.glob('*.csv'):
    df = reader.read(str(csv_file))

    # Apply transformations
    df_clean = (DataTransformer(df)
        .fill_nulls(value=0)
        .filter_rows('value > 0')
        .sort(['timestamp'])
        .get_result())

    # Export
    output_path = output_dir / csv_file.name
    exporter.to_csv(df_clean, str(output_path))
    print(f"Processed: {csv_file.name}")

Example 4: Multi-Format Export

def export_all_formats(df: pd.DataFrame, base_path: str):
    """Export data to multiple formats."""
    exporter = DataExporter()

    outputs = {
        'csv': exporter.to_csv(df, f"{base_path}.csv"),
        'json': exporter.to_json(df, f"{base_path}.json"),
        'parquet': exporter.to_parquet(df, f"{base_path}.parquet"),
        'excel': exporter.to_excel(df, f"{base_path}.xlsx")
    }

    return outputs

Best Practices

Do

  • Always detect encoding before reading CSV

  • Use chunked reading for large files (>100MB); see the chunked-reading sketch after these lists

  • Specify dtypes to reduce memory usage

  • Handle missing values explicitly

  • Validate early in the pipeline

  • Fail fast on critical errors

  • Log warnings for non-critical issues

  • Track validation statistics

Don't

  • Assume encoding is always UTF-8

  • Load entire large files into memory

  • Skip validation steps

  • Ignore encoding errors

  • Mix transformation and validation


Transformation

  • Use method chaining for readability

  • Apply filters before expensive operations

  • Convert types early to catch errors

  • Document transformation logic

Export

  • Create output directories automatically

  • Use appropriate formats (Parquet for large data)

  • Include metadata in output

  • Verify output integrity
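
Chunked-reading sketch referenced in the list above (a minimal pattern; the file name, chunk size, dtypes, and filter column are illustrative):

import pandas as pd

chunks = []
for chunk in pd.read_csv("data/raw/big_file.csv", chunksize=100_000,
                         dtype={'id': 'int64', 'value': 'float64'}):
    chunk = chunk[chunk['value'] > 0]   # filter each chunk before concatenating
    chunks.append(chunk)

df = pd.concat(chunks, ignore_index=True)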

File Organization

project/
    config/
        pipelines/              # Pipeline configs
            clean_data.yaml
            aggregate.yaml
    data/
        raw/                    # Raw input data
        processed/              # Cleaned data
        results/                # Analysis results
    src/
        data_pipeline/          # Pipeline code
    scripts/
        run_pipeline.sh         # CLI wrapper

Error Handling

Common Errors

Error               Cause                    Solution
UnicodeDecodeError  Wrong encoding           Use DataReader with encoding fallback
KeyError            Missing column           Check column names in config
ValueError          Type conversion failed   Use errors='coerce' or validate first
MemoryError         File too large           Use chunked reading
FileNotFoundError   Input file missing       Verify file path

Error Template

def safe_pipeline_run(config: PipelineConfig) -> dict:
    """Run pipeline with comprehensive error handling."""
    try:
        # Validate input exists
        if not Path(config.input_path).exists():
            return {'status': 'error', 'stage': 'input', 'message': 'File not found'}

        pipeline = DataPipeline(config)
        return pipeline.run()

    except UnicodeDecodeError as e:
        return {'status': 'error', 'stage': 'read', 'message': f'Encoding error: {e}'}
    except KeyError as e:
        return {'status': 'error', 'stage': 'transform', 'message': f'Missing column: {e}'}
    except Exception as e:
        return {'status': 'error', 'stage': 'unknown', 'message': str(e)}
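
A short usage sketch (config as constructed in Example 2):

result = safe_pipeline_run(config)
if result['status'] != 'success':
    print(f"Pipeline failed at stage '{result.get('stage')}': "
          f"{result.get('message') or result.get('errors')}")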

Execution Checklist

  • Input file exists and is readable

  • Encoding detected or specified

  • Required columns present

  • Validation rules configured

  • Transformations in correct order

  • Output directory exists or is created

  • Export format appropriate for data size

  • Error handling covers all failure modes

  • Logging configured for debugging

Metrics

Metric           Target           Description
Read Time        <1s per 100MB    Data loading speed
Validation Time  <500ms           Rule checking duration
Transform Time   Varies           Depends on operations
Export Time      <1s per 100MB    File writing speed
Memory Usage     <2x file size    Peak memory consumption
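
A quick, informal way to check read time and in-memory size against these targets (the path is illustrative):

import time

start = time.perf_counter()
df = DataReader().read("data/raw/source.csv")
elapsed = time.perf_counter() - start

memory_mb = df.memory_usage(deep=True).sum() / 1e6
print(f"Read in {elapsed:.2f}s, DataFrame uses {memory_mb:.1f} MB in memory")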

Related Skills

  • yaml-workflow-executor - Workflow orchestration

  • engineering-report-generator - Report generation

  • parallel-file-processor - Parallel file operations

Version History

  • 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples

  • 1.0.0 (2024-10-15): Initial release with DataReader, DataValidator, DataTransformer, pipeline orchestration
