dead-letter-queue

Store failed jobs for replay and debugging.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "dead-letter-queue" with this command: npx skills add dadbodgeoff/drift/dadbodgeoff-drift-dead-letter-queue

Dead Letter Queue

Store failed jobs for replay and debugging.

When to Use This Skill

  • Jobs fail after max retries

  • Need visibility into failure patterns

  • Want to replay failed jobs manually

  • Can't afford to lose failed work

Core Concepts

  • Capture context - Store enough info to replay

  • Track attempts - Record all error messages

  • Enable replay - Allow manual re-processing

  • Enforce limits - Prevent unbounded growth

TypeScript Implementation

// dead-letter-queue.ts interface DeadLetterJob { id: string; workerName: string; payload: Record<string, unknown>; errorMessage: string; errorType: string; stackTrace?: string; attempts: number; attemptErrors: string[]; firstAttemptAt: Date; lastAttemptAt: Date; createdAt: Date; resolvedAt?: Date; resolution?: string; }

class DeadLetterQueue { private jobs = new Map<string, DeadLetterJob>(); private maxSize = 1000; private counter = 0;

add( workerName: string, payload: Record<string, unknown>, errorMessage: string, errorType: string, attempts: number, stackTrace?: string ): DeadLetterJob { const id = dlq_${++this.counter}_${Date.now()}; const now = new Date();

const job: DeadLetterJob = {
  id,
  workerName,
  payload,
  errorMessage,
  errorType,
  stackTrace,
  attempts,
  attemptErrors: [errorMessage],
  firstAttemptAt: now,
  lastAttemptAt: now,
  createdAt: now,
};

this.jobs.set(id, job);
this.enforceMaxSize();

console.log(`[DLQ] Added: ${id} (${workerName})`);
return job;

}

recordAttempt(jobId: string, errorMessage: string): boolean { const job = this.jobs.get(jobId); if (!job) return false;

job.attempts++;
job.lastAttemptAt = new Date();
job.attemptErrors.push(errorMessage);
job.errorMessage = errorMessage;
return true;

}

resolve(jobId: string, resolution: string): boolean { const job = this.jobs.get(jobId); if (!job) return false;

job.resolvedAt = new Date();
job.resolution = resolution;
return true;

}

discard(jobId: string): boolean { return this.jobs.delete(jobId); }

getUnresolved(): DeadLetterJob[] { return Array.from(this.jobs.values()) .filter(j => !j.resolvedAt) .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime()); }

getReplayable(maxAttempts = 5): DeadLetterJob[] { return this.getUnresolved().filter(j => j.attempts < maxAttempts); }

getByWorker(workerName: string): DeadLetterJob[] { return this.getUnresolved().filter(j => j.workerName === workerName); }

getStats() { const jobs = Array.from(this.jobs.values()); const unresolved = jobs.filter(j => !j.resolvedAt);

const byWorker: Record&#x3C;string, number> = {};
const byErrorType: Record&#x3C;string, number> = {};

for (const job of unresolved) {
  byWorker[job.workerName] = (byWorker[job.workerName] || 0) + 1;
  byErrorType[job.errorType] = (byErrorType[job.errorType] || 0) + 1;
}

return { total: jobs.length, unresolved: unresolved.length, byWorker, byErrorType };

}

cleanupResolved(olderThanHours = 24): number { const cutoff = Date.now() - olderThanHours * 60 * 60 * 1000; let deleted = 0;

for (const [id, job] of this.jobs) {
  if (job.resolvedAt &#x26;&#x26; job.resolvedAt.getTime() &#x3C; cutoff) {
    this.jobs.delete(id);
    deleted++;
  }
}
return deleted;

}

private enforceMaxSize(): void { if (this.jobs.size <= this.maxSize) return;

// Remove oldest resolved first, then oldest unresolved
const sorted = Array.from(this.jobs.entries())
  .sort((a, b) => {
    if (a[1].resolvedAt &#x26;&#x26; !b[1].resolvedAt) return -1;
    return a[1].createdAt.getTime() - b[1].createdAt.getTime();
  });

while (sorted.length > this.maxSize) {
  const [id] = sorted.shift()!;
  this.jobs.delete(id);
}

} }

// Singleton let dlq: DeadLetterQueue | null = null; export function getDeadLetterQueue(): DeadLetterQueue { if (!dlq) dlq = new DeadLetterQueue(); return dlq; }

Python Implementation

dead_letter_queue.py

from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional, Any

@dataclass class DeadLetterJob: id: str worker_name: str payload: Dict[str, Any] error_message: str error_type: str attempts: int attempt_errors: List[str] first_attempt_at: datetime last_attempt_at: datetime created_at: datetime stack_trace: Optional[str] = None resolved_at: Optional[datetime] = None resolution: Optional[str] = None

class DeadLetterQueue: def init(self, max_size: int = 1000): self._jobs: Dict[str, DeadLetterJob] = {} self._max_size = max_size self._counter = 0

