Documentation/Buki/Python/ skills /python-async-patterns

📖 python-async-patterns

Master Python asynchronous programming with asyncio, async/await, and concurrent.futures. Use for async code and concurrency patterns.



Overview

Master asynchronous programming in Python using asyncio, async/await syntax, and concurrent execution patterns for I/O-bound and CPU-bound tasks.

Basic Async/Await

Core async syntax:

import asyncio

# Define async function with async def
async def fetch_data(url: str) -> str:
    """Simulate downloading ``url`` and return a placeholder payload.

    Announces the request, suspends for one second in place of real
    network I/O, then returns a string that names the URL.
    """
    print(f"Fetching {url}...")
    simulated_latency = 1
    await asyncio.sleep(simulated_latency)  # stand-in for a real I/O wait
    payload = f"Data from {url}"
    return payload

# Call async function
async def main() -> None:
    """Entry coroutine: fetch one URL and print what came back."""
    print(await fetch_data("https://api.example.com"))

# asyncio.run() creates the event loop, runs main() to completion, and closes the loop
asyncio.run(main())

Multiple concurrent operations:

import asyncio

async def fetch_url(url: str) -> str:
    """Pause for one second, then return placeholder content naming ``url``."""
    await asyncio.sleep(1)  # simulated request time
    content = f"Content from {url}"
    return content

async def main() -> None:
    """Fetch three example URLs concurrently and print each result."""
    targets = [f"https://example.com/{page}" for page in (1, 2, 3)]

    # gather() schedules all coroutines together and keeps argument order
    contents = await asyncio.gather(*(fetch_url(target) for target in targets))
    for content in contents:
        print(content)

asyncio.run(main())

asyncio.create_task

Creating and managing tasks:

import asyncio

async def process_item(item: str, delay: float) -> str:
    """Wait ``delay`` seconds, then return a message naming ``item``."""
    await asyncio.sleep(delay)
    outcome = f"Processed {item}"
    return outcome

async def main() -> None:
    """Start three tasks up front, then collect their results in creation order."""
    # create_task() schedules each coroutine on the running loop right away
    jobs = [
        asyncio.create_task(process_item(label, seconds))
        for label, seconds in (("A", 2.0), ("B", 1.0), ("C", 1.5))
    ]

    # Other work can happen here while the tasks are pending
    print("Tasks started")

    # Await each task in the order it was created
    outcomes = [await job for job in jobs]

    print(*outcomes)

asyncio.run(main())

Task with a name and status checks:

import asyncio

async def background_task(name: str) -> None:
    """Print a start line, sleep two seconds, then print a completion line."""
    print(f"Task {name} starting")
    work_seconds = 2
    await asyncio.sleep(work_seconds)
    print(f"Task {name} completed")

async def main() -> None:
    """Create a named task, report its name and status, then await it."""
    coro = background_task("worker")
    task = asyncio.create_task(coro, name="background-worker")

    # Report on the task before it has been awaited
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")

    await task

asyncio.run(main())

asyncio.gather vs asyncio.wait

Using gather for results:

import asyncio

async def fetch(n: int) -> int:
    """Sleep for one second, then return twice ``n``."""
    await asyncio.sleep(1)
    doubled = n * 2
    return doubled

async def main() -> None:
    """Show gather()'s ordered results and its ``return_exceptions`` flag."""
    # Results come back in the order the awaitables were passed in
    doubled = await asyncio.gather(*(fetch(n) for n in (1, 2, 3)))
    print(doubled)  # [2, 4, 6]

    # With return_exceptions=True, failures are returned as values instead of raised
    doubled = await asyncio.gather(fetch(1), fetch(2), return_exceptions=True)

asyncio.run(main())

Using wait for more control:

import asyncio

async def worker(n: int) -> int:
    """Sleep for ``n`` seconds and then return ``n`` unchanged."""
    seconds = n
    await asyncio.sleep(seconds)
    return n

async def main() -> None:
    """Demonstrate asyncio.wait() with FIRST_COMPLETED and with a timeout."""
    jobs = [asyncio.create_task(worker(seconds)) for seconds in (1, 2, 3)]

    # Return as soon as any one task has finished
    done, pending = await asyncio.wait(jobs, return_when=asyncio.FIRST_COMPLETED)

    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # Request cancellation of everything that has not finished yet
    for job in pending:
        job.cancel()

    # Wait on the full task list again, giving up after two seconds
    done, pending = await asyncio.wait(jobs, timeout=2.0)

