python-async-patterns

Python Async/Await Patterns and Best Practices

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "python-async-patterns" with this command: npx skills add clostaunau/holiday-card/clostaunau-holiday-card-python-async-patterns

Python Async/Await Patterns and Best Practices

Purpose

This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations.

Use this skill when:

  • Reviewing async Python code

  • Writing new async functions or services

  • Refactoring sync code to async

  • Troubleshooting async performance issues

  • Evaluating async library usage

  • Implementing async endpoints in web frameworks

Context

Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation.

Key Principles:

  • Async is beneficial for I/O-bound operations, not CPU-bound tasks

  • The event loop must never be blocked with synchronous operations

  • Coroutines must be properly awaited

  • Resources must be cleaned up correctly in async contexts

  • Mixing sync and async code requires careful consideration

Python Version Requirements:

  • async /await : Python 3.5+

  • asyncio.run() : Python 3.7+

  • asyncio.create_task() : Python 3.7+

  • Task Groups (asyncio.TaskGroup ): Python 3.11+

  • Exception Groups: Python 3.11+

Async Fundamentals

  1. Async/Await Syntax

Defining Async Functions:

Async function (coroutine function)

async def fetch_data(url: str) -> dict: """Coroutine that fetches data asynchronously.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()

Regular function (synchronous)

def process_data(data: dict) -> list: """Regular synchronous function.""" return [item['value'] for item in data['items']]

Key Differences:

  • async def creates a coroutine function

  • Calling an async function returns a coroutine object (not the result)

  • Must use await to execute the coroutine and get the result

  • Can only use await inside async def functions

  1. Coroutines vs Regular Functions

Coroutine function

async def async_operation(): await asyncio.sleep(1) return "done"

This creates a coroutine object but doesn't execute it

coro = async_operation() # RuntimeWarning: coroutine was never awaited

Must await to execute

result = await async_operation() # Correct

Or run from synchronous code

result = asyncio.run(async_operation()) # Correct (Python 3.7+)

Coroutine Characteristics:

  • Lazy evaluation: not executed until awaited

  • Can be suspended and resumed

  • Return values through await

  • Must be awaited or scheduled for execution

  1. Event Loop Fundamentals

The event loop is the core of async execution:

import asyncio

High-level API (Python 3.7+) - PREFERRED

async def main(): result = await some_async_operation() return result

Run the event loop

if name == "main": result = asyncio.run(main()) # Creates, runs, and closes loop

Event Loop Responsibilities:

  • Schedules and executes coroutines

  • Manages I/O operations

  • Handles callbacks and tasks

  • Coordinates concurrent execution

  1. Using asyncio.run()

GOOD: Modern approach (Python 3.7+)

async def main(): result = await fetch_data() processed = await process_data(result) return processed

if name == "main": result = asyncio.run(main())

AVOID: Manual loop management (legacy, pre-3.7)

loop = asyncio.get_event_loop() try: result = loop.run_until_complete(main()) finally: loop.close()

Why prefer asyncio.run():

  • Automatically creates and closes the loop

  • Handles cleanup properly

  • Simpler and less error-prone

  • Cancels remaining tasks on shutdown

Common Async Patterns

  1. Async Context Managers

Async context managers use async with for resource management:

class AsyncDatabaseConnection: """Example async context manager."""

async def __aenter__(self):
    """Called when entering async with block."""
    self.connection = await self._connect()
    return self.connection

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Called when exiting async with block."""
    await self.connection.close()
    return False  # Don't suppress exceptions

async def _connect(self):
    """Establish connection."""
    await asyncio.sleep(0.1)  # Simulated async connection
    return {"connected": True}

Usage

async def query_database(): async with AsyncDatabaseConnection() as conn: # Connection automatically closed after block result = await execute_query(conn) return result

Common Async Context Managers:

  • aiohttp.ClientSession() : HTTP client sessions

  • aiofiles.open() : Async file operations

  • Database connection pools (asyncpg, motor)

  • Lock primitives (asyncio.Lock() )

  1. Async Iterators and Generators

