data-engineer

Expert in data pipelines, ETL processes, and data infrastructure

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install this skill

Install skill "data-engineer" with this command: npx skills add daffy0208/ai-dev-standards/daffy0208-ai-dev-standards-data-engineer

Data Engineer Skill

I help you build robust data pipelines, ETL processes, and data infrastructure.

What I Do

Data Pipelines:

  • Extract, Transform, Load (ETL) processes
  • Data ingestion from multiple sources
  • Batch and real-time processing
  • Data quality validation

Data Infrastructure:

  • Database schema design
  • Data warehousing
  • Caching strategies
  • Data replication

Analytics:

  • Data aggregation
  • Metrics calculation
  • Report generation
  • Data export

ETL Patterns

Pattern 1: Simple ETL Pipeline

Use case: Daily sync from external API to database

// lib/etl/daily-sync.ts
// (assumes a Prisma-style `db` client is available in scope)

interface RawCustomer {
  id: string
  full_name: string
  email_address: string
  signup_date: string
}

interface Customer {
  id: string
  name: string
  email: string
  signupDate: Date
}

export async function syncCustomers() {
  console.log('Starting customer sync...')

  // EXTRACT: Fetch data from external API
  const response = await fetch('https://api.example.com/customers', {
    headers: {
      Authorization: `Bearer ${process.env.API_KEY}`
    }
  })

  if (!response.ok) {
    throw new Error(`Extract failed with status ${response.status}`)
  }

  const rawCustomers: RawCustomer[] = await response.json()
  console.log(`Extracted ${rawCustomers.length} customers`)

  // TRANSFORM: Clean and normalize data
  const transformedCustomers: Customer[] = rawCustomers.map(raw => ({
    id: raw.id,
    name: raw.full_name.trim(),
    email: raw.email_address.toLowerCase(),
    signupDate: new Date(raw.signup_date)
  }))

  // LOAD: Insert into database
  let inserted = 0
  let updated = 0

  for (const customer of transformedCustomers) {
    const existing = await db.customers.findUnique({
      where: { id: customer.id }
    })

    if (existing) {
      await db.customers.update({
        where: { id: customer.id },
        data: customer
      })
      updated++
    } else {
      await db.customers.create({
        data: customer
      })
      inserted++
    }
  }

  console.log(`Sync complete: ${inserted} inserted, ${updated} updated`)

  return { inserted, updated, total: transformedCustomers.length }
}

Schedule with Vercel Cron:

// vercel.json
{
  "crons": [
    {
      "path": "/api/cron/sync-customers",
      "schedule": "0 2 * * *"
    }
  ]
}
// app/api/cron/sync-customers/route.ts
import { syncCustomers } from '@/lib/etl/daily-sync'

export async function GET(req: Request) {
  const authHeader = req.headers.get('authorization')
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return Response.json({ error: 'Unauthorized' }, { status: 401 })
  }

  try {
    const result = await syncCustomers()
    return Response.json(result)
  } catch (error) {
    console.error('Sync failed:', error)
    return Response.json({ error: 'Sync failed' }, { status: 500 })
  }
}

Pattern 2: Incremental ETL (Delta Sync)

Use case: Only process new/changed records

// lib/etl/incremental-sync.ts

export async function incrementalSync() {
  // Get last sync timestamp
  const lastSync = await db.syncLog.findFirst({
    where: { source: 'customers' },
    orderBy: { syncedAt: 'desc' }
  })

  const since = lastSync?.syncedAt || new Date('2020-01-01')

  // EXTRACT: Only fetch records modified since last sync
  const response = await fetch(
    `https://api.example.com/customers?modified_since=${since.toISOString()}`,
    {
      headers: { Authorization: `Bearer ${process.env.API_KEY}` }
    }
  )

  const newOrModified = await response.json()
  console.log(`Found ${newOrModified.length} new/modified records`)

  // TRANSFORM & LOAD
  for (const record of newOrModified) {
    await db.customers.upsert({
      where: { id: record.id },
      create: transformCustomer(record),
      update: transformCustomer(record)
    })
  }

  // Log sync
  await db.syncLog.create({
    data: {
      source: 'customers',
      recordsProcessed: newOrModified.length,
      syncedAt: new Date()
    }
  })

  return { processed: newOrModified.length }
}
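Pattern 2 calls a `transformCustomer` helper that is not shown. A minimal sketch, assuming the same raw field names as Pattern 1 (the helper itself is hypothetical):

```typescript
// Hypothetical helper assumed by incrementalSync(), reusing Pattern 1's field mapping
function transformCustomer(raw: {
  id: string
  full_name: string
  email_address: string
  signup_date: string
}) {
  return {
    id: raw.id,
    name: raw.full_name.trim(),
    email: raw.email_address.toLowerCase(),
    signupDate: new Date(raw.signup_date)
  }
}
```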

Benefits:

  • Faster (only process changes)
  • Lower API costs
  • Reduced database load

Pattern 3: Real-Time Data Pipeline

Use case: Process events as they happen

// lib/pipelines/events-processor.ts

import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'myapp',
  brokers: [process.env.KAFKA_BROKER!]
})

const consumer = kafka.consumer({ groupId: 'analytics-group' })

export async function startEventProcessor() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'user-events', fromBeginning: false })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString())

      // TRANSFORM: Enrich event data
      const enrichedEvent = {
        ...event,
        processedAt: new Date(),
        userId: event.user_id,
        eventType: event.type.toLowerCase()
      }

      // LOAD: Write to analytics database
      await analyticsDb.events.create({
        data: enrichedEvent
      })

      // Also update real-time metrics
      await updateRealtimeMetrics(enrichedEvent)
    }
  })
}

// Assumes a shared ioredis client (see Caching Strategies below)
async function updateRealtimeMetrics(event: any) {
  if (event.eventType === 'purchase') {
    await redis.hincrby('metrics:today', 'purchases', 1)
    await redis.hincrbyfloat('metrics:today', 'revenue', event.amount)
  }
}

Data Transformation Patterns

Transformation 1: Data Cleaning

// lib/transformers/cleaners.ts

export function cleanEmail(email: string): string {
  return email.trim().toLowerCase()
}

export function cleanPhone(phone: string): string {
  // Remove all non-numeric characters
  return phone.replace(/\D/g, '')
}

export function cleanName(name: string): string {
  return name
    .trim()
    .replace(/\s+/g, ' ') // Multiple spaces → single space
    .split(' ')
    .map(word => word.charAt(0).toUpperCase() + word.slice(1).toLowerCase())
    .join(' ')
}

export function parseDate(dateStr: string): Date | null {
  // new Date() never throws on strings; it yields an Invalid Date instead
  const date = new Date(dateStr)
  return isNaN(date.getTime()) ? null : date
}
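The cleaners compose naturally on a raw record. A quick usage sketch (the input values are hypothetical; the cleaner definitions are repeated from `lib/transformers/cleaners.ts` so the snippet stands alone):

```typescript
// Definitions repeated from lib/transformers/cleaners.ts
const cleanEmail = (email: string) => email.trim().toLowerCase()
const cleanPhone = (phone: string) => phone.replace(/\D/g, '')
const cleanName = (name: string) =>
  name
    .trim()
    .replace(/\s+/g, ' ')
    .split(' ')
    .map(w => w.charAt(0).toUpperCase() + w.slice(1).toLowerCase())
    .join(' ')

// Hypothetical raw record run through the cleaners
const cleaned = {
  name: cleanName('  jOHN   doe '),            // "John Doe"
  email: cleanEmail(' John.Doe@Example.COM '), // "john.doe@example.com"
  phone: cleanPhone('(555) 123-4567')          // "5551234567"
}
```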

Transformation 2: Data Enrichment

// lib/transformers/enrichers.ts
// Assumes a RawCustomer shape that includes `address` and `signupDate`,
// plus a `geocode` helper and the shared `db` client.
import { differenceInDays } from 'date-fns'

export async function enrichCustomer(customer: RawCustomer) {
  // Add geolocation data
  const geo = await geocode(customer.address)

  // Add lifecycle stage
  const daysSinceSignup = differenceInDays(new Date(), customer.signupDate)
  const lifecycleStage =
    daysSinceSignup < 7
      ? 'new'
      : daysSinceSignup < 30
        ? 'active'
        : daysSinceSignup < 90
          ? 'engaged'
          : 'dormant'

  // Add lifetime value
  const orders = await db.orders.findMany({
    where: { customerId: customer.id }
  })
  const lifetimeValue = orders.reduce((sum, order) => sum + order.total, 0)

  return {
    ...customer,
    latitude: geo.lat,
    longitude: geo.lng,
    lifecycleStage,
    lifetimeValue,
    totalOrders: orders.length
  }
}

Transformation 3: Data Aggregation

// lib/transformers/aggregators.ts

export async function aggregateDailySales() {
  const sales = await db.$queryRaw`
    SELECT
      DATE(created_at) as date,
      COUNT(*) as order_count,
      SUM(total) as total_revenue,
      AVG(total) as average_order_value,
      COUNT(DISTINCT user_id) as unique_customers
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '30 days'
    GROUP BY DATE(created_at)
    ORDER BY date DESC
  `

  return sales
}

export async function aggregateByRegion() {
  const regions = await db.$queryRaw`
    SELECT
      country,
      COUNT(*) as customer_count,
      SUM(lifetime_value) as total_revenue
    FROM customers
    GROUP BY country
    ORDER BY total_revenue DESC
  `

  return regions
}
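One practical wrinkle: with Postgres, `COUNT(*)` and `SUM` come back from `$queryRaw` as `BigInt`/`Decimal` values, and `JSON.stringify` throws on `BigInt`. A hedged normalizer sketch for converting aggregate rows before returning them from an API (safe while counts stay below `Number.MAX_SAFE_INTEGER`):

```typescript
// Recursively convert BigInt values to Number so aggregate rows serialize cleanly
function normalizeBigInts<T>(value: T): T {
  if (typeof value === 'bigint') return Number(value) as unknown as T
  if (value instanceof Date) return value // leave Dates untouched
  if (Array.isArray(value)) return value.map(normalizeBigInts) as unknown as T
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>).map(([k, v]) => [k, normalizeBigInts(v)])
    ) as T
  }
  return value
}
```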

Data Validation

Schema Validation with Zod

// lib/validators/customer.ts
import { z } from 'zod'

export const customerSchema = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string().min(1).max(100),
  age: z.number().int().min(0).max(150).optional(),
  signupDate: z.coerce.date(),
  tags: z.array(z.string()).default([])
})

export type ValidatedCustomer = z.infer<typeof customerSchema>

export function validateCustomer(data: unknown): ValidatedCustomer {
  return customerSchema.parse(data)
}

// In ETL pipeline
// Note: .filter(Boolean) does not narrow the type in TypeScript,
// so we use safeParse plus an explicit type guard instead.
const rawData = await fetchFromAPI()
const validatedData = rawData
  .map((record: unknown) => {
    const result = customerSchema.safeParse(record)
    if (!result.success) {
      console.error('Validation failed:', result.error.flatten())
      return null
    }
    return result.data
  })
  .filter((r): r is ValidatedCustomer => r !== null)

Data Quality Checks

// lib/quality/checks.ts

export async function dataQualityChecks() {
  const checks = []

  // Check 1: No duplicate emails
  const duplicates = await db.$queryRaw<{ email: string; count: bigint }[]>`
    SELECT email, COUNT(*) as count
    FROM customers
    GROUP BY email
    HAVING COUNT(*) > 1
  `

  checks.push({
    name: 'No duplicate emails',
    passed: duplicates.length === 0,
    issues: duplicates
  })

  // Check 2: All customers have valid emails
  const invalidEmails = await db.customers.count({
    where: {
      email: {
        not: {
          contains: '@'
        }
      }
    }
  })

  checks.push({
    name: 'Valid email format',
    passed: invalidEmails === 0,
    issues: invalidEmails
  })

  // Check 3: No orphaned orders
  const orphanedOrders = await db.$queryRaw<{ count: bigint }[]>`
    SELECT COUNT(*) as count
    FROM orders
    WHERE user_id NOT IN (SELECT id FROM customers)
  `

  checks.push({
    name: 'No orphaned orders',
    passed: Number(orphanedOrders[0].count) === 0, // COUNT(*) comes back as BigInt
    issues: Number(orphanedOrders[0].count)
  })

  return checks
}
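Because every check returns the same `{ name, passed, issues }` shape, a pass/fail summary for dashboards or alerts is trivial. A small sketch (the summarizer is an addition, not part of the original checks):

```typescript
interface QualityCheck {
  name: string
  passed: boolean
  issues: unknown
}

// Summarize a run: overall pass flag plus the names of failing checks
function summarizeChecks(checks: QualityCheck[]) {
  const failed = checks.filter(c => !c.passed).map(c => c.name)
  return {
    passed: failed.length === 0,
    total: checks.length,
    failed
  }
}
```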

Data Warehousing

Star Schema Design

// prisma/schema.prisma

// Fact table (metrics/events)
model FactSales {
  id             String   @id @default(cuid())

  // Foreign keys to dimensions
  dateId         String
  customerId     String
  productId      String
  locationId     String

  // Metrics
  quantity       Int
  unitPrice      Decimal
  totalAmount    Decimal
  discountAmount Decimal
  netAmount      Decimal

  // Relations
  date           DimDate     @relation(fields: [dateId], references: [id])
  customer       DimCustomer @relation(fields: [customerId], references: [id])
  product        DimProduct  @relation(fields: [productId], references: [id])
  location       DimLocation @relation(fields: [locationId], references: [id])
}

// Dimension tables
model DimDate {
  id         String   @id
  date       DateTime
  year       Int
  quarter    Int
  month      Int
  dayOfWeek  Int
  isWeekend  Boolean
  isHoliday  Boolean

  sales      FactSales[]
}

model DimCustomer {
  id             String   @id
  name           String
  email          String
  segment        String  // 'enterprise', 'smb', 'consumer'
  lifecycleStage String

  sales          FactSales[]
}

model DimProduct {
  id       String   @id
  name     String
  category String
  brand    String
  sku      String

  sales    FactSales[]
}

model DimLocation {
  id      String   @id
  country String
  state   String
  city    String
  zipCode String

  sales   FactSales[]
}

Populate Data Warehouse

// lib/warehouse/populate.ts

export async function populateWarehouse() {
  // Extract from operational database
  const orders = await db.orders.findMany({
    include: {
      customer: true,
      items: {
        include: { product: true }
      }
    },
    where: {
      createdAt: {
        gte: new Date(Date.now() - 24 * 60 * 60 * 1000) // Last 24 hours
      }
    }
  })

  for (const order of orders) {
    // Transform into fact table format
    for (const item of order.items) {
      await warehouseDb.factSales.create({
        data: {
          dateId: formatDateId(order.createdAt),
          customerId: order.customer.id,
          productId: item.product.id,
          locationId: order.customer.locationId,
          quantity: item.quantity,
          unitPrice: item.price,
          totalAmount: item.quantity * item.price,
          discountAmount: item.discount || 0,
          netAmount: item.quantity * item.price - (item.discount || 0)
        }
      })
    }
  }
}

function formatDateId(date: Date): string {
  return date.toISOString().split('T')[0] // "2025-10-22"
}
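`populateWarehouse` issues one INSERT per line item. With Prisma you can instead build all the rows first and load them in a single `createMany` call. A sketch that keeps the transform pure so it is easy to test (the `Order`/`OrderItem` shapes are assumptions inferred from the code above):

```typescript
// Assumed shapes, inferred from populateWarehouse() above
interface OrderItem {
  product: { id: string }
  quantity: number
  price: number
  discount?: number
}
interface Order {
  createdAt: Date
  customer: { id: string; locationId: string }
  items: OrderItem[]
}

// Pure transform: one order → fact-table rows
function toFactRows(order: Order) {
  const dateId = order.createdAt.toISOString().split('T')[0]
  return order.items.map(item => ({
    dateId,
    customerId: order.customer.id,
    productId: item.product.id,
    locationId: order.customer.locationId,
    quantity: item.quantity,
    unitPrice: item.price,
    totalAmount: item.quantity * item.price,
    discountAmount: item.discount || 0,
    netAmount: item.quantity * item.price - (item.discount || 0)
  }))
}

// Load step: one round-trip instead of one insert per row
// await warehouseDb.factSales.createMany({ data: orders.flatMap(toFactRows) })
```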

Performance Optimization

Batch Processing

// lib/etl/batch-processor.ts

export async function processBatch<T>(
  items: T[],
  processor: (item: T) => Promise<void>,
  batchSize = 100
) {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize)

    // Process batch in parallel
    await Promise.all(batch.map(processor))

    console.log(`Processed ${Math.min(i + batchSize, items.length)}/${items.length}`)

    // Small delay to avoid overwhelming database
    await new Promise(resolve => setTimeout(resolve, 100))
  }
}

// Usage
await processBatch(
  customers,
  async customer => {
    await db.customers.upsert({
      where: { id: customer.id },
      create: customer,
      update: customer
    })
  },
  100 // Process 100 at a time
)

Database Optimization

// lib/db/optimizations.ts

// Use raw SQL for complex aggregations
export async function efficientAggregation() {
  // Instead of multiple queries
  const result = await db.$queryRaw`
    SELECT
      c.segment,
      COUNT(DISTINCT c.id) as customer_count,
      COUNT(o.id) as order_count,
      SUM(o.total) as total_revenue,
      AVG(o.total) as avg_order_value
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
    GROUP BY c.segment
  `

  return result
}

// Use indexes for faster queries
// In a migration file (Prisma's $executeRaw runs one statement per call):
await db.$executeRaw`CREATE INDEX idx_orders_created_at ON orders(created_at)`
await db.$executeRaw`CREATE INDEX idx_orders_customer_id ON orders(customer_id)`
await db.$executeRaw`CREATE INDEX idx_customers_email ON customers(email)`

Caching Strategies

Redis for Aggregated Data

// lib/cache/metrics.ts
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL!)

export async function getCachedMetrics(key: string) {
  const cached = await redis.get(key)

  if (cached) {
    return JSON.parse(cached)
  }

  // Calculate metrics
  const metrics = await calculateMetrics()

  // Cache for 1 hour
  await redis.setex(key, 3600, JSON.stringify(metrics))

  return metrics
}

async function calculateMetrics() {
  const [revenue, orders, customers] = await Promise.all([
    db.orders.aggregate({ _sum: { total: true } }),
    db.orders.count(),
    db.customers.count()
  ])

  return {
    totalRevenue: revenue._sum.total || 0,
    totalOrders: orders,
    totalCustomers: customers,
    // Guard against division by zero when there are no orders yet
    avgOrderValue: orders > 0 ? (revenue._sum.total || 0) / orders : 0
  }
}

Monitoring and Logging

Pipeline Monitoring

// lib/monitoring/pipeline.ts

interface PipelineRun {
  pipelineName: string
  startTime: Date
  endTime?: Date
  status: 'running' | 'success' | 'failed'
  recordsProcessed: number
  errorMessage?: string
}

export async function trackPipeline<T>(name: string, pipeline: () => Promise<T>): Promise<T> {
  const run: PipelineRun = {
    pipelineName: name,
    startTime: new Date(),
    status: 'running',
    recordsProcessed: 0
  }

  // Log start and capture the generated run id (PipelineRun itself has no id)
  const record = await db.pipelineRuns.create({ data: run })

  try {
    const result = await pipeline()

    // Log success
    run.endTime = new Date()
    run.status = 'success'
    await db.pipelineRuns.update({
      where: { id: record.id },
      data: run
    })

    return result
  } catch (error) {
    // Log failure (error is `unknown` in TypeScript catch clauses)
    const message = error instanceof Error ? error.message : String(error)
    run.endTime = new Date()
    run.status = 'failed'
    run.errorMessage = message

    await db.pipelineRuns.update({
      where: { id: record.id },
      data: run
    })

    // Alert team
    await sendAlert({
      channel: '#data-engineering',
      message: `Pipeline ${name} failed: ${message}`
    })

    throw error
  }
}

// Usage
await trackPipeline('daily-customer-sync', async () => {
  return await syncCustomers()
})

Common Patterns

Pattern: Upsert with Conflict Resolution

export async function upsertWithConflict(record: any) {
  const existing = await db.customers.findUnique({
    where: { id: record.id }
  })

  if (existing) {
    // Conflict resolution: Use most recent data
    if (record.updatedAt > existing.updatedAt) {
      await db.customers.update({
        where: { id: record.id },
        data: record
      })
      return { action: 'updated' }
    } else {
      return { action: 'skipped', reason: 'stale data' }
    }
  } else {
    await db.customers.create({ data: record })
    return { action: 'created' }
  }
}
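The read-then-write above can race with a concurrent writer between the `findUnique` and the `update`. One alternative keeps the conflict decision pure and testable, and moves the timestamp guard into the WHERE clause so the database enforces it atomically (the `updatedAt` column is an assumption about the schema):

```typescript
// Pure conflict-resolution decision: prefer the most recent updatedAt
function resolveConflict(
  existing: { updatedAt: Date } | null,
  incoming: { updatedAt: Date }
): 'create' | 'update' | 'skip' {
  if (!existing) return 'create'
  return incoming.updatedAt > existing.updatedAt ? 'update' : 'skip'
}

// Atomic variant with Prisma: the timestamp guard lives in the WHERE clause,
// so a concurrent newer write is never overwritten.
// const { count } = await db.customers.updateMany({
//   where: { id: record.id, updatedAt: { lt: record.updatedAt } },
//   data: record
// })
// if (count === 0) { /* record was missing or already newer */ }
```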

Pattern: Dead Letter Queue

// lib/queue/dead-letter.ts

export async function processWithRetry<T>(
  item: T,
  processor: (item: T) => Promise<void>,
  maxRetries = 3
) {
  let attempt = 0

  while (attempt < maxRetries) {
    try {
      await processor(item)
      return { success: true }
    } catch (error) {
      attempt++
      const message = error instanceof Error ? error.message : String(error)
      console.error(`Attempt ${attempt} failed:`, message)

      if (attempt >= maxRetries) {
        // Move to dead letter queue
        await db.deadLetterQueue.create({
          data: {
            item: JSON.stringify(item),
            error: message,
            attempts: attempt,
            queuedAt: new Date()
          }
        })

        return { success: false, deadLettered: true }
      }

      // Exponential backoff
      await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, attempt)))
    }
  }
}

When to Use Me

Perfect for:

  • Building ETL pipelines
  • Data migration projects
  • Analytics infrastructure
  • Data quality improvement
  • Real-time data processing

I'll help you:

  • Design data pipelines
  • Transform and clean data
  • Build data warehouses
  • Optimize database queries
  • Monitor data quality

What I'll Create

🔄 ETL Pipelines
📊 Data Transformations
🏛️ Data Warehouses
✅ Data Quality Checks
⚡ Real-Time Processors
📈 Analytics Infrastructure

Let's build robust, scalable data infrastructure!

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

  • animation-designer (Coding) · Repository Source · Needs Review
  • brand-designer (Coding) · Repository Source · Needs Review
  • data-visualizer (Coding) · Repository Source · Needs Review
  • 3d-visualizer (Coding) · Repository Source · Needs Review

No summaries provided by the upstream source.