etl-sync-job-builder

Build reliable, incremental data synchronization pipelines.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant so it can learn this skill

Install skill "etl-sync-job-builder" with this command: npx skills add patricio0312rev/skills/patricio0312rev-skills-etl-sync-job-builder

ETL/Sync Job Builder

Build reliable, incremental data synchronization pipelines.

ETL Job Pattern

// jobs/sync-users.ts

/** Static description of a sync job: where data is read from, where it is written, and when it runs. */
interface SyncJob {
  /** Job name. */
  name: string;
  /** Kind of system the job reads from. */
  source: "database" | "api" | "file";
  /** Kind of system the job writes to. */
  destination: "database" | "warehouse" | "s3";
  /** Schedule expression — presumably a cron string; confirm against the scheduler in use. */
  schedule: string;
}

/**
 * Base class for an incremental extract → transform → load job.
 *
 * Progress is tracked as a timestamp ("watermark") stored in the
 * `syncWatermark` table under `watermarkKey`. Subclasses override
 * `extract`, `transform` and `load`; the defaults extract nothing,
 * pass data through unchanged, and load nothing.
 */
export class ETLJob {
  /**
   * @param name Job name, used in log output.
   * @param watermarkKey Key of this job's row in the `syncWatermark` table.
   */
  constructor(
    private readonly name: string,
    private readonly watermarkKey: string
  ) {}

  /**
   * Runs one sync cycle: read watermark, extract, transform, load, then
   * advance the watermark.
   *
   * @throws Re-throws any error from a step after logging it. The watermark
   *   is only advanced when every step succeeded, so a failed run is
   *   repeated from the same watermark.
   */
  async run(): Promise<void> {
    console.log(`🔄 Starting ${this.name}...`);

    try {
      // 1. Get last watermark
      const lastSync = await this.getWatermark();
      console.log(`  Last sync: ${lastSync}`);

      // Take the new watermark BEFORE extracting. A row modified while this
      // run is in flight then has a timestamp later than the stored
      // watermark and is picked up by the next run. Stamping "now" after
      // the load would skip such rows permanently. Rows may be delivered
      // twice as a result, so `load` must be idempotent.
      // NOTE(review): assumes this process's clock and the source's
      // timestamps are comparable — confirm for the actual source.
      const runStartedAt = new Date();

      // 2. Extract data
      const data = await this.extract(lastSync);
      console.log(`  Extracted ${data.length} records`);

      // 3. Transform data
      const transformed = await this.transform(data);

      // 4. Load data
      await this.load(transformed);

      // 5. Update watermark
      await this.updateWatermark(runStartedAt);

      console.log(`✅ ${this.name} complete`);
    } catch (error: unknown) {
      console.error(`❌ ${this.name} failed:`, error);
      throw error;
    }
  }

  /**
   * Returns the records changed after `since`. Default: no records.
   * `protected` (not `private`) so subclasses are allowed to override it.
   */
  protected async extract(since: Date): Promise<unknown[]> {
    // Extract logic
    return [];
  }

  /** Maps extracted records to the destination shape. Default: identity. */
  protected async transform(data: unknown[]): Promise<unknown[]> {
    // Transform logic
    return data;
  }

  /** Writes transformed records to the destination. Default: no-op. */
  protected async load(data: unknown[]): Promise<void> {
    // Load logic
  }

  /** Reads the stored watermark; falls back to the Unix epoch when no row exists yet. */
  private async getWatermark(): Promise<Date> {
    const watermark = await prisma.syncWatermark.findUnique({
      where: { key: this.watermarkKey },
    });
    return watermark?.lastSync ?? new Date(0);
  }

  /** Creates or updates this job's watermark row with `timestamp`. */
  private async updateWatermark(timestamp: Date): Promise<void> {
    await prisma.syncWatermark.upsert({
      where: { key: this.watermarkKey },
      create: { key: this.watermarkKey, lastSync: timestamp },
      update: { lastSync: timestamp },
    });
  }
}

Watermark Strategy

