similarity-search-patterns

Similarity Search Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "similarity-search-patterns" with this command: npx skills add wshobson/agents/wshobson-agents-similarity-search-patterns

Similarity Search Patterns

Patterns for implementing efficient similarity search in production systems.

When to Use This Skill

  • Building semantic search systems

  • Implementing RAG retrieval

  • Creating recommendation engines

  • Optimizing search latency

  • Scaling to millions of vectors

  • Combining semantic and keyword search

Core Concepts

  1. Distance Metrics

| Metric | Formula | Best For | | ------------------ | ------------------ | --------------------- | --- | -------------- | | Cosine | 1 - (A·B)/(‖A‖‖B‖) | Normalized embeddings | | Euclidean (L2) | √Σ(a-b)² | Raw embeddings | | Dot Product | A·B | Magnitude matters | | Manhattan (L1) | Σ | a-b | | Sparse vectors |

  1. Index Types

┌─────────────────────────────────────────────────┐ │ Index Types │ ├─────────────┬───────────────┬───────────────────┤ │ Flat │ HNSW │ IVF+PQ │ │ (Exact) │ (Graph-based) │ (Quantized) │ ├─────────────┼───────────────┼───────────────────┤ │ O(n) search │ O(log n) │ O(√n) │ │ 100% recall │ ~95-99% │ ~90-95% │ │ Small data │ Medium-Large │ Very Large │ └─────────────┴───────────────┴───────────────────┘

Templates

Template 1: Pinecone Implementation

from pinecone import Pinecone, ServerlessSpec from typing import List, Dict, Optional import hashlib

class PineconeVectorStore: def init( self, api_key: str, index_name: str, dimension: int = 1536, metric: str = "cosine" ): self.pc = Pinecone(api_key=api_key)

    # Create index if not exists
    if index_name not in self.pc.list_indexes().names():
        self.pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )

    self.index = self.pc.Index(index_name)

def upsert(
    self,
    vectors: List[Dict],
    namespace: str = ""
) -> int:
    """
    Upsert vectors.
    vectors: [{"id": str, "values": List[float], "metadata": dict}]
    """
    # Batch upsert
    batch_size = 100
    total = 0

    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        self.index.upsert(vectors=batch, namespace=namespace)
        total += len(batch)

    return total

def search(
    self,
    query_vector: List[float],
    top_k: int = 10,
    namespace: str = "",
    filter: Optional[Dict] = None,
    include_metadata: bool = True
) -> List[Dict]:
    """Search for similar vectors."""
    results = self.index.query(
        vector=query_vector,
        top_k=top_k,
        namespace=namespace,
        filter=filter,
        include_metadata=include_metadata
    )

    return [
        {
            "id": match.id,
            "score": match.score,
            "metadata": match.metadata
        }
        for match in results.matches
    ]

def search_with_rerank(
    self,
    query: str,
    query_vector: List[float],
    top_k: int = 10,
    rerank_top_n: int = 50,
    namespace: str = ""
) -> List[Dict]:
    """Search and rerank results."""
    # Over-fetch for reranking
    initial_results = self.search(
        query_vector,
        top_k=rerank_top_n,
        namespace=namespace
    )

    # Rerank with cross-encoder or LLM
    reranked = self._rerank(query, initial_results)

    return reranked[:top_k]

def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
    """Rerank results using cross-encoder."""
    from sentence_transformers import CrossEncoder

    model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    pairs = [(query, r["metadata"]["text"]) for r in results]
    scores = model.predict(pairs)

    for result, score in zip(results, scores):
        result["rerank_score"] = float(score)

    return sorted(results, key=lambda x: x["rerank_score"], reverse=True)

def delete(self, ids: List[str], namespace: str = ""):
    """Delete vectors by ID."""
    self.index.delete(ids=ids, namespace=namespace)

def delete_by_filter(self, filter: Dict, namespace: str = ""):
    """Delete vectors matching filter."""
    self.index.delete(filter=filter, namespace=namespace)

Template 2: Qdrant Implementation

from qdrant_client import QdrantClient from qdrant_client.http import models from typing import List, Dict, Optional

class QdrantVectorStore: def init( self, url: str = "localhost", port: int = 6333, collection_name: str = "documents", vector_size: int = 1536 ): self.client = QdrantClient(url=url, port=port) self.collection_name = collection_name

    # Create collection if not exists
    collections = self.client.get_collections().collections
    if collection_name not in [c.name for c in collections]:
        self.client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(
                size=vector_size,
                distance=models.Distance.COSINE
            ),
            # Optional: enable quantization for memory efficiency
            quantization_config=models.ScalarQuantization(
                scalar=models.ScalarQuantizationConfig(
                    type=models.ScalarType.INT8,
                    quantile=0.99,
                    always_ram=True
                )
            )
        )