class AsyncDataStream: """Async iterator for streaming data."""

def __init__(self, count: int):
    self.count = count
    self.index = 0

def __aiter__(self):
    """Return the async iterator."""
    return self

async def __anext__(self):
    """Get next item asynchronously."""
    if self.index >= self.count:
        raise StopAsyncIteration

    await asyncio.sleep(0.1)  # Simulated async operation
    value = f"item_{self.index}"
    self.index += 1
    return value

Usage

async def process_stream(): async for item in AsyncDataStream(5): print(item)

Async generator (simpler approach)

async def async_range(count: int): """Async generator function.""" for i in range(count): await asyncio.sleep(0.1) yield i

Usage

async def use_generator(): async for value in async_range(5): print(value)

  1. Async Comprehensions

Async list comprehension

async def fetch_all_urls(urls: list[str]) -> list[dict]: async with aiohttp.ClientSession() as session: # Create tasks for all URLs results = [ await fetch_one(session, url) for url in urls ] return results

Async generator expression

async def process_stream_data(stream): processed = ( transform(item) async for item in stream if await validate(item) ) return processed

Async dict comprehension

async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]: return { user_id: await fetch_user(user_id) for user_id in user_ids }

  1. Concurrent Task Execution

Using asyncio.gather()

Run multiple coroutines concurrently

async def fetch_multiple_resources(): # All tasks run concurrently results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1) ) user, posts, comments = results return user, posts, comments

With error handling

async def fetch_with_error_handling(): results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1), return_exceptions=True # Return exceptions instead of raising )

# Check for exceptions
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {i} failed: {result}")

Using asyncio.create_task()

Create tasks explicitly for more control

async def fetch_with_tasks(): # Create tasks (they start executing immediately) user_task = asyncio.create_task(fetch_user(1)) posts_task = asyncio.create_task(fetch_posts(1)) comments_task = asyncio.create_task(fetch_comments(1))

# Wait for all tasks
user = await user_task
posts = await posts_task
comments = await comments_task

return user, posts, comments

Cancel tasks if needed

async def fetch_with_cancellation(): task = asyncio.create_task(long_running_operation())

try:
    result = await asyncio.wait_for(task, timeout=5.0)
    return result
except asyncio.TimeoutError:
    task.cancel()  # Cancel the task
    raise

Using Task Groups (Python 3.11+)

Task groups provide better error handling and cancellation

async def fetch_with_task_group(): async with asyncio.TaskGroup() as tg: user_task = tg.create_task(fetch_user(1)) posts_task = tg.create_task(fetch_posts(1)) comments_task = tg.create_task(fetch_comments(1))

# All tasks complete here (or exception raised)
return user_task.result(), posts_task.result(), comments_task.result()

If any task fails, all tasks are cancelled

async def task_group_error_handling(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task_that_fails()) tg.create_task(task_that_succeeds()) except ExceptionGroup as eg: # All exceptions collected in ExceptionGroup for exc in eg.exceptions: print(f"Task failed: {exc}")

  1. Task Coordination Patterns

Wait for first completion

Wait for first task to complete

async def fetch_from_multiple_sources(urls: list[str]): async with aiohttp.ClientSession() as session: tasks = [ asyncio.create_task(fetch_one(session, url)) for url in urls ]

    # Wait for first completion
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Return first result
    return done.pop().result()

Semaphore for rate limiting

Limit concurrent operations

async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent)

async def fetch_with_sem(url: str):
    async with semaphore:
        return await fetch_one(url)

results = await asyncio.gather(*[
    fetch_with_sem(url) for url in urls
])
return results

Best Practices

  1. Avoid Blocking the Event Loop

BAD: Blocking operations in async function

async def bad_async_function(): # These block the event loop! time.sleep(1) # WRONG requests.get(url) # WRONG open('file.txt').read() # WRONG some_cpu_intensive_task() # WRONG