asyncio.run(main())

Async Context Managers

Creating async context managers:

import asyncio
from typing import AsyncIterator

class AsyncResource:
    """Toy resource for ``async with``; acquiring and releasing each pause briefly."""

    @staticmethod
    async def _pause() -> None:
        """Suspend for 0.1 s to imitate setup/teardown latency."""
        await asyncio.sleep(0.1)

    async def __aenter__(self) -> "AsyncResource":
        """Announce acquisition, pause, and hand back this instance."""
        print("Acquiring resource")
        await self._pause()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Announce release and pause; returns None, so exceptions propagate."""
        print("Releasing resource")
        await self._pause()

    async def query(self) -> str:
        """Return the fixed string ``"data"``."""
        answer = "data"
        return answer

async def main() -> None:
    """Open an AsyncResource, run one query inside it, and print the answer."""
    async with AsyncResource() as res:
        print(await res.query())

asyncio.run(main())

Using asynccontextmanager decorator:

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    """Async context manager that yields a fake connection string for ``url``.

    Entering prints a connect message and pauses 0.1 s; leaving prints a
    close message and pauses 0.1 s, whether or not the body raised.
    """
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)  # simulated connect latency

    conn = f"connection-{url}"
    try:
        yield conn
    finally:
        # Teardown runs on normal exit and when the body raises
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    """Open a connection to ``localhost`` and print a line that uses it."""
    connection_cm = get_connection("localhost")
    async with connection_cm as conn:
        print(f"Using {conn}")

asyncio.run(main())

Async Iterators

Creating async iterators:

import asyncio
from typing import AsyncIterator

class AsyncRange:
    """Async iterator over ``0, 1, ..., count - 1``, mirroring ``range(count)``.

    Each step pauses for 0.1 s before producing its value. ``count`` keeps
    the value passed to the constructor; progress is tracked separately.
    A zero or negative ``count`` produces no values.
    """

    def __init__(self, count: int) -> None:
        self.count = count
        # Next value to hand out; kept apart from ``count`` so iterating
        # does not destroy the configured length.
        self._next = 0

    def __aiter__(self) -> AsyncIterator[int]:
        """Return the iterator itself, as the async-iterator protocol requires."""
        return self

    async def __anext__(self) -> int:
        """Return the next integer, or raise StopAsyncIteration when exhausted."""
        if self._next >= self.count:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        value = self._next
        self._next += 1
        return value

async def main() -> None:
    """Print every value produced by ``AsyncRange(5)``."""
    numbers = AsyncRange(5)
    async for i in numbers:
        print(i)

asyncio.run(main())

Async generator functions:

import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    """Async generator yielding ``0`` up to ``count - 1``, pausing 0.1 s before each."""
    for value in range(count):
        await asyncio.sleep(0.1)  # simulated wait before each value
        yield value

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    """Async generator producing one placeholder page per URL, pausing 0.5 s before each."""
    for address in urls:
        await asyncio.sleep(0.5)  # simulated download time
        page = f"Page content from {address}"
        yield page

async def main() -> None:
    """Consume both async generators with ``async for`` and print each item."""
    async for number in async_range(5):
        print(number)

    async for body in fetch_pages(["url1", "url2", "url3"]):
        print(body)

asyncio.run(main())

Async Queues

Producer-consumer pattern with Queue:

import asyncio
from asyncio import Queue

async def producer(queue: Queue[int], n: int) -> None:
    """Put the integers ``0 .. n - 1`` on ``queue``, pausing 0.1 s before each.

    The producer deliberately does not enqueue a shutdown sentinel. The
    caller knows how many consumers exist and sends one ``None`` per
    consumer once the queue has been drained. An extra sentinel from here
    would stop one consumer early and leave a stray ``None`` in the queue.

    Args:
        queue: Queue shared with the consumers.
        n: Number of items to produce.
    """
    for i in range(n):
        await asyncio.sleep(0.1)  # simulated time to produce one item
        await queue.put(i)
        print(f"Produced {i}")

async def consumer(queue: Queue[int], name: str) -> None:
    """Process items from ``queue`` until a ``None`` sentinel arrives.

    Each real item takes 0.2 s to process and is reported with this
    consumer's ``name``. ``task_done()`` is called once for every value
    taken off the queue, the sentinel included.
    """
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

    # Account for the sentinel that ended the loop
    queue.task_done()

async def main() -> None:
    """Wire one producer to two consumers through a bounded queue."""
    queue: Queue[int] = Queue(maxsize=5)

    # Launch the producer and both consumers as concurrent tasks
    prod = asyncio.create_task(producer(queue, 10))
    consumers = [
        asyncio.create_task(consumer(queue, label))
        for label in ("A", "B")
    ]

    await prod
    await queue.join()  # returns once every queued item has been marked task_done()

    # One None per consumer tells each of them to leave its loop
    for _ in consumers:
        await queue.put(None)

    for cons in consumers:
        await cons

asyncio.run(main())

Semaphore and Lock

Limiting concurrent operations:

import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    """Simulate fetching ``url`` while holding one slot of ``semaphore``.

    The slot is acquired before the fetch is announced and released again
    once the one-second simulated request is over, even if it fails.
    """
    await semaphore.acquire()
    try:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"
    finally:
        semaphore.release()

async def main() -> None:
    """Fetch ten URLs while allowing at most three fetches at a time."""
    # The semaphore's initial value caps how many fetches run at once
    limiter = asyncio.Semaphore(3)

    fetches = (
        fetch_with_limit(f"https://example.com/{i}", limiter)
        for i in range(10)
    )

    results = await asyncio.gather(*fetches)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

Using Lock for mutual exclusion:

import asyncio

class Counter:
    """Integer counter whose read-modify-write step is guarded by an asyncio.Lock."""

    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        """Add one to ``value`` while holding the lock.

        The 0.01 s pause sits between the read and the write, so without
        the lock concurrent increments would overwrite one another.
        """
        await self.lock.acquire()
        try:
            # Critical section
            snapshot = self.value
            await asyncio.sleep(0.01)
            self.value = snapshot + 1
        finally:
            self.lock.release()

async def main() -> None:
    """Run 100 increments concurrently and print the final count."""
    counter = Counter()

    # All increments are scheduled together; the lock serializes them
    bumps = (counter.increment() for _ in range(100))
    await asyncio.gather(*bumps)

    print(f"Final value: {counter.value}")  # Should be 100

asyncio.run(main())

Timeouts and Cancellation

Using timeout:

import asyncio

async def slow_operation() -> str:
    """Sleep for five seconds, then return ``"completed"``."""
    duration = 5
    await asyncio.sleep(duration)
    return "completed"

async def main() -> None:
    """Run slow_operation() under a two-second deadline and report the outcome."""
    try:
        # wait_for() cancels the operation and raises once the timeout expires
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
    else:
        print(result)

asyncio.run(main())

Handling cancellation:

import asyncio

async def cancellable_task() -> None:
    """Print a heartbeat every second until cancelled, then re-raise the cancellation."""
    while True:
        print("Working...")
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Task was cancelled")
            # Cleanup would go here; re-raise so the task ends up cancelled
            raise

async def main() -> None:
    """Let the task run for three seconds, cancel it, and confirm it stopped."""
    running = asyncio.create_task(cancellable_task())

    # Give the task some time to work before asking it to stop
    await asyncio.sleep(3)
    running.cancel()

    # Awaiting a cancelled task raises CancelledError in the awaiter
    try:
        await running
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())

Event Loop Management

Direct event loop control:

import asyncio

async def task1() -> None:
    """Print ``Task 1`` and finish immediately."""
    message = "Task 1"
    print(message)

async def task2() -> None:
    """Print ``Task 2`` and finish immediately."""
    message = "Task 2"
    print(message)

# Create a new event loop and make it the current loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    # Schedule a callback for the loop's next iteration
    loop.call_soon(lambda: print("Callback"))

    # Schedule a callback to fire one second from now
    loop.call_later(1.0, lambda: print("Delayed"))

    # Run a single coroutine to completion
    loop.run_until_complete(task1())

    # Run several coroutines concurrently
    loop.run_until_complete(
        asyncio.gather(task1(), task2())
    )

    # Callbacks only fire while the loop is running. The coroutines above
    # finish at once, so keep the loop alive past the 1.0 s delay;
    # otherwise the loop is closed before "Delayed" is ever printed.
    loop.run_until_complete(asyncio.sleep(1.1))
finally:
    # Always release the loop's resources, even if something above raised
    loop.close()

concurrent.futures

ThreadPoolExecutor for I/O-bound tasks:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_task(n: int, delay: float = 2.0) -> int:
    """Simulate a blocking I/O call and return twice ``n``.

    Args:
        n: Task number; printed in the start message and doubled for the result.
        delay: Seconds to block the calling thread. Defaults to 2.0, the
            duration this example used before the parameter existed.

    Returns:
        ``n * 2``.
    """
    print(f"Task {n} starting")
    time.sleep(delay)  # Blocking I/O: holds the thread, which is why it belongs in an executor
    return n * 2

async def main() -> None:
    """Run five blocking calls on a three-thread pool and print their results."""
    # Inside a coroutine the loop is always running, so get_running_loop()
    # is the right accessor; get_event_loop() is the legacy API.
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Each run_in_executor() call returns a future the event loop can await
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]

        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

ProcessPoolExecutor for CPU-bound tasks:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    """Return the sum of the squares of every integer in ``range(n)``."""
    # Pure-Python arithmetic loop: CPU-bound work with no I/O waits
    return sum(k * k for k in range(n))

async def main() -> None:
    """Run four CPU-bound computations on a process pool and report completion."""
    # Inside a coroutine the loop is always running, so get_running_loop()
    # is the right accessor; get_event_loop() is the legacy API.
    loop = asyncio.get_running_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Each call runs in a separate worker process
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000
            )
            for _ in range(4)
        ]

        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

# Worker processes may re-import this module (spawn start method); the guard
# keeps them from running main() again on import.
if __name__ == "__main__":
    asyncio.run(main())

Async HTTP with aiohttp

Making async HTTP requests:

import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> str:
    """GET ``url`` through ``session`` and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
    return body

