ml-data-pipeline-architecture

ML Data Pipeline Architecture

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "ml-data-pipeline-architecture" with this command: npx skills add terrylica/cc-skills/terrylica-cc-skills-ml-data-pipeline-architecture

ML Data Pipeline Architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.

ADR: 2026-01-22-polars-preference-hook (efficiency preferences framework)

Note: A PreToolUse hook enforces Polars preference. To use Pandas, add # polars-exception: <reason> at file top.

When to Use This Skill

Use this skill when:

  • Deciding between Polars and Pandas for a data pipeline

  • Optimizing memory usage with zero-copy Arrow patterns

  • Loading data from ClickHouse into PyTorch DataLoaders

  • Implementing lazy evaluation for large datasets

  • Migrating existing Pandas code to Polars

  1. Decision Tree: Polars vs Pandas

Dataset size? ├─ < 1M rows → Pandas OK (simpler API, richer ecosystem) ├─ 1M-10M rows → Consider Polars (2-5x faster, less memory) └─ > 10M rows → Use Polars (required for memory efficiency)

Operations? ├─ Simple transforms → Either works ├─ Group-by aggregations → Polars 5-10x faster ├─ Complex joins → Polars with lazy evaluation └─ Streaming/chunked → Polars scan_* functions

Integration? ├─ scikit-learn heavy → Pandas (better interop) ├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor) └─ ClickHouse source → Arrow stream → Polars (optimal)

  1. Zero-Copy Pipeline Architecture

The Problem with Pandas

BAD: 3 memory copies

df = pd.read_sql(query, conn) # Copy 1: DB → pandas X = df[features].values # Copy 2: pandas → numpy tensor = torch.from_numpy(X) # Copy 3: numpy → tensor

Peak memory: 3x data size

The Solution with Arrow

GOOD: 0-1 memory copies

import clickhouse_connect import polars as pl import torch

client = clickhouse_connect.get_client(...) arrow_table = client.query_arrow("SELECT * FROM bars") # Arrow in DB memory df = pl.from_arrow(arrow_table) # Zero-copy view X = df.select(features).to_numpy() # Single allocation tensor = torch.from_numpy(X) # View (no copy)

Peak memory: 1.2x data size

  1. ClickHouse Integration Patterns

Pattern A: Arrow Stream (Recommended)

def query_arrow(client, query: str) -> pl.DataFrame: """ClickHouse → Arrow → Polars (zero-copy chain).""" arrow_table = client.query_arrow(f"{query} FORMAT ArrowStream") return pl.from_arrow(arrow_table)

Usage

df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")

Pattern B: Polars Native (Simpler)

Polars has native ClickHouse support (see pola.rs for version requirements)

df = pl.read_database_uri( query="SELECT * FROM bars", uri="clickhouse://user:pass@host/db" )

Pattern C: Parquet Export (Batch Jobs)

For reproducible batch processing

client.query("SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet") df = pl.scan_parquet("data.parquet") # Lazy, memory-mapped

  1. PyTorch DataLoader Integration

Minimal Change Pattern

from torch.utils.data import TensorDataset, DataLoader

Accept both pandas and polars

def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]: if isinstance(df, pd.DataFrame): df = pl.from_pandas(df)

X = df.select(features).to_numpy()
y = df.select(target).to_numpy()

return (
    torch.from_numpy(X).float(),
    torch.from_numpy(y).float()
)

X, y = prepare_data(df) dataset = TensorDataset(X, y) loader = DataLoader(dataset, batch_size=32, pin_memory=True)

Custom PolarsDataset (Large Data)

class PolarsDataset(torch.utils.data.Dataset): """Memory-efficient dataset from Polars DataFrame."""

def __init__(self, df: pl.DataFrame, features: list[str], target: str):
    self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
    self.features = features
    self.target = target

def __len__(self) -> int:
    return self.arrow.num_rows

def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
    row = self.arrow.slice(idx, 1)
    x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
    y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
    return x, y

5. Lazy Evaluation Patterns

Pipeline Composition

Define transformations lazily (no computation yet)

pipeline = ( pl.scan_parquet("raw_data.parquet") .filter(pl.col("timestamp") >= start_date) .with_columns([ (pl.col("close").pct_change()).alias("returns"), (pl.col("volume").log()).alias("log_volume"), ]) .select(features + [target]) )

