FastAPI Async Patterns
Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.
Basic Async Route Handlers
Understanding async vs sync endpoints in FastAPI.
import asyncio
import time

import httpx
from fastapi import FastAPI, UploadFile
# Application instance that the route decorators in this section register on.
app = FastAPI()
Sync endpoint (runs in FastAPI's threadpool; the blocking call ties up a worker thread, not the event loop)
@app.get('/sync')
def sync_endpoint():
    """Block for one second, then report completion.

    This is a plain ``def`` handler, so FastAPI runs it in a threadpool
    worker: the blocking sleep occupies that worker thread for its duration
    rather than stalling the event loop.
    """
    time.sleep(1)  # blocking call; holds one threadpool worker
    reply = {'message': 'Completed after 1 second'}
    return reply
Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
    """Wait one second without blocking, then report completion."""
    # Awaiting the sleep hands control back to the event loop, so other
    # requests can be served in the meantime.
    await asyncio.sleep(1)
    reply = {'message': 'Completed after 1 second'}
    return reply
CPU-bound work (use sync)
@app.get('/cpu-intensive')
def cpu_intensive():
    """Return the sum of squares of the integers 0 .. 9,999,999.

    Declared as a sync ``def`` so the CPU-bound loop does not run on the
    event loop.
    """
    total = 0
    for i in range(10000000):
        total += i * i
    return {'result': total}
I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
    """Fetch JSON from the example upstream API and return the decoded body."""
    url = 'https://api.example.com/data'
    # The client is scoped to this request and closed when the block exits.
    async with httpx.AsyncClient() as http:
        upstream = await http.get(url)
        payload = upstream.json()
    return payload
Async Database Operations
Async database patterns with popular ORMs and libraries.
from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import select import asyncpg from motor.motor_asyncio import AsyncIOMotorClient from tortoise import Tortoise from tortoise.contrib.fastapi import register_tortoise
# Application instance for the database examples below.
# NOTE(review): this rebinds `app`; if the sections are combined into one
# module, routes registered earlier are not on this instance — confirm each
# section is meant to be a standalone snippet.
app = FastAPI()
SQLAlchemy async setup
# Connection URL using the asyncpg driver (example credentials).
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
# echo=True makes the engine log the SQL it emits.
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
# Session factory producing AsyncSession objects bound to the engine;
# expire_on_commit=False keeps loaded attributes usable after commit.
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
    """Dependency that yields one database session per request.

    The session is committed once the request handler finishes without
    raising; if an ``Exception`` escapes (from the handler or from the
    commit itself) the session is rolled back and the error re-raised.
    """
    async with AsyncSessionLocal() as db_session:
        try:
            yield db_session
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Look up a single user by primary key; respond 404 when absent."""
    query = select(User).where(User.id == user_id)
    rows = await db.execute(query)
    user = rows.scalar_one_or_none()
    if user:
        return user
    raise HTTPException(status_code=404, detail='User not found')
Direct asyncpg (lower level, faster)
# Task resolving to the shared asyncpg pool; created lazily on first use.
_asyncpg_pool_task = None


async def _open_asyncpg_pool():
    """Create the asyncpg pool with the example connection settings."""
    return await asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=10,
        max_size=20
    )


async def get_asyncpg_pool():
    """Dependency that yields a process-wide asyncpg pool.

    The pool is created on the first request and reused afterwards. The
    previous version built a new pool (at least ``min_size`` connections)
    and closed it again for every request, which defeats pooling.

    Call ``close_asyncpg_pool()`` from the application's shutdown hook to
    release the connections.
    """
    global _asyncpg_pool_task
    if _asyncpg_pool_task is None:
        # The check and assignment happen without an intervening await, so
        # concurrent first requests all share this one creation task.
        _asyncpg_pool_task = asyncio.create_task(_open_asyncpg_pool())
    task = _asyncpg_pool_task
    try:
        # shield: a cancelled request must not cancel pool creation for
        # the other requests waiting on the same task.
        pool = await asyncio.shield(task)
    except Exception:
        # Creation failed: forget the task so a later request can retry.
        if _asyncpg_pool_task is task:
            _asyncpg_pool_task = None
        raise
    yield pool


async def close_asyncpg_pool():
    """Close the shared pool if it was ever created; safe to call twice."""
    global _asyncpg_pool_task
    task, _asyncpg_pool_task = _asyncpg_pool_task, None
    if task is not None:
        pool = await task
        await pool.close()
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
    """Fetch one user row through asyncpg and return it as a dict (404 if none)."""
    sql = 'SELECT * FROM users WHERE id = $1'
    # Borrow a connection only for the duration of the query.
    async with pool.acquire() as connection:
        record = await connection.fetchrow(sql, user_id)
        if record:
            return dict(record)
        raise HTTPException(status_code=404, detail='User not found')
MongoDB with Motor
# Motor client for a local MongoDB instance and a handle to `mydatabase`.
# NOTE(review): `db` is also used as a parameter name for SQLAlchemy sessions
# elsewhere in this document — confirm the name reuse is intentional.
mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
    """Return the document whose ``_id`` equals ``doc_id``; 404 when absent.

    The ``_id`` value is converted to ``str`` before returning, matching
    ``create_document`` (which returns ``str(result.inserted_id)``) and
    keeping the response JSON-serialisable when the stored id is not a
    plain string.
    """
    # NOTE(review): the lookup compares against the raw string, so it only
    # matches documents whose _id is stored as a string — confirm whether
    # ids need converting to the driver's ObjectId type here.
    document = await db.collection.find_one({'_id': doc_id})
    if not document:
        raise HTTPException(status_code=404, detail='Document not found')
    if '_id' in document:
        document['_id'] = str(document['_id'])
    return document
@app.post('/documents')
async def create_document(data: dict):
    """Insert the request body as a new document and return its id as text."""
    outcome = await db.collection.insert_one(data)
    new_id = str(outcome.inserted_id)
    return {'id': new_id}
Tortoise ORM async
# Register Tortoise ORM on the app: models are loaded from `app.models`,
# schema generation and the ORM's exception handlers are both switched on.
register_tortoise(
    app,
    db_url='postgres://user:pass@localhost/db',
    modules={'models': ['app.models']},
    generate_schemas=True,
    add_exception_handlers=True,
)
from tortoise.models import Model from tortoise import fields
class UserModel(Model):
    """Tortoise ORM model with an integer primary key, a name and an email."""

    # Integer primary key.
    id = fields.IntField(pk=True)
    # Text columns limited to 255 characters each.
    name = fields.CharField(max_length=255)
    email = fields.CharField(max_length=255)
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
    """Load a ``UserModel`` by id via Tortoise ORM; 404 when it does not exist."""
    found = await UserModel.get_or_none(id=user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')
Background Tasks
Fire-and-forget tasks without blocking the response.
from fastapi import BackgroundTasks, FastAPI import asyncio from datetime import datetime
# Application instance for the background-task examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
Simple background task
async def send_email(email: str, message: str):
    """Pretend to send an email: wait two seconds, then print a confirmation."""
    simulated_delay = 2  # seconds; stands in for the real delivery call
    await asyncio.sleep(simulated_delay)
    print(f'Email sent to {email}: {message}')
@app.post('/send-email')
async def send_email_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Queue ``send_email`` as a background task and acknowledge immediately."""
    background_tasks.add_task(send_email, email, message)
    acknowledgement = {'status': 'Email will be sent in background'}
    return acknowledgement
Multiple background tasks
async def log_activity(user_id: int, action: str):
    """Wait half a second, then print a timestamped activity line."""
    await asyncio.sleep(0.5)
    stamp = datetime.now()
    print(f'[{stamp}] User {user_id} performed: {action}')
async def update_analytics(action: str):
    """Wait one second, then print that analytics were updated for ``action``."""
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)
    print(f'Analytics updated for action: {action}')
