ð rust-async-patterns
Use when Rust async programming with tokio, async/await, and futures. Use when writing asynchronous Rust code.
Overview
Master asynchronous programming in Rust using async/await syntax, tokio runtime, and the futures ecosystem for concurrent I/O operations.
Async/Await Basics
Basic async function:
async fn fetch_data() -> String {
String::from("data")
}
#[tokio::main]
async fn main() {
let data = fetch_data().await;
println!("{}", data);
}
Cargo.toml setup:
[dependencies]
tokio = { version = "1", features = ["full"] }
Tokio Runtime
Different runtime configurations:
// Multi-threaded runtime (default)
#[tokio::main]
async fn main() {
// Code here
}
// Single-threaded runtime
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Code here
}
// Manual runtime creation
use tokio::runtime::Runtime;
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("Running async code");
});
}
Spawning Tasks
Creating concurrent tasks:
use tokio::task;
#[tokio::main]
async fn main() {
let task1 = task::spawn(async {
println!("Task 1");
42
});
let task2 = task::spawn(async {
println!("Task 2");
100
});
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
println!("Results: {}, {}", result1, result2);
}
Spawning with move:
#[tokio::main]
async fn main() {
let data = String::from("hello");
let handle = task::spawn(async move {
println!("{}", data);
});
handle.await.unwrap();
}
Async HTTP with reqwest
Install reqwest:
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
Making HTTP requests:
use reqwest;
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let response = reqwest::get("https://api.github.com/users/rust-lang")
.await?
.text()
.await?;
println!("{}", response);
Ok(())
}
Concurrent requests:
use reqwest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://api.github.com/users/rust-lang",
"https://api.github.com/users/tokio-rs",
];
let mut handles = vec![];
for url in urls {
let handle = tokio::spawn(async move {
reqwest::get(url).await?.text().await
});
handles.push(handle);
}
for handle in handles {
let response = handle.await??;
println!("{}", response);
}
Ok(())
}
Select and Join
tokio::select! for racing futures:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("Timer finished first");
}
_ = async_operation() => {
println!("Operation finished first");
}
}
}
async fn async_operation() {
sleep(Duration::from_secs(2)).await;
}
tokio::join! for concurrent execution:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (r1, r2, r3) = tokio::join!(
async { sleep(Duration::from_secs(1)).await; 1 },
async { sleep(Duration::from_secs(1)).await; 2 },
async { sleep(Duration::from_secs(1)).await; 3 },
);
println!("Results: {}, {}, {}", r1, r2, r3);
}
Channels
mpsc channel for message passing:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
}
});
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}
Multiple producers:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(format!("Message from {}", i)).await.unwrap();
});
}
drop(tx); // Close channel when all senders dropped
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}
}
oneshot channel:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("result").unwrap();
});
let result = rx.await.unwrap();
println!("{}", result);
}
Synchronization Primitives
Mutex for shared state:
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Result: {}", *counter.lock().await);
}
RwLock for read-write access:
use tokio::sync::RwLock;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// Multiple readers
let data1 = Arc::clone(&data);
let data2 = Arc::clone(&data);
let reader1 = tokio::spawn(async move {
let d = data1.read().await;
println!("Reader 1: {:?}", *d);
});
let reader2 = tokio::spawn(async move {
let d = data2.read().await;
println!("Reader 2: {:?}", *d);
});
// One writer
let data3 = Arc::clone(&data);
let writer = tokio::spawn(async move {
let mut d = data3.write().await;
d.push(4);
});
reader1.await.unwrap();
reader2.await.unwrap();
writer.await.unwrap();
}
Semaphore for limiting concurrency:
use tokio::sync::Semaphore;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone();
let handle = tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
println!("Task {} acquired permit", i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("Task {} releasing permit", i);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
Streams
Using async streams:
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
while let Some(value) = stream.next().await {
println!("{}", value);
}
}
Creating custom streams:
use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
struct Counter {
count: usize,
max: usize,
}
impl Stream for Counter {
type Item = usize;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.count < self.max {
let current = self.count;
self.count += 1;
Poll::Ready(Some(current))
} else {
Poll::Ready(None)
}
}
}
#[tokio::main]
async fn main() {
let mut counter = Counter { count: 0, max: 5 };
while let Some(value) = counter.next().await {
println!("{}", value);
}
}
Timeouts and Intervals
Using timeout:
use tokio::time::{timeout, Duration};
async fn slow_operation() -> String {
tokio::time::sleep(Duration::from_secs(5)).await;
String::from("done")
}
#[tokio::main]
async fn main() {
match timeout(Duration::from_secs(2), slow_operation()).await {
Ok(result) => println!("Success: {}", result),
Err(_) => println!("Operation timed out"),
}
}
Using intervals:
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
let mut interval = interval(Duration::from_secs(1));
for _ in 0..5 {
interval.tick().await;
println!("Tick");
}
}
Error Handling
Propagating errors with ?:
use reqwest;
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let content = fetch_url("https://example.com").await?;
println!("{}", content);
Ok(())
}
Blocking Operations
Running CPU-intensive tasks:
use tokio::task;
fn blocking_operation() -> u64 {
// CPU-intensive work
(0..1_000_000).sum()
}
#[tokio::main]
async fn main() {
let result = task::spawn_blocking(|| {
blocking_operation()
}).await.unwrap();
println!("Result: {}", result);
}
Async Traits
Using async-trait crate:
[dependencies]
async-trait = "0.1"
use async_trait::async_trait;
#[async_trait]
trait Repository {
async fn find(&self, id: u64) -> Option<String>;
async fn save(&self, data: String) -> Result<(), String>;
}
struct UserRepository;
#[async_trait]
impl Repository for UserRepository {
async fn find(&self, id: u64) -> Option<String> {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Some(format!("User {}", id))
}
async fn save(&self, data: String) -> Result<(), String> {
println!("Saving: {}", data);
Ok(())
}
}
When to Use This Skill
Use rust-async-patterns when you need to:
- Build async web servers or clients
- Handle concurrent I/O operations efficiently
- Make multiple HTTP requests concurrently
- Implement producer-consumer patterns
- Work with async streams of data
- Manage shared state across async tasks
- Control concurrency limits
- Handle timeouts and cancellation
- Build event-driven systems
- Process data asynchronously
Best Practices
- Use tokio::spawn for CPU-independent tasks
- Use spawn_blocking for CPU-intensive work
- Prefer channels over shared state with locks
- Use select! for racing futures
- Use join! for concurrent independent operations
- Set appropriate timeouts for network operations
- Use Semaphore to limit concurrent operations
- Avoid holding locks across await points
- Use Arc for shared ownership in async context
- Handle errors properly with Result
Common Pitfalls
- Holding std::sync::Mutex across await (use tokio::sync::Mutex)
- Not using move with closures in spawn
- Forgetting to await futures
- Blocking the runtime with CPU-intensive work
- Creating too many tasks without limits
- Not handling cancellation properly
- Using the wrong channel type
- Deadlocks with improper lock ordering
- Not configuring runtime appropriately
- Ignoring errors in spawned tasks