rust-async-patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "rust-async-patterns" with this command: npx skills add wshobson/agents/wshobson-agents-rust-async-patterns

Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

When to Use This Skill

  • Building async Rust applications

  • Implementing concurrent network services

  • Using Tokio for async I/O

  • Handling async errors properly

  • Debugging async code issues

  • Optimizing async performance

Core Concepts

  1. Async Execution Model

Future (lazy) → poll() → Ready(value) | Pending ↑ ↓ Waker ← Runtime schedules

  1. Key Abstractions

Concept Purpose

Future

Lazy computation that may complete later

async fn

Function returning impl Future

await

Suspend until future completes

Task

Spawned future running concurrently

Runtime

Executor that polls futures

Quick Start

Cargo.toml

[dependencies] tokio = { version = "1", features = ["full"] } futures = "0.3" async-trait = "0.1" anyhow = "1.0" tracing = "0.1" tracing-subscriber = "0.3"

use tokio::time::{sleep, Duration}; use anyhow::Result;

#[tokio::main] async fn main() -> Result<()> { // Initialize tracing tracing_subscriber::fmt::init();

// Async operations
let result = fetch_data("https://api.example.com").await?;
println!("Got: {}", result);

Ok(())

}

async fn fetch_data(url: &str) -> Result<String> { // Simulated async operation sleep(Duration::from_millis(100)).await; Ok(format!("Data from {}", url)) }

Patterns

Pattern 1: Concurrent Task Execution

use tokio::task::JoinSet; use anyhow::Result;

// Spawn multiple concurrent tasks async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> { let mut set = JoinSet::new();

for url in urls {
    set.spawn(async move {
        fetch_data(&#x26;url).await
    });
}

let mut results = Vec::new();
while let Some(res) = set.join_next().await {
    match res {
        Ok(Ok(data)) => results.push(data),
        Ok(Err(e)) => tracing::error!("Task failed: {}", e),
        Err(e) => tracing::error!("Join error: {}", e),
    }
}

Ok(results)

}

// With concurrency limit use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> { stream::iter(urls) .map(|url| async move { fetch_data(&url).await }) .buffer_unordered(limit) // Max concurrent tasks .collect() .await }

// Select first to complete use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> { select! { result = fetch_data(url1) => result, result = fetch_data(url2) => result, } }

Pattern 2: Channels for Communication

use tokio::sync::{mpsc, broadcast, oneshot, watch};

// Multi-producer, single-consumer async fn mpsc_example() { let (tx, mut rx) = mpsc::channel::<String>(100);

// Spawn producer
let tx2 = tx.clone();
tokio::spawn(async move {
    tx2.send("Hello".to_string()).await.unwrap();
});

// Consume
while let Some(msg) = rx.recv().await {
    println!("Got: {}", msg);
}

}

// Broadcast: multi-producer, multi-consumer async fn broadcast_example() { let (tx, _) = broadcast::channel::<String>(100);

let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

tx.send("Event".to_string()).unwrap();

// Both receivers get the message
let _ = rx1.recv().await;
let _ = rx2.recv().await;

}

// Oneshot: single value, single use async fn oneshot_example() -> String { let (tx, rx) = oneshot::channel::<String>();

tokio::spawn(async move {
    tx.send("Result".to_string()).unwrap();
});

rx.await.unwrap()

}

// Watch: single producer, multi-consumer, latest value async fn watch_example() { let (tx, mut rx) = watch::channel("initial".to_string());

tokio::spawn(async move {
    loop {
        // Wait for changes
        rx.changed().await.unwrap();
        println!("New value: {}", *rx.borrow());
    }
});

tx.send("updated".to_string()).unwrap();

}

Pattern 3: Async Error Handling

use anyhow::{Context, Result, bail}; use thiserror::Error;

#[derive(Error, Debug)] pub enum ServiceError { #[error("Network error: {0}")] Network(#[from] reqwest::Error),

#[error("Database error: {0}")]
Database(#[from] sqlx::Error),

#[error("Not found: {0}")]
NotFound(String),

#[error("Timeout after {0:?}")]
Timeout(std::time::Duration),

}

// Using anyhow for application errors async fn process_request(id: &str) -> Result<Response> { let data = fetch_data(id) .await .context("Failed to fetch data")?;

let parsed = parse_response(&#x26;data)
    .context("Failed to parse response")?;

Ok(parsed)

}

// Using custom errors for library code async fn get_user(id: &str) -> Result<User, ServiceError> { let result = db.query(id).await?;

match result {
    Some(user) => Ok(user),
    None => Err(ServiceError::NotFound(id.to_string())),
}

}

// Timeout wrapper use tokio::time::timeout;

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError> where F: std::future::Future<Output = Result<T, ServiceError>>, { timeout(duration, future) .await .map_err(|_| ServiceError::Timeout(duration))? }

Pattern 4: Graceful Shutdown

use tokio::signal; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken;

async fn run_server() -> Result<()> { // Method 1: CancellationToken let token = CancellationToken::new(); let token_clone = token.clone();

// Spawn task that respects cancellation
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = token_clone.cancelled() => {
                tracing::info!("Task shutting down");
                break;
            }
            _ = do_work() => {}
        }
    }
});

// Wait for shutdown signal
signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");

// Cancel all tasks
token.cancel();

// Give tasks time to cleanup
tokio::time::sleep(Duration::from_secs(5)).await;

Ok(())

}