@app.post('/users/{user_id}/action')
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    """Queue activity logging and the analytics update, then acknowledge."""
    # Tasks are queued in this order: logging first, analytics second.
    queued = (
        (log_activity, (user_id, action)),
        (update_analytics, (action,)),
    )
    for task_fn, task_args in queued:
        background_tasks.add_task(task_fn, *task_args)
    return {'status': 'Action logged'}
Background cleanup
async def cleanup_temp_files(file_path: str):
    """Delete ``file_path`` after a 60-second delay, if it still exists."""
    await asyncio.sleep(60)  # grace period before the file is removed
    import os
    if not os.path.exists(file_path):
        return
    os.remove(file_path)
    print(f'Cleaned up: {file_path}')
@app.post('/upload')
async def upload_file(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    """Save the upload under ``/tmp`` and schedule its later cleanup.

    The client-supplied filename is reduced to its final path component
    before use, so values such as ``../../etc/passwd`` cannot write outside
    ``/tmp``. Empty or dot-only names are rejected with HTTP 400.

    Returns:
        A dict with the filename as sent by the client and the path the
        content was written to.
    """
    import os
    from fastapi import HTTPException

    # Treat backslashes as separators too, then keep only the last component.
    safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
    if safe_name in ('', '.', '..'):
        raise HTTPException(status_code=400, detail='Invalid filename')

    temp_path = os.path.join('/tmp', safe_name)
    content = await file.read()
    with open(temp_path, 'wb') as f:
        f.write(content)

    # Schedule cleanup
    background_tasks.add_task(cleanup_temp_files, temp_path)
    return {'filename': file.filename, 'path': temp_path}
WebSocket Handling
Real-time bidirectional communication patterns.
from fastapi import WebSocket, WebSocketDisconnect, Depends from typing import List import json
# Application instance for the WebSocket examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
Simple WebSocket
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo each text frame back, prefixed with 'Echo: '."""
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f'Echo: {incoming}'
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        # The receive call raises this once the peer goes away.
        print('Client disconnected')
WebSocket with authentication
async def get_current_user_ws(websocket: WebSocket):
    """WebSocket dependency: authenticate via the ``token`` query parameter.

    Returns:
        Whatever ``decode_token`` returns for a valid token.

    Raises:
        WebSocketException: with close code 1008 (policy violation) when the
            token is missing or fails ``verify_token``. The framework closes
            the socket with that code. The previous version closed the
            not-yet-accepted socket itself and then raised ``HTTPException``,
            which is meant for HTTP routes.
    """
    from fastapi import WebSocketException

    token = websocket.query_params.get('token')
    if not token or not verify_token(token):
        raise WebSocketException(code=1008, reason='Unauthorized')
    return decode_token(token)
@app.websocket('/ws/authenticated')
async def authenticated_websocket(
    websocket: WebSocket,
    user = Depends(get_current_user_ws)
):
    """Greet the authenticated user, then echo their messages prefixed by name."""
    await websocket.accept()
    try:
        greeting = f'Welcome {user["name"]}'
        await websocket.send_text(greeting)
        while True:
            text = await websocket.receive_text()
            line = f'{user["name"]}: {text}'
            await websocket.send_text(line)
    except WebSocketDisconnect:
        print(f'User {user["name"]} disconnected')
Broadcasting to multiple connections
class ConnectionManager:
    """Tracks accepted WebSocket connections and sends messages to them."""

    def __init__(self):
        # Was spelled `init`, so it never ran and `active_connections`
        # did not exist on new instances.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the socket and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the socket; a socket that is not tracked is ignored."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single socket."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked socket, in connection order."""
        # Iterate over a snapshot: other tasks may connect or disconnect
        # while this coroutine is suspended in send_text.
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()
@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
    """Chat room endpoint: announce joins, relay messages, announce departures."""
    await manager.connect(websocket)
    joined_notice = f'Client {client_id} joined the chat'
    await manager.broadcast(joined_notice)
    try:
        while True:
            text = await websocket.receive_text()
            await manager.broadcast(f'Client {client_id}: {text}')
    except WebSocketDisconnect:
        # Drop the socket first so the farewell is not sent to it.
        manager.disconnect(websocket)
        left_notice = f'Client {client_id} left the chat'
        await manager.broadcast(left_notice)
WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
    """Exchange JSON frames: answer 'ping' with 'pong' and acknowledge 'message'.

    Frames with any other ``type`` receive no reply.
    """
    await websocket.accept()
    try:
        while True:
            frame = await websocket.receive_json()
            kind = frame.get('type')

            if kind == 'ping':
                await websocket.send_json({'type': 'pong'})
                continue

            if kind == 'message':
                reply = {
                    'type': 'response',
                    'data': f'Received: {frame.get("content")}'
                }
                await websocket.send_json(reply)
    except WebSocketDisconnect:
        print('Client disconnected')
Server-Sent Events (SSE)
One-way streaming from server to client.
from fastapi import FastAPI from sse_starlette.sse import EventSourceResponse import asyncio
# Application instance for the Server-Sent Events examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
@app.get('/sse')
async def sse_endpoint():
    """Stream ten 'message' events, one per second, then end the stream."""

    async def event_generator():
        counter = 0
        while counter < 10:
            await asyncio.sleep(1)
            event = {'event': 'message', 'data': f'Message {counter}'}
            yield event
            counter += 1

    stream = event_generator()
    return EventSourceResponse(stream)
SSE with real-time updates
@app.get('/sse/updates')
async def sse_updates():
    """Stream an 'update' event every two seconds for as long as the client listens."""

    async def update_generator():
        poll_interval = 2  # seconds between fetches
        while True:
            await asyncio.sleep(poll_interval)
            latest = await fetch_latest_update()
            encoded = json.dumps(latest)
            yield {'event': 'update', 'data': encoded}

    return EventSourceResponse(update_generator())
SSE with heartbeat
@app.get('/sse/heartbeat')
async def sse_heartbeat():
    """Stream a 'heartbeat' event carrying the current ISO timestamp every 30 s."""

    async def heartbeat_generator():
        try:
            while True:
                await asyncio.sleep(30)
                yield {
                    'event': 'heartbeat',
                    'data': datetime.now().isoformat()
                }
        except asyncio.CancelledError:
            print('SSE connection closed')
            # Re-raise so the cancellation reaches whoever cancelled us;
            # swallowing it would make the cancelled task look like it
            # finished normally.
            raise

    return EventSourceResponse(heartbeat_generator())
Streaming Responses
Stream large files or generated content.
from fastapi import FastAPI from fastapi.responses import StreamingResponse import io import csv
# Application instance for the streaming-response examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
Stream large file
@app.get('/download/{filename}')
async def download_file(filename: str):
    """Stream a file from ``/data`` as an attachment in 8 KiB chunks.

    Changes from the previous version:

    * the resolved path must stay inside ``/data`` and be a regular file,
      otherwise the response is 404 before any bytes are sent (previously
      a bad name failed inside the generator, after the response started);
    * the chunk generator is a plain ``def``, so the blocking ``read``
      calls are iterated in a worker thread rather than on the event loop;
    * the filename in ``Content-Disposition`` is quoted.
    """
    import os
    from fastapi import HTTPException

    base_dir = os.path.realpath('/data')
    target = os.path.realpath(os.path.join(base_dir, filename))
    inside_base = os.path.commonpath([base_dir, target]) == base_dir
    if not inside_base or not os.path.isfile(target):
        raise HTTPException(status_code=404, detail='File not found')

    def file_stream():
        with open(target, 'rb') as f:
            while chunk := f.read(8192):
                yield chunk

    # Drop double quotes so the name cannot terminate the quoted value early.
    download_name = os.path.basename(target).replace('"', '')
    return StreamingResponse(
        file_stream(),
        media_type='application/octet-stream',
        headers={'Content-Disposition': f'attachment; filename="{download_name}"'}
    )
Stream generated CSV
@app.get('/export/users')
async def export_users():
    """Stream all users as a CSV attachment, fetched in pages of 100."""

    async def csv_stream():
        buffer = io.StringIO()
        csv_writer = csv.writer(buffer)

        def take_pending():
            # Hand back what has been written so far and reset the buffer.
            text = buffer.getvalue()
            buffer.truncate(0)
            buffer.seek(0)
            return text

        # Header row goes out first.
        csv_writer.writerow(['ID', 'Name', 'Email'])
        yield take_pending()

        start = 0
        page_size = 100
        while True:
            page = await fetch_users_batch(start, page_size)
            if not page:
                break

            for member in page:
                csv_writer.writerow([member.id, member.name, member.email])

            yield take_pending()
            start += page_size

    return StreamingResponse(
        csv_stream(),
        media_type='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users.csv'}
    )
