salvo-realtime

Implement real-time features using WebSocket and Server-Sent Events (SSE). Use for chat applications, live updates, notifications, and bidirectional communication.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "salvo-realtime" with this command: npx skills add salvo-rs/salvo-skills/salvo-rs-salvo-skills-salvo-realtime

Salvo Real-time Communication

This skill provides an overview of real-time communication options in Salvo. For detailed implementations, see the dedicated skills:

  • salvo-websocket: Full-duplex bidirectional communication
  • salvo-sse: Server-to-client event streaming

Choosing Between WebSocket and SSE

FeatureWebSocketSSE
DirectionBidirectionalServer → Client only
ProtocolCustom protocolHTTP
ReconnectionManualAutomatic
Binary dataYesNo (text only)
Browser supportAll modernAll modern
Firewall friendlyMay have issuesYes (standard HTTP)
ComplexityHigherLower

When to Use WebSocket

  • Chat applications
  • Online gaming
  • Collaborative editing
  • Trading platforms
  • Any bidirectional real-time data

When to Use SSE

  • Live notifications
  • News feeds
  • Stock tickers
  • Progress updates
  • Server monitoring dashboards

Quick WebSocket Example

use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;

#[handler]
async fn ws_handler(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    WebSocketUpgrade::new()
        .upgrade(req, res, |mut ws| async move {
            while let Some(msg) = ws.recv().await {
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(_) => return,
                };
                if ws.send(msg).await.is_err() {
                    return;
                }
            }
        })
        .await
}

#[tokio::main]
async fn main() {
    let router = Router::new()
        .push(Router::with_path("ws").goal(ws_handler));

    let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
    Server::new(acceptor).serve(router).await;
}

Quick SSE Example

use std::convert::Infallible;
use std::time::Duration;
use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[handler]
async fn sse_handler(res: &mut Response) {
    let event_stream = {
        let mut counter: u64 = 0;
        let interval = interval(Duration::from_secs(1));
        let stream = IntervalStream::new(interval);

        stream.map(move |_| {
            counter += 1;
            Ok::<_, Infallible>(SseEvent::default().text(counter.to_string()))
        })
    };

    sse::stream(res, event_stream);
}

#[tokio::main]
async fn main() {
    let router = Router::new()
        .push(Router::with_path("events").get(sse_handler));

    let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
    Server::new(acceptor).serve(router).await;
}

Real-time Architecture Patterns

Broadcasting to Multiple Clients

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use salvo::websocket::Message;

type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;

async fn broadcast(users: &Users, sender_id: usize, message: &str) {
    let formatted = format!("User {}: {}", sender_id, message);
    let users = users.read().await;

    for (&uid, tx) in users.iter() {
        if uid != sender_id {
            let _ = tx.send(Message::text(formatted.clone()));
        }
    }
}

Room-Based Messaging

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type Rooms = Arc<RwLock<HashMap<String, Vec<UserId>>>>;

async fn join_room(rooms: &Rooms, room: &str, user_id: UserId) {
    rooms.write().await
        .entry(room.to_string())
        .or_default()
        .push(user_id);
}

async fn leave_room(rooms: &Rooms, room: &str, user_id: UserId) {
    if let Some(users) = rooms.write().await.get_mut(room) {
        users.retain(|&id| id != user_id);
    }
}

Pub/Sub with Broadcast Channels

use tokio::sync::broadcast;

#[derive(Clone)]
struct PubSub {
    sender: broadcast::Sender<String>,
}

impl PubSub {
    fn new(capacity: usize) -> Self {
        let (sender, _) = broadcast::channel(capacity);
        Self { sender }
    }

    fn publish(&self, message: String) {
        let _ = self.sender.send(message);
    }

    fn subscribe(&self) -> broadcast::Receiver<String> {
        self.sender.subscribe()
    }
}

Client-Side Examples

WebSocket Client (JavaScript)

const ws = new WebSocket('ws://localhost:8080/ws');

ws.onopen = () => console.log('Connected');
ws.onmessage = (e) => console.log('Received:', e.data);
ws.onclose = () => console.log('Disconnected');
ws.onerror = (e) => console.error('Error:', e);

// Send message
ws.send('Hello, Server!');

// Close connection
ws.close();

SSE Client (JavaScript)

const source = new EventSource('http://localhost:8080/events');

source.onopen = () => console.log('Connected');
source.onmessage = (e) => console.log('Message:', e.data);
source.onerror = (e) => console.error('Error:', e);

// Named events
source.addEventListener('notification', (e) => {
    console.log('Notification:', e.data);
});

// Close connection
source.close();

Combining WebSocket and SSE

Some applications benefit from using both:

let router = Router::new()
    // WebSocket for bidirectional chat
    .push(Router::with_path("chat").goal(ws_chat_handler))
    // SSE for notifications (one-way)
    .push(Router::with_path("notifications").get(sse_notifications))
    // SSE for live data feeds
    .push(Router::with_path("feed").get(sse_feed));

Connection Management

Track Active Connections

use std::sync::atomic::{AtomicUsize, Ordering};

static ACTIVE_CONNECTIONS: AtomicUsize = AtomicUsize::new(0);

fn on_connect() {
    let count = ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed) + 1;
    tracing::info!("Connection opened. Active: {}", count);
}

fn on_disconnect() {
    let count = ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1;
    tracing::info!("Connection closed. Active: {}", count);
}

Heartbeat / Keep-Alive

use std::time::Duration;
use tokio::time::interval;

async fn heartbeat_task(tx: Sender<Message>) {
    let mut ticker = interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        if tx.send(Message::ping(vec![])).await.is_err() {
            break;
        }
    }
}

Best Practices

WebSocket

  1. Handle disconnections gracefully: Clean up user state
  2. Implement ping/pong: Detect dead connections
  3. Use message queues: Buffer messages for slow clients
  4. Authenticate before upgrade: Verify tokens in query params or headers
  5. Limit message size: Prevent memory exhaustion
  6. Use binary for efficiency: When sending structured data

SSE

  1. Use keep-alive: Prevent connection timeout
  2. Include event IDs: Enable reconnection from last event
  3. Set retry interval: Guide client reconnection behavior
  4. Use named events: Organize different message types
  5. Handle client disconnects: Clean up server resources

General

  1. Monitor connections: Track active connection count
  2. Implement rate limiting: Prevent abuse
  3. Use compression: For large messages
  4. Log connection events: Debug connection issues
  5. Test at scale: Verify behavior with many concurrent connections
  6. Consider horizontal scaling: Use Redis/message queues for multi-server

See Also

  • salvo-websocket: Detailed WebSocket implementation guide
  • salvo-sse: Detailed SSE implementation guide

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

salvo-tls-acme

No summary provided by upstream source.

Repository SourceNeeds Review
General

salvo-concurrency-limiter

No summary provided by upstream source.

Repository SourceNeeds Review
General

salvo-static-files

No summary provided by upstream source.

Repository SourceNeeds Review
General

salvo-middleware

No summary provided by upstream source.

Repository SourceNeeds Review