GOOD: Use async alternatives

async def good_async_function(): await asyncio.sleep(1) # Non-blocking sleep async with aiohttp.ClientSession() as session: await session.get(url) # Async HTTP async with aiofiles.open('file.txt') as f: await f.read() # Async file I/O # For CPU-bound: use asyncio.to_thread() result = await asyncio.to_thread(cpu_intensive_task)

  1. Using asyncio.to_thread for Blocking I/O

import asyncio from functools import partial

For blocking sync operations (Python 3.9+)

async def async_wrapper_for_blocking(): # Run blocking operation in thread pool result = await asyncio.to_thread( blocking_operation, arg1, arg2 ) return result

For CPU-bound tasks

async def process_data_async(data: list): # Offload CPU-intensive work to thread processed = await asyncio.to_thread( cpu_intensive_processing, data ) return processed

With partial for complex arguments

async def complex_blocking_call(): func = partial( blocking_function, timeout=30, retry=3 ) result = await asyncio.to_thread(func) return result

  1. Proper Exception Handling

Handle exceptions in individual coroutines

async def fetch_with_exception_handling(url: str): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json() except aiohttp.ClientError as e: # Handle specific async HTTP errors print(f"HTTP error: {e}") raise except asyncio.TimeoutError: # Handle timeout print(f"Timeout fetching {url}") raise except Exception as e: # Handle unexpected errors print(f"Unexpected error: {e}") raise

Handle exceptions in gather()

async def fetch_multiple_with_errors(urls: list[str]): results = await asyncio.gather( *[fetch_with_exception_handling(url) for url in urls], return_exceptions=True )

# Process results and exceptions
successful = []
failed = []

for url, result in zip(urls, results):
    if isinstance(result, Exception):
        failed.append((url, result))
    else:
        successful.append(result)

return successful, failed

Exception groups (Python 3.11+)

async def handle_exception_groups(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) except* ValueError as eg: # Handle all ValueErrors print(f"Value errors: {eg.exceptions}") except* TypeError as eg: # Handle all TypeErrors print(f"Type errors: {eg.exceptions}")

  1. Cancellation Handling

Proper cancellation handling

async def cancellable_operation(): try: await long_running_task() except asyncio.CancelledError: # Clean up resources await cleanup() # Re-raise to propagate cancellation raise

Shielding from cancellation

async def critical_operation(): # This won't be cancelled await asyncio.shield(important_task())

Timeout with proper cleanup

async def operation_with_timeout(): try: result = await asyncio.wait_for( some_operation(), timeout=5.0 ) return result except asyncio.TimeoutError: # Cleanup happens here await cleanup() raise

  1. Timeout Patterns

Single operation timeout

async def fetch_with_timeout(url: str, timeout: float = 10.0): try: async with aiohttp.ClientSession() as session: async with asyncio.timeout(timeout): # Python 3.11+ async with session.get(url) as response: return await response.json() except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise

Legacy timeout (Python < 3.11)

async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0): try: result = await asyncio.wait_for( fetch_one(url), timeout=timeout ) return result except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise

Multiple timeouts

async def complex_operation_with_timeouts(): # Per-operation timeouts async with asyncio.timeout(30): # Overall timeout result1 = await asyncio.wait_for(op1(), timeout=10) result2 = await asyncio.wait_for(op2(), timeout=10) result3 = await asyncio.wait_for(op3(), timeout=10) return result1, result2, result3

  1. Resource Cleanup

Always use async context managers

async def proper_resource_cleanup(): # GOOD: Automatic cleanup async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Session and response closed automatically

For resources without context managers

async def manual_cleanup(): resource = await acquire_resource() try: result = await use_resource(resource) return result finally: # Always cleanup await resource.close()

Async cleanup in exception handlers

