ð python-async-patterns
Master Python asynchronous programming with asyncio, async/await, and concurrent.futures. Use for async code and concurrency patterns.
Overview
Master asynchronous programming in Python using asyncio, async/await syntax, and concurrent execution patterns for I/O-bound and CPU-bound tasks.
Basic Async/Await
Core async syntax:
import asyncio
# Define async function with async def
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(1) # Simulate I/O operation
return f"Data from {url}"
# Call async function
async def main() -> None:
result = await fetch_data("https://api.example.com")
print(result)
# Run async function
asyncio.run(main())
Multiple concurrent operations:
import asyncio
async def fetch_url(url: str) -> str:
await asyncio.sleep(1)
return f"Content from {url}"
async def main() -> None:
# Run concurrently with gather
results = await asyncio.gather(
fetch_url("https://example.com/1"),
fetch_url("https://example.com/2"),
fetch_url("https://example.com/3")
)
for result in results:
print(result)
asyncio.run(main())
asyncio.create_task
Creating and managing tasks:
import asyncio
async def process_item(item: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Processed {item}"
async def main() -> None:
# Create tasks for concurrent execution
task1 = asyncio.create_task(process_item("A", 2.0))
task2 = asyncio.create_task(process_item("B", 1.0))
task3 = asyncio.create_task(process_item("C", 1.5))
# Do other work while tasks run
print("Tasks started")
# Wait for tasks to complete
result1 = await task1
result2 = await task2
result3 = await task3
print(result1, result2, result3)
asyncio.run(main())
Task with name and context:
import asyncio
async def background_task(name: str) -> None:
print(f"Task {name} starting")
await asyncio.sleep(2)
print(f"Task {name} completed")
async def main() -> None:
# Create named task
task = asyncio.create_task(
background_task("worker"),
name="background-worker"
)
# Check task status
print(f"Task name: {task.get_name()}")
print(f"Task done: {task.done()}")
await task
asyncio.run(main())
asyncio.gather vs asyncio.wait
Using gather for results:
import asyncio
async def fetch(n: int) -> int:
await asyncio.sleep(1)
return n * 2
async def main() -> None:
# gather returns results in order
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3)
)
print(results) # [2, 4, 6]
# Return exceptions instead of raising
results = await asyncio.gather(
fetch(1),
fetch(2),
return_exceptions=True
)
asyncio.run(main())
Using wait for more control:
import asyncio
async def worker(n: int) -> int:
await asyncio.sleep(n)
return n
async def main() -> None:
tasks = [
asyncio.create_task(worker(1)),
asyncio.create_task(worker(2)),
asyncio.create_task(worker(3))
]
# Wait for first task to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"Done: {len(done)}, Pending: {len(pending)}")
# Cancel pending tasks
for task in pending:
task.cancel()
# Wait for all with timeout
done, pending = await asyncio.wait(
tasks,
timeout=2.0
)
asyncio.run(main())
Async Context Managers
Creating async context managers:
import asyncio
from typing import AsyncIterator
class AsyncResource:
async def __aenter__(self) -> "AsyncResource":
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
print("Releasing resource")
await asyncio.sleep(0.1)
async def query(self) -> str:
return "data"
async def main() -> None:
async with AsyncResource() as resource:
result = await resource.query()
print(result)
asyncio.run(main())
Using asynccontextmanager decorator:
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
# Setup
print(f"Connecting to {url}")
await asyncio.sleep(0.1)
conn = f"connection-{url}"
try:
yield conn
finally:
# Teardown
print(f"Closing connection to {url}")
await asyncio.sleep(0.1)
async def main() -> None:
async with get_connection("localhost") as conn:
print(f"Using {conn}")
asyncio.run(main())
Async Iterators
Creating async iterators:
import asyncio
from typing import AsyncIterator
class AsyncRange:
def __init__(self, count: int) -> None:
self.count = count
def __aiter__(self) -> AsyncIterator[int]:
return self
async def __anext__(self) -> int:
if self.count <= 0:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.count -= 1
return self.count
async def main() -> None:
async for i in AsyncRange(5):
print(i)
asyncio.run(main())
Async generator functions:
import asyncio
from typing import AsyncIterator
async def async_range(count: int) -> AsyncIterator[int]:
for i in range(count):
await asyncio.sleep(0.1)
yield i
async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
for url in urls:
await asyncio.sleep(0.5)
yield f"Page content from {url}"
async def main() -> None:
async for num in async_range(5):
print(num)
urls = ["url1", "url2", "url3"]
async for page in fetch_pages(urls):
print(page)
asyncio.run(main())
Async Queues
Producer-consumer pattern with Queue:
import asyncio
from asyncio import Queue
async def producer(queue: Queue[int], n: int) -> None:
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(i)
print(f"Produced {i}")
await queue.put(None) # Sentinel value
async def consumer(queue: Queue[int], name: str) -> None:
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.2)
print(f"Consumer {name} processed {item}")
queue.task_done()
async def main() -> None:
queue: Queue[int] = Queue(maxsize=5)
# Start producer and consumers
prod = asyncio.create_task(producer(queue, 10))
cons1 = asyncio.create_task(consumer(queue, "A"))
cons2 = asyncio.create_task(consumer(queue, "B"))
await prod
await queue.join() # Wait for all tasks to be processed
# Signal consumers to exit
await queue.put(None)
await queue.put(None)
await cons1
await cons2
asyncio.run(main())
Semaphore and Lock
Limiting concurrent operations:
import asyncio
async def fetch_with_limit(
url: str,
semaphore: asyncio.Semaphore
) -> str:
async with semaphore:
print(f"Fetching {url}")
await asyncio.sleep(1)
return f"Data from {url}"
async def main() -> None:
# Limit to 3 concurrent operations
semaphore = asyncio.Semaphore(3)
urls = [f"https://example.com/{i}" for i in range(10)]
tasks = [
fetch_with_limit(url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
asyncio.run(main())
Using Lock for mutual exclusion:
import asyncio
class Counter:
def __init__(self) -> None:
self.value = 0
self.lock = asyncio.Lock()
async def increment(self) -> None:
async with self.lock:
# Critical section
current = self.value
await asyncio.sleep(0.01)
self.value = current + 1
async def main() -> None:
counter = Counter()
# Run increments concurrently
await asyncio.gather(*[
counter.increment()
for _ in range(100)
])
print(f"Final value: {counter.value}") # Should be 100
asyncio.run(main())
Timeouts and Cancellation
Using timeout:
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(5)
return "completed"
async def main() -> None:
# Timeout after 2 seconds
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=2.0
)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())
Handling cancellation:
import asyncio
async def cancellable_task() -> None:
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise
async def main() -> None:
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancellation confirmed")
asyncio.run(main())
Event Loop Management
Direct event loop control:
import asyncio
async def task1() -> None:
print("Task 1")
async def task2() -> None:
print("Task 2")
# Create new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Schedule callbacks
loop.call_soon(lambda: print("Callback"))
# Schedule delayed callback
loop.call_later(1.0, lambda: print("Delayed"))
# Run coroutine
loop.run_until_complete(task1())
# Run multiple tasks
loop.run_until_complete(
asyncio.gather(task1(), task2())
)
finally:
loop.close()
concurrent.futures
ThreadPoolExecutor for I/O-bound tasks:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io_task(n: int) -> int:
print(f"Task {n} starting")
time.sleep(2) # Blocking I/O
return n * 2
async def main() -> None:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=3) as executor:
# Run blocking function in thread pool
tasks = [
loop.run_in_executor(executor, blocking_io_task, i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
ProcessPoolExecutor for CPU-bound tasks:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive_task(n: int) -> int:
# CPU-intensive computation
result = sum(i * i for i in range(n))
return result
async def main() -> None:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as executor:
# Run CPU-bound function in process pool
tasks = [
loop.run_in_executor(
executor,
cpu_intensive_task,
10_000_000
)
for _ in range(4)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} tasks")
asyncio.run(main())
Async HTTP with aiohttp
Making async HTTP requests:
import asyncio
import aiohttp
async def fetch_url(
session: aiohttp.ClientSession,
url: str
) -> str:
async with session.get(url) as response:
return await response.text()
async def main() -> None:
async with aiohttp.ClientSession() as session:
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3"
]
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} pages")
asyncio.run(main())
Error Handling
Handling exceptions in async code:
import asyncio
async def failing_task() -> None:
await asyncio.sleep(1)
raise ValueError("Task failed")
async def main() -> None:
# Handle exception in single task
try:
await failing_task()
except ValueError as e:
print(f"Caught: {e}")
# Handle exceptions with gather
results = await asyncio.gather(
failing_task(),
failing_task(),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
asyncio.run(main())
When to Use This Skill
Use python-async-patterns when you need to:
- Handle multiple I/O operations concurrently (API calls, database queries)
- Build async web servers or clients
- Process data streams asynchronously
- Implement producer-consumer patterns with async queues
- Run blocking I/O operations without blocking the event loop
- Create async context managers for resource management
- Implement async iterators for streaming data
- Control concurrency with semaphores and locks
- Handle timeouts and cancellation in async operations
- Mix CPU-bound and I/O-bound operations efficiently
Best Practices
- Use asyncio.run() for the main entry point
- Create tasks with asyncio.create_task() for concurrent execution
- Use gather() when you need all results
- Use wait() when you need fine-grained control
- Always handle CancelledError in long-running tasks
- Use semaphores to limit concurrent operations
- Prefer async context managers for resource management
- Use asyncio.Queue for producer-consumer patterns
- Run blocking I/O in thread pool with run_in_executor()
- Run CPU-bound tasks in process pool
- Set appropriate timeouts for network operations
- Use structured concurrency patterns (nurseries)
Common Pitfalls
- Forgetting to await coroutines (creates coroutine object, doesn't run)
- Blocking the event loop with CPU-intensive work
- Not handling task cancellation properly
- Using time.sleep() instead of asyncio.sleep()
- Creating too many concurrent tasks without limits
- Not closing resources properly in async context
- Mixing blocking and async code incorrectly
- Not handling exceptions in background tasks
- Forgetting to call task_done() with Queue
- Using global event loop instead of asyncio.run()