community-feed

Social feed with trending algorithms, cursor pagination, and engagement tracking.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "community-feed" with this command: npx skills add dadbodgeoff/drift/dadbodgeoff-drift-community-feed

Community Feed

Social feed with trending algorithms, cursor pagination, and engagement tracking.

When to Use This Skill

  • Building social feeds with infinite scroll

  • Need trending/hot content algorithms

  • Implementing like/engagement systems

  • Want efficient pagination for large datasets

Core Concepts

Cursor pagination beats offset pagination for large datasets. Batch-load relationships to avoid N+1 queries. Store trending scores in a precomputed, indexed column so feeds can sort without recalculating the score on every request.

Implementation

Python

import base64
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional

@dataclass
class PaginatedPosts:
    """One page of feed results as returned by ``get_feed``."""

    # Post rows for this page (trimmed to at most ``limit`` entries).
    posts: List[Dict]
    # Count reported by the service's total-count helper.
    total_count: int
    # True when at least one more row exists beyond this page.
    has_more: bool
    # Opaque cursor for the next page, or None when there is no next page.
    next_cursor: Optional[str]

class CommunityFeedService:
    """Community feed service built around cursor pagination."""

def __init__(self, db):
    """Keep the database client that the feed queries are issued through."""
    # NOTE(review): db looks like a Supabase-style client exposing
    # table()/rpc() builders — confirm against the caller.
    self.db = db

async def get_feed(
    self,
    feed_type: str = "trending",
    viewer_id: Optional[str] = None,
    cursor: Optional[str] = None,
    limit: int = 20,
    tags: Optional[List[str]] = None,
) -> PaginatedPosts:
    """Return one page of community posts.

    Args:
        feed_type: ``"trending"`` sorts by ``trending_score``; every other
            value sorts by ``created_at``. ``"following"`` (together with a
            ``viewer_id``) also restricts the feed to followed authors.
        viewer_id: When given, each returned post gets an
            ``is_liked_by_viewer`` flag.
        cursor: ``next_cursor`` from the previous page; omit for the first
            page. Unreadable cursors, or cursors issued for the other
            ordering, are ignored instead of raising.
        limit: Maximum number of posts in the page.
        tags: When given, passed to a ``contains`` filter on ``tags``.

    Returns:
        A ``PaginatedPosts`` with the page, the total count, and the cursor
        for the following page (``None`` when there is none).
    """
    cursor_data = self._parse_cursor(cursor) if cursor else None
    # Cursors come from the client, so accept only a decoded JSON object.
    if not isinstance(cursor_data, dict):
        cursor_data = {}

    query = self.db.table("community_posts").select(
        "*",
        "assets!inner(url, asset_type)",
        "users!inner(id, display_name, avatar_url)",
    )

    if tags:
        query = query.contains("tags", tags)

    if feed_type == "following" and viewer_id:
        following = await self._get_following_ids(viewer_id)
        if not following:
            # The viewer follows nobody: skip the posts query entirely.
            return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
        query = query.in_("user_id", following)

    # Feed-specific ordering; the cursor key mirrors _generate_cursor.
    if feed_type == "trending":
        sort_column, cursor_key = "trending_score", "score"
    else:
        sort_column, cursor_key = "created_at", "created_at"
    query = query.order(sort_column, desc=True)

    # .get() rather than indexing: a cursor minted for the other ordering
    # lacks this key and would otherwise raise KeyError.
    boundary = cursor_data.get(cursor_key)
    if boundary is not None:
        # NOTE(review): a strict "<" on a non-unique sort column skips rows
        # that tie with the previous page's last row; adding a tie-breaker
        # column to the cursor would close that gap.
        query = query.lt(sort_column, boundary)

    # Fetch one extra row to learn whether another page exists.
    query = query.limit(limit + 1)
    result = query.execute()
    posts = result.data or []

    has_more = len(posts) > limit
    if has_more:
        posts = posts[:limit]

    # Batch-load the viewer's likes in one query instead of one per post.
    if viewer_id and posts:
        liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
        for post in posts:
            post["is_liked_by_viewer"] = post["id"] in liked_ids

    next_cursor = None
    if has_more and posts:
        next_cursor = self._generate_cursor(posts[-1], feed_type)

    # NOTE(review): the count helper only receives ``tags``, so it cannot
    # reflect the "following" filter — confirm that is intended.
    return PaginatedPosts(
        posts=posts,
        total_count=await self._get_total_count(tags),
        has_more=has_more,
        next_cursor=next_cursor,
    )

async def _get_following_ids(self, user_id: str) -> List[str]:
    """Return the ``following_id`` of every ``user_follows`` row for ``user_id``."""
    follows_query = self.db.table("user_follows").select("following_id")
    rows = follows_query.eq("follower_id", user_id).execute().data
    if not rows:
        return []
    return [row["following_id"] for row in rows]

async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
    """Return the subset of ``post_ids`` that has a ``post_likes`` row for ``user_id``."""
    likes_query = self.db.table("post_likes").select("post_id")
    likes_query = likes_query.eq("user_id", user_id).in_("post_id", post_ids)
    rows = likes_query.execute().data
    if not rows:
        return set()
    return {row["post_id"] for row in rows}

def _parse_cursor(self, cursor: str) -> dict:
    """Decode an opaque pagination cursor into its payload dict.

    A cursor is base64-encoded JSON. Anything that cannot be decoded, or
    that decodes to something other than a JSON object, yields ``{}``,
    which ``get_feed`` treats the same as having no cursor.
    """
    try:
        payload = json.loads(base64.b64decode(cursor).decode())
    except (ValueError, TypeError, RecursionError):
        # ValueError covers malformed base64 (binascii.Error), invalid
        # UTF-8 and invalid JSON; TypeError covers non-str/bytes input;
        # RecursionError covers absurdly nested JSON in a crafted cursor.
        return {}
    # Honour the declared return type: lists, numbers, etc. are not cursors.
    return payload if isinstance(payload, dict) else {}

