Incident Reporting System
Overview
Comprehensive incident reporting system for construction safety. Capture near-misses, injuries, and property damage. Conduct root cause analysis and track corrective actions to prevent recurrence.
"Near-miss reporting prevents 90% of future serious incidents" — DDC Community
Incident Pyramid
          △
         /│\  Fatality (1)
        / │ \
       /  │  \  Serious Injury (10)
      /   │   \
     /    │    \  Minor Injury (30)
    /     │     \
   /      │      \  Near Miss (300)
  /       │       \
 /        │        \  Unsafe Acts (3000)
──────────┴──────────
Technical Implementation
from dataclasses import dataclass, field from typing import List, Dict, Optional from enum import Enum from datetime import datetime, timedelta import json
class IncidentType(Enum):
    """Kind of incident being reported.

    Each member's value is the lowercase string code used when the type is
    rendered (for example in generated reports).
    """

    NEAR_MISS = "near_miss"
    FIRST_AID = "first_aid"
    # The next three are the types IncidentManager.report_incident
    # automatically flags as OSHA recordable.
    MEDICAL_TREATMENT = "medical_treatment"
    LOST_TIME = "lost_time"
    FATALITY = "fatality"
    PROPERTY_DAMAGE = "property_damage"
    ENVIRONMENTAL = "environmental"
class IncidentCategory(Enum):
    """Hazard category an incident is filed under.

    Member values are the lowercase string codes used as keys in the
    per-category breakdown of the incident metrics.
    """

    FALL = "fall"
    STRUCK_BY = "struck_by"
    CAUGHT_IN = "caught_in"
    ELECTROCUTION = "electrocution"
    VEHICLE = "vehicle"
    MATERIAL_HANDLING = "material_handling"
    TOOL_EQUIPMENT = "tool_equipment"
    SLIP_TRIP = "slip_trip"
    FIRE = "fire"
    CHEMICAL = "chemical"
    OTHER = "other"
class InvestigationStatus(Enum):
    """Stage of an incident's investigation.

    New incidents default to REPORTED. Members are listed in the order the
    workflow is expected to progress, ending with CLOSED.
    """

    REPORTED = "reported"
    UNDER_INVESTIGATION = "under_investigation"
    ROOT_CAUSE_IDENTIFIED = "root_cause_identified"
    CORRECTIVE_ACTIONS_ASSIGNED = "corrective_actions_assigned"
    IN_REMEDIATION = "in_remediation"
    CLOSED = "closed"
@dataclass
class Person:
    """Someone connected to an incident (an injured party or a witness)."""

    name: str
    company: str
    role: str
    contact: str
    # Optional; stays at 0 when the reporter does not supply it.
    years_experience: int = 0
@dataclass
class CorrectiveAction:
    """A follow-up task raised against an incident."""

    id: str
    description: str
    assigned_to: str
    due_date: datetime
    # Starts as "open"; IncidentManager.complete_corrective_action sets it
    # to "completed".
    status: str = "open"
    # Filled in only once the action has been completed.
    completed_date: Optional[datetime] = None
    verification_notes: str = ""
@dataclass
class Incident:
    """Full record of one reported incident.

    The identification and description fields are required; the
    investigation, documentation and metric fields all have defaults so
    they can be filled in as the investigation progresses.
    """

    # --- Identification (required) ---
    id: str
    incident_type: IncidentType
    category: IncidentCategory
    date_time: datetime
    location: str
    project_id: str
    project_name: str

    # --- Narrative (required) ---
    description: str
    immediate_actions: str

    # --- People involved ---
    injured_person: Optional[Person] = None
    witnesses: List[Person] = field(default_factory=list)
    reported_by: str = ""

    # --- Investigation state and findings ---
    status: InvestigationStatus = InvestigationStatus.REPORTED
    root_causes: List[str] = field(default_factory=list)
    contributing_factors: List[str] = field(default_factory=list)
    corrective_actions: List[CorrectiveAction] = field(default_factory=list)

    # --- Supporting documentation ---
    photos: List[str] = field(default_factory=list)
    weather_conditions: str = ""
    equipment_involved: List[str] = field(default_factory=list)

    # --- Metrics ---
    days_lost: int = 0
    property_damage_cost: float = 0.0
    osha_recordable: bool = False
class IncidentManager:
    """Manage construction incident reporting and investigation.

    Incidents and corrective actions are kept in in-memory dictionaries
    keyed by their generated IDs; this class does no persistence.
    """

    # 5 Whys root cause categories
    ROOT_CAUSE_CATEGORIES = [
        "Training/Competency",
        "Procedures/Work Instructions",
        "Equipment/Tools",
        "Supervision",
        "Communication",
        "Housekeeping",
        "PPE",
        "Work Environment",
        "Physical/Mental State",
        "Management System",
    ]

    def __init__(self):
        """Create a manager with no incidents and no corrective actions."""
        self.incidents: Dict[str, Incident] = {}
        self.corrective_actions: Dict[str, CorrectiveAction] = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _unique_id(prefix: str, taken: Dict) -> str:
        """Return a timestamp-based ID that is not already a key of *taken*.

        The timestamp only has one-second resolution, so a numeric suffix
        ("-2", "-3", ...) is appended when several IDs are requested within
        the same second. Without it the later record would silently replace
        the earlier one in the lookup dictionary.
        """
        base = f"{prefix}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        candidate = base
        sequence = 1
        while candidate in taken:
            sequence += 1
            candidate = f"{base}-{sequence}"
        return candidate

    def _get_incident(self, incident_id: str) -> Incident:
        """Return the stored incident or raise ValueError if it is unknown."""
        if incident_id not in self.incidents:
            raise ValueError(f"Incident {incident_id} not found")
        return self.incidents[incident_id]

    def _incidents_for(self, project_id: Optional[str]) -> List[Incident]:
        """Return all incidents, restricted to *project_id* when it is truthy."""
        incidents = list(self.incidents.values())
        if project_id:
            incidents = [inc for inc in incidents if inc.project_id == project_id]
        return incidents

    @staticmethod
    def _rate_per_200k_hours(cases: int, hours_worked: int) -> float:
        """Normalise a case count to 200,000 hours worked.

        200,000 is the OSHA incidence-rate base (100 full-time workers at
        2,000 hours per year). Non-positive hours yield 0.0 instead of a
        division error or a negative rate.
        """
        if hours_worked <= 0:
            return 0.0
        return (cases * 200000) / hours_worked

    @staticmethod
    def _month_start(month_index: int) -> datetime:
        """Convert a ``year * 12 + (month - 1)`` index to that month's first instant."""
        year, zero_based_month = divmod(month_index, 12)
        return datetime(year, zero_based_month + 1, 1)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def report_incident(self, incident_type: IncidentType,
                        category: IncidentCategory,
                        date_time: datetime,
                        location: str,
                        project_id: str,
                        project_name: str,
                        description: str,
                        immediate_actions: str,
                        reported_by: str,
                        injured_person: Optional[Dict] = None) -> Incident:
        """Report new incident.

        Args:
            incident_type: Kind of incident.
            category: Hazard category.
            date_time: When the incident happened.
            location: Where on site it happened.
            project_id: Identifier of the project.
            project_name: Display name of the project.
            description: What happened.
            immediate_actions: What was done right away.
            reported_by: Who filed the report.
            injured_person: Optional keyword arguments for ``Person``.

        Returns:
            The newly stored ``Incident``. Medical-treatment, lost-time and
            fatality incidents are flagged as OSHA recordable.
        """
        incident_id = self._unique_id("INC", self.incidents)

        injured = Person(**injured_person) if injured_person else None

        incident = Incident(
            id=incident_id,
            incident_type=incident_type,
            category=category,
            date_time=date_time,
            location=location,
            project_id=project_id,
            project_name=project_name,
            description=description,
            immediate_actions=immediate_actions,
            reported_by=reported_by,
            injured_person=injured,
        )

        # Auto-flag OSHA recordable
        if incident_type in (IncidentType.MEDICAL_TREATMENT,
                             IncidentType.LOST_TIME,
                             IncidentType.FATALITY):
            incident.osha_recordable = True

        self.incidents[incident_id] = incident
        return incident

    def add_witness(self, incident_id: str, witness: Dict) -> Incident:
        """Add witness to incident.

        Args:
            incident_id: ID of an existing incident.
            witness: Keyword arguments for ``Person``.

        Returns:
            The updated incident.

        Raises:
            ValueError: If the incident does not exist.
        """
        incident = self._get_incident(incident_id)
        incident.witnesses.append(Person(**witness))
        return incident

    # ------------------------------------------------------------------
    # Investigation
    # ------------------------------------------------------------------

    def conduct_investigation(self, incident_id: str,
                              root_causes: List[str],
                              contributing_factors: List[str]) -> Incident:
        """Record investigation findings.

        Replaces the incident's root causes and contributing factors and
        moves its status to ROOT_CAUSE_IDENTIFIED.

        Raises:
            ValueError: If the incident does not exist.
        """
        incident = self._get_incident(incident_id)
        # Copy so later appends (e.g. from five_whys_analysis) do not mutate
        # the lists the caller passed in.
        incident.root_causes = list(root_causes)
        incident.contributing_factors = list(contributing_factors)
        incident.status = InvestigationStatus.ROOT_CAUSE_IDENTIFIED
        return incident

    def five_whys_analysis(self, incident_id: str, whys: List[str]) -> Dict:
        """Conduct 5 Whys analysis.

        Args:
            incident_id: ID of an existing incident.
            whys: Answers to successive "why?" questions, in order.

        Returns:
            A dict with the incident ID, the analysis timestamp (ISO format)
            and one ``{"level", "question", "answer"}`` entry per answer.
            The final answer is also appended to the incident's root causes.

        Raises:
            ValueError: If the incident does not exist.
        """
        incident = self._get_incident(incident_id)

        analysis = {
            "incident_id": incident_id,
            "analysis_date": datetime.now().isoformat(),
            "whys": [
                {"level": level, "question": f"Why #{level}?", "answer": answer}
                for level, answer in enumerate(whys, start=1)
            ],
        }

        # The last "why" is typically the root cause
        if whys:
            incident.root_causes.append(whys[-1])

        return analysis

    # ------------------------------------------------------------------
    # Corrective actions
    # ------------------------------------------------------------------

    def assign_corrective_action(self, incident_id: str,
                                 description: str,
                                 assigned_to: str,
                                 due_days: int = 7) -> CorrectiveAction:
        """Assign corrective action.

        The action is due *due_days* days from now, is attached to the
        incident, and the incident's status becomes
        CORRECTIVE_ACTIONS_ASSIGNED.

        Raises:
            ValueError: If the incident does not exist.
        """
        incident = self._get_incident(incident_id)

        action_id = self._unique_id("CA", self.corrective_actions)
        action = CorrectiveAction(
            id=action_id,
            description=description,
            assigned_to=assigned_to,
            due_date=datetime.now() + timedelta(days=due_days),
        )

        incident.corrective_actions.append(action)
        incident.status = InvestigationStatus.CORRECTIVE_ACTIONS_ASSIGNED
        self.corrective_actions[action_id] = action

        return action

    def complete_corrective_action(self, action_id: str,
                                   verification_notes: str) -> CorrectiveAction:
        """Mark corrective action complete.

        Sets the status to "completed", stamps the completion time and
        stores the verification notes.

        Raises:
            ValueError: If the corrective action does not exist.
        """
        if action_id not in self.corrective_actions:
            raise ValueError(f"Corrective action {action_id} not found")

        action = self.corrective_actions[action_id]
        action.status = "completed"
        action.completed_date = datetime.now()
        action.verification_notes = verification_notes

        return action

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------

    def get_incident_metrics(self, project_id: Optional[str] = None,
                             start_date: Optional[datetime] = None,
                             end_date: Optional[datetime] = None) -> Dict:
        """Calculate incident metrics.

        Args:
            project_id: Only count incidents of this project when given.
            start_date: Inclusive lower bound on the incident date/time.
            end_date: Inclusive upper bound on the incident date/time.

        Returns:
            Counts by incident type, total days lost, a per-category
            breakdown (categories with no incidents are omitted) and
            ``near_miss_ratio`` (near misses per OSHA recordable, 0 when
            there are no recordables).
        """
        incidents = self._incidents_for(project_id)
        if start_date is not None:
            incidents = [inc for inc in incidents if inc.date_time >= start_date]
        if end_date is not None:
            incidents = [inc for inc in incidents if inc.date_time <= end_date]

        near_misses = sum(1 for inc in incidents
                          if inc.incident_type == IncidentType.NEAR_MISS)
        first_aid = sum(1 for inc in incidents
                        if inc.incident_type == IncidentType.FIRST_AID)
        recordables = sum(1 for inc in incidents if inc.osha_recordable)
        lost_time = sum(1 for inc in incidents
                        if inc.incident_type == IncidentType.LOST_TIME)

        # Tally once, then emit in enum declaration order so the breakdown
        # has a stable key order.
        tallies: Dict = {}
        for inc in incidents:
            tallies[inc.category] = tallies.get(inc.category, 0) + 1
        by_category = {cat.value: tallies[cat]
                       for cat in IncidentCategory if cat in tallies}

        return {
            "total_incidents": len(incidents),
            "near_misses": near_misses,
            "first_aid_cases": first_aid,
            "osha_recordables": recordables,
            "lost_time_incidents": lost_time,
            "total_days_lost": sum(inc.days_lost for inc in incidents),
            "by_category": by_category,
            "near_miss_ratio": near_misses / recordables if recordables else 0,
        }

    def calculate_trir(self, hours_worked: int, project_id: Optional[str] = None) -> float:
        """Calculate Total Recordable Incident Rate.

        TRIR = (Recordables × 200,000) / Hours Worked. Returns 0.0 when
        *hours_worked* is not positive.
        """
        recordables = sum(1 for inc in self._incidents_for(project_id)
                          if inc.osha_recordable)
        return self._rate_per_200k_hours(recordables, hours_worked)

    def calculate_dart(self, hours_worked: int, project_id: Optional[str] = None) -> float:
        """Calculate Days Away, Restricted, or Transferred rate.

        Only LOST_TIME incidents are counted: IncidentType has no member
        for restricted-duty or transferred cases. Returns 0.0 when
        *hours_worked* is not positive.
        """
        dart_cases = sum(1 for inc in self._incidents_for(project_id)
                         if inc.incident_type == IncidentType.LOST_TIME)
        return self._rate_per_200k_hours(dart_cases, hours_worked)

    def get_trend_analysis(self, months: int = 6) -> List[Dict]:
        """Analyze incident trends over time.

        Args:
            months: Number of calendar months to cover, ending with the
                current month.

        Returns:
            One dict per month, oldest first, with the month label
            (``YYYY-MM``) and the total, near-miss and recordable counts.
        """
        now = datetime.now()
        # Work with a linear month index so stepping back across any number
        # of year boundaries is plain integer arithmetic.
        current_index = now.year * 12 + (now.month - 1)

        trends = []
        for offset in range(months - 1, -1, -1):
            month_start = self._month_start(current_index - offset)
            next_month_start = self._month_start(current_index - offset + 1)

            # Half-open interval: everything up to, but excluding, the first
            # instant of the following month, so the whole last day counts.
            month_incidents = [inc for inc in self.incidents.values()
                               if month_start <= inc.date_time < next_month_start]

            trends.append({
                "month": month_start.strftime("%Y-%m"),
                "total": len(month_incidents),
                "near_misses": sum(1 for inc in month_incidents
                                   if inc.incident_type == IncidentType.NEAR_MISS),
                "recordables": sum(1 for inc in month_incidents
                                   if inc.osha_recordable),
            })

        return trends

    # ------------------------------------------------------------------
    # Report generation
    # ------------------------------------------------------------------

    def generate_incident_report(self, incident_id: str) -> str:
        """Generate detailed incident report.

        Returns:
            A Markdown document describing the incident, or the string
            "Incident not found" when the ID is unknown (this method does
            not raise for unknown IDs).
        """
        if incident_id not in self.incidents:
            return "Incident not found"

        inc = self.incidents[incident_id]

        lines = [
            "# Incident Report",
            "",
            f"**Incident ID:** {inc.id}",
            f"**Type:** {inc.incident_type.value}",
            f"**Category:** {inc.category.value}",
            f"**Date/Time:** {inc.date_time.strftime('%Y-%m-%d %H:%M')}",
            f"**Location:** {inc.location}",
            f"**Project:** {inc.project_name}",
            f"**Status:** {inc.status.value}",
            f"**OSHA Recordable:** {'Yes' if inc.osha_recordable else 'No'}",
            "",
            "## Description",
            f"{inc.description}",
            "",
            "## Immediate Actions Taken",
            f"{inc.immediate_actions}",
            "",
        ]

        if inc.injured_person:
            lines.extend([
                "## Injured Person",
                f"- Name: {inc.injured_person.name}",
                f"- Company: {inc.injured_person.company}",
                f"- Role: {inc.injured_person.role}",
                f"- Experience: {inc.injured_person.years_experience} years",
                "",
            ])

        if inc.root_causes:
            lines.append("## Root Causes")
            lines.extend(f"- {rc}" for rc in inc.root_causes)
            lines.append("")

        if inc.corrective_actions:
            lines.extend([
                "## Corrective Actions",
                "| Action | Assigned To | Due | Status |",
                "|--------|-------------|-----|--------|",
            ])
            lines.extend(
                f"| {ca.description} | {ca.assigned_to} | {ca.due_date.strftime('%Y-%m-%d')} | {ca.status} |"
                for ca in inc.corrective_actions
            )

        return "\n".join(lines)
Quick Start
# Initialize manager
manager = IncidentManager()

# Report near-miss
incident = manager.report_incident(
    incident_type=IncidentType.NEAR_MISS,
    category=IncidentCategory.STRUCK_BY,
    date_time=datetime.now(),
    location="Level 5, Grid C-4",
    project_id="PRJ-001",
    project_name="Office Tower",
    description="Unsecured tool fell from scaffold, landed 2 feet from worker",
    immediate_actions="Area cordoned off, toolbox talk conducted",
    reported_by="Site Foreman",
)

# 5 Whys analysis: each answer explains the one before it
analysis = manager.five_whys_analysis(incident.id, [
    "Tool fell from scaffold",
    "Tool was not secured",
    "No tool lanyard was used",
    "Worker not trained on tool tethering",
    "Tool tethering training not included in orientation",
])

# Assign corrective actions
manager.assign_corrective_action(
    incident.id,
    "Update orientation to include tool tethering training",
    "Safety Manager",
    due_days=14,
)
manager.assign_corrective_action(
    incident.id,
    "Provide tool lanyards to all workers at height",
    "Procurement",
    due_days=7,
)

# Get metrics
metrics = manager.get_incident_metrics()
print(f"Total Incidents: {metrics['total_incidents']}")
print(f"Near Miss Ratio: {metrics['near_miss_ratio']:.1f}")

# Generate report
print(manager.generate_incident_report(incident.id))
Requirements
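No installation step is needed: the module has no external dependencies and uses only the Python standard library, so there is nothing to `pip install`.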