// Method 2: Broadcast channel for shutdown async fn run_with_broadcast() -> Result<()> { let (shutdown_tx, _) = broadcast::channel::<()>(1);

let mut rx = shutdown_tx.subscribe();
tokio::spawn(async move {
    tokio::select! {
        _ = rx.recv() => {
            tracing::info!("Received shutdown");
        }
        _ = async { loop { do_work().await } } => {}
    }
});

signal::ctrl_c().await?;
let _ = shutdown_tx.send(());

Ok(())

}

Pattern 5: Async Traits

use async_trait::async_trait;

#[async_trait] pub trait Repository { async fn get(&self, id: &str) -> Result<Entity>; async fn save(&self, entity: &Entity) -> Result<()>; async fn delete(&self, id: &str) -> Result<()>; }

pub struct PostgresRepository { pool: sqlx::PgPool, }

#[async_trait] impl Repository for PostgresRepository { async fn get(&self, id: &str) -> Result<Entity> { sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id) .fetch_one(&self.pool) .await .map_err(Into::into) }

async fn save(&#x26;self, entity: &#x26;Entity) -> Result&#x3C;()> {
    sqlx::query!(
        "INSERT INTO entities (id, data) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET data = $2",
        entity.id,
        entity.data
    )
    .execute(&#x26;self.pool)
    .await?;
    Ok(())
}

async fn delete(&#x26;self, id: &#x26;str) -> Result&#x3C;()> {
    sqlx::query!("DELETE FROM entities WHERE id = $1", id)
        .execute(&#x26;self.pool)
        .await?;
    Ok(())
}

}

// Trait object usage async fn process(repo: &dyn Repository, id: &str) -> Result<()> { let entity = repo.get(id).await?; // Process... repo.save(&entity).await }

Pattern 6: Streams and Async Iteration

use futures::stream::{self, Stream, StreamExt}; use async_stream::stream;

// Create stream from async iterator fn numbers_stream() -> impl Stream<Item = i32> { stream! { for i in 0..10 { tokio::time::sleep(Duration::from_millis(100)).await; yield i; } } }

// Process stream async fn process_stream() { let stream = numbers_stream();

// Map and filter
let processed: Vec&#x3C;_> = stream
    .filter(|n| futures::future::ready(*n % 2 == 0))
    .map(|n| n * 2)
    .collect()
    .await;

println!("{:?}", processed);

}

// Chunked processing async fn process_in_chunks() { let stream = numbers_stream();

let mut chunks = stream.chunks(3);

while let Some(chunk) = chunks.next().await {
    println!("Processing chunk: {:?}", chunk);
}

}

// Merge multiple streams async fn merge_streams() { let stream1 = numbers_stream(); let stream2 = numbers_stream();

let merged = stream::select(stream1, stream2);

merged
    .for_each(|n| async move {
        println!("Got: {}", n);
    })
    .await;

}

Pattern 7: Resource Management

use std::sync::Arc; use tokio::sync::{Mutex, RwLock, Semaphore};

// Shared state with RwLock (prefer for read-heavy) struct Cache { data: RwLock<HashMap<String, String>>, }

impl Cache { async fn get(&self, key: &str) -> Option<String> { self.data.read().await.get(key).cloned() }

async fn set(&#x26;self, key: String, value: String) {
    self.data.write().await.insert(key, value);
}

}

// Connection pool with semaphore struct Pool { semaphore: Semaphore, connections: Mutex<Vec<Connection>>, }

impl Pool { fn new(size: usize) -> Self { Self { semaphore: Semaphore::new(size), connections: Mutex::new((0..size).map(|_| Connection::new()).collect()), } }

async fn acquire(&#x26;self) -> PooledConnection&#x3C;'_> {
    let permit = self.semaphore.acquire().await.unwrap();
    let conn = self.connections.lock().await.pop().unwrap();
    PooledConnection { pool: self, conn: Some(conn), _permit: permit }
}

}

struct PooledConnection<'a> { pool: &'a Pool, conn: Option<Connection>, _permit: tokio::sync::SemaphorePermit<'a>, }

impl Drop for PooledConnection<'_> { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let pool = self.pool; tokio::spawn(async move { pool.connections.lock().await.push(conn); }); } } }

Debugging Tips

// Enable tokio-console for runtime debugging // Cargo.toml: tokio = { features = ["tracing"] } // Run: RUSTFLAGS="--cfg tokio_unstable" cargo run // Then: tokio-console

// Instrument async functions use tracing::instrument;

#[instrument(skip(pool))] async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> { tracing::debug!("Fetching user"); // ... }

// Track task spawning let span = tracing::info_span!("worker", id = %worker_id); tokio::spawn(async move { // Enters span when polled }.instrument(span));

Best Practices

Do's

  • Use tokio::select!

  • For racing futures

  • Prefer channels - Over shared state when possible

  • Use JoinSet

  • For managing multiple tasks

  • Instrument with tracing - For debugging async code

  • Handle cancellation - Check CancellationToken

Don'ts

  • Don't block - Never use std::thread::sleep in async

  • Don't hold locks across awaits - Causes deadlocks

  • Don't spawn unboundedly - Use semaphores for limits

  • Don't ignore errors - Propagate with ? or log

  • Don't forget Send bounds - For spawned futures

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

typescript-advanced-types

TypeScript Advanced Types

Repository Source
13.9K31.3Kwshobson
Coding

python-performance-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

code-review-excellence

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

python-testing-patterns

No summary provided by upstream source.

Repository SourceNeeds Review