real-time-systems

Build real-time communication systems with WebSocket, SSE, and pub/sub patterns. This skill covers connection management, scaling, and production deployment.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "real-time-systems" with this command: npx skills add doanchienthangdev/omgkit/doanchienthangdev-omgkit-real-time-systems

Real-Time Systems

Build real-time communication systems with WebSocket, SSE, and pub/sub patterns. This skill covers connection management, scaling, and production deployment.

Purpose

Implement live features that users expect:

  • Real-time messaging and chat

  • Live notifications and updates

  • Collaborative editing

  • Presence detection

  • Live dashboards and metrics

  • Gaming and interactive experiences

Features

  1. WebSocket Server with Socket.io

import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis';

// Initialize Socket.io with Redis adapter for scaling async function createSocketServer(httpServer: http.Server) { const io = new Server(httpServer, { cors: { origin: process.env.CLIENT_URL, credentials: true, }, pingTimeout: 60000, pingInterval: 25000, });

// Redis adapter for multi-server deployment const pubClient = createClient({ url: process.env.REDIS_URL }); const subClient = pubClient.duplicate(); await Promise.all([pubClient.connect(), subClient.connect()]); io.adapter(createAdapter(pubClient, subClient));

// Authentication middleware io.use(async (socket, next) => { const token = socket.handshake.auth.token;

if (!token) {
  return next(new Error('Authentication required'));
}

try {
  const user = await verifyToken(token);
  socket.data.user = user;
  next();
} catch (error) {
  next(new Error('Invalid token'));
}

});

// Connection handling io.on('connection', (socket) => { const userId = socket.data.user.id; console.log(User connected: ${userId});

// Join user's personal room
socket.join(`user:${userId}`);

// Handle joining rooms
socket.on('join:room', async (roomId: string) => {
  // Verify access
  const hasAccess = await checkRoomAccess(userId, roomId);
  if (!hasAccess) {
    socket.emit('error', { message: 'Access denied' });
    return;
  }

  socket.join(`room:${roomId}`);
  socket.to(`room:${roomId}`).emit('user:joined', {
    userId,
    username: socket.data.user.name,
  });
});

// Handle messages
socket.on('message:send', async (data: { roomId: string; content: string }) => {
  const message = await saveMessage({
    roomId: data.roomId,
    userId,
    content: data.content,
  });

  io.to(`room:${data.roomId}`).emit('message:new', message);
});

// Typing indicators
socket.on('typing:start', (roomId: string) => {
  socket.to(`room:${roomId}`).emit('typing:user', {
    userId,
    username: socket.data.user.name,
    typing: true,
  });
});

socket.on('typing:stop', (roomId: string) => {
  socket.to(`room:${roomId}`).emit('typing:user', {
    userId,
    typing: false,
  });
});

// Presence
socket.on('presence:update', async (status: 'online' | 'away' | 'busy') => {
  await updatePresence(userId, status);
  io.emit('presence:changed', { userId, status });
});

// Disconnect handling
socket.on('disconnect', async (reason) => {
  console.log(`User disconnected: ${userId}, reason: ${reason}`);
  await updatePresence(userId, 'offline');
  io.emit('presence:changed', { userId, status: 'offline' });
});

});

return io; }

  1. Server-Sent Events (SSE)

import { Router } from 'express';

const router = Router();

// SSE endpoint for notifications router.get('/events/notifications', authenticate, (req, res) => { const userId = req.user.id;

// Set SSE headers res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering

// Send initial connection event res.write(event: connected\ndata: ${JSON.stringify({ userId })}\n\n);

// Keep-alive ping const pingInterval = setInterval(() => { res.write(: ping\n\n); }, 30000);

// Subscribe to user's notifications const subscription = pubsub.subscribe(notifications:${userId}, (message) => { res.write(event: notification\ndata: ${JSON.stringify(message)}\n\n); });

// Cleanup on disconnect req.on('close', () => { clearInterval(pingInterval); subscription.unsubscribe(); console.log(SSE connection closed for user ${userId}); }); });

// SSE for live updates (e.g., stock prices, metrics) router.get('/events/stream/:channel', authenticate, async (req, res) => { const { channel } = req.params;

res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive');

// Send initial data const initialData = await getChannelData(channel); res.write(event: initial\ndata: ${JSON.stringify(initialData)}\n\n);

// Stream updates const unsubscribe = subscribeToChannel(channel, (update) => { res.write(event: update\ndata: ${JSON.stringify(update)}\n\n); });

// Handle retry on reconnection res.write(retry: 3000\n\n);

req.on('close', () => { unsubscribe(); }); });

// Client-side SSE handling const EventSourceComponent = () => { useEffect(() => { const eventSource = new EventSource('/api/events/notifications', { withCredentials: true, });