async def cleanup_on_error(): resources = [] try: for i in range(10): resource = await acquire_resource(i) resources.append(resource) await use_resource(resource) except Exception as e: # Cleanup all acquired resources await asyncio.gather( *[res.close() for res in resources], return_exceptions=True ) raise finally: # Final cleanup await asyncio.gather( *[res.close() for res in resources], return_exceptions=True )

Common Libraries

  1. aiohttp - Async HTTP Client/Server

import aiohttp import asyncio

Client usage

async def fetch_data(url: str) -> dict: """Fetch data using aiohttp.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json()

With custom headers and timeout

async def fetch_with_options(url: str): headers = {"Authorization": "Bearer token"} timeout = aiohttp.ClientTimeout(total=30)

async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
    async with session.get(url) as response:
        return await response.text()

POST request

async def post_data(url: str, data: dict): async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: return await response.json()

Concurrent requests with rate limiting

async def fetch_multiple(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent)

async def fetch_one(session, url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async with aiohttp.ClientSession() as session:
    tasks = [fetch_one(session, url) for url in urls]
    return await asyncio.gather(*tasks)

2. aiofiles - Async File I/O

import aiofiles

Read file asynchronously

async def read_file(filepath: str) -> str: """Read file contents asynchronously.""" async with aiofiles.open(filepath, mode='r') as f: contents = await f.read() return contents

Write file asynchronously

async def write_file(filepath: str, content: str): """Write content to file asynchronously.""" async with aiofiles.open(filepath, mode='w') as f: await f.write(content)

Read lines

async def read_lines(filepath: str) -> list[str]: """Read file line by line.""" lines = [] async with aiofiles.open(filepath, mode='r') as f: async for line in f: lines.append(line.strip()) return lines

Process large file

async def process_large_file(filepath: str): """Process file line by line without loading all into memory.""" async with aiofiles.open(filepath, mode='r') as f: async for line in f: processed = await process_line(line) await save_result(processed)

  1. asyncpg - Async PostgreSQL Driver

import asyncpg

Create connection pool

async def create_pool(): """Create database connection pool.""" pool = await asyncpg.create_pool( host='localhost', port=5432, user='user', password='password', database='mydb', min_size=10, max_size=20 ) return pool

Query with pool

async def fetch_users(pool: asyncpg.Pool): """Fetch users from database.""" async with pool.acquire() as connection: rows = await connection.fetch( 'SELECT id, name, email FROM users WHERE active = $1', True ) return [dict(row) for row in rows]

Transaction

async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict): """Create user and profile in transaction.""" async with pool.acquire() as connection: async with connection.transaction(): user_id = await connection.fetchval( 'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id', user_data['name'], user_data['email'] ) await connection.execute( 'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)', user_id, user_data['bio'] ) return user_id

Prepared statements for better performance

async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int): """Fetch user using prepared statement.""" async with pool.acquire() as connection: stmt = await connection.prepare('SELECT * FROM users WHERE id = $1') row = await stmt.fetchrow(user_id) return dict(row) if row else None

  1. motor - Async MongoDB Driver

from motor.motor_asyncio import AsyncIOMotorClient

Create client

async def create_mongo_client(): """Create MongoDB client.""" client = AsyncIOMotorClient('mongodb://localhost:27017') return client

Query documents

async def fetch_documents(collection_name: str): """Fetch documents from MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]

cursor = collection.find({'active': True})
documents = await cursor.to_list(length=100)
return documents

Insert document

async def insert_document(collection_name: str, document: dict): """Insert document into MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]

result = await collection.insert_one(document)
return result.inserted_id

Update document

async def update_document(collection_name: str, filter_dict: dict, update_dict: dict): """Update document in MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]

result = await collection.update_one(
    filter_dict,
    {'$set': update_dict}
)
return result.modified_count

Async iteration over cursor

