open-data-integrator

Based on the DDC (Data-Driven Construction) methodology (Chapter 2.2), this skill integrates open construction datasets from sources such as government databases, industry benchmarks, weather services, and geospatial data.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant so that it can learn this skill.

Install skill "open-data-integrator" with this command: npx skills add datadrivenconstruction/ddc_skills_for_ai_agents_in_construction/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-open-data-integrator

Open Data Integrator

Overview

Based on the DDC (Data-Driven Construction) methodology (Chapter 2.2), this skill integrates open construction datasets from sources such as government databases, industry benchmarks, weather services, and geospatial data.

Book Reference: "Доминирование открытых данных" / "Open Data Dominance"

Quick Start

import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import requests

class DataSourceType(Enum):
    """Types of open data sources."""

    GOVERNMENT = "government"  # Government statistics
    INDUSTRY_BENCHMARK = "benchmark"  # Industry benchmarks
    WEATHER = "weather"  # Weather data
    GEOSPATIAL = "geospatial"  # Geographic data
    MATERIAL_PRICES = "material_prices"  # Material cost indices
    LABOR_RATES = "labor_rates"  # Labor cost data
    BUILDING_PERMITS = "permits"  # Permit data
    ENERGY = "energy"  # Energy prices/data
    ECONOMIC = "economic"  # Economic indicators

class UpdateFrequency(Enum):
    """Data update frequency, ordered here from most to least frequent."""

    REALTIME = "realtime"
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    ANNUAL = "annual"

@dataclass
class OpenDataSource:
    """Definition of an open data source.

    Only ``id``, ``name``, ``source_type`` and ``url`` are required; every
    other attribute has a default.
    """

    id: str
    name: str
    source_type: DataSourceType
    url: str
    api_key_required: bool = False
    update_frequency: UpdateFrequency = UpdateFrequency.DAILY
    format: str = "json"
    license: str = "open"
    description: Optional[str] = None
    # default_factory gives every instance its own list instead of a shared one.
    fields: List[str] = field(default_factory=list)

@dataclass
class DataRecord:
    """A single data record from a source."""

    source_id: str  # identifier of the producing source, e.g. "material_prices"
    timestamp: datetime
    data: Dict[str, Any]  # the record payload
    # default_factory gives every record its own metadata dict.
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class IntegrationResult:
    """Result of data integration."""

    source: str  # name of the source that was queried
    records_fetched: int
    records_processed: int
    errors: List[str]  # error messages; empty when the fetch succeeded
    last_updated: datetime
    sample_data: List[Dict]  # payload dictionaries of the returned records

@dataclass
class EnrichedData:
    """Data enriched with open data."""

    original_data: Dict[str, Any]  # the input, kept unchanged
    enrichments: Dict[str, Any]  # added data, keyed by enrichment name
    sources_used: List[str]  # names of the sources that contributed data
    confidence: float

class OpenDataConnector(ABC):
    """Base class for open data connectors.

    Subclasses must implement both ``fetch`` and ``get_metadata``; the class
    cannot be instantiated until they do.
    """

    @abstractmethod
    def fetch(self, params: Dict) -> List[DataRecord]:
        """Return the records selected by the query parameters ``params``."""

    @abstractmethod
    def get_metadata(self) -> Dict:
        """Return a dictionary describing this data source."""

