airflow-dag

Apache Airflow DAG for Construction

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "airflow-dag" with this command: npx skills add datadrivenconstruction/ddc_skills_for_ai_agents_in_construction/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-airflow-dag

Apache Airflow DAG for Construction

Overview

Apache Airflow orchestrates complex data pipelines. This skill creates DAGs for construction ETL processes - from BIM extraction to cost reports.

Python Implementation

import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

class TaskStatus(Enum):
    """Task execution status."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class DAGTask:
    """Single task in DAG.

    ``params``, ``upstream`` and ``downstream`` default to fresh empty
    containers, so a task can be created from just an ID and an operator name.
    """

    # Unique task identifier within the DAG.
    task_id: str
    # Operator class name, e.g. 'BashOperator' or 'PythonOperator'.
    operator: str
    # Operator-specific settings (for example {'bash_command': ...}).
    params: Dict[str, Any] = field(default_factory=dict)
    # IDs of tasks this task depends on.
    upstream: List[str] = field(default_factory=list)
    # IDs of tasks that depend on this task.
    downstream: List[str] = field(default_factory=list)

@dataclass
class DAGConfig:
    """DAG configuration."""

    # Identifier of the DAG.
    dag_id: str
    # Schedule expression (presumably a preset such as '@daily' or a cron
    # string, matching what ConstructionDAGBuilder accepts - TODO confirm).
    schedule: str
    # Date from which the DAG is scheduled.
    start_date: datetime
    # Catch-up flag for the DAG.
    catchup: bool
    # Default arguments for the DAG's tasks.
    default_args: Dict[str, Any]
    # Tags attached to the DAG.
    tags: List[str]

class ConstructionDAGBuilder:
    """Build Airflow DAGs for construction pipelines.

    Tasks are registered with the ``add_*`` methods and kept in insertion
    order in ``self.tasks``. ``generate_dag_code`` renders them as the source
    text of an Airflow DAG module; Airflow itself is never imported here.

    Task IDs are emitted as Python variable names and Python callables are
    emitted as bare names. Task IDs must therefore be valid identifiers, and
    the callables must be defined or imported in the generated file by the
    caller.
    """

    # Default DAG arguments
    # NOTE: generate_dag_code() emits its own literal default_args and does
    # not read this mapping.
    DEFAULT_ARGS = {
        'owner': 'ddc',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2)
    }

    def __init__(self, dag_id: str,
                 schedule: str = '@daily',
                 tags: Optional[List[str]] = None):
        """Create an empty builder.

        Args:
            dag_id: Identifier written into the generated DAG.
            schedule: Schedule expression written into the generated DAG.
            tags: DAG tags; ``['construction', 'ddc']`` when omitted or empty.
        """
        self.dag_id = dag_id
        self.schedule = schedule
        self.tags = tags or ['construction', 'ddc']
        self.tasks: Dict[str, DAGTask] = {}

    def add_bash_task(self, task_id: str,
                      command: str,
                      upstream: Optional[List[str]] = None) -> str:
        """Add bash command task and return its task ID."""
        return self._add_task(
            task_id, 'BashOperator', {'bash_command': command}, upstream
        )

    def add_python_task(self, task_id: str,
                        python_callable: str,
                        op_kwargs: Optional[Dict] = None,
                        upstream: Optional[List[str]] = None) -> str:
        """Add Python callable task and return its task ID.

        ``python_callable`` is the *name* of the function; it is written
        unquoted into the generated code.
        """
        params = {
            'python_callable': python_callable,
            'op_kwargs': op_kwargs or {}
        }
        return self._add_task(task_id, 'PythonOperator', params, upstream)

    def add_sensor_task(self, task_id: str,
                        filepath: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add file sensor task and return its task ID."""
        params = {
            'filepath': filepath,
            'poke_interval': 300,
            'timeout': 3600
        }
        return self._add_task(task_id, 'FileSensor', params, upstream)

    def add_branch_task(self, task_id: str,
                        python_callable: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add branching task and return its task ID."""
        return self._add_task(
            task_id, 'BranchPythonOperator',
            {'python_callable': python_callable}, upstream
        )

    def _add_task(self, task_id: str, operator: str,
                  params: Dict[str, Any],
                  upstream: Optional[List[str]]) -> str:
        """Register a task (replacing any task with the same ID) and link it."""
        # Copy so later changes to the caller's list cannot alter the task.
        upstream_ids = list(upstream) if upstream else []
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator=operator,
            params=params,
            upstream=upstream_ids,
            downstream=[]
        )
        self._update_downstream(task_id, upstream_ids)
        return task_id

    def _update_downstream(self, task_id: str, upstream: Optional[List[str]]):
        """Update downstream references.

        Upstream IDs that are not registered yet are skipped silently.
        """
        if upstream:
            for up_task in upstream:
                if up_task in self.tasks:
                    self.tasks[up_task].downstream.append(task_id)

    def generate_dag_code(self) -> str:
        """Generate Airflow DAG Python code.

        Returns:
            Source text with the Airflow imports, a fixed ``default_args``
            dict, a ``with DAG(...)`` block containing one operator per
            registered task, and one ``upstream >> task`` line per dependency.
        """
        # NOTE(review): 'schedule_interval' is deprecated in newer Airflow
        # releases in favour of 'schedule' - confirm the target version.
        lines = [
            '',
            'from airflow import DAG',
            'from airflow.operators.bash import BashOperator',
            'from airflow.operators.python import PythonOperator, BranchPythonOperator',
            'from airflow.sensors.filesystem import FileSensor',
            'from datetime import datetime, timedelta',
            '',
            'default_args = {',
            "    'owner': 'ddc',",
            "    'depends_on_past': False,",
            "    'email_on_failure': True,",
            "    'retries': 2,",
            "    'retry_delay': timedelta(minutes=5),",
            '}',
            '',
            '',
            'with DAG(',
            # repr() keeps the emitted literals valid even if a value
            # contains quotes or backslashes.
            f'    dag_id={self.dag_id!r},',
            '    default_args=default_args,',
            f'    schedule_interval={self.schedule!r},',
            '    start_date=datetime(2024, 1, 1),',
            '    catchup=False,',
            f'    tags={self.tags!r}',
            ') as dag:',
            '',
        ]

        # Generate task definitions
        if not self.tasks:
            # A `with` block needs at least one statement to be valid Python.
            lines.append('    pass')
        for task in self.tasks.values():
            lines.append(self._generate_task_code(task))

        # Generate dependencies
        lines.append('')
        lines.append('    # Task dependencies')
        for task_id, task in self.tasks.items():
            for upstream in task.upstream:
                lines.append(f'    {upstream} >> {task_id}')

        return '\n'.join(lines) + '\n'

    def _generate_task_code(self, task: "DAGTask") -> str:
        """Generate code for single task.

        String parameters are written with ``repr()`` so embedded quotes are
        escaped; ``op_kwargs`` is written as a Python literal (``True`` /
        ``None``), not as JSON. Returns an empty string for an operator name
        this builder does not know.
        """
        name = task.task_id

        if task.operator == 'BashOperator':
            return (
                f"    {name} = BashOperator(\n"
                f"        task_id={name!r},\n"
                f"        bash_command={task.params['bash_command']!r}\n"
                f"    )"
            )

        if task.operator == 'PythonOperator':
            kwargs = task.params.get('op_kwargs', {})
            return (
                f"    {name} = PythonOperator(\n"
                f"        task_id={name!r},\n"
                f"        python_callable={task.params['python_callable']},\n"
                f"        op_kwargs={kwargs!r}\n"
                f"    )"
            )

        if task.operator == 'FileSensor':
            return (
                f"    {name} = FileSensor(\n"
                f"        task_id={name!r},\n"
                f"        filepath={task.params['filepath']!r},\n"
                f"        poke_interval={task.params['poke_interval']},\n"
                f"        timeout={task.params['timeout']}\n"
                f"    )"
            )

        if task.operator == 'BranchPythonOperator':
            return (
                f"    {name} = BranchPythonOperator(\n"
                f"        task_id={name!r},\n"
                f"        python_callable={task.params['python_callable']}\n"
                f"    )"
            )

        return ''

    def save_dag(self, output_path: str) -> str:
        """Save DAG to file and return ``output_path``.

        The file is written as UTF-8 and any existing file is overwritten.
        """
        code = self.generate_dag_code()
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(code)
        return output_path

class ConstructionPipelineTemplates:
    """Pre-built construction pipeline templates.

    Each factory returns a ConstructionDAGBuilder that already holds the
    template's tasks; callers may add more tasks before generating code.
    """

    @staticmethod
    def bim_validation_pipeline(dag_id: str = 'bim_validation') -> ConstructionDAGBuilder:
        """Create BIM validation pipeline.

        Flow: wait_for_model -> convert_ifc -> validate_data ->
        check_validation -> (generate_report | send_alert).
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='@daily',
                                         tags=['bim', 'validation'])

        # Wait for file
        builder.add_sensor_task('wait_for_model', '/data/input/*.ifc')

        # Convert to Excel
        builder.add_bash_task(
            'convert_ifc',
            'IfcExporter.exe /data/input/*.ifc bbox',
            upstream=['wait_for_model']
        )

        # Validate data
        builder.add_python_task(
            'validate_data',
            'validate_bim_data',
            {'rules_file': '/config/validation_rules.xlsx'},
            upstream=['convert_ifc']
        )

        # Branch based on validation
        builder.add_branch_task(
            'check_validation',
            'check_validation_result',
            upstream=['validate_data']
        )

        # Success path
        builder.add_python_task(
            'generate_report',
            'generate_validation_report',
            upstream=['check_validation']
        )

        # Failure path
        builder.add_python_task(
            'send_alert',
            'send_validation_alert',
            upstream=['check_validation']
        )

        return builder

    @staticmethod
    def cost_estimation_pipeline(dag_id: str = 'cost_estimation') -> ConstructionDAGBuilder:
        """Create cost estimation pipeline.

        Flow: extract_bim -> generate_qto -> match_costs ->
        calculate_estimate -> create_report.
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='@weekly',
                                         tags=['cost', 'estimation'])

        # Extract BIM data
        builder.add_bash_task('extract_bim', 'RvtExporter.exe /data/model.rvt complete bbox')

        # Generate QTO
        builder.add_python_task(
            'generate_qto',
            'generate_quantity_takeoff',
            upstream=['extract_bim']
        )

        # Match with cost database
        builder.add_python_task(
            'match_costs',
            'match_cwicr_costs',
            upstream=['generate_qto']
        )

        # Calculate estimate
        builder.add_python_task(
            'calculate_estimate',
            'calculate_project_estimate',
            upstream=['match_costs']
        )

        # Generate report
        builder.add_python_task(
            'create_report',
            'create_cost_report',
            upstream=['calculate_estimate']
        )

        return builder

    @staticmethod
    def batch_conversion_pipeline(dag_id: str = 'batch_convert') -> ConstructionDAGBuilder:
        """Create batch CAD conversion pipeline.

        Flow: scan_files -> (convert_rvt, convert_ifc, convert_dwg) ->
        consolidate -> archive.
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='0 2 * * *',  # 2 AM daily
                                         tags=['conversion', 'batch'])

        # Scan for new files
        builder.add_python_task('scan_files', 'scan_input_folder')

        # The loops use bash syntax because BashOperator runs the command in
        # bash; the Windows batch form `for %%f in (...) do` is not valid there.

        # Convert Revit files
        builder.add_bash_task(
            'convert_rvt',
            'for f in /data/input/*.rvt; do RvtExporter.exe "$f" standard; done',
            upstream=['scan_files']
        )

        # Convert IFC files
        builder.add_bash_task(
            'convert_ifc',
            'for f in /data/input/*.ifc; do IfcExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Convert DWG files
        builder.add_bash_task(
            'convert_dwg',
            'for f in /data/input/*.dwg; do DwgExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Consolidate results
        builder.add_python_task(
            'consolidate',
            'consolidate_conversion_results',
            upstream=['convert_rvt', 'convert_ifc', 'convert_dwg']
        )

        # Archive input files
        builder.add_python_task(
            'archive',
            'archive_processed_files',
            upstream=['consolidate']
        )

        return builder

Quick Start

# Create custom pipeline
builder = ConstructionDAGBuilder('my_pipeline', schedule='@daily')

# Add tasks
builder.add_bash_task('convert', 'RvtExporter.exe model.rvt')
builder.add_python_task('analyze', 'analyze_data', upstream=['convert'])
builder.add_python_task('report', 'create_report', upstream=['analyze'])

# Generate DAG code
code = builder.generate_dag_code()
print(code)

# Save to file
builder.save_dag('/airflow/dags/my_pipeline.py')

Pipeline Templates

  1. BIM Validation

templates = ConstructionPipelineTemplates()
validation_dag = templates.bim_validation_pipeline()
validation_dag.save_dag('/airflow/dags/bim_validation.py')

  2. Cost Estimation

cost_dag = templates.cost_estimation_pipeline()
cost_dag.save_dag('/airflow/dags/cost_estimation.py')

  3. Batch Conversion

batch_dag = templates.batch_conversion_pipeline()
batch_dag.save_dag('/airflow/dags/batch_convert.py')

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

drawing-analyzer

No summary provided by upstream source.

Repository Source | Needs Review
Automation

cad-to-data

No summary provided by upstream source.

Repository Source | Needs Review
Automation

dwg-to-excel

No summary provided by upstream source.

Repository Source | Needs Review
Automation

cost-estimation-resource

No summary provided by upstream source.

Repository Source | Needs Review