Lakeflow Spark Declarative Pipelines (SDP)
IMPORTANT: If this is a new pipeline (one does not already exist), see Quick Start. Be sure to use whatever language user has specified only (Python or SQL). Be sure to use Databricks Asset Bundles for new projects.
Critical Rules (always follow)
-
MUST confirm language as Python or SQL. Stick with that language unless told otherwise.
-
MUST if not modifying an existing pipeline, use Quick Start below.
-
MUST create serverless pipelines by default. ** Only use classic clusters if user explicitly requires R language, Spark RDD APIs, or JAR libraries.
Required Steps
Copy this checklist and verify each item:
- Language selected: Python or SQL
- Compute type decided: serverless or classic compute
- Decide on multiple catalogs or schemas vs. all in one default schema
- Consider what should be parameterized at the pipeline level to make deployment easy.
- Consider Multi-Schema Patterns below, ask if unclear on best choices.
- Consider Modern Defaults below, ask if unclear on best choices.
Quick Start: Initialize New Pipeline Project
RECOMMENDED: Use databricks pipelines init to create production-ready Asset Bundle projects with multi-environment support.
When to Use Bundle Initialization
Use bundle initialization for New pipeline projects for a professional structure from the start
Use manual workflow for:
- Quick prototyping without multi-environment needs
- Existing manual projects you want to continue
- Learning/experimentation
Step 1: Initialize Project
I will automatically run this command when you request a new pipeline:
databricks pipelines init
Interactive Prompts:
- Project name: e.g., customer_orders_pipeline
- Initial catalog: Unity Catalog name (e.g., main
, prod_catalog
)
- Personal schema per user?: yes
for dev (each user gets their own schema), no
for prod
- Language: SQL or Python (auto-detected from your request - see language detection below)
Generated Structure:
my_pipeline/
├── databricks.yml # Multi-environment config (dev/prod)
├── resources/
│ └── *_etl.pipeline.yml # Pipeline resource definition
└── src/
└── *_etl/
├── explorations/ # Exploratory code in .ipynb
└── transformations/ # Your .sql or .py files here
Step 2: Customize Transformations
Replace the example code created by the init process with custom transformation files in src/transformations/
based on provided requirements, using best practice guidance from this skill.
For Python pipelines using cloudFiles: Ask the user where to store Auto Loader schema metadata. Recommend:
/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas
Step 3: Deploy and Run
# Deploy to workspace (dev by default)
databricks bundle deploy
# Run pipeline
databricks bundle run my_pipeline_etl
# Deploy to production
databricks bundle deploy --target prod
Quick Reference
Concept
Details
Names
SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines = Lakeflow Pipelines (all interchangeable)
Python Import
from pyspark import pipelines as dp
Primary Decorators
@dp.table()
, @dp.materialized_view()
, @dp.temporary_view()
Temporary Views
@dp.temporary_view()
creates in-pipeline temporary views (no catalog/schema, no cluster_by). Useful for intermediate logic before AUTO CDC or when a view needs multiple references without persistence.
Replaces
Delta Live Tables (DLT) with import dlt
Based On
Apache Spark 4.1+ (Databricks' modern data pipeline framework)
Docs
https://docs.databricks.com/aws/en/ldp/developer/python-dev
Detailed guides
Ingestion patterns: Use 1-ingestion-patterns.md when planning how to get new data into your Lakeflow pipeline —- covers file formats, batch/streaming options, and tips for incremental and full loads. (Keywords: Auto Loader, Kafka, Event Hub, Kinesis, file formats)
Streaming pipeline patterns: See 2-streaming-patterns.md for designing pipelines with streaming data sources, change data detection, triggers, and windowing. (Keywords: deduplication, windowing, stateful operations, joins)
SCD query patterns: See 3-scd-query-patterns.md for querying Slowly Changing Dimensions Type 2 history tables, including current state queries, point-in-time analysis, temporal joins, and change tracking. (Keywords: SCD Type 2 history tables, temporal joins, querying historical data)
Performance tuning: Use 4-performance-tuning.md for optimizing pipelines with Liquid Clustering, state management, and best practices for high-performance streaming workloads. (Keywords: Liquid Clustering, optimization, state management)
Python API reference: See 5-python-api.md for the modern pyspark.pipelines
(dp) API reference and migration from legacy dlt
API patterns. (Keywords: dp API, dlt API comparison)
DLT migration: Use 6-dlt-migration.md when migrating existing Delta Live Tables (DLT) pipelines to Spark Declarative Pipelines (SDP). (Keywords: migrating DLT pipelines to SDP)
Advanced configuration: See 7-advanced-configuration.md for advanced pipeline settings including development mode, continuous execution, notifications, Python dependencies, and custom cluster configurations. (Keywords: extra_settings parameter reference, examples)
Project initialization: Use 8-project-initialization.md for setting up new pipeline projects with databricks pipelines init
, Asset Bundles, multi-environment deployments, and language detection logic. (Keywords: databricks pipelines init, Asset Bundles, language detection, migration guides)
AUTO CDC patterns: Use 9-auto_cdc.md for implementing Change Data Capture with AUTO CDC, including Slow Changing Dimensions (SCD Type 1 and Type 2) for tracking changes and deduplication. (Keywords: AUTO CDC, Slow Changing Dimension, SCD, SCD Type 1, SCD Type 2, change data capture, deduplication)
Workflow
-
Determine the task type:
Setting up new project? → Read 8-project-initialization.md first
Creating new pipeline? → Read 1-ingestion-patterns.md
Creating stream table? → Read 2-streaming-patterns.md
Querying SCD history tables? → Read 3-scd-query-patterns.md
Implementing AUTO CDC or SCD? → Read 9-auto_cdc.md
Performance issues? → Read 4-performance-tuning.md
Using Python API? → Read 5-python-api.md
Migrating from DLT? → Read 6-dlt-migration.md
Advanced configuration? → Read 7-advanced-configuration.md
Validating? → Read validation-checklist.md
-
Follow the instructions in the relevant guide
-
Repeat for next task type
Official Documentation
- Lakeflow Spark Declarative Pipelines Overview - Main documentation hub
- SQL Language Reference - SQL syntax for streaming tables and materialized views
- Python Language Reference - pyspark.pipelines
API
- Loading Data - Auto Loader, Kafka, Kinesis ingestion
- Change Data Capture (CDC) - AUTO CDC, SCD Type 1/2
Medallion Architecture Pattern
Bronze Layer (Raw)
- Raw data ingested from sources in original format
- Minimal transformations (append-only, add metadata like _ingested_at
, _source_file
)
- Single source of truth preserving data lineage
Silver Layer (Validated)
- Cleaned and validated data.
- Might deduplicate here with auto_cdc, but often wait until the final step for auto_cdc if possible.
- Business logic applied (type casting, quality checks, filtering invalid records)
- Enterprise view of key business entities
- Enables self-service analytics and ML
Gold Layer (Business-Ready)
- Aggregated, denormalized, project-specific tables
- Optimized for consumption (reporting, dashboards, BI tools)
- Fewer joins, read-optimized data models
- Kimball star schema tables - dim_<entity_name>, fact_<entity_name>
- Deduplication often happens here via Slow Changing Dimensions (SCD), using auto_cdc. Sometimes that will happen upstream in silver instead, such as when joining multiple tables or business users plan to query the table from silver.
Typical Flow (Can vary)
Bronze: read_files() or spark.readStream.format("cloudFiles") → streaming table
Silver: read bronze → filter/clean/validate → streaming table
Gold: read silver → aggregate/denormalize → auto_cdc or materialized view
Sources:
- https://www.databricks.com/glossary/medallion-architecture
- https://docs.databricks.com/aws/en/lakehouse/medallion
- https://www.databricks.com/blog/2022/06/24/data-warehousing-modeling-techniques-and-their-implementation-on-the-databricks-lakehouse-platform.html
For medallion architecture (bronze/silver/gold), two approaches work:
- Flat with naming (template default): bronze_*.sql
, silver_*.sql
, gold_*.sql
- Subdirectories: bronze/orders.sql
, silver/cleaned.sql
, gold/summary.sql
Both work with the transformations/**
glob pattern. Choose based on preference.
See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.
General SDP development guidance
Step 1: Write Pipeline Files Locally
Create .sql
or .py
files in a local folder:
my_pipeline/
├── bronze/
│ ├── ingest_orders.sql # SQL (default for most cases)
│ └── ingest_events.py # Python (for complex logic)
├── silver/
│ └── clean_orders.sql
└── gold/
└── daily_summary.sql
SQL Example (bronze/ingest_orders.sql
):
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
*,
current_timestamp() AS _ingested_at,
_metadata.file_path AS _source_file
FROM read_files(
'/Volumes/catalog/schema/raw/orders/',
format => 'json',
schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
Python Example (bronze/ingest_events.py
):
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp
# Get schema location from pipeline configuration
schema_location_base = spark.conf.get("schema_location_base")
@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
.load("/Volumes/catalog/schema/raw/events/")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", col("_metadata.file_path"))
)
IMPORTANT for Python Pipelines: When using spark.readStream.format("cloudFiles")
for cloud storage ingestion, with schema inference (no schema specified), you must specify a schema location.
Always ask the user where to store Auto Loader schema metadata. Recommend:
/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas
Example: /Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas
Never use the source data volume - this causes permission conflicts. The schema location should be configured in the pipeline settings and accessed via spark.conf.get("schema_location_base")
.
Language Selection:
CRITICAL RULE: If the user explicitly mentions "Python" in their request (e.g., "Python Spark Declarative Pipeline", "Python SDP", "use Python"), ALWAYS use Python without asking. The same applies to SQL - if they say "SQL pipeline", use SQL.
- Explicit language request: User says "Python" → Use Python. User says "SQL" → Use SQL. Do not ask for clarification.
- Auto-detection (only when no explicit language mentioned):
- SQL indicators: "sql files", "simple transformations", "aggregations", "materialized view", "CREATE OR REFRESH"
- Python indicators: ".py files", "UDF", "complex logic", "ML inference", "external API", "@dp.table", "pandas", "decorator"
- Prompt for clarification only when language intent is truly ambiguous (no explicit mention, mixed signals)
- Default to SQL only when ambiguous AND no Python indicators present
See 8-project-initialization.md for detailed language detection logic.
Option 1: Pipelines with DABs:
Use asset bundles and pipeline CLI.
See Quick Start and 8-project-initialization.md for complete details.
Option 2: Manual Workflow (Advanced)
For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.
Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline
which handles the entire lifecycle.
IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if user explicitly ask for classic, pro, advances compute or requires R language, Spark RDD APIs, or JAR libraries.
See 10-mcp-approach.md for detailed guide.
Best Practices (2026)
Project Structure
- Default to databricks pipelines init
for new projects (creates Asset Bundle)
- Use Asset Bundles for multi-environment deployments (dev/staging/prod)
- Manual structure only for quick prototypes or legacy migration
- Medallion architecture: Two approaches work with Asset Bundles:
- Flat structure (template default): bronze_*.sql
, silver_*.sql
, gold_*.sql
in transformations/
- Subdirectories: transformations/bronze/
, transformations/silver/
, transformations/gold/
- Both work with the transformations/**
glob pattern - choose based on team preference
- See 8-project-initialization.md for project setup details
Minimal pipeline config pointers
- Define parameters in your pipeline’s configuration and access them in code with spark.conf.get("key").
- In Databricks Asset Bundles, set these under resources.pipelines..configuration; validate with databricks bundle validate.
Modern Defaults
- CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
- Raw .sql
/.py
files, not notebooks
- Serverless compute ONLY - Do not use classic clusters unless explicitly required
- Unity Catalog (required for serverless)
- read_files() when using SQL for cloud storage ingestion - see 1-ingestion-patterns.md
Multi-Schema Patterns
Default: Single target schema per pipeline. Each pipeline has one target catalog
and schema
where all tables are written.
Option 1: Single Pipeline, Single Schema with Prefixes (Recommended)
Use one schema with table name prefixes to distinguish layers:
# All tables write to: catalog.schema.bronze_*, silver_*, gold_*
@dp.table(name="bronze_orders") # → catalog.schema.bronze_orders
@dp.table(name="silver_orders") # → catalog.schema.silver_orders
@dp.table(name="gold_summary") # → catalog.schema.gold_summary
Advantages:
- Simpler configuration (one pipeline)
- All tables in one schema for easy discovery
Option 2:
Use varaiables to specific separate catalog and/or schema for different steps.
Below are Python SDP examples that source variables from pipeline configs via spark.conf.get, and use the default catalog/schema for bronze.
Same catalog, separate schemas; bronze uses pipeline defaults
- Set your pipeline’s default catalog and default schema to the bronze layer (for example, catalog=my_catalog, schema=bronze). When you omit catalog/schema in code, reads/writes go to these defaults.
- Use pipeline parameters for the other schemas and any source schema/path, retrieved in code with spark.conf.get(...).
from pyspark import pipelines as dp
from pyspark.sql.functions import col
# Pull variables from pipeline configuration parameters
silver_schema = spark.conf.get("silver_schema") # e.g., "silver"
gold_schema = spark.conf.get("gold_schema") # e.g., "gold"
landing_schema = spark.conf.get("landing_schema") # e.g., "landing"
# Bronze → uses default catalog/schema (set to bronze in pipeline settings)
@dp.table(name="orders_bronze")
def orders_bronze():
# Read from another schema in the same default catalog
return spark.readStream.table(f"{landing_schema}.orders_raw")
# Silver → same catalog, schema from parameter
@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
return (spark.read.table("orders_bronze") # unqualified = default catalog/schema
.filter(col("order_id").isNotNull()))
# Gold → same catalog, schema from parameter
@dp.materialized_view(name=f"{gold_schema}.orders_by_date")
def orders_by_date():
return (spark.read.table(f"{silver_schema}.orders_clean")
.groupBy("order_date")
.count().withColumnRenamed("count", "order_count"))
- Using unqualified names for bronze ensures it lands in the pipeline’s default catalog/schema; silver/gold are explicitly schema-qualified within the same catalog.
Custom catalog/schema per layer; bronze still uses pipeline defaults
- Keep bronze in the pipeline defaults (default catalog/schema set to your bronze layer). For silver/gold, use fully-qualified names with catalog and schema variables from pipeline configuration.
from pyspark import pipelines as dp
from pyspark.sql.functions import col
# Pull variables from pipeline configuration parameters
silver_catalog = spark.conf.get("silver_catalog") # e.g., "my_catalog"
silver_schema = spark.conf.get("silver_schema") # e.g., "silver"
gold_catalog = spark.conf.get("gold_catalog") # e.g., "my_catalog"
gold_schema = spark.conf.get("gold_schema") # e.g., "gold"
landing_catalog = spark.conf.get("landing_catalog") # optional, if source is in another catalog
landing_schema = spark.conf.get("landing_schema")
# Bronze → uses default catalog/schema (set to bronze)
@dp.table(name="orders_bronze")
def orders_bronze():
# If source is in a specified catalog/schema:
return spark.readStream.table(f"{landing_catalog}.{landing_schema}.orders_raw")
# Silver → custom catalog + schema via parameters
@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean")
def orders_clean():
# Read bronze by its unqualified name (defaults), or fully qualify if preferred
return (spark.read.table("orders_bronze")
.filter(col("order_id").isNotNull()))
# Gold → custom catalog + schema via parameters
@dp.materialized_view(name=f"{gold_catalog}.{gold_schema}.orders_by_date}")
def orders_by_date():
return (spark.read.table(f"{silver_catalog}.{silver_schema}.orders_clean")
.groupBy("order_date")
.count().withColumnRenamed("count", "order_count"))
- Multipart names in the decorator’s name argument let you publish to explicit catalog.schema targets within one pipeline.
- Unqualified reads/writes use the pipeline defaults; use fully-qualified names when crossing catalogs or when you need explicit namespace control.
Note: The @dp.table()
decorator does not currently support separate for schema=
or catalog=
parameters. The table parameter is a string that contains the catalog.schema.table_name, or it can leave off catalog and or schema to use the pipeilnes configured default target schema.
Reading Tables in Python
Modern SDP Best Practice:
- Use spark.read.table()
for batch reads
- Use spark.readStream.table()
for streaming reads
- Don't use dp.read()
or dp.read_stream()
(old syntax, no longer documented)
- Don't use dlt.read()
or dlt.read_stream()
(legacy DLT API)
Key Point: SDP automatically tracks table dependencies from standard Spark DataFrame operations. No special read APIs are needed.
Three-Tier Identifier Resolution
SDP supports three levels of table name qualification:
Level
Syntax
When to Use
Unqualified
spark.read.table("my_table")
Reading tables within the same pipeline's target catalog/schema (recommended)
Partially-qualified
spark.read.table("other_schema.my_table")
Reading from different schema in same catalog
Fully-qualified
spark.read.table("other_catalog.other_schema.my_table")
Reading from external catalogs/schemas
Option 1: Unqualified Names (Recommended for Pipeline Tables)
Best practice for tables within the same pipeline. SDP resolves unqualified names to the pipeline's configured target catalog and schema. This makes code portable across environments (dev/prod).
@dp.table(name="silver_clean")
def silver_clean():
# Reads from pipeline's target catalog/schema (e.g., dev_catalog.dev_schema.bronze_raw)
return (
spark.read.table("bronze_raw")
.filter(F.col("valid") == True)
)
@dp.table(name="silver_events")
def silver_events():
# Streaming read from same pipeline's bronze_events table
return (
spark.readStream.table("bronze_events")
.withColumn("processed_at", F.current_timestamp())
)
Option 2: Pipeline Parameters (For External Sources)
Use spark.conf.get()
to parameterize external catalog/schema references. Define parameters in pipeline configuration, then reference them at the module level.
from pyspark import pipelines as dp
from pyspark.sql import functions as F
# Get parameterized values at module level (evaluated once at pipeline start)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales") # with default
@dp.table(name="transaction_summary")
def transaction_summary():
return (
spark.read.table(f"{source_catalog}.{source_schema}.transactions")
.groupBy("account_id")
.agg(
F.count("txn_id").alias("txn_count"),
F.sum("txn_amount").alias("account_revenue")
)
)
Configure parameters in pipeline settings:
- Asset Bundles: Add to pipeline.yml
under configuration:
- Manual/MCP: Pass via extra_settings.configuration
dict
# In resources/my_pipeline.pipeline.yml
configuration:
source_catalog: "shared_catalog"
source_schema: "sales"
Option 3: Fully-Qualified Names (For Fixed External References)
Use when referencing specific external tables that don't change across environments:
@dp.table(name="enriched_orders")
def enriched_orders():
# Pipeline-internal table (unqualified)
orders = spark.read.table("bronze_orders")
# External reference table (fully-qualified)
products = spark.read.table("shared_catalog.reference.products")
return orders.join(products, "product_id")
Choosing the Right Approach
Scenario
Recommended Approach
Reading tables created in same pipeline
Unqualified names - portable, uses target catalog/schema
Reading from external source that varies by environment
Pipeline parameters - configurable per deployment
Reading from shared/reference tables with fixed location
Fully-qualified names - explicit and clear
Mixed pipeline (some internal, some external)
Combine approaches - unqualified for internal, parameters for external
Common Issues
Issue
Solution
Empty output tables
Use get_table_details
to verify, check upstream sources
Pipeline stuck INITIALIZING
Normal for serverless, wait a few minutes
"Column not found"
Check schemaHints
match actual data
Streaming reads fail
For file ingestion in a streaming table, you must use the STREAM
keyword with read_files
: FROM STREAM read_files(...)
. For table streams use FROM stream(table)
. See read_files — Usage in streaming tables.
Timeout during run
Increase timeout
, or use wait_for_completion=False
and check status with get_pipeline
MV doesn't refresh
Enable row tracking on source tables
SCD2: query column not found
Lakeflow uses __START_AT
and __END_AT
(double underscore), not START_AT
/END_AT
. Use WHERE __END_AT IS NULL
for current rows. See 3-scd-patterns.md.
AUTO CDC parse error at APPLY/SEQUENCE
Put APPLY AS DELETE WHEN
before SEQUENCE BY
. Only list columns in COLUMNS * EXCEPT (...)
that exist in the source (omit _rescued_data
unless bronze uses rescue data). Omit TRACK HISTORY ON *
if it causes "end of input" errors; default is equivalent. See 2-streaming-patterns.md.
"Cannot create streaming table from batch query"
In a streaming table query, use FROM STREAM read_files(...)
so read_files
leverages Auto Loader; FROM read_files(...)
alone is batch. See 1-ingestion-patterns.md and read_files — Usage in streaming tables.
For detailed errors, the result["message"]
from create_or_update_pipeline
includes suggested next steps. Use get_pipeline(pipeline_id=...)
which includes recent events and error details.
Advanced Pipeline Configuration
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.
Platform Constraints
Serverless Pipeline Requirements (Default)
Requirement
Details
Unity Catalog
Required - serverless pipelines always use UC
Workspace Region
Must be in serverless-enabled region
Serverless Terms
Must accept serverless terms of use
CDC Features
Requires serverless (or Pro/Advanced with classic clusters)
Serverless Limitations (When Classic Clusters Required)
Limitation
Workaround
R language
Not supported - use classic clusters if required
Spark RDD APIs
Not supported - use classic clusters if required
JAR libraries
Not supported - use classic clusters if required
Maven coordinates
Not supported - use classic clusters if required
DBFS root access
Limited - must use Unity Catalog external locations
Global temp views
Not supported
General Constraints
Constraint
Details
Schema Evolution
Streaming tables require full refresh for incompatible changes
SQL Limitations
PIVOT clause unsupported
Sinks
Python only, streaming only, append flows only
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.
Related Skills
- databricks-jobs - for orchestrating and scheduling pipeline runs
- databricks-asset-bundles - for multi-environment deployment of pipeline projects
- databricks-synthetic-data-gen - for generating test data to feed into pipelines
- databricks-unity-catalog - for catalog/schema/volume management and governance