queue-job-processor

Build robust background job processing with BullMQ and Redis.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "queue-job-processor" with this command: npx skills add monkey1sai/openai-cli/monkey1sai-openai-cli-queue-job-processor

Queue Job Processor

Build robust background job processing with BullMQ and Redis.

Core Workflow

  • Setup Redis: Configure connection

  • Create queues: Define job queues

  • Implement workers: Process jobs

  • Add job types: Type-safe job definitions

  • Configure retries: Handle failures

  • Add monitoring: Dashboard and alerts

Installation

npm install bullmq ioredis npm install -D @types/ioredis

Redis Connection

// lib/redis.ts import IORedis from 'ioredis';

export const redis = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, // Required for BullMQ enableReadyCheck: false, });

export const redisSubscriber = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, enableReadyCheck: false, });

Queue Setup

Define Job Types

// jobs/types.ts export interface EmailJobData { to: string; subject: string; template: string; variables: Record<string, string>; }

export interface ImageProcessingJobData { imageId: string; userId: string; operations: Array<{ type: 'resize' | 'crop' | 'watermark'; params: Record<string, any>; }>; }

export interface ReportJobData { reportId: string; userId: string; type: 'daily' | 'weekly' | 'monthly'; dateRange: { start: string; end: string; }; }

export interface WebhookJobData { url: string; payload: Record<string, any>; headers?: Record<string, string>; retryCount?: number; }

export type JobData = | { type: 'email'; data: EmailJobData } | { type: 'image-processing'; data: ImageProcessingJobData } | { type: 'report'; data: ReportJobData } | { type: 'webhook'; data: WebhookJobData };

Create Queues

// queues/index.ts import { Queue, QueueOptions } from 'bullmq'; import { redis } from '../lib/redis'; import { EmailJobData, ImageProcessingJobData, ReportJobData, WebhookJobData, } from './types';