def upsert(self, points: List[Dict]) -> int:
    """
    Upsert points.
    points: [{"id": str/int, "vector": List[float], "payload": dict}]
    """
    qdrant_points = [
        models.PointStruct(
            id=p["id"],
            vector=p["vector"],
            payload=p.get("payload", {})
        )
        for p in points
    ]

    self.client.upsert(
        collection_name=self.collection_name,
        points=qdrant_points
    )
    return len(points)

def search(
    self,
    query_vector: List[float],
    limit: int = 10,
    filter: Optional[models.Filter] = None,
    score_threshold: Optional[float] = None
) -> List[Dict]:
    """Search for similar vectors."""
    results = self.client.search(
        collection_name=self.collection_name,
        query_vector=query_vector,
        limit=limit,
        query_filter=filter,
        score_threshold=score_threshold
    )

    return [
        {
            "id": r.id,
            "score": r.score,
            "payload": r.payload
        }
        for r in results
    ]

def search_with_filter(
    self,
    query_vector: List[float],
    must_conditions: List[Dict] = None,
    should_conditions: List[Dict] = None,
    must_not_conditions: List[Dict] = None,
    limit: int = 10
) -> List[Dict]:
    """Search with complex filters."""
    conditions = []

    if must_conditions:
        conditions.extend([
            models.FieldCondition(
                key=c["key"],
                match=models.MatchValue(value=c["value"])
            )
            for c in must_conditions
        ])

    filter = models.Filter(must=conditions) if conditions else None

    return self.search(query_vector, limit=limit, filter=filter)

def search_with_sparse(
    self,
    dense_vector: List[float],
    sparse_vector: Dict[int, float],
    limit: int = 10,
    dense_weight: float = 0.7
) -> List[Dict]:
    """Hybrid search with dense and sparse vectors."""
    # Requires collection with named vectors
    results = self.client.search(
        collection_name=self.collection_name,
        query_vector=models.NamedVector(
            name="dense",
            vector=dense_vector
        ),
        limit=limit
    )
    return [{"id": r.id, "score": r.score, "payload": r.payload} for r in results]

Template 3: pgvector with PostgreSQL

import asyncpg from typing import List, Dict, Optional import numpy as np

class PgVectorStore: def init(self, connection_string: str): self.connection_string = connection_string

async def init(self):
    """Initialize connection pool and extension."""
    self.pool = await asyncpg.create_pool(self.connection_string)

    async with self.pool.acquire() as conn:
        # Enable extension
        await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

        # Create table
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                content TEXT,
                metadata JSONB,
                embedding vector(1536)
            )
        """)

        # Create index (HNSW for better performance)
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON documents
            USING hnsw (embedding vector_cosine_ops)
            WITH (m = 16, ef_construction = 64)
        """)

async def upsert(self, documents: List[Dict]):
    """Upsert documents with embeddings."""
    async with self.pool.acquire() as conn:
        await conn.executemany(
            """
            INSERT INTO documents (id, content, metadata, embedding)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (id) DO UPDATE SET
                content = EXCLUDED.content,
                metadata = EXCLUDED.metadata,
                embedding = EXCLUDED.embedding
            """,
            [
                (
                    doc["id"],
                    doc["content"],
                    doc.get("metadata", {}),
                    np.array(doc["embedding"]).tolist()
                )
                for doc in documents
            ]
        )

async def search(
    self,
    query_embedding: List[float],
    limit: int = 10,
    filter_metadata: Optional[Dict] = None
) -> List[Dict]:
    """Search for similar documents."""
    query = """
        SELECT id, content, metadata,
               1 - (embedding <=> $1::vector) as similarity
        FROM documents
    """

    params = [query_embedding]

    if filter_metadata:
        conditions = []
        for key, value in filter_metadata.items():
            params.append(value)
            conditions.append(f"metadata->>'{key}' = ${len(params)}")
        query += " WHERE " + " AND ".join(conditions)

    query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params) + 1}"
    params.append(limit)

    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query, *params)

    return [
        {
            "id": row["id"],
            "content": row["content"],
            "metadata": row["metadata"],
            "score": row["similarity"]
        }
        for row in rows
    ]