async def main() -> None:
    """Fetch three pages concurrently over one shared session and print the count."""
    urls = [f"https://example.com/{page}" for page in (1, 2, 3)]

    # One session is shared by every request and closed when the block exits
    async with aiohttp.ClientSession() as session:
        requests = (fetch_url(session, url) for url in urls)
        results = await asyncio.gather(*requests)

        print(f"Fetched {len(results)} pages")

asyncio.run(main())

Error Handling

Handling exceptions in async code:

import asyncio

async def failing_task() -> None:
    """Sleep for one second, then raise ``ValueError("Task failed")``."""
    await asyncio.sleep(1)
    error = ValueError("Task failed")
    raise error

async def main() -> None:
    """Show exception handling for a single await and for gather()."""
    # A failing coroutine raises at the await, so ordinary try/except applies
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")

    # return_exceptions=True hands failures back as result values
    results = await asyncio.gather(
        *(failing_task() for _ in range(2)),
        return_exceptions=True
    )

    for result in results:
        if not isinstance(result, Exception):
            continue
        print(f"Task failed: {result}")

asyncio.run(main())

When to Use This Skill

Use python-async-patterns when you need to:

  • Handle multiple I/O operations concurrently (API calls, database queries)
  • Build async web servers or clients
  • Process data streams asynchronously
  • Implement producer-consumer patterns with async queues
  • Run blocking I/O operations without blocking the event loop
  • Create async context managers for resource management
  • Implement async iterators for streaming data
  • Control concurrency with semaphores and locks
  • Handle timeouts and cancellation in async operations
  • Mix CPU-bound and I/O-bound operations efficiently

Best Practices

  • Use asyncio.run() for the main entry point
  • Create tasks with asyncio.create_task() for concurrent execution
  • Use gather() when you need all results
  • Use wait() when you need fine-grained control
  • Always handle CancelledError in long-running tasks, and re-raise it after cleanup
  • Use semaphores to limit concurrent operations
  • Prefer async context managers for resource management
  • Use asyncio.Queue for producer-consumer patterns
  • Run blocking I/O in thread pool with run_in_executor()
  • Run CPU-bound tasks in process pool
  • Set appropriate timeouts for network operations
  • Use structured concurrency patterns (asyncio.TaskGroup on Python 3.11+, the asyncio counterpart of Trio's nurseries)

Common Pitfalls

  • Forgetting to await coroutines (creates coroutine object, doesn't run)
  • Blocking the event loop with CPU-intensive work
  • Not handling task cancellation properly
  • Using time.sleep() instead of asyncio.sleep()
  • Creating too many concurrent tasks without limits
  • Not closing resources properly in async context
  • Mixing blocking and async code incorrectly
  • Not handling exceptions in background tasks
  • Forgetting to call task_done() with Queue
  • Using global event loop instead of asyncio.run()

Resources