
Python Background Jobs & Task Queues

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install the "python-background-jobs" skill:

npx skills add julianobarbosa/claude-code-skills/julianobarbosa-claude-code-skills-python-background-jobs


Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.

When to Use This Skill

  • Processing tasks that take longer than a few seconds

  • Sending emails, notifications, or webhooks

  • Generating reports or exporting data

  • Processing uploads or media transformations

  • Integrating with unreliable external services

  • Building event-driven architectures

Core Concepts

  1. Task Queue Pattern

API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.

  2. Idempotency

Tasks may be retried on failure. Design for safe re-execution.

  3. Job State Machine

Jobs transition through states: pending → running → succeeded/failed. A sketch of enforcing these transitions follows this list.

  4. At-Least-Once Delivery

Most queues guarantee at-least-once delivery. Your code must handle duplicates.
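
To make the job state machine (concept 3) concrete, here is a minimal sketch of enforcing legal transitions. The transition table and helper are illustrative, not part of the skill itself:

from enum import Enum

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

# Hypothetical transition table: terminal states allow no further moves
VALID_TRANSITIONS: dict[JobStatus, set[JobStatus]] = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED},
    JobStatus.SUCCEEDED: set(),
    JobStatus.FAILED: set(),
}

def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Raise instead of silently corrupting job state."""
    if new not in VALID_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new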

Quick Start

The examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native services (AWS SQS, Google Cloud Tasks) are equally valid choices.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
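
.delay() returns immediately with an AsyncResult handle; a separate worker process (started with celery -A tasks worker) does the actual sending. Note that querying task state requires a result backend, which the broker-only setup above does not configure — a hedged sketch:

result = send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
print(result.id)  # stable task ID, safe to hand back to the client

# Reading state needs a result backend, e.g.:
#   app = Celery("tasks", broker="redis://localhost:6379",
#                backend="redis://localhost:6379")
print(result.status)  # "PENDING" until a worker picks the task up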

Fundamental Patterns

Pattern 1: Return Job ID Immediately

For operations exceeding a few seconds, return a job ID and process asynchronously.

from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
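
The handler references ExportRequest and JobResponse without defining them; request.model_dump() suggests Pydantic v2 models. A minimal sketch of what they might look like (the field names are hypothetical — the skill doesn't define them):

from pydantic import BaseModel

class ExportRequest(BaseModel):
    dataset: str         # hypothetical field
    format: str = "csv"  # hypothetical field

class JobResponse(BaseModel):
    job_id: str
    status: str
    poll_url: str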

Pattern 2: Celery Task Configuration

Configure Celery tasks with proper retry and timeout settings.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
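
For reference, the countdown=2 ** self.request.retries * 60 line yields this schedule (a quick check, not part of the skill):

for n in range(3):  # max_retries=3
    print(f"retry {n + 1} scheduled after {2 ** n * 60}s")  # 60s, 120s, 240s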

Pattern 3: Make Tasks Idempotent

Workers may retry on crash or timeout. Design for safe re-execution.

@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)

Idempotency Strategies:

  • Check-before-write: Verify state before action

  • Idempotency keys: Use unique tokens with external services

  • Upsert patterns: INSERT ... ON CONFLICT UPDATE

  • Deduplication window: Track processed IDs for N hours (sketched below)
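
A minimal sketch of the deduplication-window strategy, assuming a redis-py client; the key prefix and TTL are illustrative:

import redis

r = redis.Redis()

def already_processed(message_id: str, ttl_hours: int = 24) -> bool:
    # SET NX EX succeeds only for the first writer inside the window
    first = r.set(f"dedup:{message_id}", 1, nx=True, ex=ttl_hours * 3600)
    return not first

# In a task: if already_processed(order_id): return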

Pattern 4: Job State Management

Persist job state transitions for visibility and debugging.

class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}

        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()

        # Column list built dynamically from `updates` (elided here)
        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates, job_id,
        )

        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
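
The repository elides how the UPDATE statement is assembled. One hypothetical way to expand it, assuming asyncpg-style positional parameters and column names that come only from trusted code (never from user input):

def build_update(updates: dict) -> tuple[str, list]:
    cols = list(updates)
    sets = ", ".join(f"{col} = ${i + 1}" for i, col in enumerate(cols))
    sql = f"UPDATE jobs SET {sets} WHERE id = ${len(cols) + 1}"
    return sql, [updates[col] for col in cols]

# build_update({"status": "running", "started_at": now})
#   -> ("UPDATE jobs SET status = $1, started_at = $2 WHERE id = $3",
#       ["running", now])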

Advanced Patterns

Pattern 5: Dead Letter Queue

Handle permanently failed tasks for manual inspection.

@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return

        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
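
dead_letter_queue is not defined by the skill. A minimal stand-in, assuming a Redis list as the DLQ (SQS dead-letter queues or broker-level dead-letter exchanges are common alternatives):

import json
import redis

class DeadLetterQueue:
    def __init__(self, client: redis.Redis, key: str = "dlq:webhooks"):
        self._client = client
        self._key = key

    def send(self, entry: dict) -> None:
        # Append the failed task for later inspection/replay
        self._client.rpush(self._key, json.dumps(entry))

dead_letter_queue = DeadLetterQueue(redis.Redis())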

Pattern 6: Status Polling Endpoint

Provide an endpoint for clients to check job status.

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)

    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
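
One way a client might consume this endpoint; httpx and fixed-interval polling are assumptions here, and a production client would add timeouts and backoff:

import time
import httpx

def wait_for_job(base_url: str, job_id: str, interval: float = 2.0) -> dict:
    while True:
        job = httpx.get(f"{base_url}/jobs/{job_id}").json()
        if job["is_terminal"]:
            return job
        time.sleep(interval)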

Pattern 7: Task Chaining and Workflows

Compose complex workflows from simple tasks.

from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: run tasks in parallel, then a callback.
# Process all items, then send completion notification
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)

workflow.apply_async()
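
Note that chord (and collecting group results generally) requires a result backend; the broker-only app from Quick Start would need one added, e.g.:

app = Celery(
    "tasks",
    broker="redis://localhost:6379",
    backend="redis://localhost:6379",  # required for chord to track completion
)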

Pattern 8: Alternative Task Queues

Choose the right tool for your needs.

RQ (Redis Queue): Simple, Redis-based

from rq import Queue
from redis import Redis

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")

Dramatiq: Modern Celery alternative

import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)
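
Enqueueing with Dramatiq then looks like this (the actor's .send() is the analogue of Celery's .delay()):

send_email.send("user@example.com", "Welcome!", "Thanks for signing up")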

Cloud-native options:

  • AWS SQS + Lambda

  • Google Cloud Tasks

  • Azure Functions

Best Practices Summary

  • Return immediately - Don't block requests for long operations

  • Persist job state - Enable status polling and debugging

  • Make tasks idempotent - Safe to retry on any failure

  • Use idempotency keys - For external service calls

  • Set timeouts - Both soft and hard limits

  • Implement DLQ - Capture permanently failed tasks

  • Log transitions - Track job state changes

  • Retry appropriately - Exponential backoff for transient errors

  • Don't retry permanent failures - Validation errors, invalid credentials

  • Monitor queue depth - Alert on backlog growth (see the sketch below)
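
For the last bullet, a minimal sketch of sampling queue depth with a Redis broker, where Celery keeps pending tasks in a list named after the queue ("celery" by default); the threshold and alert() helper are hypothetical:

import redis

r = redis.Redis()
depth = r.llen("celery")  # pending tasks in the default queue
if depth > 1000:          # hypothetical threshold
    alert(f"task backlog growing: {depth} pending")  # hypothetical alert hook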

