resource-leveler

Resource Leveler for Construction

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "resource-leveler" with this command: npx skills add datadrivenconstruction/ddc_skills_for_ai_agents_in_construction/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-resource-leveler

Resource Leveler for Construction

Overview

Optimize resource allocation across construction schedules. Level labor and equipment to avoid peaks, balance workload, and maintain project deadlines while reducing costs.

Business Case

Resource leveling provides:

  • Cost Reduction: Avoid overtime and idle time

  • Workforce Stability: Consistent crew sizes

  • Equipment Optimization: Reduce rental costs

  • Realistic Schedules: Achievable resource plans

Technical Implementation

from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Tuple from datetime import datetime, date, timedelta import pandas as pd import numpy as np from collections import defaultdict

@dataclass class Resource: id: str name: str resource_type: str # labor, equipment, material max_units: float cost_per_unit: float unit: str # hours, days, each

@dataclass class ResourceAssignment: task_id: str resource_id: str units: float start_date: date end_date: date

@dataclass class Task: id: str name: str duration: int # days start_date: date end_date: date predecessors: List[str] total_float: int is_critical: bool resource_assignments: List[ResourceAssignment] = field(default_factory=list)

@dataclass class LevelingResult: success: bool original_end_date: date leveled_end_date: date tasks_moved: int peak_reduction: Dict[str, float] warnings: List[str]

class ConstructionResourceLeveler: """Level resources across construction schedules."""

def __init__(self):
    self.resources: Dict[str, Resource] = {}
    self.tasks: Dict[str, Task] = {}
    self.assignments: List[ResourceAssignment] = []

def add_resource(self, resource: Resource):
    """Add a resource to the pool."""
    self.resources[resource.id] = resource

def add_task(self, task: Task):
    """Add a task to the schedule."""
    self.tasks[task.id] = task

def add_assignment(self, assignment: ResourceAssignment):
    """Assign a resource to a task."""
    self.assignments.append(assignment)
    if assignment.task_id in self.tasks:
        self.tasks[assignment.task_id].resource_assignments.append(assignment)

def calculate_resource_usage(self, start_date: date = None,
                              end_date: date = None) -> pd.DataFrame:
    """Calculate daily resource usage."""
    if not self.assignments:
        return pd.DataFrame()

    # Determine date range
    if start_date is None:
        start_date = min(a.start_date for a in self.assignments)
    if end_date is None:
        end_date = max(a.end_date for a in self.assignments)

    # Create date range
    dates = pd.date_range(start_date, end_date, freq='D')

    # Initialize usage matrix
    usage = {r_id: [0.0] * len(dates) for r_id in self.resources}

    # Fill in usage
    for assignment in self.assignments:
        if assignment.resource_id in usage:
            for i, d in enumerate(dates):
                if assignment.start_date <= d.date() <= assignment.end_date:
                    usage[assignment.resource_id][i] += assignment.units

    df = pd.DataFrame(usage, index=dates)
    df.index.name = 'date'

    return df

def identify_overallocations(self) -> List[Dict]:
    """Identify resource overallocations."""
    usage = self.calculate_resource_usage()
    overallocations = []

    for resource_id, resource in self.resources.items():
        if resource_id in usage.columns:
            daily_usage = usage[resource_id]
            over_days = daily_usage[daily_usage > resource.max_units]

            if len(over_days) > 0:
                overallocations.append({
                    'resource_id': resource_id,
                    'resource_name': resource.name,
                    'max_units': resource.max_units,
                    'peak_usage': daily_usage.max(),
                    'over_by': daily_usage.max() - resource.max_units,
                    'days_overallocated': len(over_days),
                    'first_overallocation': over_days.index[0].date(),
                    'worst_day': daily_usage.idxmax().date()
                })

    return overallocations

