Batch Processing
30-40% throughput improvement by batching database operations with graceful fallback.
When to Use This Skill
-
Processing multiple related records (invoices, orders, events)
-
Network latency is significant (cloud databases)
-
Writes are independent (no inter-record dependencies)
-
You can implement fallback for reliability
Core Concepts
Sequential processing is slow because each item requires multiple DB round trips. The solution is to collect all data first, then execute batch operations:
Sequential (slow): Item 1 → DB → DB → DB Item 2 → DB → DB → DB Item 3 → DB → DB → DB
Batched (fast): Item 1 → collect Item 2 → collect Item 3 → collect All items → BATCH INSERT
Key insight: Sequential mapping (fuzzy matching needs context), but batched writes (independent operations).
Implementation
Python
from decimal import Decimal from typing import Dict, List import time
class BatchProcessor: """ Batch-optimized processor with fallback """
def process_batch(self, items: List[Dict], user_id: str) -> Dict:
start_time = time.perf_counter()
# Collectors for batch operations
transactions_to_create = []
inventory_updates = {}
failed_items = []
items_processed = 0
# Step 1: Process mappings sequentially (context-dependent)
for idx, item in enumerate(items, 1):
try:
# Business logic that needs context
mapping = self.find_or_create_mapping(item)
# Collect for batch insert
transactions_to_create.append({
"user_id": user_id,
"item_id": mapping['item_id'],
"quantity": float(item['quantity']),
"unit_cost": float(item['unit_price']),
})
# Aggregate inventory updates by item
item_id = mapping['item_id']
if item_id not in inventory_updates:
inventory_updates[item_id] = Decimal('0')
inventory_updates[item_id] += Decimal(str(item['quantity']))
items_processed += 1
except Exception as e:
failed_items.append({
"line": idx,
"error": str(e)
})
continue
# Step 2: BATCH INSERT transactions
if transactions_to_create:
try:
self.client.table("transactions").insert(
transactions_to_create
).execute()
except Exception as e:
# CRITICAL: Fallback to sequential on batch failure
return self._fallback_to_sequential(items, user_id)
# Step 3: BATCH UPDATE inventory (aggregate first)
if inventory_updates:
self._batch_update_inventory(inventory_updates)
return {
"status": "partial_success" if failed_items else "success",
"items_processed": items_processed,
"items_failed": len(failed_items),
"failed_items": failed_items or None,
"processing_time_seconds": round(time.perf_counter() - start_time, 2)
}
def _batch_update_inventory(self, updates: Dict[str, Decimal]):
"""Batch query, individual updates (Supabase limitation)"""
item_ids = list(updates.keys())
# Get current quantities in one query
current = self.client.table("inventory").select(
"id, quantity"
).in_("id", item_ids).execute()
# Apply updates
for item in current.data:
item_id = item['id']
new_qty = Decimal(str(item['quantity'])) + updates[item_id]
self.client.table("inventory").update({
"quantity": float(new_qty)
}).eq("id", item_id).execute()
def _fallback_to_sequential(self, items: List[Dict], user_id: str) -> Dict:
"""Fallback ensures data integrity when batch fails"""
logger.warning("Falling back to sequential processing")
# Process one at a time
for item in items:
self.process_single(item, user_id)
TypeScript
interface BatchResult { status: 'success' | 'partial_success' | 'failed'; itemsProcessed: number; itemsFailed: number; failedItems?: { line: number; error: string }[]; processingTimeMs: number; }
class BatchProcessor { async processBatch(items: Item[], userId: string): Promise<BatchResult> { const startTime = Date.now();
const transactionsToCreate: Transaction[] = [];
const inventoryUpdates = new Map<string, number>();
const failedItems: { line: number; error: string }[] = [];
let itemsProcessed = 0;
// Step 1: Process mappings sequentially
for (let idx = 0; idx < items.length; idx++) {
try {
const mapping = await this.findOrCreateMapping(items[idx]);
transactionsToCreate.push({
userId,
itemId: mapping.itemId,
quantity: items[idx].quantity,
unitCost: items[idx].unitPrice,
});
// Aggregate updates
const current = inventoryUpdates.get(mapping.itemId) || 0;
inventoryUpdates.set(mapping.itemId, current + items[idx].quantity);
itemsProcessed++;
} catch (error) {
failedItems.push({ line: idx + 1, error: error.message });
}
}
// Step 2: Batch insert
if (transactionsToCreate.length > 0) {
try {
await this.db.transactions.insertMany(transactionsToCreate);
} catch (error) {
return this.fallbackToSequential(items, userId);
}
}
// Step 3: Batch update inventory
await this.batchUpdateInventory(inventoryUpdates);
return {
status: failedItems.length > 0 ? 'partial_success' : 'success',
itemsProcessed,
itemsFailed: failedItems.length,
failedItems: failedItems.length > 0 ? failedItems : undefined,
processingTimeMs: Date.now() - startTime,
};
} }
Usage Examples
Invoice Processing
processor = BatchProcessor()
result = processor.process_batch( items=invoice_data['line_items'], user_id=user_id )
if result['status'] == 'partial_success': logger.warning(f"Some items failed: {result['failed_items']}")
Best Practices
-
Sequential mapping, batched writes - fuzzy matching needs context, writes don't
-
Always implement fallback - batch operations can fail, sequential is reliable
-
Aggregate before update - combine multiple updates to same record
-
Handle partial success - one bad item shouldn't fail the entire batch
-
Chunk large batches - 500 records max to avoid timeouts
Common Mistakes
-
Batching operations that depend on each other's results
-
No fallback when batch operations fail
-
Not aggregating updates to the same record
-
Collecting too many records before writing (memory pressure)
-
Not logging individual items when batch fails (lose context)
Related Patterns
-
checkpoint-resume - Resume processing after failures
-
idempotency - Prevent duplicate processing on retry
-
dead-letter-queue - Handle failed items