Documentation/Buki/Rust/ skills /rust-async-patterns

📖 rust-async-patterns

Use for Rust async programming with tokio, async/await, and futures — that is, whenever you are writing asynchronous Rust code.



Overview

Master asynchronous programming in Rust using async/await syntax, tokio runtime, and the futures ecosystem for concurrent I/O operations.

Async/Await Basics

Basic async function:

/// Produces the placeholder payload `"data"`.
///
/// Because this is an `async fn`, calling it only builds a future; the body
/// runs when that future is awaited.
async fn fetch_data() -> String {
    "data".to_owned()
}

/// Entry point: awaits `fetch_data` on the Tokio runtime and prints what it
/// returned.
#[tokio::main]
async fn main() {
    // Nothing happens until the future is awaited.
    println!("{}", fetch_data().await);
}

Cargo.toml setup:

[dependencies]
tokio = { version = "1", features = ["full"] }

Tokio Runtime

Different runtime configurations:

// Multi-threaded runtime: `#[tokio::main]` with no arguments selects the
// default flavor.
#[tokio::main]
async fn main() {
    // Application code goes here.
}

// Single-threaded runtime: the `flavor = "current_thread"` argument replaces
// the default flavor.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Application code goes here.
}

// Manual runtime creation
use tokio::runtime::Runtime;

/// Builds a Tokio runtime by hand and drives one future to completion from a
/// synchronous `main`.
fn main() {
    // Runtime construction is fallible; this example simply panics on failure.
    let runtime = Runtime::new().unwrap();

    let greeting = async {
        println!("Running async code");
    };

    // `block_on` parks the current thread until the future has finished.
    runtime.block_on(greeting);
}

Spawning Tasks

Creating concurrent tasks:

use tokio::task;

/// Spawns two independent tasks and prints the values they return.
#[tokio::main]
async fn main() {
    let first = task::spawn(async {
        println!("Task 1");
        42
    });
    let second = task::spawn(async {
        println!("Task 2");
        100
    });

    // Awaiting a `JoinHandle` yields `Err` if the task panicked or was
    // cancelled, hence the unwraps. The tuple is evaluated left to right.
    let (first_value, second_value) = (first.await.unwrap(), second.await.unwrap());

    println!("Results: {}, {}", first_value, second_value);
}

Spawning with move:

/// Moves an owned `String` into a spawned task and waits for the task to
/// finish.
#[tokio::main]
async fn main() {
    let data = String::from("hello");

    // `tokio::spawn` is written with its full path so this snippet compiles
    // on its own, without a separate `use tokio::task;` line.
    //
    // `move` transfers ownership of `data` into the task: a spawned future
    // must be `'static`, so it cannot borrow from `main`'s locals.
    let handle = tokio::spawn(async move {
        println!("{}", data);
    });

    // The handle resolves to `Err` only if the task panicked or was cancelled.
    handle.await.unwrap();
}

Async HTTP with reqwest

Install reqwest:

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

Making HTTP requests:

use reqwest;

/// Fetches a GitHub user profile and prints the raw response body.
///
/// # Errors
///
/// Returns the `reqwest::Error` raised while building the client, sending
/// the request, or reading the body.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // The GitHub API rejects requests that carry no `User-Agent` header, and
    // reqwest does not send one by default, so set it explicitly.
    let client = reqwest::Client::builder()
        .user_agent("rust-async-patterns-example")
        .build()?;

    let response = client
        .get("https://api.github.com/users/rust-lang")
        .send()
        .await?
        .text()
        .await?;

    println!("{}", response);
    Ok(())
}

Concurrent requests:

use reqwest;

/// Downloads several URLs concurrently, one spawned task per URL, and prints
/// each body in the order the URLs were listed.
///
/// # Errors
///
/// Returns the first failure encountered: a client-construction error, a task
/// that panicked or was cancelled (`JoinError`), or a `reqwest::Error` from
/// the request itself.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://api.github.com/users/rust-lang",
        "https://api.github.com/users/tokio-rs",
    ];

    // The GitHub API rejects requests without a `User-Agent` header and
    // reqwest sends none by default. Building one client up front also lets
    // every task share the same connection pool.
    let client = reqwest::Client::builder()
        .user_agent("rust-async-patterns-example")
        .build()?;

    let mut handles = Vec::with_capacity(urls.len());

    for url in urls {
        // Cloning a `Client` is cheap: clones share the underlying pool.
        let client = client.clone();
        handles.push(tokio::spawn(async move {
            client.get(url).send().await?.text().await
        }));
    }

    for handle in handles {
        // First `?`: the task itself failed. Second `?`: the request failed.
        let response = handle.await??;
        println!("{}", response);
    }

    Ok(())
}

Select and Join

tokio::select! for racing futures:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("Timer finished first");
        }
        _ = async_operation() => {
            println!("Operation finished first");
        }
    }
}

/// Stand-in for slow work: completes after a two-second asynchronous sleep.
async fn async_operation() {
    let delay = Duration::from_secs(2);
    sleep(delay).await;
}

