bsee-sodir-extraction

BSEE/SODIR Data Extraction Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "bsee-sodir-extraction" with this command: npx skills add vamseeachanta/workspace-hub/vamseeachanta-workspace-hub-bsee-sodir-extraction

BSEE/SODIR Data Extraction Skill

Master data extraction from the Bureau of Safety and Environmental Enforcement (BSEE) and Norwegian Offshore Directorate (SODIR) for comprehensive offshore energy analysis.

When to Use This Skill

Use BSEE/SODIR data extraction when you need:

  • Production data - Oil, gas, water production by field/well

  • Well information - Directional surveys, completions, drilling data

  • Field data - Reserves, operators, development status

  • HSE data - Safety incidents, environmental compliance

  • Economic analysis - NPV calculations using regulatory data

  • Regulatory compliance - Track permits, violations, inspections

Data sources covered:

  • BSEE (US Gulf of Mexico): Production, wells, platforms, safety

  • SODIR (Norway): Fields, production, wells, discoveries

  • NPD FactPages: Norwegian petroleum data (legacy)

Core Capabilities

  1. BSEE Data Extraction

Available datasets:

  • Production data (monthly oil/gas/water)

  • Well data (API numbers, directional surveys)

  • Platform/structure data

  • Operator information

  • Safety and incident data (OCS incidents)

  • Environmental compliance

Base URLs:

# Root of the BSEE data portal; each dataset family hangs off its own sub-path.
_BSEE_DATA_ROOT = "https://www.data.bsee.gov"

# Landing URL for each BSEE dataset family, keyed by a short dataset name.
BSEE_BASE_URLS = {
    dataset: f"{_BSEE_DATA_ROOT}/{segment}/"
    for dataset, segment in (
        ("production", "Production"),
        ("well", "Well"),
        ("platform", "Platform"),
        ("company", "Company"),
        ("field", "Field"),
        ("incidents", "Incidents"),
    )
}

Production Data Extraction:

import pandas as pd import requests from pathlib import Path from datetime import datetime from typing import Optional

def fetch_bsee_production_data(
    year: int,
    output_dir: Path,
    area_code: Optional[str] = None,
) -> pd.DataFrame:
    """
    Fetch BSEE production data for a given year.

    Downloads the yearly production archive, unpacks it into ``output_dir``,
    loads the first matching CSV, and tags each row with extraction metadata.

    Args:
        year: Production year (e.g., 2024)
        output_dir: Directory to save downloaded data (created if missing)
        area_code: Optional area filter ('GC', 'MC', 'WR', etc.)

    Returns:
        DataFrame with production data. Column names are stripped and
        upper-cased, and ``EXTRACTION_DATE`` / ``SOURCE`` columns are added.

    Raises:
        requests.HTTPError: If the download returns an error status.
        FileNotFoundError: If no CSV whose name contains ``year`` is present
            in ``output_dir`` after extraction.
    """
    import zipfile

    output_dir.mkdir(parents=True, exist_ok=True)

    # BSEE provides production data as downloadable files
    # NOTE(review): the archive naming pattern is taken as given - confirm
    # against the live BSEE site.
    url = f"https://www.data.bsee.gov/Production/Files/ogoraan{year}.zip"

    # Download file
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    zip_path = output_dir / f"production_{year}.zip"
    zip_path.write_bytes(response.content)

    # Extract and read
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(output_dir)

    # Sort so the chosen file is deterministic; glob order is unspecified.
    csv_files = sorted(output_dir.glob(f"*{year}*.csv"))
    if not csv_files:
        raise FileNotFoundError(f"No CSV found for {year}")

    df = pd.read_csv(csv_files[0])

    # Clean column names BEFORE filtering: the filter below addresses the
    # normalised "AREA_CODE" header, so raw headers with padding or lower
    # case would otherwise raise KeyError.
    df.columns = df.columns.str.strip().str.upper()

    # Filter by area if specified; copy so the metadata columns below are
    # written to an independent frame rather than a slice of the full data.
    if area_code:
        df = df[df["AREA_CODE"] == area_code].copy()

    # Add metadata
    df["EXTRACTION_DATE"] = datetime.now().isoformat()
    df["SOURCE"] = "BSEE"

    print(f"Fetched {len(df)} production records for {year}")

    return df

