websocket-realtime

WebSocket Real-time Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "websocket-realtime" with this command: npx skills add maneeshanif/todo-spec-driven/maneeshanif-todo-spec-driven-websocket-realtime

WebSocket Real-time Skill

Quick Start

  • Read Phase 5 Constitution - constitution-prompt-phase-5.md

  • Create WebSocket service - New microservice on port 8005

  • Subscribe to Kafka events - Via Dapr pub/sub

  • Broadcast to clients - WebSocket connections per user

  • Update frontend - Connect to WebSocket service

  • Handle reconnection - Automatic reconnect logic

Architecture

┌─────────────┐ ┌───────────────┐ ┌─────────────────┐ │ Backend │────▶│ Kafka │────▶│ WebSocket Svc │ │ (Events) │ │ task-updates │ │ (Port 8005) │ └─────────────┘ └───────────────┘ └────────┬────────┘ │ WebSocket Connections │ ┌──────────────────────────────┼──────────────────────────────┐ │ │ │ ┌─────▼─────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ Client 1 │ │ Client 2 │ │ Client 3 │ │ (User A) │ │ (User A) │ │ (User B) │ └───────────┘ └─────────────┘ └─────────────┘

WebSocket Service Implementation

Project Structure

services/websocket/ ├── src/ │ ├── init.py │ ├── main.py # FastAPI app with WebSocket │ ├── connection.py # Connection manager │ ├── events.py # Event handlers │ └── auth.py # Token validation ├── pyproject.toml └── Dockerfile

Main Application

services/websocket/src/main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends from fastapi.middleware.cors import CORSMiddleware from dapr.ext.fastapi import DaprApp import json

from .connection import ConnectionManager from .auth import verify_token

app = FastAPI(title="WebSocket Service") dapr_app = DaprApp(app) manager = ConnectionManager()

app.add_middleware( CORSMiddleware, allow_origins=[""], allow_credentials=True, allow_methods=[""], allow_headers=["*"], )

@app.websocket("/ws") async def websocket_endpoint( websocket: WebSocket, token: str = Query(...) ): """WebSocket endpoint for real-time updates.""" # Verify JWT token user_id = await verify_token(token) if not user_id: await websocket.close(code=4001) return

await manager.connect(websocket, user_id)
try:
    while True:
        # Keep connection alive, handle client messages
        data = await websocket.receive_text()
        message = json.loads(data)

        if message.get("type") == "ping":
            await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
    manager.disconnect(websocket, user_id)

Dapr subscription for task events

@dapr_app.subscribe(pubsub="taskpubsub", topic="task-updates") async def handle_task_update(event: dict): """Handle task update events from Kafka.""" user_id = event.get("user_id") event_type = event.get("event_type") task_data = event.get("task")

# Broadcast to all connections for this user
await manager.broadcast_to_user(user_id, {
    "type": "task_update",
    "event": event_type,
    "task": task_data
})

@app.get("/health") async def health_check(): return {"status": "healthy", "connections": manager.active_connections_count}

Connection Manager

services/websocket/src/connection.py

from fastapi import WebSocket from collections import defaultdict import asyncio import logging

logger = logging.getLogger(name)

class ConnectionManager: """Manage WebSocket connections per user."""

def __init__(self):
    # user_id -> list of WebSocket connections
    self.active_connections: dict[str, list[WebSocket]] = defaultdict(list)
    self._lock = asyncio.Lock()

async def connect(self, websocket: WebSocket, user_id: str):
    """Accept and store a new WebSocket connection."""
    await websocket.accept()
    async with self._lock:
        self.active_connections[user_id].append(websocket)
    logger.info(f"User {user_id} connected. Total connections: {self.active_connections_count}")

def disconnect(self, websocket: WebSocket, user_id: str):
    """Remove a WebSocket connection."""
    if user_id in self.active_connections:
        if websocket in self.active_connections[user_id]:
            self.active_connections[user_id].remove(websocket)
        if not self.active_connections[user_id]:
            del self.active_connections[user_id]
    logger.info(f"User {user_id} disconnected. Total connections: {self.active_connections_count}")

async def broadcast_to_user(self, user_id: str, message: dict):
    """Send message to all connections for a specific user."""
    if user_id not in self.active_connections:
        return

    disconnected = []
    for connection in self.active_connections[user_id]:
        try:
            await connection.send_json(message)
        except Exception as e:
            logger.error(f"Failed to send to user {user_id}: {e}")
            disconnected.append(connection)

    # Clean up disconnected
    for conn in disconnected:
        self.disconnect(conn, user_id)

async def broadcast_to_all(self, message: dict):
    """Send message to all connected users."""
    for user_id in list(self.active_connections.keys()):
        await self.broadcast_to_user(user_id, message)

@property
def active_connections_count(self) -> int:
    """Total number of active connections."""
    return sum(len(conns) for conns in self.active_connections.values())

Token Verification

services/websocket/src/auth.py

import jwt import os from typing import Optional

SECRET_KEY = os.getenv("BETTER_AUTH_SECRET", "") ALGORITHM = "HS256"

