environmental-monitoring

Environmental Monitoring

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "environmental-monitoring" with this command: npx skills add datadrivenconstruction/ddc_skills_for_ai_agents_in_construction/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-environmental-monitoring

Environmental Monitoring

Overview

Monitor and analyze environmental conditions on construction sites including air quality, noise, vibration, dust, and weather. Support regulatory compliance, worker safety, and community relations through real-time environmental tracking.

Environmental Monitoring System

┌─────────────────────────────────────────────────────────────────┐ │ ENVIRONMENTAL MONITORING │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ SENSORS MONITORING COMPLIANCE │ │ ─────── ────────── ────────── │ │ │ │ 💨 Air Quality ───┐ ✅ OSHA limits │ │ 🔊 Noise Level ───┼─────→ Real-time ────────→ ✅ EPA limits │ │ 📊 Vibration ───┤ Dashboard ✅ Local codes │ │ 🌫️ Dust/PM ───┤ Alerts ✅ Permits │ │ 🌡️ Weather ───┘ Reports ✅ Neighbors │ │ │ │ THRESHOLDS: │ │ • Noise: 85 dB (OSHA 8hr TWA) │ │ • PM2.5: 35 µg/m³ (EPA 24hr) │ │ • Vibration: 25 mm/s (structural) │ │ • CO: 50 ppm (OSHA ceiling) │ │ │ └─────────────────────────────────────────────────────────────────┘

Technical Implementation

from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple from datetime import datetime, timedelta from enum import Enum import statistics import math

class ParameterType(Enum): NOISE = "noise" PM25 = "pm25" PM10 = "pm10" CO = "co" CO2 = "co2" VOC = "voc" VIBRATION = "vibration" TEMPERATURE = "temperature" HUMIDITY = "humidity" WIND_SPEED = "wind_speed" WIND_DIRECTION = "wind_direction" RAINFALL = "rainfall"

class ComplianceStatus(Enum): COMPLIANT = "compliant" WARNING = "warning" EXCEEDANCE = "exceedance" CRITICAL = "critical"

class AlertType(Enum): THRESHOLD_WARNING = "threshold_warning" THRESHOLD_EXCEEDANCE = "threshold_exceedance" EQUIPMENT_MALFUNCTION = "equipment_malfunction" WEATHER_ALERT = "weather_alert" COMMUNITY_COMPLAINT = "community_complaint"

@dataclass class RegulatoryLimit: parameter: ParameterType limit_value: float unit: str averaging_period_hours: float # e.g., 8 for 8-hour TWA regulation: str # e.g., "OSHA", "EPA" description: str

@dataclass class EnvironmentalReading: station_id: str parameter: ParameterType timestamp: datetime value: float unit: str quality_flag: str = "valid"

@dataclass class MonitoringStation: id: str name: str location: Dict # {lat, lon, description} parameters: List[ParameterType] installation_date: datetime last_calibration: datetime status: str = "active"

@dataclass class ComplianceRecord: parameter: ParameterType regulation: str limit_value: float measured_value: float averaging_period: str status: ComplianceStatus timestamp: datetime location: str

@dataclass class EnvironmentalAlert: id: str alert_type: AlertType parameter: ParameterType station_id: str timestamp: datetime value: float threshold: float message: str acknowledged: bool = False resolved: bool = False resolution_notes: str = ""

@dataclass class DailyReport: date: datetime site_name: str parameters_monitored: int readings_collected: int exceedances: int alerts_triggered: int compliance_status: ComplianceStatus summary: Dict[str, Dict]

class EnvironmentalMonitor: """Monitor environmental conditions on construction sites."""

