Equipment Telematics
Overview
Integrate telematics data from heavy construction equipment (excavators, cranes, loaders, trucks) to monitor utilization, track location, analyze fuel efficiency, predict maintenance needs, and ensure safe operation.
Telematics Data Flow
┌─────────────────────────────────────────────────────────────────┐ │ EQUIPMENT TELEMATICS │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ EQUIPMENT TELEMATICS ANALYTICS │ │ ───────── ────────── ───────── │ │ │ │ 🚜 Excavator ────┐ 📍 Location 📊 Utilization│ │ 🏗️ Crane ────┼──────→ 🔧 Engine Hours ────────→ ⛽ Fuel │ │ 🚛 Truck ────┤ ⛽ Fuel Level 🔧 Maintenance│ │ 🚧 Loader ────┘ ⚡ Performance 👷 Operator │ │ │ │ METRICS TRACKED: │ │ • GPS location and geofencing │ │ • Engine hours and idle time │ │ • Fuel consumption rate │ │ • Load cycles and productivity │ │ • Fault codes and diagnostics │ │ • Operator behavior and safety │ │ │ └─────────────────────────────────────────────────────────────────┘
Technical Implementation
from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple from datetime import datetime, timedelta from enum import Enum import statistics import math
class EquipmentType(Enum): EXCAVATOR = "excavator" CRANE = "crane" LOADER = "loader" BULLDOZER = "bulldozer" DUMP_TRUCK = "dump_truck" CONCRETE_MIXER = "concrete_mixer" FORKLIFT = "forklift" COMPACTOR = "compactor" GRADER = "grader" TELEHANDLER = "telehandler"
class OperatingStatus(Enum): OPERATING = "operating" IDLE = "idle" OFF = "off" MAINTENANCE = "maintenance" FAULT = "fault"
class FaultSeverity(Enum): INFO = "info" WARNING = "warning" CRITICAL = "critical" SHUTDOWN = "shutdown"
@dataclass class GPSLocation: latitude: float longitude: float altitude: float = 0.0 speed: float = 0.0 heading: float = 0.0 timestamp: datetime = field(default_factory=datetime.now)
@dataclass class TelematicsReading: equipment_id: str timestamp: datetime location: GPSLocation engine_hours: float fuel_level: float # Percentage fuel_rate: float # L/hr engine_rpm: int hydraulic_temp: float coolant_temp: float operating_status: OperatingStatus load_percentage: float = 0.0 operator_id: str = ""
@dataclass class FaultCode: code: str description: str severity: FaultSeverity timestamp: datetime equipment_id: str resolved: bool = False
@dataclass class Equipment: id: str name: str equipment_type: EquipmentType make: str model: str year: int serial_number: str hourly_rate: float = 0.0 fuel_capacity: float = 0.0 # Liters current_hours: float = 0.0 next_service_hours: float = 0.0 assigned_site: str = "" assigned_operator: str = ""
@dataclass class Geofence: id: str name: str center_lat: float center_lon: float radius_meters: float allowed_equipment: List[str] = field(default_factory=list)
@dataclass class UtilizationReport: equipment_id: str period_start: datetime period_end: datetime total_hours: float operating_hours: float idle_hours: float off_hours: float utilization_pct: float idle_pct: float fuel_consumed: float fuel_efficiency: float # L/operating hour cycles: int
class EquipmentTelematics: """Integrate and analyze equipment telematics data."""
# Maintenance intervals by type (hours)
SERVICE_INTERVALS = {
EquipmentType.EXCAVATOR: 250,
EquipmentType.CRANE: 200,
EquipmentType.LOADER: 250,
EquipmentType.BULLDOZER: 250,
EquipmentType.DUMP_TRUCK: 300,
}
# Typical fuel rates (L/hr)
TYPICAL_FUEL_RATES = {
EquipmentType.EXCAVATOR: 15,
EquipmentType.CRANE: 12,
EquipmentType.LOADER: 18,
EquipmentType.BULLDOZER: 25,
EquipmentType.DUMP_TRUCK: 20,
}
def __init__(self, fleet_name: str):
self.fleet_name = fleet_name
self.equipment: Dict[str, Equipment] = {}
self.readings: List[TelematicsReading] = []
self.faults: List[FaultCode] = []
self.geofences: Dict[str, Geofence] = {}
def register_equipment(self, id: str, name: str, equipment_type: EquipmentType,
make: str, model: str, year: int, serial_number: str,
hourly_rate: float = 0, fuel_capacity: float = 0) -> Equipment:
"""Register equipment in fleet."""
equipment = Equipment(
id=id,
name=name,
equipment_type=equipment_type,
make=make,
model=model,
year=year,
serial_number=serial_number,
hourly_rate=hourly_rate,
fuel_capacity=fuel_capacity
)
self.equipment[id] = equipment
return equipment
def add_geofence(self, id: str, name: str, center_lat: float,
center_lon: float, radius_meters: float,
allowed_equipment: List[str] = None) -> Geofence:
"""Add geofence boundary."""
geofence = Geofence(
id=id,
name=name,
center_lat=center_lat,
center_lon=center_lon,
radius_meters=radius_meters,
allowed_equipment=allowed_equipment or []
)
self.geofences[id] = geofence
return geofence
def ingest_reading(self, equipment_id: str, location: GPSLocation,
engine_hours: float, fuel_level: float, fuel_rate: float,
engine_rpm: int, hydraulic_temp: float, coolant_temp: float,
load_percentage: float = 0, operator_id: str = "") -> TelematicsReading:
"""Ingest telematics reading from equipment."""
if equipment_id not in self.equipment:
raise ValueError(f"Unknown equipment: {equipment_id}")
# Determine operating status
if engine_rpm == 0:
status = OperatingStatus.OFF
elif engine_rpm < 800 or load_percentage < 10:
status = OperatingStatus.IDLE
else:
status = OperatingStatus.OPERATING
reading = TelematicsReading(
equipment_id=equipment_id,
timestamp=location.timestamp,
location=location,
engine_hours=engine_hours,
fuel_level=fuel_level,
fuel_rate=fuel_rate,
engine_rpm=engine_rpm,
hydraulic_temp=hydraulic_temp,
coolant_temp=coolant_temp,
operating_status=status,
load_percentage=load_percentage,
operator_id=operator_id
)
self.readings.append(reading)
# Update equipment status
equip = self.equipment[equipment_id]
equip.current_hours = engine_hours
# Check for issues
self._check_diagnostics(equipment_id, reading)
self._check_geofence(equipment_id, location)
return reading
def _check_diagnostics(self, equipment_id: str, reading: TelematicsReading):
"""Check for diagnostic issues."""
equip = self.equipment[equipment_id]
# High temperature warning
if reading.hydraulic_temp > 90:
self._add_fault(equipment_id, "HYD_TEMP_HIGH",
"Hydraulic temperature high", FaultSeverity.WARNING)
if reading.coolant_temp > 100:
self._add_fault(equipment_id, "COOLANT_TEMP_HIGH",
"Coolant temperature critical", FaultSeverity.CRITICAL)
# Low fuel warning
if reading.fuel_level < 15:
self._add_fault(equipment_id, "FUEL_LOW",
"Fuel level below 15%", FaultSeverity.WARNING)
# Service due
service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
hours_to_service = equip.next_service_hours - reading.engine_hours
if hours_to_service < 0:
self._add_fault(equipment_id, "SERVICE_OVERDUE",
"Maintenance service overdue", FaultSeverity.WARNING)
elif hours_to_service < 50:
self._add_fault(equipment_id, "SERVICE_DUE",
f"Service due in {hours_to_service:.0f} hours", FaultSeverity.INFO)
def _check_geofence(self, equipment_id: str, location: GPSLocation):
"""Check geofence violations."""
for geofence in self.geofences.values():
# Calculate distance from center
distance = self._haversine_distance(
location.latitude, location.longitude,
geofence.center_lat, geofence.center_lon
)
if distance > geofence.radius_meters:
if (not geofence.allowed_equipment or
equipment_id in geofence.allowed_equipment):
self._add_fault(equipment_id, "GEOFENCE_EXIT",
f"Equipment left {geofence.name} boundary",
FaultSeverity.WARNING)
def _haversine_distance(self, lat1: float, lon1: float,
lat2: float, lon2: float) -> float:
"""Calculate distance between two coordinates in meters."""
R = 6371000 # Earth radius in meters
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
delta_phi = math.radians(lat2 - lat1)
delta_lambda = math.radians(lon2 - lon1)
a = (math.sin(delta_phi/2)**2 +
math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
def _add_fault(self, equipment_id: str, code: str,
description: str, severity: FaultSeverity):
"""Add fault code."""
# Check if same fault already active
existing = [f for f in self.faults
if f.equipment_id == equipment_id
and f.code == code
and not f.resolved]
if existing:
return
fault = FaultCode(
code=code,
description=description,
severity=severity,
timestamp=datetime.now(),
equipment_id=equipment_id
)
self.faults.append(fault)
def get_current_status(self, equipment_id: str) -> Dict:
"""Get current status of equipment."""
if equipment_id not in self.equipment:
raise ValueError(f"Unknown equipment: {equipment_id}")
equip = self.equipment[equipment_id]
# Get latest reading
readings = [r for r in self.readings if r.equipment_id == equipment_id]
if not readings:
return {"equipment": equip, "status": "no_data"}
latest = max(readings, key=lambda r: r.timestamp)
# Active faults
active_faults = [f for f in self.faults
if f.equipment_id == equipment_id and not f.resolved]
return {
"equipment_id": equip.id,
"name": equip.name,
"type": equip.equipment_type.value,
"status": latest.operating_status.value,
"location": {
"lat": latest.location.latitude,
"lon": latest.location.longitude,
"speed": latest.location.speed
},
"engine_hours": latest.engine_hours,
"fuel_level": latest.fuel_level,
"fuel_rate": latest.fuel_rate,
"temps": {
"hydraulic": latest.hydraulic_temp,
"coolant": latest.coolant_temp
},
"operator": latest.operator_id,
"active_faults": len(active_faults),
"last_update": latest.timestamp
}
def calculate_utilization(self, equipment_id: str,
start_date: datetime,
end_date: datetime) -> UtilizationReport:
"""Calculate utilization metrics for equipment."""
readings = [r for r in self.readings
if r.equipment_id == equipment_id
and start_date <= r.timestamp <= end_date]
if not readings:
return None
readings.sort(key=lambda r: r.timestamp)
total_hours = (end_date - start_date).total_seconds() / 3600
operating_hours = 0
idle_hours = 0
fuel_consumed = 0
# Calculate from readings
for i in range(1, len(readings)):
prev = readings[i-1]
curr = readings[i]
interval_hours = (curr.timestamp - prev.timestamp).total_seconds() / 3600
if prev.operating_status == OperatingStatus.OPERATING:
operating_hours += interval_hours
fuel_consumed += prev.fuel_rate * interval_hours
elif prev.operating_status == OperatingStatus.IDLE:
idle_hours += interval_hours
fuel_consumed += prev.fuel_rate * interval_hours * 0.3 # Idle uses ~30% fuel
off_hours = total_hours - operating_hours - idle_hours
utilization_pct = (operating_hours / total_hours * 100) if total_hours > 0 else 0
idle_pct = (idle_hours / (operating_hours + idle_hours) * 100) if (operating_hours + idle_hours) > 0 else 0
fuel_efficiency = (fuel_consumed / operating_hours) if operating_hours > 0 else 0
return UtilizationReport(
equipment_id=equipment_id,
period_start=start_date,
period_end=end_date,
total_hours=total_hours,
operating_hours=operating_hours,
idle_hours=idle_hours,
off_hours=off_hours,
utilization_pct=utilization_pct,
idle_pct=idle_pct,
fuel_consumed=fuel_consumed,
fuel_efficiency=fuel_efficiency,
cycles=0 # Would need load cycle detection
)
def get_fleet_summary(self) -> Dict:
"""Get summary of entire fleet."""
summary = {
"total_equipment": len(self.equipment),
"by_status": {},
"by_type": {},
"active_faults": 0,
"service_due": []
}
for equip in self.equipment.values():
# Count by type
eq_type = equip.equipment_type.value
summary["by_type"][eq_type] = summary["by_type"].get(eq_type, 0) + 1
# Get current status
try:
status = self.get_current_status(equip.id)
op_status = status.get("status", "unknown")
summary["by_status"][op_status] = summary["by_status"].get(op_status, 0) + 1
# Check service due
service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
hours_to_service = equip.next_service_hours - equip.current_hours
if hours_to_service < 50:
summary["service_due"].append({
"equipment": equip.name,
"hours_remaining": hours_to_service
})
except Exception:
summary["by_status"]["unknown"] = summary["by_status"].get("unknown", 0) + 1
# Count active faults
summary["active_faults"] = len([f for f in self.faults if not f.resolved])
return summary
def predict_maintenance(self, equipment_id: str) -> Dict:
"""Predict maintenance needs based on usage patterns."""
if equipment_id not in self.equipment:
raise ValueError(f"Unknown equipment: {equipment_id}")
equip = self.equipment[equipment_id]
# Calculate average daily hours
week_ago = datetime.now() - timedelta(days=7)
recent_readings = [r for r in self.readings
if r.equipment_id == equipment_id
and r.timestamp > week_ago]
if len(recent_readings) < 2:
return {"prediction": "insufficient_data"}
hours_start = min(r.engine_hours for r in recent_readings)
hours_end = max(r.engine_hours for r in recent_readings)
days = (max(r.timestamp for r in recent_readings) -
min(r.timestamp for r in recent_readings)).days or 1
daily_hours = (hours_end - hours_start) / days
# Predict service date
service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
hours_to_service = equip.next_service_hours - equip.current_hours
if daily_hours > 0:
days_to_service = hours_to_service / daily_hours
service_date = datetime.now() + timedelta(days=days_to_service)
else:
service_date = None
return {
"equipment_id": equipment_id,
"current_hours": equip.current_hours,
"next_service_hours": equip.next_service_hours,
"hours_to_service": hours_to_service,
"avg_daily_hours": daily_hours,
"predicted_service_date": service_date,
"service_type": "Routine maintenance",
"estimated_downtime_hours": 8
}
def generate_report(self) -> str:
"""Generate fleet telematics report."""
summary = self.get_fleet_summary()
lines = [
"# Equipment Telematics Report",
"",
f"**Fleet:** {self.fleet_name}",
f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
"",
"## Fleet Summary",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| Total Equipment | {summary['total_equipment']} |",
f"| Active Faults | {summary['active_faults']} |",
f"| Service Due | {len(summary['service_due'])} |",
"",
"## Status Distribution",
""
]
for status, count in summary["by_status"].items():
lines.append(f"- {status}: {count}")
# Equipment details
lines.extend([
"",
"## Equipment Status",
"",
"| Equipment | Type | Status | Hours | Fuel | Faults |",
"|-----------|------|--------|-------|------|--------|"
])
for equip in self.equipment.values():
try:
status = self.get_current_status(equip.id)
status_icon = "✅" if status['status'] == 'operating' else "⏸️" if status['status'] == 'idle' else "⏹️"
lines.append(
f"| {equip.name} | {equip.equipment_type.value} | "
f"{status_icon} {status['status']} | {status['engine_hours']:.0f} | "
f"{status['fuel_level']:.0f}% | {status['active_faults']} |"
)
except Exception:
lines.append(
f"| {equip.name} | {equip.equipment_type.value} | ⚠️ No data | - | - | - |"
)
# Service due
if summary["service_due"]:
lines.extend([
"",
"## Service Due Soon",
"",
"| Equipment | Hours Remaining |",
"|-----------|-----------------|"
])
for svc in summary["service_due"]:
lines.append(f"| {svc['equipment']} | {svc['hours_remaining']:.0f} |")
# Active faults
active_faults = [f for f in self.faults if not f.resolved]
if active_faults:
lines.extend([
"",
"## Active Faults",
"",
"| Equipment | Code | Description | Severity |",
"|-----------|------|-------------|----------|"
])
for fault in active_faults[:10]:
sev_icon = "🔴" if fault.severity == FaultSeverity.CRITICAL else "🟡"
equip = self.equipment.get(fault.equipment_id)
lines.append(
f"| {equip.name if equip else fault.equipment_id} | "
f"{fault.code} | {fault.description} | {sev_icon} {fault.severity.value} |"
)
return "\n".join(lines)
Quick Start
from datetime import datetime, timedelta
Initialize telematics system
telematics = EquipmentTelematics("Site A Fleet")
Register equipment
telematics.register_equipment( "EX-001", "Excavator #1", EquipmentType.EXCAVATOR, make="Caterpillar", model="320", year=2022, serial_number="CAT320X12345", hourly_rate=150, fuel_capacity=400 )
telematics.register_equipment( "CR-001", "Tower Crane #1", EquipmentType.CRANE, make="Liebherr", model="200EC-H", year=2021, serial_number="LH200EC54321", hourly_rate=200, fuel_capacity=300 )
Add geofence for site boundary
telematics.add_geofence( "SITE-A", "Site A Boundary", center_lat=40.7128, center_lon=-74.0060, radius_meters=500 )
Ingest telematics reading
location = GPSLocation( latitude=40.7128, longitude=-74.0059, speed=5.0, timestamp=datetime.now() )
telematics.ingest_reading( "EX-001", location, engine_hours=1250.5, fuel_level=65.0, fuel_rate=18.5, engine_rpm=1800, hydraulic_temp=75.0, coolant_temp=85.0, load_percentage=75, operator_id="OP-101" )
Get current status
status = telematics.get_current_status("EX-001") print(f"Excavator status: {status['status']}") print(f"Location: {status['location']}") print(f"Fuel: {status['fuel_level']}%")
Calculate utilization
util = telematics.calculate_utilization( "EX-001", datetime.now() - timedelta(days=7), datetime.now() ) if util: print(f"Utilization: {util.utilization_pct:.1f}%") print(f"Fuel efficiency: {util.fuel_efficiency:.1f} L/hr")
Predict maintenance
maintenance = telematics.predict_maintenance("EX-001") print(f"Days to service: {maintenance.get('predicted_service_date')}")
Fleet summary
summary = telematics.get_fleet_summary() print(f"Fleet: {summary['total_equipment']} units")
Generate report
print(telematics.generate_report())
Requirements
pip install (no external dependencies)