Dagster Development Expert
When to Use This Skill vs. Others
If User Says... Use This Skill/Command Why
"what's the best way to X" /dagster-conventions
Need patterns/best practices
"how do I structure assets" /dagster-conventions
Asset design guidance
"which integration should I use" /dagster-integrations
Integration discovery needed
"implement X pipeline" /dg:prototype
Ready to build, not just learn
"is this pythonic" /dignified-python
Python code review needed
"create new project" /dg:create-project
Project initialization needed
"how do I test assets" /dagster-conventions (testing section) Testing patterns guidance
"schedule patterns" /dagster-conventions (automation section) Scheduling/automation guidance
"dbt best practices" /dagster-conventions (dbt section) dbt-specific patterns
Core Philosophy
Think in Assets: Dagster is built around the asset abstraction—persistent objects like tables, files, or models that your pipeline produces. Assets provide:
-
Clear Lineage: Explicit dependencies define data flow
-
Better Observability: Track what data exists and how it was created
-
Improved Testability: Assets are just Python functions that can be tested directly
-
Declarative Pipelines: Focus on what to produce, not how to execute
Assets over Ops: For most data pipelines, prefer assets over ops. Use ops only when the asset abstraction doesn't fit (non-data workflows, complex execution patterns).
Environment Separation: Use resources and EnvVar to maintain separate configurations for dev, staging, and production without code changes.
Quick Reference
If you're writing... Check this section/reference
@dg.asset
Assets or references/assets.md
ConfigurableResource
Resources or references/resources.md
AutomationCondition
Declarative Automation or references/automation.md
@dg.schedule or ScheduleDefinition
Automation or references/automation.md
@dg.sensor
Sensors or references/automation.md
PartitionsDefinition
Partitions or references/automation.md
Tests with dg.materialize()
Testing or references/testing.md
@asset_check
references/testing.md#asset-checks
@dlt_assets or @sling_assets
references/etl-patterns.md
@dbt_assets
dbt Integration or dbt-development skill
Definitions or code locations references/project-structure.md
Components (defs.yaml ) references/project-structure.md#components
Core Concepts
Asset: A persistent object (table, file, model) that your pipeline produces. Define with @dg.asset .
Resource: External services/tools (databases, APIs) shared across assets. Define with ConfigurableResource .
Job: A selection of assets to execute together. Create with dg.define_asset_job() .
Schedule: Time-based automation for jobs. Create with dg.ScheduleDefinition .
Sensor: Event-driven automation that watches for changes. Define with @dg.sensor .
Partition: Logical divisions of data (by date, category). Define with PartitionsDefinition .
Definitions: The container for all Dagster objects in a code location.
Component: Reusable, declarative building blocks that generate Definitions from configuration (YAML). Use for standardized patterns.
Declarative Automation: Modern automation framework where you set conditions on assets rather than scheduling jobs.
Assets Quick Reference
Basic Asset
import dagster as dg
@dg.asset def my_asset() -> None: """Asset description appears in the UI.""" # Your computation logic here pass
Asset with Dependencies
@dg.asset def downstream_asset(upstream_asset) -> dict: """Depends on upstream_asset by naming it as a parameter.""" return {"processed": upstream_asset}
Asset with Metadata
@dg.asset( group_name="analytics", key_prefix=["warehouse", "staging"], description="Cleaned customer data", owners=["team:data-engineering", "alice@example.com"], tags={"priority": "high", "domain": "sales"}, code_version="1.2.0", ) def customers() -> None: pass
Best Practices:
-
Naming: Use nouns describing what is produced (customers , daily_revenue ), not verbs (load_customers )
-
Tags: Primary mechanism for organization (use liberally)
-
Owners: Specify team or individual owners for accountability
-
code_version: Track when asset logic changes for lineage
Resources Quick Reference
Define a Resource
from dagster import ConfigurableResource
class DatabaseResource(ConfigurableResource): connection_string: str
def query(self, sql: str) -> list:
# Implementation here
pass
Use in Assets
@dg.asset def my_asset(database: DatabaseResource) -> None: results = database.query("SELECT * FROM table")
Register in Definitions
dg.Definitions( assets=[my_asset], resources={"database": DatabaseResource(connection_string="...")}, )
Automation Quick Reference
Schedule
import dagster as dg from my_project.defs.jobs import my_job
my_schedule = dg.ScheduleDefinition( job=my_job, cron_schedule="0 0 * * *", # Daily at midnight )
Common Cron Patterns
Pattern Meaning
0 * * * *
Every hour
0 0 * * *
Daily at midnight
0 0 * * 1
Weekly on Monday
0 0 1 * *
Monthly on the 1st
0 0 5 * *
Monthly on the 5th
Declarative Automation Quick Reference
Modern automation pattern: Set conditions on assets instead of scheduling jobs.
AutomationCondition Examples
from dagster import AutomationCondition
Update when upstream data changes
@dg.asset( automation_condition=AutomationCondition.on_missing() ) def my_asset() -> None: pass
Update daily at a specific time
@dg.asset( automation_condition=AutomationCondition.on_cron("0 9 * * *") ) def daily_report() -> None: pass
Combine conditions
@dg.asset( automation_condition=( AutomationCondition.on_missing() | AutomationCondition.on_cron("0 0 * * *") ) ) def flexible_asset() -> None: pass
Benefits over Schedules:
-
More expressive condition logic
-
Asset-native (no separate job definitions needed)
-
Automatic dependency-aware execution
-
Better for complex automation scenarios
When to Use:
-
Asset-centric pipelines with complex update logic
-
Condition-based triggers (data availability, freshness)
-
Prefer over schedules for new projects
Sensors Quick Reference
Basic Sensor Pattern
@dg.sensor(job=my_job) def my_sensor(context: dg.SensorEvaluationContext): # 1. Read cursor (previous state) previous_state = json.loads(context.cursor) if context.cursor else {} current_state = {} runs_to_request = []
# 2. Check for changes
for item in get_items_to_check():
current_state[item.id] = item.modified_at
if item.id not in previous_state or previous_state[item.id] != item.modified_at:
runs_to_request.append(dg.RunRequest(
run_key=f"run_{item.id}_{item.modified_at}",
run_config={...}
))
# 3. Return result with updated cursor
return dg.SensorResult(
run_requests=runs_to_request,
cursor=json.dumps(current_state)
)
Key: Use cursors to track state between sensor evaluations.
Partitions Quick Reference
Time-Based Partition
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")
@dg.asset(partitions_def=weekly_partition) def weekly_data(context: dg.AssetExecutionContext) -> None: partition_key = context.partition_key # e.g., "2023-01-01" # Process data for this partition
Static Partition
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])
@dg.asset(partitions_def=region_partition) def regional_data(context: dg.AssetExecutionContext) -> None: region = context.partition_key
Partition Types
Type Use Case
DailyPartitionsDefinition
One partition per day
WeeklyPartitionsDefinition
One partition per week
MonthlyPartitionsDefinition
One partition per month
HourlyPartitionsDefinition
One partition per hour
StaticPartitionsDefinition
Fixed set of partitions
DynamicPartitionsDefinition
Partitions created at runtime
MultiPartitionsDefinition
Combine multiple partition dimensions
Best Practice: Limit partitions to 100,000 or fewer per asset for optimal UI performance.
Testing Quick Reference
Direct Function Testing
def test_my_asset(): result = my_asset() assert result == expected_value
Testing with Materialization
def test_asset_graph(): result = dg.materialize( assets=[asset_a, asset_b], resources={"database": mock_database}, ) assert result.success assert result.output_for_node("asset_b") == expected
Mocking Resources
from unittest.mock import Mock
def test_with_mocked_resource(): mocked_resource = Mock() mocked_resource.query.return_value = [{"id": 1}]
result = dg.materialize(
assets=[my_asset],
resources={"database": mocked_resource},
)
assert result.success
Asset Checks
@dg.asset_check(asset=my_asset) def validate_non_empty(my_asset): return dg.AssetCheckResult( passed=len(my_asset) > 0, metadata={"row_count": len(my_asset)}, )
dbt Integration
For dbt integration, prefer the component-based approach for standard dbt projects. Use Pythonic assets only when you need custom logic or fine-grained control.
Component-Based dbt (Recommended)
Use DbtProjectComponent with remote Git repository:
defs/transform/defs.yaml
type: dagster_dbt.DbtProjectComponent
attributes: project: repo_url: https://github.com/dagster-io/jaffle-platform.git repo_relative_path: jdbt dbt: target: dev
When to use:
-
Standard dbt transformations
-
Remote dbt project in Git repository
-
Declarative configuration preferred
-
Component reusability desired
For private repositories:
attributes: project: repo_url: https://github.com/your-org/dbt-project.git repo_relative_path: dbt token: "{{ env.GIT_TOKEN }}" dbt: target: dev
Pythonic dbt Assets
For custom logic or local development:
from dagster_dbt import DbtCliResource, dbt_assets from pathlib import Path
dbt_project_dir = Path(file).parent / "dbt_project"
@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json") def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream()
dg.Definitions( assets=[my_dbt_assets], resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)}, )
When to use:
-
Custom transformation logic needed
-
Local development with frequent dbt code changes
-
Fine-grained control over dbt execution
Full patterns: See Dagster dbt docs
When to Load References
Load references/assets.md when:
-
Defining complex asset dependencies
-
Adding metadata, groups, or key prefixes
-
Working with asset factories
-
Understanding asset materialization patterns
Load references/resources.md when:
-
Creating custom ConfigurableResource classes
-
Integrating with databases, APIs, or cloud services
-
Understanding resource scoping and lifecycle
Load references/automation.md when:
-
Creating schedules with complex cron patterns
-
Building sensors with cursors and state management
-
Implementing partitions and backfills
-
Using declarative automation conditions
-
Automating dbt or other integration runs
Load references/testing.md when:
-
Writing unit tests for assets
-
Mocking resources and dependencies
-
Using dg.materialize() for integration tests
-
Creating asset checks for data validation
Load references/etl-patterns.md when:
-
Using dlt for embedded ETL
-
Using Sling for database replication
-
Loading data from files or APIs
-
Integrating external ETL tools
Load references/project-structure.md when:
-
Setting up a new Dagster project
-
Configuring Definitions and code locations
-
Using dg CLI for scaffolding
-
Organizing large projects with Components
Project Structure
Recommended Layout
my_project/ ├── pyproject.toml ├── src/ │ └── my_project/ │ ├── definitions.py # Main Definitions │ └── defs/ │ ├── assets/ │ │ ├── init.py │ │ └── my_assets.py │ ├── jobs.py │ ├── schedules.py │ ├── sensors.py │ └── resources.py └── tests/ └── test_assets.py
Definitions Pattern (Modern)
Auto-Discovery (Simplest):
src/my_project/definitions.py
from dagster import Definitions from dagster_dg import load_defs
Automatically discovers all definitions in defs/ folder
defs = Definitions.merge( load_defs() )
Combining Components with Pythonic Assets:
src/my_project/definitions.py
from dagster import Definitions from dagster_dg import load_defs from my_project.assets import custom_assets
Load component definitions from defs/ folder
component_defs = load_defs()
Define pythonic assets separately
pythonic_defs = Definitions( assets=custom_assets, resources={...} )
Merge them together
defs = Definitions.merge(component_defs, pythonic_defs)
Traditional (Explicit):
src/my_project/definitions.py
from dagster import Definitions from my_project.defs import assets, jobs, schedules, resources
defs = Definitions( assets=assets, jobs=jobs, schedules=schedules, resources=resources, )
Scaffolding with dg CLI
Create new project
uvx create-dagster my_project
Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py
Scaffold schedule
dg scaffold defs dagster.schedule schedules.py
Scaffold sensor
dg scaffold defs dagster.sensor sensors.py
Validate definitions
dg check defs
Common Patterns
Job Definition
trip_update_job = dg.define_asset_job( name="trip_update_job", selection=["taxi_trips", "taxi_zones"], )
Run Configuration
from dagster import Config
class MyAssetConfig(Config): filename: str limit: int = 100
@dg.asset def configurable_asset(config: MyAssetConfig) -> None: print(f"Processing {config.filename} with limit {config.limit}")
Asset Dependencies with External Sources
@dg.asset(deps=["external_table"]) def derived_asset() -> None: """Depends on external_table which isn't managed by Dagster.""" pass
Anti-Patterns to Avoid
Anti-Pattern Better Approach
Hardcoding credentials in assets Use ConfigurableResource with env vars
Giant assets that do everything Split into focused, composable assets
Ignoring asset return types Use type annotations for clarity
Skipping tests for assets Test assets like regular Python functions
Not using partitions for time-series Use DailyPartitionsDefinition etc.
Putting all assets in one file Organize by domain in separate modules
CLI Quick Reference
dg CLI (Recommended for Modern Projects)
Development
dg dev # Start Dagster UI (port 3000) dg check defs # Validate definitions load correctly dg list defs # Show all loaded definitions dg list components # Show available components
Scaffolding
dg scaffold defs dagster.asset assets/file.py dg scaffold defs dagster.schedule schedules.py dg scaffold defs dagster.sensor sensors.py dg scaffold defs dagster.resources resources.py
Execution
dg launch --assets my_asset # Materialize specific asset dg launch --assets asset1 asset2 # Multiple assets dg launch --assets "*" # Materialize all assets dg launch --assets "tag:priority=high" # Assets by tag dg launch --assets "group:sales_analytics" # Assets by group dg launch --assets "kind:dbt" # Assets by kind dg launch --job my_job # Execute a job
Partitions
dg launch --assets my_asset --partition 2024-01-15 # Single partition dg launch --assets my_asset --partition-range "2024-01-01...2024-01-31" # Backfill range
Configuration
dg launch --assets my_asset --config-json '{"ops": {"my_asset": {"config": {"param": "value"}}}}'
Environment Variables
uv run dg launch --assets my_asset # Auto-loads .env with uv set -a; source .env; set +a; dg launch --assets my_asset # Manual .env loading
See /dg:launch command for comprehensive launch documentation
dagster CLI (Legacy/General Purpose)
Use for non-dg projects or advanced scenarios
dagster dev # Start Dagster UI dagster job execute -j my_job # Execute a job dagster asset materialize -a my_asset # Materialize an asset
Use dg CLI for projects created with create-dagster . It provides auto-discovery, scaffolding, and modern workflow support.
References
-
Assets: references/assets.md
-
Detailed asset patterns and launching guidance
-
Resources: references/resources.md
-
Resource configuration
-
Automation: references/automation.md
-
Schedules, sensors, partitions
-
Testing: references/testing.md
-
Testing patterns and asset checks
-
ETL Patterns: references/etl-patterns.md
-
dlt, Sling, file/API ingestion
-
Project Structure: references/project-structure.md
-
Definitions, Components
-
Launch Command: /dg:launch
-
Comprehensive asset launching documentation
-
Official Docs: https://docs.dagster.io
-
API Reference: https://docs.dagster.io/api/dagster