async def process_large_collection(collection_name: str): """Process large collection with cursor.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]

cursor = collection.find({})
async for document in cursor:
    await process_document(document)

Anti-Patterns to Avoid

  1. Not Awaiting Coroutines

BAD: Coroutine never executed

async def bad_example(): result = fetch_data() # RuntimeWarning: coroutine 'fetch_data' was never awaited return result

GOOD: Properly awaited

async def good_example(): result = await fetch_data() return result

  1. Mixing Sync and Async Incorrectly

BAD: Calling async function from sync code without event loop

def sync_function(): result = await async_function() # SyntaxError: 'await' outside async function return result

GOOD: Use asyncio.run() to bridge sync/async

def sync_function(): result = asyncio.run(async_function()) return result

BAD: Creating new event loop in async context

async def bad_nested(): result = asyncio.run(another_async()) # RuntimeError: asyncio.run() cannot be called from a running event loop return result

GOOD: Just await in async context

async def good_nested(): result = await another_async() return result

  1. Creating Event Loops Manually

BAD: Manual loop management (unless necessary)

def bad_approach(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(main()) finally: loop.close() return result

GOOD: Use asyncio.run()

def good_approach(): result = asyncio.run(main()) return result

EXCEPTION: Multiple event loops needed (advanced use case)

Only create manual loops when you specifically need multiple loops

  1. Blocking Operations in Async Functions

BAD: Blocking the event loop

async def bad_blocking(): time.sleep(5) # Blocks entire event loop! data = requests.get(url).json() # Blocks event loop! with open('file.txt') as f: content = f.read() # Blocks event loop! result = compute_fibonacci(1000000) # Blocks event loop! return result

GOOD: Use async alternatives

async def good_async(): await asyncio.sleep(5) # Non-blocking async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Non-blocking async with aiofiles.open('file.txt') as f: content = await f.read() # Non-blocking result = await asyncio.to_thread(compute_fibonacci, 1000000) # Non-blocking return result

  1. Improper Exception Handling

BAD: Bare except swallows CancelledError

async def bad_exception_handling(): try: await some_operation() except: # Catches CancelledError, preventing proper cancellation! print("Error occurred")

GOOD: Specific exception handling

async def good_exception_handling(): try: await some_operation() except asyncio.CancelledError: # Handle cancellation specifically await cleanup() raise # Re-raise to propagate except Exception as e: # Handle other exceptions print(f"Error: {e}") raise

BAD: Silencing errors in gather()

async def bad_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True ) return results # Might contain exceptions that are ignored

GOOD: Check for exceptions

async def good_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True )

for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {i} failed: {result}")
        # Handle or raise as appropriate

return [r for r in results if not isinstance(r, Exception)]

6. Resource Leaks

BAD: Not closing resources

async def bad_resource_management(): session = aiohttp.ClientSession() response = await session.get(url) data = await response.json() # Session and response never closed! return data

GOOD: Use context managers

async def good_resource_management(): async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Automatically closed return data

BAD: Creating session per request

async def bad_session_usage(): for url in urls: async with aiohttp.ClientSession() as session: # Creating new session each iteration is wasteful! await session.get(url)

GOOD: Reuse session

async def good_session_usage(): async with aiohttp.ClientSession() as session: for url in urls: # Reuse same session await session.get(url)

  1. Not Using Task Groups for Error Propagation (Python 3.11+)

BAD: Errors might be swallowed

async def bad_error_propagation(): tasks = [ asyncio.create_task(task1()), asyncio.create_task(task2()), asyncio.create_task(task3()) ] # If task2 fails, task1 and task3 keep running await asyncio.gather(*tasks)

GOOD: Use TaskGroup for automatic cancellation

async def good_error_propagation(): async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) tg.create_task(task3()) # If any task fails, all tasks are cancelled

Framework-Specific Patterns

  1. FastAPI Async Endpoints

from fastapi import FastAPI, HTTPException, Depends from typing import List import asyncpg

app = FastAPI()

Async endpoint

@app.get("/users/{user_id}") async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)): """Async endpoint that queries database.""" async with db.acquire() as connection: user = await connection.fetchrow( 'SELECT * FROM users WHERE id = $1', user_id ) if not user: raise HTTPException(status_code=404, detail="User not found") return dict(user)

Async endpoint with external API call

@app.get("/external-data") async def fetch_external_data(): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: return await response.json()

Background tasks (async)

from fastapi import BackgroundTasks

@app.post("/send-notification") async def send_notification( email: str, background_tasks: BackgroundTasks ): """Send notification in background.""" background_tasks.add_task(send_email_async, email) return {"message": "Notification queued"}

async def send_email_async(email: str): """Send email asynchronously.""" async with aiohttp.ClientSession() as session: await session.post( 'https://email-service.com/send', json={'to': email} )

Dependency injection with async

async def get_db_pool() -> asyncpg.Pool: """Dependency that provides database pool.""" pool = await asyncpg.create_pool(...) try: yield pool finally: await pool.close()

Lifespan events

from contextlib import asynccontextmanager

@asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown events.""" # Startup app.state.db_pool = await asyncpg.create_pool(...) app.state.http_session = aiohttp.ClientSession() yield # Shutdown await app.state.db_pool.close() await app.state.http_session.close()

