Asyncio Advanced Patterns ()
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
Overview
-
Implementing concurrent HTTP requests or database queries
-
Building async services with proper cancellation handling
-
Managing multiple concurrent tasks with error propagation
-
Rate limiting async operations with semaphores
-
Bridging sync code to async contexts
Quick Reference
TaskGroup (Replaces gather)
import asyncio
async def fetch_user_data(user_id: str) -> dict: """Fetch user data concurrently - all tasks complete or all cancelled.""" async with asyncio.TaskGroup() as tg: user_task = tg.create_task(fetch_user(user_id)) orders_task = tg.create_task(fetch_orders(user_id)) preferences_task = tg.create_task(fetch_preferences(user_id))
# All tasks guaranteed complete here
return {
"user": user_task.result(),
"orders": orders_task.result(),
"preferences": preferences_task.result(),
}
TaskGroup with Timeout
async def fetch_with_timeout(urls: list[str], timeout_sec: float = 30) -> list[dict]: """Fetch all URLs with overall timeout - structured concurrency.""" results = []
async with asyncio.timeout(timeout_sec):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
Semaphore for Concurrency Limiting
class RateLimitedClient: """HTTP client with concurrency limiting."""
def __init__(self, max_concurrent: int = 10):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._session: aiohttp.ClientSession | None = None
async def fetch(self, url: str) -> dict:
async with self._semaphore: # Limit concurrent requests
async with self._session.get(url) as response:
return await response.json()
async def fetch_many(self, urls: list[str]) -> list[dict]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(self.fetch(url)) for url in urls]
return [t.result() for t in tasks]
Exception Group Handling
async def process_batch(items: list[dict]) -> tuple[list[dict], list[Exception]]: """Process batch, collecting both successes and failures.""" results = [] errors = []
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_item(item)) for item in items]
except* ValueError as eg:
# Handle specific exception types from ExceptionGroup
errors.extend(eg.exceptions)
except* Exception as eg:
errors.extend(eg.exceptions)
else:
results = [t.result() for t in tasks]
return results, errors
Sync-to-Async Bridge
import asyncio from concurrent.futures import ThreadPoolExecutor
For CPU-bound or blocking sync code
async def run_blocking_operation(data: bytes) -> dict: """Run blocking sync code in thread pool.""" return await asyncio.to_thread(cpu_intensive_parse, data)
For sync code that needs async context
def sync_caller(): """Call async code from sync context (not in existing loop).""" return asyncio.run(async_main())
For sync code within existing async context
async def wrapper_for_sync_lib(): """Bridge sync library to async - use with care.""" loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, sync_blocking_call) return result
Cancellation Handling
async def cancellable_operation(resource_id: str) -> dict: """Properly handle cancellation - NEVER swallow CancelledError.""" resource = await acquire_resource(resource_id) try: return await process_resource(resource) except asyncio.CancelledError: # Clean up but RE-RAISE - this is critical! await cleanup_resource(resource) raise # ALWAYS re-raise CancelledError finally: await release_resource(resource)
Key Decisions
Decision Recommendation Rationale
Task spawning TaskGroup not gather()
Structured concurrency, auto-cancellation
Timeouts asyncio.timeout() context manager Composable, cancels on exit
Concurrency limit asyncio.Semaphore
Prevents resource exhaustion
Sync bridge asyncio.to_thread()
Clean API, manages thread pool
Exception handling except* with ExceptionGroup Handle multiple failures properly
Cancellation Always re-raise CancelledError
Breaking this breaks TaskGroup/timeout
Anti-Patterns (FORBIDDEN)
NEVER use gather() for new code - no structured concurrency
results = await asyncio.gather(task1(), task2()) # LEGACY
NEVER swallow CancelledError - breaks structured concurrency
except asyncio.CancelledError: return None # BREAKS TaskGroup and timeout!
NEVER use create_task() without TaskGroup - tasks leak
asyncio.create_task(background_work()) # Fire and forget = leaked task
NEVER yield inside async context managers (PEP 789)
async with asyncio.timeout(10): yield item # DANGEROUS - cancellation bugs!
NEVER use asyncio.run() inside existing event loop
async def handler(): asyncio.run(other_async()) # CRASHES - loop already running
NEVER block the event loop with sync calls
async def bad_handler(): time.sleep(1) # BLOCKS ALL TASKS requests.get(url) # BLOCKS ALL TASKS
Related Skills
-
sqlalchemy-2-async
-
Async database sessions with SQLAlchemy 2.0
-
fastapi-advanced
-
Async FastAPI patterns
-
background-jobs
-
Celery/ARQ for heavy async work
-
streaming-api-patterns
-
SSE/WebSocket async patterns
Capability Details
taskgroup-patterns
Keywords: taskgroup, structured concurrency, concurrent tasks, parallel execution Solves:
-
How do I run multiple async tasks concurrently?
-
Replace asyncio.gather with TaskGroup
-
Handle exceptions from multiple tasks
timeout-patterns
Keywords: timeout, asyncio.timeout, cancel, deadline Solves:
-
How do I add timeouts to async operations?
-
Timeout multiple concurrent operations
-
Cancel tasks after deadline
semaphore-limiting
Keywords: semaphore, rate limit, concurrency limit, throttle Solves:
-
How do I limit concurrent async operations?
-
Rate limit HTTP requests
-
Prevent connection pool exhaustion
exception-groups
Keywords: ExceptionGroup, except*, multiple exceptions, error handling Solves:
-
How do I handle multiple task failures?
-
Collect errors from concurrent operations
-
Python 3.11+ exception group patterns
sync-async-bridge
Keywords: to_thread, run_in_executor, sync to async, blocking code Solves:
-
How do I call sync code from async?
-
Run CPU-bound code without blocking
-
Bridge sync libraries to async context