Data Engineering Guide
Data pipelines, warehousing, and modern data stack.
When to Use
- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure
Modern Data Stack
Components
Sources → Ingestion → Storage → Transform → Serve → Consume
| Layer | Tools |
| --- | --- |
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |
Data Pipeline Patterns
Batch Processing
Airflow DAG example
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Runs daily at 06:00. Airflow 2.4+ prefers `schedule=` over `schedule_interval=`.
    dag = DAG(
        'daily_etl',
        schedule_interval='0 6 * * *',
        start_date=datetime(2024, 1, 1),
        catchup=False,  # do not backfill every missed run since start_date
    )

    def extract():
        # Extract from source
        pass

    def transform():
        # Transform data
        pass

    def load():
        # Load to warehouse
        pass

    extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
    transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
    load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)

    # Run order: extract, then transform, then load
    extract_task >> transform_task >> load_task
Streaming Processing
Kafka consumer example
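    import json

    from kafka import KafkaConsumer  # provided by the kafka-python package

    def process_event(event):
        # Placeholder handler: replace with real processing logic
        print(event)

    consumer = KafkaConsumer(
        'events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )

    # Iterating the consumer blocks and yields messages as they arrive
    for message in consumer:
        process_event(message.value)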
dbt Patterns
Model Structure
    models/
    ├── staging/            # 1:1 with source
    │   ├── stg_orders.sql
    │   └── stg_customers.sql
    ├── intermediate/       # Business logic
    │   └── int_order_items.sql
    └── marts/              # Final models
        ├── dim_customers.sql
        └── fct_orders.sql
Example Model
    -- models/marts/fct_orders.sql
    {{ config(
        materialized='incremental',
        unique_key='order_id'
    ) }}

    select
        o.order_id,
        o.customer_id,
        o.order_date,
        sum(oi.quantity * oi.unit_price) as order_total
    from {{ ref('stg_orders') }} o
    join {{ ref('stg_order_items') }} oi
        on o.order_id = oi.order_id
    {% if is_incremental() %}
    -- >= reprocesses the latest loaded date so late-arriving orders for that
    -- date are not skipped; unique_key keeps the merge free of duplicates
    where o.order_date >= (select max(order_date) from {{ this }})
    {% endif %}
    group by 1, 2, 3
Data Modeling
Dimensional Modeling
    Fact Tables (events/transactions)
    ├── fct_orders
    ├── fct_page_views
    └── fct_transactions

    Dimension Tables (context)
    ├── dim_customers
    ├── dim_products
    ├── dim_dates
    └── dim_locations
Star Schema
                dim_customers
                      │
    dim_dates ── fct_orders ── dim_products
                      │
                dim_locations
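
A star-schema query joins the central fact table to a dimension on its key and aggregates a measure. The following is a minimal sketch using Python's built-in sqlite3 with in-memory tables. The column names (customer_key, product_key, segment, category) and the sample rows are illustrative assumptions, not part of any real warehouse.

```python
import sqlite3


def build_star_schema(conn: sqlite3.Connection) -> None:
    """Create a tiny star schema: one fact table and two dimensions (hypothetical columns)."""
    conn.executescript(
        """
        CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, segment TEXT);
        CREATE TABLE dim_products  (product_key  INTEGER PRIMARY KEY, category TEXT);
        CREATE TABLE fct_orders (
            order_id     INTEGER PRIMARY KEY,
            customer_key INTEGER REFERENCES dim_customers(customer_key),
            product_key  INTEGER REFERENCES dim_products(product_key),
            order_total  REAL
        );
        INSERT INTO dim_customers VALUES (1, 'retail'), (2, 'enterprise');
        INSERT INTO dim_products  VALUES (10, 'books'), (20, 'games');
        INSERT INTO fct_orders VALUES
            (100, 1, 10, 25.0),
            (101, 1, 20, 40.0),
            (102, 2, 10, 300.0);
        """
    )


def revenue_by_segment(conn: sqlite3.Connection) -> dict:
    """Join the fact table to the customer dimension and sum order_total per segment."""
    rows = conn.execute(
        """
        SELECT c.segment, SUM(f.order_total)
        FROM fct_orders AS f
        JOIN dim_customers AS c ON c.customer_key = f.customer_key
        GROUP BY c.segment
        ORDER BY c.segment
        """
    ).fetchall()
    return dict(rows)


conn = sqlite3.connect(":memory:")
build_star_schema(conn)
print(revenue_by_segment(conn))  # {'enterprise': 300.0, 'retail': 65.0}
```

The fact table holds only keys and measures, so adding another dimension to a report means adding one more join, not changing the fact table.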
Data Quality
Validation Rules
    # dbt tests
    models:
      - name: fct_orders
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: order_total
            tests:
              - not_null
              - positive_value  # custom generic test, not built into dbt
Quality Metrics
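| Metric | Description |
| --- | --- |
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness (time since last update) |
| Accuracy | Values match the source system |
| Consistency | Values agree across systems |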
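
The first two metrics can be computed directly from a column of values. This is a minimal sketch in plain Python. The function names and the sample data are hypothetical, and measuring uniqueness over non-null values only is an assumption about how the metric is defined.

```python
from typing import Any, List, Optional


def completeness(values: List[Optional[Any]]) -> float:
    """Percentage of values that are not None (0.0 for an empty column)."""
    if not values:
        return 0.0
    non_null = sum(1 for v in values if v is not None)
    return 100.0 * non_null / len(values)


def uniqueness(values: List[Optional[Any]]) -> float:
    """Percentage of non-null values that are distinct (assumed definition)."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0.0
    return 100.0 * len(set(non_null)) / len(non_null)


# Hypothetical column with one null and one duplicate
emails = ["a@example.com", "b@example.com", None, "a@example.com"]
print(completeness(emails))  # 75.0
print(uniqueness(emails))    # 2 distinct out of 3 non-null values
```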
Performance Optimization
Partitioning
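    -- BigQuery partitioned table
    -- DATE() assumes order_date is a TIMESTAMP or DATETIME column;
    -- for a DATE column, use PARTITION BY order_date instead
    CREATE TABLE orders
    PARTITION BY DATE(order_date)
    CLUSTER BY customer_id
    AS
    SELECT * FROM staging.orders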
Query Optimization
| Technique | Impact |
| --- | --- |
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |
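
To see why partitioning reduces scanned data, it can help to model a table as a set of per-date buckets. The sketch below is a toy in-memory illustration, not how a warehouse engine is implemented. The function names and sample rows are hypothetical.

```python
from collections import defaultdict


def partition_by_date(rows):
    """Group rows into partitions keyed by their order_date value."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["order_date"]].append(row)
    return partitions


def orders_on(partitions, order_date):
    """Read only the partition for one date; returns (rows, rows_scanned)."""
    rows = partitions.get(order_date, [])
    return rows, len(rows)


orders = [
    {"order_id": 1, "order_date": "2024-01-01"},
    {"order_id": 2, "order_date": "2024-01-01"},
    {"order_id": 3, "order_date": "2024-01-02"},
]
partitions = partition_by_date(orders)
rows, scanned = orders_on(partitions, "2024-01-02")
print(scanned, "of", len(orders), "rows scanned")  # 1 of 3 rows scanned
```

A filter on the partition column touches only the matching bucket, while a filter on any other column would still have to read every bucket.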
Monitoring
Pipeline Metrics
| Metric | Alert Threshold |
| --- | --- |
| Runtime | > 2x normal |
| Row count | ±20% variance |
| Freshness | > SLA |
| Failures | Any failure |
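
The thresholds in the table above can be evaluated as simple checks after each run. This is a minimal sketch: the RunStats dataclass, the baseline arguments, and the alert names are hypothetical. Reading the runtime and freshness rules as "greater than" comparisons is an assumption.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List


@dataclass
class RunStats:
    """Observed facts about one pipeline run (hypothetical structure)."""
    runtime: timedelta
    row_count: int
    data_age: timedelta  # time since the newest record was produced
    failed: bool


def pipeline_alerts(
    run: RunStats,
    normal_runtime: timedelta,
    expected_rows: int,
    freshness_sla: timedelta,
) -> List[str]:
    """Return the names of all thresholds that this run breached."""
    alerts = []
    if run.failed:
        alerts.append("failure")  # any failure alerts
    if run.runtime > 2 * normal_runtime:
        alerts.append("runtime")  # more than 2x normal
    if expected_rows > 0 and abs(run.row_count - expected_rows) / expected_rows > 0.20:
        alerts.append("row_count")  # outside ±20% of the baseline
    if run.data_age > freshness_sla:
        alerts.append("freshness")  # older than the SLA allows
    return alerts


slow_run = RunStats(timedelta(minutes=50), 700, timedelta(hours=7), failed=False)
print(pipeline_alerts(slow_run, timedelta(minutes=20), 1000, timedelta(hours=6)))
# ['runtime', 'row_count', 'freshness']
```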
Data Observability
Illustrative monitor config in the style of Monte Carlo / Elementary (not the exact syntax of either tool)

    monitors:
      - table: fct_orders
        tests:
          - freshness:
              threshold: 6 hours
          - volume:
              threshold: 10%
          - schema_change: true
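
A schema-change monitor can be thought of as a diff between two snapshots of a table's columns and types. The sketch below shows that idea in plain Python. The diff_schema function and the sample schemas are hypothetical, and this is not how any particular observability tool implements the check.

```python
from typing import Dict, List


def diff_schema(previous: Dict[str, str], current: Dict[str, str]) -> Dict[str, List[str]]:
    """Compare two {column: type} snapshots and report what changed."""
    added = sorted(current.keys() - previous.keys())
    removed = sorted(previous.keys() - current.keys())
    retyped = sorted(
        col for col in previous.keys() & current.keys()
        if previous[col] != current[col]
    )
    return {"added": added, "removed": removed, "retyped": retyped}


# Hypothetical snapshots of fct_orders taken on consecutive days
yesterday = {"order_id": "INT64", "order_total": "FLOAT64", "note": "STRING"}
today = {"order_id": "INT64", "order_total": "NUMERIC", "channel": "STRING"}

print(diff_schema(yesterday, today))
# {'added': ['channel'], 'removed': ['note'], 'retyped': ['order_total']}
```

An alert would fire whenever any of the three lists is non-empty.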
Best Practices
Pipeline Design
- Idempotent operations
- Incremental processing
- Clear data lineage
- Automated testing
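
Idempotent and incremental loading often go together: each run reads only rows at or past a stored watermark, and writes are keyed so that a rerun overwrites rather than duplicates. The following is a minimal sketch using Python's built-in sqlite3. The load_incremental function, the table layout, and the sample rows are hypothetical.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, str, float]  # (order_id, order_date as ISO string, order_total)


def load_incremental(conn: sqlite3.Connection, source_rows: List[Row]) -> int:
    """Load rows at or after the current watermark; returns the number of rows written."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS fct_orders ("
        "order_id INTEGER PRIMARY KEY, order_date TEXT, order_total REAL)"
    )
    # Watermark: the latest order_date already loaded (None on the first run)
    (watermark,) = conn.execute("SELECT MAX(order_date) FROM fct_orders").fetchone()
    # >= re-reads the boundary date so late rows for that date are not skipped
    new_rows = [r for r in source_rows if watermark is None or r[1] >= watermark]
    # Keyed on the primary key, so rerunning overwrites instead of duplicating
    conn.executemany("INSERT OR REPLACE INTO fct_orders VALUES (?, ?, ?)", new_rows)
    conn.commit()
    return len(new_rows)


conn = sqlite3.connect(":memory:")
source = [(1, "2024-01-01", 10.0), (2, "2024-01-02", 20.0)]
load_incremental(conn, source)
load_incremental(conn, source)  # rerun: same final state, no duplicates
print(conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0])  # 2
```

The second call rewrites only the boundary-date row, which is what makes the job safe to retry after a partial failure.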
Data Governance
- Document all models
- Track data lineage
- Implement access controls
- Version control SQL
Cost Management
- Monitor query costs
- Use partitioning
- Schedule off-peak
- Archive old data