eventSource.onopen = () => {
  console.log('SSE connected');
};

eventSource.addEventListener('notification', (event) => {
  const notification = JSON.parse(event.data);
  showNotification(notification);
});

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  // EventSource auto-reconnects
};

return () => {
  eventSource.close();
};

}, []);

return null; };

  1. Pub/Sub with Redis

import { createClient } from 'redis';

class PubSubService { private publisher: ReturnType<typeof createClient>; private subscriber: ReturnType<typeof createClient>; private handlers: Map<string, Set<(message: any) => void>> = new Map();

async connect() { this.publisher = createClient({ url: process.env.REDIS_URL }); this.subscriber = this.publisher.duplicate();

await Promise.all([
  this.publisher.connect(),
  this.subscriber.connect(),
]);

// Handle incoming messages
this.subscriber.on('message', (channel, message) => {
  const handlers = this.handlers.get(channel);
  if (handlers) {
    const parsed = JSON.parse(message);
    handlers.forEach(handler => handler(parsed));
  }
});

}

async publish(channel: string, message: any): Promise<void> { await this.publisher.publish(channel, JSON.stringify(message)); }

subscribe(channel: string, handler: (message: any) => void): () => void { if (!this.handlers.has(channel)) { this.handlers.set(channel, new Set()); this.subscriber.subscribe(channel); }

this.handlers.get(channel)!.add(handler);

// Return unsubscribe function
return () => {
  const handlers = this.handlers.get(channel);
  if (handlers) {
    handlers.delete(handler);
    if (handlers.size === 0) {
      this.handlers.delete(channel);
      this.subscriber.unsubscribe(channel);
    }
  }
};

}

// Pattern subscription async psubscribe(pattern: string, handler: (channel: string, message: any) => void): Promise<() => void> { await this.subscriber.pSubscribe(pattern, (message, channel) => { handler(channel, JSON.parse(message)); });

return () => {
  this.subscriber.pUnsubscribe(pattern);
};

} }

const pubsub = new PubSubService();

// Usage in services class NotificationService { async sendNotification(userId: string, notification: Notification): Promise<void> { // Save to database await db.notification.create({ data: { ...notification, userId } });

// Publish to real-time channel
await pubsub.publish(`notifications:${userId}`, notification);

}

async broadcastToRoom(roomId: string, event: string, data: any): Promise<void> { await pubsub.publish(room:${roomId}, { event, data }); } }

  1. Presence System

interface PresenceData { status: 'online' | 'away' | 'busy' | 'offline'; lastSeen: Date; socketIds: string[]; }

class PresenceService { private redis: ReturnType<typeof createClient>; private readonly PRESENCE_TTL = 300; // 5 minutes

async setPresence(userId: string, socketId: string, status: string): Promise<void> { const key = presence:${userId};

// Use MULTI for atomic operations
await this.redis.multi()
  .hSet(key, {
    status,
    lastSeen: Date.now().toString(),
  })
  .sAdd(`${key}:sockets`, socketId)
  .expire(key, this.PRESENCE_TTL)
  .exec();

// Publish presence change
await pubsub.publish('presence:updates', {
  userId,
  status,
  lastSeen: new Date(),
});

}

async removeSocket(userId: string, socketId: string): Promise<void> { const key = presence:${userId};

await this.redis.sRem(`${key}:sockets`, socketId);
const remaining = await this.redis.sCard(`${key}:sockets`);

if (remaining === 0) {
  await this.redis.hSet(key, 'status', 'offline');
  await pubsub.publish('presence:updates', {
    userId,
    status: 'offline',
    lastSeen: new Date(),
  });
}

}

async getPresence(userId: string): Promise<PresenceData | null> { const key = presence:${userId}; const data = await this.redis.hGetAll(key);

if (!data.status) return null;

return {
  status: data.status as PresenceData['status'],
  lastSeen: new Date(parseInt(data.lastSeen)),
  socketIds: await this.redis.sMembers(`${key}:sockets`),
};

}

async getMultiplePresence(userIds: string[]): Promise<Map<string, PresenceData>> { const pipeline = this.redis.multi();

userIds.forEach(id => {
  pipeline.hGetAll(`presence:${id}`);
});

const results = await pipeline.exec();
const presenceMap = new Map&#x3C;string, PresenceData>();

userIds.forEach((id, index) => {
  const data = results[index] as Record&#x3C;string, string>;
  if (data?.status) {
    presenceMap.set(id, {
      status: data.status as PresenceData['status'],
      lastSeen: new Date(parseInt(data.lastSeen)),
      socketIds: [],
    });
  }
});

return presenceMap;

} }

  1. Connection Recovery