def level_resources(self, method: str = 'float_priority',
                    protect_critical_path: bool = True,
                    max_extension: int = 30) -> LevelingResult:
    """Level resources to resolve overallocations."""

    original_end = max(t.end_date for t in self.tasks.values())
    tasks_moved = 0
    warnings = []

    # Get initial overallocations
    initial_over = self.identify_overallocations()
    if not initial_over:
        return LevelingResult(
            success=True,
            original_end_date=original_end,
            leveled_end_date=original_end,
            tasks_moved=0,
            peak_reduction={},
            warnings=["No overallocations found"]
        )

    # Track peak usage before
    usage_before = self.calculate_resource_usage()
    peaks_before = {r: usage_before[r].max() for r in usage_before.columns}

    # Leveling loop
    iteration = 0
    max_iterations = len(self.tasks) * 2

    while iteration < max_iterations:
        iteration += 1
        overallocations = self.identify_overallocations()

        if not overallocations:
            break

        # Find task to move
        moved = False
        for over in overallocations:
            resource_id = over['resource_id']
            worst_day = over['worst_day']

            # Find tasks using this resource on worst day
            candidates = self._find_movable_tasks(
                resource_id, worst_day, protect_critical_path
            )

            if candidates:
                # Sort by priority (lowest float first to preserve options)
                candidates.sort(key=lambda t: -t.total_float)
                task_to_move = candidates[0]

                # Calculate new dates
                new_start, new_end = self._calculate_shift(
                    task_to_move, resource_id, max_extension
                )

                if new_start:
                    self._shift_task(task_to_move.id, new_start, new_end)
                    tasks_moved += 1
                    moved = True
                    break

        if not moved:
            warnings.append("Could not resolve all overallocations")
            break

    # Calculate results
    usage_after = self.calculate_resource_usage()
    peaks_after = {r: usage_after[r].max() for r in usage_after.columns}

    peak_reduction = {}
    for r in peaks_before:
        if r in peaks_after:
            reduction = (peaks_before[r] - peaks_after[r]) / peaks_before[r] * 100
            peak_reduction[r] = reduction

    leveled_end = max(t.end_date for t in self.tasks.values())

    if leveled_end > original_end + timedelta(days=max_extension):
        warnings.append(f"Project extended beyond max allowed ({max_extension} days)")

    remaining_over = self.identify_overallocations()

    return LevelingResult(
        success=len(remaining_over) == 0,
        original_end_date=original_end,
        leveled_end_date=leveled_end,
        tasks_moved=tasks_moved,
        peak_reduction=peak_reduction,
        warnings=warnings
    )

def _find_movable_tasks(self, resource_id: str, on_date: date,
                        protect_critical: bool) -> List[Task]:
    """Find tasks that can be moved to reduce overallocation."""
    candidates = []

    for task in self.tasks.values():
        # Check if task uses this resource on this date
        uses_resource = any(
            a.resource_id == resource_id and
            a.start_date <= on_date <= a.end_date
            for a in task.resource_assignments
        )

        if not uses_resource:
            continue

        # Check if critical path protected
        if protect_critical and task.is_critical:
            continue

        # Check if has float
        if task.total_float > 0:
            candidates.append(task)

    return candidates

def _calculate_shift(self, task: Task, resource_id: str,
                     max_extension: int) -> Tuple[date, date]:
    """Calculate optimal shift for a task."""
    resource = self.resources[resource_id]

    # Try shifting forward
    for days in range(1, min(task.total_float + 1, max_extension + 1)):
        new_start = task.start_date + timedelta(days=days)
        new_end = task.end_date + timedelta(days=days)

        # Check if this resolves overallocation
        temp_usage = self._calculate_usage_if_moved(task.id, new_start, new_end)

        if temp_usage.get(resource_id, 0) <= resource.max_units:
            return new_start, new_end

    return None, None

def _calculate_usage_if_moved(self, task_id: str, new_start: date,
                               new_end: date) -> Dict[str, float]:
    """Calculate resource usage if task were moved."""
    # Simplified: calculate peak on affected dates
    usage = defaultdict(float)

    for assignment in self.assignments:
        if assignment.task_id == task_id:
            # Use new dates
            for d in pd.date_range(new_start, new_end):
                usage[assignment.resource_id] = max(
                    usage[assignment.resource_id],
                    assignment.units
                )
        else:
            # Use existing dates
            for d in pd.date_range(assignment.start_date, assignment.end_date):
                usage[assignment.resource_id] = max(
                    usage[assignment.resource_id],
                    assignment.units
                )

    return dict(usage)

def _shift_task(self, task_id: str, new_start: date, new_end: date):
    """Shift a task to new dates."""
    task = self.tasks[task_id]
    delta = new_start - task.start_date

    # Update task
    task.start_date = new_start
    task.end_date = new_end

    # Update assignments
    for assignment in self.assignments:
        if assignment.task_id == task_id:
            assignment.start_date += delta
            assignment.end_date += delta