# Default regulatory limits
REGULATORY_LIMITS = {
    ParameterType.NOISE: [
        RegulatoryLimit(ParameterType.NOISE, 85, "dBA", 8.0, "OSHA", "8-hour TWA"),
        RegulatoryLimit(ParameterType.NOISE, 90, "dBA", 8.0, "OSHA", "Action level"),
        RegulatoryLimit(ParameterType.NOISE, 115, "dBA", 0.25, "OSHA", "15-min max"),
    ],
    ParameterType.PM25: [
        RegulatoryLimit(ParameterType.PM25, 35, "µg/m³", 24.0, "EPA", "24-hour standard"),
        RegulatoryLimit(ParameterType.PM25, 12, "µg/m³", 8760.0, "EPA", "Annual standard"),
    ],
    ParameterType.PM10: [
        RegulatoryLimit(ParameterType.PM10, 150, "µg/m³", 24.0, "EPA", "24-hour standard"),
    ],
    ParameterType.CO: [
        RegulatoryLimit(ParameterType.CO, 50, "ppm", 0.0, "OSHA", "Ceiling limit"),
        RegulatoryLimit(ParameterType.CO, 35, "ppm", 8.0, "OSHA", "8-hour TWA"),
    ],
    ParameterType.VIBRATION: [
        RegulatoryLimit(ParameterType.VIBRATION, 25, "mm/s", 0.0, "ISO 4866", "Structural damage threshold"),
        RegulatoryLimit(ParameterType.VIBRATION, 5, "mm/s", 0.0, "DIN 4150", "Sensitive structures"),
    ],
}

def __init__(self, site_name: str):
    self.site_name = site_name
    self.stations: Dict[str, MonitoringStation] = {}
    self.readings: List[EnvironmentalReading] = []
    self.alerts: List[EnvironmentalAlert] = []
    self.custom_limits: Dict[ParameterType, List[RegulatoryLimit]] = {}

def add_station(self, id: str, name: str, location: Dict,
               parameters: List[ParameterType]) -> MonitoringStation:
    """Add monitoring station."""
    station = MonitoringStation(
        id=id,
        name=name,
        location=location,
        parameters=parameters,
        installation_date=datetime.now(),
        last_calibration=datetime.now()
    )
    self.stations[id] = station
    return station

def add_custom_limit(self, parameter: ParameterType, limit_value: float,
                    unit: str, averaging_hours: float, regulation: str,
                    description: str):
    """Add custom regulatory limit."""
    limit = RegulatoryLimit(
        parameter=parameter,
        limit_value=limit_value,
        unit=unit,
        averaging_period_hours=averaging_hours,
        regulation=regulation,
        description=description
    )
    if parameter not in self.custom_limits:
        self.custom_limits[parameter] = []
    self.custom_limits[parameter].append(limit)

def record_reading(self, station_id: str, parameter: ParameterType,
                  value: float, unit: str,
                  timestamp: datetime = None) -> EnvironmentalReading:
    """Record environmental reading."""
    if station_id not in self.stations:
        raise ValueError(f"Unknown station: {station_id}")

    reading = EnvironmentalReading(
        station_id=station_id,
        parameter=parameter,
        timestamp=timestamp or datetime.now(),
        value=value,
        unit=unit
    )

    self.readings.append(reading)

    # Check against limits
    self._check_limits(station_id, parameter, value)

    return reading

def record_batch(self, readings: List[Dict]) -> int:
    """Record multiple readings."""
    count = 0
    for r in readings:
        try:
            self.record_reading(
                station_id=r['station_id'],
                parameter=ParameterType(r['parameter']),
                value=r['value'],
                unit=r['unit'],
                timestamp=r.get('timestamp')
            )
            count += 1
        except Exception:
            pass
    return count

def _check_limits(self, station_id: str, parameter: ParameterType, value: float):
    """Check value against regulatory limits."""
    # Get applicable limits
    limits = self.REGULATORY_LIMITS.get(parameter, [])
    limits.extend(self.custom_limits.get(parameter, []))

    for limit in limits:
        if limit.averaging_period_hours == 0:
            # Instantaneous limit
            check_value = value
        else:
            # Time-weighted average
            check_value = self._calculate_twa(
                station_id, parameter, limit.averaging_period_hours
            )
            if check_value is None:
                continue

        # Check against limit
        if check_value >= limit.limit_value:
            self._create_alert(
                station_id, parameter, check_value, limit
            )
        elif check_value >= limit.limit_value * 0.8:
            # Warning at 80% of limit
            self._create_alert(
                station_id, parameter, check_value, limit,
                is_warning=True
            )