Execute only when needed

train_df = pipeline.filter(pl.col("timestamp") < split_date).collect() test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()

Streaming for Large Files

Process file in chunks (never loads full file)

def process_large_file(path: str, chunk_size: int = 100_000): reader = pl.scan_parquet(path)

for batch in reader.iter_batches(n_rows=chunk_size):
    # Process each chunk
    features = compute_features(batch)
    yield features.to_numpy()

6. Schema Validation

Pydantic for Config

from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel): features: list[str] target: str seq_len: int = 15

@field_validator("features")
@classmethod
def validate_features(cls, v):
    required = {"returns_vs", "momentum_z", "atr_pct"}
    missing = required - set(v)
    if missing:
        raise ValueError(f"Missing required features: {missing}")
    return v

DataFrame Schema Validation

def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None: """Fail-fast schema validation.""" missing = [c for c in required if c not in df.columns] if missing: raise ValueError( f"[{stage}] Missing columns: {missing}\n" f"Available: {sorted(df.columns)}" )

  1. Performance Benchmarks

Operation Pandas Polars Speedup

Read CSV (1GB) 45s 4s 11x

Filter rows 2.1s 0.4s 5x

Group-by agg 3.8s 0.3s 13x

Sort 5.2s 0.4s 13x

Memory peak 10GB 2.5GB 4x

Benchmark: 50M rows, 20 columns, MacBook M2

  1. Migration Checklist

Phase 1: Add Arrow Support

  • Add polars = "<version>" to dependencies (see PyPI)

  • Implement query_arrow() in data client

  • Verify zero-copy with memory profiler

Phase 2: Polars at Entry Points

  • Add pl.from_pandas() wrapper at trainer entry

  • Update prepare_sequences() to accept both types

  • Add schema validation after conversion

Phase 3: Full Lazy Evaluation

  • Convert file reads to pl.scan_*

  • Compose transformations lazily

  • Call .collect() only before .to_numpy()

  1. Anti-Patterns to Avoid

DON'T: Mix APIs Unnecessarily

BAD: Convert back and forth

df_polars = pl.from_pandas(df_pandas) df_pandas_again = df_polars.to_pandas() # Why?

DON'T: Collect Too Early

BAD: Defeats lazy evaluation

df = pl.scan_parquet("data.parquet").collect() # Full load filtered = df.filter(...) # After the fact

GOOD: Filter before collect

df = pl.scan_parquet("data.parquet").filter(...).collect()

DON'T: Ignore Memory Pressure

BAD: Loads entire file

df = pl.read_parquet("huge_file.parquet")

GOOD: Stream in chunks

for batch in pl.scan_parquet("huge_file.parquet").iter_batches(): process(batch)

References

  • Polars User Guide

  • Polars Migration Guide

  • Apache Arrow Python

  • ClickHouse Python Client

  • PyTorch Data Loading

  • Polars Preference Hook ADR

Troubleshooting

Issue Cause Solution

Memory spike during load Collecting too early Use lazy evaluation, call collect() only when needed

Arrow conversion fails Unsupported data type Check for object columns, convert to native types

ClickHouse connection error Wrong port or credentials Verify host:8123 (HTTP) or host:9000 (native)

Zero-copy not working Intermediate pandas conversion Remove to_pandas() calls, stay in Arrow/Polars

Polars hook blocking code Pandas used without exception Add # polars-exception: reason comment at file top

Slow group-by operations Using pandas for large datasets Migrate to Polars for 5-10x speedup

Schema validation failure Column names case-sensitive Verify exact column names from source

PyTorch DataLoader OOM Loading full dataset into memory Use PolarsDataset with Arrow backing for lazy access

Parquet scan performance Not using predicate pushdown Add filters before collect() for lazy evaluation

Type mismatch in tensor Float64 vs Float32 mismatch Explicitly cast with .cast(pl.Float32) before numpy

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

pandoc-pdf-generation

No summary provided by upstream source.

Repository SourceNeeds Review
General

mql5-indicator-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

mise-tasks

No summary provided by upstream source.

Repository SourceNeeds Review
General

semantic-release

No summary provided by upstream source.

Repository SourceNeeds Review