Vector Database Management


Table of Contents

  • Introduction

  • Vector Embeddings Fundamentals

  • Database Setup & Configuration

  • Index Operations

  • Vector Operations

  • Similarity Search

  • Metadata Filtering

  • Hybrid Search

  • Namespace & Collection Management

  • Performance & Scaling

  • Production Best Practices

  • Cost Optimization

Introduction

Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, Retrieval-Augmented Generation (RAG), and similarity-based matching.

Key Concepts

  • Vector Embeddings: Numerical representations of data (text, images, audio) in high-dimensional space

  • Similarity Search: Finding vectors that are "close" to a query vector using distance metrics (see the brute-force sketch after this list)

  • Metadata Filtering: Combining vector similarity with structured data filtering

  • Indexing: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search
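
To make similarity search concrete, here is a minimal brute-force sketch (NumPy, hypothetical toy data). It scores every stored vector with cosine similarity; this exhaustive scan is exactly what index structures like HNSW approximate at scale:

import numpy as np

def cosine_top_k(query: np.ndarray, vectors: np.ndarray, k: int = 3) -> list[int]:
    """Return indices of the k vectors most similar to the query (cosine)."""
    # Normalize rows and the query so a dot product equals cosine similarity
    vectors_norm = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    query_norm = query / np.linalg.norm(query)
    scores = vectors_norm @ query_norm
    # argsort is ascending; take the last k and reverse for best-first order
    return list(np.argsort(scores)[-k:][::-1])

# Toy example: 5 random "embeddings" of dimension 8
rng = np.random.default_rng(0)
corpus = rng.normal(size=(5, 8))
print(cosine_top_k(corpus[0], corpus, k=3))  # index 0 ranks first (self-match)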

Database Comparison

| Feature            | Pinecone          | Weaviate               | Chroma               |
| ------------------ | ----------------- | ---------------------- | -------------------- |
| Deployment         | Fully managed     | Managed or self-hosted | Self-hosted or cloud |
| Index Types        | Serverless, Pods  | HNSW                   | HNSW                 |
| Metadata Filtering | Advanced          | GraphQL-based          | Simple               |
| Hybrid Search      | Sparse-Dense      | Built-in               | Limited              |
| Scale              | Massive           | Large                  | Small-Medium         |
| Best For           | Production RAG    | Knowledge graphs       | Local development    |

Vector Embeddings Fundamentals

Understanding Vector Representations

Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:

Text to embeddings using OpenAI

from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")

def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """Generate embeddings from text using OpenAI."""
    response = client.embeddings.create(
        input=text,
        model=model
    )
    return response.data[0].embedding

Example usage

text = "Vector databases enable semantic search capabilities"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
print(f"First 5 values: {embedding[:5]}")

Popular Embedding Models

1. OpenAI Embeddings (Production-grade)

from openai import OpenAI

def openai_embeddings(texts: list[str]) -> list[list[float]]:
    """Batch generate OpenAI embeddings."""
    client = OpenAI(api_key="YOUR_API_KEY")
    response = client.embeddings.create(
        input=texts,
        model="text-embedding-3-large"  # 3072 dimensions
    )
    return [item.embedding for item in response.data]

2. Sentence Transformers (Open-source)

from sentence_transformers import SentenceTransformer

def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Sentence Transformers."""
    model = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions
    embeddings = model.encode(texts)
    return embeddings.tolist()

3. Cohere Embeddings

import cohere

def cohere_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Cohere."""
    co = cohere.Client("YOUR_API_KEY")
    response = co.embed(
        texts=texts,
        model="embed-english-v3.0",
        input_type="search_document"
    )
    return response.embeddings

Embedding Dimensions & Trade-offs

Different embedding models for different use cases

EMBEDDING_CONFIGS = {
    "openai-small": {
        "model": "text-embedding-3-small",
        "dimensions": 1536,
        "cost_per_1m": 0.02,
        "use_case": "General purpose, cost-effective"
    },
    "openai-large": {
        "model": "text-embedding-3-large",
        "dimensions": 3072,
        "cost_per_1m": 0.13,
        "use_case": "High accuracy requirements"
    },
    "sentence-transformers": {
        "model": "all-MiniLM-L6-v2",
        "dimensions": 384,
        "cost_per_1m": 0.00,  # Open-source
        "use_case": "Local development, privacy-sensitive"
    },
    "cohere-multilingual": {
        "model": "embed-multilingual-v3.0",
        "dimensions": 1024,
        "cost_per_1m": 0.10,
        "use_case": "Multi-language applications"
    }
}
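
To put the cost_per_1m figures above to work, a small helper (a sketch, assuming those prices are USD per one million tokens) can estimate embedding spend for a corpus:

def estimate_embedding_cost(num_tokens: int, config_name: str) -> float:
    """Rough embedding cost in USD, assuming cost_per_1m is per 1M tokens."""
    config = EMBEDDING_CONFIGS[config_name]
    return (num_tokens / 1_000_000) * config["cost_per_1m"]

# Example: 50M tokens through each model
for name in EMBEDDING_CONFIGS:
    print(f"{name}: ${estimate_embedding_cost(50_000_000, name):.2f}")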

Database Setup & Configuration

Pinecone Setup

Install Pinecone SDK

pip install pinecone-client

from pinecone import Pinecone, ServerlessSpec

Initialize Pinecone client

pc = Pinecone(api_key="YOUR_API_KEY")

List existing indexes

indexes = pc.list_indexes()
print(f"Existing indexes: {[idx.name for idx in indexes]}")

Create serverless index (recommended for production)

index_name = "production-search"

if index_name not in [idx.name for idx in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=1536,  # Match your embedding model
        metric="cosine",  # cosine, dotproduct, or euclidean
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        ),
        deletion_protection="enabled",  # Prevent accidental deletion
        tags={
            "environment": "production",
            "team": "ml",
            "project": "semantic-search"
        }
    )
    print(f"Created index: {index_name}")

Connect to index

index = pc.Index(index_name)

Get index stats

stats = index.describe_index_stats()
print(f"Index stats: {stats}")

Selective Metadata Indexing (Pinecone)

Configure which metadata fields to index for filtering

This optimizes memory usage and query performance

from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="YOUR_API_KEY")

Create index with metadata configuration

pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { # Index these fields for filtering "document_id": {"filterable": True}, "category": {"filterable": True}, "created_at": {"filterable": True}, "tags": {"filterable": True}, # Store but don't index (saves memory) "document_title": {"filterable": False}, "document_url": {"filterable": False}, "full_content": {"filterable": False} } } ) )

This configuration allows you to:

1. Filter by document_id, category, created_at, tags

2. Retrieve document_title, document_url, full_content in results

  3. Save memory by not indexing non-filterable fields (see the query sketch below)
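
A minimal query sketch against the optimized-index above (query_embedding is assumed to be computed as in the earlier examples): the filter may only reference filterable fields, while stored-only fields still come back in the results.

index = pc.Index("optimized-index")

results = index.query(
    vector=query_embedding,  # assumed: an embedding from the earlier examples
    top_k=5,
    filter={"category": {"$eq": "education"}},  # filterable field
    include_metadata=True
)

for match in results.matches:
    # document_title is stored but not indexed: readable here,
    # but it cannot appear in a filter expression
    print(match.metadata.get("document_title"), match.score)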

Weaviate Setup

Install Weaviate client

pip install weaviate-client

import weaviate
from weaviate.classes.config import Configure, Property, DataType

Connect to Weaviate

client = weaviate.connect_to_local()

Or connect to Weaviate Cloud

client = weaviate.connect_to_wcs(
    cluster_url="YOUR_WCS_URL",
    auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")
)

Create collection (schema)

try:
    collection = client.collections.create(
        name="Documents",
        vectorizer_config=Configure.Vectorizer.text2vec_openai(
            model="text-embedding-3-small"
        ),
        properties=[
            Property(name="title", data_type=DataType.TEXT),
            Property(name="content", data_type=DataType.TEXT),
            Property(name="category", data_type=DataType.TEXT),
            Property(name="created_at", data_type=DataType.DATE),
            Property(name="tags", data_type=DataType.TEXT_ARRAY)
        ]
    )
    print("Created collection: Documents")
except Exception as e:
    print(f"Collection exists or error: {e}")

Get collection

documents = client.collections.get("Documents")

Check collection info

print(documents.config.get())

Chroma Setup

Install Chroma

pip install chromadb

import chromadb
from chromadb.config import Settings

Initialize Chroma client (persistent)

client = chromadb.PersistentClient(path="./chroma_db")

Or use ephemeral (in-memory)

client = chromadb.EphemeralClient()

Create or get collection

collection = client.get_or_create_collection(
    name="documents",
    metadata={
        "description": "Document collection for semantic search",
        "hnsw:space": "cosine"  # cosine, l2, or ip (inner product)
    }
)

List all collections

collections = client.list_collections()
print(f"Available collections: {[c.name for c in collections]}")

Get collection info

print(f"Collection count: {collection.count()}")

Index Operations

Creating Indexes with Different Configurations

from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")

1. Serverless index (auto-scaling, pay-per-use)

pc.create_index( name="serverless-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )

2. Pod-based index (dedicated resources)

pc.create_index( name="pod-index", dimension=1536, metric="dotproduct", spec=PodSpec( environment="us-east-1-aws", pod_type="p1.x1", # Performance tier pods=2, # Number of pods replicas=2, # Replicas for high availability shards=1 ) )

3. Sparse index (for BM25-like search)

pc.create_index( name="sparse-index", dimension=None, # Sparse vectors don't have fixed dimension metric="dotproduct", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) )

Index Management Operations

from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

List all indexes

indexes = pc.list_indexes()
for idx in indexes:
    print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")

Describe specific index

index_info = pc.describe_index("production-search")
print(f"Dimension: {index_info.dimension}")
print(f"Metric: {index_info.metric}")
print(f"Status: {index_info.status}")

Connect to index

index = pc.Index("production-search")

Get index statistics

stats = index.describe_index_stats()
print(f"Total vectors: {stats.total_vector_count}")
print(f"Namespaces: {stats.namespaces}")
print(f"Index fullness: {stats.index_fullness}")

Delete index (be careful!)

pc.delete_index("test-index")

Configuring Index for Optimal Performance

Configuration for different use cases

1. High-throughput search (many queries/second)

pc.create_index( name="high-throughput", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="p2.x1", # Higher performance tier pods=4, replicas=3 # More replicas = higher query throughput ) )

2. Large-scale storage (billions of vectors)

pc.create_index( name="large-scale", dimension=1536, metric="cosine", spec=PodSpec( environment="us-east-1-aws", pod_type="s1.x1", # Storage-optimized pods=8, shards=4 # More shards = more storage capacity ) )

3. Cost-optimized development

pc.create_index( name="dev-environment", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) # Serverless = pay only for what you use )

Vector Operations

Upserting Vectors (Pinecone)

from pinecone import Pinecone
import uuid

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("production-search")

1. Single vector upsert

vector_id = str(uuid.uuid4())
index.upsert(
    vectors=[
        {
            "id": vector_id,
            "values": [0.1, 0.2, 0.3, ...],  # 1536 dimensions
            "metadata": {
                "title": "Introduction to Vector Databases",
                "category": "education",
                "author": "John Doe",
                "created_at": "2024-01-15",
                "tags": ["ml", "ai", "databases"]
            }
        }
    ],
    namespace="documents"
)

2. Batch upsert (efficient for large datasets)

batch_size = 100
vectors = []

for i, (doc_id, embedding, metadata) in enumerate(documents):
    vectors.append({
        "id": doc_id,
        "values": embedding,
        "metadata": metadata
    })

    # Upsert in batches
    if len(vectors) >= batch_size or i == len(documents) - 1:
        index.upsert(vectors=vectors, namespace="documents")
        print(f"Upserted batch of {len(vectors)} vectors")
        vectors = []

3. Upsert with async for better performance

from pinecone import Pinecone
import asyncio

async def upsert_vectors_async(vectors_batch):
    """Async upsert for parallel processing."""
    index.upsert(vectors=vectors_batch, namespace="documents", async_req=True)

Parallel upsert

tasks = []
for batch in batches:
    tasks.append(upsert_vectors_async(batch))
await asyncio.gather(*tasks)

Sparse Vector Operations (Pinecone)

Sparse vectors are useful for keyword-based search (like BM25)

Combined with dense vectors for hybrid search

from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("hybrid-search-index")

Upsert vector with both dense and sparse components

index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": [0.1, 0.2, ..., 0.5],  # Dense vector
            "sparse_values": {
                "indices": [10, 45, 123, 234, 678],  # Token IDs
                "values": [0.8, 0.6, 0.9, 0.7, 0.5]  # TF-IDF weights
            },
            "metadata": {"title": "Hybrid Search Document"}
        }
    ],
    namespace="hybrid"
)

Query with hybrid search

results = index.query(
    vector=[0.1, 0.2, ..., 0.5],  # Dense query vector
    sparse_vector={
        "indices": [10, 45, 123],
        "values": [0.8, 0.7, 0.9]
    },
    top_k=10,
    namespace="hybrid",
    include_metadata=True
)

Vector Operations (Weaviate)

import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Insert single object

doc_uuid = documents.data.insert(
    properties={
        "title": "Vector Database Guide",
        "content": "A comprehensive guide to vector databases...",
        "category": "tutorial",
        "created_at": "2024-01-15T10:00:00Z",
        "tags": ["database", "ml", "ai"]
    }
)
print(f"Inserted: {doc_uuid}")

2. Batch insert

with documents.batch.dynamic() as batch:
    for doc in document_list:
        batch.add_object(
            properties={
                "title": doc["title"],
                "content": doc["content"],
                "category": doc["category"],
                "created_at": doc["created_at"],
                "tags": doc["tags"]
            }
        )

3. Insert with custom vector

documents.data.insert(
    properties={"title": "Custom Vector Doc", "content": "..."},
    vector=[0.1, 0.2, 0.3, ...]  # Your pre-computed vector
)

4. Update object

documents.data.update(
    uuid=doc_uuid,
    properties={"title": "Updated Title"}
)

5. Delete object

documents.data.delete_by_id(uuid=doc_uuid)

Vector Operations (Chroma)

import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

1. Add documents with auto-embedding

collection.add(
    documents=[
        "This is document 1",
        "This is document 2",
        "This is document 3"
    ],
    metadatas=[
        {"category": "tech", "author": "Alice"},
        {"category": "science", "author": "Bob"},
        {"category": "tech", "author": "Charlie"}
    ],
    ids=["doc1", "doc2", "doc3"]
)

2. Add with custom embeddings

collection.add(
    embeddings=[
        [0.1, 0.2, 0.3, ...],
        [0.4, 0.5, 0.6, ...]
    ],
    metadatas=[
        {"title": "Doc 1"},
        {"title": "Doc 2"}
    ],
    ids=["custom1", "custom2"]
)

3. Update documents

collection.update(
    ids=["doc1"],
    documents=["Updated document content"],
    metadatas=[{"category": "tech", "updated": True}]
)

4. Delete documents

collection.delete(ids=["doc1", "doc2"])

5. Get documents by IDs

results = collection.get(
    ids=["doc1", "doc2"],
    include=["documents", "metadatas", "embeddings"]
)

Similarity Search

Basic Similarity Search (Pinecone)

from pinecone import Pinecone
from openai import OpenAI

Initialize clients

pc = Pinecone(api_key="PINECONE_API_KEY") openai_client = OpenAI(api_key="OPENAI_API_KEY")

index = pc.Index("production-search")

1. Generate query embedding

query_text = "What are the benefits of vector databases?" response = openai_client.embeddings.create( input=query_text, model="text-embedding-3-small" ) query_embedding = response.data[0].embedding

2. Search for similar vectors

results = index.query( vector=query_embedding, top_k=10, namespace="documents", include_values=False, include_metadata=True )

3. Process results

print(f"Found {len(results.matches)} results") for match in results.matches: print(f"ID: {match.id}") print(f"Score: {match.score:.4f}") print(f"Title: {match.metadata.get('title')}") print(f"Category: {match.metadata.get('category')}") print("---")

Search by ID (Query by Example)

Search using an existing vector as query

results = index.query( id="existing-doc-id", # Use this document as the query top_k=10, namespace="documents", include_metadata=True )

Useful for "find similar items" features

print(f"Documents similar to {results.matches[0].metadata.get('title')}:") for match in results.matches[1:]: # Skip first (self) print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")

Multi-vector Search (Pinecone)

Search multiple query vectors in one request

query_embeddings = [
    [0.1, 0.2, ...],  # Query 1
    [0.3, 0.4, ...],  # Query 2
    [0.5, 0.6, ...]   # Query 3
]

results = index.query(
    queries=query_embeddings,
    top_k=5,
    namespace="documents",
    include_metadata=True
)

Process results for each query

for i, query_results in enumerate(results):
    print(f"\nResults for query {i+1}:")
    for match in query_results.matches:
        print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")

Similarity Search (Weaviate)

import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Near text search (semantic)

response = documents.query.near_text( query="vector database performance optimization", limit=10, return_metadata=MetadataQuery(distance=True, certainty=True) )

for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Distance: {obj.metadata.distance:.4f}") print(f"Certainty: {obj.metadata.certainty:.4f}") print("---")

2. Near vector search (with custom embedding)

response = documents.query.near_vector(
    near_vector=[0.1, 0.2, 0.3, ...],
    limit=10
)

3. Near object search (find similar to existing object)

response = documents.query.near_object(
    near_object="uuid-of-reference-object",
    limit=10
)

Similarity Search (Chroma)

import chromadb

client = chromadb.PersistentClient(path="./chroma_db") collection = client.get_collection("documents")

1. Query with text (auto-embedding)

results = collection.query( query_texts=["What is machine learning?"], n_results=10, include=["documents", "metadatas", "distances"] )

print(f"Found {len(results['ids'][0])} results") for i, doc_id in enumerate(results['ids'][0]): print(f"ID: {doc_id}") print(f"Distance: {results['distances'][0][i]:.4f}") print(f"Document: {results['documents'][0][i][:100]}...") print(f"Metadata: {results['metadatas'][0][i]}") print("---")

2. Query with custom embedding

results = collection.query(
    query_embeddings=[[0.1, 0.2, 0.3, ...]],
    n_results=10
)

Metadata Filtering

Pinecone Metadata Filters

from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("production-search")

1. Equality filter

results = index.query( vector=query_embedding, top_k=10, filter={"category": {"$eq": "education"}}, include_metadata=True )

2. Inequality filter

results = index.query( vector=query_embedding, top_k=10, filter={"year": {"$ne": 2023}}, include_metadata=True )

3. Range filters

results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"year": {"$gte": 2020}}, {"year": {"$lte": 2024}} ] }, include_metadata=True )

4. In/Not-in filters

results = index.query( vector=query_embedding, top_k=10, filter={ "category": {"$in": ["education", "tutorial", "guide"]} }, include_metadata=True )

5. Existence check

results = index.query( vector=query_embedding, top_k=10, filter={"author": {"$exists": True}}, include_metadata=True )

6. Complex AND/OR queries

results = index.query( vector=query_embedding, top_k=10, filter={ "$and": [ {"category": {"$eq": "education"}}, { "$or": [ {"year": {"$eq": 2024}}, {"featured": {"$eq": True}} ] }, {"tags": {"$in": ["ml", "ai"]}} ] }, include_metadata=True )

7. Greater than/Less than

results = index.query( vector=query_embedding, top_k=10, filter={ "view_count": {"$gt": 1000}, "rating": {"$gte": 4.5} }, include_metadata=True )

Production Metadata Filter Patterns

Pattern 1: Time-based filtering (recent content)

from datetime import datetime, timedelta

def search_recent_documents(query_text: str, days: int = 30):
    """Search only documents from last N days."""
    cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={
            "created_at": {"$gte": cutoff_date}
        },
        include_metadata=True
    )
    return results

Pattern 2: User permission filtering

def search_with_permissions(query_text: str, user_id: str, user_roles: list):
    """Search only documents user has access to."""
    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={
            "$or": [
                {"owner_id": {"$eq": user_id}},
                {"shared_with": {"$in": [user_id]}},
                {"public": {"$eq": True}},
                {"required_roles": {"$in": user_roles}}
            ]
        },
        include_metadata=True
    )
    return results

Pattern 3: Multi-tenant filtering

def search_tenant_documents(query_text: str, tenant_id: str, category: str = None):
    """Search within a specific tenant's data."""
    filter_dict = {"tenant_id": {"$eq": tenant_id}}

    if category:
        filter_dict["category"] = {"$eq": category}

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter=filter_dict,
        include_metadata=True
    )
    return results

Pattern 4: Faceted search

def faceted_search(query_text: str, facets: dict):
    """Search with multiple facet filters."""
    filter_conditions = []

    for field, values in facets.items():
        if isinstance(values, list):
            filter_conditions.append({field: {"$in": values}})
        else:
            filter_conditions.append({field: {"$eq": values}})

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={"$and": filter_conditions} if filter_conditions else {},
        include_metadata=True
    )
    return results

Usage

results = faceted_search(
    "machine learning tutorials",
    facets={
        "category": ["education", "tutorial"],
        "difficulty": "beginner",
        "language": ["english", "spanish"]
    }
)

Weaviate Metadata Filtering

import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

1. Simple equality filter

response = documents.query.near_text( query="vector databases", limit=10, filters=Filter.by_property("category").equal("education") )

2. Greater than filter

response = documents.query.near_text( query="machine learning", limit=10, filters=Filter.by_property("year").greater_than(2020) )

3. Contains any filter

response = documents.query.near_text( query="AI tutorials", limit=10, filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"]) )

4. Complex AND/OR filters

response = documents.query.near_text( query="database optimization", limit=10, filters=( Filter.by_property("category").equal("tutorial") & (Filter.by_property("difficulty").equal("beginner") | Filter.by_property("featured").equal(True)) ) )

Chroma Metadata Filtering

import chromadb

client = chromadb.PersistentClient(path="./chroma_db") collection = client.get_collection("documents")

1. Simple equality filter

results = collection.query( query_texts=["vector databases"], n_results=10, where={"category": "education"} )

2. AND conditions

results = collection.query( query_texts=["machine learning"], n_results=10, where={ "$and": [ {"category": "education"}, {"difficulty": "beginner"} ] } )

3. OR conditions

results = collection.query( query_texts=["AI tutorials"], n_results=10, where={ "$or": [ {"category": "education"}, {"category": "tutorial"} ] } )

4. Greater than/Less than

results = collection.query( query_texts=["recent content"], n_results=10, where={"year": {"$gte": 2023}} )

5. In operator

results = collection.query( query_texts=["programming guides"], n_results=10, where={"language": {"$in": ["python", "javascript", "go"]}} )

Hybrid Search

Pinecone Hybrid Search (Dense + Sparse)

from pinecone import Pinecone
from typing import Dict, List
import re
import hashlib
from collections import Counter

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("hybrid-search-index")

def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
    """Create sparse vector using simple TF approach."""
    # Tokenize
    tokens = re.findall(r'\w+', text.lower())

    # Calculate term frequencies
    tf = Counter(tokens)

    # Create vocabulary mapping with a stable hash: Python's built-in
    # hash() is salted per process, so its indices would not be
    # reproducible between indexing and querying runs
    vocab = {
        word: int(hashlib.md5(word.encode()).hexdigest(), 16) % 10000
        for word in set(tokens)
    }

    # Get top-k terms
    top_terms = tf.most_common(top_k)

    # Create sparse vector
    indices = [vocab[term] for term, _ in top_terms]
    values = [float(freq) / len(tokens) for _, freq in top_terms]

    return {
        "indices": indices,
        "values": values
    }

def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
    """
    # Generate dense vector
    dense_vector = generate_embedding(query_text)

    # Generate sparse vector
    sparse_vector = create_sparse_vector(query_text)

    # Apply the alpha weighting so the parameter actually takes effect:
    # scale dense values by alpha and sparse values by (1 - alpha)
    dense_vector = [v * alpha for v in dense_vector]
    sparse_vector = {
        "indices": sparse_vector["indices"],
        "values": [v * (1 - alpha) for v in sparse_vector["values"]]
    }

    # Hybrid query
    results = index.query(
        vector=dense_vector,
        sparse_vector=sparse_vector,
        top_k=top_k,
        include_metadata=True
    )

    return results

Example usage

results = hybrid_search("machine learning vector databases", top_k=10) for match in results.matches: print(f"{match.metadata['title']}: {match.score:.4f}")

Weaviate Hybrid Search

import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

Hybrid search (combines dense vector + BM25 keyword search)

response = documents.query.hybrid( query="vector database performance", limit=10, alpha=0.5, # 0 = pure keyword, 1 = pure vector, 0.5 = balanced fusion_type="rankedFusion" # or "relativeScore" )

for obj in response.objects: print(f"Title: {obj.properties['title']}") print(f"Score: {obj.metadata.score}") print("---")

Hybrid search with filters

response = documents.query.hybrid( query="machine learning tutorials", limit=10, alpha=0.7, # Favor semantic search filters=Filter.by_property("category").equal("education") )

Hybrid search with custom vector

response = documents.query.hybrid( query="custom query", vector=[0.1, 0.2, 0.3, ...], # Your pre-computed vector limit=10, alpha=0.5 )

BM25 + Vector Hybrid (Custom Implementation)

from rank_bm25 import BM25Okapi
import numpy as np

class HybridSearchEngine:
    """Custom hybrid search combining BM25 and vector search."""

    def __init__(self, index, documents: List[Dict]):
        self.index = index
        self.documents = documents

        # Build BM25 index
        tokenized_docs = [doc['content'].lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        self.doc_ids = [doc['id'] for doc in documents]

    def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """
        Hybrid search with custom score fusion.
        alpha: weight for vector search (1-alpha for BM25)
        """
        # 1. Vector search
        query_embedding = generate_embedding(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 2,  # Get more candidates
            include_metadata=True
        )

        # 2. BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)

        # 3. Normalize scores
        vector_scores = {
            m.id: m.score for m in vector_results.matches
        }
        max_vec_score = max(vector_scores.values()) if vector_scores else 1.0
        max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0

        # 4. Combine scores
        hybrid_scores = {}
        all_ids = set(vector_scores.keys()) | set(self.doc_ids)

        for doc_id in all_ids:
            vec_score = vector_scores.get(doc_id, 0) / max_vec_score
            idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
            bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0

            hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)

        # 5. Rank and return top-k
        ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]

Usage

engine = HybridSearchEngine(index, documents)
results = engine.search("machine learning databases", top_k=10, alpha=0.7)

Namespace & Collection Management

Pinecone Namespaces

from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("production-search")

1. Upsert to specific namespace

index.upsert( vectors=[ {"id": "doc1", "values": [...], "metadata": {...}} ], namespace="production" )

2. Query specific namespace

results = index.query(
    vector=[...],
    top_k=10,
    namespace="production",
    include_metadata=True
)

3. Get namespace statistics

stats = index.describe_index_stats()
for namespace, info in stats.namespaces.items():
    print(f"Namespace: {namespace}")
    print(f"  Vector count: {info.vector_count}")

4. Delete all vectors in namespace

index.delete(delete_all=True, namespace="test")

5. Multi-namespace architecture

NAMESPACES = {
    "production": "Live user-facing data",
    "staging": "Testing before production",
    "development": "Development and experiments",
    "archive": "Historical data"
}

def upsert_with_environment(vectors, environment="production"):
    """Upsert to appropriate namespace."""
    namespace = environment if environment in NAMESPACES else "development"
    index.upsert(vectors=vectors, namespace=namespace)

def search_across_namespaces(query_vector, namespaces=["production", "archive"]):
    """Search multiple namespaces and combine results."""
    all_results = []

    for ns in namespaces:
        results = index.query(
            vector=query_vector,
            top_k=10,
            namespace=ns,
            include_metadata=True
        )
        for match in results.matches:
            match.metadata["source_namespace"] = ns
            all_results.append(match)

    # Sort by score
    all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:10]

Weaviate Collections

import weaviate
from weaviate.classes.config import Configure

client = weaviate.connect_to_local()

1. Create multiple collections

collections_config = [
    {
        "name": "Products",
        "properties": ["name", "description", "category", "price"]
    },
    {
        "name": "Users",
        "properties": ["username", "bio", "interests"]
    },
    {
        "name": "Reviews",
        "properties": ["content", "rating", "product_id", "user_id"]
    }
]

for config in collections_config:
    try:
        client.collections.create(
            name=config["name"],
            vectorizer_config=Configure.Vectorizer.text2vec_openai()
        )
    except Exception as e:
        print(f"Collection {config['name']} exists: {e}")

2. Cross-collection references

client.collections.create( name="Orders", references=[ weaviate.classes.config.ReferenceProperty( name="hasProduct", target_collection="Products" ), weaviate.classes.config.ReferenceProperty( name="byUser", target_collection="Users" ) ] )

3. Multi-collection search

def search_all_collections(query: str):
    """Search across multiple collections."""
    results = {}

    for collection_name in ["Products", "Users", "Reviews"]:
        collection = client.collections.get(collection_name)
        response = collection.query.near_text(
            query=query,
            limit=5
        )
        results[collection_name] = response.objects

    return results

4. Delete collection

client.collections.delete("TestCollection")

Chroma Collections

import chromadb

client = chromadb.PersistentClient(path="./chroma_db")

1. Create multiple collections

collections = {
    "documents": {
        "metadata": {"description": "Document embeddings"},
        "embedding_function": None  # Use default
    },
    "images": {
        "metadata": {"description": "Image embeddings"},
        "embedding_function": None
    },
    "code": {
        "metadata": {"description": "Code snippets"},
        "embedding_function": None
    }
}

for name, config in collections.items():
    collection = client.get_or_create_collection(
        name=name,
        metadata=config["metadata"]
    )

2. List all collections

all_collections = client.list_collections()
for coll in all_collections:
    print(f"Collection: {coll.name}")
    print(f"  Count: {coll.count()}")
    print(f"  Metadata: {coll.metadata}")

3. Collection-specific operations

docs_collection = client.get_collection("documents")
docs_collection.add(
    documents=["Document text..."],
    metadatas=[{"type": "article"}],
    ids=["doc1"]
)

4. Delete collection

client.delete_collection("test_collection")

5. Multi-collection search

def search_all_collections(query: str, n_results: int = 5):
    """Search across all collections."""
    results = {}

    for collection in client.list_collections():
        try:
            collection_results = collection.query(
                query_texts=[query],
                n_results=n_results
            )
            results[collection.name] = collection_results
        except Exception as e:
            print(f"Error searching {collection.name}: {e}")

    return results

Performance & Scaling

Batch Operations Best Practices

from pinecone import Pinecone
from typing import List, Dict
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("production-search")

1. Optimal batch size

OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation

def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE):
    """Efficiently upsert vectors in batches."""
    total_batches = (len(vectors) + batch_size - 1) // batch_size

    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch, namespace="documents")

        if (i // batch_size + 1) % 10 == 0:
            print(f"Processed {i // batch_size + 1}/{total_batches} batches")

2. Parallel batch upsert

def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4):
    """Parallel upsert using thread pool."""
    batch_size = 100
    batches = [
        vectors[i:i + batch_size]
        for i in range(0, len(vectors), batch_size)
    ]

    def upsert_batch(batch):
        try:
            index.upsert(vectors=batch, namespace="documents")
            return len(batch)
        except Exception as e:
            print(f"Error upserting batch: {e}")
            return 0

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(upsert_batch, batches))

    print(f"Successfully upserted {sum(results)} vectors")

3. Rate limiting for API calls

class RateLimiter:
    """Simple rate limiter for API calls."""

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def wait_if_needed(self):
        """Wait if rate limit would be exceeded."""
        now = time.time()
        # Remove old calls outside time window
        self.calls = [call_time for call_time in self.calls
                      if now - call_time < self.time_window]

        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
                self.calls = []

        self.calls.append(now)

Usage

rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls/minute

for batch in batches:
    rate_limiter.wait_if_needed()
    index.upsert(vectors=batch)

4. Bulk delete optimization

def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"):
    """Delete vectors matching filter (more efficient than individual deletes)."""
    # First, get IDs matching filter
    results = index.query(
        vector=[0] * 1536,  # Dummy vector
        top_k=10000,  # Max allowed
        filter=filter_dict,
        namespace=namespace,
        include_values=False
    )

    ids_to_delete = [match.id for match in results.matches]

    # Delete in batches
    batch_size = 1000
    for i in range(0, len(ids_to_delete), batch_size):
        batch = ids_to_delete[i:i + batch_size]
        index.delete(ids=batch, namespace=namespace)
        print(f"Deleted {len(batch)} vectors")

Query Optimization

1. Minimize data transfer

results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,  # Don't return vectors if not needed
    include_metadata=False,  # Don't return metadata if not needed
    namespace="documents"
)

2. Use appropriate top_k

Smaller top_k = faster queries

results_small = index.query(vector=query_vector, top_k=10)  # Fast
results_large = index.query(vector=query_vector, top_k=1000)  # Slower

3. Filter before vector search when possible

Good: Reduces search space

results = index.query( vector=query_vector, top_k=10, filter={"category": "education"}, # Reduces candidates namespace="documents" )

4. Batch queries when possible

More efficient than individual queries

queries = [embedding1, embedding2, embedding3]
results = index.query(
    queries=queries,
    top_k=10,
    namespace="documents"
)

5. Cache frequent queries

from functools import lru_cache
import hashlib
import json

def vector_hash(vector: List[float]) -> str:
    """Create hash of vector for caching."""
    return hashlib.md5(json.dumps(vector).encode()).hexdigest()

class CachedIndex:
    """Wrapper with query caching."""

    def __init__(self, index, cache_size: int = 1000):
        self.index = index
        self.cache = {}
        self.cache_size = cache_size

    def query(self, vector: List[float], top_k: int = 10, **kwargs):
        """Query with caching."""
        cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs)}"

        if cache_key in self.cache:
            return self.cache[cache_key]

        results = self.index.query(vector=vector, top_k=top_k, **kwargs)

        if len(self.cache) >= self.cache_size:
            # Remove oldest entry
            self.cache.pop(next(iter(self.cache)))

        self.cache[cache_key] = results
        return results

Usage

cached_index = CachedIndex(index)
results = cached_index.query(query_vector, top_k=10)  # Cached on subsequent calls

Scaling Strategies

1. Index sizing for scale

def calculate_index_requirements(
    num_vectors: int,
    dimension: int,
    metadata_size_per_vector: int = 1024  # bytes
) -> Dict:
    """Calculate storage and cost for index."""
    # Approximate calculations
    vector_size = dimension * 4  # 4 bytes per float32
    total_vector_storage = num_vectors * vector_size
    total_metadata_storage = num_vectors * metadata_size_per_vector
    total_storage = total_vector_storage + total_metadata_storage

    # Pinecone pricing (approximate)
    storage_cost_per_gb_month = 0.095  # Serverless pricing
    total_gb = total_storage / (1024 ** 3)
    monthly_storage_cost = total_gb * storage_cost_per_gb_month

    return {
        "num_vectors": num_vectors,
        "total_storage_gb": round(total_gb, 2),
        "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
        "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
    }

Example

reqs = calculate_index_requirements(
    num_vectors=10_000_000,
    dimension=1536
)
print(f"10M vectors storage: {reqs['total_storage_gb']} GB")
print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")

2. Sharding strategy for massive scale

def create_sharded_indexes(
    base_name: str,
    num_shards: int,
    dimension: int,
    metric: str = "cosine"
):
    """Create multiple indexes for horizontal scaling."""
    indexes = []

    for shard_id in range(num_shards):
        index_name = f"{base_name}-shard-{shard_id}"

        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )

        indexes.append(index_name)

    return indexes

def route_to_shard(vector_id: str, num_shards: int) -> int:
    """Determine which shard a vector belongs to."""
    # Use a stable hash: Python's built-in hash() is randomized per
    # process, which would route the same ID to different shards
    import hashlib
    digest = hashlib.md5(vector_id.encode()).hexdigest()
    return int(digest, 16) % num_shards

def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10):
    """Query all shards and merge results."""
    all_results = []

    for index_name in indexes:
        idx = pc.Index(index_name)
        results = idx.query(
            vector=query_vector,
            top_k=top_k,
            include_metadata=True
        )
        all_results.extend(results.matches)

    # Sort by score and return top_k
    all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:top_k]

Production Best Practices

Error Handling & Retries

import time
from typing import Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PineconeRetryHandler:
    """Robust error handling for Pinecone operations."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def retry_with_backoff(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Optional[any]:
        """Retry operation with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                    raise

                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)

        return None

Usage

retry_handler = PineconeRetryHandler(max_retries=3)

Upsert with retry

def safe_upsert(vectors, namespace="documents"):
    return retry_handler.retry_with_backoff(
        index.upsert,
        vectors=vectors,
        namespace=namespace
    )

Query with retry

def safe_query(vector, top_k=10, **kwargs):
    return retry_handler.retry_with_backoff(
        index.query,
        vector=vector,
        top_k=top_k,
        **kwargs
    )

Example

try:
    results = safe_query(query_vector, top_k=10, include_metadata=True)
except Exception as e:
    logger.error(f"Query failed permanently: {e}")

Monitoring & Observability

import time
from dataclasses import dataclass
from typing import Callable, Dict, List
from datetime import datetime

@dataclass
class QueryMetrics:
    """Track query performance metrics."""
    query_time: float
    result_count: int
    top_score: float
    timestamp: datetime
    namespace: str
    filter_used: bool

class VectorDBMonitor:
    """Monitor vector database operations."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def track_query(
        self,
        query_func: Callable,
        *args,
        **kwargs
    ):
        """Track query execution and metrics."""
        start_time = time.time()
        results = query_func(*args, **kwargs)
        elapsed = time.time() - start_time

        metrics = QueryMetrics(
            query_time=elapsed,
            result_count=len(results.matches),
            top_score=results.matches[0].score if results.matches else 0.0,
            timestamp=datetime.now(),
            namespace=kwargs.get('namespace', 'default'),
            filter_used='filter' in kwargs
        )

        self.metrics.append(metrics)

        # Alert on slow queries
        if elapsed > 1.0:  # 1 second threshold
            logger.warning(f"Slow query detected: {elapsed:.2f}s")

        return results

    def get_stats(self) -> Dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}

        query_times = [m.query_time for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "avg_query_time": sum(query_times) / len(query_times),
            "p95_query_time": sorted(query_times)[int(len(query_times) * 0.95)],
            "p99_query_time": sorted(query_times)[int(len(query_times) * 0.99)],
            "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
            "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
        }

Usage

monitor = VectorDBMonitor()

Wrap queries

results = monitor.track_query(
    index.query,
    vector=query_vector,
    top_k=10,
    namespace="documents",
    filter={"category": "education"}
)

Get statistics

stats = monitor.get_stats()
print(f"Average query time: {stats['avg_query_time']:.3f}s")
print(f"P95 query time: {stats['p95_query_time']:.3f}s")

Data Validation

from typing import List, Dict
import numpy as np

class VectorValidator:
    """Validate vectors and metadata before operations."""

    def __init__(self, expected_dimension: int):
        self.expected_dimension = expected_dimension

    def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
        """Validate vector format and content."""
        # Check type
        if not isinstance(vector, (list, np.ndarray)):
            return False, "Vector must be list or numpy array"

        # Check dimension
        if len(vector) != self.expected_dimension:
            return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"

        # Check for NaN or Inf
        if any(not np.isfinite(v) for v in vector):
            return False, "Vector contains NaN or Inf values"

        # Check for zero vector
        if all(v == 0 for v in vector):
            return False, "Zero vector not allowed"

        return True, "Valid"

    def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
        """Validate metadata format."""
        # Check type
        if not isinstance(metadata, dict):
            return False, "Metadata must be dictionary"

        # Check metadata size (Pinecone limit: 40KB)
        metadata_str = str(metadata)
        if len(metadata_str.encode('utf-8')) > 40_000:
            return False, "Metadata exceeds 40KB limit"

        # Check for required fields (customize as needed)
        required_fields = ["title", "category"]
        for field in required_fields:
            if field not in metadata:
                return False, f"Missing required field: {field}"

        return True, "Valid"

    def validate_batch(
        self,
        vectors: List[Dict]
    ) -> tuple[List[Dict], List[str]]:
        """Validate batch of vectors, return valid ones and errors."""
        valid_vectors = []
        errors = []

        for i, item in enumerate(vectors):
            # Validate vector
            is_valid, error = self.validate_vector(item.get('values', []))
            if not is_valid:
                errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                continue

            # Validate metadata
            if 'metadata' in item:
                is_valid, error = self.validate_metadata(item['metadata'])
                if not is_valid:
                    errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                    continue

            valid_vectors.append(item)

        return valid_vectors, errors

Usage

validator = VectorValidator(expected_dimension=1536)

Validate single vector

is_valid, error = validator.validate_vector(embedding)
if not is_valid:
    print(f"Invalid vector: {error}")

Validate batch

valid_vectors, errors = validator.validate_batch(vectors_to_upsert)
if errors:
    for error in errors:
        logger.error(error)

Upsert only valid vectors

if valid_vectors:
    index.upsert(vectors=valid_vectors)

Backup & Disaster Recovery

import json
import gzip
from datetime import datetime
from pathlib import Path

class VectorDBBackup:
    """Backup and restore vector database data."""

    def __init__(self, index, backup_dir: str = "./backups"):
        self.index = index
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_namespace(
        self,
        namespace: str = "documents",
        compress: bool = True
    ) -> str:
        """Backup all vectors in a namespace."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backup_{namespace}_{timestamp}.json"

        if compress:
            filename += ".gz"

        filepath = self.backup_dir / filename

        # Fetch all vectors (in batches)
        all_vectors = []
        batch_size = 100

        # Get all IDs first (would need to be tracked separately)
        # This is a simplified example
        stats = self.index.describe_index_stats()

        # For actual implementation, you'd need to track IDs
        # or use fetch with known IDs

        # Save to file
        data = {
            "namespace": namespace,
            "timestamp": timestamp,
            "vector_count": len(all_vectors),
            "vectors": all_vectors
        }

        if compress:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                json.dump(data, f)
        else:
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)

        logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
        return str(filepath)

    def restore_from_backup(
        self,
        backup_file: str,
        target_namespace: str = None
    ):
        """Restore vectors from backup file."""
        filepath = Path(backup_file)

        # Load backup
        if filepath.suffix == '.gz':
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                data = json.load(f)
        else:
            with open(filepath, 'r') as f:
                data = json.load(f)

        namespace = target_namespace or data['namespace']
        vectors = data['vectors']

        # Restore in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            logger.info(f"Restored {len(batch)} vectors")

        logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")

Usage

backup_manager = VectorDBBackup(index)

Backup

backup_file = backup_manager.backup_namespace("production")

Restore

backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
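
The backup class above deliberately leaves ID enumeration open. One way to close that gap, assuming you track vector IDs in your own system of record, is to pull vectors with the client's fetch call and shape them for the backup file; a minimal sketch:

def fetch_vectors_for_backup(index, ids: list[str], namespace: str = "documents") -> list[dict]:
    """Fetch known IDs in batches and shape them for the backup file."""
    all_vectors = []
    batch_size = 100

    for i in range(0, len(ids), batch_size):
        batch_ids = ids[i:i + batch_size]
        response = index.fetch(ids=batch_ids, namespace=namespace)
        for vec_id, vec in response.vectors.items():
            all_vectors.append({
                "id": vec_id,
                "values": list(vec.values),
                "metadata": dict(vec.metadata or {})
            })

    return all_vectors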

Cost Optimization

Storage Optimization

1. Reduce metadata size

Bad: Storing full content in metadata

bad_metadata = {
    "title": "Long document title",
    "full_content": "...<entire document>...",  # Wastes space
    "description": "...<long description>...",
    "extra_field_1": "...",
    "extra_field_2": "..."
}

Good: Store only necessary metadata

good_metadata = {
    "title": "Long document title",
    "doc_id": "doc-123",  # Reference to external store
    "category": "education",
    "created_at": "2024-01-15"
}

2. Use selective metadata indexing

Only index fields you'll filter on

pc.create_index( name="optimized-index", dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1", schema={ "fields": { "category": {"filterable": True}, # Need to filter "created_at": {"filterable": True}, # Need to filter "title": {"filterable": False}, # Just for display "description": {"filterable": False} # Just for display } } ) )

3. Regular cleanup of unused vectors

def cleanup_old_vectors(days_old: int = 90):
    """Delete vectors older than specified days."""
    from datetime import datetime, timedelta

    cutoff_date = (datetime.now() - timedelta(days=days_old)).isoformat()

    # Delete by filter
    index.delete(
        filter={"created_at": {"$lt": cutoff_date}},
        namespace="documents"
    )

4. Compress dimensions for smaller models

text-embedding-3-small: 1536 dimensions

all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)

Trade-off: slightly lower accuracy for significant cost savings
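
One way to act on this trade-off without switching models: OpenAI's text-embedding-3 models accept a dimensions parameter that returns shortened, re-normalized embeddings. A minimal sketch:

from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")

# Request 512-dimensional embeddings instead of the default 1536
response = client.embeddings.create(
    input="Vector databases enable semantic search",
    model="text-embedding-3-small",
    dimensions=512  # roughly two-thirds less storage than 1536
)
print(len(response.data[0].embedding))  # 512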

Query Cost Optimization

1. Batch queries instead of individual

Bad: Multiple individual queries

for query in queries:
    results = index.query(vector=query, top_k=10)  # N API calls

Good: Single batch query

results = index.query(
    queries=query_vectors,  # 1 API call
    top_k=10
)

2. Use appropriate top_k

Larger top_k = more expensive

results = index.query(
    vector=query_vector,
    top_k=10,  # Usually sufficient
    # top_k=1000  # Much more expensive
)

3. Minimize data transfer

results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,  # Save bandwidth
    include_metadata=False  # Save bandwidth if not needed
)

4. Use caching for repeated queries

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_search(query_text: str, top_k: int = 10):
    """Cache search results for identical queries."""
    embedding = generate_embedding(query_text)
    results = index.query(
        vector=embedding,
        top_k=top_k,
        include_metadata=True
    )
    return results

5. Choose serverless vs pods appropriately

Serverless: Low/variable traffic (pay per query)

Pods: High consistent traffic (fixed cost)

def choose_deployment_type(
    queries_per_month: int,
    avg_response_time_requirement: float = 100  # ms
) -> str:
    """Recommend deployment type based on usage."""
    # Rough cost calculations (update with current pricing)
    serverless_cost_per_query = 0.0001  # Example
    pod_cost_per_month = 70  # p1.x1 pod

    serverless_monthly_cost = queries_per_month * serverless_cost_per_query

    if serverless_monthly_cost < pod_cost_per_month:
        return "serverless"
    else:
        return "pods"

Cost Monitoring

import json
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict

class CostMonitor:
    """Monitor and estimate vector database costs."""

    def __init__(self):
        self.operations = defaultdict(int)
        self.pricing = {
            "serverless_write_units": 0.0000025,  # per write unit
            "serverless_read_units": 0.00000625,  # per read unit
            "serverless_storage_gb": 0.095,  # per GB per month
            "p1_x1_pod": 0.096,  # per hour
            "p2_x1_pod": 0.240,  # per hour
        }

    def track_operation(self, operation_type: str, units: int = 1):
        """Track database operations."""
        self.operations[operation_type] += units

    def estimate_monthly_cost(
        self,
        deployment_type: str,
        storage_gb: float = 0,
        pod_type: str = None
    ) -> Dict:
        """Estimate monthly costs."""
        costs = {}

        if deployment_type == "serverless":
            # Storage cost
            storage_cost = storage_gb * self.pricing["serverless_storage_gb"]

            # Operation costs
            write_cost = (
                self.operations["upsert"] *
                self.pricing["serverless_write_units"]
            )
            read_cost = (
                self.operations["query"] *
                self.pricing["serverless_read_units"]
            )

            costs = {
                "storage": storage_cost,
                "writes": write_cost,
                "reads": read_cost,
                "total": storage_cost + write_cost + read_cost
            }

        elif deployment_type == "pods":
            # Fixed pod cost
            hours_per_month = 730
            pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month

            costs = {
                "pod": pod_cost,
                "total": pod_cost
            }

        return costs

    def get_cost_report(self) -> str:
        """Generate cost report."""
        report = f"\n{'=' * 50}\n"
        report += "VECTOR DATABASE COST REPORT\n"
        report += f"{'=' * 50}\n\n"
        report += "Operations Summary:\n"

        for operation, count in self.operations.items():
            report += f"  {operation}: {count:,}\n"

        report += f"\n{'=' * 50}\n"
        return report

Usage

cost_monitor = CostMonitor()

Track operations

def monitored_upsert(vectors, **kwargs):
    cost_monitor.track_operation("upsert", len(vectors))
    return index.upsert(vectors=vectors, **kwargs)

def monitored_query(vector, **kwargs):
    cost_monitor.track_operation("query", 1)
    return index.query(vector=vector, **kwargs)

Get cost estimate

monthly_cost = cost_monitor.estimate_monthly_cost(
    deployment_type="serverless",
    storage_gb=10.5
)

print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}") print(cost_monitor.get_cost_report())

Summary

This guide covers the day-to-day work of vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:

  • Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development

  • Optimize embeddings: Balance dimension size with accuracy and cost

  • Use metadata filtering: Combine vector similarity with structured filtering for powerful search

  • Implement hybrid search: Combine dense and sparse vectors for best results

  • Scale efficiently: Use batching, caching, and appropriate index configurations

  • Monitor and optimize costs: Track usage and choose the right deployment type