def add(
    self,
    worker_name: str,
    payload: Dict[str, Any],
    error_message: str,
    error_type: str,
    attempts: int,
    stack_trace: Optional[str] = None,
) -> DeadLetterJob:
    self._counter += 1
    job_id = f"dlq_{self._counter}_{int(datetime.now().timestamp())}"
    now = datetime.now()

    job = DeadLetterJob(
        id=job_id,
        worker_name=worker_name,
        payload=payload,
        error_message=error_message,
        error_type=error_type,
        stack_trace=stack_trace,
        attempts=attempts,
        attempt_errors=[error_message],
        first_attempt_at=now,
        last_attempt_at=now,
        created_at=now,
    )

    self._jobs[job_id] = job
    self._enforce_max_size()
    return job

def record_attempt(self, job_id: str, error_message: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.attempts += 1
    job.last_attempt_at = datetime.now()
    job.attempt_errors.append(error_message)
    job.error_message = error_message
    return True

def resolve(self, job_id: str, resolution: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.resolved_at = datetime.now()
    job.resolution = resolution
    return True

def get_unresolved(self) -> List[DeadLetterJob]:
    return sorted(
        [j for j in self._jobs.values() if not j.resolved_at],
        key=lambda j: j.created_at,
        reverse=True,
    )

def get_replayable(self, max_attempts: int = 5) -> List[DeadLetterJob]:
    return [j for j in self.get_unresolved() if j.attempts &#x3C; max_attempts]

def get_stats(self) -> Dict[str, Any]:
    unresolved = self.get_unresolved()
    by_worker: Dict[str, int] = {}
    by_error: Dict[str, int] = {}

    for job in unresolved:
        by_worker[job.worker_name] = by_worker.get(job.worker_name, 0) + 1
        by_error[job.error_type] = by_error.get(job.error_type, 0) + 1

    return {
        "total": len(self._jobs),
        "unresolved": len(unresolved),
        "by_worker": by_worker,
        "by_error_type": by_error,
    }

def _enforce_max_size(self):
    if len(self._jobs) &#x3C;= self._max_size:
        return

    # Sort: resolved first, then by age
    sorted_jobs = sorted(
        self._jobs.items(),
        key=lambda x: (x[1].resolved_at is None, x[1].created_at),
    )

    while len(sorted_jobs) > self._max_size:
        job_id, _ = sorted_jobs.pop(0)
        del self._jobs[job_id]

Singleton

_dlq: Optional[DeadLetterQueue] = None

def get_dead_letter_queue() -> DeadLetterQueue: global _dlq if _dlq is None: _dlq = DeadLetterQueue() return _dlq

Usage Examples

Worker Integration

const dlq = getDeadLetterQueue(); const MAX_RETRIES = 3;

async function processJob(job: Job) { try { await doWork(job.payload); } catch (error) { if (job.attempts >= MAX_RETRIES) { dlq.add( 'my-worker', job.payload, error.message, error.name, job.attempts, error.stack ); } else { throw error; // Let retry mechanism handle it } } }

Admin Replay

async function replayFailedJobs() { const dlq = getDeadLetterQueue(); const replayable = dlq.getReplayable();

for (const job of replayable) { try { await processJob({ payload: job.payload, attempts: 0 }); dlq.resolve(job.id, 'Replayed successfully'); } catch (e) { dlq.recordAttempt(job.id, e.message); } } }

Monitoring Endpoint

app.get('/admin/dlq/stats', (req, res) => { const dlq = getDeadLetterQueue(); res.json(dlq.getStats()); });

app.get('/admin/dlq/jobs', (req, res) => { const dlq = getDeadLetterQueue(); res.json(dlq.getUnresolved()); });

app.post('/admin/dlq/jobs/:id/replay', async (req, res) => { const dlq = getDeadLetterQueue(); const job = dlq.getUnresolved().find(j => j.id === req.params.id);

if (!job) { return res.status(404).json({ error: 'Job not found' }); }

// Replay logic... });

Best Practices

  • Store full context - Include everything needed to replay

  • Track all errors - Keep history of attempt failures

  • Enforce size limits - Prevent memory exhaustion

  • Expose stats - Monitor failure patterns

  • Cleanup resolved - Don't keep forever

Common Mistakes

  • Not storing enough context to replay

  • Unbounded queue growth

  • No visibility into failure patterns

  • Forgetting to cleanup old resolved jobs

  • Not tracking attempt history

Related Skills

  • Background Jobs

  • Error Handling

  • Graceful Shutdown

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

oauth-social-login

No summary provided by upstream source.

Repository SourceNeeds Review
General

sse-streaming

No summary provided by upstream source.

Repository SourceNeeds Review
General

multi-tenancy

No summary provided by upstream source.

Repository SourceNeeds Review
General

deduplication

No summary provided by upstream source.

Repository SourceNeeds Review