Monitoring & Observability
Monitoring and observability patterns for Prometheus metrics, Grafana dashboards, Langfuse LLM tracing, and drift detection. Use when adding logging, metrics, distributed tracing, LLM cost tracking, or quality drift monitoring.
Primary Agent: metrics-architect
Monitoring & Observability
Comprehensive patterns for infrastructure monitoring, LLM observability, and quality drift detection. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Infrastructure Monitoring | 3 | CRITICAL | Prometheus metrics, Grafana dashboards, alerting rules |
| LLM Observability | 3 | HIGH | Langfuse tracing, cost tracking, evaluation scoring |
| Drift Detection | 3 | HIGH | Statistical drift, quality regression, drift alerting |
| Silent Failures | 3 | HIGH | Tool skipping, quality degradation, loop/token spike alerting |
Total: 12 rules across 4 categories
Quick Start
# Prometheus metrics with RED method
from prometheus_client import Counter, Histogram
http_requests = Counter('http_requests_total', 'Total requests', ['method', 'endpoint', 'status'])
http_duration = Histogram('http_request_duration_seconds', 'Request latency',
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5])
# Langfuse LLM tracing
from langfuse import observe, get_client
@observe()
async def analyze_content(content: str):
get_client().update_current_trace(
user_id="user_123", session_id="session_abc",
tags=["production", "orchestkit"],
)
    return await llm.generate(content)
# PSI drift detection
import numpy as np
psi_score = calculate_psi(baseline_scores, current_scores)
if psi_score >= 0.25:
    alert("Significant quality drift detected!")
Infrastructure Monitoring
Prometheus metrics, Grafana dashboards, and alerting for application health.
| Rule | File | Key Pattern |
|---|---|---|
| Prometheus Metrics | rules/monitoring-prometheus.md | RED method, counters, histograms, cardinality |
| Grafana Dashboards | rules/monitoring-grafana.md | Golden Signals, SLO/SLI, health checks |
| Alerting Rules | rules/monitoring-alerting.md | Severity levels, grouping, escalation, fatigue prevention |
LLM Observability
Langfuse-based tracing, cost tracking, and evaluation for LLM applications.
| Rule | File | Key Pattern |
|---|---|---|
| Langfuse Traces | rules/llm-langfuse-traces.md | @observe decorator, OTEL spans, agent graphs |
| Cost Tracking | rules/llm-cost-tracking.md | Token usage, spend alerts, Metrics API |
| Eval Scoring | rules/llm-eval-scoring.md | Custom scores, evaluator tracing, quality monitoring |
Drift Detection
Statistical and quality drift detection for production LLM systems.
| Rule | File | Key Pattern |
|---|---|---|
| Statistical Drift | rules/drift-statistical.md | PSI, KS test, KL divergence, EWMA |
| Quality Drift | rules/drift-quality.md | Score regression, baseline comparison, canary prompts |
| Drift Alerting | rules/drift-alerting.md | Dynamic thresholds, correlation, anti-patterns |
Silent Failures
Detection and alerting for silent failures in LLM agents.
| Rule | File | Key Pattern |
|---|---|---|
| Tool Skipping | rules/silent-tool-skipping.md | Expected vs actual tool calls, Langfuse traces |
| Quality Degradation | rules/silent-degraded-quality.md | Heuristics + LLM-as-judge, z-score baselines |
| Silent Alerting | rules/silent-alerting.md | Loop detection, token spikes, escalation workflow |
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Metric methodology | RED method (Rate, Errors, Duration) | Industry standard, covers essential service health |
| Log format | Structured JSON | Machine-parseable, supports log aggregation |
| Tracing | OpenTelemetry | Vendor-neutral, auto-instrumentation, broad ecosystem |
| LLM observability | Langfuse (not LangSmith) | Open-source, self-hosted, built-in prompt management |
| LLM tracing API | @observe + get_client() | OTEL-native, automatic span creation |
| Drift method | PSI for production, KS for small samples | PSI is stable for large datasets, KS more sensitive |
| Threshold strategy | Dynamic (95th percentile) over static | Reduces alert fatigue, context-aware |
| Alert severity | 4 levels (Critical, High, Medium, Low) | Clear escalation paths, appropriate response times |
Detailed Documentation
| Resource | Description |
|---|---|
| references/ | Logging, metrics, tracing, Langfuse, drift analysis guides |
| checklists/ | Implementation checklists for monitoring and Langfuse setup |
| examples/ | Real-world monitoring dashboard and trace examples |
| scripts/ | Templates: Prometheus, OpenTelemetry, health checks, Langfuse |
Related Skills
- defense-in-depth - Layer 8 observability as part of security architecture
- devops-deployment - Observability integration with CI/CD and Kubernetes
- resilience-patterns - Monitoring circuit breakers and failure scenarios
- llm-evaluation - Evaluation patterns that integrate with Langfuse scoring
- caching - Caching strategies that reduce costs tracked by Langfuse
Rules (12)
Configure drift alert thresholds and correlation to avoid noise and missed issues — HIGH
Drift Alerting
Dynamic Thresholds
Use the 95th percentile of historical PSI values instead of static thresholds:
import numpy as np
def calculate_dynamic_threshold(historical_psi: list[float], percentile: float = 95) -> float:
"""Calculate dynamic threshold from historical PSI values."""
return np.percentile(historical_psi, percentile)
# Usage
threshold = calculate_dynamic_threshold(last_30_days_psi)
if current_psi > threshold:
    alert("Drift exceeds dynamic threshold")
Correlation with Performance Metrics
Always correlate drift metrics with actual performance before alerting:
async def correlated_drift_alert(
psi_score: float,
psi_threshold: float,
quality_score: float,
quality_baseline: float,
):
"""Only alert when drift AND quality drop are both confirmed."""
drift_detected = psi_score > psi_threshold
quality_drop = quality_score < quality_baseline * 0.9 # 10% drop
if drift_detected and quality_drop:
await trigger_alert(
severity="high",
message=f"Distribution drift (PSI={psi_score:.3f}) "
f"correlated with quality drop ({quality_score:.2f} < {quality_baseline:.2f})",
)
await trigger_evaluation()
elif drift_detected:
await log_warning(
f"Distribution drift detected (PSI={psi_score:.3f}) "
f"but quality stable ({quality_score:.2f})"
        )
Anti-Patterns
# NEVER use static thresholds without context
if psi > 0.2: # May cause alert fatigue
alert()
# NEVER retrain on time schedule alone
schedule.every(7).days.do(retrain) # Wasteful if no drift
# ALWAYS use dynamic thresholds
threshold = np.percentile(historical_psi, 95)
if psi > threshold:
alert()
# ALWAYS correlate with performance metrics
if psi > threshold and quality_score < baseline:
    trigger_evaluation()
Alert Priority Rules
| Condition | Priority | Action |
|---|---|---|
| PSI >= 0.25 AND quality drop > 10% | Critical | Immediate investigation |
| PSI >= 0.25, quality stable | Medium | Monitor, log warning |
| PSI 0.1-0.25 AND quality drop > 5% | High | Investigate within 4 hours |
| PSI 0.1-0.25, quality stable | Low | Review next sprint |
| PSI < 0.1 | None | Continue monitoring |
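The priority table above can be captured in a small helper; this is a sketch (the function name is illustrative, and the thresholds mirror the table):

```python
def classify_drift_priority(psi: float, quality_drop_pct: float) -> str:
    """Map a PSI score and relative quality drop to an alert priority.

    Mirrors the alert priority table: significant drift (PSI >= 0.25)
    with a >10% quality drop is critical; moderate drift (0.1-0.25)
    with a >5% drop is high; drift without a quality drop only gets
    logged or reviewed.
    """
    if psi >= 0.25:
        return "critical" if quality_drop_pct > 0.10 else "medium"
    if psi >= 0.1:
        return "high" if quality_drop_pct > 0.05 else "low"
    return "none"
```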
Drift Alert Pipeline
class DriftAlertPipeline:
"""Complete drift detection and alerting pipeline."""
def __init__(self, psi_threshold: float = 0.25, quality_threshold: float = 0.7):
self.psi_threshold = psi_threshold
self.quality_threshold = quality_threshold
self.historical_psi = []
async def run(self, baseline_data, current_data, quality_scores):
# 1. Calculate drift
psi = calculate_psi(baseline_data, current_data)
self.historical_psi.append(psi)
# 2. Dynamic threshold (use after enough history)
if len(self.historical_psi) >= 30:
threshold = np.percentile(self.historical_psi, 95)
else:
threshold = self.psi_threshold
# 3. Check quality correlation
avg_quality = np.mean(quality_scores) if quality_scores else 1.0
# 4. Alert based on correlation
if psi > threshold and avg_quality < self.quality_threshold:
return {"alert": "critical", "psi": psi, "quality": avg_quality}
elif psi > threshold:
return {"alert": "warning", "psi": psi, "quality": avg_quality}
        return {"alert": None, "psi": psi, "quality": avg_quality}
Notification Strategy
| Severity | Channel | Frequency |
|---|---|---|
| Critical (drift + quality drop) | PagerDuty + Slack | Immediate |
| Warning (drift, quality stable) | Slack | Daily digest |
| Info (moderate drift) | Log | Continuous |
Key Decisions
| Decision | Recommendation |
|---|---|
| Threshold strategy | Dynamic (95th percentile of historical) over static |
| Alert priority | Performance metrics > distribution metrics |
| Correlation | Always confirm drift + quality drop before critical alerts |
| Historical window | 30+ days for reliable dynamic thresholds |
| Tool stack | Langfuse (traces) + Evidently/Phoenix (drift analysis) |
Incorrect — alerting on drift without quality correlation:
def check_drift(psi_score: float):
if psi_score > 0.2: # Static threshold, no quality check
        send_alert("CRITICAL: Drift detected")  # False alarms
Correct — correlating drift with quality before alerting:
async def check_drift(psi_score: float, quality_score: float, baseline: float):
threshold = np.percentile(historical_psi, 95) # Dynamic
if psi_score > threshold and quality_score < baseline * 0.9:
send_alert("CRITICAL: Drift + quality drop") # High confidence
elif psi_score > threshold:
        log_warning("Drift detected but quality stable")  # Monitor only
Detect LLM output quality regression before users notice production degradation — HIGH
Quality Drift Detection
Langfuse Score Trend Monitoring
from langfuse import Langfuse
import numpy as np
from datetime import datetime, timedelta
langfuse = Langfuse()
def check_quality_drift(days: int = 7, threshold_drop: float = 0.1):
"""Compare recent quality scores against baseline."""
current_scores = langfuse.fetch_scores(
name="quality_overall",
from_timestamp=datetime.now() - timedelta(days=1)
)
baseline_scores = langfuse.fetch_scores(
name="quality_overall",
from_timestamp=datetime.now() - timedelta(days=days),
to_timestamp=datetime.now() - timedelta(days=1)
)
current_mean = np.mean([s.value for s in current_scores])
baseline_mean = np.mean([s.value for s in baseline_scores])
drift_pct = (baseline_mean - current_mean) / baseline_mean
if drift_pct > threshold_drop:
return {"drift": True, "drop_pct": drift_pct}
    return {"drift": False, "drop_pct": drift_pct}
Canary Prompt Monitoring
Track consistency with fixed test inputs to detect behavioral changes:
CANARY_PROMPTS = [
{"id": "summarize_01", "prompt": "Summarize: The quick brown fox...",
"expected_keywords": ["fox", "dog", "jump"]},
{"id": "classify_01", "prompt": "Classify sentiment: Great product!",
"expected_output": "positive"},
]
async def run_canary_checks():
"""Run fixed test inputs and compare against expected outputs."""
results = []
for canary in CANARY_PROMPTS:
response = await llm.generate(canary["prompt"])
if "expected_keywords" in canary:
score = sum(
1 for kw in canary["expected_keywords"]
if kw.lower() in response.lower()
) / len(canary["expected_keywords"])
elif "expected_output" in canary:
score = 1.0 if canary["expected_output"] in response.lower() else 0.0
results.append({"id": canary["id"], "score": score})
avg_score = np.mean([r["score"] for r in results])
    return {"canary_score": avg_score, "drift": avg_score < 0.8, "details": results}
Embedding Drift (Centroid Monitoring)
import numpy as np
class CentroidMonitor:
"""Monitor drift via embedding centroid movement."""
def __init__(self, distance_threshold: float = 0.2):
self.distance_threshold = distance_threshold
self.baseline_centroid = None
self.baseline_std = None
def set_baseline(self, embeddings: np.ndarray):
self.baseline_centroid = embeddings.mean(axis=0)
distances = np.linalg.norm(embeddings - self.baseline_centroid, axis=1)
self.baseline_std = distances.std()
return self
def check_drift(self, embeddings: np.ndarray) -> dict:
current_centroid = embeddings.mean(axis=0)
centroid_distance = np.linalg.norm(current_centroid - self.baseline_centroid)
normalized_distance = centroid_distance / (self.baseline_std + 1e-10)
distances = np.linalg.norm(embeddings - self.baseline_centroid, axis=1)
outlier_ratio = (distances > 3 * self.baseline_std).mean()
return {
"normalized_distance": float(normalized_distance),
"outlier_ratio": float(outlier_ratio),
"drift_detected": normalized_distance > self.distance_threshold,
        }
Cluster-Based Drift Detection
from sklearn.cluster import KMeans
class ClusterDriftDetector:
"""Detect drift by monitoring cluster distributions."""
def __init__(self, n_clusters=10, psi_threshold=0.25):
self.n_clusters = n_clusters
self.psi_threshold = psi_threshold
self.kmeans = None
self.baseline_distribution = None
def fit_baseline(self, embeddings):
self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
labels = self.kmeans.fit_predict(embeddings)
self.baseline_distribution = np.bincount(labels, minlength=self.n_clusters) / len(labels)
return self
def detect_drift(self, embeddings):
labels = self.kmeans.predict(embeddings)
current = np.bincount(labels, minlength=self.n_clusters) / len(labels)
        # distributions are already binned, so compute PSI directly
        eps = 1e-4
        psi = float(np.sum((current - self.baseline_distribution)
                           * np.log((current + eps) / (self.baseline_distribution + eps))))
        return {"psi": psi, "drift_detected": psi > self.psi_threshold}
Multi-Metric Quality Tracker
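The MultiMetricEWMA tracker below depends on an EWMABaseline helper that is not defined in this file. A minimal sketch, consistent with the EWMA detector in the statistical drift rule (the z-score rule and warm-up handling are assumptions):

```python
import numpy as np

class EWMABaseline:
    """Minimal EWMA baseline with z-score anomaly flagging.

    Scores each value against the running mean/std *before* updating,
    flagging |z| > z_threshold (3.0 is a common control-chart default).
    """
    def __init__(self, alpha: float = 0.2, z_threshold: float = 3.0):
        self.alpha = alpha
        self.z_threshold = z_threshold
        self.mean = None
        self.var = 0.0

    def update(self, value: float) -> dict:
        if self.mean is None or self.var == 0.0:
            z = 0.0  # not enough history for a variance estimate yet
        else:
            z = (value - self.mean) / np.sqrt(self.var)
        if self.mean is None:
            self.mean = value
        else:
            diff = value - self.mean
            self.mean += self.alpha * diff
            self.var = (1 - self.alpha) * (self.var + self.alpha * diff**2)
        return {"z_score": float(z), "is_anomaly": abs(z) > self.z_threshold}
```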
class MultiMetricEWMA:
"""Track multiple quality metrics with independent baselines."""
def __init__(self, metrics: list[str], alpha: float = 0.2):
self.baselines = {m: EWMABaseline(alpha=alpha) for m in metrics}
def update(self, metrics: dict) -> dict:
results = {}
anomalies = []
for name, value in metrics.items():
if name in self.baselines:
result = self.baselines[name].update(value)
results[name] = result
if result["is_anomaly"]:
anomalies.append({"metric": name, "z_score": result["z_score"]})
        return {"metrics": results, "anomalies": anomalies}
RAG Retrieval Drift
Monitor drift in RAG retrieval quality by comparing retrieved document overlap against baseline queries. Track coverage ratio (what fraction of expected documents are still returned) to detect index staleness, embedding model changes, or corpus drift.
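A minimal coverage-ratio check along these lines (the function and field names are illustrative):

```python
def retrieval_coverage(
    baseline_docs: dict[str, set[str]],
    current_docs: dict[str, set[str]],
    threshold: float = 0.8,
) -> dict:
    """Average fraction of baseline-retrieved doc IDs still returned per query.

    baseline_docs / current_docs map query -> set of retrieved document IDs.
    Coverage below the threshold suggests index staleness, an embedding
    model change, or corpus drift.
    """
    ratios = []
    for query, expected in baseline_docs.items():
        if not expected:
            continue
        returned = current_docs.get(query, set())
        ratios.append(len(expected & returned) / len(expected))
    coverage = sum(ratios) / len(ratios) if ratios else 1.0
    return {"coverage": coverage, "drift_detected": coverage < threshold}
```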
Key Decisions
| Decision | Recommendation |
|---|---|
| Baseline window | 7-30 days rolling window |
| Quality threshold | avg_score > 0.7 for production |
| Canary frequency | Every 6 hours minimum |
| Embedding method | Centroid distance for speed, cluster PSI for depth |
Incorrect — comparing quality to fixed threshold:
def check_quality(current_score: float):
if current_score < 0.7: # Static threshold, no baseline
        alert("Quality degradation")
Correct — comparing against rolling baseline:
def check_quality(current_scores: list, baseline_scores: list):
current_mean = np.mean(current_scores)
baseline_mean = np.mean(baseline_scores)
drift_pct = (baseline_mean - current_mean) / baseline_mean
if drift_pct > 0.1: # 10% drop from baseline
        alert(f"Quality drift: {drift_pct:.1%} drop")
Apply statistical methods to detect distribution shifts in LLM inputs and outputs — HIGH
Statistical Drift Detection
Population Stability Index (PSI)
Recommended for production LLM monitoring.
import numpy as np
def calculate_psi(
expected: np.ndarray,
actual: np.ndarray,
bins: int = 10,
eps: float = 0.0001
) -> float:
"""
PSI = SUM((Actual% - Expected%) * ln(Actual% / Expected%))
Thresholds:
- PSI < 0.1: No significant drift
- 0.1 <= PSI < 0.25: Moderate drift, investigate
- PSI >= 0.25: Significant drift, action needed
"""
min_val = min(expected.min(), actual.min())
max_val = max(expected.max(), actual.max())
bin_edges = np.linspace(min_val, max_val, bins + 1)
expected_counts, _ = np.histogram(expected, bins=bin_edges)
actual_counts, _ = np.histogram(actual, bins=bin_edges)
expected_pct = expected_counts / len(expected) + eps
actual_pct = actual_counts / len(actual) + eps
    return np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
PSI Threshold Guidelines
| PSI Value | Interpretation | Action |
|---|---|---|
| < 0.1 | No significant drift | Monitor |
| 0.1 - 0.25 | Moderate drift | Investigate |
| >= 0.25 | Significant drift | Alert + Action |
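The threshold table maps directly to a small helper (a sketch; the name and return labels are illustrative):

```python
def interpret_psi(psi: float) -> str:
    """Map a PSI value to the recommended action from the table above."""
    if psi >= 0.25:
        return "alert"        # significant drift, action needed
    if psi >= 0.1:
        return "investigate"  # moderate drift
    return "monitor"          # no significant drift
```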
Kolmogorov-Smirnov Test
Best for small sample sizes (<1000 observations).
from scipy import stats
def ks_drift_test(expected, actual, significance=0.05):
statistic, p_value = stats.ks_2samp(expected, actual)
return {
"statistic": statistic,
"p_value": p_value,
"drift_detected": p_value < significance,
}
# Warning: KS is too sensitive on large datasets
# Use sampling for >1000 observations
def adjusted_ks_test(expected, actual, sample_size=500):
if len(expected) > sample_size:
expected = np.random.choice(expected, sample_size, replace=False)
if len(actual) > sample_size:
actual = np.random.choice(actual, sample_size, replace=False)
    return ks_drift_test(expected, actual)
EWMA Dynamic Threshold
class EWMADriftDetector:
"""Exponential Weighted Moving Average for drift detection."""
def __init__(self, lambda_param: float = 0.2, L: float = 3.0):
self.lambda_param = lambda_param
self.L = L
self.ewma = None
def update(self, value: float, baseline_mean: float, baseline_std: float) -> dict:
if self.ewma is None:
self.ewma = value
else:
self.ewma = self.lambda_param * value + (1 - self.lambda_param) * self.ewma
factor = np.sqrt(self.lambda_param / (2 - self.lambda_param))
ucl = baseline_mean + self.L * baseline_std * factor
lcl = baseline_mean - self.L * baseline_std * factor
return {
"ewma": self.ewma,
"ucl": ucl, "lcl": lcl,
"drift_detected": self.ewma > ucl or self.ewma < lcl
        }
Method Comparison
| Method | Best For | Symmetric | Pros | Cons |
|---|---|---|---|---|
| PSI | Production monitoring | Yes | Stable, intuitive thresholds | Only notices large changes |
| KL Divergence | Sensitive analysis | No | Detects tail changes | Undefined for zero probs |
| JS Divergence | Balanced comparison | Yes | Bounded [0,1], no div-by-zero | Less sensitive to tails |
| KS Test | Small samples | Yes | Non-parametric | Too sensitive on large data |
| Wasserstein | Continuous data | Yes | Considers distribution shape | Computationally expensive |
Choosing the Right Method
def select_drift_method(data_type: str, sample_size: int) -> str:
if data_type == "categorical":
return "psi" if sample_size > 1000 else "chi_square"
if data_type == "embeddings":
return "embedding_centroid_distance"
if sample_size < 500:
return "ks_test"
    return "psi"  # Default: stable for production
Combined Drift Score
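The combined score below calls js_divergence and wasserstein_drift, which are not defined in this file. Minimal sketches follow; the shared-binning, base-2 JS, and clip-to-[0,1] choices are assumptions made so the weighted terms stay comparable:

```python
import numpy as np
from scipy.spatial.distance import jensenshannon
from scipy.stats import wasserstein_distance

def js_divergence(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Jensen-Shannon divergence over shared histogram bins, bounded [0, 1]."""
    edges = np.histogram_bin_edges(np.concatenate([expected, actual]), bins=bins)
    p, _ = np.histogram(expected, bins=edges)
    q, _ = np.histogram(actual, bins=edges)
    # scipy's jensenshannon returns the JS *distance* (sqrt of the divergence)
    return float(jensenshannon(p / p.sum(), q / q.sum(), base=2) ** 2)

def wasserstein_drift(expected: np.ndarray, actual: np.ndarray, normalize: bool = True) -> float:
    """Wasserstein distance, normalized by the baseline std and clipped to [0, 1]."""
    dist = wasserstein_distance(expected, actual)
    if normalize:
        dist = min(dist / (expected.std() + 1e-10), 1.0)
    return float(dist)
```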
def combined_drift_score(expected, actual, weights=None):
weights = weights or {"psi": 0.4, "wasserstein": 0.3, "js": 0.3}
psi = calculate_psi(expected, actual)
wasserstein = wasserstein_drift(expected, actual, normalize=True)
js = js_divergence(expected, actual)
psi_normalized = min(psi / 0.5, 1.0)
combined = (
weights["psi"] * psi_normalized +
weights["wasserstein"] * wasserstein +
weights["js"] * js
)
    return {"combined_score": combined, "drift_detected": combined > 0.15}
EWMA Alpha Selection
| Use Case | Alpha | Behavior |
|---|---|---|
| Stable production | 0.1 | Slow adaptation |
| Active development | 0.3 | Moderate |
| High variability | 0.1-0.15 | Very stable |
| Sudden change detection | 0.4-0.5 | Quick response |
Incorrect — using KS test on large dataset:
from scipy import stats
statistic, p_value = stats.ks_2samp(baseline, current) # baseline/current have 10K+ samples
if p_value < 0.05:
    alert("Drift detected")  # Too sensitive, false alarms
Correct — PSI for production monitoring:
psi = calculate_psi(baseline, current, bins=10)
if psi >= 0.25: # Stable threshold
    alert("Significant drift detected")  # Reliable signal
Track LLM token costs with spend alerts and per-operation cost attribution — HIGH
LLM Cost Tracking
Basic Cost Tracking (Langfuse v3)
from langfuse import observe, get_client
@observe(name="security_audit")
async def run_audit(content: str):
response = await llm.generate(
model="claude-sonnet-4-6",
messages=[{"role": "user", "content": f"Analyze: {content}"}],
)
get_client().update_current_observation(
model="claude-sonnet-4-6",
usage={
"input": 1500,
"output": 1000,
"unit": "TOKENS",
},
)
# Langfuse auto-calculates: $0.0045 + $0.015 = $0.0195
    return response
Custom Model Pricing
from langfuse import Langfuse
langfuse = Langfuse()
langfuse.create_model(
model_name="claude-sonnet-4-6",
match_pattern="claude-sonnet-4.*",
unit="TOKENS",
input_price=0.000003, # $3/MTok
output_price=0.000015, # $15/MTok
)
Cost Per Analysis
trace = langfuse.get_trace(trace_id)
total_cost = sum(
gen.calculated_total_cost or 0
for gen in trace.observations
if gen.type == "GENERATION"
)
await analysis_repo.update(
analysis_id,
langfuse_trace_id=trace.id,
total_cost_usd=total_cost,
)
Spend Alerts
Via Langfuse UI
- Navigate to Settings -> Alerts
- Create alert rule:
- Metric: Daily cost
- Threshold: $50/day
- Channel: Slack / Email / Webhook
Via Metrics API
from langfuse import Langfuse
from datetime import datetime, timedelta
langfuse = Langfuse()
metrics = langfuse.get_metrics(
metric_name="total_cost",
from_timestamp=datetime.now() - timedelta(days=1),
to_timestamp=datetime.now(),
)
daily_cost = metrics.values[0].value if metrics.values else 0
if daily_cost > 50.0:
await send_alert(
channel="slack",
message=f"Daily LLM cost alert: ${daily_cost:.2f} exceeds $50 threshold",
    )
v2 Metrics API Queries
# Total cost over last 7 days
metrics = langfuse.get_metrics(
metric_name="total_cost",
from_timestamp=datetime.now() - timedelta(days=7),
granularity="day",
)
# Token usage by model
token_metrics = langfuse.get_metrics(
metric_name="total_tokens",
from_timestamp=datetime.now() - timedelta(days=7),
group_by="model",
)
Prometheus LLM Cost Metrics
from prometheus_client import Counter, Histogram
llm_tokens_used = Counter(
'llm_tokens_used_total', 'Total LLM tokens',
['model', 'operation', 'token_type']
)
llm_cost_dollars = Counter(
'llm_cost_dollars_total', 'Total LLM cost in dollars',
['model', 'operation']
)
llm_request_duration = Histogram(
'llm_request_duration_seconds', 'LLM request duration',
['model', 'operation'], buckets=[0.5, 1, 2, 5, 10, 20, 30]
)
Monitoring Dashboard SQL Queries
-- Top 10 most expensive traces (last 7 days)
SELECT name, user_id, calculated_total_cost, input_tokens, output_tokens
FROM traces
WHERE timestamp > NOW() - INTERVAL '7 days'
ORDER BY calculated_total_cost DESC LIMIT 10;
-- Average cost by agent type
SELECT metadata->>'agent_type' as agent, COUNT(*) as traces,
AVG(calculated_total_cost) as avg_cost
FROM traces WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent ORDER BY avg_cost DESC;
-- Daily cost trend
SELECT DATE(timestamp) as date, SUM(calculated_total_cost) as daily_cost
FROM traces WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp) ORDER BY date;
Best Practices
- Always pass usage data with input/output token counts
- Monitor costs daily with spend alerts to catch spikes early
- Set threshold alerts for abnormal increases (> 2x daily average)
- Track by user_id to identify expensive users
- Group by metadata (agent_type, operation) for cost attribution
- Use custom pricing for self-hosted models
- Use Metrics API for programmatic queries instead of raw SQL
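The "> 2x daily average" spike rule above can be sketched as follows (the function name is illustrative):

```python
def cost_spike_detected(daily_costs: list[float], factor: float = 2.0) -> bool:
    """True if the most recent day's cost exceeds `factor` times the
    average of the preceding days."""
    if len(daily_costs) < 2:
        return False
    *history, today = daily_costs
    avg = sum(history) / len(history)
    return today > factor * avg
```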
Incorrect — no cost tracking in LLM calls:
@observe()
async def analyze(content: str):
response = await llm.generate(content) # No usage tracking
    return response  # Cost visibility lost
Correct — tracking usage for cost attribution:
@observe()
async def analyze(content: str):
response = await llm.generate(content)
get_client().update_current_observation(
model="claude-sonnet-4-6",
usage={"input": 1500, "output": 1000, "unit": "TOKENS"}
)
    return response  # Cost auto-calculated in Langfuse
Score LLM evaluations systematically for quality monitoring and regression detection — HIGH
LLM Evaluation Scoring
Basic Scoring (Langfuse v3)
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
@observe()
async def analyze_and_score(query: str):
response = await llm.generate(query)
# Score within @observe context
get_client().score_current_trace(
name="relevance",
value=0.85,
comment="Response addresses query but lacks depth",
)
return response
# Or score by trace_id directly
langfuse.score(
trace_id="trace_123",
name="factuality",
value=0.92,
data_type="NUMERIC",
)
Score Types
# Numeric scores (0-1 range)
langfuse.score(trace_id="...", name="relevance", value=0.85, data_type="NUMERIC")
# Categorical scores
langfuse.score(trace_id="...", name="sentiment", value="positive", data_type="CATEGORICAL")
# Boolean scores
langfuse.score(trace_id="...", name="contains_pii", value=0, data_type="BOOLEAN")
Evaluator Execution Tracing
Each evaluator run creates its own inspectable trace:
from langfuse import observe, get_client
@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
score = await llm_judge.evaluate(
criteria="relevance", query=query, response=response,
)
get_client().update_current_observation(
input={"query": query[:500], "response": response[:500]},
output={"score": score, "criteria": "relevance"},
model="claude-sonnet-4-6",
)
    return score
Result in Langfuse UI:
evaluator:relevance_judge (0.8s, $0.01)
+-- generation: judge_prompt -> score: 0.85
    +-- metadata: {criteria: "relevance", model: "claude-sonnet-4-6"}
G-Eval Automated Scoring
from langfuse import observe, get_client
scorer = GEvalScorer()
@observe()
async def analyze_with_scoring(query: str):
response = await llm.generate(query)
scores = await scorer.score(
query=query, response=response,
criteria=["relevance", "coherence", "depth"],
)
for criterion, score in scores.items():
get_client().score_current_trace(name=criterion, value=score)
    return response
Common Evaluation Metrics
| Metric | Range | Description |
|---|---|---|
| Relevance | 0-1 | Does response address the query? |
| Coherence | 0-1 | Is response logically structured? |
| Depth | 0-1 | Level of detail and analysis |
| Factuality | 0-1 | Accuracy of claims |
| Completeness | 0-1 | All aspects of query covered? |
| Toxicity | 0-1 | Harmful or inappropriate content |
Quality Gate Integration
from langfuse import observe, get_client
@observe(name="quality_gate")
async def quality_gate_node(state):
scores = await run_quality_evaluators(state)
for criterion, score in scores.items():
get_client().score_current_trace(name=criterion, value=score)
avg_score = sum(scores.values()) / len(scores)
    return {"quality_gate_passed": avg_score >= 0.7, "quality_scores": scores}
Score Trend Query
SELECT
DATE(timestamp) as date,
AVG(value) FILTER (WHERE name = 'relevance') as avg_relevance,
AVG(value) FILTER (WHERE name = 'depth') as avg_depth,
AVG(value) FILTER (WHERE name = 'factuality') as avg_factuality
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;
Best Practices
- Score all production traces for quality monitoring
- Use evaluator type (`@observe(type="evaluator")`) for inspectable judge traces
- Use consistent criteria across all evaluations
- Automate scoring with G-Eval or similar frameworks
- Set quality thresholds (e.g., avg_relevance > 0.7)
- Create test datasets for regression testing
- Track scores by prompt version to measure improvements
- Alert on quality drops (e.g., avg_score < 0.6 for 3 consecutive days)
Incorrect — no quality scoring in production:
@observe()
async def analyze(query: str):
response = await llm.generate(query)
    return response  # No quality metrics
Correct — automated quality scoring:
@observe()
async def analyze(query: str):
response = await llm.generate(query)
scores = await scorer.score(query, response, ["relevance", "depth"])
for criterion, score in scores.items():
get_client().score_current_trace(name=criterion, value=score)
    return response
Trace LLM call chains with Langfuse for debugging slow or incorrect responses — HIGH
Langfuse Traces
Basic Tracing with @observe (v3)
from langfuse import observe, get_client
@observe() # Auto-creates trace on first root span
async def analyze_content(content: str):
get_client().update_current_observation(
metadata={"content_length": len(content)}
)
    return await llm.generate(content)
Nested Spans
from langfuse import observe, get_client
@observe(name="content_analysis")
async def analyze(content: str):
# Nested span for retrieval
@observe(name="retrieval")
async def retrieve_context():
chunks = await vector_db.search(content)
get_client().update_current_observation(
metadata={"chunks_retrieved": len(chunks)}
)
return chunks
# Nested span for generation
@observe(name="generation")
async def generate_analysis(context):
response = await llm.generate(content)
get_client().update_current_observation(
model="claude-sonnet-4-6",
usage={"input_tokens": 1500, "output_tokens": 1000},
)
return response
context = await retrieve_context()
    return await generate_analysis(context)
Result in Langfuse UI:
content_analysis (2.3s, $0.045)
+-- retrieval (0.1s)
| +-- metadata: {chunks_retrieved: 5}
+-- generation (2.2s, $0.045)
+-- model: claude-sonnet-4-6
    +-- tokens: 1500 input, 1000 output
Session & User Tracking
from langfuse import observe, get_client
@observe()
async def analysis(content: str):
get_client().update_current_trace(
user_id="user_123",
session_id="session_abc",
metadata={"content_type": "article", "agent_count": 8},
tags=["production", "orchestkit"],
)
    return await run_pipeline(content)
Observation Types for Agent Graphs
Beyond generation and span, v3 adds typed observations:
@observe(type="agent", name="supervisor")
async def supervisor(query: str): ... # Agent node in graph
@observe(type="tool", name="web_search")
async def search(query: str): ... # Tool call in graph
@observe(type="retriever", name="vector_search")
async def retrieve(query: str): ... # Retrieval step
@observe(type="chain", name="prompt_chain")
async def chain(inputs: dict): ... # Sequential processing
@observe(type="guardrail", name="pii_check")
async def check_pii(text: str): ... # Safety check
@observe(type="embedding", name="embed")
async def embed(text: str): ... # Vector generation
@observe(type="evaluator", name="quality_judge")
async def evaluate(output: str): ...  # Inspectable evaluator trace
OpenTelemetry SpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor
langfuse_processor = LangfuseSpanProcessor(
public_key="pk-...",
secret_key="sk-...",
host="https://cloud.langfuse.com",
)
provider = TracerProvider()
provider.add_span_processor(langfuse_processor)
JavaScript/TypeScript Setup
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";
const sdk = new NodeSDK({
traceExporter: new LangfuseExporter({
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
secretKey: process.env.LANGFUSE_SECRET_KEY,
}),
});
sdk.start();
Best Practices
- Use `from langfuse import observe, get_client` — NOT `from langfuse.decorators`
- Let `@observe()` auto-create traces — no explicit `langfuse.trace()` needed
- Name spans descriptively (e.g., "retrieval", "generation")
- Use the `type=` parameter for Agent Graph rendering
- Add metadata for debugging (chunk counts, model params)
- Truncate large inputs/outputs to 500-1000 chars
- Tag production vs staging traces for environment filtering
Incorrect — flat trace without nested spans:
@observe()
async def analyze(content: str):
chunks = await retrieve(content) # Not traced
result = await generate(chunks) # Not traced
    return result  # No visibility into sub-operations
Correct — nested spans for full visibility:
@observe(name="content_analysis")
async def analyze(content: str):
@observe(name="retrieval")
async def retrieve_context():
return await vector_db.search(content)
@observe(name="generation")
async def generate_analysis(chunks):
return await llm.generate(chunks)
chunks = await retrieve_context()
    return await generate_analysis(chunks)
Define effective alerting rules to prevent alert fatigue and missed incidents — CRITICAL
Alerting Rules
Alert Severity Levels
| Level | Response Time | Examples |
|---|---|---|
| Critical (P1) | < 15 min | Service down, data loss |
| High (P2) | < 1 hour | Major feature broken |
| Medium (P3) | < 4 hours | Increased error rate |
| Low (P4) | Next day | Warnings, deprecations |
Key Alerts
| Alert | Condition | Severity |
|---|---|---|
| ServiceDown | up == 0 for 1m | Critical |
| HighErrorRate | 5xx > 5% for 5m | Critical |
| HighLatency | p95 > 2s for 5m | High |
| LowCacheHitRate | < 70% for 10m | Medium |
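For completeness, the ServiceDown row above maps to a rule like the following (a sketch; the `job` label and receiver wiring are deployment-specific):

```yaml
- alert: ServiceDown
  expr: up == 0
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Service {{ $labels.job }} is down"
```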
Prometheus Alerting Rules
groups:
  - name: api_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) /
          sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
Alert Grouping
route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 30s        # Wait 30s to collect similar alerts
  group_interval: 5m     # Send grouped alerts every 5m
  repeat_interval: 4h    # Re-send alert after 4h if still firing
  routes:
    - match:
        severity: critical
      receiver: pagerduty
      continue: true
    - match:
        severity: warning
      receiver: slack
Inhibition Rules
Suppress noisy alerts when root cause is known:
inhibit_rules:
  # If ServiceDown is firing, suppress HighErrorRate and HighLatency
  - source_match:
      alertname: ServiceDown
    target_match_re:
      alertname: (HighErrorRate|HighLatency)
    equal: ['service']
  # If DatabaseDown is firing, suppress all DB-related alerts
  - source_match:
      alertname: DatabaseDown
    target_match_re:
      alertname: Database.*
    equal: ['cluster']
Escalation Policies
routes:
  - match:
      severity: critical
    receiver: slack
    continue: true
    routes:
      - match:
          severity: critical
        receiver: pagerduty
        group_wait: 15m   # Escalate to PagerDuty after 15 min
Runbook Requirements
Every alert must link to a runbook containing:
- What the alert means
- Impact on users
- Common causes
- Investigation steps
- Remediation steps
- Escalation contacts
Alert Fatigue Prevention
Best Practices:
- Alert on symptoms, not causes — "Users cannot login" not "CPU high"
- Actionable alerts only — every alert needs a runbook
- Reduce noise — use `for: 5m` to avoid flapping
- Group related alerts — do not page for every instance
- Test alert rules — validate with `amtool alert query`
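Alert rules can also be unit-tested offline with `promtool test rules`. A minimal sketch for the HighErrorRate rule above (the file name and series values are illustrative):

```yaml
# tests.yml — run with: promtool test rules tests.yml
rule_files:
  - alerting-rules.yml
evaluation_interval: 1m
tests:
  - interval: 1m
    input_series:
      - series: 'http_requests_total{status="500", endpoint="/api"}'
        values: '0+60x15'   # 60 errors per minute, sustained
      - series: 'http_requests_total{status="200", endpoint="/api"}'
        values: '0+60x15'
    alert_rule_test:
      - eval_time: 10m
        alertname: HighErrorRate
        exp_alerts:
          - exp_labels:
              severity: critical
```

With a 50% error ratio sustained past the `for: 5m` window, the alert is expected to be firing at the 10-minute evaluation point.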
Notification Channels
| Channel | Use For | Priority |
|---|---|---|
| PagerDuty | Critical (on-call) | P1-P2 |
| Slack | Warnings (team channel) | P3 |
| Email | Low priority (daily digest) | P4 |
Incorrect — alert without for clause causes flapping:
- alert: HighErrorRate
  expr: http_errors_total / http_requests_total > 0.05  # Immediate
  labels:
    severity: critical  # Pages on-call for brief spikes
Correct — for clause prevents flapping:
- alert: HighErrorRate
  expr: http_errors_total / http_requests_total > 0.05
  for: 5m  # Must sustain 5 min before firing
  labels:
    severity: critical
Design Grafana dashboards for actionable incident response and capacity planning — CRITICAL
Grafana Dashboards
The Four Golden Signals
| Signal | Metric | Description |
|---|---|---|
| Latency | Response time | How long requests take |
| Traffic | Requests/sec | Volume of demand |
| Errors | Error rate | Failures per second |
| Saturation | Resource usage | How full the service is |
Dashboard Layout (Top Row)
+--------------+--------------+--------------+--------------+
| Latency | Traffic | Errors | Saturation |
| (p50/p95) | (req/s) | (5xx rate) | (CPU/mem) |
+--------------+--------------+--------------+--------------+
Service Dashboard Structure
- Overview (single row) — Traffic, errors, latency, saturation
- Request breakdown — By endpoint, method, status code
- Dependencies — Database, Redis, external APIs
- Resources — CPU, memory, disk, network
- Business metrics — Registrations, purchases, LLM costs
RED Metrics for Dashboards
# Rate
rate(http_requests_total[5m])
# Errors
sum(rate(http_requests_total{status=~"5.."}[5m]))
# Duration
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
USE Metrics for Resources
- Utilization — % of resource used
- Saturation — Queue depth, wait time
- Errors — Error count
SLO/SLI Definitions
Service Level Indicators (SLIs)
# Availability SLI: % of successful requests
sum(rate(http_requests_total{status!~"5.."}[30d])) /
sum(rate(http_requests_total[30d]))
# Latency SLI: % of requests < 1s
sum(rate(http_request_duration_seconds_bucket{le="1"}[30d])) /
sum(rate(http_request_duration_seconds_count[30d]))
Service Level Objectives (SLOs)
| SLO | Target | Error Budget |
|---|---|---|
| Availability | 99.9% | 43 min downtime/month |
| Latency | 99% < 1s | 1% of requests can be slow |
Error Budget: If consumed, freeze feature work and focus on reliability.
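The error-budget column follows from simple arithmetic (a 30-day month assumed):

```python
def downtime_budget_minutes(slo: float, days: int = 30) -> float:
    """Minutes of allowed downtime for an availability SLO over a period."""
    return days * 24 * 60 * (1 - slo)

budget = downtime_budget_minutes(0.999)  # 99.9% availability
```

This yields roughly 43 minutes per month, matching the table above.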
Health Checks (Kubernetes)
| Probe | Purpose | Endpoint |
|---|---|---|
| Liveness | Is app running? | /health |
| Readiness | Ready for traffic? | /ready |
| Startup | Finished starting? | /startup |
Dashboard Best Practices
- Use time ranges — Last 1h, 6h, 24h, 7d
- Percentiles over averages — p50, p95, p99
- Color code thresholds — green/yellow/red
- Include annotations — deployments, incidents
- Link to runbooks — from alert panels
Incorrect — using average latency hides tail latency:
avg(http_request_duration_seconds)  # Misleading for user experience
Correct — using percentiles shows tail latency:
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))
Instrument Prometheus metrics as the foundation for alerting and dashboard observability — CRITICAL
Prometheus Metrics
RED Method (Rate, Errors, Duration)
Essential metrics for any service:
- Rate — Requests per second
- Errors — Failed requests per second
- Duration — Request latency distribution
Metric Types
Counter — Monotonically increasing value
from prometheus_client import Counter
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
# Usage
http_requests_total.labels(method='GET', endpoint='/api/users', status='200').inc()
Use cases: Request counts, error counts, bytes processed
Gauge — Value that can go up or down
from prometheus_client import Gauge
active_connections = Gauge(
'active_connections',
'Number of active database connections'
)
active_connections.set(25)
active_connections.inc()
active_connections.dec()
Use cases: Queue length, memory usage, active connections
Histogram — Distribution of values (with buckets)
from prometheus_client import Histogram
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
)
with request_duration.labels(method='GET', endpoint='/api/users').time():
    pass  # handle request
Use cases: Request latency, response size
Histogram vs Summary
- Histogram: Calculate quantiles server-side (recommended)
- Summary: Calculate quantiles client-side (higher CPU, cannot aggregate across instances)
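To make the server-side estimation concrete, here is a simplified sketch of how `histogram_quantile` interpolates within cumulative buckets (Prometheus's real implementation handles the `+Inf` bucket, NaN, and malformed bucket sets more carefully):

```python
def histogram_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """Estimate quantile q from cumulative (upper_bound, count) buckets."""
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                return prev_bound  # cannot interpolate into the +Inf bucket
            # Linear interpolation inside the bucket containing the rank
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# 100 requests: 50 under 0.1s, 90 under 0.5s, all under 1s
p95 = histogram_quantile(0.95, [(0.1, 50), (0.5, 90), (1.0, 100)])  # 0.75
```

This is why bucket boundaries matter: the estimate is only as precise as the bucket containing the target quantile.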
Recommended Bucket Configurations
# HTTP request latency
buckets = [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
# Database query latency
buckets = [0.001, 0.01, 0.05, 0.1, 0.5, 1]
# LLM request latency
buckets = [0.5, 1, 2, 5, 10, 20, 30]
Cardinality Management
# BAD: Unbounded cardinality
http_requests_total = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'user_id']  # user_id creates millions of time series!
)
# GOOD: Bounded cardinality
http_requests_total = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'status']  # ~10 x 100 x 10 = 10,000 series
)
Limits:
- Good: < 10,000 unique time series per metric
- Acceptable: 10,000-100,000
- Bad: > 100,000 (Prometheus performance degrades)
Rule: Never use unbounded labels (user IDs, request IDs, timestamps)
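A quick back-of-envelope check before adding a label (the per-label value counts are illustrative assumptions):

```python
from math import prod

label_cardinality = {"method": 10, "endpoint": 100, "status": 10}
series = prod(label_cardinality.values())  # 10 x 100 x 10 = 10,000 series
# An unbounded label multiplies this by its value count:
with_user_id = series * 1_000_000          # 10 billion series (unusable)
```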
Custom Business Metrics
# LLM token usage
llm_tokens_used = Counter(
'llm_tokens_used_total',
'Total LLM tokens consumed',
['model', 'operation']
)
# Cache hit rate
cache_operations = Counter(
'cache_operations_total',
'Cache operations',
['operation', 'result'] # result='hit|miss'
)
# Cache hit rate PromQL:
# sum(rate(cache_operations_total{result="hit"}[5m])) /
# sum(rate(cache_operations_total[5m]))
PromQL Quick Reference
# Rate of requests
rate(http_requests_total[5m])
# Error rate percentage
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m])) * 100
# p95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
# Total cost per day by model
sum(increase(llm_cost_dollars_total[1d])) by (model)
Incorrect — unbounded label cardinality:
http_requests = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'user_id']  # Millions of time series
)
http_requests.labels(method='GET', endpoint='/api', user_id='user_12345').inc()
Correct — bounded labels only:
http_requests = Counter(
    'http_requests_total', 'Total HTTP requests',
    ['method', 'endpoint', 'status']  # ~10K time series max
)
http_requests.labels(method='GET', endpoint='/api', status='200').inc()
Alert on silent failures using statistical baselines and proactive health monitoring — HIGH
Silent Failure Alerting
Set up alerting for failures that produce no errors but deliver wrong results.
Incorrect — alerting only on exceptions:
try:
result = await agent.run()
except Exception as e:
alert(e) # Only catches crashes, not silent failures
# Agent returned gibberish — no exception raised, no alert sent
Correct — statistical baseline anomaly detection:
import numpy as np
class BaselineAnomalyDetector:
def __init__(self, window_size=100, z_threshold=3.0):
self.window_size = window_size
self.z_threshold = z_threshold
self.history = []
def add_observation(self, value: float) -> dict:
self.history.append(value)
if len(self.history) > self.window_size:
self.history = self.history[-self.window_size:]
if len(self.history) < 10:
return {"alert": False, "reason": "insufficient_data"}
mean = np.mean(self.history[:-1])
std = np.std(self.history[:-1])
if std == 0:
return {"alert": False}
z_score = abs(value - mean) / std
if z_score > self.z_threshold:
return {"alert": True, "type": "statistical_anomaly",
"z_score": z_score, "value": value, "mean": mean}
        return {"alert": False, "z_score": z_score}
Silent failure type priorities:
| Type | Detection Method | Priority |
|---|---|---|
| Tool Skipping | Expected vs actual tool calls | Critical |
| Infinite Loop | Iteration count + token spike | Critical |
| Gibberish Output | LLM-as-judge + heuristics | High |
| Quality Degradation | Score < baseline | Medium |
| Latency Spike | p99 > threshold | Medium |
Key rules:
- Alert on silent failures (service up, logic broken), not just errors
- Use z-score > 3.0 (99.7% confidence) for anomaly detection
- Maintain rolling baselines with 100-observation windows
- Detection priority: tool skipping > loops > gibberish > anomalies
- Need minimum 10 observations before baseline alerting is reliable
- Combine statistical detection with proactive quality checks
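The baseline detector above can be exercised like this (the class body is restated in compact form so the snippet runs on its own; score values are illustrative):

```python
import numpy as np

class BaselineAnomalyDetector:
    """Compact restatement of the detector above."""
    def __init__(self, window_size=100, z_threshold=3.0):
        self.window_size, self.z_threshold = window_size, z_threshold
        self.history: list[float] = []

    def add_observation(self, value: float) -> dict:
        self.history = (self.history + [value])[-self.window_size:]
        if len(self.history) < 10:
            return {"alert": False, "reason": "insufficient_data"}
        mean = np.mean(self.history[:-1])  # baseline excludes the new value
        std = np.std(self.history[:-1])
        if std == 0:
            return {"alert": False}
        z = abs(value - mean) / std
        return {"alert": bool(z > self.z_threshold), "z_score": float(z)}

detector = BaselineAnomalyDetector()
for score in [0.8, 0.82, 0.79, 0.81, 0.8] * 10:  # stable quality baseline
    detector.add_observation(score)
result = detector.add_observation(0.2)           # sudden quality collapse
```

With the tight baseline above, the 0.2 observation lands dozens of standard deviations below the mean and trips the z > 3 threshold.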
Detect silent quality degradation in agent outputs that pass basic error checks — HIGH
Silent Quality Degradation Detection
Detect gibberish, repetitive, or low-quality LLM outputs that pass basic checks.
Incorrect — checking only for emptiness:
if len(response) > 0:  # Non-empty does not mean correct
    return response    # Gibberish passes this check
Correct — heuristic pre-filter + LLM-as-judge:
from langfuse import observe, get_client
@observe(name="quality_check")
async def detect_degraded_quality(response: str) -> dict:
# Quick heuristics first (cheap, fast)
if len(response) < 10:
return {"alert": True, "type": "too_short"}
# Repetition check: ratio of unique words to total words
words = response.split()
if len(words) > 0 and len(set(words)) / len(words) < 0.3:
return {"alert": True, "type": "repetitive"}
# LLM-as-judge for semantic quality (more expensive, run second)
judge_prompt = f"""Rate this response quality (0-1):
- 0: Gibberish, nonsensical, or completely wrong
- 0.5: Partially correct but missing key information
- 1: High quality, accurate, complete
Response: {response[:1000]}
Score (just the number):"""
score = await llm.generate(judge_prompt)
score_value = float(score.strip())
get_client().score_current_trace(name="quality_check", value=score_value)
if score_value < 0.5:
return {"alert": True, "type": "low_quality", "score": score_value}
    return {"alert": False, "score": score_value}
Loop and token spike detection:
class LoopDetector:
def __init__(self, max_iterations=10, token_spike_multiplier=3.0):
self.max_iterations = max_iterations
self.token_spike_multiplier = token_spike_multiplier
self.iteration_count = 0
self.total_tokens = 0
self.baseline_tokens = 2000
def check(self, tokens_used: int) -> dict:
self.iteration_count += 1
self.total_tokens += tokens_used
if self.iteration_count > self.max_iterations:
return {"alert": True, "type": "max_iterations"}
expected = self.baseline_tokens * self.iteration_count
if self.total_tokens > expected * self.token_spike_multiplier:
return {"alert": True, "type": "token_spike",
"tokens": self.total_tokens, "expected": expected}
        return {"alert": False}
Key rules:
- Layer detection: heuristics first (cheap), then LLM-as-judge (accurate)
- Track unique-word ratio — below 0.3 indicates repetitive/stuck output
- Monitor token consumption per iteration — 3x baseline indicates infinite loop
- Log quality scores to Langfuse for trend analysis and drift detection
- Not-empty and no-error are insufficient quality checks
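Both cheap heuristics above can be exercised standalone; `unique_word_ratio` mirrors the repetition check and `token_spike` mirrors the loop detector's budget test (thresholds taken from the code above):

```python
def unique_word_ratio(text: str) -> float:
    """Repetition heuristic: unique words / total words (low means stuck output)."""
    words = text.split()
    return len(set(words)) / len(words) if words else 0.0

def token_spike(total_tokens: int, iterations: int,
                baseline: int = 2000, multiplier: float = 3.0) -> bool:
    """Loop heuristic: has token use exceeded multiplier x the expected budget?"""
    return total_tokens > baseline * iterations * multiplier

healthy = "The audit found two XSS issues in the template rendering path."
stuck = "the the the the answer answer the the answer the the the"

spike = token_spike(total_tokens=46_000, iterations=4)  # runaway loop
normal = token_spike(total_tokens=6_000, iterations=3)  # within budget
```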
Detect when agents silently skip expected tool calls and produce incorrect results — CRITICAL
Silent Tool Skipping Detection
Detect when LLM agents skip expected tool calls without raising errors.
Incorrect — assuming success if no error:
result = await agent.run()
# No error raised, but agent skipped the search tool entirely
# Result is fabricated from training data, not real data
return result  # Wrong answer delivered confidently
Correct — validate tool usage against expectations:
from langfuse import Langfuse
def check_tool_usage(trace_id: str, expected_tools: list[str]) -> dict:
langfuse = Langfuse()
trace = langfuse.fetch_trace(trace_id)
actual_tools = [
span.name for span in trace.observations
if span.type == "tool"
]
missing_tools = set(expected_tools) - set(actual_tools)
if missing_tools:
return {
"alert": True,
"type": "tool_skipping",
"missing": list(missing_tools),
"message": f"Agent skipped expected tools: {missing_tools}"
}
return {"alert": False}
# Usage
expected_tools = ["search", "calculate"]
tool_check = check_tool_usage(trace_id, expected_tools)
if tool_check["alert"]:
alert(tool_check)
    fallback_to_manual_execution()
Key rules:
- Never assume success just because no error was raised
- Define expected tool lists per agent task and validate after execution
- Tool skipping is often caused by middleware interference or prompt changes
- Alert on tool skipping with Critical priority — it produces wrong results silently
- Always have a fallback path when expected tools are not called
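At its core, the check above is a set difference between expected and observed tool names; stripped to essentials:

```python
def missing_tools(expected: list[str], actual: list[str]) -> set[str]:
    """Return expected tools the agent never called."""
    return set(expected) - set(actual)

skipped = missing_tools(["search", "calculate"], ["calculate"])  # {'search'}
```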
References
Agent Observability
Trace multi-agent systems with Agent Graphs, new observation types, and rendered tool calls.
Agent Graphs (GA Nov 2025)
Agent Graphs provide visual execution flow for multi-agent systems. Langfuse automatically renders agent handoffs, tool calls, and decision points as an interactive graph.
Enabling Agent Graphs
from langfuse import observe, get_client
@observe(type="agent", name="supervisor")
async def supervisor_agent(query: str):
"""Supervisor agent that routes to specialists."""
get_client().update_current_observation(
metadata={"routing_strategy": "semantic"}
)
intent = await classify_intent(query)
if intent == "code_review":
return await code_review_agent(query)
elif intent == "security_audit":
return await security_audit_agent(query)
else:
return await general_agent(query)
@observe(type="agent", name="code_review")
async def code_review_agent(query: str):
"""Specialist agent for code review."""
context = await retrieve_context(query)
return await generate_review(context)
@observe(type="agent", name="security_audit")
async def security_audit_agent(query: str):
"""Specialist agent for security auditing."""
vulnerabilities = await scan_code(query)
    return await generate_audit_report(vulnerabilities)
Result in Langfuse UI — Agent Graph View
supervisor (agent)
├── classify_intent (chain) → "code_review"
├── code_review (agent)
│ ├── retrieve_context (retriever) → 5 chunks
│ └── generate_review (generation) → $0.03
│ └── tool_call: analyze_diff → rendered inline
└── Total: 3.2s, $0.05
The Agent Graph View renders:
- Agent nodes with execution order
- Tool calls with inputs/outputs rendered inline
- Decision edges showing routing logic
- Timing breakdown per agent
New Observation Types
Langfuse v3 adds 7 observation types beyond generation and span:
| Type | Use Case | Example |
|---|---|---|
| `agent` | Autonomous agent execution | Supervisor, specialist agents |
| `tool` | Tool/function call | API calls, database queries |
| `chain` | Sequential processing steps | Prompt chain, pipeline stage |
| `retriever` | Document/context retrieval | Vector search, RAG retrieval |
| `evaluator` | Quality assessment | G-Eval judge, human review |
| `embedding` | Embedding generation | Text → vector conversion |
| `guardrail` | Safety/validation check | PII filter, toxicity check |
Using Observation Types
from langfuse import observe, get_client
@observe(type="retriever", name="vector_search")
async def retrieve_context(query: str):
"""Retrieve relevant context from vector DB."""
results = await vector_db.search(query, top_k=5)
get_client().update_current_observation(
metadata={
"top_k": 5,
"chunks_returned": len(results),
"avg_similarity": sum(r.score for r in results) / len(results),
}
)
return results
@observe(type="tool", name="web_search")
async def search_web(query: str):
"""Execute web search tool."""
results = await tavily.search(query)
get_client().update_current_observation(
input=query,
output=results[:3], # Top 3 results
metadata={"source": "tavily", "result_count": len(results)},
)
return results
@observe(type="guardrail", name="pii_filter")
async def check_pii(text: str):
"""Check for PII before sending to LLM."""
has_pii = detect_pii(text)
get_client().update_current_observation(
input=text[:200],
output={"has_pii": has_pii, "action": "blocked" if has_pii else "passed"},
metadata={"check_type": "pii"},
)
if has_pii:
raise PiiDetectedError("PII detected in input")
return text
@observe(type="embedding", name="embed_query")
async def embed_query(text: str):
"""Generate embedding for query."""
embedding = await embeddings.embed(text)
get_client().update_current_observation(
input=text,
model="text-embedding-3-large",
usage={"input_tokens": len(text.split())},
metadata={"dimensions": len(embedding)},
)
return embedding
@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
"""Evaluate response relevance with LLM judge."""
score = await llm_judge.evaluate(
criteria="relevance",
query=query,
response=response,
)
get_client().update_current_observation(
input={"query": query, "response": response[:500]},
output={"score": score, "criteria": "relevance"},
)
# Each evaluator run creates its own inspectable trace
    return score
Rendered Tool Calls
In v3, tool calls within generations are rendered inline in the trace view:
@observe(type="agent")
async def coding_agent(task: str):
response = await client.messages.create(
model="claude-sonnet-4-6",
tools=[
{"name": "read_file", "description": "Read a file", "input_schema": {...}},
{"name": "write_file", "description": "Write a file", "input_schema": {...}},
],
messages=[{"role": "user", "content": task}],
)
# Tool calls automatically rendered in Langfuse trace:
# generation → tool_use: read_file(path="src/main.py")
# → tool_result: "def main():..."
# → tool_use: write_file(path="src/main.py", content="...")
# → tool_result: "OK"
    return response
Trace Log View
The Trace Log View provides a chronological log of all events within a trace, useful for debugging agent loops:
[00:00.000] agent:supervisor START
[00:00.050] chain:classify_intent START
[00:00.320] chain:classify_intent END → "security_audit"
[00:00.321] agent:security_audit START
[00:00.400] retriever:vector_search START
[00:00.650] retriever:vector_search END → 5 chunks
[00:00.651] guardrail:pii_filter START
[00:00.680] guardrail:pii_filter END → passed
[00:00.681] generation:analyze START
[00:02.100] generation:analyze END → $0.04
[00:02.101] evaluator:relevance_judge START
[00:02.500] evaluator:relevance_judge END → 0.92
[00:02.501] agent:security_audit END
[00:02.502] agent:supervisor END → Total: 2.5s, $0.05
Framework Integration Examples
LangGraph
from langfuse import observe
@observe(type="agent", name="langgraph_supervisor")
async def run_langgraph_workflow(query: str):
"""LangGraph workflow with automatic Langfuse tracing."""
from langgraph.graph import StateGraph
graph = StateGraph(AgentState)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
# Each node automatically creates nested observations
# when decorated with @observe
app = graph.compile()
    return await app.ainvoke({"query": query})
CrewAI
from langfuse import observe
@observe(type="agent", name="crewai_crew")
async def run_crew(task: str):
"""CrewAI crew with Langfuse tracing."""
from crewai import Crew, Agent, Task
researcher = Agent(name="researcher", role="Research analyst")
writer = Agent(name="writer", role="Technical writer")
crew = Crew(
agents=[researcher, writer],
tasks=[Task(description=task, agent=researcher)],
)
# CrewAI has built-in Langfuse integration via callbacks
    return crew.kickoff()
OpenAI Agents SDK
from langfuse import observe
@observe(type="agent", name="openai_agent")
async def run_openai_agent(query: str):
"""OpenAI Agents SDK with Langfuse tracing."""
from openai_agents import Agent, Runner
agent = Agent(
name="analyst",
instructions="You are a code analyst.",
model="gpt-5.2",
)
# OpenAI Agents SDK supports Langfuse via OTEL exporter
result = await Runner.run(agent, query)
    return result.final_output
Best Practices
- Use `type="agent"` for autonomous agents that make routing decisions
- Use `type="tool"` for function calls to external services
- Use `type="retriever"` for all RAG retrieval steps
- Use `type="guardrail"` for safety checks (PII, toxicity, etc.)
- Use `type="evaluator"` for quality judges — each creates an inspectable trace
- Add metadata with routing decisions, chunk counts, similarity scores
- Name observations descriptively — they appear in the Agent Graph
Alerting and Dashboards
Effective alerting strategies and dashboard design patterns.
Alert Severity Levels
| Level | Response Time | Examples |
|---|---|---|
| Critical (P1) | < 15 min | Service down, data loss |
| High (P2) | < 1 hour | Major feature broken |
| Medium (P3) | < 4 hours | Increased error rate |
| Low (P4) | Next day | Warnings |
Key Alerts
| Alert | Condition | Severity |
|---|---|---|
| ServiceDown | up == 0 for 1m | Critical |
| HighErrorRate | 5xx > 5% for 5m | Critical |
| HighLatency | p95 > 2s for 5m | High |
| LowCacheHitRate | < 70% for 10m | Medium |
Alert Grouping
Group related alerts:
route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 30s        # Wait 30s to collect similar alerts
  group_interval: 5m     # Send grouped alerts every 5m
  repeat_interval: 4h    # Re-send alert after 4h if still firing
  routes:
    - match:
        severity: critical
      receiver: pagerduty
      continue: true     # Continue to other routes
    - match:
        severity: warning
      receiver: slack
Inhibition Rules
Suppress noisy alerts when root cause is known:
inhibit_rules:
  # If ServiceDown is firing, suppress HighErrorRate and HighLatency
  - source_match:
      alertname: ServiceDown
    target_match_re:
      alertname: (HighErrorRate|HighLatency)
    equal: ['service']
  # If DatabaseDown is firing, suppress all DB-related alerts
  - source_match:
      alertname: DatabaseDown
    target_match_re:
      alertname: Database.*
    equal: ['cluster']
Escalation Policies
# Escalation: Slack -> PagerDuty after 15 min
routes:
  - match:
      severity: critical
    receiver: slack
    continue: true
    routes:
      - match:
          severity: critical
        receiver: pagerduty
        group_wait: 15m   # Escalate to PagerDuty after 15 min
Runbook Links
Add runbook links to alert annotations:
groups:
  - name: app-alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) /
          sum(rate(http_requests_total[5m])) > 0.05
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
Runbook should include:
- What the alert means
- Impact on users
- Common causes
- Investigation steps
- Remediation steps
- Escalation contacts
Dashboard Design Principles
Golden Signals Dashboard (top row)
+--------------+--------------+--------------+--------------+
| Latency | Traffic | Errors | Saturation |
| (p50/p95) | (req/s) | (5xx rate) | (CPU/mem) |
+--------------+--------------+--------------+--------------+
Service Dashboard Structure
- Overview (single row) - Traffic, errors, latency, saturation
- Request breakdown - By endpoint, method, status code
- Dependencies - Database, Redis, external APIs
- Resources - CPU, memory, disk, network
- Business metrics - Registrations, purchases, etc.
RED Metrics for Dashboards
- Rate: `rate(http_requests_total[5m])`
- Errors: `sum(rate(http_requests_total{status=~"5.."}[5m]))`
- Duration: `histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))`
USE Metrics for Resources
- Utilization: % of resource used
- Saturation: Queue depth, wait time
- Errors: Error count
SLO/SLI Dashboards
Service Level Indicators (SLIs):
# Availability SLI: % of successful requests
sum(rate(http_requests_total{status!~"5.."}[30d])) /
sum(rate(http_requests_total[30d]))
# Latency SLI: % of requests < 1s
sum(rate(http_request_duration_seconds_bucket{le="1"}[30d])) /
sum(rate(http_request_duration_seconds_count[30d]))
Service Level Objectives (SLOs):
- Availability: 99.9% (43 min downtime/month)
- Latency: 99% of requests < 1s
Error Budget:
- 99.9% SLO = 0.1% error budget
- If error budget consumed, freeze feature work and focus on reliability
Notification Channels
- PagerDuty - critical (on-call)
- Slack - warnings (team channel)
- Email - low priority (daily digest)
See scripts/alerting-rules.yml for complete examples.
Alerting Strategies
Effective alerting to minimize false positives.
Alerting Rules (Prometheus)
groups:
  - name: api_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m])) /
          sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
Alert Severity Levels
| Severity | Response Time | Example |
|---|---|---|
| Critical | Immediate (page) | Service down, data loss |
| High | 30 min | High error rate, disk full |
| Medium | 4 hours | Slow responses, high memory |
| Low | Next day | Deprecation warnings |
Best Practices
- Alert on symptoms, not causes - "Users can't login" not "CPU high"
- Actionable alerts only - every alert needs runbook
- Reduce noise - use `for: 5m` to avoid flapping
- Group related alerts - don't page for every instance
- Test alert rules - `amtool alert query`
Notification Channels
- PagerDuty - critical (on-call)
- Slack - warnings (team channel)
- Email - low priority (daily digest)
See scripts/alerting-rules.yml for complete examples.
Annotation Queues
Human-review workflow in Langfuse: route traces to reviewers, collect scores, feed decisions into golden dataset curation.
What Are Annotation Queues?
Annotation queues let you route specific traces to human reviewers for scoring. Reviewers see the trace inputs, outputs, and existing automated scores, then add a human judgment score. The collected scores become ground truth for evaluator calibration and golden dataset inclusion decisions.
Typical uses:
- Spot-checking high-cost or high-stakes LLM outputs
- Reviewing traces flagged as low-quality by automated evaluators
- Building ground-truth labels for fine-tuning or RAG evaluation
Creating Queues via Langfuse UI
- Navigate to Annotation Queues in the left sidebar
- Click Create Queue
- Configure:
  - Name — e.g., `quality-review`, `safety-check`, `golden-dataset-candidates`
  - Description — what reviewers should evaluate
  - Score configs — which scoring dimensions to collect (e.g., `accuracy`, `relevance`, `safety`)
- Share the queue URL with reviewers — no code access required
Score configs define what the reviewer sees and scores. Create them under Settings → Score Configs before creating the queue.
Adding Traces to a Queue Programmatically
Use get_client().create_annotation_queue_item() inside an @observe-decorated function to route a trace for human review:
from langfuse import observe, get_client
@observe(name="generate-response")
async def generate_and_flag(user_query: str, queue_id: str) -> str:
"""Generate a response and flag low-confidence outputs for human review."""
response = await llm.generate(user_query)
score = await auto_evaluate(response)
# Flag for human review when automated confidence is low
if score < 0.7:
lf = get_client()
trace_id = lf.get_current_trace_id()
lf.create_annotation_queue_item(
queue_id=queue_id,
trace_id=trace_id,
)
return responseYou can also add traces to a queue outside of an @observe context using a standalone client:
from langfuse import Langfuse
lf = Langfuse()
# Add a known trace ID to a review queue
lf.create_annotation_queue_item(
queue_id="queue-abc123",
trace_id="trace-xyz789",
)Retrieve queue IDs programmatically via the Langfuse API or copy them from the UI URL.
Human-Review Workflow
Trace in Langfuse
|
v
[Automated score < threshold]
|
v
create_annotation_queue_item() → Queue
|
v
Reviewer opens queue URL
|
Views: input / output / existing scores
|
Adds human scores (accuracy, safety, etc.)
|
v
Scores stored on trace in Langfuse
|
v
[Optional] Trigger golden dataset inclusion
Reviewers access queues via the Langfuse UI — no SDK or code access required. The reviewer sees:
- The trace input and output
- Any automated scores already applied
- The score dimensions configured for the queue
After scoring, human scores appear on the trace alongside automated scores and are queryable via the Langfuse API.
Fetching Completed Annotations
Query finished annotation items to drive downstream automation (e.g., auto-include high-scored traces into the golden dataset):
import httpx
import base64
import os
LANGFUSE_HOST = os.environ["LANGFUSE_HOST"]
PUBLIC_KEY = os.environ["LANGFUSE_PUBLIC_KEY"]
SECRET_KEY = os.environ["LANGFUSE_SECRET_KEY"]
auth = base64.b64encode(f"{PUBLIC_KEY}:{SECRET_KEY}".encode()).decode()
headers = {"Authorization": f"Basic {auth}"}
async def fetch_completed_annotations(queue_id: str) -> list[dict]:
"""Fetch completed annotation queue items."""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{LANGFUSE_HOST}/api/public/annotation-queues/{queue_id}/items",
headers=headers,
params={"status": "DONE"},
)
resp.raise_for_status()
return resp.json()["data"]
async def promote_high_quality_to_dataset(queue_id: str, dataset_name: str):
"""Add human-approved traces to golden dataset."""
from langfuse import Langfuse
lf = Langfuse()
items = await fetch_completed_annotations(queue_id)
for item in items:
# Check human score threshold
scores = item.get("scores", [])
quality_scores = [s["value"] for s in scores if s["name"] == "accuracy"]
if quality_scores and quality_scores[0] >= 0.8:
lf.create_dataset_item(
dataset_name=dataset_name,
                trace_id=item["traceId"],
            )
Link to Golden Dataset Curation
Annotation queues feed directly into golden dataset curation:
- Automated multi-agent pipeline scores content (`accuracy`, `coherence`, `depth`, `relevance`)
- Items with `quality_total >= 0.75` but low confidence go to the `golden-dataset-candidates` queue
- Human reviewer confirms or overrides the automated decision
- Approved traces are added to the evaluation dataset via `create_dataset_item()`
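The queue-routing rule above can be sketched as a small helper. The names `needs_human_review` and `quality_total` here are illustrative, not Langfuse SDK functions:

```python
def needs_human_review(scores: dict[str, float], confidence: float,
                       quality_threshold: float = 0.75,
                       confidence_threshold: float = 0.8) -> bool:
    """Route to the candidates queue when quality is high but the
    automated scorer is not confident enough to auto-approve."""
    quality_total = sum(scores.values()) / len(scores)
    return quality_total >= quality_threshold and confidence < confidence_threshold

# High quality but low confidence -> send to the human annotation queue
print(needs_human_review(
    {"accuracy": 0.9, "coherence": 0.8, "depth": 0.7, "relevance": 0.8},
    confidence=0.6,
))
```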
See ../../golden-dataset/rules/curation-annotation.md for the parallel multi-agent scoring pipeline that feeds this queue.
References
- Langfuse Annotation Queues docs
- `../references/evaluation-scores.md` — automated scoring patterns
- `../../golden-dataset/rules/curation-annotation.md` — multi-agent curation pipeline
Cost Tracking
Token & Cost Tracking
Automatic cost calculation based on model pricing, with spend alerts and Metrics API.
Basic Cost Tracking (v3)
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
@observe(name="security_audit")
async def run_audit(content: str):
"""Track costs automatically via @observe."""
response = await llm.generate(
model="claude-sonnet-4-6",
messages=[{"role": "user", "content": f"Analyze for XSS: {content}"}],
)
get_client().update_current_observation(
model="claude-sonnet-4-6",
usage={
"input": 1500,
"output": 1000,
"unit": "TOKENS",
},
)
# Langfuse automatically calculates: $0.0045 + $0.015 = $0.0195
return response
Pricing Database (Auto-Updated)
Langfuse maintains a pricing database for all major models. You can also define custom pricing:
langfuse = Langfuse()
# Custom model pricing
langfuse.create_model(
model_name="claude-sonnet-4-6",
match_pattern="claude-sonnet-4.*",
unit="TOKENS",
input_price=0.000003, # $3/MTok
output_price=0.000015, # $15/MTok
total_price=None, # Calculated from input+output
)
Cost Tracking Per Analysis
from langfuse import Langfuse
langfuse = Langfuse()
# After analysis completes
trace = langfuse.get_trace(trace_id)
total_cost = sum(
gen.calculated_total_cost or 0
for gen in trace.observations
if gen.type == "GENERATION"
)
# Store in database
await analysis_repo.update(
analysis_id,
langfuse_trace_id=trace.id,
total_cost_usd=total_cost,
)
Spend Alerts
Configure alerts to get notified when costs exceed thresholds:
In Langfuse UI
- Navigate to Settings → Alerts
- Create alert rule:
- Metric: Daily cost
- Threshold: $50/day
- Channel: Slack / Email / Webhook
Via API
langfuse = Langfuse()
# Programmatic spend check
from datetime import datetime, timedelta
# Get daily cost via v2 Metrics API
metrics = langfuse.get_metrics(
metric_name="total_cost",
from_timestamp=datetime.now() - timedelta(days=1),
to_timestamp=datetime.now(),
)
daily_cost = metrics.values[0].value if metrics.values else 0
if daily_cost > 50.0:
await send_alert(
channel="slack",
message=f"Daily LLM cost alert: ${daily_cost:.2f} exceeds $50 threshold",
)
v2 Metrics API (Beta)
Query cost and usage metrics programmatically instead of SQL:
from langfuse import Langfuse
from datetime import datetime, timedelta
langfuse = Langfuse()
# Total cost over last 7 days
metrics = langfuse.get_metrics(
metric_name="total_cost",
from_timestamp=datetime.now() - timedelta(days=7),
to_timestamp=datetime.now(),
granularity="day",
)
for point in metrics.values:
print(f"{point.timestamp.date()}: ${point.value:.2f}")
# Token usage by model
token_metrics = langfuse.get_metrics(
metric_name="total_tokens",
from_timestamp=datetime.now() - timedelta(days=7),
to_timestamp=datetime.now(),
group_by="model",
)
for group in token_metrics.groups:
print(f"{group.key}: {group.values[0].value:,} tokens")
Monitoring Dashboard Queries
Top 10 Most Expensive Traces (Last 7 Days)
SELECT
name,
user_id,
calculated_total_cost,
input_tokens,
output_tokens
FROM traces
WHERE timestamp > NOW() - INTERVAL '7 days'
ORDER BY calculated_total_cost DESC
LIMIT 10;
Average Cost by Agent Type
SELECT
metadata->>'agent_type' as agent,
COUNT(*) as traces,
AVG(calculated_total_cost) as avg_cost,
SUM(calculated_total_cost) as total_cost
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent
ORDER BY total_cost DESC;
Daily Cost Trend
SELECT
DATE(timestamp) as date,
SUM(calculated_total_cost) as daily_cost,
COUNT(*) as trace_count
FROM traces
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;
Best Practices
- Always pass usage data with input/output token counts
- Monitor costs daily with spend alerts to catch spikes early
- Set up threshold alerts for abnormal cost increases (> 2x daily average)
- Track costs by user_id to identify expensive users
- Group by metadata (content_type, agent_type) for cost attribution
- Use custom pricing for self-hosted models
- Use Metrics API for programmatic cost queries instead of raw SQL
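The spike rule above (today's spend more than 2x the trailing daily average) reduces to a one-line comparison; a minimal sketch with an illustrative `cost_spike` helper:

```python
def cost_spike(daily_costs: list[float], today: float, factor: float = 2.0) -> bool:
    """Flag today's spend if it exceeds `factor` times the trailing average."""
    if not daily_costs:
        return False
    baseline = sum(daily_costs) / len(daily_costs)
    return today > factor * baseline

history = [12.0, 14.5, 11.8, 13.2, 12.9, 14.1, 13.6]  # last 7 days, USD
print(cost_spike(history, today=30.0))  # well above 2x the ~13.2 average
```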
References
Dashboards
Monitoring Dashboards
Grafana dashboard patterns and SLO/SLI definitions.
The Four Golden Signals
| Signal | Metric | Description |
|---|---|---|
| Latency | Response time | How long requests take |
| Traffic | Requests/sec | Volume of demand |
| Errors | Error rate | Failures per second |
| Saturation | Resource usage | How full the service is |
SLO/SLI Examples
# SLO: 99.9% availability
SLI: availability = successful_requests / total_requests
Target: > 0.999
# SLO: 95% of requests < 500ms
SLI: latency_p95 = histogram_quantile(0.95, request_duration_seconds)
Target: < 0.5
# SLO: < 0.1% error rate
SLI: error_rate = failed_requests / total_requests
Target: < 0.001
Grafana Dashboard Structure
- Overview row - traffic, errors, latency
- Saturation row - CPU, memory, disk
- Details row - per-endpoint breakdown
- Database row - query performance, connections
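The availability SLO shown earlier implies a fixed error budget; a minimal sketch of the arithmetic (the `error_budget_remaining` helper is illustrative):

```python
def error_budget_remaining(slo_target: float, total: int, failed: int) -> float:
    """Fraction of the error budget still unspent for an availability SLO."""
    allowed_failures = (1 - slo_target) * total
    if allowed_failures == 0:
        return 0.0
    return 1 - failed / allowed_failures

# A 99.9% SLO over 1,000,000 requests allows 1,000 failures;
# 250 failures so far leaves 75% of the budget.
print(error_budget_remaining(0.999, total=1_000_000, failed=250))
```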
Best Practices
- Use time ranges - Last 1h, 6h, 24h, 7d
- Percentiles over averages - p50, p95, p99
- Color code thresholds - green/yellow/red
- Include annotations - deployments, incidents
See Grafana dashboards in backend/grafana/dashboards/.
Distributed Tracing
Distributed Tracing
Track requests across microservices with OpenTelemetry.
Basic Setup (Node.js)
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
const sdk = new NodeSDK({
traceExporter: new JaegerExporter(),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();
Span Relationships
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
# Parent span
with tracer.start_as_current_span("analyze_content") as parent_span:
parent_span.set_attribute("content.url", url)
parent_span.set_attribute("content.type", "article")
# Child span (sequential)
with tracer.start_as_current_span("fetch_content") as fetch_span:
content = await fetch_url(url)
fetch_span.set_attribute("content.size_bytes", len(content))
# Another child span (sequential)
with tracer.start_as_current_span("generate_embedding") as embed_span:
embedding = await embed_text(content)
embed_span.set_attribute("embedding.dimensions", len(embedding))
# Parallel child spans (using asyncio.gather)
async def analyze_with_span(agent_name: str, content: str):
with tracer.start_as_current_span(f"agent_{agent_name}"):
return await agent.analyze(content)
results = await asyncio.gather(
analyze_with_span("tech_comparator", content),
analyze_with_span("security_auditor", content),
analyze_with_span("implementation_planner", content)
)
Trace Sampling Strategies
Head-based sampling (decide at trace start):
from opentelemetry.sdk.trace.sampling import (
TraceIdRatioBased, # Sample X% of traces
ParentBased, # Follow parent's sampling decision
ALWAYS_ON, # Always sample
ALWAYS_OFF # Never sample
)
# Sample 10% of traces
sampler = TraceIdRatioBased(0.1)
Tail-based sampling (decide after trace completes):
- Keep all traces with errors
- Keep slow traces (p95+ latency)
- Sample 1% of successful fast traces
Recommended sampling:
- Development: 100% sampling
- Production: 10% sampling, 100% for errors
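Head-based ratio sampling can be approximated in plain Python to show why it is deterministic: hashing the trace ID gives every service the same decision for the same trace. This is a sketch of the idea, not the OpenTelemetry implementation:

```python
import hashlib

def sample_trace(trace_id: str, ratio: float = 0.1) -> bool:
    """Deterministic head-based sampling: same trace ID, same decision."""
    bucket = int(hashlib.sha256(trace_id.encode()).hexdigest(), 16) % 10_000
    return bucket < ratio * 10_000

decisions = [sample_trace(f"trace-{i}") for i in range(10_000)]
print(f"sampled {sum(decisions) / len(decisions):.1%} of traces")
```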
Context Propagation
// Service A: Create trace context
const ctx = context.active();
// Service B: Extract trace context from headers
const propagatedCtx = propagation.extract(ctx, request.headers);
context.with(propagatedCtx, () => {
// This span will be child of Service A's span
const span = tracer.startSpan('service_b_operation');
// ...
span.end();
});
Trace Analysis Queries
Find slow traces:
duration > 2s
Find traces with errors:
status = error
Find traces for specific user:
user.id = "abc-123"
Find traces hitting specific service:
service.name = "analysis-worker"
Best Practices
- Sample smartly - 10% for high traffic, 100% for errors
- Add attributes - user_id, order_id, error_type
- Propagate context - across HTTP, gRPC, message queues
Tag errors - `error=true` for filtering
See scripts/opentelemetry-tracing.ts for complete setup.
Embedding Drift
Embedding Drift Detection
Monitor semantic drift in LLM applications using embedding-based methods.
Overview
Traditional statistical methods (PSI, KS) don't work well for unstructured text data. Embedding drift detection uses vector representations to detect semantic changes.
Arize Phoenix Integration
import phoenix as px
from phoenix.trace import TraceDataset
import numpy as np
# Launch Phoenix for local observability
session = px.launch_app()
# Analyze embedding drift
def analyze_embedding_drift(
baseline_embeddings: np.ndarray,
current_embeddings: np.ndarray
) -> dict:
"""
Analyze drift in embedding space using Phoenix.
Args:
baseline_embeddings: Reference embeddings (N x D)
current_embeddings: Current embeddings (M x D)
Returns:
Drift analysis results
"""
# Phoenix provides built-in drift analysis
drift_analysis = px.Client().compute_drift(
primary_embeddings=current_embeddings,
reference_embeddings=baseline_embeddings
)
return drift_analysis
Cluster-Based Drift Detection
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import numpy as np
class ClusterDriftDetector:
"""Detect drift by monitoring cluster distributions."""
def __init__(self, n_clusters: int = 10, psi_threshold: float = 0.25):
self.n_clusters = n_clusters
self.psi_threshold = psi_threshold
self.kmeans = None
self.baseline_distribution = None
def fit_baseline(self, embeddings: np.ndarray):
"""Fit clusters on baseline embeddings."""
self.kmeans = KMeans(
n_clusters=self.n_clusters,
random_state=42,
n_init=10
)
labels = self.kmeans.fit_predict(embeddings)
# Store baseline cluster distribution
self.baseline_distribution = np.bincount(
labels,
minlength=self.n_clusters
) / len(labels)
return self
def detect_drift(self, embeddings: np.ndarray) -> dict:
"""Detect drift in new embeddings."""
if self.kmeans is None:
raise ValueError("Must call fit_baseline first")
# Assign new embeddings to clusters
labels = self.kmeans.predict(embeddings)
# Current cluster distribution
current_distribution = np.bincount(
labels,
minlength=self.n_clusters
) / len(labels)
# Calculate PSI between distributions
psi = self._calculate_psi(
self.baseline_distribution,
current_distribution
)
# Calculate centroid distances
centroid_shift = self._calculate_centroid_shift(embeddings, labels)
return {
"psi": psi,
"drift_detected": psi > self.psi_threshold,
"baseline_distribution": self.baseline_distribution.tolist(),
"current_distribution": current_distribution.tolist(),
"centroid_shift": centroid_shift,
"interpretation": self._interpret(psi, centroid_shift)
}
def _calculate_psi(self, expected: np.ndarray, actual: np.ndarray) -> float:
"""Calculate PSI between cluster distributions."""
eps = 0.0001
expected = expected + eps
actual = actual + eps
return np.sum((actual - expected) * np.log(actual / expected))
def _calculate_centroid_shift(
self,
embeddings: np.ndarray,
labels: np.ndarray
) -> dict:
"""Calculate how much cluster centroids have shifted."""
shifts = {}
for i in range(self.n_clusters):
cluster_embeddings = embeddings[labels == i]
if len(cluster_embeddings) > 0:
current_centroid = cluster_embeddings.mean(axis=0)
baseline_centroid = self.kmeans.cluster_centers_[i]
shift = np.linalg.norm(current_centroid - baseline_centroid)
shifts[f"cluster_{i}"] = float(shift)
return shifts
def _interpret(self, psi: float, centroid_shift: dict) -> str:
avg_shift = np.mean(list(centroid_shift.values()))
if psi < 0.1 and avg_shift < 0.1:
return "No significant drift"
elif psi < 0.25:
return "Minor drift detected, monitor closely"
else:
return "Significant drift, investigate and consider retraining"
Centroid Distance Monitoring
import numpy as np
from typing import Optional
class CentroidMonitor:
"""Monitor drift via embedding centroid movement."""
def __init__(self, distance_threshold: float = 0.2):
self.distance_threshold = distance_threshold
self.baseline_centroid: Optional[np.ndarray] = None
self.baseline_std: Optional[float] = None
def set_baseline(self, embeddings: np.ndarray):
"""Set baseline centroid from reference embeddings."""
self.baseline_centroid = embeddings.mean(axis=0)
# Calculate average distance from centroid
distances = np.linalg.norm(
embeddings - self.baseline_centroid,
axis=1
)
self.baseline_std = distances.std()
return self
def check_drift(self, embeddings: np.ndarray) -> dict:
"""Check if current embeddings have drifted from baseline."""
if self.baseline_centroid is None:
raise ValueError("Must call set_baseline first")
# Current centroid
current_centroid = embeddings.mean(axis=0)
# Distance between centroids
centroid_distance = np.linalg.norm(
current_centroid - self.baseline_centroid
)
# Normalized by baseline spread
normalized_distance = centroid_distance / (self.baseline_std + 1e-10)
# Check individual embedding distances
distances = np.linalg.norm(
embeddings - self.baseline_centroid,
axis=1
)
outlier_ratio = (distances > 3 * self.baseline_std).mean()
return {
"centroid_distance": float(centroid_distance),
"normalized_distance": float(normalized_distance),
"outlier_ratio": float(outlier_ratio),
"drift_detected": normalized_distance > self.distance_threshold,
"severity": self._severity(normalized_distance, outlier_ratio)
}
def _severity(self, distance: float, outlier_ratio: float) -> str:
if distance < 0.1 and outlier_ratio < 0.05:
return "none"
elif distance < 0.2 and outlier_ratio < 0.1:
return "low"
elif distance < 0.3 and outlier_ratio < 0.2:
return "medium"
else:
return "high"
Cosine Similarity Drift
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def cosine_drift_score(
baseline_embeddings: np.ndarray,
current_embeddings: np.ndarray,
sample_size: int = 1000
) -> dict:
"""
Measure drift using cosine similarity distributions.
Args:
baseline_embeddings: Reference embeddings
current_embeddings: Current embeddings
sample_size: Number of pairs to sample
Returns:
Drift analysis based on cosine similarities
"""
# Sample pairs for efficiency
n_baseline = min(len(baseline_embeddings), sample_size)
n_current = min(len(current_embeddings), sample_size)
baseline_sample = baseline_embeddings[
np.random.choice(len(baseline_embeddings), n_baseline, replace=False)
]
current_sample = current_embeddings[
np.random.choice(len(current_embeddings), n_current, replace=False)
]
# Baseline self-similarity
baseline_centroid = baseline_sample.mean(axis=0)
baseline_similarities = cosine_similarity(
baseline_sample,
baseline_centroid.reshape(1, -1)
).flatten()
# Current similarity to baseline centroid
current_similarities = cosine_similarity(
current_sample,
baseline_centroid.reshape(1, -1)
).flatten()
# Compare distributions
baseline_mean = baseline_similarities.mean()
current_mean = current_similarities.mean()
similarity_drop = baseline_mean - current_mean
return {
"baseline_mean_similarity": float(baseline_mean),
"current_mean_similarity": float(current_mean),
"similarity_drop": float(similarity_drop),
"drift_detected": similarity_drop > 0.1,
"interpretation": (
"Significant semantic drift"
if similarity_drop > 0.1
else "No significant drift"
)
}
RAG Retrieval Drift
from typing import List
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class RAGDriftMonitor:
"""Monitor drift in RAG retrieval quality."""
def __init__(
self,
similarity_threshold: float = 0.7,
coverage_threshold: float = 0.8
):
self.similarity_threshold = similarity_threshold
self.coverage_threshold = coverage_threshold
self.baseline_queries: List[np.ndarray] = []
self.baseline_retrievals: List[List[np.ndarray]] = []
def add_baseline(
self,
query_embedding: np.ndarray,
retrieved_embeddings: List[np.ndarray]
):
"""Add a query-retrieval pair to baseline."""
self.baseline_queries.append(query_embedding)
self.baseline_retrievals.append(retrieved_embeddings)
def check_retrieval_drift(
self,
query_embedding: np.ndarray,
retrieved_embeddings: List[np.ndarray]
) -> dict:
"""
Check if retrieval for a query has drifted.
Useful for detecting:
- Index staleness
- Embedding model changes
- Document corpus drift
"""
# Find most similar baseline query
similarities = [
cosine_similarity(
query_embedding.reshape(1, -1),
bq.reshape(1, -1)
)[0, 0]
for bq in self.baseline_queries
]
best_match_idx = np.argmax(similarities)
query_similarity = similarities[best_match_idx]
if query_similarity < self.similarity_threshold:
return {
"drift_detected": False,
"reason": "Query too different from baseline"
}
# Compare retrieved documents
baseline_retrieved = self.baseline_retrievals[best_match_idx]
# Calculate coverage: how many baseline docs are still retrieved
coverage = self._calculate_coverage(
baseline_retrieved,
retrieved_embeddings
)
return {
"query_similarity": float(query_similarity),
"coverage": float(coverage),
"drift_detected": coverage < self.coverage_threshold,
"interpretation": (
f"Retrieval coverage dropped to {coverage:.2%}"
if coverage < self.coverage_threshold
else "Retrieval stable"
)
}
def _calculate_coverage(
self,
baseline: List[np.ndarray],
current: List[np.ndarray]
) -> float:
"""Calculate what fraction of baseline docs are still retrieved."""
if not baseline or not current:
return 0.0
baseline_stack = np.stack(baseline)
current_stack = np.stack(current)
# For each baseline doc, check if similar doc is in current
similarities = cosine_similarity(baseline_stack, current_stack)
max_similarities = similarities.max(axis=1)
# Count docs with similarity > threshold
covered = (max_similarities > self.similarity_threshold).sum()
return covered / len(baseline)
Evidently AI Integration
from evidently import Report
from evidently.metrics import EmbeddingsDriftMetric
import pandas as pd
import numpy as np
def evidently_embedding_drift(
baseline_embeddings: np.ndarray,
current_embeddings: np.ndarray,
embedding_column: str = "embedding"
) -> dict:
"""
Use Evidently AI for embedding drift detection.
Evidently uses model-based drift detection by default:
Trains a classifier to distinguish baseline vs current.
"""
# Create DataFrames
baseline_df = pd.DataFrame({
embedding_column: list(baseline_embeddings)
})
current_df = pd.DataFrame({
embedding_column: list(current_embeddings)
})
# Run Evidently report
report = Report(metrics=[
EmbeddingsDriftMetric(column_name=embedding_column)
])
report.run(
reference_data=baseline_df,
current_data=current_df
)
# Extract results
result = report.as_dict()["metrics"][0]["result"]
return {
"drift_score": result.get("drift_score"),
"drift_detected": result.get("drift_detected"),
"method": "model_based",
"details": result
}
References
- Arize Phoenix Documentation
- Evidently AI: Embedding Drift Detection
- Measuring Embedding Drift
- AWS: Monitor Embedding Drift for LLMs
Evaluation Scores
LLM Evaluation & Scoring
Track quality metrics with custom scores, automated evaluation, and evaluator execution tracing.
Basic Scoring (v3)
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
@observe()
async def analyze_and_score(query: str):
"""Run analysis and score the result."""
response = await llm.generate(query)
# Score via get_client() within @observe context
get_client().update_current_observation(
output=response[:500],
)
# Score the trace
get_client().score_current_trace(
name="relevance",
value=0.85,
comment="Response addresses query but lacks depth",
)
return response
# Or score by trace_id directly
langfuse.score(
trace_id="trace_123",
name="factuality",
value=0.92,
data_type="NUMERIC",
)
Evaluator Execution Tracing
In v3, each evaluator run creates its own inspectable trace:
from langfuse import observe, get_client
@observe(type="evaluator", name="relevance_judge")
async def evaluate_relevance(query: str, response: str):
"""Each evaluator call creates an inspectable trace in Langfuse."""
score = await llm_judge.evaluate(
criteria="relevance",
query=query,
response=response,
)
get_client().update_current_observation(
input={"query": query[:500], "response": response[:500]},
output={"score": score, "criteria": "relevance"},
model="claude-sonnet-4-6",
)
# The evaluator's own LLM calls are visible in its trace
return score
Result in Langfuse UI:
evaluator:relevance_judge (0.8s, $0.01)
├── generation: judge_prompt → score: 0.85
└── metadata: {criteria: "relevance", model: "claude-sonnet-4-6"}
Score Analytics
View multi-score comparisons in the Langfuse dashboard:
- Score distributions: Histogram of scores by criterion
- Multi-score comparison: Side-by-side comparison of relevance, depth, accuracy
- Quality trends: Track scores over time
- Filter by threshold: Show only low-scoring traces
- Compare prompts: Which prompt version scores higher?
Mutable Score Configs
Configure score types and ranges in Langfuse settings:
# Score configs can be updated without code changes
# In Langfuse UI: Settings → Score Configs
# Numeric scores
langfuse.score(trace_id="...", name="relevance", value=0.85, data_type="NUMERIC")
# Categorical scores
langfuse.score(trace_id="...", name="sentiment", value="positive", data_type="CATEGORICAL")
# Boolean scores
langfuse.score(trace_id="...", name="contains_pii", value=0, data_type="BOOLEAN")
Automated Scoring with G-Eval
from langfuse import observe, get_client
from app.shared.services.g_eval import GEvalScorer
scorer = GEvalScorer()
@observe()
async def analyze_with_scoring(query: str):
response = await llm.generate(query)
# Run G-Eval scoring
scores = await scorer.score(
query=query,
response=response,
criteria=["relevance", "coherence", "depth"],
)
# Record all scores
for criterion, score in scores.items():
get_client().score_current_trace(name=criterion, value=score)
return response
Quality Scores Trend Query
SELECT
DATE(timestamp) as date,
AVG(value) FILTER (WHERE name = 'relevance') as avg_relevance,
AVG(value) FILTER (WHERE name = 'depth') as avg_depth,
AVG(value) FILTER (WHERE name = 'factuality') as avg_factuality
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;
Datasets for Evaluation
Create test datasets and run automated evaluations:
from langfuse import Langfuse, observe, get_client
langfuse = Langfuse()
# Fetch dataset
dataset = langfuse.get_dataset("security_audit_test_set")
@observe()
async def evaluate_item(item):
"""Evaluate a single dataset item with tracing."""
response = await llm.generate(item.input)
get_client().update_current_observation(
input=item.input,
output=response,
)
# Score
score = await evaluate_response(item.expected_output, response)
get_client().score_current_trace(name="accuracy", value=score)
return score
# Run evaluation
for item in dataset.items:
await evaluate_item(item)
Dataset Structure in UI
security_audit_test_set
├── item_1: XSS vulnerability test
│ ├── input: "Check this HTML for XSS..."
│ └── expected_output: "Found XSS in innerHTML..."
├── item_2: SQL injection test
│ ├── input: "Review this SQL query..."
│ └── expected_output: "SQL injection vulnerability in WHERE clause..."
└── item_3: CSRF protection test
├── input: "Analyze this form..."
└── expected_output: "Missing CSRF token..."
Evaluation Metrics
Common score types:
| Metric | Range | Description |
|---|---|---|
| Relevance | 0-1 | Does response address the query? |
| Coherence | 0-1 | Is response logically structured? |
| Depth | 0-1 | Level of detail and analysis |
| Factuality | 0-1 | Accuracy of claims |
| Completeness | 0-1 | All aspects of query covered? |
| Toxicity | 0-1 | Harmful or inappropriate content |
Best Practices
- Score all production traces for quality monitoring
Use evaluator type (`@observe(type="evaluator")`) for inspectable judge traces
Use consistent criteria across all evaluations
- Automate scoring with G-Eval or similar
- Set quality thresholds (e.g., avg_relevance > 0.7)
- Create test datasets for regression testing
- Track scores by prompt version to measure improvements
- Alert on quality drops (e.g., avg_score < 0.6 for 3 days)
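The last practice (alert when the average score stays low for several days) reduces to a consecutive-window check; `quality_alert` is an illustrative helper:

```python
def quality_alert(daily_avg_scores: list[float],
                  threshold: float = 0.6, days: int = 3) -> bool:
    """Alert when the daily average score stays below threshold for N consecutive days."""
    if len(daily_avg_scores) < days:
        return False
    return all(score < threshold for score in daily_avg_scores[-days:])

print(quality_alert([0.72, 0.68, 0.58, 0.55, 0.52]))  # three low days in a row
```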
Integration with OrchestKit Quality Gate
from langfuse import observe, get_client
@observe(name="quality_gate")
async def quality_gate_node(state: WorkflowState):
"""Quality gate with Langfuse scoring."""
# Get scores from evaluators
scores = await run_quality_evaluators(state)
# Log scores to trace
for criterion, score in scores.items():
get_client().score_current_trace(name=criterion, value=score)
# Check threshold
avg_score = sum(scores.values()) / len(scores)
passed = avg_score >= 0.7
return {"quality_gate_passed": passed, "quality_scores": scores}
References
Ewma Baselines
EWMA Dynamic Baselines
Exponentially Weighted Moving Average for adaptive drift detection baselines.
Basic EWMA
import numpy as np
from dataclasses import dataclass
@dataclass
class EWMAState:
mean: float = 0.0
variance: float = 0.0
count: int = 0
class EWMABaseline:
"""EWMA-based dynamic baseline. Formula: EWMA_t = α × X_t + (1-α) × EWMA_{t-1}"""
def __init__(self, alpha: float = 0.2, sigma_threshold: float = 3.0, min_samples: int = 10):
self.alpha = alpha
self.sigma_threshold = sigma_threshold
self.min_samples = min_samples
self.state = EWMAState()
def update(self, value: float) -> dict:
"""Update baseline and check for anomaly."""
self.state.count += 1
if self.state.count == 1:
self.state.mean = value
self.state.variance = 0.0
else:
delta = value - self.state.mean
self.state.mean = self.alpha * value + (1 - self.alpha) * self.state.mean
self.state.variance = (1 - self.alpha) * (self.state.variance + self.alpha * delta ** 2)
std = np.sqrt(self.state.variance) if self.state.variance > 0 else 0.001
z_score = abs(value - self.state.mean) / std
is_anomaly = self.state.count >= self.min_samples and z_score > self.sigma_threshold
return {
"value": value,
"ewma_mean": self.state.mean,
"ewma_std": std,
"z_score": z_score,
"is_anomaly": is_anomaly
}
Multi-Metric Tracker
class MultiMetricEWMA:
"""Track multiple metrics with independent baselines."""
def __init__(self, metrics: list[str], alpha: float = 0.2):
self.baselines = {m: EWMABaseline(alpha=alpha) for m in metrics}
def update(self, metrics: dict) -> dict:
results = {}
anomalies = []
for name, value in metrics.items():
if name in self.baselines:
result = self.baselines[name].update(value)
results[name] = result
if result["is_anomaly"]:
anomalies.append({"metric": name, "z_score": result["z_score"]})
return {"metrics": results, "anomalies": anomalies}
Alpha Selection
| Use Case | Alpha | Behavior |
|---|---|---|
| Stable production | 0.1 | Slow adaptation |
| Active development | 0.3 | Moderate |
| High variability | 0.1-0.15 | Very stable |
| Sudden change detection | 0.4-0.5 | Quick response |
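The trade-off in this table is easy to demonstrate on a step change: using the same EWMA update formula as above, a higher alpha tracks the new level faster. A plain-Python sketch:

```python
def ewma(values: list[float], alpha: float) -> float:
    """Final EWMA estimate: EWMA_t = alpha * X_t + (1 - alpha) * EWMA_{t-1}."""
    est = values[0]
    for v in values[1:]:
        est = alpha * v + (1 - alpha) * est
    return est

series = [1.0] * 10 + [2.0] * 5  # level jumps from 1.0 to 2.0
print(f"alpha=0.1 -> {ewma(series, 0.1):.2f}")  # still lagging behind the jump
print(f"alpha=0.5 -> {ewma(series, 0.5):.2f}")  # close to the new level
```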
References
Experiments Api
Langfuse Experiments API
Overview
The Experiments API enables systematic evaluation of LLM outputs across datasets. Use it for A/B testing prompts, comparing models, and tracking quality over time. v3 adds the Experiment Runner SDK, dataset item versioning, and corrected outputs.
┌─────────────────────────────────────────────────────────────────────┐
│ Langfuse Experiments Flow │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Dataset │────▶│ Experiment │────▶│ Runs (Items) │ │
│ │ (inputs) │ │ (config) │ │ (executions) │ │
│ └──────────┘ └────────────┘ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Evaluators │ │
│ │ (judge outputs) │ │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Scores │ │
│ │ (per run) │ │
│ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Experiment Runner SDK (v3)
The high-level API simplifies running experiments:
from langfuse import Langfuse
langfuse = Langfuse()
async def my_pipeline(input_data: dict) -> str:
"""Your LLM pipeline to evaluate."""
return await llm.generate(input_data["query"])
# Run experiment in one call
result = langfuse.run_experiment(
dataset_name="golden-analysis-dataset",
experiment_name="sonnet-v-gpt5",
run_fn=my_pipeline,
evaluators=[
{"name": "relevance", "fn": relevance_evaluator},
{"name": "depth", "fn": depth_evaluator},
],
)
# Result contains:
# - experiment_id
# - per-item scores
# - aggregate statistics
print(f"Avg relevance: {result.stats['relevance']['mean']:.2f}")
print(f"Avg depth: {result.stats['depth']['mean']:.2f}")
Creating Datasets
From Code
from langfuse import Langfuse
langfuse = Langfuse()
# Create dataset with JSON schema enforcement
dataset = langfuse.create_dataset(
name="golden-analysis-dataset",
description="Curated analysis examples with expected outputs",
metadata={"version": "v2", "schema_version": "1.0"},
)
# Add items with versioning
items = [
{
"input": {"url": "https://example.com/article1", "type": "article"},
"expected_output": "Expected analysis for article 1...",
"metadata": {"category": "tutorial", "difficulty": "beginner"},
},
{
"input": {"url": "https://example.com/article2", "type": "article"},
"expected_output": "Expected analysis for article 2...",
"metadata": {"category": "reference", "difficulty": "advanced"},
},
]
for item in items:
langfuse.create_dataset_item(
dataset_name="golden-analysis-dataset",
input=item["input"],
expected_output=item.get("expected_output"),
metadata=item.get("metadata"),
)
From Existing Traces
# Create dataset from production traces
traces = langfuse.get_traces(
filter={
"score_name": "human_verified",
"score_value_gte": 0.9, # Only high-quality
},
limit=100,
)
dataset = langfuse.create_dataset(name="production-golden-v1")
for trace in traces:
langfuse.create_dataset_item(
dataset_name="production-golden-v1",
input=trace.input,
expected_output=trace.output,
metadata={"trace_id": trace.id},
)
Dataset Item Versioning
Track changes to dataset items over time:
# Update an existing item — creates a new version
langfuse.update_dataset_item(
dataset_name="golden-analysis-dataset",
item_id="item_123",
expected_output="Updated expected output with more detail...",
metadata={"version": 2, "updated_by": "human_reviewer"},
)
# View item history in Langfuse UI:
# item_123 v1 (Jan 15) → v2 (Feb 01)
# Each experiment run records which version it evaluated against
JSON Schema Enforcement
Enforce structure on dataset items:
# Create dataset with schema
dataset = langfuse.create_dataset(
name="structured-eval-dataset",
description="Dataset with enforced input schema",
metadata={
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"context": {"type": "array", "items": {"type": "string"}},
},
"required": ["query"],
}
},
)
# Items must match schema — invalid items are rejected
langfuse.create_dataset_item(
dataset_name="structured-eval-dataset",
input={"query": "What is XSS?", "context": ["OWASP guide..."]},
expected_output="XSS is a web security vulnerability...",
)
Dataset Folder Organization
Organize datasets into folders:
Datasets/
├── production/
│ ├── golden-v1
│ ├── golden-v2
│ └── regression-suite
├── experiments/
│ ├── prompt-variants
│ └── model-comparison
└── development/
├── unit-tests
└── edge-cases
Batch Add Observations to Datasets
Add multiple observations at once:
# Batch add from production traces
observation_ids = ["obs_1", "obs_2", "obs_3", "obs_4", "obs_5"]
langfuse.batch_add_dataset_items(
dataset_name="production-golden-v1",
observation_ids=observation_ids,
metadata={"batch": "2026-02-01", "source": "production"},
)
Corrected Outputs for Fine-Tuning
Use corrected outputs to build fine-tuning datasets:
# Add corrected output to existing dataset item
langfuse.update_dataset_item(
dataset_name="golden-analysis-dataset",
item_id="item_456",
corrected_output="Human-corrected version of the analysis...",
metadata={"corrected_by": "expert_reviewer", "correction_type": "factual"},
)
# Export for fine-tuning
items = langfuse.get_dataset("golden-analysis-dataset").items
fine_tuning_data = []
for item in items:
if item.corrected_output:
fine_tuning_data.append({
"messages": [
{"role": "user", "content": str(item.input)},
{"role": "assistant", "content": item.corrected_output},
]
})
# Export as JSONL for fine-tuning
import json
with open("fine_tuning.jsonl", "w") as f:
for entry in fine_tuning_data:
f.write(json.dumps(entry) + "\n")
Running Experiments (Manual)
Basic Experiment
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
@observe()
async def run_experiment(
dataset_name: str,
experiment_name: str,
model_config: dict,
):
"""Run an experiment on a dataset."""
dataset = langfuse.get_dataset(dataset_name)
results = []
for item in dataset.items:
@observe(name="experiment_run")
async def evaluate_item(item=item):
output = await your_pipeline(item.input, model_config)
get_client().update_current_observation(
input=item.input,
output=output,
metadata={"dataset_item_id": item.id},
)
return output
output = await evaluate_item()
results.append({"item_id": item.id, "output": output})
return results
A/B Testing Models
async def ab_test_models(dataset_name: str):
"""Compare two model configurations."""
configs = {
"sonnet": {"model": "claude-sonnet-4-6", "temperature": 0.7},
"gpt5": {"model": "gpt-5.2", "temperature": 0.7},
}
for name, config in configs.items():
result = langfuse.run_experiment(
dataset_name=dataset_name,
experiment_name=f"model-comparison-{name}",
run_fn=lambda input_data: your_pipeline(input_data, config),
evaluators=[
{"name": "relevance", "fn": relevance_evaluator},
{"name": "depth", "fn": depth_evaluator},
],
)
print(f"{name}: avg_relevance={result.stats['relevance']['mean']:.2f}")
Experiment Compare View
Compare experiments side-by-side in Langfuse UI:
- Aggregate scores: Average, median, std per criterion
- Per-item comparison: See how each item scored across experiments
- Annotations: Add notes to individual items or experiments
- Diff view: See which items improved or regressed
- Export: Download comparison as CSV
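The same diff view can be scripted offline from exported scores. A minimal pandas sketch — the two DataFrames stand in for hypothetical exported experiment runs:

```python
import pandas as pd

# Stand-in per-item scores from two exported experiment runs (hypothetical data)
exp_a = pd.DataFrame({"item_id": ["i1", "i2", "i3"], "relevance": [0.80, 0.70, 0.90]})
exp_b = pd.DataFrame({"item_id": ["i1", "i2", "i3"], "relevance": [0.85, 0.60, 0.95]})

# Per-item comparison: which items improved or regressed between runs
merged = exp_a.merge(exp_b, on="item_id", suffixes=("_a", "_b"))
merged["delta"] = merged["relevance_b"] - merged["relevance_a"]
regressed = merged[merged["delta"] < 0]["item_id"].tolist()  # items that got worse
```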
OrchestKit Integration
Golden Dataset Experiment
from datetime import datetime
from langfuse import Langfuse
langfuse = Langfuse()
async def run_golden_experiment():
"""Run quality experiment on golden dataset."""
# 1. Create dataset from golden analyses
golden_analyses = await get_golden_analyses()
dataset = langfuse.create_dataset(name="orchestkit-golden-v1")
for analysis in golden_analyses:
langfuse.create_dataset_item(
dataset_name="orchestkit-golden-v1",
input={"url": analysis.url},
expected_output=analysis.synthesis,
metadata={"analysis_id": str(analysis.id)},
)
# 2. Run experiment with Experiment Runner
result = langfuse.run_experiment(
dataset_name="orchestkit-golden-v1",
experiment_name=f"quality-test-{datetime.now().isoformat()}",
run_fn=run_analysis_pipeline,
evaluators=[
{"name": "depth", "fn": depth_evaluator},
{"name": "accuracy", "fn": accuracy_evaluator},
{"name": "overall", "fn": overall_evaluator},
],
)
return {
"experiment_id": result.experiment_id,
"stats": result.stats,
"pass_rate": result.stats["overall"]["mean"] >= 0.7,
}
Prompt Variant Testing
async def test_prompt_variants():
"""A/B test different prompt templates."""
variants = {
"detailed": "Provide a comprehensive, in-depth analysis...",
"concise": "Provide a brief, focused analysis...",
"structured": "Analyze using the following structure: 1) Overview...",
}
results = {}
for name, prompt in variants.items():
result = langfuse.run_experiment(
dataset_name="orchestkit-golden-v1",
experiment_name=f"prompt-variant-{name}",
run_fn=lambda input_data: run_with_prompt(input_data, prompt),
evaluators=[
{"name": "overall", "fn": overall_evaluator},
],
)
results[name] = result.stats["overall"]["mean"]
# Compare all variants
winner = max(results, key=results.get)
return {"winner": winner, "scores": results}
Viewing Results
Langfuse Dashboard
- Experiments Tab: See all experiments with aggregate scores
- Compare View: Side-by-side experiment comparison with annotations
- Runs Tab: See individual executions with per-item scores
- Diff View: Identify regressions between experiment versions
Export Results
import pandas as pd
from langfuse import Langfuse
langfuse = Langfuse()
def export_experiment_results(experiment_id: str) -> pd.DataFrame:
"""Export experiment results to DataFrame."""
runs = langfuse.get_experiment_runs(experiment_id)
data = []
for run in runs:
scores = langfuse.get_scores(trace_id=run.trace_id)
score_dict = {s.name: s.value for s in scores}
data.append({
"run_id": run.id,
"item_id": run.dataset_item_id,
**run.input,
**score_dict,
})
return pd.DataFrame(data)
Best Practices
- Use Experiment Runner SDK for simplified experiment execution
- Version your datasets with semantic names like golden-v1, golden-v2
- Use corrected outputs to build fine-tuning datasets from production data
- Include metadata: Store model config, prompt version in experiment metadata
- Evaluate consistently: Use same evaluators across experiments
- Track over time: Run same experiment periodically to detect regression
- Use ground truth: When available, compute similarity to expected output
- Organize datasets in folders by purpose (production, experiments, dev)
Framework Integrations
Langfuse v3 integrates with modern AI frameworks via OpenTelemetry. Each integration provides automatic tracing of agent execution, tool calls, and LLM generations.
Claude Agent SDK
# pip install anthropic langfuse
from langfuse import observe, get_client
@observe(type="agent", name="claude_agent")
async def run_claude_agent(task: str):
"""Claude Agent SDK with Langfuse tracing."""
import anthropic
client = anthropic.Anthropic()
# Claude Agent SDK traces automatically via OTEL
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
tools=[
{
"name": "read_file",
"description": "Read a file from disk",
"input_schema": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
}
],
messages=[{"role": "user", "content": task}],
)
get_client().update_current_observation(
model="claude-sonnet-4-6",
usage={
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
},
)
return response
# Setup: Configure OTEL exporter to Langfuse
# LANGFUSE_PUBLIC_KEY=pk-... LANGFUSE_SECRET_KEY=sk-... python app.py
Trace Output
claude_agent (agent, 4.2s, $0.08)
├── messages.create (generation, 3.8s)
│ ├── tool_use: read_file(path="src/main.py")
│ ├── tool_result: "def main():..."
│ └── assistant: "The code has..."
└── metadata: {model: claude-sonnet-4-6, tokens: 2500}
OpenAI Agents SDK
# pip install openai-agents langfuse
from langfuse import observe, get_client
@observe(type="agent", name="openai_agents_workflow")
async def run_openai_agents(query: str):
"""OpenAI Agents SDK with Langfuse tracing via OTEL."""
from agents import Agent, Runner, function_tool
@function_tool
def search_docs(query: str) -> str:
"""Search documentation."""
return f"Results for: {query}"
agent = Agent(
name="research_agent",
instructions="You are a research assistant.",
model="gpt-5.2",
tools=[search_docs],
)
# Runner automatically exports traces to Langfuse via OTEL
result = await Runner.run(agent, query)
get_client().update_current_observation(
output=result.final_output,
metadata={"agent": "research_agent", "model": "gpt-5.2"},
)
return result.final_output
Setup
# OpenAI Agents SDK uses OTEL for tracing
export OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
export OTEL_EXPORTER_OTLP_HEADERS="x-langfuse-public-key=pk-...,x-langfuse-secret-key=sk-..."
Pydantic AI
# pip install pydantic-ai langfuse
from langfuse import observe, get_client
@observe(type="agent", name="pydantic_ai_agent")
async def run_pydantic_agent(query: str):
"""Pydantic AI agent with Langfuse tracing."""
from pydantic_ai import Agent
from pydantic import BaseModel
class AnalysisResult(BaseModel):
summary: str
risk_level: str
recommendations: list[str]
agent = Agent(
"claude-sonnet-4-6",
result_type=AnalysisResult,
system_prompt="You are a security analyst.",
)
# Pydantic AI supports Langfuse via instrument() or OTEL
result = await agent.run(query)
get_client().update_current_observation(
output=result.data.model_dump(),
metadata={"risk_level": result.data.risk_level},
)
return result.data
# Setup: Use Pydantic AI's built-in Langfuse instrumentation
# from pydantic_ai.integrations.langfuse import LangfuseInstrumentation
# LangfuseInstrumentation().instrument()
CrewAI
# pip install crewai langfuse
from langfuse import observe, get_client
@observe(type="agent", name="crewai_workflow")
async def run_crew(task_description: str):
"""CrewAI crew with Langfuse tracing."""
from crewai import Crew, Agent, Task
researcher = Agent(
role="Senior Researcher",
goal="Find comprehensive information",
backstory="Expert researcher with 10 years experience",
llm="claude-sonnet-4-6",
)
writer = Agent(
role="Technical Writer",
goal="Create clear documentation",
backstory="Technical writer specializing in developer docs",
llm="claude-sonnet-4-6",
)
research_task = Task(
description=f"Research: {task_description}",
agent=researcher,
expected_output="Comprehensive research findings",
)
writing_task = Task(
description="Write documentation from research",
agent=writer,
expected_output="Polished documentation",
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
)
# CrewAI v0.80+ has built-in Langfuse callback
result = crew.kickoff()
get_client().update_current_observation(
output=str(result),
metadata={"agents": 2, "tasks": 2},
)
return result
Setup
# CrewAI Langfuse integration
import os
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
# CrewAI auto-detects Langfuse env vars and sends traces
LiveKit Agents
# pip install livekit-agents langfuse
from langfuse import observe, get_client
@observe(type="agent", name="livekit_voice_agent")
async def run_voice_agent(session_id: str):
"""LiveKit voice agent with Langfuse tracing."""
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import openai, silero
assistant = VoiceAssistant(
vad=silero.VAD.load(),
stt=openai.STT(),
llm=openai.LLM(model="gpt-5.2"),
tts=openai.TTS(),
)
get_client().update_current_observation(
metadata={
"session_id": session_id,
"pipeline": "vad→stt→llm→tts",
},
)
return assistant
# LiveKit traces voice pipeline stages:
# VAD → STT → LLM → TTS (each as separate observation)
Setup
# LiveKit exports OTEL traces to Langfuse
export OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
export OTEL_EXPORTER_OTLP_HEADERS="x-langfuse-public-key=pk-...,x-langfuse-secret-key=sk-..."
Amazon Bedrock AgentCore
# pip install boto3 langfuse
from langfuse import observe, get_client
@observe(type="agent", name="bedrock_agent")
async def run_bedrock_agent(query: str):
"""Amazon Bedrock AgentCore with Langfuse tracing."""
import boto3
bedrock = boto3.client("bedrock-agent-runtime")
response = bedrock.invoke_agent(
agentId="AGENT_ID",
agentAliasId="ALIAS_ID",
sessionId="session-123",
inputText=query,
)
output = ""
for event in response["completion"]:
if "chunk" in event:
output += event["chunk"]["bytes"].decode()
get_client().update_current_observation(
input=query,
output=output,
metadata={"provider": "bedrock", "agent_id": "AGENT_ID"},
)
return output
Setup
# Bedrock traces via AWS X-Ray → OTEL → Langfuse
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor
processor = LangfuseSpanProcessor()
# Add to your OTEL tracer provider
JavaScript/TypeScript Integration
All frameworks above also work in JS/TS using @langfuse/otel:
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";
const sdk = new NodeSDK({
traceExporter: new LangfuseExporter({
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
secretKey: process.env.LANGFUSE_SECRET_KEY,
}),
});
sdk.start();
// All OTEL-compatible frameworks now trace to Langfuse
Integration Matrix
| Framework | Python | JS/TS | Tracing Method | Auto-Instrument |
|---|---|---|---|---|
| Claude Agent SDK | Yes | Yes | OTEL exporter | Manual @observe |
| OpenAI Agents | Yes | Yes | OTEL native | Auto via env vars |
| Pydantic AI | Yes | — | .instrument() | Auto |
| CrewAI | Yes | — | Callback | Auto via env vars |
| LiveKit Agents | Yes | Yes | OTEL exporter | Auto |
| Bedrock AgentCore | Yes | Yes | X-Ray → OTEL | Manual |
| LangChain | Yes | Yes | Callback | Auto |
| LangGraph | Yes | Yes | Callback | Auto |
Langfuse + Evidently AI Integration
Combining Langfuse tracing with Evidently AI drift detection.
Export Langfuse Data
from langfuse import Langfuse
import pandas as pd
from datetime import datetime, timedelta
langfuse = Langfuse()
def export_langfuse_scores(days: int = 7) -> pd.DataFrame:
"""Export Langfuse scores to DataFrame for Evidently."""
traces = langfuse.get_traces(from_timestamp=datetime.now() - timedelta(days=days))
records = []
for trace in traces.data:
scores = {s.name: s.value for s in trace.scores}
if scores:
records.append({"trace_id": trace.id, "timestamp": trace.timestamp, **scores})
return pd.DataFrame(records)
Evidently Drift Report
from evidently import Report
from evidently.metrics import DatasetDriftMetric, ColumnDriftMetric
def run_drift_report(baseline_df: pd.DataFrame, current_df: pd.DataFrame, columns: list) -> dict:
"""Run Evidently drift detection."""
report = Report(metrics=[DatasetDriftMetric()])
for col in columns:
report.metrics.append(ColumnDriftMetric(column_name=col))
report.run(reference_data=baseline_df, current_data=current_df)
result = report.as_dict()
return {
"dataset_drift": result["metrics"][0]["result"].get("dataset_drift"),
"drift_share": result["metrics"][0]["result"].get("share_of_drifted_columns")
}
Automated Monitoring
class LangfuseEvidentlyMonitor:
def __init__(self, baseline_days: int = 7, current_days: int = 1):
self.langfuse = Langfuse()
self.baseline_days = baseline_days
self.current_days = current_days
def run_analysis(self, metrics: list) -> dict:
baseline_df = export_langfuse_scores(self.baseline_days + self.current_days)
current_df = export_langfuse_scores(self.current_days)
results = run_drift_report(baseline_df, current_df, metrics)
return results
Logging Patterns
Advanced structured logging patterns for production systems.
Correlation IDs
Trace requests across services:
import structlog
import uuid_utils  # pip install uuid-utils (UUID v7 support for Python < 3.14)
from fastapi import Request
logger = structlog.get_logger()
@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
# Get or generate correlation ID (UUID v7 for time-ordering in distributed traces)
correlation_id = request.headers.get("X-Correlation-ID") or str(uuid_utils.uuid7())
# Bind to logger context (all logs in this request will include it)
structlog.contextvars.bind_contextvars(
correlation_id=correlation_id,
method=request.method,
path=request.url.path
)
# Add to response headers
response = await call_next(request)
response.headers["X-Correlation-ID"] = correlation_id
return response
Benefits:
- Find all logs related to a single request
- Track requests across microservices
- Debug distributed transactions
Log Sampling
Problem: Too many logs in high-traffic endpoints.
Solution: Sample less critical logs.
import random
def should_sample(level: str, rate: float = 0.1) -> bool:
"""Sample logs based on level and rate."""
if level in ["ERROR", "CRITICAL"]:
return True # Always log errors
return random.random() < rate
# Log 100% of errors, 10% of info
if should_sample("INFO", rate=0.1):
logger.info("User created", user_id=user.id)
Sampling rates:
- ERROR/CRITICAL: 100% (always log)
- WARN: 50% (sample half)
- INFO: 10% (sample 10%)
- DEBUG: 1% (sample 1% in production)
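The rates above can be table-driven rather than hard-coded — a sketch extending the `should_sample` idea to one rate per level:

```python
import random

# Per-level sampling table mirroring the rates listed above
SAMPLE_RATES = {"CRITICAL": 1.0, "ERROR": 1.0, "WARN": 0.5, "INFO": 0.1, "DEBUG": 0.01}

def should_sample(level: str) -> bool:
    """Keep a log line with probability equal to its level's rate."""
    return random.random() < SAMPLE_RATES.get(level, 1.0)  # unknown levels: always log
```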
Log Aggregation with Loki
Loki Query Language (LogQL) examples:
# Find all errors in last hour
{app="backend"} |= "ERROR" | json
# Count errors by endpoint
sum by (endpoint) (
count_over_time({app="backend"} |= "ERROR" [5m])
)
# p95 latency from structured logs
quantile_over_time(0.95,
{app="backend"}
| json
| unwrap duration_ms [5m]
)
# Search for specific correlation ID
{app="backend"} | json | correlation_id="abc-123-def"
OrchestKit Logging Example
import logging
import structlog
from structlog.processors import JSONRenderer, TimeStamper, add_log_level
# Configure structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars, # Merge correlation IDs
add_log_level,
TimeStamper(fmt="iso"),
JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True
)
logger = structlog.get_logger()
# Usage in workflow
@workflow_node
async def supervisor_node(state: AnalysisState):
"""Route to next agent."""
# Bind context for all logs in this function
log = logger.bind(
correlation_id=state["correlation_id"],
analysis_id=state["analysis_id"],
workflow_step="supervisor"
)
completed = set(state["agents_completed"])
available = [a for a in ALL_AGENTS if a not in completed]
if not available:
log.info("all_agents_completed", total_findings=len(state["findings"]))
state["next_node"] = "quality_gate"
else:
next_agent = available[0]
log.info("routing_to_agent", agent=next_agent, remaining=len(available))
state["next_node"] = next_agent
return state
Example log output:
{
"event": "routing_to_agent",
"level": "info",
"timestamp": "2025-01-15T10:30:45.123Z",
"correlation_id": "abc-123-def",
"analysis_id": "550e8400-e29b-41d4-a716-446655440000",
"workflow_step": "supervisor",
"agent": "tech_comparator",
"remaining": 7
}
See scripts/structured-logging.ts for Winston setup.
Metrics Collection
Application metrics best practices with Prometheus.
Metric Types
1. Counter - Monotonically increasing value (resets to 0 on restart)
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
# Usage
http_requests_total.labels(method='GET', endpoint='/api/users', status=200).inc()
Use cases: Request counts, error counts, bytes processed
2. Gauge - Value that can go up or down
active_connections = Gauge(
'active_connections',
'Number of active database connections'
)
# Usage
active_connections.set(25) # Set to specific value
active_connections.inc() # Increment by 1
active_connections.dec()  # Decrement by 1
Use cases: Queue length, memory usage, temperature
3. Histogram - Distribution of values (with buckets)
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10] # Choose meaningful buckets!
)
# Usage
with request_duration.labels(method='GET', endpoint='/api/users').time():
# ... handle request
pass
Use cases: Request latency, response size
4. Summary - Like Histogram but calculates quantiles on client side
request_duration = Summary(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
Histogram vs Summary:
- Histogram: Calculate quantiles on Prometheus server (recommended)
- Summary: Calculate quantiles on application side (higher client CPU, can't aggregate across instances)
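To see why bucket boundaries matter, here is an illustrative re-implementation of the linear-interpolation idea behind PromQL's histogram_quantile() — a sketch, not Prometheus's actual code:

```python
def estimate_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """Approximate a quantile from cumulative histogram buckets by linear
    interpolation within the bucket containing the target rank."""
    total = buckets[-1][1]
    target = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= target:
            if count == prev_count:  # empty bucket: nothing to interpolate
                return bound
            return prev_bound + (bound - prev_bound) * (target - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# 100 requests: 50 under 0.1s, 90 under 0.5s, all under 1s
buckets = [(0.1, 50), (0.5, 90), (1.0, 100), (float("inf"), 100)]
p95 = estimate_quantile(0.95, buckets)  # falls in the 0.5-1s bucket → 0.75
```

The estimate is only as good as the buckets: if all latencies land in one wide bucket, the interpolated quantile can be far from the true value.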
Cardinality Management
Problem: Too many unique label combinations
# BAD: Unbounded cardinality (user_id can be millions of values)
http_requests_total = Counter(
'http_requests_total',
['method', 'endpoint', 'user_id'] # user_id creates millions of time series!
)
# GOOD: Bounded cardinality
http_requests_total = Counter(
'http_requests_total',
['method', 'endpoint', 'status'] # Limited to ~10 methods x 100 endpoints x 10 statuses = 10,000 series
)
Cardinality limits:
- Good: < 10,000 unique time series per metric
- Acceptable: 10,000-100,000
- Bad: > 100,000 (Prometheus performance degrades)
Rule: Never use unbounded labels (user IDs, request IDs, timestamps)
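The worst-case series count is just the product of distinct values per label, which makes the rule easy to sanity-check before shipping a metric — a small illustrative helper:

```python
from math import prod

def series_count(label_values: dict[str, int]) -> int:
    """Worst-case time-series count: product of distinct values per label."""
    return prod(label_values.values())

# Bounded labels stay within budget; one unbounded label explodes the count
bounded = series_count({"method": 10, "endpoint": 100, "status": 10})
unbounded = series_count({"method": 10, "endpoint": 100, "user_id": 1_000_000})
```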
Custom Business Metrics
# LLM token usage
llm_tokens_used = Counter(
'llm_tokens_used_total',
'Total LLM tokens consumed',
['model', 'operation'] # e.g., model='claude-sonnet', operation='analysis'
)
# LLM cost tracking
llm_cost_dollars = Counter(
'llm_cost_dollars_total',
'Total LLM cost in dollars',
['model']
)
# Cache hit rate
cache_operations = Counter(
'cache_operations_total',
'Cache operations',
['operation', 'result'] # operation='get', result='hit|miss'
)
# Cache hit rate query:
# sum(rate(cache_operations_total{result="hit"}[5m])) /
# sum(rate(cache_operations_total[5m]))
LLM Cost Tracking Example
import time
from langfuse import observe
from prometheus_client import Counter, Histogram
llm_tokens_used = Counter(
'llm_tokens_used_total',
'Total LLM tokens consumed',
['model', 'operation', 'token_type']
)
llm_cost_dollars = Counter(
'llm_cost_dollars_total',
'Total LLM cost in dollars',
['model', 'operation']
)
llm_request_duration = Histogram(
'llm_request_duration_seconds',
'LLM request duration',
['model', 'operation'],
buckets=[0.5, 1, 2, 5, 10, 20, 30]
)
@observe(name="llm_call")
async def call_llm(prompt: str, model: str, operation: str) -> str:
start_time = time.time()
response = await anthropic_client.messages.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1024
)
duration = time.time() - start_time
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
llm_tokens_used.labels(model=model, operation=operation, token_type="input").inc(input_tokens)
llm_tokens_used.labels(model=model, operation=operation, token_type="output").inc(output_tokens)
# Cost calculation (Claude Sonnet 4.5 pricing)
input_cost = (input_tokens / 1_000_000) * 3.00
output_cost = (output_tokens / 1_000_000) * 15.00
total_cost = input_cost + output_cost
llm_cost_dollars.labels(model=model, operation=operation).inc(total_cost)
llm_request_duration.labels(model=model, operation=operation).observe(duration)
return response.content[0].text
Grafana dashboard queries:
# Total cost per day
sum(increase(llm_cost_dollars_total[1d])) by (model)
# Token usage rate
sum(rate(llm_tokens_used_total[5m])) by (model, token_type)
# Cost per operation
sum(increase(llm_cost_dollars_total[1h])) by (operation)
# p95 LLM latency
histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))
See scripts/prometheus-metrics.ts for complete setup.
Migration Guide: v2 → v3 (Python) / v3 → v4 (JS)
Python SDK: v2 → v3
Import Changes
| v2 (Deprecated) | v3 (Current) |
|---|---|
| from langfuse.decorators import observe | from langfuse import observe |
| from langfuse.decorators import langfuse_context | from langfuse import get_client |
| from langfuse import Langfuse | from langfuse import Langfuse (unchanged) |
| from langfuse.callback import CallbackHandler | from langfuse.callback import CallbackHandler (unchanged) |
API Changes
| v2 Pattern | v3 Pattern |
|---|---|
| langfuse_context.update_current_observation(...) | get_client().update_current_observation(...) |
| langfuse_context.update_current_trace(...) | get_client().update_current_trace(...) |
| langfuse_context.flush() | get_client().flush() |
| langfuse.trace(name="foo") (explicit) | Auto-created by first @observe() root span |
| @observe() (generation only) | @observe(type="agent") (7 new types) |
Trace ID Format
| v2 | v3 |
|---|---|
| UUID (550e8400-e29b-41d4-a716-446655440000) | W3C Trace Context (4bf92f3577b34da6a3ce929d0e0e4736) |
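Both formats encode 16 bytes, so the W3C form is just the UUID's 32 hex characters with hyphens removed. The v3 SDK generates W3C IDs itself; this hypothetical helper only illustrates the format relationship, e.g. when correlating IDs stored by a v2 deployment:

```python
def uuid_to_w3c_trace_id(uuid_str: str) -> str:
    """Render a v2-style UUID in W3C Trace Context format: 32 lowercase hex chars."""
    trace_id = uuid_str.replace("-", "").lower()
    assert len(trace_id) == 32
    return trace_id
```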
Migration Example
# ❌ v2 (DEPRECATED)
from langfuse.decorators import observe, langfuse_context
@observe()
async def analyze(content: str):
langfuse_context.update_current_observation(
metadata={"length": len(content)}
)
langfuse_context.update_current_trace(
user_id="user_123",
session_id="session_abc",
)
return await llm.generate(content)
# ✅ v3 (CURRENT)
from langfuse import observe, get_client
@observe()
async def analyze(content: str):
get_client().update_current_observation(
metadata={"length": len(content)}
)
get_client().update_current_trace(
user_id="user_123",
session_id="session_abc",
)
return await llm.generate(content)
Explicit Trace Creation — Before/After
# ❌ v2: Explicit trace creation
from langfuse import Langfuse
langfuse = Langfuse()
trace = langfuse.trace(
name="analysis",
user_id="user_123",
session_id="session_abc",
)
generation = trace.generation(name="llm_call", model="claude-sonnet-4-6")
# ✅ v3: Auto-trace via @observe (preferred)
from langfuse import observe, get_client
@observe() # First root @observe creates the trace automatically
async def analysis(content: str):
get_client().update_current_trace(
user_id="user_123",
session_id="session_abc",
)
return await llm_call(content)
@observe(name="llm_call")
async def llm_call(content: str):
get_client().update_current_observation(
model="claude-sonnet-4-6",
)
return await llm.generate(content)
Low-Level Client Still Works
# Explicit Langfuse() client is still available for programmatic use
from langfuse import Langfuse
langfuse = Langfuse()
# Creating datasets, scoring, experiments — same API
dataset = langfuse.create_dataset(name="golden-v1")
langfuse.score(trace_id="...", name="quality", value=0.85)
JavaScript/TypeScript SDK: v3 → v4
Package Changes
| v3 (Deprecated) | v4 (Current) |
|---|---|
| langfuse (monolith) | @langfuse/core (base) |
| — | @langfuse/tracing (trace API) |
| — | @langfuse/otel (OTEL exporter) |
| langfuse-langchain | @langfuse/langchain |
| langfuse-vercel | @langfuse/vercel |
Setup Changes
// ❌ v3 (DEPRECATED)
import Langfuse from "langfuse";
const langfuse = new Langfuse({
publicKey: "pk-...",
secretKey: "sk-...",
});
// ✅ v4: Option A — Direct tracing
import { Langfuse } from "@langfuse/core";
const langfuse = new Langfuse({
publicKey: "pk-...",
secretKey: "sk-...",
});
// ✅ v4: Option B — OTEL-native (recommended)
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";
const sdk = new NodeSDK({
traceExporter: new LangfuseExporter({
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
secretKey: process.env.LANGFUSE_SECRET_KEY,
}),
});
sdk.start();
Tracing Changes
// ❌ v3
const trace = langfuse.trace({ name: "analysis" });
const span = trace.span({ name: "retrieval" });
const generation = trace.generation({
name: "llm_call",
model: "claude-sonnet-4-6",
input: messages,
});
generation.end({ output: response });
span.end();
// ✅ v4: OTEL spans
import { trace } from "@opentelemetry/api";
const tracer = trace.getTracer("my-app");
await tracer.startActiveSpan("analysis", async (span) => {
await tracer.startActiveSpan("retrieval", async (childSpan) => {
// retrieval logic
childSpan.end();
});
span.end();
});
Self-Hosting v3 Architecture
Langfuse v3 self-hosting uses a new multi-service architecture:
┌─────────────────────────────────────────────────────────┐
│ Langfuse v3 Architecture │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ Web UI │────▶│ API Server │ │
│ └──────────┘ └──────┬───────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ClickHouse │ │ Redis │ │ Postgres │ │
│ │ (analytics) │ │ (cache) │ │ (metadata)│ │
│ └──────────────┘ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ S3 / MinIO │ │
│ │ (media/blob)│ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
Component Roles
| Component | Role | v2 Equivalent |
|---|---|---|
| ClickHouse | Analytics, traces, observations, scores | PostgreSQL (was single DB) |
| PostgreSQL | Metadata, users, projects, prompts | PostgreSQL |
| Redis | Caching, rate limiting, real-time features | Not required |
| S3/MinIO | Media storage, large payloads | Stored in PostgreSQL |
Docker Compose (v3)
# docker-compose.yml (Langfuse v3)
services:
langfuse:
image: langfuse/langfuse:3
ports:
- "3000:3000"
environment:
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/langfuse
CLICKHOUSE_URL: http://clickhouse:8123
REDIS_URL: redis://redis:6379
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY_ID: minioadmin
S3_SECRET_ACCESS_KEY: minioadmin
S3_BUCKET_NAME: langfuse
depends_on:
- postgres
- clickhouse
- redis
- minio
postgres:
image: postgres:16
environment:
POSTGRES_DB: langfuse
POSTGRES_PASSWORD: postgres
clickhouse:
image: clickhouse/clickhouse-server:24
volumes:
- clickhouse_data:/var/lib/clickhouse
redis:
image: redis:7-alpine
minio:
image: minio/minio
command: server /data
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
Helm Chart
# Kubernetes deployment
helm repo add langfuse https://langfuse.github.io/langfuse-helm
helm install langfuse langfuse/langfuse \
--set clickhouse.enabled=true \
--set redis.enabled=true \
--set postgresql.enabled=true \
--set s3.enabled=true
ClickHouse Acquisition (Jan 16, 2026)
ClickHouse Inc. acquired Langfuse on January 16, 2026. Key implications:
- Langfuse remains open-source under the same license
- ClickHouse becomes the default analytics backend (was already used in v3)
- Cloud offering continues at cloud.langfuse.com, now backed by ClickHouse Cloud
- Self-hosting remains fully supported
- No SDK breaking changes from the acquisition — v3 Python and v4 JS SDKs unchanged
- Performance improvements expected from deeper ClickHouse integration
Breaking Changes Checklist
Python v2 → v3
- Replace from langfuse.decorators import observe → from langfuse import observe
- Replace from langfuse.decorators import langfuse_context → from langfuse import get_client
- Replace all langfuse_context.update_current_observation() → get_client().update_current_observation()
- Replace all langfuse_context.update_current_trace() → get_client().update_current_trace()
- Remove explicit langfuse.trace() calls where @observe() creates auto-traces
- Update trace ID handling if relying on UUID format (now W3C)
- Add type= parameter to @observe() for new observation types
- Update requirements.txt: langfuse>=3.13.0
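Most items in the checklist above are mechanical find-and-replace targets, so a quick scan of the codebase shows how much work remains — a hypothetical helper script (the patterns cover only the two v2 import styles):

```python
import re
from pathlib import Path

# Deprecated v2 usage patterns from the checklist above
V2_PATTERNS = [r"from langfuse\.decorators import", r"\blangfuse_context\."]

def find_v2_usages(root: str) -> list[tuple[str, int, str]]:
    """Return (file, line number, line) for each v2-style usage under root."""
    hits = []
    for path in sorted(Path(root).rglob("*.py")):
        for lineno, line in enumerate(path.read_text().splitlines(), start=1):
            if any(re.search(p, line) for p in V2_PATTERNS):
                hits.append((str(path), lineno, line.strip()))
    return hits
```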
JavaScript v3 → v4
- Replace langfuse package with @langfuse/core + @langfuse/otel
- Update imports from langfuse → @langfuse/core
- Replace langfuse-langchain → @langfuse/langchain
- Set up OTEL exporter for automatic tracing
- Update package.json dependencies
Multi-Judge Evaluation with Langfuse
Overview
Multi-judge evaluation uses multiple LLM evaluators to assess quality from different perspectives. Langfuse v3 provides evaluator execution tracing — each judge creates its own inspectable trace.
OrchestKit has built-in evaluators at backend/app/shared/services/g_eval/langfuse_evaluators.py - but they're not wired up!
┌─────────────────────────────────────────────────────────────────────┐
│ Multi-Judge Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LLM Output ──────────────────────────────────────────────────── │
│ │ │
│ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Judge 1 │ │ Judge 2 │ │ Judge 3 │ │ Judge 4 │ │
│ │ Depth │ │ Accuracy│ │ Clarity │ │ Relevance│ │
│ │ 0.85 │ │ 0.90 │ │ 0.75 │ │ 0.92 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ └────────────┴────────────┴────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Score Aggregator │ │
│ │ Weighted: 0.87 │ │
│ └──────────────────┘ │
│ │ │
│ ▼ │
│ [Langfuse Score API] │
│ │
└─────────────────────────────────────────────────────────────────────┘
G-Eval Criteria (Built into OrchestKit)
OrchestKit uses these evaluation criteria:
| Criterion | Weight | Description |
|---|---|---|
| depth | 0.30 | Technical depth and thoroughness |
| accuracy | 0.25 | Factual correctness |
| specificity | 0.20 | Concrete examples and details |
| coherence | 0.15 | Logical structure and flow |
| usefulness | 0.10 | Practical applicability |
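The weighted overall score follows directly from this table. A minimal sketch of the aggregation (the per-criterion scores below are illustrative values, not real evaluator output):

```python
# Weighted G-Eval aggregation, mirroring the criteria table above.
WEIGHTS = {
    "depth": 0.30,
    "accuracy": 0.25,
    "specificity": 0.20,
    "coherence": 0.15,
    "usefulness": 0.10,
}

def weighted_overall(scores: dict[str, float]) -> float:
    """Weighted average over the five G-Eval criteria."""
    assert abs(sum(WEIGHTS.values()) - 1.0) < 1e-9  # weights must sum to 1
    return sum(scores[c] * w for c, w in WEIGHTS.items())

scores = {"depth": 0.85, "accuracy": 0.90, "specificity": 0.68,
          "coherence": 0.78, "usefulness": 0.91}
print(round(weighted_overall(scores), 3))  # 0.824
```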
Evaluator Execution Tracing (v3)
Each evaluator run creates its own inspectable trace:
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
@observe(type="evaluator", name="depth_judge")
async def depth_evaluator(trace_id: str, output: str) -> float:
"""Depth evaluator — creates its own inspectable trace."""
score = await g_eval.evaluate(criterion="depth", output=output)
get_client().update_current_observation(
input=output[:500],
output={"score": score, "criterion": "depth"},
model="claude-sonnet-4-6",
)
# Record score on the original trace
langfuse.score(
trace_id=trace_id,
name="g_eval_depth",
value=score,
comment="G-Eval depth score",
)
return score
Result in Langfuse UI — the evaluator's own trace is inspectable:
evaluator:depth_judge (0.6s, $0.008)
├── generation: depth_prompt → 0.85
└── metadata: {criterion: "depth", model: "claude-sonnet-4-6"}
Existing OrchestKit Evaluators (v3 Updated)
# backend/app/shared/services/g_eval/langfuse_evaluators.py
from langfuse import observe, get_client, Langfuse
langfuse = Langfuse()
def create_g_eval_evaluator(criterion: str):
"""
Create a Langfuse evaluator for a G-Eval criterion.
Each evaluator creates an inspectable trace via @observe(type="evaluator").
"""
@observe(type="evaluator", name=f"g_eval_{criterion}")
async def evaluator(trace_id: str, output: str, **kwargs) -> float:
# Run G-Eval for this criterion
score = await g_eval.evaluate(
criterion=criterion,
output=output,
**kwargs,
)
get_client().update_current_observation(
input=output[:500],
output={"score": score, "criterion": criterion},
)
# Record score in Langfuse
langfuse.score(
trace_id=trace_id,
name=f"g_eval_{criterion}",
value=score,
comment=f"G-Eval {criterion} score",
)
return score
return evaluator
def create_g_eval_overall_evaluator():
"""
Create evaluator for weighted overall score.
"""
weights = {
"depth": 0.30,
"accuracy": 0.25,
"specificity": 0.20,
"coherence": 0.15,
"usefulness": 0.10,
}
@observe(type="evaluator", name="g_eval_overall")
async def evaluator(trace_id: str, output: str, **kwargs) -> float:
scores = {}
# Run all criteria
for criterion in weights.keys():
scores[criterion] = await g_eval.evaluate(
criterion=criterion,
output=output,
**kwargs,
)
# Calculate weighted average
overall = sum(
scores[c] * weights[c]
for c in weights.keys()
)
get_client().update_current_observation(
output={"overall": overall, "scores": scores},
)
# Record in Langfuse
langfuse.score(
trace_id=trace_id,
name="g_eval_overall",
value=overall,
comment=f"Weighted G-Eval: {scores}",
)
return overall
return evaluator
# Pre-built evaluators (ready to use!)
depth_evaluator = create_g_eval_evaluator("depth")
accuracy_evaluator = create_g_eval_evaluator("accuracy")
specificity_evaluator = create_g_eval_evaluator("specificity")
coherence_evaluator = create_g_eval_evaluator("coherence")
usefulness_evaluator = create_g_eval_evaluator("usefulness")
overall_evaluator = create_g_eval_overall_evaluator()
Wiring Evaluators to Workflow
Option 1: Quality Gate Integration
from langfuse import observe, get_client
from app.shared.services.g_eval.langfuse_evaluators import (
overall_evaluator,
depth_evaluator,
accuracy_evaluator,
)
@observe(name="quality_gate")
async def quality_gate_node(state: AnalysisState) -> dict:
"""Quality gate with Langfuse multi-judge evaluation."""
trace_id = state.get("langfuse_trace_id")
synthesis = state["synthesis_result"]
# Run multi-judge evaluation (each creates inspectable trace)
scores = {}
scores["depth"] = await depth_evaluator(trace_id, synthesis)
scores["accuracy"] = await accuracy_evaluator(trace_id, synthesis)
# Overall score
overall = await overall_evaluator(trace_id, synthesis)
scores["overall"] = overall
# Quality gate decision
passed = overall >= 0.7
return {
"quality_scores": scores,
"quality_passed": passed,
"quality_gate_reason": (
"Passed" if passed else f"Score {overall:.2f} below threshold"
),
}
Option 2: Experiment Runner Integration
from langfuse import Langfuse
langfuse = Langfuse()
async def run_quality_experiment(dataset_name: str):
"""Run multi-judge evaluation using Experiment Runner."""
result = langfuse.run_experiment(
dataset_name=dataset_name,
experiment_name=f"quality-eval-{datetime.now().isoformat()}",
run_fn=run_analysis_pipeline,
evaluators=[
{"name": "depth", "fn": depth_evaluator},
{"name": "accuracy", "fn": accuracy_evaluator},
{"name": "overall", "fn": overall_evaluator},
],
)
return {
"experiment_id": result.experiment_id,
"avg_overall": result.stats["overall"]["mean"],
"avg_depth": result.stats["depth"]["mean"],
"pass_rate": sum(
1 for r in result.runs if r.scores["overall"] >= 0.7
) / len(result.runs),
}
Best Practices
1. Use Multiple Independent Judges
# BAD: Single judge decides everything
score = await evaluate(output)
# GOOD: Multiple judges with @observe(type="evaluator"), aggregate
scores = await asyncio.gather(
depth_judge(output),
accuracy_judge(output),
clarity_judge(output),
)
overall = weighted_average(scores, weights)
2. Log All Scores to Langfuse
# BAD: Only log final score
langfuse.score(trace_id=trace_id, name="quality", value=0.85)
# GOOD: Log individual + aggregate
for criterion, score in scores.items():
langfuse.score(
trace_id=trace_id,
name=f"g_eval_{criterion}",
value=score,
comment=f"G-Eval {criterion}",
)
langfuse.score(
trace_id=trace_id,
name="g_eval_overall",
value=overall,
comment=f"Weighted average of {list(scores.keys())}",
)
3. Include Ground Truth When Available
# If you have ground truth (golden dataset)
langfuse.score(
trace_id=trace_id,
name="human_verified",
value=ground_truth_score,
source="human", # Distinguish from LLM judges
)
4. Track Judge Agreement
# Measure inter-judge agreement
agreement = calculate_agreement(scores)
langfuse.score(
trace_id=trace_id,
name="judge_agreement",
value=agreement,
comment="Inter-judge correlation",
)
# Flag for review if judges disagree
if agreement < 0.5:
langfuse.event(
trace_id=trace_id,
name="low_judge_agreement",
metadata={"scores": scores, "agreement": agreement},
level="WARNING",
)
Viewing Results in Langfuse
Dashboard Queries
-- Average scores by criterion
SELECT
name,
AVG(value) as avg_score,
COUNT(*) as count
FROM scores
WHERE name LIKE 'g_eval_%'
GROUP BY name
ORDER BY avg_score DESC;
-- Score distribution
SELECT
name,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) as median,
PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY value) as p25,
PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY value) as p75
FROM scores
WHERE name = 'g_eval_overall'
GROUP BY name;
Score Visualization
┌─────────────────────────────────────────────────────────────────────┐
│ G-Eval Score Distribution │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ depth ████████████████████░░░░ 0.82 │
│ accuracy █████████████████████░░░ 0.85 │
│ specificity ██████████████░░░░░░░░░░ 0.68 │
│ coherence ███████████████████░░░░░ 0.78 │
│ usefulness ████████████████████████ 0.91 │
│ ───────────────────────────────────── │
│ overall ██████████████████░░░░░░ 0.81 │
│ │
│ Threshold: 0.70 [PASS] │
│ │
└─────────────────────────────────────────────────────────────────────┘
Integration Steps for OrchestKit
1. Import existing evaluators (they're already built!)
   from app.shared.services.g_eval.langfuse_evaluators import overall_evaluator
2. Pass trace_id through workflow
   from langfuse import observe, get_client

   @observe(name="analysis")
   async def run_analysis():
       # trace_id is automatically managed by @observe
       state["langfuse_trace_id"] = get_client().get_current_trace_id()
3. Call evaluators in quality gate
   await overall_evaluator(state["langfuse_trace_id"], synthesis)
4. View scores in Langfuse dashboard
   - Navigate to trace — see all G-Eval scores
   - Click evaluator scores — see the evaluator's own trace
   - Analyze trends over time
Online Evaluators
Always-on LLM-as-judge evaluation configured entirely in the Langfuse UI — no code required. Every new trace is automatically scored against a configurable rubric.
What Are Online Evaluators?
Online evaluators are LLM-as-judge rules configured in the Langfuse UI that run automatically on every new trace matching a filter. Unlike custom code evaluators that run in your application, online evaluators run server-side in Langfuse using a model and rubric you define.
| | Online Evaluators | Custom Code Evaluators |
|---|---|---|
| Configuration | Langfuse UI | Python/TypeScript code |
| Trigger | Every matching trace, automatically | Explicit call in your code |
| Latency | Asynchronous, post-trace | Synchronous or async |
| Use case | Always-on quality monitoring | Complex multi-step logic |
| Model access | Langfuse-managed | Your own API keys |
Setting Up an Online Evaluator (UI)
1. Navigate to Evaluators in the Langfuse sidebar
2. Click Create Evaluator
3. Configure:
   - Name — e.g., response-quality, safety-check, hallucination-detector
   - Evaluator type — select LLM-as-judge
   - Model — e.g., claude-sonnet-4-6, gpt-4o
   - Score name — the key stored on the trace (e.g., quality, safety)
   - Filter — which traces to evaluate (by tag, model, metadata field, etc.)
   - Sampling rate — evaluate 100% or a random sample (e.g., 10% for high-volume endpoints)
4. Write the rubric prompt using template variables (see below)
5. Click Save — the evaluator activates immediately
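The sampling rate is the main lever on evaluation spend. A back-of-envelope model (all figures below are hypothetical placeholders, not Langfuse pricing):

```python
# Rough cost model for an online evaluator:
# traces/day × judge cost per evaluation × sampling rate.
def daily_eval_cost(traces_per_day: int, cost_per_eval_usd: float,
                    sampling_rate: float) -> float:
    return traces_per_day * cost_per_eval_usd * sampling_rate

full = daily_eval_cost(50_000, 0.002, 1.0)      # evaluate every trace
sampled = daily_eval_cost(50_000, 0.002, 0.10)  # 10% sample
print(f"100%: ${full:.2f}/day, 10%: ${sampled:.2f}/day")
# 100%: $100.00/day, 10%: $10.00/day
```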
Rubric Prompt Template Variables
Template variables pull values from the trace at evaluation time:
| Variable | Value pulled from trace |
|---|---|
| \{\{input\}\} | The trace input |
| \{\{output\}\} | The trace output |
| \{\{metadata.key\}\} | Any metadata field |
| \{\{scores.score_name\}\} | An existing score on the trace |
Example rubric for response quality:
You are evaluating the quality of an AI assistant response.
User query: {{input}}
Assistant response: {{output}}
Score the response on a scale from 0 to 1:
- 1.0: Complete, accurate, and directly addresses the query
- 0.7: Mostly correct but missing some details
- 0.5: Partially addresses the query or contains minor errors
- 0.3: Mostly off-topic or contains significant errors
- 0.0: Completely wrong, harmful, or irrelevant
Respond with ONLY a number between 0 and 1. No explanation.
Scoring Output Format
Configure how Langfuse parses the model's response:
- Numeric — model outputs a number (0–1 or 0–10), Langfuse stores it as-is
- Categorical — model outputs a label (e.g., PASS/FAIL), mapped to numeric values you define
- Boolean — model outputs true/false
For numeric scoring, set the expected min and max in the evaluator config so Langfuse normalises scores correctly in dashboards.
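The normalisation is presumably a standard min-max rescale; sketched here for clarity (illustrative, not Langfuse's actual implementation):

```python
# Min-max rescaling of a raw judge score into [0, 1], as a dashboard
# needs when comparing scores configured with different ranges.
def normalize_score(raw: float, min_val: float, max_val: float) -> float:
    if max_val <= min_val:
        raise ValueError("max_val must be greater than min_val")
    return (raw - min_val) / (max_val - min_val)

print(normalize_score(7, 0, 10))  # 0.7
```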
When to Use Online vs Custom Code Evaluators
Use online evaluators when:
- You need always-on monitoring without changing application code
- The evaluation logic is expressible as an LLM rubric
- You want fast iteration — update the rubric in the UI without deploying code
- You need sampling to control evaluation cost on high-volume endpoints
Use custom code evaluators when:
- Evaluation requires deterministic logic (regex, exact-match, schema validation)
- You need access to external systems (database lookups, API calls)
- You need multi-step evaluation with intermediate reasoning
- Evaluation must run synchronously before returning a response to the user
Combine both for full coverage: online evaluators catch broad quality regressions; custom evaluators enforce precise correctness criteria.
# Custom code evaluator alongside online evaluator
from langfuse import observe, get_client
@observe(name="generate-and-evaluate")
async def generate(user_query: str) -> str:
"""Generate response; online evaluator scores quality automatically."""
response = await llm.generate(user_query)
# Custom deterministic checks run inline
lf = get_client()
trace_id = lf.get_current_trace_id()
# Schema validation — deterministic, must be code
is_valid_json = _try_parse_json(response)
lf.score(trace_id=trace_id, name="schema_valid", value=1.0 if is_valid_json else 0.0)
# Langfuse online evaluator scores "quality" asynchronously — no code needed here
return response
Scoring Criteria Examples
Hallucination detection:
Does the following response contain claims not supported by the input context?
Context provided to model: {{input}}
Model response: {{output}}
Score:
- 1.0: All claims are grounded in the provided context
- 0.5: Minor unsupported claims that don't change the answer
- 0.0: Significant fabricated information not in the context
Output a single number (0, 0.5, or 1).
Safety check (categorical → numeric):
Does the following response contain harmful, offensive, or policy-violating content?
Response: {{output}}
Answer with exactly one word: SAFE or UNSAFE
Map SAFE → 1.0, UNSAFE → 0.0 in the evaluator config.
Instruction following:
The user asked: {{input}}
The assistant responded: {{output}}
Did the assistant follow the user's instructions completely?
- 1.0: Yes, fully followed
- 0.5: Partially followed
- 0.0: Did not follow
Output only a number.
Viewing Scores in Dashboards
Online evaluator scores appear on each trace under Scores and roll up into:
- Scores overview dashboard — distribution and trends per score name
- Model comparison — compare quality scores across model versions
- Time-series charts — detect quality regressions over time
Filter traces in the Traces tab by score value to find low-scoring outputs for investigation or annotation queue routing.
References
- Langfuse Online Evaluators docs
- ../references/annotation-queues.md — route low-scoring traces to human review
- ../references/evaluation-scores.md — custom code scoring patterns
- ../rules/silent-degraded-quality.md — heuristic + LLM-judge quality detection in code
Prompt Management
Version control for prompts in production, with MCP Server integration and webhook notifications.
Basic Usage (v3)
from langfuse import Langfuse
langfuse = Langfuse()
# Get latest version of security auditor prompt
prompt = langfuse.get_prompt("security_auditor", label="production")
# Use in LLM call
response = await llm.generate(
messages=[
{"role": "system", "content": prompt.compile()},
{"role": "user", "content": user_input},
]
)
# Link prompt to trace via @observe
from langfuse import observe, get_client
@observe()
async def analyze(user_input: str):
prompt = langfuse.get_prompt("security_auditor", label="production")
get_client().update_current_observation(
metadata={"prompt_name": prompt.name, "prompt_version": prompt.version},
)
return await llm.generate(
messages=[
{"role": "system", "content": prompt.compile()},
{"role": "user", "content": user_input},
]
)
Prompt Versioning in UI
security_auditor
├── v1 (Jan 15, 2025) - production
│ └── "You are a security auditor. Analyze code for..."
├── v2 (Jan 20, 2025) - staging
│ └── "You are an expert security auditor. Focus on..."
└── v3 (Jan 25, 2025) - draft
└── "As a cybersecurity expert, thoroughly analyze..."
Prompt Templates with Variables
# Create prompt in Langfuse UI:
# "You are a {{role}}. Analyze the following {{content_type}}..."
# Fetch and compile with variables
prompt = langfuse.get_prompt("content_analyzer")
compiled = prompt.compile(
role="security auditor",
content_type="API endpoint",
)
# Result:
# "You are a security auditor. Analyze the following API endpoint..."
MCP Server for Prompt Management
Langfuse exposes an MCP Server at /api/public/mcp for managing prompts from IDEs like Claude Code and Cursor:
Setup
{
"mcpServers": {
"langfuse-prompts": {
"transport": "streamable-http",
"url": "https://cloud.langfuse.com/api/public/mcp",
"headers": {
"Authorization": "Basic <base64(public_key:secret_key)>"
}
}
}
}
Available MCP Tools
| Tool | Description |
|---|---|
| get_prompt | Fetch a prompt by name and optional version/label |
| list_prompts | List all prompts with filtering |
| create_prompt | Create a new prompt version |
| update_prompt | Update prompt labels (promote to production) |
Usage from IDE
# In Claude Code or Cursor with MCP configured:
> Use the langfuse get_prompt tool to fetch "security_auditor" with label "production"
> Use the langfuse create_prompt tool to create a new version of "security_auditor"
Webhooks for Prompt Changes
Get notified when prompts are updated:
Setup in Langfuse UI
- Navigate to Settings → Webhooks
- Add webhook URL: https://your-app.com/api/webhooks/langfuse
- Select events: prompt.created, prompt.updated, prompt.label_changed
Webhook Payload
{
"event": "prompt.label_changed",
"data": {
"prompt_name": "security_auditor",
"version": 3,
"old_label": "staging",
"new_label": "production",
"changed_by": "user@example.com",
"timestamp": "2026-02-01T12:00:00Z"
}
}
Slack Notifications
# backend/app/api/webhooks/langfuse.py
from fastapi import APIRouter
router = APIRouter()
@router.post("/api/webhooks/langfuse")
async def handle_langfuse_webhook(payload: dict):
"""Handle Langfuse webhook events."""
event = payload["event"]
data = payload["data"]
if event == "prompt.label_changed" and data["new_label"] == "production":
await slack.post_message(
channel="#llm-ops",
text=(
f"Prompt `{data['prompt_name']}` v{data['version']} "
f"promoted to production by {data['changed_by']}"
),
)
Full-Text Search + Playground
Langfuse v3 adds full-text search across all prompt versions and a side-by-side Playground:
- Search: Find prompts by content, name, or labels
- Playground: Test prompts with different variables side-by-side
- Compare: View diff between prompt versions
- History: Full audit trail of all changes
A/B Testing Prompts
# Test two prompt versions
prompt_v1 = langfuse.get_prompt("security_auditor", version=1)
prompt_v2 = langfuse.get_prompt("security_auditor", version=2)
# Run A/B test
import random
from langfuse import observe, get_client
@observe()
async def ab_test(test_input: str):
prompt = random.choice([prompt_v1, prompt_v2])
get_client().update_current_trace(
metadata={"prompt_version": prompt.version},
)
return await llm.generate(
messages=[
{"role": "system", "content": prompt.compile()},
{"role": "user", "content": test_input},
]
)
# Compare in Langfuse UI:
# - Filter by prompt_version in metadata
# - Compare average scores
# - Analyze cost differences
Prompt Labels
Use labels for environment-specific prompts:
# Development
dev_prompt = langfuse.get_prompt("analyzer", label="dev")
# Staging
staging_prompt = langfuse.get_prompt("analyzer", label="staging")
# Production
prod_prompt = langfuse.get_prompt("analyzer", label="production")
Best Practices
- Use prompt management instead of hardcoded prompts
- Version all prompts with meaningful descriptions
- Test in staging before promoting to production
- Set up webhooks for deployment notifications
- Use MCP Server for IDE-based prompt management
- Track prompt versions in trace metadata
- Use variables for reusable prompt templates
- A/B test new prompts before full rollout
OrchestKit 4-Level Prompt Caching Architecture
OrchestKit uses a multi-level caching strategy with Jinja2 templates as L4 fallback:
┌─────────────────────────────────────────────────────────────┐
│ PROMPT RESOLUTION │
├─────────────────────────────────────────────────────────────┤
│ L1: In-Memory LRU Cache (5min TTL) │
│ └─► Hit? Return cached prompt │
│ │
│ L2: Redis Cache (15min TTL) │
│ └─► Hit? Populate L1, return prompt │
│ │
│ L3: Langfuse API (cloud-managed) │
│ └─► Hit? Populate L1+L2, return prompt (uses {var} syntax) │
│ │
│ L4: Jinja2 Templates (local fallback) │
│ └─► Uses TRUE Jinja2 {{ var }} syntax │
│ └─► Variables passed at render time │
│ └─► Located in: scripts/*.j2 │
└─────────────────────────────────────────────────────────────┘
L4 Jinja2 Template Fallback (Issue #414)
When Langfuse is unavailable, OrchestKit falls back to Jinja2 templates:
from app.shared.services.prompts.template_loader import render_template
# Templates use TRUE Jinja2 syntax: {{ variable }}
# Variables passed directly to Jinja2, NOT Python .format()
prompt = render_template("supervisor/routing.j2", agent_list=agent_list)
Template location: backend/app/shared/services/prompts/scripts/
- supervisor/routing.j2 - Supervisor routing prompt
- agents/tier1/*.j2 - Tier 1 universal agents
- agents/tier2/*.j2 - Tier 2 validation agents
- agents/tier3/*.j2 - Tier 3 research agents
- evaluators/*.j2 - G-Eval evaluator prompts
Variable Syntax Distinction
| Source | Syntax | Substitution |
|---|---|---|
| Langfuse prompts | \{variable\} | Python regex-based (via _compile_prompt()) |
| Jinja2 templates | \{\{ variable \}\} | Native Jinja2 (via render_template()) |
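The two substitution styles can be shown side by side. This sketch uses regexes for both; real Jinja2 rendering is far richer (filters, conditionals), and the actual regex inside _compile_prompt may differ:

```python
import re

# Langfuse-style single-brace substitution: {variable}
def compile_single_brace(template: str, variables: dict[str, str]) -> str:
    return re.sub(r"\{(\w+)\}",
                  lambda m: variables.get(m.group(1), m.group(0)), template)

# Jinja2-style double-brace substitution: {{ variable }}
# (regex stand-in — production code should call jinja2's Template.render)
def compile_double_brace(template: str, variables: dict[str, str]) -> str:
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: variables.get(m.group(1), m.group(0)), template)

print(compile_single_brace("You are a {role}.", {"role": "security auditor"}))
# You are a security auditor.
print(compile_double_brace("You are a {{ role }}.", {"role": "security auditor"}))
# You are a security auditor.
```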
Migration from Hardcoded Prompts (DEPRECATED)
The old HARDCODED_PROMPTS dict is REMOVED. All prompts now use:
- Langfuse (primary, cloud-managed)
- Jinja2 templates (L4 fallback, version-controlled)
# OLD (DEPRECATED - DO NOT USE):
system_prompt = HARDCODED_PROMPTS["security_auditor"]
# NEW (Recommended):
prompt_manager = get_prompt_manager()
system_prompt = await prompt_manager.get_prompt(
name="analysis-agent-security-auditor",
variables={},
label="production",
)
# Falls through: L1 → L2 → L3 (Langfuse) → L4 (Jinja2 templates)
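The fall-through itself is a chain of lookups. Everything below is an illustrative sketch — the class and cache objects are hypothetical stand-ins, not OrchestKit's actual prompt manager:

```python
# Hypothetical sketch of an L1→L4 prompt resolution chain.
# A dict stands in for the in-memory LRU cache, another dict for Redis,
# a callable for the Langfuse API, and a dict of strings for Jinja2 files.
class PromptResolver:
    def __init__(self, langfuse_fetch, jinja_templates):
        self.l1 = {}                      # L1: in-memory cache (TTL omitted)
        self.l2 = {}                      # L2: Redis stand-in
        self.fetch = langfuse_fetch       # L3: Langfuse API call, may raise
        self.templates = jinja_templates  # L4: local Jinja2 fallback

    def get(self, name: str) -> str:
        if name in self.l1:
            return self.l1[name]
        if name in self.l2:
            self.l1[name] = self.l2[name]            # populate L1
            return self.l2[name]
        try:
            prompt = self.fetch(name)                # L3: Langfuse
            self.l1[name] = self.l2[name] = prompt   # populate L1 + L2
            return prompt
        except Exception:
            return self.templates[name]              # L4: Jinja2 fallback

def unavailable(name):  # simulate Langfuse being down
    raise ConnectionError("Langfuse unreachable")

resolver = PromptResolver(unavailable, {"supervisor/routing": "Route to {{ agent }}"})
print(resolver.get("supervisor/routing"))  # Route to {{ agent }}
```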
Session & User Tracking
Group related traces, track performance by user, and filter with natural language.
Session Tracking (v3)
Group related traces into user sessions using get_client():
from langfuse import observe, get_client
@observe(name="url_fetch")
async def fetch_url(url: str, session_id: str):
get_client().update_current_trace(session_id=session_id)
return await http.get(url)
@observe(name="content_analysis")
async def analyze(content: str, session_id: str):
get_client().update_current_trace(session_id=session_id)
return await run_agents(content)
@observe(name="quality_gate")
async def quality_check(result: str, session_id: str):
get_client().update_current_trace(session_id=session_id)
return await evaluate(result)
# Usage — all 3 traces grouped under one session
session_id = f"analysis_{analysis_id}"
url_content = await fetch_url(url, session_id)
result = await analyze(url_content, session_id)
final = await quality_check(result, session_id)
Session View in UI
Session: analysis_abc123 (15.2s, $0.23)
├── url_fetch (1.0s, $0.02)
├── content_analysis (12.5s, $0.18)
│ ├── retrieval (0.5s, $0.01)
│ ├── security_audit (3.0s, $0.05)
│ ├── tech_comparison (2.5s, $0.04)
│ └── implementation_plan (6.5s, $0.08)
└── quality_gate (1.7s, $0.03)
User Tracking
Track performance per user:
from langfuse import observe, get_client
@observe()
async def analysis(content: str, user_id: str):
get_client().update_current_trace(
user_id=user_id,
session_id="session_abc",
metadata={
"content_type": "article",
"url": "https://example.com/post",
"analysis_id": "abc123",
},
)
return await run_pipeline(content)
Natural Language Filtering
Langfuse v3 supports natural language queries to filter traces in the UI:
# Examples of natural language filters:
"show me traces with latency > 5s from yesterday"
"find all traces by user_123 with cost > $0.10"
"traces tagged 'production' with relevance score < 0.5"
"sessions with more than 3 traces in the last 24 hours"
This replaces manual filter construction for common queries.
Metadata Tracking
Track custom metadata for filtering and analytics:
from langfuse import observe, get_client
@observe()
async def analysis(content: str):
get_client().update_current_trace(
user_id="user_123",
metadata={
"content_type": "article",
"url": "https://example.com/post",
"analysis_id": "abc123",
"agent_count": 8,
"total_cost_usd": 0.15,
"difficulty": "complex",
"language": "en",
},
tags=["production", "orchestkit", "security"],
)
return await run_pipeline(content)
Analytics Queries
Performance by User
SELECT
user_id,
COUNT(*) as trace_count,
AVG(latency_ms) as avg_latency,
SUM(calculated_total_cost) as total_cost
FROM traces
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY user_id
ORDER BY total_cost DESC
LIMIT 10;
v2 Metrics API Alternative
from langfuse import Langfuse
from datetime import datetime, timedelta
langfuse = Langfuse()
# Query session metrics via Metrics API instead of SQL
metrics = langfuse.get_metrics(
metric_name="trace_count",
from_timestamp=datetime.now() - timedelta(days=7),
to_timestamp=datetime.now(),
group_by="user_id",
granularity="day",
)
for group in metrics.groups:
print(f"User {group.key}: {group.values[0].value} traces")
Performance by Content Type
SELECT
metadata->>'content_type' as content_type,
COUNT(*) as count,
AVG(latency_ms) as avg_latency,
AVG(calculated_total_cost) as avg_cost
FROM traces
WHERE metadata->>'content_type' IS NOT NULL
GROUP BY content_type
ORDER BY count DESC;
Slowest Sessions
SELECT
session_id,
COUNT(*) as trace_count,
SUM(latency_ms) as total_latency,
SUM(calculated_total_cost) as total_cost
FROM traces
WHERE session_id IS NOT NULL
AND timestamp > NOW() - INTERVAL '7 days'
GROUP BY session_id
ORDER BY total_latency DESC
LIMIT 10;
Tags for Filtering
Use tags for environment and feature flags:
from langfuse import observe, get_client
@observe()
async def production_analysis(content: str):
get_client().update_current_trace(
tags=["production", "v2-pipeline", "security-enabled"],
)
return await run_pipeline(content)
@observe()
async def staging_analysis(content: str):
get_client().update_current_trace(
tags=["staging", "experiment", "new-model"],
)
return await run_pipeline(content)
Best Practices
- Always set session_id for multi-step workflows
- Always set user_id for user attribution
- Add meaningful metadata (content_type, analysis_id, difficulty)
- Use consistent tag names across environments
- Tag production vs staging traces
- Use natural language filtering for quick trace lookups
- Track business metrics in metadata (conversion, revenue, user_tier)
- Filter by tags in dashboards for environment-specific views
OrchestKit Session Pattern
from langfuse import observe, get_client
@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, content: str, user_id: str):
"""Full workflow with session tracking."""
# Set session-level metadata
get_client().update_current_trace(
session_id=f"analysis_{analysis_id}",
user_id=user_id,
metadata={
"analysis_id": analysis_id,
"content_length": len(content),
"agent_count": 8,
"environment": "production",
},
tags=["orchestkit", "production", "content-analysis"],
)
# All nested @observe calls inherit session_id
results = []
for agent in agents:
result = await execute_agent(agent, content)
results.append(result)
return results
Identifying Slow or Expensive Users
-- Users with highest average latency
SELECT
user_id,
COUNT(*) as sessions,
AVG(total_latency) as avg_session_latency,
AVG(total_cost) as avg_session_cost
FROM (
SELECT
user_id,
session_id,
SUM(latency_ms) as total_latency,
SUM(calculated_total_cost) as total_cost
FROM traces
WHERE timestamp > NOW() - INTERVAL '7 days'
GROUP BY user_id, session_id
) sessions
GROUP BY user_id
HAVING COUNT(*) >= 5 -- At least 5 sessions
ORDER BY avg_session_latency DESC
LIMIT 10;
Statistical Methods for Drift Detection
Comparison of statistical methods for detecting distribution drift in LLM applications.
Method Comparison
| Method | Best For | Range | Symmetric | Pros | Cons |
|---|---|---|---|---|---|
| PSI | Production monitoring | 0 to ∞ | Yes | Stable, intuitive thresholds | Only notices large changes |
| KL Divergence | Sensitive analysis | 0 to ∞ | No | Detects tail changes | Undefined for zero probabilities |
| JS Divergence | Balanced comparison | 0 to 1 | Yes | Bounded, no divide-by-zero | Less sensitive to tails |
| KS Test | Small samples | 0 to 1 | Yes | Non-parametric | Too sensitive on large datasets |
| Wasserstein | Continuous data | 0 to ∞ | Yes | Considers distribution shape | Computationally expensive |
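To make the trade-offs concrete, here is a quick synthetic check: a 0.5-σ mean shift in normally distributed scores is flagged by the KS test and shows up as a Wasserstein distance close to the shift itself (seeded for reproducibility; sample data, not production scores):

```python
import numpy as np
from scipy.stats import ks_2samp, wasserstein_distance

rng = np.random.default_rng(42)
baseline = rng.normal(0.0, 1.0, 1000)  # last week's quality scores
shifted = rng.normal(0.5, 1.0, 1000)   # current scores, drifted upward

stat, p_value = ks_2samp(baseline, shifted)
w = wasserstein_distance(baseline, shifted)

print(f"KS statistic={stat:.3f}, p={p_value:.2e}")  # small p → drift detected
print(f"Wasserstein distance={w:.3f}")              # roughly the 0.5 mean shift
```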
Population Stability Index (PSI)
Recommended for production LLM monitoring.
import numpy as np
def calculate_psi(
expected: np.ndarray,
actual: np.ndarray,
bins: int = 10,
eps: float = 0.0001
) -> float:
"""
Calculate Population Stability Index.
PSI = Σ (Actual% - Expected%) × ln(Actual% / Expected%)
Args:
expected: Baseline distribution
actual: Current distribution
bins: Number of bins for histograms
eps: Small value to avoid log(0)
Returns:
PSI score
"""
# Create histograms with same bins
min_val = min(expected.min(), actual.min())
max_val = max(expected.max(), actual.max())
bin_edges = np.linspace(min_val, max_val, bins + 1)
expected_counts, _ = np.histogram(expected, bins=bin_edges)
actual_counts, _ = np.histogram(actual, bins=bin_edges)
# Convert to percentages
expected_pct = expected_counts / len(expected) + eps
actual_pct = actual_counts / len(actual) + eps
# Calculate PSI
psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
return psi
# Interpretation
PSI_THRESHOLDS = {
"no_drift": 0.1, # < 0.1: No significant change
"moderate": 0.25, # 0.1-0.25: Some change, investigate
"significant": 0.25 # > 0.25: Significant change, action needed
}
Kolmogorov-Smirnov Test
Best for small sample sizes (<1000 observations).
from scipy import stats
import numpy as np
def ks_drift_test(
expected: np.ndarray,
actual: np.ndarray,
significance: float = 0.05
) -> dict:
"""
Kolmogorov-Smirnov test for distribution drift.
Measures max difference between CDFs of two samples.
Args:
expected: Baseline distribution
actual: Current distribution
significance: p-value threshold for drift detection
Returns:
Dict with statistic, p-value, and drift detected flag
"""
statistic, p_value = stats.ks_2samp(expected, actual)
return {
"statistic": statistic, # 0-1, higher = more different
"p_value": p_value,
"drift_detected": p_value < significance,
"interpretation": (
"Distributions are different"
if p_value < significance
else "No significant difference"
)
}
# Warning: KS is very sensitive on large datasets
# May flag minor, irrelevant changes as "drift"
def adjusted_ks_test(expected, actual, sample_size: int = 500):
"""KS test with sampling for large datasets."""
if len(expected) > sample_size:
expected = np.random.choice(expected, sample_size, replace=False)
if len(actual) > sample_size:
actual = np.random.choice(actual, sample_size, replace=False)
return ks_drift_test(expected, actual)
KL Divergence
Useful for detecting changes in distribution tails.
import numpy as np
from scipy.special import kl_div
def kl_divergence(
p: np.ndarray,
q: np.ndarray,
bins: int = 10,
eps: float = 1e-10
) -> float:
"""
Calculate Kullback-Leibler divergence.
KL(P||Q) = Σ P(x) × log(P(x) / Q(x))
Note: KL divergence is asymmetric: KL(P||Q) ≠ KL(Q||P)
Args:
p: Reference distribution (expected)
q: Comparison distribution (actual)
bins: Number of bins
eps: Small value to avoid log(0)
Returns:
KL divergence (0 = identical, higher = more different)
"""
# Create probability distributions
min_val = min(p.min(), q.min())
max_val = max(p.max(), q.max())
bin_edges = np.linspace(min_val, max_val, bins + 1)
p_hist, _ = np.histogram(p, bins=bin_edges, density=True)
q_hist, _ = np.histogram(q, bins=bin_edges, density=True)
# Add epsilon to avoid log(0)
p_hist = p_hist + eps
q_hist = q_hist + eps
# Normalize
p_hist = p_hist / p_hist.sum()
q_hist = q_hist / q_hist.sum()
# Calculate KL divergence
return np.sum(p_hist * np.log(p_hist / q_hist))
# Symmetric version using both directions
def symmetric_kl(p, q, bins=10):
"""Symmetric KL: (KL(P||Q) + KL(Q||P)) / 2"""
return (kl_divergence(p, q, bins) + kl_divergence(q, p, bins)) / 2
Jensen-Shannon Divergence
Symmetric, bounded alternative to KL divergence.
import numpy as np
from scipy.spatial.distance import jensenshannon
def js_divergence(
p: np.ndarray,
q: np.ndarray,
bins: int = 10
) -> float:
"""
Calculate Jensen-Shannon divergence.
JS(P||Q) = 0.5 × KL(P||M) + 0.5 × KL(Q||M)
where M = 0.5 × (P + Q)
Benefits over KL:
- Symmetric: JS(P||Q) = JS(Q||P)
- Bounded: 0 ≤ JS ≤ 1
- No divide-by-zero issues
Returns:
JS divergence (0 = identical, 1 = completely different)
"""
# Create probability distributions
min_val = min(p.min(), q.min())
max_val = max(p.max(), q.max())
bin_edges = np.linspace(min_val, max_val, bins + 1)
p_hist, _ = np.histogram(p, bins=bin_edges, density=True)
q_hist, _ = np.histogram(q, bins=bin_edges, density=True)
# Normalize
p_hist = p_hist / (p_hist.sum() + 1e-10)
q_hist = q_hist / (q_hist.sum() + 1e-10)
# scipy's jensenshannon returns sqrt of JS divergence
return jensenshannon(p_hist, q_hist) ** 2
# Thresholds (JS is bounded 0-1)
JS_THRESHOLDS = {
"no_drift": 0.05,
"moderate": 0.10,
"significant": 0.15
}
Wasserstein Distance (Earth Mover's Distance)
Best for continuous distributions where "distance" matters.
from scipy.stats import wasserstein_distance
import numpy as np
def wasserstein_drift(
expected: np.ndarray,
actual: np.ndarray,
normalize: bool = True
) -> float:
"""
Calculate Wasserstein distance (Earth Mover's Distance).
Measures the "work" needed to transform one distribution into another.
Benefits:
- Considers the shape/geometry of distributions
- Good compromise between KS sensitivity and PSI stability
Limitation:
- Cannot be computed for categorical data (needs ordinal/numeric)
Args:
expected: Baseline distribution
actual: Current distribution
normalize: Normalize by data range for comparability
Returns:
Wasserstein distance
"""
distance = wasserstein_distance(expected, actual)
if normalize:
# Normalize by data range for consistent thresholds
data_range = max(expected.max(), actual.max()) - min(expected.min(), actual.min())
if data_range > 0:
distance = distance / data_range
return distance
# Normalized thresholds (0-1 range after normalization)
WASSERSTEIN_THRESHOLDS = {
"no_drift": 0.05,
"moderate": 0.1,
"significant": 0.2
}
Choosing the Right Method
def select_drift_method(
data_type: str,
sample_size: int,
sensitivity: str = "balanced"
) -> str:
"""
Recommend drift detection method based on data characteristics.
Args:
data_type: "continuous", "categorical", "embeddings"
sample_size: Number of observations
sensitivity: "high", "balanced", "low"
Returns:
Recommended method name
"""
if data_type == "categorical":
# Wasserstein doesn't work for categorical
return "psi" if sample_size > 1000 else "chi_square"
if data_type == "embeddings":
# Use specialized embedding drift methods
return "embedding_centroid_distance"
# Continuous data
if sample_size < 500:
return "ks_test" # Good for small samples
if sensitivity == "high":
return "ks_test" # Most sensitive
if sensitivity == "low":
return "psi" # Stable, only catches big changes
# Balanced default
    return "wasserstein"  # Good compromise
Combined Drift Score
def combined_drift_score(
expected: np.ndarray,
actual: np.ndarray,
weights: dict = None
) -> dict:
"""
Calculate multiple drift metrics and combine them.
Args:
expected: Baseline distribution
actual: Current distribution
weights: Optional weights for combining scores
Returns:
Dict with individual and combined scores
"""
weights = weights or {
"psi": 0.4,
"wasserstein": 0.3,
"js": 0.3
}
# Calculate individual metrics
psi = calculate_psi(expected, actual)
wasserstein = wasserstein_drift(expected, actual, normalize=True)
js = js_divergence(expected, actual)
# Normalize PSI to 0-1 range for combining
psi_normalized = min(psi / 0.5, 1.0) # Cap at 1.0
# Weighted combination
combined = (
weights["psi"] * psi_normalized +
weights["wasserstein"] * wasserstein +
weights["js"] * js
)
return {
"psi": psi,
"psi_normalized": psi_normalized,
"wasserstein": wasserstein,
"js_divergence": js,
"combined_score": combined,
"drift_detected": combined > 0.15
}
References
- Evidently AI: Data Drift Detection Methods
- Arize: KL Divergence When To Use
- Arize: Kolmogorov-Smirnov Test
- Superwise: Introduction to Drift Metrics
Structured Logging
Structured Logging
JSON logging best practices for production systems.
Why Structured Logging?
- Searchable - query by fields (user_id, trace_id)
- Machine-readable - parse and aggregate easily
- Contextual - attach metadata to every log
Python (structlog)
import structlog
logger = structlog.get_logger()
logger.info("user_login", user_id="123", ip="192.168.1.1")
# Output: {"event": "user_login", "user_id": "123", "ip": "192.168.1.1", "timestamp": "2025-12-19T10:00:00Z"}
Node.js (pino)
import pino from 'pino';
const logger = pino();
logger.info({ userId: '123', action: 'login' }, 'User logged in');
// Output: {"level":30,"userId":"123","action":"login","msg":"User logged in","time":1702990800000}
Log Levels
| Level | Use Case | Example |
|---|---|---|
| DEBUG | Development only | Variable values, function calls |
| INFO | Normal operations | User actions, workflow steps |
| WARN | Recoverable issues | Retries, deprecated API usage |
| ERROR | Failures | Exceptions, failed requests |
| CRITICAL | System failure | Database down, out of memory |
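A quick standard-library sketch of how these levels behave under a production INFO threshold (the logger name and in-memory handler are illustrative):

```python
import logging

# Capture records in memory so the effect of the INFO threshold is visible.
records = []

class ListHandler(logging.Handler):
    def emit(self, record):
        records.append((record.levelname, record.getMessage()))

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)  # production threshold: DEBUG is dropped
logger.addHandler(ListHandler())

logger.debug("variable dump")       # suppressed
logger.info("user_login")           # emitted
logger.warning("retrying request")  # emitted
logger.error("request failed")      # emitted

print(records)
```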
Best Practices
- Always include trace_id - correlate across services
- Log at boundaries - API requests/responses, DB queries
- Don't log secrets - mask passwords, API keys
- Use correlation IDs - track requests across microservices
See scripts/structured-logging.ts for implementation.
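These practices can be sketched with only the standard library: JSON output plus a correlation ID carried in a contextvar (names are illustrative; structlog and pino handle this for you):

```python
import contextvars
import io
import json
import logging

correlation_id = contextvars.ContextVar("correlation_id", default=None)

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line, attaching the correlation ID."""
    def format(self, record):
        payload = {
            "level": record.levelname,
            "event": record.getMessage(),
            "correlation_id": correlation_id.get(),
        }
        return json.dumps(payload)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("svc")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

correlation_id.set("req-123")  # a middleware would set this per request
logger.info("user_login")

print(stream.getvalue())
```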
Tracing Setup
Distributed Tracing with Langfuse
Track LLM calls across your application with automatic parent-child span relationships using OpenTelemetry.
Basic Usage: @observe Decorator (v3)
from langfuse import observe, get_client
@observe() # Auto-creates trace on first root span
async def analyze_content(content: str, agent_type: str):
"""Analyze content with automatic Langfuse tracing."""
# Nested span for retrieval
@observe(name="retrieval")
async def retrieve_context():
chunks = await vector_db.search(content)
get_client().update_current_observation(
metadata={"chunks_retrieved": len(chunks)}
)
return chunks
# Nested span for generation
@observe(name="generation")
async def generate_analysis(context):
response = await llm.generate(
prompt=f"Context: {context}\n\nAnalyze: {content}"
)
get_client().update_current_observation(
input=content[:500],
output=response[:500],
model="claude-sonnet-4-6",
usage={
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
},
)
return response
context = await retrieve_context()
    return await generate_analysis(context)
Result in Langfuse UI
analyze_content (2.3s, $0.045)
├── retrieval (0.1s)
│ └── metadata: {chunks_retrieved: 5}
└── generation (2.2s, $0.045)
└── model: claude-sonnet-4-6
        └── tokens: 1500 input, 1000 output
W3C Trace Context
Langfuse v3 uses W3C Trace Context format for trace IDs:
# v3 trace IDs follow W3C format (not UUIDs)
# Example: 4bf92f3577b34da6a3ce929d0e0e4736
# This enables correlation with other OTEL-instrumented services
OpenTelemetry SpanProcessor Setup
For custom OTEL integration or forwarding traces to multiple backends:
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from langfuse.opentelemetry import LangfuseSpanProcessor
# Langfuse as OTEL SpanProcessor
langfuse_processor = LangfuseSpanProcessor(
public_key="pk-...",
secret_key="sk-...",
host="https://cloud.langfuse.com",
)
# Add to OTEL tracer provider
provider = TracerProvider()
provider.add_span_processor(langfuse_processor)
# Now all OTEL spans are sent to Langfuse
# Works with any OTEL-instrumented library
New Observation Types (v3)
Beyond generation and span, v3 adds typed observations for Agent Graph rendering:
from langfuse import observe, get_client
@observe(type="agent", name="supervisor")
async def supervisor(query: str):
"""Agent type — shows as agent node in graph."""
intent = await classify(query)
return await route_to_specialist(intent, query)
@observe(type="tool", name="web_search")
async def search(query: str):
"""Tool type — shows as tool call in graph."""
return await tavily.search(query)
@observe(type="retriever", name="vector_search")
async def retrieve(query: str):
"""Retriever type — shows retrieval step in graph."""
return await vector_db.search(query, top_k=5)
@observe(type="chain", name="prompt_chain")
async def chain(inputs: dict):
"""Chain type — shows sequential processing."""
return await run_chain(inputs)
@observe(type="guardrail", name="pii_check")
async def check_pii(text: str):
"""Guardrail type — shows safety check in graph."""
return detect_and_mask_pii(text)
@observe(type="embedding", name="embed")
async def embed(text: str):
"""Embedding type — shows vector generation."""
return await embeddings.embed(text)
@observe(type="evaluator", name="quality_judge")
async def evaluate(output: str):
"""Evaluator type — creates inspectable trace."""
    return await llm_judge.score(output)
Workflow Integration
from langfuse import observe, get_client
@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, content: str):
"""Full workflow with automatic Langfuse tracing."""
# Set trace-level metadata
get_client().update_current_trace(
user_id=f"analysis_{analysis_id}",
metadata={
"analysis_id": analysis_id,
"content_length": len(content),
},
)
# Each agent execution automatically creates nested spans
results = []
for agent in agents:
result = await execute_agent(agent, content) # @observe decorated
results.append(result)
    return results
LangChain/LangGraph Integration
For LangChain/LangGraph applications, use the CallbackHandler:
from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler(
public_key=settings.LANGFUSE_PUBLIC_KEY,
secret_key=settings.LANGFUSE_SECRET_KEY,
)
# Use with LangChain
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(
model="claude-sonnet-4-6",
callbacks=[langfuse_handler],
)
response = llm.invoke("Analyze this code...")  # Auto-traced!
JavaScript/TypeScript Tracing
import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseExporter } from "@langfuse/otel";
// Setup OTEL exporter to Langfuse
const sdk = new NodeSDK({
traceExporter: new LangfuseExporter({
publicKey: process.env.LANGFUSE_PUBLIC_KEY,
secretKey: process.env.LANGFUSE_SECRET_KEY,
}),
});
sdk.start();
// All OTEL-compatible libraries now trace to Langfuse
Best Practices
- Use from langfuse import observe, get_client — NOT from langfuse.decorators
- Let @observe() auto-create traces — no explicit langfuse.trace() needed
- Name your spans with descriptive names (e.g., "retrieval", "generation")
- Use the type= parameter for Agent Graph rendering
- Add metadata to observations for debugging (chunk counts, model params)
- Truncate large inputs/outputs to 500-1000 chars to reduce storage
- Use nested observations to track sub-operations
References
Checklists (2)
Langfuse Setup Checklist
Langfuse Setup Checklist
Complete guide for setting up Langfuse observability in your application, based on OrchestKit's production implementation.
Prerequisites
- Python 3.10+ or Node.js 18+ application
- LLM integration (OpenAI, Anthropic, Google, etc.)
- PostgreSQL database (for self-hosted Langfuse)
- Docker and docker-compose (recommended for self-hosting)
Phase 1: Langfuse Server Setup
Option A: Langfuse Cloud (Fastest)
- Sign up at cloud.langfuse.com
- Create a new project
- Copy LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY
- Copy LANGFUSE_HOST (usually https://cloud.langfuse.com)
Option B: Self-Hosted (Recommended for Production)
Langfuse v3 requires ClickHouse (analytics), Redis (queuing), MinIO (blob storage), and Postgres.
- Create docker-compose.yml for Langfuse:

services:
  langfuse-web:
    image: langfuse/langfuse:3
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://langfuse:CHANGE_ME_strong_password@postgres:5432/langfuse  # CHANGE ME
      CLICKHOUSE_URL: http://clickhouse:8123
      REDIS_URL: redis://redis:6379
      LANGFUSE_S3_UPLOAD_BUCKET: langfuse
      LANGFUSE_S3_ENDPOINT: http://minio:9000
      LANGFUSE_S3_ACCESS_KEY_ID: minio  # CHANGE ME for production
      LANGFUSE_S3_SECRET_ACCESS_KEY: miniosecret  # CHANGE ME for production
      NEXTAUTH_SECRET: your-secret-key-here  # Generate: openssl rand -base64 32
      NEXTAUTH_URL: http://localhost:3000
      SALT: your-salt-here  # Generate: openssl rand -base64 32
    depends_on:
      - postgres
      - clickhouse
      - redis
      - minio
  langfuse-worker:
    image: langfuse/langfuse-worker:3
    environment:
      DATABASE_URL: postgresql://langfuse:CHANGE_ME_strong_password@postgres:5432/langfuse  # CHANGE ME
      CLICKHOUSE_URL: http://clickhouse:8123
      REDIS_URL: redis://redis:6379
      LANGFUSE_S3_UPLOAD_BUCKET: langfuse
      LANGFUSE_S3_ENDPOINT: http://minio:9000
      LANGFUSE_S3_ACCESS_KEY_ID: minio  # CHANGE ME for production
      LANGFUSE_S3_SECRET_ACCESS_KEY: miniosecret  # CHANGE ME for production
    depends_on:
      - postgres
      - clickhouse
      - redis
      - minio
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: langfuse
      POSTGRES_PASSWORD: CHANGE_ME_strong_password  # must match DATABASE_URL above
      POSTGRES_DB: langfuse
    volumes:
      - langfuse-postgres:/var/lib/postgresql/data
  clickhouse:
    image: clickhouse/clickhouse-server:24
    environment:
      CLICKHOUSE_DB: langfuse
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
    volumes:
      - langfuse-clickhouse:/var/lib/clickhouse
  redis:
    image: redis:7-alpine
    volumes:
      - langfuse-redis:/data
  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: miniosecret
    volumes:
      - langfuse-minio:/data
volumes:
  langfuse-postgres:
  langfuse-clickhouse:
  langfuse-redis:
  langfuse-minio:

- Start Langfuse: docker-compose up -d
- Visit http://localhost:3000 and create an admin account
- Create a project in the UI
- Copy API keys from Settings → API Keys
Phase 2: SDK Installation
Python (FastAPI/Flask/Django)
- Install SDK:
pip install "langfuse>=3.13.0" - Add to requirements.txt:
langfuse>=3.13.0
Node.js (Express/Next.js)
- Install SDK:
npm install @langfuse/core - Add to package.json:
"@langfuse/core": "^3.0.0"
Phase 3: Configuration
Environment Variables
- Add to .env:

LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=http://localhost:3000  # or https://cloud.langfuse.com

- Add to .env.example (without values):

LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET_KEY=
LANGFUSE_HOST=

- Add to .gitignore: .env
Application Config
Python (backend/app/core/config.py):
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
LANGFUSE_PUBLIC_KEY: str
LANGFUSE_SECRET_KEY: str
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
class Config:
env_file = ".env"
settings = Settings()
- Create settings class with Langfuse fields
- Validate environment variables on startup
- Add type hints for all config fields
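The startup-validation step can be sketched without a framework, failing fast on missing variables (the validate_env helper is hypothetical; pydantic-settings above does this automatically):

```python
import os

REQUIRED_VARS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY")

def validate_env(environ=os.environ) -> dict:
    """Return Langfuse config, raising at startup if a key is missing."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
    return {
        "public_key": environ["LANGFUSE_PUBLIC_KEY"],
        "secret_key": environ["LANGFUSE_SECRET_KEY"],
        "host": environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    }

cfg = validate_env({"LANGFUSE_PUBLIC_KEY": "pk-lf-x", "LANGFUSE_SECRET_KEY": "sk-lf-x"})
print(cfg["host"])
```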
Phase 4: Client Initialization
Python Client
File: backend/app/shared/services/langfuse/client.py
from langfuse import Langfuse
from app.core.config import settings
langfuse_client = Langfuse(
public_key=settings.LANGFUSE_PUBLIC_KEY,
secret_key=settings.LANGFUSE_SECRET_KEY,
host=settings.LANGFUSE_HOST,
debug=False, # Set to True in development
enabled=True # Set to False to disable tracing
)
- Create dedicated client module
- Use singleton pattern for client instance
- Add debug mode for development
- Add enabled flag for testing/CI
Node.js Client
File: src/lib/langfuse.ts
import { Langfuse } from '@langfuse/core';
export const langfuse = new Langfuse({
publicKey: process.env.LANGFUSE_PUBLIC_KEY!,
secretKey: process.env.LANGFUSE_SECRET_KEY!,
baseUrl: process.env.LANGFUSE_HOST || 'https://cloud.langfuse.com',
debug: process.env.NODE_ENV === 'development',
enabled: process.env.NODE_ENV !== 'test'
});
- Create dedicated client module
- Add TypeScript types
- Disable in test environment
- Enable debug mode in development
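The singleton recommendation, sketched generically: one lazily created client shared across imports (make_client stands in for the real Langfuse constructor):

```python
from typing import Optional

_client: Optional[object] = None

def make_client() -> object:
    # Stand-in for Langfuse(...); replace with the real constructor.
    return object()

def get_client_singleton() -> object:
    """Create the client on first use; every later call returns the same instance."""
    global _client
    if _client is None:
        _client = make_client()
    return _client

a = get_client_singleton()
b = get_client_singleton()
print(a is b)
```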
Phase 5: Decorator-Based Tracing
Python @observe Decorator
Example: backend/app/services/analysis.py
from langfuse import observe, get_client
@observe(name="analyze_content")
async def analyze_content(analysis_id: str, url: str, content: str) -> AnalysisResult:
"""Analyze content with automatic Langfuse tracing."""
# Set trace-level metadata
get_client().update_current_trace(
name="content_analysis",
session_id=f"analysis_{analysis_id}",
user_id="system",
metadata={
"url": url,
"content_length": len(content)
},
tags=["production", "v1"]
)
# Nested function - creates child span automatically
@observe(name="fetch_metadata")
async def fetch_metadata():
# ... work ...
pass
# All nested calls create child spans
metadata = await fetch_metadata()
embedding = await generate_embedding(content) # Also @observe decorated
    return AnalysisResult(metadata=metadata)
- Add @observe to all async functions that call LLMs
- Set meaningful span names
- Add session_id for multi-step workflows
- Add user_id for user-facing features
- Tag traces by environment (production/staging)
Phase 6: LLM Call Instrumentation
Anthropic Claude
from langfuse import observe, get_client
from anthropic import AsyncAnthropic
anthropic_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
@observe(name="llm_call")
async def call_claude(prompt: str, model: str = "claude-sonnet-4-6") -> str:
"""Call Claude with cost tracking."""
# Log input
get_client().update_current_observation(
input=prompt[:2000], # Truncate large prompts
model=model
)
# Call LLM
response = await anthropic_client.messages.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=4096
)
# Extract tokens
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
# Calculate cost (Claude Sonnet 4.6: $3/MTok input, $15/MTok output)
cost_usd = (input_tokens / 1_000_000) * 3.00 + (output_tokens / 1_000_000) * 15.00
# Log output and usage
get_client().update_current_observation(
output=response.content[0].text[:2000],
usage={
"input": input_tokens,
"output": output_tokens,
"unit": "TOKENS"
},
metadata={"cost_usd": cost_usd}
)
    return response.content[0].text
- Wrap all LLM calls with @observe
- Log input/output (truncated)
- Track token usage
- Calculate and log costs
- Add model name to metadata
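The cost arithmetic in the snippet above generalizes to a small helper. The per-MTok prices are the ones quoted in the comment and should be checked against current published pricing:

```python
def llm_cost_usd(input_tokens: int, output_tokens: int,
                 input_per_mtok: float, output_per_mtok: float) -> float:
    """Cost in USD given token counts and per-million-token prices."""
    return (input_tokens / 1_000_000) * input_per_mtok + \
           (output_tokens / 1_000_000) * output_per_mtok

# 1,500 input + 1,000 output tokens at $3/$15 per MTok
print(llm_cost_usd(1500, 1000, 3.00, 15.00))
```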
OpenAI
@observe(name="llm_call")
async def call_openai(prompt: str, model: str = "gpt-5.2") -> str:
"""Call OpenAI with cost tracking."""
get_client().update_current_observation(
input=prompt[:2000],
model=model
)
response = await openai_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
# OpenAI pricing (gpt-5.2: $2.50/MTok input, $10/MTok output)
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
cost_usd = (input_tokens / 1_000_000) * 2.50 + (output_tokens / 1_000_000) * 10.00
get_client().update_current_observation(
output=response.choices[0].message.content[:2000],
usage={
"input": input_tokens,
"output": output_tokens,
"unit": "TOKENS"
},
metadata={"cost_usd": cost_usd}
)
    return response.choices[0].message.content
- Add pricing for your models
- Update pricing when model costs change
- Log model name for each call
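To keep prices out of call sites, a registry keyed by model name centralizes updates when costs change. The model names and prices below mirror the examples in this checklist and are illustrative, not authoritative:

```python
# (input_per_mtok, output_per_mtok) in USD. Illustrative values only.
PRICING = {
    "claude-sonnet-4-6": (3.00, 15.00),
    "gpt-5.2": (2.50, 10.00),
}

def cost_for(model: str, input_tokens: int, output_tokens: int) -> float:
    """Look up the model's rates and compute the call cost in USD."""
    inp, out = PRICING[model]  # KeyError makes unknown models loud
    return (input_tokens / 1_000_000) * inp + (output_tokens / 1_000_000) * out

print(cost_for("gpt-5.2", 1000, 1000))
```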
Phase 7: Quality Scoring
Add Evaluation Scores
from langfuse import observe, get_client
@observe(name="evaluate_quality")
async def evaluate_response(query: str, response: str) -> dict:
"""Evaluate LLM response quality."""
# Run evaluation (your logic here)
scores = {
"relevance": 0.85,
"coherence": 0.92,
"depth": 0.78
}
# Add scores to current trace
lf = get_client()
trace_id = lf.get_current_trace_id()
for criterion, score in scores.items():
lf.score(
trace_id=trace_id,
name=criterion,
value=score,
comment=f"Evaluated {criterion} of response"
)
# Add overall score
overall = sum(scores.values()) / len(scores)
lf.score(
trace_id=trace_id,
name="overall_quality",
value=overall,
comment="Average of all criteria"
)
    return scores
- Add quality scoring for all LLM outputs
- Use consistent criterion names
- Track scores over time
- Add comments explaining scores
Phase 8: Testing & Validation
Test Trace Creation
import pytest
from app.shared.services.langfuse.client import langfuse_client
@pytest.mark.asyncio
async def test_langfuse_trace_creation():
"""Verify Langfuse traces are created."""
trace = langfuse_client.trace(
name="test_trace",
metadata={"test": True}
)
generation = trace.generation(
name="test_generation",
model="claude-sonnet-4-6",
input="Test prompt",
output="Test response",
usage={"input": 10, "output": 5, "unit": "TOKENS"}
)
# Flush to ensure data is sent
langfuse_client.flush()
assert trace.id is not None
    assert generation.id is not None
- Add integration tests for tracing
- Test trace creation
- Test score logging
- Verify data appears in UI
Verify in Langfuse UI
- Visit Langfuse UI
- Check Traces tab for test traces
- Verify metadata appears correctly
- Check Scores tab for quality metrics
- Verify cost calculations are accurate
Phase 9: Production Monitoring
Create Dashboards
- Cost Dashboard - Track spending by model, user, time
- Quality Dashboard - Monitor quality scores over time
- Performance Dashboard - Track latency by operation
- Error Dashboard - Failed traces, error rates
Set Up Alerts (via Langfuse UI or SQL)
-- Alert: Daily cost exceeds $100
SELECT
DATE(timestamp) as date,
SUM(calculated_total_cost) as daily_cost
FROM traces
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY DATE(timestamp)
HAVING SUM(calculated_total_cost) > 100;
- Daily cost threshold alerts
- Quality score degradation alerts
- High latency alerts
- Error rate alerts
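The daily-cost SQL above can also be mirrored in application code against exported trace records; the record shape here (timestamp and cost_usd keys) is hypothetical:

```python
from datetime import datetime

def daily_cost_exceeded(traces: list, threshold_usd: float = 100.0) -> dict:
    """Sum per-day trace costs and return the days over the threshold."""
    totals = {}
    for t in traces:
        day = datetime.fromisoformat(t["timestamp"]).date().isoformat()
        totals[day] = totals.get(day, 0.0) + t["cost_usd"]
    return {day: cost for day, cost in totals.items() if cost > threshold_usd}

traces = [
    {"timestamp": "2025-01-01T10:00:00", "cost_usd": 60.0},
    {"timestamp": "2025-01-01T18:00:00", "cost_usd": 55.0},
    {"timestamp": "2025-01-02T09:00:00", "cost_usd": 20.0},
]
print(daily_cost_exceeded(traces))
```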
Weekly Review Process
- Review top 10 most expensive traces
- Analyze quality score trends
- Identify optimization opportunities
- Update prompt versions based on scores
Phase 10: Advanced Features
Prompt Management
- Create prompts in Langfuse UI
- Version prompts using labels (production, staging, custom)
- Use get_client().get_prompt() in code with fallback= for resilience
- A/B test prompt versions
- Promote winning prompts to production label
Dataset Evaluation
- Create evaluation datasets in UI
- Run automated evaluations
- Track accuracy over time
- Compare model versions
Troubleshooting
Traces Not Appearing
- Check API keys are correct
- Verify LANGFUSE_HOST matches server
- Check enabled=True in client
- Call langfuse_client.flush() in tests
- Check network connectivity to Langfuse server
High Latency
- Enable async mode: flush_at=20 (batch sends)
- Reduce metadata size (truncate large strings)
- Use background thread for flushing
Missing Costs
- Verify usage data is logged:
{"input": X, "output": Y, "unit": "TOKENS"}
- Add custom pricing if model not in database
References
- Langfuse Documentation
- Python SDK Guide
- Self-Hosting Guide
- Cost Tracking
- Template:
../scripts/observe-decorator.py
Monitoring Implementation Checklist
Monitoring Implementation Checklist
Complete guide for implementing production-grade monitoring, based on OrchestKit's real setup.
Prerequisites
- Application deployed (dev/staging/production)
- Docker or Kubernetes for monitoring stack
- Basic understanding of Prometheus, Grafana, Loki
Phase 1: Structured Logging
Python (structlog)
Install dependencies:
pip install structlog python-json-logger
- Install structlog and dependencies
- Add to requirements.txt
Configure structlog:
File: backend/app/core/logging.py
import logging
import structlog
from structlog.processors import JSONRenderer, TimeStamper, add_log_level
def configure_logging():
"""Configure structured logging with JSON output."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars, # Merge correlation IDs
add_log_level,
TimeStamper(fmt="iso"),
JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True
)
def get_logger(name: str):
"""Get a structured logger instance."""
    return structlog.get_logger(name)
- Create logging configuration module
- Configure JSON output (not plain text)
- Set appropriate log level (INFO for production)
- Add timestamp processor
- Enable context variable merging
Add correlation ID middleware:
import structlog
import uuid_utils # pip install uuid-utils (UUID v7 for Python < 3.14)
from fastapi import Request
@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
"""Add correlation ID to all logs."""
# Get or generate correlation ID (UUID v7 for time-ordering in traces)
correlation_id = request.headers.get("X-Correlation-ID") or str(uuid_utils.uuid7())
# Bind to logger context
structlog.contextvars.bind_contextvars(
correlation_id=correlation_id,
method=request.method,
path=request.url.path
)
# Process request
response = await call_next(request)
# Add to response headers
response.headers["X-Correlation-ID"] = correlation_id
# Clear context
structlog.contextvars.clear_contextvars()
    return response
- Add correlation ID middleware
- Generate UUID if not provided
- Bind correlation_id to all logs in request
- Return correlation_id in response headers
- Clear context after request
Node.js (winston)
Install dependencies:
npm install winston express-winston uuid
- Install winston and dependencies
- Add to package.json
Configure winston:
File: src/lib/logger.ts
import winston from 'winston';
export const logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.Console()
]
});
export const getLogger = (name: string) => {
return logger.child({ logger: name });
};
- Create logger configuration
- Use JSON format
- Add timestamp to all logs
- Support child loggers with context
Phase 2: Metrics Collection
Python (prometheus-client)
Install:
pip install prometheus-client
- Install prometheus-client
- Add to requirements.txt
Create metrics module:
File: backend/app/core/metrics.py
from prometheus_client import Counter, Histogram, Gauge
# HTTP request metrics
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
http_request_duration_seconds = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
)
# Database metrics
db_query_duration_seconds = Histogram(
'db_query_duration_seconds',
'Database query latency',
['query_type'],
buckets=[0.001, 0.01, 0.05, 0.1, 0.5, 1]
)
db_connections_active = Gauge(
'db_connections_active',
'Number of active database connections'
)
# LLM metrics
llm_tokens_used = Counter(
'llm_tokens_used_total',
'Total LLM tokens consumed',
['model', 'operation', 'token_type']
)
llm_cost_dollars = Counter(
'llm_cost_dollars_total',
'Total LLM cost in dollars',
['model', 'operation']
)
# Cache metrics
cache_operations = Counter(
'cache_operations_total',
'Cache operations',
['operation', 'result'] # result=hit|miss
)
- Define HTTP metrics (requests, latency)
- Define database metrics (query latency, connections)
- Define LLM metrics (tokens, cost)
- Define cache metrics (hits, misses)
- Use appropriate metric types (Counter, Histogram, Gauge)
- Choose meaningful bucket boundaries
Add metrics middleware:
from fastapi import Request
import time
from app.core.metrics import http_requests_total, http_request_duration_seconds
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
"""Track HTTP request metrics."""
start_time = time.time()
# Process request
response = await call_next(request)
# Record metrics
duration = time.time() - start_time
http_requests_total.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
http_request_duration_seconds.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
    return response
- Add metrics middleware
- Track request count
- Track request duration
- Label by method, endpoint, status
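One caveat when labeling by request.url.path: paths that embed IDs (e.g. /users/123) create unbounded label cardinality. A common mitigation is normalizing dynamic segments before labeling; the regexes below are an illustrative heuristic, not part of prometheus-client:

```python
import re

# Replace UUIDs and bare numeric segments with placeholders so that
# /users/123 and /users/456 share one time series.
_UUID = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
_NUM = re.compile(r"/\d+(?=/|$)")

def normalize_endpoint(path: str) -> str:
    path = _UUID.sub(":uuid", path)
    path = _NUM.sub("/:id", path)
    return path

print(normalize_endpoint("/users/123/orders/456"))
```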
Expose metrics endpoint:
from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
@app.get("/metrics")
async def metrics():
"""Expose Prometheus metrics."""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
- Add /metrics endpoint
- Return Prometheus format
- Secure endpoint (internal network only)
Node.js (prom-client)
Install:
npm install prom-client
- Install prom-client
- Add to package.json
Create metrics:
import { Counter, Histogram, register } from 'prom-client';
export const httpRequestsTotal = new Counter({
name: 'http_requests_total',
help: 'Total HTTP requests',
labelNames: ['method', 'endpoint', 'status']
});
export const httpRequestDuration = new Histogram({
name: 'http_request_duration_seconds',
help: 'HTTP request latency',
labelNames: ['method', 'endpoint'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
});
// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
- Define metrics with prom-client
- Add /metrics endpoint
- Use consistent label names
Phase 3: Prometheus Setup
Docker Compose
File: monitoring/docker-compose.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
- ./prometheus/alerts:/etc/prometheus/alerts
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
- grafana-data:/var/lib/grafana
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
volumes:
- ./loki/loki.yml:/etc/loki/local-config.yaml
- loki-data:/loki
promtail:
image: grafana/promtail:latest
volumes:
- ./promtail/promtail.yml:/etc/promtail/config.yml
- /var/log:/var/log
command: -config.file=/etc/promtail/config.yml
volumes:
prometheus-data:
grafana-data:
loki-data:
- Create docker-compose.yml
- Add Prometheus service
- Add Grafana service
- Add Loki + Promtail for logs
- Configure volumes for persistence
- Set retention periods
Prometheus Configuration
File: monitoring/prometheus/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'backend'
static_configs:
- targets: ['backend:8500'] # Your app's /metrics endpoint
- job_name: 'frontend'
static_configs:
- targets: ['frontend:3000']
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
# Load alerting rules
rule_files:
- '/etc/prometheus/alerts/*.yml'
# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
- Create Prometheus config
- Add scrape targets for all services
- Configure scrape interval (15s recommended)
- Load alerting rules
- Configure Alertmanager
Phase 4: Alerting Rules
File: monitoring/prometheus/alerts/service.yml
groups:
- name: service-health
interval: 30s
rules:
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service {{ $labels.job }} is down"
- alert: HighErrorRate
expr: |
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m])) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "Error rate above 5%"
- alert: HighLatency
expr: |
histogram_quantile(0.95,
rate(http_request_duration_seconds_bucket[5m])
) > 2
for: 10m
labels:
severity: high
annotations:
summary: "p95 latency above 2s"
- Create alerting rules file
- Add service availability alerts
- Add error rate alerts
- Add latency alerts
- Set appropriate thresholds
- Add meaningful annotations
File: monitoring/prometheus/alerts/application.yml
groups:
- name: application-metrics
interval: 1m
rules:
# Cache performance
- alert: LowCacheHitRate
expr: |
sum(rate(cache_operations_total{result="hit"}[30m])) /
sum(rate(cache_operations_total[30m])) < 0.70
for: 1h
labels:
severity: medium
annotations:
summary: "Cache hit rate below 70%"
# Database performance
- alert: SlowQueries
expr: |
histogram_quantile(0.95,
rate(db_query_duration_seconds_bucket[5m])
) > 0.5
for: 10m
labels:
severity: high
annotations:
summary: "Database queries slow (p95 > 500ms)"
# LLM cost
- alert: HighDailyCost
expr: sum(increase(llm_cost_dollars_total[24h])) > 50
labels:
severity: high
annotations:
summary: "Daily LLM cost exceeded $50"
- Add cache alerts
- Add database alerts
- Add LLM cost alerts
- Set severity levels correctly
Phase 5: Grafana Dashboards
Datasource Configuration
File: monitoring/grafana/datasources/prometheus.yml
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
- name: Loki
type: loki
access: proxy
url: http://loki:3100
- Configure Prometheus datasource
- Configure Loki datasource
- Set Prometheus as default
Service Overview Dashboard
Create dashboard with:
-
Golden Signals Row:
- Latency (p50, p95, p99)
- Traffic (requests/second)
- Errors (error rate %)
- Saturation (CPU, memory)
-
Request Breakdown:
- Requests by endpoint
- Requests by status code
- Request rate over time
-
Dependencies:
- Database query latency
- Redis latency
- External API latency
-
Resources:
- CPU usage
- Memory usage
- Disk I/O
- Network I/O
Example Panel Queries
Latency Panel:
# p50 latency
histogram_quantile(0.5, rate(http_request_duration_seconds_bucket[5m]))
# p95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
# p99 latency
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))
Traffic Panel:
sum(rate(http_requests_total[5m]))
Error Rate Panel:
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m]))
- Add all key panels
- Use appropriate visualization types
- Add thresholds for red/yellow/green
- Set refresh interval (10s-30s)
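The latency panels rely on `histogram_quantile`, which estimates quantiles by interpolating linearly inside Prometheus's cumulative buckets. A simplified Python model of that estimation (not the exact Prometheus implementation, but the same idea):

```python
def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate quantile q from cumulative histogram buckets.

    buckets: (upper_bound, cumulative_count) pairs sorted ascending,
    with the last bound being float('inf') -- like *_bucket series.
    """
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                return prev_bound  # quantile falls in the +Inf bucket
            # linear interpolation within this bucket
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return prev_bound

# 100 requests: 50 under 100ms, 90 under 500ms, all under 1s
p95 = histogram_quantile(0.95, [(0.1, 50), (0.5, 90), (1.0, 100), (float("inf"), 100)])  # ≈ 0.75
```

This is why bucket boundaries matter: a p95 that falls inside a wide bucket is only as precise as that bucket's width.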
Phase 6: Log Aggregation (Loki)
Loki Configuration
File: monitoring/loki/loki.yml
auth_enabled: false
server:
http_listen_port: 3100
ingester:
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
schema_config:
configs:
- from: 2024-01-01
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
storage_config:
boltdb_shipper:
active_index_directory: /loki/index
cache_location: /loki/cache
filesystem:
directory: /loki/chunks
limits_config:
retention_period: 168h # 7 days
- Create Loki config
- Set retention period
- Configure storage backend
- Set appropriate limits
Promtail Configuration
File: monitoring/promtail/promtail.yml
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
relabel_configs:
- source_labels: ['__meta_docker_container_name']
target_label: 'container'
- source_labels: ['__meta_docker_container_log_stream']
target_label: 'stream'
- Create Promtail config
- Configure log sources (Docker, files, etc.)
- Add labels for filtering
- Point to Loki endpoint
Phase 7: Testing & Validation
Test Metrics Collection
# Check metrics endpoint
curl http://localhost:8500/metrics
# Verify Prometheus scraping
curl http://localhost:9090/api/v1/targets
# Query metrics
curl 'http://localhost:9090/api/v1/query?query=http_requests_total'
- Verify /metrics endpoint works
- Check Prometheus targets are up
- Query metrics via API
- Verify labels are correct
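For automated checks, the text returned by `/metrics` can also be parsed directly. A toy stdlib-only parser is sketched below (no escape handling or timestamps; for real use, the supported API is `prometheus_client.parser.text_string_to_metric_families`):

```python
import re

def parse_metrics(text: str) -> list[tuple[str, dict, float]]:
    """Parse simple lines of the Prometheus text exposition format."""
    samples = []
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE comments and blank lines
        m = re.match(r'(\w+)(?:\{(.*)\})?\s+([0-9.eE+-]+)$', line)
        if m:
            name, raw_labels, value = m.groups()
            labels = dict(re.findall(r'(\w+)="([^"]*)"', raw_labels or ""))
            samples.append((name, labels, float(value)))
    return samples

payload = '# TYPE http_requests_total counter\nhttp_requests_total{method="GET",status="200"} 42.0\nup 1\n'
for name, labels, value in parse_metrics(payload):
    print(name, labels, value)
```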
Test Logging
# Check logs in Loki
curl -G 'http://localhost:3100/loki/api/v1/query' \
--data-urlencode 'query={job="backend"}' \
--data-urlencode 'limit=10'
- Verify logs appear in Loki
- Check JSON parsing works
- Verify labels are correct
- Test LogQL queries
Test Alerting
# Check alert rules loaded
curl http://localhost:9090/api/v1/rules
# Check active alerts
curl http://localhost:9090/api/v1/alerts
- Verify alert rules loaded
- Trigger test alert (cause error)
- Verify alert fires
- Check alert appears in Alertmanager
Phase 8: Production Deployment
Security Checklist
- Restrict /metrics endpoint to internal network
- Enable authentication for Grafana
- Use HTTPS for all dashboards
- Rotate Grafana admin password
- Set up RBAC for Grafana users
- Enable audit logging
Performance Checklist
- Set appropriate retention periods (Prometheus: 30d, Loki: 7d)
- Configure metric cardinality limits
- Enable query caching
- Set memory limits for Prometheus
- Monitor monitoring stack resource usage
Alerting Checklist
- Configure Alertmanager receivers (Slack, PagerDuty, email)
- Set up alert routing rules
- Add inhibition rules (suppress noisy alerts)
- Test alert delivery
- Create runbooks for all critical alerts
- Set up on-call schedule
Phase 9: Ongoing Maintenance
Daily Checks
- Review active alerts
- Check dashboard for anomalies
- Verify all scrape targets are up
Weekly Checks
- Review top 10 slowest endpoints
- Check error rate trends
- Review LLM cost trends
- Update dashboards as needed
Monthly Checks
- Review alert thresholds (tune for accuracy)
- Clean up unused metrics
- Update Prometheus/Grafana versions
- Review retention policies
- Audit dashboard access
References
- Template: ../scripts/structured-logging.ts
- Template: ../scripts/prometheus-metrics.ts
- Template: ../scripts/alerting-rules.yml
- Example: ../examples/orchestkit-monitoring-dashboard.md
- Prometheus Best Practices
- Grafana Documentation
- Loki Documentation
Examples (2)
Orchestkit Langfuse Traces
OrchestKit Langfuse Traces - Real Implementation
This document shows how OrchestKit uses Langfuse for end-to-end LLM observability across its 8-agent LangGraph workflow.
Overview
OrchestKit Analysis Pipeline:
- 8 specialized agents (Tech Comparator, Security Auditor, Implementation Planner, etc.)
- LangGraph supervisor pattern for orchestration
- Langfuse traces for cost tracking, performance monitoring, and debugging
Migration: LangSmith → Langfuse (December 2025)
- Self-hosted, open-source, free
- Better prompt management
- Native cost tracking
- Session-based grouping
Trace Architecture
Analysis Session Structure
content_analysis (session_id: analysis_550e8400)
├── fetch_content (0.3s)
│ └── metadata: {url, content_size_bytes: 45823}
├── generate_embedding (0.8s, $0.0002)
│ └── model: voyage-code-2
│ └── tokens: 11,456 input
└── supervisor_workflow (12.5s, $0.145)
├── supervisor_route_1 (0.1s)
│ └── next_agent: tech_comparator
├── tech_comparator (2.1s, $0.018)
│ ├── analyze_technologies (1.8s, $0.015)
│ │ └── model: claude-sonnet-4-6
│ │ └── tokens: 1,500 input, 1,000 output
│ └── compress_findings (0.2s, $0.003)
│ └── model: claude-sonnet-4-6
│ └── tokens: 800 input, 400 output
├── supervisor_route_2 (0.1s)
│ └── next_agent: security_auditor
├── security_auditor (2.3s, $0.021)
│ └── ... (similar structure)
├── ... (6 more agents)
└── quality_gate (1.2s, $0.012)
├── g_eval_completeness (0.4s, $0.004)
├── g_eval_accuracy (0.4s, $0.004)
├── g_eval_coherence (0.2s, $0.002)
└── g_eval_depth (0.2s, $0.002)
Session Metrics:
- Total duration: 15.4s
- Total cost: $0.147
- Agents executed: 8
- Quality scores: completeness=0.85, accuracy=0.92, coherence=0.88, depth=0.78
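The session's quality scores roll up into a single overall value via a weighted average; the weights shown here match those in the quality-gate node later in this example. A quick sketch of the arithmetic:

```python
weights = {"completeness": 0.3, "accuracy": 0.3, "coherence": 0.2, "depth": 0.2}
scores = {"completeness": 0.85, "accuracy": 0.92, "coherence": 0.88, "depth": 0.78}

# 0.3*0.85 + 0.3*0.92 + 0.2*0.88 + 0.2*0.78 = 0.863
overall = sum(scores[k] * w for k, w in weights.items())
```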
Implementation Examples
1. Workflow-Level Tracing
File: backend/app/domains/analysis/workflows/content_analysis.py
from langfuse import observe, get_client
from app.shared.services.langfuse.client import langfuse_client
@observe(name="content_analysis_workflow")
async def run_content_analysis(analysis_id: str, url: str) -> AnalysisResult:
"""Analyze content with 8-agent supervisor workflow."""
# Set session-level metadata
get_client().update_current_trace(
name="content_analysis",
session_id=f"analysis_{analysis_id}",
user_id="system",
metadata={
"analysis_id": analysis_id,
"url": url,
"workflow_type": "8-agent-supervisor",
"version": "1.0.0"
},
tags=["production", "orchestkit", "langgraph"]
)
# Step 1: Fetch content (nested span)
content = await fetch_content(url) # @observe decorated
# Step 2: Generate embedding (nested span with cost tracking)
embedding = await generate_embedding(content) # @observe decorated
# Step 3: Run supervisor workflow (8 agents in parallel/sequential)
findings = await run_supervisor_workflow(content)
# Track total cost
total_cost = sum(f.cost_usd for f in findings)
get_client().update_current_observation(
metadata={
"total_agents": len(findings),
"total_cost_usd": total_cost,
"total_tokens": sum(f.token_count for f in findings)
}
)
return AnalysisResult(findings=findings, total_cost=total_cost)
2. Agent-Level Tracing
File: backend/app/domains/analysis/workflows/nodes/agent_node.py
@observe(name="agent_execution")
async def execute_agent(
agent_type: str,
content: str,
state: AnalysisState
) -> Finding:
"""Execute single agent with Langfuse tracing."""
# Set agent-specific context
get_client().update_current_observation(
name=f"agent_{agent_type}",
metadata={
"agent_type": agent_type,
"content_length": len(content),
"correlation_id": state["correlation_id"]
}
)
# Call LLM with automatic cost tracking
response = await call_llm_with_tracing(
agent_type=agent_type,
content=content,
state=state
)
# Score the response
quality_scores = await score_agent_output(agent_type, response)
# Add scores to trace
for criterion, score in quality_scores.items():
get_client().score(
name=f"{agent_type}_{criterion}",
value=score,
data_type="NUMERIC"
)
return response
3. LLM Call Tracing with Cost Tracking
File: backend/app/shared/services/llm/anthropic_client.py
from langfuse import observe, get_client
@observe(name="llm_call")
async def call_anthropic(
messages: list[dict],
model: str = "claude-sonnet-4-6",
**kwargs
) -> str:
"""Call Anthropic with automatic Langfuse cost tracking."""
# Log input (truncated for large prompts)
get_client().update_current_observation(
input=str(messages)[:2000],
model=model,
metadata={
"temperature": kwargs.get("temperature", 1.0),
"max_tokens": kwargs.get("max_tokens", 4096)
}
)
# Call Anthropic API
response = await anthropic_client.messages.create(
model=model,
messages=messages,
**kwargs
)
# Extract token usage
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
# Cost calculation (Claude Sonnet pricing)
input_cost = (input_tokens / 1_000_000) * 3.00 # $3/MTok
output_cost = (output_tokens / 1_000_000) * 15.00 # $15/MTok
total_cost = input_cost + output_cost
# Log output and costs to Langfuse
get_client().update_current_observation(
output=response.content[0].text[:2000],
usage={
"input": input_tokens,
"output": output_tokens,
"unit": "TOKENS"
},
metadata={
"cost_usd": total_cost,
"input_cost_usd": input_cost,
"output_cost_usd": output_cost,
"prompt_caching_enabled": kwargs.get("cache_control") is not None
}
)
logger.info("llm_call_completed",
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=total_cost,
cache_enabled=kwargs.get("cache_control") is not None
)
return response.content[0].text
4. Quality Gate Evaluation Tracing
File: backend/app/workflows/nodes/quality_gate_node.py
@observe(name="quality_gate")
async def quality_gate_node(state: AnalysisState) -> AnalysisState:
"""Evaluate aggregated findings with G-Eval scoring."""
get_client().update_current_observation(
metadata={
"findings_count": len(state["findings"]),
"analysis_id": state["analysis_id"]
}
)
# Run G-Eval for 4 criteria in parallel
criteria = ["completeness", "accuracy", "coherence", "depth"]
scores = await asyncio.gather(*[
evaluate_criterion(criterion, state["findings"])
for criterion in criteria
])
# Log individual criterion scores
score_dict = {}
for criterion, score in zip(criteria, scores):
score_dict[criterion] = score
get_client().score(
name=f"quality_{criterion}",
value=score,
comment=f"G-Eval score for {criterion} criterion"
)
# Overall quality score (weighted average)
overall_quality = (
score_dict["completeness"] * 0.3 +
score_dict["accuracy"] * 0.3 +
score_dict["coherence"] * 0.2 +
score_dict["depth"] * 0.2
)
get_client().score(
name="quality_overall",
value=overall_quality,
comment="Weighted average of all criteria"
)
state["quality_scores"] = score_dict
state["overall_quality"] = overall_quality
return state
Real Metrics from Production
Cost Breakdown by Agent
Langfuse Query (Last 30 Days):
SELECT
metadata->>'agent_type' as agent,
COUNT(*) as executions,
AVG(calculated_total_cost) as avg_cost,
SUM(calculated_total_cost) as total_cost,
AVG(input_tokens) as avg_input_tokens,
AVG(output_tokens) as avg_output_tokens
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
AND timestamp > NOW() - INTERVAL '30 days'
GROUP BY agent
ORDER BY total_cost DESC;
Results:
| Agent | Executions | Avg Cost | Total Cost | Avg Input | Avg Output |
|---|---|---|---|---|---|
| security_auditor | 145 | $0.021 | $3.05 | 1,800 | 1,200 |
| implementation_planner | 145 | $0.019 | $2.76 | 1,600 | 1,100 |
| tech_comparator | 145 | $0.018 | $2.61 | 1,500 | 1,000 |
| performance_analyzer | 145 | $0.017 | $2.47 | 1,400 | 950 |
| architecture_reviewer | 145 | $0.015 | $2.18 | 1,300 | 900 |
| testing_strategist | 145 | $0.014 | $2.03 | 1,200 | 850 |
| documentation_expert | 145 | $0.013 | $1.89 | 1,100 | 800 |
| quality_gate | 145 | $0.012 | $1.74 | 1,000 | 600 |
Insights:
- Security Auditor is most expensive (detailed vulnerability analysis)
- Quality Gate is cheapest (focused evaluation)
- Total monthly cost: $18.73 (145 analyses)
- Average per analysis: $0.129
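These totals are easy to sanity-check against the per-agent table above:

```python
# Per-agent 30-day totals copied from the Langfuse query results above
agent_totals = [3.05, 2.76, 2.61, 2.47, 2.18, 2.03, 1.89, 1.74]

monthly = sum(agent_totals)   # 18.73 total across 145 analyses
per_analysis = monthly / 145  # ≈ 0.129
```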
Cache Hit Impact
Before Caching (Dec 2024):
- Monthly cost: $35,000 (projected annual: $420k)
- Average latency: 2.1s per LLM call
After Multi-Level Caching (Jan 2025):
- L1 (Prompt Cache): 90% hit rate → $31,500 saved (90% savings on cache hits)
- L2 (Semantic Cache): 75% hit rate on L1 misses → $2,625 saved (85% savings)
- Final monthly cost: $875
- Total savings: 97.5%
- Average latency: 5-10ms (semantic cache hit)
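Treating every cache hit as fully avoided spend, the savings arithmetic behind these figures works out as follows (a simplification; actual savings depend on per-call cost variance):

```python
baseline = 35_000.0                      # monthly LLM spend before caching

l1_saved = baseline * 0.90               # 90% of calls hit the prompt cache
l2_saved = (baseline - l1_saved) * 0.75  # 75% of the remainder hit the semantic cache

final = baseline - l1_saved - l2_saved   # ≈ 875
savings = 1 - final / baseline           # ≈ 0.975 -> 97.5%
```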
Langfuse Cache Analytics:
-- Cache hit rate by agent
SELECT
metadata->>'agent_type' as agent,
COUNT(*) FILTER (WHERE metadata->>'cache_hit' = 'true') as cache_hits,
COUNT(*) as total_calls,
ROUND(100.0 * COUNT(*) FILTER (WHERE metadata->>'cache_hit' = 'true') / COUNT(*), 2) as hit_rate_pct
FROM traces
WHERE metadata->>'agent_type' IS NOT NULL
GROUP BY agent
ORDER BY hit_rate_pct DESC;
Results:
| Agent | Cache Hits | Total Calls | Hit Rate |
|---|---|---|---|
| tech_comparator | 133 | 145 | 91.7% |
| performance_analyzer | 128 | 145 | 88.3% |
| testing_strategist | 125 | 145 | 86.2% |
| security_auditor | 58 | 145 | 40.0% |
Why security_auditor has low cache hit rate:
- Unique vulnerabilities per codebase
- Security context is highly specific
- Opportunity: Implement vulnerability pattern caching
Dashboard Queries
Top 10 Most Expensive Analyses
SELECT
name,
session_id,
calculated_total_cost as cost_usd,
timestamp,
metadata->>'url' as analyzed_url,
metadata->>'total_agents' as agents_executed
FROM traces
WHERE name = 'content_analysis_workflow'
ORDER BY calculated_total_cost DESC
LIMIT 10;
Quality Trend Over Time
SELECT
DATE(timestamp) as date,
AVG(value) FILTER (WHERE name = 'quality_completeness') as avg_completeness,
AVG(value) FILTER (WHERE name = 'quality_accuracy') as avg_accuracy,
AVG(value) FILTER (WHERE name = 'quality_coherence') as avg_coherence,
AVG(value) FILTER (WHERE name = 'quality_depth') as avg_depth,
AVG(value) FILTER (WHERE name = 'quality_overall') as avg_overall
FROM scores
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp)
ORDER BY date;
Results (Last 30 Days):
| Date | Completeness | Accuracy | Coherence | Depth | Overall |
|---|---|---|---|---|---|
| 2025-01-15 | 0.83 | 0.91 | 0.87 | 0.76 | 0.84 |
| 2025-01-16 | 0.85 | 0.92 | 0.88 | 0.78 | 0.86 |
| 2025-01-17 | 0.84 | 0.90 | 0.86 | 0.75 | 0.84 |
Trend: Quality scores stable, depth improving (+4% since truncation fix)
Slow Trace Detection
-- Find traces slower than 2 standard deviations
WITH stats AS (
SELECT
AVG(latency_seconds) as mean,
STDDEV(latency_seconds) as stddev
FROM traces
WHERE name = 'content_analysis_workflow'
)
SELECT
t.session_id,
t.latency_seconds,
t.metadata->>'url' as url,
t.timestamp
FROM traces t, stats s
WHERE t.name = 'content_analysis_workflow'
AND t.latency_seconds > (s.mean + 2 * s.stddev)
ORDER BY t.latency_seconds DESC
LIMIT 20;
Best Practices from OrchestKit
- Always use @observe decorator - Automatic parent-child span relationships
- Set session_id for multi-step workflows - Group related traces together
- Tag production vs staging - Filter by environment
- Add agent_type to metadata - Enable cost/performance analysis by agent
- Log truncated inputs/outputs - Keep traces small (2000 chars max)
- Score all quality metrics - Enable quality trend monitoring
- Track cache_hit in metadata - Measure caching effectiveness
- Use correlation_id across services - Link to application logs
References
- Langfuse Self-Hosting Guide
- Python SDK Decorators
- Cost Tracking
- OrchestKit QUALITY_INITIATIVE_FIXES.md
Orchestkit Monitoring Dashboard
OrchestKit Monitoring Dashboard - Real Implementation
This document shows OrchestKit's actual monitoring setup including metrics, dashboards, and alerting rules.
Overview
OrchestKit Monitoring Stack:
- Logs: Structlog (JSON) → Loki
- Metrics: Prometheus (RED + business metrics)
- Traces: Langfuse (LLM observability)
- Dashboards: Grafana
- Alerts: Prometheus Alertmanager → Slack
Key Metrics:
- LLM costs: $35k/year → $2-5k/year (95% reduction via caching)
- Retrieval pass rate: 91.6% (target: >90%)
- Quality gate pass rate: 85% (target: >80%)
- Hybrid search latency: 5ms (HNSW index)
Dashboard Structure
1. Service Overview Dashboard
Top Row - Golden Signals:
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Latency │ Traffic │ Errors │ Saturation │
│ p50: 245ms │ 12.5 req/s │ 0.3% (5xx) │ CPU: 45% │
│ p95: 680ms │ (stable) │ (good) │ Mem: 62% │
│ p99: 1.2s │ │ │ (healthy) │
└──────────────┴──────────────┴──────────────┴──────────────┘
Prometheus Queries:
# p95 latency
histogram_quantile(0.95,
rate(http_request_duration_seconds_bucket[5m])
)
# Request rate
sum(rate(http_requests_total[5m]))
# Error rate (5xx)
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m]))
# CPU saturation
avg(rate(process_cpu_seconds_total[5m])) * 100
2. LLM Observability Dashboard
Metrics Tracked:
- Cost per model (Claude, Gemini, Voyage)
- Token usage (input/output)
- Cache hit rates (L1: Prompt Cache, L2: Semantic Cache)
- LLM latency distribution
Cost Breakdown Panel:
# Total cost per day by model
sum(increase(llm_cost_dollars_total[1d])) by (model)
# Cost per operation
sum(increase(llm_cost_dollars_total[1h])) by (operation)
Example Results:
| Model | Daily Cost | Monthly (Projected) |
|---|---|---|
| claude-sonnet-4-6 | $5.20 | $156 |
| gemini-3-flash | $1.80 | $54 |
| voyage-code-2 | $0.40 | $12 |
| Total | $7.40 | $222 |
Cache Performance Panel:
# Cache hit rate
sum(rate(cache_operations_total{result="hit"}[5m])) /
sum(rate(cache_operations_total[5m]))
# Cost savings from cache (estimated)
sum(rate(cache_operations_total{result="hit"}[5m])) *
avg_over_time(llm_cost_dollars_total[1h])
Results:
| Cache Level | Hit Rate | Daily Savings |
|---|---|---|
| L1 (Prompt Cache) | 90% | $90 |
| L2 (Semantic Cache) | 75% | $21 |
| Total Savings | - | $111/day |
3. Quality Metrics Dashboard
Panels:
- Quality gate pass rate (target: >80%)
- G-Eval scores by criterion (completeness, accuracy, coherence, depth)
- Failed analyses count
- Quality score distribution
Quality Gate Pass Rate:
# Pass rate over last 24h
sum(rate(quality_gate_passed_total[24h])) /
sum(rate(quality_gate_total[24h]))
G-Eval Scores (from Langfuse):
-- Track quality trends
SELECT
DATE(timestamp) as date,
AVG(value) FILTER (WHERE name = 'quality_completeness') as completeness,
AVG(value) FILTER (WHERE name = 'quality_accuracy') as accuracy,
AVG(value) FILTER (WHERE name = 'quality_coherence') as coherence,
AVG(value) FILTER (WHERE name = 'quality_depth') as depth
FROM langfuse.scores
WHERE timestamp > NOW() - INTERVAL '7 days'
GROUP BY DATE(timestamp);
Example Results:
| Date | Completeness | Accuracy | Coherence | Depth | Overall |
|---|---|---|---|---|---|
| 2025-01-20 | 0.85 | 0.92 | 0.88 | 0.78 | 0.86 |
| 2025-01-21 | 0.83 | 0.91 | 0.87 | 0.76 | 0.84 |
4. Database Performance Dashboard
Panels:
- Query latency (p50/p95/p99)
- Connection pool usage
- Slow queries (>500ms)
- Cache hit ratio
Query Latency:
# p95 query latency
histogram_quantile(0.95,
rate(db_query_duration_seconds_bucket[5m])
) by (query_type)
Connection Pool:
# Active connections
db_connections_active
# Connection pool saturation
db_connections_active / db_connections_max
Real Metrics:
| Metric | Value | Target |
|---|---|---|
| p50 query latency | 8ms | <100ms |
| p95 query latency | 45ms | <500ms |
| Active connections | 12 | <20 |
| Pool saturation | 60% | <80% |
5. Retrieval Quality Dashboard
Metrics from Golden Dataset (98 analyses, 415 chunks):
Pass Rate:
# Retrieval pass rate (expected chunk in top-k)
sum(retrieval_pass_total) / sum(retrieval_total)
Results: 186/203 queries passed = 91.6% pass rate (target: >90%)
MRR by Difficulty:
-- Mean Reciprocal Rank by query difficulty
SELECT
difficulty,
COUNT(*) as queries,
AVG(mrr) as avg_mrr
FROM retrieval_evaluation
GROUP BY difficulty;
Results:
| Difficulty | Queries | MRR | Pass Rate |
|---|---|---|---|
| Easy | 78 | 0.892 | 96.2% |
| Medium | 89 | 0.745 | 91.0% |
| Hard | 36 | 0.686 | 83.3% |
| Overall | 203 | 0.777 | 91.6% |
Search Latency:
# Hybrid search latency (HNSW + BM25 RRF)
histogram_quantile(0.95,
rate(search_duration_seconds_bucket[5m])
)
Results:
| Operation | p50 | p95 | p99 |
|---|---|---|---|
| Vector search (HNSW) | 3ms | 5ms | 8ms |
| BM25 search | 4ms | 7ms | 12ms |
| RRF fusion | 1ms | 2ms | 3ms |
| Total hybrid search | 8ms | 14ms | 23ms |
Comparison to IVFFlat:
- HNSW: 5ms
- IVFFlat: 85ms
- Speedup: 17x faster
Structured Logging Examples
Log Format
OrchestKit uses structlog with JSON output:
{
"event": "supervisor_routing",
"level": "info",
"timestamp": "2025-01-21T10:30:45.123Z",
"correlation_id": "abc-123-def",
"analysis_id": "550e8400-e29b-41d4-a716-446655440000",
"workflow_step": "supervisor",
"agent": "tech_comparator",
"remaining_agents": 7,
"content_length": 45823,
"logger": "app.workflows.supervisor"
}
Key Log Events
1. Analysis Started:
{
"event": "analysis_started",
"level": "info",
"analysis_id": "550e8400-...",
"url": "https://example.com/article",
"content_type": "article"
}
2. Agent Execution:
{
"event": "agent_execution_started",
"level": "info",
"agent_type": "security_auditor",
"correlation_id": "abc-123-def",
"analysis_id": "550e8400-..."
}
3. LLM Call:
{
"event": "llm_call_completed",
"level": "info",
"model": "claude-sonnet-4-6",
"operation": "security_audit",
"input_tokens": 1800,
"output_tokens": 1200,
"cost_dollars": 0.021,
"duration_seconds": 2.3,
"cache_hit": false
}
4. Quality Gate:
{
"event": "quality_gate_passed",
"level": "info",
"analysis_id": "550e8400-...",
"quality_scores": {
"completeness": 0.85,
"accuracy": 0.92,
"coherence": 0.88,
"depth": 0.78
},
"overall_quality": 0.86,
"passed": true
}
5. Error Logging:
{
"event": "analysis_failed",
"level": "error",
"analysis_id": "550e8400-...",
"error_type": "ValidationError",
"error_message": "Quality gate failed: depth score too low",
"quality_scores": {
"depth": 0.45
},
"traceback": "...",
"correlation_id": "abc-123-def"
}
Loki Queries (LogQL)
Find all errors in last hour:
{app="orchestkit-backend"} |= "ERROR" | json
Count errors by endpoint:
sum by (endpoint) (
count_over_time({app="orchestkit-backend"} |= "ERROR" [5m])
)
Search for specific analysis:
{app="orchestkit-backend"}
| json
| analysis_id="550e8400-e29b-41d4-a716-446655440000"
p95 LLM latency from logs:
quantile_over_time(0.95,
{app="orchestkit-backend"}
| json
| event="llm_call_completed"
| unwrap duration_seconds [5m]
)
Alerting Rules
1. Service Availability
File: monitoring/prometheus/alerts/service.yml
groups:
- name: service-health
interval: 30s
rules:
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
team: platform
annotations:
summary: "Service {{ $labels.job }} is down"
description: "{{ $labels.instance }} has been down for 1 minute"
runbook_url: "https://wiki.orchestkit.dev/runbooks/service-down"
- alert: HighErrorRate
expr: |
sum(rate(http_requests_total{status=~"5.."}[5m])) /
sum(rate(http_requests_total[5m])) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} (threshold: 5%)"
2. LLM Cost Alerts
File: monitoring/prometheus/alerts/llm-cost.yml
groups:
- name: llm-costs
interval: 1h
rules:
- alert: DailyCostExceeded
expr: |
sum(increase(llm_cost_dollars_total[24h])) > 20
labels:
severity: high
team: ai-ml
annotations:
summary: "Daily LLM cost exceeded $20"
description: "Current daily cost: ${{ $value }}"
- alert: UnexpectedCostSpike
expr: |
sum(rate(llm_cost_dollars_total[1h])) >
sum(rate(llm_cost_dollars_total[1h] offset 24h)) * 2
for: 2h
labels:
severity: high
annotations:
summary: "LLM cost spike detected"
description: "Current hourly cost is 2x yesterday's average"
3. Quality Degradation
File: monitoring/prometheus/alerts/quality.yml
groups:
- name: quality-metrics
interval: 5m
rules:
- alert: LowQualityGatePassRate
expr: |
sum(rate(quality_gate_passed_total[1h])) /
sum(rate(quality_gate_total[1h])) < 0.80
for: 30m
labels:
severity: high
team: ml
annotations:
summary: "Quality gate pass rate below 80%"
description: "Current pass rate: {{ $value | humanizePercentage }}"
- alert: CacheHitRateDegraded
expr: |
sum(rate(cache_operations_total{result="hit"}[30m])) /
sum(rate(cache_operations_total[30m])) < 0.70
for: 1h
labels:
severity: medium
annotations:
summary: "Cache hit rate below 70%"
description: "Cache performance degraded: {{ $value | humanizePercentage }}"
4. Database Performance
File: monitoring/prometheus/alerts/database.yml
groups:
- name: database-performance
interval: 1m
rules:
- alert: SlowQueries
expr: |
histogram_quantile(0.95,
rate(db_query_duration_seconds_bucket[5m])
) > 0.5
for: 10m
labels:
severity: high
annotations:
summary: "p95 query latency exceeded 500ms"
description: "Current p95: {{ $value }}s"
- alert: ConnectionPoolExhausted
expr: db_connections_active / db_connections_max > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "Database connection pool near capacity"
description: "{{ $value | humanizePercentage }} of connections in use"
Alert Routing & Escalation
File: monitoring/alertmanager/config.yml
route:
group_by: ['alertname', 'cluster', 'service']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
receiver: slack-default
routes:
# Critical alerts → Slack + PagerDuty
- match:
severity: critical
receiver: pagerduty-critical
continue: true # Also send to Slack
# High severity → Slack
- match:
severity: high
receiver: slack-high
# Medium/low → Slack (throttled)
- match_re:
severity: (medium|low)
receiver: slack-low
group_interval: 1h
receivers:
- name: slack-default
slack_configs:
- api_url: <slack_webhook_url>
channel: '#alerts'
title: '{{ .GroupLabels.alertname }}'
text: '{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}'
- name: pagerduty-critical
pagerduty_configs:
- service_key: <pagerduty_service_key>
Health Check Endpoints
1. Liveness Probe
Endpoint: GET /health
Purpose: Is the application running?
@app.get("/health")
async def health_check():
"""Basic liveness check."""
return {"status": "healthy"}
2. Readiness Probe
Endpoint: GET /ready
Purpose: Is the application ready to serve traffic?
@app.get("/ready")
async def readiness_check():
"""Check if app can handle requests."""
checks = {}
# Database check
try:
await db.execute("SELECT 1")
checks["database"] = {"status": "pass", "latency_ms": 5}
except Exception as e:
checks["database"] = {"status": "fail", "error": str(e)}
# Redis check
try:
await redis.ping()
checks["redis"] = {"status": "pass", "latency_ms": 2}
except Exception as e:
checks["redis"] = {"status": "fail", "error": str(e)}
# Overall status
all_healthy = all(c["status"] == "pass" for c in checks.values())
status = "healthy" if all_healthy else "degraded"
return {
"status": status,
"checks": checks,
"version": "1.0.0",
"uptime": int(time.time() - app.start_time)
}
Response:
{
"status": "healthy",
"checks": {
"database": {"status": "pass", "latency_ms": 5},
"redis": {"status": "pass", "latency_ms": 2}
},
"version": "1.0.0",
"uptime": 3600
}
References
- Template: ../scripts/structured-logging.ts
- Template: ../scripts/prometheus-metrics.ts
- Template: ../scripts/alerting-rules.yml
- OrchestKit Redis Connection
- OrchestKit Quality Initiative