Task Patterns Skill
Production-ready Celery task templates with comprehensive error handling, retry mechanisms, rate limiting, and custom behavior patterns.
Overview
This skill provides battle-tested templates and patterns for building robust Celery tasks. Each template demonstrates production-ready code with proper error handling, logging, retry logic, and security best practices.
Core Patterns
- Basic Task Template
Template: templates/basic-task.py
Simple task with standard error handling and logging.
Use when:
-
Creating your first Celery task
-
Need straightforward task execution
-
No complex retry or limiting requirements
Features:
-
Proper logging setup
-
Basic error handling
-
Synchronous and asynchronous execution examples
-
Clear docstrings
- Retry Task Template
Template: templates/retry-task.py
Tasks with automatic retry mechanisms and exponential backoff.
Use when:
-
Calling external APIs that may fail transiently
-
Database operations that could timeout
-
Network operations prone to connection issues
Features:
-
Automatic retry with autoretry_for
-
Exponential backoff with jitter
-
Manual retry control
-
Fixed delay retries
-
Configurable max retries
Example: See examples/task-with-retries.md for complete implementation guide
- Rate Limited Task Template
Template: templates/rate-limited-task.py
Tasks with rate limiting to control execution speed.
Use when:
-
Respecting external API rate limits
-
Protecting database from overload
-
Controlling resource consumption
-
Meeting SLA requirements
Features:
-
Per-second, per-minute, per-hour limits
-
Batch processing with rate control
-
Large dataset handling
-
Dynamic rate limit adjustment
Example: See examples/rate-limiting.md for complete guide
- Time Limited Task Template
Template: templates/time-limited-task.py
Tasks with soft and hard time limits to prevent runaway execution.
Use when:
-
Long-running operations that could hang
-
Operations with external dependencies
-
Tasks that must complete within timeframe
-
Preventing resource exhaustion
Features:
-
Soft time limit (catchable)
-
Hard time limit (force kill)
-
Graceful timeout handling
-
Progress saving on timeout
-
Combined with retry logic
- Custom Task Class Template
Template: templates/custom-task-class.py
Custom task base classes with specialized behavior.
Use when:
-
Need database connection pooling
-
Want to cache task results
-
Require metrics tracking
-
Need shared resources across tasks
-
Implementing lifecycle hooks
Features:
-
Database connection pooling
-
Automatic result caching
-
Metrics and monitoring
-
Resource pool management
-
Lifecycle hooks (before_start, on_success, on_failure, on_retry, after_return)
Example: See examples/custom-task-classes.md for comprehensive guide
- Pydantic Validation Template
Template: templates/pydantic-validation.py
Type-safe tasks with Pydantic model validation.
Use when:
-
Need strict input validation
-
Want type safety
-
Complex nested data structures
-
API contract enforcement
Features:
-
Pydantic models for input validation
-
Enum types for constrained values
-
Custom validators
-
Standardized result format
-
Comprehensive error messages
- Database Task Template
Template: templates/database-task.py
Best practices for database operations in tasks.
Use when:
-
Performing database queries
-
Bulk database operations
-
Transactional operations
-
Database migrations
Features:
-
Connection pooling
-
Parameterized queries (SQL injection prevention)
-
Transaction management
-
Bulk insert operations
-
Pagination support
-
Aggregation queries
- API Task Template
Template: templates/api-task.py
Best practices for external API calls in tasks.
Use when:
-
Calling external APIs
-
Webhook delivery
-
Paginated API fetching
-
Batch API calls
-
API authentication
Features:
-
Automatic retry on connection errors
-
Rate limiting for API quotas
-
Timeout handling
-
Authentication patterns (Bearer, API key)
-
Pagination support
-
Batch processing
Scripts
generate-task.sh
Usage: ./scripts/generate-task.sh <template_name> <output_file> [task_name]
Generate new Celery task from template with customizations.
Available templates:
-
basic-task
-
retry-task
-
rate-limited-task
-
time-limited-task
-
custom-task-class
-
pydantic-validation
-
database-task
-
api-task
Example:
./scripts/generate-task.sh retry-task tasks/my_api_call.py fetch_data
test-task.sh
Usage: ./scripts/test-task.sh <task_file.py> [task_name]
Test Celery task by:
-
Validating Python syntax
-
Checking imports
-
Verifying task structure
-
Security scanning
-
Best practices check
-
Optionally running task
Example:
./scripts/test-task.sh templates/retry-task.py fetch_api_data
validate-task.sh
Usage: ./scripts/validate-task.sh <task_file.py>
Comprehensive validation including:
-
Required elements (Celery import, app initialization, decorators)
-
Best practices (docstrings, type hints, error handling, logging)
-
Retry configuration
-
Security checks (no hardcoded credentials, SQL injection)
-
Performance checks (rate limits, time limits)
-
Code quality (examples, configuration)
-
Pattern detection
Example:
./scripts/validate-task.sh templates/api-task.py
Usage Examples
Example 1: Create API Task with Retries
Use retry-task.py template
from templates.retry_task import fetch_api_data
Queue task
result = fetch_api_data.delay('https://api.example.com/data')
Get result
data = result.get(timeout=60) print(data)
Example 2: Rate-Limited Batch Processing
Use rate-limited-task.py template
from templates.rate_limited_task import api_call_rate_limited
Process 100 items at 10/minute rate
for i in range(100): api_call_rate_limited.delay(f'/endpoint/{i}')
Celery automatically enforces rate limit
Example 3: Database Operations with Connection Pooling
Use database-task.py template
from templates.database_task import insert_record, bulk_insert
Single insert
result1 = insert_record.delay('users', { 'name': 'John Doe', 'email': 'john@example.com' })
Bulk insert
records = [ {'name': 'Jane', 'email': 'jane@example.com'}, {'name': 'Bob', 'email': 'bob@example.com'}, ] result2 = bulk_insert.delay('users', records)
Example 4: Custom Task with Metrics
Use custom-task-class.py template
from templates.custom_task_class import monitored_task
Automatic metrics tracking
result = monitored_task.delay(123)
Metrics logged automatically:
- Start time
- Duration
- Success/failure
- Arguments
Example 5: Type-Safe Task with Pydantic
Use pydantic-validation.py template
from templates.pydantic_validation import process_user
Valid data
user_data = { 'user_id': 123, 'email': 'user@example.com', 'username': 'john_doe', 'age': 30, 'tags': ['premium'] } result = process_user.delay(user_data)
Invalid data returns structured error
invalid_data = { 'user_id': -1, # Invalid 'email': 'not-email', # Invalid } result = process_user.delay(invalid_data)
Returns: {'status': 'error', 'errors': [...]}
Best Practices
- Always Use Placeholders for Credentials
✅ CORRECT
api_key = os.getenv('API_KEY', 'your_api_key_here')
❌ WRONG
api_key = 'sk-abc123xyz456'
- Set Appropriate Timeouts
✅ CORRECT
response = requests.get(url, timeout=30)
❌ WRONG
response = requests.get(url) # Can hang forever
- Use Parameterized Queries
✅ CORRECT
db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
❌ WRONG
db.execute(f"SELECT * FROM users WHERE id = {user_id}")
- Combine Retry with Rate Limiting
@app.task( bind=True, autoretry_for=(RequestException,), retry_backoff=True, rate_limit='10/m' ) def robust_api_call(self, url: str): """Retries on failure, respects rate limits.""" pass
- Log All Important Events
@app.task(bind=True) def well_logged_task(self, item_id: int): logger.info(f"Starting task for item {item_id}") try: result = process(item_id) logger.info(f"Successfully processed {item_id}") return result except Exception as exc: logger.error(f"Failed to process {item_id}: {exc}") raise
Security Compliance
This skill follows strict security rules:
-
All templates use placeholders for credentials
-
No real API keys, passwords, or secrets
-
Environment variable references in all code
-
Parameterized queries to prevent SQL injection
-
Security validation in test scripts
Pattern Selection Guide
Choose basic-task.py when:
-
Simple, straightforward operations
-
No external dependencies
-
Low failure risk
Choose retry-task.py when:
-
External API calls
-
Network operations
-
Transient failures expected
Choose rate-limited-task.py when:
-
API rate limits exist
-
Need to control execution speed
-
Protecting resources from overload
Choose time-limited-task.py when:
-
Long-running operations
-
Risk of hanging
-
Need guaranteed completion time
Choose custom-task-class.py when:
-
Need resource pooling
-
Want lifecycle hooks
-
Shared logic across tasks
-
Metrics tracking required
Choose pydantic-validation.py when:
-
Complex input validation needed
-
Type safety important
-
API contract enforcement
-
Clear error messages required
Choose database-task.py when:
-
Database operations
-
Need connection pooling
-
Transactional operations
-
Bulk processing
Choose api-task.py when:
-
External API integration
-
Webhook delivery
-
Authentication required
-
Pagination needed
Testing Tasks
Test Individual Task
Validate task structure
./scripts/validate-task.sh templates/retry-task.py
Test task execution
./scripts/test-task.sh templates/retry-task.py fetch_api_data
Run with Celery Worker
Start worker
celery -A tasks worker --loglevel=info
Execute tasks
python3 templates/retry-task.py
Common Patterns
Pattern: Retry with Backoff
@app.task( bind=True, autoretry_for=(RequestException,), retry_backoff=True, retry_jitter=True, max_retries=5 )
Pattern: Rate + Time Limits
@app.task( rate_limit='10/m', soft_time_limit=60, time_limit=120 )
Pattern: Validation + Retry
@app.task( bind=True, autoretry_for=(ValidationError, RequestException), retry_backoff=True, max_retries=3 ) def validated_api_task(self, request: dict): # Validate with Pydantic req = ApiRequest(**request) # Make API call with retry return make_call(req.url)
Quick Reference
Task Decorator Options
@app.task( bind=True, # Access self (required for retry) base=CustomTask, # Custom task class autoretry_for=(Exception,), # Exceptions to retry retry_backoff=True, # Exponential backoff retry_backoff_max=600, # Max backoff seconds retry_jitter=True, # Add randomness max_retries=5, # Max attempts default_retry_delay=10, # Default delay rate_limit='10/m', # Rate limit soft_time_limit=60, # Soft timeout (catchable) time_limit=120, # Hard timeout (kills task) ignore_result=False, # Store result priority=5 # Task priority (0-9) )
Retry Syntax
Automatic
autoretry_for=(RequestException, Timeout)
Manual
raise self.retry(exc=exc, countdown=60)
Rate Limit Syntax
rate_limit='10/s' # 10 per second rate_limit='100/m' # 100 per minute rate_limit='1000/h' # 1000 per hour
References
-
Celery Tasks Documentation
-
examples/task-with-retries.md
-
Complete retry guide
-
examples/rate-limiting.md
-
Complete rate limiting guide
-
examples/custom-task-classes.md
-
Custom class guide
Validation
Always run validation before committing:
./scripts/validate-task.sh your-task.py
Validation checks:
-
✅ Required imports and setup
-
✅ Task decorators present
-
✅ Docstrings and type hints
-
✅ Error handling
-
✅ Security (no hardcoded credentials)
-
✅ Best practices compliance