def _calculate_twa(self, station_id: str, parameter: ParameterType,
                  hours: float) -> Optional[float]:
    """Calculate time-weighted average."""
    cutoff = datetime.now() - timedelta(hours=hours)
    readings = [r for r in self.readings
               if r.station_id == station_id
               and r.parameter == parameter
               and r.timestamp > cutoff]

    if not readings:
        return None

    return statistics.mean([r.value for r in readings])

def _create_alert(self, station_id: str, parameter: ParameterType,
                 value: float, limit: RegulatoryLimit,
                 is_warning: bool = False):
    """Create environmental alert."""
    # Avoid duplicate alerts
    recent_alerts = [a for a in self.alerts
                    if a.station_id == station_id
                    and a.parameter == parameter
                    and not a.resolved
                    and (datetime.now() - a.timestamp).total_seconds() < 3600]

    if recent_alerts:
        return

    alert_type = (AlertType.THRESHOLD_WARNING if is_warning
                 else AlertType.THRESHOLD_EXCEEDANCE)

    station = self.stations.get(station_id)

    alert = EnvironmentalAlert(
        id=f"ENV-{len(self.alerts)+1:05d}",
        alert_type=alert_type,
        parameter=parameter,
        station_id=station_id,
        timestamp=datetime.now(),
        value=value,
        threshold=limit.limit_value,
        message=f"{parameter.value} {'approaching' if is_warning else 'exceeds'} "
                f"{limit.regulation} limit ({limit.limit_value} {limit.unit}) "
                f"at {station.name if station else station_id}"
    )

    self.alerts.append(alert)

def get_current_conditions(self, station_id: str = None) -> Dict:
    """Get current environmental conditions."""
    conditions = {}

    stations = ([self.stations[station_id]] if station_id
               else self.stations.values())

    for station in stations:
        station_conditions = {}

        for param in station.parameters:
            # Get latest reading
            readings = [r for r in self.readings
                       if r.station_id == station.id
                       and r.parameter == param]

            if readings:
                latest = max(readings, key=lambda r: r.timestamp)
                station_conditions[param.value] = {
                    "value": latest.value,
                    "unit": latest.unit,
                    "timestamp": latest.timestamp,
                    "status": self._get_compliance_status(param, latest.value)
                }

        conditions[station.id] = {
            "name": station.name,
            "location": station.location,
            "parameters": station_conditions
        }

    return conditions

def _get_compliance_status(self, parameter: ParameterType,
                          value: float) -> ComplianceStatus:
    """Determine compliance status for value."""
    limits = self.REGULATORY_LIMITS.get(parameter, [])
    limits.extend(self.custom_limits.get(parameter, []))

    # Check instantaneous limits
    instant_limits = [l for l in limits if l.averaging_period_hours == 0]
    for limit in instant_limits:
        if value >= limit.limit_value:
            return ComplianceStatus.EXCEEDANCE
        elif value >= limit.limit_value * 0.9:
            return ComplianceStatus.WARNING

    return ComplianceStatus.COMPLIANT

def check_compliance(self, start_date: datetime,
                    end_date: datetime) -> List[ComplianceRecord]:
    """Check compliance for period."""
    records = []

    for station in self.stations.values():
        for param in station.parameters:
            limits = self.REGULATORY_LIMITS.get(param, [])
            limits.extend(self.custom_limits.get(param, []))

            for limit in limits:
                # Calculate average for period
                readings = [r for r in self.readings
                           if r.station_id == station.id
                           and r.parameter == param
                           and start_date <= r.timestamp <= end_date]

                if not readings:
                    continue

                avg_value = statistics.mean([r.value for r in readings])
                max_value = max(r.value for r in readings)

                # Check appropriate value
                if limit.averaging_period_hours == 0:
                    check_value = max_value
                    period_str = "Instantaneous"
                else:
                    check_value = avg_value
                    period_str = f"{limit.averaging_period_hours:.0f}-hour avg"

                # Determine status
                if check_value >= limit.limit_value:
                    status = ComplianceStatus.EXCEEDANCE
                elif check_value >= limit.limit_value * 0.9:
                    status = ComplianceStatus.WARNING
                else:
                    status = ComplianceStatus.COMPLIANT

                records.append(ComplianceRecord(
                    parameter=param,
                    regulation=limit.regulation,
                    limit_value=limit.limit_value,
                    measured_value=check_value,
                    averaging_period=period_str,
                    status=status,
                    timestamp=end_date,
                    location=station.name
                ))

    return records

