databricks-performance-tuning

Databricks Performance Tuning

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "databricks-performance-tuning" with this command: npx skills add jeremylongshore/claude-code-plugins-plus-skills/jeremylongshore-claude-code-plugins-plus-skills-databricks-performance-tuning

Databricks Performance Tuning

Overview

Optimize Databricks cluster, Spark, and Delta Lake performance.

Prerequisites

  • Access to cluster configuration

  • Understanding of workload characteristics

  • Query history access

Instructions

Step 1: Cluster Sizing

Cluster sizing calculator

def recommend_cluster_size( data_size_gb: float, complexity: str = "medium", # low, medium, high parallelism_need: str = "standard", # standard, high ) -> dict: """ Recommend cluster configuration based on workload.

Args:
    data_size_gb: Estimated data size to process
    complexity: Query/transform complexity
    parallelism_need: Required parallelism level

Returns:
    Recommended cluster configuration
"""
# Memory per executor (standard DS3_v2 = 14GB)
memory_per_worker = 14

# Base calculation
workers_by_data = max(1, int(data_size_gb / memory_per_worker / 2))

# Adjust for complexity
complexity_multiplier = {"low": 1, "medium": 1.5, "high": 2.5}
workers = int(workers_by_data * complexity_multiplier.get(complexity, 1.5))

# Adjust for parallelism
if parallelism_need == "high":
    workers = max(workers, 8)

return {
    "node_type_id": "Standard_DS3_v2",
    "num_workers": workers,
    "autoscale": {
        "min_workers": max(1, workers // 2),
        "max_workers": workers * 2,
    },
    "spark_conf": {
        "spark.sql.shuffle.partitions": str(workers * 4),
        "spark.default.parallelism": str(workers * 4),
    }
}

Step 2: Spark Configuration Optimization

Optimized Spark configurations by workload type

spark_configs = { "etl_batch": { # Memory and parallelism "spark.sql.shuffle.partitions": "200", # HTTP 200 OK "spark.default.parallelism": "200", # HTTP 200 OK "spark.sql.files.maxPartitionBytes": "134217728", # 134217728: 128MB

    # Delta Lake optimizations
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",

    # Adaptive query execution
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
},

"ml_training": {
    # Memory for ML workloads
    "spark.driver.memory": "16g",
    "spark.executor.memory": "16g",
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.3",

    # Serialization for ML
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "1024m",
},

"streaming": {
    # Streaming configurations
    "spark.sql.streaming.schemaInference": "true",
    "spark.sql.streaming.checkpointLocation": "/mnt/checkpoints",
    "spark.databricks.delta.autoCompact.minNumFiles": "10",

    # Micro-batch tuning
    "spark.sql.streaming.forEachBatch.enabled": "true",
},

"interactive": {
    # Fast startup
    "spark.databricks.cluster.profile": "singleNode",
    "spark.master": "local[*]",

    # Caching
    "spark.sql.inMemoryColumnarStorage.compressed": "true",
    "spark.sql.inMemoryColumnarStorage.batchSize": "10000",  # 10000: 10 seconds in ms
}

}

Step 3: Delta Lake Optimization

delta_optimization.py

from pyspark.sql import SparkSession

def optimize_delta_table( spark: SparkSession, table_name: str, z_order_columns: list[str] = None, vacuum_hours: int = 168, # 7 days ) -> dict: """ Optimize Delta table for query performance.

Args:
    spark: SparkSession
    table_name: Fully qualified table name
    z_order_columns: Columns for Z-ordering (max 4)
    vacuum_hours: Retention period for vacuum

Returns:
    Optimization results
"""
results = {}

# 1. Run OPTIMIZE with Z-ordering
if z_order_columns:
    z_order_clause = ", ".join(z_order_columns[:4])  # Max 4 columns
    spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({z_order_clause})")
    results["z_order"] = z_order_columns
else:
    spark.sql(f"OPTIMIZE {table_name}")

results["optimized"] = True

# 2. Analyze table statistics
spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
results["statistics_computed"] = True

# 3. Vacuum old files
spark.sql(f"VACUUM {table_name} RETAIN {vacuum_hours} HOURS")
results["vacuumed"] = True

# 4. Get table metrics
detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
results["metrics"] = {
    "num_files": detail.numFiles,
    "size_bytes": detail.sizeInBytes,
    "partitions": detail.partitionColumns,
}

return results

def enable_liquid_clustering( spark: SparkSession, table_name: str, cluster_columns: list[str], ) -> None: """ Enable Liquid Clustering for automatic data layout optimization.

Liquid Clustering replaces traditional partitioning and Z-ordering
with automatic, incremental clustering.
"""
columns = ", ".join(cluster_columns)
spark.sql(f"""
    ALTER TABLE {table_name}
    CLUSTER BY ({columns})
""")

def enable_predictive_optimization( spark: SparkSession, table_name: str, ) -> None: """Enable Databricks Predictive Optimization.""" spark.sql(f""" ALTER TABLE {table_name} SET TBLPROPERTIES ( 'delta.enableDeletionVectors' = 'true', 'delta.targetFileSize' = '104857600' # 104857600 = configured value ) """)

Step 4: Query Performance Analysis

-- Find slow queries from query history SELECT query_id, query_text, duration / 1000 as seconds, # 1000: 1 second in ms rows_produced, bytes_read, start_time FROM system.query.history WHERE duration > 60000 -- > 60 seconds # 60000: 1 minute in ms AND start_time > current_timestamp() - INTERVAL 24 HOURS ORDER BY duration DESC LIMIT 20;

-- Analyze query plan EXPLAIN FORMATTED SELECT * FROM main.silver.orders WHERE order_date > '2024-01-01' # 2024 year AND region = 'US';

-- Check table scan statistics SELECT table_name, SUM(bytes_read) / 1024 / 1024 / 1024 as gb_read, # 1024: 1 KB SUM(rows_produced) as total_rows, COUNT(*) as query_count FROM system.query.history WHERE start_time > current_timestamp() - INTERVAL 7 DAYS GROUP BY table_name ORDER BY gb_read DESC;

Step 5: Caching Strategy

Intelligent caching for repeated queries

from pyspark.sql import DataFrame from functools import lru_cache

class CacheManager: """Manage Spark DataFrame caching."""

def __init__(self, spark: SparkSession):
    self.spark = spark
    self._cache_registry = {}

def cache_table(
    self,
    table_name: str,
    cache_level: str = "MEMORY_AND_DISK",
) -> DataFrame:
    """Cache table with specified storage level."""
    if table_name in self._cache_registry:
        return self._cache_registry[table_name]

    df = self.spark.table(table_name)

    if cache_level == "MEMORY_ONLY":
        df.cache()
    elif cache_level == "MEMORY_AND_DISK":
        from pyspark import StorageLevel
        df.persist(StorageLevel.MEMORY_AND_DISK)
    elif cache_level == "DISK_ONLY":
        from pyspark import StorageLevel
        df.persist(StorageLevel.DISK_ONLY)

    # Trigger caching
    df.count()

    self._cache_registry[table_name] = df
    return df

def uncache_all(self):
    """Clear all cached DataFrames."""
    for df in self._cache_registry.values():
        df.unpersist()
    self._cache_registry.clear()
    self.spark.catalog.clearCache()

Delta Cache (automatic)

Enable in cluster config:

"spark.databricks.io.cache.enabled": "true"

"spark.databricks.io.cache.maxDiskUsage": "50g"

Step 6: Join Optimization

from pyspark.sql import DataFrame from pyspark.sql.functions import broadcast

def optimize_join( df_large: DataFrame, df_small: DataFrame, join_key: str, small_table_threshold_mb: int = 100, ) -> DataFrame: """ Optimize join based on table sizes.

Uses broadcast join for small tables,
sort-merge join for large tables.
"""
# Estimate small table size
small_size_mb = df_small.count() * 100 / 1024 / 1024  # 1024: rough estimate

if small_size_mb < small_table_threshold_mb:
    # Broadcast join (small table fits in memory)
    return df_large.join(broadcast(df_small), join_key)
else:
    # Sort-merge join with bucketing hint
    return df_large.join(df_small, join_key, "inner")

Bucketed tables for frequent joins

def create_bucketed_table( spark: SparkSession, df: DataFrame, table_name: str, bucket_columns: list[str], num_buckets: int = 100, ): """Create bucketed table for join optimization.""" ( df.write .bucketBy(num_buckets, *bucket_columns) .sortBy(*bucket_columns) .saveAsTable(table_name) )

Output

  • Optimized cluster configuration

  • Tuned Spark settings

  • Optimized Delta tables

  • Improved query performance

Error Handling

Issue Cause Solution

OOM errors Insufficient memory Increase executor memory or reduce partition size

Skewed data Uneven distribution Use salting or AQE skew handling

Slow joins Large shuffle Use broadcast for small tables

Too many files Small files problem Run OPTIMIZE regularly

Examples

Performance Benchmark

import time

def benchmark_query(spark, query: str, runs: int = 3) -> dict: """Benchmark query execution time.""" times = [] for _ in range(runs): spark.catalog.clearCache() start = time.time() spark.sql(query).collect() times.append(time.time() - start)

return {
    "min": min(times),
    "max": max(times),
    "avg": sum(times) / len(times),
    "runs": runs,
}

Resources

  • Databricks Performance Guide

  • Delta Lake Optimization

  • Spark Tuning Guide

Next Steps

For cost optimization, see databricks-cost-tuning .

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

optimizing-deep-learning-models

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

fastapi-ml-endpoint

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

analyzing-text-with-nlp

No summary provided by upstream source.

Repository SourceNeeds Review