// Track sync progress
model SyncWatermark {
  /// Identifies the sync job this watermark belongs to (e.g. "orders_sync").
  key      String   @id
  /// Timestamp up to which the job has synced.
  lastSync DateTime
  /// Optional free-form JSON attached to the watermark.
  metadata Json?

  @@index([lastSync])
}

// Incremental sync using watermark
/**
 * Copies orders changed since the last run from `sourceDb` to
 * `destinationDb`, then advances the "orders_sync" watermark to the
 * `updated_at` of the newest order copied.
 *
 * Safe to re-run: rows are upserted by `id`, and the watermark only moves
 * after every batch has been written.
 */
async function syncOrdersIncremental(): Promise<void> {
  const watermarkKey = "orders_sync";
  const batchSize = 100;

  // Get last sync time (Unix epoch on the very first run)
  const watermark = await prisma.syncWatermark.findUnique({
    where: { key: watermarkKey },
  });
  const lastSync = watermark?.lastSync ?? new Date(0);

  // Fetch only new/updated records, oldest first so the last element
  // carries the highest updated_at.
  const newOrders = await sourceDb.order.findMany({
    where: {
      updated_at: { gt: lastSync },
    },
    orderBy: { updated_at: "asc" },
  });

  console.log(`📦 Syncing ${newOrders.length} orders...`);

  // Process in batches
  for (let i = 0; i < newOrders.length; i += batchSize) {
    const batch = newOrders.slice(i, i + batchSize);

    // Upsert rather than createMany({ skipDuplicates: true }): skipDuplicates
    // ignores rows that already exist, so an order that was UPDATED at the
    // source would never be refreshed at the destination.
    await destinationDb.$transaction(
      batch.map(({ id, ...fields }) =>
        destinationDb.order.upsert({
          where: { id },
          create: { id, ...fields },
          update: fields,
        })
      )
    );
  }

  // Update watermark to latest record
  if (newOrders.length > 0) {
    const latestTimestamp = newOrders[newOrders.length - 1].updated_at;

    await prisma.syncWatermark.upsert({
      where: { key: watermarkKey },
      create: { key: watermarkKey, lastSync: latestTimestamp },
      update: { lastSync: latestTimestamp },
    });
  }

  console.log(`✅ Sync complete`);
}

Idempotent Upsert Pattern

// Idempotent sync - safe to run multiple times
/**
 * Upserts each user by `id`, one at a time and in input order.
 * A missing user is created from the full record; an existing user has only
 * `email`, `name` and `updated_at` overwritten.
 *
 * @param users Records to write.
 */
async function syncUsersIdempotent(users: User[]): Promise<void> {
  for (const user of users) {
    await prisma.user.upsert({
      where: { id: user.id },
      create: user,
      update: {
        email: user.email,
        name: user.name,
        updated_at: user.updated_at,
      },
    });
  }
}

// Batch upsert for better performance
/**
 * Upserts all users in a single PostgreSQL statement.
 *
 * Each column is passed as one array parameter and expanded with UNNEST, so
 * every value is bound rather than spliced into the SQL text. On an `id`
 * conflict the row is overwritten only when the incoming `updated_at` is
 * newer than the stored one.
 *
 * @param users Records to write; an empty array is a no-op.
 */
async function syncUsersBatch(users: User[]): Promise<void> {
  // Nothing to write: skip the round trip.
  if (users.length === 0) return;

  // PostgreSQL: Use ON CONFLICT
  await prisma.$executeRaw`
    INSERT INTO users (id, email, name, updated_at)
    SELECT * FROM UNNEST(
      ${users.map((u) => u.id)}::bigint[],
      ${users.map((u) => u.email)}::text[],
      ${users.map((u) => u.name)}::text[],
      ${users.map((u) => u.updated_at)}::timestamp[]
    )
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      name = EXCLUDED.name,
      updated_at = EXCLUDED.updated_at
    WHERE users.updated_at < EXCLUDED.updated_at
  `;
}

Retry Logic with Exponential Backoff

