dagster-development

Dagster Development Expert


Install skill "dagster-development" with this command: npx skills add c00ldudenoonan/economic-data-project/c00ldudenoonan-economic-data-project-dagster-development


Quick Reference

| If you're writing... | Check this section/reference |
|---|---|
| `@dg.asset` | Assets or references/assets.md |
| `ConfigurableResource` | Resources or references/resources.md |
| `@dg.schedule` or `ScheduleDefinition` | Automation or references/automation.md |
| `@dg.sensor` | Sensors or references/automation.md |
| `PartitionsDefinition` | Partitions or references/automation.md |
| Tests with `dg.materialize()` | Testing or references/testing.md |
| `@asset_check` | references/testing.md#asset-checks |
| `@dlt_assets` or `@sling_assets` | references/etl-patterns.md |
| `@dbt_assets` | dbt Integration or the dbt-development skill |
| `Definitions` or code locations | references/project-structure.md |

Core Concepts

Asset: A persistent object (table, file, model) that your pipeline produces. Define with `@dg.asset`.

Resource: An external service or tool (database, API) shared across assets. Define with `ConfigurableResource`.

Job: A selection of assets to execute together. Create with `dg.define_asset_job()`.

Schedule: Time-based automation for jobs. Create with `dg.ScheduleDefinition`.

Sensor: Event-driven automation that watches for changes. Define with `@dg.sensor`.

Partition: Logical divisions of data (by date, category). Define with a `PartitionsDefinition`.

Definitions: The container for all Dagster objects in a code location.

Assets Quick Reference

Basic Asset

```python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
```

Asset with Dependencies

```python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
```

Asset with Metadata

```python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass
```

Naming: Use nouns describing what is produced (`customers`, `daily_revenue`), not verbs (`load_customers`).

Resources Quick Reference

Define a Resource

```python
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass
```

Use in Assets

```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```

Register in Definitions

```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```

Automation Quick Reference

Schedule

```python
import dagster as dg

from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
```

Common Cron Patterns

| Pattern | Meaning |
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |

Sensors Quick Reference

Basic Sensor Pattern

```python
import json

import dagster as dg

@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))

    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )
```

Key: Use cursors to track state between sensor evaluations.

Partitions Quick Reference

Time-Based Partition

```python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")

@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition
```

Static Partition

```python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
```

Partition Types

| Type | Use Case |
|---|---|
| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |

Testing Quick Reference

Direct Function Testing

```python
def test_my_asset():
    result = my_asset()
    assert result == expected_value
```

Testing with Materialization

```python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected
```

Mocking Resources

```python
from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]

    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success
```

Asset Checks

```python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )
```

dbt Integration

For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the dbt-development skill.

Basic dbt Assets

```python
from pathlib import Path

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

dbt Resource

```python
dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
```

Full patterns: See Dagster dbt docs

When to Load References

Load references/assets.md when:

  • Defining complex asset dependencies

  • Adding metadata, groups, or key prefixes

  • Working with asset factories

  • Understanding asset materialization patterns

Load references/resources.md when:

  • Creating custom ConfigurableResource classes

  • Integrating with databases, APIs, or cloud services

  • Understanding resource scoping and lifecycle

Load references/automation.md when:

  • Creating schedules with complex cron patterns

  • Building sensors with cursors and state management

  • Implementing partitions and backfills

  • Automating dbt or other integration runs

Load references/testing.md when:

  • Writing unit tests for assets

  • Mocking resources and dependencies

  • Using dg.materialize() for integration tests

  • Creating asset checks for data validation

Load references/etl-patterns.md when:

  • Using dlt for embedded ETL

  • Using Sling for database replication

  • Loading data from files or APIs

  • Integrating external ETL tools

Load references/project-structure.md when:

  • Setting up a new Dagster project

  • Configuring Definitions and code locations

  • Using dg CLI for scaffolding

  • Organizing large projects with Components

Project Structure

Recommended Layout

```
my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py      # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py
```

Definitions Pattern (Modern)

```python
# src/my_project/definitions.py
from pathlib import Path

from dagster import definitions, load_from_defs_folder

@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)
```

Scaffolding with dg CLI

```shell
# Create new project
uvx create-dagster my_project

# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py

# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py

# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py

# Validate definitions
dg check defs
```

Common Patterns

Job Definition

```python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)
```

Run Configuration

```python
from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")
```

Asset Dependencies with External Sources

```python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table, which isn't managed by Dagster."""
    pass
```

Anti-Patterns to Avoid

| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use `ConfigurableResource` with env vars |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series data | Use `DailyPartitionsDefinition`, etc. |
| Putting all assets in one file | Organize by domain in separate modules |

CLI Quick Reference

Development

```shell
dg dev          # Start Dagster UI
dg check defs   # Validate definitions
```

Scaffolding

```shell
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py
```

Production

```shell
dagster job execute -j my_job                   # Execute a job
dagster asset materialize --select my_asset     # Materialize an asset
```

References

  • Assets: references/assets.md (detailed asset patterns)

  • Resources: references/resources.md (resource configuration)

  • Automation: references/automation.md (schedules, sensors, partitions)

  • Testing: references/testing.md (testing patterns and asset checks)

  • ETL Patterns: references/etl-patterns.md (dlt, Sling, file/API ingestion)

  • Project Structure: references/project-structure.md (Definitions, Components)

  • Official Docs: https://docs.dagster.io

  • API Reference: https://docs.dagster.io/api/dagster
