Building RAG Systems
Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.
Quick Start
Dependencies
pip install qdrant-client openai pydantic python-frontmatter
Core components
1. Crawler → discovers files, extracts path metadata
2. Parser → extracts frontmatter, computes file hash
3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
4. Embedder → batched OpenAI embeddings
5. Uploader → Qdrant upsert with indexed payloads
Ingestion Pipeline
Architecture
┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │               │               │
 Discovers      Extracts       Splits by       Generates       Upserts to
 files          frontmatter    semantic        vectors         Qdrant
                + file hash    boundaries      (batched)       (batched)
Semantic Chunking (NOT Fixed-Size)
class SemanticChunker:
    """
    Production chunking:
    - Split on ## headers (semantic boundaries)
    - Target 400 tokens (NVIDIA benchmark optimal)
    - 15% overlap for context continuity
    - Track prev/next for context expansion

    Sections that fit within ``max_tokens`` are emitted unchanged. Longer
    sections are cut into overlapping word windows of ``target_tokens``.
    Token counts are estimated from whitespace-separated words using
    ``TOKENS_PER_WORD``; no tokenizer is invoked.
    """

    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3
    _WORD_PATTERN = re.compile(r"\S+")

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        """Convert the token budgets into word budgets.

        Args:
            target_tokens: Approximate size of each window cut from an
                oversized section.
            max_tokens: Sections estimated above this size are windowed.
            overlap_percent: Fraction of ``target_tokens`` shared between
                consecutive windows of the same section.
        """
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.max_words = int(max_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        """Split ``content`` into linked chunks.

        Args:
            content: Markdown text of one file.
            file_hash: Hash of the source file; its first 8 characters
                prefix every chunk id and the full value is stored on
                each chunk as ``source_file_hash``.

        Returns:
            Chunks in document order. Each chunk carries its index, the
            total chunk count, and the ids of its neighbours. Empty or
            whitespace-only content yields an empty list.
        """
        pieces = self._split_text(content)
        chunks = []
        for idx, text in enumerate(pieces):
            content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
            previous = chunks[-1] if chunks else None
            chunks.append(Chunk(
                id=chunk_id,
                text=text,
                chunk_index=idx,
                total_chunks=len(pieces),
                prev_chunk_id=previous.id if previous else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))
            # Set next_chunk_id on previous
            if previous is not None:
                previous.next_chunk_id = chunk_id
        return chunks

    def _split_text(self, content: str) -> list[str]:
        """Return the chunk texts for ``content`` in document order."""
        pieces = []
        for section in self.SECTION_PATTERN.split(content):
            # The lookahead split yields "" before a leading "## " header;
            # blank sections carry nothing worth embedding.
            if section.strip():
                pieces.extend(self._window_section(section))
        return pieces

    def _window_section(self, section: str) -> list[str]:
        """Cut one section into overlapping word windows if it is too long."""
        spans = [match.span() for match in self._WORD_PATTERN.finditer(section)]
        if len(spans) <= self.max_words:
            return [section]

        size = max(1, self.target_words)
        # Guard against a zero/negative stride when overlap >= window size.
        step = max(1, size - self.overlap_words)
        windows = []
        for start in range(0, len(spans), step):
            end = min(start + size, len(spans))
            # Slice the original string so inner whitespace is preserved.
            windows.append(section[spans[start][0]:spans[end - 1][1]])
            if end == len(spans):
                break
        return windows
Change Detection (Incremental Updates)
def compute_file_hash(file_path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 for change detection.

    The file is read in blocks so large files are never held in memory
    all at once.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per block.

    Returns:
        Hex digest of the file's bytes.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: f.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
class QdrantStateTracker:
    """Query Qdrant payloads directly - no external state DB needed.

    Reads ``self.client`` and ``self.collection``; neither is set in this
    snippet, so they are presumably assigned by setup code not shown here.
    """

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Returns {file_path: file_hash} from Qdrant.

        Scrolls every point whose ``book_id`` payload matches, 100 points
        per page, fetching only the two payload keys needed. Points with
        no ``source_file`` are skipped. A point with no
        ``source_file_hash`` is recorded with an empty hash, which differs
        from any SHA-256 hex digest, so ``detect_changes`` reports that
        file as modified.
        """
        indexed = {}
        offset = None
        while True:
            points, next_offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )
            for point in points:
                payload = point.payload or {}
                source_file = payload.get("source_file")
                if source_file is None:
                    continue
                indexed[source_file] = payload.get("source_file_hash", "")
            if next_offset is None:
                break
            offset = next_offset
        return indexed

    def detect_changes(
        self, current: dict[str, str], indexed: dict[str, str]
    ) -> tuple[list[str], list[str], list[str]]:
        """Compare filesystem vs index.

        Args:
            current: {file_path: file_hash} for files on disk.
            indexed: {file_path: file_hash} as stored in the index.

        Returns:
            ``(new, modified, deleted)`` lists of file paths: paths only
            in ``current``, paths in both with differing hashes, and
            paths only in ``indexed``.
        """
        new = [p for p in current if p not in indexed]
        deleted = [p for p in indexed if p not in current]
        modified = [p for p in current if p in indexed and current[p] != indexed[p]]
        return new, modified, deleted
Batched Embeddings
class OpenAIEmbedder:
    """Embed chunk text through the OpenAI embeddings API in batches."""

    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        """Create the API client and store the batching settings.

        Args:
            model: Embedding model name sent with every request.
            batch_size: Number of chunk texts sent per request.
        """
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size  # OpenAI recommendation

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Return one ``EmbeddedChunk`` per input chunk, in input order.

        Issues one embeddings request per ``batch_size`` chunks. An empty
        ``chunks`` list makes no request and returns an empty list.

        Raises:
            RuntimeError: If a response holds a different number of
                embeddings than the batch that was sent.
        """
        embedded = []
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )
            # zip() would silently drop chunks on a short response.
            if len(response.data) != len(batch):
                raise RuntimeError(
                    f"Expected {len(batch)} embeddings, got {len(response.data)}"
                )
            for chunk, data in zip(batch, response.data):
                embedded.append(EmbeddedChunk(**chunk.dict(), embedding=data.embedding))
        return embedded
Qdrant Collection with Payload Indexes
def create_collection(self, recreate: bool = False):
    """Create collection with proper indexes for filtered retrieval.

    Args:
        recreate: When True, an existing collection of the same name is
            deleted first, discarding its points. When False, an existing
            collection is left in place and only the payload indexes are
            (re)requested.
    """
    exists = self.client.collection_exists(collection_name=self.collection)
    if exists and recreate:
        self.client.delete_collection(collection_name=self.collection)
        exists = False

    if not exists:
        self.client.create_collection(
            collection_name=self.collection,
            vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
        )

    # Index ALL fields you filter by
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),            # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),             # Content filter
        ("chapter", PayloadSchemaType.INTEGER),            # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),      # Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),      # Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),   # Change detection
    ]
    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