def _generate_cursor(self, post: dict, feed_type: str) -> str:
    """Encode the sort value of ``post`` as a base64 JSON cursor.

    Trending feeds store ``trending_score`` (0 when absent) under
    ``"score"``; every other feed stores the post's ``created_at``.
    """
    payload = (
        {"score": post.get("trending_score", 0)}
        if feed_type == "trending"
        else {"created_at": post["created_at"]}
    )
    raw = json.dumps(payload).encode()
    return base64.b64encode(raw).decode()

Engagement operations

async def like_post(self, post_id: str, user_id: str) -> bool:
    """Record that ``user_id`` likes ``post_id``.

    Returns:
        True when a new like row was inserted and the post's counter was
        incremented; False when the user had already liked the post.
    """
    # NOTE(review): check-then-insert is not atomic; two concurrent calls
    # can both pass this check. A unique (post_id, user_id) constraint on
    # post_likes would close that window — confirm against the schema.
    existing = (
        self.db.table("post_likes")
        .select("id")
        .eq("post_id", post_id)
        .eq("user_id", user_id)
        .execute()
    )
    if existing.data:
        return False  # Already liked

    self.db.table("post_likes").insert({
        "post_id": post_id,
        "user_id": user_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }).execute()

    # Atomic increment
    self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
    return True

async def unlike_post(self, post_id: str, user_id: str) -> bool:
    """Remove the like of ``user_id`` on ``post_id``.

    Returns:
        True when a like row was deleted and the post's counter was
        decremented; False when the delete reported no rows.
    """
    result = (
        self.db.table("post_likes")
        .delete()
        .eq("post_id", post_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not result.data:
        # Nothing was deleted, so the counter must stay untouched.
        return False

    self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
    return True

Trending algorithm

def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
    *,
    now: Optional[datetime] = None,
) -> float:
    """
    Trending score = engagement / age^decay
    Higher engagement + newer = higher score

    Engagement weighs a like as 1.0, a comment as 2.0 and a view as 0.1.
    Age is measured in hours with a floor of 0.1 and raised to the power
    1.5; featured posts get a 1.5x boost.

    Args:
        like_count: Number of likes on the post.
        comment_count: Number of comments on the post.
        view_count: Number of views of the post.
        created_at: Creation time; a naive value is interpreted as UTC.
        is_featured: Whether to apply the featured boost.
        now: Reference time for the age; defaults to the current UTC time.
            A naive value is interpreted as UTC.

    Returns:
        The trending score as a float.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    # Subtracting a naive datetime from an aware one raises TypeError, so
    # normalise both operands to aware UTC first.
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)

    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    # The 0.1h floor keeps brand-new (or future-dated) posts from dividing
    # by zero or by a negative age.
    age_hours = max((now - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score

SQL Schema

CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Precomputed trending score. It cannot be a GENERATED ... STORED
    -- column: PostgreSQL requires generation expressions to be immutable,
    -- and the score depends on NOW(). Refresh it on a schedule with
    -- refresh_trending_scores() instead.
    trending_score FLOAT NOT NULL DEFAULT 0
);

-- Recompute trending_score for every post: engagement / age_hours ^ 1.5,
-- with the age floored at 0.1 hours.
-- NOTE(review): calculate_trending_score in the Python snippet also applies
-- a 1.5x boost to featured posts; this formula does not — confirm which
-- one is intended.
CREATE FUNCTION refresh_trending_scores() RETURNS VOID AS $$
BEGIN
    UPDATE community_posts
    SET trending_score =
        (like_count + comment_count * 2 + view_count * 0.1)
        / POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5);
END;
$$ LANGUAGE plpgsql;

-- Supports the trending feed's ORDER BY trending_score DESC.
CREATE INDEX idx_posts_trending
    ON community_posts (trending_score DESC);

-- Supports the chronological feeds' ORDER BY created_at DESC.
CREATE INDEX idx_posts_created
    ON community_posts (created_at DESC);

-- Atomic increment functions
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;

-- Counterpart called by unlike_post; the counter never drops below zero.
CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts
    SET like_count = GREATEST(like_count - 1, 0)
    WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;

Usage Examples

feed_service = CommunityFeedService(db)

Get trending feed

result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20) for post in result.posts: print(f"{post['title']} - {post['like_count']} likes")

Load next page

if result.has_more: next_page = await feed_service.get_feed(feed_type="trending", cursor=result.next_cursor)

Like a post

await feed_service.like_post("post_456", "user_123")

Best Practices

  • Use cursor pagination over offset for large datasets

  • Batch-load relationships to avoid N+1 queries

  • Store the trending score in a precomputed, indexed column for efficient sorting

  • Use atomic database functions for counter updates

  • Cache total counts (expensive to compute)

Common Mistakes

  • Using offset pagination (slow for large offsets)

  • N+1 queries for author/like data

  • Computing trending score on every query

  • Non-atomic counter updates (race conditions)

Related Patterns

  • analytics-pipeline (event tracking)

  • intelligent-cache (caching feeds)

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

oauth-social-login

No summary provided by upstream source.

Repository Source · Needs Review
General

sse-streaming

No summary provided by upstream source.

Repository Source · Needs Review
General

multi-tenancy

No summary provided by upstream source.

Repository Source · Needs Review