// Client-side reconnection logic class ReconnectingWebSocket { private ws: WebSocket | null = null; private reconnectAttempts = 0; private maxReconnectAttempts = 10; private reconnectInterval = 1000; private messageQueue: any[] = [];

constructor( private url: string, private options: { onMessage: (data: any) => void; onConnect: () => void; onDisconnect: () => void; } ) { this.connect(); }

private connect(): void { this.ws = new WebSocket(this.url);

this.ws.onopen = () => {
  console.log('WebSocket connected');
  this.reconnectAttempts = 0;
  this.options.onConnect();

  // Flush queued messages
  while (this.messageQueue.length > 0) {
    const msg = this.messageQueue.shift();
    this.send(msg);
  }
};

this.ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  this.options.onMessage(data);
};

this.ws.onclose = (event) => {
  console.log('WebSocket closed:', event.code, event.reason);
  this.options.onDisconnect();
  this.scheduleReconnect();
};

this.ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

}

private scheduleReconnect(): void { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('Max reconnection attempts reached'); return; }

const delay = Math.min(
  this.reconnectInterval * Math.pow(2, this.reconnectAttempts),
  30000 // Max 30 seconds
);

console.log(`Reconnecting in ${delay}ms...`);

setTimeout(() => {
  this.reconnectAttempts++;
  this.connect();
}, delay);

}

send(data: any): void { if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(data)); } else { // Queue message for when connection is restored this.messageQueue.push(data); } }

close(): void { this.maxReconnectAttempts = 0; // Prevent reconnection this.ws?.close(); } }

// Server-side missed message recovery class MessageRecovery { async getMessagesSince(roomId: string, lastMessageId: string): Promise<Message[]> { // Fetch messages after the last seen message return db.message.findMany({ where: { roomId, id: { gt: lastMessageId }, }, orderBy: { createdAt: 'asc' }, take: 100, // Limit recovery batch }); }

async recoverClientState(userId: string, lastSyncTimestamp: number): Promise<{ messages: Message[]; notifications: Notification[]; presenceUpdates: PresenceUpdate[]; }> { const since = new Date(lastSyncTimestamp);

return {
  messages: await this.getUnreadMessages(userId, since),
  notifications: await this.getUnreadNotifications(userId, since),
  presenceUpdates: await this.getPresenceChanges(since),
};

} }

  1. Scaling WebSockets

// Horizontal scaling with sticky sessions // nginx.conf upstream websocket_servers { ip_hash; // Sticky sessions server ws1.example.com:3000; server ws2.example.com:3000; server ws3.example.com:3000; }

server { location /socket.io/ { proxy_pass http://websocket_servers; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 86400; } }

// Broadcasting across servers class ScaledBroadcaster { async broadcastToRoom(roomId: string, event: string, data: any): Promise<void> { // Publish to Redis - all servers will receive await pubsub.publish(broadcast:room:${roomId}, { event, data, timestamp: Date.now(), }); }

// Each server subscribes and emits locally setupBroadcastListener(io: Server): void { pubsub.psubscribe('broadcast:*', (channel, message) => { const [, type, id] = channel.split(':');

  if (type === 'room') {
    io.to(`room:${id}`).emit(message.event, message.data);
  } else if (type === 'user') {
    io.to(`user:${id}`).emit(message.event, message.data);
  }
});

} }

Use Cases

  1. Chat Application

// Real-time chat with typing indicators and read receipts socket.on('chat:message', async (data) => { const message = await createMessage(data); io.to(room:${data.roomId}).emit('chat:message', message); });

socket.on('chat:read', async ({ roomId, messageId }) => { await markAsRead(socket.data.user.id, roomId, messageId); socket.to(room:${roomId}).emit('chat:read', { userId: socket.data.user.id, messageId, }); });

  1. Live Dashboard

// Real-time metrics with SSE setInterval(async () => { const metrics = await gatherMetrics(); await pubsub.publish('dashboard:metrics', metrics); }, 5000);

Best Practices

Do's

  • Implement heartbeat/ping - Detect dead connections

  • Handle reconnection gracefully - Queue messages, recover state

  • Use rooms for scaling - Don't broadcast to all

  • Implement backpressure - Handle slow clients

  • Plan for offline scenarios - Message queuing

  • Monitor connection metrics - Track active connections

Don'ts

  • Don't trust client data without validation

  • Don't skip authentication

  • Don't broadcast sensitive data

  • Don't ignore connection limits

  • Don't forget cleanup on disconnect

  • Don't use WebSocket for everything

Related Skills

  • redis - Pub/sub and state management

  • backend-development - Server architecture

  • api-architecture - REST fallbacks

Reference Resources

  • Socket.io Documentation

  • WebSocket MDN

  • SSE MDN

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

database-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

postgresql

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

building-nestjs-apis

No summary provided by upstream source.

Repository SourceNeeds Review