Stream generated content
@app.get('/generate/report')
async def generate_report():
    """Stream an HTML report with one heading and data block per section."""

    async def report_stream():
        yield b'<html><body><h1>Report</h1>'

        section_names = ('users', 'orders', 'analytics')
        for name in section_names:
            await asyncio.sleep(0.5)  # Simulate processing
            section_data = await fetch_section_data(name)
            heading = f'<h2>{name.title()}</h2>'
            body = f'<pre>{section_data}</pre>'
            yield heading.encode()
            yield body.encode()

        yield b'</body></html>'

    return StreamingResponse(report_stream(), media_type='text/html')
Concurrent Request Handling
Parallel processing patterns for multiple operations.
from fastapi import FastAPI import asyncio import httpx
# Application instance for the concurrent-request examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
Parallel API calls
@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
    """Fetch a user's profile, posts and comments concurrently and merge them."""
    base = f'https://api.example.com/users/{user_id}'
    urls = (base, f'{base}/posts', f'{base}/comments')

    async with httpx.AsyncClient() as client:
        # gather() schedules the three requests together and returns the
        # responses in the same order as the URLs.
        profile, posts, comments = await asyncio.gather(
            *(client.get(url) for url in urls)
        )

        merged = {
            'profile': profile.json(),
            'posts': posts.json(),
            'comments': comments.json()
        }
        return merged
