Pandas Data Processing Skill

Master Pandas for time series analysis, OrcaFlex results processing, and configuration-driven data workflows in marine and offshore engineering.

When to Use This Skill

Use Pandas data processing when you need:

  • Time series analysis - Wave elevation, vessel motions, mooring tensions

  • OrcaFlex results - Load simulation results, process RAOs, analyze dynamics

  • Multi-format data - CSV, Excel, HDF5, Parquet for large datasets

  • Statistical analysis - Summary statistics, rolling windows, resampling

  • Data transformation - Pivot, melt, merge, group operations

  • Engineering reports - Automated data extraction and summary generation

Avoid when:

  • Real-time streaming data (use Polars or streaming libraries)

  • Extremely large datasets (>100GB) - use Dask, Vaex, or PySpark (see the sketch after this list)

  • Pure numerical computation (use NumPy directly)

  • Graph/network data (use NetworkX)
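
For the oversized-dataset case, a minimal Dask sketch, assuming dask[dataframe] is installed; the API mirrors Pandas but evaluates lazily until .compute():

import dask.dataframe as dd

# Lazily scan a directory of CSVs that would not fit in memory at once
ddf = dd.read_csv('data/processed/mooring_tensions/*.csv')

# Same groupby/aggregate pattern as Pandas; compute() triggers execution
line_means = ddf.groupby('LineID')['Tension'].mean().compute()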

Core Capabilities

  1. Time Series Analysis

Load and Process Time Series:

import pandas as pd
import numpy as np
from pathlib import Path

def load_orcaflex_time_series(
    csv_file: Path,
    time_column: str = 'Time',
    parse_dates: bool = True
) -> pd.DataFrame:
    """
    Load OrcaFlex time series results from CSV.

    Args:
        csv_file: Path to CSV file
        time_column: Name of time column
        parse_dates: Whether to parse time column as datetime

    Returns:
        DataFrame with time as index
    """
    # Load CSV
    df = pd.read_csv(csv_file)

    # Convert simulation time (seconds) to datetime
    if parse_dates:
        df[time_column] = pd.to_datetime(df[time_column], unit='s')

    # Set time as index
    df.set_index(time_column, inplace=True)

    return df

Usage

results = load_orcaflex_time_series(
    Path('data/processed/vessel_motions.csv')
)

print(f"Time range: {results.index[0]} to {results.index[-1]}")
print(f"Duration: {(results.index[-1] - results.index[0]).total_seconds()} seconds")
print(f"Sampling rate: {1 / results.index.to_series().diff().mean().total_seconds():.2f} Hz")

Resampling and Aggregation:

def resample_time_series(
    df: pd.DataFrame,
    target_frequency: str = '1s',
    method: str = 'mean'
) -> pd.DataFrame:
    """
    Resample time series to target frequency.

    Args:
        df: Input DataFrame with datetime index
        target_frequency: Target frequency ('1s', '100ms', '1min', etc.;
            pandas >= 2.2 deprecates the uppercase 'S' alias)
        method: Aggregation method ('mean', 'max', 'min', 'std')

    Returns:
        Resampled DataFrame
    """
    # Resample with the requested aggregation
    if method == 'mean':
        resampled = df.resample(target_frequency).mean()
    elif method == 'max':
        resampled = df.resample(target_frequency).max()
    elif method == 'min':
        resampled = df.resample(target_frequency).min()
    elif method == 'std':
        resampled = df.resample(target_frequency).std()
    else:
        raise ValueError(f"Unknown method: {method}")

    # Forward-fill NaN values (fillna(method='ffill') is deprecated)
    resampled = resampled.ffill()

    return resampled

Example: Downsample from 0.05s to 1s

high_freq_data = load_orcaflex_time_series(
    Path('data/processed/mooring_tension_0.05s.csv')
)

low_freq_data = resample_time_series(
    high_freq_data,
    target_frequency='1s',
    method='mean'
)

print(f"Original points: {len(high_freq_data)}")
print(f"Resampled points: {len(low_freq_data)}")

Rolling Statistics:

def calculate_rolling_statistics(
    df: pd.DataFrame,
    column: str,
    window: str = '60s'
) -> pd.DataFrame:
    """
    Calculate rolling statistics for time series.

    Args:
        df: Input DataFrame with datetime index
        column: Column name to analyze
        window: Rolling window size (time-based)

    Returns:
        DataFrame with rolling statistics
    """
    stats = pd.DataFrame(index=df.index)

    # Time-based rolling window (requires a datetime index)
    rolling = df[column].rolling(window=window)

    stats[f'{column}_mean'] = rolling.mean()
    stats[f'{column}_std'] = rolling.std()
    stats[f'{column}_max'] = rolling.max()
    stats[f'{column}_min'] = rolling.min()

    return stats

Example: 60-second rolling statistics

tension_stats = calculate_rolling_statistics(
    results,
    column='Tension_Line1',
    window='60s'
)

Plot rolling mean and standard deviation:

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=results.index,
    y=results['Tension_Line1'],
    name='Raw Tension',
    opacity=0.3
))
fig.add_trace(go.Scatter(
    x=tension_stats.index,
    y=tension_stats['Tension_Line1_mean'],
    name='60s Rolling Mean',
    line=dict(width=3)
))

fig.update_layout(
    title='Mooring Tension: Raw vs Rolling Mean',
    xaxis_title='Time',
    yaxis_title='Tension (kN)'
)
fig.write_html('reports/tension_rolling_mean.html')

  2. Statistical Analysis

Summary Statistics:

def generate_statistical_summary(
    df: pd.DataFrame,
    columns: list = None
) -> pd.DataFrame:
    """
    Generate comprehensive statistical summary.

    Args:
        df: Input DataFrame
        columns: Columns to analyze (None = all numeric)

    Returns:
        DataFrame with statistical metrics
    """
    if columns is None:
        columns = df.select_dtypes(include=[np.number]).columns.tolist()

    # Standard statistics
    summary = df[columns].describe()

    # Additional statistics
    additional_stats = pd.DataFrame({
        'median': df[columns].median(),
        'skewness': df[columns].skew(),
        'kurtosis': df[columns].kurtosis(),
        'variance': df[columns].var()
    }).T

    # Combine
    full_summary = pd.concat([summary, additional_stats])

    return full_summary

Example

motion_stats = generate_statistical_summary(
    results,
    columns=['Surge', 'Sway', 'Heave', 'Roll', 'Pitch', 'Yaw']
)

print(motion_stats)

Export to CSV

motion_stats.to_csv('reports/motion_statistics.csv')

Extreme Value Analysis:

def extract_extreme_values(
    df: pd.DataFrame,
    column: str,
    n_extremes: int = 10,
    extreme_type: str = 'max'
) -> pd.DataFrame:
    """
    Extract extreme values (max or min) from time series.

    Args:
        df: Input DataFrame with datetime index
        column: Column to analyze
        n_extremes: Number of extreme values to extract
        extreme_type: 'max' or 'min'

    Returns:
        DataFrame with extreme events
    """
    if extreme_type == 'max':
        extremes = df.nlargest(n_extremes, column)
    elif extreme_type == 'min':
        extremes = df.nsmallest(n_extremes, column)
    else:
        raise ValueError("extreme_type must be 'max' or 'min'")

    # Sort by time
    extremes = extremes.sort_index()

    return extremes

Example: Top 10 maximum tensions

max_tensions = extract_extreme_values(
    results,
    column='Tension_Line1',
    n_extremes=10,
    extreme_type='max'
)

print("Top 10 Maximum Tensions:")
print(max_tensions[['Tension_Line1']])

  3. Data Transformation

Pivot Operations:

def pivot_mooring_data(
    df: pd.DataFrame,
    index: str = 'Time',
    columns: str = 'LineID',
    values: str = 'Tension'
) -> pd.DataFrame:
    """
    Pivot long-format mooring data to wide format.

    Args:
        df: Input DataFrame in long format
        index: Index column (usually time)
        columns: Column to pivot (usually line identifier)
        values: Value column (tension, angle, etc.)

    Returns:
        Pivoted DataFrame
    """
    pivoted = df.pivot(
        index=index,
        columns=columns,
        values=values
    )

    # Rename columns, e.g. 1 -> 'Tension_Line1'
    pivoted.columns = [f'{values}_Line{col}' for col in pivoted.columns]

    return pivoted

Example: Convert long format to wide format

Long format:

Time  LineID  Tension
 0.0       1     1500
 0.0       2     1520
 0.1       1     1505
 0.1       2     1525

long_format = pd.DataFrame({
    'Time': [0.0, 0.0, 0.1, 0.1, 0.2, 0.2],
    'LineID': [1, 2, 1, 2, 1, 2],
    'Tension': [1500, 1520, 1505, 1525, 1510, 1530]
})

wide_format = pivot_mooring_data(long_format)
print(wide_format)

Output:

      Tension_Line1  Tension_Line2
Time
0.0            1500           1520
0.1            1505           1525
0.2            1510           1530
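
Note: df.pivot raises a ValueError if any (index, columns) pair is duplicated. When duplicates are possible, pivot_table with an explicit aggregation is the safer choice:

wide_format = long_format.pivot_table(
    index='Time',
    columns='LineID',
    values='Tension',
    aggfunc='mean'  # how duplicate (Time, LineID) pairs are combined
)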

Melt Operations:

def melt_wide_format(
    df: pd.DataFrame,
    id_vars: list = None,
    value_name: str = 'Value',
    var_name: str = 'Parameter'
) -> pd.DataFrame:
    """
    Convert wide-format data to long format.

    Args:
        df: Input DataFrame in wide format
        id_vars: Identifier variables to preserve
        value_name: Name for value column
        var_name: Name for variable column

    Returns:
        Melted DataFrame
    """
    if id_vars is None:
        id_vars = [df.index.name or 'index']
        df_reset = df.reset_index()
    else:
        df_reset = df

    melted = pd.melt(
        df_reset,
        id_vars=id_vars,
        value_name=value_name,
        var_name=var_name
    )

    return melted

Example: Convert multi-column tensions to long format

wide_data = pd.DataFrame({
    'Time': [0.0, 0.1, 0.2],
    'Tension_Line1': [1500, 1505, 1510],
    'Tension_Line2': [1520, 1525, 1530],
    'Tension_Line3': [1480, 1485, 1490]
})

long_data = melt_wide_format(
    wide_data,
    id_vars=['Time'],
    value_name='Tension',
    var_name='Line'
)

print(long_data)

Output:

   Time           Line  Tension
0   0.0  Tension_Line1     1500
1   0.1  Tension_Line1     1505
2   0.2  Tension_Line1     1510
3   0.0  Tension_Line2     1520
...

(melt stacks the value columns one after another, so rows are grouped by Line, not by Time)

  4. Multi-File Processing

Batch CSV Loading:

def load_multiple_csv_files(
    directory: Path,
    pattern: str = '*.csv',
    concat_axis: int = 0
) -> pd.DataFrame:
    """
    Load and concatenate multiple CSV files.

    Args:
        directory: Directory containing CSV files
        pattern: Glob pattern for file matching
        concat_axis: Concatenation axis (0=rows, 1=columns)

    Returns:
        Concatenated DataFrame
    """
    csv_files = sorted(directory.glob(pattern))

    if not csv_files:
        raise FileNotFoundError(f"No CSV files found matching {pattern} in {directory}")

    # Load all files
    dfs = []
    for csv_file in csv_files:
        df = pd.read_csv(csv_file)
        df['source_file'] = csv_file.name  # Track source
        dfs.append(df)

    # Concatenate (ignore_index only makes sense for row-wise concat)
    combined = pd.concat(dfs, axis=concat_axis, ignore_index=(concat_axis == 0))

    print(f"Loaded {len(csv_files)} files, total {len(combined)} rows")

    return combined

Example: Load all mooring tension results

all_tensions = load_multiple_csv_files(
    Path('data/processed/mooring_tensions/'),
    pattern='tension_line*.csv'
)

print(f"Combined dataset: {all_tensions.shape}")

Multi-Format Data Loading:

def load_engineering_data(
    file_path: Path,
    file_type: str = None
) -> pd.DataFrame:
    """
    Load data from multiple engineering file formats.

    Args:
        file_path: Path to data file
        file_type: File type ('csv', 'excel', 'hdf5', 'parquet', 'json').
                   If None, inferred from extension.

    Returns:
        Loaded DataFrame
    """
    if file_type is None:
        # Infer type from extension (case-insensitive)
        file_type = file_path.suffix.lstrip('.').lower()

    # Load based on type
    if file_type == 'csv':
        df = pd.read_csv(file_path)
    elif file_type in ['xls', 'xlsx', 'excel']:
        df = pd.read_excel(file_path)
    elif file_type in ['h5', 'hdf5']:
        df = pd.read_hdf(file_path)
    elif file_type == 'parquet':
        df = pd.read_parquet(file_path)
    elif file_type == 'json':
        df = pd.read_json(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_type}")

    print(f"Loaded {file_type.upper()}: {df.shape[0]} rows, {df.shape[1]} columns")

    return df

Usage examples

csv_data = load_engineering_data(Path('data/processed/results.csv'))
excel_data = load_engineering_data(Path('data/processed/summary.xlsx'))
hdf5_data = load_engineering_data(Path('data/processed/large_dataset.h5'))
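
For the write side, a matching dispatch sketch; save_engineering_data is an illustrative name, not an existing API, and HDF5 output assumes PyTables is installed:

def save_engineering_data(df: pd.DataFrame, file_path: Path) -> None:
    """Save a DataFrame, dispatching on file extension (sketch)."""
    file_type = file_path.suffix.lstrip('.').lower()
    if file_type == 'csv':
        df.to_csv(file_path, index=False)
    elif file_type in ['xls', 'xlsx']:
        df.to_excel(file_path, index=False)
    elif file_type in ['h5', 'hdf5']:
        df.to_hdf(file_path, key='data', mode='w')  # key name is arbitrary
    elif file_type == 'parquet':
        df.to_parquet(file_path, index=False)
    elif file_type == 'json':
        df.to_json(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_type}")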

  5. GroupBy Operations

Group and Aggregate:

def group_by_sea_state(
    df: pd.DataFrame,
    hs_column: str = 'Hs',
    tp_column: str = 'Tp',
    hs_bins: list = None,
    tp_bins: list = None
) -> pd.DataFrame:
    """
    Group results by sea state (Hs, Tp bins).

    Args:
        df: Input DataFrame with sea state parameters
        hs_column: Column name for significant wave height
        tp_column: Column name for peak period
        hs_bins: Bin edges for Hs, default [0, 2, 4, 6, 8, 10, 12]
        tp_bins: Bin edges for Tp, default [0, 6, 8, 10, 12, 14, 16]

    Returns:
        Grouped statistics by sea state
    """
    if hs_bins is None:
        hs_bins = [0, 2, 4, 6, 8, 10, 12]
    if tp_bins is None:
        tp_bins = [0, 6, 8, 10, 12, 14, 16]

    # Create bins
    df['Hs_bin'] = pd.cut(df[hs_column], bins=hs_bins)
    df['Tp_bin'] = pd.cut(df[tp_column], bins=tp_bins)

    # Group and aggregate (observed=False keeps all bin combinations;
    # pandas 2.x warns if this is left implicit for categorical keys)
    grouped = df.groupby(['Hs_bin', 'Tp_bin'], observed=False).agg({
        'Tension_Max': ['mean', 'std', 'max'],
        'Motion_Max': ['mean', 'std', 'max'],
        'Offset_Max': ['mean', 'std', 'max']
    })

    return grouped

Example

sea_state_results = pd.DataFrame({
    'Hs': [2.5, 3.0, 4.5, 5.0, 6.5, 7.0],
    'Tp': [7.0, 8.5, 9.0, 10.5, 11.0, 12.5],
    'Tension_Max': [1500, 1600, 1800, 2000, 2200, 2400],
    'Motion_Max': [2.0, 2.5, 3.0, 3.5, 4.0, 4.5],
    'Offset_Max': [50, 60, 70, 80, 90, 100]
})

grouped_stats = group_by_sea_state(sea_state_results)
print(grouped_stats)

Multi-Level Grouping:

def analyze_by_loadcase_and_direction(
    df: pd.DataFrame,
    group_columns: list = ['LoadCase', 'Direction'],
    value_columns: list = None
) -> pd.DataFrame:
    """
    Analyze results grouped by load case and direction.

    Args:
        df: Input DataFrame
        group_columns: Columns to group by
        value_columns: Columns to aggregate (None = all numeric)

    Returns:
        Multi-level grouped statistics
    """
    if value_columns is None:
        value_columns = df.select_dtypes(include=[np.number]).columns.tolist()
        # Drop numeric grouping columns (e.g. Direction) from the value set
        value_columns = [col for col in value_columns if col not in group_columns]

    # Group and calculate statistics
    grouped = df.groupby(group_columns)[value_columns].agg([
        'count', 'mean', 'std', 'min', 'max'
    ])

    return grouped

Example

load_case_data = pd.DataFrame({
    'LoadCase': ['Operating', 'Operating', 'Storm', 'Storm', 'Extreme', 'Extreme'],
    'Direction': [0, 45, 0, 45, 0, 45],
    'Tension': [1500, 1520, 2000, 2050, 2500, 2600],
    'Offset': [50, 55, 75, 80, 100, 110]
})

stats_by_case = analyze_by_loadcase_and_direction(load_case_data)
print(stats_by_case)
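
The aggregated result has two-level (MultiIndex) columns such as ('Tension', 'mean'). A small sketch for flattening them before export, assuming a Tension_mean naming convention is acceptable (the output path is illustrative):

stats_by_case.columns = ['_'.join(col) for col in stats_by_case.columns]
stats_by_case.to_csv('reports/loadcase_statistics.csv')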

Complete Examples

Example 1: OrcaFlex Results Processing

import pandas as pd
import numpy as np
from pathlib import Path
import plotly.graph_objects as go

def process_orcaflex_results(
    results_dir: Path,
    output_dir: Path
) -> dict:
    """
    Complete OrcaFlex results processing pipeline.

    Process time series results, calculate statistics,
    generate reports, and create visualizations.

    Args:
        results_dir: Directory with OrcaFlex CSV results
        output_dir: Directory for processed results

    Returns:
        Dictionary with processing summary
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # 1. Load vessel motions
    motions = pd.read_csv(results_dir / 'vessel_motions.csv')
    motions['Time'] = pd.to_datetime(motions['Time'], unit='s')
    motions.set_index('Time', inplace=True)

    # 2. Load mooring tensions
    tensions = pd.read_csv(results_dir / 'mooring_tensions.csv')
    tensions['Time'] = pd.to_datetime(tensions['Time'], unit='s')
    tensions.set_index('Time', inplace=True)

    # 3. Calculate statistics
    motion_stats = motions.describe()
    tension_stats = tensions.describe()

    # 4. Identify extreme events
    max_heave = motions['Heave'].idxmax()
    max_tension = tensions.max(axis=1).idxmax()

    # 5. Create summary report
    summary = {
        'motion_statistics': motion_stats,
        'tension_statistics': tension_stats,
        'max_heave_time': max_heave,
        'max_heave_value': motions.loc[max_heave, 'Heave'],
        'max_tension_time': max_tension,
        'max_tension_value': tensions.loc[max_tension].max(),
        'duration_seconds': (motions.index[-1] - motions.index[0]).total_seconds()
    }

    # 6. Export processed data
    motion_stats.to_csv(output_dir / 'motion_statistics.csv')
    tension_stats.to_csv(output_dir / 'tension_statistics.csv')

    # 7. Create time series plot (heave and tension on twin y-axes)
    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=motions.index,
        y=motions['Heave'],
        name='Heave',
        line=dict(color='blue')
    ))

    fig.add_trace(go.Scatter(
        x=tensions.index,
        y=tensions['Line1_Tension'],
        name='Line 1 Tension',
        yaxis='y2',
        line=dict(color='red')
    ))

    fig.update_layout(
        title='Vessel Motion and Mooring Tension',
        xaxis_title='Time',
        yaxis=dict(title='Heave (m)', side='left'),
        yaxis2=dict(title='Tension (kN)', side='right', overlaying='y'),
        hovermode='x unified'
    )

    fig.write_html(output_dir / 'time_series.html')

    # 8. Create statistics table plot
    fig_stats = go.Figure(data=[go.Table(
        header=dict(
            values=['Metric', 'Heave (m)', 'Line 1 Tension (kN)'],
            fill_color='paleturquoise',
            align='left'
        ),
        cells=dict(
            values=[
                ['Mean', 'Std Dev', 'Min', 'Max'],
                [
                    f"{motion_stats.loc['mean', 'Heave']:.3f}",
                    f"{motion_stats.loc['std', 'Heave']:.3f}",
                    f"{motion_stats.loc['min', 'Heave']:.3f}",
                    f"{motion_stats.loc['max', 'Heave']:.3f}"
                ],
                [
                    f"{tension_stats.loc['mean', 'Line1_Tension']:.1f}",
                    f"{tension_stats.loc['std', 'Line1_Tension']:.1f}",
                    f"{tension_stats.loc['min', 'Line1_Tension']:.1f}",
                    f"{tension_stats.loc['max', 'Line1_Tension']:.1f}"
                ]
            ],
            fill_color='lavender',
            align='left'
        )
    )])

    fig_stats.update_layout(title='Statistical Summary')
    fig_stats.write_html(output_dir / 'statistics_table.html')

    print("✓ Processed OrcaFlex results")
    print(f"  Duration: {summary['duration_seconds']:.1f} seconds")
    print(f"  Max heave: {summary['max_heave_value']:.2f} m at {summary['max_heave_time']}")
    print(f"  Max tension: {summary['max_tension_value']:.1f} kN at {summary['max_tension_time']}")

    return summary

Usage

results = process_orcaflex_results(
    results_dir=Path('data/processed/orcaflex_results'),
    output_dir=Path('reports/processed_results')
)

Example 2: Wave Scatter Diagram Analysis

def process_wave_scatter_diagram(
    scatter_csv: Path,
    output_dir: Path
) -> pd.DataFrame:
    """
    Process wave scatter diagram and calculate occurrence frequencies.

    Args:
        scatter_csv: Path to wave scatter CSV
        output_dir: Output directory

    Returns:
        Processed scatter diagram with frequencies
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load scatter diagram
    scatter = pd.read_csv(scatter_csv)

    # Create Hs and Tp bins
    scatter['Hs_bin'] = pd.cut(
        scatter['Hs'],
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        labels=['0-1', '1-2', '2-3', '3-4', '4-5', '5-6', '6-7', '7-8', '8-9', '9-10']
    )

    scatter['Tp_bin'] = pd.cut(
        scatter['Tp'],
        bins=[0, 4, 6, 8, 10, 12, 14, 16],
        labels=['0-4', '4-6', '6-8', '8-10', '10-12', '12-14', '14-16']
    )

    # Calculate occurrence frequency (observed=False keeps empty bins)
    frequency = (
        scatter.groupby(['Hs_bin', 'Tp_bin'], observed=False)['Occurrence']
        .sum()
        .reset_index()
    )

    # Pivot for heatmap
    heatmap_data = frequency.pivot(
        index='Hs_bin',
        columns='Tp_bin',
        values='Occurrence'
    ).fillna(0)

    # Calculate annual hours (assumes Occurrence is a fraction of the year)
    heatmap_data_hours = heatmap_data * 8760  # Hours per year

    # Export
    heatmap_data_hours.to_csv(output_dir / 'wave_scatter_annual_hours.csv')

    # Create heatmap (cast the categorical axes to plain strings for Plotly)
    import plotly.graph_objects as go

    fig = go.Figure(data=go.Heatmap(
        z=heatmap_data_hours.values,
        x=heatmap_data_hours.columns.astype(str),
        y=heatmap_data_hours.index.astype(str),
        colorscale='Blues',
        text=heatmap_data_hours.values,
        texttemplate='%{text:.1f}',
        colorbar=dict(title='Hours/Year')
    ))

    fig.update_layout(
        title='Wave Scatter Diagram - Annual Occurrence',
        xaxis_title='Tp (s)',
        yaxis_title='Hs (m)'
    )

    fig.write_html(output_dir / 'wave_scatter_heatmap.html')

    most_common = heatmap_data_hours.stack().idxmax()
    print("✓ Wave scatter diagram processed")
    print(f"  Total annual hours: {heatmap_data_hours.values.sum():.1f}")
    print(f"  Most common sea state: Hs={most_common[0]}, Tp={most_common[1]}")

    return heatmap_data_hours

Usage

scatter_processed = process_wave_scatter_diagram(
    scatter_csv=Path('data/raw/wave_scatter.csv'),
    output_dir=Path('reports/wave_analysis')
)

Example 3: Fatigue Damage Calculation
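
The function below applies the Palmgren-Miner rule: each stress-range bin with n_i cycles at range S_i contributes damage n_i / N_i, where N_i = a / S_i^m is the cycles-to-failure from the S-N curve, and failure is assumed when the summed damage reaches 1.0.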

def calculate_fatigue_damage(
    stress_ranges: pd.DataFrame,
    sn_curve: dict,
    design_life_years: float = 25
) -> pd.DataFrame:
    """
    Calculate fatigue damage using stress range histogram.

    Args:
        stress_ranges: DataFrame with stress range bins and counts
        sn_curve: S-N curve parameters {'m': 3.0, 'a': 1.52e12}
        design_life_years: Design life in years

    Returns:
        DataFrame with fatigue damage per bin
    """
    # S-N curve parameters
    m = sn_curve['m']
    a = sn_curve['a']

    # Calculate damage per bin (Miner's rule: n_i / N_i)
    stress_ranges['Cycles_to_failure'] = a / (stress_ranges['StressRange'] ** m)
    stress_ranges['Damage'] = stress_ranges['Count'] / stress_ranges['Cycles_to_failure']

    # Scale simulation damage to the full design life
    total_simulation_time_years = stress_ranges['SimulationTime_hours'].iloc[0] / 8760
    scale_factor = design_life_years / total_simulation_time_years

    stress_ranges['Damage_Scaled'] = stress_ranges['Damage'] * scale_factor

    # Calculate cumulative damage
    total_damage = stress_ranges['Damage_Scaled'].sum()
    fatigue_life_years = design_life_years / total_damage if total_damage > 0 else np.inf

    # Summary
    summary = pd.DataFrame({
        'Metric': [
            'Total Damage',
            'Design Life (years)',
            'Predicted Fatigue Life (years)',
            'Utilization (%)'
        ],
        'Value': [
            total_damage,
            design_life_years,
            fatigue_life_years,
            (total_damage / 1.0) * 100  # Assuming damage limit = 1.0
        ]
    })

    print(summary)

    return stress_ranges

Example usage

stress_data = pd.DataFrame({
    'StressRange': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],  # MPa
    'Count': [1e6, 5e5, 2e5, 1e5, 5e4, 2e4, 1e4, 5e3, 2e3, 1e3],
    'SimulationTime_hours': [3] * 10  # 3-hour simulation
})

sn_params = {
    'm': 3.0,      # S-N curve slope
    'a': 1.52e12   # S-N curve constant (DNV F3 curve)
}

fatigue_results = calculate_fatigue_damage(
    stress_ranges=stress_data,
    sn_curve=sn_params,
    design_life_years=25
)

fatigue_results.to_csv('reports/fatigue_damage.csv', index=False)

Example 4: Multi-Source Data Merging

def merge_analysis_results(
    motion_file: Path,
    tension_file: Path,
    environmental_file: Path,
    output_file: Path
) -> pd.DataFrame:
    """
    Merge results from multiple analysis sources.

    Args:
        motion_file: Vessel motion CSV
        tension_file: Mooring tension CSV
        environmental_file: Environmental conditions CSV
        output_file: Output merged CSV

    Returns:
        Merged DataFrame
    """
    # Load individual files
    motions = pd.read_csv(motion_file)
    tensions = pd.read_csv(tension_file)
    environment = pd.read_csv(environmental_file)

    # Merge on time (inner join keeps timestamps present in all sources)
    merged = motions.merge(
        tensions,
        on='Time',
        how='inner',
        suffixes=('_motion', '_tension')
    )

    merged = merged.merge(
        environment,
        on='Time',
        how='inner'
    )

    # Calculate derived quantities
    merged['Total_Motion'] = np.sqrt(
        merged['Surge']**2 + merged['Sway']**2 + merged['Heave']**2
    )

    merged['Max_Tension'] = merged[[
        col for col in merged.columns if 'Tension' in col
    ]].max(axis=1)

    # Export
    merged.to_csv(output_file, index=False)

    print(f"✓ Merged {len(merged)} records")
    print(f"  Columns: {len(merged.columns)}")
    print(f"  Time range: {merged['Time'].min()} to {merged['Time'].max()}")

    return merged

Usage

merged_results = merge_analysis_results(
    motion_file=Path('data/processed/vessel_motions.csv'),
    tension_file=Path('data/processed/mooring_tensions.csv'),
    environmental_file=Path('data/processed/environment.csv'),
    output_file=Path('data/processed/merged_results.csv')
)

Example 5: Performance Benchmarking

def benchmark_data_processing_methods(
    data_size: int = 1_000_000
) -> pd.DataFrame:
    """
    Benchmark different Pandas operations for performance.

    Args:
        data_size: Number of rows to test

    Returns:
        Benchmark results
    """
    import time

    # Generate test data
    df = pd.DataFrame({
        'Time': pd.date_range('2025-01-01', periods=data_size, freq='100ms'),  # '0.1S' alias is deprecated
        'Value1': np.random.randn(data_size),
        'Value2': np.random.randn(data_size),
        'Category': np.random.choice(['A', 'B', 'C'], data_size)
    })

    results = []

    # Test 1: iterrows (slow - Python-level loop, limited to 10k rows here)
    start = time.time()
    total = 0
    for idx, row in df.head(10000).iterrows():
        total += row['Value1'] + row['Value2']
    results.append({
        'Method': 'iterrows (10k rows)',
        'Time (s)': time.time() - start,
        'Speed': 'Slow ❌'
    })

    # Test 2: apply (medium - still row-wise under the hood)
    start = time.time()
    df['Sum_Apply'] = df[['Value1', 'Value2']].apply(lambda x: x.sum(), axis=1)
    results.append({
        'Method': 'apply',
        'Time (s)': time.time() - start,
        'Speed': 'Medium ⚠️'
    })

    # Test 3: vectorized (fast - single C-level operation)
    start = time.time()
    df['Sum_Vectorized'] = df['Value1'] + df['Value2']
    results.append({
        'Method': 'vectorized',
        'Time (s)': time.time() - start,
        'Speed': 'Fast ✅'
    })

    # Test 4: NumPy (fastest - skips index alignment overhead)
    start = time.time()
    df['Sum_NumPy'] = np.add(df['Value1'].values, df['Value2'].values)
    results.append({
        'Method': 'numpy',
        'Time (s)': time.time() - start,
        'Speed': 'Fastest 🚀'
    })

    # Test 5: GroupBy aggregation
    start = time.time()
    grouped = df.groupby('Category')[['Value1', 'Value2']].mean()
    results.append({
        'Method': 'groupby.mean',
        'Time (s)': time.time() - start,
        'Speed': 'Fast ✅'
    })

    benchmark_df = pd.DataFrame(results)
    print(benchmark_df)

    return benchmark_df

Run benchmark

benchmark_results = benchmark_data_processing_methods(data_size=1_000_000)

Best Practices

  1. Memory Efficiency

Use appropriate data types:

❌ Bad: Default float64

df = pd.DataFrame({'value': np.random.randn(1_000_000)})
print(f"Memory: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")

✅ Good: Use float32 when precision allows

df_optimized = pd.DataFrame({'value': np.random.randn(1_000_000).astype(np.float32)})
print(f"Memory: {df_optimized.memory_usage(deep=True).sum() / 1e6:.1f} MB")  # 50% reduction

✅ Use categorical for repeated strings

df['category'] = pd.Categorical(['A', 'B', 'C'] * 100000)
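
Beyond one-off casts, a small helper sketch that downcasts every numeric column to the smallest sufficient dtype via pd.to_numeric, assuming the reduced precision is acceptable for the data; the helper name is illustrative:

def downcast_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast numeric columns to smaller dtypes where values allow (sketch)."""
    out = df.copy()
    for col in out.select_dtypes(include=['float']).columns:
        out[col] = pd.to_numeric(out[col], downcast='float')
    for col in out.select_dtypes(include=['integer']).columns:
        out[col] = pd.to_numeric(out[col], downcast='integer')
    return out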

Chunking for large files:

def process_large_csv_in_chunks(
    csv_file: Path,
    chunksize: int = 100_000
) -> pd.DataFrame:
    """Process large CSV in chunks to avoid memory issues."""
    chunks = []

    for chunk in pd.read_csv(csv_file, chunksize=chunksize):
        # Process each chunk
        chunk_processed = chunk[chunk['Value'] > 0]  # Example filter
        chunks.append(chunk_processed)

    # Combine all chunks
    result = pd.concat(chunks, ignore_index=True)

    return result

  2. Vectorization

Always prefer vectorized operations:

❌ Bad: Loop

df['result'] = 0
for i in range(len(df)):
    df.loc[i, 'result'] = df.loc[i, 'a'] + df.loc[i, 'b']

✅ Good: Vectorized

df['result'] = df['a'] + df['b']

✅ Better: NumPy for complex operations

df['result'] = np.where(
    df['a'] > 0,
    df['a'] + df['b'],
    df['a'] - df['b']
)

  3. Index Usage

Use index for time series:

✅ Set datetime index

df['Time'] = pd.to_datetime(df['Time'])
df.set_index('Time', inplace=True)

Fast slicing (requires a sorted DatetimeIndex)

subset = df['2025-01-01':'2025-01-31']

Fast resampling

daily_mean = df.resample('D').mean()

  4. Data Validation

Validate data before processing:

def validate_engineering_data(df: pd.DataFrame) -> bool:
    """Validate engineering data integrity."""
    # Check for missing values
    if df.isnull().any().any():
        print("⚠ Warning: Missing values detected")
        print(df.isnull().sum())

    # Check for duplicates
    if df.duplicated().any():
        print("⚠ Warning: Duplicate rows detected")
        print(f"Duplicates: {df.duplicated().sum()}")

    # Check data types
    print("Data types:")
    print(df.dtypes)

    # Check value ranges
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if (df[col] < 0).any():
            print(f"⚠ Warning: Negative values in {col}")

    return True

Resources

Use this skill for all time series analysis and data processing in DigitalModel!