def get_exceedance_summary(self, days: int = 30) -> Dict:
    """Get summary of exceedances."""
    cutoff = datetime.now() - timedelta(days=days)
    recent_alerts = [a for a in self.alerts
                   if a.timestamp > cutoff
                   and a.alert_type == AlertType.THRESHOLD_EXCEEDANCE]

    summary = {
        "period_days": days,
        "total_exceedances": len(recent_alerts),
        "by_parameter": {},
        "by_station": {},
        "recent_events": []
    }

    for alert in recent_alerts:
        # By parameter
        param = alert.parameter.value
        summary["by_parameter"][param] = summary["by_parameter"].get(param, 0) + 1

        # By station
        station = alert.station_id
        summary["by_station"][station] = summary["by_station"].get(station, 0) + 1

    # Recent events
    summary["recent_events"] = sorted(
        recent_alerts, key=lambda a: a.timestamp, reverse=True
    )[:10]

    return summary

def generate_daily_report(self, date: datetime = None) -> DailyReport:
    """Generate daily environmental report."""
    if date is None:
        date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    next_day = date + timedelta(days=1)

    # Filter readings
    day_readings = [r for r in self.readings
                   if date <= r.timestamp < next_day]

    # Filter alerts
    day_alerts = [a for a in self.alerts
                 if date <= a.timestamp < next_day]

    # Check compliance
    compliance_records = self.check_compliance(date, next_day)
    exceedances = [r for r in compliance_records
                  if r.status == ComplianceStatus.EXCEEDANCE]

    # Overall status
    if exceedances:
        overall_status = ComplianceStatus.EXCEEDANCE
    elif any(r.status == ComplianceStatus.WARNING for r in compliance_records):
        overall_status = ComplianceStatus.WARNING
    else:
        overall_status = ComplianceStatus.COMPLIANT

    # Summary by parameter
    param_summary = {}
    for param in ParameterType:
        param_readings = [r for r in day_readings if r.parameter == param]
        if param_readings:
            values = [r.value for r in param_readings]
            param_summary[param.value] = {
                "count": len(values),
                "min": min(values),
                "max": max(values),
                "avg": statistics.mean(values),
                "exceedances": len([r for r in compliance_records
                                   if r.parameter == param
                                   and r.status == ComplianceStatus.EXCEEDANCE])
            }

    return DailyReport(
        date=date,
        site_name=self.site_name,
        parameters_monitored=len(set(r.parameter for r in day_readings)),
        readings_collected=len(day_readings),
        exceedances=len(exceedances),
        alerts_triggered=len(day_alerts),
        compliance_status=overall_status,
        summary=param_summary
    )

