
RAG Retrieval

Retrieval-Augmented Generation patterns for grounded LLM responses. Use when building RAG pipelines, embedding documents, implementing hybrid search, contextual retrieval, HyDE, agentic RAG, multimodal RAG, query decomposition, reranking, or pgvector search.

Reference high

Primary Agent: data-pipeline-engineer

RAG Retrieval

Comprehensive patterns for building production RAG systems. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Core RAG | 4 | CRITICAL | Basic RAG, citations, hybrid search, context management |
| Embeddings | 3 | HIGH | Model selection, chunking, batch/cache optimization |
| Contextual Retrieval | 3 | HIGH | Context-prepending, hybrid BM25+vector, pipeline |
| HyDE | 3 | HIGH | Vocabulary mismatch, hypothetical document generation |
| Agentic RAG | 4 | HIGH | Self-RAG, CRAG, knowledge graphs, adaptive routing |
| Multimodal RAG | 3 | MEDIUM | Image+text retrieval, PDF chunking, cross-modal search |
| Query Decomposition | 3 | MEDIUM | Multi-concept queries, parallel retrieval, RRF fusion |
| Reranking | 3 | MEDIUM | Cross-encoder, LLM scoring, combined signals |
| PGVector | 4 | HIGH | PostgreSQL hybrid search, HNSW indexes, schema design |

Total: 30 rules across 9 categories

Core RAG

Fundamental patterns for retrieval, generation, and pipeline composition.

| Rule | File | Key Pattern |
|---|---|---|
| Basic RAG | rules/core-basic-rag.md | Retrieve + context + generate with citations |
| Hybrid Search | rules/core-hybrid-search.md | RRF fusion (k=60) for semantic + keyword |
| Context Management | rules/core-context-management.md | Token budgeting + sufficiency check |
| Pipeline Composition | rules/core-pipeline-composition.md | Composable Decompose → HyDE → Retrieve → Rerank |

Embeddings

Embedding models, chunking strategies, and production optimization.

| Rule | File | Key Pattern |
|---|---|---|
| Models & API | rules/embeddings-models.md | Model selection, batch API, similarity |
| Chunking | rules/embeddings-chunking.md | Semantic boundary splitting, 512 token sweet spot |
| Advanced | rules/embeddings-advanced.md | Redis cache, Matryoshka dims, batch processing |

Contextual Retrieval

Anthropic's context-prepending technique — 67% fewer retrieval failures.

| Rule | File | Key Pattern |
|---|---|---|
| Context Prepending | rules/contextual-prepend.md | LLM-generated context + prompt caching |
| Hybrid Search | rules/contextual-hybrid.md | 40% BM25 / 60% vector weight split |
| Complete Pipeline | rules/contextual-pipeline.md | End-to-end indexing + hybrid retrieval |

HyDE

Hypothetical Document Embeddings for bridging vocabulary gaps.

| Rule | File | Key Pattern |
|---|---|---|
| Generation | rules/hyde-generation.md | Embed hypothetical doc, not query |
| Per-Concept | rules/hyde-per-concept.md | Parallel HyDE for multi-topic queries |
| Fallback | rules/hyde-fallback.md | 2-3s timeout → direct embedding fallback |

Agentic RAG

Self-correcting retrieval with LLM-driven decision making.

| Rule | File | Key Pattern |
|---|---|---|
| Self-RAG | rules/agentic-self-rag.md | Binary document grading for relevance |
| Corrective RAG | rules/agentic-corrective-rag.md | CRAG workflow with web fallback |
| Knowledge Graph | rules/agentic-knowledge-graph.md | KG + vector hybrid for entity-rich domains |
| Adaptive Retrieval | rules/agentic-adaptive-retrieval.md | Query routing to optimal strategy |

Multimodal RAG

Image + text retrieval with cross-modal search.

| Rule | File | Key Pattern |
|---|---|---|
| Embeddings | rules/multimodal-embeddings.md | CLIP, SigLIP 2, Voyage multimodal-3 |
| Chunking | rules/multimodal-chunking.md | PDF extraction preserving images |
| Pipeline | rules/multimodal-pipeline.md | Dedup + hybrid retrieval + generation |

Query Decomposition

Breaking complex queries into concepts for parallel retrieval.

| Rule | File | Key Pattern |
|---|---|---|
| Detection | rules/query-detection.md | Heuristic indicators (<1ms fast path) |
| Decompose + RRF | rules/query-decompose.md | LLM concept extraction + parallel retrieval |
| HyDE Combo | rules/query-hyde-combo.md | Decompose + HyDE for maximum coverage |

Reranking

Post-retrieval re-scoring for higher precision.

| Rule | File | Key Pattern |
|---|---|---|
| Cross-Encoder | rules/reranking-cross-encoder.md | ms-marco-MiniLM (~50ms, free) |
| LLM Reranking | rules/reranking-llm.md | Batch scoring + Cohere API |
| Combined | rules/reranking-combined.md | Multi-signal weighted scoring |

PGVector

Production hybrid search with PostgreSQL.

| Rule | File | Key Pattern |
|---|---|---|
| Schema | rules/pgvector-schema.md | HNSW index + pre-computed tsvector |
| Hybrid Search | rules/pgvector-hybrid-search.md | SQLAlchemy RRF with FULL OUTER JOIN |
| Indexing | rules/pgvector-indexing.md | HNSW (17x faster) vs IVFFlat |
| Metadata | rules/pgvector-metadata.md | Filtering, boosting, Redis 8 comparison |

Quick Start Example

from openai import OpenAI

client = OpenAI()

# vector_db and llm are placeholder clients assumed to be initialized elsewhere
async def rag_query(question: str, top_k: int = 5) -> dict:
    """Basic RAG with citations."""
    docs = await vector_db.search(question, limit=top_k)
    context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(docs)])

    response = await llm.chat([
        {"role": "system", "content": "Answer with inline citations [1], [2]. Use ONLY provided context."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ])

    return {"answer": response.content, "sources": [d.metadata['source'] for d in docs]}

Key Decisions

| Decision | Recommendation |
|---|---|
| Embedding model | text-embedding-3-small (general), voyage-3 (production) |
| Chunk size | 256-1024 tokens (512 typical) |
| Hybrid weight | 40% BM25 / 60% vector |
| Top-k | 3-10 documents |
| Temperature | 0.1-0.3 (factual) |
| Context budget | 4K-8K tokens |
| Reranking | Retrieve 50, rerank to 10 |
| Vector index | HNSW (production), IVFFlat (high-volume) |
| HyDE timeout | 2-3 seconds with fallback |
| Query decomposition | Heuristic first, LLM only if multi-concept |
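
These defaults can be captured in a single settings object. A minimal sketch (the class and field names are illustrative, not part of any rule file):

from dataclasses import dataclass

@dataclass(frozen=True)
class RAGDefaults:
    """Recommended defaults from the table above."""
    embedding_model: str = "text-embedding-3-small"
    chunk_size_tokens: int = 512        # within the 256-1024 range
    bm25_weight: float = 0.4
    vector_weight: float = 0.6
    top_k: int = 5                      # 3-10 documents
    temperature: float = 0.2            # 0.1-0.3 for factual tasks
    context_budget_tokens: int = 6000   # 4K-8K tokens
    rerank_candidates: int = 50         # retrieve 50...
    rerank_top_k: int = 10              # ...rerank down to 10
    hyde_timeout_seconds: float = 3.0   # 2-3 seconds with fallback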

Common Mistakes

  1. No citation tracking (unverifiable answers)
  2. Context too large (dilutes relevance)
  3. Single retrieval method (misses keyword matches)
  4. Not chunking long documents (context gets lost)
  5. Embedding queries differently than documents
  6. No fallback path in agentic RAG (workflow hangs)
  7. Infinite rewrite loops (no retry limit)
  8. Using wrong similarity metric (cosine vs euclidean)
  9. Not caching embeddings (recomputing unchanged content)
  10. Missing image captions in multimodal RAG (limits text search)

Evaluations

See test-cases.json for 30 test cases across all categories.

Related Skills

  • ork:langgraph - LangGraph workflow patterns (for agentic RAG workflows)
  • caching - Cache RAG responses for repeated queries
  • ork:golden-dataset - Evaluate retrieval quality
  • ork:llm-integration - Local embeddings with nomic-embed-text
  • vision-language-models - Image analysis for multimodal RAG
  • ork:database-patterns - Schema design for vector search

Capability Details

retrieval-patterns

Keywords: retrieval, context, chunks, relevance, rag

Solves:

  • Retrieve relevant context for LLM
  • Implement RAG pipeline with citations
  • Optimize retrieval quality

Keywords: hybrid, bm25, vector, fusion, rrf

Solves:

  • Combine keyword and semantic search
  • Implement reciprocal rank fusion
  • Balance precision and recall

embeddings

Keywords: embedding, text to vector, vectorize, chunk, similarity

Solves:

  • Convert text to vector embeddings
  • Choose embedding models and dimensions
  • Implement chunking strategies

contextual-retrieval

Keywords: contextual, anthropic, context-prepend, bm25

Solves:

  • Prepend context to chunks for better retrieval
  • Reduce retrieval failures by 67%
  • Implement hybrid BM25+vector search

hyde

Keywords: hyde, hypothetical, vocabulary mismatch

Solves:

  • Bridge vocabulary gaps in semantic search
  • Generate hypothetical documents for embedding
  • Handle abstract or conceptual queries

agentic-rag

Keywords: self-rag, crag, corrective, adaptive, grading

Solves:

  • Build self-correcting RAG workflows
  • Grade document relevance
  • Implement web search fallback

multimodal-rag

Keywords: multimodal, image, clip, vision, pdf

Solves:

  • Build RAG with images and text
  • Cross-modal search (text → image)
  • Process PDFs with mixed content

query-decomposition

Keywords: decompose, multi-concept, complex query

Solves:

  • Break complex queries into concepts
  • Parallel retrieval per concept
  • Improve coverage for compound questions

reranking

Keywords: rerank, cross-encoder, precision, scoring

Solves:

  • Improve search precision post-retrieval
  • Score relevance with cross-encoder or LLM
  • Combine multiple scoring signals

pgvector

Keywords: pgvector, postgresql, hnsw, tsvector, hybrid

Solves:

  • Production hybrid search with PostgreSQL
  • HNSW vs IVFFlat index selection
  • SQL-based RRF fusion

Rules (30)

Route queries to the best retrieval strategy using adaptive selection per query type — MEDIUM

Adaptive Retrieval

Route queries to optimal retrieval strategies based on query characteristics.

Query Router:

from pydantic import BaseModel, Field
from typing import Literal

class QueryRoute(BaseModel):
    strategy: Literal["direct", "hyde", "decompose", "web"] = Field(
        description="Best retrieval strategy for this query"
    )
    reasoning: str

async def route_query(question: str) -> str:
    route = await llm.with_structured_output(QueryRoute).ainvoke(
        f"Choose the best retrieval strategy for: {question}\n"
        "- direct: Simple factual queries with clear keywords\n"
        "- hyde: Abstract/conceptual queries with vocabulary mismatch\n"
        "- decompose: Multi-concept queries spanning multiple topics\n"
        "- web: Recent events or data not in knowledge base"
    )
    return route.strategy

Multi-Source Orchestration:

async def adaptive_search(question: str, top_k: int = 10) -> list[dict]:
    strategy = await route_query(question)

    if strategy == "direct":
        return await retriever.search(question, top_k=top_k)
    elif strategy == "hyde":
        hyde_result = await hyde_service.generate(question)
        return await retriever.search_by_embedding(hyde_result.embedding, top_k=top_k)
    elif strategy == "decompose":
        return await decomposed_search(question, top_k=top_k)
    elif strategy == "web":
        return await web_search(question)

Incorrect — hardcoded single retrieval strategy:

async def search(question: str) -> list[dict]:
    # Always uses HyDE regardless of query type
    hyde_result = await hyde_service.generate(question)
    return await retriever.search_by_embedding(hyde_result.embedding, top_k=10)

Correct — adaptive routing based on query characteristics:

async def adaptive_search(question: str) -> list[dict]:
    strategy = await route_query(question)  # Choose best approach

    if strategy == "direct":
        return await retriever.search(question, top_k=10)  # Fast path
    elif strategy == "hyde":
        hyde_result = await hyde_service.generate(question)
        return await retriever.search_by_embedding(hyde_result.embedding, top_k=10)

Key rules:

  • Route queries to optimal sources based on query type
  • Direct search for simple factual queries (fastest)
  • HyDE for abstract/conceptual queries (vocabulary bridging)
  • Decomposition for multi-concept queries (comprehensive coverage)
  • Web search for recent events or out-of-knowledge-base queries
  • Routing adds ~200ms overhead — use heuristics for fast-path decisions
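
A heuristic fast path can make most routing decisions without the LLM call. A minimal sketch, assuming route_query is the LLM router defined above (keyword lists and thresholds are illustrative):

def route_heuristic(question: str) -> str | None:
    """Cheap pre-check before the LLM router; returns None when unsure."""
    q = question.lower()
    if any(marker in q for marker in ("today", "latest", "this week", "breaking")):
        return "web"          # Recency signals → web search
    if len(q.split()) <= 8 and not any(m in q for m in (" and ", " vs ", " versus ")):
        return "direct"       # Short single-concept query → direct retrieval
    return None               # Ambiguous → defer to the LLM router

async def route_query_fast(question: str) -> str:
    return route_heuristic(question) or await route_query(question)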

Apply corrective RAG with quality assurance and web fallback for self-correction — HIGH

Corrective RAG (CRAG)

Self-correcting retrieval with query rewriting and web search fallback.

CRAG Workflow:

def build_crag_workflow() -> StateGraph:
    workflow = StateGraph(RAGState)

    workflow.add_node("retrieve", retrieve)
    workflow.add_node("grade", grade_documents)
    workflow.add_node("generate", generate)
    workflow.add_node("web_search", web_search)
    workflow.add_node("transform_query", transform_query)

    workflow.add_edge(START, "retrieve")
    workflow.add_edge("retrieve", "grade")

    workflow.add_conditional_edges("grade", route_after_grading, {
        "generate": "generate",
        "transform_query": "transform_query",
        "web_search": "web_search"
    })

    workflow.add_edge("transform_query", "retrieve")  # Retry
    workflow.add_edge("web_search", "generate")
    workflow.add_edge("generate", END)

    return workflow.compile()

def route_after_grading(state: RAGState) -> str:
    if state["web_search_needed"]:
        if state.get("retry_count", 0) < 2:
            return "transform_query"
        return "web_search"
    return "generate"

Web Search Fallback:

def web_search(state: RAGState) -> dict:
    web_results = tavily_client.search(state["question"], max_results=5, search_depth="advanced")
    web_docs = [
        Document(page_content=r["content"], metadata={"source": r["url"], "type": "web"})
        for r in web_results
    ]
    return {"documents": web_docs, "web_search_needed": False}

Incorrect — no fallback path or retry limits:

def route_after_grading(state: RAGState) -> str:
    if state["web_search_needed"]:
        return "transform_query"  # Infinite loop possible!
    return "generate"

Correct — bounded retries with web fallback:

def route_after_grading(state: RAGState) -> str:
    if state["web_search_needed"]:
        if state.get("retry_count", 0) < 2:  # Max 2 retries
            return "transform_query"
        return "web_search"  # Fallback to web
    return "generate"

Key rules:

  • Fallback order: Rewrite query (2x max) → Web search → Abstain
  • Max 2-3 retries for query rewriting to prevent infinite loops
  • Web search as last resort (latency + cost)
  • Always include retry_count to prevent infinite loops
  • No fallback path = workflow hangs on bad queries
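
The workflow above also references a transform_query node that is not shown. A minimal sketch, assuming llm is a LangChain-style chat model (prompt wording is illustrative); incrementing retry_count here is what makes the bound in route_after_grading effective:

def transform_query(state: RAGState) -> dict:
    """Rewrite the query for another retrieval attempt, tracking retries."""
    rewritten = llm.invoke(
        f"Rewrite this question to retrieve better documents: {state['question']}"
    )
    return {
        "question": rewritten.content,
        "retry_count": state.get("retry_count", 0) + 1,
    }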

Build knowledge graph RAG for multi-hop reasoning over entity-rich domains — MEDIUM

Knowledge Graph RAG (GraphRAG)

Combine knowledge graphs with vector search for entity-rich domains.

Architecture:

Query → [Entity Extraction] → [KG Lookup] → [Vector Search] → [Merge] → [Generate]

Pattern Comparison:

| Pattern | When to Use | Key Feature |
|---|---|---|
| Self-RAG | Need adaptive retrieval | LLM decides when to retrieve |
| CRAG | Need quality assurance | Document grading + web fallback |
| GraphRAG | Entity-rich domains | Knowledge graph + vector hybrid |
| Agentic | Complex multi-step | Full plan-route-act-verify loop |

Incorrect — vector-only search missing entity relationships:

async def search(query: str) -> list[dict]:
    # Misses relationships between entities
    return await vector_db.search(query, limit=10)

Correct — hybrid KG + vector search:

async def graph_rag_search(query: str) -> list[dict]:
    entities = await extract_entities(query)  # Extract entities from query
    kg_results = await neo4j.lookup_entities(entities)  # KG lookup
    vector_results = await vector_db.search(query, limit=10)  # Vector search
    return merge_results(kg_results, vector_results)  # Combine both

Key rules:

  • Use GraphRAG when domain has rich entity relationships (people, organizations, products)
  • Combine KG entity lookup with vector similarity for hybrid results
  • Entity extraction should use structured output (Pydantic) for reliability
  • Multi-hop reasoning: follow graph edges to find connected information
  • Neo4j or similar graph DB for production knowledge graphs
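
The extract_entities step above can use structured output, per the rules in this list. A minimal sketch, assuming llm supports with_structured_output as in the other examples in this skill:

from pydantic import BaseModel, Field

class ExtractedEntities(BaseModel):
    entities: list[str] = Field(
        description="Named entities (people, organizations, products) mentioned in the query"
    )

async def extract_entities(query: str) -> list[str]:
    result = await llm.with_structured_output(ExtractedEntities).ainvoke(
        f"Extract the named entities from this query: {query}"
    )
    return result.entities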

Grade document relevance with Self-RAG to prevent context contamination from irrelevant results — HIGH

Self-RAG — Document Grading

LLM grades retrieved documents for relevance before generation.

State Definition:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Annotated
from langchain_core.documents import Document
import operator

class RAGState(TypedDict):
    question: str
    documents: Annotated[List[Document], operator.add]
    generation: str
    web_search_needed: bool
    retry_count: int
    relevance_scores: dict[str, float]

Document Grading:

from pydantic import BaseModel, Field

class GradeDocuments(BaseModel):
    binary_score: str = Field(description="Relevance score 'yes' or 'no'")

def grade_documents(state: RAGState) -> dict:
    """Grade documents for relevance — core Self-RAG pattern."""
    question = state["question"]
    documents = state["documents"]
    filtered_docs, relevance_scores = [], {}

    for doc in documents:
        score = retrieval_grader.invoke({
            "question": question, "document": doc.page_content
        })
        doc_id = doc.metadata.get("id", hash(doc.page_content))
        relevance_scores[doc_id] = 1.0 if score.binary_score == "yes" else 0.0
        if score.binary_score == "yes":
            filtered_docs.append(doc)

    web_search_needed = len(filtered_docs) < len(documents) // 2
    return {
        "documents": filtered_docs,
        "web_search_needed": web_search_needed,
        "relevance_scores": relevance_scores
    }
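
The retrieval_grader used above is assumed to exist. A minimal sketch of how it can be built, assuming langchain_openai (any LangChain chat model with structured output works):

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

grade_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer 'yes' if the document is relevant to the question, otherwise 'no'."),
    ("human", "Question: {question}\n\nDocument: {document}"),
])
retrieval_grader = grade_prompt | ChatOpenAI(temperature=0).with_structured_output(GradeDocuments)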

Incorrect — no document grading, all docs used:

def generate(state: RAGState) -> dict:
    # Uses all retrieved docs without quality check
    context = "\n\n".join([d.page_content for d in state["documents"]])
    return {"generation": llm.invoke(context)}

Correct — grade documents before generation:

def grade_documents(state: RAGState) -> dict:
    filtered_docs = []
    for doc in state["documents"]:
        score = grader.invoke({"question": state["question"], "document": doc.page_content})
        if score.binary_score == "yes":  # Only keep relevant docs
            filtered_docs.append(doc)

    web_search_needed = len(filtered_docs) < len(state["documents"]) // 2
    return {"documents": filtered_docs, "web_search_needed": web_search_needed}

Key rules:

  • Binary grading (yes/no) is simpler and more reliable than numeric scores
  • Trigger web search fallback when >50% of docs are filtered out
  • Track relevance scores for debugging and quality monitoring
  • Self-RAG lets the LLM decide when to retrieve — adaptive by design

Combine contextual embeddings with BM25 hybrid search for maximum retrieval coverage — HIGH

Contextual Retrieval — Hybrid BM25+Vector

Combine contextual embeddings with BM25 for maximum retrieval quality.

Hybrid Retriever:

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    def __init__(self, chunks: list[str], embeddings: np.ndarray):
        self.chunks = chunks
        self.embeddings = embeddings
        tokenized = [c.lower().split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def search(
        self, query: str, query_embedding: np.ndarray,
        top_k: int = 20, bm25_weight: float = 0.4, vector_weight: float = 0.6
    ) -> list[tuple[int, float]]:
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-6)

        vector_scores = np.dot(self.embeddings, query_embedding)
        vector_scores = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + 1e-6)

        combined = bm25_weight * bm25_scores + vector_weight * vector_scores
        top_indices = np.argsort(combined)[::-1][:top_k]
        return [(i, combined[i]) for i in top_indices]

Results (Anthropic Research):

| Method | Retrieval Failure Rate |
|---|---|
| Traditional embeddings | 5.7% |
| + Contextual embeddings | 3.7% |
| + Contextual + BM25 hybrid | 2.9% |
| + Contextual + BM25 + reranking | 1.9% |

Incorrect — vector-only search without BM25:

def search(query: str, query_embedding: np.ndarray, top_k: int = 20) -> list[int]:
    # Misses exact-match queries
    vector_scores = np.dot(self.embeddings, query_embedding)
    return np.argsort(vector_scores)[::-1][:top_k]

Correct — hybrid BM25 + vector with proper weighting:

def search(query: str, query_embedding: np.ndarray, top_k: int = 20) -> list[tuple[int, float]]:
    bm25_scores = self.bm25.get_scores(query.lower().split())
    bm25_norm = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-6)

    vector_scores = np.dot(self.embeddings, query_embedding)
    vector_norm = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + 1e-6)

    combined = 0.4 * bm25_norm + 0.6 * vector_norm  # Research-backed weights
    return [(i, combined[i]) for i in np.argsort(combined)[::-1][:top_k]]

Key rules:

  • 67% reduction in retrieval failures with full contextual retrieval pipeline
  • Default weight split: 40% BM25 / 60% vector (from Anthropic research)
  • BM25 catches exact-match queries that vector search misses
  • Normalize scores before weighted combination (min-max normalization)
  • Adding reranking on top takes the failure rate from 2.9% to 1.9%

Build complete contextual retrieval pipeline integrating context generation, embedding, and hybrid search — MEDIUM

Contextual Retrieval — Complete Pipeline

End-to-end pipeline with context generation, hybrid indexing, and retrieval.

Complete Pipeline:

import numpy as np
from dataclasses import dataclass

@dataclass
class ContextualChunk:
    original: str
    contextualized: str
    embedding: list[float]
    doc_id: str
    chunk_index: int

class ContextualRetriever:
    def __init__(self, embed_model, llm_client):
        self.embed_model = embed_model
        self.llm = llm_client
        self.chunks: list[ContextualChunk] = []

    def add_document(self, doc_id: str, text: str, chunk_size: int = 512):
        raw_chunks = self._chunk_text(text, chunk_size)
        contextualized = self._contextualize_batch(text, raw_chunks)
        embeddings = self.embed_model.embed(contextualized)

        for i, (raw, ctx, emb) in enumerate(zip(raw_chunks, contextualized, embeddings)):
            self.chunks.append(ContextualChunk(
                original=raw, contextualized=ctx, embedding=emb,
                doc_id=doc_id, chunk_index=i
            ))
        self._rebuild_bm25()

    def search(self, query: str, top_k: int = 10) -> list[ContextualChunk]:
        query_emb = self.embed_model.embed([query])[0]
        bm25_scores = self.bm25.get_scores(query.lower().split())
        embeddings = np.array([c.embedding for c in self.chunks])
        vector_scores = np.dot(embeddings, query_emb)
        combined = 0.4 * self._normalize(bm25_scores) + 0.6 * self._normalize(vector_scores)
        top_indices = np.argsort(combined)[::-1][:top_k]
        return [self.chunks[i] for i in top_indices]
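
The _rebuild_bm25 and _normalize helpers are assumed above (_chunk_text and _contextualize_batch follow the chunking and context-prepending rules elsewhere in this skill). A minimal sketch of both: a BM25 index over the contextualized text, plus min-max score normalization:

from rank_bm25 import BM25Okapi
import numpy as np

# Inside ContextualRetriever
def _rebuild_bm25(self):
    tokenized = [c.contextualized.lower().split() for c in self.chunks]
    self.bm25 = BM25Okapi(tokenized)

def _normalize(self, scores: np.ndarray) -> np.ndarray:
    return (scores - scores.min()) / (scores.max() - scores.min() + 1e-6)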

Parallel Processing:

import asyncio

async def contextualize_parallel(document: str, chunks: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
    async def process_chunk(chunk: str) -> str:
        async with semaphore:
            context = await async_generate_context(document, chunk)
            return f"{context}\n\n{chunk}"
    return await asyncio.gather(*[process_chunk(c) for c in chunks])

Incorrect — missing context generation and hybrid indexing:

def add_document(self, doc_id: str, text: str):
    # No contextualization, no hybrid indexing
    raw_chunks = self._chunk_text(text, 512)
    embeddings = self.embed_model.embed(raw_chunks)
    self.chunks.extend(embeddings)

Correct — complete pipeline with contextualization and hybrid search:

def add_document(self, doc_id: str, text: str, chunk_size: int = 512):
    raw_chunks = self._chunk_text(text, chunk_size)
    contextualized = self._contextualize_batch(text, raw_chunks)  # Add context
    embeddings = self.embed_model.embed(contextualized)  # Embed with context

    for i, (raw, ctx, emb) in enumerate(zip(raw_chunks, contextualized, embeddings)):
        self.chunks.append(ContextualChunk(
            original=raw, contextualized=ctx, embedding=emb,
            doc_id=doc_id, chunk_index=i
        ))
    self._rebuild_bm25()  # Hybrid BM25 + vector

Key rules:

  • Use contextual retrieval when: documents have important metadata, chunks lose context, quality is critical
  • Skip if: chunks are self-contained (Q&A pairs), low-latency indexing required, cost-sensitive with many small docs
  • Parallel processing with semaphore (10 concurrent) for batch contextualization
  • Prompt caching reduces cost by ~90% when processing many chunks from same document

Prepend situational context to chunks before embedding to reduce retrieval failures — HIGH

Contextual Retrieval — Context Prepending

Prepend situational context to chunks before embedding to preserve document-level meaning.

The Problem:

Original: "ACME Q3 2024 Earnings Report..."
Chunk: "Revenue increased 15% compared to the previous quarter."
Query: "What was ACME's Q3 2024 revenue growth?"
Result: Chunk doesn't mention "ACME" or "Q3 2024" — retrieval fails

Context Generation:

import anthropic
client = anthropic.Anthropic()

CONTEXT_PROMPT = """
<document>
{document}
</document>

Here is the chunk we want to situate within the document:
<chunk>
{chunk}
</chunk>

Please give a short, succinct context (1-2 sentences) to situate this chunk
within the overall document. Focus on information that would help retrieval.
Answer only with the context, nothing else.
"""

def contextualize_chunk(document: str, chunk: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=150,
        messages=[{"role": "user",
                   "content": CONTEXT_PROMPT.format(document=document, chunk=chunk)}]
    )
    return f"{response.content[0].text}\n\n{chunk}"

With Prompt Caching (90% cost reduction):

def contextualize_cached(document: str, chunk: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=150,
        messages=[{"role": "user", "content": [
            {"type": "text", "text": f"<document>\n{document}\n</document>",
             "cache_control": {"type": "ephemeral"}},
            {"type": "text", "text": f"Situate this chunk (1-2 sentences):\n<chunk>\n{chunk}\n</chunk>"}
        ]}]
    )
    return f"{response.content[0].text}\n\n{chunk}"

Incorrect — chunk without document context:

def index_chunk(chunk: str) -> str:
    # Missing document context — retrieval will fail
    embedding = embed_model.embed([chunk])[0]
    return embedding

Correct — prepend situational context before embedding:

def contextualize_chunk(document: str, chunk: str) -> str:
    context = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=150,
        messages=[{"role": "user", "content": [
            {"type": "text", "text": f"<document>\n{document}\n</document>",
             "cache_control": {"type": "ephemeral"}},  # Cache for 90% cost reduction
            {"type": "text", "text": f"Situate this chunk (1-2 sentences):\n<chunk>\n{chunk}\n</chunk>"}
        ]}]
    )
    return f"{context.content[0].text}\n\n{chunk}"  # Prepend context

Key rules:

  • Good context: "This chunk is from ACME Corp's Q3 2024 earnings report, specifically the revenue section."
  • Bad context: "This is a chunk from the document." (too generic)
  • Context length: 1-2 sentences — enough without excessive token overhead
  • Use prompt caching (ephemeral) for 90% cost reduction when processing many chunks from same doc

Construct basic RAG pipeline with proper context assembly and citation tracking — CRITICAL

Basic RAG Pattern

Retrieve relevant documents, construct context, and generate grounded responses with citations.

Basic RAG:

async def rag_query(question: str, top_k: int = 5) -> str:
    """Basic RAG: retrieve then generate."""
    docs = await vector_db.search(question, limit=top_k)

    context = "\n\n".join([
        f"[{i+1}] {doc.text}"
        for i, doc in enumerate(docs)
    ])

    response = await llm.chat([
        {"role": "system", "content":
            "Answer using ONLY the provided context. "
            "If not in context, say 'I don't have that information.'"},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ])

    return response.content

RAG with Citations:

async def rag_with_citations(question: str) -> dict:
    """RAG with inline citations [1], [2], etc."""
    docs = await vector_db.search(question, limit=5)

    context = "\n\n".join([
        f"[{i+1}] {doc.text}\nSource: {doc.metadata['source']}"
        for i, doc in enumerate(docs)
    ])

    response = await llm.chat([
        {"role": "system", "content":
            "Answer with inline citations like [1], [2]. "
            "End with a Sources section."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ])

    return {
        "answer": response.content,
        "sources": [doc.metadata['source'] for doc in docs]
    }

Incorrect — no citations, no grounding constraint:

async def rag_query(question: str) -> str:
    docs = await vector_db.search(question, limit=5)
    context = "\n\n".join([doc.text for doc in docs])  # No citations
    response = await llm.chat([
        {"role": "user", "content": f"{context}\n\n{question}"}  # No grounding instruction
    ])
    return response.content  # No source tracking

Correct — citations with grounding constraint:

async def rag_with_citations(question: str) -> dict:
    docs = await vector_db.search(question, limit=5)
    context = "\n\n".join([
        f"[{i+1}] {doc.text}\nSource: {doc.metadata['source']}"  # Numbered citations
        for i, doc in enumerate(docs)
    ])

    response = await llm.chat([
        {"role": "system", "content": "Answer with inline citations like [1], [2]. Use ONLY the provided context."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
    ])

    return {"answer": response.content, "sources": [doc.metadata['source'] for doc in docs]}

Key rules:

  • Always include citation tracking ([1], [2]) for verifiable answers
  • Set system prompt to constrain answers to retrieved context only
  • Use top-k of 3-10 documents, temperature 0.1-0.3 for factual tasks
  • Return sources alongside answers for transparency

Manage context window size with sufficiency checks to balance relevance and cost — HIGH

Context Window Management

Budget tokens for context and validate sufficiency before generation.

Token Budget Fitting:

def fit_context(docs: list, max_tokens: int = 6000) -> list:
    """Truncate context to fit token budget."""
    total_tokens = 0
    selected = []

    for doc in docs:
        doc_tokens = count_tokens(doc.text)
        if total_tokens + doc_tokens > max_tokens:
            break
        selected.append(doc)
        total_tokens += doc_tokens

    return selected
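
The count_tokens helper is assumed above; a minimal sketch using tiktoken (the encoding name is an assumption — match it to your model):

import tiktoken

_ENCODING = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_ENCODING.encode(text))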

Sufficiency Check (Google Research 2025):

from pydantic import BaseModel

class SufficiencyCheck(BaseModel):
    is_sufficient: bool
    confidence: float  # 0.0-1.0
    missing_info: str | None = None

async def rag_with_sufficiency(question: str, top_k: int = 5) -> str:
    """RAG with hallucination prevention via sufficiency check."""
    docs = await vector_db.search(question, limit=top_k)
    context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(docs)])

    check = await llm.with_structured_output(SufficiencyCheck).ainvoke(
        f"Does this context contain sufficient information to answer?\n"
        f"Question: {question}\nContext:\n{context}"
    )

    if not check.is_sufficient and check.confidence > 0.7:
        return f"I don't have enough information. Missing: {check.missing_info}"

    if not check.is_sufficient and check.confidence <= 0.7:
        more_docs = await vector_db.search(question, limit=top_k * 2)
        context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(more_docs)])

    return await generate_with_context(question, context)

Incorrect — no token budget or sufficiency check:

async def rag_query(question: str) -> str:
    docs = await vector_db.search(question, limit=100)  # No limit!
    context = "\n\n".join([doc.text for doc in docs])  # Could exceed context window
    return await generate_with_context(question, context)  # No sufficiency check

Correct — token budget with sufficiency validation:

async def rag_with_sufficiency(question: str, top_k: int = 5) -> str:
    docs = await vector_db.search(question, limit=top_k)
    fitted = fit_context(docs, max_tokens=6000)  # Budget enforcement
    context = "\n\n".join([f"[{i+1}] {doc.text}" for i, doc in enumerate(fitted)])

    check = await llm.with_structured_output(SufficiencyCheck).ainvoke(
        f"Does this context contain sufficient information?\nQuestion: {question}\nContext:\n{context}"
    )

    if not check.is_sufficient and check.confidence > 0.7:
        return f"I don't have enough information. Missing: {check.missing_info}"

    return await generate_with_context(question, context)

Key rules:

  • Keep context under 75% of the model limit; reserve the rest for the system prompt and response
  • Prioritize highest-relevance documents first
  • Context budget: 4K-8K tokens typical for factual tasks
  • RAG paradoxically increases hallucinations when context is insufficient — use sufficiency check
  • Abstain when confidence > 0.7 and context is insufficient

Combine semantic and keyword search with reciprocal rank fusion for best coverage — HIGH

Hybrid Search (Semantic + Keyword)

Combine vector similarity and keyword matching using Reciprocal Rank Fusion for best coverage.

Reciprocal Rank Fusion:

def reciprocal_rank_fusion(
    semantic_results: list,
    keyword_results: list,
    k: int = 60
) -> list:
    """Combine semantic and keyword search with RRF."""
    scores = {}

    for rank, doc in enumerate(semantic_results):
        scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)

    for rank, doc in enumerate(keyword_results):
        scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)

    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [get_doc(id) for id in ranked_ids]

Multi-list RRF (for query decomposition):

from collections import defaultdict

def multi_rrf(result_lists: list[list[dict]], k: int = 60) -> list[dict]:
    """Combine multiple ranked lists using RRF."""
    scores: defaultdict[str, float] = defaultdict(float)
    docs: dict[str, dict] = {}

    for results in result_lists:
        for rank, doc in enumerate(results, start=1):
            doc_id = doc["id"]
            scores[doc_id] += 1.0 / (k + rank)
            docs[doc_id] = doc

    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [docs[doc_id] for doc_id in ranked_ids]

Incorrect — no reciprocal rank fusion, just simple averaging:

def hybrid_search(query: str, top_k: int = 10) -> list:
    semantic = vector_search(query, top_k)
    keyword = bm25_search(query, top_k)
    # Naive merge without RRF
    return semantic[:5] + keyword[:5]

Correct — proper RRF combination:

def reciprocal_rank_fusion(semantic_results: list, keyword_results: list, k: int = 60) -> list:
    scores = {}
    for rank, doc in enumerate(semantic_results):
        scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)

    for rank, doc in enumerate(keyword_results):
        scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)

    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [get_doc(id) for id in ranked_ids]

Key rules:

  • Default weight split: 40% BM25 / 60% vector (Anthropic research optimal)
  • RRF smoothing constant k=60 is the standard — robust and parameter-free
  • Retrieve 3x the final top-k for better RRF coverage (e.g., top-30 for final top-10)
  • Normalize scores before combining if not using RRF

Compose retrieval pipeline stages in correct order to avoid redundant processing — MEDIUM

Pipeline Composition

Compose retrieval techniques in the right order for optimal results.

Standard Pipeline:

Query → [Decompose?] → [HyDE?] → [Retrieve] → [Rerank] → [Context Fit] → [Generate]

Composition Pattern:

class RAGPipeline:
    """Composable RAG pipeline with optional stages."""

    def __init__(self, retriever, reranker=None, hyde_service=None, decomposer=None):
        self.retriever = retriever
        self.reranker = reranker
        self.hyde = hyde_service
        self.decomposer = decomposer

    async def query(self, question: str, top_k: int = 10) -> list[dict]:
        # Stage 1: Query enhancement (optional)
        queries = [question]
        if self.decomposer:
            concepts = await self.decomposer.decompose(question)
            if len(concepts) > 1:
                queries = concepts

        # Stage 2: Retrieve (with optional HyDE)
        all_results = []
        for q in queries:
            if self.hyde:
                hyde_result = await self.hyde.generate(q)
                results = await self.retriever.search_by_embedding(hyde_result.embedding, top_k=top_k * 3)
            else:
                results = await self.retriever.search(q, top_k=top_k * 3)
            all_results.append(results)

        # Stage 3: Fuse if multiple queries
        if len(all_results) > 1:
            merged = reciprocal_rank_fusion(all_results)
        else:
            merged = all_results[0]

        # Stage 4: Rerank (optional)
        if self.reranker:
            merged = await self.reranker.rerank(question, merged, top_k=top_k)

        return merged[:top_k]

Incorrect — monolithic retrieval without composition:

async def query(question: str) -> list[dict]:
    # No optional stages, fixed pipeline
    docs = await retriever.search(question, top_k=10)
    return docs

Correct — composable pipeline with optional stages:

async def query(self, question: str, top_k: int = 10) -> list[dict]:
    queries = [question]
    if self.decomposer:  # Optional decomposition
        concepts = await self.decomposer.decompose(question)
        if len(concepts) > 1:
            queries = concepts

    all_results = []
    for q in queries:
        if self.hyde:  # Optional HyDE
            hyde_result = await self.hyde.generate(q)
            results = await self.retriever.search_by_embedding(hyde_result.embedding, top_k * 3)
        else:
            results = await self.retriever.search(q, top_k * 3)
        all_results.append(results)

    merged = reciprocal_rank_fusion(all_results) if len(all_results) > 1 else all_results[0]

    if self.reranker:  # Optional reranking
        merged = await self.reranker.rerank(question, merged, top_k)

    return merged[:top_k]

Key rules:

  • Compose: Decompose → HyDE → Retrieve → Rerank → Context Fit → Generate
  • HyDE adds ~500ms latency; use with fallback timeout (2-3s)
  • Reranking adds ~50-500ms; retrieve more (3x), rerank to final top-k
  • Query decomposition only when heuristic detects multi-concept query
  • Each stage is optional — start simple, add stages as needed

Implement production embedding pipelines with batching, caching, and cost optimization — MEDIUM

Advanced Embedding Patterns

Production patterns for embedding at scale.

Embedding Cache (Redis):

import hashlib
import json
import redis

class EmbeddingCache:
    def __init__(self, redis_client: redis.Redis, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl

    def _key(self, text: str, model: str) -> str:
        h = hashlib.md5(f"{model}:{text}".encode()).hexdigest()
        return f"emb:{h}"

    async def get_or_embed(self, text: str, model: str, embed_fn) -> list[float]:
        key = self._key(text, model)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        embedding = await embed_fn(text)
        self.redis.setex(key, self.ttl, json.dumps(embedding))
        return embedding

Batch Processing with Rate Limiting:

import asyncio

async def batch_embed(texts: list[str], embed_fn, batch_size: int = 100) -> list[list[float]]:
    """Embed texts in batches with rate limiting."""
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        embeddings = await embed_fn(batch)
        results.extend(embeddings)
        if i + batch_size < len(texts):
            await asyncio.sleep(0.1)  # Rate limit courtesy
    return results

Matryoshka Dimension Reduction:

# text-embedding-3 models support Matryoshka embeddings
# Truncate to fewer dimensions with minimal quality loss
response = client.embeddings.create(
    model="text-embedding-3-large",
    input="Your text",
    dimensions=1536  # Reduce from 3072 to 1536 (saves 50% storage)
)

Incorrect — no caching or batching, wasteful API calls:

async def embed_texts(texts: list[str]) -> list[list[float]]:
    results = []
    for text in texts:  # One API call per text!
        embedding = await client.embeddings.create(
            model="text-embedding-3-large",
            input=text
        )
        results.append(embedding.data[0].embedding)
    return results

Correct — cached batching with rate limiting:

async def batch_embed(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        # Check cache first
        cached_keys = [cache.get(text) for text in batch]
        uncached = [t for t, c in zip(batch, cached_keys) if not c]

        if uncached:
            embeddings = await client.embeddings.create(
                model="text-embedding-3-large",
                input=uncached,
                dimensions=1536  # Matryoshka reduction
            )
            for text, emb in zip(uncached, embeddings.data):
                cache.set(text, emb.embedding)  # Cache for reuse

        results.extend([cached or cache.get(t) for t, cached in zip(batch, cached_keys)])
        await asyncio.sleep(0.1)  # Rate limiting
    return results

Key rules:

  • Late Chunking: Embed full document, extract chunk vectors from contextualized tokens
  • Cache aggressively — same text + model = same embedding, no need to recompute
  • Batch size 100-500 per API call for optimal throughput
  • Matryoshka: Truncate text-embedding-3-large from 3072 to 1536 dims with ~2% quality loss
  • Rate limit: 0.1s delay between batches as courtesy to API providers

Choose chunking strategies carefully since chunk boundaries determine retrieval quality — HIGH

Chunking Strategies

Split documents into optimal chunks that preserve semantic meaning.

Basic Overlapping Chunks:

def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks for embedding."""
    words = text.split()
    chunks = []

    for i in range(0, len(words), chunk_size - overlap):
        chunk = " ".join(words[i:i + chunk_size])
        if chunk:
            chunks.append(chunk)

    return chunks

Semantic Boundary Chunking (OrchestKit Standard):

CHUNK_CONFIG = {
    "target_tokens": 500,      # ~400-600 tokens per chunk
    "max_tokens": 800,         # Hard limit
    "overlap_tokens": 75,      # ~15% overlap
    "boundary_markers": [      # Prefer splitting at:
        "\n## ",               # H2 headers
        "\n### ",              # H3 headers
        "\n\n",                # Paragraphs
        ". ",                  # Sentences (last resort)
    ]
}

Sentence-Aware Chunking:

def chunk_by_sentences(text: str, chunk_size: int = 512) -> list[str]:
    sentences = text.split('. ')
    chunks, current, current_len = [], [], 0

    for sent in sentences:
        if current_len + len(sent) > chunk_size and current:
            chunks.append('. '.join(current) + '.')
            current, current_len = [sent], len(sent)
        else:
            current.append(sent)
            current_len += len(sent)

    if current:
        chunks.append('. '.join(current))
    return chunks

Incorrect — fixed-size splits without overlap or semantic boundaries:

def chunk_text(text: str) -> list[str]:
    # Arbitrary splits, no overlap, breaks mid-sentence
    return [text[i:i+500] for i in range(0, len(text), 500)]

Correct — semantic boundary chunking with overlap:

def chunk_by_sentences(text: str, chunk_size: int = 512, overlap: int = 75) -> list[str]:
    sentences = text.split('. ')
    chunks, current, current_len = [], [], 0

    for sent in sentences:
        if current_len + len(sent) > chunk_size and current:
            chunks.append('. '.join(current) + '.')
            # Carry trailing sentences forward (~overlap characters) for continuity
            carried, carried_len = [], 0
            for prev in reversed(current):
                if carried_len + len(prev) > overlap:
                    break
                carried.insert(0, prev)
                carried_len += len(prev)
            current, current_len = carried, carried_len
        current.append(sent)          # The triggering sentence starts the next chunk
        current_len += len(sent)

    if current:
        chunks.append('. '.join(current))
    return chunks

Key rules:

  • Chunk size: 256-1024 tokens (512 typical sweet spot)
  • Overlap: 10-20% for context continuity between chunks
  • Include metadata (title, source, section) with each chunk
  • Prefer semantic boundaries (headers, paragraphs) over fixed-size splits
  • Not chunking long documents is a common mistake — context gets lost in embeddings

Select embedding models and dimensions correctly to ensure index compatibility and quality — HIGH

Embedding Models & API

Choose the right embedding model and use the API correctly.

Quick Start:

from openai import OpenAI
client = OpenAI()

# Single text embedding
response = client.embeddings.create(
    model="text-embedding-3-small",
    input="Your text here"
)
vector = response.data[0].embedding  # 1536 dimensions

# Batch embedding (efficient)
texts = ["text1", "text2", "text3"]
response = client.embeddings.create(
    model="text-embedding-3-small",
    input=texts
)
vectors = [item.embedding for item in response.data]

Model Selection:

| Model | Dims | Cost | Use Case |
|---|---|---|---|
| text-embedding-3-small | 1536 | $0.02/1M | General purpose |
| text-embedding-3-large | 3072 | $0.13/1M | High accuracy |
| nomic-embed-text (Ollama) | 768 | Free | Local/CI |
| voyage-3 | 1024 | $0.06/1M | Production (OrchestKit) |

Similarity Calculation:

import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# 1.0 = identical, 0.0 = orthogonal

Incorrect — mixing different embedding models:

# Index with one model
docs_embeddings = client.embeddings.create(
    model="text-embedding-3-large",  # 3072 dims
    input=documents
)

# Query with different model
query_embedding = client.embeddings.create(
    model="text-embedding-3-small",  # 1536 dims - MISMATCH!
    input=query
)
# Results will be nonsensical due to dimension mismatch

Correct — consistent model for queries and documents:

MODEL = "text-embedding-3-small"  # Use same model everywhere

# Index
docs_response = client.embeddings.create(model=MODEL, input=documents)
doc_vectors = [item.embedding for item in docs_response.data]

# Query
query_response = client.embeddings.create(model=MODEL, input=query)
query_vector = query_response.data[0].embedding

# Now cosine similarity is meaningful
similarity = cosine_similarity(query_vector, doc_vectors[0])

Key rules:

  • Embed queries and documents with the SAME model — never mix
  • Dimension reduction: Can truncate text-embedding-3-large to 1536 dims (Matryoshka)
  • Batch size: 100-500 texts per API call for efficiency
  • Cache embeddings — never re-embed unchanged content
  • Most models return normalized vectors (cosine = dot product)

Configure HyDE fallback strategy to avoid latency degradation from slow generation — MEDIUM

HyDE Fallback Strategy

Implement graceful degradation when HyDE generation is too slow.

Timeout with Fallback:

import asyncio

async def hyde_with_fallback(
    query: str,
    hyde_service: HyDEService,
    embed_fn: callable,
    timeout: float = 3.0,
) -> list[float]:
    """HyDE with fallback to direct embedding on timeout."""
    try:
        async with asyncio.timeout(timeout):
            result = await hyde_service.generate(query)
            return result.embedding
    except TimeoutError:
        # Fallback to direct query embedding
        return await embed_fn(query)

Performance Tips:

  • Use fast model (gpt-5.2-mini, claude-haiku-4-5) for generation
  • Cache aggressively (queries often repeat)
  • Set tight timeouts (2-3s) with fallback
  • Keep hypothetical docs concise (100-200 tokens)
  • Combine with query decomposition for best results

Incorrect — no timeout or fallback, blocking forever:

async def hyde_search(query: str) -> list[float]:
    # No timeout! May hang indefinitely
    result = await hyde_service.generate(query)
    return result.embedding

Correct — timeout with graceful fallback:

async def hyde_with_fallback(query: str, timeout: float = 3.0) -> list[float]:
    try:
        async with asyncio.timeout(timeout):
            result = await hyde_service.generate(query)
            return result.embedding
    except TimeoutError:
        # Fallback to direct query embedding
        return await embed_fn(query)

Key rules:

  • Always implement timeout fallback — HyDE generation model may be slow or unavailable
  • Default timeout: 2-3 seconds is the sweet spot (balances quality vs latency)
  • Fallback to direct query embedding maintains functionality (just lower quality)
  • Log fallback events to monitor HyDE generation reliability

Bridge query-document vocabulary mismatch with hypothetical document embeddings via HyDE — HIGH

HyDE Generation

Generate hypothetical answer documents to bridge vocabulary gaps in semantic search.

The Problem:

Query: "scaling async data pipelines"
Docs use: "event-driven messaging", "Apache Kafka", "message brokers"
-> Low similarity scores despite high relevance

The Solution:

from openai import AsyncOpenAI
from pydantic import BaseModel

class HyDEResult(BaseModel):
    original_query: str
    hypothetical_doc: str
    embedding: list[float]

async def generate_hyde(
    query: str, llm: AsyncOpenAI, embed_fn: callable, max_tokens: int = 150
) -> HyDEResult:
    """Generate hypothetical document and embed it."""
    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content":
                "Write a short paragraph that would answer this query. "
                "Use technical terminology that documentation would use."},
            {"role": "user", "content": query}
        ],
        max_tokens=max_tokens,
        temperature=0.3,
    )

    hypothetical_doc = response.choices[0].message.content
    embedding = await embed_fn(hypothetical_doc)  # Embed the hypothetical doc, not the query

    return HyDEResult(
        original_query=query,
        hypothetical_doc=hypothetical_doc,
        embedding=embedding,
    )

When to use HyDE:

| Scenario | Use HyDE? |
|---|---|
| Abstract/conceptual queries | Yes |
| Exact term searches | No (use keyword) |
| Code snippet searches | No |
| Natural language questions | Yes |
| Vocabulary mismatch suspected | Yes |

Incorrect — embedding the query instead of hypothetical document:

async def generate_hyde(query: str) -> HyDEResult:
    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[{"role": "user", "content": query}],
        max_tokens=150
    )
    hypothetical_doc = response.choices[0].message.content
    embedding = await embed_fn(query)  # WRONG: Embeds query, not hypothetical doc!
    return HyDEResult(query, hypothetical_doc, embedding)

Correct — embed the hypothetical document:

async def generate_hyde(query: str) -> HyDEResult:
    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content": "Write a short paragraph that would answer this query."},
            {"role": "user", "content": query}
        ],
        max_tokens=150,
        temperature=0.3
    )

    hypothetical_doc = response.choices[0].message.content
    embedding = await embed_fn(hypothetical_doc)  # Embed the hypothetical doc!

    return HyDEResult(
        original_query=query,
        hypothetical_doc=hypothetical_doc,
        embedding=embedding,
    )

Key rules:

  • Embed the hypothetical document, NOT the original query
  • Use fast/cheap model (gpt-5.2-mini, claude-haiku-4-5) for generation
  • Temperature 0.3 for consistent, factual hypothetical docs
  • Keep hypothetical docs concise: 100-200 tokens
  • Adds ~500ms latency — always implement with timeout fallback

Generate separate HyDE documents per concept for multi-topic vocabulary bridging — MEDIUM

Per-Concept HyDE

Generate HyDE embeddings for each concept in multi-concept queries.

Per-Concept Pattern:

async def batch_hyde(
    concepts: list[str], hyde_service: HyDEService
) -> list[HyDEResult]:
    """Generate HyDE embeddings for multiple concepts in parallel."""
    tasks = [hyde_service.generate(concept) for concept in concepts]
    return await asyncio.gather(*tasks)

With Caching:

import hashlib

class HyDEService:
    def __init__(self, llm, embed_fn):
        self.llm = llm
        self.embed_fn = embed_fn
        self._cache: dict[str, HyDEResult] = {}

    def _cache_key(self, query: str) -> str:
        return hashlib.md5(query.lower().strip().encode()).hexdigest()

    async def generate(self, query: str) -> HyDEResult:
        key = self._cache_key(query)
        if key in self._cache:
            return self._cache[key]
        result = await generate_hyde(query, self.llm, self.embed_fn)
        self._cache[key] = result
        return result

Incorrect — sequential HyDE generation, slow:

async def batch_hyde(concepts: list[str]) -> list[HyDEResult]:
    results = []
    for concept in concepts:  # Sequential! Slow for many concepts
        result = await hyde_service.generate(concept)
        results.append(result)
    return results

Correct — parallel HyDE generation:

async def batch_hyde(concepts: list[str]) -> list[HyDEResult]:
    # Parallel generation for all concepts simultaneously
    tasks = [hyde_service.generate(concept) for concept in concepts]
    return await asyncio.gather(*tasks)

Key rules:

  • For multi-concept queries, decompose first then generate HyDE per concept
  • Cache aggressively — queries often repeat
  • Parallel generation with asyncio.gather for all concepts simultaneously
  • Combine with query decomposition for best results on complex queries
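
Putting the pieces together: a minimal sketch of the decompose-then-HyDE combination, assuming a decomposer and retriever with the interfaces used elsewhere in this skill, and that retrieval results are dicts with an "id" field as expected by multi_rrf:

import asyncio

async def decomposed_hyde_search(question: str, top_k: int = 10) -> list[dict]:
    """Decompose, run HyDE per concept in parallel, retrieve, then fuse with RRF."""
    concepts = await decomposer.decompose(question)
    hyde_results = await batch_hyde(concepts or [question])
    result_lists = await asyncio.gather(*[
        retriever.search_by_embedding(r.embedding, top_k=top_k * 3) for r in hyde_results
    ])
    return multi_rrf(result_lists)[:top_k]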

Chunk multimodal documents to preserve relationships between text, images, and tables — MEDIUM

Multimodal Document Chunking

Chunk PDFs preserving images, tables, and text relationships.

Multimodal Chunks:

from dataclasses import dataclass
from typing import Literal, Optional

@dataclass
class Chunk:
    content: str
    chunk_type: Literal["text", "image", "table", "chart"]
    page: int
    image_path: Optional[str] = None
    embedding: Optional[list[float]] = None

def chunk_multimodal_document(pdf_path: str) -> list[Chunk]:
    import fitz  # PyMuPDF
    doc = fitz.open(pdf_path)
    chunks = []

    for page_num, page in enumerate(doc):
        # Text blocks: get_text("blocks") yields (x0, y0, x1, y1, text, block_no, block_type)
        page_text = "\n".join(
            block[4] for block in page.get_text("blocks") if block[6] == 0
        ).strip()
        if page_text:
            chunks.append(Chunk(content=page_text, chunk_type="text", page=page_num))

        # Images: get_images() exposes the xref that extract_image() needs
        for img_info in page.get_images(full=True):
            xref = img_info[0]
            img = doc.extract_image(xref)
            img_path = f"/tmp/page{page_num}_img{xref}.{img['ext']}"
            with open(img_path, "wb") as f:
                f.write(img["image"])
            caption = generate_image_caption(img_path)
            chunks.append(Chunk(content=caption, chunk_type="image", page=page_num, image_path=img_path))

    return chunks

Incorrect — text-only chunking, loses images and tables:

def chunk_pdf(pdf_path: str) -> list[str]:
    import fitz
    doc = fitz.open(pdf_path)
    chunks = []
    for page in doc:
        chunks.append(page.get_text())  # Text only, images lost!
    return chunks

Correct — multimodal chunking with images and captions:

def chunk_multimodal_document(pdf_path: str) -> list[Chunk]:
    import fitz
    doc = fitz.open(pdf_path)
    chunks = []

    for page_num, page in enumerate(doc):
        for block in page.get_text("blocks"):
            if block[6] == 0:  # Text block (block[4] holds the text)
                chunks.append(Chunk(content=block[4], chunk_type="text", page=page_num))

        for img_info in page.get_images(full=True):  # Images carry their xref here
            xref = img_info[0]
            img = doc.extract_image(xref)
            img_path = f"/tmp/page{page_num}_img{xref}.{img['ext']}"
            with open(img_path, "wb") as f:
                f.write(img["image"])
            caption = generate_image_caption(img_path)
            chunks.append(Chunk(content=caption, chunk_type="image", page=page_num, image_path=img_path))

    return chunks
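
Both versions call a generate_image_caption helper that is not defined here. A minimal sketch using a vision-capable chat model through the OpenAI API (the model name and MIME handling are simplified assumptions):

import base64
from openai import OpenAI

caption_client = OpenAI()

def generate_image_caption(image_path: str) -> str:
    """Caption an extracted image so it becomes searchable as text."""
    with open(image_path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    response = caption_client.chat.completions.create(
        model="gpt-5.2-mini",  # Assumption: any vision-capable chat model works here
        max_tokens=100,
        messages=[{"role": "user", "content": [
            {"type": "text", "text": "Describe this image in 1-2 sentences for search indexing."},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
        ]}],
    )
    return response.choices[0].message.content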

Key rules:

  • Extract images separately and generate captions for text-based search
  • Preserve page numbers for citation and navigation
  • Use PyMuPDF (fitz) for reliable PDF extraction
  • Process large PDFs in page-range batches (CC 2.1.30: max 20 pages per Read)
  • Always store image paths alongside embeddings for result display

Use multimodal embedding models for cross-modal search across text and images — HIGH

Multimodal Embeddings

Embed images and text in the same vector space for cross-modal retrieval.

Model Selection:

Model | Context | Modalities | Best For
Voyage multimodal-3 | 32K tokens | Text + Image | Long documents
SigLIP 2 | Standard | Text + Image | Large-scale retrieval
CLIP ViT-L/14 | 77 tokens | Text + Image | General purpose
ImageBind | Standard | 6 modalities | Audio/video included
ColPali | Document | Text + Image | PDF/document RAG

CLIP Embeddings:

from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import torch

model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")

def embed_image(image_path: str) -> list[float]:
    image = Image.open(image_path)
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embeddings = model.get_image_features(**inputs)
    embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)
    return embeddings[0].tolist()

def embed_text(text: str) -> list[float]:
    inputs = processor(text=[text], return_tensors="pt", padding=True)
    with torch.no_grad():
        embeddings = model.get_text_features(**inputs)
    embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)
    return embeddings[0].tolist()

Voyage Multimodal-3 (Long Context):

import base64

import voyageai
client = voyageai.Client()

def embed_multimodal(texts=None, images=None) -> list[list[float]]:
    inputs = []
    if texts:
        inputs.extend([{"type": "text", "content": t} for t in texts])
    if images:
        for path in images:
            with open(path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
                inputs.append({"type": "image", "content": f"data:image/png;base64,{b64}"})
    return client.multimodal_embed(inputs=inputs, model="voyage-multimodal-3").embeddings
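
As a quick usage sketch (assuming the CLIP embed_text and embed_image helpers above, plus a small in-memory dict mapping image paths to embeddings), cross-modal search reduces to a dot product because the vectors are already normalized:

def cross_modal_search(query: str, image_embeddings: dict[str, list[float]], top_k: int = 5) -> list[tuple[str, float]]:
    """Rank stored image embeddings against a text query (normalized vectors, so dot product == cosine)."""
    q = embed_text(query)
    scores = [(path, sum(a * b for a, b in zip(q, emb))) for path, emb in image_embeddings.items()]
    return sorted(scores, key=lambda x: x[1], reverse=True)[:top_k]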

Incorrect — using text-only embeddings for images:

def embed_image(image_path: str) -> list[float]:
    # Using text embedding model for images - wrong modality!
    caption = generate_caption(image_path)
    return text_embed_model.embed([caption])[0]  # Loses visual features

Correct — multimodal embeddings for cross-modal search:

def embed_image(image_path: str) -> list[float]:
    image = Image.open(image_path)
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embeddings = model.get_image_features(**inputs)
    embeddings = embeddings / embeddings.norm(dim=-1, keepdim=True)  # Normalize
    return embeddings[0].tolist()

Key rules:

  • Normalize embeddings for cosine similarity (CLIP features are not unit-normalized by default; the snippets above normalize explicitly)
  • Voyage multimodal-3 for long documents (32K context)
  • SigLIP 2 for large-scale production retrieval
  • Always embed both images AND captions for maximum coverage

Build unified multimodal RAG pipeline that merges cross-modal results with deduplication — MEDIUM

Multimodal RAG Pipeline

Build end-to-end multimodal retrieval and generation pipeline.

Hybrid Retrieval:

class MultimodalRAG:
    def __init__(self, vector_db, vision_model, text_model):
        self.vector_db = vector_db
        self.vision_model = vision_model
        self.text_model = text_model

    async def retrieve(self, query: str, query_image: str = None, top_k: int = 10) -> list[dict]:
        results = []
        text_emb = embed_text(query)
        text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)
        results.extend(text_results)

        if query_image:
            img_emb = embed_image(query_image)
            img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)
            results.extend(img_results)

        # Dedupe by doc_id, keep highest score
        seen = {}
        for r in results:
            doc_id = r["metadata"]["doc_id"]
            if doc_id not in seen or r["score"] > seen[doc_id]["score"]:
                seen[doc_id] = r
        return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:top_k]

Multimodal Generation:

async def generate_with_context(query: str, chunks: list[Chunk], model: str = "claude-opus-4-6") -> str:
    content = []
    # Add images first (attention positioning)
    for chunk in chunks:
        if chunk.chunk_type == "image" and chunk.image_path:
            b64, media_type = encode_image_base64(chunk.image_path)
            content.append({"type": "image", "source": {"type": "base64", "media_type": media_type, "data": b64}})
    # Add text context
    text_context = "\n\n".join([f"[Page {c.page}]: {c.content}" for c in chunks if c.chunk_type == "text"])
    content.append({"type": "text", "text": f"Context:\n{text_context}\n\nQuestion: {query}"})

    response = client.messages.create(model=model, max_tokens=4096, messages=[{"role": "user", "content": content}])
    return response.content[0].text
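
The generation example relies on an encode_image_base64 helper that isn't shown; a minimal sketch:

import base64
import mimetypes

def encode_image_base64(image_path: str) -> tuple[str, str]:
    """Return (base64-encoded data, media type) for an image file."""
    media_type = mimetypes.guess_type(image_path)[0] or "image/png"
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode(), media_type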

Incorrect — no deduplication, fragmented results:

async def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
    text_emb = embed_text(query)
    text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)

    img_emb = embed_image(query_image)
    img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)

    # No deduplication! Same doc may appear twice
    return text_results + img_results

Correct — deduplicated cross-modal results:

async def retrieve(self, query: str, query_image: str = None, top_k: int = 10) -> list[dict]:
    results = []
    text_emb = embed_text(query)
    text_results = await self.vector_db.search(embedding=text_emb, top_k=top_k)
    results.extend(text_results)

    if query_image:
        img_emb = embed_image(query_image)
        img_results = await self.vector_db.search(embedding=img_emb, top_k=top_k)
        results.extend(img_results)

    # Dedupe by doc_id, keep highest score
    seen = {}
    for r in results:
        doc_id = r["metadata"]["doc_id"]
        if doc_id not in seen or r["score"] > seen[doc_id]["score"]:
            seen[doc_id] = r
    return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:top_k]

Key rules:

  • Deduplicate by document ID — keep highest scoring result per document
  • Place images before text in generation prompt (attention positioning)
  • Always embed both image features AND text captions for maximum coverage
  • Use hybrid approach: CLIP + text embeddings for best accuracy
  • Missing image URL storage is a common mistake — always store paths for display

Implement PGVector hybrid search with FULL OUTER JOIN and RRF fusion ranking — HIGH

PGVector Hybrid Search (SQL)

Hybrid vector+keyword search with RRF fusion in SQLAlchemy.

Hybrid Search Query:

async def hybrid_search(query: str, query_embedding: list[float], top_k: int = 10) -> list[Chunk]:
    FETCH_MULTIPLIER = 3
    K = 60  # RRF smoothing constant

    # Vector search subquery
    vector_subq = (
        select(Chunk.id,
            func.row_number().over(
                order_by=Chunk.embedding.cosine_distance(query_embedding)
            ).label("vector_rank"))
        .limit(top_k * FETCH_MULTIPLIER)
        .subquery()
    )

    # Keyword search subquery
    ts_query = func.plainto_tsquery("english", query)
    keyword_subq = (
        select(Chunk.id,
            func.row_number().over(
                order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
            ).label("keyword_rank"))
        .where(Chunk.content_tsvector.op("@@")(ts_query))
        .limit(top_k * FETCH_MULTIPLIER)
        .subquery()
    )

    # RRF fusion with FULL OUTER JOIN
    rrf_subq = (
        select(
            func.coalesce(vector_subq.c.id, keyword_subq.c.id).label("chunk_id"),
            (func.coalesce(1.0 / (K + vector_subq.c.vector_rank), 0.0) +
             func.coalesce(1.0 / (K + keyword_subq.c.keyword_rank), 0.0)
            ).label("rrf_score"))
        .select_from(vector_subq.outerjoin(keyword_subq, vector_subq.c.id == keyword_subq.c.id, full=True))
        .order_by(desc("rrf_score"))
        .limit(top_k)
        .subquery()
    )

    return await session.execute(
        select(Chunk).join(rrf_subq, Chunk.id == rrf_subq.c.chunk_id)
    )

RRF Formula:

rrf_score = 1/(k + vector_rank) + 1/(k + keyword_rank)  # k=60 (standard)
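
For example, a chunk ranked 1st by vector search and 3rd by keyword search scores 1/61 + 1/63 ≈ 0.032, while a chunk found only by vector search at rank 1 scores 1/61 ≈ 0.016.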

Incorrect — separate queries without RRF fusion:

async def hybrid_search(query: str, embedding: list[float], top_k: int = 10):
    # Separate queries, no fusion!
    vector_results = await session.execute(
        select(Chunk).order_by(Chunk.embedding.cosine_distance(embedding)).limit(top_k)
    )
    keyword_results = await session.execute(
        select(Chunk).where(Chunk.content_tsvector.op("@@")(plainto_tsquery(query))).limit(top_k)
    )
    # Naive merge, no RRF
    return list(vector_results) + list(keyword_results)

Correct — RRF fusion with FULL OUTER JOIN:

async def hybrid_search(query: str, query_embedding: list[float], top_k: int = 10):
    K = 60  # RRF smoothing constant

    # Vector search subquery
    vector_subq = select(Chunk.id, func.row_number().over(
        order_by=Chunk.embedding.cosine_distance(query_embedding)
    ).label("vector_rank")).limit(top_k * 3).subquery()

    # Keyword search subquery
    keyword_subq = select(Chunk.id, func.row_number().over(
        order_by=func.ts_rank_cd(Chunk.content_tsvector, plainto_tsquery(query)).desc()
    ).label("keyword_rank")).limit(top_k * 3).subquery()

    # RRF fusion with FULL OUTER JOIN
    rrf_subq = select(
        func.coalesce(vector_subq.c.id, keyword_subq.c.id).label("chunk_id"),
        (func.coalesce(1.0 / (K + vector_subq.c.vector_rank), 0.0) +
         func.coalesce(1.0 / (K + keyword_subq.c.keyword_rank), 0.0)).label("rrf_score")
    ).select_from(vector_subq.outerjoin(keyword_subq, vector_subq.c.id == keyword_subq.c.id, full=True)).order_by(desc("rrf_score")).limit(top_k)

    return await session.execute(select(Chunk).join(rrf_subq, Chunk.id == rrf_subq.c.chunk_id))

Key rules:

  • Use FULL OUTER JOIN to catch docs found by only one search method
  • 3x fetch multiplier for better RRF coverage (30 per method for top 10 final)
  • RRF smoothing constant k=60 is the standard
  • Use func.coalesce(..., 0.0) for documents found by only one method
  • plainto_tsquery for user queries (handles multi-word safely)

Choose correct PGVector index type to avoid 17x slower queries in production — HIGH

PGVector Index Strategies

Choose and configure the right vector index for your workload.

Index Comparison:

Metric | IVFFlat | HNSW
Query speed | 50ms | 3ms (17x faster)
Index time | 2 min | 20 min
Best for | < 100k vectors | 100k+ vectors
Recall@10 | 0.85-0.95 | 0.95-0.99

HNSW Configuration (Recommended):

-- Create HNSW index
CREATE INDEX idx_chunks_embedding_hnsw ON chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Query-time tuning
SET hnsw.ef_search = 40;  -- Higher = better recall, slower

-- Iterative scan for filtered queries (pgvector 0.8+)
SET hnsw.iterative_scan = 'relaxed_order';

Search Type Comparison:

Aspect | Semantic (Vector) | Keyword (BM25)
Query | Embedding similarity | Exact word matches
Strengths | Synonyms, concepts | Exact phrases, rare terms
Weaknesses | Exact matches, technical terms | No semantic understanding
Index | HNSW (pgvector) | GIN (tsvector)

Incorrect — no index, sequential scan on every query:

-- No index! Sequential scan is 17x slower
SELECT * FROM chunks
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;

Correct — HNSW index for fast queries:

-- Create HNSW index
CREATE INDEX idx_chunks_embedding_hnsw ON chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Query-time tuning
SET hnsw.ef_search = 40;  -- Higher = better recall

-- Now queries are 17x faster
SELECT * FROM chunks
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 10;

Key rules:

  • Use HNSW for production (scales to millions, 17x faster queries)
  • Use IVFFlat only when fast index builds matter more than query speed (small datasets, frequent re-indexing; see the sketch after this list)
  • m=16, ef_construction=64 are good defaults for most workloads
  • Set hnsw.ef_search = 40 at query time for production recall
  • Use iterative_scan = 'relaxed_order' for filtered vector queries
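
If IVFFlat is the better fit, a typical setup looks like the sketch below; the lists ≈ rows/1000 and probes ≈ sqrt(lists) heuristics are common starting points, not hard rules:

-- Build the IVFFlat index AFTER loading data so cluster centroids reflect the real distribution
CREATE INDEX idx_chunks_embedding_ivfflat ON chunks
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);  -- roughly rows/1000 for datasets up to ~1M vectors

-- Query-time tuning
SET ivfflat.probes = 10;  -- higher = better recall, slower queries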

Filter PGVector results by metadata and boost scores for improved retrieval relevance — MEDIUM

PGVector Metadata Filtering & Patterns

Filter and boost search results using metadata.

Filtered Search:

results = await hybrid_search(
    query="binary search",
    query_embedding=embedding,
    content_type_filter=["code_block"]
)

Similarity Threshold:

results = await hybrid_search(query, embedding, top_k=50)
filtered = [r for r in results if (1 - r.vector_distance) >= 0.75][:10]

Multi-Query Retrieval:

queries = ["machine learning", "ML algorithms", "neural networks"]
all_results = [await hybrid_search(q, embed(q)) for q in queries]
final = deduplicate_and_rerank(all_results)
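
deduplicate_and_rerank above is left undefined; a minimal sketch (assuming each result exposes id and score attributes, which may differ in your result schema) keeps the best score per chunk across query variants:

def deduplicate_and_rerank(all_results: list[list], top_k: int = 10) -> list:
    """Merge results from multiple query variants, keeping the highest score per chunk id."""
    best = {}
    for results in all_results:
        for r in results:
            if r.id not in best or r.score > best[r.id].score:
                best[r.id] = r
    return sorted(best.values(), key=lambda r: r.score, reverse=True)[:top_k]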

Redis 8 FT.HYBRID Alternative:

Aspect | pgvector | Redis 8 FT.HYBRID
Setup | Medium | Low
RRF | Manual SQL | Native COMBINE RRF
Latency | 5-20ms | 2-5ms
Persistence | ACID | AOF/RDB
Max dataset | Billions | Memory-bound (~100M)

Incorrect — no metadata filtering, irrelevant results:

# Returns all content types mixed together
results = await session.execute(
    select(Chunk).order_by(Chunk.embedding.cosine_distance(embedding)).limit(10)
)

Correct — filtered search with similarity threshold:

# Filter by content_type and similarity threshold
results = await session.execute(
    select(Chunk)
    .where(Chunk.content_type == "code_block")  # Pre-filter
    .order_by(Chunk.embedding.cosine_distance(embedding))
    .limit(50)
)

# Apply similarity threshold
filtered = [r for r in results if (1 - r.vector_distance) >= 0.75][:10]

Key rules:

  • Metadata boosting (title/path matching) adds +6% MRR
  • Pre-filter by content_type for targeted search
  • Similarity threshold 0.75 is a good default for filtering low-relevance results
  • Choose pgvector for: ACID, complex joins, large datasets, existing PostgreSQL
  • Choose Redis 8 for: sub-5ms latency, caching layer, simpler deployment

Design PGVector schema with pre-computed tsvector columns and proper index configuration — HIGH

PGVector Database Schema

Production schema with pre-computed tsvector and HNSW index.

Schema:

CREATE TABLE chunks (
    id UUID PRIMARY KEY,
    document_id UUID REFERENCES documents(id),
    content TEXT NOT NULL,
    embedding vector(1024),  -- PGVector
    content_tsvector tsvector GENERATED ALWAYS AS (
        to_tsvector('english', content)
    ) STORED,
    section_title TEXT,
    content_type TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

-- HNSW index for vector search
CREATE INDEX idx_chunks_embedding ON chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- GIN index for keyword search
CREATE INDEX idx_chunks_content_tsvector ON chunks
    USING gin (content_tsvector);

Incorrect — computing tsvector at query time, slow:

CREATE TABLE chunks (
    id UUID PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1024)
);

-- Slow query: computes tsvector every time!
SELECT * FROM chunks
WHERE to_tsvector('english', content) @@ plainto_tsquery('search query');

Correct — pre-computed tsvector as GENERATED column:

CREATE TABLE chunks (
    id UUID PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1024),
    content_tsvector tsvector GENERATED ALWAYS AS (
        to_tsvector('english', content)
    ) STORED  -- Pre-computed, 5-10x faster
);

-- GIN index for fast keyword search
CREATE INDEX idx_chunks_content_tsvector ON chunks USING gin (content_tsvector);

-- HNSW index for fast vector search
CREATE INDEX idx_chunks_embedding ON chunks
    USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

-- Fast query using pre-computed tsvector
SELECT * FROM chunks WHERE content_tsvector @@ plainto_tsquery('search query');

Key rules:

  • Pre-compute tsvector as GENERATED column — 5-10x faster than to_tsvector() at query time
  • Use vector(1024) for Voyage-3 embeddings (match your model dimension)
  • HNSW index with m=16, ef_construction=64 for production workloads
  • Always include document_id FK for document-level operations
  • Include content_type for filtered search (code, text, table)
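
The Python snippets throughout this skill reference a Chunk ORM model that is never shown; a minimal SQLAlchemy 2.0 sketch matching the DDL above (column names follow the schema, everything else is illustrative) could be:

import uuid
from datetime import datetime

from pgvector.sqlalchemy import Vector
from sqlalchemy import ForeignKey, Text, TIMESTAMP, func
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Chunk(Base):
    __tablename__ = "chunks"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    document_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("documents.id"))
    content: Mapped[str] = mapped_column(Text)
    embedding = mapped_column(Vector(1024))      # match your embedding model dimension
    content_tsvector = mapped_column(TSVECTOR)   # GENERATED column, maintained by PostgreSQL
    section_title: Mapped[str | None] = mapped_column(Text)
    content_type: Mapped[str | None] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())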

Decompose complex multi-topic queries with parallel retrieval and RRF fusion — HIGH

Query Decomposition + RRF Fusion

Break complex queries into concepts, retrieve separately, fuse with RRF.

LLM Decomposition:

from pydantic import BaseModel, Field

class ConceptExtraction(BaseModel):
    concepts: list[str] = Field(..., min_length=1, max_length=5)
    reasoning: str | None = None

async def decompose_query(query: str, llm: AsyncOpenAI) -> list[str]:
    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content":
                "Extract 2-4 independent concepts from this query. "
                "Each concept should be searchable on its own."},
            {"role": "user", "content": query}
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    result = ConceptExtraction.model_validate_json(response.choices[0].message.content)
    return result.concepts

Decomposed Search with RRF:

async def decomposed_search(query: str, search_fn, llm, top_k: int = 10) -> list[dict]:
    if not is_multi_concept_heuristic(query):
        return await search_fn(query, limit=top_k)

    concepts = await decompose_query(query, llm)
    if len(concepts) <= 1:
        return await search_fn(query, limit=top_k)

    # Parallel retrieval per concept
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    results_per_concept = await asyncio.gather(*tasks)

    return reciprocal_rank_fusion(results_per_concept, k=60)[:top_k]
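
reciprocal_rank_fusion above is assumed; a minimal list-based sketch (keyed on an "id" field, which is an assumption about the result shape):

def reciprocal_rank_fusion(results_per_concept: list[list[dict]], k: int = 60) -> list[dict]:
    """Fuse ranked lists: score(doc) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    docs: dict[str, dict] = {}
    for results in results_per_concept:
        for rank, doc in enumerate(results, start=1):
            doc_id = doc["id"]  # assumed key; adapt to your result schema
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
            docs.setdefault(doc_id, doc)
    ranked = sorted(docs, key=lambda d: scores[d], reverse=True)
    return [{**docs[d], "rrf_score": scores[d]} for d in ranked]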

Incorrect — no decomposition, single query for complex topics:

async def search(query: str, top_k: int = 10) -> list[dict]:
    # "How does authentication affect database performance?"
    # Single query misses one of the two concepts
    return await vector_search(query, limit=top_k)

Correct — decompose and fuse with RRF:

async def decomposed_search(query: str, top_k: int = 10) -> list[dict]:
    if not is_multi_concept_heuristic(query):
        return await search_fn(query, limit=top_k)  # Fast path

    # Decompose: "authentication" + "database performance"
    concepts = await decompose_query(query, llm)
    if len(concepts) <= 1:
        return await search_fn(query, limit=top_k)

    # Parallel retrieval per concept
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    results_per_concept = await asyncio.gather(*tasks)

    # Fuse with RRF
    return reciprocal_rank_fusion(results_per_concept, k=60)[:top_k]

Key rules:

  • Max 2-4 concepts per query (more increases latency without proportional benefit)
  • Use gpt-5.2-mini for decomposition (fast, cheap, good at concept extraction)
  • RRF fusion is robust and parameter-free for combining per-concept results
  • Cache decomposition results — same query often asked repeatedly
  • Set timeout with fallback to original query if decomposition fails

Detect multi-concept queries with heuristic fast-path to avoid unnecessary LLM decomposition — MEDIUM

Multi-Concept Query Detection

Fast heuristic to determine if query decomposition is needed.

Heuristic Detection (Fast Path):

MULTI_CONCEPT_INDICATORS = [
    " vs ", " versus ", " compared to ", " or ",
    " and ", " with ", " affect ", " impact ",
    "difference between", "relationship between",
]

def is_multi_concept_heuristic(query: str) -> bool:
    """Fast check for multi-concept indicators (<1ms)."""
    query_lower = query.lower()
    return any(ind in query_lower for ind in MULTI_CONCEPT_INDICATORS)

When to Decompose:

Query Type | Decompose?
"What is X?" | No
"X vs Y" | Yes
"How does X affect Y?" | Yes
"Best practices for X" | No
"X and Y in Z" | Yes
"Difference between X, Y, Z" | Yes

Incorrect — always decomposing, even for simple queries:

async def search(query: str, top_k: int = 10) -> list[dict]:
    # Always calls LLM for decomposition, even for "What is React?"
    concepts = await decompose_query(query, llm)  # Wasteful!
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    return await asyncio.gather(*tasks)

Correct — heuristic fast path before LLM decomposition:

async def search(query: str, top_k: int = 10) -> list[dict]:
    if not is_multi_concept_heuristic(query):  # Sub-millisecond check
        return await search_fn(query, limit=top_k)  # Fast path

    # Only call LLM if heuristic detects multi-concept
    concepts = await decompose_query(query, llm)
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    return await asyncio.gather(*tasks)

def is_multi_concept_heuristic(query: str) -> bool:
    query_lower = query.lower()
    return any(ind in query_lower for ind in [" vs ", " and ", "difference between"])

Key rules:

  • Heuristic first (sub-millisecond), LLM decomposition only if heuristic triggers
  • Single-concept queries should skip decomposition entirely (no LLM cost)
  • Keywords: "vs", "compared to", "affect", "difference between" indicate multi-concept
  • This is the fast path — always check before calling the LLM decomposer

Combine query decomposition with HyDE for comprehensive vocabulary-bridged retrieval coverage — MEDIUM

Decomposition + HyDE Combo

Best of both: decompose into concepts, then generate HyDE for each concept.

Combined Pattern:

async def decomposed_hyde_search(
    query: str,
    decomposer: QueryDecomposer,
    hyde_service: HyDEService,
    vector_search: callable,
    top_k: int = 10,
) -> list[dict]:
    """Decomposition + HyDE for maximum coverage."""
    # Decompose query into concepts
    concepts = await decomposer.get_concepts(query)

    # Generate HyDE for each concept in parallel
    hyde_results = await asyncio.gather(*[
        hyde_service.generate(concept) for concept in concepts
    ])

    # Search with HyDE embeddings
    search_tasks = [
        vector_search(embedding=hr.embedding, limit=top_k)
        for hr in hyde_results
    ]
    results_per_concept = await asyncio.gather(*search_tasks)

    # Fuse results with RRF
    return reciprocal_rank_fusion(results_per_concept)[:top_k]

Incorrect — sequential decomposition and HyDE, slow:

async def search(query: str, top_k: int = 10) -> list[dict]:
    concepts = await decompose_query(query, llm)
    all_results = []
    for concept in concepts:  # Sequential! Slow
        hyde_result = await hyde_service.generate(concept)
        results = await vector_search(embedding=hyde_result.embedding, limit=top_k)
        all_results.append(results)
    return reciprocal_rank_fusion(all_results)[:top_k]

Correct — parallel HyDE generation and search:

async def decomposed_hyde_search(query: str, top_k: int = 10) -> list[dict]:
    concepts = await decomposer.get_concepts(query)

    # Generate HyDE for each concept in parallel
    hyde_results = await asyncio.gather(*[
        hyde_service.generate(concept) for concept in concepts
    ])

    # Search with HyDE embeddings in parallel
    search_tasks = [
        vector_search(embedding=hr.embedding, limit=top_k) for hr in hyde_results
    ]
    results_per_concept = await asyncio.gather(*search_tasks)

    # Fuse with RRF
    return reciprocal_rank_fusion(results_per_concept)[:top_k]

Key rules:

  • Use this combo for complex queries with both multi-concept AND vocabulary mismatch
  • Decompose first, then HyDE per concept, then parallel search, then RRF fuse
  • Total latency: ~1-2s (decomposition + HyDE generation + parallel search)
  • Cache both decomposition and HyDE results for efficiency
  • This is the most expensive retrieval path — use only when simpler methods fail

Combine base, LLM, and recency scores for robust multi-signal reranking — MEDIUM

Combined Scoring & Reranking Service

Combine multiple scoring signals with weighted average and timeout fallback.

Combined Scoring:

def combined_rerank(
    documents: list[dict], llm_scores: dict[str, float],
    alpha: float = 0.3, beta: float = 0.5, gamma: float = 0.2
) -> list[dict]:
    scored = []
    for doc in documents:
        base = doc.get("score", 0.5)
        llm = llm_scores.get(doc["id"], 0.5)
        recency = calculate_recency_score(doc.get("created_at"))
        final = (alpha * base) + (beta * llm) + (gamma * recency)
        scored.append({**doc, "score": final,
                       "score_components": {"base": base, "llm": llm, "recency": recency}})
    scored.sort(key=lambda x: x["score"], reverse=True)
    return scored
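
calculate_recency_score is assumed above; one simple exponential-decay sketch (the 30-day half-life is an arbitrary starting point):

from datetime import datetime, timezone

def calculate_recency_score(created_at: datetime | None, half_life_days: float = 30.0) -> float:
    """Map document age to (0, 1]: 1.0 for new docs, 0.5 at one half-life, and so on."""
    if created_at is None:
        return 0.5  # neutral score when the timestamp is missing
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    age_days = (datetime.now(timezone.utc) - created_at).total_seconds() / 86400
    return 0.5 ** (max(age_days, 0.0) / half_life_days)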

Service with Timeout Fallback:

class ReRankingService:
    def __init__(self, llm: AsyncOpenAI, timeout_seconds: float = 5.0):
        self.llm = llm
        self.timeout = timeout_seconds

    async def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
        if len(documents) <= top_k:
            return documents
        try:
            async with asyncio.timeout(self.timeout):
                return await llm_rerank(query, documents, self.llm, top_k)
        except TimeoutError:
            return sorted(documents, key=lambda x: x.get("score", 0), reverse=True)[:top_k]

Incorrect — single scoring signal without timeout:

async def rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
    # Only uses LLM score, no timeout, no fallback
    llm_scores = await llm_rerank(query, documents, llm)  # May hang!
    return sorted(documents, key=lambda x: llm_scores.get(x["id"], 0), reverse=True)[:top_k]

Correct — combined scoring with timeout fallback:

async def rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
    if len(documents) <= top_k:
        return documents  # Skip if no benefit

    try:
        async with asyncio.timeout(5.0):  # 5s timeout
            llm_scores = await llm_rerank(query, documents, llm)

            # Combined scoring: 30% base + 50% LLM + 20% recency
            scored = []
            for doc in documents:
                base = doc.get("score", 0.5)
                llm = llm_scores.get(doc["id"], 0.5)
                recency = calculate_recency_score(doc.get("created_at"))
                final = 0.3 * base + 0.5 * llm + 0.2 * recency
                scored.append({**doc, "score": final})

            return sorted(scored, key=lambda x: x["score"], reverse=True)[:top_k]
    except TimeoutError:
        # Fallback to base ranking
        return sorted(documents, key=lambda x: x.get("score", 0), reverse=True)[:top_k]

Key rules:

  • Default weights: 30% base retrieval + 50% LLM score + 20% recency
  • Always set timeout (5s) with fallback to base ranking
  • Skip reranking if document count <= top_k (no benefit)
  • Cache scores: same query+doc pair = same score
  • Store score components for debugging and tuning

Rerank results with cross-encoder models for accurate query-document relevance scoring — HIGH

Cross-Encoder Reranking

Re-score retrieved documents with cross-encoder for higher precision.

Cross-Encoder Pattern:

from sentence_transformers import CrossEncoder

class CrossEncoderReranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
        pairs = [(query, doc["content"]) for doc in documents]
        scores = self.model.predict(pairs)
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        return [{**doc, "score": float(score)} for doc, score in scored_docs[:top_k]]

Model Selection:

Model | Latency | Cost | Quality
cross-encoder/ms-marco-MiniLM-L-6-v2 | ~50ms | Free | Good
BAAI/bge-reranker-large | ~100ms | Free | Better
Cohere rerank-english-v3.0 | ~200ms | $1/1K | Best

Incorrect — retrieving few, no reranking:

async def search(query: str) -> list[dict]:
    # Retrieve only 10, no reranking - misses good results
    return await vector_search(query, limit=10)

Correct — retrieve many, rerank to few:

async def search_with_reranking(query: str) -> list[dict]:
    # Retrieve many candidates
    candidates = await vector_search(query, limit=50)

    # Rerank with cross-encoder
    pairs = [(query, doc["content"][:400]) for doc in candidates]
    scores = cross_encoder.predict(pairs)
    scored_docs = list(zip(candidates, scores))
    scored_docs.sort(key=lambda x: x[1], reverse=True)

    # Return top 10 after reranking
    return [{**doc, "score": float(score)} for doc, score in scored_docs[:10]]

Key rules:

  • Retrieve many (50-100), rerank to few (10) — "retrieve more, rerank less"
  • Cross-encoder processes query+doc pair together (slow but accurate)
  • Default model: ms-marco-MiniLM-L-6-v2 (good quality, free, ~50ms)
  • Truncate document content to 200-400 chars for reranking efficiency

Use LLM reranking for domain-adaptive scoring without deploying a dedicated model — MEDIUM

LLM Reranking

Score document relevance using LLM in a single batch call.

LLM Batch Reranking:

async def llm_rerank(query: str, documents: list[dict], llm: AsyncOpenAI, top_k: int = 10) -> list[dict]:
    docs_text = "\n\n".join([f"[Doc {i+1}]\n{doc['content'][:300]}..." for i, doc in enumerate(documents)])

    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content": "Rate each document's relevance to the query (0.0-1.0).\nOutput one score per line."},
            {"role": "user", "content": f"Query: {query}\n\nDocuments:\n{docs_text}"}
        ],
        temperature=0,
    )

    scores = parse_scores(response.choices[0].message.content, len(documents))
    scored_docs = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [{**doc, "score": score} for doc, score in scored_docs[:top_k]]

def parse_scores(response: str, expected_count: int) -> list[float]:
    scores = []
    for line in response.strip().split("\n"):
        try:
            scores.append(max(0.0, min(1.0, float(line.strip()))))
        except ValueError:
            scores.append(0.5)
    while len(scores) < expected_count:
        scores.append(0.5)
    return scores[:expected_count]

Cohere Rerank API:

import cohere

class CohereReranker:
    def __init__(self, api_key: str):
        self.client = cohere.Client(api_key)

    def rerank(self, query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
        results = self.client.rerank(
            model="rerank-english-v3.0", query=query,
            documents=[doc["content"] for doc in documents], top_n=top_k
        )
        return [{**documents[r.index], "score": r.relevance_score} for r in results.results]

Incorrect — one LLM call per document, extremely slow:

async def llm_rerank(query: str, documents: list[dict]) -> list[dict]:
    scores = []
    for doc in documents:  # Sequential LLM calls!
        response = await llm.chat.completions.create(
            model="gpt-5.2-mini",
            messages=[{"role": "user", "content": f"Rate relevance (0-1):\nQuery: {query}\nDoc: {doc['content']}"}]
        )
        scores.append(float(response.choices[0].message.content))
    return sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)

Correct — batch all docs in one LLM call:

async def llm_rerank(query: str, documents: list[dict], top_k: int = 10) -> list[dict]:
    # Batch all docs in ONE LLM call
    docs_text = "\n\n".join([
        f"[Doc {i+1}]\n{doc['content'][:300]}..."  # Truncate
        for i, doc in enumerate(documents)
    ])

    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content": "Rate each document's relevance (0.0-1.0). One score per line."},
            {"role": "user", "content": f"Query: {query}\n\nDocuments:\n{docs_text}"}
        ],
        temperature=0
    )

    scores = parse_scores(response.choices[0].message.content, len(documents))
    scored_docs = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
    return [{**doc, "score": score} for doc, score in scored_docs[:top_k]]

Key rules:

  • Batch all docs in one LLM call (reduces latency vs per-doc calls)
  • Truncate to 200-400 chars per doc for LLM reranking
  • Parse scores defensively (default 0.5 on parse error)
  • LLM reranking at ~500ms, Cohere at ~200ms
  • Set timeout with fallback to base ranking

Checklists (2)

RAG Quality

RAG Quality Checklist

Quality assurance for agentic RAG implementations.

Retrieval Quality

  • Semantic search configured with appropriate embedding model
  • Chunk size optimized (512-1024 tokens typical)
  • Chunk overlap configured (10-20% of chunk size)
  • Metadata filtering implemented for scoping
  • Top-k tuned for precision/recall balance

Document Grading

  • Relevance grading implemented (binary or scored)
  • Grading prompt tested with diverse queries
  • Threshold tuned for false positive/negative balance
  • Fallback behavior defined for low-relevance results

Query Transformation

  • Query rewriting enabled for failed retrievals
  • Maximum retry count configured (2-3 typical)
  • Query decomposition for multi-concept queries
  • HyDE integration for vocabulary mismatch

Web Fallback (CRAG)

  • Web search integration configured
  • Rate limiting for web search API
  • Result filtering and quality check
  • Source attribution for web results

Self-RAG Patterns

  • Adaptive retrieval decision logic implemented
  • Reflection tokens for quality assessment
  • Skip retrieval path for simple queries
  • Confidence thresholds calibrated

Generation Quality

  • Context formatting optimized
  • Citation/source attribution enforced
  • Hallucination detection enabled
  • Output length appropriate

Error Handling

  • Graceful degradation on retrieval failure
  • Fallback responses configured
  • Retry logic with exponential backoff
  • Error logging and alerting

Performance

  • Retrieval latency acceptable (<500ms)
  • Caching for repeated queries
  • Batch embedding for efficiency
  • Async execution where possible

Monitoring

  • Retrieval metrics tracked (precision, recall)
  • Query success/failure rates logged
  • Web fallback frequency monitored
  • User feedback integration

Search Implementation Checklist

PGVector Hybrid Search Implementation Checklist

Use this checklist when implementing semantic + keyword search with PGVector.

Pre-Implementation

Index Strategy Planning

  • Choose vector algorithm - HNSW (recommended) or IVFFlat
  • Select embedding model - OpenAI (1536), Voyage AI (1024), etc.
  • Determine dimensions - Match model output dimensions
  • Plan distance metric - Cosine (most common) or L2/Inner Product
  • Set HNSW parameters - m=16, ef_construction=64 (good defaults)

Embedding Model Selection

  • Test embedding quality - Validate on sample queries
  • Measure embedding latency - API call time
  • Budget embedding costs - Track usage for bulk ingestion
  • Plan batch embedding - Batch API calls for efficiency
  • Cache embeddings - Store in database, don't re-compute

RRF Configuration

  • Set fetch multiplier - 3x (retrieve 30 for top-10 results)
  • Choose RRF constant (k) - 60 (standard value)
  • Plan score normalization - Use rank, not raw scores
  • Define boosting factors - Section title (1.5x), path (1.15x), code (1.2x)
  • Set similarity threshold - Minimum cosine similarity (e.g., 0.75)

Schema Design

  • Define chunks table - id, content, embedding, metadata
  • Add tsvector column - Pre-computed for keyword search
  • Plan metadata fields - section_title, section_path, content_type
  • Add timestamps - created_at, updated_at
  • Foreign keys - Link to documents/artifacts

Implementation

Database Schema

-- 1. Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- 2. Create chunks table
CREATE TABLE chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    content TEXT NOT NULL,

    -- Vector embedding (match model dimensions)
    embedding vector(1024),  -- Voyage AI 1024 dims

    -- Pre-computed tsvector for full-text search
    content_tsvector tsvector GENERATED ALWAYS AS (
        to_tsvector('english', content)
    ) STORED,

    -- Metadata
    section_title TEXT,
    section_path TEXT,
    chunk_index INT,
    content_type TEXT,  -- 'code_block', 'paragraph', 'list'

    -- Timestamps
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- 3. Create indexes
-- Vector search (HNSW for speed)
CREATE INDEX idx_chunks_embedding ON chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Full-text search (GIN for tsvector)
CREATE INDEX idx_chunks_content_tsvector ON chunks
    USING gin (content_tsvector);

-- Metadata indexes
CREATE INDEX idx_chunks_document_id ON chunks(document_id);
CREATE INDEX idx_chunks_content_type ON chunks(content_type);
  • pgvector extension enabled
  • Chunks table created
  • Embedding column dimensions match model
  • tsvector column generated and stored
  • HNSW index created for vectors
  • GIN index created for tsvector
  • Metadata indexes created

Vector Search Query

from sqlalchemy import desc, func, select
from pgvector.sqlalchemy import Vector

async def vector_search(
    query_embedding: list[float],
    top_k: int = 10,
    content_type_filter: list[str] | None = None
) -> list[Chunk]:
    """Perform vector similarity search."""

    # Fetch multiplier for better RRF coverage
    FETCH_MULTIPLIER = 3
    fetch_limit = top_k * FETCH_MULTIPLIER

    # Build query
    query = (
        select(
            Chunk.id,
            (Chunk.embedding.cosine_distance(query_embedding)).label("distance"),
            func.row_number().over(
                order_by=Chunk.embedding.cosine_distance(query_embedding)
            ).label("rank")
        )
        .where(Chunk.embedding.isnot(None))
    )

    # Apply content type filter
    if content_type_filter:
        query = query.where(Chunk.content_type.in_(content_type_filter))

    query = query.limit(fetch_limit).subquery("vector_results")

    result = await session.execute(query)
    return result.all()
  • Query embedding passed as parameter
  • Cosine distance calculated
  • Row number (rank) computed
  • Fetch multiplier applied (3x)
  • Content type filter optional
  • Returns top-k * 3 results

Keyword Search Query

async def keyword_search(
    query: str,
    top_k: int = 10,
    content_type_filter: list[str] | None = None
) -> list[Chunk]:
    """Perform BM25 keyword search."""

    FETCH_MULTIPLIER = 3
    fetch_limit = top_k * FETCH_MULTIPLIER

    # Generate tsquery from plain text
    ts_query = func.plainto_tsquery("english", query)

    # Build statement (avoid shadowing the `query` string parameter)
    stmt = (
        select(
            Chunk.id,
            func.ts_rank_cd(Chunk.content_tsvector, ts_query).label("score"),
            func.row_number().over(
                order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
            ).label("rank")
        )
        .where(Chunk.content_tsvector.op("@@")(ts_query))
    )

    # Apply content type filter
    if content_type_filter:
        stmt = stmt.where(Chunk.content_type.in_(content_type_filter))

    stmt = stmt.limit(fetch_limit)

    result = await session.execute(stmt)
    return result.all()
  • Uses pre-indexed content_tsvector (not to_tsvector on query)
  • plainto_tsquery handles special characters
  • ts_rank_cd for BM25-like scoring
  • Row number (rank) computed
  • Fetch multiplier applied
  • Only matches where tsvector matches query

Reciprocal Rank Fusion (RRF)

async def hybrid_search(
    query: str,
    query_embedding: list[float],
    top_k: int = 10,
    content_type_filter: list[str] | None = None
) -> list[Chunk]:
    """Combine vector + keyword search with RRF."""

    FETCH_MULTIPLIER = 3
    fetch_limit = top_k * FETCH_MULTIPLIER
    K = 60  # RRF smoothing constant

    # ===== 1. VECTOR SEARCH =====
    vector_subquery = (
        select(
            Chunk.id,
            (Chunk.embedding.cosine_distance(query_embedding)).label("vector_distance"),
            func.row_number().over(
                order_by=Chunk.embedding.cosine_distance(query_embedding)
            ).label("vector_rank")
        )
        .where(Chunk.embedding.isnot(None))
    )

    if content_type_filter:
        vector_subquery = vector_subquery.where(
            Chunk.content_type.in_(content_type_filter)
        )

    vector_subquery = vector_subquery.limit(fetch_limit).subquery("vector_results")

    # ===== 2. KEYWORD SEARCH =====
    ts_query = func.plainto_tsquery("english", query)

    keyword_subquery = (
        select(
            Chunk.id,
            func.ts_rank_cd(Chunk.content_tsvector, ts_query).label("bm25_score"),
            func.row_number().over(
                order_by=func.ts_rank_cd(Chunk.content_tsvector, ts_query).desc()
            ).label("keyword_rank")
        )
        .where(Chunk.content_tsvector.op("@@")(ts_query))
    )

    if content_type_filter:
        keyword_subquery = keyword_subquery.where(
            Chunk.content_type.in_(content_type_filter)
        )

    keyword_subquery = keyword_subquery.limit(fetch_limit).subquery("keyword_results")

    # ===== 3. RECIPROCAL RANK FUSION =====
    rrf_query = (
        select(
            func.coalesce(
                vector_subquery.c.id,
                keyword_subquery.c.id
            ).label("chunk_id"),
            (
                func.coalesce(1.0 / (K + vector_subquery.c.vector_rank), 0.0) +
                func.coalesce(1.0 / (K + keyword_subquery.c.keyword_rank), 0.0)
            ).label("rrf_score"),
            vector_subquery.c.vector_distance,
            keyword_subquery.c.bm25_score
        )
        .select_from(
            vector_subquery.outerjoin(
                keyword_subquery,
                vector_subquery.c.id == keyword_subquery.c.id,
                full=True  # FULL OUTER JOIN
            )
        )
        .order_by(desc("rrf_score"))  # order by the labeled RRF expression
        .limit(top_k)
    ).subquery("rrf_results")

    # ===== 4. FETCH FULL CHUNKS =====
    final_query = (
        select(Chunk, rrf_query.c.rrf_score)
        .join(rrf_query, Chunk.id == rrf_query.c.chunk_id)
        .order_by(rrf_query.c.rrf_score.desc())
    )

    result = await session.execute(final_query)
    chunks = result.all()

    return chunks
  • Both vector and keyword searches executed
  • Full outer join combines results
  • RRF score = 1/(k+rank_vector) + 1/(k+rank_keyword)
  • Results sorted by RRF score descending
  • Top-k returned
  • Full chunk objects fetched

Metadata Boosting

def apply_metadata_boosting(
    chunks: list[tuple[Chunk, float]],
    query: str
) -> list[tuple[Chunk, float]]:
    """Boost RRF scores based on metadata relevance."""

    boosted_chunks = []

    for chunk, rrf_score in chunks:
        boost_factor = 1.0

        # Boost section titles (1.5x)
        if chunk.section_title and query_matches_section_title(chunk.section_title, query):
            boost_factor *= 1.5

        # Boost document path (1.15x)
        if chunk.section_path and query_matches_path(chunk.section_path, query):
            boost_factor *= 1.15

        # Boost code blocks for technical queries (1.2x)
        if is_technical_query(query) and chunk.content_type == "code_block":
            boost_factor *= 1.2

        boosted_chunks.append((chunk, rrf_score * boost_factor))

    # Re-sort by boosted score
    boosted_chunks.sort(key=lambda x: x[1], reverse=True)

    return boosted_chunks


def query_matches_section_title(section_title: str, query: str) -> bool:
    """Check if query keywords appear in section title."""
    query_terms = set(query.lower().split())
    title_terms = set(section_title.lower().split())
    return len(query_terms & title_terms) > 0


def is_technical_query(query: str) -> bool:
    """Detect technical queries (code-focused)."""
    technical_keywords = {
        "function", "class", "method", "code", "implement",
        "algorithm", "syntax", "example", "snippet"
    }
    query_terms = set(query.lower().split())
    return len(query_terms & technical_keywords) > 0
  • Boosting applied after RRF
  • Section title matching implemented
  • Document path matching implemented
  • Technical query detection implemented
  • Results re-sorted after boosting
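
query_matches_path is assumed in the boosting code above; a simple sketch mirroring the section-title matcher:

def query_matches_path(section_path: str, query: str) -> bool:
    """Check if any query keyword appears in the section path (e.g. 'api/auth/tokens')."""
    query_terms = set(query.lower().split())
    path_terms = set(section_path.lower().replace("/", " ").replace("-", " ").split())
    return len(query_terms & path_terms) > 0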

Verification

Golden Dataset Testing

import pytest

@pytest.mark.asyncio
async def test_hybrid_search_golden_dataset():
    """Test hybrid search against golden queries."""

    golden_queries = load_golden_queries()  # Load test cases

    results = []
    for query_data in golden_queries:
        query = query_data["query"]
        expected_chunks = set(query_data["expected_chunk_ids"])  # set for intersection below

        # Generate embedding
        embedding = await embed_text(query)

        # Perform search
        retrieved = await hybrid_search(query, embedding, top_k=10)
        retrieved_ids = {c.id for c in retrieved}

        # Check if expected chunks are in top 10
        found = len(expected_chunks & retrieved_ids)
        results.append({
            "query": query,
            "expected": len(expected_chunks),
            "found": found,
            "pass": found == len(expected_chunks)
        })

    # Calculate metrics
    pass_rate = sum(r["pass"] for r in results) / len(results)
    mrr = calculate_mrr(results)

    print(f"Pass Rate: {pass_rate:.1%}")
    print(f"MRR: {mrr:.3f}")

    assert pass_rate >= 0.90, f"Pass rate {pass_rate:.1%} below 90% threshold"


def calculate_mrr(results: list[dict]) -> float:
    """Calculate Mean Reciprocal Rank."""
    reciprocal_ranks = []

    for result in results:
        if result["found"] > 0:
            # Assume first expected chunk found at rank 1 (simplified)
            reciprocal_ranks.append(1.0)
        else:
            reciprocal_ranks.append(0.0)

    return sum(reciprocal_ranks) / len(reciprocal_ranks)
  • Golden dataset loaded - 98+ test queries
  • Pass rate measured - Target: 90%+
  • MRR calculated - Mean Reciprocal Rank
  • Hard queries tested - Technical, ambiguous queries
  • Failures analyzed - Inspect failing queries
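
load_golden_queries is assumed in both tests; a minimal sketch that reads a JSON fixture of {"query": ..., "expected_chunk_ids": [...]} records (the path is illustrative):

import json
from pathlib import Path

def load_golden_queries(path: str = "tests/fixtures/golden_queries.json") -> list[dict]:
    """Each record: {"query": "...", "expected_chunk_ids": ["...", ...]}."""
    return json.loads(Path(path).read_text())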

Retrieval Quality Metrics

@pytest.mark.asyncio
async def test_retrieval_quality_metrics():
    """Measure retrieval quality metrics."""

    test_cases = load_golden_queries()

    precision_at_k = []
    recall_at_k = []

    for case in test_cases:
        query = case["query"]
        relevant_chunks = set(case["expected_chunk_ids"])

        # Perform search
        embedding = await embed_text(query)
        retrieved = await hybrid_search(query, embedding, top_k=10)
        retrieved_ids = {c.id for c in retrieved}

        # Precision@10: Relevant chunks in top-10 / 10
        precision = len(relevant_chunks & retrieved_ids) / 10
        precision_at_k.append(precision)

        # Recall@10: Relevant chunks in top-10 / Total relevant
        recall = len(relevant_chunks & retrieved_ids) / len(relevant_chunks)
        recall_at_k.append(recall)

    avg_precision = sum(precision_at_k) / len(precision_at_k)
    avg_recall = sum(recall_at_k) / len(recall_at_k)

    print(f"Precision@10: {avg_precision:.3f}")
    print(f"Recall@10: {avg_recall:.3f}")

    assert avg_precision >= 0.70, "Precision@10 below 70%"
    assert avg_recall >= 0.85, "Recall@10 below 85%"
  • Precision@10 - Target: 70%+ (relevant in top-10)
  • Recall@10 - Target: 85%+ (found most relevant)
  • MRR - Target: 0.65+ (relevant chunks ranked high)
  • nDCG - Normalized Discounted Cumulative Gain (optional)

Performance Benchmarks

@pytest.mark.asyncio
async def test_search_latency():
    """Measure search latency."""

    import time

    query = "How to implement binary search in Python?"
    embedding = await embed_text(query)

    # Measure vector search latency
    start = time.perf_counter()
    vector_results = await vector_search(embedding, top_k=30)
    vector_latency = (time.perf_counter() - start) * 1000

    # Measure keyword search latency
    start = time.perf_counter()
    keyword_results = await keyword_search(query, top_k=30)
    keyword_latency = (time.perf_counter() - start) * 1000

    # Measure hybrid search latency
    start = time.perf_counter()
    hybrid_results = await hybrid_search(query, embedding, top_k=10)
    hybrid_latency = (time.perf_counter() - start) * 1000

    print(f"Vector search: {vector_latency:.2f}ms")
    print(f"Keyword search: {keyword_latency:.2f}ms")
    print(f"Hybrid search: {hybrid_latency:.2f}ms")

    # Latency targets
    assert vector_latency < 100, f"Vector search latency {vector_latency:.2f}ms > 100ms"
    assert keyword_latency < 50, f"Keyword search latency {keyword_latency:.2f}ms > 50ms"
    assert hybrid_latency < 150, f"Hybrid search latency {hybrid_latency:.2f}ms > 150ms"
  • Vector search - < 100ms (HNSW index)
  • Keyword search - < 50ms (GIN index)
  • Hybrid search - < 150ms (combined)
  • P95 latency - 95th percentile acceptable
  • Index scans - Verify indexes used (EXPLAIN ANALYZE)

Index Performance Validation

-- Check if indexes are being used
EXPLAIN ANALYZE
SELECT id, embedding <=> '[0.1, 0.2, ..., 0.9]' AS distance
FROM chunks
ORDER BY distance
LIMIT 30;

-- Should show "Index Scan using idx_chunks_embedding"
-- NOT "Seq Scan" (sequential scan = no index!)
  • Vector index used - EXPLAIN shows "Index Scan using idx_chunks_embedding"
  • Keyword index used - EXPLAIN shows "Bitmap Index Scan using idx_chunks_content_tsvector"
  • No sequential scans - Avoid full table scans
  • Index size reasonable - Check pg_indexes view
  • Vacuum/Analyze run - Update statistics for query planner

Post-Implementation

Production Monitoring

  • Search latency dashboard - P50, P95, P99 latency
  • Retrieval quality tracking - Pass rate, MRR over time
  • Index bloat - Monitor index size growth
  • Query patterns - Log common queries, identify gaps
  • Error rate - Track search failures

Optimization Opportunities

  • Tune HNSW parameters - Increase m or ef_construction for accuracy
  • Increase fetch multiplier - 3x → 5x for better RRF coverage
  • Add more boosting - Domain-specific metadata boosts
  • Multi-query retrieval - Generate query variations
  • Hybrid query rewriting - Expand acronyms, synonyms

Index Maintenance

  • Run VACUUM ANALYZE - Weekly or after bulk inserts
  • Rebuild indexes - If bloated (pg_repack)
  • Monitor index usage - Drop unused indexes
  • Update statistics - Ensure query planner has fresh stats
  • Test on production-scale data - Validate performance at scale

Troubleshooting

Issue | Check
Slow vector search | HNSW index exists? Dimensions match? Increase m/ef_construction?
Slow keyword search | GIN index on tsvector? Using content_tsvector, not to_tsvector()?
Low pass rate | Increase fetch multiplier, add boosting, check embeddings quality
No keyword matches | Check tsvector generation, query language (English?), special chars
Wrong results | Validate RRF logic, check boosting factors, inspect rankings
Index not used | Run ANALYZE, check query plan (EXPLAIN), verify index conditions

OrchestKit Integration

# Example: Search for content in OrchestKit
from app.shared.services.search.search_service import SearchService

search_service = SearchService()
results = await search_service.search(
    query="How to implement hybrid search?",
    top_k=10,
    filters={"content_type": ["code_block", "paragraph"]}
)

# Results include chunk content, metadata, and RRF score
for chunk, score in results:
    print(f"Score: {score:.4f} | {chunk.section_title}")
    print(chunk.content[:200])
  • Search service integrated with API endpoints
  • Results exposed via /api/v1/search endpoint
  • Filters applied for content_type, document_id
  • Results paginated (offset/limit)
  • Searchable in frontend UI

References

  • PGVector Docs: https://github.com/pgvector/pgvector
  • OrchestKit Implementation: backend/app/db/repositories/chunk_repository.py
  • Search Service: backend/app/shared/services/search/search_service.py
  • Constants: backend/app/core/constants.py
  • Related Skill: database-schema-designer