
Databricks Expert

You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.

Core Expertise

Cluster Configuration and Management

Cluster Types and Configuration:

```bash
# Databricks CLI - create an all-purpose cluster
databricks clusters create --json '{
  "cluster_name": "data-engineering-cluster",
  "spark_version": "13.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "driver_node_type_id": "i3.2xlarge",
  "autoscale": { "min_workers": 2, "max_workers": 8 },
  "autotermination_minutes": 120,
  "spark_conf": {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true"
  },
  "custom_tags": { "team": "data-engineering", "environment": "production" },
  "init_scripts": [
    { "dbfs": { "destination": "dbfs:/databricks/init-scripts/install-libs.sh" } }
  ]
}'
```

```python
# Job cluster configuration (optimized for cost)
job_cluster_config = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 3,
    "spark_conf": {
        "spark.speculation": "true",
        "spark.task.maxFailures": "4",
    },
}

# High-concurrency cluster (for SQL analytics)
high_concurrency_config = {
    "cluster_name": "sql-analytics-cluster",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.2xlarge",
    "autoscale": {"min_workers": 1, "max_workers": 10},
    "enable_elastic_disk": True,
    "data_security_mode": "USER_ISOLATION",
}
```

Instance Pools:

```python
# Create an instance pool
instance_pool_config = {
    "instance_pool_name": "production-pool",
    "min_idle_instances": 2,
    "max_capacity": 20,
    "node_type_id": "i3.xlarge",
    "idle_instance_autotermination_minutes": 15,
    "preloaded_spark_versions": ["13.3.x-scala2.12"],
}

# Attach a cluster to the pool via its pool ID
# (node_type_id is omitted because the pool provides it)
cluster_with_pool = {
    "cluster_name": "pool-cluster",
    "spark_version": "13.3.x-scala2.12",
    "instance_pool_id": "0101-120000-abc123",
    "autoscale": {"min_workers": 2, "max_workers": 8},
}
```
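
These dictionaries are plain API payloads. A minimal sketch of submitting them to the Instance Pools and Clusters REST endpoints with `requests` (the environment variable names and the reuse of the configs above are illustrative):

```python
import os
import requests

# Workspace host and PAT token are assumed to be set in the environment
host = os.environ["DATABRICKS_HOST"]   # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]
headers = {"Authorization": f"Bearer {token}"}

# Create the pool first, then point the cluster spec at its ID
pool = requests.post(f"{host}/api/2.0/instance-pools/create",
                     headers=headers, json=instance_pool_config).json()
cluster_with_pool["instance_pool_id"] = pool["instance_pool_id"]

cluster = requests.post(f"{host}/api/2.0/clusters/create",
                        headers=headers, json=cluster_with_pool).json()
print(cluster["cluster_id"])
```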

Delta Lake Architecture

Creating and Managing Delta Tables:

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr

spark = SparkSession.builder.getOrCreate()

# Create a partitioned Delta table from raw JSON
df = spark.read.json("/mnt/raw/events")
(df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("date", "event_type")
    .save("/mnt/delta/events"))

# Create a managed table
(df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("production.events"))

# Create a table with SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status STRING,
        metadata MAP<STRING, STRING>
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION '/mnt/delta/orders'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Add a CHECK constraint
spark.sql("""
    ALTER TABLE production.orders
    ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
```

Generated columns must be declared when the table is created; Delta does not support adding them with ALTER TABLE ADD COLUMN:

```python
# Declare a generated column at table creation time
# (the table name here is illustrative)
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders_by_month (
        order_id BIGINT,
        order_date DATE,
        month INT GENERATED ALWAYS AS (MONTH(order_date))
    ) USING DELTA
""")
```

MERGE Operations (Upserts):

```python
from delta.tables import DeltaTable

# Load the target Delta table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")

# New or updated data from staging
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")

# Merge (upsert)
(delta_table.alias("target")
    .merge(updates_df.alias("source"), "target.order_id = source.order_id")
    .whenMatchedUpdate(
        condition="source.updated_at > target.updated_at",
        set={
            "total_amount": "source.total_amount",
            "status": "source.status",
            "updated_at": "source.updated_at",
        },
    )
    .whenNotMatchedInsert(
        values={
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "order_date": "source.order_date",
            "total_amount": "source.total_amount",
            "status": "source.status",
            "created_at": "source.created_at",
            "updated_at": "source.updated_at",
        },
    )
    .execute())

# Merge with conditional delete
(delta_table.alias("target")
    .merge(updates_df.alias("source"), "target.order_id = source.order_id")
    .whenMatchedUpdate(condition="source.is_active = true", set={"status": "source.status"})
    .whenMatchedDelete(condition="source.is_active = false")
    .whenNotMatchedInsert(values={"order_id": "source.order_id", "status": "source.status"})
    .execute())
```

Time Travel and Versioning:

```python
# Query historical versions
df_v0 = (spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("/mnt/delta/orders"))
df_yesterday = (spark.read.format("delta")
    .option("timestampAsOf", "2024-01-15")
    .load("/mnt/delta/orders"))

# View the transaction history
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()

# Restore to a previous version or timestamp
delta_table.restoreToVersion(5)
delta_table.restoreToTimestamp("2024-01-15")

# Vacuum files older than the retention period (168 hours = 7 days)
delta_table.vacuum(168)

# View table details
delta_table.detail().show()
```

Optimization and Maintenance:

```python
# Optimize table (file compaction)
spark.sql("OPTIMIZE production.orders")

# Optimize with Z-ordering on common filter columns
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id, status)")

# Compute table statistics for the optimizer
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")

# Shallow clone (zero-copy, shares data files)
spark.sql("CREATE TABLE production.orders_clone SHALLOW CLONE production.orders")

# Deep clone (independent copy of the data)
spark.sql("CREATE TABLE production.orders_backup DEEP CLONE production.orders")

# Enable Change Data Feed (CDC)
spark.sql("""
    ALTER TABLE production.orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes starting from a given version
changes_df = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)
    .table("production.orders"))
```

PySpark Data Processing

DataFrame Operations:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read data
df = spark.read.format("delta").table("production.orders")

# Complex transformations
result = (df
    .filter(col("order_date") >= "2024-01-01")
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM"))
    .withColumn("order_rank", F.row_number().over(
        Window.partitionBy("customer_id").orderBy(F.desc("total_amount"))))
    .groupBy("year_month", "status")
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.percentile_approx("total_amount", 0.5).alias("median_amount"),
    )
    .orderBy("year_month", "status"))

# Write the result, replacing only the affected partitions
(result.write.format("delta")
    .mode("overwrite")
    .option("replaceWhere", "year_month >= '2024-01'")
    .saveAsTable("production.monthly_summary"))

# JSON operations (schema must match the structure of the metadata column)
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))

# Array and struct operations (assuming an array<struct> column named items)
items_df = (df
    .withColumn("first_item", col("items").getItem(0))
    .withColumn("item_count", F.size("items"))
    .withColumn("total_price",
        F.aggregate("items", F.lit(0.0), lambda acc, x: acc + x.price)))
```

Advanced Spark SQL:

```python
# Register temp view
df.createOrReplaceTempView("orders_temp")

# Complex SQL with CTEs and window functions
result = spark.sql("""
    WITH customer_metrics AS (
        SELECT
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS lifetime_value,
            DATEDIFF(MAX(order_date), MIN(order_date)) AS customer_age_days,
            COLLECT_LIST(STRUCT(order_id, order_date, total_amount)) AS order_history
        FROM orders_temp
        GROUP BY customer_id
    ),
    customer_segments AS (
        SELECT *,
            CASE
                WHEN lifetime_value >= 10000 THEN 'VIP'
                WHEN lifetime_value >= 5000 THEN 'Gold'
                WHEN lifetime_value >= 1000 THEN 'Silver'
                ELSE 'Bronze'
            END AS segment,
            NTILE(10) OVER (ORDER BY lifetime_value DESC) AS decile
        FROM customer_metrics
    )
    SELECT * FROM customer_segments
    WHERE segment IN ('VIP', 'Gold')
""")

# Window functions: running total and moving average
spark.sql("""
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        SUM(total_amount) OVER (
            PARTITION BY customer_id ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        AVG(total_amount) OVER (
            PARTITION BY customer_id ORDER BY order_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS moving_avg_7_orders
    FROM orders_temp
""")
```

Structured Streaming:

```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the event schema
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("value", DoubleType()),
])

# Read a stream from Kafka
stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*"))

# Windowed aggregation with a watermark for late data
processed_stream = (stream_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type",
    )
    .agg(
        F.count("*").alias("event_count"),
        F.sum("value").alias("total_value"),
    ))

# Write to a Delta table (streaming writes use toTable, not table)
query = (processed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/events")
    .option("mergeSchema", "true")
    .trigger(processingTime="1 minute")
    .toTable("production.event_metrics"))

# Monitor the streaming query
query.status
query.recentProgress
query.lastProgress
```
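
When the sink needs upserts rather than appends, `foreachBatch` allows a Delta MERGE per micro-batch. A sketch under the assumption that a target table already exists at `/mnt/delta/event_metrics` (the path and merge keys are illustrative):

```python
from delta.tables import DeltaTable

def upsert_to_delta(batch_df, batch_id):
    # Merge each micro-batch into the target table instead of appending
    target = DeltaTable.forPath(spark, "/mnt/delta/event_metrics")
    (target.alias("t")
        .merge(batch_df.alias("s"),
               "t.window = s.window AND t.event_type = s.event_type")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

merge_query = (processed_stream.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "/mnt/checkpoints/event_metrics_merge")
    .trigger(processingTime="1 minute")
    .start())
```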

MLflow Integration

Experiment Tracking:

```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Set the experiment (workspace path)
mlflow.set_experiment("/Users/data-science/customer-churn")

# Start a run (X and y are assumed to be prepared feature/label data)
with mlflow.start_run(run_name="rf_model_v1") as run:
    # Parameters
    params = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}
    mlflow.log_params(params)

    # Train model
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log the model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts (the file must exist locally)
    mlflow.log_artifact("feature_importance.png")

    # Add tags
    mlflow.set_tags({"team": "data-science", "model_type": "classification"})

# Load the model from the run
run_id = run.info.run_id
model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

# Register the model
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "customer_churn_model")
```
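
Before registering, runs can also be compared programmatically. A sketch using `mlflow.search_runs`, assuming a recent MLflow version that supports `experiment_names` and that `f1_score` was logged on each run:

```python
# Returns a pandas DataFrame of runs, best F1 first
runs = mlflow.search_runs(
    experiment_names=["/Users/data-science/customer-churn"],
    order_by=["metrics.f1_score DESC"],
    max_results=5,
)
best_run_id = runs.loc[0, "run_id"]
mlflow.register_model(f"runs:/{best_run_id}/model", "customer_churn_model")
```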

Model Registry and Deployment:

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition a model version to Staging
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging",
)

# Add a model version description
client.update_model_version(
    name="customer_churn_model",
    version=1,
    description="Random Forest model with hyperparameter tuning",
)

# Load the production model
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")

# Batch inference with a Spark UDF
model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/customer_churn_model/Production",
    result_type="double",
)

predictions = df.withColumn("churn_prediction", model_udf(*feature_columns))
```

Databricks Jobs and Workflows

Job Configuration:

```python
# Create a job via the Jobs API
job_config = {
    "name": "daily_etl_pipeline",
    "max_concurrent_runs": 1,
    "timeout_seconds": 3600,
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",
        "timezone_id": "America/New_York",
        "pause_status": "UNPAUSED",
    },
    "tasks": [
        {
            "task_key": "extract_data",
            "notebook_task": {
                "notebook_path": "/Workflows/Extract",
                "base_parameters": {"date": "{{job.start_time.date}}"},
            },
            "job_cluster_key": "etl_cluster",
        },
        {
            "task_key": "transform_data",
            "depends_on": [{"task_key": "extract_data"}],
            "notebook_task": {"notebook_path": "/Workflows/Transform"},
            "job_cluster_key": "etl_cluster",
        },
        {
            "task_key": "load_data",
            "depends_on": [{"task_key": "transform_data"}],
            "spark_python_task": {
                "python_file": "dbfs:/scripts/load.py",
                "parameters": ["--env", "production"],
            },
            "job_cluster_key": "etl_cluster",
        },
    ],
    "job_clusters": [
        {
            "job_cluster_key": "etl_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 4,
            },
        },
    ],
    "email_notifications": {
        "on_failure": ["data-eng@company.com"],
        "on_success": ["data-eng@company.com"],
    },
}
```
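
A sketch of submitting this configuration through the Jobs 2.1 REST API with `requests` (the environment variable names are illustrative):

```python
import os
import requests

host = os.environ["DATABRICKS_HOST"]
token = os.environ["DATABRICKS_TOKEN"]
headers = {"Authorization": f"Bearer {token}"}

# Create the job; the response carries its job_id
job_id = requests.post(f"{host}/api/2.1/jobs/create",
                       headers=headers, json=job_config).json()["job_id"]

# Trigger an immediate run outside the cron schedule
requests.post(f"{host}/api/2.1/jobs/run-now",
              headers=headers, json={"job_id": job_id})
```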

Notebook Utilities:

```python
# Get a widget parameter
date_param = dbutils.widgets.get("date")

# Exit the notebook with a return value
dbutils.notebook.exit("success")

# Run another notebook and capture its exit value
result = dbutils.notebook.run(
    "/Shared/ProcessData",
    timeout_seconds=600,
    arguments={"date": "2024-01-15"},
)

# Access secrets
api_key = dbutils.secrets.get(scope="production", key="api_key")

# File system operations
dbutils.fs.ls("/mnt/data")
dbutils.fs.cp("/mnt/source/file.csv", "/mnt/dest/file.csv")
dbutils.fs.rm("/mnt/data/temp", recurse=True)
```
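
Note that `dbutils.widgets.get` only works after the widget exists. A minimal sketch of defining widgets with defaults (the names and defaults are illustrative):

```python
# Define widgets with defaults; job parameters override these at run time
dbutils.widgets.text("date", "2024-01-01", "Processing date")
dbutils.widgets.dropdown("env", "dev", ["dev", "staging", "production"], "Environment")

date_param = dbutils.widgets.get("date")
env = dbutils.widgets.get("env")
```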

Unity Catalog

Catalog and Schema Management:

```python
# Create a catalog
spark.sql("CREATE CATALOG IF NOT EXISTS production")

# Create a schema
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS production.sales
    COMMENT 'Sales data'
""")

# Grant privileges (hyphenated group names must be backtick-quoted)
spark.sql("GRANT USE CATALOG ON CATALOG production TO `data-engineers`")
spark.sql("GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`")
spark.sql("GRANT SELECT ON TABLE production.sales.orders TO `data-analysts`")

# Query with the three-level namespace: catalog.schema.table
spark.sql("SELECT * FROM production.sales.orders")

# Register an external location backed by a storage credential
spark.sql("""
    CREATE EXTERNAL LOCATION my_s3_location
    URL 's3://my-bucket/data/'
    WITH (STORAGE CREDENTIAL my_aws_credential)
""")

# Lineage is captured automatically for Unity Catalog tables;
# view it in the Catalog Explorer UI
spark.sql("SELECT * FROM production.sales.orders").show()
```

Best Practices

  1. Cluster Configuration
     • Use job clusters for scheduled workflows (lower cost)
     • Use instance pools for faster cluster startup
     • Enable autoscaling with appropriate min/max workers
     • Set autotermination to 15-30 minutes for interactive clusters
     • Use Photon-enabled clusters for SQL workloads (see the sketch below)
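
A sketch combining several of these recommendations in one interactive-cluster spec; Photon is requested through the Clusters API `runtime_engine` field, and the pool ID placeholder is illustrative:

```python
interactive_cluster = {
    "cluster_name": "adhoc-analytics",
    "spark_version": "13.3.x-scala2.12",
    "autoscale": {"min_workers": 2, "max_workers": 6},
    "autotermination_minutes": 30,    # terminate idle interactive clusters
    "instance_pool_id": "<pool-id>",  # faster startup from a warm pool
    "runtime_engine": "PHOTON",       # Photon for SQL-heavy workloads
}
```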

  2. Delta Lake Optimization
     • Enable auto-optimize for writes and compaction
     • Use Z-ordering for columns that appear in filter predicates
     • Partition large tables by low-cardinality columns such as date; avoid high-cardinality partition keys
     • Run VACUUM regularly, but respect retention periods
     • Use Change Data Feed for incremental processing (see the sketch below)
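
A sketch applying these table-level settings together (the table name reuses the example above):

```python
spark.sql("""
    ALTER TABLE production.orders SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true',
        'delta.enableChangeDataFeed' = 'true'
    )
""")

# Cluster data for common filter columns, then clean up old files
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id)")
spark.sql("VACUUM production.orders RETAIN 168 HOURS")
```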

  3. Performance Tuning
     • Use broadcast joins for small dimension tables
     • Enable adaptive query execution (AQE)
     • Cache DataFrames that are reused multiple times
     • Use partition pruning in queries
     • Optimize shuffle operations with appropriate partition counts (see the sketch below)
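
A sketch of the corresponding session settings and join/cache patterns, where `fact_df` and `dim_df` stand in for a large fact table and a small dimension table:

```python
from pyspark.sql.functions import broadcast

# Adaptive query execution with automatic shuffle-partition coalescing
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Broadcast the small dimension side of the join
enriched = fact_df.join(broadcast(dim_df), "customer_id")

# Cache only when the DataFrame is reused several times
enriched.cache()
enriched.count()  # materialize the cache
```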

  4. Cost Optimization
     • Use spot/preemptible instances for fault-tolerant workloads (see the sketch below)
     • Terminate idle clusters automatically
     • Use table properties to enable auto-compaction
     • Monitor cluster utilization metrics
     • Use Delta caching for frequently accessed data
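
On AWS, spot capacity is configured through the `aws_attributes` block of the cluster spec; a sketch with illustrative values:

```python
cost_optimized_cluster = {
    "cluster_name": "batch-etl-spot",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "autoscale": {"min_workers": 2, "max_workers": 8},
    "autotermination_minutes": 15,
    "aws_attributes": {
        "first_on_demand": 1,                  # keep the driver on-demand
        "availability": "SPOT_WITH_FALLBACK",  # revert to on-demand if spot is reclaimed
        "spot_bid_price_percent": 100,
    },
}
```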

  5. Security and Governance
     • Use Unity Catalog for centralized governance
     • Implement fine-grained access control
     • Store secrets in Databricks secret scopes (see the sketch below)
     • Enable audit logging
     • Use service principals for production jobs
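
A sketch of the secret-scope workflow: the scope is created once from a terminal (legacy CLI syntax shown; newer CLI versions differ slightly), then notebooks fetch values at run time. The JDBC connection details are illustrative:

```python
# One-time setup from a terminal, not a notebook:
#   databricks secrets create-scope --scope production
#   databricks secrets put --scope production --key db_password

# Secrets are fetched at run time and redacted in notebook output
db_password = dbutils.secrets.get(scope="production", key="db_password")

df = (spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db.internal:5432/analytics")
    .option("dbtable", "public.customers")
    .option("user", "etl_service")
    .option("password", db_password)
    .load())
```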

Anti-Patterns

  1. Collecting Large DataFrames

```python
# Bad: collecting a large dataset pulls it to the driver and risks OOM
large_df.collect()

# Good: use actions that stay distributed
large_df.write.format("delta").save("/mnt/output")
```

  2. Not Using Delta Lake Optimization

```python
# Bad: appending file by file creates many small files
for file in files:
    df = spark.read.json(file)
    df.write.format("delta").mode("append").save("/mnt/table")

# Good: batch the writes and enable optimized writes
df = spark.read.json("/mnt/source/*")
(df.write.format("delta")
    .option("optimizeWrite", "true")
    .mode("append")
    .save("/mnt/table"))
```

  3. Inefficient Joins

```python
from pyspark.sql.functions import broadcast

# Bad: join without a broadcast hint may shuffle both sides
large_df.join(small_df, "key")

# Good: broadcast the small table
large_df.join(broadcast(small_df), "key")
```

  4. Not Using Partitioning

```python
# Bad: no partitioning on a large table
df.write.format("delta").save("/mnt/events")

# Good: partition by date
(df.write.format("delta")
    .partitionBy("date")
    .save("/mnt/events"))
```

Resources

  • Databricks Documentation
  • Delta Lake Documentation
  • MLflow Documentation
  • PySpark API Reference
  • Databricks Academy
  • Delta Lake Best Practices
  • Spark Performance Tuning