Parallel database queries
@app.get('/dashboard')
async def get_dashboard(db: AsyncSession = Depends(get_db)):
    """Return ten users, ten orders and the total user count.

    The three statements run one after another. A single ``AsyncSession``
    wraps one connection and must not be used by several tasks at once, so
    passing its ``execute`` coroutines to ``asyncio.gather`` (as the previous
    version did) is unsafe. For real parallelism, give each query its own
    session.
    """
    from sqlalchemy import func  # was referenced without being imported

    users = await db.execute(select(User).limit(10))
    orders = await db.execute(select(Order).limit(10))
    stats = await db.execute(select(func.count(User.id)))

    return {
        'users': users.scalars().all(),
        'orders': orders.scalars().all(),
        'total_users': stats.scalar()
    }
Race condition (first to complete wins)
@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
    """Query three stores at once and return whichever response arrives first.

    Raises whatever exception the first-completed request raised, if it
    failed.
    """
    urls = [
        f'https://store1.com/price/{product_id}',
        f'https://store2.com/price/{product_id}',
        f'https://store3.com/price/{product_id}',
    ]
    async with httpx.AsyncClient() as client:
        # asyncio.wait() needs Task objects: passing bare coroutines was
        # deprecated and is rejected on current Python versions.
        tasks = [asyncio.create_task(client.get(url)) for url in urls]
        try:
            done, _pending = await asyncio.wait(
                tasks,
                return_when=asyncio.FIRST_COMPLETED
            )
            winner = next(iter(done))
            return winner.result().json()
        finally:
            # Cancel the stragglers and wait for them to unwind while the
            # client is still open; also runs if this handler is cancelled.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
