python-asyncio

Function Purpose Code

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "python-asyncio" with this command: npx skills add amrahman90/python-expert-agent/amrahman90-python-expert-agent-python-asyncio

Quick Reference

Function Purpose Code

asyncio.run()

Entry point asyncio.run(main())

asyncio.gather()

Concurrent tasks await asyncio.gather(*tasks)

asyncio.create_task()

Fire-and-await task = asyncio.create_task(coro)

asyncio.TaskGroup()

Structured concurrency async with asyncio.TaskGroup() as tg:

asyncio.Semaphore()

Rate limiting async with semaphore:

asyncio.timeout()

Timeout (3.11+) async with asyncio.timeout(5.0):

Pattern Sequential Concurrent

Execution await a(); await b()

await gather(a(), b())

Time Sum of durations Max of durations

Library Use Case

aiohttp

HTTP client (most popular)

httpx

HTTP (sync + async)

asyncpg

PostgreSQL

uvloop

2-4x faster event loop

When to Use This Skill

Use for async/concurrent programming:

  • Network requests (HTTP, WebSockets)

  • Database queries

  • File I/O operations

  • Multiple concurrent I/O operations

  • FastAPI/Starlette async endpoints

Related skills:

  • For FastAPI: see python-fastapi

  • For type hints: see python-type-hints

  • For gotchas: see python-gotchas

Python Asyncio Complete Guide

Overview

Asyncio is Python's built-in framework for writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like network requests, file I/O, and database queries.

When to Use Asyncio

Good Use Cases

  • Network requests (HTTP clients, WebSockets)

  • Database queries

  • File I/O operations

  • Web servers (FastAPI, Starlette)

  • Message queues

  • Multiple concurrent I/O operations

When NOT to Use

  • CPU-bound tasks (use multiprocessing instead)

  • Simple sequential scripts

  • Legacy codebases without async support

Core Concepts

Basic Async/Await

import asyncio

Async function (coroutine)

async def fetch_data(url: str) -> dict: # Simulated async I/O await asyncio.sleep(1) return {"url": url, "data": "..."}

Running a coroutine

async def main(): result = await fetch_data("https://api.example.com") print(result)

Entry point

asyncio.run(main())

Concurrent Execution with gather()

import asyncio import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict: async with session.get(url) as response: return {"url": url, "status": response.status}

async def fetch_all(urls: list[str]) -> list[dict]: async with aiohttp.ClientSession() as session: # Run all requests concurrently tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) return results

Usage

urls = [ "https://api.example.com/1", "https://api.example.com/2", "https://api.example.com/3", ] results = asyncio.run(fetch_all(urls))

TaskGroup (Python 3.11+)

import asyncio

async def process_item(item: str) -> str: await asyncio.sleep(1) return f"Processed: {item}"

async def process_all(items: list[str]) -> list[str]: results = []

# TaskGroup provides structured concurrency
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(process_item(item)) for item in items]

# All tasks complete when exiting the context
return [task.result() for task in tasks]

Exception handling with TaskGroup

async def process_with_errors(items: list[str]): try: async with asyncio.TaskGroup() as tg: for item in items: tg.create_task(process_item(item)) except* ValueError as eg: # Handle ValueError exceptions for exc in eg.exceptions: print(f"ValueError: {exc}") except* TypeError as eg: # Handle TypeError exceptions for exc in eg.exceptions: print(f"TypeError: {exc}")

create_task() vs await

import asyncio

async def task_a(): await asyncio.sleep(2) return "A done"

async def task_b(): await asyncio.sleep(1) return "B done"

Sequential (slower - 3 seconds total)

async def sequential(): result_a = await task_a() # Wait 2 seconds result_b = await task_b() # Wait 1 second return result_a, result_b

Concurrent (faster - 2 seconds total)

async def concurrent(): task1 = asyncio.create_task(task_a()) # Start immediately task2 = asyncio.create_task(task_b()) # Start immediately result_a = await task1 result_b = await task2 return result_a, result_b

