data-engineer

Build scalable data pipelines and analytics infrastructure.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "data-engineer" with this command: npx skills add htlin222/dotfiles/htlin222-dotfiles-data-engineer

Data Engineering

When to use

  • ETL/ELT pipeline design

  • Data warehouse modeling

  • Streaming data processing

  • Data quality monitoring

Airflow DAG

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Written against Airflow 2.x: days_ago and schedule_interval are deprecated in
# later releases in favor of a fixed start_date and the schedule argument.
# extract_data, transform_data, load_data, and validate_data are assumed to be
# defined or imported elsewhere; the source does not show them.

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=days_ago(1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    # Run the tasks strictly in sequence
    extract >> transform >> load >> validate
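The DAG refers to four callables that the source does not define. The following is a minimal sketch of what they might look like, written as plain functions chained by hand so it runs without Airflow. All function bodies, the sample rows, and the in-memory `target` dict are hypothetical. In a real DAG, PythonOperator calls each function without arguments unless `op_args` or `op_kwargs` are set, so data would normally move between tasks through external storage or XCom.

```python
from typing import Any


def extract_data() -> list[dict[str, Any]]:
    """Return raw rows; a real task would read from an API, file, or database."""
    return [
        {"sale_id": 1, "amount": "19.99", "quantity": "2"},
        {"sale_id": 2, "amount": "5.00", "quantity": "1"},
    ]


def transform_data(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Cast the string fields to numeric types."""
    return [
        {
            "sale_id": row["sale_id"],
            "amount": float(row["amount"]),
            "quantity": int(row["quantity"]),
        }
        for row in rows
    ]


def load_data(rows: list[dict[str, Any]], target: dict[int, dict[str, Any]]) -> int:
    """Upsert rows keyed by sale_id so a retried task does not duplicate them."""
    for row in rows:
        target[row["sale_id"]] = row
    return len(rows)


def validate_data(target: dict[int, dict[str, Any]]) -> None:
    """Raise ValueError if the loaded rows break basic expectations."""
    if not target:
        raise ValueError("no rows loaded")
    for row in target.values():
        if row["amount"] < 0:
            raise ValueError(f"negative amount for sale_id {row['sale_id']}")


if __name__ == "__main__":
    warehouse: dict[int, dict[str, Any]] = {}  # stand-in for a real table
    load_data(transform_data(extract_data()), warehouse)
    validate_data(warehouse)
    print(f"{len(warehouse)} rows loaded and validated")
```

Keying the load on `sale_id` is one way to keep the task safe under the `retries: 3` setting, because a second run overwrites rows rather than appending them.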

Data warehouse schema

Star schema

-- Dimension tables (created first so the fact table's foreign keys resolve)
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,  -- surrogate key, one per version of a product
    product_id VARCHAR(50),       -- business key, repeated across versions
    name VARCHAR(255),
    category VARCHAR(100),
    -- SCD Type 2 fields
    valid_from DATE,
    valid_to DATE,
    is_current BOOLEAN
);

-- Fact table
-- dim_customer is referenced below but not defined in the source;
-- it must exist before this statement runs.
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity INT,
    amount DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT NOW()
);
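The `valid_from`, `valid_to`, and `is_current` columns on `dim_product` imply a Slowly Changing Dimension Type 2 update: when an attribute changes, the current row is closed and a new version is appended. The following is a rough in-memory sketch of that step. The `ProductRow` class, the `apply_scd2` function, the `OPEN_END` sentinel date, and the half-open `[valid_from, valid_to)` convention are all assumptions for illustration and do not come from the source.

```python
from dataclasses import dataclass, replace
from datetime import date

OPEN_END = date(9999, 12, 31)  # assumed sentinel meaning "still current"


@dataclass(frozen=True)
class ProductRow:
    product_key: int   # surrogate key, new for every version
    product_id: str    # business key, stable across versions
    name: str
    category: str
    valid_from: date
    valid_to: date
    is_current: bool


def apply_scd2(
    rows: list[ProductRow], product_id: str, name: str, category: str, as_of: date
) -> list[ProductRow]:
    """Return a new row list with the incoming product state applied."""
    current = next(
        (r for r in rows if r.product_id == product_id and r.is_current), None
    )
    if current is not None and (current.name, current.category) == (name, category):
        return list(rows)  # attributes unchanged, so no new version is needed

    # Close the current version (if any) and leave every other row untouched.
    out = [
        replace(r, valid_to=as_of, is_current=False) if r is current else r
        for r in rows
    ]
    next_key = max((r.product_key for r in rows), default=0) + 1
    out.append(ProductRow(next_key, product_id, name, category, as_of, OPEN_END, True))
    return out


if __name__ == "__main__":
    history = apply_scd2([], "P1", "Widget", "Tools", date(2024, 1, 1))
    history = apply_scd2(history, "P1", "Widget", "Hardware", date(2024, 6, 1))
    for row in history:
        print(row)
```

Because each version gets its own `product_key`, rows in `fact_sales` keep pointing at the product attributes that were valid when the sale happened.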

Spark job

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum

spark = (
    SparkSession.builder
    .appName("ETL Job")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Read with partitioning
# Parquet files carry their own schema, so the inferSchema option
# (which applies to CSV and JSON sources) is omitted here.
df = (
    spark.read
    .parquet("s3://bucket/data/")
    .filter(F.col("date") >= "2024-01-01")
)

# Transform
result = (
    df
    .groupBy("category", "date")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.avg("quantity").alias("avg_quantity"),
    )
    .repartition(10, "date")  # Optimize for writes
)

# Write partitioned
(
    result.write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("s3://bucket/output/")
)

Data quality

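import great_expectations as gx

# Assumes the Great Expectations 1.x API, where expectations are classes under
# gx.expectations. Earlier releases build suites from ExpectationConfiguration
# objects instead, so adjust to the installed version.
suite = gx.ExpectationSuite(name="sales_data")

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="sale_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="sale_id")
)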
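To show what these three expectations actually assert, here is a dependency-free sketch of the same checks over a list of row dicts. It is not the Great Expectations implementation. The helper names and the choice to skip null values in the range and uniqueness checks are assumptions for illustration.

```python
from typing import Any

Row = dict[str, Any]


def check_not_null(rows: list[Row], column: str) -> bool:
    """True when every row has a non-null value in the column."""
    return all(row.get(column) is not None for row in rows)


def check_between(rows: list[Row], column: str, min_value: float, max_value: float) -> bool:
    """True when every non-null value lies in [min_value, max_value]."""
    return all(
        min_value <= row[column] <= max_value
        for row in rows
        if row.get(column) is not None
    )


def check_unique(rows: list[Row], column: str) -> bool:
    """True when no non-null value appears more than once."""
    values = [row[column] for row in rows if row.get(column) is not None]
    return len(values) == len(set(values))


def run_checks(rows: list[Row]) -> dict[str, bool]:
    """Run the three sales_data checks and report pass or fail per check."""
    return {
        "sale_id_not_null": check_not_null(rows, "sale_id"),
        "amount_in_range": check_between(rows, "amount", 0, 1_000_000),
        "sale_id_unique": check_unique(rows, "sale_id"),
    }


if __name__ == "__main__":
    sample = [
        {"sale_id": 1, "amount": 25.0},
        {"sale_id": 1, "amount": -3.0},  # duplicate id and negative amount
    ]
    print(run_checks(sample))
```

A validation task could raise an error when any value in the result is false, which would stop the pipeline before bad rows reach downstream consumers.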

Best practices

  • Idempotent operations (re-runnable)

  • Incremental processing over full refresh

  • Data lineage tracking

  • Schema evolution handling

  • Cost monitoring for cloud services

Examples

Input: "Design ETL for user events"
Action: Create an Airflow DAG with extract, transform, and load tasks, then add quality checks.

Input: "Optimize slow Spark job"
Action: Check partitioning, reduce shuffles, and tune memory settings.