Async Context Managers
Resource management with async context managers.
from contextlib import asynccontextmanager from fastapi import FastAPI import asyncio
Async context manager for lifespan events
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared resources at startup and release them at shutdown.

    The DB pool and Redis client are stored on ``app.state`` for handlers to
    use. Cleanup sits in ``finally`` blocks, so the DB pool is closed even if
    creating the Redis client fails, and both are closed if the application
    exits with an error.
    """
    # Startup
    print('Starting up...')
    db_pool = await create_db_pool()
    try:
        redis_client = await create_redis_client()
        try:
            # Store in app state
            app.state.db_pool = db_pool
            app.state.redis = redis_client

            yield
        finally:
            # Shutdown: release in reverse order of acquisition.
            print('Shutting down...')
            await redis_client.close()
    finally:
        await db_pool.close()


app = FastAPI(lifespan=lifespan)
Custom async context manager
class AsyncDatabaseSession:
    """Async context manager that borrows one connection from a pool.

    On exit the connection is rolled back if the block raised, then handed
    back to the pool. Exceptions from the block are never suppressed.
    """

    def __init__(self, pool):
        # Was spelled `init`, so constructing with a pool raised TypeError.
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is not None:
                # Roll back while we still own the connection; the previous
                # version released it to the pool first.
                # NOTE(review): assumes the connection object exposes an
                # awaitable rollback() — confirm for the driver in use.
                await self.conn.rollback()
        finally:
            await self.pool.release(self.conn)
            self.conn = None
        return False
@app.get('/data')
async def get_data():
    """Return every row of the ``data`` table using a pooled connection."""
    session = AsyncDatabaseSession(app.state.db_pool)
    async with session as conn:
        rows = await conn.fetch('SELECT * FROM data')
        return rows
Connection Pooling
Efficient connection management for databases and HTTP clients.
from fastapi import FastAPI, Depends import asyncpg import httpx from typing import AsyncGenerator
# Application instance for the connection-pooling examples below.
# NOTE(review): this replaces the `app` created with `lifespan=` above if
# the sections share one module — confirm each section is standalone.
app = FastAPI()
Database connection pool
class DatabasePool:
    """Owns a single asyncpg pool and hands out connections from it."""

    def __init__(self):
        # Was spelled `init`, so `pool` was never initialised.
        self.pool = None

    async def create_pool(self):
        """Open the pool with the example connection settings."""
        self.pool = await asyncpg.create_pool(
            'postgresql://user:pass@localhost/db',
            min_size=10,
            max_size=20,
            command_timeout=60,
            max_queries=50000,
            max_inactive_connection_lifetime=300
        )

    async def close_pool(self):
        """Close the pool if one is open; a no-op otherwise."""
        if self.pool is not None:
            await self.pool.close()
            self.pool = None

    async def get_connection(self):
        """Yield one pooled connection; it is released when the generator ends.

        Raises:
            RuntimeError: if ``create_pool`` has not been called yet.
        """
        if self.pool is None:
            raise RuntimeError('Database pool has not been created')
        async with self.pool.acquire() as connection:
            yield connection


db_pool = DatabasePool()
@app.on_event('startup')
async def startup():
    """Startup hook: open the shared database pool."""
    opening = db_pool.create_pool()
    await opening
@app.on_event('shutdown')
async def shutdown():
    """Shutdown hook: close the shared database pool."""
    closing = db_pool.close_pool()
    await closing
@app.get('/users')
async def get_users(conn = Depends(db_pool.get_connection)):
    """Return every row of ``users`` as a list of dicts."""
    records = await conn.fetch('SELECT * FROM users')
    as_dicts = []
    for record in records:
        as_dicts.append(dict(record))
    return as_dicts
HTTP client pool
class HTTPClientPool:
    """Lazily creates one shared ``httpx.AsyncClient`` and reuses it."""

    def __init__(self):
        # Was spelled `init`, so `client` was never initialised.
        self.client = None

    async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield the shared client, creating it on first use.

        The client is not closed when the generator finishes; call
        ``close()`` to release it.
        """
        if self.client is None:
            self.client = httpx.AsyncClient(
                limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
                timeout=httpx.Timeout(10.0)
            )
        yield self.client

    async def close(self):
        """Close the shared client if one exists and forget it."""
        if self.client:
            await self.client.aclose()
            # Reset so a later get_client() builds a fresh client instead of
            # handing out the closed one.
            self.client = None


