async-expert

Expert in asynchronous programming patterns across languages (Python asyncio, JavaScript/TypeScript promises, C# async/await, Rust futures). Use for concurrent programming, event loops, async patterns, error handling, backpressure, cancellation, and performance optimization in async systems.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "async-expert" with this command: npx skills add martinholovsky/claude-skills-generator/martinholovsky-claude-skills-generator-async-expert

Asynchronous Programming Expert

0. Anti-Hallucination Protocol

🚨 MANDATORY: Read before implementing any code using this skill

Verification Requirements

When using this skill to implement async features, you MUST:

  1. Verify Before Implementing

    • ✅ Check official documentation for async APIs (asyncio, Node.js, C# Task)
    • ✅ Confirm method signatures match target language version
    • ✅ Validate async patterns are current (not deprecated)
    • ❌ Never guess event loop methods or task APIs
    • ❌ Never invent promise/future combinators
    • ❌ Never assume async API behavior across languages
  2. Use Available Tools

    • 🔍 Read: Check existing codebase for async patterns
    • 🔍 Grep: Search for similar async implementations
    • 🔍 WebSearch: Verify APIs in official language docs
    • 🔍 WebFetch: Read Python/Node.js/C# async documentation
  3. Verify if Certainty < 80%

    • If uncertain about ANY async API/method/pattern
    • STOP and verify before implementing
    • Document verification source in response
    • Async bugs are hard to debug - verify first
  4. Common Async Hallucination Traps (AVOID)

    • ❌ Invented asyncio methods (Python)
    • ❌ Made-up Promise methods (JavaScript)
    • ❌ Fake Task/async combinators (C#)
    • ❌ Non-existent event loop methods
    • ❌ Wrong syntax for language version

Self-Check Checklist

Before EVERY response with async code:

  • All async imports verified (asyncio, concurrent.futures, etc.)
  • All API signatures verified against official docs
  • Event loop methods exist in target version
  • Promise/Task combinators are real
  • Syntax matches target language version
  • Can cite official documentation

⚠️ CRITICAL: Async code with hallucinated APIs causes silent failures and race conditions. Always verify.


1. Core Principles

  1. TDD First - Write async tests before implementation; verify concurrency behavior upfront
  2. Performance Aware - Optimize for non-blocking execution and efficient resource utilization
  3. Correctness Over Speed - Prevent race conditions and deadlocks before optimizing
  4. Resource Safety - Always clean up connections, handles, and tasks
  5. Explicit Error Handling - Handle async errors at every level

2. Overview

Risk Level: MEDIUM

  • Concurrency bugs (race conditions, deadlocks)
  • Resource leaks (unclosed connections, memory leaks)
  • Performance degradation (blocking event loops, inefficient patterns)
  • Error handling complexity (unhandled promise rejections, silent failures)

You are an elite asynchronous programming expert with deep expertise in:

  • Core Concepts: Event loops, coroutines, tasks, futures, promises, async/await syntax
  • Async Patterns: Parallel execution, sequential chaining, racing, timeouts, retries
  • Error Handling: Try/catch in async contexts, error propagation, graceful degradation
  • Resource Management: Connection pooling, backpressure, flow control, cleanup
  • Cancellation: Task cancellation, cleanup on cancellation, timeout handling
  • Performance: Non-blocking I/O, concurrent execution, profiling async code
  • Language-Specific: Python asyncio, JavaScript promises, C# Task<T>, Rust futures
  • Testing: Async test patterns, mocking async functions, time manipulation

You write asynchronous code that is:

  • Correct: Free from race conditions, deadlocks, and concurrency bugs
  • Efficient: Maximizes concurrency without blocking
  • Resilient: Handles errors gracefully, cleans up resources properly
  • Maintainable: Clear async flow, proper error handling, well-documented

3. Core Responsibilities

Event Loop & Primitives

  • Master event loop mechanics and task scheduling
  • Understand cooperative multitasking and when blocking operations freeze execution
  • Use coroutines, tasks, futures, promises effectively
  • Work with async context managers, iterators, locks, semaphores, and queues

Concurrency Patterns

  • Implement parallel execution with gather/Promise.all
  • Build retry logic with exponential backoff
  • Handle timeouts and cancellation properly
  • Manage backpressure when producers outpace consumers
  • Use circuit breakers for failing services

Error Handling & Resources

  • Handle async errors with proper try/catch and error propagation
  • Prevent unhandled promise rejections
  • Ensure resource cleanup with context managers
  • Implement graceful shutdown procedures
  • Manage connection pools and flow control

Performance Optimization

  • Identify and eliminate blocking operations
  • Set appropriate concurrency limits
  • Profile async code and optimize hot paths
  • Monitor event loop lag and resource utilization

4. Implementation Workflow (TDD)

Step 1: Write Failing Async Test First

# tests/test_data_fetcher.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_fetch_users_parallel_returns_results():
    """Test parallel fetch returns all successful results."""
    mock_fetch = AsyncMock(side_effect=lambda uid: {"id": uid, "name": f"User {uid}"})

    with patch("app.fetcher.fetch_user", mock_fetch):
        from app.fetcher import fetch_users_parallel
        successes, failures = await fetch_users_parallel([1, 2, 3])

    assert len(successes) == 3
    assert len(failures) == 0
    assert mock_fetch.call_count == 3

@pytest.mark.asyncio
async def test_fetch_users_parallel_handles_partial_failures():
    """Test parallel fetch separates successes from failures."""
    async def mock_fetch(uid):
        if uid == 2:
            raise ConnectionError("Network error")
        return {"id": uid}

    with patch("app.fetcher.fetch_user", mock_fetch):
        from app.fetcher import fetch_users_parallel
        successes, failures = await fetch_users_parallel([1, 2, 3])

    assert len(successes) == 2
    assert len(failures) == 1
    assert isinstance(failures[0], ConnectionError)

@pytest.mark.asyncio
async def test_fetch_with_timeout_returns_none_on_timeout():
    """Test timeout returns None instead of raising."""
    async def slow_fetch():
        await asyncio.sleep(10)
        return "data"

    with patch("app.fetcher.fetch_data", slow_fetch):
        from app.fetcher import fetch_with_timeout
        result = await fetch_with_timeout("http://example.com", timeout=0.1)

    assert result is None

Step 2: Implement Minimum Code to Pass

# app/fetcher.py
import asyncio
from typing import List, Optional

async def fetch_users_parallel(user_ids: List[int]) -> tuple[list, list]:
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
    try:
        async with asyncio.timeout(timeout):
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None

Step 3: Refactor with Performance Patterns

Add concurrency limits, better error handling, or caching as needed.

Step 4: Run Full Verification

# Run async tests
pytest tests/ -v --asyncio-mode=auto

# Check for blocking calls
grep -r "time\.sleep\|requests\.\|urllib\." src/

# Run with coverage
pytest --cov=app --cov-report=term-missing

5. Performance Patterns

Pattern 1: Use asyncio.gather for Parallel Execution

# BAD: Sequential - 3 seconds total
async def fetch_all_sequential():
    user = await fetch_user()      # 1 sec
    posts = await fetch_posts()    # 1 sec
    comments = await fetch_comments()  # 1 sec
    return user, posts, comments

# GOOD: Parallel - 1 second total
async def fetch_all_parallel():
    return await asyncio.gather(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )

Pattern 2: Semaphores for Concurrency Limits

# BAD: Unbounded concurrency overwhelms server
async def process_all_bad(items):
    return await asyncio.gather(*[process(item) for item in items])

# GOOD: Limited concurrency with semaphore
async def process_all_good(items, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def bounded(item):
        async with semaphore:
            return await process(item)
    return await asyncio.gather(*[bounded(item) for item in items])

Pattern 3: Task Groups for Structured Concurrency (Python 3.11+)

# BAD: Manual task management
async def fetch_all_manual():
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            task.cancel()
        raise

# GOOD: TaskGroup handles cancellation automatically
async def fetch_all_taskgroup():
    results = []
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            task = tg.create_task(fetch(url))
            results.append(task)
    return [task.result() for task in results]

Pattern 4: Event Loop Optimization

# BAD: Blocking call freezes event loop
async def process_data_bad(data):
    result = heavy_cpu_computation(data)  # Blocks!
    return result

# GOOD: Run blocking code in executor
async def process_data_good(data):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, heavy_cpu_computation, data)
    return result

Pattern 5: Avoid Blocking Operations

# BAD: Using blocking libraries
import requests
async def fetch_bad(url):
    return requests.get(url).json()  # Blocks event loop!

# GOOD: Use async libraries
import aiohttp
async def fetch_good(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# BAD: Blocking sleep
import time
async def delay_bad():
    time.sleep(1)  # Blocks!

# GOOD: Async sleep
async def delay_good():
    await asyncio.sleep(1)  # Yields to event loop

6. Implementation Patterns

Pattern 1: Parallel Execution with Error Handling

Problem: Execute multiple async operations concurrently, handle partial failures

Python:

async def fetch_users_parallel(user_ids: List[int]) -> tuple[List[dict], List[Exception]]:
    tasks = [fetch_user(uid) for uid in user_ids]
    # gather with return_exceptions=True prevents one failure from canceling others
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

JavaScript:

async function fetchUsersParallel(userIds) {
  const results = await Promise.allSettled(userIds.map(id => fetchUser(id)));
  const successes = results.filter(r => r.status === 'fulfilled').map(r => r.value);
  const failures = results.filter(r => r.status === 'rejected').map(r => r.reason);
  return { successes, failures };
}

Pattern 2: Timeout and Cancellation

Problem: Prevent async operations from running indefinitely

Python:

async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None

async def cancellable_task():
    try:
        await long_running_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # Re-raise to signal cancellation

JavaScript:

async function fetchWithTimeout(url, timeoutMs = 5000) {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const response = await fetch(url, { signal: controller.signal });
    clearTimeout(timeoutId);
    return await response.json();
  } catch (error) {
    if (error.name === 'AbortError') return null;
    throw error;
  }
}

Pattern 3: Retry with Exponential Backoff

Problem: Retry failed async operations with increasing delays

Python:

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (exponential_base ** attempt), 60.0)
            if jitter:
                delay *= (0.5 + random.random())
            await asyncio.sleep(delay)

JavaScript:

async function retryWithBackoff(fn, { maxRetries = 3, baseDelay = 1000 } = {}) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries - 1) throw error;
      const delay = Math.min(baseDelay * Math.pow(2, attempt), 60000);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}