Using gather (recommended for multiple tasks)

async def concurrent_gather(): result_a, result_b = await asyncio.gather(task_a(), task_b()) return result_a, result_b

Advanced Patterns

Semaphores for Rate Limiting

import asyncio import aiohttp

async def fetch_with_limit( session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore ) -> dict: async with semaphore: # Limits concurrent requests async with session.get(url) as response: return await response.json()

async def fetch_many(urls: list[str], max_concurrent: int = 10) -> list[dict]: semaphore = asyncio.Semaphore(max_concurrent)

async with aiohttp.ClientSession() as session:
    tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
    return await asyncio.gather(*tasks)

Timeouts

import asyncio

async def slow_operation(): await asyncio.sleep(10) return "done"

async def with_timeout(): try: # Wait at most 5 seconds result = await asyncio.wait_for(slow_operation(), timeout=5.0) return result except asyncio.TimeoutError: print("Operation timed out") return None

Using timeout context manager (Python 3.11+)

async def with_timeout_context(): async with asyncio.timeout(5.0): result = await slow_operation() return result

Queues for Producer-Consumer

import asyncio from typing import Any

async def producer(queue: asyncio.Queue, items: list[Any]): for item in items: await queue.put(item) print(f"Produced: {item}") # Signal completion await queue.put(None)

async def consumer(queue: asyncio.Queue, consumer_id: int): while True: item = await queue.get() if item is None: # Put sentinel back for other consumers await queue.put(None) break print(f"Consumer {consumer_id} processing: {item}") await asyncio.sleep(0.5) # Simulate work queue.task_done()

async def main(): queue: asyncio.Queue = asyncio.Queue(maxsize=10)

items = list(range(20))

# Start producer and consumers
async with asyncio.TaskGroup() as tg:
    tg.create_task(producer(queue, items))
    for i in range(3):
        tg.create_task(consumer(queue, i))

asyncio.run(main())

Async Generators

import asyncio from typing import AsyncIterator

async def fetch_pages(url: str, max_pages: int = 10) -> AsyncIterator[dict]: """Async generator for paginated API.""" page = 1 while page <= max_pages: # Simulate API call await asyncio.sleep(0.1) data = {"page": page, "items": [f"item_{i}" for i in range(10)]}

    if not data["items"]:
        break

    yield data
    page += 1

async def process_pages(): async for page_data in fetch_pages("https://api.example.com"): print(f"Processing page {page_data['page']}") for item in page_data["items"]: process(item)

Async Context Managers

from contextlib import asynccontextmanager from typing import AsyncIterator

class AsyncDatabaseConnection: async def aenter(self): await self.connect() return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.disconnect()
    return False

async def connect(self):
    print("Connecting...")
    await asyncio.sleep(0.1)

async def disconnect(self):
    print("Disconnecting...")
    await asyncio.sleep(0.1)

Using decorator

@asynccontextmanager async def async_session() -> AsyncIterator[dict]: session = {"connected": True} try: yield session finally: session["connected"] = False await asyncio.sleep(0.1) # Cleanup

Usage

async def main(): async with AsyncDatabaseConnection() as db: await db.query("SELECT * FROM users")

async with async_session() as session:
    print(session)

Performance Optimization

Eager Task Factory (Python 3.12+)

import asyncio

async def cached_operation(key: str) -> str: cache = {"a": "value_a", "b": "value_b"} if key in cache: return cache[key] # Returns synchronously await asyncio.sleep(1) # Only if cache miss return f"fetched_{key}"

async def main(): loop = asyncio.get_event_loop() # Enable eager task execution loop.set_task_factory(asyncio.eager_task_factory)

# Cached operations complete synchronously without event loop overhead
result = await cached_operation("a")

Using uvloop

Install: pip install uvloop

import asyncio

try: import uvloop # 2-4x performance improvement asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except ImportError: pass # Fall back to default event loop

async def main(): # Your async code here pass

asyncio.run(main())

Free-Threaded asyncio (Python 3.14+)