http_pool = HTTPClientPool()
@app.get('/external-api')
async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
    """Call the example upstream API through the shared client; return its JSON."""
    upstream = await client.get('https://api.example.com/data')
    body = upstream.json()
    return body
Performance Optimization
Async patterns for optimal performance.
from fastapi import FastAPI import asyncio from functools import lru_cache
# Application instance for the performance examples below.
# NOTE(review): rebinding `app` drops earlier routes if the sections share
# one module — confirm each section is a standalone snippet.
app = FastAPI()
Cache expensive async operations
from aiocache import Cache from aiocache.serializers import JsonSerializer
# In-memory aiocache backend; values pass through the JSON serializer.
cache = Cache(Cache.MEMORY, serializer=JsonSerializer())
@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
    """Return the result for ``key``, served from cache when available.

    A freshly computed value is cached for 300 seconds. The ``cached`` flag
    in the response says which path produced the data.
    """
    # Check cache first. Compare against None rather than truthiness so that
    # falsy results (0, '', [], {}) count as hits instead of being recomputed
    # on every request.
    # NOTE(review): a computed result of None is still indistinguishable
    # from a miss.
    cached = await cache.get(key)
    if cached is not None:
        return {'data': cached, 'cached': True}

    # Expensive operation
    await asyncio.sleep(2)
    data = compute_expensive_result(key)

    # Store in cache
    await cache.set(key, data, ttl=300)

    return {'data': data, 'cached': False}