def aggregate_production_by_field(
    df: pd.DataFrame,
    time_period: str = "monthly",
) -> pd.DataFrame:
    """
    Aggregate production data by field.

    Rows are grouped by field, area and block plus a time bucket. The input
    frame is never modified.

    Args:
        df: Raw production DataFrame
        time_period: 'monthly', 'quarterly', or 'annual'. Any other value is
            treated as 'annual'.

    Returns:
        Aggregated production DataFrame with summed ``OIL_BBL`` / ``GAS_MCF``
        / ``WATER_BBL`` (whichever exist) and, when available, ``WELL_COUNT``.

    Raises:
        ValueError: If none of the aggregatable columns are present.
    """
    # Group by field
    group_cols = ["FIELD_NAME", "AREA_CODE", "BLOCK_NUMBER"]
    work = df

    if time_period == "monthly":
        group_cols += ["PRODUCTION_YEAR", "PRODUCTION_MONTH"]
    elif time_period == "quarterly":
        # assign() returns a new frame, so the caller's DataFrame does not
        # silently gain a QUARTER column.
        work = df.assign(QUARTER=((df["PRODUCTION_MONTH"] - 1) // 3) + 1)
        group_cols += ["PRODUCTION_YEAR", "QUARTER"]
    else:  # annual
        group_cols.append("PRODUCTION_YEAR")

    # Only aggregate columns that exist
    named = {
        column: (column, "sum")
        for column in ("OIL_BBL", "GAS_MCF", "WATER_BBL")
        if column in work.columns
    }

    # The well count is the number of distinct API numbers per group. Without
    # API numbers, fall back to counting non-null WELL_COUNT entries.
    if "API_NUMBER" in work.columns:
        named["WELL_COUNT"] = ("API_NUMBER", "nunique")
    elif "WELL_COUNT" in work.columns:
        named["WELL_COUNT"] = ("WELL_COUNT", "count")

    if not named:
        raise ValueError(
            "No aggregatable columns found (expected OIL_BBL, GAS_MCF, "
            "WATER_BBL, API_NUMBER or WELL_COUNT)"
        )

    return work.groupby(group_cols).agg(**named).reset_index()

Example usage

# Pull one year of production for a single area, then roll it up per field.
production_2024 = fetch_bsee_production_data(
    year=2024,
    output_dir=Path("data/raw/bsee"),
    area_code="GC",  # Green Canyon
)

field_production = aggregate_production_by_field(
    production_2024,
    time_period="monthly",
)

print(field_production.head())

Well Data Extraction:

def fetch_bsee_well_data(
    api_number: Optional[str] = None,
    field_name: Optional[str] = None,
    output_dir: Path = Path("data/raw/bsee"),
) -> pd.DataFrame:
    """
    Fetch BSEE well data.

    Downloads the well archive, unpacks it into ``output_dir`` and loads
    ``Well.csv``, optionally narrowing the result.

    Args:
        api_number: Specific API number (14-digit), matched exactly
        field_name: Filter by field name (case-insensitive substring match;
            the value is interpreted as a regular expression by pandas)
        output_dir: Output directory (created if missing)

    Returns:
        DataFrame with well data. ``API_WELL_NUMBER`` is loaded as text.

    Raises:
        requests.HTTPError: If the download returns an error status.
    """
    import zipfile

    output_dir.mkdir(parents=True, exist_ok=True)

    # BSEE Well File download
    url = "https://www.data.bsee.gov/Well/Files/Well.zip"

    response = requests.get(url, timeout=120)
    response.raise_for_status()

    zip_path = output_dir / "well_data.zip"
    zip_path.write_bytes(response.content)

    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(output_dir)

    # Read well data. API numbers are identifiers, not quantities: load them
    # as text so they compare equal to the string `api_number` argument and
    # keep any leading zeros (a numerically parsed column never matches a str).
    well_file = output_dir / "Well.csv"
    df = pd.read_csv(well_file, dtype={"API_WELL_NUMBER": str})

    # Filter if specified
    if api_number:
        df = df[df["API_WELL_NUMBER"].str.strip() == str(api_number).strip()]
    if field_name:
        df = df[df["FIELD_NAME"].str.contains(field_name, case=False, na=False)]

    return df

def fetch_directional_surveys(
    api_number: str,
    output_dir: Path = Path("data/raw/bsee"),
) -> pd.DataFrame:
    """
    Fetch directional survey data for a well.

    Downloads the directional-survey archive, unpacks it into ``output_dir``
    and returns the rows for one well ordered by measured depth.

    Args:
        api_number: 14-digit API number, matched exactly
        output_dir: Output directory (created if missing)

    Returns:
        DataFrame with directional survey points sorted by ``MEASURED_DEPTH``

    Raises:
        requests.HTTPError: If the download returns an error status.
    """
    import zipfile

    # Without this the archive write below fails on a fresh checkout, where
    # the target directory does not exist yet.
    output_dir.mkdir(parents=True, exist_ok=True)

    # BSEE provides directional survey data
    url = "https://www.data.bsee.gov/Well/DirectionalSurvey/Files/DirectionalSurvey.zip"

    response = requests.get(url, timeout=120)
    response.raise_for_status()

    zip_path = output_dir / "directional_surveys.zip"
    zip_path.write_bytes(response.content)

    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(output_dir)

    # Load API numbers as text so the equality test against the string
    # argument can match (a numerically parsed column never equals a str).
    survey_file = output_dir / "DirectionalSurvey.csv"
    df = pd.read_csv(survey_file, dtype={"API_WELL_NUMBER": str})

    # Filter by API
    df = df[df["API_WELL_NUMBER"].str.strip() == str(api_number).strip()]

    # Sort by measured depth
    return df.sort_values("MEASURED_DEPTH")

Example: Get well data for a deepwater field

# Look up every well whose field name mentions Thunder Horse.
gom_wells = fetch_bsee_well_data(field_name="THUNDER HORSE")
print(f"Found {len(gom_wells)} wells in Thunder Horse field")

HSE Data Extraction:

def fetch_bsee_incident_data(
    start_year: int = 2020,
    end_year: int = 2024,
    output_dir: Path = Path("data/raw/bsee"),
) -> pd.DataFrame:
    """
    Download the BSEE accident archive and return incidents in a year window.

    Args:
        start_year: First calendar year to keep (inclusive)
        end_year: Last calendar year to keep (inclusive)
        output_dir: Where the archive is stored and unpacked (created if missing)

    Returns:
        DataFrame of incident rows whose ``INCIDENT_DATE`` falls within the
        window. Rows with unparseable dates are dropped by the window filter.
    """
    import zipfile

    output_dir.mkdir(parents=True, exist_ok=True)

    archive_url = "https://www.data.bsee.gov/Incidents/Files/Accidents.zip"
    reply = requests.get(archive_url, timeout=120)
    reply.raise_for_status()

    # Persist the archive first, then unpack it next to itself.
    archive_path = output_dir / "incidents.zip"
    with open(archive_path, "wb") as sink:
        sink.write(reply.content)

    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(output_dir)

    records = pd.read_csv(output_dir / "Accidents.csv")

    # Unparseable dates become NaT, which never satisfies the window test.
    records["INCIDENT_DATE"] = pd.to_datetime(
        records["INCIDENT_DATE"], errors="coerce"
    )

    incident_year = records["INCIDENT_DATE"].dt.year
    within_window = incident_year.between(start_year, end_year)

    return records[within_window]

def calculate_operator_safety_score(
    incidents_df: pd.DataFrame,
    production_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Calculate safety score per operator based on incidents per production.

    Args:
        incidents_df: Incident data with ``OPERATOR_NAME``, ``INCIDENT_ID``,
            ``FATALITY_COUNT`` and ``INJURY_COUNT`` columns
        production_df: Production data with ``OPERATOR_NAME``, ``OIL_BBL``
            and ``GAS_MCF`` columns

    Returns:
        DataFrame indexed by operator, sorted by ``RISK_SCORE`` descending
        (highest risk first). ``INCIDENTS_PER_MM_BOE`` is NaN for operators
        with no recorded production, because the rate is undefined there;
        their ``RISK_SCORE`` is then driven by fatalities and injuries only.
    """
    # Count incidents by operator
    incident_counts = (
        incidents_df.groupby("OPERATOR_NAME")
        .agg({
            "INCIDENT_ID": "count",
            "FATALITY_COUNT": "sum",
            "INJURY_COUNT": "sum",
        })
        .rename(columns={
            "INCIDENT_ID": "TOTAL_INCIDENTS",
            "FATALITY_COUNT": "TOTAL_FATALITIES",
            "INJURY_COUNT": "TOTAL_INJURIES",
        })
    )

    # Sum production by operator
    production_totals = production_df.groupby("OPERATOR_NAME").agg({
        "OIL_BBL": "sum",
        "GAS_MCF": "sum",
    })

    # Outer join keeps operators that appear in only one of the two inputs.
    safety_df = incident_counts.join(production_totals, how="outer").fillna(0)

    # GAS_MCF is already in thousand cubic feet, so one barrel of oil
    # equivalent is 6 MCF (= 6,000 cubic feet). Dividing by 6000 would
    # understate the gas contribution a thousand-fold.
    mcf_per_boe = 6.0
    safety_df["TOTAL_BOE"] = safety_df["OIL_BBL"] + safety_df["GAS_MCF"] / mcf_per_boe

    # Calculate incidents per million BOE. Mask zero production to NaN so the
    # division cannot produce +/-inf and poison the ranking.
    produced_boe = safety_df["TOTAL_BOE"].where(safety_df["TOTAL_BOE"] > 0)
    safety_df["INCIDENTS_PER_MM_BOE"] = (
        safety_df["TOTAL_INCIDENTS"] / produced_boe * 1e6
    )

    # Risk score (lower is better)
    safety_df["RISK_SCORE"] = (
        safety_df["INCIDENTS_PER_MM_BOE"].fillna(0)
        + safety_df["TOTAL_FATALITIES"] * 10
        + safety_df["TOTAL_INJURIES"] * 2
    )

    return safety_df.sort_values("RISK_SCORE", ascending=False)

Example: Safety analysis

# Combine five years of incidents with one year of production per operator.
incidents = fetch_bsee_incident_data(start_year=2020, end_year=2024)
production = fetch_bsee_production_data(
    year=2024,
    output_dir=Path("data/raw/bsee"),
)

safety_scores = calculate_operator_safety_score(incidents, production)
print("Operator Safety Scores (higher = more risk):")
print(safety_scores.head(10))

  2. SODIR/NPD Data Extraction (Norway)

Available datasets:

  • Field production (oil, gas, NGL, condensate)

  • Well data (exploration, development)

  • Discoveries and prospects

  • Company information

  • Pipeline and infrastructure

FactPages API:

import requests import pandas as pd from typing import Dict, List, Optional

class SODIRDataFetcher:
    """Fetch data from SODIR (Norwegian Offshore Directorate) FactPages.

    Each ``get_*`` method downloads one endpoint in full and filters the
    result client-side. An empty response yields an empty DataFrame from
    every getter, whatever filters were requested.

    NOTE(review): BASE_URL and the endpoint paths are taken as given -
    confirm them against the live FactPages service.
    """

    BASE_URL = "https://factpages.sodir.no/api/v1"

    ENDPOINTS = {
        "fields": "/fields",
        "field_production": "/field-production-yearly",
        "wells": "/wells",
        "discoveries": "/discoveries",
        "companies": "/companies",
        "pipelines": "/pipelines",
        "facilities": "/facilities",
    }

    def __init__(self):
        # One shared session so every request carries the same headers.
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "EnergyDataAnalysis/1.0"
        })

    def _fetch(self, endpoint: str, params: Optional[Dict] = None) -> List[Dict]:
        """Fetch data from SODIR API.

        Args:
            endpoint: Path appended to ``BASE_URL`` (one of ``ENDPOINTS``)
            params: Optional query-string parameters

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: If the response has an error status.
        """
        url = f"{self.BASE_URL}{endpoint}"

        response = self.session.get(url, params=params, timeout=60)
        response.raise_for_status()

        return response.json()

    def get_all_fields(self) -> pd.DataFrame:
        """Get all Norwegian offshore fields."""
        return pd.DataFrame(self._fetch(self.ENDPOINTS["fields"]))

    def get_field_production(
        self,
        field_name: Optional[str] = None,
        start_year: Optional[int] = None,
        end_year: Optional[int] = None
    ) -> pd.DataFrame:
        """
        Get field production data.

        Args:
            field_name: Filter by field name (case-insensitive substring)
            start_year: Start year (inclusive)
            end_year: End year (inclusive)

        Returns:
            DataFrame with production data
        """
        df = pd.DataFrame(self._fetch(self.ENDPOINTS["field_production"]))

        # An empty payload produces a frame with no columns; filtering it
        # would raise KeyError, so hand it back unchanged.
        if df.empty:
            return df

        # Filter
        if field_name:
            df = df[df["fieldName"].str.contains(field_name, case=False, na=False)]
        # Compare against None rather than truthiness so an explicit bound is
        # never skipped just because it happens to be falsy.
        if start_year is not None:
            df = df[df["year"] >= start_year]
        if end_year is not None:
            df = df[df["year"] <= end_year]

        return df

    def get_wells(
        self,
        well_type: Optional[str] = None,
        status: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get well data.

        Args:
            well_type: 'exploration', 'development', or 'other'
                (compared case-insensitively, exact match)
            status: Well status filter (case-insensitive substring)

        Returns:
            DataFrame with well data
        """
        df = pd.DataFrame(self._fetch(self.ENDPOINTS["wells"]))

        # Nothing to filter, and no columns to filter on.
        if df.empty:
            return df

        if well_type:
            df = df[df["wellType"].str.lower() == well_type.lower()]
        if status:
            df = df[df["status"].str.contains(status, case=False, na=False)]

        return df

    def get_discoveries(self, status: Optional[str] = None) -> pd.DataFrame:
        """Get discoveries data, optionally filtered by a status substring."""
        df = pd.DataFrame(self._fetch(self.ENDPOINTS["discoveries"]))

        # Nothing to filter, and no columns to filter on.
        if df.empty:
            return df

        if status:
            df = df[df["status"].str.contains(status, case=False, na=False)]

        return df

Example usage

sodir = SODIRDataFetcher()

# Get all fields
fields = sodir.get_all_fields()
print(f"Total Norwegian fields: {len(fields)}")

# Get production for Johan Sverdrup
sverdrup_production = sodir.get_field_production(
    field_name="JOHAN SVERDRUP",
    start_year=2019,
)
print(sverdrup_production)

# Get recent exploration wells
exploration_wells = sodir.get_wells(well_type="exploration")
print(f"Total exploration wells: {len(exploration_wells)}")

  3. Combined Analysis

Cross-Basin Comparison:

import pandas as pd import plotly.graph_objects as go from plotly.subplots import make_subplots from pathlib import Path

def compare_gom_norway_production(
    gom_data: pd.DataFrame,
    norway_data: pd.DataFrame,
    output_dir: Path = Path("reports"),
) -> None:
    """
    Write a side-by-side bar chart of annual GOM and Norway production.

    Both inputs are summed per year and plotted as grouped bars, oil in the
    left panel and gas in the right. The figure is saved as
    ``gom_norway_comparison.html`` inside ``output_dir``.

    Args:
        gom_data: BSEE production data (``PRODUCTION_YEAR``, ``OIL_BBL``,
            ``GAS_MCF`` columns)
        norway_data: SODIR production data (``year``, ``oilProduction``,
            ``gasProduction`` columns)
        output_dir: Report output directory (created if missing)
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Yearly GOM totals, scaled from bbl / mcf down by a factor of one million.
    gom_annual = (
        gom_data.groupby("PRODUCTION_YEAR")
        .agg({"OIL_BBL": "sum", "GAS_MCF": "sum"})
        .reset_index()
        .assign(
            REGION="Gulf of Mexico",
            OIL_MM_BBL=lambda frame: frame["OIL_BBL"] / 1e6,
            GAS_BCF=lambda frame: frame["GAS_MCF"] / 1e6,
        )
    )

    # NOTE(review): the Norway totals are only relabelled as MM BBL / BCF;
    # no unit conversion is applied here - confirm the source units.
    norway_annual = (
        norway_data.groupby("year")
        .agg({"oilProduction": "sum", "gasProduction": "sum"})
        .reset_index()
        .rename(columns={
            "year": "PRODUCTION_YEAR",
            "oilProduction": "OIL_MM_BBL",
            "gasProduction": "GAS_BCF",
        })
        .assign(REGION="Norway")
    )

    fig = make_subplots(
        rows=1, cols=2,
        subplot_titles=["Oil Production (MM BBL)", "Gas Production (BCF)"]
    )

    # (data, value column, legend label, bar colour, subplot column)
    trace_specs = (
        (gom_annual, "OIL_MM_BBL", "GOM Oil", "blue", 1),
        (norway_annual, "OIL_MM_BBL", "Norway Oil", "red", 1),
        (gom_annual, "GAS_BCF", "GOM Gas", "lightblue", 2),
        (norway_annual, "GAS_BCF", "Norway Gas", "pink", 2),
    )
    for frame, value_column, label, colour, panel in trace_specs:
        fig.add_trace(
            go.Bar(
                x=frame["PRODUCTION_YEAR"],
                y=frame[value_column],
                name=label,
                marker_color=colour,
            ),
            row=1, col=panel,
        )

    fig.update_layout(
        title="Gulf of Mexico vs Norway: Offshore Production Comparison",
        barmode="group",
        height=500
    )

    report_path = output_dir / "gom_norway_comparison.html"
    fig.write_html(report_path)
    print(f"Report saved to {report_path}")

4. NPV Analysis with Regulatory Data

import numpy as np import numpy_financial as npf import pandas as pd from dataclasses import dataclass from typing import List, Tuple

@dataclass
class EconomicAssumptions:
    """Price, cost and fiscal inputs used by the NPV calculation."""

    # Oil price in $/bbl
    oil_price: float = 75.0
    # Gas price in $/mcf
    gas_price: float = 3.0
    # Operating cost in $/BOE
    opex_per_boe: float = 15.0
    # Remaining capital spend in $ millions (for ongoing development)
    capex_remaining: float = 0
    # Annual discount rate as a fraction (0.10 = 10%)
    discount_rate: float = 0.10
    # Royalty as a fraction of gross revenue (0.125 = 12.5% federal royalty)
    royalty_rate: float = 0.125
    # Corporate tax rate as a fraction
    tax_rate: float = 0.21

def calculate_field_npv(
    production_df: pd.DataFrame,
    assumptions: EconomicAssumptions,
    forecast_years: int = 10,
) -> Tuple[float, pd.DataFrame]:
    """
    Calculate NPV for a field based on BSEE production data.

    The latest year in ``production_df`` is used as the baseline, production
    is declined at a fixed 10% per year, and each year's net cash flow is
    discounted with ``assumptions.discount_rate``.

    Args:
        production_df: Historical production data (``PRODUCTION_YEAR``,
            ``OIL_BBL``, ``GAS_MCF`` columns)
        assumptions: Economic assumptions
        forecast_years: Years to forecast

    Returns:
        Tuple of (NPV in $ millions, detailed cashflow DataFrame). All
        ``*_MM`` columns of the cashflow frame are in $ millions.

    Notes:
        ``assumptions.capex_remaining`` ($ millions) is spread evenly over
        the first ``min(3, forecast_years)`` forecast years and deducted
        before tax. It is NOT additionally charged at time zero, which would
        count the same spend twice.
    """
    # Get latest year's production as baseline
    latest_year = production_df["PRODUCTION_YEAR"].max()
    baseline = production_df[production_df["PRODUCTION_YEAR"] == latest_year]

    annual_oil = baseline["OIL_BBL"].sum()
    annual_gas = baseline["GAS_MCF"].sum()

    # Simple decline curve (exponential decline)
    decline_rate = 0.10  # 10% annual decline

    # GAS_MCF is in thousand cubic feet, so one BOE is 6 MCF, not 6000.
    mcf_per_boe = 6.0

    # capex_remaining is quoted in $ millions while every other per-year
    # figure in the loop is in dollars; convert once so the units agree.
    capex_years = min(3, forecast_years)
    capex_per_year = (
        assumptions.capex_remaining * 1e6 / capex_years if capex_years > 0 else 0.0
    )

    columns = [
        "Year", "Oil_BBL", "Gas_MCF", "Gross_Revenue_MM", "Royalties_MM",
        "OPEX_MM", "CAPEX_MM", "Pre_Tax_MM", "Taxes_MM", "NCF_MM",
    ]
    cashflows = []

    for year in range(1, forecast_years + 1):
        # Decline production
        remaining_fraction = (1 - decline_rate) ** year
        oil_prod = annual_oil * remaining_fraction
        gas_prod = annual_gas * remaining_fraction

        # Revenue
        gross_revenue = (
            oil_prod * assumptions.oil_price + gas_prod * assumptions.gas_price
        )

        # Royalties
        royalties = gross_revenue * assumptions.royalty_rate
        net_revenue = gross_revenue - royalties

        # Operating costs
        boe_produced = oil_prod + gas_prod / mcf_per_boe
        opex = boe_produced * assumptions.opex_per_boe

        # EBITDA
        ebitda = net_revenue - opex

        # CapEx (if any)
        capex = capex_per_year if year <= capex_years else 0.0

        # Pre-tax income
        pretax_income = ebitda - capex

        # Taxes are never negative: a loss year pays no tax, it earns no refund.
        taxes = max(0, pretax_income * assumptions.tax_rate)

        # Net cash flow
        ncf = pretax_income - taxes

        cashflows.append({
            "Year": year,
            "Oil_BBL": oil_prod,
            "Gas_MCF": gas_prod,
            "Gross_Revenue_MM": gross_revenue / 1e6,
            "Royalties_MM": royalties / 1e6,
            "OPEX_MM": opex / 1e6,
            "CAPEX_MM": capex / 1e6,
            "Pre_Tax_MM": pretax_income / 1e6,
            "Taxes_MM": taxes / 1e6,
            "NCF_MM": ncf / 1e6
        })

    # Explicit columns keep the frame well-formed even for forecast_years=0.
    cashflow_df = pd.DataFrame(cashflows, columns=columns)

    # Calculate NPV. npf.npv discounts the first entry at t=0, so a leading
    # zero places the first forecast year at t=1.
    ncf_series = [0.0] + cashflow_df["NCF_MM"].tolist()
    npv = npf.npv(assumptions.discount_rate, ncf_series)

    return npv, cashflow_df

Example: Calculate NPV for a GOM field

production = fetch_bsee_production_data(
    year=2024,
    output_dir=Path("data/raw/bsee"),
)

# Filter to specific field
thunder_horse = production[
    production["FIELD_NAME"].str.contains("THUNDER HORSE", case=False, na=False)
]

assumptions = EconomicAssumptions(
    oil_price=75.0,
    gas_price=3.5,
    opex_per_boe=18.0,
    discount_rate=0.10,
)

npv, cashflows = calculate_field_npv(thunder_horse, assumptions)

print(f"Thunder Horse NPV (10 year): ${npv:.1f} MM")
print("\nCashflow Summary:")
print(cashflows.to_string(index=False))

Complete Pipeline Example

""" Complete BSEE/SODIR data extraction and analysis pipeline. """ import pandas as pd from pathlib import Path from datetime import datetime import plotly.graph_objects as go

def run_extraction_pipeline(
    output_dir: Path = Path("data"),
    report_dir: Path = Path("reports"),
) -> dict:
    """
    Run complete data extraction and analysis pipeline.

    Each source is fetched independently: a failure in one step is printed,
    recorded as ``"error"`` in the summary, and does not stop later steps.

    Args:
        output_dir: Root data directory. Raw downloads go to
            ``output_dir/raw/bsee`` and CSV exports to ``output_dir/processed``.
        report_dir: Directory that receives the HTML summary report.

    Returns:
        Dictionary with extraction summary: ``extraction_date`` (ISO string)
        and ``datasets`` mapping a dataset name to its row count or ``"error"``.
    """
    raw_bsee_dir = output_dir / "raw" / "bsee"
    processed_dir = output_dir / "processed"

    # processed_dir must exist before any to_csv call; otherwise a successful
    # fetch is reported as an error because the export raises.
    for directory in (output_dir, processed_dir, report_dir):
        directory.mkdir(parents=True, exist_ok=True)

    results = {
        "extraction_date": datetime.now().isoformat(),
        "datasets": {}
    }

    # 1. Extract BSEE Production Data
    print("Fetching BSEE production data...")
    try:
        bsee_production = fetch_bsee_production_data(
            year=2024,
            output_dir=raw_bsee_dir
        )
        bsee_production.to_csv(processed_dir / "bsee_production.csv", index=False)
        results["datasets"]["bsee_production"] = len(bsee_production)
    except Exception as e:
        print(f"BSEE production error: {e}")
        results["datasets"]["bsee_production"] = "error"

    # 2. Extract BSEE Well Data
    print("Fetching BSEE well data...")
    try:
        bsee_wells = fetch_bsee_well_data(output_dir=raw_bsee_dir)
        results["datasets"]["bsee_wells"] = len(bsee_wells)
    except Exception as e:
        print(f"BSEE wells error: {e}")
        results["datasets"]["bsee_wells"] = "error"

    # 3. Extract BSEE Incident Data
    print("Fetching BSEE incident data...")
    try:
        # Pass the raw directory explicitly so a custom output_dir is honoured
        # here as it is for the other BSEE downloads.
        incidents = fetch_bsee_incident_data(
            start_year=2020,
            end_year=2024,
            output_dir=raw_bsee_dir
        )
        incidents.to_csv(processed_dir / "bsee_incidents.csv", index=False)
        results["datasets"]["bsee_incidents"] = len(incidents)
    except Exception as e:
        print(f"BSEE incidents error: {e}")
        results["datasets"]["bsee_incidents"] = "error"

    # 4. Extract SODIR Data
    print("Fetching SODIR data...")
    try:
        sodir = SODIRDataFetcher()
        norway_fields = sodir.get_all_fields()
        norway_production = sodir.get_field_production(start_year=2020)

        norway_fields.to_csv(processed_dir / "sodir_fields.csv", index=False)
        norway_production.to_csv(processed_dir / "sodir_production.csv", index=False)

        results["datasets"]["sodir_fields"] = len(norway_fields)
        results["datasets"]["sodir_production"] = len(norway_production)
    except Exception as e:
        print(f"SODIR error: {e}")
        # A single "sodir" entry stands in for both SODIR datasets on failure.
        results["datasets"]["sodir"] = "error"

    # 5. Generate Summary Report
    print("Generating summary report...")
    generate_summary_report(results, report_dir)

    return results

def generate_summary_report(results: dict, report_dir: Path) -> None:
    """Generate HTML summary report.

    Writes ``extraction_summary.html`` into ``report_dir`` with one table row
    per entry of ``results["datasets"]``.

    Args:
        results: Summary with an ``extraction_date`` string and a ``datasets``
            mapping of dataset name to a row count or the string ``"error"``.
        report_dir: Directory for the report (created if missing).
    """
    # Create the target directory so the function also works when called on
    # its own rather than from the pipeline.
    report_dir.mkdir(parents=True, exist_ok=True)

    header = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Energy Data Extraction Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        h1 {{ color: #2c3e50; }}
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
        th {{ background-color: #3498db; color: white; }}
        tr:nth-child(even) {{ background-color: #f2f2f2; }}
        .success {{ color: green; }}
        .error {{ color: red; }}
    </style>
</head>
<body>
    <h1>Energy Data Extraction Report</h1>
    <p><strong>Extraction Date:</strong> {results['extraction_date']}</p>

    <h2>Dataset Summary</h2>
    <table>
        <tr>
            <th>Dataset</th>
            <th>Records</th>
            <th>Status</th>
        </tr>
"""

    rows = []
    for dataset, count in results["datasets"].items():
        # A dataset is either a row count or the sentinel string "error".
        failed = count == "error"
        status_class = "error" if failed else "success"
        status_text = "Error" if failed else "Success"
        rows.append(f"""
        <tr>
            <td>{dataset}</td>
            <td>{count if count != 'error' else 'N/A'}</td>
            <td class="{status_class}">{status_text}</td>
        </tr>
    """)

    footer = """
    </table>
</body>
</html>
"""

    # Assemble once instead of growing the string inside the loop.
    html_content = header + "".join(rows) + footer

    # Pin the encoding so the file does not depend on the platform default.
    report_path = report_dir / "extraction_summary.html"
    report_path.write_text(html_content, encoding="utf-8")

    print(f"Summary report saved to {report_path}")

Run the pipeline

# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    results = run_extraction_pipeline()
    print("\nExtraction Complete!")
    print(f"Datasets extracted: {len(results['datasets'])}")

Best Practices

  1. Rate Limiting

import time from functools import wraps

def rate_limit(calls_per_minute: int = 30):
    """Decorator to rate limit API calls.

    Consecutive calls are spaced at least ``60 / calls_per_minute`` seconds
    apart by sleeping before the wrapped function runs. The timing state
    belongs to the decorator returned here, so functions decorated with the
    same decorator object share one budget. No locking is done.

    Args:
        calls_per_minute: Maximum number of calls allowed per minute.

    Returns:
        A decorator that wraps a callable with the spacing logic.

    Raises:
        ValueError: If ``calls_per_minute`` is not positive.
    """
    if calls_per_minute <= 0:
        raise ValueError("calls_per_minute must be positive")

    min_interval = 60.0 / calls_per_minute
    # None means "no call yet": the first call must never be delayed, and the
    # monotonic clock's zero point is arbitrary, so 0.0 is not a safe start.
    last_call = None

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_call
            if last_call is not None:
                # The monotonic clock is immune to wall-clock adjustments,
                # which could otherwise cause very long or skipped waits.
                remaining = min_interval - (time.monotonic() - last_call)
                if remaining > 0:
                    time.sleep(remaining)
            last_call = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls_per_minute=30)
def fetch_with_rate_limit(url: str) -> requests.Response:
    """GET ``url``, throttled to at most 30 calls per minute.

    Args:
        url: Address to request.

    Returns:
        The response object; the HTTP status is not checked here.
    """
    # requests waits indefinitely by default; bound the wait like the other
    # fetch helpers in this document do.
    return requests.get(url, timeout=60)

  2. Caching

from functools import lru_cache from datetime import datetime, timedelta

def cached_fetch(url: str, cache_hours: int = 24) -> pd.DataFrame:
    """Fetch with caching.

    The JSON payload at ``url`` is converted to a DataFrame and stored as a
    parquet file under ``.cache/``. A cache file younger than ``cache_hours``
    is returned without touching the network.

    Only the on-disk cache is used. An in-memory ``lru_cache`` would pin the
    first result for the life of the process, ignoring ``cache_hours`` and
    handing the same mutable DataFrame to every caller.

    Args:
        url: Address returning JSON that ``pd.DataFrame`` can consume.
        cache_hours: Maximum age of a cache file before it is refreshed.

    Returns:
        DataFrame built from the cached or freshly downloaded payload.

    Raises:
        requests.HTTPError: If a fresh download returns an error status.
    """
    import hashlib

    # The built-in hash() of a str is salted per interpreter process, so it
    # would yield a different file name on every run and the cache would
    # never be hit. A content digest is stable across runs.
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()
    cache_file = Path(f".cache/{digest}.parquet")

    if cache_file.exists():
        mtime = datetime.fromtimestamp(cache_file.stat().st_mtime)
        if datetime.now() - mtime < timedelta(hours=cache_hours):
            return pd.read_parquet(cache_file)

    # Fetch fresh data. Fail on HTTP errors so an error body is never cached.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    df = pd.DataFrame(response.json())

    cache_file.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(cache_file)

    return df

3. Error Handling

import logging from tenacity import retry, stop_after_attempt, wait_exponential

# Send INFO and above to the root handler, and name this module's logger
# after the module itself.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def robust_fetch(url: str) -> requests.Response:
    """GET ``url`` and return the response, retrying failed attempts.

    Any ``requests`` failure, including an error status, is logged and
    re-raised so that the ``retry`` decorator above can schedule another
    attempt according to its stop and wait settings.

    Args:
        url: Address to request.

    Returns:
        The successful response.
    """
    try:
        reply = requests.get(url, timeout=60)
        reply.raise_for_status()
    except requests.exceptions.RequestException as exc:
        logger.error(f"Fetch failed for {url}: {exc}")
        raise
    return reply

Resources

Use this skill for all energy regulatory data extraction in worldenergydata!

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

echarts

No summary provided by upstream source.

Repository Source · Needs Review
General

pandoc

No summary provided by upstream source.

Repository Source · Needs Review
General

mkdocs

No summary provided by upstream source.

Repository Source · Needs Review
General

gis

No summary provided by upstream source.

Repository Source · Needs Review