Python 3.14 improvements for free-threaded builds:

- Thread-safe asyncio with lock-free data structures

- Linear scaling with number of threads

- 10-20% single-threaded performance improvement

- Reduced memory usage

import asyncio import threading

async def per_thread_loop(): """Each thread can run its own event loop.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: await asyncio.sleep(1) finally: loop.close()

Multiple event loops in parallel (free-threaded build)

threads = [ threading.Thread(target=lambda: asyncio.run(per_thread_loop())) for _ in range(4) ] for t in threads: t.start() for t in threads: t.join()

Common Gotchas

Blocking the Event Loop

import asyncio import time

BAD: Blocks the event loop

async def bad_example(): time.sleep(5) # Blocks everything! return "done"

GOOD: Use async sleep

async def good_example(): await asyncio.sleep(5) return "done"

GOOD: Run blocking code in executor

async def blocking_in_executor(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, time.sleep, 5) return result

For CPU-bound work, use ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

async def cpu_bound_work(data: list) -> list: loop = asyncio.get_event_loop() with ProcessPoolExecutor() as pool: result = await loop.run_in_executor(pool, heavy_computation, data) return result

Creating Tasks in Wrong Context

import asyncio

BAD: Task created outside async context

task = asyncio.create_task(some_coroutine()) # RuntimeError!

GOOD: Create tasks inside async function

async def main(): task = asyncio.create_task(some_coroutine()) await task

asyncio.run(main())

Forgetting to Await

import asyncio

async def fetch(): await asyncio.sleep(1) return "data"

BAD: Coroutine never executed

async def bad(): result = fetch() # Just creates coroutine object! print(result) # Prints coroutine object, not "data"

GOOD: Always await coroutines

async def good(): result = await fetch() print(result) # Prints "data"

Exception Handling in Tasks

import asyncio

async def failing_task(): await asyncio.sleep(1) raise ValueError("Task failed!")

async def main(): # BAD: Exception silently lost task = asyncio.create_task(failing_task()) await asyncio.sleep(2) # Task exception ignored

# GOOD: Always await tasks or use TaskGroup
task = asyncio.create_task(failing_task())
try:
    await task
except ValueError as e:
    print(f"Caught: {e}")

# BEST: Use TaskGroup (Python 3.11+)
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(failing_task())
except* ValueError as eg:
    for exc in eg.exceptions:
        print(f"Caught: {exc}")

Async Libraries

HTTP Clients

aiohttp - Most popular

import aiohttp

async def fetch_aiohttp(url: str) -> dict: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()

httpx - Supports both sync and async

import httpx

async def fetch_httpx(url: str) -> dict: async with httpx.AsyncClient() as client: response = await client.get(url) return response.json()

Database

asyncpg - PostgreSQL

import asyncpg

async def query_postgres(): conn = await asyncpg.connect("postgresql://user:pass@localhost/db") rows = await conn.fetch("SELECT * FROM users") await conn.close() return rows

aiosqlite - SQLite

import aiosqlite

async def query_sqlite(): async with aiosqlite.connect("database.db") as db: async with db.execute("SELECT * FROM users") as cursor: return await cursor.fetchall()

Web Frameworks

FastAPI (built on Starlette)

from fastapi import FastAPI

app = FastAPI()

@app.get("/items/{item_id}") async def read_item(item_id: int): return {"item_id": item_id}

Starlette

from starlette.applications import Starlette from starlette.responses import JSONResponse from starlette.routing import Route

async def homepage(request): return JSONResponse({"hello": "world"})

app = Starlette(routes=[Route("/", homepage)])

Additional References

For production-ready patterns beyond this guide, see:

  • Async Patterns Library - Token bucket rate limiter, retry with exponential backoff, connection pools, batch processors, event bus, transaction context managers, async cache with TTL, graceful shutdown handlers

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

python-backend

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python-fundamentals

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python-package-management

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python-testing-deep

No summary provided by upstream source.

Repository SourceNeeds Review