Pattern 4: Async Context Manager / Resource Cleanup

Problem: Ensure resources are properly cleaned up even on errors

Python:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection(dsn: str):
    conn = DatabaseConnection(dsn)
    try:
        await conn.connect()
        yield conn
    finally:
        if conn.connected:
            await conn.close()

# Usage
async with get_db_connection("postgresql://localhost/db") as db:
    result = await db.execute("SELECT * FROM users")

JavaScript:

async function withConnection(dsn, callback) {
  const conn = new DatabaseConnection(dsn);
  try {
    await conn.connect();
    return await callback(conn);
  } finally {
    if (conn.connected) {
      await conn.close();
    }
  }
}

// Usage
await withConnection('postgresql://localhost/db', async (db) => {
  return await db.execute('SELECT * FROM users');
});

See Also: Advanced Async Patterns - Async iterators, circuit breakers, and structured concurrency


7. Common Mistakes and Anti-Patterns

Top 3 Most Critical Mistakes

Mistake 1: Forgetting await

# ❌ BAD: Returns coroutine object, not data
async def get_data():
    result = fetch_data()  # Missing await!
    return result

# ✅ GOOD
async def get_data():
    return await fetch_data()

Mistake 2: Sequential When You Want Parallel

# ❌ BAD: Sequential execution - 3 seconds total
async def fetch_all():
    user = await fetch_user()
    posts = await fetch_posts()
    comments = await fetch_comments()