const defaultOptions: QueueOptions = { connection: redis, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000, }, removeOnComplete: { count: 1000, // Keep last 1000 completed jobs age: 24 * 3600, // Keep for 24 hours }, removeOnFail: { count: 5000, // Keep last 5000 failed jobs }, }, };

export const emailQueue = new Queue<EmailJobData>('email', defaultOptions);

export const imageQueue = new Queue<ImageProcessingJobData>('image-processing', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, attempts: 5, timeout: 5 * 60 * 1000, // 5 minutes }, });

export const reportQueue = new Queue<ReportJobData>('reports', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, timeout: 30 * 60 * 1000, // 30 minutes }, });

export const webhookQueue = new Queue<WebhookJobData>('webhooks', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, attempts: 5, backoff: { type: 'exponential', delay: 5000, }, }, });

Workers

Email Worker

// workers/email.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { EmailJobData } from '../jobs/types'; import { sendEmail } from '../lib/email';

const emailWorker = new Worker<EmailJobData>( 'email', async (job: Job<EmailJobData>) => { const { to, subject, template, variables } = job.data;

console.log(`Processing email job ${job.id} to ${to}`);

// Update progress
await job.updateProgress(10);

// Render template
const html = await renderTemplate(template, variables);
await job.updateProgress(50);

// Send email
const result = await sendEmail({
  to,
  subject,
  html,
});

await job.updateProgress(100);

return { messageId: result.messageId, sentAt: new Date() };

}, { connection: redis, concurrency: 10, // Process 10 emails at a time limiter: { max: 100, // Max 100 jobs duration: 60000, // Per minute }, } );

// Event handlers emailWorker.on('completed', (job, result) => { console.log(Email job ${job.id} completed:, result); });

emailWorker.on('failed', (job, error) => { console.error(Email job ${job?.id} failed:, error); });

emailWorker.on('progress', (job, progress) => { console.log(Email job ${job.id} progress: ${progress}%); });

export { emailWorker };

Image Processing Worker

// workers/image.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { ImageProcessingJobData } from '../jobs/types'; import sharp from 'sharp'; import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });

const imageWorker = new Worker<ImageProcessingJobData>( 'image-processing', async (job: Job<ImageProcessingJobData>) => { const { imageId, userId, operations } = job.data;

console.log(`Processing image ${imageId} for user ${userId}`);

// Download original image
const originalBuffer = await downloadImage(imageId);
let image = sharp(originalBuffer);

// Apply operations
for (let i = 0; i &#x3C; operations.length; i++) {
  const op = operations[i];

  switch (op.type) {
    case 'resize':
      image = image.resize(op.params.width, op.params.height, {
        fit: op.params.fit || 'cover',
      });
      break;
    case 'crop':
      image = image.extract({
        left: op.params.left,
        top: op.params.top,
        width: op.params.width,
        height: op.params.height,
      });
      break;
    case 'watermark':
      image = image.composite([
        { input: op.params.watermarkPath, gravity: 'southeast' },
      ]);
      break;
  }

  await job.updateProgress(((i + 1) / operations.length) * 80);
}

// Convert and upload
const processedBuffer = await image.webp({ quality: 85 }).toBuffer();

const key = `processed/${userId}/${imageId}.webp`;
await s3.send(
  new PutObjectCommand({
    Bucket: process.env.S3_BUCKET,
    Key: key,
    Body: processedBuffer,
    ContentType: 'image/webp',
  })
);

await job.updateProgress(100);

return {
  url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${key}`,
  size: processedBuffer.length,
};

}, { connection: redis, concurrency: 5, } );

imageWorker.on('failed', async (job, error) => { // Notify user of failure if (job) { await notifyUser(job.data.userId, { type: 'image-processing-failed', imageId: job.data.imageId, error: error.message, }); } });

export { imageWorker };

Webhook Worker with Retries

// workers/webhook.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { WebhookJobData } from '../jobs/types';

const webhookWorker = new Worker<WebhookJobData>( 'webhooks', async (job: Job<WebhookJobData>) => { const { url, payload, headers = {} } = job.data;

const response = await fetch(url, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'X-Webhook-Signature': generateSignature(payload),
    ...headers,
  },
  body: JSON.stringify(payload),
  signal: AbortSignal.timeout(30000), // 30s timeout
});

if (!response.ok) {
  // Retry for server errors
  if (response.status >= 500) {
    throw new Error(`Webhook failed: ${response.status}`);
  }
  // Don't retry for client errors
  return {
    success: false,
    status: response.status,
    message: 'Client error, not retrying',
  };
}

return {
  success: true,
  status: response.status,
};

}, { connection: redis, concurrency: 20, } );

export { webhookWorker };

Adding Jobs

Service Layer

// services/jobs.service.ts import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues'; import { JobsOptions } from 'bullmq';

export class JobService { // Send email static async sendEmail(data: EmailJobData, options?: JobsOptions) { return emailQueue.add('send-email', data, { ...options, priority: data.template === 'password-reset' ? 1 : 10, }); }

// Bulk emails static async sendBulkEmails(emails: EmailJobData[]) { const jobs = emails.map((data, index) => ({ name: 'send-email', data, opts: { delay: index * 100, // Stagger by 100ms }, }));

return emailQueue.addBulk(jobs);

}

// Process image static async processImage(data: ImageProcessingJobData) { return imageQueue.add('process', data, { jobId: image-${data.imageId}, // Prevent duplicates }); }

// Schedule report static async scheduleReport(data: ReportJobData, runAt: Date) { return reportQueue.add('generate', data, { delay: runAt.getTime() - Date.now(), }); }

// Send webhook static async sendWebhook(data: WebhookJobData) { return webhookQueue.add('deliver', data); } }

API Usage

// app/api/users/route.ts import { JobService } from '@/services/jobs.service';

export async function POST(req: Request) { const data = await req.json();

// Create user const user = await db.user.create({ data });

// Queue welcome email await JobService.sendEmail({ to: user.email, subject: 'Welcome!', template: 'welcome', variables: { name: user.name }, });

return Response.json(user); }

Scheduled Jobs (Cron)

// schedulers/index.ts import { Queue, QueueScheduler } from 'bullmq'; import { redis } from '../lib/redis';

// Daily report scheduler export async function setupSchedulers() { // Clean up old jobs daily await reportQueue.add( 'cleanup', {}, { repeat: { pattern: '0 0 * * *', // Every day at midnight }, } );

// Hourly metrics aggregation await metricsQueue.add( 'aggregate', {}, { repeat: { pattern: '0 * * * *', // Every hour }, } );

// Weekly digest await emailQueue.add( 'weekly-digest', { template: 'weekly-digest' }, { repeat: { pattern: '0 9 * * 1', // Every Monday at 9 AM }, } ); }

Job Events & Monitoring

Event Listeners

// monitoring/events.ts import { QueueEvents } from 'bullmq'; import { redis } from '../lib/redis';

const emailQueueEvents = new QueueEvents('email', { connection: redis });

emailQueueEvents.on('completed', ({ jobId, returnvalue }) => { console.log(Job ${jobId} completed with:, returnvalue); metrics.increment('email.completed'); });

emailQueueEvents.on('failed', ({ jobId, failedReason }) => { console.error(Job ${jobId} failed:, failedReason); metrics.increment('email.failed');

// Alert on repeated failures alertOnFailure(jobId, failedReason); });

emailQueueEvents.on('delayed', ({ jobId, delay }) => { console.log(Job ${jobId} delayed by ${delay}ms); });

emailQueueEvents.on('progress', ({ jobId, data }) => { console.log(Job ${jobId} progress:, data); });

emailQueueEvents.on('stalled', ({ jobId }) => { console.warn(Job ${jobId} stalled); metrics.increment('email.stalled'); });

Bull Board Dashboard

// app/api/admin/queues/route.ts import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { emailQueue, imageQueue, reportQueue, webhookQueue } from '@/queues';

const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath('/api/admin/queues');

createBullBoard({ queues: [ new BullMQAdapter(emailQueue), new BullMQAdapter(imageQueue), new BullMQAdapter(reportQueue), new BullMQAdapter(webhookQueue), ], serverAdapter, });

export const GET = serverAdapter.getRouter(); export const POST = serverAdapter.getRouter();

Error Handling

// workers/base.worker.ts import { Worker, Job, UnrecoverableError } from 'bullmq';

// Custom error for non-retryable failures export class NonRetryableError extends UnrecoverableError { constructor(message: string) { super(message); this.name = 'NonRetryableError'; } }

// Worker with error handling const worker = new Worker( 'queue-name', async (job: Job) => { try { // Validate input if (!job.data.requiredField) { throw new NonRetryableError('Missing required field'); }

  // Process job
  return await processJob(job.data);
} catch (error) {
  if (error instanceof NonRetryableError) {
    throw error; // Won't retry
  }

  // Log and rethrow for retry
  console.error(`Job ${job.id} error:`, error);
  throw error;
}

}, { connection: redis, } );

// Handle worker errors worker.on('error', (error) => { console.error('Worker error:', error); });

Graceful Shutdown

// server.ts import { emailWorker, imageWorker, reportWorker } from './workers';

const workers = [emailWorker, imageWorker, reportWorker];

async function gracefulShutdown() { console.log('Shutting down workers...');

// Close workers gracefully await Promise.all( workers.map((worker) => worker.close().catch((err) => { console.error('Error closing worker:', err); }) ) );

// Close Redis connections await redis.quit(); await redisSubscriber.quit();

console.log('Workers shut down'); process.exit(0); }

process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown);

Best Practices

  • Idempotent jobs: Jobs should be safe to retry

  • Unique job IDs: Prevent duplicate processing

  • Set timeouts: Prevent stuck jobs

  • Use progress updates: For long-running jobs

  • Handle failures gracefully: Alert and log

  • Clean up old jobs: Remove completed/failed jobs

  • Graceful shutdown: Wait for jobs to complete

  • Monitor queues: Use Bull Board or similar

Output Checklist

Every queue implementation should include:

  • Redis connection with proper config

  • Typed job data interfaces

  • Queue with default options

  • Worker with concurrency limits

  • Retry and backoff configuration

  • Event handlers for monitoring

  • Error handling (retryable vs non-retryable)

  • Graceful shutdown handling

  • Bull Board or monitoring dashboard

  • Scheduled/recurring jobs (if needed)

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

readme-generator

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

bruno-collection-generator

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

redis-patterns

No summary provided by upstream source.

Repository SourceNeeds Review