fastapi-async-patterns
Apply FastAPI async patterns to build high-performance APIs. Use when handling concurrent requests, I/O-bound work, and other async operations.
Overview
Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.
Basic Async Route Handlers
Understanding async vs sync endpoints in FastAPI.
from fastapi import FastAPI
import time
import asyncio
import httpx
app = FastAPI()
# Sync endpoint (blocks the event loop)
@app.get('/sync')
def sync_endpoint():
time.sleep(1) # Blocks the entire server
return {'message': 'Completed after 1 second'}
# Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
await asyncio.sleep(1) # Other requests can be handled
return {'message': 'Completed after 1 second'}
# CPU-bound work (use sync)
@app.get('/cpu-intensive')
def cpu_intensive():
result = sum(i * i for i in range(10000000))
return {'result': result}
# I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()
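When CPU-bound or otherwise blocking work has to live inside an async endpoint, it can be pushed to a worker thread so the event loop stays free. A minimal sketch using FastAPI's run_in_threadpool helper; compute_checksum is a made-up stand-in for your own blocking function:
from fastapi.concurrency import run_in_threadpool
import hashlib

def compute_checksum(payload: bytes) -> str:
    # Blocking, CPU-bound work: fine in a worker thread, not in the event loop
    return hashlib.sha256(payload * 100_000).hexdigest()

@app.get('/checksum')
async def checksum_endpoint():
    # Offload the blocking call so other requests keep being served
    digest = await run_in_threadpool(compute_checksum, b'example')
    return {'sha256': digest}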
Async Database Operations
Async database patterns with popular ORMs and libraries.
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()
# SQLAlchemy async setup
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
# Direct asyncpg (lower level, faster)
# NOTE: for brevity this dependency creates a new pool per request; in
# production create the pool once at startup and reuse it (see the
# Connection Pooling section below)
async def get_asyncpg_pool():
    pool = await asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=10,
        max_size=20
    )
    try:
        yield pool
    finally:
        await pool.close()
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1', user_id
)
if not row:
raise HTTPException(status_code=404, detail='User not found')
return dict(row)
# MongoDB with Motor
mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
document = await db.collection.find_one({'_id': doc_id})
if not document:
raise HTTPException(status_code=404, detail='Document not found')
return document
@app.post('/documents')
async def create_document(data: dict):
result = await db.collection.insert_one(data)
return {'id': str(result.inserted_id)}
# Tortoise ORM async
register_tortoise(
app,
db_url='postgres://user:pass@localhost/db',
modules={'models': ['app.models']},
generate_schemas=True,
add_exception_handlers=True,
)
from tortoise.models import Model
from tortoise import fields
class UserModel(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255)
email = fields.CharField(max_length=255)
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
user = await UserModel.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
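For multi-statement writes, an explicit transaction block keeps the unit of work atomic: session.begin() commits on success and rolls back if anything raises. A minimal sketch using the AsyncSessionLocal factory defined above; the credits column on User is assumed purely for illustration:
from sqlalchemy import update

@app.post('/transfer')
async def transfer_credits(sender_id: int, receiver_id: int, amount: int):
    async with AsyncSessionLocal() as session:
        async with session.begin():  # commit on success, rollback on any exception
            await session.execute(
                update(User)
                .where(User.id == sender_id)
                .values(credits=User.credits - amount)
            )
            await session.execute(
                update(User)
                .where(User.id == receiver_id)
                .values(credits=User.credits + amount)
            )
    return {'status': 'transferred'}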
Background Tasks
Fire-and-forget tasks without blocking the response.
from fastapi import BackgroundTasks, FastAPI, UploadFile
import asyncio
from datetime import datetime
app = FastAPI()
# Simple background task
async def send_email(email: str, message: str):
await asyncio.sleep(2) # Simulate email sending
print(f'Email sent to {email}: {message}')
@app.post('/send-email')
async def send_email_endpoint(
email: str,
message: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email, email, message)
return {'status': 'Email will be sent in background'}
# Multiple background tasks
async def log_activity(user_id: int, action: str):
await asyncio.sleep(0.5)
print(f'[{datetime.now()}] User {user_id} performed: {action}')
async def update_analytics(action: str):
await asyncio.sleep(1)
print(f'Analytics updated for action: {action}')
@app.post('/users/{user_id}/action')
async def perform_action(
user_id: int,
action: str,
background_tasks: BackgroundTasks
):
# Add multiple tasks
background_tasks.add_task(log_activity, user_id, action)
background_tasks.add_task(update_analytics, action)
return {'status': 'Action logged'}
# Background cleanup
async def cleanup_temp_files(file_path: str):
await asyncio.sleep(60) # Wait before cleanup
import os
if os.path.exists(file_path):
os.remove(file_path)
print(f'Cleaned up: {file_path}')
@app.post('/upload')
async def upload_file(
file: UploadFile,
background_tasks: BackgroundTasks
):
temp_path = f'/tmp/{file.filename}'
with open(temp_path, 'wb') as f:
content = await file.read()
f.write(content)
# Schedule cleanup
background_tasks.add_task(cleanup_temp_files, temp_path)
return {'filename': file.filename, 'path': temp_path}
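Exceptions inside background tasks are easy to lose because no response is waiting on them. A small wrapper that logs failures keeps them visible; this sketch reuses the send_email coroutine defined above:
import logging

logger = logging.getLogger(__name__)

async def send_email_safely(email: str, message: str):
    try:
        await send_email(email, message)
    except Exception:
        # Without this, a failing background task disappears silently
        logger.exception('Background email to %s failed', email)

@app.post('/send-email-safe')
async def send_email_safe_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email_safely, email, message)
    return {'status': 'Email queued'}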
WebSocket Handling
Real-time bidirectional communication patterns.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
from typing import List
import json
app = FastAPI()
# Simple WebSocket
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f'Echo: {data}')
except WebSocketDisconnect:
print('Client disconnected')
# WebSocket with authentication (verify_token and decode_token are placeholder helpers)
async def get_current_user_ws(websocket: WebSocket):
token = websocket.query_params.get('token')
if not token or not verify_token(token):
await websocket.close(code=1008) # Policy violation
raise HTTPException(status_code=401, detail='Unauthorized')
return decode_token(token)
@app.websocket('/ws/authenticated')
async def authenticated_websocket(
websocket: WebSocket,
user = Depends(get_current_user_ws)
):
await websocket.accept()
try:
await websocket.send_text(f'Welcome {user["name"]}')
while True:
data = await websocket.receive_text()
await websocket.send_text(f'{user["name"]}: {data}')
except WebSocketDisconnect:
print(f'User {user["name"]} disconnected')
# Broadcasting to multiple connections
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
await manager.broadcast(f'Client {client_id} joined the chat')
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f'Client {client_id}: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f'Client {client_id} left the chat')
# WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message_type = data.get('type')
if message_type == 'ping':
await websocket.send_json({'type': 'pong'})
elif message_type == 'message':
await websocket.send_json({
'type': 'response',
'data': f'Received: {data.get("content")}'
})
except WebSocketDisconnect:
print('Client disconnected')
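Broadcasting to a socket that has already dropped raises an exception and can abort the whole loop. One common refinement, sketched here on top of the ConnectionManager above, is to collect dead connections during the broadcast and prune them afterwards:
class SafeConnectionManager(ConnectionManager):
    async def broadcast(self, message: str):
        dead: List[WebSocket] = []
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                # Sending to a closed socket fails; remember it and drop it below
                dead.append(connection)
        for connection in dead:
            self.disconnect(connection)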
Server-Sent Events (SSE)
One-way streaming from server to client.
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from datetime import datetime
import asyncio
import json
app = FastAPI()
@app.get('/sse')
async def sse_endpoint():
async def event_generator():
for i in range(10):
await asyncio.sleep(1)
yield {
'event': 'message',
'data': f'Message {i}'
}
return EventSourceResponse(event_generator())
# SSE with real-time updates
@app.get('/sse/updates')
async def sse_updates():
async def update_generator():
while True:
# Simulate fetching updates
await asyncio.sleep(2)
update = await fetch_latest_update()
yield {
'event': 'update',
'data': json.dumps(update)
}
return EventSourceResponse(update_generator())
# SSE with heartbeat
@app.get('/sse/heartbeat')
async def sse_heartbeat():
async def heartbeat_generator():
try:
while True:
await asyncio.sleep(30)
yield {
'event': 'heartbeat',
'data': datetime.now().isoformat()
}
except asyncio.CancelledError:
print('SSE connection closed')
return EventSourceResponse(heartbeat_generator())
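A long-lived event generator should also stop when the client goes away; Starlette's request.is_disconnected() can be polled inside the loop for that. A sketch along the lines of the updates endpoint above (fetch_latest_update remains a placeholder):
from fastapi import Request

@app.get('/sse/updates-safe')
async def sse_updates_safe(request: Request):
    async def update_generator():
        while True:
            if await request.is_disconnected():
                break  # stop producing events once the client is gone
            await asyncio.sleep(2)
            update = await fetch_latest_update()
            yield {'event': 'update', 'data': json.dumps(update)}
    return EventSourceResponse(update_generator())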
Streaming Responses
Stream large files or generated content.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import csv
import io
app = FastAPI()
# Stream large file
@app.get('/download/{filename}')
async def download_file(filename: str):
    async def file_stream():
        # Note: open()/read() are blocking calls; see the aiofiles sketch
        # at the end of this section for a non-blocking alternative
        with open(f'/data/{filename}', 'rb') as f:
            while chunk := f.read(8192):
                yield chunk
return StreamingResponse(
file_stream(),
media_type='application/octet-stream',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)
# Stream generated CSV
@app.get('/export/users')
async def export_users():
async def csv_stream():
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['ID', 'Name', 'Email'])
yield output.getvalue()
output.truncate(0)
output.seek(0)
# Stream users in batches
offset = 0
batch_size = 100
while True:
users = await fetch_users_batch(offset, batch_size)
if not users:
break
for user in users:
writer.writerow([user.id, user.name, user.email])
yield output.getvalue()
output.truncate(0)
output.seek(0)
offset += batch_size
return StreamingResponse(
csv_stream(),
media_type='text/csv',
headers={'Content-Disposition': 'attachment; filename=users.csv'}
)
# Stream generated content
@app.get('/generate/report')
async def generate_report():
async def report_stream():
yield b'<html><body><h1>Report</h1>'
for section in ['users', 'orders', 'analytics']:
await asyncio.sleep(0.5) # Simulate processing
data = await fetch_section_data(section)
yield f'<h2>{section.title()}</h2>'.encode()
yield f'<pre>{data}</pre>'.encode()
yield b'</body></html>'
return StreamingResponse(report_stream(), media_type='text/html')
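The download example above reads the file with blocking open()/read() calls inside an async generator. If fully non-blocking file I/O matters, the third-party aiofiles package provides an async file API; a sketch assuming aiofiles is installed:
import aiofiles

@app.get('/download-async/{filename}')
async def download_file_async(filename: str):
    async def file_stream():
        async with aiofiles.open(f'/data/{filename}', 'rb') as f:
            while True:
                chunk = await f.read(8192)  # non-blocking 8 KB reads
                if not chunk:
                    break
                yield chunk
    return StreamingResponse(
        file_stream(),
        media_type='application/octet-stream',
        headers={'Content-Disposition': f'attachment; filename={filename}'}
    )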
Concurrent Request Handling
Parallel processing patterns for multiple operations.
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()
# Parallel API calls
@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
async with httpx.AsyncClient() as client:
# Fetch from multiple sources in parallel
profile_task = client.get(f'https://api.example.com/users/{user_id}')
posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')
profile, posts, comments = await asyncio.gather(
profile_task,
posts_task,
comments_task
)
return {
'profile': profile.json(),
'posts': posts.json(),
'comments': comments.json()
}
# Parallel database queries
# A single AsyncSession cannot run queries concurrently, so each parallel
# query below opens its own session from the AsyncSessionLocal factory
from sqlalchemy import func

async def fetch_recent_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User).limit(10))
        return result.scalars().all()

async def fetch_recent_orders():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Order).limit(10))
        return result.scalars().all()

async def fetch_user_count():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(func.count(User.id)))
        return result.scalar()

@app.get('/dashboard')
async def get_dashboard():
    users, orders, total_users = await asyncio.gather(
        fetch_recent_users(), fetch_recent_orders(), fetch_user_count()
    )
    return {'users': users, 'orders': orders, 'total_users': total_users}
# Racing requests (first response wins)
@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
    async with httpx.AsyncClient() as client:
        # asyncio.wait needs Tasks, not bare coroutines
        tasks = [
            asyncio.create_task(client.get(f'https://store1.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store2.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store3.com/price/{product_id}'))
        ]
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel the slower requests
        for task in pending:
            task.cancel()
        result = done.pop().result()
        return result.json()
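Fanning out to many URLs with a bare gather can open an unbounded number of sockets at once. An asyncio.Semaphore caps how many requests are in flight at a time; a minimal sketch with an illustrative URL list:
@app.get('/fanout')
async def fanout():
    urls = [f'https://api.example.com/items/{i}' for i in range(100)]
    semaphore = asyncio.Semaphore(10)  # at most 10 requests in flight

    async with httpx.AsyncClient() as client:
        async def fetch(url: str):
            async with semaphore:
                response = await client.get(url)
                return response.json()

        results = await asyncio.gather(*(fetch(url) for url in urls))
    return {'count': len(results)}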
Async Context Managers
Resource management with async context managers.
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
# Async context manager for lifespan events
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print('Starting up...')
db_pool = await create_db_pool()
redis_client = await create_redis_client()
# Store in app state
app.state.db_pool = db_pool
app.state.redis = redis_client
yield
# Shutdown
print('Shutting down...')
await db_pool.close()
await redis_client.close()
app = FastAPI(lifespan=lifespan)
# Custom async context manager
class AsyncDatabaseSession:
def __init__(self, pool):
self.pool = pool
self.conn = None
async def __aenter__(self):
self.conn = await self.pool.acquire()
return self.conn
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is not None:
                # Roll back before the connection goes back to the pool
                await self.conn.rollback()
        finally:
            await self.pool.release(self.conn)
        return False
@app.get('/data')
async def get_data():
async with AsyncDatabaseSession(app.state.db_pool) as conn:
result = await conn.fetch('SELECT * FROM data')
return result
Connection Pooling
Efficient connection management for databases and HTTP clients.
from fastapi import FastAPI, Depends
import asyncpg
import httpx
from typing import AsyncGenerator
app = FastAPI()
# Database connection pool
class DatabasePool:
def __init__(self):
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
'postgresql://user:pass@localhost/db',
min_size=10,
max_size=20,
command_timeout=60,
max_queries=50000,
max_inactive_connection_lifetime=300
)
async def close_pool(self):
await self.pool.close()
async def get_connection(self):
async with self.pool.acquire() as connection:
yield connection
db_pool = DatabasePool()
# NOTE: on_event is deprecated in newer FastAPI releases; the lifespan
# handler shown earlier is the preferred place for startup/shutdown work
@app.on_event('startup')
async def startup():
await db_pool.create_pool()
@app.on_event('shutdown')
async def shutdown():
await db_pool.close_pool()
@app.get('/users')
async def get_users(conn = Depends(db_pool.get_connection)):
rows = await conn.fetch('SELECT * FROM users')
return [dict(row) for row in rows]
# HTTP client pool
class HTTPClientPool:
def __init__(self):
self.client = None
async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
if self.client is None:
self.client = httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
timeout=httpx.Timeout(10.0)
)
yield self.client
async def close(self):
if self.client:
await self.client.aclose()
http_pool = HTTPClientPool()
@app.get('/external-api')
async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
response = await client.get('https://api.example.com/data')
return response.json()
Performance Optimization
Async patterns for optimal performance.
from fastapi import FastAPI, Depends, HTTPException
from typing import List
import asyncio
app = FastAPI()
# Cache expensive async operations
from aiocache import Cache
from aiocache.serializers import JsonSerializer
cache = Cache(Cache.MEMORY, serializer=JsonSerializer())
@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
# Check cache first
cached = await cache.get(key)
if cached:
return {'data': cached, 'cached': True}
# Expensive operation
await asyncio.sleep(2)
data = compute_expensive_result(key)
# Store in cache
await cache.set(key, data, ttl=300)
return {'data': data, 'cached': False}
# Batch operations
@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
# Create users in batch (more efficient than one-by-one)
user_objects = [User(**user.dict()) for user in users]
db.add_all(user_objects)
await db.flush()
return user_objects
# Debouncing with asyncio
class Debouncer:
def __init__(self, delay: float):
self.delay = delay
self.task = None
async def debounce(self, coro):
if self.task:
self.task.cancel()
async def delayed():
await asyncio.sleep(self.delay)
await coro
self.task = asyncio.create_task(delayed())
await self.task
debouncer = Debouncer(delay=1.0)
# Prefetching related data
# The post and its comments load in parallel, each on its own session (one
# AsyncSession cannot run queries concurrently); the author lookup needs
# post.author_id, so it runs after the post has been fetched
async def fetch_post(post_id: int):
    async with AsyncSessionLocal() as session:
        return await session.get(Post, post_id)

async def fetch_comments(post_id: int):
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Comment).where(Comment.post_id == post_id)
        )
        return result.scalars().all()

@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int):
    post, comments = await asyncio.gather(
        fetch_post(post_id), fetch_comments(post_id)
    )
    if not post:
        raise HTTPException(status_code=404, detail='Post not found')
    async with AsyncSessionLocal() as session:
        author = await session.get(User, post.author_id)
    return {'post': post, 'comments': comments, 'author': author}
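aiocache also ships a decorator that handles the get/set dance from the caching example above automatically, keyed by the function's arguments. A sketch, again assuming aiocache is installed:
from aiocache import cached

@cached(ttl=300)  # cache the coroutine's result for 5 minutes
async def load_report(key: str) -> dict:
    await asyncio.sleep(2)  # stand-in for the expensive work
    return {'key': key, 'value': key.upper()}

@app.get('/cached-report/{key}')
async def cached_report(key: str):
    return await load_report(key)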
When to Use This Skill
Use fastapi-async-patterns when:
- Building high-throughput APIs that handle many concurrent requests
- Working with I/O-bound operations (database, external APIs, file operations)
- Implementing real-time features (WebSockets, SSE)
- Processing multiple operations in parallel
- Streaming large datasets or files
- Building microservices that communicate with other services
- Optimizing API response times and resource usage
- Handling background tasks without blocking responses
FastAPI Async Best Practices
- Use async for I/O - Always use async for database, HTTP requests, and file operations
- Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)
- Connection pooling - Use connection pools for databases and HTTP clients
- Proper cleanup - Always clean up resources with try/finally or async context managers
- Concurrent operations - Use asyncio.gather for parallel operations when possible
- Background tasks - Use BackgroundTasks for fire-and-forget operations
- Stream large data - Use StreamingResponse for large files or generated content
- Timeout handling - Set timeouts on all external calls to prevent hanging (see the sketch after this list)
- Error propagation - Handle exceptions properly in async code
- Monitor performance - Use tools like aiomonitor to debug async issues
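As a concrete illustration of the timeout guideline above, here is a minimal sketch combining an httpx client timeout with an overall asyncio deadline; the upstream URL is illustrative:
import asyncio
import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get('/with-timeout')
async def with_timeout():
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
            # wait_for adds an overall deadline on top of the per-request timeout
            response = await asyncio.wait_for(
                client.get('https://api.example.com/slow'), timeout=8.0
            )
    except (httpx.TimeoutException, asyncio.TimeoutError):
        raise HTTPException(status_code=504, detail='Upstream request timed out')
    return response.json()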
FastAPI Async Common Pitfalls
- Blocking the event loop - Using synchronous I/O in async functions kills performance
- Missing await - Forgetting await on async functions causes coroutine warnings
- Creating too many tasks - Spawning unlimited tasks can exhaust resources
- Not closing connections - Resource leaks from unclosed database/HTTP connections
- Mixing sync and async - Incorrect mixing causes event loop issues
- Race conditions - Shared state in async code without proper locking (see the sketch after this list)
- Timeout issues - No timeouts on external calls can hang the server
- Memory leaks - Background tasks that never complete accumulate
- Error swallowing - Silent failures in background tasks and event handlers
- Deadlocks - Circular waits in async dependencies or locks
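For the race-condition pitfall above, the usual remedy for shared in-process state is an asyncio.Lock around the read-modify-write. A minimal sketch with a hypothetical in-memory counter:
import asyncio
from fastapi import FastAPI

app = FastAPI()

counter = {'value': 0}
counter_lock = asyncio.Lock()  # serializes access across concurrent requests

@app.post('/increment')
async def increment():
    async with counter_lock:
        current = counter['value']
        await asyncio.sleep(0)  # a suspension point where another request could interleave
        counter['value'] = current + 1
    return {'value': counter['value']}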