async def hybrid_search(
    self,
    query_embedding: List[float],
    query_text: str,
    limit: int = 10,
    vector_weight: float = 0.5
) -> List[Dict]:
    """Hybrid search combining vector and full-text."""
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(
            """
            WITH vector_results AS (
                SELECT id, content, metadata,
                       1 - (embedding <=> $1::vector) as vector_score
                FROM documents
                ORDER BY embedding <=> $1::vector
                LIMIT $3 * 2
            ),
            text_results AS (
                SELECT id, content, metadata,
                       ts_rank(to_tsvector('english', content),
                               plainto_tsquery('english', $2)) as text_score
                FROM documents
                WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
                LIMIT $3 * 2
            )
            SELECT
                COALESCE(v.id, t.id) as id,
                COALESCE(v.content, t.content) as content,
                COALESCE(v.metadata, t.metadata) as metadata,
                COALESCE(v.vector_score, 0) * $4 +
                COALESCE(t.text_score, 0) * (1 - $4) as combined_score
            FROM vector_results v
            FULL OUTER JOIN text_results t ON v.id = t.id
            ORDER BY combined_score DESC
            LIMIT $3
            """,
            query_embedding, query_text, limit, vector_weight
        )

    return [dict(row) for row in rows]

Template 4: Weaviate Implementation

import weaviate from weaviate.util import generate_uuid5 from typing import List, Dict, Optional

class WeaviateVectorStore: def init( self, url: str = "http://localhost:8080", class_name: str = "Document" ): self.client = weaviate.Client(url=url) self.class_name = class_name self._ensure_schema()

def _ensure_schema(self):
    """Create schema if not exists."""
    schema = {
        "class": self.class_name,
        "vectorizer": "none",  # We provide vectors
        "properties": [
            {"name": "content", "dataType": ["text"]},
            {"name": "source", "dataType": ["string"]},
            {"name": "chunk_id", "dataType": ["int"]}
        ]
    }

    if not self.client.schema.exists(self.class_name):
        self.client.schema.create_class(schema)

def upsert(self, documents: List[Dict]):
    """Batch upsert documents."""
    with self.client.batch as batch:
        batch.batch_size = 100

        for doc in documents:
            batch.add_data_object(
                data_object={
                    "content": doc["content"],
                    "source": doc.get("source", ""),
                    "chunk_id": doc.get("chunk_id", 0)
                },
                class_name=self.class_name,
                uuid=generate_uuid5(doc["id"]),
                vector=doc["embedding"]
            )

def search(
    self,
    query_vector: List[float],
    limit: int = 10,
    where_filter: Optional[Dict] = None
) -> List[Dict]:
    """Vector search."""
    query = (
        self.client.query
        .get(self.class_name, ["content", "source", "chunk_id"])
        .with_near_vector({"vector": query_vector})
        .with_limit(limit)
        .with_additional(["distance", "id"])
    )

    if where_filter:
        query = query.with_where(where_filter)

    results = query.do()

    return [
        {
            "id": item["_additional"]["id"],
            "content": item["content"],
            "source": item["source"],
            "score": 1 - item["_additional"]["distance"]
        }
        for item in results["data"]["Get"][self.class_name]
    ]

def hybrid_search(
    self,
    query: str,
    query_vector: List[float],
    limit: int = 10,
    alpha: float = 0.5  # 0 = keyword, 1 = vector
) -> List[Dict]:
    """Hybrid search combining BM25 and vector."""
    results = (
        self.client.query
        .get(self.class_name, ["content", "source"])
        .with_hybrid(query=query, vector=query_vector, alpha=alpha)
        .with_limit(limit)
        .with_additional(["score"])
        .do()
    )

    return [
        {
            "content": item["content"],
            "source": item["source"],
            "score": item["_additional"]["score"]
        }
        for item in results["data"]["Get"][self.class_name]
    ]

Best Practices

Do's

  • Use appropriate index - HNSW for most cases

  • Tune parameters - ef_search, nprobe for recall/speed

  • Implement hybrid search - Combine with keyword search

  • Monitor recall - Measure search quality

  • Pre-filter when possible - Reduce search space

Don'ts

  • Don't skip evaluation - Measure before optimizing

  • Don't over-index - Start with flat, scale up

  • Don't ignore latency - P99 matters for UX

  • Don't forget costs - Vector storage adds up

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

tailwind-design-system

Tailwind Design System (v4)

Repository Source
19.1K31.3Kwshobson
Automation

api-design-principles

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

nodejs-backend-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

nextjs-app-router-patterns

No summary provided by upstream source.

Repository SourceNeeds Review