Tokio Networking Patterns
This skill provides network programming patterns for building production-grade services with the Tokio ecosystem.
HTTP Service with Hyper and Axum
Build HTTP services with routing and middleware:
// Axum router, extractor, and middleware imports used throughout this section.
use axum::{ Router, routing::{get, post}, extract::{State, Path, Json}, response::IntoResponse, middleware, }; use std::sync::Arc;
// Shared application state handed to every handler. `Clone` is cheap here:
// both fields are `Arc`s, so cloning only bumps reference counts.
#[derive(Clone)] struct AppState { db: Arc<Database>, cache: Arc<Cache>, }
// Build the application router: construct shared state, register routes,
// and attach the middleware stack.
async fn create_app() -> Router { let state = AppState { db: Arc::new(Database::new().await), cache: Arc::new(Cache::new()), };
// NOTE(review): with axum's `Router::layer`, the LAST layer added is the
// outermost one, so a request appears to pass through `auth_middleware`
// before `logging_middleware` here — confirm that ordering is intended.
Router::new()
.route("/health", get(health_check))
.route("/api/v1/users", get(list_users).post(create_user))
.route("/api/v1/users/:id", get(get_user).delete(delete_user))
.layer(middleware::from_fn(logging_middleware))
.layer(middleware::from_fn(auth_middleware))
.with_state(state)
}
async fn health_check() -> impl IntoResponse { "OK" }
async fn get_user( State(state): State<AppState>, Path(id): Path<u64>, ) -> Result<Json<User>, StatusCode> { state.db.get_user(id) .await .map(Json) .ok_or(StatusCode::NOT_FOUND) }
/// Request-scoped logging middleware: records method, URI, response status,
/// and wall-clock latency for every request that passes through.
async fn logging_middleware<B>(
    req: Request<B>,
    next: Next<B>,
) -> impl IntoResponse {
    // Capture request metadata up front — ownership of `req` moves into
    // `next.run` below.
    let (method, uri) = (req.method().clone(), req.uri().clone());
    let started_at = Instant::now();

    let response = next.run(req).await;

    tracing::info!(
        method = %method,
        uri = %uri,
        status = %response.status(),
        duration_ms = started_at.elapsed().as_millis(),
        "request completed"
    );
    response
}
gRPC Service with Tonic
Build type-safe gRPC services:
use tonic::{transport::Server, Request, Response, Status};
// Generated protobuf/gRPC bindings; the argument must match the proto package name.
pub mod proto { tonic::include_proto!("myservice"); }
use proto::my_service_server::{MyService, MyServiceServer};
// gRPC service implementation state.
// NOTE(review): `#[derive(Default)]` here requires `Database: Default`
// (Arc<T>'s Default impl needs T: Default) — confirm Database provides it.
#[derive(Default)] pub struct MyServiceImpl { db: Arc<Database>, }
#[tonic::async_trait]
impl MyService for MyServiceImpl {
    /// Unary RPC: look up a single user by id.
    ///
    /// Database failures map to `Status::internal`; a missing row maps to
    /// `Status::not_found`.
    async fn get_user(
        &self,
        request: Request<proto::GetUserRequest>,
    ) -> Result<Response<proto::User>, Status> {
        let req = request.into_inner();
        let user = self.db.get_user(req.id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?
            .ok_or_else(|| Status::not_found("User not found"))?;
        Ok(Response::new(proto::User {
            id: user.id,
            name: user.name,
            email: user.email,
        }))
    }

    type ListUsersStream = ReceiverStream<Result<proto::User, Status>>;

    /// Server-streaming RPC: stream all users to the client.
    async fn list_users(
        &self,
        request: Request<proto::ListUsersRequest>,
    ) -> Result<Response<Self::ListUsersStream>, Status> {
        // Bounded channel: applies backpressure when the client reads slowly.
        let (tx, rx) = mpsc::channel(100);
        let db = self.db.clone();
        tokio::spawn(async move {
            // BUGFIX: the original called `.unwrap()` here, which panicked the
            // spawned task on a DB error and silently ended the stream. Report
            // the failure to the client as a gRPC status instead.
            let mut users = match db.list_users().await {
                Ok(users) => users,
                Err(e) => {
                    let _ = tx.send(Err(Status::internal(e.to_string()))).await;
                    return;
                }
            };
            while let Some(user) = users.next().await {
                let proto_user = proto::User {
                    id: user.id,
                    name: user.name,
                    email: user.email,
                };
                // A send error means the receiver (client) is gone; stop early.
                if tx.send(Ok(proto_user)).await.is_err() {
                    break;
                }
            }
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }
}
/// Bind the tonic server on the IPv6 loopback and serve until shutdown.
async fn serve() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = MyServiceImpl::default();
    let server = Server::builder().add_service(MyServiceServer::new(service));
    server.serve(addr).await?;
    Ok(())
}
Tower Middleware Composition
Layer middleware for cross-cutting concerns:
use tower::{ServiceBuilder, Service}; use tower_http::{ trace::TraceLayer, compression::CompressionLayer, timeout::TimeoutLayer, limit::RateLimitLayer, }; use std::time::Duration;
fn create_middleware_stack<S>(service: S) -> impl Service where S: Service + Clone, { ServiceBuilder::new() // Outermost layer (executed first) .layer(TraceLayer::new_for_http()) .layer(CompressionLayer::new()) .layer(TimeoutLayer::new(Duration::from_secs(30))) .layer(RateLimitLayer::new(100, Duration::from_secs(1))) // Innermost layer (executed last) .service(service) }
// Custom middleware use tower::Layer;
#[derive(Clone)] struct MetricsLayer { metrics: Arc<Metrics>, }
impl<S> Layer<S> for MetricsLayer { type Service = MetricsService<S>;
fn layer(&self, inner: S) -> Self::Service {
MetricsService {
inner,
metrics: self.metrics.clone(),
}
}
}
#[derive(Clone)] struct MetricsService<S> { inner: S, metrics: Arc<Metrics>, }
impl<S, Req> Service<Req> for MetricsService<S> where S: Service<Req>, { type Response = S::Response; type Error = S::Error; type Future = /* ... */;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Req) -> Self::Future {
self.metrics.requests_total.inc();
let timer = self.metrics.request_duration.start_timer();
let future = self.inner.call(req);
let metrics = self.metrics.clone();
Box::pin(async move {
let result = future.await;
timer.observe_duration();
result
})
}
}
Connection Pooling
Manage connection pools efficiently:
use deadpool_postgres::{Config, Pool, Runtime}; use tokio_postgres::NoTls;
// Thin wrapper around a deadpool-postgres connection pool.
pub struct DatabasePool { pool: Pool, }
impl DatabasePool { pub async fn new(config: &DatabaseConfig) -> Result<Self, Error> { let mut cfg = Config::new(); cfg.host = Some(config.host.clone()); cfg.port = Some(config.port); cfg.dbname = Some(config.database.clone()); cfg.user = Some(config.user.clone()); cfg.password = Some(config.password.clone());
let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;
Ok(Self { pool })
}
pub async fn get(&self) -> Result<Client, Error> {
self.pool.get().await.map_err(Into::into)
}
pub async fn query<T>(&self, f: impl FnOnce(&Client) -> F) -> Result<T, Error>
where
F: Future<Output = Result<T, Error>>,
{
let client = self.get().await?;
f(&client).await
}
}
// Usage: build the pool once at startup, then run queries through it.
// FIX: extraction had fused the "// Usage" comment with the `let pool`
// statement onto one line, commenting the code out.
let pool = DatabasePool::new(&config).await?;

let users = pool.query(|client| async move {
    client.query("SELECT * FROM users", &[])
        .await
        .map_err(Into::into)
}).await?;
Health Checks and Readiness Probes
Implement comprehensive health checks:
use axum::{Router, routing::get, Json}; use serde::Serialize;
// Aggregate health report returned by the /health endpoint.
#[derive(Serialize)] struct HealthResponse { status: String, version: String, dependencies: Vec<DependencyHealth>, }
// Health of a single dependency; `latency_ms` is populated only on success,
// `message` only on failure.
#[derive(Serialize)] struct DependencyHealth { name: String, status: String, latency_ms: Option<u64>, message: Option<String>, }
async fn health_check(State(state): State<Arc<AppState>>) -> Json<HealthResponse> { let mut dependencies = Vec::new();
// Check database
let db_start = Instant::now();
let db_status = match state.db.ping().await {
Ok(_) => DependencyHealth {
name: "database".into(),
status: "healthy".into(),
latency_ms: Some(db_start.elapsed().as_millis() as u64),
message: None,
},
Err(e) => DependencyHealth {
name: "database".into(),
status: "unhealthy".into(),
latency_ms: None,
message: Some(e.to_string()),
},
};
dependencies.push(db_status);
// Check cache
let cache_start = Instant::now();
let cache_status = match state.cache.ping().await {
Ok(_) => DependencyHealth {
name: "cache".into(),
status: "healthy".into(),
latency_ms: Some(cache_start.elapsed().as_millis() as u64),
message: None,
},
Err(e) => DependencyHealth {
name: "cache".into(),
status: "unhealthy".into(),
latency_ms: None,
message: Some(e.to_string()),
},
};
dependencies.push(cache_status);
let all_healthy = dependencies.iter().all(|d| d.status == "healthy");
Json(HealthResponse {
status: if all_healthy { "healthy" } else { "unhealthy" }.into(),
version: env!("CARGO_PKG_VERSION").into(),
dependencies,
})
}
async fn readiness_check(State(state): State<Arc<AppState>>) -> StatusCode { if state.is_ready().await { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE } }
pub fn health_routes() -> Router<Arc<AppState>> { Router::new() .route("/health", get(health_check)) .route("/ready", get(readiness_check)) .route("/live", get(|| async { StatusCode::OK })) }
Circuit Breaker Pattern
Protect against cascading failures:
use std::sync::atomic::{AtomicU64, Ordering};
// Outbound HTTP client guarded by a circuit breaker.
// NOTE(review): `CircuitBreaker` is defined elsewhere; presumably its `call`
// fails fast while the circuit is open — confirm against its definition.
pub struct ServiceClient { client: reqwest::Client, circuit_breaker: CircuitBreaker, }
// Route every outbound request through the breaker so repeated failures trip
// it and subsequent calls fail fast instead of stacking up timeouts.
impl ServiceClient { pub async fn call(&self, req: Request) -> Result<Response, Error> { self.circuit_breaker.call(async { self.client .execute(req) .await .map_err(Into::into) }).await } }
Load Balancing
Distribute requests across multiple backends:
use tower::balance::p2c::Balance; use tower::discover::ServiceList;
// Power-of-two-choices balancer over a static endpoint list.
// NOTE(review): `Balance`'s second type parameter is the request type handled
// by the endpoint services — verify it matches `Request` as used below.
pub struct LoadBalancer { balancer: Balance<ServiceList<Vec<ServiceEndpoint>>, Request>, }
impl LoadBalancer { pub fn new(endpoints: Vec<String>) -> Self { let services: Vec<_> = endpoints .into_iter() .map(|endpoint| create_client(endpoint)) .collect();
let balancer = Balance::new(ServiceList::new(services));
Self { balancer }
}
pub async fn call(&mut self, req: Request) -> Result<Response, Error> {
self.balancer.call(req).await
}
}
Request Deduplication
Deduplicate concurrent identical requests:
use tokio::sync::Mutex; use std::collections::HashMap;
// Coalesces concurrent fetches for the same key: one task (the leader)
// performs the fetch while others wait on a shared Notify; completed results
// are cached as Arc<V> for cheap sharing.
// NOTE(review): the `K: Hash` bound used below needs `use std::hash::Hash;`
// in scope — confirm it is imported wherever this snippet is used.
pub struct RequestDeduplicator<K, V> { in_flight: Arc<Mutex<HashMap<K, Arc<tokio::sync::Notify>>>>, cache: Arc<Mutex<HashMap<K, Arc<V>>>>, }
impl<K: Eq + Hash + Clone, V> RequestDeduplicator<K, V> { pub fn new() -> Self { Self { in_flight: Arc::new(Mutex::new(HashMap::new())), cache: Arc::new(Mutex::new(HashMap::new())), } }
pub async fn get_or_fetch<F, Fut>(
&self,
key: K,
fetch: F,
) -> Result<Arc<V>, Error>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<V, Error>>,
{
// Check cache
{
let cache = self.cache.lock().await;
if let Some(value) = cache.get(&key) {
return Ok(value.clone());
}
}
// Check if request is in flight
let notify = {
let mut in_flight = self.in_flight.lock().await;
if let Some(notify) = in_flight.get(&key) {
notify.clone()
} else {
let notify = Arc::new(tokio::sync::Notify::new());
in_flight.insert(key.clone(), notify.clone());
notify
}
};
// Wait if another request is in progress
notify.notified().await;
// Check cache again
{
let cache = self.cache.lock().await;
if let Some(value) = cache.get(&key) {
return Ok(value.clone());
}
}
// Fetch value
let value = Arc::new(fetch().await?);
// Update cache
{
let mut cache = self.cache.lock().await;
cache.insert(key.clone(), value.clone());
}
// Remove from in-flight and notify
{
let mut in_flight = self.in_flight.lock().await;
in_flight.remove(&key);
}
notify.notify_waiters();
Ok(value)
}
}
Best Practices
- Use connection pooling for database and HTTP connections
- Implement health checks for all dependencies
- Add circuit breakers for external service calls
- Use appropriate timeouts for all network operations
- Implement retry logic with exponential backoff
- Add comprehensive middleware for logging, metrics, auth
- Use load balancing for high availability
- Deduplicate requests to reduce load
- Monitor latency and error rates
- Design for graceful degradation when services fail