def generate_report(self) -> str:
    """Generate environmental monitoring report."""
    lines = [
        "# Environmental Monitoring Report",
        "",
        f"**Site:** {self.site_name}",
        f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        "",
        "## Monitoring Stations",
        "",
        "| Station | Location | Parameters | Status |",
        "|---------|----------|------------|--------|"
    ]

    for station in self.stations.values():
        params = ", ".join([p.value for p in station.parameters])
        lines.append(
            f"| {station.name} | {station.location.get('description', '-')} | "
            f"{params} | {station.status} |"
        )

    # Current conditions
    conditions = self.get_current_conditions()
    lines.extend([
        "",
        "## Current Conditions",
        ""
    ])

    for station_id, data in conditions.items():
        lines.append(f"### {data['name']}")
        lines.append("")
        lines.append("| Parameter | Value | Status |")
        lines.append("|-----------|-------|--------|")

        for param, values in data['parameters'].items():
            status_icon = ("✅" if values['status'] == ComplianceStatus.COMPLIANT
                          else "⚠️" if values['status'] == ComplianceStatus.WARNING
                          else "🔴")
            lines.append(
                f"| {param} | {values['value']:.1f} {values['unit']} | "
                f"{status_icon} {values['status'].value} |"
            )

        lines.append("")

    # Exceedance summary
    exceedance_summary = self.get_exceedance_summary(30)
    lines.extend([
        "## 30-Day Exceedance Summary",
        "",
        f"**Total Exceedances:** {exceedance_summary['total_exceedances']}",
        ""
    ])

    if exceedance_summary['by_parameter']:
        lines.append("By Parameter:")
        for param, count in exceedance_summary['by_parameter'].items():
            lines.append(f"- {param}: {count}")

    # Active alerts
    active_alerts = [a for a in self.alerts if not a.resolved]
    if active_alerts:
        lines.extend([
            "",
            f"## Active Alerts ({len(active_alerts)})",
            "",
            "| Time | Parameter | Station | Value | Threshold |",
            "|------|-----------|---------|-------|-----------|"
        ])

        for alert in sorted(active_alerts, key=lambda a: a.timestamp, reverse=True)[:10]:
            lines.append(
                f"| {alert.timestamp.strftime('%Y-%m-%d %H:%M')} | "
                f"{alert.parameter.value} | {alert.station_id} | "
                f"{alert.value:.1f} | {alert.threshold} |"
            )

    return "\n".join(lines)

Quick Start

from datetime import datetime, timedelta

Initialize monitor

monitor = EnvironmentalMonitor("Downtown Construction Site")

Add monitoring stations

monitor.add_station( "STA-001", "North Perimeter", location={"lat": 40.7128, "lon": -74.0060, "description": "North fence line"}, parameters=[ParameterType.NOISE, ParameterType.PM25, ParameterType.PM10] )

monitor.add_station( "STA-002", "Equipment Area", location={"lat": 40.7125, "lon": -74.0055, "description": "Near excavation"}, parameters=[ParameterType.NOISE, ParameterType.VIBRATION, ParameterType.CO] )

Add custom limit for local ordinance

monitor.add_custom_limit( ParameterType.NOISE, 65, "dBA", 0, "Local Ordinance", "Residential boundary limit" )

Record readings

monitor.record_reading("STA-001", ParameterType.NOISE, 78.5, "dBA") monitor.record_reading("STA-001", ParameterType.PM25, 28.3, "µg/m³") monitor.record_reading("STA-002", ParameterType.VIBRATION, 8.2, "mm/s")

Batch record

readings = [ {"station_id": "STA-001", "parameter": "noise", "value": 82.0, "unit": "dBA"}, {"station_id": "STA-001", "parameter": "pm25", "value": 31.5, "unit": "µg/m³"}, {"station_id": "STA-002", "parameter": "noise", "value": 88.0, "unit": "dBA"} ] monitor.record_batch(readings)

Get current conditions

conditions = monitor.get_current_conditions() for station, data in conditions.items(): print(f"\n{data['name']}:") for param, values in data['parameters'].items(): print(f" {param}: {values['value']} {values['unit']} - {values['status'].value}")

Check compliance

compliance = monitor.check_compliance( datetime.now() - timedelta(days=1), datetime.now() ) for record in compliance: if record.status != ComplianceStatus.COMPLIANT: print(f"⚠️ {record.parameter.value}: {record.measured_value} vs limit {record.limit_value}")

Generate daily report

report = monitor.generate_daily_report() print(f"\nDaily Status: {report.compliance_status.value}") print(f"Exceedances: {report.exceedances}")

Full report

print(monitor.generate_report())

Requirements

pip install (no external dependencies)

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

cad-to-data

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

drawing-analyzer

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

dwg-to-excel

No summary provided by upstream source.

Repository SourceNeeds Review