def optimize_crew_size(self, resource_id: str,
                        target_utilization: float = 0.85) -> Dict:
    """Recommend optimal crew size for a resource."""
    usage = self.calculate_resource_usage()

    if resource_id not in usage.columns:
        return None

    daily_usage = usage[resource_id]
    resource = self.resources[resource_id]

    # Calculate statistics
    peak = daily_usage.max()
    avg = daily_usage.mean()
    working_days = (daily_usage > 0).sum()

    # Current utilization
    current_util = avg / resource.max_units if resource.max_units > 0 else 0

    # Optimal size for target utilization
    optimal_size = avg / target_utilization

    return {
        'resource_id': resource_id,
        'current_max_units': resource.max_units,
        'peak_usage': peak,
        'average_usage': avg,
        'working_days': int(working_days),
        'current_utilization': current_util,
        'recommended_max_units': round(optimal_size, 1),
        'potential_savings': (resource.max_units - optimal_size) * resource.cost_per_unit * working_days
    }

def generate_histogram(self, resource_id: str) -> pd.DataFrame:
    """Generate resource histogram data."""
    usage = self.calculate_resource_usage()

    if resource_id not in usage.columns:
        return pd.DataFrame()

    resource = self.resources[resource_id]

    df = pd.DataFrame({
        'date': usage.index,
        'usage': usage[resource_id].values,
        'capacity': resource.max_units,
        'overallocated': usage[resource_id].values > resource.max_units
    })

    return df

def generate_report(self) -> str:
    """Generate resource leveling report."""
    lines = ["# Resource Leveling Report", ""]
    lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    lines.append(f"**Resources:** {len(self.resources)}")
    lines.append(f"**Tasks:** {len(self.tasks)}")
    lines.append("")

    # Overallocations
    overallocations = self.identify_overallocations()
    if overallocations:
        lines.append("## Overallocations Found")
        for over in overallocations:
            lines.append(f"\n### {over['resource_name']}")
            lines.append(f"- **Max Units:** {over['max_units']}")
            lines.append(f"- **Peak Usage:** {over['peak_usage']}")
            lines.append(f"- **Days Overallocated:** {over['days_overallocated']}")
            lines.append(f"- **Worst Day:** {over['worst_day']}")
    else:
        lines.append("## No Overallocations")
        lines.append("All resources are within capacity.")

    # Resource utilization
    lines.append("\n## Resource Utilization")
    for resource_id in self.resources:
        opt = self.optimize_crew_size(resource_id)
        if opt:
            lines.append(f"\n### {self.resources[resource_id].name}")
            lines.append(f"- **Utilization:** {opt['current_utilization']:.1%}")
            lines.append(f"- **Peak:** {opt['peak_usage']:.1f}")
            lines.append(f"- **Average:** {opt['average_usage']:.1f}")

    return "\n".join(lines)

Quick Start

from datetime import date

Initialize leveler

leveler = ConstructionResourceLeveler()

Add resources

leveler.add_resource(Resource( id="CARP", name="Carpenters", resource_type="labor", max_units=10, cost_per_unit=75, unit="hours" ))

Add tasks

leveler.add_task(Task( id="T1", name="Frame Level 1", duration=10, start_date=date(2026, 3, 1), end_date=date(2026, 3, 14), predecessors=[], total_float=5, is_critical=False ))

Add assignments

leveler.add_assignment(ResourceAssignment( task_id="T1", resource_id="CARP", units=8, start_date=date(2026, 3, 1), end_date=date(2026, 3, 14) ))

Check overallocations

overallocations = leveler.identify_overallocations() for over in overallocations: print(f"{over['resource_name']}: {over['peak_usage']} vs {over['max_units']} max")

Level resources

result = leveler.level_resources(protect_critical_path=True) print(f"Tasks moved: {result.tasks_moved}") print(f"End date change: {result.original_end_date} -> {result.leveled_end_date}")

Generate report

print(leveler.generate_report())

Dependencies

pip install pandas numpy

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

drawing-analyzer

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

cad-to-data

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

dwg-to-excel

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

cost-estimation-resource

No summary provided by upstream source.

Repository SourceNeeds Review