tokio::join! for concurrent execution:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (r1, r2, r3) = tokio::join!(
        async { sleep(Duration::from_secs(1)).await; 1 },
        async { sleep(Duration::from_secs(1)).await; 2 },
        async { sleep(Duration::from_secs(1)).await; 3 },
    );

    println!("Results: {}, {}, {}", r1, r2, r3);
}

Channels

mpsc channel for message passing:

use tokio::sync::mpsc;

/// Sends ten integers from a spawned producer to the main task over a bounded
/// mpsc channel.
#[tokio::main]
async fn main() {
    // Capacity 32: `send` waits whenever that many messages are queued.
    let (sender, mut receiver) = mpsc::channel(32);

    let producer = async move {
        for i in 0..10 {
            sender.send(i).await.unwrap();
        }
        // `sender` is dropped here, which closes the channel.
    };
    tokio::spawn(producer);

    // `recv` yields `None` once every sender is gone and the queue is empty.
    while let Some(value) = receiver.recv().await {
        println!("Received: {}", value);
    }
}

Multiple producers:

use tokio::sync::mpsc;

/// Fans three producer tasks into a single receiver.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    for producer_id in 0..3 {
        // Each producer owns its own clone of the sender.
        let producer_tx = tx.clone();
        tokio::spawn(async move {
            let message = format!("Message from {}", producer_id);
            producer_tx.send(message).await.unwrap();
        });
    }

    // Drop the original sender: the channel only closes once *all* senders
    // are gone, and without this the receive loop below would never end.
    drop(tx);

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}

oneshot channel:

use tokio::sync::oneshot;

/// Passes a single value from a spawned task back to `main` through a oneshot
/// channel.
#[tokio::main]
async fn main() {
    let (reply_tx, reply_rx) = oneshot::channel();

    // `oneshot::Sender::send` is synchronous; it fails only if the receiver
    // has already been dropped.
    tokio::spawn(async move {
        reply_tx.send("result").unwrap();
    });

    // Awaiting the receiver fails only if the sender was dropped unsent.
    let received = reply_rx.await.unwrap();
    println!("{}", received);
}

Synchronization Primitives

Mutex for shared state:

use tokio::sync::Mutex;
use std::sync::Arc;

/// Increments a shared counter from ten tasks and prints the final value.
#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));

    // Spawn all ten tasks first; each gets its own `Arc` handle.
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let shared = Arc::clone(&counter);
            tokio::spawn(async move {
                // The guard is a temporary, released at the end of this statement.
                *shared.lock().await += 1;
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }

    let total = counter.lock().await;
    println!("Result: {}", *total);
}

RwLock for read-write access:

use tokio::sync::RwLock;
use std::sync::Arc;

/// Shares a vector between two reader tasks and one writer task via `RwLock`.
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // Readers: any number of read guards may be held at the same time.
    let reader1 = {
        let shared = Arc::clone(&data);
        tokio::spawn(async move {
            let guard = shared.read().await;
            println!("Reader 1: {:?}", *guard);
        })
    };

    let reader2 = {
        let shared = Arc::clone(&data);
        tokio::spawn(async move {
            let guard = shared.read().await;
            println!("Reader 2: {:?}", *guard);
        })
    };

    // Writer: the write guard is exclusive.
    let writer = {
        let shared = Arc::clone(&data);
        tokio::spawn(async move {
            shared.write().await.push(4);
        })
    };

    reader1.await.unwrap();
    reader2.await.unwrap();
    writer.await.unwrap();
}

Semaphore for limiting concurrency:

