Golden Dataset
Golden dataset lifecycle patterns for curation, versioning, quality validation, and CI integration. Use when building evaluation datasets, managing dataset versions, validating quality scores, or integrating golden tests into pipelines.
Primary Agent: data-pipeline-engineer
Golden Dataset
Comprehensive patterns for building, managing, and validating golden datasets for AI/ML evaluation. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Curation | 3 | HIGH | Content collection, annotation pipelines, diversity analysis |
| Management | 3 | HIGH | Versioning, backup/restore, CI/CD automation |
| Validation | 3 | CRITICAL | Quality scoring, drift detection, regression testing |
| Add Workflow | 1 | HIGH | 9-phase curation, quality scoring, bias detection, silver-to-gold |
Total: 10 rules across 4 categories
Curation
Content collection, multi-agent annotation, and diversity analysis for golden datasets.
| Rule | File | Key Pattern |
|---|---|---|
| Collection | rules/curation-collection.md | Content type classification, quality thresholds, duplicate prevention |
| Annotation | rules/curation-annotation.md | Multi-agent pipeline, consensus aggregation, Langfuse tracing |
| Diversity | rules/curation-diversity.md | Difficulty stratification, domain coverage, balance guidelines |
Management
Versioning, storage, and CI/CD automation for golden datasets.
| Rule | File | Key Pattern |
|---|---|---|
| Versioning | rules/management-versioning.md | JSON backup format, embedding regeneration, disaster recovery |
| Storage | rules/management-storage.md | Backup strategies, URL contract, data integrity checks |
| CI Integration | rules/management-ci.md | GitHub Actions automation, pre-deployment validation, weekly backups |
Validation
Quality scoring, drift detection, and regression testing for golden datasets.
| Rule | File | Key Pattern |
|---|---|---|
| Quality | rules/validation-quality.md | Schema validation, content quality, referential integrity |
| Drift | rules/validation-drift.md | Duplicate detection, semantic similarity, coverage gap analysis |
| Regression | rules/validation-regression.md | Difficulty distribution, pre-commit hooks, full dataset validation |
Add Workflow
Structured workflow for adding new documents to the golden dataset.
| Rule | File | Key Pattern |
|---|---|---|
| Add Document | rules/curation-add-workflow.md | 9-phase curation, parallel quality analysis, bias detection |
Quick Start Example
from app.shared.services.embeddings import embed_text
async def validate_before_add(document: dict, source_url_map: dict) -> dict:
"""Pre-addition validation for golden dataset entries."""
errors = []
# 1. URL contract check
if "placeholder" in document.get("source_url", ""):
errors.append("URL must be canonical, not a placeholder")
# 2. Content quality
if len(document.get("title", "")) < 10:
errors.append("Title too short (min 10 chars)")
# 3. Tag requirements
if len(document.get("tags", [])) < 2:
errors.append("At least 2 domain tags required")
return {"valid": len(errors) == 0, "errors": errors}
Key Decisions
| Decision | Recommendation |
|---|---|
| Backup format | JSON (version controlled, portable) |
| Embedding storage | Exclude from backup (regenerate on restore) |
| Quality threshold | >= 0.70 quality score for inclusion |
| Confidence threshold | >= 0.65 for auto-include |
| Duplicate threshold | >= 0.90 similarity blocks, >= 0.85 warns |
| Min tags per entry | 2 domain tags |
| Min test queries | 3 per document |
| Difficulty balance | Trivial 3, Easy 3, Medium 5, Hard 3 minimum |
| CI frequency | Weekly automated backup (Sunday 2am UTC) |
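As a sketch, the decisions above can be collected into a single configuration constant so thresholds live in one place. The name `GOLDEN_DATASET_CONFIG` is illustrative, not an existing module:

```python
# Illustrative constants mirroring the Key Decisions table (names are hypothetical).
GOLDEN_DATASET_CONFIG = {
    "backup_format": "json",                 # version controlled, portable
    "store_embeddings_in_backup": False,     # regenerate on restore
    "quality_threshold": 0.70,               # minimum score for inclusion
    "confidence_threshold": 0.65,            # minimum for auto-include
    "duplicate_block_threshold": 0.90,       # similarity that blocks
    "duplicate_warn_threshold": 0.85,        # similarity that warns
    "min_tags": 2,
    "min_test_queries": 3,
    "difficulty_minimums": {"trivial": 3, "easy": 3, "medium": 5, "hard": 3},
    "backup_cron": "0 2 * * 0",              # Sunday 2am UTC
}
```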
Common Mistakes
- Using placeholder URLs instead of canonical source URLs
- Skipping embedding regeneration after restore
- Not validating referential integrity between documents and queries
- Over-indexing on articles (neglecting tutorials, research papers)
- Missing difficulty distribution balance in test queries
- Not running verification after backup/restore operations
- Testing restore procedures in production instead of staging
- Committing SQL dumps instead of JSON (not version-control friendly)
Evaluations
See test-cases.json for 9 test cases across all categories.
Related Skills
- ork:rag-retrieval -- Retrieval evaluation using golden dataset
- langfuse-observability -- Tracing patterns for curation workflows
- ork:testing-patterns -- General testing patterns and strategies
- ai-native-development -- Embedding generation for restore
Capability Details
curation
Keywords: golden dataset, curation, content collection, annotation, quality criteria
Solves:
- Classify document content types for golden dataset
- Run multi-agent quality analysis pipelines
- Generate test queries for new documents
management
Keywords: golden dataset, backup, restore, versioning, disaster recovery
Solves:
- Backup and restore golden datasets with JSON
- Regenerate embeddings after restore
- Automate backups with CI/CD
validation
Keywords: golden dataset, validation, schema, duplicate detection, quality metrics
Solves:
- Validate entries against document schema
- Detect duplicate or near-duplicate entries
- Analyze dataset coverage and distribution gaps
Rules (10)
Follow the full curation pipeline when adding entries to the golden dataset — HIGH
Add to Golden Dataset Workflow
Multi-agent curation pipeline with quality scoring, bias detection, and silver-to-gold promotion.
Incorrect — adding documents without validation:
# No quality check, no bias detection, no dedup
dataset.append({"url": url, "content": content})
Correct — 9-phase curation workflow:
Phase 1-2: Input and extraction
# Detect content type and extract structure
content_type = classify(url) # article, tutorial, documentation, research_paper
structured = extract(url) # title, sections, code blocks, key terms, metadata
Phase 3: Parallel quality analysis (4 agents)
# Launch ALL quality agents in parallel
# Agent 1: Accuracy, coherence, depth, relevance scores
# Agent 2: Keyword directness, difficulty level
# Agent 3: Domain tags, skill level classification
# Agent 4: Test query generation (direct, paraphrased, multi-hop)
Phase 4: Quality scoring formula
quality_score = (
accuracy * 0.25 +
coherence * 0.20 +
depth * 0.25 +
relevance * 0.30
)
Phase 5-6: Bias detection and diversity check
| Bias Score | Action |
|---|---|
| 0-2 | Proceed normally |
| 3-5 | Add disclaimer |
| 6-8 | Require user review |
| 9-10 | Recommend against inclusion |
Phase 7-8: Validation and classification
| Status | Quality Score | Action |
|---|---|---|
| GOLD | >= 0.75 | Add to main dataset |
| SILVER | 0.55-0.74 | Add to silver tier, track |
| REJECT | < 0.55 | Do not add |
Promotion criteria: 7+ days in silver, quality >= 0.75, no negative feedback.
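A minimal sketch of the promotion check, assuming hypothetical entry fields `silver_added_at` (ISO timestamp), `quality_score`, and `negative_feedback_count` — these names are illustrative, not the actual schema:

```python
from datetime import datetime, timezone

def eligible_for_gold(entry: dict) -> bool:
    """Silver-to-gold promotion: 7+ days in silver, quality >= 0.75,
    no negative feedback. Field names are assumptions."""
    added = datetime.fromisoformat(entry["silver_added_at"])
    days_in_silver = (datetime.now(timezone.utc) - added).days
    return (
        days_in_silver >= 7
        and entry["quality_score"] >= 0.75
        and entry.get("negative_feedback_count", 0) == 0
    )
```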
Phase 9: Version tracking
{
"version": "1.2.3",
"change_type": "ADD",
"document_id": "doc-123",
"quality_score": 0.82,
"rollback_available": true
}
| Update Type | Version Bump |
|---|---|
| Add/Update document | Patch (0.0.X) |
| Remove document | Minor (0.X.0) |
| Schema change | Major (X.0.0) |
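The bump rules above can be sketched as a small helper (illustrative only, not part of the backup scripts):

```python
def bump_version(version: str, change_type: str) -> str:
    """Apply the version-bump table: add/update -> patch,
    remove -> minor, schema -> major."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change_type in ("add", "update"):
        return f"{major}.{minor}.{patch + 1}"
    if change_type == "remove":
        return f"{major}.{minor + 1}.0"
    if change_type == "schema":
        return f"{major + 1}.0.0"
    raise ValueError(f"Unknown change type: {change_type}")
```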
Key rules:
- Never skip the quality analysis phase — it prevents low-quality entries from degrading evaluations
- Run bias detection on every addition — dataset contamination is hard to reverse
- Use the two-tier system (silver/gold) to let borderline documents prove themselves
- Always validate URL is canonical (not a placeholder) and check for >80% duplicate similarity
- Minimum requirements: 2+ domain tags, 3+ test queries per document
Use multi-agent annotation for consistent and thorough curation quality decisions — HIGH
Multi-Agent Annotation
Multi-agent analysis pipeline with consensus aggregation for golden dataset curation.
Pipeline Architecture:
INPUT: URL/Content
|
v
+------------------+
| FETCH AGENT | WebFetch or file read
| (sequential) | Extract structure, detect type
+--------+---------+
|
v
+-----------------------------------------------+
| PARALLEL ANALYSIS AGENTS |
| +----------+ +----------+ +--------+ +------+ |
| | Quality | |Difficulty| | Domain | |Query | |
| |Evaluator | |Classifier| | Tagger | |Gen | |
| +----+-----+ +----+-----+ +---+----+ +--+---+ |
+-------+------------+-----------+---------+-----+
|
v
+-----------------------------------------------+
| CONSENSUS AGGREGATOR |
| - Weighted quality score |
| - Confidence level (agent agreement) |
| - Final recommendation: include/review/exclude|
+--------+--------------------------------------+
|
v
+------------------+
| USER APPROVAL | Show scores, get confirmation
+------------------+
Quality Evaluator Agent:
Task(
subagent_type="code-quality-reviewer",
prompt="""GOLDEN DATASET QUALITY EVALUATION
Evaluate this content for golden dataset inclusion:
Content: {content_preview}
Source: {source_url}
Type: {content_type}
Score these dimensions (0.0-1.0):
1. ACCURACY (weight 0.25)
- Technical correctness
- Code validity
- Up-to-date information
2. COHERENCE (weight 0.20)
- Logical structure
- Clear flow
- Consistent terminology
3. DEPTH (weight 0.25)
- Comprehensive coverage
- Edge cases mentioned
- Appropriate detail level
4. RELEVANCE (weight 0.30)
- Alignment with AI/ML, backend, frontend, DevOps
- Practical applicability
- Technical value
Output JSON:
{
"accuracy": {"score": 0.X, "rationale": "..."},
"coherence": {"score": 0.X, "rationale": "..."},
"depth": {"score": 0.X, "rationale": "..."},
"relevance": {"score": 0.X, "rationale": "..."},
"weighted_total": 0.X,
"recommendation": "include|review|exclude"
}
""",
run_in_background=True
)
Consensus Aggregation Logic:
from dataclasses import dataclass
from typing import Literal
@dataclass
class CurationConsensus:
"""Aggregated result from multi-agent analysis."""
quality_score: float # Weighted average (0-1)
confidence: float # Agent agreement (0-1)
decision: Literal["include", "review", "exclude"]
content_type: str
difficulty: str
tags: list[str]
suggested_queries: list[dict]
warnings: list[str]
def aggregate_results(
quality_result: dict,
difficulty_result: dict,
domain_result: dict,
query_result: dict,
) -> CurationConsensus:
"""Aggregate multi-agent results into consensus."""
# Calculate weighted quality score
q = quality_result
quality_score = (
q["accuracy"]["score"] * 0.25 +
q["coherence"]["score"] * 0.20 +
q["depth"]["score"] * 0.25 +
q["relevance"]["score"] * 0.30
)
# Calculate confidence (variance-based)
scores = [
q["accuracy"]["score"],
q["coherence"]["score"],
q["depth"]["score"],
q["relevance"]["score"],
]
variance = sum((s - quality_score)**2 for s in scores) / len(scores)
confidence = 1.0 - min(variance * 4, 1.0)
# Decision thresholds
if quality_score >= 0.75 and confidence >= 0.7:
decision = "include"
elif quality_score >= 0.55:
decision = "review"
else:
decision = "exclude"
return CurationConsensus(
quality_score=quality_score,
confidence=confidence,
decision=decision,
content_type=difficulty_result.get("content_type", "article"),
difficulty=difficulty_result["difficulty"],
tags=domain_result["tags"],
suggested_queries=query_result["queries"],
warnings=[],
)
Langfuse Integration (v3):
from langfuse import observe, get_client
@observe(name="golden-dataset-curation")
async def curate_with_tracing(url: str, doc_id: str, consensus: CurationConsensus) -> dict:
"""Trace curation decisions to Langfuse for audit trail."""
get_client().update_current_trace(
metadata={"source_url": url, "document_id": doc_id}
)
# Log individual dimension scores against the current trace
lf = get_client()
trace_id = lf.get_current_trace_id()
lf.score(trace_id=trace_id, name="accuracy", value=0.85)
lf.score(trace_id=trace_id, name="coherence", value=0.90)
lf.score(trace_id=trace_id, name="depth", value=0.78)
lf.score(trace_id=trace_id, name="relevance", value=0.92)
# Final aggregated score
lf.score(trace_id=trace_id, name="quality_total", value=consensus.quality_score)
get_client().update_current_observation(
metadata={"curation_decision": consensus.decision}
)
return {"decision": consensus.decision, "score": consensus.quality_score}
Incorrect — Sequential agent execution:
# Sequential - 4x slower
quality_result = await analyze_quality(content)
difficulty_result = await analyze_difficulty(content)
domain_result = await analyze_domain(content)
query_result = await generate_queries(content)
Correct — Parallel agent execution:
# Parallel - all agents run concurrently
quality_task = Task(subagent_type="code-quality-reviewer", prompt=quality_prompt, run_in_background=True)
difficulty_task = Task(subagent_type="classifier", prompt=difficulty_prompt, run_in_background=True)
domain_task = Task(subagent_type="tagger", prompt=domain_prompt, run_in_background=True)
query_task = Task(subagent_type="query-generator", prompt=query_prompt, run_in_background=True)
# Wait for all results
results = await gather_task_results([quality_task, difficulty_task, domain_task, query_task])
Key rules:
- Run all 4 analysis agents in parallel for throughput
- Use weighted scoring (accuracy 0.25, coherence 0.20, depth 0.25, relevance 0.30)
- Require user approval before final inclusion
- Log all scores to Langfuse for audit trail
Apply systematic collection criteria to maintain consistent golden dataset quality — HIGH
Content Collection
Systematic patterns for collecting and classifying content for golden dataset inclusion.
Content Type Classification:
| Type | Description | Quality Focus |
|---|---|---|
| article | Technical articles, blog posts | Depth, accuracy, actionability |
| tutorial | Step-by-step guides | Completeness, clarity, code quality |
| research_paper | Academic papers, whitepapers | Rigor, citations, methodology |
| documentation | API docs, reference materials | Accuracy, completeness, examples |
| video_transcript | Transcribed video content | Structure, coherence, key points |
| code_repository | README, code analysis | Code quality, documentation |
Classification Decision Tree:
def classify_content_type(content: str, source_url: str) -> str:
"""Classify content type based on structure and source."""
# URL-based hints
if "arxiv.org" in source_url or "papers" in source_url:
return "research_paper"
if "docs." in source_url or "/api/" in source_url:
return "documentation"
if "github.com" in source_url:
return "code_repository"
# Content-based analysis
if has_step_by_step_structure(content):
return "tutorial"
if has_academic_structure(content): # Abstract, methodology, results
return "research_paper"
# Default
return "article"
Quality Thresholds:
# Recommended thresholds for golden dataset inclusion
minimum_quality_score: 0.70
minimum_confidence: 0.65
required_tags: 2 # At least 2 domain tags
required_queries: 3 # At least 3 test queries
Quality Dimensions:
| Dimension | Weight | Perfect | Acceptable | Failing |
|---|---|---|---|---|
| Accuracy | 0.25 | 0.95-1.0 | 0.70-0.94 | <0.70 |
| Coherence | 0.20 | 0.90-1.0 | 0.60-0.89 | <0.60 |
| Depth | 0.25 | 0.90-1.0 | 0.55-0.89 | <0.55 |
| Relevance | 0.30 | 0.95-1.0 | 0.70-0.94 | <0.70 |
Decision Thresholds:
| Quality Score | Confidence | Decision |
|---|---|---|
| >= 0.75 | >= 0.70 | include |
| >= 0.55 | any | review |
| < 0.55 | any | exclude |
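These thresholds map directly to a small decision function — a sketch mirroring the table, not an existing API:

```python
def curation_decision(quality_score: float, confidence: float) -> str:
    """Map quality score and agent confidence to include/review/exclude."""
    if quality_score >= 0.75 and confidence >= 0.70:
        return "include"
    if quality_score >= 0.55:
        return "review"
    return "exclude"
```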
Duplicate Prevention Checklist:
- Check URL against existing source_url_map.json
- Run semantic similarity against existing document embeddings
- Warn if >80% similar to existing document
Provenance Tracking -- always record:
- Source URL (canonical)
- Curation date
- Agent scores (for audit trail)
- Langfuse trace ID
Incorrect — Placeholder URL:
# Missing real source URL
analysis = Analysis(
url="https://orchestkit.dev/placeholder/123",
content_type="article",
)
Correct — Real canonical URL:
# Real source for re-fetching and validation
analysis = Analysis(
url="https://docs.python.org/3/library/asyncio.html",
content_type="documentation",
)
Key rules:
- Never use placeholder URLs -- always store real canonical source URLs
- Require minimum 2 domain tags and 3 test queries per entry
- Score all 4 quality dimensions before inclusion decision
- Track provenance for full audit trail
Balance dataset coverage across difficulty levels, content types, and domains — HIGH
Dataset Diversity
Difficulty stratification, domain coverage, and balance guidelines for golden datasets.
Difficulty Levels:
| Level | Semantic Complexity | Expected Score | Characteristics |
|---|---|---|---|
| trivial | Direct keyword match | >0.85 | Technical terms, exact phrases |
| easy | Common synonyms | >0.70 | Well-known concepts, slight variations |
| medium | Paraphrased intent | >0.55 | Conceptual queries, multi-topic |
| hard | Multi-hop reasoning | >0.40 | Cross-domain, comparative analysis |
| adversarial | Edge cases | Graceful degradation | Robustness tests, off-domain |
Difficulty Classification:
def classify_difficulty(document: dict) -> str:
"""Classify document difficulty for retrieval testing."""
factors = {
"technical_density": count_technical_terms(document["content"]),
"section_count": len(document.get("sections", [])),
"cross_references": count_cross_references(document),
"abstraction_level": assess_abstraction(document),
"domain_specificity": assess_domain_specificity(document),
}
# Scoring rubric
score = 0
if factors["technical_density"] > 50:
score += 2
if factors["section_count"] > 10:
score += 1
if factors["cross_references"] > 5:
score += 2
if factors["abstraction_level"] == "high":
score += 2
if factors["domain_specificity"] == "high":
score += 2
# Map score to difficulty
if score <= 2:
return "trivial"
elif score <= 4:
return "easy"
elif score <= 6:
return "medium"
elif score <= 8:
return "hard"
else:
return "adversarial"
Coverage Requirements:
| Metric | Minimum |
|---|---|
| Tutorials | >= 15% of documents |
| Research papers | >= 5% of documents |
| Domain coverage | >= 5 docs per expected domain |
| Hard queries | >= 10% of queries |
| Adversarial queries | >= 5% of queries |
Difficulty Distribution Minimums:
| Level | Minimum Count |
|---|---|
| trivial | 3 |
| easy | 3 |
| medium | 5 |
| hard | 3 |
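A sketch of a balance check against these minimums, assuming each query dict carries the `difficulty` field used elsewhere in this skill:

```python
DIFFICULTY_MINIMUMS = {"trivial": 3, "easy": 3, "medium": 5, "hard": 3}

def check_difficulty_balance(queries: list[dict]) -> list[str]:
    """Return one gap message per difficulty level below its minimum count."""
    counts: dict[str, int] = {}
    for query in queries:
        level = query.get("difficulty", "unknown")
        counts[level] = counts.get(level, 0) + 1
    return [
        f"Need {minimum - counts.get(level, 0)} more {level} queries"
        for level, minimum in DIFFICULTY_MINIMUMS.items()
        if counts.get(level, 0) < minimum
    ]
```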
Coverage Gap Detection:
def analyze_coverage_gaps(
documents: list[dict],
queries: list[dict],
) -> dict:
"""Analyze dataset coverage and identify gaps."""
# Content type distribution
content_types = {}
for doc in documents:
ct = doc.get("content_type", "unknown")
content_types[ct] = content_types.get(ct, 0) + 1
# Domain/tag distribution
all_tags = []
for doc in documents:
all_tags.extend(doc.get("tags", []))
tag_counts = {}
for tag in all_tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
# Difficulty distribution
difficulties = {}
for query in queries:
diff = query.get("difficulty", "unknown")
difficulties[diff] = difficulties.get(diff, 0) + 1
# Identify gaps
gaps = []
total_docs = len(documents)
if content_types.get("tutorial", 0) / total_docs < 0.15:
gaps.append("Under-represented: tutorials (<15%)")
if content_types.get("research_paper", 0) / total_docs < 0.05:
gaps.append("Under-represented: research papers (<5%)")
expected_domains = ["ai-ml", "backend", "frontend", "devops", "security"]
for domain in expected_domains:
if tag_counts.get(domain, 0) < 5:
gaps.append(f"Under-represented domain: {domain} (<5 docs)")
total_queries = len(queries)
if difficulties.get("hard", 0) / total_queries < 0.10:
gaps.append("Under-represented: hard queries (<10%)")
return {
"content_type_distribution": content_types,
"difficulty_distribution": difficulties,
"gaps": gaps,
}
Incorrect — Hardcoded difficulty without analysis:
# Guessing difficulty level
document = {
"id": "new-doc",
"difficulty": "medium", # No assessment
"tags": ["ai-ml"], # Only 1 tag
}
Correct — Classified difficulty with analysis:
# Analyze multiple factors
factors = {
"technical_density": count_technical_terms(document["content"]),
"section_count": len(document.get("sections", [])),
"abstraction_level": assess_abstraction(document),
}
difficulty = classify_difficulty(document) # Returns "hard" based on factors
document["difficulty"] = difficulty
document["tags"] = ["ai-ml", "backend", "devops"] # Minimum 2 tags
Key rules:
- Maintain balanced coverage across content types, difficulty levels, and domains
- Do not over-index on articles -- ensure tutorials and research papers are represented
- Need both trivial AND hard queries for meaningful evaluation
- Run coverage analysis before and after adding new entries
- Target all 5 expected domains (ai-ml, backend, frontend, devops, security)
Integrate golden dataset validation and backups into the CI/CD pipeline — HIGH
CI Integration
GitHub Actions automation, pre-deployment validation, and scheduled backup patterns.
Automated Weekly Backup:
# .github/workflows/backup-golden-dataset.yml
name: Backup Golden Dataset
on:
schedule:
- cron: '0 2 * * 0' # Weekly on Sunday at 2am
workflow_dispatch: # Manual trigger
jobs:
backup:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd backend
poetry install
- name: Run backup
env:
DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py backup
- name: Commit backup
run: |
git config user.name "GitHub Actions"
git config user.email "actions@github.com"
git add backend/data/golden_dataset_backup.json
git add backend/data/golden_dataset_metadata.json
git diff-index --quiet HEAD || git commit -m "chore: automated golden dataset backup [skip ci]"
git push
Validation on Pull Request:
# .github/workflows/validate-golden-dataset.yml
name: Validate Golden Dataset
on:
pull_request:
paths:
- 'backend/data/golden_dataset_backup.json'
schedule:
- cron: '0 8 * * 1' # Weekly on Monday 8am
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd backend
poetry install
- name: Start PostgreSQL
run: docker compose up -d postgres
- name: Run migrations
run: |
cd backend
poetry run alembic upgrade head
- name: Restore golden dataset
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py restore
- name: Validate dataset
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py verify
- name: Run retrieval tests
run: |
cd backend
poetry run pytest tests/integration/test_retrieval_quality.py -v
Pre-Deployment Checklist:
cd backend
# 1. Backup current data
poetry run python scripts/backup_golden_dataset.py backup
# 2. Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# 3. Run retrieval quality tests
poetry run pytest tests/integration/test_retrieval_quality.py
# 4. Check for regressions
# Expected: 91.6% pass rate, 0.777 MRR
# If lower, investigate before deploying
Manual CI Trigger:
# Trigger workflow manually
gh workflow run backup-golden-dataset.yml
# Check workflow status
gh run list --workflow=backup-golden-dataset.yml
# View logs
gh run view --log
Pre-Commit Hook:
#!/bin/bash
# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)
if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json"; then
echo "Validating golden dataset changes..."
cd backend
poetry run python scripts/data/add_to_golden_dataset.py validate-all
if [ $? -ne 0 ]; then
echo "Golden dataset validation failed!"
exit 1
fi
echo "Golden dataset validation passed"
fi
Incorrect — Backup without validation:
# Missing verification step
- name: Run backup
run: poetry run python scripts/backup_golden_dataset.py backup
- name: Commit backup
run: git commit -m "chore: backup"
Correct — Backup with verification:
# Verify backup integrity
- name: Run backup
run: poetry run python scripts/backup_golden_dataset.py backup
- name: Verify backup
run: poetry run python scripts/backup_golden_dataset.py verify
- name: Commit backup
run: git commit -m "chore: automated golden dataset backup [skip ci]"
Key rules:
- Set up weekly automated backups to prevent data staleness
- Validate golden dataset on every PR that modifies dataset files
- Always run verification after automated backup creation
- Use [skip ci] in automated commit messages to prevent infinite loops
- Include pre-deployment validation in release checklists
Choose the right backup strategy and URL contract for golden dataset storage — HIGH
Storage Patterns
Backup strategies, URL contract enforcement, and data integrity checks.
Backup Strategy Comparison:
| Strategy | Version Control | Restore Speed | Portability | Inspection |
|---|---|---|---|---|
| JSON (recommended) | Yes | Slower (regen embeddings) | High | Easy |
| SQL Dump | No (binary) | Fast | DB-version dependent | Hard |
The URL Contract:
Golden dataset analyses MUST store real canonical URLs, not placeholders.
# WRONG - Placeholder URL (breaks restore)
analysis.url = "https://orchestkit.dev/placeholder/123"
# CORRECT - Real canonical URL (enables re-fetch if needed)
analysis.url = "https://docs.python.org/3/library/asyncio.html"
Why this matters:
- Enables re-fetching content if embeddings need regeneration
- Allows validation that source content hasn't changed
- Provides audit trail for data provenance
URL Validation:
FORBIDDEN_URL_PATTERNS = [
"orchestkit.dev",
"placeholder",
"example.com",
"localhost",
"127.0.0.1",
]
def validate_url(url: str) -> tuple[bool, str]:
"""Validate URL is not a placeholder."""
for pattern in FORBIDDEN_URL_PATTERNS:
if pattern in url.lower():
return False, f"URL contains forbidden pattern: {pattern}"
if not url.startswith("https://"):
if not url.startswith("http://arxiv.org"): # arXiv redirects
return False, "URL must use HTTPS"
return True, "OK"
Data Integrity Checks:
| Check | Error/Warning | Description |
|---|---|---|
| Count mismatch | Error | Analysis/chunk count differs from metadata |
| Placeholder URLs | Error | URLs containing orchestkit.dev or placeholder |
| Missing embeddings | Error | Chunks without embeddings after restore |
| Orphaned chunks | Warning | Chunks with no parent analysis |
Verification Implementation:
async def verify_golden_dataset() -> dict:
"""Verify golden dataset integrity."""
errors = []
warnings = []
async with get_session() as session:
# 1. Check counts
analysis_count = await session.scalar(select(func.count(Analysis.id)))
chunk_count = await session.scalar(select(func.count(Chunk.id)))
expected = load_metadata()
if analysis_count != expected["total_analyses"]:
errors.append(f"Analysis count mismatch: {analysis_count} vs {expected['total_analyses']}")
# 2. Check URL contract
query = select(Analysis).where(
Analysis.url.like("%orchestkit.dev%") |
Analysis.url.like("%placeholder%")
)
result = await session.execute(query)
invalid_urls = result.scalars().all()
if invalid_urls:
errors.append(f"Found {len(invalid_urls)} analyses with placeholder URLs")
# 3. Check embeddings exist
query = select(Chunk).where(Chunk.embedding.is_(None))
result = await session.execute(query)
missing_embeddings = result.scalars().all()
if missing_embeddings:
errors.append(f"Found {len(missing_embeddings)} chunks without embeddings")
# 4. Check orphaned chunks
query = select(Chunk).outerjoin(Analysis).where(Analysis.id.is_(None))
result = await session.execute(query)
orphaned = result.scalars().all()
if orphaned:
warnings.append(f"Found {len(orphaned)} orphaned chunks")
return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
Best Practices:
- Version control backups -- commit JSON to git for history and diffs
- Validate before deployment -- run verify before production changes
- Test restore in staging -- never test restore in production first
- Document changes -- track additions/removals in metadata
Incorrect — Missing URL validation:
# No URL contract enforcement
analysis.url = url # Could be placeholder
session.add(analysis)
await session.commit()
Correct — Enforcing URL contract:
# Validate before saving
valid, msg = validate_url(url)
if not valid:
raise ValueError(f"Invalid URL: {msg}")
analysis.url = url # Guaranteed to be real canonical URL
session.add(analysis)
await session.commit()
Key rules:
- Always use JSON backup for version control and portability
- Never store placeholder URLs -- enforce the URL contract
- Run all 4 integrity checks (counts, URLs, embeddings, orphans) after every restore
- SQL dumps for local snapshots only, not version control
Version golden datasets for reproducible evaluation across environments and recovery — HIGH
Dataset Versioning
JSON backup format, embedding regeneration, and disaster recovery patterns.
Backup Format:
{
"version": "1.0",
"created_at": "2025-12-19T10:30:00Z",
"metadata": {
"total_analyses": 98,
"total_chunks": 415,
"total_artifacts": 98
},
"analyses": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://docs.python.org/3/library/asyncio.html",
"content_type": "documentation",
"status": "completed",
"created_at": "2025-11-15T08:20:00Z",
"chunks": [
{
"id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"content": "asyncio is a library...",
"section_title": "Introduction to asyncio"
}
]
}
]
}
Key design decisions:
- Embeddings excluded (regenerate on restore with current model)
- Nested structure (analyses -> chunks -> artifacts)
- Metadata for validation
- ISO timestamps for reproducibility
Restore with Embedding Regeneration:
async def restore_golden_dataset(replace: bool = False):
"""Restore golden dataset from JSON backup."""
with open(BACKUP_FILE) as f:
backup_data = json.load(f)
async with get_session() as session:
if replace:
await session.execute(delete(Chunk))
await session.execute(delete(Artifact))
await session.execute(delete(Analysis))
await session.commit()
from app.shared.services.embeddings import embed_text
for idx, analysis_data in enumerate(backup_data["analyses"]):
analysis = Analysis(
id=UUID(analysis_data["id"]),
url=analysis_data["url"],
)
session.add(analysis)
for chunk_data in analysis_data["chunks"]:
# Regenerate embedding using CURRENT model
embedding = await embed_text(chunk_data["content"])
chunk = Chunk(
id=UUID(chunk_data["id"]),
analysis_id=analysis.id,
content=chunk_data["content"],
embedding=embedding, # Freshly generated!
)
session.add(chunk)
if idx % 10 == 0:
await session.commit()
await session.commit()
Why regenerate embeddings?
- Embedding models improve over time (Voyage AI v1 -> v2)
- Ensures consistency with current production model
- Smaller backup files (exclude large vectors)
Disaster Recovery Scenarios:
| Scenario | Steps |
|---|---|
| Accidental deletion | restore --replace -> verify -> run tests |
| Migration failure | alembic downgrade -1 -> restore --replace -> fix migration |
| New environment | Clone repo -> setup DB -> restore -> run tests |
CLI Commands:
cd backend
# Backup golden dataset
poetry run python scripts/backup_golden_dataset.py backup
# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Restore from backup (WARNING: Deletes existing data)
poetry run python scripts/backup_golden_dataset.py restore --replace
# Restore without deleting (adds to existing)
poetry run python scripts/backup_golden_dataset.py restore
Incorrect — Storing embeddings in backup:
# Embedding vectors bloat backup file
backup_data = {
"chunks": [{
"content": "...",
"embedding": [0.123, 0.456, ...], # 1024 floats!
}]
}
Correct — Regenerate embeddings on restore:
# Exclude embeddings from backup
backup_data = {
"chunks": [{
"content": "...",
# No embedding field
}]
}
# Regenerate during restore
embedding = await embed_text(chunk_data["content"])
chunk.embedding = embedding # Fresh with current model
Key rules:
- Always regenerate embeddings on restore -- never store them in backup
- Commit backups every 10 analyses to avoid huge transactions
- Verify counts match metadata after every restore
- Test restore procedures in staging before production
Detect duplicate entries and coverage gaps that skew golden dataset evaluation results — CRITICAL
Drift Detection
Duplicate detection, semantic similarity checking, and coverage gap analysis.
Duplicate Detection Thresholds:
| Similarity | Action |
|---|---|
| >= 0.90 | Block -- Content too similar |
| >= 0.85 | Warn -- High similarity detected |
| >= 0.80 | Note -- Similar content exists |
| < 0.80 | Allow -- Sufficiently unique |
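The action tiers above can be expressed as a small helper. `classify_similarity` and `cosine` are illustrative names; `cosine` is written in pure Python here, while the production check below uses a NumPy dot product on pre-normalized vectors:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity for unnormalized vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def classify_similarity(similarity: float) -> str:
    """Map a cosine similarity to the duplicate-detection action tiers."""
    if similarity >= 0.90:
        return "block"
    if similarity >= 0.85:
        return "warn"
    if similarity >= 0.80:
        return "note"
    return "allow"

# Vectors pointing in the same direction -> similarity ~1.0 -> blocked
print(classify_similarity(cosine([1.0, 2.0], [2.0, 4.0])))  # block
print(classify_similarity(0.79))  # allow
```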
Semantic Similarity Check:
import numpy as np
from typing import Optional
async def check_duplicate(
new_content: str,
existing_embeddings: list[tuple[str, np.ndarray]],
embedding_service,
threshold: float = 0.85,
) -> Optional[tuple[str, float]]:
"""Check if content is duplicate of existing document.
Returns:
(doc_id, similarity) if duplicate found, None otherwise
"""
# Generate embedding for new content
new_embedding = await embedding_service.generate_embedding(
text=new_content[:8000], # Truncate for embedding
normalize=True,
)
new_vec = np.array(new_embedding)
# Compare against existing
max_similarity = 0.0
most_similar_doc = None
for doc_id, existing_vec in existing_embeddings:
# Cosine similarity (vectors are normalized)
similarity = np.dot(new_vec, existing_vec)
if similarity > max_similarity:
max_similarity = similarity
most_similar_doc = doc_id
if max_similarity >= threshold:
return (most_similar_doc, max_similarity)
return None
URL Duplicate Check:
def check_url_duplicate(
new_url: str,
source_url_map: dict[str, str],
) -> Optional[str]:
"""Check if URL already exists in dataset."""
normalized = normalize_url(new_url)
for doc_id, existing_url in source_url_map.items():
if normalize_url(existing_url) == normalized:
return doc_id
return None
def normalize_url(url: str) -> str:
"""Normalize URL for comparison."""
from urllib.parse import urlparse, urlunparse
parsed = urlparse(url.lower())
netloc = parsed.netloc.replace("www.", "")
path = parsed.path.rstrip("/")
return urlunparse((
parsed.scheme, netloc, path,
"", "", "", # params, query, fragment stripped
))
Pre-Addition Validation Workflow:
async def validate_before_add(
document: dict,
existing_documents: list[dict],
source_url_map: dict[str, str],
embedding_service,
) -> dict:
"""Run full validation before adding document."""
errors = []
warnings = []
# 1. Schema validation
schema_errors = validate_schema(document)
errors.extend(schema_errors)
# 2. URL validation
url_valid, url_msg = validate_url(document.get("source_url", ""))
if not url_valid:
errors.append(url_msg)
# 3. URL duplicate check
url_dup = check_url_duplicate(document.get("source_url", ""), source_url_map)
if url_dup:
errors.append(f"URL already exists in dataset as: {url_dup}")
# 4. Semantic duplicate check
content = " ".join(
s.get("content", "") for s in document.get("sections", [])
)
existing_embeddings = await load_existing_embeddings(existing_documents)
dup_result = await check_duplicate(content, existing_embeddings, embedding_service)
if dup_result and dup_result[1] >= 0.90:
errors.append(
f"Content too similar to: {dup_result[0]} (similarity: {dup_result[1]:.2f})"
)
elif dup_result and dup_result[1] >= 0.80:
warnings.append(
f"Content similar to: {dup_result[0]} (similarity: {dup_result[1]:.2f})"
)
return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}Incorrect — Raw URL comparison:
# Fails for www/https/trailing slash variants
if new_url == existing_url:
return "duplicate"Correct — Normalized URL comparison:
# Normalize both URLs before comparing
def normalize_url(url: str) -> str:
parsed = urlparse(url.lower())
netloc = parsed.netloc.replace("www.", "")
path = parsed.path.rstrip("/")
return urlunparse((parsed.scheme, netloc, path, "", "", ""))
if normalize_url(new_url) == normalize_url(existing_url):
return "duplicate"Key rules:
- Always run both URL and semantic duplicate checks before adding entries
- Block entries with >= 0.90 cosine similarity to existing content
- Normalize URLs before comparison (strip www, trailing slashes, query params)
- Run coverage gap analysis periodically to detect dataset drift
- Truncate content to 8000 chars for embedding comparison
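A quick check of the normalization rules above, reusing the `normalize_url` sketch as-is:

```python
from urllib.parse import urlparse, urlunparse

def normalize_url(url: str) -> str:
    """Normalize URL for comparison (strip www, trailing slash, query, fragment)."""
    parsed = urlparse(url.lower())
    netloc = parsed.netloc.replace("www.", "")
    path = parsed.path.rstrip("/")
    return urlunparse((parsed.scheme, netloc, path, "", "", ""))

# www prefix, casing, trailing slash, and query string all collapse to one form
a = normalize_url("https://www.Example.com/Article/")
b = normalize_url("https://example.com/article?utm_source=mail")
assert a == b == "https://example.com/article"

# The scheme is kept, so http and https variants do NOT match
assert normalize_url("http://example.com/article") != a
```

Note that scheme differences still count as distinct URLs; if http/https variants should also collapse, the scheme would need normalizing too.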
Validate schema and content quality to prevent invalid entries from degrading evaluations — CRITICAL
Quality Validation
Schema validation, content quality checks, and referential integrity enforcement.
Document Schema (v2.0):
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["id", "title", "source_url", "content_type", "sections"],
"properties": {
"id": {
"type": "string",
"pattern": "^[a-z0-9-]+$",
"description": "Unique kebab-case identifier"
},
"title": {
"type": "string",
"minLength": 10,
"maxLength": 200
},
"source_url": {
"type": "string",
"format": "uri",
"description": "Canonical source URL (NOT placeholder)"
},
"content_type": {
"type": "string",
"enum": ["article", "tutorial", "research_paper", "documentation", "video_transcript", "code_repository"]
},
"tags": {
"type": "array",
"items": {"type": "string"},
"minItems": 2,
"maxItems": 10
},
"sections": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["id", "title", "content"],
"properties": {
"id": {"type": "string", "pattern": "^[a-z0-9-/]+$"},
"title": {"type": "string"},
"content": {"type": "string", "minLength": 50}
}
}
}
}
}
Content Quality Validation:
def validate_content_quality(document: dict) -> list[str]:
"""Validate document content meets quality standards."""
warnings = []
# Title length
title = document.get("title", "")
if len(title) < 10:
warnings.append("Title too short (min 10 chars)")
if len(title) > 200:
warnings.append("Title too long (max 200 chars)")
# Section content
for section in document.get("sections", []):
content = section.get("content", "")
if len(content) < 50:
warnings.append(f"Section {section['id']} content too short (min 50 chars)")
if len(content) > 50000:
warnings.append(f"Section {section['id']} content very long (>50k chars)")
# Tags
tags = document.get("tags", [])
if len(tags) < 2:
warnings.append("Too few tags (min 2)")
if len(tags) > 10:
warnings.append("Too many tags (max 10)")
return warnings
Unique ID Validation:
def validate_unique_ids(documents: list[dict], queries: list[dict]) -> list[str]:
"""Ensure all IDs are unique across documents and queries."""
errors = []
# Document IDs
doc_ids = [d["id"] for d in documents]
if len(doc_ids) != len(set(doc_ids)):
duplicates = [id for id in doc_ids if doc_ids.count(id) > 1]
errors.append(f"Duplicate document IDs: {set(duplicates)}")
# Query IDs
query_ids = [q["id"] for q in queries]
if len(query_ids) != len(set(query_ids)):
duplicates = [id for id in query_ids if query_ids.count(id) > 1]
errors.append(f"Duplicate query IDs: {set(duplicates)}")
# Section IDs within documents
for doc in documents:
section_ids = [s["id"] for s in doc.get("sections", [])]
if len(section_ids) != len(set(section_ids)):
errors.append(f"Duplicate section IDs in document: {doc['id']}")
return errors
Referential Integrity:
def validate_references(documents: list[dict], queries: list[dict]) -> list[str]:
"""Ensure query expected_chunks reference valid section IDs."""
errors = []
# Build set of all valid section IDs
valid_sections = set()
for doc in documents:
for section in doc.get("sections", []):
valid_sections.add(section["id"])
# Check query references
for query in queries:
for chunk_id in query.get("expected_chunks", []):
if chunk_id not in valid_sections:
errors.append(
f"Query {query['id']} references invalid section: {chunk_id}"
)
return errors
Validation Rules Summary:
| Rule | Purpose | Severity |
|---|---|---|
| No Placeholder URLs | Ensure real canonical URLs | Error |
| Unique Identifiers | No duplicate doc/query/section IDs | Error |
| Referential Integrity | Query chunks reference valid sections | Error |
| Content Quality | Title/content length, tag count | Warning |
| Difficulty Distribution | Balanced query difficulty levels | Warning |
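The schema's kebab-case id pattern and title length bounds can be checked directly with the standard `re` module; `doc_id_ok` and `title_ok` are illustrative helpers, not part of the dataset tooling:

```python
import re

KEBAB_ID = re.compile(r"^[a-z0-9-]+$")  # schema pattern for document ids

def doc_id_ok(doc_id: str) -> bool:
    """True when the id is lowercase kebab-case per the schema pattern."""
    return bool(KEBAB_ID.match(doc_id))

def title_ok(title: str) -> bool:
    """Schema bounds: minLength 10, maxLength 200."""
    return 10 <= len(title) <= 200

assert doc_id_ok("rag-evaluation-guide")
assert not doc_id_ok("RAG Guide")   # uppercase and spaces rejected
assert not doc_id_ok("guide_v2")    # underscores rejected
assert title_ok("Retrieval Evaluation with Golden Datasets")
assert not title_ok("Too short")    # 9 chars, below minLength 10
```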
Incorrect — Missing referential integrity check:
# Query references non-existent section
query = {
"id": "q-test",
"expected_chunks": ["section-999"], # Doesn't exist!
}
queries.append(query) # No validation
Correct — Validate references exist:
# Build set of valid section IDs
valid_sections = set()
for doc in documents:
for section in doc.get("sections", []):
valid_sections.add(section["id"])
# Validate query references
for chunk_id in query.get("expected_chunks", []):
if chunk_id not in valid_sections:
raise ValueError(f"Query references invalid section: {chunk_id}")Key rules:
- All documents must pass schema validation before inclusion
- IDs must be unique across documents, queries, and sections
- Query expected_chunks must reference existing section IDs
- Content quality checks are warnings (non-blocking) but should be addressed
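A minimal end-to-end check of the ID and reference rules, using inline re-implementations that mirror `validate_unique_ids` and `validate_references` above on a toy dataset:

```python
documents = [
    {"id": "doc-a", "sections": [{"id": "doc-a/intro"}, {"id": "doc-a/setup"}]},
]
queries = [
    {"id": "q-good", "expected_chunks": ["doc-a/intro"]},
    {"id": "q-bad", "expected_chunks": ["doc-a/missing"]},  # dangling reference
]

# Referential integrity: every expected chunk must be a real section id
valid_sections = {s["id"] for d in documents for s in d.get("sections", [])}
errors = [
    f"Query {q['id']} references invalid section: {c}"
    for q in queries
    for c in q.get("expected_chunks", [])
    if c not in valid_sections
]

# Uniqueness: duplicate ids anywhere are an error
doc_ids = [d["id"] for d in documents]
assert len(doc_ids) == len(set(doc_ids))

print(errors)  # one error, for q-bad's dangling section reference
```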
Run regression tests and enforce difficulty distribution to maintain evaluation reliability — CRITICAL
Regression Testing
Difficulty distribution enforcement, pre-commit hooks, and full dataset validation.
Difficulty Distribution Validation:
def validate_difficulty_distribution(queries: list[dict]) -> list[str]:
"""Ensure balanced difficulty distribution."""
warnings = []
# Count by difficulty
distribution = {}
for query in queries:
diff = query.get("difficulty", "unknown")
distribution[diff] = distribution.get(diff, 0) + 1
# Minimum requirements
requirements = {
"trivial": 3,
"easy": 3,
"medium": 5, # Most common real-world case
"hard": 3,
}
for level, min_count in requirements.items():
actual = distribution.get(level, 0)
if actual < min_count:
warnings.append(
f"Insufficient {level} queries: {actual}/{min_count}"
)
return warnings
Query Schema:
{
"type": "object",
"required": ["id", "query", "difficulty", "expected_chunks", "min_score"],
"properties": {
"id": {"type": "string", "pattern": "^q-[a-z0-9-]+$"},
"query": {"type": "string", "minLength": 5, "maxLength": 500},
"modes": {"type": "array", "items": {"enum": ["semantic", "keyword", "hybrid"]}},
"category": {"enum": ["specific", "broad", "negative", "edge", "coarse-to-fine"]},
"difficulty": {"enum": ["trivial", "easy", "medium", "hard", "adversarial"]},
"expected_chunks": {"type": "array", "items": {"type": "string"}, "minItems": 1},
"min_score": {"type": "number", "minimum": 0, "maximum": 1}
}
}
Full Dataset Validation:
async def validate_full_dataset() -> dict:
"""Run comprehensive validation on entire dataset.
Use this for:
- Pre-commit hooks
- CI/CD validation
- Periodic integrity checks
"""
from backend.tests.smoke.retrieval.fixtures.loader import FixtureLoader
loader = FixtureLoader(use_expanded=True)
documents = loader.load_documents()
queries = loader.load_queries()
source_url_map = loader.load_source_url_map()
all_errors = []
all_warnings = []
# 1. Schema validation for all documents
for doc in documents:
errors = validate_schema(doc)
all_errors.extend([f"[{doc['id']}] {e}" for e in errors])
# 2. Unique ID validation
id_errors = validate_unique_ids(documents, queries)
all_errors.extend(id_errors)
# 3. Referential integrity
ref_errors = validate_references(documents, queries)
all_errors.extend(ref_errors)
# 4. URL validation
for doc in documents:
valid, msg = validate_url(doc.get("source_url", ""))
if not valid:
all_errors.append(f"[{doc['id']}] {msg}")
# 5. Difficulty distribution
dist_warnings = validate_difficulty_distribution(queries)
all_warnings.extend(dist_warnings)
# 6. Coverage analysis
coverage = analyze_coverage_gaps(documents, queries)
all_warnings.extend(coverage["gaps"])
return {
"valid": len(all_errors) == 0,
"errors": all_errors,
"warnings": all_warnings,
"coverage": coverage,
"stats": {
"documents": len(documents),
"queries": len(queries),
"sections": sum(len(d.get("sections", [])) for d in documents),
}
}
Pre-Commit Hook:
#!/bin/bash
# .claude/hooks/pretool/bash/validate-golden-dataset.sh
# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)
if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json\|fixtures/source_url_map.json"; then
echo "Validating golden dataset changes..."
cd backend
poetry run python scripts/data/add_to_golden_dataset.py validate-all
if [ $? -ne 0 ]; then
echo "Golden dataset validation failed!"
echo "Fix errors before committing."
exit 1
fi
echo "Golden dataset validation passed"
fi
CLI Validation Commands:
# Validate specific document
poetry run python scripts/data/add_to_golden_dataset.py validate \
--document-id "new-doc-id"
# Validate full dataset
poetry run python scripts/data/add_to_golden_dataset.py validate-all
# Check for duplicates
poetry run python scripts/data/add_to_golden_dataset.py check-duplicate \
--url "https://example.com/article"
# Analyze coverage gaps
poetry run python scripts/data/add_to_golden_dataset.py coverage
Incorrect — Unbalanced difficulty distribution:
# All queries marked "easy"
queries = [
{"id": "q-1", "difficulty": "easy"},
{"id": "q-2", "difficulty": "easy"},
{"id": "q-3", "difficulty": "easy"},
]
Correct — Balanced difficulty distribution:
# Mix of difficulty levels
queries = [
{"id": "q-1", "difficulty": "trivial"}, # 3+ trivial
{"id": "q-2", "difficulty": "easy"}, # 3+ easy
{"id": "q-3", "difficulty": "medium"}, # 5+ medium
{"id": "q-4", "difficulty": "hard"}, # 3+ hard
]
# Validate distribution
validate_difficulty_distribution(queries) # Checks minimums
Key rules:
- Run full dataset validation before every commit that modifies golden dataset files
- Enforce minimum difficulty distribution (trivial 3, easy 3, medium 5, hard 3)
- Run all 6 validation steps: schema, IDs, references, URLs, distribution, coverage
- Block commits that introduce schema errors or referential integrity violations
- Treat difficulty distribution and coverage gaps as warnings that should be addressed
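Exercising the distribution check on a deliberately unbalanced query set; the function body mirrors `validate_difficulty_distribution` above:

```python
def validate_difficulty_distribution(queries: list[dict]) -> list[str]:
    """Warn when any difficulty level falls below its minimum count."""
    distribution: dict[str, int] = {}
    for query in queries:
        diff = query.get("difficulty", "unknown")
        distribution[diff] = distribution.get(diff, 0) + 1
    requirements = {"trivial": 3, "easy": 3, "medium": 5, "hard": 3}
    return [
        f"Insufficient {level} queries: {distribution.get(level, 0)}/{min_count}"
        for level, min_count in requirements.items()
        if distribution.get(level, 0) < min_count
    ]

# Three easy queries and nothing else: trivial, medium, and hard all fail
unbalanced = [{"id": f"q-{i}", "difficulty": "easy"} for i in range(3)]
warnings = validate_difficulty_distribution(unbalanced)
print(warnings)  # three warnings: trivial 0/3, medium 0/5, hard 0/3
```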
References (8)
Annotation Patterns
Multi-agent analysis pipeline and consensus aggregation for golden dataset curation.
Multi-Agent Analysis Pipeline
Architecture
INPUT: URL/Content
|
v
+------------------+
| FETCH AGENT | WebFetch or file read
| (sequential) | Extract structure, detect type
+--------+---------+
|
v
+-----------------------------------------------+
| PARALLEL ANALYSIS AGENTS |
| +----------+ +----------+ +--------+ +------+ |
| | Quality | |Difficulty| | Domain | |Query | |
| |Evaluator | |Classifier| | Tagger | |Gen | |
| +----+-----+ +----+-----+ +---+----+ +--+---+ |
| | | | | |
+-------+------------+-----------+---------+-----+
| | | |
+------------+-----------+---------+
|
v
+-----------------------------------------------+
| CONSENSUS AGGREGATOR |
| - Weighted quality score |
| - Confidence level (agent agreement) |
| - Final recommendation: include/review/exclude|
+--------+--------------------------------------+
|
v
+------------------+
| USER APPROVAL | Show scores, get confirmation
+--------+---------+
|
v
OUTPUT: Curated document entry
Agent Specifications
Quality Evaluator Agent
Task(
subagent_type="code-quality-reviewer",
prompt="""GOLDEN DATASET QUALITY EVALUATION
Evaluate this content for golden dataset inclusion:
Content: {content_preview}
Source: {source_url}
Type: {content_type}
Score these dimensions (0.0-1.0):
1. ACCURACY (weight 0.25)
- Technical correctness
- Code validity
- Up-to-date information
2. COHERENCE (weight 0.20)
- Logical structure
- Clear flow
- Consistent terminology
3. DEPTH (weight 0.25)
- Comprehensive coverage
- Edge cases mentioned
- Appropriate detail level
4. RELEVANCE (weight 0.30)
- Alignment with AI/ML, backend, frontend, DevOps
- Practical applicability
- Technical value
Output JSON:
{
"accuracy": {"score": 0.X, "rationale": "..."},
"coherence": {"score": 0.X, "rationale": "..."},
"depth": {"score": 0.X, "rationale": "..."},
"relevance": {"score": 0.X, "rationale": "..."},
"weighted_total": 0.X,
"recommendation": "include|review|exclude"
}
""",
run_in_background=True
)
Difficulty Classifier Agent
Task(
subagent_type="workflow-architect",
prompt="""DIFFICULTY CLASSIFICATION
Analyze document complexity for retrieval testing:
Content: {content_preview}
Sections: {section_titles}
Assess these factors:
1. Technical term density (count specialized terms)
2. Section complexity (nesting depth, count)
3. Cross-domain references (links between topics)
4. Abstraction level (concrete vs conceptual)
5. Query ambiguity potential (how many ways to ask about this?)
Output JSON:
{
"difficulty": "trivial|easy|medium|hard|adversarial",
"factors": {
"technical_density": "low|medium|high",
"structure_complexity": "simple|moderate|complex",
"cross_references": "none|some|many",
"abstraction": "concrete|mixed|abstract"
},
"expected_retrieval_score": 0.X,
"rationale": "..."
}
"""
)
Domain Tagger Agent
Task(
subagent_type="data-pipeline-engineer",
prompt="""DOMAIN TAGGING
Extract domain tags for this content:
Content: {content_preview}
Source: {source_url}
Primary domains (pick 1-2):
- ai-ml (LLM, agents, RAG, embeddings, LangGraph)
- backend (FastAPI, PostgreSQL, APIs, microservices)
- frontend (React, TypeScript, UI/UX)
- devops (Docker, K8s, CI/CD, infrastructure)
- security (auth, OWASP, encryption)
- databases (SQL, NoSQL, vector DBs)
- testing (pytest, playwright, TDD)
Secondary tags (pick 3-5):
- Specific technologies mentioned
- Patterns/concepts covered
- Use cases addressed
Output JSON:
{
"primary_domains": ["ai-ml", "backend"],
"tags": ["langraph", "agents", "tool-use", "fastapi"],
"confidence": 0.X
}
"""
)
Query Generator Agent
Task(
subagent_type="test-generator",
prompt="""TEST QUERY GENERATION
Generate test queries for this golden dataset document:
Document ID: {document_id}
Title: {title}
Sections: {section_titles}
Content preview: {content_preview}
Generate 3-5 test queries with varied difficulty:
1. At least 1 TRIVIAL query (exact keyword match)
2. At least 1 EASY query (synonyms, common terms)
3. At least 1 MEDIUM query (paraphrased intent)
4. Optional: 1 HARD query (cross-section reasoning)
For each query specify:
- Query text
- Expected sections to match
- Difficulty level
- Minimum expected score
Output JSON:
{
"queries": [
{
"id": "q-{doc-id}-{num}",
"query": "How to implement X with Y?",
"difficulty": "medium",
"expected_chunks": ["section-id-1", "section-id-2"],
"min_score": 0.55,
"modes": ["semantic", "hybrid"],
"category": "specific",
"description": "Tests retrieval of X implementation details"
}
]
}
"""
)
Consensus Aggregation
Aggregation Logic
from dataclasses import dataclass
from typing import Literal
@dataclass
class CurationConsensus:
"""Aggregated result from multi-agent analysis."""
quality_score: float # Weighted average (0-1)
confidence: float # Agent agreement (0-1)
decision: Literal["include", "review", "exclude"]
# Individual scores
accuracy: float
coherence: float
depth: float
relevance: float
# Classification results
content_type: str
difficulty: str
tags: list[str]
# Generated queries
suggested_queries: list[dict]
# Warnings
warnings: list[str]
def aggregate_results(
quality_result: dict,
difficulty_result: dict,
domain_result: dict,
query_result: dict,
) -> CurationConsensus:
"""Aggregate multi-agent results into consensus."""
# Calculate weighted quality score
q = quality_result
quality_score = (
q["accuracy"]["score"] * 0.25 +
q["coherence"]["score"] * 0.20 +
q["depth"]["score"] * 0.25 +
q["relevance"]["score"] * 0.30
)
# Calculate confidence (variance-based)
scores = [
q["accuracy"]["score"],
q["coherence"]["score"],
q["depth"]["score"],
q["relevance"]["score"],
]
variance = sum((s - quality_score)**2 for s in scores) / len(scores)
confidence = 1.0 - min(variance * 4, 1.0)
# Decision thresholds
if quality_score >= 0.75 and confidence >= 0.7:
decision = "include"
elif quality_score >= 0.55:
decision = "review"
else:
decision = "exclude"
# Collect warnings
warnings = []
if q["accuracy"]["score"] < 0.6:
warnings.append("Low accuracy score - verify technical claims")
if q["relevance"]["score"] < 0.7:
warnings.append("Low relevance - may be off-topic for OrchestKit")
if domain_result["confidence"] < 0.7:
warnings.append("Low confidence in domain classification")
return CurationConsensus(
quality_score=quality_score,
confidence=confidence,
decision=decision,
accuracy=q["accuracy"]["score"],
coherence=q["coherence"]["score"],
depth=q["depth"]["score"],
relevance=q["relevance"]["score"],
content_type=difficulty_result.get("content_type", "article"),
difficulty=difficulty_result["difficulty"],
tags=domain_result["tags"],
suggested_queries=query_result["queries"],
warnings=warnings,
)
Langfuse Integration
Trace Structure
# Langfuse trace for curation workflow
trace = langfuse.trace(
name="golden-dataset-curation",
metadata={
"source_url": url,
"document_id": doc_id,
}
)
# Spans for each agent
with trace.span(name="fetch_content") as span:
content = fetch_url(url)
span.update(output={"length": len(content)})
with trace.span(name="quality_evaluation") as span:
quality_result = await run_quality_agent(content)
span.update(output=quality_result)
# Log individual dimension scores
trace.score(name="accuracy", value=quality_result["accuracy"]["score"])
trace.score(name="coherence", value=quality_result["coherence"]["score"])
trace.score(name="depth", value=quality_result["depth"]["score"])
trace.score(name="relevance", value=quality_result["relevance"]["score"])
# Final aggregated score
trace.score(name="quality_total", value=consensus.quality_score)
trace.event(
name="curation_decision",
metadata={"decision": consensus.decision}
)
Prompt Management
All curation prompts are managed in Langfuse:
| Prompt Name | Purpose | Tags |
|---|---|---|
golden-content-classifier | Classify content_type | golden-dataset, classification |
golden-difficulty-classifier | Assign difficulty | golden-dataset, difficulty |
golden-domain-tagger | Extract tags | golden-dataset, tagging |
golden-query-generator | Generate queries | golden-dataset, query-gen |
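A worked example of the weighted score and decision thresholds described in the consensus aggregation above, assuming toy per-dimension scores; the weights and cutoffs match `aggregate_results`, everything else is illustrative:

```python
# Toy per-dimension scores from the quality evaluator
scores = {"accuracy": 0.8, "coherence": 0.7, "depth": 0.8, "relevance": 0.9}
weights = {"accuracy": 0.25, "coherence": 0.20, "depth": 0.25, "relevance": 0.30}

quality_score = sum(scores[k] * weights[k] for k in scores)  # 0.81

# Variance-based confidence: tight agreement across dimensions -> high confidence
variance = sum((s - quality_score) ** 2 for s in scores.values()) / len(scores)
confidence = 1.0 - min(variance * 4, 1.0)

# Decision thresholds from aggregate_results
if quality_score >= 0.75 and confidence >= 0.7:
    decision = "include"
elif quality_score >= 0.55:
    decision = "review"
else:
    decision = "exclude"

print(decision)  # include
```

With these scores the weighted total lands at 0.81 and the dimensions agree closely, so the document clears both the quality and confidence gates.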
Backup Restore
Backup & Restore Golden Dataset
Backup Process
1. Export to JSON
# backend/scripts/backup_golden_dataset.py backup
async def backup_golden_dataset():
"""Export golden dataset to JSON."""
async with get_session() as session:
# Fetch all completed analyses
query = (
select(Analysis)
.where(Analysis.status == "completed")
.options(
selectinload(Analysis.chunks),
selectinload(Analysis.artifact)
)
.order_by(Analysis.created_at)
)
result = await session.execute(query)
analyses = result.scalars().all()
# Serialize
backup_data = {
"version": "1.0",
"created_at": datetime.now(UTC).isoformat(),
"metadata": {
"total_analyses": len(analyses),
"total_chunks": sum(len(a.chunks) for a in analyses),
"total_artifacts": sum(1 for a in analyses if a.artifact)
},
"analyses": [serialize_analysis(a) for a in analyses]
}
# Write to file
BACKUP_FILE.parent.mkdir(exist_ok=True)
with open(BACKUP_FILE, "w") as f:
json.dump(backup_data, f, indent=2, default=str)
# Write metadata (quick stats)
with open(METADATA_FILE, "w") as f:
json.dump(backup_data["metadata"], f, indent=2)
print(f"✅ Backup completed: {len(analyses)} analyses, {backup_data['metadata']['total_chunks']} chunks")2. Serialize Without Embeddings
def serialize_chunk(chunk: Chunk) -> dict:
"""Serialize chunk WITHOUT embedding vector."""
return {
"id": str(chunk.id),
"content": chunk.content,
"section_title": chunk.section_title,
"section_path": chunk.section_path,
"content_type": chunk.content_type,
"chunk_index": chunk.chunk_index
# embedding excluded - regenerated on restore
}
Why exclude embeddings?
- Smaller backup files (415 chunks × 1024 dims × 4 bytes = 1.7 MB saved)
- Model independence (can restore with different model)
- Version control friendly (JSON diffs are meaningful)
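The no-embeddings rule can be demonstrated with a dict-based stand-in for the ORM chunk; `serialize_chunk` here mirrors the real serializer but operates on plain dicts:

```python
def serialize_chunk(chunk: dict) -> dict:
    """Copy every field except the embedding vector (regenerated on restore)."""
    return {k: v for k, v in chunk.items() if k != "embedding"}

chunk = {
    "id": "chunk-1",
    "content": "Golden datasets need versioned backups.",
    "chunk_index": 0,
    "embedding": [0.0] * 1024,  # 1024-dim vector, excluded from backup
}
backup = serialize_chunk(chunk)
assert "embedding" not in backup
assert backup["content"] == chunk["content"]

# Savings quoted above: 415 chunks x 1024 float32 dims x 4 bytes ~= 1.7 MB
assert 415 * 1024 * 4 == 1_699_840
```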
Restore Process
1. Load and Validate Backup
async def restore_golden_dataset(replace: bool = False):
"""Restore golden dataset from JSON backup."""
# Load backup file
if not BACKUP_FILE.exists():
raise FileNotFoundError(f"Backup file not found: {BACKUP_FILE}")
with open(BACKUP_FILE) as f:
backup_data = json.load(f)
# Validate structure
required_keys = ["version", "created_at", "metadata", "analyses"]
for key in required_keys:
if key not in backup_data:
raise ValueError(f"Invalid backup: missing '{key}'")
print(f"📦 Loading backup from {backup_data['created_at']}")
print(f" Analyses: {backup_data['metadata']['total_analyses']}")
print(f" Chunks: {backup_data['metadata']['total_chunks']}")2. Clear Existing Data (Optional)
async with get_session() as session:
if replace:
print("⚠️ Deleting existing data...")
# Delete in correct order (respect foreign keys)
await session.execute(delete(Chunk))
await session.execute(delete(Artifact))
await session.execute(delete(Analysis))
await session.commit()
print("✅ Existing data cleared")3. Restore Analyses and Chunks
from app.shared.services.embeddings import embed_text
total_chunks = 0
for idx, analysis_data in enumerate(backup_data["analyses"], 1):
print(f"[{idx}/{len(backup_data['analyses'])}] Restoring {analysis_data['url'][:50]}...")
# Create analysis
analysis = Analysis(
id=UUID(analysis_data["id"]),
url=analysis_data["url"],
content_type=analysis_data["content_type"],
status=analysis_data["status"],
created_at=datetime.fromisoformat(analysis_data["created_at"])
# ... other fields ...
)
session.add(analysis)
# Restore chunks with regenerated embeddings
for chunk_data in analysis_data["chunks"]:
# Generate embedding using CURRENT model
embedding = await embed_text(chunk_data["content"])
chunk = Chunk(
id=UUID(chunk_data["id"]),
analysis_id=analysis.id,
content=chunk_data["content"],
embedding=embedding, # Freshly generated
section_title=chunk_data.get("section_title"),
section_path=chunk_data.get("section_path"),
content_type=chunk_data["content_type"],
chunk_index=chunk_data["chunk_index"]
)
session.add(chunk)
total_chunks += 1
# Restore artifact
if analysis_data.get("artifact"):
artifact_data = analysis_data["artifact"]
artifact = Artifact(
id=UUID(artifact_data["id"]),
analysis_id=analysis.id,
summary=artifact_data["summary"],
# ... other fields ...
)
session.add(artifact)
# Commit every 10 analyses (avoid huge transactions)
if idx % 10 == 0:
await session.commit()
# Final commit
await session.commit()
print(f"✅ Restore completed: {len(backup_data['analyses'])} analyses, {total_chunks} chunks")4. Verify Restore
# Verify counts match
analysis_count = await session.scalar(select(func.count(Analysis.id)))
chunk_count = await session.scalar(select(func.count(Chunk.id)))
assert analysis_count == backup_data["metadata"]["total_analyses"]
assert chunk_count == backup_data["metadata"]["total_chunks"]
print("✅ Verification passed")CLI Commands
cd backend
# Backup
poetry run python scripts/backup_golden_dataset.py backup
# Restore (add to existing data)
poetry run python scripts/backup_golden_dataset.py restore
# Restore (replace all data - DESTRUCTIVE!)
poetry run python scripts/backup_golden_dataset.py restore --replace
# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
Regenerating Embeddings
Why regenerate?
- Embedding models improve over time (Voyage AI v1 → v2)
- Ensures consistency with current production model
- Smaller backup files
Process:
from app.shared.services.embeddings import embed_text
async def regenerate_embeddings():
"""Regenerate embeddings for all chunks."""
async with get_session() as session:
# Fetch all chunks
query = select(Chunk).order_by(Chunk.id)
result = await session.execute(query)
chunks = result.scalars().all()
print(f"Regenerating embeddings for {len(chunks)} chunks...")
for idx, chunk in enumerate(chunks, 1):
# Generate new embedding
embedding = await embed_text(chunk.content)
# Update chunk
chunk.embedding = embedding
if idx % 50 == 0:
await session.commit()
print(f" Progress: {idx}/{len(chunks)}")
await session.commit()
print("✅ Embeddings regenerated")Runtime: ~415 chunks × 200ms = ~83 seconds
SQL Dump (Alternative)
Create SQL Dump
# Dump only golden dataset tables
pg_dump $DATABASE_URL \
--table=analyses \
--table=chunks \
--table=artifacts \
--data-only \
--file=backend/data/golden_dataset_dump.sql
# ~5 MB for 98 analyses + 415 chunks (includes embeddings)
Restore from SQL Dump
# Restore SQL dump
psql $DATABASE_URL < backend/data/golden_dataset_dump.sql
Pros:
- Fast (includes embeddings, no regeneration)
- Exact replica
Cons:
- Not version controlled (too large, binary)
- DB version dependent
- No easy inspection
OrchestKit uses JSON (version controlled), SQL dump for local snapshots only.
Error Handling
async def restore_with_error_handling():
"""Restore with proper error handling."""
try:
await restore_golden_dataset(replace=True)
except FileNotFoundError as e:
print(f"❌ Backup file not found: {e}")
print(f" Expected: {BACKUP_FILE}")
return False
except ValueError as e:
print(f"❌ Invalid backup format: {e}")
return False
except Exception as e:
print(f"❌ Restore failed: {e}")
# Rollback handled by async context manager
return False
return True
References
- OrchestKit: backend/scripts/backup_golden_dataset.py
- OrchestKit: backend/data/golden_dataset_backup.json
Quality Metrics
Quality Metrics and Coverage Analysis
Metrics and analysis patterns for golden dataset quality.
Coverage Analysis
Gap Detection
def analyze_coverage_gaps(
documents: list[dict],
queries: list[dict],
) -> dict:
"""Analyze dataset coverage and identify gaps."""
# Content type distribution
content_types = {}
for doc in documents:
ct = doc.get("content_type", "unknown")
content_types[ct] = content_types.get(ct, 0) + 1
# Domain/tag distribution
all_tags = []
for doc in documents:
all_tags.extend(doc.get("tags", []))
tag_counts = {}
for tag in all_tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
# Difficulty distribution
difficulties = {}
for query in queries:
diff = query.get("difficulty", "unknown")
difficulties[diff] = difficulties.get(diff, 0) + 1
# Identify gaps
gaps = []
# Check content type balance
total_docs = len(documents)
if content_types.get("tutorial", 0) / total_docs < 0.15:
gaps.append("Under-represented: tutorials (<15%)")
if content_types.get("research_paper", 0) / total_docs < 0.05:
gaps.append("Under-represented: research papers (<5%)")
# Check domain coverage
expected_domains = ["ai-ml", "backend", "frontend", "devops", "security"]
for domain in expected_domains:
if tag_counts.get(domain, 0) < 5:
gaps.append(f"Under-represented domain: {domain} (<5 docs)")
# Check difficulty balance
total_queries = len(queries)
if difficulties.get("hard", 0) / total_queries < 0.10:
gaps.append("Under-represented: hard queries (<10%)")
if difficulties.get("adversarial", 0) / total_queries < 0.05:
gaps.append("Under-represented: adversarial queries (<5%)")
return {
"content_type_distribution": content_types,
"tag_distribution": dict(sorted(tag_counts.items(), key=lambda x: -x[1])[:20]),
"difficulty_distribution": difficulties,
"gaps": gaps,
"total_documents": total_docs,
"total_queries": total_queries,
}
Validation Workflow
Pre-Addition Validation
async def validate_before_add(
document: dict,
existing_documents: list[dict],
existing_queries: list[dict],
source_url_map: dict[str, str],
embedding_service,
) -> dict:
"""Run full validation before adding document.
Returns:
{
"valid": bool,
"errors": list[str], # Blocking issues
"warnings": list[str], # Non-blocking issues
"duplicate_check": {
"is_duplicate": bool,
"similar_to": str | None,
"similarity": float | None,
}
}
"""
errors = []
warnings = []
# 1. Schema validation
schema_errors = validate_schema(document)
errors.extend(schema_errors)
# 2. URL validation
url_valid, url_msg = validate_url(document.get("source_url", ""))
if not url_valid:
errors.append(url_msg)
# 3. URL duplicate check
url_dup = check_url_duplicate(document.get("source_url", ""), source_url_map)
if url_dup:
errors.append(f"URL already exists in dataset as: {url_dup}")
# 4. Content quality
quality_warnings = validate_content_quality(document)
warnings.extend(quality_warnings)
# 5. Semantic duplicate check
content = " ".join(
s.get("content", "") for s in document.get("sections", [])
)
existing_embeddings = await load_existing_embeddings(existing_documents)
dup_result = await check_duplicate(
content, existing_embeddings, embedding_service
)
duplicate_check = {
"is_duplicate": dup_result is not None,
"similar_to": dup_result[0] if dup_result else None,
"similarity": dup_result[1] if dup_result else None,
}
if dup_result and dup_result[1] >= 0.90:
errors.append(
f"Content too similar to existing document: {dup_result[0]} "
f"(similarity: {dup_result[1]:.2f})"
)
elif dup_result and dup_result[1] >= 0.80:
warnings.append(
f"Content similar to existing document: {dup_result[0]} "
f"(similarity: {dup_result[1]:.2f})"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"duplicate_check": duplicate_check,
}
Full Dataset Validation
async def validate_full_dataset() -> dict:
"""Run comprehensive validation on entire dataset.
Use this for:
- Pre-commit hooks
- CI/CD validation
- Periodic integrity checks
"""
from backend.tests.smoke.retrieval.fixtures.loader import FixtureLoader
loader = FixtureLoader(use_expanded=True)
documents = loader.load_documents()
queries = loader.load_queries()
source_url_map = loader.load_source_url_map()
all_errors = []
all_warnings = []
# 1. Schema validation for all documents
for doc in documents:
errors = validate_schema(doc)
all_errors.extend([f"[{doc['id']}] {e}" for e in errors])
# 2. Unique ID validation
id_errors = validate_unique_ids(documents, queries)
all_errors.extend(id_errors)
# 3. Referential integrity
ref_errors = validate_references(documents, queries)
all_errors.extend(ref_errors)
# 4. URL validation
for doc in documents:
valid, msg = validate_url(doc.get("source_url", ""))
if not valid:
all_errors.append(f"[{doc['id']}] {msg}")
# 5. Difficulty distribution
dist_warnings = validate_difficulty_distribution(queries)
all_warnings.extend(dist_warnings)
# 6. Coverage analysis
coverage = analyze_coverage_gaps(documents, queries)
all_warnings.extend(coverage["gaps"])
return {
"valid": len(all_errors) == 0,
"errors": all_errors,
"warnings": all_warnings,
"coverage": coverage,
"stats": {
"documents": len(documents),
"queries": len(queries),
"sections": sum(len(d.get("sections", [])) for d in documents),
}
}
CLI Integration
Validation Commands
# Validate specific document
poetry run python scripts/data/add_to_golden_dataset.py validate \
--document-id "new-doc-id"
# Validate full dataset
poetry run python scripts/data/add_to_golden_dataset.py validate-all
# Check for duplicates
poetry run python scripts/data/add_to_golden_dataset.py check-duplicate \
--url "https://example.com/article"
# Analyze coverage gaps
poetry run python scripts/data/add_to_golden_dataset.py coverage
Pre-Commit Hook
#!/bin/bash
# .claude/hooks/pretool/bash/validate-golden-dataset.sh
# Only run if golden dataset files changed
CHANGED_FILES=$(git diff --cached --name-only)
if echo "$CHANGED_FILES" | grep -q "fixtures/documents_expanded.json\|fixtures/queries.json\|fixtures/source_url_map.json"; then
echo "Validating golden dataset changes..."
cd backend
poetry run python scripts/data/add_to_golden_dataset.py validate-all
if [ $? -ne 0 ]; then
echo "Golden dataset validation failed!"
echo "Fix errors before committing."
exit 1
fi
echo "Golden dataset validation passed"
fi
Selection Criteria
Content classification and difficulty stratification for golden datasets.
Content Type Classification
Supported Types
| Type | Description | Quality Focus |
|---|---|---|
| article | Technical articles, blog posts | Depth, accuracy, actionability |
| tutorial | Step-by-step guides | Completeness, clarity, code quality |
| research_paper | Academic papers, whitepapers | Rigor, citations, methodology |
| documentation | API docs, reference materials | Accuracy, completeness, examples |
| video_transcript | Transcribed video content | Structure, coherence, key points |
| code_repository | README, code analysis | Code quality, documentation |
Classification Criteria
# Content Type Decision Tree
def classify_content_type(content: str, source_url: str) -> str:
"""Classify content type based on structure and source."""
# URL-based hints
if "arxiv.org" in source_url or "papers" in source_url:
return "research_paper"
if "docs." in source_url or "/api/" in source_url:
return "documentation"
if "github.com" in source_url:
return "code_repository"
# Content-based analysis
if has_step_by_step_structure(content):
return "tutorial"
if has_academic_structure(content): # Abstract, methodology, results
return "research_paper"
# Default
return "article"
Difficulty Classification
Stratification Levels
| Level | Semantic Complexity | Expected Score | Characteristics |
|---|---|---|---|
| trivial | Direct keyword match | >0.85 | Technical terms, exact phrases |
| easy | Common synonyms | >0.70 | Well-known concepts, slight variations |
| medium | Paraphrased intent | >0.55 | Conceptual queries, multi-topic |
| hard | Multi-hop reasoning | >0.40 | Cross-domain, comparative analysis |
| adversarial | Edge cases | Graceful degradation | Robustness tests, off-domain |
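The expected-score column above translates directly into per-difficulty pass thresholds when scoring retrieval results. A minimal sketch (threshold values taken from the table; the function name is illustrative):

```python
# Minimum expected retrieval scores per difficulty level (values from the table above).
# Adversarial queries have no hard floor -- they test graceful degradation instead.
EXPECTED_SCORE_THRESHOLDS = {
    "trivial": 0.85,
    "easy": 0.70,
    "medium": 0.55,
    "hard": 0.40,
    "adversarial": 0.0,
}

def meets_difficulty_threshold(difficulty: str, retrieval_score: float) -> bool:
    """Check whether a query's retrieval score meets its difficulty tier."""
    threshold = EXPECTED_SCORE_THRESHOLDS.get(difficulty)
    if threshold is None:
        raise ValueError(f"Unknown difficulty: {difficulty}")
    return retrieval_score > threshold
```

This keeps the pass criteria in one place so regression tests can report which tier a failing query belongs to.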
Classification Factors
def classify_difficulty(document: dict) -> str:
"""Classify document difficulty for retrieval testing."""
factors = {
"technical_density": count_technical_terms(document["content"]),
"section_count": len(document.get("sections", [])),
"cross_references": count_cross_references(document),
"abstraction_level": assess_abstraction(document),
"domain_specificity": assess_domain_specificity(document),
}
# Scoring rubric
score = 0
if factors["technical_density"] > 50:
score += 2
if factors["section_count"] > 10:
score += 1
if factors["cross_references"] > 5:
score += 2
if factors["abstraction_level"] == "high":
score += 2
# Map score to difficulty
if score <= 2:
return "trivial"
elif score <= 4:
return "easy"
elif score <= 6:
return "medium"
elif score <= 8:
return "hard"
else:
return "adversarial"
Quality Evaluation Dimensions
1. Accuracy (Weight: 0.25)
What it measures: Factual correctness, up-to-date information
Evaluation criteria:
- Technical claims are verifiable
- Code examples are syntactically correct
- No outdated information (check dates, versions)
- Sources/citations where applicable
Thresholds:
- Perfect: 0.95-1.0 (all claims verifiable)
- Acceptable: 0.70-0.94 (minor inaccuracies)
- Failing: <0.70 (significant errors)
2. Coherence (Weight: 0.20)
What it measures: Logical flow, structure, readability
Evaluation criteria:
- Clear introduction and conclusion
- Logical section ordering
- Smooth transitions between topics
- Consistent terminology
Thresholds:
- Perfect: 0.90-1.0 (professional quality)
- Acceptable: 0.60-0.89 (readable but rough)
- Failing: <0.60 (confusing structure)
3. Depth (Weight: 0.25)
What it measures: Thoroughness, detail level, comprehensiveness
Evaluation criteria:
- Covers topic comprehensively
- Includes edge cases and caveats
- Provides context and background
- Appropriate level of detail for audience
Thresholds:
- Perfect: 0.90-1.0 (exhaustive coverage)
- Acceptable: 0.55-0.89 (covers main points)
- Failing: <0.55 (superficial treatment)
4. Relevance (Weight: 0.30)
What it measures: Alignment with OrchestKit's technical domains
Target domains:
- AI/ML (LangGraph, RAG, agents, embeddings)
- Backend (FastAPI, PostgreSQL, APIs)
- Frontend (React, TypeScript)
- DevOps (Docker, Kubernetes, CI/CD)
- Security (OWASP, authentication)
Thresholds:
- Perfect: 0.95-1.0 (core domain, highly relevant)
- Acceptable: 0.70-0.94 (related domain)
- Failing: <0.70 (off-topic for OrchestKit)
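The four dimension weights above sum to 1.0, so the overall quality score is a straightforward weighted sum. A minimal sketch (weights taken from the sections above; the helper name is illustrative):

```python
# Dimension weights from the Quality Evaluation Dimensions sections (sum to 1.0)
QUALITY_WEIGHTS = {
    "accuracy": 0.25,
    "coherence": 0.20,
    "depth": 0.25,
    "relevance": 0.30,
}

def weighted_quality_score(scores: dict[str, float]) -> float:
    """Combine per-dimension scores (0.0-1.0) into one weighted quality score."""
    missing = set(QUALITY_WEIGHTS) - set(scores)
    if missing:
        raise ValueError(f"Missing dimension scores: {missing}")
    return sum(QUALITY_WEIGHTS[dim] * scores[dim] for dim in QUALITY_WEIGHTS)
```

The result can then be compared against the minimum inclusion threshold (0.70) listed under Best Practices.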
Best Practices
Quality Thresholds
# Recommended thresholds for golden dataset inclusion
minimum_quality_score: 0.70
minimum_confidence: 0.65
required_tags: 2 # At least 2 domain tags
required_queries: 3 # At least 3 test queries
Coverage Balance
Maintain balanced coverage across:
- Content types (don't over-index on articles)
- Difficulty levels (need trivial AND hard)
- Domains (spread across AI/ML, backend, frontend, etc.)
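One way to enforce this balance mechanically is to flag any content type that dominates the dataset. A minimal sketch (the 50% cutoff is an illustrative default, not a documented requirement):

```python
from collections import Counter

def flag_overrepresented(content_types: list[str], max_share: float = 0.5) -> list[str]:
    """Return content types whose share of the dataset exceeds max_share."""
    counts = Counter(content_types)
    total = sum(counts.values())
    if total == 0:
        return []
    return [ctype for ctype, n in counts.items() if n / total > max_share]
```

The same check applies unchanged to difficulty levels or domain tags.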
Duplicate Prevention
Before adding:
- Check URL against existing source_url_map.json
- Run semantic similarity against existing document embeddings
- Warn if >80% similar to existing document
Provenance Tracking
Always record:
- Source URL (canonical)
- Curation date
- Agent scores (for audit trail)
- Langfuse trace ID
Storage Patterns
Backup strategies and storage formats for golden datasets.
Backup Strategies
Strategy 1: JSON Backup (Recommended)
Pros:
- Version controlled (commit to git)
- Human-readable (easy to inspect)
- Portable (works across DB versions)
- Incremental diffs (see what changed)
Cons:
- Must regenerate embeddings on restore
- Larger file size than SQL dump
OrchestKit uses JSON backup.
Strategy 2: SQL Dump
Pros:
- Fast restore (includes embeddings)
- Exact replica (binary-identical)
- Native PostgreSQL format
Cons:
- Not version controlled (binary format)
- DB version dependent
- No easy inspection
Use case: Local snapshots, not version control.
Backup Format
{
"version": "1.0",
"created_at": "2025-12-19T10:30:00Z",
"metadata": {
"total_analyses": 98,
"total_chunks": 415,
"total_artifacts": 98
},
"analyses": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://docs.python.org/3/library/asyncio.html",
"content_type": "documentation",
"status": "completed",
"created_at": "2025-11-15T08:20:00Z",
"findings": [
{
"agent": "security_agent",
"category": "best_practices",
"content": "Always use asyncio.run() for top-level entry point",
"confidence": 0.92
}
],
"chunks": [
{
"id": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"content": "asyncio is a library to write concurrent code...",
"section_title": "Introduction to asyncio",
"section_path": "docs/python/asyncio/intro.md",
"content_type": "paragraph",
"chunk_index": 0
// Note: embedding NOT included (regenerated on restore)
}
],
"artifact": {
"id": "a1b2c3d4-e5f6-4a5b-8c7d-9e8f7a6b5c4d",
"summary": "Comprehensive guide to asyncio...",
"key_findings": ["..."],
"metadata": {}
}
}
]
}
Key Design Decisions:
- Embeddings excluded (regenerate on restore with current model)
- Nested structure (analyses -> chunks -> artifacts)
- Metadata for validation
- ISO timestamps for reproducibility
Backup Implementation
# backend/scripts/backup_golden_dataset.py
import asyncio
import json
from datetime import datetime, UTC
from pathlib import Path
from sqlalchemy import select
from app.db.session import get_session
from app.db.models import Analysis, Chunk, Artifact
BACKUP_DIR = Path("backend/data")
BACKUP_FILE = BACKUP_DIR / "golden_dataset_backup.json"
METADATA_FILE = BACKUP_DIR / "golden_dataset_metadata.json"
async def backup_golden_dataset():
"""Backup golden dataset to JSON."""
async with get_session() as session:
# Fetch all completed analyses
query = (
select(Analysis)
.where(Analysis.status == "completed")
.order_by(Analysis.created_at)
)
result = await session.execute(query)
analyses = result.scalars().all()
# Serialize to JSON
backup_data = {
"version": "1.0",
"created_at": datetime.now(UTC).isoformat(),
"metadata": {
"total_analyses": len(analyses),
"total_chunks": sum(len(a.chunks) for a in analyses),
"total_artifacts": len([a for a in analyses if a.artifact])
},
"analyses": [
serialize_analysis(a) for a in analyses
]
}
# Write backup file
BACKUP_DIR.mkdir(exist_ok=True)
with open(BACKUP_FILE, "w") as f:
json.dump(backup_data, f, indent=2, default=str)
# Write metadata file (quick stats)
with open(METADATA_FILE, "w") as f:
json.dump(backup_data["metadata"], f, indent=2)
print(f"Backup completed: {BACKUP_FILE}")
print(f" Analyses: {backup_data['metadata']['total_analyses']}")
print(f" Chunks: {backup_data['metadata']['total_chunks']}")
def serialize_analysis(analysis: Analysis) -> dict:
"""Serialize analysis to dict."""
return {
"id": str(analysis.id),
"url": analysis.url,
"content_type": analysis.content_type,
"status": analysis.status,
"created_at": analysis.created_at.isoformat(),
"findings": [serialize_finding(f) for f in analysis.findings],
"chunks": [serialize_chunk(c) for c in analysis.chunks],
"artifact": serialize_artifact(analysis.artifact) if analysis.artifact else None
}
def serialize_chunk(chunk: Chunk) -> dict:
"""Serialize chunk (WITHOUT embedding)."""
return {
"id": str(chunk.id),
"content": chunk.content,
"section_title": chunk.section_title,
"section_path": chunk.section_path,
"content_type": chunk.content_type,
"chunk_index": chunk.chunk_index
# embedding excluded (regenerate on restore)
}
CLI Usage
cd backend
# Backup golden dataset
poetry run python scripts/backup_golden_dataset.py backup
# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Restore from backup (WARNING: Deletes existing data)
poetry run python scripts/backup_golden_dataset.py restore --replace
# Restore without deleting (adds to existing)
poetry run python scripts/backup_golden_dataset.py restore
CI/CD Integration
Automated Backups
# .github/workflows/backup-golden-dataset.yml
name: Backup Golden Dataset
on:
schedule:
- cron: '0 2 * * 0' # Weekly on Sunday at 2am
workflow_dispatch: # Manual trigger
jobs:
backup:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd backend
poetry install
- name: Run backup
env:
DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py backup
- name: Commit backup
run: |
git config user.name "GitHub Actions"
git config user.email "actions@github.com"
git add backend/data/golden_dataset_backup.json
git add backend/data/golden_dataset_metadata.json
git commit -m "chore: automated golden dataset backup"
git push
Validation Contracts
Data Validation & Contracts
The URL Contract
Rule: Golden dataset analyses MUST store real canonical URLs, not placeholders.
Why This Matters
# WRONG - Placeholder URL
analysis.url = "https://orchestkit.dev/placeholder/doc-123"
# Problems:
# 1. Can't re-fetch content if needed
# 2. Can't verify source hasn't changed
# 3. No audit trail for data provenance
# 4. Breaks restore on different domains
# CORRECT - Real canonical URL
analysis.url = "https://docs.python.org/3/library/asyncio.html"
# Benefits:
# 1. Can re-fetch if embeddings model changes
# 2. Can validate content hasn't been updated
# 3. Clear data provenance
# 4. Works across environments
Validation Check
async def check_url_contract() -> list[str]:
"""Find analyses with placeholder URLs."""
async with get_session() as session:
query = select(Analysis).where(
Analysis.url.like("%orchestkit.dev%") |
Analysis.url.like("%placeholder%") |
Analysis.url.like("%example.com%") |
Analysis.url.like("%test.local%")
)
result = await session.execute(query)
invalid = result.scalars().all()
if invalid:
print(f"❌ Found {len(invalid)} analyses with placeholder URLs:")
for analysis in invalid:
print(f" - {analysis.id}: {analysis.url}")
return [str(a.id) for a in invalid]
print("✅ URL contract validated: All URLs are canonical")
return []
Data Integrity Checks
1. Count Validation
async def validate_counts(expected_metadata: dict) -> dict:
"""Verify counts match expected values."""
async with get_session() as session:
actual = {
"analyses": await session.scalar(select(func.count(Analysis.id))),
"chunks": await session.scalar(select(func.count(Chunk.id))),
"artifacts": await session.scalar(select(func.count(Artifact.id)))
}
expected = {
"analyses": expected_metadata["total_analyses"],
"chunks": expected_metadata["total_chunks"],
"artifacts": expected_metadata["total_artifacts"]
}
errors = []
for key in ["analyses", "chunks", "artifacts"]:
if actual[key] != expected[key]:
errors.append(f"{key}: expected {expected[key]}, got {actual[key]}")
return {
"valid": len(errors) == 0,
"errors": errors,
"actual": actual,
"expected": expected
}
2. Embedding Validation
async def validate_embeddings() -> dict:
"""Check all chunks have embeddings."""
async with get_session() as session:
# Find chunks without embeddings
query = select(Chunk).where(Chunk.embedding.is_(None))
result = await session.execute(query)
missing = result.scalars().all()
if missing:
return {
"valid": False,
"error": f"Found {len(missing)} chunks without embeddings",
"chunk_ids": [str(c.id) for c in missing]
}
# Check embedding dimensions
query = select(Chunk).limit(1)
result = await session.execute(query)
sample = result.scalar_one()
if len(sample.embedding) != 1024:
return {
"valid": False,
"error": f"Invalid embedding dimensions: {len(sample.embedding)} (expected 1024)"
}
return {"valid": True, "message": "All chunks have valid embeddings"}
3. Orphaned Data Check
async def check_orphaned_data() -> dict:
"""Find orphaned chunks (no parent analysis)."""
async with get_session() as session:
# Find chunks without parent analysis
query = (
select(Chunk)
.outerjoin(Analysis, Chunk.analysis_id == Analysis.id)
.where(Analysis.id.is_(None))
)
result = await session.execute(query)
orphaned = result.scalars().all()
if orphaned:
return {
"valid": False,
"warning": f"Found {len(orphaned)} orphaned chunks",
"chunk_ids": [str(c.id) for c in orphaned]
}
return {"valid": True, "message": "No orphaned data found"}
4. Duplicate Check
async def check_duplicates() -> dict:
"""Find duplicate analyses (same URL)."""
async with get_session() as session:
# Find URLs that appear more than once
query = (
select(Analysis.url, func.count(Analysis.id).label("count"))
.group_by(Analysis.url)
.having(func.count(Analysis.id) > 1)
)
result = await session.execute(query)
duplicates = result.all()
if duplicates:
return {
"valid": False,
"warning": f"Found {len(duplicates)} duplicate URLs",
"urls": [(url, count) for url, count in duplicates]
}
return {"valid": True, "message": "No duplicates found"}
Comprehensive Validation
async def verify_golden_dataset() -> dict:
"""Run all validation checks."""
print("🔍 Validating golden dataset...")
# Load expected metadata
with open(METADATA_FILE) as f:
expected_metadata = json.load(f)
results = {
"timestamp": datetime.now(UTC).isoformat(),
"checks": {}
}
# 1. URL Contract
print("\n1. Checking URL contract...")
invalid_urls = await check_url_contract()
results["checks"]["url_contract"] = {
"passed": len(invalid_urls) == 0,
"invalid_count": len(invalid_urls),
"invalid_ids": invalid_urls
}
# 2. Count Validation
print("\n2. Validating counts...")
count_result = await validate_counts(expected_metadata)
results["checks"]["counts"] = count_result
# 3. Embedding Validation
print("\n3. Validating embeddings...")
embedding_result = await validate_embeddings()
results["checks"]["embeddings"] = embedding_result
# 4. Orphaned Data
print("\n4. Checking for orphaned data...")
orphan_result = await check_orphaned_data()
results["checks"]["orphaned_data"] = orphan_result
# 5. Duplicates
print("\n5. Checking for duplicates...")
duplicate_result = await check_duplicates()
results["checks"]["duplicates"] = duplicate_result
# Overall result
all_passed = all(
check.get("valid") or check.get("passed")
for check in results["checks"].values()
)
results["overall"] = {
"passed": all_passed,
"total_checks": len(results["checks"]),
"passed_checks": sum(
1 for check in results["checks"].values()
if check.get("valid") or check.get("passed")
)
}
# Print summary
print("\n" + "="*50)
if all_passed:
print("✅ All validation checks passed")
else:
print("❌ Validation failed")
for name, check in results["checks"].items():
if not (check.get("valid") or check.get("passed")):
print(f" - {name}: {check.get('error') or check.get('warning')}")
return results
Pre-Deployment Checklist
# Run before deploying to production
cd backend
# 1. Backup current data
poetry run python scripts/backup_golden_dataset.py backup
# 2. Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# 3. Run retrieval quality tests
poetry run pytest tests/integration/test_retrieval_quality.py
# 4. Check for regressions
# Expected: 91.6% pass rate, 0.777 MRR
# If lower, investigate before deploying
Automated Validation (CI)
# .github/workflows/validate-golden-dataset.yml
name: Validate Golden Dataset
on:
pull_request:
paths:
- 'backend/data/golden_dataset_backup.json'
schedule:
- cron: '0 8 * * 1' # Weekly on Monday 8am
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd backend
poetry install
- name: Start PostgreSQL
run: docker compose up -d postgres
- name: Run migrations
run: |
cd backend
poetry run alembic upgrade head
- name: Restore golden dataset
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py restore
- name: Validate dataset
run: |
cd backend
poetry run python scripts/backup_golden_dataset.py verify
- name: Run retrieval tests
run: |
cd backend
poetry run pytest tests/integration/test_retrieval_quality.py -v
References
- OrchestKit: backend/scripts/backup_golden_dataset.py
- OrchestKit: backend/tests/integration/test_retrieval_quality.py
Validation Rules
Detailed validation rules for golden dataset integrity.
Rule 1: No Placeholder URLs
FORBIDDEN_URL_PATTERNS = [
"orchestkit.dev",
"placeholder",
"example.com",
"localhost",
"127.0.0.1",
]
def validate_url(url: str) -> tuple[bool, str]:
"""Validate URL is not a placeholder."""
for pattern in FORBIDDEN_URL_PATTERNS:
if pattern in url.lower():
return False, f"URL contains forbidden pattern: {pattern}"
# Must be HTTPS (except for specific cases)
if not url.startswith("https://"):
if not url.startswith("http://arxiv.org"): # arXiv redirects
return False, "URL must use HTTPS"
return True, "OK"
Rule 2: Unique Identifiers
def validate_unique_ids(documents: list[dict], queries: list[dict]) -> list[str]:
"""Ensure all IDs are unique across documents and queries."""
errors = []
# Document IDs
doc_ids = [d["id"] for d in documents]
if len(doc_ids) != len(set(doc_ids)):
duplicates = [id for id in doc_ids if doc_ids.count(id) > 1]
errors.append(f"Duplicate document IDs: {set(duplicates)}")
# Query IDs
query_ids = [q["id"] for q in queries]
if len(query_ids) != len(set(query_ids)):
duplicates = [id for id in query_ids if query_ids.count(id) > 1]
errors.append(f"Duplicate query IDs: {set(duplicates)}")
# Section IDs within documents
for doc in documents:
section_ids = [s["id"] for s in doc.get("sections", [])]
if len(section_ids) != len(set(section_ids)):
errors.append(f"Duplicate section IDs in document: {doc['id']}")
return errors
Rule 3: Referential Integrity
def validate_references(documents: list[dict], queries: list[dict]) -> list[str]:
"""Ensure query expected_chunks reference valid section IDs."""
errors = []
# Build set of all valid section IDs
valid_sections = set()
for doc in documents:
for section in doc.get("sections", []):
valid_sections.add(section["id"])
# Check query references
for query in queries:
for chunk_id in query.get("expected_chunks", []):
if chunk_id not in valid_sections:
errors.append(
f"Query {query['id']} references invalid section: {chunk_id}"
)
return errors
Rule 4: Content Quality
def validate_content_quality(document: dict) -> list[str]:
"""Validate document content meets quality standards."""
warnings = []
# Title length
title = document.get("title", "")
if len(title) < 10:
warnings.append("Title too short (min 10 chars)")
if len(title) > 200:
warnings.append("Title too long (max 200 chars)")
# Section content
for section in document.get("sections", []):
content = section.get("content", "")
if len(content) < 50:
warnings.append(f"Section {section['id']} content too short (min 50 chars)")
if len(content) > 50000:
warnings.append(f"Section {section['id']} content very long (>50k chars)")
# Tags
tags = document.get("tags", [])
if len(tags) < 2:
warnings.append("Too few tags (min 2)")
if len(tags) > 10:
warnings.append("Too many tags (max 10)")
return warnings
Rule 5: Difficulty Distribution
def validate_difficulty_distribution(queries: list[dict]) -> list[str]:
"""Ensure balanced difficulty distribution."""
warnings = []
# Count by difficulty
distribution = {}
for query in queries:
diff = query.get("difficulty", "unknown")
distribution[diff] = distribution.get(diff, 0) + 1
# Minimum requirements
requirements = {
"trivial": 3,
"easy": 3,
"medium": 5, # Most common real-world case
"hard": 3,
}
for level, min_count in requirements.items():
actual = distribution.get(level, 0)
if actual < min_count:
warnings.append(
f"Insufficient {level} queries: {actual}/{min_count}"
)
return warnings
Duplicate Detection
Semantic Similarity Check
import numpy as np
from typing import Optional
async def check_duplicate(
new_content: str,
existing_embeddings: list[tuple[str, np.ndarray]],
embedding_service,
threshold: float = 0.85,
) -> Optional[tuple[str, float]]:
"""Check if content is duplicate of existing document.
Args:
new_content: Content to check
existing_embeddings: List of (doc_id, embedding) tuples
embedding_service: Service to generate embeddings
threshold: Similarity threshold for duplicate warning
Returns:
(doc_id, similarity) if duplicate found, None otherwise
"""
# Generate embedding for new content
new_embedding = await embedding_service.generate_embedding(
text=new_content[:8000], # Truncate for embedding
normalize=True,
)
new_vec = np.array(new_embedding)
# Compare against existing
max_similarity = 0.0
most_similar_doc = None
for doc_id, existing_vec in existing_embeddings:
# Cosine similarity (vectors are normalized)
similarity = np.dot(new_vec, existing_vec)
if similarity > max_similarity:
max_similarity = similarity
most_similar_doc = doc_id
if max_similarity >= threshold:
return (most_similar_doc, max_similarity)
return None
URL Duplicate Check
def check_url_duplicate(
new_url: str,
source_url_map: dict[str, str],
) -> Optional[str]:
"""Check if URL already exists in dataset.
Returns document ID if duplicate found.
"""
# Normalize URL
normalized = normalize_url(new_url)
for doc_id, existing_url in source_url_map.items():
if normalize_url(existing_url) == normalized:
return doc_id
return None
def normalize_url(url: str) -> str:
"""Normalize URL for comparison."""
from urllib.parse import urlparse, urlunparse
parsed = urlparse(url.lower())
# Remove trailing slashes, www prefix
netloc = parsed.netloc.replace("www.", "")
path = parsed.path.rstrip("/")
return urlunparse((
parsed.scheme,
netloc,
path,
"", # params
"", # query (stripped)
"", # fragment
))
Versioning
Versioning and Recovery
Restore procedures, validation, and disaster recovery patterns.
Restore Implementation
Process Overview
- Load JSON backup
- Validate structure (version, required fields)
- Create analyses (without embeddings yet)
- Create chunks (without embeddings yet)
- Generate embeddings (using current embedding model)
- Create artifacts
- Verify integrity (counts, URL contract)
Regenerating Embeddings
from uuid import UUID
from sqlalchemy import delete

async def restore_golden_dataset(replace: bool = False):
"""Restore golden dataset from JSON backup."""
# Load backup
with open(BACKUP_FILE) as f:
backup_data = json.load(f)
async with get_session() as session:
if replace:
# Delete existing data
await session.execute(delete(Chunk))
await session.execute(delete(Artifact))
await session.execute(delete(Analysis))
await session.commit()
# Restore analyses and chunks
from app.shared.services.embeddings import embed_text
for analysis_data in backup_data["analyses"]:
# Create analysis
analysis = Analysis(
id=UUID(analysis_data["id"]),
url=analysis_data["url"],
# ... other fields ...
)
session.add(analysis)
# Create chunks with regenerated embeddings
for chunk_data in analysis_data["chunks"]:
# Regenerate embedding using CURRENT model
embedding = await embed_text(chunk_data["content"])
chunk = Chunk(
id=UUID(chunk_data["id"]),
analysis_id=analysis.id,
content=chunk_data["content"],
embedding=embedding, # Freshly generated!
# ... other fields ...
)
session.add(chunk)
await session.commit()
print("Restore completed")
Why regenerate embeddings?
- Embedding models improve over time
- Ensures consistency with current model
- Smaller backup files (exclude large vectors)
Validation
Validation Checklist
async def verify_golden_dataset() -> dict:
"""Verify golden dataset integrity."""
errors = []
warnings = []
async with get_session() as session:
# 1. Check counts
analysis_count = await session.scalar(select(func.count(Analysis.id)))
chunk_count = await session.scalar(select(func.count(Chunk.id)))
artifact_count = await session.scalar(select(func.count(Artifact.id)))
expected = load_metadata()
if analysis_count != expected["total_analyses"]:
errors.append(f"Analysis count mismatch: {analysis_count} vs {expected['total_analyses']}")
# 2. Check URL contract
query = select(Analysis).where(
Analysis.url.like("%orchestkit.dev%") |
Analysis.url.like("%placeholder%")
)
result = await session.execute(query)
invalid_urls = result.scalars().all()
if invalid_urls:
errors.append(f"Found {len(invalid_urls)} analyses with placeholder URLs")
# 3. Check embeddings exist
query = select(Chunk).where(Chunk.embedding.is_(None))
result = await session.execute(query)
missing_embeddings = result.scalars().all()
if missing_embeddings:
errors.append(f"Found {len(missing_embeddings)} chunks without embeddings")
# 4. Check orphaned chunks
query = select(Chunk).outerjoin(Analysis).where(Analysis.id.is_(None))
result = await session.execute(query)
orphaned = result.scalars().all()
if orphaned:
warnings.append(f"Found {len(orphaned)} orphaned chunks")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"stats": {
"analyses": analysis_count,
"chunks": chunk_count,
"artifacts": artifact_count
}
}
Best Practices
1. Version Control Backups
# Commit backups to git
git add backend/data/golden_dataset_backup.json
git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)"
2. Validate Before Deployment
# Pre-deployment check
poetry run python scripts/backup_golden_dataset.py verify
# Should output:
# Validation passed
# Analyses: 98
# Chunks: 415
# Artifacts: 98
# No errors found
3. Test Restore in Staging
# Never test restore in production first!
# Staging environment
export DATABASE_URL=$STAGING_DATABASE_URL
poetry run python scripts/backup_golden_dataset.py restore --replace
# Run tests to verify
poetry run pytest tests/integration/test_retrieval_quality.py
4. Document Changes
// backend/data/golden_dataset_metadata.json
{
"total_analyses": 98,
"total_chunks": 415,
"last_updated": "2025-12-19T10:30:00Z",
"changes": [
{
"date": "2025-12-19",
"action": "added",
"count": 5,
"description": "Added 5 new LangGraph tutorial analyses"
},
{
"date": "2025-12-10",
"action": "removed",
"count": 2,
"description": "Removed 2 outdated React 17 analyses"
}
]
}
Disaster Recovery
Scenario 1: Accidental Deletion
# Oh no! Someone ran DELETE FROM analyses WHERE 1=1
# 1. Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace
# 2. Verify
poetry run python scripts/backup_golden_dataset.py verify
# 3. Run tests
poetry run pytest tests/integration/test_retrieval_quality.py
Scenario 2: Database Migration Gone Wrong
# Migration corrupted data
# 1. Rollback migration
alembic downgrade -1
# 2. Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace
# 3. Re-run migration (fixed)
alembic upgrade head
Scenario 3: New Environment Setup
# Fresh dev environment, need golden dataset
# 1. Clone repo (includes backup)
git clone https://github.com/your-org/orchestkit
cd orchestkit/backend
# 2. Setup DB
docker compose up -d postgres
alembic upgrade head
# 3. Restore golden dataset
poetry run python scripts/backup_golden_dataset.py restore
# 4. Verify
poetry run pytest tests/integration/test_retrieval_quality.py
Data Integrity Contracts
The URL Contract
Golden dataset analyses MUST store real canonical URLs, not placeholders.
# WRONG - Placeholder URL (breaks restore)
analysis.url = "https://orchestkit.dev/placeholder/123"
# CORRECT - Real canonical URL (enables re-fetch if needed)
analysis.url = "https://docs.python.org/3/library/asyncio.html"
Why this matters:
- Enables re-fetching content if embeddings need regeneration
- Allows validation that source content hasn't changed
- Provides audit trail for data provenance
Verification:
# Check for placeholder URLs
def verify_url_contract(analyses: list[Analysis]) -> list[str]:
    """Find analyses with placeholder URLs."""
    invalid = []
    for analysis in analyses:
        if "orchestkit.dev" in analysis.url or "placeholder" in analysis.url:
            invalid.append(analysis.id)
    return invalid
Checklists (1)
Backup Restore Checklist
Golden Dataset Backup/Restore Checklist
Use this checklist to ensure safe, reliable backup and restoration of golden datasets
Pre-Backup Checklist
Environment Verification
- [ ] Database connection verified
  psql -h localhost -p 5437 -U orchestkit -c "SELECT version();"
  # Expected: PostgreSQL 16.x
- [ ] Database contains expected data
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
  # Expected: 98 (or current golden dataset size)
- [ ] Embeddings generated for all chunks
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
  # Expected: 0 (no chunks without embeddings)
Data Quality Validation
- [ ] URL contract verified (no placeholder URLs)
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
  # Expected: 0 (no placeholder URLs)
- [ ] All analyses have artifacts
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analyses a LEFT JOIN artifacts ar ON a.id = ar.analysis_id WHERE ar.id IS NULL AND a.status = 'completed';"
  # Expected: 0 (all completed analyses have artifacts)
- [ ] No orphaned chunks
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analysis_chunks c LEFT JOIN analyses a ON c.analysis_id = a.id WHERE a.id IS NULL;"
  # Expected: 0 (all chunks belong to an analysis)
Script Availability
- [ ] Backup script exists
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/scripts/backup_golden_dataset.py
  # Expected: File exists
- [ ] Dependencies installed
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry install
  # Expected: All dependencies installed
- [ ] Data directory exists
  mkdir -p /Users/yonatangross/coding/OrchestKit/backend/data
  ls -ld /Users/yonatangross/coding/OrchestKit/backend/data
  # Expected: Directory exists and is writable
Backup Execution Checklist
Run Backup
- [ ] Execute backup command
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry run python scripts/backup_golden_dataset.py backup
- [ ] Verify backup output shows success
  - "BACKUP COMPLETE (v2.0)" message displayed
  - Analyses count matches expected (98)
  - Artifacts count matches expected (98)
  - Chunks count matches expected (415)
  - Fixtures count matches expected (98 documents)
- [ ] Check backup file created
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
  # Expected: ~2.5 MB file
- [ ] Check metadata file created
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_metadata.json
  # Expected: ~1 KB file
Verify Backup
- [ ] Run verification command
  poetry run python scripts/backup_golden_dataset.py verify
- [ ] Verify output shows valid backup
  - "BACKUP IS VALID" message displayed
  - Analyses count correct
  - Artifacts count correct
  - Chunks count correct
  - Fixtures included (documents, URL maps, queries)
  - Referential integrity: OK
  - All analyses have artifacts: OK
  - No placeholder URLs warning
- [ ] Verify backup file is valid JSON
  jq '.' /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
  # Expected: Valid JSON, no parse errors
- [ ] Check backup version
  jq '.version' /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
  # Expected: "2.0"
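The version and count checks above can be wrapped in one helper so verification is a single call. A sketch, assuming the backup JSON keeps `analyses`, `artifacts`, and `chunks` as top-level lists and `version` as a string (both assumptions about the backup format):

```python
import json


def verify_backup_counts(backup_path: str, expected: dict[str, int]) -> list[str]:
    """Return a list of mismatch descriptions; an empty list means the
    backup's version is recognized and every count lines up."""
    with open(backup_path) as f:
        backup = json.load(f)
    problems = []
    # The backup script is documented to handle both 1.0 and 2.0
    if backup.get("version") not in ("1.0", "2.0"):
        problems.append(f"unsupported version: {backup.get('version')}")
    for key, want in expected.items():
        got = len(backup.get(key, []))
        if got != want:
            problems.append(f"{key}: expected {want}, found {got}")
    return problems
```

Typical usage: `verify_backup_counts("data/golden_dataset_backup.json", {"analyses": 98, "artifacts": 98, "chunks": 415})` and fail the checklist step if the result is non-empty.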
Commit Backup
- [ ] Stage backup files
  git add backend/data/golden_dataset_backup.json
  git add backend/data/golden_dataset_metadata.json
- [ ] Write descriptive commit message
  git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)

  - Backup version: 2.0 (includes fixtures)
  - Pass rate: 91.6% (186/203 queries)
  - Changes: [describe any additions/removals]"
- [ ] Push to remote
  git push origin main
Pre-Restore Checklist
Backup Verification
- [ ] Backup file exists
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
  # Expected: File exists
- [ ] Backup integrity verified
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry run python scripts/backup_golden_dataset.py verify
  # Expected: "BACKUP IS VALID"
- [ ] Backup version compatible
  jq '.version' backend/data/golden_dataset_backup.json
  # Expected: "1.0" or "2.0" (script handles both)
Database State Assessment
- [ ] Database accessible
  psql -h localhost -p 5437 -U orchestkit -c "SELECT 1;"
  # Expected: "1"
- [ ] Current data count known
  psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses;"
  # Note the count for comparison after restore
- [ ] Decision made: Add or Replace?
  - ADD mode: Keep existing data, add from backup (use restore)
  - REPLACE mode: Delete existing data, restore from backup (use restore --replace)

WARNING: REPLACE mode is DESTRUCTIVE. Use only if:
- Setting up a fresh environment
- Recovering from data corruption
- You have confirmed the backup is valid
Environment Setup
- [ ] PostgreSQL running
  docker compose ps postgres
  # Expected: State = "running"
- [ ] Database migrations applied
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry run alembic current
  # Expected: Shows latest migration revision
- [ ] OpenAI API key set (for embedding regeneration)
  echo $OPENAI_API_KEY
  # Expected: sk-... (valid API key)
  # OR check .env file
  grep OPENAI_API_KEY backend/.env
  # Expected: OPENAI_API_KEY=sk-...
- [ ] Sufficient disk space
  df -h /Users/yonatangross/coding/OrchestKit/backend/data
  # Expected: At least 1 GB free
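The environment checks above can be collected into one preflight function so a restore fails fast with every problem reported at once. A minimal sketch using only the standard library; the 1 GB threshold mirrors the disk-space expectation above, and the `sk-` prefix check is a heuristic, not an API-key validation:

```python
import os
import shutil


def preflight(data_dir: str, min_free_gb: float = 1.0) -> list[str]:
    """Collect pre-restore problems instead of failing on the first one.
    Returns an empty list when the environment looks ready."""
    problems = []
    # Embedding regeneration needs a plausible-looking OpenAI key
    key = os.environ.get("OPENAI_API_KEY", "")
    if not key.startswith("sk-"):
        problems.append("OPENAI_API_KEY missing or malformed")
    # Restore writes the backup data locally; check free space
    free_gb = shutil.disk_usage(data_dir).free / 1e9
    if free_gb < min_free_gb:
        problems.append(f"only {free_gb:.1f} GB free in {data_dir}")
    return problems
```

Run it before either restore mode and abort if the returned list is non-empty.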
Restore Execution Checklist
Run Restore
Option A: Add to existing data (non-destructive)
- [ ] Execute restore command
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry run python scripts/backup_golden_dataset.py restore
Option B: Replace existing data (DESTRUCTIVE)
- [ ] CONFIRM backup is valid (run verify again)
  poetry run python scripts/backup_golden_dataset.py verify
  # Expected: "BACKUP IS VALID"
- [ ] CONFIRM you want to delete existing data (no turning back)
  - Yes, I understand this is destructive
  - Yes, I have verified the backup
  - Yes, I am ready to proceed
- [ ] Execute restore with --replace flag
  poetry run python scripts/backup_golden_dataset.py restore --replace
Monitor Restore Progress
- [ ] Watch for restore stages
  - "Loaded backup from: ..." (backup file loaded)
  - "Backup version: 2.0" (schema version)
  - "Restoring 98 analyses..." (analyses being inserted)
  - "Restoring 98 artifacts..." (artifacts being inserted)
  - "Restoring 415 chunks (regenerating embeddings)..." (chunks + embeddings)
  - "Restored 50/415 chunks", "Restored 100/415 chunks", ... (progress updates continue until 415/415)
- [ ] Check for errors during embedding generation
  - No "Failed to generate embedding" warnings
  - No OpenAI API errors
  - All chunks processed successfully
- [ ] Verify restore completion message
  - "RESTORE COMPLETE" displayed
  - Analyses: 98
  - Artifacts: 98
  - Chunks: 415
Post-Restore Verification
- [ ] Check database counts
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
  # Expected: 98
  psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM artifacts;"
  # Expected: 98
  psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analysis_chunks;"
  # Expected: 415
- [ ] Verify embeddings generated
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
  # Expected: 0 (all chunks have embeddings)
- [ ] Verify URL contract maintained
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
  # Expected: 0 (no placeholder URLs)
- [ ] Check sample data integrity
  # Verify a known document exists
  psql -h localhost -p 5437 -U orchestkit -c \
    "SELECT title FROM analyses WHERE url = 'https://docs.python.org/3/library/asyncio.html';"
  # Expected: Row returned with title
Validation Testing Checklist
Retrieval Quality Tests
- [ ] Run smoke tests
  cd /Users/yonatangross/coding/OrchestKit/backend
  poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
- [ ] Check pass rate
  - Total queries: 203
  - Expected pass rate: ~91.6% (186/203 queries)
  - Actual pass rate: ____ (fill in from test output)
  - Pass rate within acceptable range (±2%)
- [ ] No critical regressions
  - If pass rate dropped >5%, investigate:
    - Embedding model matches (check model version)
    - Hybrid search config unchanged
    - Backup file not corrupted
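The ±2% acceptance band can be made explicit in code so the comparison is not done by eye. A minimal sketch; the 91.6% baseline is this document's reported pass rate:

```python
def pass_rate_ok(passed: int, total: int, baseline: float = 0.916,
                 tolerance: float = 0.02) -> bool:
    """True when the observed pass rate sits within the ±tolerance band
    around the golden-dataset baseline (91.6% by default)."""
    return abs(passed / total - baseline) <= tolerance
```

For example, `pass_rate_ok(186, 203)` accepts the documented baseline run, while a drop to 150/203 would fail the check and warrant the regression investigation listed above.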
Integration Tests
- [ ] Run API integration tests (if backend running)
  # Start backend
  docker compose up -d backend
  # Wait for startup
  sleep 5
  # Health check
  curl -f http://localhost:8500/health
  # Expected: 200 OK
  # Run integration tests
  poetry run pytest tests/integration/test_artifact_api.py -v
  # Expected: All tests pass
Fixture Validation
- [ ] Verify fixture files restored (v2.0 backups only)
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures/
  # Expected:
  # - documents_expanded.json
  # - source_url_map.json
  # - queries.json
- [ ] Check fixture counts
  jq '.documents | length' backend/tests/smoke/retrieval/fixtures/documents_expanded.json
  # Expected: 98
  jq '.queries | length' backend/tests/smoke/retrieval/fixtures/queries.json
  # Expected: 203
Rollback Checklist (If Restore Fails)
Immediate Actions
- [ ] Stop all database writes
  docker compose stop backend
- [ ] Document failure details
  - Error message: ______________________
  - Failed at stage: ______________________
  - Chunks restored before failure: ______________________
Rollback Options
Option 1: Re-run restore (if partial failure)
- [ ] Identify cause of failure (API rate limit, network issue, etc.)
- [ ] Fix issue (increase timeout, add API key, etc.)
- [ ] Re-run restore with --replace
  poetry run python scripts/backup_golden_dataset.py restore --replace
Option 2: Restore from SQL dump (if available)
- [ ] Check for SQL dump
  ls -lh /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_dump.sql
  # If exists, use pg_restore
- [ ] Drop and recreate database
  docker compose down postgres
  docker compose up -d postgres
  poetry run alembic upgrade head
- [ ] Import SQL dump
  psql -h localhost -p 5437 -U orchestkit < \
    /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_dump.sql
Option 3: Restore from git history (if committed)
- [ ] Find last good backup commit
  git log --oneline -- backend/data/golden_dataset_backup.json
- [ ] Checkout previous backup
  git checkout HEAD~1 -- backend/data/golden_dataset_backup.json
- [ ] Re-run restore
  poetry run python scripts/backup_golden_dataset.py restore --replace
Post-Restore Cleanup
Documentation
- [ ] Update CURRENT_STATUS.md (if significant changes)
  - Document restore date
  - Document restore reason (new env, disaster recovery, etc.)
  - Document pass rate after restore
- [ ] Update golden dataset metadata (if expanded)
  cat backend/data/golden_dataset_metadata.json
  # Verify counts are current
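Updating the metadata can be scripted so changelog entries stay consistent with the date/action/count/description shape shown earlier in this document. A sketch, assuming the metadata JSON keeps its history under a top-level `changelog` key (the key name is an assumption):

```python
import json
from datetime import date


def append_changelog_entry(metadata_path: str, action: str, count: int,
                           description: str) -> None:
    """Append an entry matching the documented changelog shape
    (date, action, count, description) to the metadata file.
    NOTE: the 'changelog' key name is assumed, not confirmed."""
    with open(metadata_path) as f:
        metadata = json.load(f)
    metadata.setdefault("changelog", []).append({
        "date": date.today().isoformat(),
        "action": action,          # e.g. "added" or "removed"
        "count": count,
        "description": description,
    })
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)
```

Example: `append_changelog_entry("backend/data/golden_dataset_metadata.json", "added", 5, "Added 5 new LangGraph tutorial analyses")`.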
Monitoring
- [ ] Monitor retrieval quality (first week)
  # Run tests daily for the first week to ensure stability
  poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
- [ ] Monitor API errors (if production)
  - Check logs for embedding errors
  - Check logs for search errors
  - Check logs for database connection errors
Optional: Create New Backup
- [ ] If restore modified data, create new backup
  poetry run python scripts/backup_golden_dataset.py backup
  poetry run python scripts/backup_golden_dataset.py verify
  git add backend/data/golden_dataset_backup.json
  git commit -m "chore: golden dataset backup after restore"
Quick Reference
Full Backup Workflow
cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py backup
poetry run python scripts/backup_golden_dataset.py verify
git add data/golden_dataset_backup.json data/golden_dataset_metadata.json
git commit -m "chore: golden dataset backup"
git push
Full Restore Workflow (New Environment)
cd /Users/yonatangross/coding/OrchestKit/backend
docker compose up -d postgres
sleep 5
poetry run alembic upgrade head
poetry run python scripts/backup_golden_dataset.py verify
poetry run python scripts/backup_golden_dataset.py restore
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
Full Restore Workflow (Replace Existing)
cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py verify
# CONFIRM: I understand this is destructive
poetry run python scripts/backup_golden_dataset.py restore --replace
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
Remember: Golden datasets are critical infrastructure. Always verify backups, test restores in staging, and document all changes.
Examples (1)
OrchestKit Dataset Workflow
OrchestKit Golden Dataset Workflow
Complete backup/restore/validation workflow for OrchestKit's 98-document golden dataset
Overview
OrchestKit maintains a golden dataset of 98 curated technical documents with embeddings for testing retrieval quality. This dataset is the source of truth for:
- Regression testing (ensure new code doesn't break retrieval)
- Retrieval evaluation (measure precision, recall, MRR)
- Model benchmarking (compare different embedding models)
- Environment seeding (new dev environments, CI/CD)
Key Files:
- Backup Script: /Users/yonatangross/coding/OrchestKit/backend/scripts/backup_golden_dataset.py
- JSON Backup: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json (version controlled)
- Metadata: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_metadata.json (quick stats)
- Fixtures: /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures/ (source documents, queries)
Dataset Stats
Current (Production):
- 98 Analyses (completed content analyses)
- 415 Chunks (embedded text segments)
- 203 Test Queries (with expected results)
- 91.6% Pass Rate (retrieval quality metric)
Content Mix:
- 76 articles (tutorials, guides, blog posts)
- 19 technical documentation pages
- 3 research papers
Topics Covered:
- RAG (Retrieval-Augmented Generation)
- LangGraph workflows
- Prompt engineering
- API design
- Testing strategies
- Performance optimization
- Security best practices
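These headline counts can be recomputed from the fixture files instead of trusted from memory. A sketch, relying on the `documents` and `queries` top-level keys shown in the fixture examples later in this document:

```python
import json
import os


def dataset_stats(fixtures_dir: str) -> dict[str, int]:
    """Recompute document and query counts from the fixture files
    (documents_expanded.json and queries.json)."""
    with open(os.path.join(fixtures_dir, "documents_expanded.json")) as f:
        docs = json.load(f)["documents"]
    with open(os.path.join(fixtures_dir, "queries.json")) as f:
        queries = json.load(f)["queries"]
    return {"documents": len(docs), "queries": len(queries)}
```

Against the current production dataset this should report 98 documents and 203 queries; any drift from the stats above is worth investigating before the next backup.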
URL Contract (CRITICAL)
The Rule: Golden dataset analyses MUST store real canonical URLs, not placeholders.
Why this matters:
- Enables re-fetching content if embeddings need regeneration
- Allows validation that source content hasn't changed
- Provides audit trail for data provenance
- Ensures backup/restore actually works
Validation:
cd /Users/yonatangross/coding/OrchestKit/backend
# Check for placeholder URLs (should return 0)
poetry run python scripts/backup_golden_dataset.py verify | grep "placeholder URLs"
# Expected: "0 analyses with placeholder URLs"Invalid URLs (will break restore):
- https://docs.orchestkit.dev/placeholder/123
- https://learn.orchestkit.dev/fake-content
- https://content.orchestkit.dev/test
Valid URLs:
- https://docs.python.org/3/library/asyncio.html
- https://blog.langchain.dev/langgraph-multi-agent-workflows/
- https://python.langchain.com/docs/modules/data_connection/retrievers/
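A lightweight guard can reject placeholder URLs before a document ever enters the dataset. A sketch; the marker list is derived from the invalid examples above and is illustrative, not exhaustive:

```python
# Substrings that identify placeholder URLs; taken from the invalid
# examples above, so extend this tuple as new placeholder patterns appear.
PLACEHOLDER_MARKERS = ("orchestkit.dev", "placeholder", "fake-content")


def is_canonical(url: str) -> bool:
    """Accept only HTTPS URLs that contain no known placeholder marker."""
    return url.startswith("https://") and not any(
        marker in url for marker in PLACEHOLDER_MARKERS
    )
```

Calling this at ingestion time keeps the URL contract intact, so the backup-time verification never has to report placeholder violations.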
Workflow 1: Backup Golden Dataset
When to run:
- After adding new documents to golden dataset
- Before major database migrations
- Weekly automated backup (via GitHub Actions)
- Before deploying to production
Step 1: Pre-Backup Validation
cd /Users/yonatangross/coding/OrchestKit/backend
# Check database connection
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98
# Verify URL contract
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT COUNT(*) FROM analyses WHERE url LIKE '%orchestkit.dev%';"
# Expected: 0 (no placeholder URLs)
Step 2: Run Backup
cd /Users/yonatangross/coding/OrchestKit/backend
# Create backup (includes fixtures in v2.0)
poetry run python scripts/backup_golden_dataset.py backup
# Output:
# ============================================================
# BACKUP COMPLETE (v2.0)
# ============================================================
# Analyses: 98
# Artifacts: 98
# Chunks: 415
# Fixtures: 98 documents
# URL Maps: 98 mappings
# Queries: 203 test queries
# Location: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
# ============================================================
Step 3: Verify Backup
# Run verification
poetry run python scripts/backup_golden_dataset.py verify
# Output:
# ============================================================
# BACKUP VERIFICATION
# ============================================================
# File: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
# Created: 2025-12-21T10:30:00Z
# Version: 2.0
#
# Counts:
# Analyses: 98 (expected: 98)
# Artifacts: 98 (expected: 98)
# Chunks: 415 (expected: 415)
#
# Fixtures:
# Documents: 98
# URL Maps: 98
# Queries: 203
#
# Referential Integrity: OK
# All analyses have artifacts: OK
# ============================================================
# BACKUP IS VALID
# ============================================================
Step 4: Commit to Git
# Stage backup files
git add backend/data/golden_dataset_backup.json
git add backend/data/golden_dataset_metadata.json
# Commit with descriptive message
git commit -m "chore: golden dataset backup (98 analyses, 415 chunks)
- Backup version: 2.0 (includes fixtures)
- Added 5 new LangGraph tutorial analyses
- Updated 2 outdated React documentation analyses
- Pass rate: 91.6% (186/203 queries)"
# Push to remote
git push origin main
Workflow 2: Restore Golden Dataset
When to run:
- Setting up new development environment
- Recovering from accidental data deletion
- Seeding CI/CD test database
- Testing migration scripts
Step 1: Pre-Restore Checks
cd /Users/yonatangross/coding/OrchestKit/backend
# Ensure backup exists
ls -lh data/golden_dataset_backup.json
# Expected: ~2.5 MB file
# Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID"
# Check database is empty (or ready to replace)
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analyses;"
# If > 0 and you want to replace, use --replace flag
Step 2: Run Restore
Option A: Add to existing data (no deletion)
poetry run python scripts/backup_golden_dataset.py restore
# This will:
# 1. Load backup
# 2. Insert analyses (ON CONFLICT DO NOTHING)
# 3. Insert artifacts (ON CONFLICT DO NOTHING)
# 4. Regenerate embeddings for chunks
# 5. Insert chunks (ON CONFLICT DO NOTHING)
Option B: Replace existing data (DESTRUCTIVE)
# WARNING: This deletes ALL existing analyses, artifacts, and chunks
poetry run python scripts/backup_golden_dataset.py restore --replace
# This will:
# 1. DELETE FROM analysis_chunks
# 2. DELETE FROM artifacts
# 3. DELETE FROM analyses
# 4. Restore from backup (with regenerated embeddings)
Step 3: Monitor Restore Progress
# Restore output:
# Loaded backup from: /Users/yonatangross/coding/OrchestKit/backend/data/golden_dataset_backup.json
# Backup version: 2.0
# Backup created: 2025-12-19T10:30:00Z
# Restoring 98 analyses...
# Restoring 98 artifacts...
# Restoring 415 chunks (regenerating embeddings)...
# Restored 50/415 chunks
# Restored 100/415 chunks
# Restored 150/415 chunks
# Restored 200/415 chunks
# Restored 250/415 chunks
# Restored 300/415 chunks
# Restored 350/415 chunks
# Restored 400/415 chunks
# Restored 415/415 chunks
#
# ============================================================
# RESTORE COMPLETE
# ============================================================
# Analyses: 98
# Artifacts: 98
# Chunks: 415
# ============================================================
Step 4: Verify Restore
# Check counts
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM artifacts;"
# Expected: 98
psql -h localhost -p 5437 -U orchestkit -c "SELECT COUNT(*) FROM analysis_chunks;"
# Expected: 415
# Check embeddings generated
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT COUNT(*) FROM analysis_chunks WHERE vector IS NULL;"
# Expected: 0 (all chunks should have embeddings)
# Run retrieval quality tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
# Expected output:
# test_query_langchain_agent_memory PASSED
# test_query_rag_chunking_strategies PASSED
# test_query_prompt_engineering_basics PASSED
# ...
# 186 passed, 17 failed (91.6% pass rate)
Workflow 3: Expand Golden Dataset
When to run:
- Adding new technical content for better coverage
- Improving retrieval quality for specific topics
- Testing new embedding models
Step 1: Prepare Source Documents
cd /Users/yonatangross/coding/OrchestKit/backend/tests/smoke/retrieval/fixtures
# Edit documents_expanded.json to add new documents
# Example:
{
  "version": "2.0",
  "generated": "2025-12-21",
  "source": "Manual expansion",
  "documents": [
    {
      "id": "langgraph-streaming-guide",
      "source_url": "https://blog.langchain.dev/streaming-in-langgraph/",
      "content_type": "tutorial",
      "title": "Streaming in LangGraph: A Complete Guide",
      "content": "...",
      "metadata": {
        "author": "LangChain Team",
        "published_date": "2025-11-15"
      }
    }
  ]
}
Step 2: Add Test Queries
# Edit queries.json to add test queries for new content
{
  "version": "1.1",
  "generated": "2025-12-21",
  "queries": [
    {
      "id": "q-langgraph-streaming-1",
      "query": "How do I stream outputs in LangGraph?",
      "expected_chunks": ["langgraph-streaming-guide-chunk-0"],
      "difficulty": "medium",
      "category": "implementation"
    }
  ]
}
Step 3: Run Fixture Loader
cd /Users/yonatangross/coding/OrchestKit/backend
# Load new fixtures into database
poetry run python tests/smoke/retrieval/load_fixtures.py
# This will:
# 1. Load documents_expanded.json
# 2. Create analyses for each document
# 3. Generate chunks with embeddings
# 4. Create artifacts
# 5. Store in PostgreSQL
Step 4: Validate New Data
# Run retrieval quality tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
# Check for new query test
# Expected: test_query_langgraph_streaming_1 PASSED
# Verify new document in database
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT title FROM analyses WHERE url = 'https://blog.langchain.dev/streaming-in-langgraph/';"
# Expected: "Streaming in LangGraph: A Complete Guide"Step 5: Create New Backup
# Backup expanded dataset
poetry run python scripts/backup_golden_dataset.py backup
# Verify backup includes new content
poetry run python scripts/backup_golden_dataset.py verify
# Expected output shows increased counts:
# Analyses: 99 (was 98)
# Chunks: 420 (was 415)
# Queries: 204 (was 203)
# Commit to git
git add backend/data/golden_dataset_backup.json
git add backend/tests/smoke/retrieval/fixtures/documents_expanded.json
git add backend/tests/smoke/retrieval/fixtures/queries.json
git commit -m "feat: expand golden dataset with LangGraph streaming guide
- Added 1 new analysis (LangGraph streaming)
- Added 5 new chunks
- Added 1 new test query
- Total: 99 analyses, 420 chunks, 204 queries"
Workflow 4: CI/CD Integration
Automated weekly backup via GitHub Actions
GitHub Actions Workflow
File: /Users/yonatangross/coding/OrchestKit/.github/workflows/backup-golden-dataset.yml
name: Backup Golden Dataset

on:
  schedule:
    - cron: '0 2 * * 0'  # Weekly on Sunday at 2am UTC
  workflow_dispatch:  # Manual trigger

jobs:
  backup:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install Poetry
        run: |
          curl -sSL https://install.python-poetry.org | python3 -
          echo "$HOME/.local/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          cd backend
          poetry install --no-root

      - name: Setup PostgreSQL
        run: |
          docker run -d \
            --name postgres \
            -e POSTGRES_USER=orchestkit \
            -e POSTGRES_PASSWORD=orchestkit \
            -e POSTGRES_DB=orchestkit \
            -p 5437:5432 \
            pgvector/pgvector:pg16
          # Wait for PostgreSQL to be ready
          sleep 10

      - name: Run migrations
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run alembic upgrade head

      - name: Restore current backup (to have data to back up)
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py restore

      - name: Create fresh backup
        env:
          DATABASE_URL: postgresql://orchestkit:orchestkit@localhost:5437/orchestkit
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py backup

      - name: Verify backup
        run: |
          cd backend
          poetry run python scripts/backup_golden_dataset.py verify

      - name: Commit backup
        run: |
          git config user.name "GitHub Actions"
          git config user.email "actions@github.com"
          git add backend/data/golden_dataset_backup.json
          git add backend/data/golden_dataset_metadata.json
          git diff-index --quiet HEAD || git commit -m "chore: automated golden dataset backup [skip ci]"
          git push
Manual CI Trigger
# Trigger workflow manually
gh workflow run backup-golden-dataset.yml
# Check workflow status
gh run list --workflow=backup-golden-dataset.yml
# View logs
gh run view --log
Workflow 5: Disaster Recovery
Scenario: Accidental DELETE FROM analyses WHERE 1=1
Recovery Steps
# Step 1: Stop all database writes immediately
docker compose stop backend
# Step 2: Verify backup exists
cd /Users/yonatangross/coding/OrchestKit/backend
ls -lh data/golden_dataset_backup.json
# Expected: ~2.5 MB file modified recently
# Step 3: Verify backup integrity
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID"
# Step 4: Restore from backup
poetry run python scripts/backup_golden_dataset.py restore --replace
# Step 5: Verify restoration
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT COUNT(*) FROM analyses WHERE status = 'completed';"
# Expected: 98
# Step 6: Run integrity tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
# Expected: 186/203 passed (91.6%)
# Step 7: Restart backend
docker compose up -d backend
# Step 8: Smoke test API
curl -f http://localhost:8500/health
# Expected: 200 OK
Workflow 6: New Dev Environment Setup
Scenario: Fresh MacBook, setting up OrchestKit for first time
Setup Steps
# Step 1: Clone repository (includes backup in version control)
git clone https://github.com/your-org/orchestkit.git
cd orchestkit
# Step 2: Setup backend
cd backend
poetry install
# Step 3: Start PostgreSQL
cd ..
docker compose up -d postgres
# Wait for PostgreSQL to be ready
sleep 5
# Step 4: Run migrations
cd backend
poetry run alembic upgrade head
# Step 5: Restore golden dataset
poetry run python scripts/backup_golden_dataset.py restore
# Expected output:
# ============================================================
# RESTORE COMPLETE
# ============================================================
# Analyses: 98
# Artifacts: 98
# Chunks: 415
# ============================================================
# Step 6: Verify with tests
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
# Expected: 186/203 passed (91.6%)
# Step 7: Start backend
cd ..
docker compose up -d backend
# Step 8: Verify API
curl -f http://localhost:8500/health
# Expected: 200 OK
# Step 9: Setup frontend
cd frontend
npm install
npm run dev
# Open http://localhost:5173
Common Issues & Solutions
Issue 1: Backup verification fails with "placeholder URLs"
Error:
WARNING: 5 analyses still use placeholder URLs
(example: https://docs.orchestkit.dev/placeholder/123)
Solution:
# Identify analyses with placeholder URLs
psql -h localhost -p 5437 -U orchestkit -c \
"SELECT id, url FROM analyses WHERE url LIKE '%orchestkit.dev%';"
# Update with real canonical URLs
psql -h localhost -p 5437 -U orchestkit -c \
"UPDATE analyses
SET url = 'https://docs.python.org/3/library/asyncio.html'
WHERE id = '550e8400-e29b-41d4-a716-446655440000';"
# Re-run backup
poetry run python scripts/backup_golden_dataset.py backup
# Verify
poetry run python scripts/backup_golden_dataset.py verify
# Expected: "BACKUP IS VALID" (no placeholder URLs)Issue 2: Restore fails with "Failed to generate embedding"
Error:
WARNING: Failed to generate embedding for chunk 123: OpenAI API error
Solution:
# Check OpenAI API key
echo $OPENAI_API_KEY
# Should be set
# Check .env file
grep OPENAI_API_KEY backend/.env
# Should have: OPENAI_API_KEY=sk-...
# Retry restore
poetry run python scripts/backup_golden_dataset.py restore --replace
# If still failing, check OpenAI quota
curl https://api.openai.com/v1/usage \
-H "Authorization: Bearer $OPENAI_API_KEY"Issue 3: Retrieval quality tests fail after restore
Error:
186 passed, 17 failed (91.6% pass rate)
BUT EXPECTED: 203 passed (100% pass rate)
Solution:
# This is EXPECTED! Retrieval quality is not 100%.
# 91.6% is the BASELINE pass rate for OrchestKit golden dataset.
# Check if pass rate DECREASED (regression):
# Before restore: 186/203 (91.6%)
# After restore: 186/203 (91.6%)
# NO REGRESSION - restore successful
# If pass rate dropped significantly (e.g., to 80%):
# 1. Check embedding model matches (should use same model)
# 2. Check hybrid search weights (RRF multiplier, boosts)
# 3. Run backup verification again
poetry run python scripts/backup_golden_dataset.py verify
Quick Reference
Backup
cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py backup
poetry run python scripts/backup_golden_dataset.py verify
git add data/golden_dataset_backup.json
git commit -m "chore: golden dataset backup"Restore (New Environment)
cd /Users/yonatangross/coding/OrchestKit/backend
docker compose up -d postgres
poetry run alembic upgrade head
poetry run python scripts/backup_golden_dataset.py restore
poetry run pytest tests/smoke/retrieval/test_retrieval_quality.py -v
Restore (Replace Existing)
cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py restore --replace
Verify Backup Integrity
cd /Users/yonatangross/coding/OrchestKit/backend
poetry run python scripts/backup_golden_dataset.py verify
Remember: The golden dataset is the foundation of retrieval quality testing. Always verify backups, never skip URL validation, and test restore in staging before production.