async def verify_token(token: str) -> Optional[str]: """Verify JWT token and return user_id.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload.get("sub") # user_id except jwt.InvalidTokenError: return None

Frontend WebSocket Client

WebSocket Hook

// frontend/lib/websocket/use-websocket.ts import { useEffect, useRef, useCallback, useState } from "react"; import { useAuthStore } from "@/stores/auth-store"; import { useTaskStore } from "@/stores/task-store";

interface WebSocketMessage { type: string; event?: string; task?: Task; }

export function useWebSocket() { const { token } = useAuthStore(); const { updateTask, addTask, removeTask } = useTaskStore(); const ws = useRef<WebSocket | null>(null); const [isConnected, setIsConnected] = useState(false); const reconnectAttempts = useRef(0); const maxReconnectAttempts = 5;

const connect = useCallback(() => { if (!token) return;

const wsUrl = `${process.env.NEXT_PUBLIC_WS_URL}/ws?token=${token}`;
ws.current = new WebSocket(wsUrl);

ws.current.onopen = () => {
  console.log("WebSocket connected");
  setIsConnected(true);
  reconnectAttempts.current = 0;
};

ws.current.onmessage = (event) => {
  const message: WebSocketMessage = JSON.parse(event.data);
  handleMessage(message);
};

ws.current.onclose = () => {
  console.log("WebSocket disconnected");
  setIsConnected(false);

  // Attempt reconnection with exponential backoff
  if (reconnectAttempts.current &#x3C; maxReconnectAttempts) {
    const delay = Math.pow(2, reconnectAttempts.current) * 1000;
    reconnectAttempts.current++;
    setTimeout(connect, delay);
  }
};

ws.current.onerror = (error) => {
  console.error("WebSocket error:", error);
};

}, [token]);

const handleMessage = (message: WebSocketMessage) => { if (message.type === "task_update" && message.task) { switch (message.event) { case "task.created": addTask(message.task); break; case "task.updated": updateTask(message.task); break; case "task.deleted": removeTask(message.task.id); break; case "task.completed": updateTask({ ...message.task, status: "completed" }); break; } } };

// Heartbeat to keep connection alive useEffect(() => { if (!isConnected) return;

const interval = setInterval(() => {
  if (ws.current?.readyState === WebSocket.OPEN) {
    ws.current.send(JSON.stringify({ type: "ping" }));
  }
}, 30000); // Every 30 seconds

return () => clearInterval(interval);

}, [isConnected]);

// Connect on mount useEffect(() => { connect(); return () => { ws.current?.close(); }; }, [connect]);

return { isConnected }; }

WebSocket Provider

// frontend/providers/websocket-provider.tsx "use client";

import { createContext, useContext, ReactNode } from "react"; import { useWebSocket } from "@/lib/websocket/use-websocket";

interface WebSocketContextType { isConnected: boolean; }

const WebSocketContext = createContext<WebSocketContextType>({ isConnected: false, });

export function WebSocketProvider({ children }: { children: ReactNode }) { const { isConnected } = useWebSocket();

return ( <WebSocketContext.Provider value={{ isConnected }}> {children} </WebSocketContext.Provider> ); }

export function useWebSocketStatus() { return useContext(WebSocketContext); }

Connection Status Indicator

// frontend/components/websocket/connection-status.tsx "use client";

import { useWebSocketStatus } from "@/providers/websocket-provider"; import { Wifi, WifiOff } from "lucide-react";

export function ConnectionStatus() { const { isConnected } = useWebSocketStatus();

return ( <div className="flex items-center gap-2"> {isConnected ? ( <> <Wifi className="h-4 w-4 text-green-500" /> <span className="text-sm text-green-500">Connected</span> </> ) : ( <> <WifiOff className="h-4 w-4 text-red-500" /> <span className="text-sm text-red-500">Disconnected</span> </> )} </div> ); }

Backend Event Publishing

backend/src/services/task_service.py

from dapr.clients import DaprClient

async def publish_task_update(event_type: str, task: Task): """Publish task update for real-time sync.""" with DaprClient() as client: client.publish_event( pubsub_name="taskpubsub", topic_name="task-updates", data={ "event_type": event_type, "user_id": str(task.user_id), "task": task.model_dump(mode="json") } )

Kubernetes Deployment

k8s/websocket-service.yaml

apiVersion: apps/v1 kind: Deployment metadata: name: websocket-service namespace: todo-app spec: replicas: 2 selector: matchLabels: app: websocket-service template: metadata: labels: app: websocket-service annotations: dapr.io/enabled: "true" dapr.io/app-id: "websocket-service" dapr.io/app-port: "8005" spec: containers: - name: websocket-service image: evolution-todo/websocket-service:latest ports: - containerPort: 8005 env: - name: BETTER_AUTH_SECRET valueFrom: secretKeyRef: name: todo-secrets key: better-auth-secret

apiVersion: v1 kind: Service metadata: name: websocket-service namespace: todo-app spec: selector: app: websocket-service ports: - port: 8005 targetPort: 8005

Dapr Configuration

dapr-components/subscription.yaml

apiVersion: dapr.io/v2alpha1 kind: Subscription metadata: name: task-updates-subscription spec: pubsubname: taskpubsub topic: task-updates routes: default: /task-updates scopes: - websocket-service

Verification Checklist

  • WebSocket service created on port 8005

  • Connection manager handles multiple users

  • Token verification working

  • Dapr subscription configured

  • Frontend WebSocket hook created

  • Automatic reconnection working

  • Heartbeat/ping-pong implemented

  • Connection status indicator displayed

  • Real-time updates propagate to all clients

  • Service deployed to Kubernetes

Troubleshooting

Issue Cause Solution

Connection refused Wrong URL Check WS_URL env var

Auth failed Invalid token Verify token format

No updates received Dapr not connected Check dapr sidecar logs

Connection drops No heartbeat Implement ping/pong

High latency Too many connections Scale horizontally

References

  • FastAPI WebSockets

  • Dapr Pub/Sub

  • WebSocket API (MDN)

  • Phase 5 Constitution

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

dapr-integration

No summary provided by upstream source.

Repository SourceNeeds Review
General

conversation-management

No summary provided by upstream source.

Repository SourceNeeds Review
General

chatkit-frontend

No summary provided by upstream source.

Repository SourceNeeds Review