
Golden Dataset

Golden dataset lifecycle patterns for curation, versioning, quality validation, and CI integration. Use when building evaluation datasets, managing dataset versions, validating quality scores, or integrating golden tests into pipelines.

Reference medium

Primary Agent: data-pipeline-engineer

Golden Dataset

Comprehensive patterns for building, managing, and validating golden datasets for AI/ML evaluation. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

Category | Rules | Impact | When to Use
Curation | 3 | HIGH | Content collection, annotation pipelines, diversity analysis
Management | 3 | HIGH | Versioning, backup/restore, CI/CD automation
Validation | 3 | CRITICAL | Quality scoring, drift detection, regression testing
Add Workflow | 1 | HIGH | 9-phase curation, quality scoring, bias detection, silver-to-gold

Total: 10 rules across 4 categories

Curation

Content collection, multi-agent annotation, and diversity analysis for golden datasets.

Rule | File | Key Pattern
Collection | rules/curation-collection.md | Content type classification, quality thresholds, duplicate prevention
Annotation | rules/curation-annotation.md | Multi-agent pipeline, consensus aggregation, Langfuse tracing
Diversity | rules/curation-diversity.md | Difficulty stratification, domain coverage, balance guidelines

Management

Versioning, storage, and CI/CD automation for golden datasets.

Rule | File | Key Pattern
Versioning | rules/management-versioning.md | JSON backup format, embedding regeneration, disaster recovery
Storage | rules/management-storage.md | Backup strategies, URL contract, data integrity checks
CI Integration | rules/management-ci.md | GitHub Actions automation, pre-deployment validation, weekly backups

Validation

Quality scoring, drift detection, and regression testing for golden datasets.

Rule | File | Key Pattern
Quality | rules/validation-quality.md | Schema validation, content quality, referential integrity
Drift | rules/validation-drift.md | Duplicate detection, semantic similarity, coverage gap analysis
Regression | rules/validation-regression.md | Difficulty distribution, pre-commit hooks, full dataset validation

Add Workflow

Structured workflow for adding new documents to the golden dataset.

Rule | File | Key Pattern
Add Document | rules/curation-add-workflow.md | 9-phase curation, parallel quality analysis, bias detection

Quick Start Example

from app.shared.services.embeddings import embed_text

async def validate_before_add(document: dict, source_url_map: dict) -> dict:
    """Pre-addition validation for golden dataset entries."""
    errors = []

    # 1. URL contract check
    if "placeholder" in document.get("source_url", ""):
        errors.append("URL must be canonical, not a placeholder")

    # 2. Content quality
    if len(document.get("title", "")) < 10:
        errors.append("Title too short (min 10 chars)")

    # 3. Tag requirements
    if len(document.get("tags", [])) < 2:
        errors.append("At least 2 domain tags required")

    return {"valid": len(errors) == 0, "errors": errors}

Key Decisions

Decision | Recommendation
Backup format | JSON (version controlled, portable)
Embedding storage | Exclude from backup (regenerate on restore)
Quality threshold | >= 0.70 quality score for inclusion
Confidence threshold | >= 0.65 for auto-include
Duplicate threshold | >= 0.90 similarity blocks, >= 0.85 warns
Min tags per entry | 2 domain tags
Min test queries | 3 per document
Difficulty balance | Trivial 3, Easy 3, Medium 5, Hard 3 minimum
CI frequency | Weekly automated backup (Sunday 2am UTC)
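These defaults can be captured in one place so scripts and hooks stay consistent. A minimal sketch, assuming a constants module of our own (the name GOLDEN_DATASET_DEFAULTS and its keys are illustrative, not from the codebase):

# Illustrative constants mirroring the table above
GOLDEN_DATASET_DEFAULTS = {
    "min_quality_score": 0.70,        # inclusion threshold
    "min_confidence": 0.65,           # auto-include threshold
    "duplicate_block_threshold": 0.90,
    "duplicate_warn_threshold": 0.85,
    "min_tags": 2,
    "min_test_queries": 3,
    "min_difficulty_counts": {"trivial": 3, "easy": 3, "medium": 5, "hard": 3},
    "backup_cron": "0 2 * * 0",       # weekly, Sunday 02:00 UTC
}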

Common Mistakes

  1. Using placeholder URLs instead of canonical source URLs
  2. Skipping embedding regeneration after restore
  3. Not validating referential integrity between documents and queries
  4. Over-indexing on articles (neglecting tutorials, research papers)
  5. Missing difficulty distribution balance in test queries
  6. Not running verification after backup/restore operations
  7. Testing restore procedures in production instead of staging
  8. Committing SQL dumps instead of JSON (not version-control friendly)

Evaluations

See test-cases.json for 9 test cases across all categories.

  • ork:rag-retrieval - Retrieval evaluation using golden dataset
  • langfuse-observability - Tracing patterns for curation workflows
  • ork:testing-patterns - General testing patterns and strategies
  • ai-native-development - Embedding generation for restore

Capability Details

curation

Keywords: golden dataset, curation, content collection, annotation, quality criteria

Solves:

  • Classify document content types for golden dataset
  • Run multi-agent quality analysis pipelines
  • Generate test queries for new documents

management

Keywords: golden dataset, backup, restore, versioning, disaster recovery

Solves:

  • Backup and restore golden datasets with JSON
  • Regenerate embeddings after restore
  • Automate backups with CI/CD

validation

Keywords: golden dataset, validation, schema, duplicate detection, quality metrics

Solves:

  • Validate entries against document schema
  • Detect duplicate or near-duplicate entries
  • Analyze dataset coverage and distribution gaps

Rules (10)

Follow the full curation pipeline when adding entries to the golden dataset — HIGH

Add to Golden Dataset Workflow

Multi-agent curation pipeline with quality scoring, bias detection, and silver-to-gold promotion.

Incorrect — adding documents without validation:

# No quality check, no bias detection, no dedup
dataset.append({"url": url, "content": content})

Correct — 9-phase curation workflow:

Phase 1-2: Input and extraction

# Detect content type and extract structure
content_type = classify(url)  # article, tutorial, documentation, research_paper
structured = extract(url)      # title, sections, code blocks, key terms, metadata

Phase 3: Parallel quality analysis (4 agents)

# Launch ALL quality agents in parallel
# Agent 1: Accuracy, coherence, depth, relevance scores
# Agent 2: Keyword directness, difficulty level
# Agent 3: Domain tags, skill level classification
# Agent 4: Test query generation (direct, paraphrased, multi-hop)

Phase 4: Quality scoring formula

quality_score = (
    accuracy * 0.25 +
    coherence * 0.20 +
    depth * 0.25 +
    relevance * 0.30
)

Phase 5-6: Bias detection and diversity check

Bias Score | Action
0-2 | Proceed normally
3-5 | Add disclaimer
6-8 | Require user review
9-10 | Recommend against inclusion
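A small sketch of how these bands might be applied in code (the function name and return labels are illustrative, not from the workflow scripts):

def bias_action(bias_score: int) -> str:
    """Map a 0-10 bias score to the recommended action (illustrative helper)."""
    if bias_score <= 2:
        return "proceed"
    if bias_score <= 5:
        return "add_disclaimer"
    if bias_score <= 8:
        return "require_user_review"
    return "recommend_exclusion"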

Phase 7-8: Validation and classification

Status | Quality Score | Action
GOLD | >= 0.75 | Add to main dataset
SILVER | 0.55-0.74 | Add to silver tier, track
REJECT | < 0.55 | Do not add

Promotion criteria: 7+ days in silver, quality >= 0.75, no negative feedback.
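A minimal sketch of the silver-to-gold promotion check, assuming each silver entry records when it entered the tier, its latest quality score, and any negative feedback (the field names are hypothetical):

from datetime import UTC, datetime, timedelta

def eligible_for_gold(entry: dict) -> bool:
    """Check silver-to-gold promotion criteria (illustrative field names)."""
    age = datetime.now(UTC) - entry["added_to_silver_at"]
    return (
        age >= timedelta(days=7)
        and entry["quality_score"] >= 0.75
        and not entry.get("negative_feedback", [])
    )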

Phase 9: Version tracking

{
  "version": "1.2.3",
  "change_type": "ADD",
  "document_id": "doc-123",
  "quality_score": 0.82,
  "rollback_available": true
}
Update Type | Version Bump
Add/Update document | Patch (0.0.X)
Remove document | Minor (0.X.0)
Schema change | Major (X.0.0)
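A sketch of the version bump rule implied by this table (the helper and the change_type labels other than ADD are assumptions):

def bump_version(version: str, change_type: str) -> str:
    """Bump a semantic version according to the update type table (illustrative)."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change_type == "SCHEMA_CHANGE":
        return f"{major + 1}.0.0"
    if change_type == "REMOVE":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"  # ADD / UPDATE

# bump_version("1.2.3", "ADD") -> "1.2.4"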

Key rules:

  • Never skip the quality analysis phase — it prevents low-quality entries from degrading evaluations
  • Run bias detection on every addition — dataset contamination is hard to reverse
  • Use the two-tier system (silver/gold) to let borderline documents prove themselves
  • Always validate URL is canonical (not a placeholder) and check for >80% duplicate similarity
  • Minimum requirements: 2+ domain tags, 3+ test queries per document

Use multi-agent annotation for consistent and thorough curation quality decisions — HIGH

Multi-Agent Annotation

Multi-agent analysis pipeline with consensus aggregation for golden dataset curation.

Pipeline Architecture:

INPUT: URL/Content
        |
        v
+------------------+
|   FETCH AGENT    |  WebFetch or file read
|   (sequential)   |  Extract structure, detect type
+--------+---------+
         |
         v
+-----------------------------------------------+
|  PARALLEL ANALYSIS AGENTS                      |
|  +----------+ +----------+ +--------+ +------+ |
|  | Quality  | |Difficulty| | Domain | |Query | |
|  |Evaluator | |Classifier| | Tagger | |Gen   | |
|  +----+-----+ +----+-----+ +---+----+ +--+---+ |
+-------+------------+-----------+---------+-----+
                     |
                     v
+-----------------------------------------------+
|  CONSENSUS AGGREGATOR                          |
|  - Weighted quality score                      |
|  - Confidence level (agent agreement)          |
|  - Final recommendation: include/review/exclude|
+--------+--------------------------------------+
         |
         v
+------------------+
|  USER APPROVAL   |  Show scores, get confirmation
+------------------+

Quality Evaluator Agent:

Task(
    subagent_type="code-quality-reviewer",
    prompt="""GOLDEN DATASET QUALITY EVALUATION

    Evaluate this content for golden dataset inclusion:

    Content: {content_preview}
    Source: {source_url}
    Type: {content_type}

    Score these dimensions (0.0-1.0):

    1. ACCURACY (weight 0.25)
       - Technical correctness
       - Code validity
       - Up-to-date information

    2. COHERENCE (weight 0.20)
       - Logical structure
       - Clear flow
       - Consistent terminology

    3. DEPTH (weight 0.25)
       - Comprehensive coverage
       - Edge cases mentioned
       - Appropriate detail level

    4. RELEVANCE (weight 0.30)
       - Alignment with AI/ML, backend, frontend, DevOps
       - Practical applicability
       - Technical value

    Output JSON:
    {
        "accuracy": {"score": 0.X, "rationale": "..."},
        "coherence": {"score": 0.X, "rationale": "..."},
        "depth": {"score": 0.X, "rationale": "..."},
        "relevance": {"score": 0.X, "rationale": "..."},
        "weighted_total": 0.X,
        "recommendation": "include|review|exclude"
    }
    """,
    run_in_background=True
)

Consensus Aggregation Logic:

from dataclasses import dataclass
from typing import Literal

@dataclass
class CurationConsensus:
    """Aggregated result from multi-agent analysis."""
    quality_score: float  # Weighted average (0-1)
    confidence: float     # Agent agreement (0-1)
    decision: Literal["include", "review", "exclude"]
    content_type: str
    difficulty: str
    tags: list[str]
    suggested_queries: list[dict]
    warnings: list[str]

def aggregate_results(
    quality_result: dict,
    difficulty_result: dict,
    domain_result: dict,
    query_result: dict,
) -> CurationConsensus:
    """Aggregate multi-agent results into consensus."""

    # Calculate weighted quality score
    q = quality_result
    quality_score = (
        q["accuracy"]["score"] * 0.25 +
        q["coherence"]["score"] * 0.20 +
        q["depth"]["score"] * 0.25 +
        q["relevance"]["score"] * 0.30
    )

    # Calculate confidence (variance-based)
    scores = [
        q["accuracy"]["score"],
        q["coherence"]["score"],
        q["depth"]["score"],
        q["relevance"]["score"],
    ]
    variance = sum((s - quality_score)**2 for s in scores) / len(scores)
    confidence = 1.0 - min(variance * 4, 1.0)

    # Decision thresholds
    if quality_score >= 0.75 and confidence >= 0.7:
        decision = "include"
    elif quality_score >= 0.55:
        decision = "review"
    else:
        decision = "exclude"

    return CurationConsensus(
        quality_score=quality_score,
        confidence=confidence,
        decision=decision,
        content_type=difficulty_result.get("content_type", "article"),
        difficulty=difficulty_result["difficulty"],
        tags=domain_result["tags"],
        suggested_queries=query_result["queries"],
        warnings=[],
    )

Langfuse Integration (v3):

from langfuse import observe, get_client

@observe(name="golden-dataset-curation")
async def curate_with_tracing(url: str, doc_id: str, consensus: CurationConsensus) -> dict:
    """Trace curation decisions to Langfuse for audit trail."""
    get_client().update_current_trace(
        metadata={"source_url": url, "document_id": doc_id}
    )

    # Log individual dimension scores against the current trace
    lf = get_client()
    trace_id = lf.get_current_trace_id()
    lf.score(trace_id=trace_id, name="accuracy", value=0.85)
    lf.score(trace_id=trace_id, name="coherence", value=0.90)
    lf.score(trace_id=trace_id, name="depth", value=0.78)
    lf.score(trace_id=trace_id, name="relevance", value=0.92)

    # Final aggregated score
    lf.score(trace_id=trace_id, name="quality_total", value=consensus.quality_score)
    get_client().update_current_observation(
        metadata={"curation_decision": consensus.decision}
    )
    return {"decision": consensus.decision, "score": consensus.quality_score}

Incorrect — Sequential agent execution:

# Sequential - 4x slower
quality_result = await analyze_quality(content)
difficulty_result = await analyze_difficulty(content)
domain_result = await analyze_domain(content)
query_result = await generate_queries(content)

Correct — Parallel agent execution:

# Parallel - all agents run concurrently
quality_task = Task(subagent_type="code-quality-reviewer", prompt=quality_prompt, run_in_background=True)
difficulty_task = Task(subagent_type="classifier", prompt=difficulty_prompt, run_in_background=True)
domain_task = Task(subagent_type="tagger", prompt=domain_prompt, run_in_background=True)
query_task = Task(subagent_type="query-generator", prompt=query_prompt, run_in_background=True)

# Wait for all results
results = await gather_task_results([quality_task, difficulty_task, domain_task, query_task])

Key rules:

  • Run all 4 analysis agents in parallel for throughput
  • Use weighted scoring (accuracy 0.25, coherence 0.20, depth 0.25, relevance 0.30)
  • Require user approval before final inclusion
  • Log all scores to Langfuse for audit trail

Apply systematic collection criteria to maintain consistent golden dataset quality — HIGH

Content Collection

Systematic patterns for collecting and classifying content for golden dataset inclusion.

Content Type Classification:

Type | Description | Quality Focus
article | Technical articles, blog posts | Depth, accuracy, actionability
tutorial | Step-by-step guides | Completeness, clarity, code quality
research_paper | Academic papers, whitepapers | Rigor, citations, methodology
documentation | API docs, reference materials | Accuracy, completeness, examples
video_transcript | Transcribed video content | Structure, coherence, key points
code_repository | README, code analysis | Code quality, documentation

Classification Decision Tree:

def classify_content_type(content: str, source_url: str) -> str:
    """Classify content type based on structure and source."""

    # URL-based hints
    if "arxiv.org" in source_url or "papers" in source_url:
        return "research_paper"
    if "docs." in source_url or "/api/" in source_url:
        return "documentation"
    if "github.com" in source_url:
        return "code_repository"

    # Content-based analysis
    if has_step_by_step_structure(content):
        return "tutorial"
    if has_academic_structure(content):  # Abstract, methodology, results
        return "research_paper"

    # Default
    return "article"

Quality Thresholds:

# Recommended thresholds for golden dataset inclusion
minimum_quality_score: 0.70
minimum_confidence: 0.65
required_tags: 2          # At least 2 domain tags
required_queries: 3       # At least 3 test queries

Quality Dimensions:

Dimension | Weight | Perfect | Acceptable | Failing
Accuracy | 0.25 | 0.95-1.0 | 0.70-0.94 | <0.70
Coherence | 0.20 | 0.90-1.0 | 0.60-0.89 | <0.60
Depth | 0.25 | 0.90-1.0 | 0.55-0.89 | <0.55
Relevance | 0.30 | 0.95-1.0 | 0.70-0.94 | <0.70

Decision Thresholds:

Quality Score | Confidence | Decision
>= 0.75 | >= 0.70 | include
>= 0.55 | any | review
< 0.55 | any | exclude
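The same thresholds expressed as a small decision helper; a sketch only, since the authoritative logic lives in the consensus aggregator shown in the annotation rule:

def inclusion_decision(quality_score: float, confidence: float) -> str:
    """Map quality score and confidence to include/review/exclude (illustrative)."""
    if quality_score >= 0.75 and confidence >= 0.70:
        return "include"
    if quality_score >= 0.55:
        return "review"
    return "exclude"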

Duplicate Prevention Checklist:

  1. Check URL against existing source_url_map.json
  2. Run semantic similarity against existing document embeddings
  3. Warn if >80% similar to existing document

Provenance Tracking -- always record:

  • Source URL (canonical)
  • Curation date
  • Agent scores (for audit trail)
  • Langfuse trace ID
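A minimal sketch of a provenance record capturing these fields (the dataclass and field names are illustrative, not from the codebase):

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ProvenanceRecord:
    """Provenance metadata stored alongside each golden dataset entry (illustrative)."""
    source_url: str                       # canonical URL, never a placeholder
    curated_at: datetime                  # curation date
    agent_scores: dict[str, float] = field(default_factory=dict)  # per-dimension audit trail
    langfuse_trace_id: str | None = None  # link to the curation trace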

Incorrect — Placeholder URL:

# Missing real source URL
analysis = Analysis(
    url="https://orchestkit.dev/placeholder/123",
    content_type="article",
)

Correct — Real canonical URL:

# Real source for re-fetching and validation
analysis = Analysis(
    url="https://docs.python.org/3/library/asyncio.html",
    content_type="documentation",
)

Key rules:

  • Never use placeholder URLs -- always store real canonical source URLs
  • Require minimum 2 domain tags and 3 test queries per entry
  • Score all 4 quality dimensions before inclusion decision
  • Track provenance for full audit trail

Balance dataset coverage across difficulty levels, content types, and domains — HIGH

Dataset Diversity

Difficulty stratification, domain coverage, and balance guidelines for golden datasets.

Difficulty Levels:

Level | Semantic Complexity | Expected Score | Characteristics
trivial | Direct keyword match | >0.85 | Technical terms, exact phrases
easy | Common synonyms | >0.70 | Well-known concepts, slight variations
medium | Paraphrased intent | >0.55 | Conceptual queries, multi-topic
hard | Multi-hop reasoning | >0.40 | Cross-domain, comparative analysis
adversarial | Edge cases | Graceful degradation | Robustness tests, off-domain

Difficulty Classification:

def classify_difficulty(document: dict) -> str:
    """Classify document difficulty for retrieval testing."""

    factors = {
        "technical_density": count_technical_terms(document["content"]),
        "section_count": len(document.get("sections", [])),
        "cross_references": count_cross_references(document),
        "abstraction_level": assess_abstraction(document),
        "domain_specificity": assess_domain_specificity(document),
    }

    # Scoring rubric
    score = 0
    if factors["technical_density"] > 50:
        score += 2
    if factors["section_count"] > 10:
        score += 1
    if factors["cross_references"] > 5:
        score += 2
    if factors["abstraction_level"] == "high":
        score += 2

    # Map score to difficulty
    if score <= 2:
        return "trivial"
    elif score <= 4:
        return "easy"
    elif score <= 6:
        return "medium"
    elif score <= 8:
        return "hard"
    else:
        return "adversarial"

Coverage Requirements:

Metric | Minimum
Tutorials | >= 15% of documents
Research papers | >= 5% of documents
Domain coverage | >= 5 docs per expected domain
Hard queries | >= 10% of queries
Adversarial queries | >= 5% of queries

Difficulty Distribution Minimums:

Level | Minimum Count
trivial | 3
easy | 3
medium | 5
hard | 3

Coverage Gap Detection:

def analyze_coverage_gaps(
    documents: list[dict],
    queries: list[dict],
) -> dict:
    """Analyze dataset coverage and identify gaps."""

    # Content type distribution
    content_types = {}
    for doc in documents:
        ct = doc.get("content_type", "unknown")
        content_types[ct] = content_types.get(ct, 0) + 1

    # Domain/tag distribution
    all_tags = []
    for doc in documents:
        all_tags.extend(doc.get("tags", []))
    tag_counts = {}
    for tag in all_tags:
        tag_counts[tag] = tag_counts.get(tag, 0) + 1

    # Difficulty distribution
    difficulties = {}
    for query in queries:
        diff = query.get("difficulty", "unknown")
        difficulties[diff] = difficulties.get(diff, 0) + 1

    # Identify gaps
    gaps = []
    total_docs = len(documents)
    if content_types.get("tutorial", 0) / total_docs < 0.15:
        gaps.append("Under-represented: tutorials (<15%)")
    if content_types.get("research_paper", 0) / total_docs < 0.05:
        gaps.append("Under-represented: research papers (<5%)")

    expected_domains = ["ai-ml", "backend", "frontend", "devops", "security"]
    for domain in expected_domains:
        if tag_counts.get(domain, 0) < 5:
            gaps.append(f"Under-represented domain: {domain} (<5 docs)")

    total_queries = len(queries)
    if difficulties.get("hard", 0) / total_queries < 0.10:
        gaps.append("Under-represented: hard queries (<10%)")

    return {
        "content_type_distribution": content_types,
        "difficulty_distribution": difficulties,
        "gaps": gaps,
    }

Incorrect — Hardcoded difficulty without analysis:

# Guessing difficulty level
document = {
    "id": "new-doc",
    "difficulty": "medium",  # No assessment
    "tags": ["ai-ml"],  # Only 1 tag
}

Correct — Classified difficulty with analysis:

# Analyze multiple factors
factors = {
    "technical_density": count_technical_terms(document["content"]),
    "section_count": len(document.get("sections", [])),
    "abstraction_level": assess_abstraction(document),
}
difficulty = classify_difficulty(document)  # Returns "hard" based on factors
document["difficulty"] = difficulty
document["tags"] = ["ai-ml", "backend", "devops"]  # Minimum 2 tags

Key rules:

  • Maintain balanced coverage across content types, difficulty levels, and domains
  • Do not over-index on articles -- ensure tutorials and research papers are represented
  • Need both trivial AND hard queries for meaningful evaluation
  • Run coverage analysis before and after adding new entries
  • Target all 5 expected domains (ai-ml, backend, frontend, devops, security)

Integrate golden dataset validation and backups into the CI/CD pipeline — HIGH

CI Integration

GitHub Actions automation, pre-deployment validation, and scheduled backup patterns.

Automated Weekly Backup:

# .github/workflows/backup-golden-dataset.yml
name: Backup Golden Dataset

on:
  schedule:
    - cron: '0 2 * * 0'  # Weekly on Sunday at 2am
  workflow_dispatch:  # Manual trigger

jobs:
  backup:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          cd backend
          poetry install

      - name: Run backup
        env:
          DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py backup

      - name: Commit backup
        run: |
          git config user.name "GitHub Actions"
          git config user.email "actions@github.com"
          git add backend/data/golden_dataset_backup.json
          git add backend/data/golden_dataset_metadata.json
          git diff-index --quiet HEAD || git commit -m "chore: automated golden dataset backup [skip ci]"
          git push

Validation on Pull Request:

# .github/workflows/validate-golden-dataset.yml
name: Validate Golden Dataset

on:
  pull_request:
    paths:
      - 'backend/data/golden_dataset_backup.json'
  schedule:
    - cron: '0 8 * * 1'  # Weekly on Monday 8am

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          cd backend
          poetry install

      - name: Start PostgreSQL
        run: docker compose up -d postgres

      - name: Run migrations
        run: |
          cd backend
          poetry run alembic upgrade head

      - name: Restore golden dataset
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py restore

      - name: Validate dataset
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py verify

      - name: Run retrieval tests
        run: |
          cd backend
          poetry run pytest tests/integration/test_retrieval_quality.py -v

Pre-Deployment Checklist:

cd backend

# 1. Backup current data
poetry run python scripts/backup_golden_dataset.py backup

# 2. Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify

# 3. Run retrieval quality tests
poetry run pytest tests/integration/test_retrieval_quality.py

# 4. Check for regressions
# Expected: 91.6% pass rate, 0.777 MRR
# If lower, investigate before deploying
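A hedged sketch of that regression check, assuming the retrieval test run produces a pass rate and MRR that can be compared against a stored baseline (the baseline values come from the checklist comment above; the function and metric keys are illustrative):

BASELINE = {"pass_rate": 0.916, "mrr": 0.777}  # expected values from the checklist

def check_regression(results: dict, tolerance: float = 0.02) -> list[str]:
    """Compare current retrieval metrics against the baseline (illustrative helper)."""
    regressions = []
    for metric, expected in BASELINE.items():
        actual = results.get(metric, 0.0)
        if actual < expected - tolerance:
            regressions.append(f"{metric} regressed: {actual:.3f} < {expected:.3f}")
    return regressions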

Manual CI Trigger:

# Trigger workflow manually
gh workflow run backup-golden-dataset.yml

# Check workflow status
gh run list --workflow=backup-golden-dataset.yml

# View logs
gh run view --log

Pre-Commit Hook:

#!/bin/bash
# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)

if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json"; then
    echo "Validating golden dataset changes..."

    cd backend
    poetry run python scripts/data/add_to_golden_dataset.py validate-all

    if [ $? -ne 0 ]; then
        echo "Golden dataset validation failed!"
        exit 1
    fi

    echo "Golden dataset validation passed"
fi

Incorrect — Backup without validation:

# Missing verification step
- name: Run backup
  run: poetry run python scripts/backup_golden_dataset.py backup
- name: Commit backup
  run: git commit -m "chore: backup"

Correct — Backup with verification:

# Verify backup integrity
- name: Run backup
  run: poetry run python scripts/backup_golden_dataset.py backup
- name: Verify backup
  run: poetry run python scripts/backup_golden_dataset.py verify
- name: Commit backup
  run: git commit -m "chore: automated golden dataset backup [skip ci]"

Key rules:

  • Set up weekly automated backups to prevent data staleness
  • Validate golden dataset on every PR that modifies dataset files
  • Always run verification after automated backup creation
  • Use [skip ci] in automated commit messages to prevent infinite loops
  • Include pre-deployment validation in release checklists

Choose the right backup strategy and URL contract for golden dataset storage — HIGH

Storage Patterns

Backup strategies, URL contract enforcement, and data integrity checks.

Backup Strategy Comparison:

Strategy | Version Control | Restore Speed | Portability | Inspection
JSON (recommended) | Yes | Slower (regen embeddings) | High | Easy
SQL Dump | No (binary) | Fast | DB-version dependent | Hard

The URL Contract:

Golden dataset analyses MUST store real canonical URLs, not placeholders.

# WRONG - Placeholder URL (breaks restore)
analysis.url = "https://orchestkit.dev/placeholder/123"

# CORRECT - Real canonical URL (enables re-fetch if needed)
analysis.url = "https://docs.python.org/3/library/asyncio.html"

Why this matters:

  • Enables re-fetching content if embeddings need regeneration
  • Allows validation that source content hasn't changed
  • Provides audit trail for data provenance

URL Validation:

FORBIDDEN_URL_PATTERNS = [
    "orchestkit.dev",
    "placeholder",
    "example.com",
    "localhost",
    "127.0.0.1",
]

def validate_url(url: str) -> tuple[bool, str]:
    """Validate URL is not a placeholder."""
    for pattern in FORBIDDEN_URL_PATTERNS:
        if pattern in url.lower():
            return False, f"URL contains forbidden pattern: {pattern}"

    if not url.startswith("https://"):
        if not url.startswith("http://arxiv.org"):  # arXiv redirects
            return False, "URL must use HTTPS"

    return True, "OK"

Data Integrity Checks:

Check | Error/Warning | Description
Count mismatch | Error | Analysis/chunk count differs from metadata
Placeholder URLs | Error | URLs containing orchestkit.dev or placeholder
Missing embeddings | Error | Chunks without embeddings after restore
Orphaned chunks | Warning | Chunks with no parent analysis

Verification Implementation:

async def verify_golden_dataset() -> dict:
    """Verify golden dataset integrity."""

    errors = []
    warnings = []

    async with get_session() as session:
        # 1. Check counts
        analysis_count = await session.scalar(select(func.count(Analysis.id)))
        chunk_count = await session.scalar(select(func.count(Chunk.id)))

        expected = load_metadata()
        if analysis_count != expected["total_analyses"]:
            errors.append(f"Analysis count mismatch: {analysis_count} vs {expected['total_analyses']}")

        # 2. Check URL contract
        query = select(Analysis).where(
            Analysis.url.like("%orchestkit.dev%") |
            Analysis.url.like("%placeholder%")
        )
        result = await session.execute(query)
        invalid_urls = result.scalars().all()

        if invalid_urls:
            errors.append(f"Found {len(invalid_urls)} analyses with placeholder URLs")

        # 3. Check embeddings exist
        query = select(Chunk).where(Chunk.embedding.is_(None))
        result = await session.execute(query)
        missing_embeddings = result.scalars().all()

        if missing_embeddings:
            errors.append(f"Found {len(missing_embeddings)} chunks without embeddings")

        # 4. Check orphaned chunks
        query = select(Chunk).outerjoin(Analysis).where(Analysis.id.is_(None))
        result = await session.execute(query)
        orphaned = result.scalars().all()

        if orphaned:
            warnings.append(f"Found {len(orphaned)} orphaned chunks")

        return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}

Best Practices:

  1. Version control backups -- commit JSON to git for history and diffs
  2. Validate before deployment -- run verify before production changes
  3. Test restore in staging -- never test restore in production first
  4. Document changes -- track additions/removals in metadata

Incorrect — Missing URL validation:

# No URL contract enforcement
analysis.url = url  # Could be placeholder
session.add(analysis)
await session.commit()

Correct — Enforcing URL contract:

# Validate before saving
valid, msg = validate_url(url)
if not valid:
    raise ValueError(f"Invalid URL: {msg}")

analysis.url = url  # Guaranteed to be real canonical URL
session.add(analysis)
await session.commit()

Key rules:

  • Always use JSON backup for version control and portability
  • Never store placeholder URLs -- enforce the URL contract
  • Run all 4 integrity checks (counts, URLs, embeddings, orphans) after every restore
  • SQL dumps for local snapshots only, not version control

Version golden datasets for reproducible evaluation across environments and recovery — HIGH

Dataset Versioning

JSON backup format, embedding regeneration, and disaster recovery patterns.

Backup Format:

{
  "version": "1.0",
  "created_at": "2025-12-19T10:30:00Z",
  "metadata": {
    "total_analyses": 98,
    "total_chunks": 415,
    "total_artifacts": 98
  },
  "analyses": [
    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "url": "https://docs.python.org/3/library/asyncio.html",
      "content_type": "documentation",
      "status": "completed",
      "created_at": "2025-11-15T08:20:00Z",
      "chunks": [
        {
          "id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
          "content": "asyncio is a library...",
          "section_title": "Introduction to asyncio"
        }
      ]
    }
  ]
}

Key design decisions:

  • Embeddings excluded (regenerate on restore with current model)
  • Nested structure (analyses -> chunks -> artifacts)
  • Metadata for validation
  • ISO timestamps for reproducibility
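A sketch of a serializer that follows these decisions, keeping the nested analyses -> chunks structure and omitting embedding vectors (the function and attribute names are assumptions based on the backup format above, not the actual backup script):

def serialize_analysis(analysis) -> dict:
    """Serialize an Analysis row for backup, excluding embeddings (illustrative)."""
    return {
        "id": str(analysis.id),
        "url": analysis.url,
        "content_type": analysis.content_type,
        "status": analysis.status,
        "created_at": analysis.created_at.isoformat(),
        "chunks": [
            {
                "id": str(chunk.id),
                "content": chunk.content,
                "section_title": chunk.section_title,
                # embedding deliberately omitted -- regenerated on restore
            }
            for chunk in analysis.chunks
        ],
    }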

Restore with Embedding Regeneration:

async def restore_golden_dataset(replace: bool = False):
    """Restore golden dataset from JSON backup."""

    with open(BACKUP_FILE) as f:
        backup_data = json.load(f)

    async with get_session() as session:
        if replace:
            await session.execute(delete(Chunk))
            await session.execute(delete(Artifact))
            await session.execute(delete(Analysis))
            await session.commit()

        from app.shared.services.embeddings import embed_text

        for idx, analysis_data in enumerate(backup_data["analyses"], start=1):
            analysis = Analysis(
                id=UUID(analysis_data["id"]),
                url=analysis_data["url"],
            )
            session.add(analysis)

            for chunk_data in analysis_data["chunks"]:
                # Regenerate embedding using CURRENT model
                embedding = await embed_text(chunk_data["content"])

                chunk = Chunk(
                    id=UUID(chunk_data["id"]),
                    analysis_id=analysis.id,
                    content=chunk_data["content"],
                    embedding=embedding,  # Freshly generated!
                )
                session.add(chunk)

            if idx % 10 == 0:
                await session.commit()

        await session.commit()

Why regenerate embeddings?

  • Embedding models improve over time (Voyage AI v1 -> v2)
  • Ensures consistency with current production model
  • Smaller backup files (exclude large vectors)

Disaster Recovery Scenarios:

Scenario | Steps
Accidental deletion | restore --replace -> verify -> run tests
Migration failure | alembic downgrade -1 -> restore --replace -> fix migration
New environment | Clone repo -> setup DB -> restore -> run tests

CLI Commands:

cd backend

# Backup golden dataset
poetry run python scripts/backup_golden_dataset.py backup

# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify

# Restore from backup (WARNING: Deletes existing data)
poetry run python scripts/backup_golden_dataset.py restore --replace

# Restore without deleting (adds to existing)
poetry run python scripts/backup_golden_dataset.py restore

Incorrect — Storing embeddings in backup:

# Embedding vectors bloat backup file
backup_data = {
    "chunks": [{
        "content": "...",
        "embedding": [0.123, 0.456, ...],  # 1024 floats!
    }]
}

Correct — Regenerate embeddings on restore:

# Exclude embeddings from backup
backup_data = {
    "chunks": [{
        "content": "...",
        # No embedding field
    }]
}

# Regenerate during restore
embedding = await embed_text(chunk_data["content"])
chunk.embedding = embedding  # Fresh with current model

Key rules:

  • Always regenerate embeddings on restore -- never store them in backup
  • Commit backups every 10 analyses to avoid huge transactions
  • Verify counts match metadata after every restore
  • Test restore procedures in staging before production

Detect duplicate entries and coverage gaps that skew golden dataset evaluation results — CRITICAL

Drift Detection

Duplicate detection, semantic similarity checking, and coverage gap analysis.

Duplicate Detection Thresholds:

Similarity | Action
>= 0.90 | Block -- Content too similar
>= 0.85 | Warn -- High similarity detected
>= 0.80 | Note -- Similar content exists
< 0.80 | Allow -- Sufficiently unique
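A small helper expressing these bands; a sketch only, since the pre-addition workflow below applies the 0.90 and 0.80 thresholds directly:

def similarity_action(similarity: float) -> str:
    """Map cosine similarity against existing documents to an action (illustrative)."""
    if similarity >= 0.90:
        return "block"
    if similarity >= 0.85:
        return "warn"
    if similarity >= 0.80:
        return "note"
    return "allow"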

Semantic Similarity Check:

import numpy as np
from typing import Optional

async def check_duplicate(
    new_content: str,
    existing_embeddings: list[tuple[str, np.ndarray]],
    embedding_service,
    threshold: float = 0.85,
) -> Optional[tuple[str, float]]:
    """Check if content is duplicate of existing document.

    Returns:
        (doc_id, similarity) if duplicate found, None otherwise
    """
    # Generate embedding for new content
    new_embedding = await embedding_service.generate_embedding(
        text=new_content[:8000],  # Truncate for embedding
        normalize=True,
    )
    new_vec = np.array(new_embedding)

    # Compare against existing
    max_similarity = 0.0
    most_similar_doc = None

    for doc_id, existing_vec in existing_embeddings:
        # Cosine similarity (vectors are normalized)
        similarity = np.dot(new_vec, existing_vec)

        if similarity > max_similarity:
            max_similarity = similarity
            most_similar_doc = doc_id

    if max_similarity >= threshold:
        return (most_similar_doc, max_similarity)

    return None

URL Duplicate Check:

def check_url_duplicate(
    new_url: str,
    source_url_map: dict[str, str],
) -> Optional[str]:
    """Check if URL already exists in dataset."""
    normalized = normalize_url(new_url)

    for doc_id, existing_url in source_url_map.items():
        if normalize_url(existing_url) == normalized:
            return doc_id

    return None

def normalize_url(url: str) -> str:
    """Normalize URL for comparison."""
    from urllib.parse import urlparse, urlunparse

    parsed = urlparse(url.lower())
    netloc = parsed.netloc.replace("www.", "")
    path = parsed.path.rstrip("/")

    return urlunparse((
        parsed.scheme, netloc, path,
        "", "", "",  # params, query, fragment stripped
    ))

Pre-Addition Validation Workflow:

async def validate_before_add(
    document: dict,
    existing_documents: list[dict],
    source_url_map: dict[str, str],
    embedding_service,
) -> dict:
    """Run full validation before adding document."""
    errors = []
    warnings = []

    # 1. Schema validation
    schema_errors = validate_schema(document)
    errors.extend(schema_errors)

    # 2. URL validation
    url_valid, url_msg = validate_url(document.get("source_url", ""))
    if not url_valid:
        errors.append(url_msg)

    # 3. URL duplicate check
    url_dup = check_url_duplicate(document.get("source_url", ""), source_url_map)
    if url_dup:
        errors.append(f"URL already exists in dataset as: {url_dup}")

    # 4. Semantic duplicate check
    content = " ".join(
        s.get("content", "") for s in document.get("sections", [])
    )
    existing_embeddings = await load_existing_embeddings(existing_documents)
    dup_result = await check_duplicate(content, existing_embeddings, embedding_service)

    if dup_result and dup_result[1] >= 0.90:
        errors.append(
            f"Content too similar to: {dup_result[0]} (similarity: {dup_result[1]:.2f})"
        )
    elif dup_result and dup_result[1] >= 0.80:
        warnings.append(
            f"Content similar to: {dup_result[0]} (similarity: {dup_result[1]:.2f})"
        )

    return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}

Incorrect — Raw URL comparison:

# Fails for www/https/trailing slash variants
if new_url == existing_url:
    return "duplicate"

Correct — Normalized URL comparison:

# Normalize both URLs before comparing
def normalize_url(url: str) -> str:
    parsed = urlparse(url.lower())
    netloc = parsed.netloc.replace("www.", "")
    path = parsed.path.rstrip("/")
    return urlunparse((parsed.scheme, netloc, path, "", "", ""))

if normalize_url(new_url) == normalize_url(existing_url):
    return "duplicate"

Key rules:

  • Always run both URL and semantic duplicate checks before adding entries
  • Block entries with >= 0.90 cosine similarity to existing content
  • Normalize URLs before comparison (strip www, trailing slashes, query params)
  • Run coverage gap analysis periodically to detect dataset drift
  • Truncate content to 8000 chars for embedding comparison

Validate schema and content quality to prevent invalid entries from degrading evaluations — CRITICAL

Quality Validation

Schema validation, content quality checks, and referential integrity enforcement.

Document Schema (v2.0):

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["id", "title", "source_url", "content_type", "sections"],
  "properties": {
    "id": {
      "type": "string",
      "pattern": "^[a-z0-9-]+$",
      "description": "Unique kebab-case identifier"
    },
    "title": {
      "type": "string",
      "minLength": 10,
      "maxLength": 200
    },
    "source_url": {
      "type": "string",
      "format": "uri",
      "description": "Canonical source URL (NOT placeholder)"
    },
    "content_type": {
      "type": "string",
      "enum": ["article", "tutorial", "research_paper", "documentation", "video_transcript", "code_repository"]
    },
    "tags": {
      "type": "array",
      "items": {"type": "string"},
      "minItems": 2,
      "maxItems": 10
    },
    "sections": {
      "type": "array",
      "minItems": 1,
      "items": {
        "type": "object",
        "required": ["id", "title", "content"],
        "properties": {
          "id": {"type": "string", "pattern": "^[a-z0-9-/]+$"},
          "title": {"type": "string"},
          "content": {"type": "string", "minLength": 50}
        }
      }
    }
  }
}
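The rules reference a validate_schema helper; a minimal sketch using the jsonschema library against the schema above (the schema path and implementation details are assumptions):

import json
from jsonschema import Draft7Validator

def validate_schema(document: dict, schema_path: str = "fixtures/document_schema.json") -> list[str]:
    """Validate a document against the JSON schema, returning error messages (illustrative)."""
    with open(schema_path) as f:
        schema = json.load(f)
    validator = Draft7Validator(schema)
    return [
        f"{'/'.join(str(p) for p in error.path)}: {error.message}"
        for error in validator.iter_errors(document)
    ]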

Content Quality Validation:

def validate_content_quality(document: dict) -> list[str]:
    """Validate document content meets quality standards."""
    warnings = []

    # Title length
    title = document.get("title", "")
    if len(title) < 10:
        warnings.append("Title too short (min 10 chars)")
    if len(title) > 200:
        warnings.append("Title too long (max 200 chars)")

    # Section content
    for section in document.get("sections", []):
        content = section.get("content", "")
        if len(content) < 50:
            warnings.append(f"Section {section['id']} content too short (min 50 chars)")
        if len(content) > 50000:
            warnings.append(f"Section {section['id']} content very long (>50k chars)")

    # Tags
    tags = document.get("tags", [])
    if len(tags) < 2:
        warnings.append("Too few tags (min 2)")
    if len(tags) > 10:
        warnings.append("Too many tags (max 10)")

    return warnings

Unique ID Validation:

def validate_unique_ids(documents: list[dict], queries: list[dict]) -> list[str]:
    """Ensure all IDs are unique across documents and queries."""
    errors = []

    # Document IDs
    doc_ids = [d["id"] for d in documents]
    if len(doc_ids) != len(set(doc_ids)):
        duplicates = [id for id in doc_ids if doc_ids.count(id) > 1]
        errors.append(f"Duplicate document IDs: {set(duplicates)}")

    # Query IDs
    query_ids = [q["id"] for q in queries]
    if len(query_ids) != len(set(query_ids)):
        duplicates = [id for id in query_ids if query_ids.count(id) > 1]
        errors.append(f"Duplicate query IDs: {set(duplicates)}")

    # Section IDs within documents
    for doc in documents:
        section_ids = [s["id"] for s in doc.get("sections", [])]
        if len(section_ids) != len(set(section_ids)):
            errors.append(f"Duplicate section IDs in document: {doc['id']}")

    return errors

Referential Integrity:

def validate_references(documents: list[dict], queries: list[dict]) -> list[str]:
    """Ensure query expected_chunks reference valid section IDs."""
    errors = []

    # Build set of all valid section IDs
    valid_sections = set()
    for doc in documents:
        for section in doc.get("sections", []):
            valid_sections.add(section["id"])

    # Check query references
    for query in queries:
        for chunk_id in query.get("expected_chunks", []):
            if chunk_id not in valid_sections:
                errors.append(
                    f"Query {query['id']} references invalid section: {chunk_id}"
                )

    return errors

Validation Rules Summary:

Rule | Purpose | Severity
No Placeholder URLs | Ensure real canonical URLs | Error
Unique Identifiers | No duplicate doc/query/section IDs | Error
Referential Integrity | Query chunks reference valid sections | Error
Content Quality | Title/content length, tag count | Warning
Difficulty Distribution | Balanced query difficulty levels | Warning

Incorrect — Missing referential integrity check:

# Query references non-existent section
query = {
    "id": "q-test",
    "expected_chunks": ["section-999"],  # Doesn't exist!
}
queries.append(query)  # No validation

Correct — Validate references exist:

# Build set of valid section IDs
valid_sections = set()
for doc in documents:
    for section in doc.get("sections", []):
        valid_sections.add(section["id"])

# Validate query references
for chunk_id in query.get("expected_chunks", []):
    if chunk_id not in valid_sections:
        raise ValueError(f"Query references invalid section: {chunk_id}")

Key rules:

  • All documents must pass schema validation before inclusion
  • IDs must be unique across documents, queries, and sections
  • Query expected_chunks must reference existing section IDs
  • Content quality checks are warnings (non-blocking) but should be addressed

Run regression tests and enforce difficulty distribution to maintain evaluation reliability — CRITICAL

Regression Testing

Difficulty distribution enforcement, pre-commit hooks, and full dataset validation.

Difficulty Distribution Validation:

def validate_difficulty_distribution(queries: list[dict]) -> list[str]:
    """Ensure balanced difficulty distribution."""
    warnings = []

    # Count by difficulty
    distribution = {}
    for query in queries:
        diff = query.get("difficulty", "unknown")
        distribution[diff] = distribution.get(diff, 0) + 1

    # Minimum requirements
    requirements = {
        "trivial": 3,
        "easy": 3,
        "medium": 5,  # Most common real-world case
        "hard": 3,
    }

    for level, min_count in requirements.items():
        actual = distribution.get(level, 0)
        if actual < min_count:
            warnings.append(
                f"Insufficient {level} queries: {actual}/{min_count}"
            )

    return warnings

Query Schema:

{
  "type": "object",
  "required": ["id", "query", "difficulty", "expected_chunks", "min_score"],
  "properties": {
    "id": {"type": "string", "pattern": "^q-[a-z0-9-]+$"},
    "query": {"type": "string", "minLength": 5, "maxLength": 500},
    "modes": {"type": "array", "items": {"enum": ["semantic", "keyword", "hybrid"]}},
    "category": {"enum": ["specific", "broad", "negative", "edge", "coarse-to-fine"]},
    "difficulty": {"enum": ["trivial", "easy", "medium", "hard", "adversarial"]},
    "expected_chunks": {"type": "array", "items": {"type": "string"}, "minItems": 1},
    "min_score": {"type": "number", "minimum": 0, "maximum": 1}
  }
}

Full Dataset Validation:

async def validate_full_dataset() -> dict:
    """Run comprehensive validation on entire dataset.

    Use this for:
    - Pre-commit hooks
    - CI/CD validation
    - Periodic integrity checks
    """
    from backend.tests.smoke.retrieval.fixtures.loader import FixtureLoader

    loader = FixtureLoader(use_expanded=True)
    documents = loader.load_documents()
    queries = loader.load_queries()
    source_url_map = loader.load_source_url_map()

    all_errors = []
    all_warnings = []

    # 1. Schema validation for all documents
    for doc in documents:
        errors = validate_schema(doc)
        all_errors.extend([f"[{doc['id']}] {e}" for e in errors])

    # 2. Unique ID validation
    id_errors = validate_unique_ids(documents, queries)
    all_errors.extend(id_errors)

    # 3. Referential integrity
    ref_errors = validate_references(documents, queries)
    all_errors.extend(ref_errors)

    # 4. URL validation
    for doc in documents:
        valid, msg = validate_url(doc.get("source_url", ""))
        if not valid:
            all_errors.append(f"[{doc['id']}] {msg}")

    # 5. Difficulty distribution
    dist_warnings = validate_difficulty_distribution(queries)
    all_warnings.extend(dist_warnings)

    # 6. Coverage analysis
    coverage = analyze_coverage_gaps(documents, queries)
    all_warnings.extend(coverage["gaps"])

    return {
        "valid": len(all_errors) == 0,
        "errors": all_errors,
        "warnings": all_warnings,
        "coverage": coverage,
        "stats": {
            "documents": len(documents),
            "queries": len(queries),
            "sections": sum(len(d.get("sections", [])) for d in documents),
        }
    }

Pre-Commit Hook:

#!/bin/bash
# .claude/hooks/pretool/bash/validate-golden-dataset.sh

# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)

if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json\|fixtures/source_url_map.json"; then
    echo "Validating golden dataset changes..."

    cd backend
    poetry run python scripts/data/add_to_golden_dataset.py validate-all

    if [ $? -ne 0 ]; then
        echo "Golden dataset validation failed!"
        echo "Fix errors before committing."
        exit 1
    fi

    echo "Golden dataset validation passed"
fi

CLI Validation Commands:

# Validate specific document
poetry run python scripts/data/add_to_golden_dataset.py validate \
    --document-id "new-doc-id"

# Validate full dataset
poetry run python scripts/data/add_to_golden_dataset.py validate-all

# Check for duplicates
poetry run python scripts/data/add_to_golden_dataset.py check-duplicate \
    --url "https://example.com/article"

# Analyze coverage gaps
poetry run python scripts/data/add_to_golden_dataset.py coverage

Incorrect — Unbalanced difficulty distribution:

# All queries marked "easy"
queries = [
    {"id": "q-1", "difficulty": "easy"},
    {"id": "q-2", "difficulty": "easy"},
    {"id": "q-3", "difficulty": "easy"},
]

Correct — Balanced difficulty distribution:

# Mix of difficulty levels
queries = [
    {"id": "q-1", "difficulty": "trivial"},  # 3+ trivial
    {"id": "q-2", "difficulty": "easy"},     # 3+ easy
    {"id": "q-3", "difficulty": "medium"},   # 5+ medium
    {"id": "q-4", "difficulty": "hard"},     # 3+ hard
]

# Validate distribution
validate_difficulty_distribution(queries)  # Checks minimums

Key rules:

  • Run full dataset validation before every commit that modifies golden dataset files
  • Enforce minimum difficulty distribution (trivial 3, easy 3, medium 5, hard 3)
  • Run all 6 validation steps: schema, IDs, references, URLs, distribution, coverage
  • Block commits that introduce schema errors or referential integrity violations
  • Treat difficulty distribution and coverage gaps as warnings that should be addressed

References (8)

Annotation Patterns

Annotation Patterns

Multi-agent analysis pipeline and consensus aggregation for golden dataset curation.

Multi-Agent Analysis Pipeline

Architecture

INPUT: URL/Content
        |
        v
+------------------+
|   FETCH AGENT    |  WebFetch or file read
|   (sequential)   |  Extract structure, detect type
+--------+---------+
         |
         v
+-----------------------------------------------+
|  PARALLEL ANALYSIS AGENTS                      |
|  +----------+ +----------+ +--------+ +------+ |
|  | Quality  | |Difficulty| | Domain | |Query | |
|  |Evaluator | |Classifier| | Tagger | |Gen   | |
|  +----+-----+ +----+-----+ +---+----+ +--+---+ |
|       |            |           |         |     |
+-------+------------+-----------+---------+-----+
        |            |           |         |
        +------------+-----------+---------+
                     |
                     v
+-----------------------------------------------+
|  CONSENSUS AGGREGATOR                          |
|  - Weighted quality score                      |
|  - Confidence level (agent agreement)          |
|  - Final recommendation: include/review/exclude|
+--------+--------------------------------------+
         |
         v
+------------------+
|  USER APPROVAL   |  Show scores, get confirmation
+--------+---------+
         |
         v
OUTPUT: Curated document entry

Agent Specifications

Quality Evaluator Agent

Task(
    subagent_type="code-quality-reviewer",
    prompt="""GOLDEN DATASET QUALITY EVALUATION

    Evaluate this content for golden dataset inclusion:

    Content: {content_preview}
    Source: {source_url}
    Type: {content_type}

    Score these dimensions (0.0-1.0):

    1. ACCURACY (weight 0.25)
       - Technical correctness
       - Code validity
       - Up-to-date information

    2. COHERENCE (weight 0.20)
       - Logical structure
       - Clear flow
       - Consistent terminology

    3. DEPTH (weight 0.25)
       - Comprehensive coverage
       - Edge cases mentioned
       - Appropriate detail level

    4. RELEVANCE (weight 0.30)
       - Alignment with AI/ML, backend, frontend, DevOps
       - Practical applicability
       - Technical value

    Output JSON:
    {
        "accuracy": {"score": 0.X, "rationale": "..."},
        "coherence": {"score": 0.X, "rationale": "..."},
        "depth": {"score": 0.X, "rationale": "..."},
        "relevance": {"score": 0.X, "rationale": "..."},
        "weighted_total": 0.X,
        "recommendation": "include|review|exclude"
    }
    """,
    run_in_background=True
)

Difficulty Classifier Agent

Task(
    subagent_type="workflow-architect",
    prompt="""DIFFICULTY CLASSIFICATION

    Analyze document complexity for retrieval testing:

    Content: {content_preview}
    Sections: {section_titles}

    Assess these factors:
    1. Technical term density (count specialized terms)
    2. Section complexity (nesting depth, count)
    3. Cross-domain references (links between topics)
    4. Abstraction level (concrete vs conceptual)
    5. Query ambiguity potential (how many ways to ask about this?)

    Output JSON:
    {
        "difficulty": "trivial|easy|medium|hard|adversarial",
        "factors": {
            "technical_density": "low|medium|high",
            "structure_complexity": "simple|moderate|complex",
            "cross_references": "none|some|many",
            "abstraction": "concrete|mixed|abstract"
        },
        "expected_retrieval_score": 0.X,
        "rationale": "..."
    }
    """
)

Domain Tagger Agent

Task(
    subagent_type="data-pipeline-engineer",
    prompt="""DOMAIN TAGGING

    Extract domain tags for this content:

    Content: {content_preview}
    Source: {source_url}

    Primary domains (pick 1-2):
    - ai-ml (LLM, agents, RAG, embeddings, LangGraph)
    - backend (FastAPI, PostgreSQL, APIs, microservices)
    - frontend (React, TypeScript, UI/UX)
    - devops (Docker, K8s, CI/CD, infrastructure)
    - security (auth, OWASP, encryption)
    - databases (SQL, NoSQL, vector DBs)
    - testing (pytest, playwright, TDD)

    Secondary tags (pick 3-5):
    - Specific technologies mentioned
    - Patterns/concepts covered
    - Use cases addressed

    Output JSON:
    {
        "primary_domains": ["ai-ml", "backend"],
        "tags": ["langraph", "agents", "tool-use", "fastapi"],
        "confidence": 0.X
    }
    """
)

Query Generator Agent

Task(
    subagent_type="test-generator",
    prompt="""TEST QUERY GENERATION

    Generate test queries for this golden dataset document:

    Document ID: {document_id}
    Title: {title}
    Sections: {section_titles}
    Content preview: {content_preview}

    Generate 3-5 test queries with varied difficulty:

    1. At least 1 TRIVIAL query (exact keyword match)
    2. At least 1 EASY query (synonyms, common terms)
    3. At least 1 MEDIUM query (paraphrased intent)
    4. Optional: 1 HARD query (cross-section reasoning)

    For each query specify:
    - Query text
    - Expected sections to match
    - Difficulty level
    - Minimum expected score

    Output JSON:
    {
        "queries": [
            {
                "id": "q-{doc-id}-{num}",
                "query": "How to implement X with Y?",
                "difficulty": "medium",
                "expected_chunks": ["section-id-1", "section-id-2"],
                "min_score": 0.55,
                "modes": ["semantic", "hybrid"],
                "category": "specific",
                "description": "Tests retrieval of X implementation details"
            }
        ]
    }
    """
)

Consensus Aggregation

Aggregation Logic

from dataclasses import dataclass
from typing import Literal

@dataclass
class CurationConsensus:
    """Aggregated result from multi-agent analysis."""

    quality_score: float  # Weighted average (0-1)
    confidence: float     # Agent agreement (0-1)
    decision: Literal["include", "review", "exclude"]

    # Individual scores
    accuracy: float
    coherence: float
    depth: float
    relevance: float

    # Classification results
    content_type: str
    difficulty: str
    tags: list[str]

    # Generated queries
    suggested_queries: list[dict]

    # Warnings
    warnings: list[str]

def aggregate_results(
    quality_result: dict,
    difficulty_result: dict,
    domain_result: dict,
    query_result: dict,
) -> CurationConsensus:
    """Aggregate multi-agent results into consensus."""

    # Calculate weighted quality score
    q = quality_result
    quality_score = (
        q["accuracy"]["score"] * 0.25 +
        q["coherence"]["score"] * 0.20 +
        q["depth"]["score"] * 0.25 +
        q["relevance"]["score"] * 0.30
    )

    # Calculate confidence (variance-based)
    scores = [
        q["accuracy"]["score"],
        q["coherence"]["score"],
        q["depth"]["score"],
        q["relevance"]["score"],
    ]
    variance = sum((s - quality_score)**2 for s in scores) / len(scores)
    confidence = 1.0 - min(variance * 4, 1.0)

    # Decision thresholds
    if quality_score >= 0.75 and confidence >= 0.7:
        decision = "include"
    elif quality_score >= 0.55:
        decision = "review"
    else:
        decision = "exclude"

    # Collect warnings
    warnings = []
    if q["accuracy"]["score"] < 0.6:
        warnings.append("Low accuracy score - verify technical claims")
    if q["relevance"]["score"] < 0.7:
        warnings.append("Low relevance - may be off-topic for OrchestKit")
    if domain_result["confidence"] < 0.7:
        warnings.append("Low confidence in domain classification")

    return CurationConsensus(
        quality_score=quality_score,
        confidence=confidence,
        decision=decision,
        accuracy=q["accuracy"]["score"],
        coherence=q["coherence"]["score"],
        depth=q["depth"]["score"],
        relevance=q["relevance"]["score"],
        content_type=difficulty_result.get("content_type", "article"),
        difficulty=difficulty_result["difficulty"],
        tags=domain_result["tags"],
        suggested_queries=query_result["queries"],
        warnings=warnings,
    )

Langfuse Integration

Trace Structure

# Langfuse trace for curation workflow
trace = langfuse.trace(
    name="golden-dataset-curation",
    metadata={
        "source_url": url,
        "document_id": doc_id,
    }
)

# Spans for each agent
with trace.span(name="fetch_content") as span:
    content = fetch_url(url)
    span.update(output={"length": len(content)})

with trace.span(name="quality_evaluation") as span:
    quality_result = await run_quality_agent(content)
    span.update(output=quality_result)
    # Log individual dimension scores
    trace.score(name="accuracy", value=quality_result["accuracy"]["score"])
    trace.score(name="coherence", value=quality_result["coherence"]["score"])
    trace.score(name="depth", value=quality_result["depth"]["score"])
    trace.score(name="relevance", value=quality_result["relevance"]["score"])

# Final aggregated score
trace.score(name="quality_total", value=consensus.quality_score)
trace.event(
    name="curation_decision",
    metadata={"decision": consensus.decision}
)

Prompt Management

All curation prompts are managed in Langfuse:

Prompt Name | Purpose | Tags
golden-content-classifier | Classify content_type | golden-dataset, classification
golden-difficulty-classifier | Assign difficulty | golden-dataset, difficulty
golden-domain-tagger | Extract tags | golden-dataset, tagging
golden-query-generator | Generate queries | golden-dataset, query-gen
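
The curation workflow can fetch these managed prompts at runtime instead of hardcoding them. A minimal sketch, assuming the standard Langfuse Python SDK; the template variable names are assumptions, not the actual prompt definitions:

from langfuse import Langfuse

langfuse = Langfuse()  # reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY from the environment

# Fetch the managed prompt by name (a specific version or label can also be pinned)
prompt = langfuse.get_prompt("golden-domain-tagger")

# Compile the template with document-specific variables before handing it to the agent.
# {{title}} and {{content_preview}} are assumed template variables for illustration.
compiled = prompt.compile(
    title="Building agents with LangGraph",
    content_preview="LangGraph models agent workflows as graphs of nodes and edges...",
)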

Backup Restore

Backup & Restore Golden Dataset

Backup Process

1. Export to JSON

# backend/scripts/backup_golden_dataset.py backup

async def backup_golden_dataset():
    """Export golden dataset to JSON."""

    async with get_session() as session:
        # Fetch all completed analyses
        query = (
            select(Analysis)
            .where(Analysis.status == "completed")
            .options(
                selectinload(Analysis.chunks),
                selectinload(Analysis.artifact)
            )
            .order_by(Analysis.created_at)
        )
        result = await session.execute(query)
        analyses = result.scalars().all()

        # Serialize
        backup_data = {
            "version": "1.0",
            "created_at": datetime.now(UTC).isoformat(),
            "metadata": {
                "total_analyses": len(analyses),
                "total_chunks": sum(len(a.chunks) for a in analyses),
                "total_artifacts": sum(1 for a in analyses if a.artifact)
            },
            "analyses": [serialize_analysis(a) for a in analyses]
        }

        # Write to file
        BACKUP_FILE.parent.mkdir(exist_ok=True)
        with open(BACKUP_FILE, "w") as f:
            json.dump(backup_data, f, indent=2, default=str)

        # Write metadata (quick stats)
        with open(METADATA_FILE, "w") as f:
            json.dump(backup_data["metadata"], f, indent=2)

        print(f"✅ Backup completed: {len(analyses)} analyses, {backup_data['metadata']['total_chunks']} chunks")

2. Serialize Without Embeddings

def serialize_chunk(chunk: Chunk) -> dict:
    """Serialize chunk WITHOUT embedding vector."""
    return {
        "id": str(chunk.id),
        "content": chunk.content,
        "section_title": chunk.section_title,
        "section_path": chunk.section_path,
        "content_type": chunk.content_type,
        "chunk_index": chunk.chunk_index
        # embedding excluded - regenerated on restore
    }

Why exclude embeddings?

  • Smaller backup files (415 chunks × 1024 dims × 4 bytes = 1.7 MB saved)
  • Model independence (can restore with different model)
  • Version control friendly (JSON diffs are meaningful)

Restore Process

1. Load and Validate Backup

async def restore_golden_dataset(replace: bool = False):
    """Restore golden dataset from JSON backup."""

    # Load backup file
    if not BACKUP_FILE.exists():
        raise FileNotFoundError(f"Backup file not found: {BACKUP_FILE}")

    with open(BACKUP_FILE) as f:
        backup_data = json.load(f)

    # Validate structure
    required_keys = ["version", "created_at", "metadata", "analyses"]
    for key in required_keys:
        if key not in backup_data:
            raise ValueError(f"Invalid backup: missing '{key}'")

    print(f"📦 Loading backup from {backup_data['created_at']}")
    print(f"   Analyses: {backup_data['metadata']['total_analyses']}")
    print(f"   Chunks: {backup_data['metadata']['total_chunks']}")

2. Clear Existing Data (Optional)

    async with get_session() as session:
        if replace:
            print("⚠️  Deleting existing data...")

            # Delete in correct order (respect foreign keys)
            await session.execute(delete(Chunk))
            await session.execute(delete(Artifact))
            await session.execute(delete(Analysis))
            await session.commit()

            print("✅ Existing data cleared")

3. Restore Analyses and Chunks

        from app.shared.services.embeddings import embed_text

        total_chunks = 0

        for idx, analysis_data in enumerate(backup_data["analyses"], 1):
            print(f"[{idx}/{len(backup_data['analyses'])}] Restoring {analysis_data['url'][:50]}...")

            # Create analysis
            analysis = Analysis(
                id=UUID(analysis_data["id"]),
                url=analysis_data["url"],
                content_type=analysis_data["content_type"],
                status=analysis_data["status"],
                created_at=datetime.fromisoformat(analysis_data["created_at"])
                # ... other fields ...
            )
            session.add(analysis)

            # Restore chunks with regenerated embeddings
            for chunk_data in analysis_data["chunks"]:
                # Generate embedding using CURRENT model
                embedding = await embed_text(chunk_data["content"])

                chunk = Chunk(
                    id=UUID(chunk_data["id"]),
                    analysis_id=analysis.id,
                    content=chunk_data["content"],
                    embedding=embedding,  # Freshly generated
                    section_title=chunk_data.get("section_title"),
                    section_path=chunk_data.get("section_path"),
                    content_type=chunk_data["content_type"],
                    chunk_index=chunk_data["chunk_index"]
                )
                session.add(chunk)
                total_chunks += 1

            # Restore artifact
            if analysis_data.get("artifact"):
                artifact_data = analysis_data["artifact"]
                artifact = Artifact(
                    id=UUID(artifact_data["id"]),
                    analysis_id=analysis.id,
                    summary=artifact_data["summary"],
                    # ... other fields ...
                )
                session.add(artifact)

            # Commit every 10 analyses (avoid huge transactions)
            if idx % 10 == 0:
                await session.commit()

        # Final commit
        await session.commit()

        print(f"✅ Restore completed: {len(backup_data['analyses'])} analyses, {total_chunks} chunks")

4. Verify Restore

        # Verify counts match
        analysis_count = await session.scalar(select(func.count(Analysis.id)))
        chunk_count = await session.scalar(select(func.count(Chunk.id)))

        assert analysis_count == backup_data["metadata"]["total_analyses"]
        assert chunk_count == backup_data["metadata"]["total_chunks"]

        print("✅ Verification passed")

CLI Commands

cd backend

# Backup
poetry run python scripts/backup_golden_dataset.py backup

# Restore (add to existing data)
poetry run python scripts/backup_golden_dataset.py restore

# Restore (replace all data - DESTRUCTIVE!)
poetry run python scripts/backup_golden_dataset.py restore --replace

# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify

Regenerating Embeddings

Why regenerate?

  • Embedding models improve over time (Voyage AI v1 → v2)
  • Ensures consistency with current production model
  • Smaller backup files

Process:

from app.shared.services.embeddings import embed_text

async def regenerate_embeddings():
    """Regenerate embeddings for all chunks."""

    async with get_session() as session:
        # Fetch all chunks
        query = select(Chunk).order_by(Chunk.id)
        result = await session.execute(query)
        chunks = result.scalars().all()

        print(f"Regenerating embeddings for {len(chunks)} chunks...")

        for idx, chunk in enumerate(chunks, 1):
            # Generate new embedding
            embedding = await embed_text(chunk.content)

            # Update chunk
            chunk.embedding = embedding

            if idx % 50 == 0:
                await session.commit()
                print(f"  Progress: {idx}/{len(chunks)}")

        await session.commit()
        print("✅ Embeddings regenerated")

Runtime: ~415 chunks × 200ms = ~83 seconds


SQL Dump (Alternative)

Create SQL Dump

# Dump only golden dataset tables
pg_dump $DATABASE_URL \
  --table=analyses \
  --table=chunks \
  --table=artifacts \
  --data-only \
  --file=backend/data/golden_dataset_dump.sql

# ~5 MB for 98 analyses + 415 chunks (includes embeddings)

Restore from SQL Dump

# Restore SQL dump
psql $DATABASE_URL < backend/data/golden_dataset_dump.sql

Pros:

  • Fast (includes embeddings, no regeneration)
  • Exact replica

Cons:

  • Not version controlled (too large, binary)
  • DB version dependent
  • No easy inspection

OrchestKit uses JSON backups for version control; SQL dumps are kept for local snapshots only.


Error Handling

async def restore_with_error_handling():
    """Restore with proper error handling."""

    try:
        await restore_golden_dataset(replace=True)
    except FileNotFoundError as e:
        print(f"❌ Backup file not found: {e}")
        print(f"   Expected: {BACKUP_FILE}")
        return False
    except ValueError as e:
        print(f"❌ Invalid backup format: {e}")
        return False
    except Exception as e:
        print(f"❌ Restore failed: {e}")
        # Rollback handled by async context manager
        return False

    return True

References

  • OrchestKit: backend/scripts/backup_golden_dataset.py
  • OrchestKit: backend/data/golden_dataset_backup.json

Quality Metrics

Quality Metrics and Coverage Analysis

Metrics and analysis patterns for golden dataset quality.

Coverage Analysis

Gap Detection

def analyze_coverage_gaps(
    documents: list[dict],
    queries: list[dict],
) -> dict:
    """Analyze dataset coverage and identify gaps."""

    # Content type distribution
    content_types = {}
    for doc in documents:
        ct = doc.get("content_type", "unknown")
        content_types[ct] = content_types.get(ct, 0) + 1

    # Domain/tag distribution
    all_tags = []
    for doc in documents:
        all_tags.extend(doc.get("tags", []))
    tag_counts = {}
    for tag in all_tags:
        tag_counts[tag] = tag_counts.get(tag, 0) + 1

    # Difficulty distribution
    difficulties = {}
    for query in queries:
        diff = query.get("difficulty", "unknown")
        difficulties[diff] = difficulties.get(diff, 0) + 1

    # Identify gaps
    gaps = []

    # Check content type balance (guard against an empty dataset)
    total_docs = len(documents)
    if total_docs:
        if content_types.get("tutorial", 0) / total_docs < 0.15:
            gaps.append("Under-represented: tutorials (<15%)")
        if content_types.get("research_paper", 0) / total_docs < 0.05:
            gaps.append("Under-represented: research papers (<5%)")

    # Check domain coverage
    expected_domains = ["ai-ml", "backend", "frontend", "devops", "security"]
    for domain in expected_domains:
        if tag_counts.get(domain, 0) < 5:
            gaps.append(f"Under-represented domain: {domain} (<5 docs)")

    # Check difficulty balance (guard against an empty query set)
    total_queries = len(queries)
    if total_queries:
        if difficulties.get("hard", 0) / total_queries < 0.10:
            gaps.append("Under-represented: hard queries (<10%)")
        if difficulties.get("adversarial", 0) / total_queries < 0.05:
            gaps.append("Under-represented: adversarial queries (<5%)")

    return {
        "content_type_distribution": content_types,
        "tag_distribution": dict(sorted(tag_counts.items(), key=lambda x: -x[1])[:20]),
        "difficulty_distribution": difficulties,
        "gaps": gaps,
        "total_documents": total_docs,
        "total_queries": total_queries,
    }
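
A minimal usage sketch; the documents and queries below are illustrative stand-ins for the real fixtures:

documents = [
    {"content_type": "article", "tags": ["ai-ml", "agents"]},
    {"content_type": "tutorial", "tags": ["backend", "fastapi"]},
]
queries = [
    {"difficulty": "medium"},
    {"difficulty": "hard"},
]

report = analyze_coverage_gaps(documents, queries)
print(f"{report['total_documents']} docs, {report['total_queries']} queries")
for gap in report["gaps"]:
    print(f"  - {gap}")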

Validation Workflow

Pre-Addition Validation

async def validate_before_add(
    document: dict,
    existing_documents: list[dict],
    existing_queries: list[dict],
    source_url_map: dict[str, str],
    embedding_service,
) -> dict:
    """Run full validation before adding document.

    Returns:
        {
            "valid": bool,
            "errors": list[str],  # Blocking issues
            "warnings": list[str],  # Non-blocking issues
            "duplicate_check": {
                "is_duplicate": bool,
                "similar_to": str | None,
                "similarity": float | None,
            }
        }
    """
    errors = []
    warnings = []

    # 1. Schema validation
    schema_errors = validate_schema(document)
    errors.extend(schema_errors)

    # 2. URL validation
    url_valid, url_msg = validate_url(document.get("source_url", ""))
    if not url_valid:
        errors.append(url_msg)

    # 3. URL duplicate check
    url_dup = check_url_duplicate(document.get("source_url", ""), source_url_map)
    if url_dup:
        errors.append(f"URL already exists in dataset as: {url_dup}")

    # 4. Content quality
    quality_warnings = validate_content_quality(document)
    warnings.extend(quality_warnings)

    # 5. Semantic duplicate check
    content = " ".join(
        s.get("content", "") for s in document.get("sections", [])
    )
    existing_embeddings = await load_existing_embeddings(existing_documents)
    dup_result = await check_duplicate(
        content, existing_embeddings, embedding_service
    )

    duplicate_check = {
        "is_duplicate": dup_result is not None,
        "similar_to": dup_result[0] if dup_result else None,
        "similarity": dup_result[1] if dup_result else None,
    }

    if dup_result and dup_result[1] >= 0.90:
        errors.append(
            f"Content too similar to existing document: {dup_result[0]} "
            f"(similarity: {dup_result[1]:.2f})"
        )
    elif dup_result and dup_result[1] >= 0.85:
        warnings.append(
            f"Content similar to existing document: {dup_result[0]} "
            f"(similarity: {dup_result[1]:.2f})"
        )

    return {
        "valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
        "duplicate_check": duplicate_check,
    }

Full Dataset Validation

async def validate_full_dataset() -> dict:
    """Run comprehensive validation on entire dataset.

    Use this for:
    - Pre-commit hooks
    - CI/CD validation
    - Periodic integrity checks
    """
    from backend.tests.smoke.retrieval.fixtures.loader import FixtureLoader

    loader = FixtureLoader(use_expanded=True)
    documents = loader.load_documents()
    queries = loader.load_queries()
    source_url_map = loader.load_source_url_map()

    all_errors = []
    all_warnings = []

    # 1. Schema validation for all documents
    for doc in documents:
        errors = validate_schema(doc)
        all_errors.extend([f"[{doc['id']}] {e}" for e in errors])

    # 2. Unique ID validation
    id_errors = validate_unique_ids(documents, queries)
    all_errors.extend(id_errors)

    # 3. Referential integrity
    ref_errors = validate_references(documents, queries)
    all_errors.extend(ref_errors)

    # 4. URL validation
    for doc in documents:
        valid, msg = validate_url(doc.get("source_url", ""))
        if not valid:
            all_errors.append(f"[{doc['id']}] {msg}")

    # 5. Difficulty distribution
    dist_warnings = validate_difficulty_distribution(queries)
    all_warnings.extend(dist_warnings)

    # 6. Coverage analysis
    coverage = analyze_coverage_gaps(documents, queries)
    all_warnings.extend(coverage["gaps"])

    return {
        "valid": len(all_errors) == 0,
        "errors": all_errors,
        "warnings": all_warnings,
        "coverage": coverage,
        "stats": {
            "documents": len(documents),
            "queries": len(queries),
            "sections": sum(len(d.get("sections", [])) for d in documents),
        }
    }

CLI Integration

Validation Commands

# Validate specific document
poetry run python scripts/data/add_to_golden_dataset.py validate \
    --document-id "new-doc-id"

# Validate full dataset
poetry run python scripts/data/add_to_golden_dataset.py validate-all

# Check for duplicates
poetry run python scripts/data/add_to_golden_dataset.py check-duplicate \
    --url "https://example.com/article"

# Analyze coverage gaps
poetry run python scripts/data/add_to_golden_dataset.py coverage

Pre-Commit Hook

#!/bin/bash
# .claude/hooks/pretool/bash/validate-golden-dataset.sh

# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)

if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json\|fixtures/source_url_map.json"; then
    echo "Validating golden dataset changes..."

    cd backend
    poetry run python scripts/data/add_to_golden_dataset.py validate-all

    if [ $? -ne 0 ]; then
        echo "Golden dataset validation failed!"
        echo "Fix errors before committing."
        exit 1
    fi

    echo "Golden dataset validation passed"
fi

Selection Criteria

Selection Criteria

Content classification and difficulty stratification for golden datasets.

Content Type Classification

Supported Types

Type | Description | Quality Focus
article | Technical articles, blog posts | Depth, accuracy, actionability
tutorial | Step-by-step guides | Completeness, clarity, code quality
research_paper | Academic papers, whitepapers | Rigor, citations, methodology
documentation | API docs, reference materials | Accuracy, completeness, examples
video_transcript | Transcribed video content | Structure, coherence, key points
code_repository | README, code analysis | Code quality, documentation

Classification Criteria

# Content Type Decision Tree
def classify_content_type(content: str, source_url: str) -> str:
    """Classify content type based on structure and source."""

    # URL-based hints
    if "arxiv.org" in source_url or "papers" in source_url:
        return "research_paper"
    if "docs." in source_url or "/api/" in source_url:
        return "documentation"
    if "github.com" in source_url:
        return "code_repository"

    # Content-based analysis
    if has_step_by_step_structure(content):
        return "tutorial"
    if has_academic_structure(content):  # Abstract, methodology, results
        return "research_paper"

    # Default
    return "article"
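
The structural helpers referenced above are not defined in this rule; a minimal sketch of what they might check, with illustrative heuristics and thresholds:

import re

def has_step_by_step_structure(content: str) -> bool:
    """Heuristic: several numbered steps or 'Step N' headings suggest a tutorial."""
    steps = re.findall(r"^\s*(?:\d+\.\s|Step \d+)", content, flags=re.MULTILINE)
    return len(steps) >= 3

def has_academic_structure(content: str) -> bool:
    """Heuristic: classic paper sections (abstract, methodology, results) suggest a research paper."""
    lowered = content.lower()
    markers = ("abstract", "methodology", "results", "related work")
    return sum(marker in lowered for marker in markers) >= 2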

Difficulty Classification

Stratification Levels

Level | Semantic Complexity | Expected Score | Characteristics
trivial | Direct keyword match | >0.85 | Technical terms, exact phrases
easy | Common synonyms | >0.70 | Well-known concepts, slight variations
medium | Paraphrased intent | >0.55 | Conceptual queries, multi-topic
hard | Multi-hop reasoning | >0.40 | Cross-domain, comparative analysis
adversarial | Edge cases | Graceful degradation | Robustness tests, off-domain

Classification Factors

def classify_difficulty(document: dict) -> str:
    """Classify document difficulty for retrieval testing."""

    factors = {
        "technical_density": count_technical_terms(document["content"]),
        "section_count": len(document.get("sections", [])),
        "cross_references": count_cross_references(document),
        "abstraction_level": assess_abstraction(document),
        "domain_specificity": assess_domain_specificity(document),
    }

    # Scoring rubric
    score = 0
    if factors["technical_density"] > 50:
        score += 2
    if factors["section_count"] > 10:
        score += 1
    if factors["cross_references"] > 5:
        score += 2
    if factors["abstraction_level"] == "high":
        score += 2

    # Map score to difficulty
    if score <= 2:
        return "trivial"
    elif score <= 4:
        return "easy"
    elif score <= 6:
        return "medium"
    elif score <= 8:
        return "hard"
    else:
        return "adversarial"
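
The scoring factors likewise assume helpers such as these; a rough sketch with an illustrative vocabulary list:

import re

TECHNICAL_TERMS = {"async", "embedding", "idempotent", "kubernetes", "backpressure", "mutex"}

def count_technical_terms(content: str) -> int:
    """Count occurrences of known technical vocabulary in the document body."""
    words = re.findall(r"[a-z_]+", content.lower())
    return sum(1 for word in words if word in TECHNICAL_TERMS)

def count_cross_references(document: dict) -> int:
    """Count links and explicit references to other sections or documents."""
    text = " ".join(s.get("content", "") for s in document.get("sections", []))
    return len(re.findall(r"https?://|see section|as described in", text, flags=re.IGNORECASE))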

Quality Evaluation Dimensions

1. Accuracy (Weight: 0.25)

What it measures: Factual correctness, up-to-date information

Evaluation criteria:

  • Technical claims are verifiable
  • Code examples are syntactically correct
  • No outdated information (check dates, versions)
  • Sources/citations where applicable

Thresholds:

  • Perfect: 0.95-1.0 (all claims verifiable)
  • Acceptable: 0.70-0.94 (minor inaccuracies)
  • Failing: <0.70 (significant errors)

2. Coherence (Weight: 0.20)

What it measures: Logical flow, structure, readability

Evaluation criteria:

  • Clear introduction and conclusion
  • Logical section ordering
  • Smooth transitions between topics
  • Consistent terminology

Thresholds:

  • Perfect: 0.90-1.0 (professional quality)
  • Acceptable: 0.60-0.89 (readable but rough)
  • Failing: <0.60 (confusing structure)

3. Depth (Weight: 0.25)

What it measures: Thoroughness, detail level, comprehensiveness

Evaluation criteria:

  • Covers topic comprehensively
  • Includes edge cases and caveats
  • Provides context and background
  • Appropriate level of detail for audience

Thresholds:

  • Perfect: 0.90-1.0 (exhaustive coverage)
  • Acceptable: 0.55-0.89 (covers main points)
  • Failing: <0.55 (superficial treatment)

4. Relevance (Weight: 0.30)

What it measures: Alignment with OrchestKit's technical domains

Target domains:

  • AI/ML (LangGraph, RAG, agents, embeddings)
  • Backend (FastAPI, PostgreSQL, APIs)
  • Frontend (React, TypeScript)
  • DevOps (Docker, Kubernetes, CI/CD)
  • Security (OWASP, authentication)

Thresholds:

  • Perfect: 0.95-1.0 (core domain, highly relevant)
  • Acceptable: 0.70-0.94 (related domain)
  • Failing: <0.70 (off-topic for OrchestKit)
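
To make the weighting concrete, a worked example with hypothetical dimension scores:

# Hypothetical scores for a strong LangGraph article
scores = {"accuracy": 0.90, "coherence": 0.80, "depth": 0.85, "relevance": 0.95}
weights = {"accuracy": 0.25, "coherence": 0.20, "depth": 0.25, "relevance": 0.30}

quality_score = sum(scores[dim] * weights[dim] for dim in weights)
print(round(quality_score, 4))  # 0.8825, above the 0.70 inclusion threshold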

Best Practices

Quality Thresholds

# Recommended thresholds for golden dataset inclusion
minimum_quality_score: 0.70
minimum_confidence: 0.65
required_tags: 2  # At least 2 domain tags
required_queries: 3  # At least 3 test queries

Coverage Balance

Maintain balanced coverage across:

  • Content types (don't over-index on articles)
  • Difficulty levels (need trivial AND hard)
  • Domains (spread across AI/ML, backend, frontend, etc.)

Duplicate Prevention

Before adding:

  1. Check URL against existing source_url_map.json
  2. Run semantic similarity against existing document embeddings
  3. Warn if >85% similar to existing document (block at >90%)

Provenance Tracking

Always record (see the sketch after this list):

  • Source URL (canonical)
  • Curation date
  • Agent scores (for audit trail)
  • Langfuse trace ID
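
A minimal sketch of such a provenance record; the field names are illustrative, not a fixed schema:

from datetime import datetime, UTC
from typing import TypedDict

class ProvenanceRecord(TypedDict):
    source_url: str                  # canonical URL, never a placeholder
    curated_at: str                  # ISO timestamp of the curation run
    agent_scores: dict[str, float]   # per-dimension scores for the audit trail
    langfuse_trace_id: str           # link back to the curation trace

record: ProvenanceRecord = {
    "source_url": "https://docs.python.org/3/library/asyncio.html",
    "curated_at": datetime.now(UTC).isoformat(),
    "agent_scores": {"accuracy": 0.92, "coherence": 0.88, "depth": 0.85, "relevance": 0.95},
    "langfuse_trace_id": "trace-abc123",
}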

Storage Patterns

Storage Patterns

Backup strategies and storage formats for golden datasets.

Backup Strategies

Strategy 1: JSON Backup

Pros:

  • Version controlled (commit to git)
  • Human-readable (easy to inspect)
  • Portable (works across DB versions)
  • Incremental diffs (see what changed)

Cons:

  • Must regenerate embeddings on restore (~200 ms per chunk, ~83 s for the full dataset)
  • Not an exact byte-for-byte replica (embeddings are regenerated, not copied)

OrchestKit uses JSON backup.

Strategy 2: SQL Dump

Pros:

  • Fast restore (includes embeddings)
  • Exact replica (binary-identical)
  • Native PostgreSQL format

Cons:

  • Not version controlled (binary format)
  • DB version dependent
  • No easy inspection

Use case: Local snapshots, not version control.

Backup Format

{
  "version": "1.0",
  "created_at": "2025-12-19T10:30:00Z",
  "metadata": {
    "total_analyses": 98,
    "total_chunks": 415,
    "total_artifacts": 98
  },
  "analyses": [
    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "url": "https://docs.python.org/3/library/asyncio.html",
      "content_type": "documentation",
      "status": "completed",
      "created_at": "2025-11-15T08:20:00Z",
      "findings": [
        {
          "agent": "security_agent",
          "category": "best_practices",
          "content": "Always use asyncio.run() for top-level entry point",
          "confidence": 0.92
        }
      ],
      "chunks": [
        {
          "id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
          "content": "asyncio is a library to write concurrent code...",
          "section_title": "Introduction to asyncio",
          "section_path": "docs/python/asyncio/intro.md",
          "content_type": "paragraph",
          "chunk_index": 0
          // Note: embedding NOT included (regenerated on restore)
        }
      ],
      "artifact": {
        "id": "a1b2c3d4-e5f6-4a5b-8c7d-9e8f7a6b5c4d",
        "summary": "Comprehensive guide to asyncio...",
        "key_findings": ["..."],
        "metadata": {}
      }
    }
  ]
}

Key Design Decisions:

  • Embeddings excluded (regenerate on restore with current model)
  • Nested structure (each analysis contains its chunks and artifact)
  • Metadata for validation
  • ISO timestamps for reproducibility

Backup Implementation

# backend/scripts/backup_golden_dataset.py

import asyncio
import json
from datetime import datetime, UTC
from pathlib import Path
from sqlalchemy import select
from app.db.session import get_session
from app.db.models import Analysis, Chunk, Artifact

BACKUP_DIR = Path("backend/data")
BACKUP_FILE = BACKUP_DIR / "golden_dataset_backup.json"
METADATA_FILE = BACKUP_DIR / "golden_dataset_metadata.json"

async def backup_golden_dataset():
    """Backup golden dataset to JSON."""

    async with get_session() as session:
        # Fetch all completed analyses
        query = (
            select(Analysis)
            .where(Analysis.status == "completed")
            .order_by(Analysis.created_at)
        )
        result = await session.execute(query)
        analyses = result.scalars().all()

        # Serialize to JSON
        backup_data = {
            "version": "1.0",
            "created_at": datetime.now(UTC).isoformat(),
            "metadata": {
                "total_analyses": len(analyses),
                "total_chunks": sum(len(a.chunks) for a in analyses),
                "total_artifacts": len([a for a in analyses if a.artifact])
            },
            "analyses": [
                serialize_analysis(a) for a in analyses
            ]
        }

        # Write backup file
        BACKUP_DIR.mkdir(exist_ok=True)
        with open(BACKUP_FILE, "w") as f:
            json.dump(backup_data, f, indent=2, default=str)

        # Write metadata file (quick stats)
        with open(METADATA_FILE, "w") as f:
            json.dump(backup_data["metadata"], f, indent=2)

        print(f"Backup completed: {BACKUP_FILE}")
        print(f"   Analyses: {backup_data['metadata']['total_analyses']}")
        print(f"   Chunks: {backup_data['metadata']['total_chunks']}")

def serialize_analysis(analysis: Analysis) -> dict:
    """Serialize analysis to dict."""
    return {
        "id": str(analysis.id),
        "url": analysis.url,
        "content_type": analysis.content_type,
        "status": analysis.status,
        "created_at": analysis.created_at.isoformat(),
        "findings": [serialize_finding(f) for f in analysis.findings],
        "chunks": [serialize_chunk(c) for c in analysis.chunks],
        "artifact": serialize_artifact(analysis.artifact) if analysis.artifact else None
    }

def serialize_chunk(chunk: Chunk) -> dict:
    """Serialize chunk (WITHOUT embedding)."""
    return {
        "id": str(chunk.id),
        "content": chunk.content,
        "section_title": chunk.section_title,
        "section_path": chunk.section_path,
        "content_type": chunk.content_type,
        "chunk_index": chunk.chunk_index
        # embedding excluded (regenerate on restore)
    }

CLI Usage

cd backend

# Backup golden dataset
poetry run python scripts/backup_golden_dataset.py backup

# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify

# Restore from backup (WARNING: Deletes existing data)
poetry run python scripts/backup_golden_dataset.py restore --replace

# Restore without deleting (adds to existing)
poetry run python scripts/backup_golden_dataset.py restore

CI/CD Integration

Automated Backups

# .github/workflows/backup-golden-dataset.yml
name: Backup Golden Dataset

on:
  schedule:
    - cron: '0 2 * * 0'  # Weekly on Sunday at 2am
  workflow_dispatch:  # Manual trigger

jobs:
  backup:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install poetry
          cd backend
          poetry install

      - name: Run backup
        env:
          DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py backup

      - name: Commit backup
        run: |
          git config user.name "GitHub Actions"
          git config user.email "actions@github.com"
          git add backend/data/golden_dataset_backup.json
          git add backend/data/golden_dataset_metadata.json
          git commit -m "chore: automated golden dataset backup"
          git push

Validation Contracts

Data Validation & Contracts

The URL Contract

Rule: Golden dataset analyses MUST store real canonical URLs, not placeholders.

Why This Matters

# WRONG - Placeholder URL
analysis.url = "https://orchestkit.dev/placeholder/doc-123"

# Problems:
# 1. Can't re-fetch content if needed
# 2. Can't verify source hasn't changed
# 3. No audit trail for data provenance
# 4. Breaks restore on different domains

# CORRECT - Real canonical URL
analysis.url = "https://docs.python.org/3/library/asyncio.html"

# Benefits:
# 1. Can re-fetch if embeddings model changes
# 2. Can validate content hasn't been updated
# 3. Clear data provenance
# 4. Works across environments

Validation Check

async def check_url_contract() -> list[str]:
    """Find analyses with placeholder URLs."""

    async with get_session() as session:
        query = select(Analysis).where(
            Analysis.url.like("%orchestkit.dev%") |
            Analysis.url.like("%placeholder%") |
            Analysis.url.like("%example.com%") |
            Analysis.url.like("%test.local%")
        )
        result = await session.execute(query)
        invalid = result.scalars().all()

        if invalid:
            print(f"❌ Found {len(invalid)} analyses with placeholder URLs:")
            for analysis in invalid:
                print(f"   - {analysis.id}: {analysis.url}")
            return [str(a.id) for a in invalid]

        print("✅ URL contract validated: All URLs are canonical")
        return []

Data Integrity Checks

1. Count Validation

async def validate_counts(expected_metadata: dict) -> dict:
    """Verify counts match expected values."""

    async with get_session() as session:
        actual = {
            "analyses": await session.scalar(select(func.count(Analysis.id))),
            "chunks": await session.scalar(select(func.count(Chunk.id))),
            "artifacts": await session.scalar(select(func.count(Artifact.id)))
        }

        expected = {
            "analyses": expected_metadata["total_analyses"],
            "chunks": expected_metadata["total_chunks"],
            "artifacts": expected_metadata["total_artifacts"]
        }

        errors = []
        for key in ["analyses", "chunks", "artifacts"]:
            if actual[key] != expected[key]:
                errors.append(f"{key}: expected {expected[key]}, got {actual[key]}")

        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "actual": actual,
            "expected": expected
        }

2. Embedding Validation

async def validate_embeddings() -> dict:
    """Check all chunks have embeddings."""

    async with get_session() as session:
        # Find chunks without embeddings
        query = select(Chunk).where(Chunk.embedding.is_(None))
        result = await session.execute(query)
        missing = result.scalars().all()

        if missing:
            return {
                "valid": False,
                "error": f"Found {len(missing)} chunks without embeddings",
                "chunk_ids": [str(c.id) for c in missing]
            }

        # Check embedding dimensions
        query = select(Chunk).limit(1)
        result = await session.execute(query)
        sample = result.scalar_one()

        if len(sample.embedding) != 1024:
            return {
                "valid": False,
                "error": f"Invalid embedding dimensions: {len(sample.embedding)} (expected 1024)"
            }

        return {"valid": True, "message": "All chunks have valid embeddings"}

3. Orphaned Data Check

async def check_orphaned_data() -> dict:
    """Find orphaned chunks (no parent analysis)."""

    async with get_session() as session:
        # Find chunks without parent analysis
        query = (
            select(Chunk)
            .outerjoin(Analysis, Chunk.analysis_id == Analysis.id)
            .where(Analysis.id.is_(None))
        )
        result = await session.execute(query)
        orphaned = result.scalars().all()

        if orphaned:
            return {
                "valid": False,
                "warning": f"Found {len(orphaned)} orphaned chunks",
                "chunk_ids": [str(c.id) for c in orphaned]
            }

        return {"valid": True, "message": "No orphaned data found"}

4. Duplicate Check

async def check_duplicates() -> dict:
    """Find duplicate analyses (same URL)."""

    async with get_session() as session:
        # Find URLs that appear more than once
        query = (
            select(Analysis.url, func.count(Analysis.id).label("count"))
            .group_by(Analysis.url)
            .having(func.count(Analysis.id) > 1)
        )
        result = await session.execute(query)
        duplicates = result.all()

        if duplicates:
            return {
                "valid": False,
                "warning": f"Found {len(duplicates)} duplicate URLs",
                "urls": [(url, count) for url, count in duplicates]
            }

        return {"valid": True, "message": "No duplicates found"}

Comprehensive Validation

async def verify_golden_dataset() -> dict:
    """Run all validation checks."""

    print("🔍 Validating golden dataset...")

    # Load expected metadata
    with open(METADATA_FILE) as f:
        expected_metadata = json.load(f)

    results = {
        "timestamp": datetime.now(UTC).isoformat(),
        "checks": {}
    }

    # 1. URL Contract
    print("\n1. Checking URL contract...")
    invalid_urls = await check_url_contract()
    results["checks"]["url_contract"] = {
        "passed": len(invalid_urls) == 0,
        "invalid_count": len(invalid_urls),
        "invalid_ids": invalid_urls
    }

    # 2. Count Validation
    print("\n2. Validating counts...")
    count_result = await validate_counts(expected_metadata)
    results["checks"]["counts"] = count_result

    # 3. Embedding Validation
    print("\n3. Validating embeddings...")
    embedding_result = await validate_embeddings()
    results["checks"]["embeddings"] = embedding_result

    # 4. Orphaned Data
    print("\n4. Checking for orphaned data...")
    orphan_result = await check_orphaned_data()
    results["checks"]["orphaned_data"] = orphan_result

    # 5. Duplicates
    print("\n5. Checking for duplicates...")
    duplicate_result = await check_duplicates()
    results["checks"]["duplicates"] = duplicate_result

    # Overall result
    all_passed = all(
        check.get("valid") or check.get("passed")
        for check in results["checks"].values()
    )

    results["overall"] = {
        "passed": all_passed,
        "total_checks": len(results["checks"]),
        "passed_checks": sum(
            1 for check in results["checks"].values()
            if check.get("valid") or check.get("passed")
        )
    }

    # Print summary
    print("\n" + "="*50)
    if all_passed:
        print("✅ All validation checks passed")
    else:
        print("❌ Validation failed")
        for name, check in results["checks"].items():
            if not (check.get("valid") or check.get("passed")):
                print(f"   - {name}: {check.get('error') or check.get('warning')}")

    return results

Pre-Deployment Checklist

# Run before deploying to production

cd backend

# 1. Backup current data
poetry run python scripts/backup_golden_dataset.py backup

# 2. Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify

# 3. Run retrieval quality tests
poetry run pytest tests/integration/test_retrieval_quality.py

# 4. Check for regressions
# Expected: 91.6% pass rate, 0.777 MRR
# If lower, investigate before deploying

Automated Validation (CI)

# .github/workflows/validate-golden-dataset.yml
name: Validate Golden Dataset

on:
  pull_request:
    paths:
      - 'backend/data/golden_dataset_backup.json'
  schedule:
    - cron: '0 8 * * 1'  # Weekly on Monday 8am

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install poetry
          cd backend
          poetry install

      - name: Start PostgreSQL
        run: docker compose up -d postgres

      - name: Run migrations
        run: |
          cd backend
          poetry run alembic upgrade head

      - name: Restore golden dataset
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py restore

      - name: Validate dataset
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py verify

      - name: Run retrieval tests
        run: |
          cd backend
          poetry run pytest tests/integration/test_retrieval_quality.py -v

References

  • OrchestKit: backend/scripts/backup_golden_dataset.py
  • OrchestKit: backend/tests/integration/test_retrieval_quality.py

Validation Rules

Validation Rules

Detailed validation rules for golden dataset integrity.

Rule 1: No Placeholder URLs

FORBIDDEN_URL_PATTERNS = [
    "orchestkit.dev",
    "placeholder",
    "example.com",
    "localhost",
    "127.0.0.1",
]

def validate_url(url: str) -> tuple[bool, str]:
    """Validate URL is not a placeholder."""
    for pattern in FORBIDDEN_URL_PATTERNS:
        if pattern in url.lower():
            return False, f"URL contains forbidden pattern: {pattern}"

    # Must be HTTPS (except for specific cases)
    if not url.startswith("https://"):
        if not url.startswith("http://arxiv.org"):  # arXiv redirects
            return False, "URL must use HTTPS"

    return True, "OK"

Rule 2: Unique Identifiers

def validate_unique_ids(documents: list[dict], queries: list[dict]) -> list[str]:
    """Ensure all IDs are unique across documents and queries."""
    errors = []

    # Document IDs
    doc_ids = [d["id"] for d in documents]
    if len(doc_ids) != len(set(doc_ids)):
        duplicates = [id for id in doc_ids if doc_ids.count(id) > 1]
        errors.append(f"Duplicate document IDs: {set(duplicates)}")

    # Query IDs
    query_ids = [q["id"] for q in queries]
    if len(query_ids) != len(set(query_ids)):
        duplicates = [id for id in query_ids if query_ids.count(id) > 1]
        errors.append(f"Duplicate query IDs: {set(duplicates)}")

    # Section IDs within documents
    for doc in documents:
        section_ids = [s["id"] for s in doc.get("sections", [])]
        if len(section_ids) != len(set(section_ids)):
            errors.append(f"Duplicate section IDs in document: {doc['id']}")

    return errors

Rule 3: Referential Integrity

def validate_references(documents: list[dict], queries: list[dict]) -> list[str]:
    """Ensure query expected_chunks reference valid section IDs."""
    errors = []

    # Build set of all valid section IDs
    valid_sections = set()
    for doc in documents:
        for section in doc.get("sections", []):
            valid_sections.add(section["id"])

    # Check query references
    for query in queries:
        for chunk_id in query.get("expected_chunks", []):
            if chunk_id not in valid_sections:
                errors.append(
                    f"Query {query['id']} references invalid section: {chunk_id}"
                )

    return errors

Rule 4: Content Quality

def validate_content_quality(document: dict) -> list[str]:
    """Validate document content meets quality standards."""
    warnings = []

    # Title length
    title = document.get("title", "")
    if len(title) < 10:
        warnings.append("Title too short (min 10 chars)")
    if len(title) > 200:
        warnings.append("Title too long (max 200 chars)")

    # Section content
    for section in document.get("sections", []):
        content = section.get("content", "")
        if len(content) < 50:
            warnings.append(f"Section {section['id']} content too short (min 50 chars)")
        if len(content) > 50000:
            warnings.append(f"Section {section['id']} content very long (>50k chars)")

    # Tags
    tags = document.get("tags", [])
    if len(tags) < 2:
        warnings.append("Too few tags (min 2)")
    if len(tags) > 10:
        warnings.append("Too many tags (max 10)")

    return warnings

Rule 5: Difficulty Distribution

def validate_difficulty_distribution(queries: list[dict]) -> list[str]:
    """Ensure balanced difficulty distribution."""
    warnings = []

    # Count by difficulty
    distribution = {}
    for query in queries:
        diff = query.get("difficulty", "unknown")
        distribution[diff] = distribution.get(diff, 0) + 1

    # Minimum requirements
    requirements = {
        "trivial": 3,
        "easy": 3,
        "medium": 5,  # Most common real-world case
        "hard": 3,
    }

    for level, min_count in requirements.items():
        actual = distribution.get(level, 0)
        if actual < min_count:
            warnings.append(
                f"Insufficient {level} queries: {actual}/{min_count}"
            )

    return warnings

Duplicate Detection

Semantic Similarity Check

import numpy as np
from typing import Optional

async def check_duplicate(
    new_content: str,
    existing_embeddings: list[tuple[str, np.ndarray]],
    embedding_service,
    threshold: float = 0.85,
) -> Optional[tuple[str, float]]:
    """Check if content is duplicate of existing document.

    Args:
        new_content: Content to check
        existing_embeddings: List of (doc_id, embedding) tuples
        embedding_service: Service to generate embeddings
        threshold: Similarity threshold for duplicate warning

    Returns:
        (doc_id, similarity) if duplicate found, None otherwise
    """
    # Generate embedding for new content
    new_embedding = await embedding_service.generate_embedding(
        text=new_content[:8000],  # Truncate for embedding
        normalize=True,
    )
    new_vec = np.array(new_embedding)

    # Compare against existing
    max_similarity = 0.0
    most_similar_doc = None

    for doc_id, existing_vec in existing_embeddings:
        # Cosine similarity (vectors are normalized)
        similarity = np.dot(new_vec, existing_vec)

        if similarity > max_similarity:
            max_similarity = similarity
            most_similar_doc = doc_id

    if max_similarity >= threshold:
        return (most_similar_doc, max_similarity)

    return None

URL Duplicate Check

def check_url_duplicate(
    new_url: str,
    source_url_map: dict[str, str],
) -> Optional[str]:
    """Check if URL already exists in dataset.

    Returns document ID if duplicate found.
    """
    # Normalize URL
    normalized = normalize_url(new_url)

    for doc_id, existing_url in source_url_map.items():
        if normalize_url(existing_url) == normalized:
            return doc_id

    return None

def normalize_url(url: str) -> str:
    """Normalize URL for comparison."""
    from urllib.parse import urlparse, urlunparse

    parsed = urlparse(url.lower())

    # Remove trailing slashes, www prefix
    netloc = parsed.netloc.replace("www.", "")
    path = parsed.path.rstrip("/")

    return urlunparse((
        parsed.scheme,
        netloc,
        path,
        "",  # params
        "",  # query (stripped)
        "",  # fragment
    ))
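
A quick sanity check of the normalization above; both variants collapse to the same canonical key:

a = normalize_url("https://WWW.docs.python.org/3/library/asyncio.html/")
b = normalize_url("https://docs.python.org/3/library/asyncio.html?utm_source=newsletter")
assert a == b == "https://docs.python.org/3/library/asyncio.html"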

Versioning

Versioning and Recovery

Restore procedures, validation, and disaster recovery patterns.

Restore Implementation

Process Overview

  1. Load JSON backup
  2. Validate structure (version, required fields)
  3. Create analyses (without embeddings yet)
  4. Create chunks (without embeddings yet)
  5. Generate embeddings (using current embedding model)
  6. Create artifacts
  7. Verify integrity (counts, URL contract)

Regenerating Embeddings

async def restore_golden_dataset(replace: bool = False):
    """Restore golden dataset from JSON backup."""

    # Load backup
    with open(BACKUP_FILE) as f:
        backup_data = json.load(f)

    async with get_session() as session:
        if replace:
            # Delete existing data
            await session.execute(delete(Chunk))
            await session.execute(delete(Artifact))
            await session.execute(delete(Analysis))
            await session.commit()

        # Restore analyses and chunks
        from app.shared.services.embeddings import embed_text

        for analysis_data in backup_data["analyses"]:
            # Create analysis
            analysis = Analysis(
                id=UUID(analysis_data["id"]),
                url=analysis_data["url"],
                # ... other fields ...
            )
            session.add(analysis)

            # Create chunks with regenerated embeddings
            for chunk_data in analysis_data["chunks"]:
                # Regenerate embedding using CURRENT model
                embedding = await embed_text(chunk_data["content"])

                chunk = Chunk(
                    id=UUID(chunk_data["id"]),
                    analysis_id=analysis.id,
                    content=chunk_data["content"],
                    embedding=embedding,  # Freshly generated!
                    # ... other fields ...
                )
                session.add(chunk)

            await session.commit()

        print("Restore completed")

Why regenerate embeddings?

  • Embedding models improve over time
  • Ensures consistency with current model
  • Smaller backup files (exclude large vectors)

Validation

Validation Checklist

async def verify_golden_dataset() -> dict:
    """Verify golden dataset integrity."""

    errors = []
    warnings = []

    async with get_session() as session:
        # 1. Check counts
        analysis_count = await session.scalar(select(func.count(Analysis.id)))
        chunk_count = await session.scalar(select(func.count(Chunk.id)))
        artifact_count = await session.scalar(select(func.count(Artifact.id)))

        expected = load_metadata()
        if analysis_count != expected["total_analyses"]:
            errors.append(f"Analysis count mismatch: {analysis_count} vs {expected['total_analyses']}")

        # 2. Check URL contract
        query = select(Analysis).where(
            Analysis.url.like("%orchestkit.dev%") |
            Analysis.url.like("%placeholder%")
        )
        result = await session.execute(query)
        invalid_urls = result.scalars().all()

        if invalid_urls:
            errors.append(f"Found {len(invalid_urls)} analyses with placeholder URLs")

        # 3. Check embeddings exist
        query = select(Chunk).where(Chunk.embedding.is_(None))
        result = await session.execute(query)
        missing_embeddings = result.scalars().all()

        if missing_embeddings:
            errors.append(f"Found {len(missing_embeddings)} chunks without embeddings")

        # 4. Check orphaned chunks
        query = select(Chunk).outerjoin(Analysis).where(Analysis.id.is_(None))
        result = await session.execute(query)
        orphaned = result.scalars().all()

        if orphaned:
            warnings.append(f"Found {len(orphaned)} orphaned chunks")

        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "warnings": warnings,
            "stats": {
                "analyses": analysis_count,
                "chunks": chunk_count,
                "artifacts": artifact_count
            }
        }

Best Practices

1. Version Control Backups

# Commit backups to git
git add backend/data/golden_dataset_backup.json
git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)"

2. Validate Before Deployment

# Pre-deployment check
poetry run python scripts/backup_golden_dataset.py verify

# Should output:
# Validation passed
#    Analyses: 98
#    Chunks: 415
#    Artifacts: 98
#    No errors found

3. Test Restore in Staging

# Never test restore in production first!

# Staging environment
export DATABASE_URL=$STAGING_DATABASE_URL
poetry run python scripts/backup_golden_dataset.py restore --replace

# Run tests to verify
poetry run pytest tests/integration/test_retrieval_quality.py

4. Document Changes

// backend/data/golden_dataset_metadata.json
{
  "total_analyses": 98,
  "total_chunks": 415,
  "last_updated": "2025-12-19T10:30:00Z",
  "changes": [
    {
      "date": "2025-12-19",
      "action": "added",
      "count": 5,
      "description": "Added 5 new LangGraph tutorial analyses"
    },
    {
      "date": "2025-12-10",
      "action": "removed",
      "count": 2,
      "description": "Removed 2 outdated React 17 analyses"
    }
  ]
}

Disaster Recovery

Scenario 1: Accidental Deletion

# Oh no! Someone ran DELETE FROM analyses WHERE 1=1

# 1. Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace

# 2. Verify
poetry run python scripts/backup_golden_dataset.py verify

# 3. Run tests
poetry run pytest tests/integration/test_retrieval_quality.py

Scenario 2: Database Migration Gone Wrong

# Migration corrupted data

# 1. Rollback migration
alembic downgrade -1

# 2. Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace

# 3. Re-run migration (fixed)
alembic upgrade head

Scenario 3: New Environment Setup

# Fresh dev environment, need golden dataset

# 1. Clone repo (includes backup)
git clone https://github.com/your-org/orchestkit
cd orchestkit/backend

# 2. Setup DB
docker compose up -d postgres
alembic upgrade head

# 3. Restore golden dataset
poetry run python scripts/backup_golden_dataset.py restore

# 4. Verify
poetry run pytest tests/integration/test_retrieval_quality.py

Data Integrity Contracts

The URL Contract

Golden dataset analyses MUST store real canonical URLs, not placeholders.

# WRONG - Placeholder URL (breaks restore)
analysis.url = "https://orchestkit.dev/placeholder/123"

# CORRECT - Real canonical URL (enables re-fetch if needed)
analysis.url = "https://docs.python.org/3/library/asyncio.html"

Why this matters:

  • Enables re-fetching content if embeddings need regeneration
  • Allows validation that source content hasn't changed
  • Provides audit trail for data provenance

Verification:

# Check for placeholder URLs
def verify_url_contract(analyses: list[Analysis]) -> list[str]:
    """Find analyses with placeholder URLs."""
    invalid = []
    for analysis in analyses:
        if "orchestkit.dev" in analysis.url or "placeholder" in analysis.url:
            invalid.append(analysis.id)
    return invalid

Checklists

Backup Restore Checklist

Golden Dataset Backup/Restore Checklist

Use this checklist to ensure safe, reliable backup and restoration of golden datasets.


Pre-Backup Checklist

Environment Verification

  • Database connection verified

    psql -h localhost -p 5437 -U orchestkit -c "SELECT version();"
    # Expected: PostgreSQL 16.x
  • Database contains expected data

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
    # Expected: 98 (or current golden dataset size)
  • Embeddings generated for all chunks

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
    # Expected: 0 (no chunks without embeddings)

Data Quality Validation

  • URL contract verified (no placeholder URLs)

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
    # Expected: 0 (no placeholder URLs)
  • All analyses have artifacts

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analyses a
       LEFT JOIN artifacts ar ON a.id = ar.analysis_id
       WHERE ar.id IS NULL AND a.status = 'completed';"
    # Expected: 0 (all completed analyses have artifacts)
  • No orphaned chunks

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analysis_chunks c
       LEFT JOIN analyses a ON c.analysis_id = a.id
       WHERE a.id IS NULL;"
    # Expected: 0 (all chunks belong to an analysis)

Script Availability

  • Backup script exists

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/scripts/backup_golden_dataset.py
    # Expected: File exists
  • Dependencies installed

    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry install
    # Expected: All dependencies installed
  • Data directory exists

    mkdir -p /Users/yonatangross/coding/OrchestKit/backend/data
    ls -ld /Users/yonatangross/coding/OrchestKit/backend/data
    # Expected: Directory exists and is writable

Backup Execution Checklist

Run Backup

  • Execute backup command

    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry run python scripts/backup_golden_dataset.py backup
  • Verify backup output shows success

    • "BACKUP COMPLETE (v2.0)" message displayed
    • Analyses count matches expected (98)
    • Artifacts count matches expected (98)
    • Chunks count matches expected (415)
    • Fixtures count matches expected (98 documents)
  • Check backup file created

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
    # Expected: ~2.5 MB file
  • Check metadata file created

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_metadata.json
    # Expected: ~1 KB file

Verify Backup

  • Run verification command

    poetry run python scripts/backup_golden_dataset.py verify
  • Verify output shows valid backup

    • "BACKUP IS VALID" message displayed
    • Analyses count correct
    • Artifacts count correct
    • Chunks count correct
    • Fixtures included (documents, URL maps, queries)
    • Referential integrity: OK
    • All analyses have artifacts: OK
    • No placeholder URLs warning
  • Verify backup file is valid JSON

    cat /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json | jq '.'
    # Expected: Valid JSON, no parse errors
  • Check backup version

    cat /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json | \
      jq '.version'
    # Expected: "2.0"

Commit Backup

  • Stage backup files

    git add backend/data/golden_dataset_backup.json
    git add backend/data/golden_dataset_metadata.json
  • Write descriptive commit message

    git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)
    
    - Backup version: 2.0 (includes fixtures)
    - Pass rate: 91.6% (186/203 queries)
    - Changes: [describe any additions/removals]"
  • Push to remote

    git push origin main

Pre-Restore Checklist

Backup Verification

  • Backup file exists

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
    # Expected: File exists
  • Backup integrity verified

    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry run python scripts/backup_golden_dataset.py verify
    # Expected: "BACKUP IS VALID"
  • Backup version compatible

    cat backend/data/golden_dataset_backup.json | jq '.version'
    # Expected: "1.0" or "2.0" (script handles both)

Database State Assessment

  • Database accessible

    psql -h localhost -p 5437 -U orchestkit -c "SELECT 1;"
    # Expected: "1"
  • Current data count known

    psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses;"
    # Note the count for comparison after restore
  • Decision made: Add or Replace?

    • ADD mode: Keep existing data, add from backup (use restore)
    • REPLACE mode: Delete existing data, restore from backup (use restore --replace)

    WARNING: REPLACE mode is DESTRUCTIVE. Use only if:

    • Setting up fresh environment
    • Recovering from data corruption
    • You have confirmed backup is valid

Environment Setup

  • PostgreSQL running

    docker compose ps postgres
    # Expected: State = "running"
  • Database migrations applied

    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry run alembic current
    # Expected: Shows latest migration revision
  • OpenAI API key set (for embedding regeneration)

    echo $OPENAI_API_KEY
    # Expected: sk-... (valid API key)
    
    # OR check .env file
    grep OPENAI_API_KEY backend/.env
    # Expected: OPENAI_API_KEY=sk-...
  • Sufficient disk space

    df -h /Users/yonatangross/coding/OrchestKit/backend/data
    # Expected: At least 1 GB free
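
Because restore regenerates every embedding, it is worth confirming the OpenAI key actually works before starting a long run. A minimal sketch with the openai Python client; the model name is an assumption and should match whatever model backup_golden_dataset.py is configured to use:

# embedding_sanity_check.py -- hypothetical pre-restore smoke test
import os
from openai import OpenAI

def main() -> None:
    assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is not set"

    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    # Model name is an assumption; use the same model as the restore script.
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input="golden dataset restore smoke test",
    )
    print(f"Embedding API reachable, vector dimension: {len(response.data[0].embedding)}")

if __name__ == "__main__":
    main()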

Restore Execution Checklist

Run Restore

Option A: Add to existing data (non-destructive)

  • Execute restore command
    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry run python scripts/backup_golden_dataset.py restore

Option B: Replace existing data (DESTRUCTIVE)

  • CONFIRM backup is valid (run verify again)

    poetry run python scripts/backup_golden_dataset.py verify
    # Expected: "BACKUP IS VALID"
  • CONFIRM you want to delete existing data (no turning back)

    • Yes, I understand this is destructive
    • Yes, I have verified the backup
    • Yes, I am ready to proceed
  • Execute restore with --replace flag

    poetry run python scripts/backup_golden_dataset.py restore --replace

Monitor Restore Progress

  • Watch for restore stages

    • "Loaded backup from: ..." (backup file loaded)
    • "Backup version: 2.0" (schema version)
    • "Restoring 98 analyses..." (analyses being inserted)
    • "Restoring 98 artifacts..." (artifacts being inserted)
    • "Restoring 415 chunks (regenerating embeddings)..." (chunks + embeddings)
    • "Restored 50/415 chunks" (progress updates)
    • "Restored 100/415 chunks"
    • "Restored 150/415 chunks"
    • ... (continues until 415/415)
  • Check for errors during embedding generation

    • No "Failed to generate embedding" warnings
    • No OpenAI API errors
    • All chunks processed successfully
  • Verify restore completion message

    • "RESTORE COMPLETE" displayed
    • Analyses: 98
    • Artifacts: 98
    • Chunks: 415

Post-Restore Verification

  • Check database counts

    # Analyses
    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
    # Expected: 98
    
    # Artifacts
    psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM artifacts;"
    # Expected: 98
    
    # Chunks
    psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analysis_chunks;"
    # Expected: 415
  • Verify embeddings generated

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
    # Expected: 0 (all chunks have embeddings)
  • Verify URL contract maintained

    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
    # Expected: 0 (no placeholder URLs)
  • Check sample data integrity

    # Verify a known document exists
    psql -h localhost -p 5437 -U orchestkit -c \
      "SELECT title FROM analyses WHERE url = 'https://docs.python.org/3/library/asyncio.html';"
    # Expected: Row returned with title

Validation Testing Checklist

Retrieval Quality Tests

  • Run smoke tests

    cd /Users/yonatangross/coding/OrchestKit/backend
    poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
  • Check pass rate

    • Total queries: 203
    • Expected pass rate: ~91.6% (186/203 queries)
    • Actual pass rate: ____ (fill in from test output)
    • Pass rate within acceptable range (±2%)
  • No critical regressions

    • If pass rate dropped >5%, investigate:
      • Embedding model matches (check model version)
      • Hybrid search config unchanged
      • Backup file not corrupted
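
The pass-rate comparison above can be automated so a regression fails loudly instead of relying on a filled-in blank. A minimal sketch; the 186/203 baseline and the ±2% / 5% thresholds come directly from this checklist:

# pass_rate_gate.py -- hypothetical regression gate around the smoke test results
BASELINE_PASSED, TOTAL_QUERIES = 186, 203  # 91.6% baseline

def check_pass_rate(passed: int, total: int = TOTAL_QUERIES) -> str:
    rate = passed / total
    baseline = BASELINE_PASSED / TOTAL_QUERIES
    drop = baseline - rate

    if drop > 0.05:
        return f"CRITICAL regression: {rate:.1%} (baseline {baseline:.1%}) -- investigate"
    if abs(drop) > 0.02:
        return f"WARNING: {rate:.1%} outside +/-2% of baseline {baseline:.1%}"
    return f"OK: {rate:.1%} within +/-2% of baseline {baseline:.1%}"

if __name__ == "__main__":
    # Paste the pass count reported by pytest.
    print(check_pass_rate(186))  # OK
    print(check_pass_rate(170))  # CRITICAL (drop > 5%)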

Integration Tests

  • Run API integration tests (start the backend first if it is not already running)
    # Start backend
    docker compose up -d backend
    
    # Wait for startup
    sleep 5
    
    # Health check
    curl -f http://localhost:8500/health
    # Expected: 200 OK
    
    # Run integration tests
    poetry run pytest tests/integration/test_artifact_api.py -v
    # Expected: All tests pass

Fixture Validation

  • Verify fixture files restored (v2.0 backups only)

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures/
    # Expected:
    # - documents_expanded.json
    # - source_url_map.json
    # - queries.json
  • Check fixture counts

    cat backend/tests/smoke/retrieval/fixtures/documents_expanded.json | \
      jq '.documents | length'
    # Expected: 98
    
    cat backend/tests/smoke/retrieval/fixtures/queries.json | \
      jq '.queries | length'
    # Expected: 203
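
Both fixture counts can be checked in one pass. A minimal sketch that reads the two fixture files and compares against the expected counts from this checklist (the `documents` and `queries` keys match the jq commands above; the relative path assumes you are in backend/):

# fixture_counts.py -- hypothetical sketch; expected counts from this checklist
import json
from pathlib import Path

FIXTURES = Path("tests/smoke/retrieval/fixtures")  # relative to backend/
EXPECTED = {
    "documents_expanded.json": ("documents", 98),
    "queries.json": ("queries", 203),
}

for filename, (key, expected) in EXPECTED.items():
    data = json.loads((FIXTURES / filename).read_text())
    actual = len(data[key])
    status = "OK" if actual == expected else f"MISMATCH (expected {expected})"
    print(f"{filename}: {actual} {key} -- {status}")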

Rollback Checklist (If Restore Fails)

Immediate Actions

  • Stop all database writes

    docker compose stop backend
  • Document failure details

    • Error message: ______________________
    • Failed at stage: ______________________
    • Chunks restored before failure: ______________________

Rollback Options

Option 1: Re-run restore (if partial failure)

  • Identify cause of failure (API rate limit, network issue, etc.)

  • Fix issue (increase timeout, add API key, etc.)

  • Re-run restore with --replace

    poetry run python scripts/backup_golden_dataset.py restore --replace

Option 2: Restore from SQL dump (if available)

  • Check for SQL dump

    ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_dump.sql
    # If exists, use pg_restore
  • Drop and recreate database

    docker compose down postgres
    docker compose up -d postgres
    poetry run alembic upgrade head
  • Import SQL dump

    psql -h localhost -p 5437 -U orchestkit < \
      /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_dump.sql

Option 3: Restore from git history (if committed)

  • Find last good backup commit

    git log --oneline -- backend/data/golden_dataset_backup.json
  • Checkout previous backup

    git checkout HEAD~1 -- backend/data/golden_dataset_backup.json
  • Re-run restore

    poetry run python scripts/backup_golden_dataset.py restore --replace

Post-Restore Cleanup

Documentation

  • Update CURRENT_STATUS.md (if significant changes)

    • Document restore date
    • Document restore reason (new env, disaster recovery, etc.)
    • Document pass rate after restore
  • Update golden dataset metadata (if expanded)

    cat backend/data/golden_dataset_metadata.json
    # Verify counts are current

Monitoring

  • Monitor retrieval quality (first week after restore)

    # Run tests daily for a week to ensure stability
    poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
  • Monitor API errors (if production)

    • Check logs for embedding errors
    • Check logs for search errors
    • Check logs for database connection errors

Optional: Create New Backup

  • If restore modified data, create new backup
    poetry run python scripts/backup_golden_dataset.py backup
    poetry run python scripts/backup_golden_dataset.py verify
    git add backend/data/golden_dataset_backup.json
    git add backend/data/golden_dataset_metadata.json
    git commit -m "chore: golden dataset backup after restore"

Quick Reference

Full Backup Workflow

cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py backup
poetry run python scripts/backup_golden_dataset.py verify
git add data/golden_dataset_backup.json data/golden_dataset_metadata.json
git commit -m "chore: golden dataset backup"
git push

Full Restore Workflow (New Environment)

cd /Users/yonatangross/coding/OrchestKit/backend
docker compose up -d postgres
sleep 5
poetry run alembic upgrade head
poetry run python scripts/backup_golden_dataset.py verify
poetry run python scripts/backup_golden_dataset.py restore
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

Full Restore Workflow (Replace Existing)

cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py verify
# CONFIRM: I understand this is destructive
poetry run python scripts/backup_golden_dataset.py restore --replace
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

Remember: Golden datasets are critical infrastructure. Always verify backups, test restores in staging, and document all changes.


Examples (1)

OrchestKit Dataset Workflow

OrchestKit Golden Dataset Workflow

Complete backup/restore/validation workflow for OrchestKit's 98-document golden dataset


Overview

OrchestKit maintains a golden dataset of 98 curated technical documents with embeddings for testing retrieval quality. This dataset is the source of truth for:

  • Regression testing (ensure new code doesn't break retrieval)
  • Retrieval evaluation (measure precision, recall, MRR)
  • Model benchmarking (compare different embedding models)
  • Environment seeding (new dev environments, CI/CD)

Key Files:

  • Backup Script: /Users/yonatangross/coding/OrchestKit/backend/scripts/backup_golden_dataset.py
  • JSON Backup: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json (version controlled)
  • Metadata: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_metadata.json (quick stats)
  • Fixtures: /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures/ (source documents, queries)

Dataset Stats

Current (Production):

  • 98 Analyses (completed content analyses)
  • 415 Chunks (embedded text segments)
  • 203 Test Queries (with expected results)
  • 91.6% Pass Rate (retrieval quality metric)

Content Mix:

  • 76 articles (tutorials, guides, blog posts)
  • 19 technical documentation pages
  • 3 research papers

Topics Covered:

  • RAG (Retrieval-Augmented Generation)
  • LangGraph workflows
  • Prompt engineering
  • API design
  • Testing strategies
  • Performance optimization
  • Security best practices

URL Contract (CRITICAL)

The Rule: Golden dataset analyses MUST store real canonical URLs, not placeholders.

Why this matters:

  • Enables re-fetching content if embeddings need regeneration
  • Allows validation that source content hasn't changed
  • Provides audit trail for data provenance
  • Ensures backup/restore actually works

Validation:

cd /Users/yonatangross/coding/OrchestKit/backend

# Check for placeholder URLs (should return 0)
poetry run python scripts/backup_golden_dataset.py verify | grep "placeholder URLs"
# Expected: "0 analyses with placeholder URLs"

Invalid URLs (will break restore):

  • https://docs.orchestkit.dev/placeholder/123
  • https://learn.orchestkit.dev/fake-content
  • https://content.orchestkit.dev/test

Valid URLs:

  • https://docs.python.org/3/library/asyncio.html
  • https://blog.langchain.dev/langgraph-multi-agent-workflows/
  • https://python.langchain.com/docs/modules/data_connection/retrievers/
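
A small helper can enforce this contract before anything is inserted. A minimal sketch that rejects the placeholder domains shown in the invalid examples above; the helper name and the suffix list are illustrative and should be extended if other placeholder hosts are in use:

# url_contract.py -- hypothetical helper enforcing the URL contract
from urllib.parse import urlparse

# Placeholder hosts, taken from the invalid examples above.
PLACEHOLDER_SUFFIXES = ("orchestkit.dev",)

def is_canonical_url(url: str) -> bool:
    """Return True if the URL looks like a real canonical source, not a placeholder."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        return False
    host = parsed.netloc.lower()
    return not any(host == s or host.endswith("." + s) for s in PLACEHOLDER_SUFFIXES)

if __name__ == "__main__":
    assert not is_canonical_url("https://docs.orchestkit.dev/placeholder/123")
    assert is_canonical_url("https://docs.python.org/3/library/asyncio.html")
    print("URL contract checks passed")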

Workflow 1: Backup Golden Dataset

When to run:

  • After adding new documents to golden dataset
  • Before major database migrations
  • Weekly automated backup (via GitHub Actions)
  • Before deploying to production

Step 1: Pre-Backup Validation

cd /Users/yonatangross/coding/OrchestKit/backend

# Check database connection
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98

# Verify URL contract
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
# Expected: 0 (no placeholder URLs)

Step 2: Run Backup

cd /Users/yonatangross/coding/OrchestKit/backend

# Create backup (includes fixtures in v2.0)
poetry run python scripts/backup_golden_dataset.py backup

# Output:
# ============================================================
# BACKUP COMPLETE (v2.0)
# ============================================================
#    Analyses:  98
#    Artifacts: 98
#    Chunks:    415
#    Fixtures:  98 documents
#    URL Maps:  98 mappings
#    Queries:   203 test queries
#    Location:  /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
# ============================================================

Step 3: Verify Backup

# Run verification
poetry run python scripts/backup_golden_dataset.py verify

# Output:
# ============================================================
# BACKUP VERIFICATION
# ============================================================
#    File: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
#    Created: 2025-12-21T10:30:00Z
#    Version: 2.0
#
#    Counts:
#      Analyses:  98 (expected: 98)
#      Artifacts: 98 (expected: 98)
#      Chunks:    415 (expected: 415)
#
#    Fixtures:
#      Documents: 98
#      URL Maps:  98
#      Queries:   203
#
#    Referential Integrity: OK
#    All analyses have artifacts: OK
# ============================================================
# BACKUP IS VALID
# ============================================================

Step 4: Commit to Git

# Stage backup files
git add backend/data/golden_dataset_backup.json
git add backend/data/golden_dataset_metadata.json

# Commit with descriptive message
git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)

- Backup version: 2.0 (includes fixtures)
- Added 5 new LangGraph tutorial analyses
- Updated 2 outdated React documentation analyses
- Pass rate: 91.6% (186/203 queries)"

# Push to remote
git push origin main

Workflow 2: Restore Golden Dataset

When to run:

  • Setting up new development environment
  • Recovering from accidental data deletion
  • Seeding CI/CD test database
  • Testing migration scripts

Step 1: Pre-Restore Checks

cd /Users/yonatangross/coding/OrchestKit/backend

# Ensure backup exists
ls -lh data/golden_dataset_backup.json
# Expected: ~2.5 MB file

# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID"

# Check database is empty (or ready to replace)
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses;"
# If > 0 and you want to replace, use --replace flag

Step 2: Run Restore

Option A: Add to existing data (no deletion)

poetry run python scripts/backup_golden_dataset.py restore

# This will:
# 1. Load backup
# 2. Insert analyses (ON CONFLICT DO NOTHING)
# 3. Insert artifacts (ON CONFLICT DO NOTHING)
# 4. Regenerate embeddings for chunks
# 5. Insert chunks (ON CONFLICT DO NOTHING)
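
The non-destructive behaviour comes from PostgreSQL's ON CONFLICT DO NOTHING, which silently skips rows whose primary key already exists, so re-running ADD mode is safe. A minimal sketch of that pattern with psycopg; the DSN and column list are assumptions and the real script's schema may differ:

# Hypothetical sketch of the idempotent insert used by ADD-mode restore.
import psycopg

DSN = "host=localhost port=5437 user=orchestkit dbname=orchestkit"  # assumed DSN

def insert_analysis(conn: psycopg.Connection, analysis: dict) -> None:
    # Rows with an existing id are skipped, so re-running restore does not duplicate data.
    conn.execute(
        """
        INSERT INTO analyses (id, url, title, status)
        VALUES (%(id)s, %(url)s, %(title)s, %(status)s)
        ON CONFLICT (id) DO NOTHING
        """,
        analysis,
    )

with psycopg.connect(DSN) as conn:
    insert_analysis(conn, {
        "id": "550e8400-e29b-41d4-a716-446655440000",
        "url": "https://docs.python.org/3/library/asyncio.html",
        "title": "asyncio - Asynchronous I/O",
        "status": "completed",
    })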

Option B: Replace existing data (DESTRUCTIVE)

# WARNING: This deletes ALL existing analyses, artifacts, and chunks
poetry run python scripts/backup_golden_dataset.py restore --replace

# This will:
# 1. DELETE FROM analysis_chunks
# 2. DELETE FROM artifacts
# 3. DELETE FROM analyses
# 4. Restore from backup (with regenerated embeddings)

Step 3: Monitor Restore Progress

# Restore output:
# Loaded backup from: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
# Backup version: 2.0
# Backup created: 2025-12-19T10:30:00Z
# Restoring 98 analyses...
# Restoring 98 artifacts...
# Restoring 415 chunks (regenerating embeddings)...
#   Restored 50/415 chunks
#   Restored 100/415 chunks
#   Restored 150/415 chunks
#   Restored 200/415 chunks
#   Restored 250/415 chunks
#   Restored 300/415 chunks
#   Restored 350/415 chunks
#   Restored 400/415 chunks
#   Restored 415/415 chunks
#
# ============================================================
# RESTORE COMPLETE
# ============================================================
#    Analyses:  98
#    Artifacts: 98
#    Chunks:    415
# ============================================================
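
The chunk stage is the slow part because every embedding is regenerated. A minimal sketch of the batching-and-progress pattern reflected in the output above; the batch size, model name, and chunk field names are assumptions, not the real script's implementation:

# Hypothetical sketch of batched embedding regeneration with progress reporting.
from openai import OpenAI

client = OpenAI()
BATCH_SIZE = 50                   # matches the "Restored 50/415" progress granularity
MODEL = "text-embedding-3-small"  # assumption; must match the original embedding model

def regenerate_embeddings(chunks: list[dict]) -> list[list[float]]:
    vectors: list[list[float]] = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start:start + BATCH_SIZE]
        response = client.embeddings.create(
            model=MODEL,
            input=[chunk["content"] for chunk in batch],
        )
        vectors.extend(item.embedding for item in response.data)
        print(f"  Restored {min(start + BATCH_SIZE, len(chunks))}/{len(chunks)} chunks")
    return vectors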

Step 4: Verify Restore

# Check counts
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98

psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM artifacts;"
# Expected: 98

psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analysis_chunks;"
# Expected: 415

# Check embeddings generated
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
# Expected: 0 (all chunks should have embeddings)

# Run retrieval quality tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

# Expected output:
# test_query_langchain_agent_memory PASSED
# test_query_rag_chunking_strategies PASSED
# test_query_prompt_engineering_basics PASSED
# ...
# 186 passed, 17 failed (91.6% pass rate)

Workflow 3: Expand Golden Dataset

When to run:

  • Adding new technical content for better coverage
  • Improving retrieval quality for specific topics
  • Testing new embedding models

Step 1: Prepare Source Documents

cd /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures

# Edit documents_expanded.json to add new documents
# Example:
{
  "version": "2.0",
  "generated": "2025-12-21",
  "source": "Manual expansion",
  "documents": [
    {
      "id": "langgraph-streaming-guide",
      "source_url": "https://blog.langchain.dev/streaming-in-langgraph/",
      "content_type": "tutorial",
      "title": "Streaming in LangGraph: A Complete Guide",
      "content": "...",
      "metadata": {
        "author": "LangChain Team",
        "published_date": "2025-11-15"
      }
    }
  ]
}

Step 2: Add Test Queries

# Edit queries.json to add test queries for new content
{
  "version": "1.1",
  "generated": "2025-12-21",
  "queries": [
    {
      "id": "q-langgraph-streaming-1",
      "query": "How do I stream outputs in LangGraph?",
      "expected_chunks": ["langgraph-streaming-guide-chunk-0"],
      "difficulty": "medium",
      "category": "implementation"
    }
  ]
}
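
Before loading, it is worth checking that every new query points at chunks of a document that actually exists in documents_expanded.json. A minimal sketch; it assumes the `<document-id>-chunk-N` naming shown in the examples above:

# validate_new_queries.py -- hypothetical cross-check between the two fixture files
import json
from pathlib import Path

FIXTURES = Path("tests/smoke/retrieval/fixtures")  # relative to backend/

documents = json.loads((FIXTURES / "documents_expanded.json").read_text())["documents"]
queries = json.loads((FIXTURES / "queries.json").read_text())["queries"]

doc_ids = {doc["id"] for doc in documents}
problems = []

for query in queries:
    for chunk_id in query.get("expected_chunks", []):
        # Assumes chunk ids follow "<document-id>-chunk-N" as in the examples above.
        doc_id = chunk_id.rsplit("-chunk-", 1)[0]
        if doc_id not in doc_ids:
            problems.append(f"{query['id']}: unknown document '{doc_id}'")

print(f"{len(problems)} problems found")
for problem in problems:
    print(f"  - {problem}")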

Step 3: Run Fixture Loader

cd /Users/yonatangross/coding/OrchestKit/backend

# Load new fixtures into database
poetry run python tests/smoke/retrieval/load_fixtures.py

# This will:
# 1. Load documents_expanded.json
# 2. Create analyses for each document
# 3. Generate chunks with embeddings
# 4. Create artifacts
# 5. Store in PostgreSQL

Step 4: Validate New Data

# Run retrieval quality tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

# Check for new query test
# Expected: test_query_langgraph_streaming_1 PASSED

# Verify new document in database
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT title FROM analyses WHERE url = 'https://blog.langchain.dev/streaming-in-langgraph/';"
# Expected: "Streaming in LangGraph: A Complete Guide"

Step 5: Create New Backup

# Backup expanded dataset
poetry run python scripts/backup_golden_dataset.py backup

# Verify backup includes new content
poetry run python scripts/backup_golden_dataset.py verify

# Expected output shows increased counts:
#    Analyses:  99 (was 98)
#    Chunks:    420 (was 415)
#    Queries:   204 (was 203)

# Commit to git
git add backend/data/golden_dataset_backup.json
git add backend/tests/smoke/retrieval/fixtures/documents_expanded.json
git add backend/tests/smoke/retrieval/fixtures/queries.json

git commit -m "feat: expand golden dataset with LangGraph streaming guide

- Added 1 new analysis (LangGraph streaming)
- Added 5 new chunks
- Added 1 new test query
- Total: 99 analyses, 420 chunks, 204 queries"

Workflow 4: CI/CD Integration

Automated weekly backup via GitHub Actions

GitHub Actions Workflow

File: /Users/yonatangross/coding/OrchestKit/.github/workflows/backup-golden-dataset.yml

name: Backup Golden Dataset

on:
  schedule:
    - cron: '0 2 * * 0'  # Weekly on Sunday at 2am UTC
  workflow_dispatch:  # Manual trigger

jobs:
  backup:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install Poetry
        run: |
          curl -sSL https://install.python-poetry.org | python3 -
          echo "$HOME/.local/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          cd backend
          poetry install --no-root

      - name: Setup PostgreSQL
        run: |
          docker run -d \
            --name postgres \
            -e POSTGRES_USER=orchestkit \
            -e POSTGRES_PASSWORD=orchestkit \
            -e POSTGRES_DB=orchestkit \
            -p 5437:5432 \
            pgvector/pgvector:pg16

          # Wait for PostgreSQL to be ready
          sleep 10

      - name: Run migrations
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run alembic upgrade head

      - name: Restore current backup (to have data to backup)
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py restore

      - name: Create fresh backup
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py backup

      - name: Verify backup
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py verify

      - name: Commit backup
        run: |
          git config user.name "GitHub Actions"
          git config user.email "actions@github.com"
          git add backend/data/golden_dataset_backup.json
          git add backend/data/golden_dataset_metadata.json
          git diff-index --quiet HEAD || git commit -m "chore: automated golden dataset backup [skip ci]"
          git push

Manual CI Trigger

# Trigger workflow manually
gh workflow run backup-golden-dataset.yml

# Check workflow status
gh run list --workflow=backup-golden-dataset.yml

# View logs
gh run view --log

Workflow 5: Disaster Recovery

Scenario: Accidental DELETE FROM analyses WHERE 1=1

Recovery Steps

# Step 1: Stop all database writes immediately
docker compose stop backend

# Step 2: Verify backup exists
cd /Users/yonatangross/coding/OrchestKit/backend
ls -lh data/golden_dataset_backup.json
# Expected: ~2.5 MB file modified recently

# Step 3: Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID"

# Step 4: Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace

# Step 5: Verify restoration
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98

# Step 6: Run integrity tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
# Expected: 186/203 passed (91.6%)

# Step 7: Restart backend
docker compose up -d backend

# Step 8: Smoke test API
curl -f http://localhost:8500/health
# Expected: 200 OK

Workflow 6: New Dev Environment Setup

Scenario: Fresh MacBook, setting up OrchestKit for first time

Setup Steps

# Step 1: Clone repository (includes backup in version control)
git clone https://github.com/your-org/orchestkit.git
cd orchestkit

# Step 2: Setup backend
cd backend
poetry install

# Step 3: Start PostgreSQL
cd ..
docker compose up -d postgres

# Wait for PostgreSQL to be ready
sleep 5

# Step 4: Run migrations
cd backend
poetry run alembic upgrade head

# Step 5: Restore golden dataset
poetry run python scripts/backup_golden_dataset.py restore

# Expected output:
# ============================================================
# RESTORE COMPLETE
# ============================================================
#    Analyses:  98
#    Artifacts: 98
#    Chunks:    415
# ============================================================

# Step 6: Verify with tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

# Expected: 186/203 passed (91.6%)

# Step 7: Start backend
cd ..
docker compose up -d backend

# Step 8: Verify API
curl -f http://localhost:8500/health
# Expected: 200 OK

# Step 9: Setup frontend
cd frontend
npm install
npm run dev

# Open http://localhost:5173

Common Issues & Solutions

Issue 1: Backup verification fails with "placeholder URLs"

Error:

WARNING: 5 analyses still use placeholder URLs
(example: https://docs.orchestkit.dev/placeholder/123)

Solution:

# Identify analyses with placeholder URLs
psql -h localhost -p 5437 -U orchestkit -c \
  "SELECT id, url FROM analyses WHERE url LIKE '%orchestkit.dev%';"

# Update with real canonical URLs
psql -h localhost -p 5437 -U orchestkit -c \
  "UPDATE analyses
   SET url = 'https://docs.python.org/3/library/asyncio.html'
   WHERE id = '550e8400-e29b-41d4-a716-446655440000';"

# Re-run backup
poetry run python scripts/backup_golden_dataset.py backup

# Verify
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID" (no placeholder URLs)

Issue 2: Restore fails with "Failed to generate embedding"

Error:

WARNING: Failed to generate embedding for chunk 123: OpenAI API error

Solution:

# Check OpenAI API key
echo $OPENAI_API_KEY
# Should be set

# Check .env file
grep OPENAI_API_KEY backend/.env
# Should have: OPENAI_API_KEY=sk-...

# Retry restore
poetry run python scripts/backup_golden_dataset.py restore --replace

# If still failing, check OpenAI quota
curl https://api.openai.com/v1/usage \
  -H "Authorization: Bearer $OPENAI_API_KEY"

Issue 3: Retrieval quality tests fail after restore

Error:

186 passed, 17 failed (91.6% pass rate)
BUT EXPECTED: 203 passed (100% pass rate)

Solution:

# This is EXPECTED! Retrieval quality is not 100%.
# 91.6% is the BASELINE pass rate for OrchestKit golden dataset.

# Check if pass rate DECREASED (regression):
# Before restore: 186/203 (91.6%)
# After restore:  186/203 (91.6%)
# NO REGRESSION - restore successful

# If pass rate dropped significantly (e.g., to 80%):
# 1. Check embedding model matches (should use same model)
# 2. Check hybrid search weights (RRF multiplier, boosts)
# 3. Run backup verification again
poetry run python scripts/backup_golden_dataset.py verify

Quick Reference

Backup

cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py backup
poetry run python scripts/backup_golden_dataset.py verify
git add data/golden_dataset_backup.json
git commit -m "chore: golden dataset backup"

Restore (New Environment)

cd /Users/yonatangross/coding/OrchestKit/backend
docker compose up -d postgres
poetry run alembic upgrade head
poetry run python scripts/backup_golden_dataset.py restore
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v

Restore (Replace Existing)

cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py restore --replace

Verify Backup Integrity

cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py verify

Remember: The golden dataset is the foundation of retrieval quality testing. Always verify backups, never skip URL validation, and test restore in staging before production.
