asyncio

Python asyncio - Async/Await Concurrency

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "asyncio" with this command: npx skills add bobmatnyc/claude-mpm-skills/bobmatnyc-claude-mpm-skills-asyncio

Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.

Key Features:

  • async/await syntax for readable concurrent code

  • Event loop for managing concurrent operations

  • Tasks for running multiple coroutines concurrently

  • Primitives: locks, semaphores, events, queues

  • HTTP client/server with aiohttp

  • Database async support (asyncpg, aiomysql, motor)

  • FastAPI async endpoints

  • WebSocket support

  • Background task management

Installation:

```shell
# asyncio is built-in (Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support (quotes keep the extra from being glob-expanded)
pip install fastapi "uvicorn[standard]"

# Async testing
pip install pytest-asyncio
```

Basic Async/Await Patterns

  1. Simple Async Function

```python
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"
```

Key Points:

  • async def defines a coroutine function

  • await suspends execution until awaitable completes

  • asyncio.run() is the entry point for async programs

  • Coroutines must be awaited or scheduled

  2. Multiple Concurrent Tasks

```python
import asyncio
import time

async def task(name, duration):
    """Simulate an async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
```

Output: Total time: 3.00s (tasks ran concurrently)
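When results should be handled as soon as each task finishes, rather than all at once when the slowest is done, `asyncio.as_completed()` complements `gather()`. A minimal sketch (task names and durations are illustrative):

```python
import asyncio

async def task(name, duration):
    """Simulated async task."""
    await asyncio.sleep(duration)
    return f"{name} result"

async def run_as_completed():
    coros = [task("Task 1", 0.3), task("Task 2", 0.2), task("Task 3", 0.1)]
    finished = []
    # Yields awaitables in completion order, not submission order
    for fut in asyncio.as_completed(coros):
        result = await fut
        finished.append(result)
    return finished

results = asyncio.run(run_as_completed())
print(results)  # Shortest sleep finishes first
```

Unlike `gather()`, which preserves submission order, this loop sees "Task 3" first because it sleeps the shortest.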

  3. Task Creation and Management

```python
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create tasks (they start running immediately)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```

  4. Error Handling in Async Code

```python
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
```
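`gather()` waits for everything; when only the first result matters (racing a fast path against a slow one, for instance), `asyncio.wait()` with `return_when=asyncio.FIRST_COMPLETED` is the usual tool. A minimal sketch (worker names and durations are illustrative):

```python
import asyncio

async def worker(name, duration):
    """Simulated worker that takes `duration` seconds."""
    await asyncio.sleep(duration)
    return name

async def first_wins():
    tasks = [
        asyncio.create_task(worker("fast", 0.1)),
        asyncio.create_task(worker("slow", 5)),
    ]
    # Return as soon as any one task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel the stragglers so they don't linger
    for t in pending:
        t.cancel()

    return [t.result() for t in done]

print(asyncio.run(first_wins()))  # ['fast']
```

Note that `asyncio.wait()` expects Tasks, not bare coroutines (passing coroutines is an error on Python 3.11+), hence the `create_task` calls.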

Event Loop Fundamentals

  1. Event Loop Lifecycle

```python
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual loop management (advanced use cases)
async def manual_example():
    loop = asyncio.get_running_loop()

    # Schedule a coroutine (some_coroutine is a placeholder)
    task = loop.create_task(some_coroutine())

    # Schedule a callback (callback_function is a placeholder)
    loop.call_later(5, callback_function)

    # Run until complete
    result = await task

    return result

# Get the current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule a callback in the event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let the callback execute
```

2. Loop Scheduling and Callbacks

```python
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
```

  3. Running Blocking Code

```python
import asyncio
import time

def blocking_io():
    """CPU-intensive or blocking I/O operation."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks its thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in a thread pool."""
    loop = asyncio.get_running_loop()

    # Run in the default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in the thread pool and don't block the event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
```

Asyncio Primitives

  1. Locks for Mutual Exclusion

```python
import asyncio

# Shared resource
counter = 0
lock = asyncio.Lock()

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter

    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter

    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # Often wrong (< 3)

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())
```

  2. Semaphores for Resource Limiting

```python
import asyncio

# Limit concurrent operations
semaphore = asyncio.Semaphore(2)  # Max 2 concurrent

async def limited_operation(name):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")

    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
```

Only 2 tasks hold semaphore at any time

  3. Events for Signaling

```python
import asyncio

event = asyncio.Event()

async def waiter(name):
    """Wait for the event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Block until event is set
    print(f"{name}: event received!")

async def setter():
    """Set the event after a delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())
```

  4. Queues for Task Distribution

```python
import asyncio
import random

async def producer(queue, name):
    """Produce items and add them to the queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, name):
    """Consume items from the queue."""
    while True:
        item = await queue.get()  # Block until an item is available

        if item is None:  # Shutdown signal
            queue.task_done()
            break

        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)

    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(1, 4)
    ]

    # Run producers to completion
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2"),
    )

    # One shutdown signal per consumer (one None per producer would
    # leave surplus consumers blocked forever)
    for _ in consumers:
        await queue.put(None)

    await asyncio.gather(*consumers)
    print("All tasks complete")

asyncio.run(test_queue())
```

  5. Condition Variables

```python
import asyncio

condition = asyncio.Condition()
items = []

async def consumer(name):
    """Wait for items to become available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)

        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")

        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    await asyncio.gather(
        consumer("Consumer-1"),
        consumer("Consumer-2"),
        producer("Producer-1"),
        producer("Producer-2")
    )

asyncio.run(test_condition())
```

Async HTTP with aiohttp

  1. Basic HTTP Client

```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch a single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]

    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())
```

  2. HTTP Client with Error Handling

```python
import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # Exponential backoff
            await asyncio.sleep(2 ** attempt)

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}

            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
```

  3. HTTP Server with aiohttp

```python
from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()

    # Simulate async processing
    await asyncio.sleep(1)

    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)

    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)

    await response.write_eof()
    return response

# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
```

  4. WebSocket Client

```python
import asyncio
import aiohttp

async def websocket_client():
    """Connect to a WebSocket server."""
    url = "wss://echo.websocket.org"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

                    if msg.data == "close":
                        await ws.close()
                        break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
```

Async Database Operations

  1. PostgreSQL with asyncpg

```python
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )

            for row in rows:
                print(f"User: {row['name']} ({row['email']})")

            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )

            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")

    finally:
        await pool.close()

asyncio.run(database_operations())
```

  2. MongoDB with motor

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")

        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")

        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )

        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")

    finally:
        client.close()

asyncio.run(mongodb_operations())
```

  3. Connection Pool Pattern

```python
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
```

FastAPI Async Patterns

  1. Async Endpoints

```python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from an external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }
```

2. Background Tasks

```python
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending an email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in the background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)

    # Return immediately
    return {"status": "notification queued"}

# Alternative: manual task creation
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create a background task manually."""
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}
```

  3. Async Dependencies

```python
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()

# Database pool (global)
db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return dict(user)
```

4. WebSocket Endpoints

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task to send periodic updates
async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)

        # Iterate over a copy so dead connections can be removed safely
        for connection in list(active_connections):
            try:
                await connection.send_text("Periodic update")
            except Exception:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    asyncio.create_task(broadcast_updates())
```

Common Patterns and Best Practices

  1. Timeout Handling

```python
import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run an operation with a timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
```

  2. Cancellation Handling

```python
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup here
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())

    # Let it run for a bit
    await asyncio.sleep(3)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
```
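The complementary tool is `asyncio.shield()`, which protects a critical operation (a database commit, for example) from its caller's cancellation: the outer await is cancelled, but the shielded task keeps running. A minimal sketch (the `flag` dict and function names are illustrative):

```python
import asyncio

flag = {"saved": False}

async def critical_save():
    """Simulated write that should not be interrupted."""
    await asyncio.sleep(0.2)
    flag["saved"] = True
    return "saved"

async def caller():
    try:
        # shield() protects critical_save from this caller's cancellation
        await asyncio.shield(critical_save())
    except asyncio.CancelledError:
        print("Caller cancelled, but the save continues")
        raise

async def main():
    task = asyncio.create_task(caller())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Give the shielded operation time to finish
    await asyncio.sleep(0.3)
    print(f"saved = {flag['saved']}")

asyncio.run(main())
```

Even though `caller` was cancelled mid-await, `flag["saved"]` ends up `True` because the shielded task ran to completion.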

  3. Resource Cleanup with Context Managers

```python
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use the async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
```

  4. Debouncing and Throttling

```python
import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)

    # Rapid calls - only the last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # Wait for the debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
```

Output: Only "query3" API call executes
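Throttling is the mirror image: where a debouncer keeps only the last of a burst, a throttler lets a call through at most once per interval and drops the rest. A minimal sketch (this `Throttler` class is illustrative, not part of asyncio):

```python
import asyncio

class Throttler:
    """Allow at most one call per `interval` seconds; extras are dropped."""

    def __init__(self, interval: float):
        self.interval = interval
        self._last = None  # Loop time of the last allowed call

    async def call(self, func, *args, **kwargs):
        now = asyncio.get_running_loop().time()
        if self._last is not None and now - self._last < self.interval:
            return None  # Drop the throttled call
        self._last = now
        return await func(*args, **kwargs)

executed = []

async def api_call(query: str):
    """Simulated API call that records what actually ran."""
    executed.append(query)

async def throttle_example():
    throttler = Throttler(interval=1.0)
    await throttler.call(api_call, "q1")  # Allowed
    await throttler.call(api_call, "q2")  # Dropped (too soon)
    await asyncio.sleep(1.1)
    await throttler.call(api_call, "q3")  # Allowed again

asyncio.run(throttle_example())
print(executed)  # ['q1', 'q3']
```

Variants queue or delay the dropped call instead of discarding it; the `RateLimiter` below generalizes this to N calls per period.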

  5. Rate Limiting

```python
import asyncio
from datetime import datetime

class RateLimiter:
    """Limit the rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire a rate limit slot."""
        await self.semaphore.acquire()

        now = asyncio.get_running_loop().time()

        # Remove old calls outside the period
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # Wait until the oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release the semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)

    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
```

Debugging Async Code

  1. Enable Debug Mode

```python
import asyncio
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")

# Enable asyncio debug mode
asyncio.run(debug_example(), debug=True)

# Or set an environment variable:
#   PYTHONASYNCIODEBUG=1 python script.py
```

  2. Detect Blocking Code

```python
import asyncio
import time

async def problematic_code():
    """Code with a blocking operation."""
    print("Starting")

    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!

    print("Complete")

# Run with debug mode to detect blocking:
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task ...> took 2.001 seconds
```

  3. Track Pending Tasks

```python
import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks in the running loop
    tasks = asyncio.all_tasks()

    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # Check if the task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())
```

Testing Async Code

  1. pytest-asyncio Setup

```python
# test_async.py
import pytest
import aiohttp

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test an async function (some_async_function is a placeholder)."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test an async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture
@pytest.fixture
async def async_client():
    """Async test fixture (create_async_client is a placeholder)."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using the async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
```

  2. Mocking Async Functions

```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with an async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with a patched async function."""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()
```

Performance Optimization

  1. Use asyncio.gather() for Parallelism

```python
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()

    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
```

  2. Connection Pooling

```python
import asyncio
import aiohttp

# BAD: Create a new session for each request
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

# GOOD: Reuse one session and its connection pool
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        async def fetch():
            # Use the response as a context manager so the
            # connection is returned to the pool
            async with session.get("https://httpbin.org/json") as response:
                return await response.json()

        results = await asyncio.gather(*[fetch() for _ in range(10)])
        return results
```

  3. Avoid Blocking Operations

```python
import asyncio

# BAD: Blocking I/O in an async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks the event loop!
        data = f.read()
    return data

# GOOD: Run blocking I/O in a thread pool executor
async def good_file_read():
    loop = asyncio.get_running_loop()

    def read_file():
        with open("large_file.txt") as f:
            return f.read()

    # The blocking read runs in the default thread pool
    data = await loop.run_in_executor(None, read_file)
    return data

# BETTER: Use aiofiles for native async file I/O
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
```

Common Pitfalls

❌ Anti-Pattern 1: Not Awaiting Coroutines

```python
# WRONG
async def bad():
    result = async_function()  # Returns a coroutine object, doesn't execute!
    print(result)              # Prints: <coroutine object ...>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)
```

❌ Anti-Pattern 2: Blocking the Event Loop

```python
import asyncio
import time

# WRONG
async def bad():
    time.sleep(5)  # Blocks the entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking
```

❌ Anti-Pattern 3: Not Handling Cancellation

```python
# WRONG
async def bad():
    await asyncio.sleep(10)  # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate
```

❌ Anti-Pattern 4: Creating Event Loop Incorrectly

WRONG (Python 3.7+)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

CORRECT (Python 3.7+)

asyncio.run(main())

❌ Anti-Pattern 5: Not Closing Resources

WRONG

async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

CORRECT

async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
    # Session automatically closed

Best Practices

  • Use asyncio.run() for entry point (Python 3.7+)

  • Always await coroutines - don't forget await

  • Use async context managers for resource cleanup

  • Use connection pooling for HTTP and database clients

  • Handle CancelledError for graceful shutdown

  • Use asyncio.gather() for parallel operations

  • Avoid blocking operations in async functions

  • Use timeouts to prevent hanging operations

  • Enable debug mode during development to catch issues

  • Test async code with pytest-asyncio
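Two of the practices above — running work in parallel with asyncio.gather and guarding against hangs with a timeout — combine naturally. A minimal stdlib-only sketch:

```python
import asyncio

async def job(n: int) -> int:
    """Hypothetical unit of work."""
    await asyncio.sleep(0.01)
    return n

async def main() -> list:
    try:
        # Run all jobs concurrently, but give the whole batch a deadline
        return await asyncio.wait_for(
            asyncio.gather(job(1), job(2), job(3)),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        return []  # batch did not finish in time

results = asyncio.run(main())
print(results)  # [1, 2, 3]
```

Note that asyncio.gather preserves the order of its arguments in the result list, regardless of completion order.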

Quick Reference

Common Commands

Run async script

python script.py

Run with debug mode

PYTHONASYNCIODEBUG=1 python script.py

Run tests

pytest -v --asyncio-mode=auto

Install async dependencies

pip install aiohttp asyncpg motor pytest-asyncio

Essential Imports

import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any

Resources

Related Skills

When using asyncio, consider these complementary skills:

  • fastapi-local-dev: FastAPI async server patterns and production deployment

  • pytest: Testing async code with pytest-asyncio and fixtures

  • systematic-debugging: Debugging async race conditions and deadlocks

Quick FastAPI Async Patterns (Inlined for Standalone Use)

FastAPI async endpoint pattern

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

Async database setup

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query (User is your ORM model)
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Background tasks with asyncio

@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}
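One caveat with bare asyncio.create_task, noted in the asyncio documentation: the event loop holds only a weak reference to tasks, so a fire-and-forget task can be garbage-collected before it finishes. The usual fix is to keep a strong reference until the task completes. A stdlib-only sketch (send_email_async here is a hypothetical stand-in):

```python
import asyncio

background_tasks = set()

async def send_email_async(email: str) -> None:
    # Hypothetical stand-in for real email-sending I/O
    await asyncio.sleep(0.01)

def queue_email(email: str) -> None:
    # Hold a strong reference so the task is not garbage-collected,
    # then drop it automatically once the task completes.
    task = asyncio.create_task(send_email_async(email))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

async def main() -> None:
    queue_email("user@example.com")
    await asyncio.sleep(0.05)  # let the background task finish

asyncio.run(main())
print(len(background_tasks))  # 0 - completed task removed itself
```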

Quick pytest-asyncio Patterns (Inlined for Standalone Use)

Testing async functions with pytest

import pytest
import pytest_asyncio
from httpx import AsyncClient

Async test fixture

@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

Async test function

@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

Testing concurrent operations

@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

Mock async dependencies

from unittest.mock import AsyncMock

@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup

Quick Async Debugging Reference (Inlined for Standalone Use)

Common Async Pitfalls:

Blocking the Event Loop

❌ WRONG: Blocking call in async function

async def bad_function():
    time.sleep(5)  # Blocks entire event loop!
    return "done"

✅ CORRECT: Use asyncio.sleep

async def good_function():
    await asyncio.sleep(5)  # Releases event loop
    return "done"

Debugging Race Conditions

Add logging to track execution order

import logging

logging.basicConfig(level=logging.DEBUG)

async def debug_task(name):
    logging.debug(f"{name}: Starting")
    await asyncio.sleep(1)
    logging.debug(f"{name}: Finished")
    return name

Run with detailed tracing

async def main():
    # asyncio.run() requires a coroutine, so wrap the gather call
    await asyncio.gather(debug_task("A"), debug_task("B"))

asyncio.run(main(), debug=True)

Deadlock Detection

Use timeout to detect deadlocks

try:
    result = await asyncio.wait_for(some_async_function(), timeout=5.0)
except asyncio.TimeoutError:
    logging.error("Deadlock detected: operation timed out")
    # Investigate what's blocking

Inspecting Running Tasks

Check all pending tasks

# Must be called from inside a running event loop
tasks = asyncio.all_tasks()
for task in tasks:
    print(f"Task: {task.get_name()}, Done: {task.done()}")
    if not task.done():
        print(f"  Current coro: {task.get_coro()}")

[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]

Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.