class WeatherDataConnector(OpenDataConnector):
    """Connector for weather data (e.g., OpenWeatherMap).

    The connector currently simulates the API: ``fetch`` produces one
    placeholder record per calendar day instead of calling the service.
    """

    # Maximum number of daily records a single ``fetch`` returns (demo limit).
    MAX_RECORDS = 30

    def __init__(self, api_key: Optional[str] = None):
        """Store the optional API key and the service base URL."""
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch weather data for a location.

        Args:
            params: Query with ``start_date`` and ``end_date`` (``date``
                objects, both inclusive) and optional ``lat`` / ``lon``.

        Returns:
            One record per day from ``start_date`` onwards, capped at
            ``MAX_RECORDS``; empty when ``end_date`` precedes ``start_date``.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` is missing.
        """
        lat = params.get("lat")
        lon = params.get("lon")
        start_date = params.get("start_date")
        end_date = params.get("end_date")
        if start_date is None or end_date is None:
            raise ValueError(
                "Weather fetch requires 'start_date' and 'end_date' parameters"
            )

        # Decide the number of days up front so a long range never generates
        # more than MAX_RECORDS records; a reversed range yields zero days.
        span_days = (end_date - start_date).days + 1
        day_count = max(0, min(span_days, self.MAX_RECORDS))

        # Simulate API call (in production, use actual API)
        records = []
        for offset in range(day_count):
            # timedelta arithmetic handles month lengths and year rollover.
            current = start_date + timedelta(days=offset)
            records.append(DataRecord(
                source_id="openweathermap",
                timestamp=datetime.combine(current, datetime.min.time()),
                data={
                    "date": current.isoformat(),
                    "temp_max": 25.0,
                    "temp_min": 15.0,
                    "precipitation": 0.0,
                    "wind_speed": 10.0,
                    "weather_code": "clear",
                },
                metadata={"lat": lat, "lon": lon},
            ))
        return records

    def get_metadata(self) -> Dict:
        """Describe the weather source: name, type, frequency and fields."""
        return {
            "source": "OpenWeatherMap",
            "type": DataSourceType.WEATHER.value,
            "frequency": UpdateFrequency.HOURLY.value,
            "fields": ["temp_max", "temp_min", "precipitation", "wind_speed"],
        }

class MaterialPriceConnector(OpenDataConnector):
    """Connector for material price indices.

    The indices are a built-in static table loaded once at construction.
    """

    def __init__(self, region: str = "US"):
        """Remember the region and load the index table."""
        self.region = region
        self.price_indices = self._load_indices()

    def _load_indices(self) -> Dict[str, Dict]:
        """Load material price indices, keyed by material name."""
        return {
            "concrete": {"base": 100, "current": 125, "trend": "up"},
            "steel": {"base": 100, "current": 145, "trend": "up"},
            "lumber": {"base": 100, "current": 180, "trend": "stable"},
            "copper": {"base": 100, "current": 135, "trend": "up"},
            "asphalt": {"base": 100, "current": 115, "trend": "stable"},
            "gypsum": {"base": 100, "current": 110, "trend": "stable"},
            "glass": {"base": 100, "current": 105, "trend": "down"},
            "cement": {"base": 100, "current": 120, "trend": "up"},
        }

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch material price data.

        Args:
            params: Query; ``materials`` optionally lists the materials
                wanted. When absent or ``None``, all known materials are
                returned. Unknown material names are skipped.

        Returns:
            One record per known requested material, in request order.
        """
        materials = params.get("materials")
        if materials is None:
            # An explicit None means "no filter", same as omitting the key.
            materials = list(self.price_indices)

        fetched_at = datetime.now()  # one timestamp for the whole batch
        return [
            DataRecord(
                source_id="material_prices",
                timestamp=fetched_at,
                data={
                    "material": material,
                    "region": self.region,
                    **self.price_indices[material],
                },
            )
            for material in materials
            if material in self.price_indices
        ]

    def get_metadata(self) -> Dict:
        """Describe the source, including the list of known materials."""
        return {
            "source": "Material Price Index",
            "type": DataSourceType.MATERIAL_PRICES.value,
            "frequency": UpdateFrequency.MONTHLY.value,
            "materials": list(self.price_indices.keys()),
        }

class LaborRateConnector(OpenDataConnector):
    """Connector for labor rate data.

    The rates are a built-in static table loaded once at construction.
    """

    def __init__(self, region: str = "US"):
        """Remember the region and load the rate table."""
        self.region = region
        self.labor_rates = self._load_rates()

    def _load_rates(self) -> Dict[str, Dict]:
        """Load labor rates by trade."""
        return {
            "carpenter": {"hourly": 45.00, "burden_rate": 1.35},
            "electrician": {"hourly": 55.00, "burden_rate": 1.40},
            "plumber": {"hourly": 52.00, "burden_rate": 1.38},
            "ironworker": {"hourly": 58.00, "burden_rate": 1.42},
            "laborer": {"hourly": 32.00, "burden_rate": 1.30},
            "operator": {"hourly": 48.00, "burden_rate": 1.35},
            "mason": {"hourly": 50.00, "burden_rate": 1.36},
            "painter": {"hourly": 38.00, "burden_rate": 1.32},
            "hvac_tech": {"hourly": 54.00, "burden_rate": 1.38},
            "welder": {"hourly": 52.00, "burden_rate": 1.40},
        }

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch labor rate data.

        Args:
            params: Query; ``trades`` optionally lists the trades wanted.
                When absent or ``None``, all known trades are returned.
                Unknown trade names are skipped.

        Returns:
            One record per known requested trade, in request order. Each
            record carries the hourly rate, the burden rate and their
            product (``fully_loaded``, rounded to two decimals).
        """
        trades = params.get("trades")
        if trades is None:
            # An explicit None means "no filter", same as omitting the key.
            trades = list(self.labor_rates)

        fetched_at = datetime.now()  # one timestamp for the whole batch
        records = []
        for trade in trades:
            rate = self.labor_rates.get(trade)
            if rate is None:
                continue
            records.append(DataRecord(
                source_id="labor_rates",
                timestamp=fetched_at,
                data={
                    "trade": trade,
                    "region": self.region,
                    "hourly_rate": rate["hourly"],
                    "burden_rate": rate["burden_rate"],
                    # Rounded so binary float noise (e.g. 60.75000000000001)
                    # does not leak into the reported rate.
                    "fully_loaded": round(
                        rate["hourly"] * rate["burden_rate"], 2
                    ),
                },
            ))
        return records

    def get_metadata(self) -> Dict:
        """Describe the source, including the list of known trades."""
        return {
            "source": "Labor Rate Database",
            "type": DataSourceType.LABOR_RATES.value,
            "frequency": UpdateFrequency.QUARTERLY.value,
            "trades": list(self.labor_rates.keys()),
        }

class BuildingPermitConnector(OpenDataConnector):
    """Connector for building permit data.

    The data is simulated: every permit type reports the same fixed figures.
    """

    def __init__(self, jurisdiction: str = "default"):
        """Remember the jurisdiction stamped on every record."""
        self.jurisdiction = jurisdiction

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch permit data.

        Args:
            params: Accepted for interface compatibility; currently unused.

        Returns:
            One record per permit type (new construction, renovation,
            addition, demolition).
        """
        # Simulate permit data
        permit_types = ["new_construction", "renovation", "addition", "demolition"]
        fetched_at = datetime.now()  # one timestamp for the whole batch
        return [
            DataRecord(
                source_id="building_permits",
                timestamp=fetched_at,
                data={
                    "permit_type": permit_type,
                    "jurisdiction": self.jurisdiction,
                    "count_ytd": 150,
                    "total_value": 25000000,
                    "avg_processing_days": 21,
                },
            )
            for permit_type in permit_types
        ]

    def get_metadata(self) -> Dict:
        """Describe the permit source: name, type and update frequency."""
        return {
            "source": "Building Permit Database",
            "type": DataSourceType.BUILDING_PERMITS.value,
            "frequency": UpdateFrequency.DAILY.value,
        }

class OpenDataIntegrator:
    """Integrate open construction datasets.

    Based on DDC methodology Chapter 2.2.

    The integrator owns a registry of named connectors (weather,
    material_prices, labor_rates and permits are registered by default) and
    an in-memory cache of fetched records keyed by source name and query
    parameters.
    """

    # How long a fetched result stays valid in the cache unless overridden.
    DEFAULT_CACHE_TTL = timedelta(hours=1)

    def __init__(self, region: str = "US", cache_ttl: Optional[timedelta] = None):
        """Create an integrator with the default connectors registered.

        Args:
            region: Region passed to the material-price and labor-rate
                connectors.
            cache_ttl: Lifetime of cached results; defaults to
                ``DEFAULT_CACHE_TTL``.
        """
        self.region = region
        self.cache_ttl = cache_ttl if cache_ttl is not None else self.DEFAULT_CACHE_TTL
        self.connectors: Dict[str, OpenDataConnector] = {}
        self.cache: Dict[str, List[DataRecord]] = {}
        # Maps cache key -> moment after which the cached entry is stale.
        self.cache_expiry: Dict[str, datetime] = {}
        self._register_default_connectors()

    def _register_default_connectors(self):
        """Register default data connectors."""
        self.register_connector("weather", WeatherDataConnector())
        self.register_connector("material_prices", MaterialPriceConnector(self.region))
        self.register_connector("labor_rates", LaborRateConnector(self.region))
        self.register_connector("permits", BuildingPermitConnector())

    def register_connector(self, name: str, connector: OpenDataConnector):
        """Register a data connector, replacing any existing one of that name."""
        self.connectors[name] = connector

    def fetch_data(
        self,
        source: str,
        params: Optional[Dict] = None,
        use_cache: bool = True,
        sample_size: Optional[int] = 5,
    ) -> IntegrationResult:
        """
        Fetch data from a source.

        Args:
            source: Name of the data source
            params: Query parameters
            use_cache: Whether to use cached data
            sample_size: Maximum number of record payloads placed in
                ``sample_data``; ``None`` returns all of them

        Returns:
            Integration result with fetched data. Unknown sources and
            connector failures are reported through ``errors`` rather than
            raised.
        """
        now = datetime.now()
        if source not in self.connectors:
            return IntegrationResult(
                source=source,
                records_fetched=0,
                records_processed=0,
                errors=[f"Unknown source: {source}"],
                last_updated=now,
                sample_data=[],
            )

        query = params or {}
        # default=str: queries may carry date/datetime values, which the JSON
        # encoder cannot serialize on its own.
        cache_key = f"{source}_{json.dumps(query, sort_keys=True, default=str)}"

        if use_cache and cache_key in self.cache:
            expiry = self.cache_expiry.get(cache_key)
            if expiry is not None and expiry > now:
                cached = self.cache[cache_key]
                return IntegrationResult(
                    source=source,
                    records_fetched=len(cached),
                    records_processed=len(cached),
                    errors=[],
                    # The entry was stored one TTL before it expires.
                    last_updated=expiry - self.cache_ttl,
                    sample_data=[r.data for r in cached[:sample_size]],
                )

        try:
            records = self.connectors[source].fetch(query)
        except Exception as exc:
            # Connectors are pluggable, so any failure is converted into an
            # error result instead of propagating to the caller.
            return IntegrationResult(
                source=source,
                records_fetched=0,
                records_processed=0,
                errors=[str(exc)],
                last_updated=now,
                sample_data=[],
            )

        # Cache the results; they stay valid until the TTL elapses.
        self.cache[cache_key] = records
        self.cache_expiry[cache_key] = now + self.cache_ttl

        return IntegrationResult(
            source=source,
            records_fetched=len(records),
            records_processed=len(records),
            errors=[],
            last_updated=now,
            sample_data=[r.data for r in records[:sample_size]],
        )

    def enrich_project_data(
        self,
        project_data: Dict[str, Any],
        enrichment_sources: Optional[List[str]] = None
    ) -> EnrichedData:
        """
        Enrich project data with open data.

        Args:
            project_data: Original project data. The keys ``materials``,
                ``trades`` and ``location`` (with optional ``start_date`` /
                ``end_date``) trigger the matching enrichment.
            enrichment_sources: Sources to use for enrichment; defaults to
                material_prices, labor_rates and weather.

        Returns:
            Enriched data. ``confidence`` is the fraction of requested
            sources that actually contributed data.
        """
        sources = enrichment_sources or ["material_prices", "labor_rates", "weather"]
        enrichments = {}
        sources_used = []

        # Material price enrichment
        if "material_prices" in sources and "materials" in project_data:
            result = self.fetch_data(
                "material_prices",
                {"materials": project_data["materials"]},
                sample_size=None,  # keep every requested material
            )
            if result.records_fetched > 0:
                enrichments["material_price_indices"] = result.sample_data
                sources_used.append("material_prices")

        # Labor rate enrichment
        if "labor_rates" in sources and "trades" in project_data:
            result = self.fetch_data(
                "labor_rates",
                {"trades": project_data["trades"]},
                sample_size=None,  # keep every requested trade
            )
            if result.records_fetched > 0:
                enrichments["labor_rates"] = result.sample_data
                sources_used.append("labor_rates")

        # Weather enrichment
        if "weather" in sources and "location" in project_data:
            loc = project_data["location"]
            params = {
                "lat": loc.get("lat"),
                "lon": loc.get("lon"),
                "start_date": project_data.get("start_date", date.today()),
                "end_date": project_data.get("end_date", date.today()),
            }
            result = self.fetch_data("weather", params, sample_size=None)
            if result.records_fetched > 0:
                enrichments["weather_forecast"] = result.sample_data
                sources_used.append("weather")

        # Calculate confidence based on enrichment success
        confidence = len(sources_used) / len(sources) if sources else 0

        return EnrichedData(
            original_data=project_data,
            enrichments=enrichments,
            sources_used=sources_used,
            confidence=confidence,
        )

    def get_cost_indices(
        self,
        materials: Optional[List[str]] = None,
        trades: Optional[List[str]] = None
    ) -> Dict:
        """Get current cost indices.

        Args:
            materials: Materials to look up; omitted from the result when
                empty or ``None``.
            trades: Trades to look up; omitted from the result when empty or
                ``None``.

        Returns:
            Dictionary with ``timestamp`` and ``region`` plus ``materials``
            and/or ``labor`` entries for the requested items.
        """
        indices = {
            "timestamp": datetime.now().isoformat(),
            "region": self.region,
        }

        if materials:
            result = self.fetch_data(
                "material_prices", {"materials": materials}, sample_size=None
            )
            indices["materials"] = result.sample_data

        if trades:
            result = self.fetch_data(
                "labor_rates", {"trades": trades}, sample_size=None
            )
            indices["labor"] = result.sample_data

        return indices

    def get_weather_risk(
        self,
        lat: float,
        lon: float,
        start_date: date,
        end_date: date
    ) -> Dict:
        """Assess weather risk for project period.

        A day counts as a rain day when ``precipitation`` exceeds 5 and as an
        extreme-temperature day when ``temp_max`` exceeds 35 or ``temp_min``
        is below 0. The risk score is the sum of both counts divided by the
        number of days, so a day that is both rainy and extreme counts twice.

        Returns:
            Risk metrics and a ``risk_level`` of ``high`` (score above 0.3),
            ``medium`` (above 0.1) or ``low``; or a dictionary with an
            ``error`` key when no weather records were returned.
        """
        result = self.fetch_data(
            "weather",
            {
                "lat": lat,
                "lon": lon,
                "start_date": start_date,
                "end_date": end_date,
            },
            sample_size=None,  # assess every returned day, not a 5-day sample
        )

        if result.records_fetched == 0:
            failure = {"error": "No weather data available"}
            if result.errors:
                # Surface the underlying cause instead of hiding it.
                failure["details"] = result.errors
            return failure

        days = result.sample_data

        # Calculate risk metrics
        rain_days = sum(1 for d in days if d.get("precipitation", 0) > 5)
        extreme_temp_days = sum(
            1 for d in days
            if d.get("temp_max", 0) > 35 or d.get("temp_min", 0) < 0
        )

        total_days = len(days)
        risk_score = (rain_days + extreme_temp_days) / total_days if total_days > 0 else 0

        if risk_score > 0.3:
            risk_level = "high"
        elif risk_score > 0.1:
            risk_level = "medium"
        else:
            risk_level = "low"

        return {
            "total_days": total_days,
            "rain_days": rain_days,
            "extreme_temperature_days": extreme_temp_days,
            "risk_score": risk_score,
            "risk_level": risk_level,
        }

    def list_sources(self) -> List[Dict]:
        """List all available data sources with their connector metadata."""
        return [
            {"name": name, **connector.get_metadata()}
            for name, connector in self.connectors.items()
        ]

    def generate_report(self) -> str:
        """Generate data availability report.

        Returns:
            Markdown text listing every registered source (type and update
            frequency) followed by the validity of each cache entry.
        """
        lines = [
            "",
            "# Open Data Integration Report",
            "",
            "## Available Sources",
            "",
        ]
        for source in self.list_sources():
            # Custom connectors may omit these metadata keys.
            lines += [
                f"### {source['name'].title()}",
                f"- Type: {source.get('type', 'unknown')}",
                f"- Update Frequency: {source.get('frequency', 'unknown')}",
                "",
            ]

        lines += ["## Cache Status", ""]
        now = datetime.now()
        for key, expiry in self.cache_expiry.items():
            status = "valid" if expiry > now else "expired"
            lines.append(f"- {key}: {status}")

        return "\n".join(lines) + "\n"

Common Use Cases

Fetch Material Prices

integrator = OpenDataIntegrator(region="US")

# Get material price indices
result = integrator.fetch_data("material_prices", {
    "materials": ["concrete", "steel", "lumber"]
})

print(f"Fetched: {result.records_fetched} records")
# Each sample entry is the payload dict of one material record.
for record in result.sample_data:
    print(f" {record['material']}: index={record['current']}, trend={record['trend']}")

Enrich Project Data

# Keys "materials", "trades" and "location" each trigger one enrichment.
project = {
    "name": "Office Building",
    "materials": ["concrete", "steel", "glass"],
    "trades": ["carpenter", "electrician", "plumber"],
    "location": {"lat": 40.7128, "lon": -74.0060},
    "start_date": date(2024, 6, 1),
    "end_date": date(2024, 12, 31)
}

enriched = integrator.enrich_project_data(project)

print(f"Sources used: {enriched.sources_used}")
print(f"Confidence: {enriched.confidence:.0%}")
print(f"Material indices: {enriched.enrichments.get('material_price_indices')}")

Assess Weather Risk

risk = integrator.get_weather_risk(
    lat=40.7128,
    lon=-74.0060,
    start_date=date(2024, 6, 1),
    end_date=date(2024, 8, 31)
)

# On success the result carries the risk metrics; on failure only "error".
print(f"Risk Level: {risk['risk_level']}")
print(f"Rain Days: {risk['rain_days']}")

Quick Reference

| Component | Purpose |
|-----------|---------|
| OpenDataIntegrator | Main integration engine |
| OpenDataConnector | Base connector class |
| WeatherDataConnector | Weather API connector |
| MaterialPriceConnector | Material price indices |
| LaborRateConnector | Labor rate data |
| EnrichedData | Enriched data result |

Resources

Next Steps

  • Use ontology-mapper for semantic mapping

  • Use cost-prediction with indices

  • Use weather-impact-analysis for scheduling

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

cad-to-data

No summary provided by upstream source.

Repository Source · Needs Review
Automation

drawing-analyzer

No summary provided by upstream source.

Repository Source · Needs Review
Automation

dwg-to-excel

No summary provided by upstream source.

Repository Source · Needs Review
Automation

cost-estimation-resource

No summary provided by upstream source.

Repository Source · Needs Review