/**
 * Runs `operation`, retrying with exponential backoff when it rejects.
 *
 * The wait before retry number k (1-based) is `baseDelay * 2^(k-1)`
 * milliseconds.
 *
 * @param operation Async work to attempt.
 * @param maxRetries Number of retries after the first attempt, so up to
 *   `maxRetries + 1` attempts are made in total.
 * @param baseDelay Delay before the first retry, in milliseconds.
 * @returns The value of the first attempt that resolves.
 * @throws The error of the last attempt once retries are exhausted, or
 *   "Max retries exceeded" when `maxRetries` is negative and no attempt
 *   is made at all.
 */
async function syncWithRetry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 3,
  baseDelay: number = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error: unknown) {
      if (attempt === maxRetries) throw error;

      const delay = baseDelay * Math.pow(2, attempt);
      console.log(`  Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
      // Inline timer instead of an external `sleep` helper, so the function
      // does not depend on a name that may not be in scope.
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }

  throw new Error("Max retries exceeded");
}

// Usage: up to 3 retries, waiting 1000 ms before the first retry
await syncWithRetry(
  async () => {
    return await syncOrders();
  },
  3,
  1000
);

Change Data Capture (CDC)

// Listen to database changes
import { PrismaClient } from "@prisma/client";

// Prisma client used by the CDC setup in this snippet.
const prisma = new PrismaClient();

// PostgreSQL: Listen to logical replication
/**
 * Creates the `orders_publication` publication, then subscribes to the
 * `orders_changed` notification channel and forwards INSERT/UPDATE payloads
 * to `syncOrder`.
 *
 * Each notification payload is expected to be JSON with an `operation`
 * field and a `data` field. A notification that cannot be parsed or synced
 * is logged and dropped, so one bad message does not take the listener down.
 */
async function setupCDC(): Promise<void> {
  // NOTE(review): CREATE PUBLICATION fails when the publication already
  // exists — TODO guard this if setupCDC runs on every start-up.
  await prisma.$executeRaw`
    CREATE PUBLICATION orders_publication FOR TABLE orders;
  `;

  // Subscribe to changes (using pg library)
  // NOTE(review): `pg` is presumably a pool-like object whose connect()
  // resolves to a client — confirm against where `pg` is defined.
  const client = await pg.connect();

  // Register the handler before issuing LISTEN so no notification can
  // arrive while nobody is listening for the event.
  client.on("notification", async (msg) => {
    try {
      // A notification may carry no payload at all.
      if (msg.payload == null) return;

      const change = JSON.parse(msg.payload);

      if (change.operation === "INSERT" || change.operation === "UPDATE") {
        await syncOrder(change.data);
      }
    } catch (error: unknown) {
      // An error escaping this async event handler would surface as an
      // unhandled promise rejection.
      console.error("Failed to handle orders_changed notification:", error);
    }
  });

  await client.query("LISTEN orders_changed;");
}

Conflict Resolution

/** How to pick a winner when the same record exists on both sides. */
interface ConflictResolution {
  strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}

/**
 * Resolves a conflict between the source and destination versions of a record.
 *
 * - `source-wins`: returns `sourceRecord` as given.
 * - `dest-wins`: returns `destRecord` as given.
 * - `latest-wins`: returns the record with the greater `updated_at`; on a
 *   tie the destination is kept. If one side is missing, the other is returned.
 * - `merge`: returns a new object made of the destination's fields overlaid
 *   with every source field that is neither `null` nor `undefined`.
 *
 * @param sourceRecord Record as read from the source, or null/undefined.
 * @param destRecord Record as stored at the destination, or null/undefined.
 * @param strategy Resolution strategy to apply.
 * @returns The winning (or merged) record.
 * @throws Error when `strategy` is not one of the known strategies.
 */
async function syncWithConflictResolution(
  sourceRecord: any,
  destRecord: any,
  strategy: ConflictResolution["strategy"]
) {
  switch (strategy) {
    case "source-wins":
      return sourceRecord;

    case "dest-wins":
      return destRecord;

    case "latest-wins":
      // With only one side present there is nothing to compare.
      if (destRecord == null) return sourceRecord;
      if (sourceRecord == null) return destRecord;
      return sourceRecord.updated_at > destRecord.updated_at
        ? sourceRecord
        : destRecord;

    case "merge":
      // Merge non-null fields; a missing source contributes nothing.
      return {
        ...destRecord,
        ...Object.fromEntries(
          Object.entries(sourceRecord ?? {}).filter(([, v]) => v != null)
        ),
      };

    default: {
      // Compile-time exhaustiveness check plus a runtime guard for callers
      // that bypass the type (e.g. a strategy read from configuration).
      const unknownStrategy: never = strategy;
      throw new Error(
        `Unknown conflict resolution strategy: ${String(unknownStrategy)}`
      );
    }
  }
}

Monitoring & Observability

// Track sync job metrics
/** Outcome of one sync job run. */
interface SyncMetrics {
  jobName: string;
  startTime: Date;
  endTime: Date;
  recordsProcessed: number;
  recordsInserted: number;
  recordsUpdated: number;
  recordsSkipped: number;
  errors: number;
  /** Run duration in milliseconds. */
  durationMs: number;
}

/**
 * Stores the metrics of a run as a `syncMetric` row, then prints a summary
 * to the console.
 *
 * @param metrics Metrics of the finished run.
 */
async function logSyncMetrics(metrics: SyncMetrics): Promise<void> {
  await prisma.syncMetric.create({
    data: metrics,
  });

  // Built line by line so source indentation does not leak into the output.
  const summary = [
    "📊 Sync Metrics",
    `  Job: ${metrics.jobName}`,
    `  Records: ${metrics.recordsProcessed}`,
    `  Inserted: ${metrics.recordsInserted}`,
    `  Updated: ${metrics.recordsUpdated}`,
    `  Errors: ${metrics.errors}`,
    `  Duration: ${metrics.durationMs}ms`,
  ].join("\n");

  console.log(summary);
}

Full ETL Job Example

// jobs/sync-orders-to-warehouse.ts
/**
 * ETL job that copies orders changed since the last run into the warehouse
 * table `orders_fact`, tracked under the "orders_warehouse_sync" watermark.
 */
export class OrdersETLJob extends ETLJob {
  constructor() {
    super("orders-etl", "orders_warehouse_sync");
  }

  /**
   * Loads orders whose `updated_at` is after `since`, oldest first, together
   * with their items and user.
   */
  async extract(since: Date): Promise<Order[]> {
    return prisma.order.findMany({
      where: {
        updated_at: { gt: since },
      },
      include: {
        items: true,
        user: true,
      },
      orderBy: { updated_at: "asc" },
    });
  }

  /** Flattens each order into one warehouse row. */
  async transform(orders: Order[]): Promise<WarehouseOrder[]> {
    // One timestamp for the whole batch, so all rows of a run share it.
    const syncedAt = new Date();

    return orders.map((order) => ({
      order_id: order.id,
      user_email: order.user.email,
      total_amount: order.total,
      item_count: order.items.length,
      status: order.status,
      order_date: order.created_at,
      synced_at: syncedAt,
    }));
  }

  /**
   * Upserts rows into `orders_fact` in batches of 100. On an `order_id`
   * conflict only `total_amount`, `status` and `synced_at` are overwritten.
   */
  async load(data: WarehouseOrder[]): Promise<void> {
    const batchSize = 100;
    const columns = [
      "order_id",
      "user_email",
      "total_amount",
      "item_count",
      "status",
      "order_date",
      "synced_at",
    ] as const;

    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);

      // Every value travels as a bound parameter ($1, $2, …). Splicing
      // quoted values into the SQL text is open to injection (an email
      // containing a quote is enough), and inside Prisma's `$executeRaw`
      // tag a pre-joined VALUES string is itself bound as one parameter.
      const rowPlaceholders = batch.map((_row, rowIndex) => {
        const offset = rowIndex * columns.length;
        const slots = columns.map(
          (_column, columnIndex) => `$${offset + columnIndex + 1}`
        );
        return `(${slots.join(", ")})`;
      });
      const values = batch.flatMap((row) => columns.map((column) => row[column]));

      // "Unsafe" refers to the query text being a plain string; it is built
      // only from the constant column list and generated placeholders.
      await warehouseDb.$executeRawUnsafe(
        `INSERT INTO orders_fact (${columns.join(", ")})
         VALUES ${rowPlaceholders.join(", ")}
         ON CONFLICT (order_id) DO UPDATE SET
           total_amount = EXCLUDED.total_amount,
           status = EXCLUDED.status,
           synced_at = EXCLUDED.synced_at`,
        ...values
      );
    }
  }
}

// Run job
new OrdersETLJob().run().catch((error: unknown) => {
  // Handle the rejection explicitly instead of leaving a floating promise,
  // and report the failure through a non-zero exit code.
  console.error("orders-etl run failed:", error);
  process.exitCode = 1;
});

Scheduling

// Schedule ETL jobs
import cron from "node-cron";

/**
 * Wraps a job for use as a cron callback: a rejected run is logged instead
 * of escaping the callback as an unhandled promise rejection, so the
 * schedule keeps firing.
 */
function runScheduled(
  jobName: string,
  job: () => Promise<unknown>
): () => Promise<void> {
  return async () => {
    try {
      await job();
    } catch (error: unknown) {
      console.error(`Scheduled job ${jobName} failed:`, error);
    }
  };
}

// Run every hour
cron.schedule(
  "0 * * * *",
  runScheduled("orders-etl", () => new OrdersETLJob().run())
);

// Run every 15 minutes
cron.schedule(
  "*/15 * * * *",
  runScheduled("users-incremental", () => syncUsersIncremental())
);

// Run nightly at 2 AM
cron.schedule(
  "0 2 * * *",
  runScheduled("full-data-sync", () => fullDataSync())
);

Error Handling & Recovery

/**
 * Processes every record returned for the current checkpoint, tolerating
 * per-record failures.
 *
 * - A record that fails is logged, handed to `logFailedRecord`, and skipped.
 * - After every 100 successfully processed records the id of the record
 *   just processed is saved as the checkpoint.
 * - When the loop finishes, the checkpoint is set to "completed".
 *
 * @throws Re-throws any error raised outside per-record processing (for
 *   example from `fetchRecords` or `saveCheckpoint`) after logging it.
 */
async function syncWithErrorHandling(): Promise<void> {
  const checkpointInterval = 100;
  const checkpoint = await getCheckpoint();
  let processedRecords = 0;

  try {
    const records = await fetchRecords(checkpoint);

    for (const record of records) {
      try {
        await processRecord(record);
        processedRecords++;

        // Save checkpoint every 100 records
        if (processedRecords % checkpointInterval === 0) {
          await saveCheckpoint(record.id);
        }
      } catch (error: unknown) {
        // Log error but continue
        console.error(`Failed to process record ${record.id}:`, error);
        await logFailedRecord(record.id, error);
      }
    }

    await saveCheckpoint("completed");
  } catch (error: unknown) {
    // Critical failure - job will retry from checkpoint
    console.error("Job failed:", error);
    throw error;
  }
}

Best Practices

  • Incremental sync: Use watermarks, don't full-scan

  • Idempotent operations: Safe to retry

  • Batch processing: Process 100-1000 records at a time

  • Checkpointing: Resume from failure point

  • Retry with backoff: Handle transient failures

  • Monitor metrics: Track job health

  • Test thoroughly: Including failure scenarios

Output Checklist

  • ETL job class created

  • Watermark tracking implemented

  • Incremental sync logic

  • Idempotent upsert operations

  • Retry logic with backoff

  • Conflict resolution strategy

  • Monitoring and metrics

  • Error handling and recovery

  • Job scheduling configured

  • Testing including failure cases

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

framer-motion-animator

No summary provided by upstream source.

Repository Source · Needs Review
General

eslint-prettier-config

No summary provided by upstream source.

Repository Source · Needs Review
General

postman-collection-generator

No summary provided by upstream source.

Repository Source · Needs Review
General

changelog-writer

No summary provided by upstream source.

Repository Source · Needs Review