app = FastAPI(lifespan=lifespan)

  1. Django Async Views (Django 3.1+)

from django.http import JsonResponse from django.views import View from asgiref.sync import sync_to_async import aiohttp

Async function-based view

async def async_user_list(request): """Async view for listing users.""" # Use sync_to_async for Django ORM users = await sync_to_async(list)( User.objects.filter(active=True) ) return JsonResponse({ 'users': [ {'id': u.id, 'name': u.name} for u in users ] })

Async class-based view

class AsyncUserDetailView(View): """Async class-based view."""

async def get(self, request, user_id):
    """Get user details asynchronously."""
    # Wrap ORM calls with sync_to_async
    user = await sync_to_async(User.objects.get)(id=user_id)
    return JsonResponse({
        'id': user.id,
        'name': user.name,
        'email': user.email
    })

Async view with external API

async def fetch_external_data_view(request): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: data = await response.json() return JsonResponse(data)

Django ORM with async (Django 4.1+)

async def async_orm_view(request): """Use async ORM operations.""" # Django 4.1+ supports async ORM users = [user async for user in User.objects.filter(active=True)] count = await User.objects.filter(active=True).acount()

return JsonResponse({
    'count': count,
    'users': [{'id': u.id, 'name': u.name} for u in users]
})

Middleware (async)

class AsyncMiddleware: """Async middleware example."""

async def __call__(self, request):
    # Pre-processing
    await log_request_async(request)

    # Call next middleware/view
    response = await self.get_response(request)

    # Post-processing
    await log_response_async(response)

    return response

3. Async Database Operations

asyncpg with connection pool

class AsyncPostgresService: """Service for async PostgreSQL operations."""

def __init__(self, pool: asyncpg.Pool):
    self.pool = pool

async def get_user(self, user_id: int) -> dict | None:
    """Get user by ID."""
    async with self.pool.acquire() as conn:
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
        return dict(row) if row else None

async def create_user(self, name: str, email: str) -> int:
    """Create new user."""
    async with self.pool.acquire() as conn:
        user_id = await conn.fetchval(
            'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
            name,
            email
        )
        return user_id

async def update_user(self, user_id: int, **kwargs) -> bool:
    """Update user fields."""
    if not kwargs:
        return False

    set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys()))
    query = f'UPDATE users SET {set_clause} WHERE id = $1'

    async with self.pool.acquire() as conn:
        result = await conn.execute(query, user_id, *kwargs.values())
        return result != 'UPDATE 0'

async def batch_insert_users(self, users: list[tuple[str, str]]) -> int:
    """Batch insert users."""
    async with self.pool.acquire() as conn:
        result = await conn.executemany(
            'INSERT INTO users (name, email) VALUES ($1, $2)',
            users
        )
        return len(users)