use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let permit = semaphore.clone();
        let handle = tokio::spawn(async move {
            let _permit = permit.acquire().await.unwrap();
            println!("Task {} acquired permit", i);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Task {} releasing permit", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

Streams

Using async streams (requires the tokio-stream crate in Cargo.toml):

use tokio_stream::{self as stream, StreamExt};

/// Turns a vector into a stream and prints every item it yields.
#[tokio::main]
async fn main() {
    let values = vec![1, 2, 3, 4, 5];
    let mut numbers = stream::iter(values);

    // `StreamExt::next` is the async counterpart of `Iterator::next`.
    while let Some(value) = numbers.next().await {
        println!("{}", value);
    }
}

Creating custom streams:

use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Stream that yields `count`, `count + 1`, … and stops before `max`.
struct Counter {
    /// Next value to yield.
    count: usize,
    /// Exclusive upper bound.
    max: usize,
}

impl Stream for Counter {
    type Item = usize;

    /// Yields the next value, or `None` once `max` has been reached.
    ///
    /// Every poll is immediately `Ready`, so the waker in the context is
    /// never needed.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // `Counter` holds only plain integers, so it is `Unpin` and can be
        // accessed through an ordinary mutable reference.
        let this = self.get_mut();

        if this.count >= this.max {
            return Poll::Ready(None);
        }

        let current = this.count;
        this.count += 1;
        Poll::Ready(Some(current))
    }
}

/// Drains a `Counter` stream running from 0 up to (but excluding) 5.
#[tokio::main]
async fn main() {
    let (count, max) = (0, 5);
    let mut counter = Counter { count, max };

    while let Some(value) = counter.next().await {
        println!("{}", value);
    }
}

Timeouts and Intervals

Using timeout:

use tokio::time::{timeout, Duration};

/// Stand-in for slow work: sleeps asynchronously for five seconds, then
/// returns `"done"`.
async fn slow_operation() -> String {
    let delay = Duration::from_secs(5);
    tokio::time::sleep(delay).await;
    "done".to_owned()
}

/// Gives `slow_operation` two seconds to finish and reports the outcome.
#[tokio::main]
async fn main() {
    let limit = Duration::from_secs(2);

    // `timeout` resolves to `Err` when the limit elapses first; the wrapped
    // future is then dropped.
    let outcome = timeout(limit, slow_operation()).await;

    if let Ok(result) = outcome {
        println!("Success: {}", result);
    } else {
        println!("Operation timed out");
    }
}

Using intervals:

use tokio::time::{interval, Duration};

/// Prints "Tick" five times, paced by a one-second interval.
#[tokio::main]
async fn main() {
    let period = Duration::from_secs(1);
    let mut ticker = interval(period);

    // The first `tick` completes immediately; later ones are one period apart.
    for _ in 0..5 {
        ticker.tick().await;
        println!("Tick");
    }
}

Error Handling

Propagating errors with ?:

use reqwest;

/// Downloads `url` and returns the response body as text.
///
/// # Errors
///
/// Returns the `reqwest::Error` raised by the request or by reading the body.
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    // `?` bails out if the request fails; the result of `text()` is already
    // the `Result` this function returns.
    reqwest::get(url).await?.text().await
}

/// Fetches a page and prints it, propagating any failure out of `main`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `?` converts the `reqwest::Error` into the boxed error type.
    println!("{}", fetch_url("https://example.com").await?);
    Ok(())
}

Blocking Operations

Running CPU-intensive tasks:

use tokio::task;

/// Stand-in for CPU-bound work: sums the integers from 0 up to (but not
/// including) one million.
fn blocking_operation() -> u64 {
    let upper_bound: u64 = 1_000_000;
    (0..upper_bound).sum()
}

#[tokio::main]
async fn main() {
    let result = task::spawn_blocking(|| {
        blocking_operation()
    }).await.unwrap();

    println!("Result: {}", result);
}

Async Traits

Using async-trait crate:

[dependencies]
async-trait = "0.1"

Defining and implementing an async trait:

use async_trait::async_trait;

/// Storage abstraction whose operations are asynchronous.
///
/// `#[async_trait]` is applied so the trait can declare `async fn` methods.
#[async_trait]
trait Repository {
    /// Looks up the entry for `id`.
    ///
    /// NOTE(review): `None` presumably means "no such entry" — confirm with
    /// the implementors.
    async fn find(&self, id: u64) -> Option<String>;

    /// Stores `data`, reporting a failure as a `String` message.
    async fn save(&self, data: String) -> Result<(), String>;
}

/// Demo `Repository` implementation with no backing store.
struct UserRepository;

#[async_trait]
impl Repository for UserRepository {
    /// Waits 100 ms, then always answers with `"User <id>"`.
    async fn find(&self, id: u64) -> Option<String> {
        use tokio::time::{sleep, Duration};

        sleep(Duration::from_millis(100)).await;

        let user = format!("User {}", id);
        Some(user)
    }

    /// Logs `data` to stdout and reports success; nothing is persisted.
    async fn save(&self, data: String) -> Result<(), String> {
        println!("Saving: {}", data);
        Ok(())
    }
}

When to Use This Skill

Use rust-async-patterns when you need to:

  • Build async web servers or clients
  • Handle concurrent I/O operations efficiently
  • Make multiple HTTP requests concurrently
  • Implement producer-consumer patterns
  • Work with async streams of data
  • Manage shared state across async tasks
  • Control concurrency limits
  • Handle timeouts and cancellation
  • Build event-driven systems
  • Process data asynchronously

Best Practices

  • Use tokio::spawn for I/O-bound tasks that do not block the thread
  • Use spawn_blocking for CPU-intensive work
  • Prefer channels over shared state with locks
  • Use select! for racing futures
  • Use join! for concurrent independent operations
  • Set appropriate timeouts for network operations
  • Use Semaphore to limit concurrent operations
  • Avoid holding locks across await points
  • Use Arc for shared ownership in async context
  • Handle errors properly with Result

Common Pitfalls

  • Holding std::sync::Mutex across await (use tokio::sync::Mutex)
  • Not using move with async blocks passed to spawn
  • Forgetting to await futures
  • Blocking the runtime with CPU-intensive work
  • Creating too many tasks without limits
  • Not handling cancellation properly
  • Using the wrong channel type
  • Deadlocks with improper lock ordering
  • Not configuring runtime appropriately
  • Ignoring errors in spawned tasks

Resources