Databricks Development Guide
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
SDK Documentation: https://databricks-sdk-py.readthedocs.io/en/latest/ GitHub Repository: https://github.com/databricks/databricks-sdk-py
Environment Setup
-
Use existing virtual environment at .venv or use uv to create one
-
For Spark operations: uv pip install databricks-connect
-
For SDK operations: uv pip install databricks-sdk
-
Databricks CLI version should be 0.278.0 or higher
Configuration
-
Default profile name: DEFAULT
-
Config file: ~/.databrickscfg
-
Environment variables: DATABRICKS_HOST , DATABRICKS_TOKEN
Databricks Connect (Spark Operations)
Use databricks-connect for running Spark code locally against a Databricks cluster.
from databricks.connect import DatabricksSession
Auto-detects 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()
With explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()
Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table") df.show()
IMPORTANT: Do NOT set .master("local[*]")
- this will cause issues with Databricks Connect.
Direct REST API Access
For operations not yet in SDK or overly complex via SDK, use direct REST API:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
Direct API call using authenticated client
response = w.api_client.do( method="GET", path="/api/2.0/clusters/list" )
POST with body
response = w.api_client.do( method="POST", path="/api/2.0/jobs/run-now", body={"job_id": 123} )
When to use: Prefer SDK methods when available. Use api_client.do for:
-
New API endpoints not yet in SDK
-
Complex operations where SDK abstraction is problematic
-
Debugging/testing raw API responses
Databricks CLI
Check version (should be >= 0.278.0)
databricks --version
Use specific profile
databricks --profile MY_PROFILE clusters list
Common commands
databricks clusters list databricks jobs list databricks workspace ls /Users/me
SDK Documentation Architecture
The SDK documentation follows a predictable URL pattern:
Base: https://databricks-sdk-py.readthedocs.io/en/latest/
Workspace APIs: /workspace/{category}/{service}.html Account APIs: /account/{category}/{service}.html Authentication: /authentication.html DBUtils: /dbutils.html
Workspace API Categories
Category Services
compute
clusters, cluster_policies, command_execution, instance_pools, libraries
catalog
catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations
jobs
jobs
sql
warehouses, statement_execution, queries, alerts, dashboards
serving
serving_endpoints
vectorsearch
vector_search_indexes, vector_search_endpoints
pipelines
pipelines
workspace
repos, secrets, workspace, git_credentials
files
files, dbfs
ml
experiments, model_registry
Authentication
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
Environment Variables
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com DATABRICKS_TOKEN=dapi... # Personal Access Token
Code Patterns
Auto-detect credentials from environment
from databricks.sdk import WorkspaceClient w = WorkspaceClient()
Explicit token auth
w = WorkspaceClient( host="https://your-workspace.cloud.databricks.com", token="dapi..." )
Azure Service Principal
w = WorkspaceClient( host="https://adb-xxx.azuredatabricks.net", azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...", azure_tenant_id="tenant-id", azure_client_id="client-id", azure_client_secret="secret" )
Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
Core API Reference
Clusters API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
List all clusters
for cluster in w.clusters.list(): print(f"{cluster.cluster_name}: {cluster.state}")
Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")
Create a cluster (returns Wait object)
wait = w.clusters.create( cluster_name="my-cluster", spark_version=w.clusters.select_spark_version(latest=True), node_type_id=w.clusters.select_node_type(local_disk=True), num_workers=2 ) cluster = wait.result() # Wait for cluster to be running
Or use create_and_wait for blocking call
cluster = w.clusters.create_and_wait( cluster_name="my-cluster", spark_version="14.3.x-scala2.12", node_type_id="i3.xlarge", num_workers=2, timeout=timedelta(minutes=30) )
Start/stop/delete
w.clusters.start(cluster_id="...").result() w.clusters.stop(cluster_id="...") w.clusters.delete(cluster_id="...")
Jobs API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
from databricks.sdk.service.jobs import Task, NotebookTask
List jobs
for job in w.jobs.list(): print(f"{job.job_id}: {job.settings.name}")
Create a job
created = w.jobs.create( name="my-job", tasks=[ Task( task_key="main", notebook_task=NotebookTask(notebook_path="/Users/me/notebook"), existing_cluster_id="0123-456789-abcdef" ) ] )
Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id) print(f"Run completed: {run.state.result_state}")
Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
SQL Statement Execution
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
Execute SQL query
response = w.statement_execution.execute_statement( warehouse_id="abc123", statement="SELECT * FROM catalog.schema.table LIMIT 10", wait_timeout="30s" )
Check status and get results
if response.status.state == StatementState.SUCCEEDED: for row in response.result.data_array: print(row)
For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n( statement_id=response.statement_id, chunk_index=0 )
SQL Warehouses
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
List warehouses
for wh in w.warehouses.list(): print(f"{wh.name}: {wh.state}")
Get warehouse
warehouse = w.warehouses.get(id="abc123")
Create warehouse
created = w.warehouses.create_and_wait( name="my-warehouse", cluster_size="Small", max_num_clusters=1, auto_stop_mins=15 )
Start/stop
w.warehouses.start(id="abc123").result() w.warehouses.stop(id="abc123").result()
Unity Catalog - Tables
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"): print(f"{table.full_name}: {table.table_type}")
Get table info
table = w.tables.get(full_name="main.default.my_table") print(f"Columns: {[c.name for c in table.columns]}")
Check if table exists
exists = w.tables.exists(full_name="main.default.my_table")
Unity Catalog - Catalogs & Schemas
Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
List catalogs
for catalog in w.catalogs.list(): print(catalog.name)
Create catalog
w.catalogs.create(name="my_catalog", comment="Description")
List schemas
for schema in w.schemas.list(catalog_name="main"): print(schema.name)
Create schema
w.schemas.create(name="my_schema", catalog_name="main")
Volumes
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
from databricks.sdk.service.catalog import VolumeType
List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"): print(f"{vol.full_name}: {vol.volume_type}")
Create managed volume
w.volumes.create( catalog_name="main", schema_name="default", name="my_volume", volume_type=VolumeType.MANAGED )
Read volume info
vol = w.volumes.read(name="main.default.my_volume")
Files API
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
Upload file to volume
w.files.upload( file_path="/Volumes/main/default/my_volume/data.csv", contents=open("local_file.csv", "rb") )
Download file
with w.files.download(file_path="/Volumes/main/default/my_volume/data.csv") as f: content = f.read()
List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"): print(f"{entry.name}: {entry.is_directory}")
Upload/download with progress (parallel)
w.files.upload_from( file_path="/Volumes/main/default/my_volume/large.parquet", source_path="/local/path/large.parquet", use_parallel=True )
w.files.download_to( file_path="/Volumes/main/default/my_volume/large.parquet", destination="/local/output/", use_parallel=True )
Serving Endpoints (Model Serving)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
List endpoints
for ep in w.serving_endpoints.list(): print(f"{ep.name}: {ep.state}")
Get endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")
Query endpoint
response = w.serving_endpoints.query( name="my-endpoint", inputs={"prompt": "Hello, world!"} )
For chat/completions endpoints
response = w.serving_endpoints.query( name="my-chat-endpoint", messages=[{"role": "user", "content": "Hello!"}] )
Get OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
Vector Search
Doc (Indexes): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html Doc (Endpoints): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html
List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"): print(idx.name)
Query index
results = w.vector_search_indexes.query_index( index_name="main.default.my_index", columns=["id", "text", "embedding"], query_text="search query", num_results=10 ) for doc in results.result.data_array: print(doc)
Pipelines (Delta Live Tables)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
List pipelines
for pipeline in w.pipelines.list_pipelines(): print(f"{pipeline.name}: {pipeline.state}")
Get pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")
Start pipeline update
w.pipelines.start_update(pipeline_id="abc123")
Stop pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
Secrets
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
List secret scopes
for scope in w.secrets.list_scopes(): print(scope.name)
Create scope
w.secrets.create_scope(scope="my-scope")
Put secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")
Get secret (returns GetSecretResponse with value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")
List secrets in scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"): print(s.key)
DBUtils
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html
Access dbutils through WorkspaceClient
dbutils = w.dbutils
File system operations
files = dbutils.fs.ls("/") dbutils.fs.cp("dbfs:/source", "dbfs:/dest") dbutils.fs.rm("dbfs:/path", recurse=True)
Secrets (same as w.secrets but dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
Common Patterns
CRITICAL: Async Applications (FastAPI, etc.)
The Databricks SDK is fully synchronous. All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with asyncio.to_thread() to avoid blocking the event loop.
import asyncio from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
WRONG - blocks the event loop
async def get_clusters_bad(): return list(w.clusters.list()) # BLOCKS!
CORRECT - runs in thread pool
async def get_clusters_good(): return await asyncio.to_thread(lambda: list(w.clusters.list()))
CORRECT - for simple calls
async def get_cluster(cluster_id: str): return await asyncio.to_thread(w.clusters.get, cluster_id)
CORRECT - FastAPI endpoint
from fastapi import FastAPI app = FastAPI()
@app.get("/clusters") async def list_clusters(): clusters = await asyncio.to_thread(lambda: list(w.clusters.list())) return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]
@app.post("/query") async def run_query(sql: str, warehouse_id: str): # Wrap the blocking SDK call response = await asyncio.to_thread( w.statement_execution.execute_statement, statement=sql, warehouse_id=warehouse_id, wait_timeout="30s" ) return response.result.data_array
Note: WorkspaceClient().config.host is NOT a network call - it just reads config. No need to wrap property access.
Wait for Long-Running Operations
from datetime import timedelta
Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait( cluster_name="test", spark_version="14.3.x-scala2.12", node_type_id="i3.xlarge", num_workers=2, timeout=timedelta(minutes=30) )
Pattern 2: Use Wait object
wait = w.clusters.create(...) cluster = wait.result() # Blocks until ready
Pattern 3: Manual polling with callback
def progress(cluster): print(f"State: {cluster.state}")
cluster = w.clusters.wait_get_cluster_running( cluster_id="...", timeout=timedelta(minutes=30), callback=progress )
Pagination
All list methods return iterators that handle pagination automatically
for job in w.jobs.list(): # Fetches all pages print(job.settings.name)
For manual control
from databricks.sdk.service.jobs import ListJobsRequest response = w.jobs.list(limit=10) for job in response: print(job)
Error Handling
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists
try: cluster = w.clusters.get(cluster_id="invalid-id") except NotFound: print("Cluster not found") except PermissionDenied: print("Access denied")
When Uncertain
If I'm unsure about a method, I should:
Check the documentation URL pattern:
Common categories:
-
Clusters: /workspace/compute/clusters.html
-
Jobs: /workspace/jobs/jobs.html
-
Tables: /workspace/catalog/tables.html
-
Warehouses: /workspace/sql/warehouses.html
-
Serving: /workspace/serving/serving_endpoints.html
Fetch and verify before providing guidance on parameters or return types.
Quick Reference Links
API Documentation URL
Authentication https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
Clusters https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
Jobs https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
SQL Warehouses https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
Statement Execution https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
Tables https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
Catalogs https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html
Schemas https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
Volumes https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
Files https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
Serving Endpoints https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
Vector Search https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html
Pipelines https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
Secrets https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
DBUtils https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html