Python Async/Await Patterns and Best Practices
Purpose
This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations.
Use this skill when:
-
Reviewing async Python code
-
Writing new async functions or services
-
Refactoring sync code to async
-
Troubleshooting async performance issues
-
Evaluating async library usage
-
Implementing async endpoints in web frameworks
Context
Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation.
Key Principles:
-
Async is beneficial for I/O-bound operations, not CPU-bound tasks
-
The event loop must never be blocked with synchronous operations
-
Coroutines must be properly awaited
-
Resources must be cleaned up correctly in async contexts
-
Mixing sync and async code requires careful consideration
Python Version Requirements:
-
async /await : Python 3.5+
-
asyncio.run() : Python 3.7+
-
asyncio.create_task() : Python 3.7+
-
Task Groups (asyncio.TaskGroup ): Python 3.11+
-
Exception Groups: Python 3.11+
Async Fundamentals
- Async/Await Syntax
Defining Async Functions:
Async function (coroutine function)
async def fetch_data(url: str) -> dict: """Coroutine that fetches data asynchronously.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()
Regular function (synchronous)
def process_data(data: dict) -> list: """Regular synchronous function.""" return [item['value'] for item in data['items']]
Key Differences:
-
async def creates a coroutine function
-
Calling an async function returns a coroutine object (not the result)
-
Must use await to execute the coroutine and get the result
-
Can only use await inside async def functions
- Coroutines vs Regular Functions
Coroutine function
async def async_operation(): await asyncio.sleep(1) return "done"
This creates a coroutine object but doesn't execute it
coro = async_operation() # RuntimeWarning: coroutine was never awaited
Must await to execute
result = await async_operation() # Correct
Or run from synchronous code
result = asyncio.run(async_operation()) # Correct (Python 3.7+)
Coroutine Characteristics:
-
Lazy evaluation: not executed until awaited
-
Can be suspended and resumed
-
Return values through await
-
Must be awaited or scheduled for execution
- Event Loop Fundamentals
The event loop is the core of async execution:
import asyncio
High-level API (Python 3.7+) - PREFERRED
async def main(): result = await some_async_operation() return result
Run the event loop
if name == "main": result = asyncio.run(main()) # Creates, runs, and closes loop
Event Loop Responsibilities:
-
Schedules and executes coroutines
-
Manages I/O operations
-
Handles callbacks and tasks
-
Coordinates concurrent execution
- Using asyncio.run()
GOOD: Modern approach (Python 3.7+)
async def main(): result = await fetch_data() processed = await process_data(result) return processed
if name == "main": result = asyncio.run(main())
AVOID: Manual loop management (legacy, pre-3.7)
loop = asyncio.get_event_loop() try: result = loop.run_until_complete(main()) finally: loop.close()
Why prefer asyncio.run():
-
Automatically creates and closes the loop
-
Handles cleanup properly
-
Simpler and less error-prone
-
Cancels remaining tasks on shutdown
Common Async Patterns
- Async Context Managers
Async context managers use async with for resource management:
class AsyncDatabaseConnection: """Example async context manager."""
async def __aenter__(self):
"""Called when entering async with block."""
self.connection = await self._connect()
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Called when exiting async with block."""
await self.connection.close()
return False # Don't suppress exceptions
async def _connect(self):
"""Establish connection."""
await asyncio.sleep(0.1) # Simulated async connection
return {"connected": True}
Usage
async def query_database(): async with AsyncDatabaseConnection() as conn: # Connection automatically closed after block result = await execute_query(conn) return result
Common Async Context Managers:
-
aiohttp.ClientSession() : HTTP client sessions
-
aiofiles.open() : Async file operations
-
Database connection pools (asyncpg, motor)
-
Lock primitives (asyncio.Lock() )
- Async Iterators and Generators
class AsyncDataStream: """Async iterator for streaming data."""
def __init__(self, count: int):
self.count = count
self.index = 0
def __aiter__(self):
"""Return the async iterator."""
return self
async def __anext__(self):
"""Get next item asynchronously."""
if self.index >= self.count:
raise StopAsyncIteration
await asyncio.sleep(0.1) # Simulated async operation
value = f"item_{self.index}"
self.index += 1
return value
Usage
async def process_stream(): async for item in AsyncDataStream(5): print(item)
Async generator (simpler approach)
async def async_range(count: int): """Async generator function.""" for i in range(count): await asyncio.sleep(0.1) yield i
Usage
async def use_generator(): async for value in async_range(5): print(value)
- Async Comprehensions
Async list comprehension
async def fetch_all_urls(urls: list[str]) -> list[dict]: async with aiohttp.ClientSession() as session: # Create tasks for all URLs results = [ await fetch_one(session, url) for url in urls ] return results
Async generator expression
async def process_stream_data(stream): processed = ( transform(item) async for item in stream if await validate(item) ) return processed
Async dict comprehension
async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]: return { user_id: await fetch_user(user_id) for user_id in user_ids }
- Concurrent Task Execution
Using asyncio.gather()
Run multiple coroutines concurrently
async def fetch_multiple_resources(): # All tasks run concurrently results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1) ) user, posts, comments = results return user, posts, comments
With error handling
async def fetch_with_error_handling(): results = await asyncio.gather( fetch_user(1), fetch_posts(1), fetch_comments(1), return_exceptions=True # Return exceptions instead of raising )
# Check for exceptions
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
Using asyncio.create_task()
Create tasks explicitly for more control
async def fetch_with_tasks(): # Create tasks (they start executing immediately) user_task = asyncio.create_task(fetch_user(1)) posts_task = asyncio.create_task(fetch_posts(1)) comments_task = asyncio.create_task(fetch_comments(1))
# Wait for all tasks
user = await user_task
posts = await posts_task
comments = await comments_task
return user, posts, comments
Cancel tasks if needed
async def fetch_with_cancellation(): task = asyncio.create_task(long_running_operation())
try:
result = await asyncio.wait_for(task, timeout=5.0)
return result
except asyncio.TimeoutError:
task.cancel() # Cancel the task
raise
Using Task Groups (Python 3.11+)
Task groups provide better error handling and cancellation
async def fetch_with_task_group(): async with asyncio.TaskGroup() as tg: user_task = tg.create_task(fetch_user(1)) posts_task = tg.create_task(fetch_posts(1)) comments_task = tg.create_task(fetch_comments(1))
# All tasks complete here (or exception raised)
return user_task.result(), posts_task.result(), comments_task.result()
If any task fails, all tasks are cancelled
async def task_group_error_handling(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task_that_fails()) tg.create_task(task_that_succeeds()) except ExceptionGroup as eg: # All exceptions collected in ExceptionGroup for exc in eg.exceptions: print(f"Task failed: {exc}")
- Task Coordination Patterns
Wait for first completion
Wait for first task to complete
async def fetch_from_multiple_sources(urls: list[str]): async with aiohttp.ClientSession() as session: tasks = [ asyncio.create_task(fetch_one(session, url)) for url in urls ]
# Wait for first completion
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel remaining tasks
for task in pending:
task.cancel()
# Return first result
return done.pop().result()
Semaphore for rate limiting
Limit concurrent operations
async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_sem(url: str):
async with semaphore:
return await fetch_one(url)
results = await asyncio.gather(*[
fetch_with_sem(url) for url in urls
])
return results
Best Practices
- Avoid Blocking the Event Loop
BAD: Blocking operations in async function
async def bad_async_function(): # These block the event loop! time.sleep(1) # WRONG requests.get(url) # WRONG open('file.txt').read() # WRONG some_cpu_intensive_task() # WRONG
GOOD: Use async alternatives
async def good_async_function(): await asyncio.sleep(1) # Non-blocking sleep async with aiohttp.ClientSession() as session: await session.get(url) # Async HTTP async with aiofiles.open('file.txt') as f: await f.read() # Async file I/O # For CPU-bound: use asyncio.to_thread() result = await asyncio.to_thread(cpu_intensive_task)
- Using asyncio.to_thread for Blocking I/O
import asyncio from functools import partial
For blocking sync operations (Python 3.9+)
async def async_wrapper_for_blocking(): # Run blocking operation in thread pool result = await asyncio.to_thread( blocking_operation, arg1, arg2 ) return result
For CPU-bound tasks
async def process_data_async(data: list): # Offload CPU-intensive work to thread processed = await asyncio.to_thread( cpu_intensive_processing, data ) return processed
With partial for complex arguments
async def complex_blocking_call(): func = partial( blocking_function, timeout=30, retry=3 ) result = await asyncio.to_thread(func) return result
- Proper Exception Handling
Handle exceptions in individual coroutines
async def fetch_with_exception_handling(url: str): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json() except aiohttp.ClientError as e: # Handle specific async HTTP errors print(f"HTTP error: {e}") raise except asyncio.TimeoutError: # Handle timeout print(f"Timeout fetching {url}") raise except Exception as e: # Handle unexpected errors print(f"Unexpected error: {e}") raise
Handle exceptions in gather()
async def fetch_multiple_with_errors(urls: list[str]): results = await asyncio.gather( *[fetch_with_exception_handling(url) for url in urls], return_exceptions=True )
# Process results and exceptions
successful = []
failed = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
failed.append((url, result))
else:
successful.append(result)
return successful, failed
Exception groups (Python 3.11+)
async def handle_exception_groups(): try: async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) except* ValueError as eg: # Handle all ValueErrors print(f"Value errors: {eg.exceptions}") except* TypeError as eg: # Handle all TypeErrors print(f"Type errors: {eg.exceptions}")
- Cancellation Handling
Proper cancellation handling
async def cancellable_operation(): try: await long_running_task() except asyncio.CancelledError: # Clean up resources await cleanup() # Re-raise to propagate cancellation raise
Shielding from cancellation
async def critical_operation(): # This won't be cancelled await asyncio.shield(important_task())
Timeout with proper cleanup
async def operation_with_timeout(): try: result = await asyncio.wait_for( some_operation(), timeout=5.0 ) return result except asyncio.TimeoutError: # Cleanup happens here await cleanup() raise
- Timeout Patterns
Single operation timeout
async def fetch_with_timeout(url: str, timeout: float = 10.0): try: async with aiohttp.ClientSession() as session: async with asyncio.timeout(timeout): # Python 3.11+ async with session.get(url) as response: return await response.json() except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise
Legacy timeout (Python < 3.11)
async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0): try: result = await asyncio.wait_for( fetch_one(url), timeout=timeout ) return result except asyncio.TimeoutError: print(f"Timeout after {timeout}s") raise
Multiple timeouts
async def complex_operation_with_timeouts(): # Per-operation timeouts async with asyncio.timeout(30): # Overall timeout result1 = await asyncio.wait_for(op1(), timeout=10) result2 = await asyncio.wait_for(op2(), timeout=10) result3 = await asyncio.wait_for(op3(), timeout=10) return result1, result2, result3
- Resource Cleanup
Always use async context managers
async def proper_resource_cleanup(): # GOOD: Automatic cleanup async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Session and response closed automatically
For resources without context managers
async def manual_cleanup(): resource = await acquire_resource() try: result = await use_resource(resource) return result finally: # Always cleanup await resource.close()
Async cleanup in exception handlers
async def cleanup_on_error(): resources = [] try: for i in range(10): resource = await acquire_resource(i) resources.append(resource) await use_resource(resource) except Exception as e: # Cleanup all acquired resources await asyncio.gather( *[res.close() for res in resources], return_exceptions=True ) raise finally: # Final cleanup await asyncio.gather( *[res.close() for res in resources], return_exceptions=True )
Common Libraries
- aiohttp - Async HTTP Client/Server
import aiohttp import asyncio
Client usage
async def fetch_data(url: str) -> dict: """Fetch data using aiohttp.""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() return await response.json()
With custom headers and timeout
async def fetch_with_options(url: str): headers = {"Authorization": "Bearer token"} timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()
POST request
async def post_data(url: str, data: dict): async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: return await response.json()
Concurrent requests with rate limiting
async def fetch_multiple(urls: list[str], max_concurrent: int = 5): semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.json()
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
2. aiofiles - Async File I/O
import aiofiles
Read file asynchronously
async def read_file(filepath: str) -> str: """Read file contents asynchronously.""" async with aiofiles.open(filepath, mode='r') as f: contents = await f.read() return contents
Write file asynchronously
async def write_file(filepath: str, content: str): """Write content to file asynchronously.""" async with aiofiles.open(filepath, mode='w') as f: await f.write(content)
Read lines
async def read_lines(filepath: str) -> list[str]: """Read file line by line.""" lines = [] async with aiofiles.open(filepath, mode='r') as f: async for line in f: lines.append(line.strip()) return lines
Process large file
async def process_large_file(filepath: str): """Process file line by line without loading all into memory.""" async with aiofiles.open(filepath, mode='r') as f: async for line in f: processed = await process_line(line) await save_result(processed)
- asyncpg - Async PostgreSQL Driver
import asyncpg
Create connection pool
async def create_pool(): """Create database connection pool.""" pool = await asyncpg.create_pool( host='localhost', port=5432, user='user', password='password', database='mydb', min_size=10, max_size=20 ) return pool
Query with pool
async def fetch_users(pool: asyncpg.Pool): """Fetch users from database.""" async with pool.acquire() as connection: rows = await connection.fetch( 'SELECT id, name, email FROM users WHERE active = $1', True ) return [dict(row) for row in rows]
Transaction
async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict): """Create user and profile in transaction.""" async with pool.acquire() as connection: async with connection.transaction(): user_id = await connection.fetchval( 'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id', user_data['name'], user_data['email'] ) await connection.execute( 'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)', user_id, user_data['bio'] ) return user_id
Prepared statements for better performance
async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int): """Fetch user using prepared statement.""" async with pool.acquire() as connection: stmt = await connection.prepare('SELECT * FROM users WHERE id = $1') row = await stmt.fetchrow(user_id) return dict(row) if row else None
- motor - Async MongoDB Driver
from motor.motor_asyncio import AsyncIOMotorClient
Create client
async def create_mongo_client(): """Create MongoDB client.""" client = AsyncIOMotorClient('mongodb://localhost:27017') return client
Query documents
async def fetch_documents(collection_name: str): """Fetch documents from MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]
cursor = collection.find({'active': True})
documents = await cursor.to_list(length=100)
return documents
Insert document
async def insert_document(collection_name: str, document: dict): """Insert document into MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]
result = await collection.insert_one(document)
return result.inserted_id
Update document
async def update_document(collection_name: str, filter_dict: dict, update_dict: dict): """Update document in MongoDB.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]
result = await collection.update_one(
filter_dict,
{'$set': update_dict}
)
return result.modified_count
Async iteration over cursor
async def process_large_collection(collection_name: str): """Process large collection with cursor.""" client = AsyncIOMotorClient('mongodb://localhost:27017') db = client.mydb collection = db[collection_name]
cursor = collection.find({})
async for document in cursor:
await process_document(document)
Anti-Patterns to Avoid
- Not Awaiting Coroutines
BAD: Coroutine never executed
async def bad_example(): result = fetch_data() # RuntimeWarning: coroutine 'fetch_data' was never awaited return result
GOOD: Properly awaited
async def good_example(): result = await fetch_data() return result
- Mixing Sync and Async Incorrectly
BAD: Calling async function from sync code without event loop
def sync_function(): result = await async_function() # SyntaxError: 'await' outside async function return result
GOOD: Use asyncio.run() to bridge sync/async
def sync_function(): result = asyncio.run(async_function()) return result
BAD: Creating new event loop in async context
async def bad_nested(): result = asyncio.run(another_async()) # RuntimeError: asyncio.run() cannot be called from a running event loop return result
GOOD: Just await in async context
async def good_nested(): result = await another_async() return result
- Creating Event Loops Manually
BAD: Manual loop management (unless necessary)
def bad_approach(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(main()) finally: loop.close() return result
GOOD: Use asyncio.run()
def good_approach(): result = asyncio.run(main()) return result
EXCEPTION: Multiple event loops needed (advanced use case)
Only create manual loops when you specifically need multiple loops
- Blocking Operations in Async Functions
BAD: Blocking the event loop
async def bad_blocking(): time.sleep(5) # Blocks entire event loop! data = requests.get(url).json() # Blocks event loop! with open('file.txt') as f: content = f.read() # Blocks event loop! result = compute_fibonacci(1000000) # Blocks event loop! return result
GOOD: Use async alternatives
async def good_async(): await asyncio.sleep(5) # Non-blocking async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Non-blocking async with aiofiles.open('file.txt') as f: content = await f.read() # Non-blocking result = await asyncio.to_thread(compute_fibonacci, 1000000) # Non-blocking return result
- Improper Exception Handling
BAD: Bare except swallows CancelledError
async def bad_exception_handling(): try: await some_operation() except: # Catches CancelledError, preventing proper cancellation! print("Error occurred")
GOOD: Specific exception handling
async def good_exception_handling(): try: await some_operation() except asyncio.CancelledError: # Handle cancellation specifically await cleanup() raise # Re-raise to propagate except Exception as e: # Handle other exceptions print(f"Error: {e}") raise
BAD: Silencing errors in gather()
async def bad_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True ) return results # Might contain exceptions that are ignored
GOOD: Check for exceptions
async def good_gather(): results = await asyncio.gather( task1(), task2(), task3(), return_exceptions=True )
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
# Handle or raise as appropriate
return [r for r in results if not isinstance(r, Exception)]
6. Resource Leaks
BAD: Not closing resources
async def bad_resource_management(): session = aiohttp.ClientSession() response = await session.get(url) data = await response.json() # Session and response never closed! return data
GOOD: Use context managers
async def good_resource_management(): async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() # Automatically closed return data
BAD: Creating session per request
async def bad_session_usage(): for url in urls: async with aiohttp.ClientSession() as session: # Creating new session each iteration is wasteful! await session.get(url)
GOOD: Reuse session
async def good_session_usage(): async with aiohttp.ClientSession() as session: for url in urls: # Reuse same session await session.get(url)
- Not Using Task Groups for Error Propagation (Python 3.11+)
BAD: Errors might be swallowed
async def bad_error_propagation(): tasks = [ asyncio.create_task(task1()), asyncio.create_task(task2()), asyncio.create_task(task3()) ] # If task2 fails, task1 and task3 keep running await asyncio.gather(*tasks)
GOOD: Use TaskGroup for automatic cancellation
async def good_error_propagation(): async with asyncio.TaskGroup() as tg: tg.create_task(task1()) tg.create_task(task2()) tg.create_task(task3()) # If any task fails, all tasks are cancelled
Framework-Specific Patterns
- FastAPI Async Endpoints
from fastapi import FastAPI, HTTPException, Depends from typing import List import asyncpg
app = FastAPI()
Async endpoint
@app.get("/users/{user_id}") async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)): """Async endpoint that queries database.""" async with db.acquire() as connection: user = await connection.fetchrow( 'SELECT * FROM users WHERE id = $1', user_id ) if not user: raise HTTPException(status_code=404, detail="User not found") return dict(user)
Async endpoint with external API call
@app.get("/external-data") async def fetch_external_data(): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: return await response.json()
Background tasks (async)
from fastapi import BackgroundTasks
@app.post("/send-notification") async def send_notification( email: str, background_tasks: BackgroundTasks ): """Send notification in background.""" background_tasks.add_task(send_email_async, email) return {"message": "Notification queued"}
async def send_email_async(email: str): """Send email asynchronously.""" async with aiohttp.ClientSession() as session: await session.post( 'https://email-service.com/send', json={'to': email} )
Dependency injection with async
async def get_db_pool() -> asyncpg.Pool: """Dependency that provides database pool.""" pool = await asyncpg.create_pool(...) try: yield pool finally: await pool.close()
Lifespan events
from contextlib import asynccontextmanager
@asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown events.""" # Startup app.state.db_pool = await asyncpg.create_pool(...) app.state.http_session = aiohttp.ClientSession() yield # Shutdown await app.state.db_pool.close() await app.state.http_session.close()
app = FastAPI(lifespan=lifespan)
- Django Async Views (Django 3.1+)
from django.http import JsonResponse from django.views import View from asgiref.sync import sync_to_async import aiohttp
Async function-based view
async def async_user_list(request): """Async view for listing users.""" # Use sync_to_async for Django ORM users = await sync_to_async(list)( User.objects.filter(active=True) ) return JsonResponse({ 'users': [ {'id': u.id, 'name': u.name} for u in users ] })
Async class-based view
class AsyncUserDetailView(View): """Async class-based view."""
async def get(self, request, user_id):
"""Get user details asynchronously."""
# Wrap ORM calls with sync_to_async
user = await sync_to_async(User.objects.get)(id=user_id)
return JsonResponse({
'id': user.id,
'name': user.name,
'email': user.email
})
Async view with external API
async def fetch_external_data_view(request): """Fetch data from external API.""" async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: data = await response.json() return JsonResponse(data)
Django ORM with async (Django 4.1+)
async def async_orm_view(request): """Use async ORM operations.""" # Django 4.1+ supports async ORM users = [user async for user in User.objects.filter(active=True)] count = await User.objects.filter(active=True).acount()
return JsonResponse({
'count': count,
'users': [{'id': u.id, 'name': u.name} for u in users]
})
Middleware (async)
class AsyncMiddleware: """Async middleware example."""
async def __call__(self, request):
# Pre-processing
await log_request_async(request)
# Call next middleware/view
response = await self.get_response(request)
# Post-processing
await log_response_async(response)
return response
3. Async Database Operations
asyncpg with connection pool
class AsyncPostgresService: """Service for async PostgreSQL operations."""
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def get_user(self, user_id: int) -> dict | None:
"""Get user by ID."""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
return dict(row) if row else None
async def create_user(self, name: str, email: str) -> int:
"""Create new user."""
async with self.pool.acquire() as conn:
user_id = await conn.fetchval(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
name,
email
)
return user_id
async def update_user(self, user_id: int, **kwargs) -> bool:
"""Update user fields."""
if not kwargs:
return False
set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys()))
query = f'UPDATE users SET {set_clause} WHERE id = $1'
async with self.pool.acquire() as conn:
result = await conn.execute(query, user_id, *kwargs.values())
return result != 'UPDATE 0'
async def batch_insert_users(self, users: list[tuple[str, str]]) -> int:
"""Batch insert users."""
async with self.pool.acquire() as conn:
result = await conn.executemany(
'INSERT INTO users (name, email) VALUES ($1, $2)',
users
)
return len(users)
Motor (MongoDB) service
class AsyncMongoService: """Service for async MongoDB operations."""
def __init__(self, client: AsyncIOMotorClient):
self.db = client.mydb
async def get_document(self, collection: str, doc_id: str) -> dict | None:
"""Get document by ID."""
coll = self.db[collection]
return await coll.find_one({'_id': doc_id})
async def insert_document(self, collection: str, document: dict) -> str:
"""Insert document."""
coll = self.db[collection]
result = await coll.insert_one(document)
return str(result.inserted_id)
async def find_documents(
self,
collection: str,
filter_dict: dict,
limit: int = 100
) -> list[dict]:
"""Find documents matching filter."""
coll = self.db[collection]
cursor = coll.find(filter_dict).limit(limit)
return await cursor.to_list(length=limit)
async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]:
"""Run aggregation pipeline."""
coll = self.db[collection]
cursor = coll.aggregate(pipeline)
return await cursor.to_list(length=None)
Performance Considerations
- When Async Provides Benefits (I/O-Bound)
Async is beneficial for I/O-bound operations:
GOOD: I/O-bound operations benefit from async
async def io_bound_operations(): """These operations benefit from async."""
# Network requests
async with aiohttp.ClientSession() as session:
responses = await asyncio.gather(
session.get('https://api1.example.com'),
session.get('https://api2.example.com'),
session.get('https://api3.example.com')
)
# Database queries
async with pool.acquire() as conn:
users, posts, comments = await asyncio.gather(
conn.fetch('SELECT * FROM users'),
conn.fetch('SELECT * FROM posts'),
conn.fetch('SELECT * FROM comments')
)
# File I/O
async with aiofiles.open('large_file.txt') as f:
content = await f.read()
return responses, users, posts, comments
Example: 100 HTTP requests
async def fetch_many_urls(urls: list[str]): """Async is much faster for concurrent I/O.""" async with aiohttp.ClientSession() as session: tasks = [session.get(url) for url in urls] responses = await asyncio.gather(*tasks) # With async: ~2 seconds # With sync requests: ~200 seconds (2s per request × 100)
- When Async Doesn't Help (CPU-Bound)
Async provides no benefit for CPU-bound operations:
BAD: CPU-bound operations don't benefit from async
async def cpu_bound_async(): """This gains nothing from being async.""" result = compute_fibonacci(1000000) # Blocks event loop! return result
GOOD: Use asyncio.to_thread for CPU-bound work
async def cpu_bound_with_threads(): """Offload CPU-bound work to threads.""" result = await asyncio.to_thread(compute_fibonacci, 1000000) return result
BETTER: Use ProcessPoolExecutor for true parallelism
from concurrent.futures import ProcessPoolExecutor
async def cpu_bound_with_processes(): """Use processes for CPU-bound parallelism.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, compute_fibonacci, 1000000 ) return result
Example: Multiple CPU-bound tasks
async def parallel_cpu_tasks(numbers: list[int]): """Run CPU-bound tasks in parallel using processes.""" loop = asyncio.get_running_loop() with ProcessPoolExecutor() as pool: results = await asyncio.gather(*[ loop.run_in_executor(pool, compute_fibonacci, n) for n in numbers ]) return results
- Measuring Async Performance
import time import asyncio from functools import wraps
Decorator to measure async function performance
def async_timer(func): """Measure execution time of async function.""" @wraps(func) async def wrapper(*args, **kwargs): start = time.perf_counter() result = await func(*args, **kwargs) end = time.perf_counter() print(f"{func.name} took {end - start:.2f} seconds") return result return wrapper
Usage
@async_timer async def fetch_data(url: str): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()
Context manager for timing blocks
from contextlib import asynccontextmanager
@asynccontextmanager async def timer(name: str): """Time a block of async code.""" start = time.perf_counter() yield end = time.perf_counter() print(f"{name} took {end - start:.2f} seconds")
Usage
async def timed_operations(): async with timer("Database queries"): users = await fetch_users() posts = await fetch_posts()
async with timer("External API calls"):
data = await fetch_external_data()
Compare sync vs async performance
async def compare_performance(): """Compare sync and async performance.""" urls = ['https://example.com'] * 100
# Sync version (for comparison)
start = time.perf_counter()
sync_results = [requests.get(url).json() for url in urls]
sync_time = time.perf_counter() - start
print(f"Sync: {sync_time:.2f}s")
# Async version
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
async_results = await asyncio.gather(*tasks)
async_time = time.perf_counter() - start
print(f"Async: {async_time:.2f}s")
print(f"Speedup: {sync_time / async_time:.1f}x")
Code Review Checklist
Use this checklist when reviewing async Python code:
Syntax and Structure
-
All async functions defined with async def
-
All coroutines properly awaited
-
asyncio.run() used for top-level entry point (Python 3.7+)
-
No manual event loop creation (unless necessary)
-
Async context managers used with async with
-
Async iterators used with async for
Concurrency Patterns
-
asyncio.gather() used for concurrent operations
-
asyncio.create_task() used for fire-and-forget tasks
-
Task groups used for error propagation (Python 3.11+)
-
Semaphores used for rate limiting
-
Proper task cancellation handling
Event Loop
-
No blocking operations in async functions
-
asyncio.to_thread() used for blocking I/O (Python 3.9+)
-
No time.sleep() (use asyncio.sleep() )
-
No sync HTTP libraries (use aiohttp )
-
No sync file operations (use aiofiles )
Exception Handling
-
Specific exception types caught
-
asyncio.CancelledError handled and re-raised
-
return_exceptions=True used appropriately in gather()
-
Exceptions in tasks not silently ignored
-
Exception groups used for multiple task errors (Python 3.11+)
Resource Management
-
Async context managers used for resources
-
Resources properly closed in finally blocks
-
No session/connection leaks
-
Sessions reused across multiple requests
-
Connection pools used for databases
Timeouts and Cancellation
-
Timeouts set for external operations
-
asyncio.wait_for() or asyncio.timeout() used
-
Tasks can be cancelled gracefully
-
Critical sections shielded from cancellation if needed
-
Cleanup performed on timeout/cancellation
Library Usage
-
aiohttp used instead of requests
-
aiofiles used for async file I/O
-
Async database drivers used (asyncpg , motor )
-
Library-specific best practices followed
Framework Integration
-
FastAPI async endpoints defined correctly
-
Django ORM calls wrapped with sync_to_async (Django < 4.1)
-
Async ORM operations used (Django 4.1+)
-
Dependency injection works with async
Performance
-
Async used for I/O-bound operations
-
CPU-bound work offloaded to threads/processes
-
Unnecessary async overhead avoided
-
Appropriate concurrency limits set
Type Hints
-
Async functions return type hints
-
Coroutine types annotated
-
Awaitable , Coroutine types used where appropriate
Related Skills
-
python-testing: For testing async code with pytest-asyncio
-
python-type-hints: For properly typing async functions and coroutines
-
python-error-handling: For exception handling patterns
-
fastapi-best-practices: For FastAPI-specific async patterns
-
django-best-practices: For Django async views and ORM
References
-
PEP 492 - Coroutines with async and await syntax
-
PEP 3156 - Asynchronous IO Support Rebooted: the "asyncio" Module
-
Python asyncio Documentation
-
aiohttp Documentation
-
Real Python - Async IO in Python
-
FastAPI Async Documentation
-
Django Async Documentation
Version: 1.0 Last Updated: 2025-12-24 Maintainer: Claude Code Skills