Motor (MongoDB) service

class AsyncMongoService: """Service for async MongoDB operations."""

def __init__(self, client: AsyncIOMotorClient):
    self.db = client.mydb

async def get_document(self, collection: str, doc_id: str) -> dict | None:
    """Get document by ID."""
    coll = self.db[collection]
    return await coll.find_one({'_id': doc_id})

async def insert_document(self, collection: str, document: dict) -> str:
    """Insert document."""
    coll = self.db[collection]
    result = await coll.insert_one(document)
    return str(result.inserted_id)

async def find_documents(
    self,
    collection: str,
    filter_dict: dict,
    limit: int = 100
) -> list[dict]:
    """Find documents matching filter."""
    coll = self.db[collection]
    cursor = coll.find(filter_dict).limit(limit)
    return await cursor.to_list(length=limit)

async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]:
    """Run aggregation pipeline."""
    coll = self.db[collection]
    cursor = coll.aggregate(pipeline)
    return await cursor.to_list(length=None)

Performance Considerations

  1. When Async Provides Benefits (I/O-Bound)

Async is beneficial for I/O-bound operations:

GOOD: I/O-bound operations benefit from async

async def io_bound_operations(): """These operations benefit from async."""

# Network requests
async with aiohttp.ClientSession() as session:
    responses = await asyncio.gather(
        session.get('https://api1.example.com'),
        session.get('https://api2.example.com'),
        session.get('https://api3.example.com')
    )

# Database queries
async with pool.acquire() as conn:
    users, posts, comments = await asyncio.gather(
        conn.fetch('SELECT * FROM users'),
        conn.fetch('SELECT * FROM posts'),
        conn.fetch('SELECT * FROM comments')
    )

# File I/O
async with aiofiles.open('large_file.txt') as f:
    content = await f.read()

return responses, users, posts, comments

Example: 100 HTTP requests

async def fetch_many_urls(urls: list[str]): """Async is much faster for concurrent I/O.""" async with aiohttp.ClientSession() as session: tasks = [session.get(url) for url in urls] responses = await asyncio.gather(*tasks) # With async: ~2 seconds # With sync requests: ~200 seconds (2s per request × 100)

  1. When Async Doesn't Help (CPU-Bound)

Async provides no benefit for CPU-bound operations:

BAD: CPU-bound operations don't benefit from async

async def cpu_bound_async(): """This gains nothing from being async.""" result = compute_fibonacci(1000000) # Blocks event loop! return result

GOOD: Use asyncio.to_thread for CPU-bound work

async def cpu_bound_with_threads(): """Offload CPU-bound work to threads.""" result = await asyncio.to_thread(compute_fibonacci, 1000000) return result

BETTER: Use ProcessPoolExecutor for true parallelism

from concurrent.futures import ProcessPoolExecutor

async def cpu_bound_with_processes(): """Use processes for CPU-bound parallelism.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, compute_fibonacci, 1000000 ) return result

Example: Multiple CPU-bound tasks

async def parallel_cpu_tasks(numbers: list[int]): """Run CPU-bound tasks in parallel using processes.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: results = await asyncio.gather(*[ loop.run_in_executor(pool, compute_fibonacci, n) for n in numbers ]) return results

  1. Measuring Async Performance

import time import asyncio from functools import wraps

Decorator to measure async function performance

def async_timer(func): """Measure execution time of async function.""" @wraps(func) async def wrapper(*args, **kwargs): start = time.perf_counter() result = await func(*args, **kwargs) end = time.perf_counter() print(f"{func.name} took {end - start:.2f} seconds") return result return wrapper

Usage

@async_timer async def fetch_data(url: str): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()

Context manager for timing blocks

from contextlib import asynccontextmanager

@asynccontextmanager async def timer(name: str): """Time a block of async code.""" start = time.perf_counter() yield end = time.perf_counter() print(f"{name} took {end - start:.2f} seconds")