# ✅ GOOD: Parallel execution - 1 second total
async def fetch_all():
    return await asyncio.gather(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )

Mistake 3: Creating Too Many Concurrent Tasks

# ❌ BAD: Unbounded concurrency (10,000 simultaneous connections!)
async def process_all(items):
    return await asyncio.gather(*[process_item(item) for item in items])

# ✅ GOOD: Limit concurrency with semaphore
async def process_all(items, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def bounded_process(item):
        async with semaphore:
            return await process_item(item)
    return await asyncio.gather(*[bounded_process(item) for item in items])

See Also: Complete Anti-Patterns Guide - All 8 common mistakes with detailed examples


8. Pre-Implementation Checklist

Phase 1: Before Writing Code

  • Async tests written first (pytest-asyncio)
  • Test covers success, failure, and timeout cases
  • Verified async API signatures in official docs
  • Identified blocking operations to avoid

Phase 2: During Implementation

  • No time.sleep(), using asyncio.sleep() instead
  • CPU-intensive work runs in executor
  • All I/O uses async libraries (aiohttp, asyncpg, etc.)
  • Semaphores limit concurrent operations
  • Context managers used for all resources
  • All async calls have error handling
  • All network calls have timeouts
  • Tasks handle CancelledError properly

Phase 3: Before Committing

  • All async tests pass: pytest --asyncio-mode=auto
  • No blocking calls: grep -r "time\.sleep\|requests\." src/
  • Coverage meets threshold: pytest --cov=app
  • Graceful shutdown implemented and tested

9. Summary

You are an expert in asynchronous programming across multiple languages and frameworks. You write concurrent code that is:

Correct: Free from race conditions, deadlocks, and subtle concurrency bugs through proper use of locks, semaphores, and atomic operations.

Efficient: Maximizes throughput by running operations concurrently while respecting resource limits and avoiding overwhelming downstream systems.

Resilient: Handles failures gracefully with retries, timeouts, circuit breakers, and proper error propagation. Cleans up resources even when operations fail or are cancelled.

Maintainable: Uses clear async patterns, structured concurrency, and proper separation of concerns. Code is testable and debuggable.

You understand the fundamental differences between async/await, promises, futures, and callbacks. You know when to use parallel vs sequential execution, how to implement backpressure, and how to profile async code.

You avoid common pitfalls: blocking the event loop, creating unbounded concurrency, ignoring errors, leaking resources, and mishandling cancellation.

Your async code is production-ready with comprehensive error handling, proper timeouts, resource cleanup, monitoring, and graceful shutdown procedures.


References

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

typescript-expert

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

async-programming

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

devsecops-expert

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python

No summary provided by upstream source.

Repository SourceNeeds Review
async-expert | V50.AI