Batch operations
@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
    """Create all submitted users in one batch and return the new objects."""
    new_rows = []
    for payload in users:
        new_rows.append(User(**payload.dict()))
    # One add_all + flush instead of a round trip per user.
    db.add_all(new_rows)
    await db.flush()
    return new_rows
Debouncing with asyncio
class Debouncer:
    """Runs only the most recently submitted coroutine after a quiet period."""

    def __init__(self, delay: float):
        # Was spelled `init`, so `Debouncer(delay=...)` raised TypeError.
        self.delay = delay
        self.task = None

    async def debounce(self, coro):
        """Schedule ``coro`` to run after ``self.delay`` seconds and wait for it.

        A call made while an earlier one is still pending cancels the earlier
        one; the earlier caller then sees ``asyncio.CancelledError`` from its
        own ``await``.
        """
        # Only cancel work that is actually still pending.
        if self.task is not None and not self.task.done():
            self.task.cancel()

        async def delayed():
            try:
                await asyncio.sleep(self.delay)
            except asyncio.CancelledError:
                # Superseded during the wait: close the coroutine that will
                # now never run, so it does not trigger a "never awaited"
                # warning.
                if asyncio.iscoroutine(coro):
                    coro.close()
                raise
            await coro

        self.task = asyncio.create_task(delayed())
        await self.task


debouncer = Debouncer(delay=1.0)
Prefetching related data
@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int, db = Depends(get_db)):
    """Return a post together with its comments and its author.

    Changes from the previous version:

    * the queries run sequentially, because one session must not execute
      several statements concurrently via ``asyncio.gather``;
    * the author is looked up by the loaded post's ``author_id``; the old
      filter compared ``User.id`` with the ``Post.author_id`` column and so
      was not restricted to this post;
    * a missing post yields a 404 instead of failing in the author lookup.
    """
    from fastapi import HTTPException

    post = await db.get(Post, post_id)
    if post is None:
        raise HTTPException(status_code=404, detail='Post not found')

    comments_result = await db.execute(
        select(Comment).where(Comment.post_id == post_id)
    )
    author_result = await db.execute(
        select(User).where(User.id == post.author_id)
    )

    return {
        'post': post,
        'comments': comments_result.scalars().all(),
        'author': author_result.scalar_one()
    }
When to Use This Skill
Use fastapi-async-patterns when:
- Building high-throughput APIs that handle many concurrent requests
- Working with I/O-bound operations (database, external APIs, file operations)
- Implementing real-time features (WebSockets, SSE)
- Processing multiple operations in parallel
- Streaming large datasets or files
- Building microservices that communicate with other services
- Optimizing API response times and resource usage
- Handling background tasks without blocking responses
FastAPI Async Best Practices
- Use async for I/O - Always use async for database, HTTP requests, and file operations
- Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)
- Connection pooling - Use connection pools for databases and HTTP clients
- Proper cleanup - Always clean up resources with try/finally or async context managers
- Concurrent operations - Use asyncio.gather for independent operations; do not share a single database session between concurrent tasks
- Background tasks - Use BackgroundTasks for fire-and-forget operations
- Stream large data - Use StreamingResponse for large files or generated content
- Timeout handling - Set timeouts on all external calls to prevent hanging
- Error propagation - Handle exceptions properly in async code
- Monitor performance - Use tools like aiomonitor to debug async issues
FastAPI Async Common Pitfalls
- Blocking the event loop - Using synchronous I/O in async functions kills performance
- Missing await - Forgetting await on a coroutine means it never runs and only produces a "never awaited" warning
- Creating too many tasks - Spawning unlimited tasks can exhaust resources
- Not closing connections - Resource leaks from unclosed database/HTTP connections
- Mixing sync and async - Incorrect mixing causes event loop issues
- Race conditions - Shared state in async code without proper locking
- Timeout issues - No timeouts on external calls can hang the server
- Memory leaks - Background tasks that never complete accumulate
- Error swallowing - Silent failures in background tasks and event handlers
- Deadlocks - Circular waits in async dependencies or locks
Resources
- FastAPI Async Documentation
- Python asyncio Documentation
- SQLAlchemy Async Guide
- HTTPX Async Client
- AsyncPG Documentation
- Motor (MongoDB Async)
- WebSockets in FastAPI
- Server-Sent Events with Starlette