dbt Transformation Patterns
Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.
When to Use This Skill
-
Building data transformation pipelines with dbt
-
Organizing models into staging, intermediate, and marts layers
-
Implementing data quality tests
-
Creating incremental models for large datasets
-
Documenting data models and lineage
-
Setting up dbt project structure
Core Concepts
- Model Layers (Medallion Architecture)
sources/ Raw data definitions ↓ staging/ 1:1 with source, light cleaning ↓ intermediate/ Business logic, joins, aggregations ↓ marts/ Final analytics tables
- Naming Conventions
Layer Prefix Example
Staging stg_
stg_stripe__payments
Intermediate int_
int_payments_pivoted
Marts dim_ , fct_
dim_customers , fct_orders
Quick Start
dbt_project.yml
name: "analytics" version: "1.0.0" profile: "analytics"
model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"]
vars: start_date: "2020-01-01"
models: analytics: staging: +materialized: view +schema: staging intermediate: +materialized: ephemeral marts: +materialized: table +schema: analytics
Project structure
models/ ├── staging/ │ ├── stripe/ │ │ ├── _stripe__sources.yml │ │ ├── _stripe__models.yml │ │ ├── stg_stripe__customers.sql │ │ └── stg_stripe__payments.sql │ └── shopify/ │ ├── _shopify__sources.yml │ └── stg_shopify__orders.sql ├── intermediate/ │ └── finance/ │ └── int_payments_pivoted.sql └── marts/ ├── core/ │ ├── _core__models.yml │ ├── dim_customers.sql │ └── fct_orders.sql └── finance/ └── fct_revenue.sql
Patterns
Pattern 1: Source Definitions
models/staging/stripe/_stripe__sources.yml
version: 2
sources:
- name: stripe
description: Raw Stripe data loaded via Fivetran
database: raw
schema: stripe
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
tables:
-
name: customers description: Stripe customer records columns:
- name: id
description: Primary key
tests:
- unique
- not_null
- name: email description: Customer email
- name: created description: Account creation timestamp
- name: id
description: Primary key
tests:
-
name: payments description: Stripe payment transactions columns:
- name: id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships: to: source('stripe', 'customers') field: id
- name: id
tests:
-
Pattern 2: Staging Models
-- models/staging/stripe/stg_stripe__customers.sql with source as ( select * from {{ source('stripe', 'customers') }} ),
renamed as ( select -- ids id as customer_id,
-- strings
lower(email) as email,
name as customer_name,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
-- models/staging/stripe/stg_stripe__payments.sql {{ config( materialized='incremental', unique_key='payment_id', on_schema_change='append_new_columns' ) }}
with source as ( select * from {{ source('stripe', 'payments') }}
{% if is_incremental() %}
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
{% endif %}
),
renamed as ( select -- ids id as payment_id, customer_id, invoice_id,
-- amounts (convert cents to dollars)
amount / 100.0 as amount,
amount_refunded / 100.0 as amount_refunded,
-- status
status as payment_status,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
Pattern 3: Intermediate Models
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql with payments as ( select * from {{ ref('stg_stripe__payments') }} ),
customers as ( select * from {{ ref('stg_stripe__customers') }} ),
payment_summary as ( select customer_id, count(*) as total_payments, count(case when payment_status = 'succeeded' then 1 end) as successful_payments, sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid, min(created_at) as first_payment_at, max(created_at) as last_payment_at from payments group by customer_id )
select customers.customer_id, customers.email, customers.created_at as customer_created_at, coalesce(payment_summary.total_payments, 0) as total_payments, coalesce(payment_summary.successful_payments, 0) as successful_payments, coalesce(payment_summary.total_amount_paid, 0) as lifetime_value, payment_summary.first_payment_at, payment_summary.last_payment_at
from customers left join payment_summary using (customer_id)
Pattern 4: Mart Models (Dimensions and Facts)
-- models/marts/core/dim_customers.sql {{ config( materialized='table', unique_key='customer_id' ) }}
with customers as ( select * from {{ ref('int_payments_pivoted_to_customer') }} ),
orders as ( select * from {{ ref('stg_shopify__orders') }} ),
order_summary as ( select customer_id, count(*) as total_orders, sum(total_price) as total_order_value, min(created_at) as first_order_at, max(created_at) as last_order_at from orders group by customer_id ),
final as ( select -- surrogate key {{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- natural key
customers.customer_id,
-- attributes
customers.email,
customers.customer_created_at,
-- payment metrics
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- order metrics
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- calculated fields
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- timestamps
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from final
-- models/marts/core/fct_orders.sql {{ config( materialized='incremental', unique_key='order_id', incremental_strategy='merge' ) }}
with orders as ( select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as ( select * from {{ ref('dim_customers') }} ),
final as ( select -- keys orders.order_id, customers.customer_key, orders.customer_id,
-- dimensions
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- measures
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- timestamps
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- metadata
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from final
Pattern 5: Testing and Documentation
models/marts/core/_core__models.yml
version: 2
models:
-
name: dim_customers description: Customer dimension with payment and order metrics columns:
-
name: customer_key description: Surrogate key for the customer dimension tests:
- unique
- not_null
-
name: customer_id description: Natural key from source system tests:
- unique
- not_null
-
name: email description: Customer email address tests:
- not_null
-
name: customer_tier description: Customer value tier based on lifetime value tests:
- accepted_values: values: ["high", "medium", "low"]
-
name: lifetime_value description: Total amount paid by customer tests:
- dbt_utils.expression_is_true: expression: ">= 0"
-
-
name: fct_orders description: Order fact table with all order transactions tests:
- dbt_utils.recency: datepart: day field: created_at interval: 1 columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships: to: ref('dim_customers') field: customer_key
Pattern 6: Macros and DRY Code
-- macros/cents_to_dollars.sql {% macro cents_to_dollars(column_name, precision=2) %} round({{ column_name }} / 100.0, {{ precision }}) {% endmacro %}
-- macros/generate_schema_name.sql {% macro generate_schema_name(custom_schema_name, node) %} {%- set default_schema = target.schema -%} {%- if custom_schema_name is none -%} {{ default_schema }} {%- else -%} {{ default_schema }}_{{ custom_schema_name }} {%- endif -%} {% endmacro %}
-- macros/limit_data_in_dev.sql {% macro limit_data_in_dev(column_name, days=3) %} {% if target.name == 'dev' %} where {{ column_name }} >= dateadd(day, -{{ days }}, current_date) {% endif %} {% endmacro %}
-- Usage in model select * from {{ ref('stg_orders') }} {{ limit_data_in_dev('created_at') }}
Pattern 7: Incremental Strategies
-- Delete+Insert (default for most warehouses) {{ config( materialized='incremental', unique_key='id', incremental_strategy='delete+insert' ) }}
-- Merge (best for late-arriving data) {{ config( materialized='incremental', unique_key='id', incremental_strategy='merge', merge_update_columns=['status', 'amount', 'updated_at'] ) }}
-- Insert Overwrite (partition-based) {{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={ "field": "created_date", "data_type": "date", "granularity": "day" } ) }}
select *, date(created_at) as created_date from {{ ref('stg_events') }}
{% if is_incremental() %} where created_date >= dateadd(day, -3, current_date) {% endif %}
dbt Commands
Development
dbt run # Run all models dbt run --select staging # Run staging models only dbt run --select +fct_orders # Run fct_orders and its upstream dbt run --select fct_orders+ # Run fct_orders and its downstream dbt run --full-refresh # Rebuild incremental models
Testing
dbt test # Run all tests dbt test --select stg_stripe # Test specific models dbt build # Run + test in DAG order
Documentation
dbt docs generate # Generate docs dbt docs serve # Serve docs locally
Debugging
dbt compile # Compile SQL without running dbt debug # Test connection dbt ls --select tag:critical # List models by tag
Best Practices
Do's
-
Use staging layer - Clean data once, use everywhere
-
Test aggressively - Not null, unique, relationships
-
Document everything - Column descriptions, model descriptions
-
Use incremental - For tables > 1M rows
-
Version control - dbt project in Git
Don'ts
-
Don't skip staging - Raw → mart is tech debt
-
Don't hardcode dates - Use {{ var('start_date') }}
-
Don't repeat logic - Extract to macros
-
Don't test in prod - Use dev target
-
Don't ignore freshness - Monitor source data