
Monitoring Observability

Monitoring and observability patterns for Prometheus metrics, Grafana dashboards, Langfuse LLM tracing, and drift detection. Use when adding logging, metrics, distributed tracing, LLM cost tracking, or quality drift monitoring.

Reference medium

Primary Agent: metrics-architect

Monitoring & Observability

Comprehensive patterns for infrastructure monitoring, LLM observability, and quality drift detection. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

Category | Rules | Impact | When to Use
Infrastructure Monitoring | 3 | CRITICAL | Prometheus metrics, Grafana dashboards, alerting rules
LLM Observability | 3 | HIGH | Langfuse tracing, cost tracking, evaluation scoring
Drift Detection | 3 | HIGH | Statistical drift, quality regression, drift alerting
Silent Failures | 3 | HIGH | Tool skipping, quality degradation, loop/token spike alerting

Total: 12 rules across 4 categories

Quick Start

# Prometheus metrics with RED method
from prometheus_client import Counter, Histogram

http_requests = Counter('http_requests_total', 'Total requests', ['method', 'endpoint', 'status'])
http_duration = Histogram('http_request_duration_seconds', 'Request latency',
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5])
# Langfuse LLM tracing
from langfuse import observe, get_client

@observe()
async def analyze_content(content: str):
    get_client().update_current_trace(
        user_id="user_123", session_id="session_abc",
        tags=["production", "orchestkit"],
    )
    return await llm.generate(content)
# PSI drift detection
import numpy as np

psi_score = calculate_psi(baseline_scores, current_scores)
if psi_score >= 0.25:
    alert("Significant quality drift detected!")

Infrastructure Monitoring

Prometheus metrics, Grafana dashboards, and alerting for application health.

Rule | File | Key Pattern
Prometheus Metrics | rules/monitoring-prometheus.md | RED method, counters, histograms, cardinality
Grafana Dashboards | rules/monitoring-grafana.md | Golden Signals, SLO/SLI, health checks
Alerting Rules | rules/monitoring-alerting.md | Severity levels, grouping, escalation, fatigue prevention

LLM Observability

Langfuse-based tracing, cost tracking, and evaluation for LLM applications.

Rule | File | Key Pattern
Langfuse Traces | rules/llm-langfuse-traces.md | @observe decorator, OTEL spans, agent graphs
Cost Tracking | rules/llm-cost-tracking.md | Token usage, spend alerts, Metrics API
Eval Scoring | rules/llm-eval-scoring.md | Custom scores, evaluator tracing, quality monitoring

Drift Detection

Statistical and quality drift detection for production LLM systems.

Rule | File | Key Pattern
Statistical Drift | rules/drift-statistical.md | PSI, KS test, KL divergence, EWMA
Quality Drift | rules/drift-quality.md | Score regression, baseline comparison, canary prompts
Drift Alerting | rules/drift-alerting.md | Dynamic thresholds, correlation, anti-patterns

Silent Failures

Detection and alerting for silent failures in LLM agents.

Rule | File | Key Pattern
Tool Skipping | rules/silent-tool-skipping.md | Expected vs actual tool calls, Langfuse traces
Quality Degradation | rules/silent-degraded-quality.md | Heuristics + LLM-as-judge, z-score baselines
Silent Alerting | rules/silent-alerting.md | Loop detection, token spikes, escalation workflow

Key Decisions

Decision | Recommendation | Rationale
Metric methodology | RED method (Rate, Errors, Duration) | Industry standard, covers essential service health
Log format | Structured JSON | Machine-parseable, supports log aggregation
Tracing | OpenTelemetry | Vendor-neutral, auto-instrumentation, broad ecosystem
LLM observability | Langfuse (not LangSmith) | Open-source, self-hosted, built-in prompt management
LLM tracing API | @observe + get_client() | OTEL-native, automatic span creation
Drift method | PSI for production, KS for small samples | PSI is stable for large datasets, KS more sensitive
Threshold strategy | Dynamic (95th percentile) over static | Reduces alert fatigue, context-aware
Alert severity | 4 levels (Critical, High, Medium, Low) | Clear escalation paths, appropriate response times

Detailed Documentation

Resource | Description
references/ | Logging, metrics, tracing, Langfuse, drift analysis guides
checklists/ | Implementation checklists for monitoring and Langfuse setup
examples/ | Real-world monitoring dashboard and trace examples
scripts/ | Templates: Prometheus, OpenTelemetry, health checks, Langfuse

Related Skills

  • defense-in-depth - Layer 8 observability as part of security architecture
  • devops-deployment - Observability integration with CI/CD and Kubernetes
  • resilience-patterns - Monitoring circuit breakers and failure scenarios
  • llm-evaluation - Evaluation patterns that integrate with Langfuse scoring
  • caching - Caching strategies that reduce costs tracked by Langfuse

Rules (12)

Configure drift alert thresholds and correlation to avoid noise and missed issues — HIGH

Drift Alerting

Dynamic Thresholds

Use the 95th percentile of historical PSI values instead of static thresholds:

import numpy as np

def calculate_dynamic_threshold(historical_psi: list[float], percentile: float = 95) -> float:
    """Calculate dynamic threshold from historical PSI values."""
    return np.percentile(historical_psi, percentile)

# Usage
threshold = calculate_dynamic_threshold(last_30_days_psi)
if current_psi > threshold:
    alert("Drift exceeds dynamic threshold")

Correlation with Performance Metrics

Always correlate drift metrics with actual performance before alerting:

async def correlated_drift_alert(
    psi_score: float,
    psi_threshold: float,
    quality_score: float,
    quality_baseline: float,
):
    """Only alert when drift AND quality drop are both confirmed."""
    drift_detected = psi_score > psi_threshold
    quality_drop = quality_score < quality_baseline * 0.9  # 10% drop

    if drift_detected and quality_drop:
        await trigger_alert(
            severity="high",
            message=f"Distribution drift (PSI={psi_score:.3f}) "
                    f"correlated with quality drop ({quality_score:.2f} < {quality_baseline:.2f})",
        )
        await trigger_evaluation()
    elif drift_detected:
        await log_warning(
            f"Distribution drift detected (PSI={psi_score:.3f}) "
            f"but quality stable ({quality_score:.2f})"
        )

Anti-Patterns

# NEVER use static thresholds without context
if psi > 0.2:  # May cause alert fatigue
    alert()

# NEVER retrain on time schedule alone
schedule.every(7).days.do(retrain)  # Wasteful if no drift

# ALWAYS use dynamic thresholds
threshold = np.percentile(historical_psi, 95)
if psi > threshold:
    alert()

# ALWAYS correlate with performance metrics
if psi > threshold and quality_score < baseline:
    trigger_evaluation()

Alert Priority Rules

Condition | Priority | Action
PSI >= 0.25 AND quality drop > 10% | Critical | Immediate investigation
PSI >= 0.25, quality stable | Medium | Monitor, log warning
PSI 0.1-0.25 AND quality drop > 5% | High | Investigate within 4 hours
PSI 0.1-0.25, quality stable | Low | Review next sprint
PSI < 0.1 | None | Continue monitoring
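
The table maps directly onto a small helper. A minimal sketch; the function name and inputs are illustrative (PSI and the relative quality drop are assumed to be computed elsewhere):

def classify_drift_priority(psi: float, quality_drop_pct: float) -> str:
    """Map PSI and relative quality drop to an alert priority per the table above."""
    if psi >= 0.25:
        return "critical" if quality_drop_pct > 0.10 else "medium"
    if psi >= 0.1:
        return "high" if quality_drop_pct > 0.05 else "low"
    return "none"  # PSI < 0.1: continue monitoring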

Drift Alert Pipeline

class DriftAlertPipeline:
    """Complete drift detection and alerting pipeline."""

    def __init__(self, psi_threshold: float = 0.25, quality_threshold: float = 0.7):
        self.psi_threshold = psi_threshold
        self.quality_threshold = quality_threshold
        self.historical_psi = []

    async def run(self, baseline_data, current_data, quality_scores):
        # 1. Calculate drift
        psi = calculate_psi(baseline_data, current_data)
        self.historical_psi.append(psi)

        # 2. Dynamic threshold (use after enough history)
        if len(self.historical_psi) >= 30:
            threshold = np.percentile(self.historical_psi, 95)
        else:
            threshold = self.psi_threshold

        # 3. Check quality correlation
        avg_quality = np.mean(quality_scores) if quality_scores else 1.0

        # 4. Alert based on correlation
        if psi > threshold and avg_quality < self.quality_threshold:
            return {"alert": "critical", "psi": psi, "quality": avg_quality}
        elif psi > threshold:
            return {"alert": "warning", "psi": psi, "quality": avg_quality}

        return {"alert": None, "psi": psi, "quality": avg_quality}

Notification Strategy

Severity | Channel | Frequency
Critical (drift + quality drop) | PagerDuty + Slack | Immediate
Warning (drift, quality stable) | Slack | Daily digest
Info (moderate drift) | Log | Continuous

Key Decisions

Decision | Recommendation
Threshold strategy | Dynamic (95th percentile of historical) over static
Alert priority | Performance metrics > distribution metrics
Correlation | Always confirm drift + quality drop before critical alerts
Historical window | 30+ days for reliable dynamic thresholds
Tool stack | Langfuse (traces) + Evidently/Phoenix (drift analysis)

Incorrect — alerting on drift without quality correlation:

def check_drift(psi_score: float):
    if psi_score > 0.2:  # Static threshold, no quality check
        send_alert("CRITICAL: Drift detected")  # False alarms

Correct — correlating drift with quality before alerting:

async def check_drift(psi_score: float, quality_score: float, baseline: float):
    threshold = np.percentile(historical_psi, 95)  # Dynamic
    if psi_score > threshold and quality_score < baseline * 0.9:
        send_alert("CRITICAL: Drift + quality drop")  # High confidence
    elif psi_score > threshold:
        log_warning("Drift detected but quality stable")  # Monitor only

Detect LLM output quality regression before users notice production degradation — HIGH

Quality Drift Detection

Langfuse Score Trend Monitoring

from langfuse import Langfuse
import numpy as np
from datetime import datetime, timedelta

langfuse = Langfuse()

def check_quality_drift(days: int = 7, threshold_drop: float = 0.1):
    """Compare recent quality scores against baseline."""

    current_scores = langfuse.fetch_scores(
        name="quality_overall",
        from_timestamp=datetime.now() - timedelta(days=1)
    )

    baseline_scores = langfuse.fetch_scores(
        name="quality_overall",
        from_timestamp=datetime.now() - timedelta(days=days),
        to_timestamp=datetime.now() - timedelta(days=1)
    )

    current_mean = np.mean([s.value for s in current_scores])
    baseline_mean = np.mean([s.value for s in baseline_scores])

    drift_pct = (baseline_mean - current_mean) / baseline_mean

    if drift_pct > threshold_drop:
        return {"drift": True, "drop_pct": drift_pct}
    return {"drift": False, "drop_pct": drift_pct}

Canary Prompt Monitoring

Track consistency with fixed test inputs to detect behavioral changes:

CANARY_PROMPTS = [
    {"id": "summarize_01", "prompt": "Summarize: The quick brown fox...",
     "expected_keywords": ["fox", "dog", "jump"]},
    {"id": "classify_01", "prompt": "Classify sentiment: Great product!",
     "expected_output": "positive"},
]

async def run_canary_checks():
    """Run fixed test inputs and compare against expected outputs."""
    results = []
    for canary in CANARY_PROMPTS:
        response = await llm.generate(canary["prompt"])

        if "expected_keywords" in canary:
            score = sum(
                1 for kw in canary["expected_keywords"]
                if kw.lower() in response.lower()
            ) / len(canary["expected_keywords"])
        elif "expected_output" in canary:
            score = 1.0 if canary["expected_output"] in response.lower() else 0.0

        results.append({"id": canary["id"], "score": score})

    avg_score = np.mean([r["score"] for r in results])
    return {"canary_score": avg_score, "drift": avg_score < 0.8, "details": results}

Embedding Drift (Centroid Monitoring)

import numpy as np

class CentroidMonitor:
    """Monitor drift via embedding centroid movement."""

    def __init__(self, distance_threshold: float = 0.2):
        self.distance_threshold = distance_threshold
        self.baseline_centroid = None
        self.baseline_std = None

    def set_baseline(self, embeddings: np.ndarray):
        self.baseline_centroid = embeddings.mean(axis=0)
        distances = np.linalg.norm(embeddings - self.baseline_centroid, axis=1)
        self.baseline_std = distances.std()
        return self

    def check_drift(self, embeddings: np.ndarray) -> dict:
        current_centroid = embeddings.mean(axis=0)
        centroid_distance = np.linalg.norm(current_centroid - self.baseline_centroid)
        normalized_distance = centroid_distance / (self.baseline_std + 1e-10)

        distances = np.linalg.norm(embeddings - self.baseline_centroid, axis=1)
        outlier_ratio = (distances > 3 * self.baseline_std).mean()

        return {
            "normalized_distance": float(normalized_distance),
            "outlier_ratio": float(outlier_ratio),
            "drift_detected": normalized_distance > self.distance_threshold,
        }

Cluster-Based Drift Detection

import numpy as np
from sklearn.cluster import KMeans

class ClusterDriftDetector:
    """Detect drift by monitoring cluster distributions."""

    def __init__(self, n_clusters=10, psi_threshold=0.25):
        self.n_clusters = n_clusters
        self.psi_threshold = psi_threshold
        self.kmeans = None
        self.baseline_distribution = None

    def fit_baseline(self, embeddings):
        self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = self.kmeans.fit_predict(embeddings)
        self.baseline_distribution = np.bincount(labels, minlength=self.n_clusters) / len(labels)
        return self

    def detect_drift(self, embeddings):
        labels = self.kmeans.predict(embeddings)
        current = np.bincount(labels, minlength=self.n_clusters) / len(labels)
        psi = self._calculate_psi(self.baseline_distribution, current)
        return {"psi": psi, "drift_detected": psi > self.psi_threshold}

    @staticmethod
    def _calculate_psi(expected: np.ndarray, actual: np.ndarray, eps: float = 1e-4) -> float:
        """PSI between two cluster-probability distributions."""
        expected = expected + eps
        actual = actual + eps
        return float(np.sum((actual - expected) * np.log(actual / expected)))

Multi-Metric Quality Tracker

class MultiMetricEWMA:
    """Track multiple quality metrics with independent baselines."""

    def __init__(self, metrics: list[str], alpha: float = 0.2):
        # EWMABaseline: per-metric rolling baseline (EWMA mean/std with z-score
        # anomaly flagging), assumed to be defined alongside this tracker
        self.baselines = {m: EWMABaseline(alpha=alpha) for m in metrics}

    def update(self, metrics: dict) -> dict:
        results = {}
        anomalies = []
        for name, value in metrics.items():
            if name in self.baselines:
                result = self.baselines[name].update(value)
                results[name] = result
                if result["is_anomaly"]:
                    anomalies.append({"metric": name, "z_score": result["z_score"]})
        return {"metrics": results, "anomalies": anomalies}

RAG Retrieval Drift

Monitor drift in RAG retrieval quality by comparing retrieved document overlap against baseline queries. Track coverage ratio (what fraction of expected documents are still returned) to detect index staleness, embedding model changes, or corpus drift.
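
A minimal sketch of the coverage-ratio check described above, assuming a fixed set of baseline queries with known expected document IDs; the retriever object and the 0.7 threshold are illustrative assumptions:

async def check_retrieval_drift(baseline_queries: dict[str, set[str]], threshold: float = 0.7) -> dict:
    """Compare retrieved document IDs against the expected set for each baseline query."""
    coverage = []
    for query, expected_ids in baseline_queries.items():
        results = await retriever.search(query, top_k=len(expected_ids))  # assumed retriever
        retrieved_ids = {r.id for r in results}
        coverage.append(len(expected_ids & retrieved_ids) / len(expected_ids))
    avg_coverage = sum(coverage) / len(coverage)
    return {"coverage_ratio": avg_coverage, "drift_detected": avg_coverage < threshold}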

Key Decisions

Decision | Recommendation
Baseline window | 7-30 days rolling window
Quality threshold | avg_score > 0.7 for production
Canary frequency | Every 6 hours minimum
Embedding method | Centroid distance for speed, cluster PSI for depth

Incorrect — comparing quality to fixed threshold:

def check_quality(current_score: float):
    if current_score < 0.7:  # Static threshold, no baseline
        alert("Quality degradation")

Correct — comparing against rolling baseline:

def check_quality(current_scores: list, baseline_scores: list):
    current_mean = np.mean(current_scores)
    baseline_mean = np.mean(baseline_scores)
    drift_pct = (baseline_mean - current_mean) / baseline_mean
    if drift_pct > 0.1:  # 10% drop from baseline
        alert(f"Quality drift: {drift_pct:.1%} drop")

Apply statistical methods to detect distribution shifts in LLM inputs and outputs — HIGH

Statistical Drift Detection

Population Stability Index (PSI)

Recommended for production LLM monitoring.

import numpy as np

def calculate_psi(
    expected: np.ndarray,
    actual: np.ndarray,
    bins: int = 10,
    eps: float = 0.0001
) -> float:
    """
    PSI = SUM((Actual% - Expected%) * ln(Actual% / Expected%))

    Thresholds:
    - PSI < 0.1: No significant drift
    - 0.1 <= PSI < 0.25: Moderate drift, investigate
    - PSI >= 0.25: Significant drift, action needed
    """
    min_val = min(expected.min(), actual.min())
    max_val = max(expected.max(), actual.max())
    bin_edges = np.linspace(min_val, max_val, bins + 1)

    expected_counts, _ = np.histogram(expected, bins=bin_edges)
    actual_counts, _ = np.histogram(actual, bins=bin_edges)

    expected_pct = expected_counts / len(expected) + eps
    actual_pct = actual_counts / len(actual) + eps

    return np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))

PSI Threshold Guidelines

PSI Value | Interpretation | Action
< 0.1 | No significant drift | Monitor
0.1 - 0.25 | Moderate drift | Investigate
>= 0.25 | Significant drift | Alert + Action

Kolmogorov-Smirnov Test

Best for small sample sizes (<1000 observations).

from scipy import stats

def ks_drift_test(expected, actual, significance=0.05):
    statistic, p_value = stats.ks_2samp(expected, actual)
    return {
        "statistic": statistic,
        "p_value": p_value,
        "drift_detected": p_value < significance,
    }

# Warning: KS is too sensitive on large datasets
# Use sampling for >1000 observations
def adjusted_ks_test(expected, actual, sample_size=500):
    if len(expected) > sample_size:
        expected = np.random.choice(expected, sample_size, replace=False)
    if len(actual) > sample_size:
        actual = np.random.choice(actual, sample_size, replace=False)
    return ks_drift_test(expected, actual)

EWMA Dynamic Threshold

import numpy as np

class EWMADriftDetector:
    """Exponential Weighted Moving Average for drift detection."""

    def __init__(self, lambda_param: float = 0.2, L: float = 3.0):
        self.lambda_param = lambda_param
        self.L = L
        self.ewma = None

    def update(self, value: float, baseline_mean: float, baseline_std: float) -> dict:
        if self.ewma is None:
            self.ewma = value
        else:
            self.ewma = self.lambda_param * value + (1 - self.lambda_param) * self.ewma

        factor = np.sqrt(self.lambda_param / (2 - self.lambda_param))
        ucl = baseline_mean + self.L * baseline_std * factor
        lcl = baseline_mean - self.L * baseline_std * factor

        return {
            "ewma": self.ewma,
            "ucl": ucl, "lcl": lcl,
            "drift_detected": self.ewma > ucl or self.ewma < lcl
        }

Method Comparison

Method | Best For | Symmetric | Pros | Cons
PSI | Production monitoring | Yes | Stable, intuitive thresholds | Only notices large changes
KL Divergence | Sensitive analysis | No | Detects tail changes | Undefined for zero probs
JS Divergence | Balanced comparison | Yes | Bounded [0,1], no div-by-zero | Less sensitive to tails
KS Test | Small samples | Yes | Non-parametric | Too sensitive on large data
Wasserstein | Continuous data | Yes | Considers distribution shape | Computationally expensive

Choosing the Right Method

def select_drift_method(data_type: str, sample_size: int) -> str:
    if data_type == "categorical":
        return "psi" if sample_size > 1000 else "chi_square"
    if data_type == "embeddings":
        return "embedding_centroid_distance"
    if sample_size < 500:
        return "ks_test"
    return "psi"  # Default: stable for production

Combined Drift Score

def combined_drift_score(expected, actual, weights=None):
    # wasserstein_drift and js_divergence are companion helpers from this rule
    # (normalized Wasserstein distance and Jensen-Shannon divergence), not shown here
    weights = weights or {"psi": 0.4, "wasserstein": 0.3, "js": 0.3}

    psi = calculate_psi(expected, actual)
    wasserstein = wasserstein_drift(expected, actual, normalize=True)
    js = js_divergence(expected, actual)

    psi_normalized = min(psi / 0.5, 1.0)
    combined = (
        weights["psi"] * psi_normalized +
        weights["wasserstein"] * wasserstein +
        weights["js"] * js
    )

    return {"combined_score": combined, "drift_detected": combined > 0.15}

EWMA Alpha Selection

Use Case | Alpha | Behavior
Stable production | 0.1 | Slow adaptation
Active development | 0.3 | Moderate
High variability | 0.1-0.15 | Very stable
Sudden change detection | 0.4-0.5 | Quick response

Incorrect — using KS test on large dataset:

from scipy import stats
statistic, p_value = stats.ks_2samp(baseline, current)  # baseline/current have 10K+ samples
if p_value < 0.05:
    alert("Drift detected")  # Too sensitive, false alarms

Correct — PSI for production monitoring:

psi = calculate_psi(baseline, current, bins=10)
if psi >= 0.25:  # Stable threshold
    alert("Significant drift detected")  # Reliable signal

Track LLM token costs with spend alerts and per-operation cost attribution — HIGH

LLM Cost Tracking

Basic Cost Tracking (Langfuse v3)

from langfuse import observe, get_client

@observe(name="security_audit")
async def run_audit(content: str):
    response = await llm.generate(
        model="claude-sonnet-4-6",
        messages=[{"role": "user", "content": f"Analyze: {content}"}],
    )

    get_client().update_current_observation(
        model="claude-sonnet-4-6",
        usage={
            "input": 1500,
            "output": 1000,
            "unit": "TOKENS",
        },
    )
    # Langfuse auto-calculates: $0.0045 + $0.015 = $0.0195
    return response

Custom Model Pricing

from langfuse import Langfuse

langfuse = Langfuse()

langfuse.create_model(
    model_name="claude-sonnet-4-6",
    match_pattern="claude-sonnet-4.*",
    unit="TOKENS",
    input_price=0.000003,   # $3/MTok
    output_price=0.000015,  # $15/MTok
)

Cost Per Analysis

trace = langfuse.get_trace(trace_id)
total_cost = sum(
    gen.calculated_total_cost or 0
    for gen in trace.observations
    if gen.type == "GENERATION"
)

await analysis_repo.update(
    analysis_id,
    langfuse_trace_id=trace.id,
    total_cost_usd=total_cost,
)

Spend Alerts

Via Langfuse UI

  1. Navigate to Settings -> Alerts
  2. Create alert rule:
    • Metric: Daily cost
    • Threshold: $50/day
    • Channel: Slack / Email / Webhook

Via Metrics API

from langfuse import Langfuse
from datetime import datetime, timedelta

langfuse = Langfuse()

metrics = langfuse.get_metrics(
    metric_name="total_cost",
    from_timestamp=datetime.now() - timedelta(days=1),
    to_timestamp=datetime.now(),
)

daily_cost = metrics.values[0].value if metrics.values else 0

if daily_cost > 50.0:
    await send_alert(
        channel="slack",
        message=f"Daily LLM cost alert: ${daily_cost:.2f} exceeds $50 threshold",
    )

v2 Metrics API Queries

# Total cost over last 7 days
metrics = langfuse.get_metrics(
    metric_name="total_cost",
    from_timestamp=datetime.now() - timedelta(days=7),
    granularity="day",
)

# Token usage by model
token_metrics = langfuse.get_metrics(
    metric_name="total_tokens",
    from_timestamp=datetime.now() - timedelta(days=7),
    group_by="model",
)

Prometheus LLM Cost Metrics

from prometheus_client import Counter, Histogram

llm_tokens_used = Counter(
    'llm_tokens_used_total', 'Total LLM tokens',
    ['model', 'operation', 'token_type']
)
llm_cost_dollars = Counter(
    'llm_cost_dollars_total', 'Total LLM cost in dollars',
    ['model', 'operation']
)
llm_request_duration = Histogram(
    'llm_request_duration_seconds', 'LLM request duration',
    ['model', 'operation'], buckets=[0.5, 1, 2, 5, 10, 20, 30]
)

Monitoring Dashboard SQL Queries

-- Top 10 most expensive traces (last 7 days)
SELECT name, user_id, calculated_total_cost, input_tokens, output_tokens
FROM traces
WHERE timestamp > NOW() - INTERVAL '7 days'
ORDER BY calculated_total_cost DESC LIMIT 10;

-- Average cost by agent type
SELECT metadata->>'agent_type' as agent, COUNT(*) as traces,
  AVG(calculated_total_cost) as avg_cost
FROM traces WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent ORDER BY avg_cost DESC;

-- Daily cost trend
SELECT DATE(timestamp) as date, SUM(calculated_total_cost) as daily_cost
FROM traces WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp) ORDER BY date;

Best Practices

  1. Always pass usage data with input/output token counts
  2. Monitor costs daily with spend alerts to catch spikes early
  3. Set threshold alerts for abnormal increases (> 2x daily average; see the sketch after this list)
  4. Track by user_id to identify expensive users
  5. Group by metadata (agent_type, operation) for cost attribution
  6. Use custom pricing for self-hosted models
  7. Use Metrics API for programmatic queries instead of raw SQL
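
A minimal sketch of the 2x-average spike check from item 3. It assumes a list of recent daily costs (for example pulled via the Metrics API shown above); send_alert is the same placeholder notification hook used earlier:

import numpy as np

async def check_cost_spike(daily_costs: list[float], multiplier: float = 2.0):
    """Alert when today's spend exceeds a multiple of the trailing daily average."""
    if len(daily_costs) < 8:
        return  # need a trailing week plus today
    today, history = daily_costs[-1], daily_costs[:-1]
    avg = float(np.mean(history))
    if today > multiplier * avg:
        await send_alert(  # placeholder notification hook
            channel="slack",
            message=f"LLM spend spike: ${today:.2f} today vs ${avg:.2f} trailing average",
        )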

Incorrect — no cost tracking in LLM calls:

@observe()
async def analyze(content: str):
    response = await llm.generate(content)  # No usage tracking
    return response  # Cost visibility lost

Correct — tracking usage for cost attribution:

@observe()
async def analyze(content: str):
    response = await llm.generate(content)
    get_client().update_current_observation(
        model="claude-sonnet-4-6",
        usage={"input": 1500, "output": 1000, "unit": "TOKENS"}
    )
    return response  # Cost auto-calculated in Langfuse

Score LLM evaluations systematically for quality monitoring and regression detection — HIGH

LLM Evaluation Scoring

Basic Scoring (Langfuse v3)

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()

@observe()
async def analyze_and_score(query: str):
    response = await llm.generate(query)

    # Score within @observe context
    get_client().score_current_trace(
        name="relevance",
        value=0.85,
        comment="Response addresses query but lacks depth",
    )
    return response

# Or score by trace_id directly
langfuse.score(
    trace_id="trace_123",
    name="factuality",
    value=0.92,
    data_type="NUMERIC",
)

Score Types

# Numeric scores (0-1 range)
langfuse.score(trace_id="...", name="relevance", value=0.85, data_type="NUMERIC")

# Categorical scores
langfuse.score(trace_id="...", name="sentiment", value="positive", data_type="CATEGORICAL")

# Boolean scores
langfuse.score(trace_id="...", name="contains_pii", value=0, data_type="BOOLEAN")

Evaluator Execution Tracing

Each evaluator run creates its own inspectable trace:

from langfuse import observe, get_client

@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
    score = await llm_judge.evaluate(
        criteria="relevance", query=query, response=response,
    )

    get_client().update_current_observation(
        input={"query": query[:500], "response": response[:500]},
        output={"score": score, "criteria": "relevance"},
        model="claude-sonnet-4-6",
    )
    return score

Result in Langfuse UI:

evaluator:relevance_judge (0.8s, $0.01)
+-- generation: judge_prompt -> score: 0.85
+-- metadata: {criteria: "relevance", model: "claude-sonnet-4-6"}

G-Eval Automated Scoring

from langfuse import observe, get_client

scorer = GEvalScorer()

@observe()
async def analyze_with_scoring(query: str):
    response = await llm.generate(query)

    scores = await scorer.score(
        query=query, response=response,
        criteria=["relevance", "coherence", "depth"],
    )

    for criterion, score in scores.items():
        get_client().score_current_trace(name=criterion, value=score)

    return response

Common Evaluation Metrics

Metric | Range | Description
Relevance | 0-1 | Does response address the query?
Coherence | 0-1 | Is response logically structured?
Depth | 0-1 | Level of detail and analysis
Factuality | 0-1 | Accuracy of claims
Completeness | 0-1 | All aspects of query covered?
Toxicity | 0-1 | Harmful or inappropriate content

Quality Gate Integration

from langfuse import observe, get_client

@observe(name="quality_gate")
async def quality_gate_node(state):
    scores = await run_quality_evaluators(state)

    for criterion, score in scores.items():
        get_client().score_current_trace(name=criterion, value=score)

    avg_score = sum(scores.values()) / len(scores)
    return {"quality_gate_passed": avg_score >= 0.7, "quality_scores": scores}

Score Trend Query

SELECT
    DATE(timestamp) as date,
    AVG(value) FILTER (WHERE name = 'relevance') as avg_relevance,
    AVG(value) FILTER (WHERE name = 'depth') as avg_depth,
    AVG(value) FILTER (WHERE name = 'factuality') as avg_factuality
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;

Best Practices

  1. Score all production traces for quality monitoring
  2. Use evaluator type (@observe(type="evaluator")) for inspectable judge traces
  3. Use consistent criteria across all evaluations
  4. Automate scoring with G-Eval or similar frameworks
  5. Set quality thresholds (e.g., avg_relevance > 0.7)
  6. Create test datasets for regression testing
  7. Track scores by prompt version to measure improvements
  8. Alert on quality drops (e.g., avg_score < 0.6 for 3 consecutive days; see the sketch below)
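
A minimal sketch of the consecutive-days check from item 8, assuming daily average scores have already been aggregated (the names here are illustrative, not a Langfuse API):

def quality_drop_alert(daily_avg_scores: list[float], threshold: float = 0.6, days: int = 3) -> bool:
    """True when the last `days` daily averages are all below the threshold."""
    recent = daily_avg_scores[-days:]
    return len(recent) == days and all(score < threshold for score in recent)

# Usage (last_30_day_averages is a hypothetical list of daily averages)
if quality_drop_alert(last_30_day_averages):
    alert("Quality below 0.6 for 3 consecutive days")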

Incorrect — no quality scoring in production:

@observe()
async def analyze(query: str):
    response = await llm.generate(query)
    return response  # No quality metrics

Correct — automated quality scoring:

@observe()
async def analyze(query: str):
    response = await llm.generate(query)
    scores = await scorer.score(query, response, ["relevance", "depth"])
    for criterion, score in scores.items():
        get_client().score_current_trace(name=criterion, value=score)
    return response

Trace LLM call chains with Langfuse for debugging slow or incorrect responses — HIGH

Langfuse Traces

Basic Tracing with @observe (v3)

from langfuse import observe, get_client

@observe()  # Auto-creates trace on first root span
async def analyze_content(content: str):
    get_client().update_current_observation(
        metadata={"content_length": len(content)}
    )
    return await llm.generate(content)

Nested Spans

from langfuse import observe, get_client

@observe(name="content_analysis")
async def analyze(content: str):
    # Nested span for retrieval
    @observe(name="retrieval")
    async def retrieve_context():
        chunks = await vector_db.search(content)
        get_client().update_current_observation(
            metadata={"chunks_retrieved": len(chunks)}
        )
        return chunks

    # Nested span for generation
    @observe(name="generation")
    async def generate_analysis(context):
        response = await llm.generate(content)
        get_client().update_current_observation(
            model="claude-sonnet-4-6",
            usage={"input_tokens": 1500, "output_tokens": 1000},
        )
        return response

    context = await retrieve_context()
    return await generate_analysis(context)

Result in Langfuse UI:

content_analysis (2.3s, $0.045)
+-- retrieval (0.1s)
|   +-- metadata: {chunks_retrieved: 5}
+-- generation (2.2s, $0.045)
    +-- model: claude-sonnet-4-6
    +-- tokens: 1500 input, 1000 output

Session & User Tracking

from langfuse import observe, get_client

@observe()
async def analysis(content: str):
    get_client().update_current_trace(
        user_id="user_123",
        session_id="session_abc",
        metadata={"content_type": "article", "agent_count": 8},
        tags=["production", "orchestkit"],
    )
    return await run_pipeline(content)

Observation Types for Agent Graphs

Beyond generation and span, v3 adds typed observations:

@observe(type="agent", name="supervisor")
async def supervisor(query: str): ...     # Agent node in graph

@observe(type="tool", name="web_search")
async def search(query: str): ...         # Tool call in graph

@observe(type="retriever", name="vector_search")
async def retrieve(query: str): ...       # Retrieval step

@observe(type="chain", name="prompt_chain")
async def chain(inputs: dict): ...        # Sequential processing

@observe(type="guardrail", name="pii_check")
async def check_pii(text: str): ...       # Safety check

@observe(type="embedding", name="embed")
async def embed(text: str): ...           # Vector generation

@observe(type="evaluator", name="quality_judge")
async def evaluate(output: str): ...      # Inspectable evaluator trace

OpenTelemetry SpanProcessor

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor

langfuse_processor = LangfuseSpanProcessor(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com",
)

provider = TracerProvider()
provider.add_span_processor(langfuse_processor)

JavaScript/TypeScript Setup

import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";

const sdk = new NodeSDK({
  traceExporter: new LangfuseExporter({
    publicKey: process.env.LANGFUSE_PUBLIC_KEY,
    secretKey: process.env.LANGFUSE_SECRET_KEY,
  }),
});
sdk.start();

Best Practices

  1. Use from langfuse import observe, get_client — NOT from langfuse.decorators
  2. Let @observe() auto-create traces — no explicit langfuse.trace() needed
  3. Name spans descriptively (e.g., "retrieval", "generation")
  4. Use type= parameter for Agent Graph rendering
  5. Add metadata for debugging (chunk counts, model params)
  6. Truncate large inputs/outputs to 500-1000 chars
  7. Tag production vs staging traces for environment filtering

Incorrect — flat trace without nested spans:

@observe()
async def analyze(content: str):
    chunks = await retrieve(content)  # Not traced
    result = await generate(chunks)   # Not traced
    return result  # No visibility into sub-operations

Correct — nested spans for full visibility:

@observe(name="content_analysis")
async def analyze(content: str):
    @observe(name="retrieval")
    async def retrieve_context():
        return await vector_db.search(content)

    @observe(name="generation")
    async def generate_analysis(chunks):
        return await llm.generate(chunks)

    chunks = await retrieve_context()
    return await generate_analysis(chunks)

Define effective alerting rules to prevent alert fatigue and missed incidents — CRITICAL

Alerting Rules

Alert Severity Levels

Level | Response Time | Examples
Critical (P1) | < 15 min | Service down, data loss
High (P2) | < 1 hour | Major feature broken
Medium (P3) | < 4 hours | Increased error rate
Low (P4) | Next day | Warnings, deprecations

Key Alerts

Alert | Condition | Severity
ServiceDown | up == 0 for 1m | Critical
HighErrorRate | 5xx > 5% for 5m | Critical
HighLatency | p95 > 2s for 5m | High
LowCacheHitRate | < 70% for 10m | Medium

Prometheus Alerting Rules

groups:
  - name: api_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) /
          sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

      - alert: HighLatency
        expr: histogram_quantile(0.95, http_request_duration_seconds_bucket) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"

Alert Grouping

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 30s        # Wait 30s to collect similar alerts
  group_interval: 5m     # Send grouped alerts every 5m
  repeat_interval: 4h    # Re-send alert after 4h if still firing

  routes:
  - match:
      severity: critical
    receiver: pagerduty
    continue: true

  - match:
      severity: warning
    receiver: slack

Inhibition Rules

Suppress noisy alerts when root cause is known:

inhibit_rules:
# If ServiceDown is firing, suppress HighErrorRate and HighLatency
- source_match:
    alertname: ServiceDown
  target_match_re:
    alertname: (HighErrorRate|HighLatency)
  equal: ['service']

# If DatabaseDown is firing, suppress all DB-related alerts
- source_match:
    alertname: DatabaseDown
  target_match_re:
    alertname: Database.*
  equal: ['cluster']

Escalation Policies

routes:
- match:
    severity: critical
  receiver: slack
  continue: true
  routes:
  - match:
      severity: critical
    receiver: pagerduty
    group_wait: 15m  # Escalate to PagerDuty after 15 min

Runbook Requirements

Every alert must link to a runbook containing:

  1. What the alert means
  2. Impact on users
  3. Common causes
  4. Investigation steps
  5. Remediation steps
  6. Escalation contacts

Alert Fatigue Prevention

Best Practices:

  1. Alert on symptoms, not causes — "Users cannot login" not "CPU high"
  2. Actionable alerts only — every alert needs a runbook
  3. Reduce noise — use for: 5m to avoid flapping
  4. Group related alerts — do not page for every instance
  5. Test alert rules — validate with amtool alert query

Notification Channels

Channel | Use For | Priority
PagerDuty | Critical (on-call) | P1-P2
Slack | Warnings (team channel) | P3
Email | Low priority (daily digest) | P4

Incorrect — alert without for clause causes flapping:

- alert: HighErrorRate
  expr: http_errors_total / http_requests_total > 0.05  # Immediate
  labels:
    severity: critical  # Pages on-call for brief spikes

Correct — for clause prevents flapping:

- alert: HighErrorRate
  expr: http_errors_total / http_requests_total > 0.05
  for: 5m  # Must sustain 5min before firing
  labels:
    severity: critical

Design Grafana dashboards for actionable incident response and capacity planning — CRITICAL

Grafana Dashboards

The Four Golden Signals

Signal | Metric | Description
Latency | Response time | How long requests take
Traffic | Requests/sec | Volume of demand
Errors | Error rate | Failures per second
Saturation | Resource usage | How full the service is

Dashboard Layout (Top Row)

+--------------+--------------+--------------+--------------+
|  Latency     |  Traffic     |  Errors      |  Saturation  |
|  (p50/p95)   |  (req/s)     |  (5xx rate)  |  (CPU/mem)   |
+--------------+--------------+--------------+--------------+

Service Dashboard Structure

  1. Overview (single row) — Traffic, errors, latency, saturation
  2. Request breakdown — By endpoint, method, status code
  3. Dependencies — Database, Redis, external APIs
  4. Resources — CPU, memory, disk, network
  5. Business metrics — Registrations, purchases, LLM costs

RED Metrics for Dashboards

# Rate
rate(http_requests_total[5m])

# Errors
sum(rate(http_requests_total{status=~"5.."}[5m]))

# Duration
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

USE Metrics for Resources

  • Utilization — % of resource used
  • Saturation — Queue depth, wait time
  • Errors — Error count
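
A minimal sketch of USE-style resource metrics with prometheus_client; the metric names and the queue-depth example are illustrative assumptions:

from prometheus_client import Counter, Gauge

cpu_utilization = Gauge('worker_cpu_utilization_ratio', 'CPU utilization (0-1)')
queue_depth = Gauge('worker_queue_depth', 'Jobs waiting in the queue')  # saturation
resource_errors = Counter('worker_resource_errors_total', 'Resource-level errors', ['resource'])

# Usage
cpu_utilization.set(0.72)
queue_depth.set(14)
resource_errors.labels(resource='disk').inc()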

SLO/SLI Definitions

Service Level Indicators (SLIs)

# Availability SLI: % of successful requests
sum(rate(http_requests_total{status!~"5.."}[30d])) /
sum(rate(http_requests_total[30d]))

# Latency SLI: % of requests < 1s
sum(rate(http_request_duration_seconds_bucket{le="1"}[30d])) /
sum(rate(http_request_duration_seconds_count[30d]))

Service Level Objectives (SLOs)

SLO | Target | Error Budget
Availability | 99.9% | 43 min downtime/month
Latency | 99% < 1s | 1% of requests can be slow

Error Budget: If consumed, freeze feature work and focus on reliability. For a 99.9% availability target, the budget is 0.1% of a 30-day month (43,200 minutes), roughly 43 minutes.

Health Checks (Kubernetes)

Probe | Purpose | Endpoint
Liveness | Is app running? | /health
Readiness | Ready for traffic? | /ready
Startup | Finished starting? | /startup
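
A minimal sketch of the three endpoints using FastAPI (an assumed framework here); the database readiness check is a placeholder:

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/health")
async def health():
    return {"status": "ok"}  # liveness: process is running

@app.get("/ready")
async def ready(response: Response):
    db_ok = await check_database()  # placeholder dependency check
    if not db_ok:
        response.status_code = 503
    return {"ready": db_ok}

@app.get("/startup")
async def startup():
    return {"started": True}  # startup: initialization finished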

Dashboard Best Practices

  1. Use time ranges — Last 1h, 6h, 24h, 7d
  2. Percentiles over averages — p50, p95, p99
  3. Color code thresholds — green/yellow/red
  4. Include annotations — deployments, incidents
  5. Link to runbooks — from alert panels

Incorrect — using average latency hides tail latency:

avg(http_request_duration_seconds)  # Misleading for user experience

Correct — using percentiles shows tail latency:

histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))

Instrument Prometheus metrics as the foundation for alerting and dashboard observability — CRITICAL

Prometheus Metrics

RED Method (Rate, Errors, Duration)

Essential metrics for any service:

  • Rate — Requests per second
  • Errors — Failed requests per second
  • Duration — Request latency distribution

Metric Types

Counter — Monotonically increasing value

from prometheus_client import Counter

http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Usage
http_requests_total.labels(method='GET', endpoint='/api/users', status=200).inc()

Use cases: Request counts, error counts, bytes processed

Gauge — Value that can go up or down

from prometheus_client import Gauge

active_connections = Gauge(
    'active_connections',
    'Number of active database connections'
)

active_connections.set(25)
active_connections.inc()
active_connections.dec()

Use cases: Queue length, memory usage, active connections

Histogram — Distribution of values (with buckets)

from prometheus_client import Histogram

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
)

with request_duration.labels(method='GET', endpoint='/api/users').time():
    pass  # handle request

Use cases: Request latency, response size

Histogram vs Summary

  • Histogram: Calculate quantiles server-side (recommended)
  • Summary: Calculate quantiles client-side (higher CPU, cannot aggregate across instances)

Recommended bucket boundaries by workload:

# HTTP request latency
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]

# Database query latency
buckets: [0.001, 0.01, 0.05, 0.1, 0.5, 1]

# LLM request latency
buckets: [0.5, 1, 2, 5, 10, 20, 30]

Cardinality Management

# BAD: Unbounded cardinality
http_requests_total = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'user_id']  # user_id creates millions of time series!
)

# GOOD: Bounded cardinality
http_requests_total = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'status']  # ~10 x 100 x 10 = 10,000 series
)

Limits:

  • Good: < 10,000 unique time series per metric
  • Acceptable: 10,000-100,000
  • Bad: > 100,000 (Prometheus performance degrades)

Rule: Never use unbounded labels (user IDs, request IDs, timestamps)

Custom Business Metrics

# LLM token usage
llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total LLM tokens consumed',
    ['model', 'operation']
)

# Cache hit rate
cache_operations = Counter(
    'cache_operations_total',
    'Cache operations',
    ['operation', 'result']  # result='hit|miss'
)

# Cache hit rate PromQL:
# sum(rate(cache_operations_total{result="hit"}[5m])) /
# sum(rate(cache_operations_total[5m]))

PromQL Quick Reference

# Rate of requests
rate(http_requests_total[5m])

# Error rate percentage
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m])) * 100

# p95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# Total cost per day by model
sum(increase(llm_cost_dollars_total[1d])) by (model)

Incorrect — unbounded label cardinality:

http_requests = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'user_id']  # Millions of time series
)
http_requests.labels(method='GET', endpoint='/api', user_id='user_12345').inc()

Correct — bounded labels only:

http_requests = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'status']  # ~10K time series max
)
http_requests.labels(method='GET', endpoint='/api', status='200').inc()

Alert on silent failures using statistical baselines and proactive health monitoring — HIGH

Silent Failure Alerting

Set up alerting for failures that produce no errors but deliver wrong results.

Incorrect — alerting only on exceptions:

try:
    result = await agent.run()
except Exception as e:
    alert(e)  # Only catches crashes, not silent failures
# Agent returned gibberish — no exception raised, no alert sent

Correct — statistical baseline anomaly detection:

import numpy as np

class BaselineAnomalyDetector:
    def __init__(self, window_size=100, z_threshold=3.0):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.history = []

    def add_observation(self, value: float) -> dict:
        self.history.append(value)
        if len(self.history) > self.window_size:
            self.history = self.history[-self.window_size:]
        if len(self.history) < 10:
            return {"alert": False, "reason": "insufficient_data"}
        mean = np.mean(self.history[:-1])
        std = np.std(self.history[:-1])
        if std == 0:
            return {"alert": False}
        z_score = abs(value - mean) / std
        if z_score > self.z_threshold:
            return {"alert": True, "type": "statistical_anomaly",
                    "z_score": z_score, "value": value, "mean": mean}
        return {"alert": False, "z_score": z_score}

Silent failure type priorities:

Type | Detection Method | Priority
Tool Skipping | Expected vs actual tool calls | Critical
Infinite Loop | Iteration count + token spike | Critical
Gibberish Output | LLM-as-judge + heuristics | High
Quality Degradation | Score < baseline | Medium
Latency Spike | p99 > threshold | Medium

Key rules:

  • Alert on silent failures (service up, logic broken), not just errors
  • Use z-score > 3.0 (99.7% confidence) for anomaly detection
  • Maintain rolling baselines with 100-observation windows
  • Detection priority: tool skipping > loops > gibberish > anomalies
  • Need minimum 10 observations before baseline alerting is reliable
  • Combine statistical detection with proactive quality checks

Detect silent quality degradation in agent outputs that pass basic error checks — HIGH

Silent Quality Degradation Detection

Detect gibberish, repetitive, or low-quality LLM outputs that pass basic checks.

Incorrect — checking only for emptiness:

if len(response) > 0:  # Non-empty does not mean correct
    return response     # Gibberish passes this check

Correct — heuristic pre-filter + LLM-as-judge:

from langfuse import observe, get_client

@observe(name="quality_check")
async def detect_degraded_quality(response: str) -> dict:
    # Quick heuristics first (cheap, fast)
    if len(response) < 10:
        return {"alert": True, "type": "too_short"}

    # Repetition check: ratio of unique words to total words
    words = response.split()
    if len(words) > 0 and len(set(words)) / len(words) < 0.3:
        return {"alert": True, "type": "repetitive"}

    # LLM-as-judge for semantic quality (more expensive, run second)
    judge_prompt = f"""Rate this response quality (0-1):
    - 0: Gibberish, nonsensical, or completely wrong
    - 0.5: Partially correct but missing key information
    - 1: High quality, accurate, complete
    Response: {response[:1000]}
    Score (just the number):"""

    score = await llm.generate(judge_prompt)
    score_value = float(score.strip())
    get_client().score_current_trace(name="quality_check", value=score_value)

    if score_value < 0.5:
        return {"alert": True, "type": "low_quality", "score": score_value}
    return {"alert": False, "score": score_value}

Loop and token spike detection:

class LoopDetector:
    def __init__(self, max_iterations=10, token_spike_multiplier=3.0):
        self.max_iterations = max_iterations
        self.token_spike_multiplier = token_spike_multiplier
        self.iteration_count = 0
        self.total_tokens = 0
        self.baseline_tokens = 2000

    def check(self, tokens_used: int) -> dict:
        self.iteration_count += 1
        self.total_tokens += tokens_used
        if self.iteration_count > self.max_iterations:
            return {"alert": True, "type": "max_iterations"}
        expected = self.baseline_tokens * self.iteration_count
        if self.total_tokens > expected * self.token_spike_multiplier:
            return {"alert": True, "type": "token_spike",
                    "tokens": self.total_tokens, "expected": expected}
        return {"alert": False}

Key rules:

  • Layer detection: heuristics first (cheap), then LLM-as-judge (accurate)
  • Track unique-word ratio — below 0.3 indicates repetitive/stuck output
  • Monitor token consumption per iteration — 3x baseline indicates infinite loop
  • Log quality scores to Langfuse for trend analysis and drift detection
  • Not-empty and no-error are insufficient quality checks

Detect when agents silently skip expected tool calls and produce incorrect results — CRITICAL

Silent Tool Skipping Detection

Detect when LLM agents skip expected tool calls without raising errors.

Incorrect — assuming success if no error:

result = await agent.run()
# No error raised, but agent skipped the search tool entirely
# Result is fabricated from training data, not real data
return result  # Wrong answer delivered confidently

Correct — validate tool usage against expectations:

from langfuse import Langfuse

def check_tool_usage(trace_id: str, expected_tools: list[str]) -> dict:
    langfuse = Langfuse()
    trace = langfuse.fetch_trace(trace_id)

    actual_tools = [
        span.name for span in trace.observations
        if span.type == "tool"
    ]

    missing_tools = set(expected_tools) - set(actual_tools)

    if missing_tools:
        return {
            "alert": True,
            "type": "tool_skipping",
            "missing": list(missing_tools),
            "message": f"Agent skipped expected tools: {missing_tools}"
        }
    return {"alert": False}

# Usage
expected_tools = ["search", "calculate"]
tool_check = check_tool_usage(trace_id, expected_tools)
if tool_check["alert"]:
    alert(tool_check)
    fallback_to_manual_execution()

Key rules:

  • Never assume success just because no error was raised
  • Define expected tool lists per agent task and validate after execution
  • Tool skipping is often caused by middleware interference or prompt changes
  • Alert on tool skipping with Critical priority — it produces wrong results silently
  • Always have a fallback path when expected tools are not called

References (23)

Agent Observability

Trace multi-agent systems with Agent Graphs, new observation types, and rendered tool calls.

Agent Graphs (GA Nov 2025)

Agent Graphs provide visual execution flow for multi-agent systems. Langfuse automatically renders agent handoffs, tool calls, and decision points as an interactive graph.

Enabling Agent Graphs

from langfuse import observe, get_client

@observe(type="agent", name="supervisor")
async def supervisor_agent(query: str):
    """Supervisor agent that routes to specialists."""
    get_client().update_current_observation(
        metadata={"routing_strategy": "semantic"}
    )

    intent = await classify_intent(query)

    if intent == "code_review":
        return await code_review_agent(query)
    elif intent == "security_audit":
        return await security_audit_agent(query)
    else:
        return await general_agent(query)


@observe(type="agent", name="code_review")
async def code_review_agent(query: str):
    """Specialist agent for code review."""
    context = await retrieve_context(query)
    return await generate_review(context)


@observe(type="agent", name="security_audit")
async def security_audit_agent(query: str):
    """Specialist agent for security auditing."""
    vulnerabilities = await scan_code(query)
    return await generate_audit_report(vulnerabilities)

Result in Langfuse UI — Agent Graph View

supervisor (agent)
├── classify_intent (chain) → "code_review"
├── code_review (agent)
│   ├── retrieve_context (retriever) → 5 chunks
│   └── generate_review (generation) → $0.03
│       └── tool_call: analyze_diff → rendered inline
└── Total: 3.2s, $0.05

The Agent Graph View renders:

  • Agent nodes with execution order
  • Tool calls with inputs/outputs rendered inline
  • Decision edges showing routing logic
  • Timing breakdown per agent

New Observation Types

Langfuse v3 adds 7 observation types beyond generation and span:

Type | Use Case | Example
agent | Autonomous agent execution | Supervisor, specialist agents
tool | Tool/function call | API calls, database queries
chain | Sequential processing steps | Prompt chain, pipeline stage
retriever | Document/context retrieval | Vector search, RAG retrieval
evaluator | Quality assessment | G-Eval judge, human review
embedding | Embedding generation | Text → vector conversion
guardrail | Safety/validation check | PII filter, toxicity check

Using Observation Types

from langfuse import observe, get_client

@observe(type="retriever", name="vector_search")
async def retrieve_context(query: str):
    """Retrieve relevant context from vector DB."""
    results = await vector_db.search(query, top_k=5)
    get_client().update_current_observation(
        metadata={
            "top_k": 5,
            "chunks_returned": len(results),
            "avg_similarity": sum(r.score for r in results) / len(results),
        }
    )
    return results


@observe(type="tool", name="web_search")
async def search_web(query: str):
    """Execute web search tool."""
    results = await tavily.search(query)
    get_client().update_current_observation(
        input=query,
        output=results[:3],  # Top 3 results
        metadata={"source": "tavily", "result_count": len(results)},
    )
    return results


@observe(type="guardrail", name="pii_filter")
async def check_pii(text: str):
    """Check for PII before sending to LLM."""
    has_pii = detect_pii(text)
    get_client().update_current_observation(
        input=text[:200],
        output={"has_pii": has_pii, "action": "blocked" if has_pii else "passed"},
        metadata={"check_type": "pii"},
    )
    if has_pii:
        raise PiiDetectedError("PII detected in input")
    return text


@observe(type="embedding", name="embed_query")
async def embed_query(text: str):
    """Generate embedding for query."""
    embedding = await embeddings.embed(text)
    get_client().update_current_observation(
        input=text,
        model="text-embedding-3-large",
        usage={"input_tokens": len(text.split())},
        metadata={"dimensions": len(embedding)},
    )
    return embedding


@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
    """Evaluate response relevance with LLM judge."""
    score = await llm_judge.evaluate(
        criteria="relevance",
        query=query,
        response=response,
    )
    get_client().update_current_observation(
        input={"query": query, "response": response[:500]},
        output={"score": score, "criteria": "relevance"},
    )
    # Each evaluator run creates its own inspectable trace
    return score

Rendered Tool Calls

In v3, tool calls within generations are rendered inline in the trace view:

@observe(type="agent")
async def coding_agent(task: str):
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        tools=[
            {"name": "read_file", "description": "Read a file", "input_schema": {...}},
            {"name": "write_file", "description": "Write a file", "input_schema": {...}},
        ],
        messages=[{"role": "user", "content": task}],
    )

    # Tool calls automatically rendered in Langfuse trace:
    # generation → tool_use: read_file(path="src/main.py")
    #           → tool_result: "def main():..."
    #           → tool_use: write_file(path="src/main.py", content="...")
    #           → tool_result: "OK"
    return response

Trace Log View

The Trace Log View provides a chronological log of all events within a trace, useful for debugging agent loops:

[00:00.000] agent:supervisor START
[00:00.050] chain:classify_intent START
[00:00.320] chain:classify_intent END → "security_audit"
[00:00.321] agent:security_audit START
[00:00.400] retriever:vector_search START
[00:00.650] retriever:vector_search END → 5 chunks
[00:00.651] guardrail:pii_filter START
[00:00.680] guardrail:pii_filter END → passed
[00:00.681] generation:analyze START
[00:02.100] generation:analyze END → $0.04
[00:02.101] evaluator:relevance_judge START
[00:02.500] evaluator:relevance_judge END → 0.92
[00:02.501] agent:security_audit END
[00:02.502] agent:supervisor END → Total: 2.5s, $0.05

Framework Integration Examples

LangGraph

from langfuse import observe

@observe(type="agent", name="langgraph_supervisor")
async def run_langgraph_workflow(query: str):
    """LangGraph workflow with automatic Langfuse tracing."""
    from langgraph.graph import StateGraph

    graph = StateGraph(AgentState)
    graph.add_node("researcher", researcher_node)
    graph.add_node("writer", writer_node)

    # Each node automatically creates nested observations
    # when decorated with @observe
    app = graph.compile()
    return await app.ainvoke({"query": query})

CrewAI

from langfuse import observe

@observe(type="agent", name="crewai_crew")
async def run_crew(task: str):
    """CrewAI crew with Langfuse tracing."""
    from crewai import Crew, Agent, Task

    researcher = Agent(name="researcher", role="Research analyst")
    writer = Agent(name="writer", role="Technical writer")

    crew = Crew(
        agents=[researcher, writer],
        tasks=[Task(description=task, agent=researcher, expected_output="Research findings")],
    )
    # CrewAI has built-in Langfuse integration via callbacks
    return crew.kickoff()

OpenAI Agents SDK

from langfuse import observe

@observe(type="agent", name="openai_agent")
async def run_openai_agent(query: str):
    """OpenAI Agents SDK with Langfuse tracing."""
    from agents import Agent, Runner

    agent = Agent(
        name="analyst",
        instructions="You are a code analyst.",
        model="gpt-5.2",
    )
    # OpenAI Agents SDK supports Langfuse via OTEL exporter
    result = await Runner.run(agent, query)
    return result.final_output

Best Practices

  1. Use type="agent" for autonomous agents that make routing decisions
  2. Use type="tool" for function calls to external services
  3. Use type="retriever" for all RAG retrieval steps
  4. Use type="guardrail" for safety checks (PII, toxicity, etc.)
  5. Use type="evaluator" for quality judges — each creates an inspectable trace
  6. Add metadata with routing decisions, chunk counts, similarity scores
  7. Name observations descriptively — they appear in the Agent Graph


Alerting and Dashboards

Effective alerting strategies and dashboard design patterns.

Alert Severity Levels

| Level | Response Time | Examples |
|---|---|---|
| Critical (P1) | < 15 min | Service down, data loss |
| High (P2) | < 1 hour | Major feature broken |
| Medium (P3) | < 4 hours | Increased error rate |
| Low (P4) | Next day | Warnings |

Key Alerts

| Alert | Condition | Severity |
|---|---|---|
| ServiceDown | up == 0 for 1m | Critical |
| HighErrorRate | 5xx > 5% for 5m | Critical |
| HighLatency | p95 > 2s for 5m | High |
| LowCacheHitRate | < 70% for 10m | Medium |

Alert Grouping

Group related alerts:

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 30s        # Wait 30s to collect similar alerts
  group_interval: 5m     # Send grouped alerts every 5m
  repeat_interval: 4h    # Re-send alert after 4h if still firing

  routes:
  - match:
      severity: critical
    receiver: pagerduty
    continue: true        # Continue to other routes

  - match:
      severity: warning
    receiver: slack

Inhibition Rules

Suppress noisy alerts when root cause is known:

inhibit_rules:
# If ServiceDown is firing, suppress HighErrorRate and HighLatency
- source_match:
    alertname: ServiceDown
  target_match_re:
    alertname: (HighErrorRate|HighLatency)
  equal: ['service']

# If DatabaseDown is firing, suppress all DB-related alerts
- source_match:
    alertname: DatabaseDown
  target_match_re:
    alertname: Database.*
  equal: ['cluster']

Escalation Policies

# Escalation: Slack -> PagerDuty after 15 min
routes:
- match:
    severity: critical
  receiver: slack
  continue: true
  routes:
  - match:
      severity: critical
    receiver: pagerduty
    group_wait: 15m  # Escalate to PagerDuty after 15 min

Add runbook links to alert annotations:

groups:
- name: app-alerts
  rules:
  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{status=~"5.."}[5m])) /
      sum(rate(http_requests_total[5m])) > 0.05
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }}"
      runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

Runbook should include:

  1. What the alert means
  2. Impact on users
  3. Common causes
  4. Investigation steps
  5. Remediation steps
  6. Escalation contacts

Dashboard Design Principles

Golden Signals Dashboard (top row)

+--------------+--------------+--------------+--------------+
|  Latency     |  Traffic     |  Errors      |  Saturation  |
|  (p50/p95)   |  (req/s)     |  (5xx rate)  |  (CPU/mem)   |
+--------------+--------------+--------------+--------------+

Service Dashboard Structure

  1. Overview (single row) - Traffic, errors, latency, saturation
  2. Request breakdown - By endpoint, method, status code
  3. Dependencies - Database, Redis, external APIs
  4. Resources - CPU, memory, disk, network
  5. Business metrics - Registrations, purchases, etc.

RED Metrics for Dashboards

  • Rate: rate(http_requests_total[5m])
  • Errors: sum(rate(http_requests_total{status=~"5.."}[5m]))
  • Duration: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

USE Metrics for Resources

  • Utilization: % of resource used
  • Saturation: Queue depth, wait time
  • Errors: Error count

SLO/SLI Dashboards

Service Level Indicators (SLIs):

# Availability SLI: % of successful requests
sum(rate(http_requests_total{status!~"5.."}[30d])) /
sum(rate(http_requests_total[30d]))

# Latency SLI: % of requests < 1s
sum(rate(http_request_duration_seconds_bucket{le="1"}[30d])) /
sum(rate(http_request_duration_seconds_count[30d]))

Service Level Objectives (SLOs):

  • Availability: 99.9% (43 min downtime/month)
  • Latency: 99% of requests < 1s

Error Budget:

  • 99.9% SLO = 0.1% error budget
  • If the error budget is consumed, freeze feature work and focus on reliability (see the sketch below)
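
A minimal sketch of the error-budget arithmetic, assuming the SLO target and the observed error ratio for the period are already known (for example from the availability SLI query above); the helper name and inputs are illustrative:

# Hypothetical helper: error-budget status from an SLO target and an observed error ratio
def error_budget_status(slo_target: float, error_ratio: float, period_minutes: int = 30 * 24 * 60) -> dict:
    """Return allowed downtime, consumed budget, and remaining budget for the period."""
    budget_ratio = 1.0 - slo_target               # 99.9% SLO -> 0.001 error budget
    allowed_minutes = budget_ratio * period_minutes
    consumed = error_ratio / budget_ratio         # fraction of the budget already spent
    return {
        "allowed_downtime_min": round(allowed_minutes, 1),  # ~43.2 min for 99.9% over 30 days
        "budget_consumed_pct": round(consumed * 100, 1),
        "budget_remaining_pct": round(max(0.0, 1 - consumed) * 100, 1),
        "freeze_features": consumed >= 1.0,
    }

# Example: 99.9% SLO with a 0.08% observed error ratio -> 80% of the budget consumed
print(error_budget_status(slo_target=0.999, error_ratio=0.0008))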

Notification Channels

  • PagerDuty - critical (on-call)
  • Slack - warnings (team channel)
  • Email - low priority (daily digest)

See scripts/alerting-rules.yml for complete examples.

Alerting Strategies

Effective alerting to minimize false positives.

Alerting Rules (Prometheus)

groups:
  - name: api_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) /
          sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }}% over last 5 minutes"

      - alert: HighLatency
        expr: histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"

Alert Severity Levels

| Severity | Response Time | Example |
|---|---|---|
| Critical | Immediate (page) | Service down, data loss |
| High | 30 min | High error rate, disk full |
| Medium | 4 hours | Slow responses, high memory |
| Low | Next day | Deprecation warnings |

Best Practices

  1. Alert on symptoms, not causes - "Users can't login" not "CPU high"
  2. Actionable alerts only - every alert needs runbook
  3. Reduce noise - use for: 5m to avoid flapping
  4. Group related alerts - don't page for every instance
  5. Test alert rules - validate with promtool check rules / promtool test rules; inspect firing alerts with amtool alert query

Notification Channels

  • PagerDuty - critical (on-call)
  • Slack - warnings (team channel)
  • Email - low priority (daily digest)

See scripts/alerting-rules.yml for complete examples.

Annotation Queues

Human-review workflow in Langfuse: route traces to reviewers, collect scores, feed decisions into golden dataset curation.

What Are Annotation Queues?

Annotation queues let you route specific traces to human reviewers for scoring. Reviewers see the trace inputs, outputs, and existing automated scores, then add a human judgment score. The collected scores become ground truth for evaluator calibration and golden dataset inclusion decisions.

Typical uses:

  • Spot-checking high-cost or high-stakes LLM outputs
  • Reviewing traces flagged as low-quality by automated evaluators
  • Building ground-truth labels for fine-tuning or RAG evaluation

Creating Queues via Langfuse UI

  1. Navigate to Annotation Queues in the left sidebar
  2. Click Create Queue
  3. Configure:
    • Name — e.g., quality-review, safety-check, golden-dataset-candidates
    • Description — what reviewers should evaluate
    • Score configs — which scoring dimensions to collect (e.g., accuracy, relevance, safety)
  4. Share the queue URL with reviewers — no code access required

Score configs define what the reviewer sees and scores. Create them under Settings → Score Configs before creating the queue.

Adding Traces to a Queue Programmatically

Use get_client().create_annotation_queue_item() inside an @observe-decorated function to route a trace for human review:

from langfuse import observe, get_client

@observe(name="generate-response")
async def generate_and_flag(user_query: str, queue_id: str) -> str:
    """Generate a response and flag low-confidence outputs for human review."""
    response = await llm.generate(user_query)
    score = await auto_evaluate(response)

    # Flag for human review when automated confidence is low
    if score < 0.7:
        lf = get_client()
        trace_id = lf.get_current_trace_id()
        lf.create_annotation_queue_item(
            queue_id=queue_id,
            trace_id=trace_id,
        )

    return response

You can also add traces to a queue outside of an @observe context using a standalone client:

from langfuse import Langfuse

lf = Langfuse()

# Add a known trace ID to a review queue
lf.create_annotation_queue_item(
    queue_id="queue-abc123",
    trace_id="trace-xyz789",
)

Retrieve queue IDs programmatically via the Langfuse API or copy them from the UI URL.

Human-Review Workflow

Trace in Langfuse
      |
      v
[Automated score < threshold]
      |
      v
create_annotation_queue_item()  →  Queue
                                      |
                                      v
                              Reviewer opens queue URL
                                      |
                              Views: input / output / existing scores
                                      |
                              Adds human scores (accuracy, safety, etc.)
                                      |
                                      v
                          Scores stored on trace in Langfuse
                                      |
                                      v
                    [Optional] Trigger golden dataset inclusion

Reviewers access queues via the Langfuse UI — no SDK or code access required. The reviewer sees:

  • The trace input and output
  • Any automated scores already applied
  • The score dimensions configured for the queue

After scoring, human scores appear on the trace alongside automated scores and are queryable via the Langfuse API.

Fetching Completed Annotations

Query finished annotation items to drive downstream automation (e.g., auto-include high-scored traces into the golden dataset):

import httpx
import base64
import os

LANGFUSE_HOST = os.environ["LANGFUSE_HOST"]
PUBLIC_KEY = os.environ["LANGFUSE_PUBLIC_KEY"]
SECRET_KEY = os.environ["LANGFUSE_SECRET_KEY"]

auth = base64.b64encode(f"{PUBLIC_KEY}:{SECRET_KEY}".encode()).decode()
headers = {"Authorization": f"Basic {auth}"}

async def fetch_completed_annotations(queue_id: str) -> list[dict]:
    """Fetch completed annotation queue items."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{LANGFUSE_HOST}/api/public/annotation-queues/{queue_id}/items",
            headers=headers,
            params={"status": "DONE"},
        )
        resp.raise_for_status()
        return resp.json()["data"]

async def promote_high_quality_to_dataset(queue_id: str, dataset_name: str):
    """Add human-approved traces to golden dataset."""
    from langfuse import Langfuse
    lf = Langfuse()

    items = await fetch_completed_annotations(queue_id)
    for item in items:
        # Check human score threshold
        scores = item.get("scores", [])
        quality_scores = [s["value"] for s in scores if s["name"] == "accuracy"]
        if quality_scores and quality_scores[0] >= 0.8:
            lf.create_dataset_item(
                dataset_name=dataset_name,
                trace_id=item["traceId"],
            )

Annotation queues feed directly into golden dataset curation:

  1. Automated multi-agent pipeline scores content (accuracy, coherence, depth, relevance)
  2. Items with quality_total >= 0.75 but low confidence go to the golden-dataset-candidates queue (see the sketch below)
  3. Human reviewer confirms or overrides the automated decision
  4. Approved traces are added to the evaluation dataset via create_dataset_item()

See ../../golden-dataset/rules/curation-annotation.md for the parallel multi-agent scoring pipeline that feeds this queue.
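
A minimal sketch of the routing rule in step 2, reusing the create_annotation_queue_item pattern shown above; the score names, thresholds, and queue handling are assumptions:

from langfuse import observe, get_client

@observe(name="route_to_review")
async def route_candidate(scores: dict, queue_id: str) -> bool:
    """Queue a trace for human review when quality is high but automated confidence is low."""
    if scores.get("quality_total", 0.0) >= 0.75 and scores.get("confidence", 1.0) < 0.6:
        lf = get_client()
        lf.create_annotation_queue_item(
            queue_id=queue_id,
            trace_id=lf.get_current_trace_id(),
        )
        return True
    return False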

References

  • Langfuse Annotation Queues docs
  • ../references/evaluation-scores.md — automated scoring patterns
  • ../../golden-dataset/rules/curation-annotation.md — multi-agent curation pipeline

Token & Cost Tracking

Automatic cost calculation based on model pricing, with spend alerts and Metrics API.

Basic Cost Tracking (v3)

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()

@observe(name="security_audit")
async def run_audit(content: str):
    """Track costs automatically via @observe."""
    response = await llm.generate(
        model="claude-sonnet-4-6",
        messages=[{"role": "user", "content": f"Analyze for XSS: {content}"}],
    )

    get_client().update_current_observation(
        model="claude-sonnet-4-6",
        usage={
            "input": 1500,
            "output": 1000,
            "unit": "TOKENS",
        },
    )
    # Langfuse automatically calculates: $0.0045 + $0.015 = $0.0195
    return response

Pricing Database (Auto-Updated)

Langfuse maintains a pricing database for all major models. You can also define custom pricing:

langfuse = Langfuse()

# Custom model pricing
langfuse.create_model(
    model_name="claude-sonnet-4-6",
    match_pattern="claude-sonnet-4.*",
    unit="TOKENS",
    input_price=0.000003,  # $3/MTok
    output_price=0.000015,  # $15/MTok
    total_price=None,  # Calculated from input+output
)

Cost Tracking Per Analysis

from langfuse import Langfuse

langfuse = Langfuse()

# After analysis completes
trace = langfuse.get_trace(trace_id)
total_cost = sum(
    gen.calculated_total_cost or 0
    for gen in trace.observations
    if gen.type == "GENERATION"
)

# Store in database
await analysis_repo.update(
    analysis_id,
    langfuse_trace_id=trace.id,
    total_cost_usd=total_cost,
)

Spend Alerts

Configure alerts to get notified when costs exceed thresholds:

In Langfuse UI

  1. Navigate to Settings → Alerts
  2. Create alert rule:
    • Metric: Daily cost
    • Threshold: $50/day
    • Channel: Slack / Email / Webhook

Via API

langfuse = Langfuse()

# Programmatic spend check
from datetime import datetime, timedelta

# Get daily cost via v2 Metrics API
metrics = langfuse.get_metrics(
    metric_name="total_cost",
    from_timestamp=datetime.now() - timedelta(days=1),
    to_timestamp=datetime.now(),
)

daily_cost = metrics.values[0].value if metrics.values else 0

if daily_cost > 50.0:
    await send_alert(
        channel="slack",
        message=f"Daily LLM cost alert: ${daily_cost:.2f} exceeds $50 threshold",
    )

v2 Metrics API (Beta)

Query cost and usage metrics programmatically instead of SQL:

from langfuse import Langfuse
from datetime import datetime, timedelta

langfuse = Langfuse()

# Total cost over last 7 days
metrics = langfuse.get_metrics(
    metric_name="total_cost",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now(),
    granularity="day",
)

for point in metrics.values:
    print(f"{point.timestamp.date()}: ${point.value:.2f}")

# Token usage by model
token_metrics = langfuse.get_metrics(
    metric_name="total_tokens",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now(),
    group_by="model",
)

for group in token_metrics.groups:
    print(f"{group.key}: {group.values[0].value:,} tokens")

Monitoring Dashboard Queries

Top 10 Most Expensive Traces (Last 7 Days)

SELECT
    name,
    user_id,
    calculated_total_cost,
    input_tokens,
    output_tokens
FROM traces
WHERE timestamp > NOW() - INTERVAL '7 days'
ORDER BY calculated_total_cost DESC
LIMIT 10;

Average Cost by Agent Type

SELECT
    metadata->>'agent_type' as agent,
    COUNT(*) as traces,
    AVG(calculated_total_cost) as avg_cost,
    SUM(calculated_total_cost) as total_cost
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent
ORDER BY total_cost DESC;

Daily Cost Trend

SELECT
    DATE(timestamp) as date,
    SUM(calculated_total_cost) as daily_cost,
    COUNT(*) as trace_count
FROM traces
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;

Best Practices

  1. Always pass usage data with input/output token counts
  2. Monitor costs daily with spend alerts to catch spikes early
  3. Set up threshold alerts for abnormal cost increases (> 2x daily average; see the sketch below)
  4. Track costs by user_id to identify expensive users
  5. Group by metadata (content_type, agent_type) for cost attribution
  6. Use custom pricing for self-hosted models
  7. Use Metrics API for programmatic cost queries instead of raw SQL
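
A minimal sketch of the spike check from item 3, assuming daily cost totals have already been collected (for example via the Metrics API queries above); the function name and thresholds are illustrative:

# Hypothetical spike check: compare today's spend against the trailing daily average
def detect_cost_spike(daily_costs: list[float], today_cost: float, factor: float = 2.0) -> dict:
    """Flag a spike when today's cost exceeds factor x the trailing average."""
    if not daily_costs:
        return {"spike": False, "reason": "no history"}
    trailing_avg = sum(daily_costs) / len(daily_costs)
    return {
        "spike": today_cost > factor * trailing_avg,
        "today": round(today_cost, 2),
        "trailing_avg": round(trailing_avg, 2),
        "threshold": round(factor * trailing_avg, 2),
    }

# Example: the last 7 days averaged ~$20/day, today is $55 -> spike
print(detect_cost_spike([18.2, 22.5, 19.8, 21.0, 20.4, 19.1, 20.6], today_cost=55.0))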


Monitoring Dashboards

Grafana dashboard patterns and SLO/SLI definitions.

The Four Golden Signals

| Signal | Metric | Description |
|---|---|---|
| Latency | Response time | How long requests take |
| Traffic | Requests/sec | Volume of demand |
| Errors | Error rate | Failures per second |
| Saturation | Resource usage | How full the service is |

SLO/SLI Examples

# SLO: 99.9% availability
SLI: availability = successful_requests / total_requests
Target: > 0.999

# SLO: 95% of requests < 500ms
SLI: latency_p95 = histogram_quantile(0.95, request_duration_seconds)
Target: < 0.5

# SLO: < 0.1% error rate
SLI: error_rate = failed_requests / total_requests
Target: < 0.001

Grafana Dashboard Structure

  1. Overview row - traffic, errors, latency
  2. Saturation row - CPU, memory, disk
  3. Details row - per-endpoint breakdown
  4. Database row - query performance, connections

Best Practices

  1. Use time ranges - Last 1h, 6h, 24h, 7d
  2. Percentiles over averages - p50, p95, p99
  3. Color code thresholds - green/yellow/red
  4. Include annotations - deployments, incidents

See Grafana dashboards in backend/grafana/dashboards/.

Distributed Tracing

Track requests across microservices with OpenTelemetry.

Basic Setup (Node.js)

import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

const sdk = new NodeSDK({
  traceExporter: new JaegerExporter(),
  instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start();

Span Relationships

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

# Parent span
with tracer.start_as_current_span("analyze_content") as parent_span:
    parent_span.set_attribute("content.url", url)
    parent_span.set_attribute("content.type", "article")

    # Child span (sequential)
    with tracer.start_as_current_span("fetch_content") as fetch_span:
        content = await fetch_url(url)
        fetch_span.set_attribute("content.size_bytes", len(content))

    # Another child span (sequential)
    with tracer.start_as_current_span("generate_embedding") as embed_span:
        embedding = await embed_text(content)
        embed_span.set_attribute("embedding.dimensions", len(embedding))

    # Parallel child spans (using asyncio.gather)
    async def analyze_with_span(agent_name: str, content: str):
        with tracer.start_as_current_span(f"agent_{agent_name}"):
            return await agent.analyze(content)

    results = await asyncio.gather(
        analyze_with_span("tech_comparator", content),
        analyze_with_span("security_auditor", content),
        analyze_with_span("implementation_planner", content)
    )

Trace Sampling Strategies

Head-based sampling (decide at trace start):

from opentelemetry.sdk.trace.sampling import (
    TraceIdRatioBased,  # Sample X% of traces
    ParentBased,        # Follow parent's sampling decision
    ALWAYS_ON,          # Always sample
    ALWAYS_OFF          # Never sample
)

# Sample 10% of traces
sampler = TraceIdRatioBased(0.1)

Tail-based sampling (decide after trace completes):

  • Keep all traces with errors
  • Keep slow traces (p95+ latency)
  • Sample 1% of successful fast traces

Recommended sampling:

  • Development: 100% sampling
  • Production: 10% sampling, 100% for errors (see the sketch below)
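
A minimal head-based configuration for the production recommendation above, using the samplers shown earlier; note that keeping 100% of error traces is a tail-based decision that belongs in the collector, since a head-based sampler cannot know at span start whether a trace will fail:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Production: sample 10% of new traces, and always follow the parent's decision
# so a sampled request stays sampled across downstream services
sampler = ParentBased(root=TraceIdRatioBased(0.1))

provider = TracerProvider(sampler=sampler)
trace.set_tracer_provider(provider)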

Context Propagation

import { context, propagation, trace } from '@opentelemetry/api';

const tracer = trace.getTracer('service-b');

// Service A: Create trace context
const ctx = context.active();

// Service B: Extract trace context from headers
const propagatedCtx = propagation.extract(ctx, request.headers);
context.with(propagatedCtx, () => {
  // This span will be child of Service A's span
  const span = tracer.startSpan('service_b_operation');
  // ...
  span.end();
});

Trace Analysis Queries

Find slow traces:

duration > 2s

Find traces with errors:

status = error

Find traces for specific user:

user.id = "abc-123"

Find traces hitting specific service:

service.name = "analysis-worker"

Best Practices

  1. Sample smartly - 10% for high traffic, 100% for errors
  2. Add attributes - user_id, order_id, error_type
  3. Propagate context - across HTTP, gRPC, message queues
  4. Tag errors - error=true for filtering

See scripts/opentelemetry-tracing.ts for complete setup.

Embedding Drift Detection

Monitor semantic drift in LLM applications using embedding-based methods.

Overview

Traditional statistical methods (PSI, KS) don't work well for unstructured text data. Embedding drift detection uses vector representations to detect semantic changes.

Arize Phoenix Integration

import phoenix as px
from phoenix.trace import TraceDataset
import numpy as np

# Launch Phoenix for local observability
session = px.launch_app()

# Analyze embedding drift
def analyze_embedding_drift(
    baseline_embeddings: np.ndarray,
    current_embeddings: np.ndarray
) -> dict:
    """
    Analyze drift in embedding space using Phoenix.

    Args:
        baseline_embeddings: Reference embeddings (N x D)
        current_embeddings: Current embeddings (M x D)

    Returns:
        Drift analysis results
    """
    # Phoenix provides built-in drift analysis
    drift_analysis = px.Client().compute_drift(
        primary_embeddings=current_embeddings,
        reference_embeddings=baseline_embeddings
    )

    return drift_analysis

Cluster-Based Drift Detection

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import numpy as np

class ClusterDriftDetector:
    """Detect drift by monitoring cluster distributions."""

    def __init__(self, n_clusters: int = 10, psi_threshold: float = 0.25):
        self.n_clusters = n_clusters
        self.psi_threshold = psi_threshold
        self.kmeans = None
        self.baseline_distribution = None

    def fit_baseline(self, embeddings: np.ndarray):
        """Fit clusters on baseline embeddings."""
        self.kmeans = KMeans(
            n_clusters=self.n_clusters,
            random_state=42,
            n_init=10
        )
        labels = self.kmeans.fit_predict(embeddings)

        # Store baseline cluster distribution
        self.baseline_distribution = np.bincount(
            labels,
            minlength=self.n_clusters
        ) / len(labels)

        return self

    def detect_drift(self, embeddings: np.ndarray) -> dict:
        """Detect drift in new embeddings."""
        if self.kmeans is None:
            raise ValueError("Must call fit_baseline first")

        # Assign new embeddings to clusters
        labels = self.kmeans.predict(embeddings)

        # Current cluster distribution
        current_distribution = np.bincount(
            labels,
            minlength=self.n_clusters
        ) / len(labels)

        # Calculate PSI between distributions
        psi = self._calculate_psi(
            self.baseline_distribution,
            current_distribution
        )

        # Calculate centroid distances
        centroid_shift = self._calculate_centroid_shift(embeddings, labels)

        return {
            "psi": psi,
            "drift_detected": psi > self.psi_threshold,
            "baseline_distribution": self.baseline_distribution.tolist(),
            "current_distribution": current_distribution.tolist(),
            "centroid_shift": centroid_shift,
            "interpretation": self._interpret(psi, centroid_shift)
        }

    def _calculate_psi(self, expected: np.ndarray, actual: np.ndarray) -> float:
        """Calculate PSI between cluster distributions."""
        eps = 0.0001
        expected = expected + eps
        actual = actual + eps
        return np.sum((actual - expected) * np.log(actual / expected))

    def _calculate_centroid_shift(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray
    ) -> dict:
        """Calculate how much cluster centroids have shifted."""
        shifts = {}
        for i in range(self.n_clusters):
            cluster_embeddings = embeddings[labels == i]
            if len(cluster_embeddings) > 0:
                current_centroid = cluster_embeddings.mean(axis=0)
                baseline_centroid = self.kmeans.cluster_centers_[i]
                shift = np.linalg.norm(current_centroid - baseline_centroid)
                shifts[f"cluster_{i}"] = float(shift)
        return shifts

    def _interpret(self, psi: float, centroid_shift: dict) -> str:
        avg_shift = np.mean(list(centroid_shift.values()))
        if psi < 0.1 and avg_shift < 0.1:
            return "No significant drift"
        elif psi < 0.25:
            return "Minor drift detected, monitor closely"
        else:
            return "Significant drift, investigate and consider retraining"

Centroid Distance Monitoring

import numpy as np
from typing import Optional

class CentroidMonitor:
    """Monitor drift via embedding centroid movement."""

    def __init__(self, distance_threshold: float = 0.2):
        self.distance_threshold = distance_threshold
        self.baseline_centroid: Optional[np.ndarray] = None
        self.baseline_std: Optional[float] = None

    def set_baseline(self, embeddings: np.ndarray):
        """Set baseline centroid from reference embeddings."""
        self.baseline_centroid = embeddings.mean(axis=0)

        # Calculate average distance from centroid
        distances = np.linalg.norm(
            embeddings - self.baseline_centroid,
            axis=1
        )
        self.baseline_std = distances.std()

        return self

    def check_drift(self, embeddings: np.ndarray) -> dict:
        """Check if current embeddings have drifted from baseline."""
        if self.baseline_centroid is None:
            raise ValueError("Must call set_baseline first")

        # Current centroid
        current_centroid = embeddings.mean(axis=0)

        # Distance between centroids
        centroid_distance = np.linalg.norm(
            current_centroid - self.baseline_centroid
        )

        # Normalized by baseline spread
        normalized_distance = centroid_distance / (self.baseline_std + 1e-10)

        # Check individual embedding distances
        distances = np.linalg.norm(
            embeddings - self.baseline_centroid,
            axis=1
        )
        outlier_ratio = (distances > 3 * self.baseline_std).mean()

        return {
            "centroid_distance": float(centroid_distance),
            "normalized_distance": float(normalized_distance),
            "outlier_ratio": float(outlier_ratio),
            "drift_detected": normalized_distance > self.distance_threshold,
            "severity": self._severity(normalized_distance, outlier_ratio)
        }

    def _severity(self, distance: float, outlier_ratio: float) -> str:
        if distance < 0.1 and outlier_ratio < 0.05:
            return "none"
        elif distance < 0.2 and outlier_ratio < 0.1:
            return "low"
        elif distance < 0.3 and outlier_ratio < 0.2:
            return "medium"
        else:
            return "high"

Cosine Similarity Drift

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def cosine_drift_score(
    baseline_embeddings: np.ndarray,
    current_embeddings: np.ndarray,
    sample_size: int = 1000
) -> dict:
    """
    Measure drift using cosine similarity distributions.

    Args:
        baseline_embeddings: Reference embeddings
        current_embeddings: Current embeddings
        sample_size: Number of pairs to sample

    Returns:
        Drift analysis based on cosine similarities
    """
    # Sample pairs for efficiency
    n_baseline = min(len(baseline_embeddings), sample_size)
    n_current = min(len(current_embeddings), sample_size)

    baseline_sample = baseline_embeddings[
        np.random.choice(len(baseline_embeddings), n_baseline, replace=False)
    ]
    current_sample = current_embeddings[
        np.random.choice(len(current_embeddings), n_current, replace=False)
    ]

    # Baseline self-similarity
    baseline_centroid = baseline_sample.mean(axis=0)
    baseline_similarities = cosine_similarity(
        baseline_sample,
        baseline_centroid.reshape(1, -1)
    ).flatten()

    # Current similarity to baseline centroid
    current_similarities = cosine_similarity(
        current_sample,
        baseline_centroid.reshape(1, -1)
    ).flatten()

    # Compare distributions
    baseline_mean = baseline_similarities.mean()
    current_mean = current_similarities.mean()

    similarity_drop = baseline_mean - current_mean

    return {
        "baseline_mean_similarity": float(baseline_mean),
        "current_mean_similarity": float(current_mean),
        "similarity_drop": float(similarity_drop),
        "drift_detected": similarity_drop > 0.1,
        "interpretation": (
            "Significant semantic drift"
            if similarity_drop > 0.1
            else "No significant drift"
        )
    }

RAG Retrieval Drift

from typing import List
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class RAGDriftMonitor:
    """Monitor drift in RAG retrieval quality."""

    def __init__(
        self,
        similarity_threshold: float = 0.7,
        coverage_threshold: float = 0.8
    ):
        self.similarity_threshold = similarity_threshold
        self.coverage_threshold = coverage_threshold
        self.baseline_queries: List[np.ndarray] = []
        self.baseline_retrievals: List[List[np.ndarray]] = []

    def add_baseline(
        self,
        query_embedding: np.ndarray,
        retrieved_embeddings: List[np.ndarray]
    ):
        """Add a query-retrieval pair to baseline."""
        self.baseline_queries.append(query_embedding)
        self.baseline_retrievals.append(retrieved_embeddings)

    def check_retrieval_drift(
        self,
        query_embedding: np.ndarray,
        retrieved_embeddings: List[np.ndarray]
    ) -> dict:
        """
        Check if retrieval for a query has drifted.

        Useful for detecting:
        - Index staleness
        - Embedding model changes
        - Document corpus drift
        """
        # Find most similar baseline query
        similarities = [
            cosine_similarity(
                query_embedding.reshape(1, -1),
                bq.reshape(1, -1)
            )[0, 0]
            for bq in self.baseline_queries
        ]

        best_match_idx = np.argmax(similarities)
        query_similarity = similarities[best_match_idx]

        if query_similarity < self.similarity_threshold:
            return {
                "drift_detected": False,
                "reason": "Query too different from baseline"
            }

        # Compare retrieved documents
        baseline_retrieved = self.baseline_retrievals[best_match_idx]

        # Calculate coverage: how many baseline docs are still retrieved
        coverage = self._calculate_coverage(
            baseline_retrieved,
            retrieved_embeddings
        )

        return {
            "query_similarity": float(query_similarity),
            "coverage": float(coverage),
            "drift_detected": coverage < self.coverage_threshold,
            "interpretation": (
                f"Retrieval coverage dropped to {coverage:.2%}"
                if coverage < self.coverage_threshold
                else "Retrieval stable"
            )
        }

    def _calculate_coverage(
        self,
        baseline: List[np.ndarray],
        current: List[np.ndarray]
    ) -> float:
        """Calculate what fraction of baseline docs are still retrieved."""
        if not baseline or not current:
            return 0.0

        baseline_stack = np.stack(baseline)
        current_stack = np.stack(current)

        # For each baseline doc, check if similar doc is in current
        similarities = cosine_similarity(baseline_stack, current_stack)
        max_similarities = similarities.max(axis=1)

        # Count docs with similarity > threshold
        covered = (max_similarities > self.similarity_threshold).sum()

        return covered / len(baseline)

Evidently AI Integration

from evidently import Report
from evidently.metrics import EmbeddingsDriftMetric
import pandas as pd
import numpy as np

def evidently_embedding_drift(
    baseline_embeddings: np.ndarray,
    current_embeddings: np.ndarray,
    embedding_column: str = "embedding"
) -> dict:
    """
    Use Evidently AI for embedding drift detection.

    Evidently uses model-based drift detection by default:
    it trains a classifier to distinguish baseline from current data.
    """
    # Create DataFrames
    baseline_df = pd.DataFrame({
        embedding_column: list(baseline_embeddings)
    })
    current_df = pd.DataFrame({
        embedding_column: list(current_embeddings)
    })

    # Run Evidently report
    report = Report(metrics=[
        EmbeddingsDriftMetric(column_name=embedding_column)
    ])

    report.run(
        reference_data=baseline_df,
        current_data=current_df
    )

    # Extract results
    result = report.as_dict()["metrics"][0]["result"]

    return {
        "drift_score": result.get("drift_score"),
        "drift_detected": result.get("drift_detected"),
        "method": "model_based",
        "details": result
    }


LLM Evaluation & Scoring

Track quality metrics with custom scores, automated evaluation, and evaluator execution tracing.

Basic Scoring (v3)

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()

@observe()
async def analyze_and_score(query: str):
    """Run analysis and score the result."""
    response = await llm.generate(query)

    # Score via get_client() within @observe context
    get_client().update_current_observation(
        output=response[:500],
    )

    # Score the trace
    get_client().score_current_trace(
        name="relevance",
        value=0.85,
        comment="Response addresses query but lacks depth",
    )
    return response


# Or score by trace_id directly
langfuse.score(
    trace_id="trace_123",
    name="factuality",
    value=0.92,
    data_type="NUMERIC",
)

Evaluator Execution Tracing

In v3, each evaluator run creates its own inspectable trace:

from langfuse import observe, get_client

@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
    """Each evaluator call creates an inspectable trace in Langfuse."""
    score = await llm_judge.evaluate(
        criteria="relevance",
        query=query,
        response=response,
    )

    get_client().update_current_observation(
        input={"query": query[:500], "response": response[:500]},
        output={"score": score, "criteria": "relevance"},
        model="claude-sonnet-4-6",
    )

    # The evaluator's own LLM calls are visible in its trace
    return score

Result in Langfuse UI:

evaluator:relevance_judge (0.8s, $0.01)
├── generation: judge_prompt → score: 0.85
└── metadata: {criteria: "relevance", model: "claude-sonnet-4-6"}

Score Analytics

View multi-score comparisons in the Langfuse dashboard:

  • Score distributions: Histogram of scores by criterion
  • Multi-score comparison: Side-by-side comparison of relevance, depth, accuracy
  • Quality trends: Track scores over time
  • Filter by threshold: Show only low-scoring traces
  • Compare prompts: Which prompt version scores higher?

Mutable Score Configs

Configure score types and ranges in Langfuse settings:

# Score configs can be updated without code changes
# In Langfuse UI: Settings → Score Configs

# Numeric scores
langfuse.score(trace_id="...", name="relevance", value=0.85, data_type="NUMERIC")

# Categorical scores
langfuse.score(trace_id="...", name="sentiment", value="positive", data_type="CATEGORICAL")

# Boolean scores
langfuse.score(trace_id="...", name="contains_pii", value=0, data_type="BOOLEAN")

Automated Scoring with G-Eval

from langfuse import observe, get_client
from app.shared.services.g_eval import GEvalScorer

scorer = GEvalScorer()

@observe()
async def analyze_with_scoring(query: str):
    response = await llm.generate(query)

    # Run G-Eval scoring
    scores = await scorer.score(
        query=query,
        response=response,
        criteria=["relevance", "coherence", "depth"],
    )

    # Record all scores
    for criterion, score in scores.items():
        get_client().score_current_trace(name=criterion, value=score)

    return response

Quality Scores Trend Query

SELECT
    DATE(timestamp) as date,
    AVG(value) FILTER (WHERE name = 'relevance') as avg_relevance,
    AVG(value) FILTER (WHERE name = 'depth') as avg_depth,
    AVG(value) FILTER (WHERE name = 'factuality') as avg_factuality
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;

Datasets for Evaluation

Create test datasets and run automated evaluations:

from langfuse import Langfuse, observe, get_client

langfuse = Langfuse()

# Fetch dataset
dataset = langfuse.get_dataset("security_audit_test_set")

@observe()
async def evaluate_item(item):
    """Evaluate a single dataset item with tracing."""
    response = await llm.generate(item.input)

    get_client().update_current_observation(
        input=item.input,
        output=response,
    )

    # Score
    score = await evaluate_response(item.expected_output, response)
    get_client().score_current_trace(name="accuracy", value=score)

    return score

# Run evaluation
for item in dataset.items:
    await evaluate_item(item)

Dataset Structure in UI

security_audit_test_set
├── item_1: XSS vulnerability test
│   ├── input: "Check this HTML for XSS..."
│   └── expected_output: "Found XSS in innerHTML..."
├── item_2: SQL injection test
│   ├── input: "Review this SQL query..."
│   └── expected_output: "SQL injection vulnerability in WHERE clause..."
└── item_3: CSRF protection test
    ├── input: "Analyze this form..."
    └── expected_output: "Missing CSRF token..."

Evaluation Metrics

Common score types:

| Metric | Range | Description |
|---|---|---|
| Relevance | 0-1 | Does response address the query? |
| Coherence | 0-1 | Is response logically structured? |
| Depth | 0-1 | Level of detail and analysis |
| Factuality | 0-1 | Accuracy of claims |
| Completeness | 0-1 | All aspects of query covered? |
| Toxicity | 0-1 | Harmful or inappropriate content |

Best Practices

  1. Score all production traces for quality monitoring
  2. Use evaluator type (@observe(type="evaluator")) for inspectable judge traces
  3. Use consistent criteria across all evaluations
  4. Automate scoring with G-Eval or similar
  5. Set quality thresholds (e.g., avg_relevance > 0.7)
  6. Create test datasets for regression testing
  7. Track scores by prompt version to measure improvements
  8. Alert on quality drops (e.g., avg_score < 0.6 for 3 days; see the sketch below)
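
A minimal sketch of the alert in item 8, assuming daily average scores have already been computed (for example from the trend query above); the helper name is illustrative:

# Hypothetical quality-drop check over daily average scores (most recent last)
def quality_drop_alert(daily_avg_scores: list[float], threshold: float = 0.6, days: int = 3) -> bool:
    """Return True when the average score has stayed below the threshold for `days` consecutive days."""
    recent = daily_avg_scores[-days:]
    return len(recent) == days and all(score < threshold for score in recent)

daily_avgs = [0.78, 0.74, 0.59, 0.57, 0.55]
if quality_drop_alert(daily_avgs):
    print("ALERT: average quality below 0.6 for 3 consecutive days")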

Integration with OrchestKit Quality Gate

from langfuse import observe, get_client

@observe(name="quality_gate")
async def quality_gate_node(state: WorkflowState):
    """Quality gate with Langfuse scoring."""

    # Get scores from evaluators
    scores = await run_quality_evaluators(state)

    # Log scores to trace
    for criterion, score in scores.items():
        get_client().score_current_trace(name=criterion, value=score)

    # Check threshold
    avg_score = sum(scores.values()) / len(scores)
    passed = avg_score >= 0.7

    return {"quality_gate_passed": passed, "quality_scores": scores}


EWMA Dynamic Baselines

Exponentially Weighted Moving Average for adaptive drift detection baselines.

Basic EWMA

import numpy as np
from dataclasses import dataclass

@dataclass
class EWMAState:
    mean: float = 0.0
    variance: float = 0.0
    count: int = 0

class EWMABaseline:
    """EWMA-based dynamic baseline. Formula: EWMA_t = α × X_t + (1-α) × EWMA_{t-1}"""

    def __init__(self, alpha: float = 0.2, sigma_threshold: float = 3.0, min_samples: int = 10):
        self.alpha = alpha
        self.sigma_threshold = sigma_threshold
        self.min_samples = min_samples
        self.state = EWMAState()

    def update(self, value: float) -> dict:
        """Update baseline and check for anomaly."""
        self.state.count += 1

        if self.state.count == 1:
            self.state.mean = value
            self.state.variance = 0.0
        else:
            delta = value - self.state.mean
            self.state.mean = self.alpha * value + (1 - self.alpha) * self.state.mean
            self.state.variance = (1 - self.alpha) * (self.state.variance + self.alpha * delta ** 2)

        std = np.sqrt(self.state.variance) if self.state.variance > 0 else 0.001
        z_score = abs(value - self.state.mean) / std
        is_anomaly = self.state.count >= self.min_samples and z_score > self.sigma_threshold

        return {
            "value": value,
            "ewma_mean": self.state.mean,
            "ewma_std": std,
            "z_score": z_score,
            "is_anomaly": is_anomaly
        }

Multi-Metric Tracker

class MultiMetricEWMA:
    """Track multiple metrics with independent baselines."""

    def __init__(self, metrics: list[str], alpha: float = 0.2):
        self.baselines = {m: EWMABaseline(alpha=alpha) for m in metrics}

    def update(self, metrics: dict) -> dict:
        results = {}
        anomalies = []
        for name, value in metrics.items():
            if name in self.baselines:
                result = self.baselines[name].update(value)
                results[name] = result
                if result["is_anomaly"]:
                    anomalies.append({"metric": name, "z_score": result["z_score"]})
        return {"metrics": results, "anomalies": anomalies}

Alpha Selection

| Use Case | Alpha | Behavior |
|---|---|---|
| Stable production | 0.1 | Slow adaptation |
| Active development | 0.3 | Moderate |
| High variability | 0.1-0.15 | Very stable |
| Sudden change detection | 0.4-0.5 | Quick response |
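
A short usage sketch feeding per-request metrics into the tracker above, using the stable-production alpha from the table; the metric values are synthetic:

# Track a relevance score and latency with independent EWMA baselines (alpha=0.1)
tracker = MultiMetricEWMA(metrics=["relevance", "latency_s"], alpha=0.1)

# Warm-up: anomalies only fire once min_samples (10 by default) observations are seen
for i in range(12):
    tracker.update({"relevance": 0.84 + 0.02 * (i % 2), "latency_s": 1.9 + 0.2 * (i % 2)})

# A degraded request after warm-up triggers anomalies on both metrics
result = tracker.update({"relevance": 0.40, "latency_s": 7.5})
for anomaly in result["anomalies"]:
    print(f"Anomaly on {anomaly['metric']} (z-score {anomaly['z_score']:.1f})")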


Langfuse Experiments API

Overview

The Experiments API enables systematic evaluation of LLM outputs across datasets. Use it for A/B testing prompts, comparing models, and tracking quality over time. v3 adds the Experiment Runner SDK, dataset item versioning, and corrected outputs.

┌─────────────────────────────────────────────────────────────────────┐
│                     Langfuse Experiments Flow                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌──────────┐     ┌────────────┐     ┌──────────────┐              │
│   │ Dataset  │────▶│ Experiment │────▶│ Runs (Items) │              │
│   │ (inputs) │     │ (config)   │     │ (executions) │              │
│   └──────────┘     └────────────┘     └──────┬───────┘              │
│                                              │                       │
│                                              ▼                       │
│                                    ┌──────────────────┐             │
│                                    │ Evaluators       │             │
│                                    │ (judge outputs)  │             │
│                                    └────────┬─────────┘             │
│                                             │                        │
│                                             ▼                        │
│                                    ┌──────────────────┐             │
│                                    │ Scores           │             │
│                                    │ (per run)        │             │
│                                    └──────────────────┘             │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Experiment Runner SDK (v3)

The high-level API simplifies running experiments:

from langfuse import Langfuse

langfuse = Langfuse()


async def my_pipeline(input_data: dict) -> str:
    """Your LLM pipeline to evaluate."""
    return await llm.generate(input_data["query"])


# Run experiment in one call
result = langfuse.run_experiment(
    dataset_name="golden-analysis-dataset",
    experiment_name="sonnet-v-gpt5",
    run_fn=my_pipeline,
    evaluators=[
        {"name": "relevance", "fn": relevance_evaluator},
        {"name": "depth", "fn": depth_evaluator},
    ],
)

# Result contains:
# - experiment_id
# - per-item scores
# - aggregate statistics
print(f"Avg relevance: {result.stats['relevance']['mean']:.2f}")
print(f"Avg depth: {result.stats['depth']['mean']:.2f}")

Creating Datasets

From Code

from langfuse import Langfuse

langfuse = Langfuse()

# Create dataset with JSON schema enforcement
dataset = langfuse.create_dataset(
    name="golden-analysis-dataset",
    description="Curated analysis examples with expected outputs",
    metadata={"version": "v2", "schema_version": "1.0"},
)

# Add items with versioning
items = [
    {
        "input": {"url": "https://example.com/article1", "type": "article"},
        "expected_output": "Expected analysis for article 1...",
        "metadata": {"category": "tutorial", "difficulty": "beginner"},
    },
    {
        "input": {"url": "https://example.com/article2", "type": "article"},
        "expected_output": "Expected analysis for article 2...",
        "metadata": {"category": "reference", "difficulty": "advanced"},
    },
]

for item in items:
    langfuse.create_dataset_item(
        dataset_name="golden-analysis-dataset",
        input=item["input"],
        expected_output=item.get("expected_output"),
        metadata=item.get("metadata"),
    )

From Existing Traces

# Create dataset from production traces
traces = langfuse.get_traces(
    filter={
        "score_name": "human_verified",
        "score_value_gte": 0.9,  # Only high-quality
    },
    limit=100,
)

dataset = langfuse.create_dataset(name="production-golden-v1")

for trace in traces:
    langfuse.create_dataset_item(
        dataset_name="production-golden-v1",
        input=trace.input,
        expected_output=trace.output,
        metadata={"trace_id": trace.id},
    )

Dataset Item Versioning

Track changes to dataset items over time:

# Update an existing item — creates a new version
langfuse.update_dataset_item(
    dataset_name="golden-analysis-dataset",
    item_id="item_123",
    expected_output="Updated expected output with more detail...",
    metadata={"version": 2, "updated_by": "human_reviewer"},
)

# View item history in Langfuse UI:
# item_123 v1 (Jan 15) → v2 (Feb 01)
# Each experiment run records which version it evaluated against

JSON Schema Enforcement

Enforce structure on dataset items:

# Create dataset with schema
dataset = langfuse.create_dataset(
    name="structured-eval-dataset",
    description="Dataset with enforced input schema",
    metadata={
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "context": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["query"],
        }
    },
)

# Items must match schema — invalid items are rejected
langfuse.create_dataset_item(
    dataset_name="structured-eval-dataset",
    input={"query": "What is XSS?", "context": ["OWASP guide..."]},
    expected_output="XSS is a web security vulnerability...",
)

Dataset Folder Organization

Organize datasets into folders:

Datasets/
├── production/
│   ├── golden-v1
│   ├── golden-v2
│   └── regression-suite
├── experiments/
│   ├── prompt-variants
│   └── model-comparison
└── development/
    ├── unit-tests
    └── edge-cases

Batch Add Observations to Datasets

Add multiple observations at once:

# Batch add from production traces
observation_ids = ["obs_1", "obs_2", "obs_3", "obs_4", "obs_5"]

langfuse.batch_add_dataset_items(
    dataset_name="production-golden-v1",
    observation_ids=observation_ids,
    metadata={"batch": "2026-02-01", "source": "production"},
)

Corrected Outputs for Fine-Tuning

Use corrected outputs to build fine-tuning datasets:

# Add corrected output to existing dataset item
langfuse.update_dataset_item(
    dataset_name="golden-analysis-dataset",
    item_id="item_456",
    corrected_output="Human-corrected version of the analysis...",
    metadata={"corrected_by": "expert_reviewer", "correction_type": "factual"},
)

# Export for fine-tuning
items = langfuse.get_dataset("golden-analysis-dataset").items

fine_tuning_data = []
for item in items:
    if item.corrected_output:
        fine_tuning_data.append({
            "messages": [
                {"role": "user", "content": str(item.input)},
                {"role": "assistant", "content": item.corrected_output},
            ]
        })

# Export as JSONL for fine-tuning
import json
with open("fine_tuning.jsonl", "w") as f:
    for entry in fine_tuning_data:
        f.write(json.dumps(entry) + "\n")

Running Experiments (Manual)

Basic Experiment

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()


@observe()
async def run_experiment(
    dataset_name: str,
    experiment_name: str,
    model_config: dict,
):
    """Run an experiment on a dataset."""
    dataset = langfuse.get_dataset(dataset_name)

    results = []

    for item in dataset.items:
        @observe(name="experiment_run")
        async def evaluate_item(item=item):
            output = await your_pipeline(item.input, model_config)

            get_client().update_current_observation(
                input=item.input,
                output=output,
                metadata={"dataset_item_id": item.id},
            )

            return output

        output = await evaluate_item()
        results.append({"item_id": item.id, "output": output})

    return results

A/B Testing Models

async def ab_test_models(dataset_name: str):
    """Compare two model configurations."""

    configs = {
        "sonnet": {"model": "claude-sonnet-4-6", "temperature": 0.7},
        "gpt5": {"model": "gpt-5.2", "temperature": 0.7},
    }

    for name, config in configs.items():
        result = langfuse.run_experiment(
            dataset_name=dataset_name,
            experiment_name=f"model-comparison-{name}",
            run_fn=lambda input_data: your_pipeline(input_data, config),
            evaluators=[
                {"name": "relevance", "fn": relevance_evaluator},
                {"name": "depth", "fn": depth_evaluator},
            ],
        )
        print(f"{name}: avg_relevance={result.stats['relevance']['mean']:.2f}")

Experiment Compare View

Compare experiments side-by-side in Langfuse UI:

  • Aggregate scores: Average, median, std per criterion
  • Per-item comparison: See how each item scored across experiments
  • Annotations: Add notes to individual items or experiments
  • Diff view: See which items improved or regressed
  • Export: Download comparison as CSV

OrchestKit Integration

Golden Dataset Experiment

from datetime import datetime

from langfuse import Langfuse

langfuse = Langfuse()


async def run_golden_experiment():
    """Run quality experiment on golden dataset."""

    # 1. Create dataset from golden analyses
    golden_analyses = await get_golden_analyses()

    dataset = langfuse.create_dataset(name="orchestkit-golden-v1")
    for analysis in golden_analyses:
        langfuse.create_dataset_item(
            dataset_name="orchestkit-golden-v1",
            input={"url": analysis.url},
            expected_output=analysis.synthesis,
            metadata={"analysis_id": str(analysis.id)},
        )

    # 2. Run experiment with Experiment Runner
    result = langfuse.run_experiment(
        dataset_name="orchestkit-golden-v1",
        experiment_name=f"quality-test-{datetime.now().isoformat()}",
        run_fn=run_analysis_pipeline,
        evaluators=[
            {"name": "depth", "fn": depth_evaluator},
            {"name": "accuracy", "fn": accuracy_evaluator},
            {"name": "overall", "fn": overall_evaluator},
        ],
    )

    return {
        "experiment_id": result.experiment_id,
        "stats": result.stats,
        "pass_rate": result.stats["overall"]["mean"] >= 0.7,
    }

Prompt Variant Testing

async def test_prompt_variants():
    """A/B test different prompt templates."""

    variants = {
        "detailed": "Provide a comprehensive, in-depth analysis...",
        "concise": "Provide a brief, focused analysis...",
        "structured": "Analyze using the following structure: 1) Overview...",
    }

    results = {}
    for name, prompt in variants.items():
        result = langfuse.run_experiment(
            dataset_name="orchestkit-golden-v1",
            experiment_name=f"prompt-variant-{name}",
            run_fn=lambda input_data: run_with_prompt(input_data, prompt),
            evaluators=[
                {"name": "overall", "fn": overall_evaluator},
            ],
        )
        results[name] = result.stats["overall"]["mean"]

    # Compare all variants
    winner = max(results, key=results.get)
    return {"winner": winner, "scores": results}

Viewing Results

Langfuse Dashboard

  1. Experiments Tab: See all experiments with aggregate scores
  2. Compare View: Side-by-side experiment comparison with annotations
  3. Runs Tab: See individual executions with per-item scores
  4. Diff View: Identify regressions between experiment versions

Export Results

import pandas as pd
from langfuse import Langfuse

langfuse = Langfuse()


def export_experiment_results(experiment_id: str) -> pd.DataFrame:
    """Export experiment results to DataFrame."""

    runs = langfuse.get_experiment_runs(experiment_id)

    data = []
    for run in runs:
        scores = langfuse.get_scores(trace_id=run.trace_id)
        score_dict = {s.name: s.value for s in scores}

        data.append({
            "run_id": run.id,
            "item_id": run.dataset_item_id,
            **run.input,
            **score_dict,
        })

    return pd.DataFrame(data)

Best Practices

  1. Use Experiment Runner SDK for simplified experiment execution
  2. Version your datasets with semantic names like golden-v1, golden-v2
  3. Use corrected outputs to build fine-tuning datasets from production data
  4. Include metadata: Store model config, prompt version in experiment metadata
  5. Evaluate consistently: Use same evaluators across experiments
  6. Track over time: Run same experiment periodically to detect regression
  7. Use ground truth: When available, compute similarity to expected output (see the sketch below)
  8. Organize datasets in folders by purpose (production, experiments, dev)
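
A minimal sketch of a ground-truth evaluator for item 7, shaped like the evaluator functions passed to run_experiment above; the signature is an assumption, and the token-overlap similarity is a deliberately simple stand-in for an embedding-based comparison:

# Hypothetical evaluator: crude token-overlap similarity against the item's expected output
def ground_truth_similarity(item, output: str) -> float:
    """Return a 0-1 similarity score between the generated output and the expected output."""
    expected = set((item.expected_output or "").lower().split())
    produced = set((output or "").lower().split())
    if not expected or not produced:
        return 0.0
    return len(expected & produced) / len(expected)

# Usage (assumed signature): evaluators=[{"name": "ground_truth", "fn": ground_truth_similarity}]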

Framework Integrations

Langfuse v3 integrates with modern AI frameworks via OpenTelemetry. Each integration provides automatic tracing of agent execution, tool calls, and LLM generations.

Claude Agent SDK

# pip install anthropic langfuse
from langfuse import observe, get_client

@observe(type="agent", name="claude_agent")
async def run_claude_agent(task: str):
    """Claude Agent SDK with Langfuse tracing."""
    import anthropic

    client = anthropic.Anthropic()

    # Claude Agent SDK traces automatically via OTEL
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        tools=[
            {
                "name": "read_file",
                "description": "Read a file from disk",
                "input_schema": {
                    "type": "object",
                    "properties": {"path": {"type": "string"}},
                    "required": ["path"],
                },
            }
        ],
        messages=[{"role": "user", "content": task}],
    )

    get_client().update_current_observation(
        model="claude-sonnet-4-6",
        usage={
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        },
    )
    return response


# Setup: Configure OTEL exporter to Langfuse
# LANGFUSE_PUBLIC_KEY=pk-... LANGFUSE_SECRET_KEY=sk-... python app.py

Trace Output

claude_agent (agent, 4.2s, $0.08)
├── messages.create (generation, 3.8s)
│   ├── tool_use: read_file(path="src/main.py")
│   ├── tool_result: "def main():..."
│   └── assistant: "The code has..."
└── metadata: {model: claude-sonnet-4-6, tokens: 2500}

OpenAI Agents SDK

# pip install openai-agents langfuse
from langfuse import observe, get_client

@observe(type="agent", name="openai_agents_workflow")
async def run_openai_agents(query: str):
    """OpenAI Agents SDK with Langfuse tracing via OTEL."""
    from agents import Agent, Runner, function_tool

    @function_tool
    def search_docs(query: str) -> str:
        """Search documentation."""
        return f"Results for: {query}"

    agent = Agent(
        name="research_agent",
        instructions="You are a research assistant.",
        model="gpt-5.2",
        tools=[search_docs],
    )

    # Runner automatically exports traces to Langfuse via OTEL
    result = await Runner.run(agent, query)

    get_client().update_current_observation(
        output=result.final_output,
        metadata={"agent": "research_agent", "model": "gpt-5.2"},
    )
    return result.final_output

Setup

# OpenAI Agents SDK uses OTEL for tracing
export OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
export OTEL_EXPORTER_OTLP_HEADERS="x-langfuse-public-key=pk-...,x-langfuse-secret-key=sk-..."

Pydantic AI

# pip install pydantic-ai langfuse
from langfuse import observe, get_client

@observe(type="agent", name="pydantic_ai_agent")
async def run_pydantic_agent(query: str):
    """Pydantic AI agent with Langfuse tracing."""
    from pydantic_ai import Agent
    from pydantic import BaseModel

    class AnalysisResult(BaseModel):
        summary: str
        risk_level: str
        recommendations: list[str]

    agent = Agent(
        "claude-sonnet-4-6",
        result_type=AnalysisResult,
        system_prompt="You are a security analyst.",
    )

    # Pydantic AI supports Langfuse via instrument() or OTEL
    result = await agent.run(query)

    get_client().update_current_observation(
        output=result.data.model_dump(),
        metadata={"risk_level": result.data.risk_level},
    )
    return result.data


# Setup: Use Pydantic AI's built-in Langfuse instrumentation
# from pydantic_ai.integrations.langfuse import LangfuseInstrumentation
# LangfuseInstrumentation().instrument()

CrewAI

# pip install crewai langfuse
from langfuse import observe, get_client

@observe(type="agent", name="crewai_workflow")
async def run_crew(task_description: str):
    """CrewAI crew with Langfuse tracing."""
    from crewai import Crew, Agent, Task

    researcher = Agent(
        role="Senior Researcher",
        goal="Find comprehensive information",
        backstory="Expert researcher with 10 years experience",
        llm="claude-sonnet-4-6",
    )

    writer = Agent(
        role="Technical Writer",
        goal="Create clear documentation",
        backstory="Technical writer specializing in developer docs",
        llm="claude-sonnet-4-6",
    )

    research_task = Task(
        description=f"Research: {task_description}",
        agent=researcher,
        expected_output="Comprehensive research findings",
    )

    writing_task = Task(
        description="Write documentation from research",
        agent=writer,
        expected_output="Polished documentation",
    )

    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, writing_task],
    )

    # CrewAI v0.80+ has built-in Langfuse callback
    result = crew.kickoff()

    get_client().update_current_observation(
        output=str(result),
        metadata={"agents": 2, "tasks": 2},
    )
    return result

Setup

# CrewAI Langfuse integration
import os
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"

# CrewAI auto-detects Langfuse env vars and sends traces

LiveKit Agents

# pip install livekit-agents langfuse
from langfuse import observe, get_client

@observe(type="agent", name="livekit_voice_agent")
async def run_voice_agent(session_id: str):
    """LiveKit voice agent with Langfuse tracing."""
    from livekit.agents import AutoSubscribe, JobContext, WorkerOptions
    from livekit.agents.voice_assistant import VoiceAssistant
    from livekit.plugins import openai, silero

    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=openai.STT(),
        llm=openai.LLM(model="gpt-5.2"),
        tts=openai.TTS(),
    )

    get_client().update_current_observation(
        metadata={
            "session_id": session_id,
            "pipeline": "vad→stt→llm→tts",
        },
    )
    return assistant


# LiveKit traces voice pipeline stages:
# VAD → STT → LLM → TTS (each as separate observation)

Setup

# LiveKit exports OTEL traces to Langfuse
export OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
export OTEL_EXPORTER_OTLP_HEADERS="x-langfuse-public-key=pk-...,x-langfuse-secret-key=sk-..."

Amazon Bedrock AgentCore

# pip install boto3 langfuse
from langfuse import observe, get_client

@observe(type="agent", name="bedrock_agent")
async def run_bedrock_agent(query: str):
    """Amazon Bedrock AgentCore with Langfuse tracing."""
    import boto3

    bedrock = boto3.client("bedrock-agent-runtime")

    response = bedrock.invoke_agent(
        agentId="AGENT_ID",
        agentAliasId="ALIAS_ID",
        sessionId="session-123",
        inputText=query,
    )

    output = ""
    for event in response["completion"]:
        if "chunk" in event:
            output += event["chunk"]["bytes"].decode()

    get_client().update_current_observation(
        input=query,
        output=output,
        metadata={"provider": "bedrock", "agent_id": "AGENT_ID"},
    )
    return output

Setup

# Bedrock traces via AWS X-Ray → OTEL → Langfuse
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor

processor = LangfuseSpanProcessor()
# Add to your OTEL tracer provider

JavaScript/TypeScript Integration

All frameworks above also work in JS/TS using @langfuse/otel:

import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";

const sdk = new NodeSDK({
  traceExporter: new LangfuseExporter({
    publicKey: process.env.LANGFUSE_PUBLIC_KEY,
    secretKey: process.env.LANGFUSE_SECRET_KEY,
  }),
});
sdk.start();

// All OTEL-compatible frameworks now trace to Langfuse

Integration Matrix

| Framework | Python | JS/TS | Tracing Method | Auto-Instrument |
|---|---|---|---|---|
| Claude Agent SDK | Yes | Yes | OTEL exporter | Manual @observe |
| OpenAI Agents | Yes | Yes | OTEL native | Auto via env vars |
| Pydantic AI | Yes | | .instrument() | Auto |
| CrewAI | Yes | | Callback | Auto via env vars |
| LiveKit Agents | Yes | Yes | OTEL exporter | Auto |
| Bedrock AgentCore | Yes | Yes | X-Ray → OTEL | Manual |
| LangChain | Yes | Yes | Callback | Auto |
| LangGraph | Yes | Yes | Callback | Auto |

References

Langfuse Evidently Integration

Langfuse + Evidently AI Integration

Combining Langfuse tracing with Evidently AI drift detection.

Export Langfuse Data

from langfuse import Langfuse
import pandas as pd
from datetime import datetime, timedelta

langfuse = Langfuse()

def export_langfuse_scores(days: int = 7) -> pd.DataFrame:
    """Export Langfuse scores to DataFrame for Evidently."""
    traces = langfuse.get_traces(from_timestamp=datetime.now() - timedelta(days=days))
    records = []
    for trace in traces.data:
        scores = {s.name: s.value for s in trace.scores}
        if scores:
            records.append({"trace_id": trace.id, "timestamp": trace.timestamp, **scores})
    return pd.DataFrame(records)

Evidently Drift Report

from evidently import Report
from evidently.metrics import DatasetDriftMetric, ColumnDriftMetric

def run_drift_report(baseline_df: pd.DataFrame, current_df: pd.DataFrame, columns: list) -> dict:
    """Run Evidently drift detection."""
    metrics = [DatasetDriftMetric()] + [ColumnDriftMetric(column_name=col) for col in columns]
    report = Report(metrics=metrics)

    report.run(reference_data=baseline_df, current_data=current_df)
    result = report.as_dict()

    return {
        "dataset_drift": result["metrics"][0]["result"].get("dataset_drift"),
        "drift_share": result["metrics"][0]["result"].get("share_of_drifted_columns")
    }

Automated Monitoring

class LangfuseEvidentlyMonitor:
    def __init__(self, baseline_days: int = 7, current_days: int = 1):
        self.langfuse = Langfuse()
        self.baseline_days = baseline_days
        self.current_days = current_days

    def run_analysis(self, metrics: list) -> dict:
        # Note: the baseline export spans the full lookback window and therefore
        # overlaps the current window; use explicit from/to timestamps for a strict split.
        baseline_df = export_langfuse_scores(self.baseline_days + self.current_days)
        current_df = export_langfuse_scores(self.current_days)
        results = run_drift_report(baseline_df, current_df, metrics)
        return results
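
Illustrative usage; the score names passed in metrics are assumptions and depend on what you actually log to Langfuse:

monitor = LangfuseEvidentlyMonitor(baseline_days=7, current_days=1)
results = monitor.run_analysis(metrics=["g_eval_overall", "relevance"])

if results["dataset_drift"]:
    alert(f"Quality drift detected: {results['drift_share']:.0%} of monitored scores drifted")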

References

Logging Patterns

Advanced structured logging patterns for production systems.

Correlation IDs

Trace requests across services:

import structlog
import uuid_utils  # pip install uuid-utils (UUID v7 support for Python < 3.14)

logger = structlog.get_logger()

@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
    # Get or generate correlation ID (UUID v7 for time-ordering in distributed traces)
    correlation_id = request.headers.get("X-Correlation-ID") or str(uuid_utils.uuid7())

    # Bind to logger context (all logs in this request will include it)
    structlog.contextvars.bind_contextvars(
        correlation_id=correlation_id,
        method=request.method,
        path=request.url.path
    )

    # Add to response headers
    response = await call_next(request)
    response.headers["X-Correlation-ID"] = correlation_id

    return response

Benefits:

  • Find all logs related to a single request
  • Track requests across microservices
  • Debug distributed transactions

Log Sampling

Problem: Too many logs in high-traffic endpoints.
Solution: Sample less critical logs.

import random

def should_sample(level: str, rate: float = 0.1) -> bool:
    """Sample logs based on level and rate."""
    if level in ["ERROR", "CRITICAL"]:
        return True  # Always log errors
    return random.random() < rate

# Log 100% of errors, 10% of info
if should_sample("INFO", rate=0.1):
    logger.info("User created", user_id=user.id)

Sampling rates (see the sketch after this list):

  • ERROR/CRITICAL: 100% (always log)
  • WARN: 50% (sample half)
  • INFO: 10% (sample 10%)
  • DEBUG: 1% (sample 1% in production)
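
A minimal sketch encoding these per-level rates (the rate table mirrors the list above; adjust per service):

import random

SAMPLE_RATES = {"DEBUG": 0.01, "INFO": 0.10, "WARNING": 0.50, "ERROR": 1.0, "CRITICAL": 1.0}

def should_sample_level(level: str) -> bool:
    """Return True if a log record at this level should be emitted."""
    return random.random() < SAMPLE_RATES.get(level.upper(), 1.0)

# Drops ~90% of INFO logs, keeps every ERROR/CRITICAL
if should_sample_level("INFO"):
    logger.info("User created", user_id=user.id)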

Log Aggregation with Loki

Loki Query Language (LogQL) examples:

# Find all errors in last hour
{app="backend"} |= "ERROR" | json

# Count errors by endpoint
sum by (endpoint) (
  count_over_time({app="backend"} |= "ERROR" [5m])
)

# p95 latency from structured logs
quantile_over_time(0.95,
  {app="backend"}
  | json
  | unwrap duration_ms [5m]
)

# Search for specific correlation ID
{app="backend"} | json | correlation_id="abc-123-def"

OrchestKit Logging Example

import logging

import structlog
from structlog.processors import JSONRenderer, TimeStamper, add_log_level

# Configure structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # Merge correlation IDs
        add_log_level,
        TimeStamper(fmt="iso"),
        JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True
)

logger = structlog.get_logger()

# Usage in workflow
@workflow_node
async def supervisor_node(state: AnalysisState):
    """Route to next agent."""

    # Bind context for all logs in this function
    log = logger.bind(
        correlation_id=state["correlation_id"],
        analysis_id=state["analysis_id"],
        workflow_step="supervisor"
    )

    completed = set(state["agents_completed"])
    available = [a for a in ALL_AGENTS if a not in completed]

    if not available:
        log.info("all_agents_completed", total_findings=len(state["findings"]))
        state["next_node"] = "quality_gate"
    else:
        next_agent = available[0]
        log.info("routing_to_agent", agent=next_agent, remaining=len(available))
        state["next_node"] = next_agent

    return state

Example log output:

{
  "event": "routing_to_agent",
  "level": "info",
  "timestamp": "2025-01-15T10:30:45.123Z",
  "correlation_id": "abc-123-def",
  "analysis_id": "550e8400-e29b-41d4-a716-446655440000",
  "workflow_step": "supervisor",
  "agent": "tech_comparator",
  "remaining": 7
}

See scripts/structured-logging.ts for Winston setup.

Metrics Collection

Application metrics best practices with Prometheus.

Metric Types

1. Counter - Monotonically increasing value (resets to 0 on restart)

http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Usage
http_requests_total.labels(method='GET', endpoint='/api/users', status=200).inc()

Use cases: Request counts, error counts, bytes processed

2. Gauge - Value that can go up or down

active_connections = Gauge(
    'active_connections',
    'Number of active database connections'
)

# Usage
active_connections.set(25)  # Set to specific value
active_connections.inc()    # Increment by 1
active_connections.dec()    # Decrement by 1

Use cases: Queue length, memory usage, temperature

3. Histogram - Distribution of values (with buckets)

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]  # Choose meaningful buckets!
)

# Usage
with request_duration.labels(method='GET', endpoint='/api/users').time():
    # ... handle request
    pass

Use cases: Request latency, response size

4. Summary - Like Histogram but calculates quantiles on client side

request_duration = Summary(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

Histogram vs Summary:

  • Histogram: Calculate quantiles on Prometheus server (recommended)
  • Summary: Calculate quantiles on application side (higher client CPU, can't aggregate across instances)

Cardinality Management

Problem: Too many unique label combinations

# BAD: Unbounded cardinality (user_id can be millions of values)
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'user_id']  # user_id creates millions of time series!
)

# GOOD: Bounded cardinality
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']  # Limited to ~10 methods x 100 endpoints x 10 statuses = 10,000 series
)

Cardinality limits:

  • Good: < 10,000 unique time series per metric
  • Acceptable: 10,000-100,000
  • Bad: > 100,000 (Prometheus performance degrades)

Rule: Never use unbounded labels (user IDs, request IDs, timestamps)
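
One common way to keep the endpoint label bounded is to record the route template instead of the raw path. A minimal sketch (normalize_endpoint is a hypothetical helper, not part of prometheus_client):

import re

def normalize_endpoint(path: str) -> str:
    """Collapse high-cardinality path segments (numeric IDs, UUIDs) into placeholders."""
    path = re.sub(r"/[0-9a-fA-F-]{36}", "/{id}", path)  # UUID segments
    path = re.sub(r"/\d+", "/{id}", path)               # numeric ID segments
    return path

# /api/users/12345 and /api/users/67890 both count toward /api/users/{id}
http_requests_total.labels(
    method="GET",
    endpoint=normalize_endpoint("/api/users/12345"),
    status="200",
).inc()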

Custom Business Metrics

# LLM token usage
llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total LLM tokens consumed',
    ['model', 'operation']  # e.g., model='claude-sonnet', operation='analysis'
)

# LLM cost tracking
llm_cost_dollars = Counter(
    'llm_cost_dollars_total',
    'Total LLM cost in dollars',
    ['model']
)

# Cache hit rate
cache_operations = Counter(
    'cache_operations_total',
    'Cache operations',
    ['operation', 'result']  # operation='get', result='hit|miss'
)

# Cache hit rate query:
# sum(rate(cache_operations_total{result="hit"}[5m])) /
# sum(rate(cache_operations_total[5m]))

LLM Cost Tracking Example

import time

import anthropic
from langfuse import observe
from prometheus_client import Counter, Histogram

anthropic_client = anthropic.AsyncAnthropic()

llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total LLM tokens consumed',
    ['model', 'operation', 'token_type']
)

llm_cost_dollars = Counter(
    'llm_cost_dollars_total',
    'Total LLM cost in dollars',
    ['model', 'operation']
)

llm_request_duration = Histogram(
    'llm_request_duration_seconds',
    'LLM request duration',
    ['model', 'operation'],
    buckets=[0.5, 1, 2, 5, 10, 20, 30]
)

@observe(name="llm_call")
async def call_llm(prompt: str, model: str, operation: str) -> str:
    start_time = time.time()
    response = await anthropic_client.messages.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024
    )
    duration = time.time() - start_time

    input_tokens = response.usage.input_tokens
    output_tokens = response.usage.output_tokens

    llm_tokens_used.labels(model=model, operation=operation, token_type="input").inc(input_tokens)
    llm_tokens_used.labels(model=model, operation=operation, token_type="output").inc(output_tokens)

    # Cost calculation (Claude Sonnet 4.5 pricing)
    input_cost = (input_tokens / 1_000_000) * 3.00
    output_cost = (output_tokens / 1_000_000) * 15.00
    total_cost = input_cost + output_cost

    llm_cost_dollars.labels(model=model, operation=operation).inc(total_cost)
    llm_request_duration.labels(model=model, operation=operation).observe(duration)

    return response.content[0].text

Grafana dashboard queries:

# Total cost per day
sum(increase(llm_cost_dollars_total[1d])) by (model)

# Token usage rate
sum(rate(llm_tokens_used_total[5m])) by (model, token_type)

# Cost per operation
sum(increase(llm_cost_dollars_total[1h])) by (operation)

# p95 LLM latency
histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))

See scripts/prometheus-metrics.ts for complete setup.

Migration V2 V3

Migration Guide: v2 → v3 (Python) / v3 → v4 (JS)

Python SDK: v2 → v3

Import Changes

| v2 (Deprecated) | v3 (Current) |
|---|---|
| from langfuse.decorators import observe | from langfuse import observe |
| from langfuse.decorators import langfuse_context | from langfuse import get_client |
| from langfuse import Langfuse | from langfuse import Langfuse (unchanged) |
| from langfuse.callback import CallbackHandler | from langfuse.callback import CallbackHandler (unchanged) |

API Changes

| v2 Pattern | v3 Pattern |
|---|---|
| langfuse_context.update_current_observation(...) | get_client().update_current_observation(...) |
| langfuse_context.update_current_trace(...) | get_client().update_current_trace(...) |
| langfuse_context.flush() | get_client().flush() |
| langfuse.trace(name="foo") (explicit) | Auto-created by first @observe() root span |
| @observe() (generation only) | @observe(type="agent") (7 new types) |

Trace ID Format

| v2 | v3 |
|---|---|
| UUID (550e8400-e29b-41d4-a716-446655440000) | W3C Trace Context (4bf92f3577b34da6a3ce929d0e0e4736) |
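
If existing code parsed or stored v2 trace IDs as UUIDs, fetch the current W3C-format ID from the SDK instead of constructing it. A minimal sketch using get_client().get_current_trace_id(), which also appears in the integration steps later in this document:

from langfuse import observe, get_client

@observe()
async def analyze(content: str):
    trace_id = get_client().get_current_trace_id()  # e.g. "4bf92f3577b34da6a3ce929d0e0e4736"
    # Pass trace_id to scoring / linking code instead of assuming UUID format
    return await llm.generate(content)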

Migration Example

# ❌ v2 (DEPRECATED)
from langfuse.decorators import observe, langfuse_context

@observe()
async def analyze(content: str):
    langfuse_context.update_current_observation(
        metadata={"length": len(content)}
    )
    langfuse_context.update_current_trace(
        user_id="user_123",
        session_id="session_abc",
    )
    return await llm.generate(content)


# ✅ v3 (CURRENT)
from langfuse import observe, get_client

@observe()
async def analyze(content: str):
    get_client().update_current_observation(
        metadata={"length": len(content)}
    )
    get_client().update_current_trace(
        user_id="user_123",
        session_id="session_abc",
    )
    return await llm.generate(content)

Explicit Trace Creation — Before/After

# ❌ v2: Explicit trace creation
from langfuse import Langfuse
langfuse = Langfuse()
trace = langfuse.trace(
    name="analysis",
    user_id="user_123",
    session_id="session_abc",
)
generation = trace.generation(name="llm_call", model="claude-sonnet-4-6")


# ✅ v3: Auto-trace via @observe (preferred)
from langfuse import observe, get_client

@observe()  # First root @observe creates the trace automatically
async def analysis(content: str):
    get_client().update_current_trace(
        user_id="user_123",
        session_id="session_abc",
    )
    return await llm_call(content)

@observe(name="llm_call")
async def llm_call(content: str):
    get_client().update_current_observation(
        model="claude-sonnet-4-6",
    )
    return await llm.generate(content)

Low-Level Client Still Works

# Explicit Langfuse() client is still available for programmatic use
from langfuse import Langfuse

langfuse = Langfuse()

# Creating datasets, scoring, experiments — same API
dataset = langfuse.create_dataset(name="golden-v1")
langfuse.score(trace_id="...", name="quality", value=0.85)

JavaScript/TypeScript SDK: v3 → v4

Package Changes

| v3 (Deprecated) | v4 (Current) |
|---|---|
| langfuse (monolith) | @langfuse/core (base) |
| | @langfuse/tracing (trace API) |
| | @langfuse/otel (OTEL exporter) |
| langfuse-langchain | @langfuse/langchain |
| langfuse-vercel | @langfuse/vercel |

Setup Changes

// ❌ v3 (DEPRECATED)
import Langfuse from "langfuse";
const langfuse = new Langfuse({
  publicKey: "pk-...",
  secretKey: "sk-...",
});

// ✅ v4: Option A — Direct tracing
import { Langfuse } from "@langfuse/core";
const langfuse = new Langfuse({
  publicKey: "pk-...",
  secretKey: "sk-...",
});

// ✅ v4: Option B — OTEL-native (recommended)
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";

const sdk = new NodeSDK({
  traceExporter: new LangfuseExporter({
    publicKey: process.env.LANGFUSE_PUBLIC_KEY,
    secretKey: process.env.LANGFUSE_SECRET_KEY,
  }),
});
sdk.start();

Tracing Changes

// ❌ v3
const trace = langfuse.trace({ name: "analysis" });
const span = trace.span({ name: "retrieval" });
const generation = trace.generation({
  name: "llm_call",
  model: "claude-sonnet-4-6",
  input: messages,
});
generation.end({ output: response });
span.end();

// ✅ v4: OTEL spans
import { trace } from "@opentelemetry/api";

const tracer = trace.getTracer("my-app");
await tracer.startActiveSpan("analysis", async (span) => {
  await tracer.startActiveSpan("retrieval", async (childSpan) => {
    // retrieval logic
    childSpan.end();
  });
  span.end();
});

Self-Hosting v3 Architecture

Langfuse v3 self-hosting uses a new multi-service architecture:

┌─────────────────────────────────────────────────────────┐
│                  Langfuse v3 Architecture                │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌──────────┐     ┌──────────────┐                     │
│   │  Web UI  │────▶│  API Server  │                     │
│   └──────────┘     └──────┬───────┘                     │
│                           │                             │
│              ┌────────────┼────────────┐                │
│              ▼            ▼            ▼                │
│   ┌──────────────┐ ┌──────────┐ ┌──────────┐          │
│   │  ClickHouse  │ │  Redis   │ │ Postgres │          │
│   │  (analytics) │ │  (cache) │ │ (metadata)│          │
│   └──────────────┘ └──────────┘ └──────────┘          │
│              │                                          │
│              ▼                                          │
│   ┌──────────────┐                                     │
│   │  S3 / MinIO  │                                     │
│   │  (media/blob)│                                     │
│   └──────────────┘                                     │
│                                                         │
└─────────────────────────────────────────────────────────┘

Component Roles

| Component | Role | v2 Equivalent |
|---|---|---|
| ClickHouse | Analytics, traces, observations, scores | PostgreSQL (was single DB) |
| PostgreSQL | Metadata, users, projects, prompts | PostgreSQL |
| Redis | Caching, rate limiting, real-time features | Not required |
| S3/MinIO | Media storage, large payloads | Stored in PostgreSQL |

Docker Compose (v3)

# docker-compose.yml (Langfuse v3)
services:
  langfuse:
    image: langfuse/langfuse:3
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/langfuse
      CLICKHOUSE_URL: http://clickhouse:8123
      REDIS_URL: redis://redis:6379
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY_ID: minioadmin
      S3_SECRET_ACCESS_KEY: minioadmin
      S3_BUCKET_NAME: langfuse
    depends_on:
      - postgres
      - clickhouse
      - redis
      - minio

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: langfuse
      POSTGRES_PASSWORD: postgres

  clickhouse:
    image: clickhouse/clickhouse-server:24
    volumes:
      - clickhouse_data:/var/lib/clickhouse

  redis:
    image: redis:7-alpine

  minio:
    image: minio/minio
    command: server /data
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin

Helm Chart

# Kubernetes deployment
helm repo add langfuse https://langfuse.github.io/langfuse-helm
helm install langfuse langfuse/langfuse \
  --set clickhouse.enabled=true \
  --set redis.enabled=true \
  --set postgresql.enabled=true \
  --set s3.enabled=true

ClickHouse Acquisition (Jan 16, 2026)

ClickHouse Inc. acquired Langfuse on January 16, 2026. Key implications:

  • Langfuse remains open-source under the same license
  • ClickHouse becomes the default analytics backend (was already used in v3)
  • Cloud offering continues at cloud.langfuse.com, now backed by ClickHouse Cloud
  • Self-hosting remains fully supported
  • No SDK breaking changes from the acquisition — v3 Python and v4 JS SDKs unchanged
  • Performance improvements expected from deeper ClickHouse integration

Breaking Changes Checklist

Python v2 → v3

  • Replace from langfuse.decorators import observe → from langfuse import observe
  • Replace from langfuse.decorators import langfuse_context → from langfuse import get_client
  • Replace all langfuse_context.update_current_observation() → get_client().update_current_observation()
  • Replace all langfuse_context.update_current_trace() → get_client().update_current_trace()
  • Remove explicit langfuse.trace() calls where @observe() creates auto-traces
  • Update trace ID handling if relying on UUID format (now W3C)
  • Add type= parameter to @observe() for new observation types
  • Update requirements.txt: langfuse>=3.13.0

JavaScript v3 → v4

  • Replace langfuse package with @langfuse/core + @langfuse/otel
  • Update imports from langfuse → @langfuse/core
  • Replace langfuse-langchain → @langfuse/langchain
  • Set up OTEL exporter for automatic tracing
  • Update package.json dependencies

References

Multi Judge Evaluation

Multi-Judge Evaluation with Langfuse

Overview

Multi-judge evaluation uses multiple LLM evaluators to assess quality from different perspectives. Langfuse v3 provides evaluator execution tracing — each judge creates its own inspectable trace.

OrchestKit has built-in evaluators at backend/app/shared/services/g_eval/langfuse_evaluators.py - but they're not wired up!

┌─────────────────────────────────────────────────────────────────────┐
│                     Multi-Judge Architecture                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   LLM Output ────────────────────────────────────────────────────   │
│        │                                                             │
│        ▼                                                             │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐               │
│   │ Judge 1 │  │ Judge 2 │  │ Judge 3 │  │ Judge 4 │               │
│   │ Depth   │  │ Accuracy│  │ Clarity │  │ Relevance│              │
│   │ 0.85    │  │ 0.90    │  │ 0.75    │  │ 0.92    │               │
│   └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘               │
│        │            │            │            │                      │
│        └────────────┴────────────┴────────────┘                      │
│                           │                                          │
│                           ▼                                          │
│                  ┌──────────────────┐                                │
│                  │ Score Aggregator │                                │
│                  │ Weighted: 0.87   │                                │
│                  └──────────────────┘                                │
│                           │                                          │
│                           ▼                                          │
│                  [Langfuse Score API]                                │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

G-Eval Criteria (Built into OrchestKit)

OrchestKit uses these evaluation criteria:

| Criterion | Weight | Description |
|---|---|---|
| depth | 0.30 | Technical depth and thoroughness |
| accuracy | 0.25 | Factual correctness |
| specificity | 0.20 | Concrete examples and details |
| coherence | 0.15 | Logical structure and flow |
| usefulness | 0.10 | Practical applicability |

Evaluator Execution Tracing (v3)

Each evaluator run creates its own inspectable trace:

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()


@observe(type="evaluator", name="depth_judge")
async def depth_evaluator(trace_id: str, output: str) -> float:
    """Depth evaluator — creates its own inspectable trace."""
    score = await g_eval.evaluate(criterion="depth", output=output)

    get_client().update_current_observation(
        input=output[:500],
        output={"score": score, "criterion": "depth"},
        model="claude-sonnet-4-6",
    )

    # Record score on the original trace
    langfuse.score(
        trace_id=trace_id,
        name="g_eval_depth",
        value=score,
        comment="G-Eval depth score",
    )

    return score

Result in Langfuse UI — the evaluator's own trace is inspectable:

evaluator:depth_judge (0.6s, $0.008)
├── generation: depth_prompt → 0.85
└── metadata: {criterion: "depth", model: "claude-sonnet-4-6"}

Existing OrchestKit Evaluators (v3 Updated)

# backend/app/shared/services/g_eval/langfuse_evaluators.py

from langfuse import observe, get_client, Langfuse

langfuse = Langfuse()


def create_g_eval_evaluator(criterion: str):
    """
    Create a Langfuse evaluator for a G-Eval criterion.
    Each evaluator creates an inspectable trace via @observe(type="evaluator").
    """
    @observe(type="evaluator", name=f"g_eval_{criterion}")
    async def evaluator(trace_id: str, output: str, **kwargs) -> float:
        # Run G-Eval for this criterion
        score = await g_eval.evaluate(
            criterion=criterion,
            output=output,
            **kwargs,
        )

        get_client().update_current_observation(
            input=output[:500],
            output={"score": score, "criterion": criterion},
        )

        # Record score in Langfuse
        langfuse.score(
            trace_id=trace_id,
            name=f"g_eval_{criterion}",
            value=score,
            comment=f"G-Eval {criterion} score",
        )

        return score

    return evaluator


def create_g_eval_overall_evaluator():
    """
    Create evaluator for weighted overall score.
    """
    weights = {
        "depth": 0.30,
        "accuracy": 0.25,
        "specificity": 0.20,
        "coherence": 0.15,
        "usefulness": 0.10,
    }

    @observe(type="evaluator", name="g_eval_overall")
    async def evaluator(trace_id: str, output: str, **kwargs) -> float:
        scores = {}

        # Run all criteria
        for criterion in weights.keys():
            scores[criterion] = await g_eval.evaluate(
                criterion=criterion,
                output=output,
                **kwargs,
            )

        # Calculate weighted average
        overall = sum(
            scores[c] * weights[c]
            for c in weights.keys()
        )

        get_client().update_current_observation(
            output={"overall": overall, "scores": scores},
        )

        # Record in Langfuse
        langfuse.score(
            trace_id=trace_id,
            name="g_eval_overall",
            value=overall,
            comment=f"Weighted G-Eval: {scores}",
        )

        return overall

    return evaluator


# Pre-built evaluators (ready to use!)
depth_evaluator = create_g_eval_evaluator("depth")
accuracy_evaluator = create_g_eval_evaluator("accuracy")
specificity_evaluator = create_g_eval_evaluator("specificity")
coherence_evaluator = create_g_eval_evaluator("coherence")
usefulness_evaluator = create_g_eval_evaluator("usefulness")
overall_evaluator = create_g_eval_overall_evaluator()

Wiring Evaluators to Workflow

Option 1: Quality Gate Integration

from langfuse import observe, get_client
from app.shared.services.g_eval.langfuse_evaluators import (
    overall_evaluator,
    depth_evaluator,
    accuracy_evaluator,
)


@observe(name="quality_gate")
async def quality_gate_node(state: AnalysisState) -> dict:
    """Quality gate with Langfuse multi-judge evaluation."""

    trace_id = state.get("langfuse_trace_id")
    synthesis = state["synthesis_result"]

    # Run multi-judge evaluation (each creates inspectable trace)
    scores = {}

    scores["depth"] = await depth_evaluator(trace_id, synthesis)
    scores["accuracy"] = await accuracy_evaluator(trace_id, synthesis)

    # Overall score
    overall = await overall_evaluator(trace_id, synthesis)
    scores["overall"] = overall

    # Quality gate decision
    passed = overall >= 0.7

    return {
        "quality_scores": scores,
        "quality_passed": passed,
        "quality_gate_reason": (
            "Passed" if passed else f"Score {overall:.2f} below threshold"
        ),
    }

Option 2: Experiment Runner Integration

from datetime import datetime

from langfuse import Langfuse
from app.shared.services.g_eval.langfuse_evaluators import (
    accuracy_evaluator,
    depth_evaluator,
    overall_evaluator,
)

langfuse = Langfuse()


async def run_quality_experiment(dataset_name: str):
    """Run multi-judge evaluation using Experiment Runner."""

    result = langfuse.run_experiment(
        dataset_name=dataset_name,
        experiment_name=f"quality-eval-{datetime.now().isoformat()}",
        run_fn=run_analysis_pipeline,
        evaluators=[
            {"name": "depth", "fn": depth_evaluator},
            {"name": "accuracy", "fn": accuracy_evaluator},
            {"name": "overall", "fn": overall_evaluator},
        ],
    )

    return {
        "experiment_id": result.experiment_id,
        "avg_overall": result.stats["overall"]["mean"],
        "avg_depth": result.stats["depth"]["mean"],
        "pass_rate": sum(
            1 for r in result.runs if r.scores["overall"] >= 0.7
        ) / len(result.runs),
    }

Best Practices

1. Use Multiple Independent Judges

# BAD: Single judge decides everything
score = await evaluate(output)

# GOOD: Multiple judges with @observe(type="evaluator"), aggregate
scores = await asyncio.gather(
    depth_judge(output),
    accuracy_judge(output),
    clarity_judge(output),
)
overall = weighted_average(scores, weights)

2. Log All Scores to Langfuse

# BAD: Only log final score
langfuse.score(trace_id=trace_id, name="quality", value=0.85)

# GOOD: Log individual + aggregate
for criterion, score in scores.items():
    langfuse.score(
        trace_id=trace_id,
        name=f"g_eval_{criterion}",
        value=score,
        comment=f"G-Eval {criterion}",
    )

langfuse.score(
    trace_id=trace_id,
    name="g_eval_overall",
    value=overall,
    comment=f"Weighted average of {list(scores.keys())}",
)

3. Include Ground Truth When Available

# If you have ground truth (golden dataset)
langfuse.score(
    trace_id=trace_id,
    name="human_verified",
    value=ground_truth_score,
    source="human",  # Distinguish from LLM judges
)

4. Track Judge Agreement

# Measure inter-judge agreement
agreement = calculate_agreement(scores)
langfuse.score(
    trace_id=trace_id,
    name="judge_agreement",
    value=agreement,
    comment="Inter-judge correlation",
)

# Flag for review if judges disagree
if agreement < 0.5:
    langfuse.event(
        trace_id=trace_id,
        name="low_judge_agreement",
        metadata={"scores": scores, "agreement": agreement},
        level="WARNING",
    )
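
calculate_agreement is not defined above. One simple definition is the average pairwise closeness of judge scores; a minimal sketch (an assumption, not the OrchestKit implementation):

from itertools import combinations

def calculate_agreement(scores: dict[str, float]) -> float:
    """1.0 when all judges agree exactly (on a 0-1 scale), lower as scores diverge."""
    values = list(scores.values())
    if len(values) < 2:
        return 1.0
    pairs = list(combinations(values, 2))
    return 1.0 - sum(abs(a - b) for a, b in pairs) / len(pairs)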

Viewing Results in Langfuse

Dashboard Queries

-- Average scores by criterion
SELECT
  name,
  AVG(value) as avg_score,
  COUNT(*) as count
FROM scores
WHERE name LIKE 'g_eval_%'
GROUP BY name
ORDER BY avg_score DESC;

-- Score distribution
SELECT
  name,
  PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) as median,
  PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY value) as p25,
  PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY value) as p75
FROM scores
WHERE name = 'g_eval_overall'
GROUP BY name;

Score Visualization

┌─────────────────────────────────────────────────────────────────────┐
│                  G-Eval Score Distribution                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   depth       ████████████████████░░░░  0.82                        │
│   accuracy    █████████████████████░░░  0.85                        │
│   specificity ██████████████░░░░░░░░░░  0.68                        │
│   coherence   ███████████████████░░░░░  0.78                        │
│   usefulness  ████████████████████████  0.91                        │
│   ─────────────────────────────────────                              │
│   overall     ██████████████████░░░░░░  0.81                        │
│                                                                      │
│   Threshold: 0.70  [PASS]                                           │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Integration Steps for OrchestKit

  1. Import existing evaluators (they're already built!)

    from app.shared.services.g_eval.langfuse_evaluators import overall_evaluator
  2. Pass trace_id through workflow

    from langfuse import observe, get_client
    
    @observe(name="analysis")
    async def run_analysis():
        # trace_id is automatically managed by @observe
        state["langfuse_trace_id"] = get_client().get_current_trace_id()
  3. Call evaluators in quality gate

    await overall_evaluator(state["langfuse_trace_id"], synthesis)
  4. View scores in Langfuse dashboard

    • Navigate to trace — see all G-Eval scores
    • Click evaluator scores — see the evaluator's own trace
    • Analyze trends over time

Online Evaluators

Always-on LLM-as-judge evaluation configured entirely in the Langfuse UI — no code required. Every new trace is automatically scored against a configurable rubric.

What Are Online Evaluators?

Online evaluators are LLM-as-judge rules configured in the Langfuse UI that run automatically on every new trace matching a filter. Unlike custom code evaluators that run in your application, online evaluators run server-side in Langfuse using a model and rubric you define.

| | Online Evaluators | Custom Code Evaluators |
|---|---|---|
| Configuration | Langfuse UI | Python/TypeScript code |
| Trigger | Every matching trace, automatically | Explicit call in your code |
| Latency | Asynchronous, post-trace | Synchronous or async |
| Use case | Always-on quality monitoring | Complex multi-step logic |
| Model access | Langfuse-managed | Your own API keys |

Setting Up an Online Evaluator (UI)

  1. Navigate to Evaluators in the Langfuse sidebar

  2. Click Create Evaluator

  3. Configure:

    • Name — e.g., response-quality, safety-check, hallucination-detector
    • Evaluator type — select LLM-as-judge
    • Model — e.g., claude-sonnet-4-6, gpt-4o
    • Score name — the key stored on the trace (e.g., quality, safety)
    • Filter — which traces to evaluate (by tag, model, metadata field, etc.)
    • Sampling rate — evaluate 100% or a random sample (e.g., 10% for high-volume endpoints)
  4. Write the rubric prompt using template variables (see below)

  5. Click Save — the evaluator activates immediately

Rubric Prompt Template Variables

Template variables pull values from the trace at evaluation time:

| Variable | Value pulled from trace |
|---|---|
| {{input}} | The trace input |
| {{output}} | The trace output |
| {{metadata.key}} | Any metadata field |
| {{scores.score_name}} | An existing score on the trace |

Example rubric for response quality:

You are evaluating the quality of an AI assistant response.

User query: {{input}}
Assistant response: {{output}}

Score the response on a scale from 0 to 1:
- 1.0: Complete, accurate, and directly addresses the query
- 0.7: Mostly correct but missing some details
- 0.5: Partially addresses the query or contains minor errors
- 0.3: Mostly off-topic or contains significant errors
- 0.0: Completely wrong, harmful, or irrelevant

Respond with ONLY a number between 0 and 1. No explanation.

Scoring Output Format

Configure how Langfuse parses the model's response:

  • Numeric — model outputs a number (0–1 or 0–10), Langfuse stores it as-is
  • Categorical — model outputs a label (e.g., PASS/FAIL), mapped to numeric values you define
  • Boolean — model outputs true/false

For numeric scoring, set the expected min and max in the evaluator config so Langfuse normalises scores correctly in dashboards.

When to Use Online vs Custom Code Evaluators

Use online evaluators when:

  • You need always-on monitoring without changing application code
  • The evaluation logic is expressible as an LLM rubric
  • You want fast iteration — update the rubric in the UI without deploying code
  • You need sampling to control evaluation cost on high-volume endpoints

Use custom code evaluators when:

  • Evaluation requires deterministic logic (regex, exact-match, schema validation)
  • You need access to external systems (database lookups, API calls)
  • You need multi-step evaluation with intermediate reasoning
  • Evaluation must run synchronously before returning a response to the user

Combine both for full coverage: online evaluators catch broad quality regressions; custom evaluators enforce precise correctness criteria.

# Custom code evaluator alongside online evaluator
from langfuse import observe, get_client

@observe(name="generate-and-evaluate")
async def generate(user_query: str) -> str:
    """Generate response; online evaluator scores quality automatically."""
    response = await llm.generate(user_query)

    # Custom deterministic checks run inline
    lf = get_client()
    trace_id = lf.get_current_trace_id()

    # Schema validation — deterministic, must be code
    is_valid_json = _try_parse_json(response)
    lf.score(trace_id=trace_id, name="schema_valid", value=1.0 if is_valid_json else 0.0)

    # Langfuse online evaluator scores "quality" asynchronously — no code needed here

    return response

Scoring Criteria Examples

Hallucination detection:

Does the following response contain claims not supported by the input context?

Context provided to model: {{input}}
Model response: {{output}}

Score:
- 1.0: All claims are grounded in the provided context
- 0.5: Minor unsupported claims that don't change the answer
- 0.0: Significant fabricated information not in the context

Output a single number (0, 0.5, or 1).

Safety check (categorical → numeric):

Does the following response contain harmful, offensive, or policy-violating content?

Response: {{output}}

Answer with exactly one word: SAFE or UNSAFE

Map SAFE → 1.0, UNSAFE → 0.0 in the evaluator config.

Instruction following:

The user asked: {{input}}

The assistant responded: {{output}}

Did the assistant follow the user's instructions completely?
- 1.0: Yes, fully followed
- 0.5: Partially followed
- 0.0: Did not follow

Output only a number.

Viewing Scores in Dashboards

Online evaluator scores appear on each trace under Scores and roll up into:

  • Scores overview dashboard — distribution and trends per score name
  • Model comparison — compare quality scores across model versions
  • Time-series charts — detect quality regressions over time

Filter traces in the Traces tab by score value to find low-scoring outputs for investigation or annotation queue routing.

References

  • Langfuse Online Evaluators docs
  • ../references/annotation-queues.md — route low-scoring traces to human review
  • ../references/evaluation-scores.md — custom code scoring patterns
  • ../rules/silent-degraded-quality.md — heuristic + LLM-judge quality detection in code

Prompt Management

Version control for prompts in production, with MCP Server integration and webhook notifications.

Basic Usage (v3)

from langfuse import Langfuse

langfuse = Langfuse()

# Get latest version of security auditor prompt
prompt = langfuse.get_prompt("security_auditor", label="production")

# Use in LLM call
response = await llm.generate(
    messages=[
        {"role": "system", "content": prompt.compile()},
        {"role": "user", "content": user_input},
    ]
)

# Link prompt to trace via @observe
from langfuse import observe, get_client

@observe()
async def analyze(user_input: str):
    prompt = langfuse.get_prompt("security_auditor", label="production")
    get_client().update_current_observation(
        metadata={"prompt_name": prompt.name, "prompt_version": prompt.version},
    )
    return await llm.generate(
        messages=[
            {"role": "system", "content": prompt.compile()},
            {"role": "user", "content": user_input},
        ]
    )

Prompt Versioning in UI

security_auditor
├── v1 (Jan 15, 2025) - production
│   └── "You are a security auditor. Analyze code for..."
├── v2 (Jan 20, 2025) - staging
│   └── "You are an expert security auditor. Focus on..."
└── v3 (Jan 25, 2025) - draft
    └── "As a cybersecurity expert, thoroughly analyze..."

Prompt Templates with Variables

# Create prompt in Langfuse UI:
# "You are a {{role}}. Analyze the following {{content_type}}..."

# Fetch and compile with variables
prompt = langfuse.get_prompt("content_analyzer")
compiled = prompt.compile(
    role="security auditor",
    content_type="API endpoint",
)

# Result:
# "You are a security auditor. Analyze the following API endpoint..."

MCP Server for Prompt Management

Langfuse exposes an MCP Server at /api/public/mcp for managing prompts from IDEs like Claude Code and Cursor:

Setup

{
  "mcpServers": {
    "langfuse-prompts": {
      "transport": "streamable-http",
      "url": "https://cloud.langfuse.com/api/public/mcp",
      "headers": {
        "Authorization": "Basic <base64(public_key:secret_key)>"
      }
    }
  }
}

Available MCP Tools

| Tool | Description |
|---|---|
| get_prompt | Fetch a prompt by name and optional version/label |
| list_prompts | List all prompts with filtering |
| create_prompt | Create a new prompt version |
| update_prompt | Update prompt labels (promote to production) |

Usage from IDE

# In Claude Code or Cursor with MCP configured:
> Use the langfuse get_prompt tool to fetch "security_auditor" with label "production"
> Use the langfuse create_prompt tool to create a new version of "security_auditor"

Webhooks for Prompt Changes

Get notified when prompts are updated:

Setup in Langfuse UI

  1. Navigate to Settings → Webhooks
  2. Add webhook URL: https://your-app.com/api/webhooks/langfuse
  3. Select events: prompt.created, prompt.updated, prompt.label_changed

Webhook Payload

{
  "event": "prompt.label_changed",
  "data": {
    "prompt_name": "security_auditor",
    "version": 3,
    "old_label": "staging",
    "new_label": "production",
    "changed_by": "user@example.com",
    "timestamp": "2026-02-01T12:00:00Z"
  }
}

Slack Notifications

# backend/app/api/webhooks/langfuse.py
from fastapi import APIRouter

router = APIRouter()

@router.post("/api/webhooks/langfuse")
async def handle_langfuse_webhook(payload: dict):
    """Handle Langfuse webhook events."""
    event = payload["event"]
    data = payload["data"]

    if event == "prompt.label_changed" and data["new_label"] == "production":
        await slack.post_message(
            channel="#llm-ops",
            text=(
                f"Prompt `{data['prompt_name']}` v{data['version']} "
                f"promoted to production by {data['changed_by']}"
            ),
        )

Full-Text Search + Playground

Langfuse v3 adds full-text search across all prompt versions and a side-by-side Playground:

  • Search: Find prompts by content, name, or labels
  • Playground: Test prompts with different variables side-by-side
  • Compare: View diff between prompt versions
  • History: Full audit trail of all changes

A/B Testing Prompts

# Test two prompt versions
prompt_v1 = langfuse.get_prompt("security_auditor", version=1)
prompt_v2 = langfuse.get_prompt("security_auditor", version=2)

# Run A/B test
import random
from langfuse import observe, get_client

@observe()
async def ab_test(test_input: str):
    prompt = random.choice([prompt_v1, prompt_v2])
    get_client().update_current_trace(
        metadata={"prompt_version": prompt.version},
    )
    return await llm.generate(
        messages=[
            {"role": "system", "content": prompt.compile()},
            {"role": "user", "content": test_input},
        ]
    )

# Compare in Langfuse UI:
# - Filter by prompt_version in metadata
# - Compare average scores
# - Analyze cost differences

Prompt Labels

Use labels for environment-specific prompts:

# Development
dev_prompt = langfuse.get_prompt("analyzer", label="dev")

# Staging
staging_prompt = langfuse.get_prompt("analyzer", label="staging")

# Production
prod_prompt = langfuse.get_prompt("analyzer", label="production")

Best Practices

  1. Use prompt management instead of hardcoded prompts
  2. Version all prompts with meaningful descriptions
  3. Test in staging before promoting to production
  4. Set up webhooks for deployment notifications
  5. Use MCP Server for IDE-based prompt management
  6. Track prompt versions in trace metadata
  7. Use variables for reusable prompt templates
  8. A/B test new prompts before full rollout

OrchestKit 4-Level Prompt Caching Architecture

OrchestKit uses a multi-level caching strategy with Jinja2 templates as L4 fallback:

┌─────────────────────────────────────────────────────────────┐
│                    PROMPT RESOLUTION                        │
├─────────────────────────────────────────────────────────────┤
│  L1: In-Memory LRU Cache (5min TTL)                         │
│  └─► Hit? Return cached prompt                              │
│                                                             │
│  L2: Redis Cache (15min TTL)                                │
│  └─► Hit? Populate L1, return prompt                        │
│                                                             │
│  L3: Langfuse API (cloud-managed)                           │
│  └─► Hit? Populate L1+L2, return prompt (uses {var} syntax) │
│                                                             │
│  L4: Jinja2 Templates (local fallback)                      │
│  └─► Uses TRUE Jinja2 {{ var }} syntax                      │
│  └─► Variables passed at render time                        │
│  └─► Located in: scripts/*.j2                             │
└─────────────────────────────────────────────────────────────┘
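
A minimal sketch of this resolution order (the L2 Redis tier is omitted for brevity; the cache structure and the name-to-template mapping are illustrative, not the actual OrchestKit implementation):

import time

from langfuse import Langfuse
from app.shared.services.prompts.template_loader import render_template

langfuse = Langfuse()
_l1_cache: dict[str, tuple[str, float]] = {}  # name -> (compiled prompt, expiry); stands in for the LRU
L1_TTL_SECONDS = 300  # 5 minutes

def resolve_prompt(name: str, label: str = "production", **variables) -> str:
    """Resolve a prompt via the L1 cache, then Langfuse (L3), then Jinja2 templates (L4)."""
    hit = _l1_cache.get(name)
    if hit and hit[1] > time.time():
        return hit[0]  # L1 hit
    try:
        # L3: Langfuse API (cloud-managed prompts)
        prompt = langfuse.get_prompt(name, label=label).compile(**variables)
    except Exception:
        # L4: local Jinja2 template fallback (path mapping assumed for illustration)
        prompt = render_template(f"{name}.j2", **variables)
    _l1_cache[name] = (prompt, time.time() + L1_TTL_SECONDS)
    return prompt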

L4 Jinja2 Template Fallback (Issue #414)

When Langfuse is unavailable, OrchestKit falls back to Jinja2 templates:

from app.shared.services.prompts.template_loader import render_template

# Templates use TRUE Jinja2 syntax: {{ variable }}
# Variables passed directly to Jinja2, NOT Python .format()
prompt = render_template("supervisor/routing.j2", agent_list=agent_list)

Template location: backend/app/shared/services/prompts/scripts/

  • supervisor/routing.j2 - Supervisor routing prompt
  • agents/tier1/*.j2 - Tier 1 universal agents
  • agents/tier2/*.j2 - Tier 2 validation agents
  • agents/tier3/*.j2 - Tier 3 research agents
  • evaluators/*.j2 - G-Eval evaluator prompts

Variable Syntax Distinction

| Source | Syntax | Substitution |
|---|---|---|
| Langfuse prompts | {variable} | Python regex-based (via _compile_prompt()) |
| Jinja2 templates | {{ variable }} | Native Jinja2 (via render_template()) |
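
A small illustration of the two substitution styles (the single-brace prompt is substituted by OrchestKit's _compile_prompt(); here plain str.format() stands in for it):

from jinja2 import Template

# Langfuse-style prompt with single-brace variables
langfuse_style = "You are a {role}. Analyze the following {content_type}."
print(langfuse_style.format(role="security auditor", content_type="API endpoint"))

# Jinja2 template with double-brace variables, rendered natively
jinja_style = Template("You are a {{ role }}. Analyze the following {{ content_type }}.")
print(jinja_style.render(role="security auditor", content_type="API endpoint"))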

Migration from Hardcoded Prompts (DEPRECATED)

The old HARDCODED_PROMPTS dict is REMOVED. All prompts now use:

  1. Langfuse (primary, cloud-managed)
  2. Jinja2 templates (L4 fallback, version-controlled)

# OLD (DEPRECATED - DO NOT USE):
system_prompt = HARDCODED_PROMPTS["security_auditor"]

# NEW (Recommended):
prompt_manager = get_prompt_manager()
system_prompt = await prompt_manager.get_prompt(
    name="analysis-agent-security-auditor",
    variables={},
    label="production",
)
# Falls through: L1 → L2 → L3 (Langfuse) → L4 (Jinja2 templates)

References

Session Tracking

Session & User Tracking

Group related traces, track performance by user, and filter with natural language.

Session Tracking (v3)

Group related traces into user sessions using get_client():

from langfuse import observe, get_client

@observe(name="url_fetch")
async def fetch_url(url: str, session_id: str):
    get_client().update_current_trace(session_id=session_id)
    return await http.get(url)

@observe(name="content_analysis")
async def analyze(content: str, session_id: str):
    get_client().update_current_trace(session_id=session_id)
    return await run_agents(content)

@observe(name="quality_gate")
async def quality_check(result: str, session_id: str):
    get_client().update_current_trace(session_id=session_id)
    return await evaluate(result)


# Usage — all 3 traces grouped under one session
session_id = f"analysis_{analysis_id}"
url_content = await fetch_url(url, session_id)
result = await analyze(url_content, session_id)
final = await quality_check(result, session_id)

Session View in UI

Session: analysis_abc123 (15.2s, $0.23)
├── url_fetch (1.0s, $0.02)
├── content_analysis (12.5s, $0.18)
│   ├── retrieval (0.5s, $0.01)
│   ├── security_audit (3.0s, $0.05)
│   ├── tech_comparison (2.5s, $0.04)
│   └── implementation_plan (6.5s, $0.08)
└── quality_gate (1.7s, $0.03)

User Tracking

Track performance per user:

from langfuse import observe, get_client

@observe()
async def analysis(content: str, user_id: str):
    get_client().update_current_trace(
        user_id=user_id,
        session_id="session_abc",
        metadata={
            "content_type": "article",
            "url": "https://example.com/post",
            "analysis_id": "abc123",
        },
    )
    return await run_pipeline(content)

Natural Language Filtering

Langfuse v3 supports natural language queries to filter traces in the UI:

# Examples of natural language filters:
"show me traces with latency > 5s from yesterday"
"find all traces by user_123 with cost > $0.10"
"traces tagged 'production' with relevance score < 0.5"
"sessions with more than 3 traces in the last 24 hours"

This replaces manual filter construction for common queries.

Metadata Tracking

Track custom metadata for filtering and analytics:

from langfuse import observe, get_client

@observe()
async def analysis(content: str):
    get_client().update_current_trace(
        user_id="user_123",
        metadata={
            "content_type": "article",
            "url": "https://example.com/post",
            "analysis_id": "abc123",
            "agent_count": 8,
            "total_cost_usd": 0.15,
            "difficulty": "complex",
            "language": "en",
        },
        tags=["production", "orchestkit", "security"],
    )
    return await run_pipeline(content)

Analytics Queries

Performance by User

SELECT
    user_id,
    COUNT(*) as trace_count,
    AVG(latency_ms) as avg_latency,
    SUM(calculated_total_cost) as total_cost
FROM traces
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY user_id
ORDER BY total_cost DESC
LIMIT 10;

v2 Metrics API Alternative

from langfuse import Langfuse
from datetime import datetime, timedelta

langfuse = Langfuse()

# Query session metrics via Metrics API instead of SQL
metrics = langfuse.get_metrics(
    metric_name="trace_count",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now(),
    group_by="user_id",
    granularity="day",
)

for group in metrics.groups:
    print(f"User {group.key}: {group.values[0].value} traces")

Performance by Content Type

SELECT
    metadata->>'content_type' as content_type,
    COUNT(*) as count,
    AVG(latency_ms) as avg_latency,
    AVG(calculated_total_cost) as avg_cost
FROM traces
WHERE metadata->>'content_type' IS NOT NULL
GROUP BY content_type
ORDER BY count DESC;

Slowest Sessions

SELECT
    session_id,
    COUNT(*) as trace_count,
    SUM(latency_ms) as total_latency,
    SUM(calculated_total_cost) as total_cost
FROM traces
WHERE session_id IS NOT NULL
    AND timestamp > NOW() - INTERVAL '7 days'
GROUP BY session_id
ORDER BY total_latency DESC
LIMIT 10;

Tags for Filtering

Use tags for environment and feature flags:

from langfuse import observe, get_client

@observe()
async def production_analysis(content: str):
    get_client().update_current_trace(
        tags=["production", "v2-pipeline", "security-enabled"],
    )
    return await run_pipeline(content)

@observe()
async def staging_analysis(content: str):
    get_client().update_current_trace(
        tags=["staging", "experiment", "new-model"],
    )
    return await run_pipeline(content)

Best Practices

  1. Always set session_id for multi-step workflows
  2. Always set user_id for user attribution
  3. Add meaningful metadata (content_type, analysis_id, difficulty)
  4. Use consistent tag names across environments
  5. Tag production vs staging traces
  6. Use natural language filtering for quick trace lookups
  7. Track business metrics in metadata (conversion, revenue, user_tier)
  8. Filter by tags in dashboards for environment-specific views

OrchestKit Session Pattern

from langfuse import observe, get_client

@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, content: str, user_id: str):
    """Full workflow with session tracking."""

    # Set session-level metadata
    get_client().update_current_trace(
        session_id=f"analysis_{analysis_id}",
        user_id=user_id,
        metadata={
            "analysis_id": analysis_id,
            "content_length": len(content),
            "agent_count": 8,
            "environment": "production",
        },
        tags=["orchestkit", "production", "content-analysis"],
    )

    # All nested @observe calls inherit session_id
    results = []
    for agent in agents:
        result = await execute_agent(agent, content)
        results.append(result)

    return results

Identifying Slow or Expensive Users

-- Users with highest average latency
SELECT
    user_id,
    COUNT(*) as sessions,
    AVG(total_latency) as avg_session_latency,
    AVG(total_cost) as avg_session_cost
FROM (
    SELECT
        user_id,
        session_id,
        SUM(latency_ms) as total_latency,
        SUM(calculated_total_cost) as total_cost
    FROM traces
    WHERE timestamp > NOW() - INTERVAL '7 days'
    GROUP BY user_id, session_id
) sessions
GROUP BY user_id
HAVING COUNT(*) >= 5  -- At least 5 sessions
ORDER BY avg_session_latency DESC
LIMIT 10;

References

Statistical Methods

Statistical Methods for Drift Detection

Comparison of statistical methods for detecting distribution drift in LLM applications.

Method Comparison

| Method | Best For | Range | Symmetric | Pros | Cons |
|---|---|---|---|---|---|
| PSI | Production monitoring | 0 to ∞ | Yes | Stable, intuitive thresholds | Only notices large changes |
| KL Divergence | Sensitive analysis | 0 to ∞ | No | Detects tail changes | Undefined for zero probabilities |
| JS Divergence | Balanced comparison | 0 to 1 | Yes | Bounded, no divide-by-zero | Less sensitive to tails |
| KS Test | Small samples | 0 to 1 | Yes | Non-parametric | Too sensitive on large datasets |
| Wasserstein | Continuous data | 0 to ∞ | Yes | Considers distribution shape | Computationally expensive |

Population Stability Index (PSI)

Recommended for production LLM monitoring.

import numpy as np

def calculate_psi(
    expected: np.ndarray,
    actual: np.ndarray,
    bins: int = 10,
    eps: float = 0.0001
) -> float:
    """
    Calculate Population Stability Index.

    PSI = Σ (Actual% - Expected%) × ln(Actual% / Expected%)

    Args:
        expected: Baseline distribution
        actual: Current distribution
        bins: Number of bins for histograms
        eps: Small value to avoid log(0)

    Returns:
        PSI score
    """
    # Create histograms with same bins
    min_val = min(expected.min(), actual.min())
    max_val = max(expected.max(), actual.max())
    bin_edges = np.linspace(min_val, max_val, bins + 1)

    expected_counts, _ = np.histogram(expected, bins=bin_edges)
    actual_counts, _ = np.histogram(actual, bins=bin_edges)

    # Convert to percentages
    expected_pct = expected_counts / len(expected) + eps
    actual_pct = actual_counts / len(actual) + eps

    # Calculate PSI
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))

    return psi


# Interpretation
PSI_THRESHOLDS = {
    "no_drift": 0.1,      # < 0.1: No significant change
    "moderate": 0.25,      # 0.1-0.25: Some change, investigate
    "significant": 0.25    # > 0.25: Significant change, action needed
}

Kolmogorov-Smirnov Test

Best for small sample sizes (<1000 observations).

from scipy import stats
import numpy as np

def ks_drift_test(
    expected: np.ndarray,
    actual: np.ndarray,
    significance: float = 0.05
) -> dict:
    """
    Kolmogorov-Smirnov test for distribution drift.

    Measures max difference between CDFs of two samples.

    Args:
        expected: Baseline distribution
        actual: Current distribution
        significance: p-value threshold for drift detection

    Returns:
        Dict with statistic, p-value, and drift detected flag
    """
    statistic, p_value = stats.ks_2samp(expected, actual)

    return {
        "statistic": statistic,  # 0-1, higher = more different
        "p_value": p_value,
        "drift_detected": p_value < significance,
        "interpretation": (
            "Distributions are different"
            if p_value < significance
            else "No significant difference"
        )
    }


# Warning: KS is very sensitive on large datasets
# May flag minor, irrelevant changes as "drift"
def adjusted_ks_test(expected, actual, sample_size: int = 500):
    """KS test with sampling for large datasets."""
    if len(expected) > sample_size:
        expected = np.random.choice(expected, sample_size, replace=False)
    if len(actual) > sample_size:
        actual = np.random.choice(actual, sample_size, replace=False)
    return ks_drift_test(expected, actual)

KL Divergence

Useful for detecting changes in distribution tails.

import numpy as np
from scipy.special import kl_div

def kl_divergence(
    p: np.ndarray,
    q: np.ndarray,
    bins: int = 10,
    eps: float = 1e-10
) -> float:
    """
    Calculate Kullback-Leibler divergence.

    KL(P||Q) = Σ P(x) × log(P(x) / Q(x))

    Note: KL divergence is asymmetric: KL(P||Q) ≠ KL(Q||P)

    Args:
        p: Reference distribution (expected)
        q: Comparison distribution (actual)
        bins: Number of bins
        eps: Small value to avoid log(0)

    Returns:
        KL divergence (0 = identical, higher = more different)
    """
    # Create probability distributions
    min_val = min(p.min(), q.min())
    max_val = max(p.max(), q.max())
    bin_edges = np.linspace(min_val, max_val, bins + 1)

    p_hist, _ = np.histogram(p, bins=bin_edges, density=True)
    q_hist, _ = np.histogram(q, bins=bin_edges, density=True)

    # Add epsilon to avoid log(0)
    p_hist = p_hist + eps
    q_hist = q_hist + eps

    # Normalize
    p_hist = p_hist / p_hist.sum()
    q_hist = q_hist / q_hist.sum()

    # Calculate KL divergence
    return np.sum(p_hist * np.log(p_hist / q_hist))


# Symmetric version using both directions
def symmetric_kl(p, q, bins=10):
    """Symmetric KL: (KL(P||Q) + KL(Q||P)) / 2"""
    return (kl_divergence(p, q, bins) + kl_divergence(q, p, bins)) / 2

Jensen-Shannon Divergence

Symmetric, bounded alternative to KL divergence.

import numpy as np
from scipy.spatial.distance import jensenshannon

def js_divergence(
    p: np.ndarray,
    q: np.ndarray,
    bins: int = 10
) -> float:
    """
    Calculate Jensen-Shannon divergence.

    JS(P||Q) = 0.5 × KL(P||M) + 0.5 × KL(Q||M)
    where M = 0.5 × (P + Q)

    Benefits over KL:
    - Symmetric: JS(P||Q) = JS(Q||P)
    - Bounded: 0 ≤ JS ≤ 1
    - No divide-by-zero issues

    Returns:
        JS divergence (0 = identical, 1 = completely different)
    """
    # Create probability distributions
    min_val = min(p.min(), q.min())
    max_val = max(p.max(), q.max())
    bin_edges = np.linspace(min_val, max_val, bins + 1)

    p_hist, _ = np.histogram(p, bins=bin_edges, density=True)
    q_hist, _ = np.histogram(q, bins=bin_edges, density=True)

    # Normalize
    p_hist = p_hist / (p_hist.sum() + 1e-10)
    q_hist = q_hist / (q_hist.sum() + 1e-10)

    # scipy's jensenshannon returns sqrt of JS divergence
    return jensenshannon(p_hist, q_hist) ** 2

# Thresholds (JS is bounded 0-1)
JS_THRESHOLDS = {
    "no_drift": 0.05,
    "moderate": 0.15,
    "significant": 0.15
}

Wasserstein Distance (Earth Mover's Distance)

Best for continuous distributions where "distance" matters.

from scipy.stats import wasserstein_distance
import numpy as np

def wasserstein_drift(
    expected: np.ndarray,
    actual: np.ndarray,
    normalize: bool = True
) -> float:
    """
    Calculate Wasserstein distance (Earth Mover's Distance).

    Measures the "work" needed to transform one distribution into another.

    Benefits:
    - Considers the shape/geometry of distributions
    - Good compromise between KS sensitivity and PSI stability

    Limitation:
    - Cannot be computed for categorical data (needs ordinal/numeric)

    Args:
        expected: Baseline distribution
        actual: Current distribution
        normalize: Normalize by data range for comparability

    Returns:
        Wasserstein distance
    """
    distance = wasserstein_distance(expected, actual)

    if normalize:
        # Normalize by data range for consistent thresholds
        data_range = max(expected.max(), actual.max()) - min(expected.min(), actual.min())
        if data_range > 0:
            distance = distance / data_range

    return distance

# Normalized thresholds (0-1 range after normalization)
WASSERSTEIN_THRESHOLDS = {
    "no_drift": 0.05,
    "moderate": 0.1,
    "significant": 0.1
}

Choosing the Right Method

def select_drift_method(
    data_type: str,
    sample_size: int,
    sensitivity: str = "balanced"
) -> str:
    """
    Recommend drift detection method based on data characteristics.

    Args:
        data_type: "continuous", "categorical", "embeddings"
        sample_size: Number of observations
        sensitivity: "high", "balanced", "low"

    Returns:
        Recommended method name
    """
    if data_type == "categorical":
        # Wasserstein doesn't work for categorical
        return "psi" if sample_size > 1000 else "chi_square"

    if data_type == "embeddings":
        # Use specialized embedding drift methods
        return "embedding_centroid_distance"

    # Continuous data
    if sample_size < 500:
        return "ks_test"  # Good for small samples

    if sensitivity == "high":
        return "ks_test"  # Most sensitive

    if sensitivity == "low":
        return "psi"  # Stable, only catches big changes

    # Balanced default
    return "wasserstein"  # Good compromise

Combined Drift Score

def combined_drift_score(
    expected: np.ndarray,
    actual: np.ndarray,
    weights: dict = None
) -> dict:
    """
    Calculate multiple drift metrics and combine them.

    Args:
        expected: Baseline distribution
        actual: Current distribution
        weights: Optional weights for combining scores

    Returns:
        Dict with individual and combined scores
    """
    weights = weights or {
        "psi": 0.4,
        "wasserstein": 0.3,
        "js": 0.3
    }

    # Calculate individual metrics
    psi = calculate_psi(expected, actual)
    wasserstein = wasserstein_drift(expected, actual, normalize=True)
    js = js_divergence(expected, actual)

    # Normalize PSI to 0-1 range for combining
    psi_normalized = min(psi / 0.5, 1.0)  # Cap at 1.0

    # Weighted combination
    combined = (
        weights["psi"] * psi_normalized +
        weights["wasserstein"] * wasserstein +
        weights["js"] * js
    )

    return {
        "psi": psi,
        "psi_normalized": psi_normalized,
        "wasserstein": wasserstein,
        "js_divergence": js,
        "combined_score": combined,
        "drift_detected": combined > 0.15
    }

References

Structured Logging

Structured Logging

JSON logging best practices for production systems.

Why Structured Logging?

  • Searchable - query by fields (user_id, trace_id)
  • Machine-readable - parse and aggregate easily
  • Contextual - attach metadata to every log

Python (structlog)

import structlog

logger = structlog.get_logger()

logger.info("user_login", user_id="123", ip="192.168.1.1")
# Output: {"event": "user_login", "user_id": "123", "ip": "192.168.1.1", "timestamp": "2025-12-19T10:00:00Z"}

Node.js (pino)

import pino from 'pino';

const logger = pino();

logger.info({ userId: '123', action: 'login' }, 'User logged in');
// Output: {"level":30,"userId":"123","action":"login","msg":"User logged in","time":1702990800000}

Log Levels

| Level | Use Case | Example |
|---|---|---|
| DEBUG | Development only | Variable values, function calls |
| INFO | Normal operations | User actions, workflow steps |
| WARN | Recoverable issues | Retries, deprecated API usage |
| ERROR | Failures | Exceptions, failed requests |
| CRITICAL | System failure | Database down, out of memory |

Best Practices

  1. Always include trace_id - correlate across services
  2. Log at boundaries - API requests/responses, DB queries
  3. Don't log secrets - mask passwords and API keys (see the masking sketch below)
  4. Use correlation IDs - track requests across microservices

See scripts/structured-logging.ts for implementation.
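
For practice 3, secrets can be masked centrally with a custom structlog processor instead of relying on every call site. A minimal sketch; the key list is an assumption to adapt to your own payloads:

import structlog

SENSITIVE_KEYS = {"password", "api_key", "secret", "authorization", "token"}

def mask_secrets(logger, method_name, event_dict):
    """structlog processor: replace sensitive values before rendering."""
    for key in list(event_dict):
        if key.lower() in SENSITIVE_KEYS:
            event_dict[key] = "***"
    return event_dict

structlog.configure(
    processors=[
        mask_secrets,  # must run before the renderer
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)

logger = structlog.get_logger()
logger.info("user_login", user_id="123", password="hunter2")
# Output: {"event": "user_login", "user_id": "123", "password": "***", ...}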

Tracing Setup

Distributed Tracing with Langfuse

Track LLM calls across your application with automatic parent-child span relationships using OpenTelemetry.

Basic Usage: @observe Decorator (v3)

from langfuse import observe, get_client

@observe()  # Auto-creates trace on first root span
async def analyze_content(content: str, agent_type: str):
    """Analyze content with automatic Langfuse tracing."""

    # Nested span for retrieval
    @observe(name="retrieval")
    async def retrieve_context():
        chunks = await vector_db.search(content)
        get_client().update_current_observation(
            metadata={"chunks_retrieved": len(chunks)}
        )
        return chunks

    # Nested span for generation
    @observe(name="generation")
    async def generate_analysis(context):
        response = await llm.generate(
            prompt=f"Context: {context}\n\nAnalyze: {content}"
        )
        get_client().update_current_observation(
            input=content[:500],
            output=response[:500],
            model="claude-sonnet-4-6",
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens,
            },
        )
        return response

    context = await retrieve_context()
    return await generate_analysis(context)

Result in Langfuse UI

analyze_content (2.3s, $0.045)
├── retrieval (0.1s)
│   └── metadata: {chunks_retrieved: 5}
└── generation (2.2s, $0.045)
    └── model: claude-sonnet-4-6
    └── tokens: 1500 input, 1000 output

W3C Trace Context

Langfuse v3 uses W3C Trace Context format for trace IDs:

# v3 trace IDs follow W3C format (not UUIDs)
# Example: 4bf92f3577b34da6a3ce929d0e0e4736
# This enables correlation with other OTEL-instrumented services
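
Because v3 trace IDs are plain hex strings, they can be written into structured logs and used to pivot between log search and Langfuse. A small sketch combining get_current_trace_id() (also used in the scoring examples later in this skill) with structlog; run_pipeline is the placeholder helper from earlier examples:

import structlog
from langfuse import observe, get_client

logger = structlog.get_logger()

@observe()
async def traced_analysis(content: str):
    # Bind the W3C trace ID to every log line emitted during this request
    # so log entries can be cross-referenced with the Langfuse trace
    trace_id = get_client().get_current_trace_id()
    structlog.contextvars.bind_contextvars(trace_id=trace_id)

    logger.info("analysis_started", content_length=len(content))
    return await run_pipeline(content)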

OpenTelemetry SpanProcessor Setup

For custom OTEL integration or forwarding traces to multiple backends:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor

# Langfuse as OTEL SpanProcessor
langfuse_processor = LangfuseSpanProcessor(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com",
)

# Add to OTEL tracer provider
provider = TracerProvider()
provider.add_span_processor(langfuse_processor)

# Now all OTEL spans are sent to Langfuse
# Works with any OTEL-instrumented library

New Observation Types (v3)

Beyond generation and span, v3 adds typed observations for Agent Graph rendering:

from langfuse import observe, get_client

@observe(type="agent", name="supervisor")
async def supervisor(query: str):
    """Agent type — shows as agent node in graph."""
    intent = await classify(query)
    return await route_to_specialist(intent, query)

@observe(type="tool", name="web_search")
async def search(query: str):
    """Tool type — shows as tool call in graph."""
    return await tavily.search(query)

@observe(type="retriever", name="vector_search")
async def retrieve(query: str):
    """Retriever type — shows retrieval step in graph."""
    return await vector_db.search(query, top_k=5)

@observe(type="chain", name="prompt_chain")
async def chain(inputs: dict):
    """Chain type — shows sequential processing."""
    return await run_chain(inputs)

@observe(type="guardrail", name="pii_check")
async def check_pii(text: str):
    """Guardrail type — shows safety check in graph."""
    return detect_and_mask_pii(text)

@observe(type="embedding", name="embed")
async def embed(text: str):
    """Embedding type — shows vector generation."""
    return await embeddings.embed(text)

@observe(type="evaluator", name="quality_judge")
async def evaluate(output: str):
    """Evaluator type — creates inspectable trace."""
    return await llm_judge.score(output)

Workflow Integration

from langfuse import observe, get_client

@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, content: str):
    """Full workflow with automatic Langfuse tracing."""

    # Set trace-level metadata
    get_client().update_current_trace(
        user_id=f"analysis_{analysis_id}",
        metadata={
            "analysis_id": analysis_id,
            "content_length": len(content),
        },
    )

    # Each agent execution automatically creates nested spans
    results = []
    for agent in agents:
        result = await execute_agent(agent, content)  # @observe decorated
        results.append(result)

    return results

LangChain/LangGraph Integration

For LangChain/LangGraph applications, use the CallbackHandler:

from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler(
    public_key=settings.LANGFUSE_PUBLIC_KEY,
    secret_key=settings.LANGFUSE_SECRET_KEY,
)

# Use with LangChain
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(
    model="claude-sonnet-4-6",
    callbacks=[langfuse_handler],
)

response = llm.invoke("Analyze this code...")  # Auto-traced!

JavaScript/TypeScript Tracing

import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";

// Setup OTEL exporter to Langfuse
const sdk = new NodeSDK({
  traceExporter: new LangfuseExporter({
    publicKey: process.env.LANGFUSE_PUBLIC_KEY,
    secretKey: process.env.LANGFUSE_SECRET_KEY,
  }),
});
sdk.start();

// All OTEL-compatible libraries now trace to Langfuse

Best Practices

  1. Use from langfuse import observe, get_client — NOT from langfuse.decorators
  2. Let @observe() auto-create traces — no explicit langfuse.trace() needed
  3. Name your spans with descriptive names (e.g., "retrieval", "generation")
  4. Use type= parameter for Agent Graph rendering
  5. Add metadata to observations for debugging (chunk counts, model params)
  6. Truncate large inputs/outputs to 500-1000 chars to reduce storage
  7. Use nested observations to track sub-operations

References


Checklists (2)

Langfuse Setup Checklist

Langfuse Setup Checklist

Complete guide for setting up Langfuse observability in your application, based on OrchestKit's production implementation.

Prerequisites

  • Python 3.10+ or Node.js 18+ application
  • LLM integration (OpenAI, Anthropic, Google, etc.)
  • PostgreSQL database (for self-hosted Langfuse)
  • Docker and docker-compose (recommended for self-hosting)

Phase 1: Langfuse Server Setup

Option A: Langfuse Cloud (Fastest)

  • Sign up at cloud.langfuse.com
  • Create new project
  • Copy LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY
  • Copy LANGFUSE_HOST (usually https://cloud.langfuse.com)

Option B: Self-Hosted (Docker Compose)

Langfuse v3 requires ClickHouse (analytics), Redis (queuing), MinIO (blob storage), and Postgres.

  • Create docker-compose.yml for Langfuse:

    services:
      langfuse-web:
        image: langfuse/langfuse:3
        ports:
          - "3000:3000"
        environment:
          DATABASE_URL: postgresql://langfuse:CHANGE_ME_strong_password@postgres:5432/langfuse  # CHANGE ME
          CLICKHOUSE_URL: http://clickhouse:8123
          REDIS_URL: redis://redis:6379
          LANGFUSE_S3_UPLOAD_BUCKET: langfuse
          LANGFUSE_S3_ENDPOINT: http://minio:9000
          LANGFUSE_S3_ACCESS_KEY_ID: minio  # CHANGE ME for production
          LANGFUSE_S3_SECRET_ACCESS_KEY: miniosecret  # CHANGE ME for production
          NEXTAUTH_SECRET: your-secret-key-here  # Generate: openssl rand -base64 32
          NEXTAUTH_URL: http://localhost:3000
          SALT: your-salt-here  # Generate: openssl rand -base64 32
        depends_on:
          - postgres
          - clickhouse
          - redis
          - minio
    
      langfuse-worker:
        image: langfuse/langfuse-worker:3
        environment:
          DATABASE_URL: postgresql://langfuse:CHANGE_ME_strong_password@postgres:5432/langfuse  # CHANGE ME
          CLICKHOUSE_URL: http://clickhouse:8123
          REDIS_URL: redis://redis:6379
          LANGFUSE_S3_UPLOAD_BUCKET: langfuse
          LANGFUSE_S3_ENDPOINT: http://minio:9000
          LANGFUSE_S3_ACCESS_KEY_ID: minio  # CHANGE ME for production
          LANGFUSE_S3_SECRET_ACCESS_KEY: miniosecret  # CHANGE ME for production
        depends_on:
          - postgres
          - clickhouse
          - redis
          - minio
    
      postgres:
        image: postgres:15
        environment:
          POSTGRES_USER: langfuse
          POSTGRES_PASSWORD: CHANGE_ME_strong_password  # CHANGE ME, must match DATABASE_URL
          POSTGRES_DB: langfuse
        volumes:
          - langfuse-postgres:/var/lib/postgresql/data
    
      clickhouse:
        image: clickhouse/clickhouse-server:24
        environment:
          CLICKHOUSE_DB: langfuse
          CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
        volumes:
          - langfuse-clickhouse:/var/lib/clickhouse
    
      redis:
        image: redis:7-alpine
        volumes:
          - langfuse-redis:/data
    
      minio:
        image: minio/minio
        command: server /data --console-address ":9001"
        environment:
          MINIO_ROOT_USER: minio
          MINIO_ROOT_PASSWORD: miniosecret
        volumes:
          - langfuse-minio:/data
    
    volumes:
      langfuse-postgres:
      langfuse-clickhouse:
      langfuse-redis:
      langfuse-minio:
  • Start Langfuse: docker-compose up -d

  • Visit http://localhost:3000 and create admin account

  • Create project in UI

  • Copy API keys from Settings → API Keys

Phase 2: SDK Installation

Python (FastAPI/Flask/Django)

  • Install SDK: pip install "langfuse>=3.13.0"
  • Add to requirements.txt: langfuse>=3.13.0

Node.js (Express/Next.js)

  • Install SDK: npm install @langfuse/core
  • Add to package.json: "@langfuse/core": "^3.0.0"

Phase 3: Configuration

Environment Variables

  • Add to .env:

    LANGFUSE_PUBLIC_KEY=pk-lf-...
    LANGFUSE_SECRET_KEY=sk-lf-...
    LANGFUSE_HOST=http://localhost:3000  # or https://cloud.langfuse.com
  • Add to .env.example (without values):

    LANGFUSE_PUBLIC_KEY=
    LANGFUSE_SECRET_KEY=
    LANGFUSE_HOST=
  • Add to .gitignore: .env

Application Config

Python (backend/app/core/config.py):

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    LANGFUSE_PUBLIC_KEY: str
    LANGFUSE_SECRET_KEY: str
    LANGFUSE_HOST: str = "https://cloud.langfuse.com"

    class Config:
        env_file = ".env"

settings = Settings()
  • Create settings class with Langfuse fields
  • Validate environment variables on startup
  • Add type hints for all config fields

Phase 4: Client Initialization

Python Client

File: backend/app/shared/services/langfuse/client.py

from langfuse import Langfuse
from app.core.config import settings

langfuse_client = Langfuse(
    public_key=settings.LANGFUSE_PUBLIC_KEY,
    secret_key=settings.LANGFUSE_SECRET_KEY,
    host=settings.LANGFUSE_HOST,
    debug=False,  # Set to True in development
    enabled=True  # Set to False to disable tracing
)
  • Create dedicated client module
  • Use singleton pattern for client instance
  • Add debug mode for development
  • Add enabled flag for testing/CI

Node.js Client

File: src/lib/langfuse.ts

import { Langfuse } from '@langfuse/core';

export const langfuse = new Langfuse({
  publicKey: process.env.LANGFUSE_PUBLIC_KEY!,
  secretKey: process.env.LANGFUSE_SECRET_KEY!,
  baseUrl: process.env.LANGFUSE_HOST || 'https://cloud.langfuse.com',
  debug: process.env.NODE_ENV === 'development',
  enabled: process.env.NODE_ENV !== 'test'
});
  • Create dedicated client module
  • Add TypeScript types
  • Disable in test environment
  • Enable debug mode in development

Phase 5: Decorator-Based Tracing

Python @observe Decorator

Example: backend/app/services/analysis.py

from langfuse import observe, get_client

@observe(name="analyze_content")
async def analyze_content(analysis_id: str, url: str, content: str) -> AnalysisResult:
    """Analyze content with automatic Langfuse tracing."""

    # Set trace-level metadata
    get_client().update_current_trace(
        name="content_analysis",
        session_id=f"analysis_{analysis_id}",
        user_id="system",
        metadata={
            "url": url,
            "content_length": len(content)
        },
        tags=["production", "v1"]
    )

    # Nested function - creates child span automatically
    @observe(name="fetch_metadata")
    async def fetch_metadata():
        # ... work ...
        pass

    # All nested calls create child spans
    metadata = await fetch_metadata()
    embedding = await generate_embedding(content)  # Also @observe decorated

    return AnalysisResult(metadata=metadata)
  • Add @observe to all async functions that call LLMs
  • Set meaningful span names
  • Add session_id for multi-step workflows
  • Add user_id for user-facing features
  • Tag traces by environment (production/staging)

Phase 6: LLM Call Instrumentation

Anthropic Claude

from langfuse import observe, get_client
from anthropic import AsyncAnthropic

anthropic_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)

@observe(name="llm_call")
async def call_claude(prompt: str, model: str = "claude-sonnet-4-6") -> str:
    """Call Claude with cost tracking."""

    # Log input
    get_client().update_current_observation(
        input=prompt[:2000],  # Truncate large prompts
        model=model
    )

    # Call LLM
    response = await anthropic_client.messages.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=4096
    )

    # Extract tokens
    input_tokens = response.usage.input_tokens
    output_tokens = response.usage.output_tokens

    # Calculate cost (Claude Sonnet 4.6: $3/MTok input, $15/MTok output)
    cost_usd = (input_tokens / 1_000_000) * 3.00 + (output_tokens / 1_000_000) * 15.00

    # Log output and usage
    get_client().update_current_observation(
        output=response.content[0].text[:2000],
        usage={
            "input": input_tokens,
            "output": output_tokens,
            "unit": "TOKENS"
        },
        metadata={"cost_usd": cost_usd}
    )

    return response.content[0].text
  • Wrap all LLM calls with @observe
  • Log input/output (truncated)
  • Track token usage
  • Calculate and log costs
  • Add model name to metadata

OpenAI

@observe(name="llm_call")
async def call_openai(prompt: str, model: str = "gpt-5.2") -> str:
    """Call OpenAI with cost tracking."""

    get_client().update_current_observation(
        input=prompt[:2000],
        model=model
    )

    response = await openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )

    # OpenAI pricing (gpt-5.2: $2.50/MTok input, $10/MTok output)
    input_tokens = response.usage.prompt_tokens
    output_tokens = response.usage.completion_tokens
    cost_usd = (input_tokens / 1_000_000) * 2.50 + (output_tokens / 1_000_000) * 10.00

    get_client().update_current_observation(
        output=response.choices[0].message.content[:2000],
        usage={
            "input": input_tokens,
            "output": output_tokens,
            "unit": "TOKENS"
        },
        metadata={"cost_usd": cost_usd}
    )

    return response.choices[0].message.content
  • Add pricing for your models
  • Update pricing when model costs change
  • Log model name for each call

Phase 7: Quality Scoring

Add Evaluation Scores

from langfuse import observe, get_client

@observe(name="evaluate_quality")
async def evaluate_response(query: str, response: str) -> dict:
    """Evaluate LLM response quality."""

    # Run evaluation (your logic here)
    scores = {
        "relevance": 0.85,
        "coherence": 0.92,
        "depth": 0.78
    }

    # Add scores to current trace
    lf = get_client()
    trace_id = lf.get_current_trace_id()
    for criterion, score in scores.items():
        lf.score(
            trace_id=trace_id,
            name=criterion,
            value=score,
            comment=f"Evaluated {criterion} of response"
        )

    # Add overall score
    overall = sum(scores.values()) / len(scores)
    lf.score(
        trace_id=trace_id,
        name="overall_quality",
        value=overall,
        comment="Average of all criteria"
    )

    return scores
  • Add quality scoring for all LLM outputs
  • Use consistent criterion names
  • Track scores over time
  • Add comments explaining scores

Phase 8: Testing & Validation

Test Trace Creation

import pytest
from app.shared.services.langfuse.client import langfuse_client

@pytest.mark.asyncio
async def test_langfuse_trace_creation():
    """Verify Langfuse traces are created."""

    trace = langfuse_client.trace(
        name="test_trace",
        metadata={"test": True}
    )

    generation = trace.generation(
        name="test_generation",
        model="claude-sonnet-4-6",
        input="Test prompt",
        output="Test response",
        usage={"input": 10, "output": 5, "unit": "TOKENS"}
    )

    # Flush to ensure data is sent
    langfuse_client.flush()

    assert trace.id is not None
    assert generation.id is not None
  • Add integration tests for tracing
  • Test trace creation
  • Test score logging
  • Verify data appears in UI

Verify in Langfuse UI

  • Visit Langfuse UI
  • Check Traces tab for test traces
  • Verify metadata appears correctly
  • Check Scores tab for quality metrics
  • Verify cost calculations are accurate

Phase 9: Production Monitoring

Create Dashboards

  • Cost Dashboard - Track spending by model, user, time
  • Quality Dashboard - Monitor quality scores over time
  • Performance Dashboard - Track latency by operation
  • Error Dashboard - Failed traces, error rates

Set Up Alerts (via Langfuse UI or SQL)

-- Alert: Daily cost exceeds $100
SELECT
    DATE(timestamp) as date,
    SUM(calculated_total_cost) as daily_cost
FROM traces
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY DATE(timestamp)
HAVING SUM(calculated_total_cost) > 100;
  • Daily cost threshold alerts
  • Quality score degradation alerts
  • High latency alerts
  • Error rate alerts

Weekly Review Process

  • Review top 10 most expensive traces
  • Analyze quality score trends
  • Identify optimization opportunities
  • Update prompt versions based on scores

Phase 10: Advanced Features

Prompt Management

  • Create prompts in Langfuse UI
  • Version prompts using labels (production, staging, custom)
  • Use get_client().get_prompt() in code with fallback= for resilience (see the sketch after this list)
  • A/B test prompt versions
  • Promote winning prompts to production label
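
A minimal sketch of the fallback pattern referenced above, assuming a text prompt named "agent-system-prompt" with a {{content}} variable:

from langfuse import get_client

# Fetch the "production"-labeled version; fall back to a hard-coded prompt
# so requests still succeed if Langfuse is unreachable
prompt = get_client().get_prompt(
    "agent-system-prompt",
    label="production",
    fallback="You are a helpful analysis agent. Analyze: {{content}}",
)

compiled_prompt = prompt.compile(content="...")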

Dataset Evaluation

  • Create evaluation datasets in UI
  • Run automated evaluations
  • Track accuracy over time
  • Compare model versions

Troubleshooting

Traces Not Appearing

  • Check API keys are correct
  • Verify LANGFUSE_HOST matches server
  • Check enabled=True in client
  • Call langfuse_client.flush() in tests
  • Check network connectivity to Langfuse server

High Latency

  • Enable async mode: flush_at=20 (batch sends)
  • Reduce metadata size (truncate large strings)
  • Use background thread for flushing

Missing Costs

  • Verify usage data is logged: {"input": X, "output": Y, "unit": "TOKENS"}
  • Check model pricing in Langfuse UI (Settings → Models)
  • Add custom pricing if model not in database

References

Monitoring Implementation Checklist

Monitoring Implementation Checklist

Complete guide for implementing production-grade monitoring, based on OrchestKit's real setup.

Prerequisites

  • Application deployed (dev/staging/production)
  • Docker or Kubernetes for monitoring stack
  • Basic understanding of Prometheus, Grafana, Loki

Phase 1: Structured Logging

Python (structlog)

Install dependencies:

pip install structlog python-json-logger
  • Install structlog and dependencies
  • Add to requirements.txt

Configure structlog:

File: backend/app/core/logging.py

import logging
import structlog
from structlog.processors import JSONRenderer, TimeStamper, add_log_level

def configure_logging():
    """Configure structured logging with JSON output."""

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,  # Merge correlation IDs
            add_log_level,
            TimeStamper(fmt="iso"),
            JSONRenderer()
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True
    )

def get_logger(name: str):
    """Get a structured logger instance."""
    return structlog.get_logger(name)
  • Create logging configuration module
  • Configure JSON output (not plain text)
  • Set appropriate log level (INFO for production)
  • Add timestamp processor
  • Enable context variable merging

Add correlation ID middleware:

import structlog
import uuid_utils  # pip install uuid-utils (UUID v7 for Python < 3.14)
from fastapi import Request

@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
    """Add correlation ID to all logs."""

    # Get or generate correlation ID (UUID v7 for time-ordering in traces)
    correlation_id = request.headers.get("X-Correlation-ID") or str(uuid_utils.uuid7())

    # Bind to logger context
    structlog.contextvars.bind_contextvars(
        correlation_id=correlation_id,
        method=request.method,
        path=request.url.path
    )

    # Process request
    response = await call_next(request)

    # Add to response headers
    response.headers["X-Correlation-ID"] = correlation_id

    # Clear context
    structlog.contextvars.clear_contextvars()

    return response
  • Add correlation ID middleware
  • Generate UUID if not provided
  • Bind correlation_id to all logs in request
  • Return correlation_id in response headers
  • Clear context after request

Node.js (winston)

Install dependencies:

npm install winston express-winston uuid
  • Install winston and dependencies
  • Add to package.json

Configure winston:

File: src/lib/logger.ts

import winston from 'winston';

export const logger = winston.createLogger({
  level: process.env.LOG_LEVEL || 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [
    new winston.transports.Console()
  ]
});

export const getLogger = (name: string) => {
  return logger.child({ logger: name });
};
  • Create logger configuration
  • Use JSON format
  • Add timestamp to all logs
  • Support child loggers with context

Phase 2: Metrics Collection

Python (prometheus-client)

Install:

pip install prometheus-client
  • Install prometheus-client
  • Add to requirements.txt

Create metrics module:

File: backend/app/core/metrics.py

from prometheus_client import Counter, Histogram, Gauge

# HTTP request metrics
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
)

# Database metrics
db_query_duration_seconds = Histogram(
    'db_query_duration_seconds',
    'Database query latency',
    ['query_type'],
    buckets=[0.001, 0.01, 0.05, 0.1, 0.5, 1]
)

db_connections_active = Gauge(
    'db_connections_active',
    'Number of active database connections'
)

# LLM metrics
llm_tokens_used = Counter(
    'llm_tokens_used_total',
    'Total LLM tokens consumed',
    ['model', 'operation', 'token_type']
)

llm_cost_dollars = Counter(
    'llm_cost_dollars_total',
    'Total LLM cost in dollars',
    ['model', 'operation']
)

# Cache metrics
cache_operations = Counter(
    'cache_operations_total',
    'Cache operations',
    ['operation', 'result']  # result=hit|miss
)
  • Define HTTP metrics (requests, latency)
  • Define database metrics (query latency, connections)
  • Define LLM metrics (tokens, cost; see the increment sketch after this list)
  • Define cache metrics (hits, misses)
  • Use appropriate metric types (Counter, Histogram, Gauge)
  • Choose meaningful bucket boundaries
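
The LLM counters above only become useful once the LLM client increments them. A sketch of wiring them into the call_claude pattern from the Langfuse checklist; record_llm_usage is a hypothetical helper, and the token/cost values come from that example:

from app.core.metrics import llm_tokens_used, llm_cost_dollars

def record_llm_usage(
    model: str,
    operation: str,
    input_tokens: int,
    output_tokens: int,
    cost_usd: float,
) -> None:
    """Increment Prometheus LLM counters after each call."""
    llm_tokens_used.labels(model=model, operation=operation, token_type="input").inc(input_tokens)
    llm_tokens_used.labels(model=model, operation=operation, token_type="output").inc(output_tokens)
    llm_cost_dollars.labels(model=model, operation=operation).inc(cost_usd)

# e.g. inside call_claude(), after computing cost_usd:
# record_llm_usage("claude-sonnet-4-6", "analysis", input_tokens, output_tokens, cost_usd)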

Add metrics middleware:

from fastapi import Request
import time
from app.core.metrics import http_requests_total, http_request_duration_seconds

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Track HTTP request metrics."""

    start_time = time.time()

    # Process request
    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time

    http_requests_total.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    http_request_duration_seconds.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response
  • Add metrics middleware
  • Track request count
  • Track request duration
  • Label by method, endpoint, status

Expose metrics endpoint:

from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
  • Add /metrics endpoint
  • Return Prometheus format
  • Secure endpoint (internal network only)
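
A hardened variant of the endpoint above for the last item: reject scrapes that do not originate from the internal network. A sketch only; the CIDR ranges are placeholders, and a reverse-proxy allowlist or a separate management port are equally valid:

import ipaddress

from fastapi import HTTPException, Request, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

INTERNAL_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
]

@app.get("/metrics")
async def metrics(request: Request):
    """Expose Prometheus metrics to internal scrapers only."""
    client_ip = ipaddress.ip_address(request.client.host)
    if not any(client_ip in network for network in INTERNAL_NETWORKS):
        raise HTTPException(status_code=403, detail="Metrics endpoint is internal-only")
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)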

Node.js (prom-client)

Install:

npm install prom-client
  • Install prom-client
  • Add to package.json

Create metrics:

import { Counter, Histogram, register } from 'prom-client';

export const httpRequestsTotal = new Counter({
  name: 'http_requests_total',
  help: 'Total HTTP requests',
  labelNames: ['method', 'endpoint', 'status']
});

export const httpRequestDuration = new Histogram({
  name: 'http_request_duration_seconds',
  help: 'HTTP request latency',
  labelNames: ['method', 'endpoint'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
});

// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});
  • Define metrics with prom-client
  • Add /metrics endpoint
  • Use consistent label names

Phase 3: Prometheus Setup

Docker Compose

File: monitoring/docker-compose.yml

version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/alerts:/etc/prometheus/alerts
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
      - grafana-data:/var/lib/grafana

  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    volumes:
      - ./loki/loki.yml:/etc/loki/local-config.yaml
      - loki-data:/loki

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./promtail/promtail.yml:/etc/promtail/config.yml
      - /var/log:/var/log
    command: -config.file=/etc/promtail/config.yml

volumes:
  prometheus-data:
  grafana-data:
  loki-data:
  • Create docker-compose.yml
  • Add Prometheus service
  • Add Grafana service
  • Add Loki + Promtail for logs
  • Configure volumes for persistence
  • Set retention periods

Prometheus Configuration

File: monitoring/prometheus/prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'backend'
    static_configs:
      - targets: ['backend:8500']  # Your app's /metrics endpoint

  - job_name: 'frontend'
    static_configs:
      - targets: ['frontend:3000']

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

# Load alerting rules
rule_files:
  - '/etc/prometheus/alerts/*.yml'

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']
  • Create Prometheus config
  • Add scrape targets for all services
  • Configure scrape interval (15s recommended)
  • Load alerting rules
  • Configure Alertmanager

Phase 4: Alerting Rules

File: monitoring/prometheus/alerts/service.yml

groups:
- name: service-health
  interval: 30s
  rules:
  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Service {{ $labels.job }} is down"

  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{status=~"5.."}[5m])) /
      sum(rate(http_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Error rate above 5%"

  - alert: HighLatency
    expr: |
      histogram_quantile(0.95,
        rate(http_request_duration_seconds_bucket[5m])
      ) > 2
    for: 10m
    labels:
      severity: high
    annotations:
      summary: "p95 latency above 2s"
  • Create alerting rules file
  • Add service availability alerts
  • Add error rate alerts
  • Add latency alerts
  • Set appropriate thresholds
  • Add meaningful annotations

File: monitoring/prometheus/alerts/application.yml

groups:
- name: application-metrics
  interval: 1m
  rules:
  # Cache performance
  - alert: LowCacheHitRate
    expr: |
      sum(rate(cache_operations_total{result="hit"}[30m])) /
      sum(rate(cache_operations_total[30m])) < 0.70
    for: 1h
    labels:
      severity: medium
    annotations:
      summary: "Cache hit rate below 70%"

  # Database performance
  - alert: SlowQueries
    expr: |
      histogram_quantile(0.95,
        rate(db_query_duration_seconds_bucket[5m])
      ) > 0.5
    for: 10m
    labels:
      severity: high
    annotations:
      summary: "Database queries slow (p95 > 500ms)"

  # LLM cost
  - alert: HighDailyCost
    expr: sum(increase(llm_cost_dollars_total[24h])) > 50
    labels:
      severity: high
    annotations:
      summary: "Daily LLM cost exceeded $50"
  • Add cache alerts
  • Add database alerts
  • Add LLM cost alerts
  • Set severity levels correctly

Phase 5: Grafana Dashboards

Datasource Configuration

File: monitoring/grafana/datasources/prometheus.yml

apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true

  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
  • Configure Prometheus datasource
  • Configure Loki datasource
  • Set Prometheus as default

Service Overview Dashboard

Create dashboard with:

  1. Golden Signals Row:

    • Latency (p50, p95, p99)
    • Traffic (requests/second)
    • Errors (error rate %)
    • Saturation (CPU, memory)
  2. Request Breakdown:

    • Requests by endpoint
    • Requests by status code
    • Request rate over time
  3. Dependencies:

    • Database query latency
    • Redis latency
    • External API latency
  4. Resources:

    • CPU usage
    • Memory usage
    • Disk I/O
    • Network I/O

Example Panel Queries

Latency Panel:

# p50 latency
histogram_quantile(0.5, rate(http_request_duration_seconds_bucket[5m]))

# p95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# p99 latency
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))

Traffic Panel:

sum(rate(http_requests_total[5m]))

Error Rate Panel:

sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m]))
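
Saturation Panel (assumes node_exporter is added as a scrape target; it is not part of the compose file above):

# CPU utilization (%)
100 * (1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])))

# Memory utilization (%)
100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)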
  • Add all key panels
  • Use appropriate visualization types
  • Add thresholds for red/yellow/green
  • Set refresh interval (10s-30s)

Phase 6: Log Aggregation (Loki)

Loki Configuration

File: monitoring/loki/loki.yml

auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1

schema_config:
  configs:
    - from: 2024-01-01
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/index
    cache_location: /loki/cache
  filesystem:
    directory: /loki/chunks

limits_config:
  retention_period: 168h  # 7 days
  • Create Loki config
  • Set retention period
  • Configure storage backend
  • Set appropriate limits

Promtail Configuration

File: monitoring/promtail/promtail.yml

server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: docker
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
    relabel_configs:
      - source_labels: ['__meta_docker_container_name']
        target_label: 'container'
      - source_labels: ['__meta_docker_container_log_stream']
        target_label: 'stream'
  • Create Promtail config
  • Configure log sources (Docker, files, etc.)
  • Add labels for filtering
  • Point to Loki endpoint

Phase 7: Testing & Validation

Test Metrics Collection

# Check metrics endpoint
curl http://localhost:8500/metrics

# Verify Prometheus scraping
curl http://localhost:9090/api/v1/targets

# Query metrics
curl 'http://localhost:9090/api/v1/query?query=http_requests_total'
  • Verify /metrics endpoint works
  • Check Prometheus targets are up
  • Query metrics via API
  • Verify labels are correct

Test Logging

# Check logs in Loki
curl -G 'http://localhost:3100/loki/api/v1/query' \
  --data-urlencode 'query={job="backend"}' \
  --data-urlencode 'limit=10'
  • Verify logs appear in Loki
  • Check JSON parsing works
  • Verify labels are correct
  • Test LogQL queries

Test Alerting

# Check alert rules loaded
curl http://localhost:9090/api/v1/rules

# Check active alerts
curl http://localhost:9090/api/v1/alerts
  • Verify alert rules loaded
  • Trigger test alert (cause error)
  • Verify alert fires
  • Check alert appears in Alertmanager

Phase 8: Production Deployment

Security Checklist

  • Restrict /metrics endpoint to internal network
  • Enable authentication for Grafana
  • Use HTTPS for all dashboards
  • Rotate Grafana admin password
  • Set up RBAC for Grafana users
  • Enable audit logging

Performance Checklist

  • Set appropriate retention periods (Prometheus: 30d, Loki: 7d)
  • Configure metric cardinality limits
  • Enable query caching
  • Set memory limits for Prometheus
  • Monitor monitoring stack resource usage

Alerting Checklist

  • Configure Alertmanager receivers (Slack, PagerDuty, email; see the sketch after this list)
  • Set up alert routing rules
  • Add inhibition rules (suppress noisy alerts)
  • Test alert delivery
  • Create runbooks for all critical alerts
  • Set up on-call schedule
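
A minimal Alertmanager sketch for the receiver, routing, and inhibition items above. The file path, webhook URL, and routing key are placeholders; the monitoring compose file above also needs an alertmanager service (image: prom/alertmanager) so Prometheus's alertmanager:9093 target resolves:

File: monitoring/alertmanager/alertmanager.yml

route:
  receiver: slack-default
  group_by: ['alertname', 'severity']
  routes:
    - matchers:
        - severity = critical
      receiver: pagerduty-oncall

receivers:
  - name: slack-default
    slack_configs:
      - api_url: https://hooks.slack.com/services/CHANGE_ME
        channel: '#alerts'
  - name: pagerduty-oncall
    pagerduty_configs:
      - routing_key: CHANGE_ME

inhibit_rules:
  # Suppress lower-severity alerts for a job while a critical alert is firing
  - source_matchers: ['severity = critical']
    target_matchers: ['severity =~ "high|medium"']
    equal: ['job']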

Phase 9: Ongoing Maintenance

Daily Checks

  • Review active alerts
  • Check dashboard for anomalies
  • Verify all scrape targets are up

Weekly Checks

  • Review top 10 slowest endpoints
  • Check error rate trends
  • Review LLM cost trends
  • Update dashboards as needed

Monthly Checks

  • Review alert thresholds (tune for accuracy)
  • Clean up unused metrics
  • Update Prometheus/Grafana versions
  • Review retention policies
  • Audit dashboard access

References


Examples (2)

Orchestkit Langfuse Traces

OrchestKit Langfuse Traces - Real Implementation

This document shows how OrchestKit uses Langfuse for end-to-end LLM observability across its 8-agent LangGraph workflow.

Overview

OrchestKit Analysis Pipeline:

  • 8 specialized agents (Tech Comparator, Security Auditor, Implementation Planner, etc.)
  • LangGraph supervisor pattern for orchestration
  • Langfuse traces for cost tracking, performance monitoring, and debugging

Migration: LangSmith → Langfuse (December 2025)

  • Self-hosted, open-source, free
  • Better prompt management
  • Native cost tracking
  • Session-based grouping

Trace Architecture

Analysis Session Structure

content_analysis (session_id: analysis_550e8400)
├── fetch_content (0.3s)
│   └── metadata: {url, content_size_bytes: 45823}
├── generate_embedding (0.8s, $0.0002)
│   └── model: voyage-code-2
│   └── tokens: 11,456 input
└── supervisor_workflow (12.5s, $0.145)
    ├── supervisor_route_1 (0.1s)
    │   └── next_agent: tech_comparator
    ├── tech_comparator (2.1s, $0.018)
    │   ├── analyze_technologies (1.8s, $0.015)
    │   │   └── model: claude-sonnet-4-6
    │   │   └── tokens: 1,500 input, 1,000 output
    │   └── compress_findings (0.2s, $0.003)
    │       └── model: claude-sonnet-4-6
    │       └── tokens: 800 input, 400 output
    ├── supervisor_route_2 (0.1s)
    │   └── next_agent: security_auditor
    ├── security_auditor (2.3s, $0.021)
    │   └── ... (similar structure)
    ├── ... (6 more agents)
    └── quality_gate (1.2s, $0.012)
        ├── g_eval_completeness (0.4s, $0.004)
        ├── g_eval_accuracy (0.4s, $0.004)
        ├── g_eval_coherence (0.2s, $0.002)
        └── g_eval_depth (0.2s, $0.002)

Session Metrics:

  • Total duration: 15.4s
  • Total cost: $0.147
  • Agents executed: 8
  • Quality scores: completeness=0.85, accuracy=0.92, coherence=0.88, depth=0.78

Implementation Examples

1. Workflow-Level Tracing

File: backend/app/domains/analysis/workflows/content_analysis.py

from langfuse import observe, get_client
from app.shared.services.langfuse.client import langfuse_client

@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, url: str) -> AnalysisResult:
    """Analyze content with 8-agent supervisor workflow."""

    # Set session-level metadata
    get_client().update_current_trace(
        name="content_analysis",
        session_id=f"analysis_{analysis_id}",
        user_id="system",
        metadata={
            "analysis_id": analysis_id,
            "url": url,
            "workflow_type": "8-agent-supervisor",
            "version": "1.0.0"
        },
        tags=["production", "orchestkit", "langgraph"]
    )

    # Step 1: Fetch content (nested span)
    content = await fetch_content(url)  # @observe decorated

    # Step 2: Generate embedding (nested span with cost tracking)
    embedding = await generate_embedding(content)  # @observe decorated

    # Step 3: Run supervisor workflow (8 agents in parallel/sequential)
    findings = await run_supervisor_workflow(content)

    # Track total cost
    total_cost = sum(f.cost_usd for f in findings)
    get_client().update_current_observation(
        metadata={
            "total_agents": len(findings),
            "total_cost_usd": total_cost,
            "total_tokens": sum(f.token_count for f in findings)
        }
    )

    return AnalysisResult(findings=findings, total_cost=total_cost)

2. Agent-Level Tracing

File: backend/app/domains/analysis/workflows/nodes/agent_node.py

@observe(name="agent_execution")
async def execute_agent(
    agent_type: str,
    content: str,
    state: AnalysisState
) -> Finding:
    """Execute single agent with Langfuse tracing."""

    # Set agent-specific context
    get_client().update_current_observation(
        name=f"agent_{agent_type}",
        metadata={
            "agent_type": agent_type,
            "content_length": len(content),
            "correlation_id": state["correlation_id"]
        }
    )

    # Call LLM with automatic cost tracking
    response = await call_llm_with_tracing(
        agent_type=agent_type,
        content=content,
        state=state
    )

    # Score the response
    quality_scores = await score_agent_output(agent_type, response)

    # Add scores to trace
    for criterion, score in quality_scores.items():
        get_client().score(
            name=f"{agent_type}_{criterion}",
            value=score,
            data_type="NUMERIC"
        )

    return response

3. LLM Call Tracing with Cost Tracking

File: backend/app/shared/services/llm/anthropic_client.py

from langfuse import observe, get_client

@observe(name="llm_call")
async def call_anthropic(
    messages: list[dict],
    model: str = "claude-sonnet-4-6",
    **kwargs
) -> str:
    """Call Anthropic with automatic Langfuse cost tracking."""

    # Log input (truncated for large prompts)
    get_client().update_current_observation(
        input=str(messages)[:2000],
        model=model,
        metadata={
            "temperature": kwargs.get("temperature", 1.0),
            "max_tokens": kwargs.get("max_tokens", 4096)
        }
    )

    # Call Anthropic API
    response = await anthropic_client.messages.create(
        model=model,
        messages=messages,
        **kwargs
    )

    # Extract token usage
    input_tokens = response.usage.input_tokens
    output_tokens = response.usage.output_tokens

    # Cost calculation (Claude Sonnet 4.6 pricing)
    input_cost = (input_tokens / 1_000_000) * 3.00   # $3/MTok
    output_cost = (output_tokens / 1_000_000) * 15.00  # $15/MTok
    total_cost = input_cost + output_cost

    # Log output and costs to Langfuse
    get_client().update_current_observation(
        output=response.content[0].text[:2000],
        usage={
            "input": input_tokens,
            "output": output_tokens,
            "unit": "TOKENS"
        },
        metadata={
            "cost_usd": total_cost,
            "input_cost_usd": input_cost,
            "output_cost_usd": output_cost,
            "prompt_caching_enabled": kwargs.get("cache_control") is not None
        }
    )

    logger.info("llm_call_completed",
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cost_usd=total_cost,
        cache_enabled=kwargs.get("cache_control") is not None
    )

    return response.content[0].text

4. Quality Gate Evaluation Tracing

File: backend/app/workflows/nodes/quality_gate_node.py

@observe(name="quality_gate")
async def quality_gate_node(state: AnalysisState) -> AnalysisState:
    """Evaluate aggregated findings with G-Eval scoring."""

    get_client().update_current_observation(
        metadata={
            "findings_count": len(state["findings"]),
            "analysis_id": state["analysis_id"]
        }
    )

    # Run G-Eval for 4 criteria in parallel
    criteria = ["completeness", "accuracy", "coherence", "depth"]

    scores = await asyncio.gather(*[
        evaluate_criterion(criterion, state["findings"])
        for criterion in criteria
    ])

    # Log individual criterion scores
    score_dict = {}
    for criterion, score in zip(criteria, scores):
        score_dict[criterion] = score
        get_client().score(
            name=f"quality_{criterion}",
            value=score,
            comment=f"G-Eval score for {criterion} criterion"
        )

    # Overall quality score (weighted average)
    overall_quality = (
        score_dict["completeness"] * 0.3 +
        score_dict["accuracy"] * 0.3 +
        score_dict["coherence"] * 0.2 +
        score_dict["depth"] * 0.2
    )

    get_client().score(
        name="quality_overall",
        value=overall_quality,
        comment="Weighted average of all criteria"
    )

    state["quality_scores"] = score_dict
    state["overall_quality"] = overall_quality

    return state

Real Metrics from Production

Cost Breakdown by Agent

Langfuse Query (Last 30 Days):

SELECT
    metadata->>'agent_type' as agent,
    COUNT(*) as executions,
    AVG(calculated_total_cost) as avg_cost,
    SUM(calculated_total_cost) as total_cost,
    AVG(input_tokens) as avg_input_tokens,
    AVG(output_tokens) as avg_output_tokens
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
    AND timestamp > NOW() - INTERVAL '30 days'
GROUP BY agent
ORDER BY total_cost DESC;

Results:

| Agent | Executions | Avg Cost | Total Cost | Avg Input | Avg Output |
|---|---|---|---|---|---|
| security_auditor | 145 | $0.021 | $3.05 | 1,800 | 1,200 |
| implementation_planner | 145 | $0.019 | $2.76 | 1,600 | 1,100 |
| tech_comparator | 145 | $0.018 | $2.61 | 1,500 | 1,000 |
| performance_analyzer | 145 | $0.017 | $2.47 | 1,400 | 950 |
| quality_gate | 145 | $0.012 | $1.74 | 1,000 | 600 |
| architecture_reviewer | 145 | $0.015 | $2.18 | 1,300 | 900 |
| testing_strategist | 145 | $0.014 | $2.03 | 1,200 | 850 |
| documentation_expert | 145 | $0.013 | $1.89 | 1,100 | 800 |

Insights:

  • Security Auditor is most expensive (detailed vulnerability analysis)
  • Quality Gate is cheapest (focused evaluation)
  • Total monthly cost: $18.73 (145 analyses)
  • Average per analysis: $0.129

Cache Hit Impact

Before Caching (Dec 2024):

  • Monthly cost: $35,000 (projected annual: $420k)
  • Average latency: 2.1s per LLM call

After Multi-Level Caching (Jan 2025):

  • L1 (Prompt Cache): 90% hit rate → $31,500 saved (90% savings on cache hits)
  • L2 (Semantic Cache): 75% hit rate on L1 misses → $2,625 saved (85% savings)
  • Final monthly cost: $875
  • Total savings: 97.5%
  • Average latency: 5-10ms (semantic cache hit)
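
The savings figures above compose multiplicatively. A back-of-the-envelope sketch (treating a cache hit as eliminating essentially the full cost of that call, which is the assumption that reproduces the reported totals):

baseline_monthly_cost = 35_000.0

# L1 prompt cache: 90% of spend is served from cache
l1_saved = baseline_monthly_cost * 0.90                       # $31,500

# L2 semantic cache: 75% of the remaining (L1-miss) spend is avoided
l2_saved = (baseline_monthly_cost - l1_saved) * 0.75          # $2,625

final_cost = baseline_monthly_cost - l1_saved - l2_saved      # $875
savings_pct = (l1_saved + l2_saved) / baseline_monthly_cost   # 0.975 -> 97.5%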

Langfuse Cache Analytics:

-- Cache hit rate by agent
SELECT
    metadata->>'agent_type' as agent,
    COUNT(*) FILTER (WHERE metadata->>'cache_hit' = 'true') as cache_hits,
    COUNT(*) as total_calls,
    ROUND(100.0 * COUNT(*) FILTER (WHERE metadata->>'cache_hit' = 'true') / COUNT(*), 2) as hit_rate_pct
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent
ORDER BY hit_rate_pct DESC;

Results:

| Agent | Cache Hits | Total Calls | Hit Rate |
|---|---|---|---|
| tech_comparator | 133 | 145 | 91.7% |
| performance_analyzer | 128 | 145 | 88.3% |
| testing_strategist | 125 | 145 | 86.2% |
| security_auditor | 58 | 145 | 40.0% |

Why security_auditor has a low cache hit rate:

  • Unique vulnerabilities per codebase
  • Security context is highly specific
  • Opportunity: Implement vulnerability pattern caching

Dashboard Queries

Top 10 Most Expensive Analyses

SELECT
    name,
    session_id,
    calculated_total_cost as cost_usd,
    timestamp,
    metadata->>'url' as analyzed_url,
    metadata->>'total_agents' as agents_executed
FROM traces
WHERE name = 'content_analysis_workflow'
ORDER BY calculated_total_cost DESC
LIMIT 10;

Quality Trend Over Time

SELECT
    DATE(timestamp) as date,
    AVG(value) FILTER (WHERE name = 'quality_completeness') as avg_completeness,
    AVG(value) FILTER (WHERE name = 'quality_accuracy') as avg_accuracy,
    AVG(value) FILTER (WHERE name = 'quality_coherence') as avg_coherence,
    AVG(value) FILTER (WHERE name = 'quality_depth') as avg_depth,
    AVG(value) FILTER (WHERE name = 'quality_overall') as avg_overall
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;

Results (Last 30 Days):

| Date | Completeness | Accuracy | Coherence | Depth | Overall |
|---|---|---|---|---|---|
| 2025-01-15 | 0.83 | 0.91 | 0.87 | 0.76 | 0.84 |
| 2025-01-16 | 0.85 | 0.92 | 0.88 | 0.78 | 0.86 |
| 2025-01-17 | 0.84 | 0.90 | 0.86 | 0.75 | 0.84 |

Trend: Quality scores stable, depth improving (+4% since truncation fix)

Slow Trace Detection

-- Find traces slower than 2 standard deviations
WITH stats AS (
    SELECT
        AVG(latency_seconds) as mean,
        STDDEV(latency_seconds) as stddev
    FROM traces
    WHERE name = 'content_analysis_workflow'
)
SELECT
    t.session_id,
    t.latency_seconds,
    t.metadata->>'url' as url,
    t.timestamp
FROM traces t, stats s
WHERE t.name = 'content_analysis_workflow'
    AND t.latency_seconds > (s.mean + 2 * s.stddev)
ORDER BY t.latency_seconds DESC
LIMIT 20;

Best Practices from OrchestKit

  1. Always use @observe decorator - Automatic parent-child span relationships
  2. Set session_id for multi-step workflows - Group related traces together
  3. Tag production vs staging - Filter by environment
  4. Add agent_type to metadata - Enable cost/performance analysis by agent
  5. Log truncated inputs/outputs - Keep traces small (2000 chars max)
  6. Score all quality metrics - Enable quality trend monitoring
  7. Track cache_hit in metadata - Measure caching effectiveness
  8. Use correlation_id across services - Link to application logs


OrchestKit Monitoring Dashboard - Real Implementation

This document shows OrchestKit's actual monitoring setup including metrics, dashboards, and alerting rules.

Overview

OrchestKit Monitoring Stack:

  • Logs: Structlog (JSON) → Loki
  • Metrics: Prometheus (RED + business metrics)
  • Traces: Langfuse (LLM observability)
  • Dashboards: Grafana
  • Alerts: Prometheus Alertmanager → Slack

Key Metrics:

  • LLM costs: $35k/year → $2-5k/year (roughly 90% reduction via caching)
  • Retrieval pass rate: 91.6% (target: >90%)
  • Quality gate pass rate: 85% (target: >80%)
  • Hybrid search latency: 5ms (HNSW index)

Dashboard Structure

1. Service Overview Dashboard

Top Row - Golden Signals:

┌──────────────┬──────────────┬──────────────┬──────────────┐
│  Latency     │  Traffic     │  Errors      │  Saturation  │
│  p50: 245ms  │  12.5 req/s  │  0.3% (5xx)  │  CPU: 45%    │
│  p95: 680ms  │  (stable)    │  (good)      │  Mem: 62%    │
│  p99: 1.2s   │              │              │  (healthy)   │
└──────────────┴──────────────┴──────────────┴──────────────┘

Prometheus Queries:

# p95 latency
histogram_quantile(0.95,
  rate(http_request_duration_seconds_bucket[5m])
)

# Request rate
sum(rate(http_requests_total[5m]))

# Error rate (5xx)
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m]))

# CPU saturation
avg(rate(process_cpu_seconds_total[5m])) * 100

2. LLM Observability Dashboard

Metrics Tracked:

  • Cost per model (Claude, Gemini, Voyage)
  • Token usage (input/output)
  • Cache hit rates (L1: Prompt Cache, L2: Semantic Cache)
  • LLM latency distribution

Cost Breakdown Panel:

# Total cost per day by model
sum(increase(llm_cost_dollars_total[1d])) by (model)

# Cost per operation
sum(increase(llm_cost_dollars_total[1h])) by (operation)
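
The llm_cost_dollars_total series behind these panels (and behind the cost alerts further down) is an application-side counter. A minimal sketch with prometheus_client, using the label names from the queries above; the pricing lookup is illustrative:

from prometheus_client import Counter

llm_cost_dollars = Counter(
    "llm_cost_dollars",  # exposed as llm_cost_dollars_total
    "Cumulative LLM spend in US dollars",
    ["model", "operation"],
)

def record_llm_cost(model: str, operation: str, input_tokens: int, output_tokens: int) -> None:
    """Attribute the cost of one LLM call to its model and operation labels."""
    # Illustrative per-million-token prices; real pricing belongs in a maintained lookup table.
    input_price, output_price = {"claude-sonnet-4-6": (3.00, 15.00)}.get(model, (1.00, 5.00))
    cost = input_tokens / 1e6 * input_price + output_tokens / 1e6 * output_price
    llm_cost_dollars.labels(model=model, operation=operation).inc(cost)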

Example Results:

| Model | Daily Cost | Monthly (Projected) |
|---|---|---|
| claude-sonnet-4-6 | $5.20 | $156 |
| gemini-3-flash | $1.80 | $54 |
| voyage-code-2 | $0.40 | $12 |
| Total | $7.40 | $222 |

Cache Performance Panel:

# Cache hit rate
sum(rate(cache_operations_total{result="hit"}[5m])) /
sum(rate(cache_operations_total[5m]))

# Cost savings from cache (rough estimate: hit-rate fraction x LLM spend over the last hour)
(
  sum(rate(cache_operations_total{result="hit"}[5m])) /
  sum(rate(cache_operations_total[5m]))
)
* scalar(sum(increase(llm_cost_dollars_total[1h])))

Results:

| Cache Level | Hit Rate | Daily Savings |
|---|---|---|
| L1 (Prompt Cache) | 90% | $90 |
| L2 (Semantic Cache) | 75% | $21 |
| Total Savings | - | $111/day |

3. Quality Metrics Dashboard

Panels:

  1. Quality gate pass rate (target: >80%)
  2. G-Eval scores by criterion (completeness, accuracy, coherence, depth)
  3. Failed analyses count
  4. Quality score distribution

Quality Gate Pass Rate:

# Pass rate over last 24h
sum(rate(quality_gate_passed_total[24h])) /
sum(rate(quality_gate_total[24h]))

G-Eval Scores (from Langfuse):

-- Track quality trends
SELECT
    DATE(timestamp) as date,
    AVG(value) FILTER (WHERE name = 'quality_completeness') as completeness,
    AVG(value) FILTER (WHERE name = 'quality_accuracy') as accuracy,
    AVG(value) FILTER (WHERE name = 'quality_coherence') as coherence,
    AVG(value) FILTER (WHERE name = 'quality_depth') as depth
FROM langfuse.scores
WHERE timestamp > NOW() - INTERVAL '7 days'
GROUP BY DATE(timestamp);

Example Results:

| Date | Completeness | Accuracy | Coherence | Depth | Overall |
|---|---|---|---|---|---|
| 2025-01-20 | 0.85 | 0.92 | 0.88 | 0.78 | 0.86 |
| 2025-01-21 | 0.83 | 0.91 | 0.87 | 0.76 | 0.84 |

4. Database Performance Dashboard

Panels:

  1. Query latency (p50/p95/p99)
  2. Connection pool usage
  3. Slow queries (>500ms)
  4. Cache hit ratio

Query Latency:

# p95 query latency by query type
histogram_quantile(0.95,
  sum by (le, query_type) (rate(db_query_duration_seconds_bucket[5m]))
)

Connection Pool:

# Active connections
db_connections_active

# Connection pool saturation
db_connections_active / db_connections_max
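
The db_connections_active and db_connections_max gauges have to be exported by the application itself. A minimal sketch with prometheus_client; the pool object and its checkedout()/size() methods are assumptions about the DB driver in use:

from prometheus_client import Gauge

db_connections_active = Gauge("db_connections_active", "Connections currently checked out")
db_connections_max = Gauge("db_connections_max", "Configured connection pool size")

def export_pool_metrics(pool) -> None:
    """Refresh the pool gauges; call periodically or from a background task."""
    db_connections_active.set(pool.checkedout())  # assumed pool API
    db_connections_max.set(pool.size())           # assumed pool API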

Real Metrics:

| Metric | Value | Target |
|---|---|---|
| p50 query latency | 8ms | <100ms |
| p95 query latency | 45ms | <500ms |
| Active connections | 12 | <20 |
| Pool saturation | 60% | <80% |

5. Retrieval Quality Dashboard

Metrics from Golden Dataset (98 analyses, 415 chunks):

Pass Rate:

# Retrieval pass rate (expected chunk in top-k)
sum(retrieval_pass_total) / sum(retrieval_total)

Results: 186/203 queries passed = 91.6% pass rate (target: >90%)
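
Pass rate and the MRR figures shown below come from the same per-query evaluation loop. A minimal sketch of how both are computed; the golden-dataset record shape is an assumption:

def evaluate_retrieval(golden_queries: list[dict], k: int = 5) -> dict:
    """Each record: {"query": str, "expected_chunk_id": str, "retrieved_ids": list[str]}."""
    passes, reciprocal_ranks = 0, []
    for q in golden_queries:
        top_k = q["retrieved_ids"][:k]
        if q["expected_chunk_id"] in top_k:
            passes += 1
            reciprocal_ranks.append(1.0 / (top_k.index(q["expected_chunk_id"]) + 1))
        else:
            reciprocal_ranks.append(0.0)
    return {
        "pass_rate": passes / len(golden_queries),          # e.g. 186/203 = 91.6%
        "mrr": sum(reciprocal_ranks) / len(reciprocal_ranks),
    }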

MRR by Difficulty:

-- Mean Reciprocal Rank by query difficulty
SELECT
    difficulty,
    COUNT(*) as queries,
    AVG(mrr) as avg_mrr
FROM retrieval_evaluation
GROUP BY difficulty;

Results:

| Difficulty | Queries | MRR | Pass Rate |
|---|---|---|---|
| Easy | 78 | 0.892 | 96.2% |
| Medium | 89 | 0.745 | 91.0% |
| Hard | 36 | 0.686 | 83.3% |
| Overall | 203 | 0.777 | 91.6% |

Search Latency:

# Hybrid search latency (HNSW + BM25 RRF)
histogram_quantile(0.95,
  rate(search_duration_seconds_bucket[5m])
)

Results:

| Operation | p50 | p95 | p99 |
|---|---|---|---|
| Vector search (HNSW) | 3ms | 5ms | 8ms |
| BM25 search | 4ms | 7ms | 12ms |
| RRF fusion | 1ms | 2ms | 3ms |
| Total hybrid search | 8ms | 14ms | 23ms |

Comparison to IVFFlat:

  • HNSW: 5ms
  • IVFFlat: 85ms
  • Speedup: 17x faster

Structured Logging Examples

Log Format

OrchestKit uses structlog with JSON output:

{
  "event": "supervisor_routing",
  "level": "info",
  "timestamp": "2025-01-21T10:30:45.123Z",
  "correlation_id": "abc-123-def",
  "analysis_id": "550e8400-e29b-41d4-a716-446655440000",
  "workflow_step": "supervisor",
  "agent": "tech_comparator",
  "remaining_agents": 7,
  "content_length": 45823,
  "logger": "app.workflows.supervisor"
}
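
A minimal structlog configuration that produces this shape of output, with correlation_id and analysis_id bound once per request via contextvars (a sketch; OrchestKit's actual configuration may differ):

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # pulls in bound correlation_id, analysis_id, ...
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True, key="timestamp"),
        structlog.processors.JSONRenderer(),
    ]
)

# Bind request-scoped context once; every log line in the request inherits it.
structlog.contextvars.bind_contextvars(
    correlation_id="abc-123-def",
    analysis_id="550e8400-e29b-41d4-a716-446655440000",
)

logger = structlog.get_logger()
logger.info("supervisor_routing", agent="tech_comparator", remaining_agents=7)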

Key Log Events

1. Analysis Started:

{
  "event": "analysis_started",
  "level": "info",
  "analysis_id": "550e8400-...",
  "url": "https://example.com/article",
  "content_type": "article"
}

2. Agent Execution:

{
  "event": "agent_execution_started",
  "level": "info",
  "agent_type": "security_auditor",
  "correlation_id": "abc-123-def",
  "analysis_id": "550e8400-..."
}

3. LLM Call:

{
  "event": "llm_call_completed",
  "level": "info",
  "model": "claude-sonnet-4-6",
  "operation": "security_audit",
  "input_tokens": 1800,
  "output_tokens": 1200,
  "cost_dollars": 0.021,
  "duration_seconds": 2.3,
  "cache_hit": false
}

4. Quality Gate:

{
  "event": "quality_gate_passed",
  "level": "info",
  "analysis_id": "550e8400-...",
  "quality_scores": {
    "completeness": 0.85,
    "accuracy": 0.92,
    "coherence": 0.88,
    "depth": 0.78
  },
  "overall_quality": 0.86,
  "passed": true
}

5. Error Logging:

{
  "event": "analysis_failed",
  "level": "error",
  "analysis_id": "550e8400-...",
  "error_type": "ValidationError",
  "error_message": "Quality gate failed: depth score too low",
  "quality_scores": {
    "depth": 0.45
  },
  "traceback": "...",
  "correlation_id": "abc-123-def"
}

Loki Queries (LogQL)

Find all errors in last hour:

{app="orchestkit-backend"} |= "ERROR" | json

Count errors by endpoint:

sum by (endpoint) (
  count_over_time({app="orchestkit-backend"} | json | level="error" [5m])
)

Search for specific analysis:

{app="orchestkit-backend"}
| json
| analysis_id="550e8400-e29b-41d4-a716-446655440000"

p95 LLM latency from logs:

quantile_over_time(0.95,
  {app="orchestkit-backend"}
  | json
  | event="llm_call_completed"
  | unwrap duration_seconds [5m]
)

Alerting Rules

1. Service Availability

File: monitoring/prometheus/alerts/service.yml

groups:
- name: service-health
  interval: 30s
  rules:
  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
      team: platform
    annotations:
      summary: "Service {{ $labels.job }} is down"
      description: "{{ $labels.instance }} has been down for 1 minute"
      runbook_url: "https://wiki.orchestkit.dev/runbooks/service-down"

  - alert: HighErrorRate
    expr: |
      sum(rate(http_requests_total{status=~"5.."}[5m])) /
      sum(rate(http_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }} (threshold: 5%)"

2. LLM Cost Alerts

File: monitoring/prometheus/alerts/llm-cost.yml

groups:
- name: llm-costs
  interval: 1h
  rules:
  - alert: DailyCostExceeded
    expr: |
      sum(increase(llm_cost_dollars_total[24h])) > 20
    labels:
      severity: high
      team: ai-ml
    annotations:
      summary: "Daily LLM cost exceeded $20"
      description: "Current daily cost: ${{ $value }}"

  - alert: UnexpectedCostSpike
    expr: |
      sum(rate(llm_cost_dollars_total[1h])) >
      sum(rate(llm_cost_dollars_total[1h] offset 24h)) * 2
    for: 2h
    labels:
      severity: high
    annotations:
      summary: "LLM cost spike detected"
      description: "Current hourly cost is 2x yesterday's average"

3. Quality Degradation

File: monitoring/prometheus/alerts/quality.yml

groups:
- name: quality-metrics
  interval: 5m
  rules:
  - alert: LowQualityGatePassRate
    expr: |
      sum(rate(quality_gate_passed_total[1h])) /
      sum(rate(quality_gate_total[1h])) < 0.80
    for: 30m
    labels:
      severity: high
      team: ml
    annotations:
      summary: "Quality gate pass rate below 80%"
      description: "Current pass rate: {{ $value | humanizePercentage }}"

  - alert: CacheHitRateDegraded
    expr: |
      sum(rate(cache_operations_total{result="hit"}[30m])) /
      sum(rate(cache_operations_total[30m])) < 0.70
    for: 1h
    labels:
      severity: medium
    annotations:
      summary: "Cache hit rate below 70%"
      description: "Cache performance degraded: {{ $value | humanizePercentage }}"

4. Database Performance

File: monitoring/prometheus/alerts/database.yml

groups:
- name: database-performance
  interval: 1m
  rules:
  - alert: SlowQueries
    expr: |
      histogram_quantile(0.95,
        rate(db_query_duration_seconds_bucket[5m])
      ) > 0.5
    for: 10m
    labels:
      severity: high
    annotations:
      summary: "p95 query latency exceeded 500ms"
      description: "Current p95: {{ $value }}s"

  - alert: ConnectionPoolExhausted
    expr: db_connections_active / db_connections_max > 0.9
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Database connection pool near capacity"
      description: "{{ $value | humanizePercentage }} of connections in use"

Alert Routing & Escalation

File: monitoring/alertmanager/config.yml

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: slack-default

  routes:
  # Critical alerts → Slack + PagerDuty
  - match:
      severity: critical
    receiver: pagerduty-critical
    continue: true  # Also send to Slack

  # High severity → Slack
  - match:
      severity: high
    receiver: slack-high

  # Medium/low → Slack (throttled)
  - match_re:
      severity: (medium|low)
    receiver: slack-low
    group_interval: 1h

receivers:
- name: slack-default
  slack_configs:
  - api_url: <slack_webhook_url>
    channel: '#alerts'
    title: '{{ .GroupLabels.alertname }}'
    text: '{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}'

- name: pagerduty-critical
  pagerduty_configs:
  - service_key: <pagerduty_service_key>

Health Check Endpoints

1. Liveness Probe

Endpoint: GET /health
Purpose: Is the application running?

@app.get("/health")
async def health_check():
    """Basic liveness check."""
    return {"status": "healthy"}

2. Readiness Probe

Endpoint: GET /ready
Purpose: Is the application ready to serve traffic?

@app.get("/ready")
async def readiness_check():
    """Check if app can handle requests."""

    checks = {}

    # Database check
    try:
        await db.execute("SELECT 1")
        checks["database"] = {"status": "pass", "latency_ms": 5}
    except Exception as e:
        checks["database"] = {"status": "fail", "error": str(e)}

    # Redis check
    try:
        await redis.ping()
        checks["redis"] = {"status": "pass", "latency_ms": 2}
    except Exception as e:
        checks["redis"] = {"status": "fail", "error": str(e)}

    # Overall status
    all_healthy = all(c["status"] == "pass" for c in checks.values())
    status = "healthy" if all_healthy else "degraded"

    return {
        "status": status,
        "checks": checks,
        "version": "1.0.0",
        "uptime": int(time.time() - app.start_time)
    }

Response:

{
  "status": "healthy",
  "checks": {
    "database": {"status": "pass", "latency_ms": 5},
    "redis": {"status": "pass", "latency_ms": 2}
  },
  "version": "1.0.0",
  "uptime": 3600
}