Retrieval Patterns
Comprehensive Filter Builder
def build_filter(self, query: SearchQuery) -> Filter:
    """Build Qdrant filter with all conditions (AND logic).

    ``book_id`` and ``hardware_tier`` are always applied. ``module``,
    the chapter range and ``proficiency_levels`` are added only when set
    on ``query``. Chapter bounds are compared against ``None`` so that a
    bound of ``0`` is honoured rather than treated as missing.
    NOTE(review): assumes unset chapter bounds are ``None`` on
    ``SearchQuery`` - confirm against its definition.
    """
    conditions = [
        # Required: Tenant isolation
        FieldCondition(key="book_id", match=MatchValue(value=query.book_id)),
        # Required: Hardware tier (lte = "tier X or lower")
        FieldCondition(key="hardware_tier", range=Range(lte=query.hardware_tier)),
    ]

    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))

    # Optional: Chapter range
    if query.chapter_min is not None or query.chapter_max is not None:
        conditions.append(FieldCondition(
            key="chapter",
            range=Range(gte=query.chapter_min, lte=query.chapter_max),
        ))

    # Optional: Proficiency OR logic
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)
Context Expansion (Walk Chunk Chain)
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Walk prev_chunk_id/next_chunk_id chain for surrounding context.

    Args:
        chunk_id: Id of the chunk to centre on.
        prev: Maximum number of preceding chunks to include.
        next: Maximum number of following chunks to include.

    Returns:
        Preceding chunks, the requested chunk, then following chunks, in
        chain order. Empty when ``chunk_id`` cannot be fetched. A walk
        stops early at the end of the chain or at an id that cannot be
        fetched.
    """
    current = self.get_chunk_by_id(chunk_id)
    if not current:
        return []

    def walk(start_id, steps, link_attr):
        """Follow ``link_attr`` from ``start_id`` for at most ``steps`` hops."""
        found = []
        cursor = start_id
        for _ in range(steps):
            if not cursor:
                break
            chunk = self.get_chunk_by_id(cursor)
            if not chunk:
                break
            found.append(chunk)
            cursor = getattr(chunk, link_attr)
        return found

    # The backward walk collects nearest-first; flip it into chain order.
    before = walk(current.prev_chunk_id, prev, "prev_chunk_id")
    before.reverse()
    after = walk(current.next_chunk_id, next, "next_chunk_id")
    return before + [current] + after
Full Document Retrieval
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index.

    Scrolls page by page until Qdrant reports no further offset, so
    documents with more than 100 chunks are returned in full.
    """
    doc_filter = Filter(must=[
        FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
    ])
    chunks = []
    offset = None
    while True:
        points, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=doc_filter,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        chunks.extend(self._to_chunk(p) for p in points)
        if offset is None:
            break

    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
Payload Schema
class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion."""

    # Tenant isolation
    book_id: str

    # Content filters (all indexed)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # Display content
    text: str
    # Explicit None defaults keep the Optional fields optional under
    # Pydantic v2, where Optional[...] without a default is required.
    section_title: Optional[str] = None
    source_file: str

    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str] = None  # None on the first chunk of a document
    next_chunk_id: Optional[str] = None  # None on the last chunk of a document

    # Change detection
    content_hash: str
    source_file_hash: str
Anti-Patterns
| Don't | Do Instead |
|-------|------------|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |
Verification
Run: python scripts/verify.py
Related Skills
- scaffolding-fastapi-dapr - API patterns for search endpoints
- streaming-llm-responses - Streaming RAG responses
References
- references/ingestion-patterns.md - Full ingestion pipeline
- references/retrieval-patterns.md - Filter strategies, context expansion