data-pipeline

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "data-pipeline" with this command: npx skills add claude-office-skills/skills/claude-office-skills-skills-data-pipeline

Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

Overview

This skill covers:

  • Data extraction from multiple sources
  • Transformation and cleaning
  • Loading to destinations
  • Scheduling and monitoring
  • Error handling and alerts

ETL Patterns

Basic ETL Flow

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
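
The flow above can be sketched as three functions passed to one runner. This is a minimal illustration, not n8n's API: `runPipeline`, `source`, and `destination` are hypothetical names, and the in-memory arrays stand in for a real API and warehouse table.

```javascript
// Minimal ETL sketch: each stage is a plain function so stages can be swapped.
// All names here (runPipeline, source, destination) are illustrative assumptions.
function runPipeline({ extract, transform, load }) {
  const raw = extract();           // EXTRACT: pull rows from a source
  const cleaned = transform(raw);  // TRANSFORM: clean and reshape rows
  return load(cleaned);            // LOAD: write rows, return how many were written
}

// In-memory stand-ins for an API and a warehouse table.
const source = [
  { id: 1, total_price: "19.99" },
  { id: 2, total_price: "5.00" },
];
const destination = [];

const written = runPipeline({
  extract: () => source,
  transform: rows =>
    rows.map(r => ({ order_id: r.id, total: parseFloat(r.total_price) })),
  load: rows => {
    destination.push(...rows);
    return rows.length;
  },
});

console.log(written, destination);
```

Keeping each stage as a separate function makes it possible to test the transform step without touching a real source or destination.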

n8n ETL Workflow

workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
    
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
    
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
    
  - name: "Transform"
    type: code
    code: |
      // Shape each merged order/payment record into one flat row
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        // An order may not have a matching payment yet
        payment_status: item.payment?.status ?? 'unknown'
      }));
      
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
    
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
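
The "Merge Data" step above joins orders and payments on an order id. A rough sketch of that join in plain JavaScript follows. It assumes each payment carries the order id in `metadata.order_id` and that unmatched orders should be kept; `mergeByOrderId` is a hypothetical helper, not an n8n function.

```javascript
// Join orders to payments on order id, keeping orders that have no payment yet.
// Assumption: each payment stores the order id in metadata.order_id.
function mergeByOrderId(orders, payments) {
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    const orderId = payment.metadata?.order_id;
    if (orderId == null) continue; // skip payments not linked to an order
    paymentsByOrder.set(String(orderId), payment);
  }
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

const merged = mergeByOrderId(
  [
    { id: 1001, email: "a@example.com" },
    { id: 1002, email: "b@example.com" },
  ],
  [{ id: "pay_1", status: "succeeded", metadata: { order_id: "1001" } }]
);

console.log(merged.map(m => m.payment?.status ?? "missing"));
```

Ids are compared as strings because one system may return numbers and the other strings.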

Data Sources

Common Extractors

extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
        
    - mysql:
        connection: connection_string
        query: custom_sql
        
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
        
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
        
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
        
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
        
    - excel:
        source: file_path
        sheet: "Sheet1"
        
    - json:
        source: api/file
        path: "data.items"
        
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
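
For the JSON extractor, a setting such as `path: "data.items"` points at the array inside a response. One possible way to resolve such a dotted path is sketched below; `getPath` and the sample `response` object are assumptions made for illustration.

```javascript
// Resolve a dotted path such as "data.items" inside a parsed JSON response.
// Returns undefined if any segment is missing instead of throwing.
function getPath(obj, path) {
  return path
    .split(".")
    .reduce((current, key) => (current == null ? undefined : current[key]), obj);
}

const response = { data: { items: [{ id: 1 }, { id: 2 }] }, meta: { page: 1 } };

// Fall back to an empty list so later stages can always iterate.
const extracted = getPath(response, "data.items") ?? [];
console.log(extracted.length);
```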

Transformations

Common Transformations

transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
    
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
    
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
    
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
    
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
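
The cleaning steps listed above (trim whitespace, deduplicate by key, remove nulls) could look roughly like this. The helper names and sample rows are hypothetical, and this sketch drops rows with nulls instead of filling them.

```javascript
// Trim every string field on a row.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([k, v]) => [k, typeof v === "string" ? v.trim() : v])
  );
}

// Keep the first row seen for each key value.
function deduplicateBy(rows, key) {
  const seen = new Set();
  return rows.filter(row => {
    if (seen.has(row[key])) return false;
    seen.add(row[key]);
    return true;
  });
}

// Drop rows where any required field is null or undefined.
function dropNulls(rows, requiredFields) {
  return rows.filter(row => requiredFields.every(f => row[f] != null));
}

const rawRows = [
  { id: 1, name: "  Ada " },
  { id: 1, name: "Ada" },
  { id: 2, name: null },
  { id: 3, name: "Grace" },
];

const cleanRows = dropNulls(
  deduplicateBy(rawRows.map(trimStrings), "id"),
  ["id", "name"]
);
console.log(cleanRows);
```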

Code Transform Examples

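// Clean and normalize data

// Illustrative code-to-name mapping; replace with your own status codes
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    
    // Parse dates (toISOString() throws a RangeError on an invalid date)
    date: new Date(item.created_at).toISOString().split('T')[0],
    
    // Convert types
    amount: parseFloat(item.amount) || 0,
    
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    
    // Calculate fields (missing or non-numeric inputs count as 0)
    total: (Number(item.quantity) || 0) * (Number(item.unit_price) || 0),
    
    // Filter nested (undefined when the item has no tags)
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    
    // Default values
    source: item.source || 'direct'
  }));
}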

// Aggregate data
function aggregate(items) {
  const grouped = {};
  
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  
  return Object.values(grouped);
}

Data Destinations

Common Loaders

loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
        
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
        
    - redshift:
        cluster: cluster_id
        database: analytics
        
  databases:
    - postgresql:
        upsert: on_conflict_update
        
    - mysql:
        batch_insert: 1000_rows
        
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
        
    - google_cloud_storage:
        bucket: data-bucket
        
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
        
    - airtable:
        base: base_id
        table: table_name
        
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
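
Settings such as `batch_insert: 1000_rows` and `batch_size: 100` imply splitting rows into fixed-size chunks before loading. A small sketch of that chunking, with `toBatches` as a hypothetical helper:

```javascript
// Split rows into batches of at most batchSize rows each.
function toBatches(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize));
  }
  return batches;
}

// 250 sample rows split with the webhook batch size from the config above.
const rowsToLoad = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));
const batches = toBatches(rowsToLoad, 100);
console.log(batches.map(b => b.length));
```

Each batch can then be sent as one insert or one webhook request, so a failure affects only that batch.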

Scheduling & Monitoring

Pipeline Scheduling

scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: near_real_time_dashboards
      
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
      
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
      
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
      
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
    
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
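
The retry settings above (three attempts, exponential backoff, alert on final failure) might be implemented along these lines. The base delay of 1000 ms, the 60000 ms cap, and the names `backoffDelays`, `withRetries`, and `onFinalFailure` are assumptions for this sketch.

```javascript
// Delay before each retry: baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
// maxAttempts counts the first try, so there are maxAttempts - 1 retries.
function backoffDelays(maxAttempts, baseMs = 1000, maxMs = 60000) {
  return Array.from({ length: Math.max(0, maxAttempts - 1) }, (_, i) =>
    Math.min(baseMs * 2 ** i, maxMs)
  );
}

// Run an async task, waiting between attempts.
// onFinalFailure is called once, after the last attempt fails.
async function withRetries(task, { maxAttempts = 3, baseMs = 1000, onFinalFailure } = {}) {
  const delays = backoffDelays(maxAttempts, baseMs);
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      if (attempt === maxAttempts) {
        if (onFinalFailure) onFinalFailure(err);
        throw err;
      }
      await new Promise(resolve => setTimeout(resolve, delays[attempt - 1]));
    }
  }
}

console.log(backoffDelays(3));
```

A caller would wrap a pipeline stage, for example `withRetries(() => loadBatch(batch), { onFinalFailure: sendAlert })`, where `loadBatch` and `sendAlert` are placeholders for your own functions.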

Monitoring & Alerts

monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
    
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        
        [View Logs]({logs_url})
        
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
        
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
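
The `data_quality` and `stale_data` triggers above can be evaluated with a few comparisons. The sketch below covers only the row-count and staleness conditions; `checkAlerts` and the field names `rowCount` and `lastUpdateMs` are hypothetical.

```javascript
// Compare one run's stats with the previous run and return triggered alert names.
// Thresholds mirror the config above: 50% row-count change, 2 hours staleness.
function checkAlerts(current, previous, nowMs = Date.now()) {
  const alerts = [];
  const TWO_HOURS_MS = 2 * 60 * 60 * 1000;

  // Row count differs by more than 50% from the previous run.
  if (previous.rowCount > 0) {
    const change = Math.abs(current.rowCount - previous.rowCount) / previous.rowCount;
    if (change > 0.5) alerts.push("data_quality");
  }

  // Last successful update is older than the staleness threshold.
  if (nowMs - current.lastUpdateMs > TWO_HOURS_MS) alerts.push("stale_data");

  return alerts;
}

const fixedNow = Date.UTC(2024, 0, 1, 12, 0, 0);
const triggered = checkAlerts(
  { rowCount: 400, lastUpdateMs: fixedNow - 3 * 60 * 60 * 1000 },
  { rowCount: 1000 },
  fixedNow
);
console.log(triggered);
```

Passing the current time as a parameter keeps the check deterministic in tests.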

Data Quality

Quality Checks

data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
        
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
        
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
    
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
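
The schema validation rules above translate fairly directly into a per-row check. This sketch covers required fields, field types, allowed values, and value ranges. The `schema` object layout, `typeChecks`, and `validateRow` are assumptions, as is the choice to treat dates as `YYYY-MM-DD` strings.

```javascript
// Rules taken from the data_quality config above, in a hypothetical object layout.
const schema = {
  required: ["id", "date", "amount"],
  types: { id: "integer", date: "date", amount: "number" },
  allowed: { status: ["active", "pending", "closed"] },
  ranges: { amount: [0, 1000000] },
};

const typeChecks = {
  integer: v => Number.isInteger(v),
  number: v => typeof v === "number" && Number.isFinite(v),
  // Assumption: dates arrive as YYYY-MM-DD strings.
  date: v =>
    typeof v === "string" && /^\d{4}-\d{2}-\d{2}$/.test(v) && !Number.isNaN(Date.parse(v)),
};

// Validate one row; returns a list of error strings (empty when the row passes).
function validateRow(row, rules = schema) {
  const errors = [];
  for (const field of rules.required) {
    if (row[field] == null) errors.push(`missing ${field}`);
  }
  for (const [field, type] of Object.entries(rules.types)) {
    if (row[field] != null && !typeChecks[type](row[field])) {
      errors.push(`${field} is not ${type}`);
    }
  }
  for (const [field, values] of Object.entries(rules.allowed)) {
    if (row[field] != null && !values.includes(row[field])) {
      errors.push(`${field} not allowed`);
    }
  }
  for (const [field, [min, max]] of Object.entries(rules.ranges)) {
    if (typeof row[field] === "number" && (row[field] < min || row[field] > max)) {
      errors.push(`${field} out of range`);
    }
  }
  return errors;
}

const goodRow = validateRow({ id: 1, date: "2024-01-15", amount: 42.5, status: "active" });
const badRow = validateRow({ id: "x", amount: -5, status: "archived" });
console.log(goodRow, badRow);
```

Returning a list of errors instead of throwing lets a pipeline count failures per run and compare the rate against thresholds such as `null_rate`.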

Output Example

Request: "Create a daily sales data pipeline"

Output:

# Daily Sales Data Pipeline

## Pipeline Overview

Shopify + Stripe → Transform → BigQuery + Sheets


## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform
```javascript
// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}
```

## Load

### BigQuery
- Table: analytics.sales_daily
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- Row count > 0
- No null order_ids
- Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team

---

*Data Pipeline Skill - Part of Claude Office Skills*