Usage

async def timed_operations(): async with timer("Database queries"): users = await fetch_users() posts = await fetch_posts()

async with timer("External API calls"):
    data = await fetch_external_data()

Compare sync vs async performance

async def compare_performance(): """Compare sync and async performance.""" urls = ['https://example.com'] * 100

# Sync version (for comparison)
start = time.perf_counter()
sync_results = [requests.get(url).json() for url in urls]
sync_time = time.perf_counter() - start
print(f"Sync: {sync_time:.2f}s")

# Async version
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
    tasks = [session.get(url) for url in urls]
    async_results = await asyncio.gather(*tasks)
async_time = time.perf_counter() - start
print(f"Async: {async_time:.2f}s")
print(f"Speedup: {sync_time / async_time:.1f}x")

Code Review Checklist

Use this checklist when reviewing async Python code:

Syntax and Structure

  • All async functions defined with async def

  • All coroutines properly awaited

  • asyncio.run() used for top-level entry point (Python 3.7+)

  • No manual event loop creation (unless necessary)

  • Async context managers used with async with

  • Async iterators used with async for

Concurrency Patterns

  • asyncio.gather() used for concurrent operations

  • asyncio.create_task() used for fire-and-forget tasks

  • Task groups used for error propagation (Python 3.11+)

  • Semaphores used for rate limiting

  • Proper task cancellation handling

Event Loop

  • No blocking operations in async functions

  • asyncio.to_thread() used for blocking I/O (Python 3.9+)

  • No time.sleep() (use asyncio.sleep() )

  • No sync HTTP libraries (use aiohttp )

  • No sync file operations (use aiofiles )

Exception Handling

  • Specific exception types caught

  • asyncio.CancelledError handled and re-raised

  • return_exceptions=True used appropriately in gather()

  • Exceptions in tasks not silently ignored

  • Exception groups used for multiple task errors (Python 3.11+)

Resource Management

  • Async context managers used for resources

  • Resources properly closed in finally blocks

  • No session/connection leaks

  • Sessions reused across multiple requests

  • Connection pools used for databases

Timeouts and Cancellation

  • Timeouts set for external operations

  • asyncio.wait_for() or asyncio.timeout() used

  • Tasks can be cancelled gracefully

  • Critical sections shielded from cancellation if needed

  • Cleanup performed on timeout/cancellation

Library Usage

  • aiohttp used instead of requests

  • aiofiles used for async file I/O

  • Async database drivers used (asyncpg , motor )

  • Library-specific best practices followed

Framework Integration

  • FastAPI async endpoints defined correctly

  • Django ORM calls wrapped with sync_to_async (Django < 4.1)

  • Async ORM operations used (Django 4.1+)

  • Dependency injection works with async

Performance

  • Async used for I/O-bound operations

  • CPU-bound work offloaded to threads/processes

  • Unnecessary async overhead avoided

  • Appropriate concurrency limits set

Type Hints

  • Async functions return type hints

  • Coroutine types annotated

  • Awaitable , Coroutine types used where appropriate

Related Skills

  • python-testing: For testing async code with pytest-asyncio

  • python-type-hints: For properly typing async functions and coroutines

  • python-error-handling: For exception handling patterns

  • fastapi-best-practices: For FastAPI-specific async patterns

  • django-best-practices: For Django async views and ORM

References

  • PEP 492 - Coroutines with async and await syntax

  • PEP 3156 - Asynchronous IO Support Rebooted: the "asyncio" Module

  • Python asyncio Documentation

  • aiohttp Documentation

  • Real Python - Async IO in Python

  • FastAPI Async Documentation

  • Django Async Documentation

Version: 1.0 Last Updated: 2025-12-24 Maintainer: Claude Code Skills

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

python-type-hints-guide

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python-testing-standards

No summary provided by upstream source.

Repository SourceNeeds Review
General

java-best-practices

No summary provided by upstream source.

Repository SourceNeeds Review