RAG Retrieval
Retrieval-Augmented Generation patterns for grounded LLM responses. Use when building RAG pipelines, embedding documents, implementing hybrid search, contextual retrieval, HyDE, agentic RAG, multimodal RAG, query decomposition, reranking, or pgvector search.
Primary Agent: data-pipeline-engineer
RAG Retrieval
Comprehensive patterns for building production RAG systems. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Core RAG | 4 | CRITICAL | Basic RAG, citations, hybrid search, context management |
| Embeddings | 3 | HIGH | Model selection, chunking, batch/cache optimization |
| Contextual Retrieval | 3 | HIGH | Context-prepending, hybrid BM25+vector, pipeline |
| HyDE | 3 | HIGH | Vocabulary mismatch, hypothetical document generation |
| Agentic RAG | 4 | HIGH | Self-RAG, CRAG, knowledge graphs, adaptive routing |
| Multimodal RAG | 3 | MEDIUM | Image+text retrieval, PDF chunking, cross-modal search |
| Query Decomposition | 3 | MEDIUM | Multi-concept queries, parallel retrieval, RRF fusion |
| Reranking | 3 | MEDIUM | Cross-encoder, LLM scoring, combined signals |
| PGVector | 4 | HIGH | PostgreSQL hybrid search, HNSW indexes, schema design |
Total: 30 rules across 9 categories
Core RAG
Fundamental patterns for retrieval, generation, and pipeline composition.
| Rule | File | Key Pattern |
|---|---|---|
| Basic RAG | rules/core-basic-rag.md | Retrieve + context + generate with citations |
| Hybrid Search | rules/core-hybrid-search.md | RRF fusion (k=60) for semantic + keyword |
| Context Management | rules/core-context-management.md | Token budgeting + sufficiency check |
| Pipeline Composition | rules/core-pipeline-composition.md | Composable Decompose → HyDE → Retrieve → Rerank |
Embeddings
Embedding models, chunking strategies, and production optimization.
| Rule | File | Key Pattern |
|---|---|---|
| Models & API | rules/embeddings-models.md | Model selection, batch API, similarity |
| Chunking | rules/embeddings-chunking.md | Semantic boundary splitting, 512 token sweet spot |
| Advanced | rules/embeddings-advanced.md | Redis cache, Matryoshka dims, batch processing |
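The chunking rule above targets a ~512-token sweet spot with overlap between neighbors. As a rough illustration (not the rules/embeddings-chunking.md implementation), a minimal word-window chunker might look like this, using word count as a stand-in for token count:

```python
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 64) -> list[str]:
    """Split text into overlapping word-window chunks.

    Word count is a rough proxy for tokens; a production pipeline would
    count tokens with the embedding model's tokenizer and prefer semantic
    boundaries (sentences, headings) over fixed windows.
    """
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        window = words[start:start + chunk_size]
        if window:
            chunks.append(" ".join(window))
        if start + chunk_size >= len(words):
            break
    return chunks
```

Each chunk shares its last `overlap` words with the start of the next chunk, so context spanning a boundary is retrievable from either side.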
Contextual Retrieval
Anthropic's context-prepending technique — 67% fewer retrieval failures.
| Rule | File | Key Pattern |
|---|---|---|
| Context Prepending | rules/contextual-prepend.md | LLM-generated context + prompt caching |
| Hybrid Search | rules/contextual-hybrid.md | 40% BM25 / 60% vector weight split |
| Complete Pipeline | rules/contextual-pipeline.md | End-to-end indexing + hybrid retrieval |
HyDE
Hypothetical Document Embeddings for bridging vocabulary gaps.
| Rule | File | Key Pattern |
|---|---|---|
| Generation | rules/hyde-generation.md | Embed hypothetical doc, not query |
| Per-Concept | rules/hyde-per-concept.md | Parallel HyDE for multi-topic queries |
| Fallback | rules/hyde-fallback.md | 2-3s timeout → direct embedding fallback |
Agentic RAG
Self-correcting retrieval with LLM-driven decision making.
| Rule | File | Key Pattern |
|---|---|---|
| Self-RAG | rules/agentic-self-rag.md | Binary document grading for relevance |
| Corrective RAG | rules/agentic-corrective-rag.md | CRAG workflow with web fallback |
| Knowledge Graph | rules/agentic-knowledge-graph.md | KG + vector hybrid for entity-rich domains |
| Adaptive Retrieval | rules/agentic-adaptive-retrieval.md | Query routing to optimal strategy |
Multimodal RAG
Image + text retrieval with cross-modal search.
| Rule | File | Key Pattern |
|---|---|---|
| Embeddings | rules/multimodal-embeddings.md | CLIP, SigLIP 2, Voyage multimodal-3 |
| Chunking | rules/multimodal-chunking.md | PDF extraction preserving images |
| Pipeline | rules/multimodal-pipeline.md | Dedup + hybrid retrieval + generation |
Query Decomposition
Breaking complex queries into concepts for parallel retrieval.
| Rule | File | Key Pattern |
|---|---|---|
| Detection | rules/query-detection.md | Heuristic indicators (<1ms fast path) |
| Decompose + RRF | rules/query-decompose.md | LLM concept extraction + parallel retrieval |
| HyDE Combo | rules/query-hyde-combo.md | Decompose + HyDE for maximum coverage |
Reranking
Post-retrieval re-scoring for higher precision.
| Rule | File | Key Pattern |
|---|---|---|
| Cross-Encoder | rules/reranking-cross-encoder.md | ms-marco-MiniLM (~50ms, free) |
| LLM Reranking | rules/reranking-llm.md | Batch scoring + Cohere API |
| Combined | rules/reranking-combined.md | Multi-signal weighted scoring |
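The combined rule weights multiple relevance signals into one score. A minimal sketch of that idea (the signal names and weights here are illustrative, and scores are assumed pre-normalized to [0, 1]):

```python
DEFAULT_WEIGHTS = {"cross_encoder": 0.6, "vector": 0.3, "recency": 0.1}

def combine_signals(docs: list[dict], weights: dict[str, float] = DEFAULT_WEIGHTS) -> list[dict]:
    """Re-rank documents by a weighted sum of normalized signal scores.

    Missing signals default to 0.0 so documents scored by only one
    retriever still participate in the ranking.
    """
    def score(doc: dict) -> float:
        return sum(w * doc.get(signal, 0.0) for signal, w in weights.items())
    return sorted(docs, key=score, reverse=True)
```

Weighting the cross-encoder highest reflects that it sees query and document together, making it the strongest single signal of the three.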
PGVector
Production hybrid search with PostgreSQL.
| Rule | File | Key Pattern |
|---|---|---|
| Schema | rules/pgvector-schema.md | HNSW index + pre-computed tsvector |
| Hybrid Search | rules/pgvector-hybrid-search.md | SQLAlchemy RRF with FULL OUTER JOIN |
| Indexing | rules/pgvector-indexing.md | HNSW (17x faster) vs IVFFlat |
| Metadata | rules/pgvector-metadata.md | Filtering, boosting, Redis 8 comparison |
Quick Start Example
from openai import OpenAI
client = OpenAI()
async def rag_query(question: str, top_k: int = 5) -> dict:
"""Basic RAG with citations."""
docs = await vector_db.search(question, limit=top_k)
context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(docs)])
response = await llm.chat([
{"role": "system", "content": "Answer with inline citations [1], [2]. Use ONLY provided context."},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
])
return {"answer": response.content, "sources": [d.metadata['source'] for d in docs]}
Key Decisions
| Decision | Recommendation |
|---|---|
| Embedding model | text-embedding-3-small (general), voyage-3 (production) |
| Chunk size | 256-1024 tokens (512 typical) |
| Hybrid weight | 40% BM25 / 60% vector |
| Top-k | 3-10 documents |
| Temperature | 0.1-0.3 (factual) |
| Context budget | 4K-8K tokens |
| Reranking | Retrieve 50, rerank to 10 |
| Vector index | HNSW (production), IVFFlat (high-volume) |
| HyDE timeout | 2-3 seconds with fallback |
| Query decomposition | Heuristic first, LLM only if multi-concept |
Common Mistakes
- No citation tracking (unverifiable answers)
- Context too large (dilutes relevance)
- Single retrieval method (misses keyword matches)
- Not chunking long documents (context gets lost)
- Embedding queries differently than documents
- No fallback path in agentic RAG (workflow hangs)
- Infinite rewrite loops (no retry limit)
- Using wrong similarity metric (cosine vs euclidean)
- Not caching embeddings (recomputing unchanged content)
- Missing image captions in multimodal RAG (limits text search)
Evaluations
See test-cases.json for 30 test cases across all categories.
Related Skills
- ork:langgraph: LangGraph workflow patterns (for agentic RAG workflows)
- caching: Cache RAG responses for repeated queries
- ork:golden-dataset: Evaluate retrieval quality
- ork:llm-integration: Local embeddings with nomic-embed-text
- vision-language-models: Image analysis for multimodal RAG
- ork:database-patterns: Schema design for vector search
Capability Details
retrieval-patterns
Keywords: retrieval, context, chunks, relevance, rag
Solves:
- Retrieve relevant context for LLM
- Implement RAG pipeline with citations
- Optimize retrieval quality
hybrid-search
Keywords: hybrid, bm25, vector, fusion, rrf
Solves:
- Combine keyword and semantic search
- Implement reciprocal rank fusion
- Balance precision and recall
embeddings
Keywords: embedding, text to vector, vectorize, chunk, similarity
Solves:
- Convert text to vector embeddings
- Choose embedding models and dimensions
- Implement chunking strategies
contextual-retrieval
Keywords: contextual, anthropic, context-prepend, bm25
Solves:
- Prepend context to chunks for better retrieval
- Reduce retrieval failures by 67%
- Implement hybrid BM25+vector search
hyde
Keywords: hyde, hypothetical, vocabulary mismatch
Solves:
- Bridge vocabulary gaps in semantic search
- Generate hypothetical documents for embedding
- Handle abstract or conceptual queries
agentic-rag
Keywords: self-rag, crag, corrective, adaptive, grading
Solves:
- Build self-correcting RAG workflows
- Grade document relevance
- Implement web search fallback
multimodal-rag
Keywords: multimodal, image, clip, vision, pdf
Solves:
- Build RAG with images and text
- Cross-modal search (text → image)
- Process PDFs with mixed content
query-decomposition
Keywords: decompose, multi-concept, complex query
Solves:
- Break complex queries into concepts
- Parallel retrieval per concept
- Improve coverage for compound questions
reranking
Keywords: rerank, cross-encoder, precision, scoring
Solves:
- Improve search precision post-retrieval
- Score relevance with cross-encoder or LLM
- Combine multiple scoring signals
pgvector-search
Keywords: pgvector, postgresql, hnsw, tsvector, hybrid
Solves:
- Production hybrid search with PostgreSQL
- HNSW vs IVFFlat index selection
- SQL-based RRF fusion
Rules (30)
Route queries to the best retrieval strategy using adaptive selection per query type — MEDIUM
Adaptive Retrieval
Route queries to optimal retrieval strategies based on query characteristics.
Query Router:
from pydantic import BaseModel, Field
from typing import Literal
class QueryRoute(BaseModel):
strategy: Literal["direct", "hyde", "decompose", "web"] = Field(
description="Best retrieval strategy for this query"
)
reasoning: str
async def route_query(question: str) -> str:
route = await llm.with_structured_output(QueryRoute).ainvoke(
f"Choose the best retrieval strategy for: {question}\n"
"- direct: Simple factual queries with clear keywords\n"
"- hyde: Abstract/conceptual queries with vocabulary mismatch\n"
"- decompose: Multi-concept queries spanning multiple topics\n"
"- web: Recent events or data not in knowledge base"
)
return route.strategy
Multi-Source Orchestration:
async def adaptive_search(question: str, top_k: int = 10) -> list[dict]:
strategy = await route_query(question)
if strategy == "direct":
return await retriever.search(question, top_k=top_k)
elif strategy == "hyde":
hyde_result = await hyde_service.generate(question)
return await retriever.search_by_embedding(hyde_result.embedding, top_k=top_k)
elif strategy == "decompose":
return await decomposed_search(question, top_k=top_k)
elif strategy == "web":
return await web_search(question)
Incorrect — hardcoded single retrieval strategy:
async def search(question: str) -> list[dict]:
# Always uses HyDE regardless of query type
hyde_result = await hyde_service.generate(question)
return await retriever.search_by_embedding(hyde_result.embedding, top_k=10)
Correct — adaptive routing based on query characteristics:
async def adaptive_search(question: str) -> list[dict]:
strategy = await route_query(question) # Choose best approach
if strategy == "direct":
return await retriever.search(question, top_k=10) # Fast path
elif strategy == "hyde":
hyde_result = await hyde_service.generate(question)
return await retriever.search_by_embedding(hyde_result.embedding, top_k=10)
Key rules:
- Route queries to optimal sources based on query type
- Direct search for simple factual queries (fastest)
- HyDE for abstract/conceptual queries (vocabulary bridging)
- Decomposition for multi-concept queries (comprehensive coverage)
- Web search for recent events or out-of-knowledge-base queries
- Routing adds ~200ms overhead — use heuristics for fast-path decisions
Apply corrective RAG with quality assurance and web fallback for self-correction — HIGH
Corrective RAG (CRAG)
Self-correcting retrieval with query rewriting and web search fallback.
CRAG Workflow:
def build_crag_workflow() -> StateGraph:
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("web_search", web_search)
workflow.add_node("transform_query", transform_query)
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges("grade", route_after_grading, {
"generate": "generate",
"transform_query": "transform_query",
"web_search": "web_search"
})
workflow.add_edge("transform_query", "retrieve") # Retry
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)
return workflow.compile()
def route_after_grading(state: RAGState) -> str:
if state["web_search_needed"]:
if state.get("retry_count", 0) < 2:
return "transform_query"
return "web_search"
return "generate"
Web Search Fallback:
def web_search(state: RAGState) -> dict:
web_results = tavily_client.search(state["question"], max_results=5, search_depth="advanced")
web_docs = [
Document(page_content=r["content"], metadata={"source": r["url"], "type": "web"})
for r in web_results
]
return {"documents": web_docs, "web_search_needed": False}
Incorrect — no fallback path or retry limits:
def route_after_grading(state: RAGState) -> str:
if state["web_search_needed"]:
return "transform_query" # Infinite loop possible!
return "generate"
Correct — bounded retries with web fallback:
def route_after_grading(state: RAGState) -> str:
if state["web_search_needed"]:
if state.get("retry_count", 0) < 2: # Max 2 retries
return "transform_query"
return "web_search" # Fallback to web
return "generate"
Key rules:
- Fallback order: Rewrite query (2x max) → Web search → Abstain
- Max 2-3 retries for query rewriting to prevent infinite loops
- Web search as last resort (latency + cost)
- Always include retry_count to prevent infinite loops
- No fallback path = workflow hangs on bad queries
Build knowledge graph RAG for multi-hop reasoning over entity-rich domains — MEDIUM
Knowledge Graph RAG (GraphRAG)
Combine knowledge graphs with vector search for entity-rich domains.
Architecture:
Query → [Entity Extraction] → [KG Lookup] → [Vector Search] → [Merge] → [Generate]
Pattern Comparison:
| Pattern | When to Use | Key Feature |
|---|---|---|
| Self-RAG | Need adaptive retrieval | LLM decides when to retrieve |
| CRAG | Need quality assurance | Document grading + web fallback |
| GraphRAG | Entity-rich domains | Knowledge graph + vector hybrid |
| Agentic | Complex multi-step | Full plan-route-act-verify loop |
Incorrect — vector-only search missing entity relationships:
async def search(query: str) -> list[dict]:
# Misses relationships between entities
return await vector_db.search(query, limit=10)
Correct — hybrid KG + vector search:
async def graph_rag_search(query: str) -> list[dict]:
entities = await extract_entities(query) # Extract entities from query
kg_results = await neo4j.lookup_entities(entities) # KG lookup
vector_results = await vector_db.search(query, limit=10) # Vector search
return merge_results(kg_results, vector_results)  # Combine both
Key rules:
- Use GraphRAG when domain has rich entity relationships (people, organizations, products)
- Combine KG entity lookup with vector similarity for hybrid results
- Entity extraction should use structured output (Pydantic) for reliability
- Multi-hop reasoning: follow graph edges to find connected information
- Neo4j or similar graph DB for production knowledge graphs
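The `merge_results` helper in the example above is left abstract. One possible sketch, deduplicating by document id and boosting documents found by both sources (the boost value is an assumption for illustration):

```python
def merge_results(kg_results: list[dict], vector_results: list[dict],
                  both_boost: float = 0.2) -> list[dict]:
    """Merge KG and vector hits: dedupe by id, boost docs found by both."""
    merged: dict[str, dict] = {}
    for source, results in (("kg", kg_results), ("vector", vector_results)):
        for doc in results:
            entry = merged.setdefault(doc["id"], {**doc, "sources": set(), "score": 0.0})
            entry["sources"].add(source)
            entry["score"] = max(entry["score"], doc.get("score", 0.0))
    for entry in merged.values():
        if len(entry["sources"]) > 1:
            entry["score"] += both_boost  # Agreement across retrievers is a strong signal
    return sorted(merged.values(), key=lambda e: e["score"], reverse=True)
```

Boosting cross-source agreement reflects the hybrid intuition: a document surfaced by both graph traversal and vector similarity is rarely a false positive.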
Grade document relevance with Self-RAG to prevent context contamination from irrelevant results — HIGH
Self-RAG — Document Grading
LLM grades retrieved documents for relevance before generation.
State Definition:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Annotated
from langchain_core.documents import Document
import operator
class RAGState(TypedDict):
question: str
documents: Annotated[List[Document], operator.add]
generation: str
web_search_needed: bool
retry_count: int
relevance_scores: dict[str, float]
Document Grading:
from pydantic import BaseModel, Field
class GradeDocuments(BaseModel):
binary_score: str = Field(description="Relevance score 'yes' or 'no'")
def grade_documents(state: RAGState) -> dict:
"""Grade documents for relevance — core Self-RAG pattern."""
question = state["question"]
documents = state["documents"]
filtered_docs, relevance_scores = [], {}
for doc in documents:
score = retrieval_grader.invoke({
"question": question, "document": doc.page_content
})
doc_id = doc.metadata.get("id", hash(doc.page_content))
relevance_scores[doc_id] = 1.0 if score.binary_score == "yes" else 0.0
if score.binary_score == "yes":
filtered_docs.append(doc)
web_search_needed = len(filtered_docs) < len(documents) // 2
return {
"documents": filtered_docs,
"web_search_needed": web_search_needed,
"relevance_scores": relevance_scores
}
Incorrect — no document grading, all docs used:
def generate(state: RAGState) -> dict:
# Uses all retrieved docs without quality check
context = "\n\n".join([d.page_content for d in state["documents"]])
return {"generation": llm.invoke(context)}
Correct — grade documents before generation:
def grade_documents(state: RAGState) -> dict:
filtered_docs = []
for doc in state["documents"]:
score = grader.invoke({"question": state["question"], "document": doc.page_content})
if score.binary_score == "yes": # Only keep relevant docs
filtered_docs.append(doc)
web_search_needed = len(filtered_docs) < len(state["documents"]) // 2
return {"documents": filtered_docs, "web_search_needed": web_search_needed}
Key rules:
- Binary grading (yes/no) is simpler and more reliable than numeric scores
- Trigger web search fallback when >50% of docs are filtered out
- Track relevance scores for debugging and quality monitoring
- Self-RAG lets the LLM decide when to retrieve — adaptive by design
Combine contextual embeddings with BM25 hybrid search for maximum retrieval coverage — HIGH
Contextual Retrieval — Hybrid BM25+Vector
Combine contextual embeddings with BM25 for maximum retrieval quality.
Hybrid Retriever:
from rank_bm25 import BM25Okapi
import numpy as np
class HybridRetriever:
def __init__(self, chunks: list[str], embeddings: np.ndarray):
self.chunks = chunks
self.embeddings = embeddings
tokenized = [c.lower().split() for c in chunks]
self.bm25 = BM25Okapi(tokenized)
def search(
self, query: str, query_embedding: np.ndarray,
top_k: int = 20, bm25_weight: float = 0.4, vector_weight: float = 0.6
) -> list[tuple[int, float]]:
bm25_scores = self.bm25.get_scores(query.lower().split())
bm25_scores = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-6)
vector_scores = np.dot(self.embeddings, query_embedding)
vector_scores = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + 1e-6)
combined = bm25_weight * bm25_scores + vector_weight * vector_scores
top_indices = np.argsort(combined)[::-1][:top_k]
return [(i, combined[i]) for i in top_indices]
Results (Anthropic Research):
| Method | Retrieval Failure Rate |
|---|---|
| Traditional embeddings | 5.7% |
| + Contextual embeddings | 3.5% |
| + Contextual + BM25 hybrid | 1.9% |
| + Contextual + BM25 + reranking | 1.3% |
Incorrect — vector-only search without BM25:
def search(query: str, query_embedding: np.ndarray, top_k: int = 20) -> list[int]:
# Misses exact-match queries
vector_scores = np.dot(self.embeddings, query_embedding)
return np.argsort(vector_scores)[::-1][:top_k]
Correct — hybrid BM25 + vector with proper weighting:
def search(query: str, query_embedding: np.ndarray, top_k: int = 20) -> list[tuple[int, float]]:
bm25_scores = self.bm25.get_scores(query.lower().split())
bm25_norm = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-6)
vector_scores = np.dot(self.embeddings, query_embedding)
vector_norm = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + 1e-6)
combined = 0.4 * bm25_norm + 0.6 * vector_norm # Research-backed weights
return [(i, combined[i]) for i in np.argsort(combined)[::-1][:top_k]]
Key rules:
- 67% reduction in retrieval failures with full contextual retrieval pipeline
- Default weight split: 40% BM25 / 60% vector (from Anthropic research)
- BM25 catches exact-match queries that vector search misses
- Normalize scores before weighted combination (min-max normalization)
- Adding reranking on top takes failure rate from 1.9% to 1.3%
Build complete contextual retrieval pipeline integrating context generation, embedding, and hybrid search — MEDIUM
Contextual Retrieval — Complete Pipeline
End-to-end pipeline with context generation, hybrid indexing, and retrieval.
Complete Pipeline:
from dataclasses import dataclass
@dataclass
class ContextualChunk:
original: str
contextualized: str
embedding: list[float]
doc_id: str
chunk_index: int
class ContextualRetriever:
def __init__(self, embed_model, llm_client):
self.embed_model = embed_model
self.llm = llm_client
self.chunks: list[ContextualChunk] = []
def add_document(self, doc_id: str, text: str, chunk_size: int = 512):
raw_chunks = self._chunk_text(text, chunk_size)
contextualized = self._contextualize_batch(text, raw_chunks)
embeddings = self.embed_model.embed(contextualized)
for i, (raw, ctx, emb) in enumerate(zip(raw_chunks, contextualized, embeddings)):
self.chunks.append(ContextualChunk(
original=raw, contextualized=ctx, embedding=emb,
doc_id=doc_id, chunk_index=i
))
self._rebuild_bm25()
def search(self, query: str, top_k: int = 10) -> list[ContextualChunk]:
query_emb = self.embed_model.embed([query])[0]
bm25_scores = self.bm25.get_scores(query.lower().split())
embeddings = np.array([c.embedding for c in self.chunks])
vector_scores = np.dot(embeddings, query_emb)
combined = 0.4 * self._normalize(bm25_scores) + 0.6 * self._normalize(vector_scores)
top_indices = np.argsort(combined)[::-1][:top_k]
return [self.chunks[i] for i in top_indices]
Parallel Processing:
import asyncio
async def contextualize_parallel(document: str, chunks: list[str]) -> list[str]:
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_chunk(chunk: str) -> str:
async with semaphore:
context = await async_generate_context(document, chunk)
return f"{context}\n\n{chunk}"
return await asyncio.gather(*[process_chunk(c) for c in chunks])
Incorrect — missing context generation and hybrid indexing:
def add_document(self, doc_id: str, text: str):
# No contextualization, no hybrid indexing
raw_chunks = self._chunk_text(text, 512)
embeddings = self.embed_model.embed(raw_chunks)
self.chunks.extend(embeddings)
Correct — complete pipeline with contextualization and hybrid search:
def add_document(self, doc_id: str, text: str, chunk_size: int = 512):
raw_chunks = self._chunk_text(text, chunk_size)
contextualized = self._contextualize_batch(text, raw_chunks) # Add context
embeddings = self.embed_model.embed(contextualized) # Embed with context
for i, (raw, ctx, emb) in enumerate(zip(raw_chunks, contextualized, embeddings)):
self.chunks.append(ContextualChunk(
original=raw, contextualized=ctx, embedding=emb,
doc_id=doc_id, chunk_index=i
))
self._rebuild_bm25()  # Hybrid BM25 + vector
Key rules:
- Use contextual retrieval when: documents have important metadata, chunks lose context, quality is critical
- Skip if: chunks are self-contained (Q&A pairs), low-latency indexing required, cost-sensitive with many small docs
- Parallel processing with semaphore (10 concurrent) for batch contextualization
- Prompt caching reduces cost by ~90% when processing many chunks from same document
Prepend situational context to chunks before embedding to reduce retrieval failures — HIGH
Contextual Retrieval — Context Prepending
Prepend situational context to chunks before embedding to preserve document-level meaning.
The Problem:
Original: "ACME Q3 2024 Earnings Report..."
Chunk: "Revenue increased 15% compared to the previous quarter."
Query: "What was ACME's Q3 2024 revenue growth?"
Result: Chunk doesn't mention "ACME" or "Q3 2024" — retrieval fails
Context Generation:
import anthropic
client = anthropic.Anthropic()
CONTEXT_PROMPT = """
<document>
{document}
</document>
Here is the chunk we want to situate within the document:
<chunk>
{chunk}
</chunk>
Please give a short, succinct context (1-2 sentences) to situate this chunk
within the overall document. Focus on information that would help retrieval.
Answer only with the context, nothing else.
"""
def contextualize_chunk(document: str, chunk: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=150,
messages=[{"role": "user",
"content": CONTEXT_PROMPT.format(document=document, chunk=chunk)}]
)
return f"{response.content[0].text}\n\n{chunk}"
With Prompt Caching (90% cost reduction):
def contextualize_cached(document: str, chunk: str) -> str:
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=150,
messages=[{"role": "user", "content": [
{"type": "text", "text": f"<document>\n{document}\n</document>",
"cache_control": {"type": "ephemeral"}},
{"type": "text", "text": f"Situate this chunk (1-2 sentences):\n<chunk>\n{chunk}\n</chunk>"}
]}]
)
return f"{response.content[0].text}\n\n{chunk}"
Incorrect — chunk without document context:
def index_chunk(chunk: str) -> str:
# Missing document context — retrieval will fail
embedding = embed_model.embed([chunk])[0]
return embedding
Correct — prepend situational context before embedding:
def contextualize_chunk(document: str, chunk: str) -> str:
context = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=150,
messages=[{"role": "user", "content": [
{"type": "text", "text": f"<document>\n{document}\n</document>",
"cache_control": {"type": "ephemeral"}}, # Cache for 90% cost reduction
{"type": "text", "text": f"Situate this chunk (1-2 sentences):\n<chunk>\n{chunk}\n</chunk>"}
]}]
)
return f"{context.content[0].text}\n\n{chunk}"  # Prepend context
Key rules:
- Good context: "This chunk is from ACME Corp's Q3 2024 earnings report, specifically the revenue section."
- Bad context: "This is a chunk from the document." (too generic)
- Context length: 1-2 sentences — enough without excessive token overhead
- Use prompt caching (ephemeral) for 90% cost reduction when processing many chunks from same doc
Construct basic RAG pipeline with proper context assembly and citation tracking — CRITICAL
Basic RAG Pattern
Retrieve relevant documents, construct context, and generate grounded responses with citations.
Basic RAG:
async def rag_query(question: str, top_k: int = 5) -> str:
"""Basic RAG: retrieve then generate."""
docs = await vector_db.search(question, limit=top_k)
context = "\n\n".join([
f"[{i+1}] {doc.text}"
for i, doc in enumerate(docs)
])
response = await llm.chat([
{"role": "system", "content":
"Answer using ONLY the provided context. "
"If not in context, say 'I don't have that information.'"},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
])
return response.content
RAG with Citations:
async def rag_with_citations(question: str) -> dict:
"""RAG with inline citations [1], [2], etc."""
docs = await vector_db.search(question, limit=5)
context = "\n\n".join([
f"[{i+1}] {doc.text}\nSource: {doc.metadata['source']}"
for i, doc in enumerate(docs)
])
response = await llm.chat([
{"role": "system", "content":
"Answer with inline citations like [1], [2]. "
"End with a Sources section."},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
])
return {
"answer": response.content,
"sources": [doc.metadata['source'] for doc in docs]
}
Incorrect — no citations, no grounding constraint:
async def rag_query(question: str) -> str:
docs = await vector_db.search(question, limit=5)
context = "\n\n".join([doc.text for doc in docs]) # No citations
response = await llm.chat([
{"role": "user", "content": f"{context}\n\n{question}"} # No grounding instruction
])
return response.content  # No source tracking
Correct — citations with grounding constraint:
async def rag_with_citations(question: str) -> dict:
docs = await vector_db.search(question, limit=5)
context = "\n\n".join([
f"[{i+1}] {doc.text}\nSource: {doc.metadata['source']}" # Numbered citations
for i, doc in enumerate(docs)
])
response = await llm.chat([
{"role": "system", "content": "Answer with inline citations like [1], [2]. Use ONLY the provided context."},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
])
return {"answer": response.content, "sources": [doc.metadata['source'] for doc in docs]}
Key rules:
- Always include citation tracking ([1], [2]) for verifiable answers
- Set system prompt to constrain answers to retrieved context only
- Use top-k of 3-10 documents, temperature 0.1-0.3 for factual tasks
- Return sources alongside answers for transparency
Manage context window size with sufficiency checks to balance relevance and cost — HIGH
Context Window Management
Budget tokens for context and validate sufficiency before generation.
Token Budget Fitting:
def fit_context(docs: list, max_tokens: int = 6000) -> list:
"""Truncate context to fit token budget."""
total_tokens = 0
selected = []
for doc in docs:
doc_tokens = count_tokens(doc.text)
if total_tokens + doc_tokens > max_tokens:
break
selected.append(doc)
total_tokens += doc_tokens
return selected
Sufficiency Check (Google Research 2025):
from pydantic import BaseModel
class SufficiencyCheck(BaseModel):
is_sufficient: bool
confidence: float # 0.0-1.0
missing_info: str | None = None
async def rag_with_sufficiency(question: str, top_k: int = 5) -> str:
"""RAG with hallucination prevention via sufficiency check."""
docs = await vector_db.search(question, limit=top_k)
context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(docs)])
check = await llm.with_structured_output(SufficiencyCheck).ainvoke(
f"Does this context contain sufficient information to answer?\n"
f"Question: {question}\nContext:\n{context}"
)
if not check.is_sufficient and check.confidence > 0.7:
return f"I don't have enough information. Missing: {check.missing_info}"
if not check.is_sufficient and check.confidence <= 0.7:
more_docs = await vector_db.search(question, limit=top_k * 2)
context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(more_docs)])
return await generate_with_context(question, context)  # No sufficiency check
Incorrect — no token budget or sufficiency check:
async def rag_query(question: str) -> str:
docs = await vector_db.search(question, limit=100) # No limit!
context = "\n\n".join([doc.text for doc in docs]) # Could exceed context window
return await generate_with_context(question, context)  # No sufficiency check
Correct — token budget with sufficiency validation:
async def rag_with_sufficiency(question: str, top_k: int = 5) -> str:
docs = await vector_db.search(question, limit=top_k)
fitted = fit_context(docs, max_tokens=6000) # Budget enforcement
context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(fitted)])
check = await llm.with_structured_output(SufficiencyCheck).ainvoke(
f"Does this context contain sufficient information?\nQuestion: {question}\nContext:\n{context}"
)
if not check.is_sufficient and check.confidence > 0.7:
return f"I don't have enough information. Missing: {check.missing_info}"
return await generate_with_context(question, context)
Key rules:
- Keep context under 75% of model limit, reserve for system prompt + response
- Prioritize highest-relevance documents first
- Context budget: 4K-8K tokens typical for factual tasks
- RAG paradoxically increases hallucinations when context is insufficient — use sufficiency check
- Abstain when confidence > 0.7 and context is insufficient
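`fit_context` above assumes a `count_tokens` helper. A rough stand-in is sketched below; the 4-characters-per-token ratio is a common approximation for English text, not an exact count:

```python
def count_tokens(text: str) -> int:
    """Approximate token count (~4 characters per token for English text).

    Production code should use the model's real tokenizer
    (e.g. tiktoken for OpenAI models) rather than this estimate,
    since budget overruns silently truncate context.
    """
    return max(1, len(text) // 4)
```

The estimate errs on the side of counting at least one token, so empty or tiny documents still consume budget rather than slipping past the limit check.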
Combine semantic and keyword search with reciprocal rank fusion for best coverage — HIGH
Hybrid Search (Semantic + Keyword)
Combine vector similarity and keyword matching using Reciprocal Rank Fusion for best coverage.
Reciprocal Rank Fusion:
def reciprocal_rank_fusion(
semantic_results: list,
keyword_results: list,
k: int = 60
) -> list:
"""Combine semantic and keyword search with RRF."""
scores = {}
for rank, doc in enumerate(semantic_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
for rank, doc in enumerate(keyword_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [get_doc(id) for id in ranked_ids]
Multi-list RRF (for query decomposition):
from collections import defaultdict
def multi_rrf(result_lists: list[list[dict]], k: int = 60) -> list[dict]:
"""Combine multiple ranked lists using RRF."""
scores: defaultdict[str, float] = defaultdict(float)
docs: dict[str, dict] = {}
for results in result_lists:
for rank, doc in enumerate(results, start=1):
doc_id = doc["id"]
scores[doc_id] += 1.0 / (k + rank)
docs[doc_id] = doc
ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [docs[doc_id] for doc_id in ranked_ids]
Incorrect — no reciprocal rank fusion, just simple averaging:
def hybrid_search(query: str, top_k: int = 10) -> list:
semantic = vector_search(query, top_k)
keyword = bm25_search(query, top_k)
# Naive merge without RRF
return semantic[:5] + keyword[:5]
Correct — proper RRF combination:
def reciprocal_rank_fusion(semantic_results: list, keyword_results: list, k: int = 60) -> list:
scores = {}
for rank, doc in enumerate(semantic_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
for rank, doc in enumerate(keyword_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [get_doc(id) for id in ranked_ids]
Key rules:
- Default weight split: 40% BM25 / 60% vector (Anthropic research optimal)
- RRF smoothing constant k=60 is the standard — robust and parameter-free
- Retrieve 3x the final top-k for better RRF coverage (e.g., top-30 for final top-10)
- Normalize scores before combining if not using RRF
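The last rule calls for score normalization when RRF is skipped. A hedged sketch of min-max normalization plus the 40/60 BM25/vector blend (function names and the blend helper are illustrative, not from the source):

```python
def normalize_scores(scores: list[float]) -> list[float]:
    """Min-max normalize to [0, 1] so BM25 and cosine scales can be mixed."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [1.0 for _ in scores]  # all ties: treat as equally relevant
    return [(s - lo) / (hi - lo) for s in scores]

def weighted_combine(vec_score: float, bm25_score: float,
                     vec_weight: float = 0.6) -> float:
    """60% vector / 40% BM25 blend of already-normalized scores."""
    return vec_weight * vec_score + (1 - vec_weight) * bm25_score
```

Normalize each result list separately before blending; raw BM25 scores are unbounded while cosine similarity sits in [-1, 1], so mixing them unnormalized lets one signal dominate.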
Compose retrieval pipeline stages in correct order to avoid redundant processing — MEDIUM
Pipeline Composition
Compose retrieval techniques in the right order for optimal results.
Standard Pipeline:
Query → [Decompose?] → [HyDE?] → [Retrieve] → [Rerank] → [Context Fit] → [Generate]
Composition Pattern:
class RAGPipeline:
"""Composable RAG pipeline with optional stages."""
def __init__(self, retriever, reranker=None, hyde_service=None, decomposer=None):
self.retriever = retriever
self.reranker = reranker
self.hyde = hyde_service
self.decomposer = decomposer
async def query(self, question: str, top_k: int = 10) -> list[dict]:
# Stage 1: Query enhancement (optional)
queries = [question]
if self.decomposer:
concepts = await self.decomposer.decompose(question)
if len(concepts) > 1:
queries = concepts
# Stage 2: Retrieve (with optional HyDE)
all_results = []
for q in queries:
if self.hyde:
hyde_result = await self.hyde.generate(q)
results = await self.retriever.search_by_embedding(hyde_result.embedding, top_k=top_k * 3)
else:
results = await self.retriever.search(q, top_k=top_k * 3)
all_results.append(results)
# Stage 3: Fuse if multiple queries
if len(all_results) > 1:
merged = multi_rrf(all_results)  # multi-list RRF; reciprocal_rank_fusion takes exactly two lists
else:
merged = all_results[0]
# Stage 4: Rerank (optional)
if self.reranker:
merged = await self.reranker.rerank(question, merged, top_k=top_k)
return merged[:top_k]
Incorrect — monolithic retrieval without composition:
async def query(question: str) -> list[dict]:
# No optional stages, fixed pipeline
docs = await retriever.search(question, top_k=10)
return docs
Correct — composable pipeline with optional stages:
async def query(self, question: str, top_k: int = 10) -> list[dict]:
queries = [question]
if self.decomposer: # Optional decomposition
concepts = await self.decomposer.decompose(question)
if len(concepts) > 1:
queries = concepts
all_results = []
for q in queries:
if self.hyde: # Optional HyDE
hyde_result = await self.hyde.generate(q)
results = await self.retriever.search_by_embedding(hyde_result.embedding, top_k * 3)
else:
results = await self.retriever.search(q, top_k * 3)
all_results.append(results)
merged = multi_rrf(all_results) if len(all_results) > 1 else all_results[0]
if self.reranker: # Optional reranking
merged = await self.reranker.rerank(question, merged, top_k)
return merged[:top_k]
Key rules:
- Compose: Decompose → HyDE → Retrieve → Rerank → Context Fit → Generate
- HyDE adds ~500ms latency; use with fallback timeout (2-3s)
- Reranking adds ~50-500ms; retrieve more (3x), rerank to final top-k
- Query decomposition only when heuristic detects multi-concept query
- Each stage is optional — start simple, add stages as needed
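The decomposition rule above depends on detecting multi-concept queries cheaply. A minimal heuristic sketch — the marker list and length threshold are illustrative assumptions, not tuned values:

```python
def looks_multi_concept(question: str) -> bool:
    """Cheap gate for query decomposition: only trigger the extra LLM call
    when the question joins several concepts or is unusually long."""
    q = question.lower()
    markers = (" and ", " vs ", " versus ", " compared to ", " as well as ")
    has_marker = any(m in q for m in markers)
    is_long = len(q.split()) > 12  # illustrative threshold
    return has_marker or is_long
```

Running this gate before `decomposer.decompose()` avoids paying decomposition latency on simple single-concept questions.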
Implement production embedding pipelines with batching, caching, and cost optimization — MEDIUM
Advanced Embedding Patterns
Production patterns for embedding at scale.
Embedding Cache (Redis):
import hashlib
import json
import redis
class EmbeddingCache:
def __init__(self, redis_client: redis.Redis, ttl: int = 86400):
self.redis = redis_client
self.ttl = ttl
def _key(self, text: str, model: str) -> str:
h = hashlib.md5(f"{model}:{text}".encode()).hexdigest()
return f"emb:{h}"
async def get_or_embed(self, text: str, model: str, embed_fn) -> list[float]:
key = self._key(text, model)
cached = self.redis.get(key)
if cached:
return json.loads(cached)
embedding = await embed_fn(text)
self.redis.setex(key, self.ttl, json.dumps(embedding))
return embedding
Batch Processing with Rate Limiting:
import asyncio
async def batch_embed(texts: list[str], embed_fn, batch_size: int = 100) -> list[list[float]]:
"""Embed texts in batches with rate limiting."""
results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
embeddings = await embed_fn(batch)
results.extend(embeddings)
if i + batch_size < len(texts):
await asyncio.sleep(0.1) # Rate limit courtesy
return results
Matryoshka Dimension Reduction:
# text-embedding-3 models support Matryoshka embeddings
# Truncate to fewer dimensions with minimal quality loss
response = client.embeddings.create(
model="text-embedding-3-large",
input="Your text",
dimensions=1536 # Reduce from 3072 to 1536 (saves 50% storage)
)
Incorrect — no caching or batching, wasteful API calls:
async def embed_texts(texts: list[str]) -> list[list[float]]:
results = []
for text in texts: # One API call per text!
embedding = await client.embeddings.create(
model="text-embedding-3-large",
input=text
)
results.append(embedding.data[0].embedding)
return results
Correct — cached batching with rate limiting:
async def batch_embed(texts: list[str], batch_size: int = 100) -> list[list[float]]:
results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# Check cache first
cached_keys = [cache.get(text) for text in batch]
uncached = [t for t, c in zip(batch, cached_keys) if not c]
if uncached:
embeddings = await client.embeddings.create(
model="text-embedding-3-large",
input=uncached,
dimensions=1536 # Matryoshka reduction
)
for text, emb in zip(uncached, embeddings.data):
cache.set(text, emb.embedding) # Cache for reuse
results.extend([cached or cache.get(t) for t, cached in zip(batch, cached_keys)])
await asyncio.sleep(0.1) # Rate limiting
return results
Key rules:
- Late Chunking: Embed full document, extract chunk vectors from contextualized tokens
- Cache aggressively — same text + model = same embedding, no need to recompute
- Batch size 100-500 per API call for optimal throughput
- Matryoshka: Truncate text-embedding-3-large from 3072 to 1536 dims with ~2% quality loss
- Rate limit: 0.1s delay between batches as courtesy to API providers
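The Late Chunking rule can be sketched as pooling over spans of contextualized token vectors. This assumes you have already run a long-context encoder over the full document and hold its per-token embeddings; the pooling step itself is just:

```python
import numpy as np

def late_chunk(token_embeddings: np.ndarray,
               spans: list[tuple[int, int]]) -> list[np.ndarray]:
    """Late Chunking sketch: mean-pool contextualized token vectors per span.

    `token_embeddings` is (num_tokens, dim) from one encoder pass over the
    FULL document, so each pooled chunk vector retains document-level context
    instead of being embedded in isolation.
    """
    return [token_embeddings[start:end].mean(axis=0) for start, end in spans]
```

The spans are the same chunk boundaries you would use for normal chunking; only the embedding order changes (embed first, split second).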
Choose chunking strategies carefully since chunk boundaries determine retrieval quality — HIGH
Chunking Strategies
Split documents into optimal chunks that preserve semantic meaning.
Basic Overlapping Chunks:
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
"""Split text into overlapping chunks for embedding."""
words = text.split()
chunks = []
for i in range(0, len(words), chunk_size - overlap):
chunk = " ".join(words[i:i + chunk_size])
if chunk:
chunks.append(chunk)
return chunks
Semantic Boundary Chunking (OrchestKit Standard):
CHUNK_CONFIG = {
"target_tokens": 500, # ~400-600 tokens per chunk
"max_tokens": 800, # Hard limit
"overlap_tokens": 75, # ~15% overlap
"boundary_markers": [ # Prefer splitting at:
"\n## ", # H2 headers
"\n### ", # H3 headers
"\n\n", # Paragraphs
". ", # Sentences (last resort)
]
}
Sentence-Aware Chunking:
def chunk_by_sentences(text: str, chunk_size: int = 512) -> list[str]:
sentences = text.split('. ')
chunks, current, current_len = [], [], 0
for sent in sentences:
if current_len + len(sent) > chunk_size and current:
chunks.append('. '.join(current) + '.')
current, current_len = [sent], len(sent)
else:
current.append(sent)
current_len += len(sent)
if current:
chunks.append('. '.join(current))
return chunks
Incorrect — fixed-size splits without overlap or semantic boundaries:
def chunk_text(text: str) -> list[str]:
# Arbitrary splits, no overlap, breaks mid-sentence
return [text[i:i+500] for i in range(0, len(text), 500)]
Correct — semantic boundary chunking with overlap:
def chunk_by_sentences(text: str, chunk_size: int = 512, overlap: int = 75) -> list[str]:
sentences = text.split('. ')
chunks, current, current_len = [], [], 0
for sent in sentences:
if current_len + len(sent) > chunk_size and current:
chunk_text = '. '.join(current) + '.'
chunks.append(chunk_text)
# Keep last few sentences for overlap
overlap_sents = current[-2:] if len(current) > 2 else current
current, current_len = overlap_sents, sum(len(s) for s in overlap_sents)
else:
current.append(sent)
current_len += len(sent)
if current:
chunks.append('. '.join(current))
return chunks
Key rules:
- Chunk size: 256-1024 tokens (512 typical sweet spot)
- Overlap: 10-20% for context continuity between chunks
- Include metadata (title, source, section) with each chunk
- Prefer semantic boundaries (headers, paragraphs) over fixed-size splits
- Not chunking long documents is a common mistake — context gets lost in embeddings
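The metadata rule above can be sketched as a small wrapper; `MetaChunk` and its fields are a hypothetical shape, not a library type:

```python
from dataclasses import dataclass

@dataclass
class MetaChunk:
    """Chunk carrying the citation metadata the key rules call for."""
    text: str
    title: str
    source: str
    section: str
    index: int  # position within the document, for ordering/citations

def attach_metadata(chunks: list[str], title: str, source: str,
                    section: str = "") -> list[MetaChunk]:
    """Wrap raw chunk strings with citation-ready metadata."""
    return [MetaChunk(text=c, title=title, source=source,
                      section=section, index=i)
            for i, c in enumerate(chunks)]
```

Store the metadata alongside each embedding so retrieved chunks can be cited as `[title, section]` without a second lookup.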
Select embedding models and dimensions correctly to ensure index compatibility and quality — HIGH
Embedding Models & API
Choose the right embedding model and use the API correctly.
Quick Start:
from openai import OpenAI
client = OpenAI()
# Single text embedding
response = client.embeddings.create(
model="text-embedding-3-small",
input="Your text here"
)
vector = response.data[0].embedding # 1536 dimensions
# Batch embedding (efficient)
texts = ["text1", "text2", "text3"]
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
vectors = [item.embedding for item in response.data]
Model Selection:
| Model | Dims | Cost | Use Case |
|---|---|---|---|
| text-embedding-3-small | 1536 | $0.02/1M | General purpose |
| text-embedding-3-large | 3072 | $0.13/1M | High accuracy |
| nomic-embed-text (Ollama) | 768 | Free | Local/CI |
| voyage-3 | 1024 | $0.06/1M | Production (OrchestKit) |
Similarity Calculation:
import numpy as np
def cosine_similarity(a: list[float], b: list[float]) -> float:
a, b = np.array(a), np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# 1.0 = identical, 0.0 = orthogonal
Incorrect — mixing different embedding models:
# Index with one model
docs_embeddings = client.embeddings.create(
model="text-embedding-3-large", # 3072 dims
input=documents
)
# Query with different model
query_embedding = client.embeddings.create(
model="text-embedding-3-small", # 1536 dims - MISMATCH!
input=query
)
# Results will be nonsensical due to dimension mismatch
Correct — consistent model for queries and documents:
MODEL = "text-embedding-3-small" # Use same model everywhere
# Index
docs_embeddings = client.embeddings.create(model=MODEL, input=documents)
# Query
query_embedding = client.embeddings.create(model=MODEL, input=query)
# Now cosine similarity is meaningful
similarity = cosine_similarity(query_embedding.data[0].embedding, docs_embeddings.data[0].embedding)
Key rules:
- Embed queries and documents with the SAME model — never mix
- Dimension reduction: Can truncate text-embedding-3-large to 1536 dims (Matryoshka)
- Batch size: 100-500 texts per API call for efficiency
- Cache embeddings — never re-embed unchanged content
- Most models return normalized vectors (cosine = dot product)
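The last rule (normalized vectors make cosine equal the dot product) can be checked in a few lines:

```python
import numpy as np

def normalize(v: list[float]) -> np.ndarray:
    """Scale a vector to unit length."""
    arr = np.asarray(v, dtype=float)
    return arr / np.linalg.norm(arr)

# For unit-length vectors, cosine similarity reduces to a plain dot
# product — the shortcut vector databases exploit at query time.
a, b = normalize([3.0, 4.0]), normalize([4.0, 3.0])
cosine = float(np.dot(a, b))
```

For the vectors above, the dot product gives 24/25 = 0.96, identical to what `cosine_similarity` would compute with the explicit norm division.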
Configure HyDE fallback strategy to avoid latency degradation from slow generation — MEDIUM
HyDE Fallback Strategy
Implement graceful degradation when HyDE generation is too slow.
Timeout with Fallback:
import asyncio
async def hyde_with_fallback(
query: str,
hyde_service: HyDEService,
embed_fn: callable,
timeout: float = 3.0,
) -> list[float]:
"""HyDE with fallback to direct embedding on timeout."""
try:
async with asyncio.timeout(timeout):
result = await hyde_service.generate(query)
return result.embedding
except TimeoutError:
# Fallback to direct query embedding
return await embed_fn(query)
Performance Tips:
- Use fast model (gpt-5.2-mini, claude-haiku-4-5) for generation
- Cache aggressively (queries often repeat)
- Set tight timeouts (2-3s) with fallback
- Keep hypothetical docs concise (100-200 tokens)
- Combine with query decomposition for best results
Incorrect — no timeout or fallback, blocking forever:
async def hyde_search(query: str) -> list[float]:
# No timeout! May hang indefinitely
result = await hyde_service.generate(query)
return result.embedding
Correct — timeout with graceful fallback:
async def hyde_with_fallback(query: str, timeout: float = 3.0) -> list[float]:
try:
async with asyncio.timeout(timeout):
result = await hyde_service.generate(query)
return result.embedding
except TimeoutError:
# Fallback to direct query embedding
return await embed_fn(query)
Key rules:
- Always implement timeout fallback — HyDE generation model may be slow or unavailable
- Default timeout: 2-3 seconds is the sweet spot (balances quality vs latency)
- Fallback to direct query embedding maintains functionality (just lower quality)
- Log fallback events to monitor HyDE generation reliability
Bridge query-document vocabulary mismatch with hypothetical document embeddings via HyDE — HIGH
HyDE Generation
Generate hypothetical answer documents to bridge vocabulary gaps in semantic search.
The Problem:
Query: "scaling async data pipelines"
Docs use: "event-driven messaging", "Apache Kafka", "message brokers"
-> Low similarity scores despite high relevance
The Solution:
from openai import AsyncOpenAI
from pydantic import BaseModel
class HyDEResult(BaseModel):
original_query: str
hypothetical_doc: str
embedding: list[float]
async def generate_hyde(
query: str, llm: AsyncOpenAI, embed_fn: callable, max_tokens: int = 150
) -> HyDEResult:
"""Generate hypothetical document and embed it."""
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[
{"role": "system", "content":
"Write a short paragraph that would answer this query. "
"Use technical terminology that documentation would use."},
{"role": "user", "content": query}
],
max_tokens=max_tokens,
temperature=0.3,
)
hypothetical_doc = response.choices[0].message.content
embedding = await embed_fn(hypothetical_doc) # Embed the hypothetical doc, not the query
return HyDEResult(
original_query=query,
hypothetical_doc=hypothetical_doc,
embedding=embedding,
)When to use HyDE:
| Scenario | Use HyDE? |
|---|---|
| Abstract/conceptual queries | Yes |
| Exact term searches | No (use keyword) |
| Code snippet searches | No |
| Natural language questions | Yes |
| Vocabulary mismatch suspected | Yes |
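A routing heuristic derived from the table might look like this; the exact-phrase and code-punctuation signals are illustrative assumptions, not part of the source:

```python
def should_use_hyde(query: str) -> bool:
    """Route per the HyDE decision table: skip HyDE for exact-term
    and code-snippet lookups, use it for conceptual questions."""
    has_exact_phrase = '"' in query  # quoted phrase → keyword search wins
    looks_like_code = any(tok in query for tok in ("()", "::", "->", "def ", "{"))
    if has_exact_phrase or looks_like_code:
        return False  # exact/code searches: use keyword retrieval directly
    return True  # natural-language / conceptual queries benefit from HyDE
```

This gate runs before `hyde_with_fallback`, so exact-term queries never pay the ~500ms generation latency at all.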
Incorrect — embedding the query instead of hypothetical document:
async def generate_hyde(query: str) -> HyDEResult:
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[{"role": "user", "content": query}],
max_tokens=150
)
hypothetical_doc = response.choices[0].message.content
embedding = await embed_fn(query) # WRONG: Embeds query, not hypothetical doc!
return HyDEResult(query, hypothetical_doc, embedding)
Correct — embed the hypothetical document:
async def generate_hyde(query: str) -> HyDEResult:
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[
{"role": "system", "content": "Write a short paragraph that would answer this query."},
{"role": "user", "content": query}
],
max_tokens=150,
temperature=0.3
)
hypothetical_doc = response.choices[0].message.content
embedding = await embed_fn(hypothetical_doc) # Embed the hypothetical doc!
return HyDEResult(original_query=query, hypothetical_doc=hypothetical_doc, embedding=embedding)
Key rules:
- Embed the hypothetical document, NOT the original query
- Use fast/cheap model (gpt-5.2-mini, claude-haiku-4-5) for generation
- Temperature 0.3 for consistent, factual hypothetical docs
- Keep hypothetical docs concise: 100-200 tokens
- Adds ~500ms latency — always implement with timeout fallback
Generate separate HyDE documents per concept for multi-topic vocabulary bridging — MEDIUM
Per-Concept HyDE
Generate HyDE embeddings for each concept in multi-concept queries.
Per-Concept Pattern:
async def batch_hyde(
concepts: list[str], hyde_service: HyDEService
) -> list[HyDEResult]:
"""Generate HyDE embeddings for multiple concepts in parallel."""
tasks = [hyde_service.generate(concept) for concept in concepts]
return await asyncio.gather(*tasks)
With Caching:
class HyDEService:
def __init__(self, llm, embed_fn):
self.llm = llm
self.embed_fn = embed_fn
self._cache: dict[str, HyDEResult] = {}
def _cache_key(self, query: str) -> str:
return hashlib.md5(query.lower().strip().encode()).hexdigest()
async def generate(self, query: str) -> HyDEResult:
key = self._cache_key(query)
if key in self._cache:
return self._cache[key]
result = await generate_hyde(query, self.llm, self.embed_fn)
self._cache[key] = result
return result
Incorrect — sequential HyDE generation, slow:
async def batch_hyde(concepts: list[str]) -> list[HyDEResult]:
results = []
for concept in concepts: # Sequential! Slow for many concepts
result = await hyde_service.generate(concept)
results.append(result)
return results
Correct — parallel HyDE generation:
async def batch_hyde(concepts: list[str]) -> list[HyDEResult]:
# Parallel generation for all concepts simultaneously
tasks = [hyde_service.generate(concept) for concept in concepts]
return await asyncio.gather(*tasks)
Key rules:
- For multi-concept queries, decompose first then generate HyDE per concept
- Cache aggressively — queries often repeat
- Parallel generation with asyncio.gather for all concepts simultaneously
- Combine with query decomposition for best results on complex queries
Chunk multimodal documents to preserve relationships between text, images, and tables — MEDIUM
Multimodal Document Chunking
Chunk PDFs preserving images, tables, and text relationships.
Multimodal Chunks:
from dataclasses import dataclass
from typing import Literal, Optional
@dataclass
class Chunk:
content: str
chunk_type: Literal["text", "image", "table", "chart"]
page: int
image_path: Optional[str] = None
embedding: Optional[list[float]] = None
def chunk_multimodal_document(pdf_path: str) -> list[Chunk]:
import fitz # PyMuPDF
doc = fitz.open(pdf_path)
chunks = []
for page_num, page in enumerate(doc):
text_blocks = page.get_text("blocks")
current_text = ""
for block in text_blocks:
if block[6] == 0: # Text block
current_text += block[4] + "\n"
else: # Image block
if current_text.strip():
chunks.append(Chunk(content=current_text.strip(), chunk_type="text", page=page_num))
current_text = ""
xref = block[7]
img = doc.extract_image(xref)
img_path = f"/tmp/page{page_num}_img{xref}.{img['ext']}"
with open(img_path, "wb") as f:
f.write(img["image"])
caption = generate_image_caption(img_path)
chunks.append(Chunk(content=caption, chunk_type="image", page=page_num, image_path=img_path))
if current_text.strip():
chunks.append(Chunk(content=current_text.strip(), chunk_type="text", page=page_num))
return chunks
Incorrect — text-only chunking, loses images and tables:
def chunk_pdf(pdf_path: str) -> list[str]:
import fitz
doc = fitz.open(pdf_path)
chunks = []
for page in doc:
chunks.append(page.get_text()) # Text only, images lost!
return chunks
Correct — multimodal chunking with images and captions:
def chunk_multimodal_document(pdf_path: str) -> list[Chunk]:
import fitz
doc = fitz.open(pdf_path)
chunks = []
for page_num, page in enumerate(doc):
text_blocks = page.get_text("blocks")
for block in text_blocks:
if block[6] == 0: # Text block
chunks.append(Chunk(content=block[4], chunk_type="text", page=page_num))
else: # Image block
xref = block[7]
img = doc.extract_image(xref)
img_path = f"/tmp/page{page_num}_img{xref}.{img['ext']}"
with open(img_path, "wb") as f:
f.write(img["image"])
caption = generate_image_caption(img_path)
chunks.append(Chunk(content=caption, chunk_type="image", page=page_num, image_path=img_path))
return chunks
Key rules:
- Extract images separately and generate captions for text-based search
- Preserve page numbers for citation and navigation
- Use PyMuPDF (fitz) for reliable PDF extraction
- Process large PDFs in page-range batches (CC 2.1.30: max 20 pages per Read)
- Always store image paths alongside embeddings for result display
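The page-range batching rule can be sketched as a pure helper that yields half-open ranges to feed the extractor, so no single pass ever loads more than the page limit:

```python
def page_ranges(total_pages: int, batch: int = 20) -> list[tuple[int, int]]:
    """Split a large PDF into half-open (start, end) page ranges.

    Mirrors the max-20-pages-per-read rule; pass each range to the
    extractor instead of processing the whole document at once.
    """
    return [(start, min(start + batch, total_pages))
            for start in range(0, total_pages, batch)]
```

With PyMuPDF, each range would drive something like `doc.pages(start, end)` per batch, keeping memory bounded on thousand-page documents.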
Use multimodal embedding models for cross-modal search across text and images — HIGH
Multimodal Embeddings
Embed images and text in the same vector space for cross-modal retrieval.
Model Selection:
| Model | Context | Modalities | Best For |
|---|---|---|---|
| Voyage multimodal-3 | 32K tokens | Text + Image | Long documents |
| SigLIP 2 | Standard | Text + Image | Large-scale retrieval |
| CLIP ViT-L/14 | 77 tokens | Text + Image | General purpose |
| ImageBind | Standard | 6 modalities | Audio/video included |
| ColPali | Document | Text + Image | PDF/document RAG |
CLIP Embeddings:
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import torch
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
def embed_image(image_path: str) -> list[float]:
image = Image.open(image_path)
inputs = processor(images=image, return_tensors="pt")
with torch.no_grad():
embeddings = model.get_image_features(**inputs)
embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)
return embeddings[0].tolist()
def embed_text(text: str) -> list[float]:
inputs = processor(text=[text], return_tensors="pt", padding=True)
with torch.no_grad():
embeddings = model.get_text_features(**inputs)
embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)
return embeddings[0].tolist()
Voyage Multimodal-3 (Long Context):
import base64
import voyageai
client = voyageai.Client()
def embed_multimodal(texts=None, images=None) -> list[list[float]]:
inputs = []
if texts:
inputs.extend([{"type": "text", "content": t} for t in texts])
if images:
for path in images:
with open(path, "rb") as f:
b64 = base64.b64encode(f.read()).decode()
inputs.append({"type": "image", "content": f"data:image/png;base64,{b64}"})
return client.multimodal_embed(inputs=inputs, model="voyage-multimodal-3").embeddings
Incorrect — using text-only embeddings for images:
def embed_image(image_path: str) -> list[float]:
# Using text embedding model for images - wrong modality!
caption = generate_caption(image_path)
return text_embed_model.embed([caption])[0] # Loses visual features
Correct — multimodal embeddings for cross-modal search:
def embed_image(image_path: str) -> list[float]:
image = Image.open(image_path)
inputs = processor(images=image, return_tensors="pt")
with torch.no_grad():
embeddings = model.get_image_features(**inputs)
embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True) # Normalize
return embeddings[0].tolist()
Key rules:
- Normalize embeddings for cosine similarity (CLIP already normalized)
- Voyage multimodal-3 for long documents (32K context)
- SigLIP 2 for large-scale production retrieval
- Always embed both images AND captions for maximum coverage
Build unified multimodal RAG pipeline that merges cross-modal results with deduplication — MEDIUM
Multimodal RAG Pipeline
Build end-to-end multimodal retrieval and generation pipeline.
Hybrid Retrieval:
class MultimodalRAG:
def __init__(self, vector_db, vision_model, text_model):
self.vector_db = vector_db
self.vision_model = vision_model
self.text_model = text_model
async def retrieve(self, query: str, query_image: str = None, top_k: int = 10) -> list[dict]:
results = []
text_emb = embed_text(query)
text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)
results.extend(text_results)
if query_image:
img_emb = embed_image(query_image)
img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)
results.extend(img_results)
# Dedupe by doc_id, keep highest score
seen = {}
for r in results:
doc_id = r["metadata"]["doc_id"]
if doc_id not in seen or r["score"] > seen[doc_id]["score"]:
seen[doc_id] = r
return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:top_k]
Multimodal Generation:
async def generate_with_context(query: str, chunks: list[Chunk], model: str = "claude-opus-4-6") -> str:
content = []
# Add images first (attention positioning)
for chunk in chunks:
if chunk.chunk_type == "image" and chunk.image_path:
b64, media_type = encode_image_base64(chunk.image_path)
content.append({"type": "image", "source": {"type": "base64", "media_type": media_type, "data": b64}})
# Add text context
text_context = "\n\n".join([f"[Page {c.page}]: {c.content}" for c in chunks if c.chunk_type == "text"])
content.append({"type": "text", "text": f"Context:\n{text_context}\n\nQuestion: {query}"})
response = client.messages.create(model=model, max_tokens=4096, messages=[{"role": "user", "content": content}])
return response.content[0].text
Incorrect — no deduplication, fragmented results:
async def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
text_emb = embed_text(query)
text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)
img_emb = embed_image(query_image)
img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)
# No deduplication! Same doc may appear twice
return text_results + img_results
Correct — deduplicated cross-modal results:
async def retrieve(self, query: str, query_image: str = None, top_k: int = 10) -> list[dict]:
results = []
text_emb = embed_text(query)
text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)
results.extend(text_results)
if query_image:
img_emb = embed_image(query_image)
img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)
results.extend(img_results)
# Dedupe by doc_id, keep highest score
seen = {}
for r in results:
doc_id = r["metadata"]["doc_id"]
if doc_id not in seen or r["score"] > seen[doc_id]["score"]:
seen[doc_id] = r
return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:top_k]
Key rules:
- Deduplicate by document ID — keep highest scoring result per document
- Place images before text in generation prompt (attention positioning)
- Always embed both image features AND text captions for maximum coverage
- Use hybrid approach: CLIP + text embeddings for best accuracy
- Missing image URL storage is a common mistake — always store paths for display
Implement PGVector hybrid search with FULL OUTER JOIN and RRF fusion ranking — HIGH
PGVector Hybrid Search (SQL)
Hybrid vector+keyword search with RRF fusion in SQLAlchemy.
Hybrid Search Query:
async def hybrid_search(query: str, query_embedding: list[float], top_k: int = 10) -> list[Chunk]:
FETCH_MULTIPLIER = 3
K = 60 # RRF smoothing constant
# Vector search subquery
vector_subq = (
select(Chunk.id,
func.row_number().over(
order_by=Chunk.embedding.cosine_distance(query_embedding)
).label("vector_rank"))
.limit(top_k * FETCH_MULTIPLIER)
.subquery()
)
# Keyword search subquery
ts_query = func.plainto_tsquery("english", query)
keyword_subq = (
select(Chunk.id,
func.row_number().over(
order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
).label("keyword_rank"))
.where(Chunk.content_tsvector.op("@@")(ts_query))
.limit(top_k * FETCH_MULTIPLIER)
.subquery()
)
# RRF fusion with FULL OUTER JOIN
rrf_subq = (
select(
func.coalesce(vector_subq.c.id, keyword_subq.c.id).label("chunk_id"),
(func.coalesce(1.0 / (K + vector_subq.c.vector_rank), 0.0) +
func.coalesce(1.0 / (K + keyword_subq.c.keyword_rank), 0.0)
).label("rrf_score"))
.select_from(vector_subq.outerjoin(keyword_subq, ..., full=True))
.order_by("rrf_score DESC")
.limit(top_k)
.subquery()
)
return await session.execute(
select(Chunk).join(rrf_subq, Chunk.id == rrf_subq.c.chunk_id)
)
RRF Formula:
rrf_score = 1/(k + vector_rank) + 1/(k + keyword_rank) # k=60 (standard)
Incorrect — separate queries without RRF fusion:
async def hybrid_search(query: str, embedding: list[float], top_k: int = 10):
# Separate queries, no fusion!
vector_results = await session.execute(
select(Chunk).order_by(Chunk.embedding.cosine_distance(embedding)).limit(top_k)
)
keyword_results = await session.execute(
select(Chunk).where(Chunk.content_tsvector.op("@@")(plainto_tsquery(query))).limit(top_k)
)
# Naive merge, no RRF
return list(vector_results) + list(keyword_results)
Correct — RRF fusion with FULL OUTER JOIN:
async def hybrid_search(query: str, query_embedding: list[float], top_k: int = 10):
K = 60 # RRF smoothing constant
# Vector search subquery
vector_subq = select(Chunk.id, func.row_number().over(
order_by=Chunk.embedding.cosine_distance(query_embedding)
).label("vector_rank")).limit(top_k * 3).subquery()
# Keyword search subquery
keyword_subq = select(Chunk.id, func.row_number().over(
order_by=func.ts_rank_cd(Chunk.content_tsvector, plainto_tsquery(query)).desc()
).label("keyword_rank")).limit(top_k * 3).subquery()
# RRF fusion with FULL OUTER JOIN
rrf_subq = select(
func.coalesce(vector_subq.c.id, keyword_subq.c.id).label("chunk_id"),
(func.coalesce(1.0 / (K + vector_subq.c.vector_rank), 0.0) +
func.coalesce(1.0 / (K + keyword_subq.c.keyword_rank), 0.0)).label("rrf_score")
).select_from(vector_subq.outerjoin(keyword_subq, ..., full=True)).order_by("rrf_score DESC").limit(top_k)
return await session.execute(select(Chunk).join(rrf_subq, Chunk.id == rrf_subq.c.chunk_id))
Key rules:
- Use FULL OUTER JOIN to catch docs found by only one search method
- 3x fetch multiplier for better RRF coverage (30 per method for top 10 final)
- RRF smoothing constant k=60 is the standard
- Use func.coalesce(..., 0.0) for documents found by only one method
- Use plainto_tsquery for user queries (handles multi-word safely)
Choose correct PGVector index type to avoid 17x slower queries in production — HIGH
PGVector Index Strategies
Choose and configure the right vector index for your workload.
Index Comparison:
| Metric | IVFFlat | HNSW |
|---|---|---|
| Query speed | 50ms | 3ms (17x faster) |
| Index time | 2 min | 20 min |
| Best for | < 100k vectors | 100k+ vectors |
| Recall@10 | 0.85-0.95 | 0.95-0.99 |
HNSW Configuration (Recommended):
-- Create HNSW index
CREATE INDEX idx_chunks_embedding_hnsw ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Query-time tuning
SET hnsw.ef_search = 40; -- Higher = better recall, slower
-- Iterative scan for filtered queries (pgvector 0.8+)
SET hnsw.iterative_scan = 'relaxed_order';
Search Type Comparison:
| Aspect | Semantic (Vector) | Keyword (BM25) |
|---|---|---|
| Query | Embedding similarity | Exact word matches |
| Strengths | Synonyms, concepts | Exact phrases, rare terms |
| Weaknesses | Exact matches, technical terms | No semantic understanding |
| Index | HNSW (pgvector) | GIN (tsvector) |
Incorrect — no index, sequential scan on every query:
-- No index! Sequential scan is 17x slower
SELECT * FROM chunks
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;
Correct — HNSW index for fast queries:
-- Create HNSW index
CREATE INDEX idx_chunks_embedding_hnsw ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Query-time tuning
SET hnsw.ef_search = 40; -- Higher = better recall
-- Now queries are 17x faster
SELECT * FROM chunks
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;
Key rules:
- Use HNSW for production (scales to millions, 17x faster queries)
- IVFFlat only for >1000 queries/sec where index build time matters
- m=16, ef_construction=64 are good defaults for most workloads
- Set hnsw.ef_search = 40 at query time for production recall
- Use iterative_scan = 'relaxed_order' for filtered vector queries
Filter PGVector results by metadata and boost scores for improved retrieval relevance — MEDIUM
PGVector Metadata Filtering & Patterns
Filter and boost search results using metadata.
Filtered Search:
results = await hybrid_search(
query="binary search",
query_embedding=embedding,
content_type_filter=["code_block"]
)
Similarity Threshold:
results = await hybrid_search(query, embedding, top_k=50)
filtered = [r for r in results if (1 - r.vector_distance) >= 0.75][:10]
Multi-Query Retrieval:
queries = ["machine learning", "ML algorithms", "neural networks"]
all_results = [await hybrid_search(q, embed(q)) for q in queries]
final = deduplicate_and_rerank(all_results)
Redis 8 FT.HYBRID Alternative:
| Aspect | pgvector | Redis 8 FT.HYBRID |
|---|---|---|
| Setup | Medium | Low |
| RRF | Manual SQL | Native COMBINE RRF |
| Latency | 5-20ms | 2-5ms |
| Persistence | ACID | AOF/RDB |
| Max dataset | Billions | Memory-bound (~100M) |
Incorrect — no metadata filtering, irrelevant results:
# Returns all content types mixed together
results = await session.execute(
select(Chunk).order_by(Chunk.embedding.cosine_distance(embedding)).limit(10)
)
Correct — filtered search with similarity threshold:
# Filter by content_type and similarity threshold
results = await session.execute(
select(Chunk)
.where(Chunk.content_type == "code_block") # Pre-filter
.order_by(Chunk.embedding.cosine_distance(embedding))
.limit(50)
)
# Apply similarity threshold
filtered = [r for r in results if (1 - r.vector_distance) >= 0.75][:10]
Key rules:
- Metadata boosting (title/path matching) adds +6% MRR
- Pre-filter by content_type for targeted search
- Similarity threshold 0.75 is a good default for filtering low-relevance results
- Choose pgvector for: ACID, complex joins, large datasets, existing PostgreSQL
- Choose Redis 8 for: sub-5ms latency, caching layer, simpler deployment
Design PGVector schema with pre-computed tsvector columns and proper index configuration — HIGH
PGVector Database Schema
Production schema with pre-computed tsvector and HNSW index.
Schema:
CREATE TABLE chunks (
id UUID PRIMARY KEY,
document_id UUID REFERENCES documents(id),
content TEXT NOT NULL,
embedding vector(1024), -- PGVector
content_tsvector tsvector GENERATED ALWAYS AS (
to_tsvector('english', content)
) STORED,
section_title TEXT,
content_type TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
-- HNSW index for vector search
CREATE INDEX idx_chunks_embedding ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- GIN index for keyword search
CREATE INDEX idx_chunks_content_tsvector ON chunks
USING gin (content_tsvector);
Incorrect — computing tsvector at query time, slow:
CREATE TABLE chunks (
id UUID PRIMARY KEY,
content TEXT NOT NULL,
embedding vector(1024)
);
-- Slow query: computes tsvector every time!
SELECT * FROM chunks
WHERE to_tsvector('english', content) @@ plainto_tsquery('search query');
Correct — pre-computed tsvector as GENERATED column:
CREATE TABLE chunks (
id UUID PRIMARY KEY,
content TEXT NOT NULL,
embedding vector(1024),
content_tsvector tsvector GENERATED ALWAYS AS (
to_tsvector('english', content)
) STORED -- Pre-computed, 5-10x faster
);
-- GIN index for fast keyword search
CREATE INDEX idx_chunks_content_tsvector ON chunks USING gin (content_tsvector);
-- HNSW index for fast vector search
CREATE INDEX idx_chunks_embedding ON chunks
USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);
-- Fast query using pre-computed tsvector
SELECT * FROM chunks WHERE content_tsvector @@ plainto_tsquery('search query');
Key rules:
- Pre-compute tsvector as GENERATED column — 5-10x faster than `to_tsvector()` at query time
- Use `vector(1024)` for Voyage-3 embeddings (match your model dimension)
- HNSW index with m=16, ef_construction=64 for production workloads
- Always include document_id FK for document-level operations
- Include content_type for filtered search (code, text, table)
Decompose complex multi-topic queries with parallel retrieval and RRF fusion — HIGH
Query Decomposition + RRF Fusion
Break complex queries into concepts, retrieve separately, fuse with RRF.
LLM Decomposition:
from pydantic import BaseModel, Field
class ConceptExtraction(BaseModel):
concepts: list[str] = Field(..., min_length=1, max_length=5)
reasoning: str | None = None
async def decompose_query(query: str, llm: AsyncOpenAI) -> list[str]:
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[
{"role": "system", "content":
"Extract 2-4 independent concepts from this query. "
"Each concept should be searchable on its own. "
'Respond as JSON: {"concepts": [...], "reasoning": null}'},
{"role": "user", "content": query}
],
response_format={"type": "json_object"},
temperature=0,
)
result = ConceptExtraction.model_validate_json(response.choices[0].message.content)
return result.concepts
Decomposed Search with RRF:
async def decomposed_search(query: str, search_fn, llm, top_k: int = 10) -> list[dict]:
if not is_multi_concept_heuristic(query):
return await search_fn(query, limit=top_k)
concepts = await decompose_query(query, llm)
if len(concepts) <= 1:
return await search_fn(query, limit=top_k)
# Parallel retrieval per concept
tasks = [search_fn(concept, limit=top_k) for concept in concepts]
results_per_concept = await asyncio.gather(*tasks)
return reciprocal_rank_fusion(results_per_concept, k=60)[:top_k]
Incorrect — no decomposition, single query for complex topics:
async def search(query: str, top_k: int = 10) -> list[dict]:
# "How does authentication affect database performance?"
# Single query misses one of the two concepts
return await vector_search(query, limit=top_k)
Correct — decompose and fuse with RRF:
async def decomposed_search(query: str, top_k: int = 10) -> list[dict]:
if not is_multi_concept_heuristic(query):
return await search_fn(query, limit=top_k) # Fast path
# Decompose: "authentication" + "database performance"
concepts = await decompose_query(query, llm)
if len(concepts) <= 1:
return await search_fn(query, limit=top_k)
# Parallel retrieval per concept
tasks = [search_fn(concept, limit=top_k) for concept in concepts]
results_per_concept = await asyncio.gather(*tasks)
# Fuse with RRF
return reciprocal_rank_fusion(results_per_concept, k=60)[:top_k]
Key rules:
- Max 2-4 concepts per query (more increases latency without proportional benefit)
- Use gpt-5.2-mini for decomposition (fast, cheap, good at concept extraction)
- RRF fusion is robust and parameter-free for combining per-concept results
- Cache decomposition results — same query often asked repeatedly
- Set timeout with fallback to original query if decomposition fails
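`reciprocal_rank_fusion` is called throughout these examples but never defined in this section; a minimal sketch consistent with the RRF formula used later in this document (score = sum of 1/(k + rank) across result lists), assuming each result dict carries an `id` key:

```python
def reciprocal_rank_fusion(
    results_per_query: list[list[dict]], k: int = 60
) -> list[dict]:
    """Fuse ranked result lists: score(doc) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    docs: dict[str, dict] = {}
    for results in results_per_query:
        for rank, doc in enumerate(results, start=1):  # Ranks are 1-based
            doc_id = doc["id"]
            docs[doc_id] = doc
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [{**docs[doc_id], "rrf_score": scores[doc_id]} for doc_id in ranked]
```

Because RRF uses only ranks, not raw scores, it needs no normalization across the vector and keyword lists it fuses.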
Detect multi-concept queries with heuristic fast-path to avoid unnecessary LLM decomposition — MEDIUM
Multi-Concept Query Detection
Fast heuristic to determine if query decomposition is needed.
Heuristic Detection (Fast Path):
MULTI_CONCEPT_INDICATORS = [
" vs ", " versus ", " compared to ", " or ",
" and ", " with ", " affect ", " impact ",
"difference between", "relationship between",
]
def is_multi_concept_heuristic(query: str) -> bool:
"""Fast check for multi-concept indicators (<1ms)."""
query_lower = query.lower()
return any(ind in query_lower for ind in MULTI_CONCEPT_INDICATORS)
When to Decompose:
| Query Type | Decompose? |
|---|---|
| "What is X?" | No |
| "X vs Y" | Yes |
| "How does X affect Y?" | Yes |
| "Best practices for X" | No |
| "X and Y in Z" | Yes |
| "Difference between X, Y, Z" | Yes |
Incorrect — always decomposing, even for simple queries:
async def search(query: str, top_k: int = 10) -> list[dict]:
# Always calls LLM for decomposition, even for "What is React?"
concepts = await decompose_query(query, llm) # Wasteful!
tasks = [search_fn(concept, limit=top_k) for concept in concepts]
return await asyncio.gather(*tasks)
Correct — heuristic fast path before LLM decomposition:
async def search(query: str, top_k: int = 10) -> list[dict]:
if not is_multi_concept_heuristic(query): # Sub-millisecond check
return await search_fn(query, limit=top_k) # Fast path
# Only call LLM if heuristic detects multi-concept
concepts = await decompose_query(query, llm)
tasks = [search_fn(concept, limit=top_k) for concept in concepts]
results_per_concept = await asyncio.gather(*tasks)
return reciprocal_rank_fusion(results_per_concept, k=60)[:top_k]
def is_multi_concept_heuristic(query: str) -> bool:
query_lower = query.lower()
return any(ind in query_lower for ind in [" vs ", " and ", "difference between"])
Key rules:
- Heuristic first (sub-millisecond), LLM decomposition only if heuristic triggers
- Single-concept queries should skip decomposition entirely (no LLM cost)
- Keywords: "vs", "compared to", "affect", "difference between" indicate multi-concept
- This is the fast path — always check before calling the LLM decomposer
Combine query decomposition with HyDE for comprehensive vocabulary-bridged retrieval coverage — MEDIUM
Decomposition + HyDE Combo
Best of both: decompose into concepts, then generate HyDE for each concept.
Combined Pattern:
async def decomposed_hyde_search(
query: str,
decomposer: QueryDecomposer,
hyde_service: HyDEService,
vector_search: callable,
top_k: int = 10,
) -> list[dict]:
"""Decomposition + HyDE for maximum coverage."""
# Decompose query into concepts
concepts = await decomposer.get_concepts(query)
# Generate HyDE for each concept in parallel
hyde_results = await asyncio.gather(*[
hyde_service.generate(concept) for concept in concepts
])
# Search with HyDE embeddings
search_tasks = [
vector_search(embedding=hr.embedding, limit=top_k)
for hr in hyde_results
]
results_per_concept = await asyncio.gather(*search_tasks)
# Fuse results with RRF
return reciprocal_rank_fusion(results_per_concept)[:top_k]
Incorrect — sequential decomposition and HyDE, slow:
async def search(query: str, top_k: int = 10) -> list[dict]:
concepts = await decompose_query(query, llm)
all_results = []
for concept in concepts: # Sequential! Slow
hyde_result = await hyde_service.generate(concept)
results = await vector_search(embedding=hyde_result.embedding, limit=top_k)
all_results.append(results)
return reciprocal_rank_fusion(all_results)[:top_k]
Correct — parallel HyDE generation and search:
async def decomposed_hyde_search(query: str, top_k: int = 10) -> list[dict]:
concepts = await decomposer.get_concepts(query)
# Generate HyDE for each concept in parallel
hyde_results = await asyncio.gather(*[
hyde_service.generate(concept) for concept in concepts
])
# Search with HyDE embeddings in parallel
search_tasks = [
vector_search(embedding=hr.embedding, limit=top_k) for hr in hyde_results
]
results_per_concept = await asyncio.gather(*search_tasks)
# Fuse with RRF
return reciprocal_rank_fusion(results_per_concept)[:top_k]
Key rules:
- Use this combo for complex queries with both multi-concept AND vocabulary mismatch
- Decompose first, then HyDE per concept, then parallel search, then RRF fuse
- Total latency: ~1-2s (decomposition + HyDE generation + parallel search)
- Cache both decomposition and HyDE results for efficiency
- This is the most expensive retrieval path — use only when simpler methods fail
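The caching advice above can be sketched with a small in-memory TTL cache keyed by query text (a hypothetical helper; production systems would more likely use Redis or an LRU with eviction):

```python
import time

class TTLCache:
    """Minimal in-memory cache with per-entry expiry."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, object]] = {}

    def get(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # Expired: evict and miss
            return None
        return value

    def set(self, key: str, value) -> None:
        self._store[key] = (time.monotonic() + self.ttl, value)

# Usage sketch: check the cache before calling the decomposer or HyDE service,
# e.g. concepts = cache.get(query) or await decomposer.get_concepts(query)
```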
Combine base, LLM, and recency scores for robust multi-signal reranking — MEDIUM
Combined Scoring & Reranking Service
Combine multiple scoring signals with weighted average and timeout fallback.
Combined Scoring:
def combined_rerank(
documents: list[dict], llm_scores: dict[str, float],
alpha: float = 0.3, beta: float = 0.5, gamma: float = 0.2
) -> list[dict]:
scored = []
for doc in documents:
base = doc.get("score", 0.5)
llm = llm_scores.get(doc["id"], 0.5)
recency = calculate_recency_score(doc.get("created_at"))
final = (alpha * base) + (beta * llm) + (gamma * recency)
scored.append({**doc, "score": final,
"score_components": {"base": base, "llm": llm, "recency": recency}})
scored.sort(key=lambda x: x["score"], reverse=True)
return scored
Service with Timeout Fallback:
class ReRankingService:
def __init__(self, llm: AsyncOpenAI, timeout_seconds: float = 5.0):
self.llm = llm
self.timeout = timeout_seconds
async def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
if len(documents) <= top_k:
return documents
try:
async with asyncio.timeout(self.timeout):
return await llm_rerank(query, documents, self.llm, top_k)
except TimeoutError:
return sorted(documents, key=lambda x: x.get("score", 0), reverse=True)[:top_k]
Incorrect — single scoring signal without timeout:
async def rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
# Only uses LLM score, no timeout, no fallback
llm_scores = await llm_rerank(query, documents, llm) # May hang!
return sorted(documents, key=lambda x: llm_scores.get(x["id"], 0), reverse=True)[:top_k]
Correct — combined scoring with timeout fallback:
async def rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
if len(documents) <= top_k:
return documents # Skip if no benefit
try:
async with asyncio.timeout(5.0): # 5s timeout
llm_scores = await llm_rerank(query, documents, llm)
# Combined scoring: 30% base + 50% LLM + 20% recency
scored = []
for doc in documents:
base = doc.get("score", 0.5)
llm = llm_scores.get(doc["id"], 0.5)
recency = calculate_recency_score(doc.get("created_at"))
final = 0.3 * base + 0.5 * llm + 0.2 * recency
scored.append({**doc, "score": final})
return sorted(scored, key=lambda x: x["score"], reverse=True)[:top_k]
except TimeoutError:
# Fallback to base ranking
return sorted(documents, key=lambda x: x.get("score", 0), reverse=True)[:top_k]
Key rules:
- Default weights: 30% base retrieval + 50% LLM score + 20% recency
- Always set timeout (5s) with fallback to base ranking
- Skip reranking if document count <= top_k (no benefit)
- Cache scores: same query+doc pair = same score
- Store score components for debugging and tuning
Rerank results with cross-encoder models for accurate query-document relevance scoring — HIGH
Cross-Encoder Reranking
Re-score retrieved documents with cross-encoder for higher precision.
Cross-Encoder Pattern:
from sentence_transformers import CrossEncoder
class CrossEncoderReranker:
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
pairs = [(query, doc["content"]) for doc in documents]
scores = self.model.predict(pairs)
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [{**doc, "score": float(score)} for doc, score in scored_docs[:top_k]]
Model Selection:
| Model | Latency | Cost | Quality |
|---|---|---|---|
| `cross-encoder/ms-marco-MiniLM-L-6-v2` | ~50ms | Free | Good |
| `BAAI/bge-reranker-large` | ~100ms | Free | Better |
| Cohere `rerank-english-v3.0` | ~200ms | $1/1K | Best |
Incorrect — retrieving few, no reranking:
async def search(query: str) -> list[dict]:
# Retrieve only 10, no reranking - misses good results
return await vector_search(query, limit=10)
Correct — retrieve many, rerank to few:
async def search_with_reranking(query: str) -> list[dict]:
# Retrieve many candidates
candidates = await vector_search(query, limit=50)
# Rerank with cross-encoder
pairs = [(query, doc["content"][:400]) for doc in candidates]
scores = cross_encoder.predict(pairs)
scored_docs = list(zip(candidates, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
# Return top 10 after reranking
return [{**doc, "score": float(score)} for doc, score in scored_docs[:10]]
Key rules:
- Retrieve many (50-100), rerank to few (10) — "retrieve more, rerank less"
- Cross-encoder processes query+doc pair together (slow but accurate)
- Default model: `ms-marco-MiniLM-L-6-v2` (good quality, free, ~50ms)
- Truncate document content to 200-400 chars for reranking efficiency
Use LLM reranking for domain-adaptive scoring without deploying a dedicated model — MEDIUM
LLM Reranking
Score document relevance using LLM in a single batch call.
LLM Batch Reranking:
async def llm_rerank(query: str, documents: list[dict], llm: AsyncOpenAI, top_k: int = 10) -> list[dict]:
docs_text = "\n\n".join([f"[Doc {i+1}]\n{doc['content'][:300]}..." for i, doc in enumerate(documents)])
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[
{"role": "system", "content": "Rate each document's relevance to the query (0.0-1.0).\nOutput one score per line."},
{"role": "user", "content": f"Query: {query}\n\nDocuments:\n{docs_text}"}
],
temperature=0,
)
scores = parse_scores(response.choices[0].message.content, len(documents))
scored_docs = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
return [{**doc, "score": score} for doc, score in scored_docs[:top_k]]
def parse_scores(response: str, expected_count: int) -> list[float]:
scores = []
for line in response.strip().split("\n"):
try:
scores.append(max(0.0, min(1.0, float(line.strip()))))
except ValueError:
scores.append(0.5)
while len(scores) < expected_count:
scores.append(0.5)
return scores[:expected_count]
Cohere Rerank API:
import cohere
class CohereReranker:
def __init__(self, api_key: str):
self.client = cohere.Client(api_key)
def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
results = self.client.rerank(
model="rerank-english-v3.0", query=query,
documents=[doc["content"] for doc in documents], top_n=top_k
)
return [{**documents[r.index], "score": r.relevance_score} for r in results.results]
Incorrect — one LLM call per document, extremely slow:
async def llm_rerank(query: str, documents: list[dict]) -> list[dict]:
scores = []
for doc in documents: # Sequential LLM calls!
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[{"role": "user", "content": f"Rate relevance (0-1):\nQuery: {query}\nDoc: {doc['content']}"}]
)
scores.append(float(response.choices[0].message.content))
return sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
Correct — batch all docs in one LLM call:
async def llm_rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
# Batch all docs in ONE LLM call
docs_text = "\n\n".join([
f"[Doc {i+1}]\n{doc['content'][:300]}..." # Truncate
for i, doc in enumerate(documents)
])
response = await llm.chat.completions.create(
model="gpt-5.2-mini",
messages=[
{"role": "system", "content": "Rate each document's relevance (0.0-1.0). One score per line."},
{"role": "user", "content": f"Query: {query}\n\nDocuments:\n{docs_text}"}
],
temperature=0
)
scores = parse_scores(response.choices[0].message.content, len(documents))
scored_docs = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
return [{**doc, "score": score} for doc, score in scored_docs[:top_k]]
Key rules:
- Batch all docs in one LLM call (reduces latency vs per-doc calls)
- Truncate to 200-400 chars per doc for LLM reranking
- Parse scores defensively (default 0.5 on parse error)
- LLM reranking at ~500ms, Cohere at ~200ms
- Set timeout with fallback to base ranking
Checklists (2)
Rag Quality
RAG Quality Checklist
Quality assurance for agentic RAG implementations.
Retrieval Quality
- Semantic search configured with appropriate embedding model
- Chunk size optimized (512-1024 tokens typical)
- Chunk overlap configured (10-20% of chunk size)
- Metadata filtering implemented for scoping
- Top-k tuned for precision/recall balance
Document Grading
- Relevance grading implemented (binary or scored)
- Grading prompt tested with diverse queries
- Threshold tuned for false positive/negative balance
- Fallback behavior defined for low-relevance results
Query Transformation
- Query rewriting enabled for failed retrievals
- Maximum retry count configured (2-3 typical)
- Query decomposition for multi-concept queries
- HyDE integration for vocabulary mismatch
Web Fallback (CRAG)
- Web search integration configured
- Rate limiting for web search API
- Result filtering and quality check
- Source attribution for web results
Self-RAG Patterns
- Adaptive retrieval decision logic implemented
- Reflection tokens for quality assessment
- Skip retrieval path for simple queries
- Confidence thresholds calibrated
Generation Quality
- Context formatting optimized
- Citation/source attribution enforced
- Hallucination detection enabled
- Output length appropriate
Error Handling
- Graceful degradation on retrieval failure
- Fallback responses configured
- Retry logic with exponential backoff
- Error logging and alerting
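The "retry logic with exponential backoff" item above can be sketched as a small async helper (an illustrative pattern, not a prescribed implementation):

```python
import asyncio
import random

async def retry_with_backoff(fn, max_attempts: int = 3, base_delay: float = 0.5):
    """Retry an async callable with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # Retries exhausted: surface the error to the caller
            # Exponential backoff: base_delay, 2x, 4x, ... plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay * (1 + random.random() * 0.1))
```

A retrieval call would then be wrapped as `await retry_with_backoff(lambda: hybrid_search(query, embedding))`, with a fallback response if the final attempt still raises.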
Performance
- Retrieval latency acceptable (<500ms)
- Caching for repeated queries
- Batch embedding for efficiency
- Async execution where possible
Monitoring
- Retrieval metrics tracked (precision, recall)
- Query success/failure rates logged
- Web fallback frequency monitored
- User feedback integration
Search Implementation Checklist
PGVector Hybrid Search Implementation Checklist
Use this checklist when implementing semantic + keyword search with PGVector.
Pre-Implementation
Index Strategy Planning
- Choose vector algorithm - HNSW (recommended) or IVFFlat
- Select embedding model - OpenAI (1536), Voyage AI (1024), etc.
- Determine dimensions - Match model output dimensions
- Plan distance metric - Cosine (most common) or L2/Inner Product
- Set HNSW parameters - m=16, ef_construction=64 (good defaults)
Embedding Model Selection
- Test embedding quality - Validate on sample queries
- Measure embedding latency - API call time
- Budget embedding costs - Track usage for bulk ingestion
- Plan batch embedding - Batch API calls for efficiency
- Cache embeddings - Store in database, don't re-compute
RRF Configuration
- Set fetch multiplier - 3x (retrieve 30 for top-10 results)
- Choose RRF constant (k) - 60 (standard value)
- Plan score normalization - Use rank, not raw scores
- Define boosting factors - Section title (1.5x), path (1.15x), code (1.2x)
- Set similarity threshold - Minimum cosine similarity (e.g., 0.75)
Schema Design
- Define chunks table - id, content, embedding, metadata
- Add tsvector column - Pre-computed for keyword search
- Plan metadata fields - section_title, section_path, content_type
- Add timestamps - created_at, updated_at
- Foreign keys - Link to documents/artifacts
Implementation
Database Schema
-- 1. Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- 2. Create chunks table
CREATE TABLE chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
content TEXT NOT NULL,
-- Vector embedding (match model dimensions)
embedding vector(1024), -- Voyage AI 1024 dims
-- Pre-computed tsvector for full-text search
content_tsvector tsvector GENERATED ALWAYS AS (
to_tsvector('english', content)
) STORED,
-- Metadata
section_title TEXT,
section_path TEXT,
chunk_index INT,
content_type TEXT, -- 'code_block', 'paragraph', 'list'
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- 3. Create indexes
-- Vector search (HNSW for speed)
CREATE INDEX idx_chunks_embedding ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Full-text search (GIN for tsvector)
CREATE INDEX idx_chunks_content_tsvector ON chunks
USING gin (content_tsvector);
-- Metadata indexes
CREATE INDEX idx_chunks_document_id ON chunks(document_id);
CREATE INDEX idx_chunks_content_type ON chunks(content_type);
- pgvector extension enabled
- Chunks table created
- Embedding column dimensions match model
- tsvector column generated and stored
- HNSW index created for vectors
- GIN index created for tsvector
- Metadata indexes created
Vector Search Query
from sqlalchemy import select, func
from pgvector.sqlalchemy import Vector
async def vector_search(
query_embedding: list[float],
top_k: int = 10,
content_type_filter: list[str] | None = None
) -> list[Chunk]:
"""Perform vector similarity search."""
# Fetch multiplier for better RRF coverage
FETCH_MULTIPLIER = 3
fetch_limit = top_k * FETCH_MULTIPLIER
# Build query
query = (
select(
Chunk.id,
(Chunk.embedding.cosine_distance(query_embedding)).label("distance"),
func.row_number().over(
order_by=Chunk.embedding.cosine_distance(query_embedding)
).label("rank")
)
.where(Chunk.embedding.isnot(None))
)
# Apply content type filter
if content_type_filter:
query = query.where(Chunk.content_type.in_(content_type_filter))
query = query.limit(fetch_limit)  # Wrap in .subquery() only when composing hybrid RRF
result = await session.execute(query)
return result.all()
- Query embedding passed as parameter
- Cosine distance calculated
- Row number (rank) computed
- Fetch multiplier applied (3x)
- Content type filter optional
- Returns top-k * 3 results
Keyword Search Query
async def keyword_search(
query: str,
top_k: int = 10,
content_type_filter: list[str] | None = None
) -> list[Chunk]:
"""Perform BM25 keyword search."""
FETCH_MULTIPLIER = 3
fetch_limit = top_k * FETCH_MULTIPLIER
# Generate tsquery from plain text
ts_query = func.plainto_tsquery("english", query)
# Build query
query = (
select(
Chunk.id,
func.ts_rank_cd(Chunk.content_tsvector, ts_query).label("score"),
func.row_number().over(
order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
).label("rank")
)
.where(Chunk.content_tsvector.op("@@")(ts_query))
)
# Apply content type filter
if content_type_filter:
query = query.where(Chunk.content_type.in_(content_type_filter))
query = query.limit(fetch_limit)  # Wrap in .subquery() only when composing hybrid RRF
result = await session.execute(query)
return result.all()
- Uses pre-indexed content_tsvector (not to_tsvector on query)
- plainto_tsquery handles special characters
- ts_rank_cd for BM25-like scoring
- Row number (rank) computed
- Fetch multiplier applied
- Only matches where tsvector matches query
Reciprocal Rank Fusion (RRF)
async def hybrid_search(
query: str,
query_embedding: list[float],
top_k: int = 10,
content_type_filter: list[str] | None = None
) -> list[Chunk]:
"""Combine vector + keyword search with RRF."""
FETCH_MULTIPLIER = 3
fetch_limit = top_k * FETCH_MULTIPLIER
K = 60 # RRF smoothing constant
# ===== 1. VECTOR SEARCH =====
vector_subquery = (
select(
Chunk.id,
(Chunk.embedding.cosine_distance(query_embedding)).label("vector_distance"),
func.row_number().over(
order_by=Chunk.embedding.cosine_distance(query_embedding)
).label("vector_rank")
)
.where(Chunk.embedding.isnot(None))
)
if content_type_filter:
vector_subquery = vector_subquery.where(
Chunk.content_type.in_(content_type_filter)
)
vector_subquery = vector_subquery.limit(fetch_limit).subquery("vector_results")
# ===== 2. KEYWORD SEARCH =====
ts_query = func.plainto_tsquery("english", query)
keyword_subquery = (
select(
Chunk.id,
func.ts_rank_cd(Chunk.content_tsvector, ts_query).label("bm25_score"),
func.row_number().over(
order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
).label("keyword_rank")
)
.where(Chunk.content_tsvector.op("@@")(ts_query))
)
if content_type_filter:
keyword_subquery = keyword_subquery.where(
Chunk.content_type.in_(content_type_filter)
)
keyword_subquery = keyword_subquery.limit(fetch_limit).subquery("keyword_results")
# ===== 3. RECIPROCAL RANK FUSION =====
rrf_query = (
select(
func.coalesce(
vector_subquery.c.id,
keyword_subquery.c.id
).label("chunk_id"),
(
func.coalesce(1.0 / (K + vector_subquery.c.vector_rank), 0.0) +
func.coalesce(1.0 / (K + keyword_subquery.c.keyword_rank), 0.0)
).label("rrf_score"),
vector_subquery.c.vector_distance,
keyword_subquery.c.bm25_score
)
.select_from(
vector_subquery.outerjoin(
keyword_subquery,
vector_subquery.c.id == keyword_subquery.c.id,
full=True # FULL OUTER JOIN
)
)
.order_by(desc("rrf_score"))  # sqlalchemy.desc resolves the "rrf_score" label
.limit(top_k)
).subquery("rrf_results")
# ===== 4. FETCH FULL CHUNKS =====
final_query = (
select(Chunk, rrf_query.c.rrf_score)
.join(rrf_query, Chunk.id == rrf_query.c.chunk_id)
.order_by(rrf_query.c.rrf_score.desc())
)
result = await session.execute(final_query)
chunks = result.all()
return chunks
- Both vector and keyword searches executed
- Full outer join combines results
- RRF score = 1/(k+rank_vector) + 1/(k+rank_keyword)
- Results sorted by RRF score descending
- Top-k returned
- Full chunk objects fetched
Metadata Boosting
def apply_metadata_boosting(
chunks: list[tuple[Chunk, float]],
query: str
) -> list[tuple[Chunk, float]]:
"""Boost RRF scores based on metadata relevance."""
boosted_chunks = []
for chunk, rrf_score in chunks:
boost_factor = 1.0
# Boost section titles (1.5x)
if chunk.section_title and query_matches_section_title(chunk.section_title, query):
boost_factor *= 1.5
# Boost document path (1.15x)
if chunk.section_path and query_matches_path(chunk.section_path, query):
boost_factor *= 1.15
# Boost code blocks for technical queries (1.2x)
if is_technical_query(query) and chunk.content_type == "code_block":
boost_factor *= 1.2
boosted_chunks.append((chunk, rrf_score * boost_factor))
# Re-sort by boosted score
boosted_chunks.sort(key=lambda x: x[1], reverse=True)
return boosted_chunks
def query_matches_section_title(section_title: str, query: str) -> bool:
"""Check if query keywords appear in section title."""
query_terms = set(query.lower().split())
title_terms = set(section_title.lower().split())
return len(query_terms & title_terms) > 0
def is_technical_query(query: str) -> bool:
"""Detect technical queries (code-focused)."""
technical_keywords = {
"function", "class", "method", "code", "implement",
"algorithm", "syntax", "example", "snippet"
}
query_terms = set(query.lower().split())
return len(query_terms & technical_keywords) > 0
- Boosting applied after RRF
- Section title matching implemented
- Document path matching implemented
- Technical query detection implemented
- Results re-sorted after boosting
Verification
Golden Dataset Testing
import pytest
@pytest.mark.asyncio
async def test_hybrid_search_golden_dataset():
"""Test hybrid search against golden queries."""
golden_queries = load_golden_queries() # Load test cases
results = []
for query_data in golden_queries:
query = query_data["query"]
expected_chunks = set(query_data["expected_chunk_ids"])  # Set for intersection below
# Generate embedding
embedding = await embed_text(query)
# Perform search
retrieved = await hybrid_search(query, embedding, top_k=10)
retrieved_ids = {c.id for c in retrieved}
# Check if expected chunks are in top 10
found = len(expected_chunks & retrieved_ids)
results.append({
"query": query,
"expected": len(expected_chunks),
"found": found,
"pass": found == len(expected_chunks)
})
# Calculate metrics
pass_rate = sum(r["pass"] for r in results) / len(results)
mrr = calculate_mrr(results)
print(f"Pass Rate: {pass_rate:.1%}")
print(f"MRR: {mrr:.3f}")
assert pass_rate >= 0.90, f"Pass rate {pass_rate:.1%} below 90% threshold"
def calculate_mrr(results: list[dict]) -> float:
"""Calculate Mean Reciprocal Rank."""
reciprocal_ranks = []
for result in results:
if result["found"] > 0:
# Assume first expected chunk found at rank 1 (simplified)
reciprocal_ranks.append(1.0)
else:
reciprocal_ranks.append(0.0)
return sum(reciprocal_ranks) / len(reciprocal_ranks)
- Golden dataset loaded - 98+ test queries
- Pass rate measured - Target: 90%+
- MRR calculated - Mean Reciprocal Rank
- Hard queries tested - Technical, ambiguous queries
- Failures analyzed - Inspect failing queries
Retrieval Quality Metrics
@pytest.mark.asyncio
async def test_retrieval_quality_metrics():
"""Measure retrieval quality metrics."""
test_cases = load_golden_queries()
precision_at_k = []
recall_at_k = []
for case in test_cases:
query = case["query"]
relevant_chunks = set(case["expected_chunk_ids"])
# Perform search
embedding = await embed_text(query)
retrieved = await hybrid_search(query, embedding, top_k=10)
retrieved_ids = {c.id for c in retrieved}
# Precision@10: Relevant chunks in top-10 / 10
precision = len(relevant_chunks & retrieved_ids) / 10
precision_at_k.append(precision)
# Recall@10: Relevant chunks in top-10 / Total relevant
recall = len(relevant_chunks & retrieved_ids) / len(relevant_chunks)
recall_at_k.append(recall)
avg_precision = sum(precision_at_k) / len(precision_at_k)
avg_recall = sum(recall_at_k) / len(recall_at_k)
print(f"Precision@10: {avg_precision:.3f}")
print(f"Recall@10: {avg_recall:.3f}")
assert avg_precision >= 0.70, "Precision@10 below 70%"
assert avg_recall >= 0.85, "Recall@10 below 85%"
- Precision@10 - Target: 70%+ (relevant in top-10)
- Recall@10 - Target: 85%+ (found most relevant)
- MRR - Target: 0.65+ (relevant chunks ranked high)
- nDCG - Normalized Discounted Cumulative Gain (optional)
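The simplified `calculate_mrr` above treats any hit as rank 1; a rank-aware MRR, assuming ordered lists of retrieved IDs and a set of relevant IDs per query, might look like:

```python
def mean_reciprocal_rank(
    retrieved_per_query: list[list[str]], relevant_per_query: list[set[str]]
) -> float:
    """MRR: average of 1/rank of the first relevant result per query (0 if none)."""
    reciprocal_ranks = []
    for retrieved, relevant in zip(retrieved_per_query, relevant_per_query):
        rr = 0.0
        for rank, doc_id in enumerate(retrieved, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank  # Only the first relevant hit counts
                break
        reciprocal_ranks.append(rr)
    return sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0.0
```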
Performance Benchmarks
```python
@pytest.mark.asyncio
async def test_search_latency():
    """Measure search latency."""
    import time

    query = "How to implement binary search in Python?"
    embedding = await embed_text(query)

    # Measure vector search latency
    start = time.perf_counter()
    vector_results = await vector_search(embedding, top_k=30)
    vector_latency = (time.perf_counter() - start) * 1000

    # Measure keyword search latency
    start = time.perf_counter()
    keyword_results = await keyword_search(query, top_k=30)
    keyword_latency = (time.perf_counter() - start) * 1000

    # Measure hybrid search latency
    start = time.perf_counter()
    hybrid_results = await hybrid_search(query, embedding, top_k=10)
    hybrid_latency = (time.perf_counter() - start) * 1000

    print(f"Vector search: {vector_latency:.2f}ms")
    print(f"Keyword search: {keyword_latency:.2f}ms")
    print(f"Hybrid search: {hybrid_latency:.2f}ms")

    # Latency targets
    assert vector_latency < 100, f"Vector search latency {vector_latency:.2f}ms > 100ms"
    assert keyword_latency < 50, f"Keyword search latency {keyword_latency:.2f}ms > 50ms"
    assert hybrid_latency < 150, f"Hybrid search latency {hybrid_latency:.2f}ms > 150ms"
```

- Vector search - < 100ms (HNSW index)
- Keyword search - < 50ms (GIN index)
- Hybrid search - < 150ms (combined)
- P95 latency - 95th percentile acceptable
- Index scans - Verify indexes used (EXPLAIN ANALYZE)
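A single timed run says little about tail latency. One way to check the P95 target is to repeat the search and take a nearest-rank percentile over the samples (a sketch; the helper name is hypothetical):

```python
import math


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile of latency samples (pct in 0-100)."""
    ordered = sorted(samples)
    idx = max(0, math.ceil(pct / 100 * len(ordered)) - 1)
    return ordered[idx]
```

In the latency test above, you would collect e.g. 50 `hybrid_latency` samples in a loop and assert `percentile(samples, 95) < 150` instead of asserting on a single measurement.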
Index Performance Validation
```sql
-- Check if indexes are being used
EXPLAIN ANALYZE
SELECT id, embedding <=> '[0.1, 0.2, ..., 0.9]' AS distance
FROM chunks
ORDER BY distance
LIMIT 30;

-- Should show "Index Scan using idx_chunks_embedding"
-- NOT "Seq Scan" (sequential scan = no index!)
```

- Vector index used - EXPLAIN shows "Index Scan using idx_chunks_embedding"
- Keyword index used - EXPLAIN shows "Bitmap Index Scan using idx_chunks_content_tsvector"
- No sequential scans - Avoid full table scans
- Index size reasonable - Check pg_indexes view
- Vacuum/Analyze run - Update statistics for query planner
Post-Implementation
Production Monitoring
- Search latency dashboard - P50, P95, P99 latency
- Retrieval quality tracking - Pass rate, MRR over time
- Index bloat - Monitor index size growth
- Query patterns - Log common queries, identify gaps
- Error rate - Track search failures
Optimization Opportunities
- Tune HNSW parameters - Increase m or ef_construction for accuracy
- Increase fetch multiplier - 3x → 5x for better RRF coverage
- Add more boosting - Domain-specific metadata boosts
- Multi-query retrieval - Generate query variations
- Hybrid query rewriting - Expand acronyms, synonyms
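The fetch-multiplier tuning above interacts with RRF fusion: each leg retrieves `top_k × multiplier` candidates, and fusion re-scores the union. A minimal sketch of RRF with the k=60 constant used elsewhere in this skill (function name hypothetical):

```python
def rrf_fuse(vector_ids: list, keyword_ids: list, k: int = 60, top_k: int = 10) -> list:
    """Reciprocal Rank Fusion: score(d) = sum over result lists of 1 / (k + rank)."""
    scores: dict = {}
    for ranked in (vector_ids, keyword_ids):
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest combined score first; documents in both lists rise to the top
    return sorted(scores, key=scores.get, reverse=True)[:top_k]
```

A larger multiplier widens each input list, giving documents that rank moderately in both legs more chances to enter the fused top-k; the cost is extra retrieval latency per leg.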
Index Maintenance
- Run VACUUM ANALYZE - Weekly or after bulk inserts
- Rebuild indexes - If bloated (pg_repack)
- Monitor index usage - Drop unused indexes
- Update statistics - Ensure query planner has fresh stats
- Test on production-scale data - Validate performance at scale
Troubleshooting
| Issue | Check |
|---|---|
| Slow vector search | HNSW index exists? Dimensions match? Increase m/ef_construction? |
| Slow keyword search | GIN index on tsvector? Using content_tsvector, not to_tsvector()? |
| Low pass rate | Increase fetch multiplier, add boosting, check embeddings quality |
| No keyword matches | Check tsvector generation, query language (English?), special chars |
| Wrong results | Validate RRF logic, check boosting factors, inspect rankings |
| Index not used | Run ANALYZE, check query plan (EXPLAIN), verify index conditions |
OrchestKit Integration
```python
# Example: Search for content in OrchestKit
from app.shared.services.search.search_service import SearchService

search_service = SearchService()

results = await search_service.search(
    query="How to implement hybrid search?",
    top_k=10,
    filters={"content_type": ["code_block", "paragraph"]},
)

# Results include chunk content, metadata, and RRF score
for chunk, score in results:
    print(f"Score: {score:.4f} | {chunk.section_title}")
    print(chunk.content[:200])
```

- Search service integrated with API endpoints
- Results exposed via `/api/v1/search` endpoint
- Filters applied for content_type, document_id
- Results paginated (offset/limit)
- Searchable in frontend UI
References
- PGVector Docs: https://github.com/pgvector/pgvector
- OrchestKit Implementation: backend/app/db/repositories/chunk_repository.py
- Search Service: backend/app/shared/services/search/search_service.py
- Constants: backend/app/core/constants.py
- Related Skill: the database-schema-designer skill