# AI/ML Data Pipelines

Data engineering patterns for AI/ML workloads: embedding generation, vector databases, retrieval-augmented generation (RAG), LLM output monitoring, and batch inference. Covers LanceDB, pgvector, and OpenAI integrations.
## When to Use These Patterns?

- RAG Applications: Building chatbots, semantic search, question answering
- LLM Monitoring: Tracking token usage, latency, output quality
- Embedding Pipelines: Generating and storing vector embeddings for ML models
- Batch Inference: Large-scale model inference pipelines
- Feature Stores: Versioned feature data for ML training/serving
## Skill Dependencies

- @data-engineering-core - Polars, DuckDB for data processing
- @data-engineering-storage-remote-access - Cloud storage for embeddings and models
- @data-engineering-orchestration - Schedule/batch embedding generation
- @data-engineering-quality - Validate embedding quality
## Detailed Guides

### Embeddings

See: @data-engineering-ai-ml/embeddings.md

- OpenAI embeddings API
- Sentence Transformers (local models)
- Batch processing with Polars
- Chunking strategies for text
- Token counting with tiktoken (see the chunking sketch below)
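Chunking and token counting work together: every chunk must fit the embedding model's input limit. A minimal sketch of token-based chunking with overlap using tiktoken; the chunk size and overlap values are illustrative starting points, not recommendations:

```python
import tiktoken

def chunk_by_tokens(text: str, max_tokens: int = 512, overlap: int = 64) -> list[str]:
    """Split text into overlapping windows of at most max_tokens tokens."""
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []
    step = max_tokens - overlap
    for start in range(0, len(tokens), step):
        window = tokens[start:start + max_tokens]
        chunks.append(enc.decode(window))
        if start + max_tokens >= len(tokens):
            break  # the last window already covers the tail
    return chunks

chunks = chunk_by_tokens("some long document text " * 500)
print(len(chunks), "chunks")
```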
### Vector Databases

See: @data-engineering-ai-ml/vector-databases.md

- LanceDB: Embedded, Arrow-native, scales from local to cloud
- pgvector: PostgreSQL extension, ACID transactions
- DuckDB: List type for simple cosine similarity (see the sketch below)
- Indexing strategies (IVF_PQ, HNSW)
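For modest corpus sizes, DuckDB can do brute-force similarity with no vector index at all. A minimal sketch, assuming a table whose `embedding` column is a list of floats and DuckDB's built-in `list_cosine_similarity` function (available in recent versions):

```python
import duckdb

conn = duckdb.connect()
conn.execute("""
    CREATE TABLE docs AS
    SELECT * FROM read_parquet('corpus_with_embeddings.parquet')
""")

query_embedding = [0.1, 0.2, 0.3]  # placeholder; use a real query vector

# Brute-force nearest neighbours: fine without an index at small scale
results = conn.execute("""
    SELECT id, text,
           list_cosine_similarity(embedding, ?) AS similarity
    FROM docs
    ORDER BY similarity DESC
    LIMIT 5
""", [query_embedding]).fetchall()
```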
### RAG Pipelines

See: @data-engineering-ai-ml/rag-pipelines.md

- Document chunking (by tokens, paragraphs, semantic)
- Context assembly with a token budget (see the sketch below)
- Retrieval: vector search + metadata filters
- Prompt construction and LLM invocation
- Source attribution
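Context assembly is the step that decides how many retrieved chunks actually make it into the prompt. A minimal greedy sketch against a fixed token budget; the budget value is illustrative:

```python
import tiktoken

def assemble_context(chunks: list[str], token_budget: int = 3000) -> str:
    """Greedily pack relevance-ranked chunks until the budget is spent."""
    enc = tiktoken.get_encoding("cl100k_base")
    selected, used = [], 0
    for chunk in chunks:
        n = len(enc.encode(chunk))
        if used + n > token_budget:
            break  # next chunk would blow the budget; stop here
        selected.append(chunk)
        used += n
    return "\n\n---\n\n".join(selected)

# Usage with chunks already ranked by vector-search score
ranked_chunks = ["most relevant chunk", "next chunk", "another chunk"]
context = assemble_context(ranked_chunks)
prompt = f"Answer using only this context:\n{context}\n\nQuestion: What is RAG?"
```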
### LLM Monitoring

See: @data-engineering-ai-ml/monitoring.md

- Tracking API calls, costs, latencies
- Prompt caching and deduplication (see the sketch below)
- Error tracking and retry logic
- Quality evaluation (human + automated)
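Prompt caching can be as simple as keying responses on a hash of the model and prompt. A minimal in-process sketch; a production setup would more likely use a shared store such as Redis, and the `call_llm` callable here is a stand-in for a real API client:

```python
import hashlib

_cache: dict[str, str] = {}

def cached_completion(model: str, prompt: str, call_llm) -> str:
    """Return a cached response for identical (model, prompt) pairs."""
    key = hashlib.sha256(f"{model}\x00{prompt}".encode()).hexdigest()
    if key not in _cache:
        _cache[key] = call_llm(model, prompt)  # only hit the API on a miss
    return _cache[key]

# Usage: the second call is served from the cache, no API hit
answer = cached_completion("gpt-4o", "What is RAG?", lambda m, p: f"[{m}] response")
answer = cached_completion("gpt-4o", "What is RAG?", lambda m, p: f"[{m}] response")
```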
## Quick Start: Complete RAG Pipeline

```python
import polars as pl
import lancedb
from sentence_transformers import SentenceTransformer

# 1. Generate embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')
df = pl.read_parquet("documents.parquet")
df = df.with_columns([
    pl.Series("embedding", model.encode(df["text"].to_list()).tolist())
])

# 2. Store in LanceDB and build a vector index
db = lancedb.connect("./.lancedb")
table = db.create_table("documents", df.to_arrow())
table.create_index(
    vector_column_name="embedding",
    metric="cosine",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96,
)

# 3. Query: embed the question, filter on metadata, take the top 5
query_embedding = model.encode(["What is RAG?"])[0]
results = (
    table.search(query_embedding)
    .where("category = 'tech'")
    .limit(5)
    .to_pandas()
)

print(f"Top result: {results.iloc[0]['text'][:200]}...")
```
## Common Patterns

### Batch Embedding Generation
```python
import polars as pl
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

# Process in batches to avoid OOM on large corpora
batch_size = 1000
reader = pl.read_csv_batched("large_corpus.csv", batch_size=batch_size)

processed = []
while (batches := reader.next_batches(1)):
    for batch in batches:
        batch_embeddings = model.encode(batch["text"].to_list())
        processed.append(
            batch.with_columns(
                pl.Series("embedding", batch_embeddings.tolist())
            )
        )

# Stitch the batches back together and persist
df = pl.concat(processed)
df.write_parquet("corpus_with_embeddings.parquet")
```
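The same batching idea applies to the OpenAI embeddings API: send many texts per request instead of one call per row. A minimal sketch, assuming `OPENAI_API_KEY` is set in the environment; per-request input limits apply, so large corpora still need an outer batching loop like the one above:

```python
from openai import OpenAI

client = OpenAI()

def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts in a single API call."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    return [item.embedding for item in response.data]

embeddings = embed_batch(["first document", "second document"])
print(len(embeddings), len(embeddings[0]))  # 2 vectors of 1536 dims
```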
### Vector Search with Filtering

```python
import lancedb
from sentence_transformers import SentenceTransformer

db = lancedb.connect("./.lancedb")
table = db.open_table("documents")

# Embed the query with the same model used at indexing time
model = SentenceTransformer('all-MiniLM-L6-v2')
query_embedding = model.encode(["recent ML research"])[0]

# Hybrid search: vector similarity + metadata filter
results = (
    table.search(query_embedding)
    .where("date >= '2024-01-01' AND category IN ('tech', 'science')")
    .select(["id", "text", "date"])  # only return needed columns
    .limit(10)
    .to_arrow()
)
```
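Note that, depending on the LanceDB version, `.where()` may be applied after the ANN search (post-filtering), which can return fewer than `limit` rows when the predicate is selective. Recent Python releases accept a `prefilter` flag; a sketch assuming that flag is available in your version:

```python
# Prefiltering: apply the metadata predicate before the ANN search so
# the top-k comes entirely from matching rows (assumes `prefilter` is
# supported by the installed LanceDB version)
results = (
    table.search(query_embedding)
    .where("category = 'tech'", prefilter=True)
    .limit(10)
    .to_arrow()
)
```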
### Cost Monitoring for LLMs

```python
import duckdb
from datetime import datetime

class LLMMonitor:
    def __init__(self, duckdb_path="llm_monitoring.db"):
        self.conn = duckdb.connect(duckdb_path)
        self._init_table()

    def _init_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS calls (
                call_id VARCHAR PRIMARY KEY,
                timestamp TIMESTAMP,
                model VARCHAR,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd DOUBLE,
                latency_ms INTEGER
            )
        """)

    def log_call(self, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: int):
        # Approximate cost per 1K tokens (update with real pricing)
        cost_per_1k = {
            "gpt-4o": 0.005,
            "text-embedding-3-small": 0.00002,
        }.get(model, 0.001)
        total_cost = (prompt_tokens + completion_tokens) / 1000 * cost_per_1k

        call_id = f"{datetime.now().timestamp():.6f}-{model}"
        self.conn.execute(
            "INSERT INTO calls VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            [
                call_id,
                datetime.now(),
                model,
                prompt_tokens,
                completion_tokens,
                prompt_tokens + completion_tokens,
                total_cost,
                latency_ms,
            ],
        )

monitor = LLMMonitor()
monitor.log_call("gpt-4o", 100, 50, 1200)
```
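Once calls are logged, cost reporting is ordinary SQL over the `calls` table. A quick rollup sketch:

```python
# Daily spend per model from the calls table
print(monitor.conn.execute("""
    SELECT model,
           date_trunc('day', "timestamp") AS day,
           SUM(total_tokens) AS tokens,
           ROUND(SUM(cost_usd), 4) AS cost_usd
    FROM calls
    GROUP BY model, day
    ORDER BY day, model
""").df())
```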
## Best Practices

- ✅ Batch embedding generation - Use batch API calls, not one-by-one requests
- ✅ Index after bulk load - Build vector indexes on the full dataset, not incrementally
- ✅ Use appropriate embedding dimensions - Smaller is cheaper and faster; larger is more accurate
- ✅ Monitor token usage - Track costs, set quotas
- ✅ Cache prompts & results - Avoid duplicate API calls
- ✅ Human evaluation - Automated metrics (cosine similarity) ≠ quality
- ❌ Don't store embeddings without metadata - Source attribution needs it
- ❌ Don't assume cosine similarity fits every embedding space - Some models, including some multi-modal ones, are trained for L2 or dot-product distance; match the metric to the model
- ❌ Don't skip chunking - Very long documents must be split to fit embedding model input limits
## Performance Tips

- Embedding models: Run locally for privacy/offline use, or use OpenAI for convenience
- Vector DB: LanceDB for embedded use; pgvector for transactional workloads; specialized services (Pinecone, Weaviate) for scale
- Index tuning: IVF_PQ for disk efficiency, HNSW for recall
- Query parameters: Tune nprobes (accuracy) and refine_factor (reranking) based on your latency/recall tradeoff (see the sketch below)
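With an IVF_PQ index, LanceDB exposes both knobs on the query builder. A sketch; the values are starting points to benchmark, not recommendations:

```python
# Higher nprobes searches more partitions (better recall, slower);
# refine_factor re-ranks extra candidates using exact distances
results = (
    table.search(query_embedding)
    .nprobes(20)
    .refine_factor(10)
    .limit(10)
    .to_pandas()
)
```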
## References

- LanceDB Documentation
- Sentence Transformers
- OpenAI Embeddings Guide
- pgvector Documentation
- @data-engineering-storage-lakehouse - Versioned storage for models/embeddings