OrchestKit v6.7.1 — 67 skills, 38 agents, 77 hooks with Opus 4.6 support

Distributed Systems

Distributed systems patterns for locking, resilience, idempotency, and rate limiting. Use when implementing distributed locks, circuit breakers, retry policies, idempotency keys, token bucket rate limiters, or fault tolerance patterns.

Reference medium

Primary Agent: backend-system-architect

Distributed Systems Patterns

Comprehensive patterns for building reliable distributed systems. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
| --- | --- | --- | --- |
| Distributed Locks | 3 | CRITICAL | Redis/Redlock locks, PostgreSQL advisory locks, fencing tokens |
| Resilience | 3 | CRITICAL | Circuit breakers, retry with backoff, bulkhead isolation |
| Idempotency | 3 | HIGH | Idempotency keys, request dedup, database-backed idempotency |
| Rate Limiting | 3 | HIGH | Token bucket, sliding window, distributed rate limits |
| Edge Computing | 2 | HIGH | Edge workers, V8 isolates, CDN caching, geo-routing |
| Event-Driven | 2 | HIGH | Event sourcing, CQRS, transactional outbox, sagas |

Total: 16 rules across 6 categories

Quick Start

# Redis distributed lock with Lua scripts
async with RedisLock(redis_client, "payment:order-123"):
    await process_payment(order_id)

# Circuit breaker for external APIs
@circuit_breaker(failure_threshold=5, recovery_timeout=30)
@retry(max_attempts=3, base_delay=1.0)
async def call_external_api():
    ...

# Idempotent API endpoint
@router.post("/payments")
async def create_payment(
    data: PaymentCreate,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
):
    return await idempotent_execute(db, idempotency_key, "/payments", process)

# Token bucket rate limiting
limiter = TokenBucketLimiter(redis_client, capacity=100, refill_rate=10)
if await limiter.is_allowed(f"user:{user_id}"):
    await handle_request()

Distributed Locks

Coordinate exclusive access to resources across multiple service instances.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Redis & Redlock | rules/locks-redis-redlock.md | Lua scripts, SET NX, multi-node quorum |
| PostgreSQL Advisory | rules/locks-postgres-advisory.md | Session/transaction locks, lock ID strategies |
| Fencing Tokens | rules/locks-fencing-tokens.md | Owner validation, TTL, heartbeat extension |
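
The fencing-token idea can be illustrated without Redis. In this minimal sketch, a lock service hands out a strictly increasing token with every grant, and the protected resource rejects any write whose token is older than the highest one it has seen. `LockService`, `FencedStore`, and `StaleTokenError` are hypothetical in-memory stand-ins, not names from the rule files.

```python
import itertools


class LockService:
    """Hypothetical in-memory lock service that issues fencing tokens."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)  # Strictly increasing tokens

    def acquire(self) -> int:
        # Every grant receives a larger token than any earlier grant.
        return next(self._counter)


class StaleTokenError(Exception):
    """Raised when a write carries a token older than one already seen."""


class FencedStore:
    """Hypothetical resource that checks the fencing token on every write."""

    def __init__(self) -> None:
        self.highest_token = 0
        self.value = None

    def write(self, token: int, value: str) -> None:
        if token < self.highest_token:
            raise StaleTokenError(f"token {token} < {self.highest_token}")
        self.highest_token = token
        self.value = value


locks = LockService()
store = FencedStore()

old_token = locks.acquire()  # Client A acquires, then stalls past its TTL
new_token = locks.acquire()  # The lock expires and client B acquires it
store.write(new_token, "written by B")

try:
    store.write(old_token, "written by A")  # A resumes with a stale token
    stale_write_rejected = False
except StaleTokenError:
    stale_write_rejected = True
```

The check lives in the resource rather than in the client, which is what protects against a paused lock holder that still believes it owns the lock.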

Resilience

Production-grade fault tolerance for distributed systems.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Circuit Breaker | rules/resilience-circuit-breaker.md | CLOSED/OPEN/HALF_OPEN states, sliding window |
| Retry & Backoff | rules/resilience-retry-backoff.md | Exponential backoff, jitter, error classification |
| Bulkhead Isolation | rules/resilience-bulkhead.md | Semaphore tiers, rejection policies, queue depth |
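
The CLOSED/OPEN/HALF_OPEN transitions can be sketched as a small state machine. This is a simplified illustration, not the implementation in the rule file: it counts consecutive failures instead of using a sliding window, it is synchronous, and the `clock` parameter is an assumption added so the behavior can be exercised without sleeping. The parameter names `failure_threshold` and `recovery_timeout` mirror the Quick Start decorator.

```python
import time
from typing import Callable


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Minimal sketch: consecutive-failure counter, single probe when half-open."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func: Callable, *args, **kwargs):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit is open")
            self.state = "HALF_OPEN"  # Timeout elapsed: let one probe through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        # Any success, including a successful probe, closes the circuit.
        self.state = "CLOSED"
        self.failures = 0
        return result

    def _on_failure(self) -> None:
        self.failures += 1
        # A failed probe, or too many failures while closed, opens the circuit.
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```

While the circuit is open, callers fail fast with `CircuitOpenError` instead of waiting on a dependency that is known to be down.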

Idempotency

Ensure operations can be safely retried without unintended side effects.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Idempotency Keys | rules/idempotency-keys.md | Deterministic hashing, Stripe-style headers |
| Request Dedup | rules/idempotency-dedup.md | Event consumer dedup, Redis + DB dual layer |
| Database-Backed | rules/idempotency-database.md | Unique constraints, upsert, TTL cleanup |

Rate Limiting

Protect APIs with distributed rate limiting using Redis.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Token Bucket | rules/ratelimit-token-bucket.md | Redis Lua scripts, burst capacity, refill rate |
| Sliding Window | rules/ratelimit-sliding-window.md | Sorted sets, precise counting, no boundary spikes |
| Distributed Limits | rules/ratelimit-distributed.md | SlowAPI + Redis, tiered limits, response headers |
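
The sliding-window log behind the sorted-set approach can be sketched in-process. The deque below plays the role of the Redis sorted set: expired timestamps are evicted (what ZREMRANGEBYSCORE would do), the remainder is counted (ZCARD), and the new hit is recorded (ZADD). `SlidingWindowLimiter` and its `clock` parameter are hypothetical names for illustration. This single-process version is not shared across instances.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLimiter:
    """In-process sliding-window log; a stand-in for a Redis sorted set."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.hits = deque()  # Timestamps of accepted requests, oldest first

    def is_allowed(self) -> bool:
        now = self.clock()
        cutoff = now - self.window_seconds
        # Evict timestamps that have slid out of the window.
        while self.hits and self.hits[0] <= cutoff:
            self.hits.popleft()
        if len(self.hits) < self.limit:
            self.hits.append(now)
            return True
        return False
```

Because the window moves with each request, a client cannot double its allowance by straddling a fixed window boundary.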

Edge Computing

Edge runtime patterns for Cloudflare Workers, Vercel Edge, and Deno Deploy.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Edge Workers | rules/edge-workers.md | V8 isolate constraints, Web APIs, geo-routing, auth at edge |
| Edge Caching | rules/edge-caching.md | Cache-aside at edge, CDN headers, KV storage, stale-while-revalidate |

Event-Driven

Event sourcing, CQRS, saga orchestration, and reliable messaging patterns.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Event Sourcing | rules/event-sourcing.md | Event-sourced aggregates, CQRS read models, optimistic concurrency |
| Event Messaging | rules/event-messaging.md | Transactional outbox, saga compensation, idempotent consumers |

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Lock backend | Redis for speed, PostgreSQL if already using it, Redlock for HA |
| Lock TTL | 2-3x expected operation time |
| Circuit breaker recovery | Half-open probe with sliding window |
| Retry algorithm | Exponential backoff + full jitter |
| Bulkhead isolation | Semaphore-based tiers (Critical/Standard/Optional) |
| Idempotency storage | Redis (speed) + DB (durability), 24-72h TTL |
| Rate limit algorithm | Token bucket for most APIs, sliding window for strict quotas |
| Rate limit storage | Redis (distributed, atomic Lua scripts) |
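
As an illustration of the "exponential backoff + full jitter" recommendation, the delay before each retry can be drawn uniformly between zero and an exponentially growing cap. This is a minimal sketch: `full_jitter_delay`, `max_delay`, and the injectable `rng` are assumptions for the example, while `base_delay` mirrors the Quick Start decorator.

```python
import random
from typing import Optional


def full_jitter_delay(attempt: int, base_delay: float = 1.0,
                      max_delay: float = 30.0,
                      rng: Optional[random.Random] = None) -> float:
    """Return a delay drawn uniformly from [0, min(max_delay, base_delay * 2**attempt)]."""
    source = rng if rng is not None else random
    cap = min(max_delay, base_delay * (2 ** attempt))
    return source.uniform(0, cap)


# attempt is zero-based: caps are 1s, 2s, 4s, 8s, 16s, then 30s from there on
delays = [full_jitter_delay(attempt) for attempt in range(6)]
```

Spreading each delay over the whole interval, rather than adding a small random offset to a fixed delay, keeps clients that failed together from retrying together.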

When NOT to Use

No separate event-sourcing/saga/CQRS skills exist — they are rules within distributed-systems. But most projects never need them.

| Pattern | Interview | Hackathon | MVP | Growth | Enterprise | Simpler Alternative |
| --- | --- | --- | --- | --- | --- | --- |
| Event sourcing | OVERKILL | OVERKILL | OVERKILL | OVERKILL | WHEN JUSTIFIED | Append-only table with status column |
| Saga orchestration | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Sequential service calls with manual rollback |
| Circuit breaker | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Try/except with timeout |
| Distributed locks | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Database row-level lock (SELECT FOR UPDATE) |
| CQRS | OVERKILL | OVERKILL | OVERKILL | OVERKILL | WHEN JUSTIFIED | Single model for read/write |
| Transactional outbox | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Direct publish after commit |
| Rate limiting | OVERKILL | OVERKILL | SIMPLE ONLY | APPROPRIATE | REQUIRED | Nginx rate limit or cloud WAF |

Rule of thumb: If you have a single server process, you do not need distributed systems patterns. Use in-process alternatives. Add distribution only when you actually have multiple instances.
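
As an example of an in-process alternative, a token bucket for a single server process needs no Redis at all. The sketch below is an assumption-laden illustration: `InProcessTokenBucket` and its `clock` parameter are hypothetical, while `capacity`, `refill_rate`, and `is_allowed` mirror the Quick Start names. It is not thread-safe; guard it with a lock if several threads share one bucket.

```python
import time
from typing import Callable


class InProcessTokenBucket:
    """Single-process token bucket; state lives in memory only."""

    def __init__(self, capacity: float, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity        # Maximum burst size
        self.refill_rate = refill_rate  # Tokens added per second
        self.clock = clock
        self.tokens = capacity          # Start with a full bucket
        self.updated_at = clock()

    def is_allowed(self, cost: float = 1.0) -> bool:
        now = self.clock()
        elapsed = now - self.updated_at
        # Refill lazily based on elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Once a second instance is deployed, each process would enforce its own independent limit, which is the point at which the Redis-backed variant becomes necessary.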

Anti-Patterns (FORBIDDEN)

# LOCKS: Never forget TTL (causes deadlocks)
await redis.set(f"lock:{name}", "1")  # WRONG - no expiry!

# LOCKS: Never release without owner check
await redis.delete(f"lock:{name}")  # WRONG - might release others' lock

# RESILIENCE: Never retry non-retryable errors
@retry(max_attempts=5, retryable_exceptions={Exception})  # Retries 401!

# RESILIENCE: Never put retry outside circuit breaker
@retry  # Would retry when circuit is open!
@circuit_breaker
async def call(): ...

# IDEMPOTENCY: Never use non-deterministic keys
key = str(uuid.uuid4())  # Different every time!

# IDEMPOTENCY: Never cache error responses
if response.status_code >= 400:
    await cache_response(key, response)  # Errors should retry!

# RATE LIMITING: Never use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared across instances

Detailed Documentation

| Resource | Description |
| --- | --- |
| scripts/ | Templates: lock implementations, circuit breaker, rate limiter |
| checklists/ | Pre-flight checklists for each pattern category |
| references/ | Deep dives: Redlock algorithm, bulkhead tiers, token bucket |
| examples/ | Complete integration examples |
  • caching - Redis caching patterns, cache as fallback
  • background-jobs - Job deduplication, async processing with retry
  • observability-monitoring - Metrics and alerting for circuit breaker state changes
  • error-handling-rfc9457 - Structured error responses for resilience failures
  • auth-patterns - API key management, authentication integration

Rules (16)

Configure edge caching with CDN invalidation, TTL, and stale-while-revalidate strategies — HIGH

Edge Caching & CDN Patterns

Cache responses at the edge using the Cache API, KV storage, and CDN headers so that repeat requests are served close to the user without a round trip to the origin.

Incorrect — caching without TTL or invalidation:

// WRONG: Cache forever, no way to update
export default {
  async fetch(request: Request) {
    const cache = caches.default;
    const cached = await cache.match(request);
    if (cached) return cached; // Stale forever!

    const response = await fetch(request);
    await cache.put(request, response.clone()); // No expiry!
    return response;
  }
};

Correct — cache-aside with TTL and stale-while-revalidate:

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const cacheKey = new Request(url.toString(), request);
    const cache = caches.default;

    // Check edge cache
    let response = await cache.match(cacheKey);
    if (response) return response;

    // Check KV (global, eventually consistent)
    const kvData = await env.CACHE_KV.get(url.pathname, 'text');
    if (kvData) {
      response = new Response(kvData, {
        headers: {
          'Content-Type': 'application/json',
          'Cache-Control': 'public, max-age=60, stale-while-revalidate=300',
          'CDN-Cache-Control': 'max-age=3600', // CDN caches longer
        },
      });
      // Populate edge cache
      await cache.put(cacheKey, response.clone());
      return response;
    }

    // Fetch from origin
    response = await fetch(request);
    const body = await response.text();

    // Store in KV with TTL
    await env.CACHE_KV.put(url.pathname, body, { expirationTtl: 3600 });

    const cachedResponse = new Response(body, {
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'public, max-age=60, stale-while-revalidate=300',
      },
    });
    await cache.put(cacheKey, cachedResponse.clone());
    return cachedResponse;
  }
};

Cache header strategy:

# Static assets (hashed filenames)
Cache-Control: public, max-age=31536000, immutable

# API responses (frequently changing)
Cache-Control: public, max-age=60, stale-while-revalidate=300

# Personalized content
Cache-Control: private, no-cache
Vary: Cookie, Authorization

Key rules:

  • Set Cache-Control with max-age and stale-while-revalidate on every shared-cacheable response
  • Use CDN-Cache-Control to set different TTLs for CDN vs browser
  • Use Vary header for content that changes by user/locale
  • Cloudflare KV is eventually consistent (read-after-write may be stale)
  • Use purge APIs for immediate invalidation of critical content
  • Never cache authenticated/personalized responses without Vary or private
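
To make the interaction between max-age and stale-while-revalidate concrete, the sketch below classifies a cached response by its age in seconds. `cache_state` and its return labels are hypothetical names for illustration; the windows follow the directive semantics, where the stale-while-revalidate period begins once max-age has elapsed.

```python
def cache_state(age: float, max_age: float, stale_while_revalidate: float) -> str:
    """Classify a cached response by age (all values in seconds)."""
    if age < max_age:
        return "fresh"  # Serve from cache, no origin contact
    if age < max_age + stale_while_revalidate:
        return "stale-revalidate"  # Serve stale copy, refresh in the background
    return "expired"  # Must fetch from origin before responding


# With 'max-age=60, stale-while-revalidate=300':
states = [cache_state(age, 60, 300) for age in (30, 200, 400)]
```

With the API header from the strategy above, a response is served directly for the first minute, served stale while being refreshed for the next five minutes, and refetched after that.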

Deploy edge workers with V8 isolate runtime constraints and fallback handling — HIGH

Edge Workers & Runtime

Deploy code to Cloudflare Workers, Vercel Edge, or Deno Deploy with correct runtime constraints and platform-specific patterns.

Incorrect — using Node.js APIs at edge:

// WRONG: Node.js APIs not available in edge runtime
import fs from 'fs';
import { createHash } from 'crypto';

export default async function handler(req: Request) {
  const data = fs.readFileSync('./config.json'); // FAILS at edge
  const hash = createHash('sha256').update(data); // FAILS at edge
  return new Response(hash.digest('hex'));
}

Correct — using Web APIs at edge:

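// Cloudflare Worker with Web Crypto API
// Assumes a SECRET binding is configured on the Worker (hypothetical name)
interface Env { SECRET: string }

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    // cf.country holds a two-letter country code, so 'EU' never matches it;
    // cf.continent is the field that can equal 'EU'
    const continent = (request as any).cf?.continent;

    // Web Crypto API (available at edge)
    const token = request.headers.get('Authorization')?.replace('Bearer ', '');
    if (token) {
      const key = await crypto.subtle.importKey(
        'raw',
        new TextEncoder().encode(env.SECRET),
        { name: 'HMAC', hash: 'SHA-256' },
        false,
        ['verify']
      );
      // Pass `key` to crypto.subtle.verify() to check the token signature (omitted here)
    }

    // Geo-based routing
    if (continent === 'EU') {
      return Response.redirect(`https://eu.example.com${url.pathname}`);
    }

    return fetch(request);
  }
};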

Vercel Edge Middleware pattern:

// middleware.ts (Next.js Edge Middleware)
import { NextResponse } from 'next/server';
import type { NextRequest } from 'next/server';

export const config = { matcher: ['/dashboard/:path*'] };

export function middleware(request: NextRequest) {
  const token = request.cookies.get('session');
  if (!token) {
    return NextResponse.redirect(new URL('/login', request.url));
  }
  // A/B testing via cookie
  const bucket = request.cookies.get('ab-bucket')?.value || 'control';
  return NextResponse.rewrite(new URL(`/${bucket}${request.nextUrl.pathname}`, request.url));
}

Key rules:

  • Edge runtimes use V8 isolates — no fs, path, child_process, native modules
  • Available: fetch, Request, Response, crypto.subtle, TextEncoder, streams
  • Keep bundles under about 1MB compressed for fast cold starts (isolates typically start in a few milliseconds on Cloudflare)
  • Use KV (Cloudflare) or Edge Config (Vercel) for distributed state
  • Use Durable Objects for strong consistency when needed

Guarantee at-least-once delivery using transactional outbox and event messaging patterns — HIGH

Event Messaging & Outbox

Reliable event publishing with transactional outbox, saga orchestration, and message queue patterns.

Incorrect — dual-write without outbox (data loss on failure):

# WRONG: If publish fails, DB has data but event is lost
async def create_order(order: Order):
    await db.insert(order)              # Step 1: succeeds
    await message_broker.publish(        # Step 2: might fail!
        "order.created", order.dict()
    )
    # If publish fails: order exists but no event was sent
    # Downstream services never know about the order

Correct — transactional outbox pattern:

# Outbox table: events written atomically with business data
async def create_order(order: Order, db: AsyncSession):
    async with db.begin():
        # Both in same transaction — atomic!
        db.add(order)
        db.add(OutboxEvent(
            aggregate_id=order.id,
            event_type="order.created",
            payload=order.dict(),
            created_at=datetime.utcnow(),
        ))

# Outbox publisher (separate process, polls for unsent events)
async def publish_outbox_events(db: AsyncSession, broker: MessageBroker):
    while True:
        async with db.begin():
            events = await db.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.created_at)
                .limit(100)
                .with_for_update(skip_locked=True)  # Concurrent workers safe
            )
            for event in events.scalars():
                await broker.publish(event.event_type, event.payload)
                event.published_at = datetime.utcnow()
        await asyncio.sleep(1)

Saga orchestration pattern:

class OrderSaga:
    steps = [
        SagaStep("reserve_inventory", compensate="release_inventory"),
        SagaStep("charge_payment", compensate="refund_payment"),
        SagaStep("ship_order", compensate="cancel_shipment"),
    ]

    async def execute(self, order_id: str):
        completed = []
        for step in self.steps:
            try:
                await step.execute(order_id)
                completed.append(step)
            except Exception:
                # Compensate in reverse order
                for s in reversed(completed):
                    await s.compensate(order_id)
                raise SagaFailed(f"Failed at {step.name}")
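
In the pattern above, `compensate` is given as a string but invoked as a method, so the snippet reads as pseudocode. A runnable variant, assuming each step simply carries two async callables, might look like this. `run_saga`, `record`, and `failing_charge` are hypothetical helpers introduced for the demonstration.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

Action = Callable[[str], Awaitable[None]]


class SagaFailed(Exception):
    """Raised after completed steps have been compensated."""


@dataclass
class SagaStep:
    name: str
    execute: Action
    compensate: Action


async def run_saga(steps: list, order_id: str) -> None:
    completed = []
    for step in steps:
        try:
            await step.execute(order_id)
            completed.append(step)
        except Exception as exc:
            # Undo only the steps that finished, newest first.
            for done in reversed(completed):
                await done.compensate(order_id)
            raise SagaFailed(f"Failed at {step.name}") from exc


log = []


def record(label: str) -> Action:
    """Build an action that records its invocation (stand-in for a service call)."""
    async def action(order_id: str) -> None:
        log.append(f"{label}:{order_id}")
    return action


async def failing_charge(order_id: str) -> None:
    raise RuntimeError("card declined")


steps = [
    SagaStep("reserve_inventory", record("reserve"), record("release")),
    SagaStep("charge_payment", failing_charge, record("refund")),
    SagaStep("ship_order", record("ship"), record("cancel_shipment")),
]

try:
    asyncio.run(run_saga(steps, "order-123"))
    saga_error = None
except SagaFailed as exc:
    saga_error = str(exc)
```

The failed step itself is not compensated, only the steps that completed before it. A production saga would also persist its progress so compensation can resume after a crash, which this sketch leaves out.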

Idempotent consumer (prevents duplicate processing):

async def handle_event(event: Event, db: AsyncSession):
    try:
        async with db.begin():
            # Claim the event first; a unique constraint on event_id rejects duplicates
            db.add(ProcessedEvent(event_id=event.id, processed_at=datetime.utcnow()))
            await db.flush()  # Raises sqlalchemy.exc.IntegrityError if already claimed

            # Process the event in the same transaction as the claim
            await process_order(event.payload)
    except IntegrityError:
        return  # Already processed, skip

Key rules:

  • Use transactional outbox to atomically save data and events
  • All saga steps must have compensating actions
  • Every message consumer must be idempotent (use event ID deduplication)
  • Use SKIP LOCKED for concurrent outbox workers
  • Kafka for high-throughput streaming, RabbitMQ for routing, Redis Streams for simplicity
  • Use dead letter queues (DLQ) for messages that fail after max retries

Implement event sourcing with CQRS for full audit trails and temporal queries — HIGH

Event Sourcing & CQRS

Store state as immutable events and separate read/write models for scalable, auditable systems.

Incorrect — mutable state without event history:

# WRONG: Direct mutation loses history
class Account:
    def __init__(self, balance: float = 0):
        self.balance = balance

    def deposit(self, amount: float):
        self.balance += amount  # No record of what happened!

    def withdraw(self, amount: float):
        self.balance -= amount  # Can't audit, can't replay

Correct — event-sourced aggregate with CQRS:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class DomainEvent:
    aggregate_id: str
    version: int
    data: dict[str, Any]

class Account:
    def __init__(self, account_id: str):
        self.id = account_id  # Used as aggregate_id when raising events
        self._changes: list[DomainEvent] = []
        self._version = 0
        self.balance = 0.0

    def deposit(self, amount: float):
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._raise_event("MoneyDeposited", {"amount": amount})

    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._raise_event("MoneyWithdrawn", {"amount": amount})

    def _raise_event(self, event_type: str, data: dict):
        event = DomainEvent(
            aggregate_id=self.id,
            version=self._version + 1,
            data={"type": event_type, **data},
        )
        self._apply(event)
        self._changes.append(event)

    def _apply(self, event: DomainEvent):
        match event.data["type"]:
            case "MoneyDeposited":
                self.balance += event.data["amount"]
            case "MoneyWithdrawn":
                self.balance -= event.data["amount"]
        self._version = event.version

# Event store with optimistic concurrency
class EventStore:
    async def save(self, aggregate_id: str, events: list[DomainEvent], expected_version: int):
        async with self.db.transaction():
            current = await self.db.fetchval(
                # COALESCE so a brand-new aggregate reports version 0 instead of NULL
                "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
                aggregate_id,
            )
            if current != expected_version:
                raise ConcurrencyError(f"Expected {expected_version}, got {current}")
            for event in events:
                await self.db.execute(
                    "INSERT INTO events (aggregate_id, version, data) VALUES ($1, $2, $3)",
                    aggregate_id, event.version, event.data,
                )

# CQRS: Separate read model projection
class BalanceProjection:
    async def handle(self, event: DomainEvent):
        match event.data["type"]:
            case "MoneyDeposited":
                await self.db.execute(
                    "UPDATE account_balances SET balance = balance + $1 WHERE id = $2",
                    event.data["amount"], event.aggregate_id,
                )
            case "MoneyWithdrawn":
                await self.db.execute(
                    "UPDATE account_balances SET balance = balance - $1 WHERE id = $2",
                    event.data["amount"], event.aggregate_id,
                )

Key rules:

  • Events are immutable and named in past tense (OrderPlaced, not PlaceOrder)
  • Use optimistic concurrency with version checks to prevent conflicts
  • CQRS read models are eventually consistent — design UX accordingly
  • Snapshot every N events (e.g., 100) to avoid replaying long event streams
  • Never delete events — use compensating events instead
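
Replay and snapshotting can be sketched as a fold over the event stream. In this illustration a snapshot is just a saved (balance, version) pair, and events at or below the snapshot version are skipped. `replay_balance` and its snapshot parameters are hypothetical names; the event shape follows the `DomainEvent` dataclass used earlier in this rule.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class DomainEvent:
    aggregate_id: str
    version: int
    data: dict[str, Any]


def replay_balance(events: list[DomainEvent], snapshot_balance: float = 0.0,
                   snapshot_version: int = 0) -> tuple[float, int]:
    """Rebuild (balance, version) from a snapshot plus any later events."""
    balance, version = snapshot_balance, snapshot_version
    for event in sorted(events, key=lambda e: e.version):
        if event.version <= version:
            continue  # Already folded into the snapshot
        if event.data["type"] == "MoneyDeposited":
            balance += event.data["amount"]
        elif event.data["type"] == "MoneyWithdrawn":
            balance -= event.data["amount"]
        version = event.version
    return balance, version


events = [
    DomainEvent("acc-1", 1, {"type": "MoneyDeposited", "amount": 100.0}),
    DomainEvent("acc-1", 2, {"type": "MoneyWithdrawn", "amount": 30.0}),
    DomainEvent("acc-1", 3, {"type": "MoneyDeposited", "amount": 5.0}),
]

from_scratch = replay_balance(events)
from_snapshot = replay_balance(events, snapshot_balance=70.0, snapshot_version=2)
```

Both calls arrive at the same state. The snapshot only shortens the replay, and in practice the store would load just the events after the snapshot version.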

Store idempotency keys in PostgreSQL with ACID guarantees for safe retries — HIGH

Database-Backed Idempotency

Schema

CREATE TABLE idempotency_records (
    idempotency_key VARCHAR(64) PRIMARY KEY,
    endpoint VARCHAR(256) NOT NULL,
    request_hash VARCHAR(64) NOT NULL,
    response_body TEXT,
    response_status INTEGER DEFAULT 200,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    expires_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX ix_idempotency_expires ON idempotency_records (expires_at);
CREATE INDEX ix_idempotency_endpoint_key ON idempotency_records (endpoint, idempotency_key);

Idempotent Execute

from datetime import UTC, datetime, timedelta
import json
from typing import Any

from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

async def idempotent_execute(
    db: AsyncSession,
    idempotency_key: str,
    endpoint: str,
    operation,
    ttl_hours: int = 24,
) -> tuple[Any, int, bool]:
    """Execute operation idempotently. Returns (response, status, was_replayed)."""
    # Check for existing
    existing = await db.get(ProcessedRequest, idempotency_key)
    if existing and existing.expires_at > datetime.now(UTC):
        return json.loads(existing.response_body), existing.status_code, True

    # Execute operation
    result, status_code = await operation()

    # Store result (upsert to handle races)
    stmt = insert(ProcessedRequest).values(
        idempotency_key=idempotency_key,
        endpoint=endpoint,
        status_code=status_code,
        response_body=json.dumps(result),
        expires_at=datetime.now(UTC) + timedelta(hours=ttl_hours),
    ).on_conflict_do_nothing()

    await db.execute(stmt)
    return result, status_code, False

Request Body Validation

Detect misuse: same idempotency key with different request body.

import hashlib
import json

from fastapi import HTTPException
from sqlalchemy import text

class IdempotencyService:
    def _hash_request(self, body: dict) -> str:
        return hashlib.sha256(
            json.dumps(body, sort_keys=True, default=str).encode()
        ).hexdigest()

    async def check_idempotency(self, key: str, endpoint: str, body: dict):
        row = await self.db.execute(
            text("SELECT request_hash, response_body, response_status "
                 "FROM idempotency_records "
                 "WHERE idempotency_key = :key AND endpoint = :endpoint"),
            {"key": key, "endpoint": endpoint},
        )
        existing = row.fetchone()
        if not existing:
            return None

        if existing.request_hash != self._hash_request(body):
            raise HTTPException(
                status_code=422,
                detail="Idempotency key reused with different request body",
            )
        return {"body": json.loads(existing.response_body),
                "status": existing.response_status, "replayed": True}

TTL Cleanup Job

import asyncio

from sqlalchemy import text

async def cleanup_expired_records(db: AsyncSession, batch_size: int = 1000) -> int:
    """Delete expired idempotency records. Run daily via scheduler."""
    total_deleted = 0
    while True:
        result = await db.execute(
            text("""
                DELETE FROM idempotency_records
                WHERE idempotency_key IN (
                    SELECT idempotency_key FROM idempotency_records
                    WHERE expires_at < NOW()
                    LIMIT :batch_size
                )
            """),
            {"batch_size": batch_size},
        )
        await db.commit()
        deleted = result.rowcount
        total_deleted += deleted
        if deleted < batch_size:
            break
        await asyncio.sleep(0.1)  # Reduce DB load
    return total_deleted

FastAPI Endpoint Usage

@app.post("/api/orders", status_code=201)
async def create_order(
    order: OrderCreate,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
    db: AsyncSession = Depends(get_db),
):
    async def process():
        new_order = Order(**order.model_dump())
        db.add(new_order)
        await db.commit()
        return {"order_id": str(new_order.id)}, 201

    response, status, replayed = await idempotent_execute(
        db=db,
        idempotency_key=idempotency_key,
        endpoint="/api/orders",
        operation=process,
    )
    return JSONResponse(
        content=response, status_code=status,
        headers={"Idempotent-Replayed": "true"} if replayed else {},
    )

Key Decisions

| Aspect | Recommendation | Rationale |
| --- | --- | --- |
| Storage | PostgreSQL | ACID guarantees, no extra infra |
| Key format | SHA-256 hash, 32-64 chars | Deterministic, compact |
| TTL | 24-72 hours | Balance storage vs replay window |
| Race handling | ON CONFLICT DO NOTHING | First writer wins |
| Response caching | Status 2xx only | Don't cache errors |
| Cleanup | Batch delete, daily job | Avoid long locks |

Incorrect — Check-then-act pattern has race condition between check and insert:

const existing = await db.query("SELECT * FROM idempotency_records WHERE key = $1", [key]);
if (!existing) {
  await processPayment(data);  // Race! Two processes both see "not existing"
  await db.query("INSERT INTO idempotency_records (key, result) VALUES ($1, $2)", [key, result]);
}

Correct — Insert-first pattern atomically claims idempotency key:

try {
  await db.query("INSERT INTO idempotency_records (key, status) VALUES ($1, 'processing')", [key]);
  const result = await processPayment(data);
  await db.query("UPDATE idempotency_records SET result = $1 WHERE key = $2", [result, key]);
} catch (err) {
  // Assumes a node-postgres style error object: SQLSTATE 23505 is unique_violation
  if (err.code !== '23505') throw err;  // Re-throw anything that is not a duplicate key
  return await db.query("SELECT result FROM idempotency_records WHERE key = $1", [key]);
}

Deduplicate requests using dual-layer Redis and database exactly-once processing — HIGH

Event Consumer Deduplication

Process each event effectively once on top of at-least-once delivery, using dual-layer dedup: Redis (fast) + database (durable).

Dual-Layer Dedup Pattern

from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

class IdempotentConsumer:
    """Process events exactly once using idempotency keys."""

    def __init__(self, db: AsyncSession, redis: redis.Redis):
        self.db = db
        self.redis = redis

    async def process(self, event: dict, handler) -> tuple[Any, bool]:
        """Process event idempotently. Returns (result, was_duplicate)."""
        idempotency_key = event.get("idempotency_key")
        if not idempotency_key:
            return await handler(event), False

        # Fast path: check Redis cache
        cache_key = f"processed:{idempotency_key}"
        if await self.redis.exists(cache_key):
            return None, True

        # Slow path: check database
        existing = await self.db.execute(
            select(ProcessedEvent)
            .where(ProcessedEvent.idempotency_key == idempotency_key)
        )
        if existing.scalar_one_or_none():
            await self.redis.setex(cache_key, 86400, "1")  # Backfill cache
            return None, True

        # Process with database lock to prevent races
        try:
            async with self.db.begin_nested():
                self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
                await self.db.flush()
                result = await handler(event)

            await self.redis.setex(cache_key, 86400, "1")
            return result, False

        except IntegrityError:
            return None, True  # Another process claimed it

Kafka Consumer Integration

async def consume_orders(processor: IdempotentEventProcessor):
    consumer = AIOKafkaConsumer("orders", bootstrap_servers="localhost:9092")
    await consumer.start()

    try:
        async for msg in consumer:
            event = json.loads(msg.value)
            result, was_duplicate = await processor.process_event(
                event_id=event["event_id"],
                event_type="order.created",
                handler=handle_order_created,
                order_data=event["data"],
            )
            if was_duplicate:
                logger.info(f"Skipped duplicate: {event['event_id']}")
    finally:
        await consumer.stop()

Database-Tracked Event Processing

class IdempotentEventProcessor:
    """Track processed events in database for exactly-once semantics."""

    async def process_event(self, event_id: str, event_type: str,
                           handler, *args, **kwargs) -> tuple:
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                try:
                    await conn.execute(
                        "INSERT INTO processed_events (event_id, event_type) VALUES ($1, $2)",
                        event_id, event_type,
                    )
                except UniqueViolationError:
                    existing = await conn.fetchrow(
                        "SELECT result FROM processed_events WHERE event_id = $1",
                        event_id,
                    )
                    return existing["result"], True

                result = await handler(*args, **kwargs)
                await conn.execute(
                    "UPDATE processed_events SET result = $1 WHERE event_id = $2",
                    json.dumps(result) if result else None, event_id,
                )
                return result, False

Key Design Decisions

| Aspect | Recommendation | Rationale |
| --- | --- | --- |
| Fast layer | Redis with TTL | Sub-millisecond lookups for hot path |
| Durable layer | Database unique constraint | Survives Redis restart, handles races |
| Lock strategy | INSERT then process | Claim key before processing to prevent races |
| Cache backfill | On DB hit, write to Redis | Speeds up subsequent duplicate checks |
| TTL | 24-72 hours | Balance storage vs replay window |
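
The "INSERT then process" strategy can be tried end to end with the standard library. This sketch uses an in-memory SQLite database purely as a stand-in for PostgreSQL; the primary key plays the role of the unique constraint, and `process_once` and the `handled` list are hypothetical names for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")

handled = []


def process_once(event_id: str) -> bool:
    """Return True if this call handled the event, False if it was a duplicate."""
    try:
        with conn:  # Commits on success, rolls back if anything inside raises
            # Claim the key first; a duplicate violates the primary key.
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)", (event_id,)
            )
            handled.append(event_id)  # Stand-in for the real handler
        return True
    except sqlite3.IntegrityError:
        return False


first = process_once("evt-1")
second = process_once("evt-1")
```

Because the claim and the handler run in one transaction, a handler failure rolls the claim back and the event can be retried later. That guarantee only covers side effects inside the same database.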

Common Mistakes

# NEVER check-then-act without locking
async def bad_process(key):
    if not await exists(key):  # Race condition!
        await process()
        await mark_processed(key)

# CORRECT: Insert first (claims the key atomically)
async def good_process(key):
    try:
        await insert_processed(key)  # Atomic claim
        await process()
    except IntegrityError:
        pass  # Already processed

Incorrect — Redis-only deduplication loses state after restart:

async def process_event(event_id: str):
    if await redis.exists(f"processed:{event_id}"):
        return  # Already processed
    await handle_event(event_id)
    await redis.set(f"processed:{event_id}", "1", ex=86400)
# Redis restart = all events reprocessed!

Correct — Dual-layer dedup: Redis (fast) + database (durable):

async def process_event(event_id: str):
    # Fast path: Redis check
    if await redis.exists(f"processed:{event_id}"):
        return
    # Durable check: Database with unique constraint
    try:
        await db.execute("INSERT INTO processed_events (event_id) VALUES ($1)", event_id)
        await handle_event(event_id)
        await redis.set(f"processed:{event_id}", "1", ex=86400)
    except UniqueViolationError:
        pass  # Already processed

Generate deterministic idempotency keys using Stripe-style headers for safe retries — HIGH

Idempotency Key Generation & Stripe-Style Header

Deterministic Key Generation

import hashlib
import json
from typing import Any

def generate_idempotency_key(
    *,
    entity_id: str,
    action: str,
    params: dict[str, Any] | None = None,
) -> str:
    """Generate deterministic idempotency key.

    Same input always produces the same key.
    """
    content = f"{entity_id}:{action}"
    if params:
        content += f":{json.dumps(params, sort_keys=True)}"
    return hashlib.sha256(content.encode()).hexdigest()[:32]

# Examples
key = generate_idempotency_key(
    entity_id="order-123",
    action="create",
    params={"amount": 100, "currency": "USD"},
)

Stripe-Style Idempotency Header

Clients send Idempotency-Key header with POST requests. Server caches successful responses and replays them on duplicate requests.

Client Request (Idempotency-Key: abc-123)
     |
     v
Check cache (Redis) --> Exists? --> Return cached response
     |                                (Idempotent-Replayed: true)
     NO
     |
     v
Acquire lock --> Process request --> Cache response --> Return

FastAPI Middleware

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import redis.asyncio as redis
import json

class IdempotencyMiddleware(BaseHTTPMiddleware):
    """Handle Idempotency-Key header for POST/PUT/PATCH."""

    def __init__(self, app, redis_client: redis.Redis, ttl: int = 86400):
        super().__init__(app)
        self.redis = redis_client
        self.ttl = ttl

    async def dispatch(self, request: Request, call_next):
        if request.method not in ("POST", "PUT", "PATCH"):
            return await call_next(request)

        idempotency_key = request.headers.get("Idempotency-Key")
        if not idempotency_key:
            return await call_next(request)

        cache_key = f"idem:{request.url.path}:{idempotency_key}"

        # Check for cached response
        cached = await self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            return Response(
                content=data["body"],
                status_code=data["status"],
                media_type="application/json",
                headers={"X-Idempotent-Replayed": "true"},
            )

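        # Lock during processing so concurrent duplicates are not executed twice
        # (the 60s lock TTL is an assumed value; size it to the slowest handler)
        lock_key = f"{cache_key}:lock"
        if not await self.redis.set(lock_key, "1", nx=True, ex=60):
            return Response(status_code=409, media_type="application/json",
                            content=json.dumps({"detail": "Request already in progress"}))

        try:
            # Process request
            response = await call_next(request)

            # Cache successful responses only
            if 200 <= response.status_code < 300:
                body = b"".join([chunk async for chunk in response.body_iterator])
                await self.redis.setex(
                    cache_key, self.ttl,
                    json.dumps({"body": body.decode(), "status": response.status_code}),
                )
                return Response(content=body, status_code=response.status_code,
                                media_type=response.media_type)

            return response
        finally:
            await self.redis.delete(lock_key)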

Key Principles

  1. Keys are deterministic -- same input = same key (never generate a fresh uuid4 for each attempt)
  2. Keys are scoped to endpoint -- same key on different endpoints = different operations
  3. 24-hour window -- keys expire after 24 hours
  4. Only cache success -- errors (4xx/5xx) allow retry
  5. Lock during processing -- prevents concurrent duplicates
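
The principles above can be exercised without Redis. The following is a minimal in-memory sketch, not the middleware itself: `InMemoryIdempotencyStore` and `charge` are hypothetical names, an `asyncio.Lock` per key stands in for the Redis lock, and the 24-hour expiry (principle 3) is left out for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for Redis: one lock and one cached result per key."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}
        self._locks: dict[str, asyncio.Lock] = {}

    async def execute(self, endpoint: str, key: str,
                      operation: Callable[[], Awaitable[Any]]) -> tuple[Any, bool]:
        """Run `operation` at most once per (endpoint, key); return (result, replayed)."""
        scoped = f"{endpoint}:{key}"                # principle 2: scope to endpoint
        lock = self._locks.setdefault(scoped, asyncio.Lock())
        async with lock:                            # principle 5: lock during processing
            if scoped in self._results:
                return self._results[scoped], True
            result = await operation()              # if this raises, nothing is cached (principle 4)
            self._results[scoped] = result
            return result, False


async def demo() -> tuple[list, int]:
    store = InMemoryIdempotencyStore()
    calls = 0

    async def charge() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)                   # simulate slow processing
        return "charged"

    # Two concurrent requests carrying the same key
    outcomes = await asyncio.gather(
        store.execute("/payments", "abc-123", charge),
        store.execute("/payments", "abc-123", charge),
    )
    return list(outcomes), calls
```

Running `asyncio.run(demo())` executes `charge` once; the second caller waits on the lock and then receives the cached result flagged as replayed.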

Common Mistakes

# NEVER use non-deterministic keys
key = str(uuid.uuid4())  # Different every time!

# NEVER include timestamps in keys
key = f"{event.id}:{datetime.now()}"  # Timestamp varies!

# NEVER skip idempotency for financial operations
@router.post("/payments")
async def create_payment(data):
    return await process_payment(data)  # No idempotency!

Incorrect — Random UUID keys make retry detection impossible:

# Client generates new key on each retry
key = str(uuid.uuid4())  # New key every time!
await post("/api/orders", headers={"Idempotency-Key": key}, data=order)
# Retry creates duplicate order

Correct — Deterministic keys ensure retries are detected and deduplicated:

# Client generates same key for same operation
key = hashlib.sha256(f"{order_id}:create:{json.dumps(order_data, sort_keys=True)}".encode()).hexdigest()
await post("/api/orders", headers={"Idempotency-Key": key}, data=order)
# Retry returns cached response

Validate lock ownership with fencing tokens and TTL to prevent data corruption — CRITICAL

Lock Safety: Fencing Tokens, TTL & Heartbeat

Owner Validation (Fencing)

Every lock operation MUST validate the owner before acting. Without this, a slow process whose lock expired can corrupt data when a new owner holds the lock.

# WRONG: No owner check
await redis.delete(f"lock:{name}")  # Might release someone else's lock!

# CORRECT: Atomic owner check via Lua
RELEASE = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""

TTL Management

Lock TTL must be set to prevent deadlocks from crashed processes. Rule of thumb: TTL = 2-3x expected operation duration.

| Operation Duration | Recommended TTL | Rationale |
|---|---|---|
| < 1 second | 5 seconds | Fast operations with margin |
| 1-10 seconds | 30 seconds | Standard processing |
| 10-60 seconds | 3 minutes | Long operations, use heartbeat |
| > 60 seconds | 5 minutes + heartbeat | Must extend during processing |

Heartbeat Extension

For long-running tasks, extend the lock TTL periodically to prevent expiry mid-operation.

import asyncio

async def long_running_task(task_id: str, redis_client):
    # RedisLock takes its TTL in milliseconds (ttl_ms)
    lock = RedisLock(redis_client, f"task:{task_id}", ttl_ms=30_000)

    async with lock:
        # Background heartbeat extends the lock every 10s (one third of the TTL)
        async def heartbeat():
            while True:
                await asyncio.sleep(10)
                if not await lock.extend(30_000):
                    break  # Lock lost: the key expired or another owner holds it

        heartbeat_task = asyncio.create_task(heartbeat())
        try:
            await do_long_work()
        finally:
            heartbeat_task.cancel()

Lock Retry with Exponential Backoff

import asyncio
import random
from functools import wraps

def with_lock(lock_name: str, ttl_s: int = 30, retries: int = 3):
    """Decorator to acquire lock before function execution."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, redis_client=None, **kwargs):
            for attempt in range(retries):
                lock = RedisLock(redis_client, lock_name, ttl_ms=ttl_s * 1000)
                if await lock.acquire():
                    try:
                        return await func(*args, **kwargs)
                    finally:
                        await lock.release()
                if attempt < retries - 1:
                    # Exponential backoff with full jitter; no sleep after the last attempt
                    await asyncio.sleep(random.uniform(0, 0.1 * (2 ** attempt)))
            raise LockAcquisitionError(f"Failed after {retries} attempts")
        return wrapper
    return decorator

Lock Ordering (Deadlock Prevention)

When acquiring multiple locks, always acquire in a consistent order.

async def transfer_funds(session, from_account: int, to_account: int, amount):
    # Always lock in sorted order to prevent deadlocks
    accounts = sorted([from_account, to_account])
    for account_id in accounts:
        await session.execute(
            text("SELECT pg_advisory_xact_lock(:ns, :id)"),
            {"ns": NAMESPACE_ACCOUNT, "id": account_id},
        )
    await debit_account(session, from_account, amount)
    await credit_account(session, to_account, amount)
    await session.commit()
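
Why sorted acquisition prevents deadlock can be shown without a database. This is a minimal sketch in which `asyncio.Lock` objects stand in for the advisory locks; `transfer` and `demo` are illustrative names, and the two accounts are assumed to be distinct.

```python
import asyncio


async def transfer(locks, balances, src: int, dst: int, amount: int) -> None:
    # Same acquisition order regardless of transfer direction (assumes src != dst)
    first, second = sorted((src, dst))
    async with locks[first]:
        await asyncio.sleep(0)  # yield so the opposite transfer can interleave
        async with locks[second]:
            balances[src] -= amount
            balances[dst] += amount


async def demo() -> dict[int, int]:
    balances = {1: 100, 2: 100}
    locks = {1: asyncio.Lock(), 2: asyncio.Lock()}
    # Opposite directions: acquiring in (src, dst) order instead could deadlock here
    await asyncio.wait_for(
        asyncio.gather(
            transfer(locks, balances, 1, 2, 30),
            transfer(locks, balances, 2, 1, 10),
        ),
        timeout=1.0,
    )
    return balances
```

Both transfers finish well inside the one-second timeout because each task requests lock 1 before lock 2, so neither can hold one lock while waiting for the other in reverse.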

Checklist

  • Owner ID stored with lock (UUIDv7 recommended)
  • Atomic release validates owner via Lua script
  • TTL always set (prevents permanent deadlocks)
  • Heartbeat for operations > 30 seconds
  • Lock ordering for multiple locks
  • Retry with exponential backoff + jitter
  • Metrics: acquisition time, hold duration, failures

Incorrect — Releasing lock without owner validation can delete another process's lock:

await redis.delete(f"lock:{resource_id}")
# If our lock expired and another process acquired it,
# we just deleted their lock!

Correct — Atomic owner validation via Lua ensures only owner can release:

RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
  return redis.call('del', KEYS[1])
end
return 0
"""
result = await redis.eval(RELEASE_SCRIPT, 1, f"lock:{resource_id}", owner_id)
# Only deletes if owner_id matches
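
Owner validation guards the lock key itself. A fencing token carries the same idea to the protected resource: every acquisition gets a strictly increasing number, and the resource rejects writes that arrive with an older number. The sketch below is in-memory and hypothetical throughout (`FencedStore`, `issue_token`); in a Redis deployment the counter could plausibly be an `INCR` on a dedicated key.

```python
import itertools


class StaleTokenError(Exception):
    """Raised when a write carries a token older than one already seen."""


class FencedStore:
    """Hypothetical resource that accepts a write only if its fencing token
    is at least as new as the newest token it has seen."""

    def __init__(self) -> None:
        self._highest_token = 0
        self.value = None

    def write(self, token: int, value: str) -> None:
        if token < self._highest_token:
            raise StaleTokenError(f"token {token} < {self._highest_token}")
        self._highest_token = token
        self.value = value


# Stand-in for an atomic counter bumped on every lock acquisition
_counter = itertools.count(1)


def issue_token() -> int:
    return next(_counter)


store = FencedStore()
slow_holder = issue_token()   # 1: acquires the lock, then stalls past the TTL
new_holder = issue_token()    # 2: acquires the lock after it expired
store.write(new_holder, "from new holder")

try:
    store.write(slow_holder, "from stale holder")
    stale_write_rejected = False
except StaleTokenError:
    stale_write_rejected = True
```

The stalled holder still believes it owns the lock, but its write is refused because the store has already seen token 2.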

Use PostgreSQL advisory locks for session and transaction-level distributed locking — CRITICAL

PostgreSQL Advisory Locks

No extra infrastructure needed -- uses existing PostgreSQL with ACID guarantees.

Lock Types

| Type | Scope | Release | Use Case |
|---|---|---|---|
| Session-level | Connection | Explicit or disconnect | Long-running jobs, singletons |
| Transaction-level | Transaction | Commit/rollback | Data consistency within transactions |

Session-Level Lock

from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class PostgresAdvisoryLock:
    """PostgreSQL advisory lock (session-level)."""

    def __init__(self, session: AsyncSession, lock_id: int):
        self._session = session
        self._lock_id = lock_id
        self._acquired = False

    async def acquire(self, blocking: bool = True) -> bool:
        if blocking:
            await self._session.execute(
                text("SELECT pg_advisory_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            self._acquired = True
            return True
        else:
            result = await self._session.execute(
                text("SELECT pg_try_advisory_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            self._acquired = result.scalar()
            return self._acquired

    async def release(self) -> bool:
        if not self._acquired:
            return False
        result = await self._session.execute(
            text("SELECT pg_advisory_unlock(:lock_id)"),
            {"lock_id": self._lock_id},
        )
        released = result.scalar()
        if released:
            self._acquired = False
        return released

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *_):
        await self.release()

Transaction-Level Lock

class PostgresTransactionLock:
    """Auto-released on commit/rollback."""

    async def acquire(self, session: AsyncSession, lock_id: int, blocking: bool = True) -> bool:
        if blocking:
            await session.execute(
                text("SELECT pg_advisory_xact_lock(:lock_id)"),
                {"lock_id": lock_id},
            )
            return True
        else:
            result = await session.execute(
                text("SELECT pg_try_advisory_xact_lock(:lock_id)"),
                {"lock_id": lock_id},
            )
            return result.scalar()

Lock ID Strategy

import hashlib

def string_to_lock_id(name: str) -> int:
    """Convert any string to a PostgreSQL bigint lock ID."""
    hash_bytes = hashlib.md5(name.encode()).digest()[:8]
    return int.from_bytes(hash_bytes, byteorder="big", signed=True)

# Usage
lock_id = string_to_lock_id("payment:order-123")

# Two-key lock for namespacing
NAMESPACE_PAYMENT = 1
await session.execute(
    text("SELECT pg_advisory_lock(:ns, :id)"),
    {"ns": NAMESPACE_PAYMENT, "id": 12345},
)
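
The two-key form takes two signed 32-bit integers rather than one bigint, so a string name has to be hashed into a smaller range. A possible helper is sketched below; `string_to_int32` and `namespaced_lock_keys` are hypothetical names, and the 32-bit space makes collisions more likely than with the 64-bit `string_to_lock_id`.

```python
import hashlib

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def string_to_int32(name: str) -> int:
    """Hash a string into the signed 32-bit range used by the two-key form."""
    digest = hashlib.sha256(name.encode()).digest()[:4]
    return int.from_bytes(digest, byteorder="big", signed=True)


def namespaced_lock_keys(namespace: int, name: str) -> tuple[int, int]:
    """Return (namespace, key) suitable for pg_advisory_lock(:ns, :id)."""
    if not INT32_MIN <= namespace <= INT32_MAX:
        raise ValueError("namespace must fit in a signed 32-bit integer")
    return namespace, string_to_int32(name)
```

The returned pair can be bound to the `:ns` and `:id` parameters of the query shown above.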

Practical Example: Singleton Job

async def run_scheduled_job(session: AsyncSession):
    lock_id = string_to_lock_id("daily-report-job")
    lock = PostgresAdvisoryLock(session, lock_id)

    if not await lock.acquire(blocking=False):
        print("Job already running on another instance")
        return

    try:
        await generate_daily_report()
    finally:
        await lock.release()

Monitoring

SELECT l.pid, l.objid as lock_id, l.granted,
       a.application_name, a.client_addr,
       now() - a.state_change as duration
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory';

Incorrect — Session-level lock without cleanup stays held if an exception skips the unlock:

await session.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock_id})
await process_critical_section()
# If this raises, the lock is never released and stays held until the
# (possibly pooled) connection closes, blocking every other worker

Correct — Transaction-level lock auto-releases on commit/rollback:

async with session.begin():
    await session.execute(text("SELECT pg_advisory_xact_lock(:id)"), {"id": lock_id})
    await process_critical_section()
    # Auto-released on commit/rollback/disconnect

Implement distributed locking with Redis Lua scripts and multi-node Redlock algorithm — CRITICAL

Redis & Redlock Distributed Locks

Single-Node Redis Lock (Lua Script)

from uuid_utils import uuid7
import redis.asyncio as redis

class RedisLock:
    """Redis lock with Lua scripts for atomicity."""

    ACQUIRE = """
    if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
        return 1
    end
    return 0
    """

    RELEASE = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    end
    return 0
    """

    EXTEND = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('pexpire', KEYS[1], ARGV[2])
    end
    return 0
    """

    def __init__(self, client: redis.Redis, name: str, ttl_ms: int = 30000):
        self._client = client
        self._name = f"lock:{name}"
        self._owner = str(uuid7())
        self._ttl = ttl_ms

    async def acquire(self) -> bool:
        return await self._client.eval(
            self.ACQUIRE, 1, self._name, self._owner, self._ttl
        ) == 1

    async def release(self) -> bool:
        return await self._client.eval(
            self.RELEASE, 1, self._name, self._owner
        ) == 1

    async def extend(self, ttl_ms: int | None = None) -> bool:
        return await self._client.eval(
            self.EXTEND, 1, self._name, self._owner, ttl_ms or self._ttl
        ) == 1

    async def __aenter__(self):
        if not await self.acquire():
            raise LockError(f"Failed to acquire {self._name}")
        return self

    async def __aexit__(self, *_):
        await self.release()

Redlock Algorithm (Multi-Node HA)

Provides fault-tolerant locking across N Redis instances (recommend N=5).

1. Get current time (T1)
2. Try to acquire lock on N nodes sequentially
3. Get current time (T2)
4. Lock valid if:
   - Acquired on a majority of nodes (floor(N/2) + 1)
   - Elapsed (T2-T1) < TTL - clock_drift
5. If valid: use lock with remaining TTL
6. If invalid: release on all nodes

from dataclasses import dataclass
from datetime import timedelta
import time

@dataclass
class RedlockResult:
    acquired: bool
    validity_time_ms: int = 0
    resource: str = ""
    owner_id: str = ""

class Redlock:
    """Distributed lock across multiple Redis instances."""

    def __init__(self, clients: list[redis.Redis], ttl: timedelta = timedelta(seconds=30)):
        if len(clients) < 3:
            raise ValueError("Redlock requires at least 3 Redis instances")
        self._clients = clients
        self._ttl_ms = int(ttl.total_seconds() * 1000)
        self._quorum = len(clients) // 2 + 1

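    async def acquire(self, resource: str, retry_count: int = 3) -> RedlockResult:
        owner_id = str(uuid7())
        for attempt in range(retry_count):
            start = time.monotonic()
            # Count with an explicit loop: `await` inside a generator passed
            # to sum() builds an async generator and raises TypeError
            acquired_count = 0
            for client in self._clients:
                if await self._try_lock(client, resource, owner_id):
                    acquired_count += 1
            elapsed_ms = int((time.monotonic() - start) * 1000)
            drift_ms = int(self._ttl_ms * 0.01) + 2
            validity = self._ttl_ms - elapsed_ms - drift_ms

            if acquired_count >= self._quorum and validity > 0:
                return RedlockResult(True, validity, resource, owner_id)

            await self._release_all(resource, owner_id)
        return RedlockResult(acquired=False, resource=resource)

    # Minimal helper sketches; an unreachable node counts as a failed acquisition
    async def _try_lock(self, client: redis.Redis, resource: str, owner_id: str) -> bool:
        try:
            return bool(await client.set(f"lock:{resource}", owner_id, nx=True, px=self._ttl_ms))
        except redis.RedisError:
            return False

    async def _release_all(self, resource: str, owner_id: str) -> None:
        for client in self._clients:
            try:
                await client.eval(RedisLock.RELEASE, 1, f"lock:{resource}", owner_id)
            except redis.RedisError:
                pass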
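
Step 4 of the algorithm can be isolated as a pure function, which makes the arithmetic easy to check. This is a sketch that reuses the drift allowance from the class above (1% of the TTL plus 2 ms); the function name `redlock_validity_ms` is illustrative.

```python
def redlock_validity_ms(ttl_ms: int, elapsed_ms: int, acquired: int, nodes: int) -> int:
    """Return the remaining validity in ms, or 0 if the lock must be treated as not acquired."""
    quorum = nodes // 2 + 1
    drift_ms = int(ttl_ms * 0.01) + 2
    validity = ttl_ms - elapsed_ms - drift_ms
    if acquired >= quorum and validity > 0:
        return validity
    return 0
```

With a 30 s TTL, 120 ms spent acquiring, and 3 of 5 nodes locked, the result is 30000 - 120 - 302 = 29578 ms. With only 2 of 5 nodes, or when acquisition consumed nearly the whole TTL, the function returns 0 and the caller should release on all nodes.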

When to Use

| Single-Node Redis Lock | Redlock |
|---|---|
| Development/testing | Production with HA |
| Non-critical operations | Critical operations (payments) |
| Single datacenter | Multi-datacenter |
| Cost-sensitive | Reliability-critical |

Common Mistakes

  • Forgetting TTL (causes permanent deadlocks)
  • Releasing without owner check (releases someone else's lock)
  • Using single Redis for critical operations (SPOF)
  • Holding locks across slow external calls without heartbeat

Incorrect — SET without NX allows multiple processes to "acquire" same lock:

await redis.set(f"lock:{name}", owner_id, ex=30)
# Two processes can both succeed!

Correct — SET NX atomically acquires lock only if key doesn't exist:

acquired = await redis.set(f"lock:{name}", owner_id, nx=True, ex=30)
# Only one process succeeds, others get False

Protect APIs with distributed rate limiting using SlowAPI, Redis, and tiered limits — HIGH

Distributed Rate Limiting with SlowAPI and Tiered Limits

SlowAPI + Redis (FastAPI)

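from fastapi import APIRouter, FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    strategy="moving-window",
)

app = FastAPI()
router = APIRouter()
app.state.limiter = limiter
# Turn RateLimitExceeded into a 429 response (a custom handler can be registered instead)
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)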

# Endpoint limits
@router.post("/api/v1/auth/login")
@limiter.limit("10/minute")       # Strict for auth
async def login(request: Request): ...

@router.get("/api/v1/analyses")
@limiter.limit("100/minute")      # Normal for reads
async def list_analyses(request: Request): ...

@router.post("/api/v1/analyses")
@limiter.limit("20/minute")       # Moderate for writes
async def create_analysis(request: Request): ...

User-Based Key Function

def get_user_identifier(request: Request) -> str:
    """Rate limit by user ID if authenticated, else IP."""
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.id}"
    return f"ip:{get_remote_address(request)}"

limiter = Limiter(key_func=get_user_identifier)

Tiered Rate Limits

from enum import Enum

class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},
    UserTier.PRO: {"requests": 1000, "window": 3600},
    UserTier.ENTERPRISE: {"requests": 10000, "window": 3600},
}

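def get_rate_limit(key: str) -> str:
    # SlowAPI calls a dynamic limit provider synchronously; a parameter named `key`
    # receives the key_func result. Assumes key_func emits "user:<id>:<tier>";
    # any other key shape falls back to the FREE tier.
    tier_name = key.rsplit(":", 1)[-1]
    tier = UserTier(tier_name) if tier_name in {t.value for t in UserTier} else UserTier.FREE
    limits = TIER_LIMITS[tier]
    return f"{limits['requests']}/{limits['window']} seconds"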

@router.get("/api/v1/data")
@limiter.limit(get_rate_limit)
async def get_data(request: Request, user: User = Depends(get_current_user)):
    ...

Response Headers (RFC 6585)

async def add_rate_limit_headers(response: Response, limit: int,
                                  remaining: int, reset_at: datetime):
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
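    # total_seconds() covers the whole interval; .seconds is only one field of the delta
    response.headers["Retry-After"] = str(
        max(0, int((reset_at - datetime.now(timezone.utc)).total_seconds()))
    )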
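
When computing Retry-After, note that `timedelta.seconds` holds only the seconds field of the delta, so it misreports intervals that are negative or longer than a day. The sketch below builds the same headers as a plain dict so the arithmetic can be checked in isolation; `rate_limit_headers` and its `now` parameter are illustrative additions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def rate_limit_headers(limit: int, remaining: int, reset_at: datetime,
                       now: Optional[datetime] = None) -> dict[str, str]:
    """Build rate-limit headers; `now` is injectable so the result is reproducible."""
    now = now or datetime.now(timezone.utc)
    # Clamp at zero so a reset time in the past never yields a negative value
    retry_after = max(0, int((reset_at - now).total_seconds()))
    return {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(max(0, remaining)),
        "X-RateLimit-Reset": str(int(reset_at.timestamp())),
        "Retry-After": str(retry_after),
    }


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
late = timedelta(seconds=-5)          # the reset time has already passed
naive_seconds = late.seconds          # 86395, because the delta normalizes to -1 day + 86395 s
headers = rate_limit_headers(100, 0, now + late, now=now)
```

Here `headers["Retry-After"]` is `"0"`, whereas reading `.seconds` directly would have told the client to wait almost a full day.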

429 Error Response

def rate_limit_exceeded_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=429,
        content={
            "type": "https://api.example.com/errors/rate-limit-exceeded",
            "title": "Too Many Requests",
            "status": 429,
            "detail": "Rate limit exceeded. Please retry after the reset time.",
            "instance": str(request.url),
        },
        headers={"Retry-After": "60", "X-RateLimit-Remaining": "0"},
    )

Algorithm Selection

| Algorithm | Use Case | Burst Handling |
|---|---|---|
| Token Bucket | General API, allows bursts | Excellent |
| Sliding Window | Precise, no burst spikes | Good |
| Leaky Bucket | Steady rate, queue excess | None |
| Fixed Window | Simple, some edge issues | Moderate |

Key Decisions

| Decision | Recommendation |
|---|---|
| Storage | Redis (distributed, atomic) |
| Algorithm | Token bucket for most APIs |
| Key | User ID if auth, else IP + fingerprint |
| Auth endpoints | 10/min (strict) |
| Read endpoints | 100-1000/min (based on tier) |
| Write endpoints | 20-100/min (moderate) |

Common Mistakes

# NEVER use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared!

# NEVER skip rate limiting on internal APIs
@router.get("/internal/admin")
async def admin_endpoint():  # No rate limit = vulnerable
    ...

# NEVER use fixed window without considering edge spikes

Incorrect — In-memory counters don't work in distributed systems:

counters = {}  # Per-instance state
@app.post("/api/orders")
async def create_order(user_id: str):
    counters[user_id] = counters.get(user_id, 0) + 1
    if counters[user_id] > 100:
        raise RateLimitExceeded()
# Multiple instances = no shared state

Correct — Redis provides atomic distributed rate limiting across all instances:

@app.post("/api/orders")
@limiter.limit("100/hour")
async def create_order(request: Request, user_id: str):
    # Shared Redis state across all instances
    ...

Implement sliding window rate limiting with Redis sorted sets to prevent boundary spikes — HIGH

Sliding Window Rate Limiting

Precise rate limiting that avoids fixed window boundary spikes.

Problem with Fixed Windows

A user can hit 100 at 0:59 and 100 at 1:01 = 200 requests in 2 seconds with a "100/minute" limit.

Sliding window solves this by tracking individual request timestamps.
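
The boundary effect can be reproduced with two small in-memory counters. This is a sketch for illustration only (function names are hypothetical, timestamps are seconds from an arbitrary origin) and is unrelated to the Redis implementation that follows.

```python
from collections import deque


def fixed_window_allowed(timestamps: list[float], limit: int, window: int) -> int:
    """Count requests admitted by a fixed-window counter."""
    counts: dict[int, int] = {}
    allowed = 0
    for ts in timestamps:
        bucket = int(ts // window)          # the counter resets at each window boundary
        if counts.get(bucket, 0) < limit:
            counts[bucket] = counts.get(bucket, 0) + 1
            allowed += 1
    return allowed


def sliding_window_allowed(timestamps: list[float], limit: int, window: int) -> int:
    """Count requests admitted by a sliding-window log of timestamps."""
    log: deque[float] = deque()
    allowed = 0
    for ts in timestamps:
        while log and log[0] <= ts - window:   # drop entries older than the window
            log.popleft()
        if len(log) < limit:
            log.append(ts)
            allowed += 1
    return allowed


# 100 requests at 0:59 followed by 100 more at 1:01
burst = [59.0] * 100 + [61.0] * 100
```

With `limit=100` and `window=60`, the fixed-window counter admits all 200 requests, while the sliding-window log admits only the first 100.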

Redis Implementation

import redis.asyncio as redis
from datetime import datetime, timezone

class SlidingWindowLimiter:
    def __init__(self, redis_client: redis.Redis, window_seconds: int = 60):
        self.redis = redis_client
        self.window = window_seconds

    async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
        """Returns (allowed, remaining)."""
        now = datetime.now(timezone.utc).timestamp()
        window_start = now - self.window
        bucket_key = f"ratelimit:sliding:{key}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(bucket_key, 0, window_start)  # Remove old
        pipe.zcard(bucket_key)                               # Count current
        pipe.zadd(bucket_key, {str(now): now})               # Record this request (also when it ends up rejected)
        pipe.expire(bucket_key, self.window * 2)             # Set expiry

        results = await pipe.execute()
        current_count = results[1]

        if current_count < limit:
            return True, limit - current_count - 1
        return False, 0

Atomic Lua Script (Better for Production)

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- Count current entries
local count = redis.call('ZCARD', key)

if count < limit then
    redis.call('ZADD', key, now, now .. ':' .. math.random())
    redis.call('EXPIRE', key, window)
    return {1, limit - count - 1, 0}
else
    local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
    local retry_after = 0
    if oldest[2] then
        retry_after = math.ceil((tonumber(oldest[2]) + window * 1000 - now) / 1000)
    end
    return {0, 0, retry_after}
end

When to Use

| Sliding Window | Token Bucket |
|---|---|
| Strict quotas (billing) | General API limits |
| No burst tolerance | Burst-friendly |
| Higher memory (O(n) timestamps) | O(1) memory |
| Exact counting | Approximate counting |

Best Practices

  1. Use Redis sorted sets with timestamps as scores
  2. Clean expired entries on every check (ZREMRANGEBYSCORE)
  3. Set EXPIRE on the key to auto-cleanup inactive users
  4. Use a transactional pipeline (MULTI/EXEC) or a Lua script for atomicity
  5. Consider memory: each request stores a member in the sorted set

Incorrect — Fixed window allows 200 requests in 2 seconds with "100/min" limit:

# Fixed window resets at minute boundaries
window_start = int(time.time() / 60) * 60
if get_count(f"{user}:{window_start}") < 100:
    allow()
# User can send 100 at 0:59 and 100 at 1:01 = 200 in 2 seconds!

Correct — Sliding window tracks individual timestamps for precise limiting:

now = time.time()
window_start = now - 60  # Last 60 seconds
await redis.zremrangebyscore(key, 0, window_start)  # Remove old
count = await redis.zcard(key)  # Count current
if count < 100:
    await redis.zadd(key, {str(now): now})  # Add request
# At most 100 requests per rolling 60-second window (run the check and add atomically, e.g. in Lua)

Implement token bucket rate limiting with atomic Redis operations and burst capacity — HIGH

Token Bucket Rate Limiting

Allows bursts up to bucket capacity while maintaining a steady average rate.

How It Works

Bucket: capacity=10, refill_rate=5/sec

t=0s: 10 tokens | 10 requests -> 0 tokens (burst allowed)
t=1s: +5 tokens | 5 tokens available
t=2s: +5 tokens | 10 tokens (capped at capacity)
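
The trace above can be reproduced with a single-process bucket. This is a minimal sketch with an injectable clock; `InMemoryTokenBucket` is a hypothetical name, and the arithmetic mirrors the refill step of a token bucket rather than any particular Redis script.

```python
class InMemoryTokenBucket:
    """Single-process token bucket; `now` is passed in so the trace is reproducible."""

    def __init__(self, capacity: float, refill_rate: float, now: float = 0.0) -> None:
        self.capacity = capacity
        self.refill_rate = refill_rate      # tokens per second
        self.tokens = capacity              # the bucket starts full
        self.last_update = now

    def refill(self, now: float) -> None:
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_update = now

    def allow(self, now: float, cost: float = 1.0) -> bool:
        self.refill(now)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = InMemoryTokenBucket(capacity=10, refill_rate=5)
burst = [bucket.allow(now=0.0) for _ in range(11)]   # 10 pass, the 11th is rejected
bucket.refill(now=1.0)
tokens_at_1s = bucket.tokens                          # 5.0
bucket.refill(now=2.0)
tokens_at_2s = bucket.tokens                          # 10.0, capped at capacity
```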

Redis Implementation (Atomic Lua Script)

import redis.asyncio as redis
from datetime import datetime, timezone

class TokenBucketLimiter:
    SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local tokens_requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
    local current_tokens = tonumber(bucket[1]) or capacity
    local last_update = tonumber(bucket[2]) or now

    -- Calculate refill
    local elapsed = (now - last_update) / 1000
    current_tokens = math.min(capacity, current_tokens + elapsed * refill_rate)

    -- Check and consume
    local allowed = 0
    local remaining = math.floor(current_tokens)
    local retry_after = 0

    if current_tokens >= tokens_requested then
        allowed = 1
        remaining = math.floor(current_tokens - tokens_requested)
        current_tokens = current_tokens - tokens_requested
    else
        local needed = tokens_requested - current_tokens
        retry_after = math.ceil(needed / refill_rate)
    end

    redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
    redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) * 2)
    return {allowed, remaining, retry_after}
    """

    def __init__(self, redis_client: redis.Redis,
                 capacity: int = 100, refill_rate: float = 10.0):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    async def is_allowed(self, key: str, tokens: int = 1) -> bool:
        now = datetime.now(timezone.utc).timestamp() * 1000
        result = await self.redis.eval(
            self.SCRIPT, 1, f"ratelimit:token:{key}",
            self.capacity, self.refill_rate, tokens, now,
        )
        return result[0] == 1

Properties

| Property | Description |
|---|---|
| Burst Capacity | Allows short bursts up to bucket size |
| Smooth Limiting | Tokens refill continuously |
| O(1) Memory | Only stores tokens + timestamp per key |
| Distributed | Atomic via Redis Lua script |

When to Use

Good for: API rate limiting (natural bursts), user actions (login), resource protection

Not ideal for: Strict per-second quotas (use sliding window), billing limits, fair queuing (use leaky bucket)

vs Sliding Window

| Aspect | Token Bucket | Sliding Window |
|---|---|---|
| Burst Handling | Allows up to capacity | Spreads evenly |
| Memory | O(1) per key | O(n) timestamps |
| Precision | Approximate | Exact |
| Redis Operations | 1 HMSET | 1 ZADD + 1 ZREMRANGEBYSCORE |

Incorrect — Check-then-act pattern has race condition in token refill:

const bucket = await redis.get(`bucket:${user}`);
const tokens = JSON.parse(bucket).tokens + elapsed * refillRate;
if (tokens >= 1) {
  // Race! Another request can pass this check too
  await redis.set(`bucket:${user}`, JSON.stringify({tokens: tokens - 1}));
}

Correct — Atomic Lua script ensures thread-safe token bucket operations:

// Positional form (script, numKeys, ...keys, ...args), as accepted by ioredis
const result = await redis.eval(TOKEN_BUCKET_SCRIPT, 1,
  `bucket:${user}`,
  capacity, refillRate, tokensRequested, Date.now()
);
// Single atomic operation, no race conditions

Isolate failures with bulkhead partitioning and tier-based resource capacity limits — CRITICAL

Bulkhead Pattern

Isolates failures by partitioning resources into independent pools. One failing component does not bring down the entire system.

Tier-Based Configuration

| Tier | Workers | Queue | Timeout | Use Case |
|---|---|---|---|---|
| 1 (Critical) | 5 | 10 | 180-300s | Synthesis, quality gate, user-facing |
| 2 (Standard) | 8 | 12 | 120s | Analysis agents, data processing |
| 3 (Optional) | 4 | 6 | 60s | Enrichment, caching, analytics |

Implementation

from asyncio import Semaphore, wait_for, TimeoutError
from enum import Enum

class Tier(Enum):
    CRITICAL = 1
    STANDARD = 2
    OPTIONAL = 3

class Bulkhead:
    def __init__(self, tier: Tier, max_concurrent: int, queue_size: int, timeout: float):
        self.tier = tier
        self.semaphore = Semaphore(max_concurrent)
        self.queue_size = queue_size
        self.timeout = timeout
        self.waiting = 0
        self.active = 0

    async def execute(self, fn):
        if self.waiting >= self.queue_size:
            raise BulkheadFullError(f"Tier {self.tier.name} queue full")

        # Phase 1: wait for a slot; `waiting` is decremented exactly once
        self.waiting += 1
        try:
            await wait_for(self.semaphore.acquire(), timeout=self.timeout)
        except TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} timed out waiting for a slot")
        finally:
            self.waiting -= 1

        # Phase 2: run the call while holding the slot
        self.active += 1
        try:
            return await wait_for(fn(), timeout=self.timeout)
        except TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} timeout")
        finally:
            self.active -= 1
            self.semaphore.release()

Rejection Policies

class RejectionPolicy(Enum):
    ABORT = "abort"        # Return error immediately
    CALLER_RUNS = "caller" # Execute in caller's context (blocking)
    DISCARD = "discard"    # Silently drop (for optional ops)
    QUEUE = "queue"        # Wait in bounded queue

TIER_POLICIES = {
    Tier.CRITICAL: RejectionPolicy.QUEUE,      # Wait for slot
    Tier.STANDARD: RejectionPolicy.CALLER_RUNS, # Degrade caller
    Tier.OPTIONAL: RejectionPolicy.DISCARD,     # Skip if busy
}
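
How a policy might be applied when no slot is free is sketched below. This is an illustration under simplifying assumptions: `submit` and `PoolFullError` are hypothetical names, the enum is repeated so the block is self-contained, and the QUEUE branch waits without the bounded queue that the Bulkhead class enforces.

```python
import asyncio
from enum import Enum


class RejectionPolicy(Enum):  # repeated here so the sketch is self-contained
    ABORT = "abort"
    CALLER_RUNS = "caller"
    DISCARD = "discard"
    QUEUE = "queue"


class PoolFullError(Exception):
    """Hypothetical error raised by the ABORT policy."""


async def submit(fn, semaphore: asyncio.Semaphore, policy: RejectionPolicy):
    """Run fn under the semaphore, applying `policy` when no slot is free."""
    if semaphore.locked() and policy is not RejectionPolicy.QUEUE:
        if policy is RejectionPolicy.ABORT:
            raise PoolFullError("no free slot")
        if policy is RejectionPolicy.DISCARD:
            return None                  # silently drop optional work
        return await fn()                # CALLER_RUNS: run outside the pool
    async with semaphore:                # QUEUE, or a slot was free
        return await fn()


async def demo():
    sem = asyncio.Semaphore(1)
    await sem.acquire()                  # simulate a saturated pool

    async def work():
        return "done"

    discarded = await submit(work, sem, RejectionPolicy.DISCARD)
    caller_ran = await submit(work, sem, RejectionPolicy.CALLER_RUNS)
    try:
        await submit(work, sem, RejectionPolicy.ABORT)
        aborted = False
    except PoolFullError:
        aborted = True
    sem.release()                        # a slot frees up
    queued = await submit(work, sem, RejectionPolicy.QUEUE)
    return discarded, caller_ran, aborted, queued
```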

Graceful Degradation by Tier

async def run_analysis(content: str):
    results = {}

    # Tier 1: Must succeed
    results["core"] = await tier1_bulkhead.execute(
        lambda: analyze_core(content)
    )

    # Tier 2: Best effort
    try:
        results["enriched"] = await tier2_bulkhead.execute(
            lambda: enrich_analysis(content)
        )
    except (BulkheadFullError, BulkheadTimeoutError):
        results["enriched"] = None  # Skip enrichment

    # Tier 3: Optional (silent failure)
    try:
        await tier3_bulkhead.execute(lambda: warm_cache(results))
    except (BulkheadFullError, BulkheadTimeoutError):
        pass

    return results

Best Practices

  1. Size based on downstream capacity -- if API allows 60 RPM, don't set 100 concurrent
  2. Monitor queue depth -- alert when consistently > 80% full
  3. Combine with circuit breaker -- slow calls trigger circuit, clearing bulkhead slots
  4. Use per-dependency bulkheads -- not per-endpoint (too granular)
  5. Return 503 with Retry-After -- when rejecting, don't return 500

Common Mistakes

  • Too many bulkheads (per-endpoint instead of per-dependency)
  • Ignoring rejection handling (BulkheadFullError becomes 500)
  • No correlation with circuit breaker (slots stay blocked on slow service)

Incorrect — No isolation means slow downstream service blocks all operations:

async def fetch_data():
    await slow_external_api()  # Takes 30s when degraded
# All requests wait, entire system becomes slow

Correct — Bulkhead isolates slow service, protecting critical operations:

@bulkhead(tier=Tier.OPTIONAL, max_concurrent=3, timeout=5)
async def fetch_data():
    await slow_external_api()
# Only 3 concurrent + waiting queue, rest get fast rejection

Prevent cascade failures with circuit breaker thresholds and recovery probe patterns — CRITICAL

Circuit Breaker Pattern

Prevents cascade failures by "tripping" when a downstream service exceeds failure thresholds.

State Machine

              failures >= threshold
    CLOSED --------------------------------> OPEN
       ^                                      |
       |                                      |
       | probe succeeds              timeout  |
       |                              expires |
       |         +-------------+              |
       +---------+  HALF_OPEN  |<-------------+
                 +-------------+
                       |
                       | probe fails
                       v
                     OPEN

  • CLOSED: All requests pass through, failures counted in sliding window
  • OPEN: All requests rejected immediately, return fallback
  • HALF_OPEN: Limited probe requests test recovery

Configuration

| Parameter | Recommended | Description |
|---|---|---|
| failure_threshold | 5 | Failures before opening |
| success_threshold | 2 | Successes in half-open to close |
| recovery_timeout | 30s | Time before half-open transition |
| sliding_window_size | 10 | Requests to consider for failure rate |
| slow_call_threshold | 5-30s | Calls slower than this count as failures |

Implementation

from collections import deque
from enum import Enum
from time import monotonic

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, name: str, failure_threshold: int = 5,
                 success_threshold: int = 2, recovery_timeout: float = 30.0,
                 sliding_window_size: int = 10):
        self.name = name
        self._state = CircuitState.CLOSED
        # Sliding window of recent outcomes (True = failure)
        self._window: deque[bool] = deque(maxlen=sliding_window_size)
        self._success_count = 0
        self._opened_at = None
        self._threshold = failure_threshold
        self._success_threshold = success_threshold
        self._recovery_timeout = recovery_timeout

    def _should_attempt_recovery(self) -> bool:
        return monotonic() - self._opened_at >= self._recovery_timeout

    async def call(self, fn, *args, **kwargs):
        if self._state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self._state = CircuitState.HALF_OPEN
                self._success_count = 0  # Each probe phase starts from zero
            else:
                raise CircuitOpenError(self.name)

        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._success_threshold:
                self._state = CircuitState.CLOSED
                self._window.clear()
        else:
            self._window.append(False)  # Old failures age out of the window

    def _on_failure(self):
        if self._state == CircuitState.HALF_OPEN:
            self._trip()  # A failed probe reopens the circuit
        else:
            self._window.append(True)
            if sum(self._window) >= self._threshold:
                self._trip()

    def _trip(self):
        self._state = CircuitState.OPEN
        self._opened_at = monotonic()

Best Practices

  1. Use sliding windows, not fixed counters -- one success should not reset everything
  2. Per-service breakers -- never use a single global breaker
  3. Always provide fallbacks -- cached data, default response, or partial results
  4. Separate health from circuit state -- /health always returns 200
  5. Include observability -- every state change = metric + log + alert on OPEN

Pattern Composition

# Retry INSIDE circuit breaker (retry is the inner decorator,
# so the breaker records one result per call, after retries)
@circuit_breaker(failure_threshold=5)
@retry(max_attempts=3, base_delay=1.0)
async def call_service():
    ...

# Bulkhead + Circuit breaker
@circuit_breaker(service="analysis")
@bulkhead(tier=Tier.STANDARD, max_concurrent=3)
async def analyze():
    ...

Presets by Service Type

| Service | Threshold | Recovery | Slow Call |
|---|---|---|---|
| LLM API | 3 | 60s | 30s |
| External API | 5 | 30s | 10s |
| Database | 2-3 | 15s | 5s |

Incorrect — No circuit breaker causes cascade failure when downstream is down:

async def call_payment_api():
    # Keeps trying even when the API is down, so every request waits out a 30s timeout
    # and the entire service becomes unresponsive
    return await http.post("https://api.payment.com/charge")

Correct — Circuit breaker trips on failures, returning fast failures and allowing recovery:

@circuit_breaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_api():
    return await http.post("https://api.payment.com/charge")
# After 5 failures, circuit opens and returns fallback immediately

Implement exponential backoff with jitter to prevent thundering herd on retries — CRITICAL

Retry with Exponential Backoff

Backoff Formula

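delay = min(base * 2^(attempt - 1), max_delay)   # attempt starts at 1
jitter = random(0, delay)   # Full jitter (recommended)
sleep(jitter)

| Attempt | Delay Before Jitter (base = 1s) | With Full Jitter |
|---|---|---|
| 1 | 1s | 0.0s - 1.0s |
| 2 | 2s | 0.0s - 2.0s |
| 3 | 4s | 0.0s - 4.0s |
| 4 | 8s | 0.0s - 8.0s |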

Full jitter prevents thundering herd when many clients retry simultaneously.
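As a minimal sketch of the formula above, the delay for a 1-based attempt can be computed as follows. The function name `full_jitter_delay` and the optional `rng` parameter (useful for deterministic tests) are illustrative assumptions, not part of the skill's API.

```python
import random
from typing import Optional


def full_jitter_delay(attempt: int, base: float = 1.0, max_delay: float = 60.0,
                      rng: Optional[random.Random] = None) -> float:
    """Return a sleep time in seconds for a 1-based attempt using full jitter."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    # Exponential growth, capped at max_delay
    capped = min(base * 2 ** (attempt - 1), max_delay)
    # Full jitter: pick uniformly between 0 and the capped delay
    source = rng if rng is not None else random
    return source.uniform(0.0, capped)
```

Because every client draws its own random delay, retries that start at the same moment spread out across the whole interval instead of arriving together.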

Error Classification

RETRYABLE_ERRORS = {
    # HTTP Status Codes
    408, 429, 500, 502, 503, 504,
    # Python Exceptions
    ConnectionError, TimeoutError, ConnectionResetError,
    # LLM API Errors
    "rate_limit_exceeded", "model_overloaded", "server_error",
}

NON_RETRYABLE_ERRORS = {
    400, 401, 403, 404, 422,
    "invalid_api_key", "content_policy_violation",
    "invalid_request_error", "model_not_found",
}
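The retry decorator in the next section calls an `is_retryable` helper that is not shown. One possible sketch follows, assuming (hypothetically) that HTTP client errors expose a `status_code` attribute and LLM errors expose a string `code` attribute; the `ApiError` class exists only to demonstrate the helper.

```python
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}
RETRYABLE_CODES = {"rate_limit_exceeded", "model_overloaded", "server_error"}
# isinstance() also matches subclasses such as ConnectionResetError
RETRYABLE_EXCEPTIONS = (ConnectionError, TimeoutError)


class ApiError(Exception):
    """Hypothetical error type carrying an HTTP status or an LLM error code."""

    def __init__(self, status_code=None, code=None):
        super().__init__(f"status={status_code} code={code}")
        self.status_code = status_code
        self.code = code


def is_retryable(error: Exception) -> bool:
    """Return True only for errors known to be transient."""
    status = getattr(error, "status_code", None)
    if isinstance(status, int):
        return status in RETRYABLE_STATUS
    code = getattr(error, "code", None)
    if isinstance(code, str):
        return code in RETRYABLE_CODES
    # Anything unrecognized is treated as non-retryable
    return isinstance(error, RETRYABLE_EXCEPTIONS)
```

Keeping the check in one function means the decorator stays generic, and unknown errors fail fast by default.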

Retry Decorator

import asyncio
import random
from functools import wraps

def retry(max_attempts=3, base_delay=1.0, max_delay=60.0, jitter=True):
    """Async retry with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except Exception as e:
                    if not is_retryable(e) or attempt == max_attempts:
                        raise
                    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
                    if jitter:
                        delay = random.uniform(0, delay)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

Retry with Content Truncation (LLM)

async def retry_with_truncation(fn, content: str, max_attempts: int = 3):
    """Retry LLM call, truncating on context_length_exceeded."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn(content)
        except ContextLengthExceededError:
            if attempt == max_attempts:
                raise
            content = content[:int(len(content) * 0.75)]

Retry Budget

Prevents retry storms by limiting total retries per time window.

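import time

class RetryBudget:
    def __init__(self, budget_per_second: float = 10.0):
        self.rate = budget_per_second      # Tokens added per second
        self.budget = budget_per_second    # Current tokens, capped at one second's worth
        self.last_update = time.monotonic()

    def _replenish(self):
        # Add tokens for the elapsed time without exceeding the cap
        now = time.monotonic()
        self.budget = min(self.rate, self.budget + (now - self.last_update) * self.rate)
        self.last_update = now

    def can_retry(self) -> bool:
        self._replenish()
        return self.budget >= 1.0

    def use_retry(self):
        self._replenish()
        if self.budget >= 1.0:
            self.budget -= 1.0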

Presets

| Use Case | Max Attempts | Base Delay | Max Delay |
|---|---|---|---|
| User-facing API | 2 | 0.5s | 2s |
| Background job | 5 | 2.0s | 60s |
| LLM API call | 3 | 1.0s | 60s |
| Rate-limited API | 3 | 2.0s | 120s |

Critical Rules

  1. Always use jitter -- prevents thundering herd
  2. Classify errors -- never retry 401/403/404
  3. Bound retries -- max 3-5 attempts, never infinite
  4. Retry inside circuit breaker -- circuit only sees final result
  5. Use Retry-After header -- respect server's backoff request
  6. Log all retries -- include trace ID for correlation
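Rule 5 can be sketched with the standard library alone. The `Retry-After` header carries either a number of seconds or an HTTP date, so a helper needs to handle both; the function name `retry_after_seconds` and its `now` parameter are illustrative assumptions.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str],
                        now: Optional[datetime] = None) -> Optional[float]:
    """Parse a Retry-After header into seconds, or None if absent or unparseable."""
    if not value:
        return None
    value = value.strip()
    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0.0, float(int(value)))
    except ValueError:
        pass
    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())
```

A retry loop could then sleep for the larger of this value and its own backoff delay, so the client never retries earlier than the server asked.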

Incorrect — Fixed delay without jitter causes thundering herd when service recovers:

for attempt in range(3):
    try:
        return await api_call()
    except Exception:
        await asyncio.sleep(1)  # All clients retry at same time!
# 1000 clients = 1000 simultaneous retries

Correct — Exponential backoff with jitter spreads retries over time:

for attempt in range(3):
    try:
        return await api_call()
    except Exception:
        if attempt == 2:
            raise  # Out of attempts: surface the error instead of returning None
        delay = min(2 ** attempt, 60)  # 1s, 2s, 4s...
        await asyncio.sleep(random.uniform(0, delay))  # Jitter spreads load

References (10)

Bulkhead Pattern

Overview

The bulkhead pattern isolates failures by partitioning system resources into independent pools. Named after ship bulkheads that prevent flooding from spreading, it ensures one failing component doesn't bring down the entire system.

Types of Bulkheads

1. Thread Pool Isolation

Dedicated thread pools per service/operation.

┌────────────────────────────────────────────────────────────┐
│                    Thread Pool Bulkhead                     │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   Service A Pool (5 threads)    Service B Pool (3 threads) │
│   ┌─┬─┬─┬─┬─┐                  ┌─┬─┬─┐                    │
│   │█│█│█│░│░│                  │█│░│░│                    │
│   └─┴─┴─┴─┴─┘                  └─┴─┴─┘                    │
│                                                             │
│   If Service A hangs, only 5 threads blocked               │
│   Service B continues with its own 3 threads               │
│                                                             │
└────────────────────────────────────────────────────────────┘

2. Semaphore Isolation

Limits concurrent executions without dedicated threads.

┌────────────────────────────────────────────────────────────┐
│                   Semaphore Bulkhead                        │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   Semaphore: permits=5, current=3                          │
│                                                             │
│   Request 1: acquire() → ✓ (permits=2)                     │
│   Request 2: acquire() → ✓ (permits=1)                     │
│   Request 3: acquire() → ✓ (permits=0)                     │
│   Request 4: acquire() → BLOCKED (queue) or REJECTED       │
│                                                             │
│   Request 1: release() → ✓ (permits=1)                     │
│   Request 4: acquire() → ✓ (permits=0)                     │
│                                                             │
└────────────────────────────────────────────────────────────┘
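The REJECTED branch of the diagram can be sketched with `asyncio.Semaphore`. This is a minimal illustration, not the implementation used later in this reference; `SemaphoreBulkhead`, `BulkheadFullError`, and `demo` are hypothetical names.

```python
import asyncio


class BulkheadFullError(Exception):
    """Raised when no permit is available."""


class SemaphoreBulkhead:
    def __init__(self, permits: int):
        self._sem = asyncio.Semaphore(permits)

    async def run(self, fn):
        # locked() is True when acquire() would have to wait
        if self._sem.locked():
            raise BulkheadFullError("no permits available")
        async with self._sem:
            return await fn()


async def demo():
    bulkhead = SemaphoreBulkhead(permits=3)

    async def slow_call():
        await asyncio.sleep(0.05)
        return "ok"

    # Five concurrent requests compete for three permits
    return await asyncio.gather(
        *(bulkhead.run(slow_call) for _ in range(5)),
        return_exceptions=True,
    )
```

Running `asyncio.run(demo())` lets the first three requests through and rejects the remaining two while the permits are held. Swapping the rejection for a bounded wait gives the BLOCKED (queue) variant.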

3. Tier-Based Isolation

Group operations by criticality.

┌────────────────────────────────────────────────────────────┐
│                    Tier-Based Bulkheads                     │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   TIER 1: CRITICAL (50% resources)                         │
│   ├── Synthesis node                                        │
│   ├── Quality gate                                          │
│   └── User-facing responses                                 │
│                                                             │
│   TIER 2: STANDARD (35% resources)                         │
│   ├── Content analysis agents                               │
│   ├── Data processing                                       │
│   └── API integrations                                      │
│                                                             │
│   TIER 3: OPTIONAL (15% resources)                         │
│   ├── Enrichment                                            │
│   ├── Caching warmup                                        │
│   └── Analytics                                             │
│                                                             │
│   When Tier 3 exhausted → operations queued/dropped         │
│   Tier 1 & 2 continue unaffected                           │
│                                                             │
└────────────────────────────────────────────────────────────┘

Configuration for OrchestKit

Agent Tier Assignment

| Tier | Agents | Max Concurrent | Queue Size | Timeout |
|---|---|---|---|---|
| 1 (Critical) | synthesis, quality_gate, supervisor | 5 | 10 | 300s |
| 2 (Standard) | tech_comparator, implementation_planner, security_auditor, learning_synthesizer | 3 | 5 | 120s |
| 3 (Optional) | enrichment, cache_warming, metrics | 2 | 3 | 60s |

Rejection Policies

from enum import Enum

class RejectionPolicy(Enum):
    ABORT = "abort"           # Return error immediately
    CALLER_RUNS = "caller"    # Execute in caller's context (blocking)
    DISCARD = "discard"       # Silently drop (for optional ops)
    QUEUE = "queue"           # Wait in bounded queue

# Per-tier policies
TIER_POLICIES = {
    1: RejectionPolicy.QUEUE,      # Critical: wait for slot
    2: RejectionPolicy.CALLER_RUNS, # Standard: degrade caller
    3: RejectionPolicy.DISCARD,     # Optional: skip if busy
}

Implementation Pattern (Python asyncio)

import asyncio
from enum import Enum
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class BulkheadFullError(Exception):
    """Raised when the wait queue is at capacity."""

class BulkheadTimeoutError(Exception):
    """Raised when waiting for a slot, or the call itself, times out."""

class Tier(Enum):
    CRITICAL = 1
    STANDARD = 2
    OPTIONAL = 3

class Bulkhead:
    def __init__(self, tier: Tier, max_concurrent: int, queue_size: int, timeout: float):
        self.tier = tier
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue_size = queue_size
        self.timeout = timeout
        self.waiting = 0
        self.active = 0

    async def execute(self, fn: Callable[[], Awaitable[T]]) -> T:
        # Reject immediately when the wait queue is full
        if self.waiting >= self.queue_size:
            raise BulkheadFullError(f"Tier {self.tier.name} queue full")

        # Wait for a slot, bounded by the timeout
        self.waiting += 1
        try:
            await asyncio.wait_for(self.semaphore.acquire(), timeout=self.timeout)
        except asyncio.TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} timeout") from None
        finally:
            # Decrement exactly once, whether or not the slot was acquired
            self.waiting -= 1

        # Run the call and always release the slot
        self.active += 1
        try:
            return await asyncio.wait_for(fn(), timeout=self.timeout)
        except asyncio.TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} timeout") from None
        finally:
            self.active -= 1
            self.semaphore.release()

Best Practices (2026)

1. Size Based on Downstream Capacity

# BAD: Arbitrary numbers
bulkhead = Bulkhead(max_concurrent=100)

# GOOD: Based on downstream limits
# If OpenAI allows 60 RPM, don't have 100 concurrent
# Throughput = concurrency / latency: 6 concurrent / 6s avg = 1 req/s = 60 RPM
bulkhead = Bulkhead(max_concurrent=6)
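The sizing rule follows Little's law: concurrency equals arrival rate multiplied by average latency. A small helper makes the arithmetic explicit; `max_concurrent_for` and its `headroom` parameter are illustrative assumptions.

```python
import math


def max_concurrent_for(rpm_limit: float, avg_latency_s: float,
                       headroom: float = 1.0) -> int:
    """Largest concurrency that stays within a requests-per-minute limit.

    headroom below 1.0 reserves part of the limit for other callers.
    """
    if rpm_limit <= 0 or avg_latency_s <= 0 or not 0 < headroom <= 1:
        raise ValueError("rpm_limit and avg_latency_s must be positive, headroom in (0, 1]")
    requests_per_second = rpm_limit / 60.0
    # Little's law: in-flight requests = rate * latency
    return max(1, math.floor(requests_per_second * avg_latency_s * headroom))
```

For a 60 RPM limit, a 6-second average call supports 6 concurrent requests, while a 10-second average call supports 10. Measured latency matters as much as the published rate limit.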

2. Monitor Queue Depth

async def execute_with_metrics(self, fn):
    # Metric: queue depth
    metrics.gauge("bulkhead.queue_depth", self.waiting, tags={"tier": self.tier.name})

    # Metric: active requests
    metrics.gauge("bulkhead.active", self.active, tags={"tier": self.tier.name})

    # Alert when queue consistently > 80% full
    if self.waiting > self.queue_size * 0.8:
        logger.warning("Bulkhead queue high", tier=self.tier.name, depth=self.waiting)

    return await self.execute(fn)

3. Graceful Degradation by Tier

async def run_analysis(content: str) -> Analysis:
    results = {}

    # Tier 1: Must succeed
    results["core"] = await tier1_bulkhead.execute(
        lambda: analyze_core(content)
    )

    # Tier 2: Best effort
    try:
        results["enriched"] = await tier2_bulkhead.execute(
            lambda: enrich_analysis(content)
        )
    except BulkheadFullError:
        results["enriched"] = None  # Skip enrichment

    # Tier 3: Optional
    try:
        await tier3_bulkhead.execute(
            lambda: warm_cache(results)
        )
    except (BulkheadFullError, BulkheadTimeoutError):
        pass  # Don't even log

    return Analysis(**results)

4. Dynamic Tier Adjustment

class AdaptiveBulkhead:
    """Adjusts tier capacity based on system load."""

    def adjust_for_load(self, cpu_percent: float, memory_percent: float):
        if cpu_percent > 80 or memory_percent > 85:
            # Under pressure: shrink the optional and standard tiers
            self.tiers[Tier.OPTIONAL].max_concurrent = 1
            self.tiers[Tier.STANDARD].max_concurrent = 2
        elif cpu_percent < 50 and memory_percent < 60:
            # Restore capacity
            self.tiers[Tier.OPTIONAL].max_concurrent = 2
            self.tiers[Tier.STANDARD].max_concurrent = 3

Anti-Patterns

1. Too Many Bulkheads

# BAD: Bulkhead per endpoint
bulkheads = {
    "/api/v1/users": Bulkhead(5),
    "/api/v1/users/{id}": Bulkhead(5),
    "/api/v1/users/{id}/profile": Bulkhead(5),
    # ... 50 more
}
# Result: Complexity nightmare, no real isolation

# GOOD: Bulkhead per tier/dependency
bulkheads = {
    "database_read": Bulkhead(10),
    "database_write": Bulkhead(3),
    "external_api": Bulkhead(5),
}

2. Ignoring Rejection Handling

# BAD: Exception bubbles up as 500
@app.post("/analyze")
async def analyze(content: str):
    return await bulkhead.execute(lambda: do_analysis(content))
    # BulkheadFullError → 500 Internal Server Error

# GOOD: Proper error handling
@app.post("/analyze")
async def analyze(content: str):
    try:
        return await bulkhead.execute(lambda: do_analysis(content))
    except BulkheadFullError:
        raise HTTPException(
            status_code=503,
            detail="Service busy, please retry",
            headers={"Retry-After": "30"}
        )

3. No Correlation with Circuit Breaker

# BAD: Bulkhead fills up, circuit never opens
# All slots blocked on slow service

# GOOD: Combine patterns
@circuit_breaker(failure_threshold=5)
@bulkhead(tier=Tier.STANDARD)
async def call_external_service():
    ...
# Slow calls → timeouts → circuit opens → bulkhead cleared

Monitoring Dashboard

┌────────────────────────────────────────────────────────────┐
│                  Bulkhead Status Dashboard                  │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   TIER 1: CRITICAL         [████████░░] 8/10 active        │
│   Queue: 2/10              [██░░░░░░░░] 2/10 queued        │
│   Rejected (1h): 0         Timeouts (1h): 1                │
│                                                             │
│   TIER 2: STANDARD         [██████████] 3/3 active ⚠️      │
│   Queue: 5/5 FULL          [██████████] 5/5 queued ⚠️      │
│   Rejected (1h): 23        Timeouts (1h): 5                │
│                                                             │
│   TIER 3: OPTIONAL         [█░░░░░░░░░] 1/2 active         │
│   Queue: 0/3               [░░░░░░░░░░] 0/3 queued         │
│   Rejected (1h): 156       Timeouts (1h): 0                │
│                                                             │
└────────────────────────────────────────────────────────────┘

Circuit Breaker

Circuit Breaker Pattern

Overview

The circuit breaker pattern prevents cascade failures by "tripping" when a downstream service exceeds failure thresholds. Named after electrical circuit breakers, it protects your system from repeated failures.

States

CLOSED (Normal Operation)

  • All requests pass through
  • Failures are counted within a sliding window
  • With a sliding window, successes dilute the failure rate instead of resetting it (only a plain consecutive-failure counter resets on success)
  • Transitions to OPEN when failures >= threshold

OPEN (Failing Fast)

  • All requests immediately rejected
  • Returns fallback response or error
  • No calls made to downstream service
  • After recovery_timeout, transitions to HALF_OPEN

HALF_OPEN (Recovery Probe)

  • Limited requests allowed (probe requests)
  • If probes succeed (success_threshold reached) → CLOSED
  • If probe fails → OPEN (reset recovery timer)

State Machine

              failures >= threshold
    CLOSED ──────────────────────────────▶ OPEN
       ▲                                      │
       │                                      │
       │ probe succeeds              timeout  │
       │                              expires │
       │         ┌─────────────┐             │
       └─────────│  HALF_OPEN  │◀────────────┘
                 └─────────────┘

                       │ probe fails

                     OPEN
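The diagram can be read as a transition table. The sketch below encodes it as a pure function, which keeps the state logic easy to unit test; the `Event` names are hypothetical labels for the arrows above.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class Event(Enum):
    THRESHOLD_REACHED = "failures >= threshold"
    TIMEOUT_EXPIRED = "timeout expires"
    PROBE_SUCCEEDED = "probe succeeds"
    PROBE_FAILED = "probe fails"


# One entry per arrow in the state machine diagram
TRANSITIONS = {
    (State.CLOSED, Event.THRESHOLD_REACHED): State.OPEN,
    (State.OPEN, Event.TIMEOUT_EXPIRED): State.HALF_OPEN,
    (State.HALF_OPEN, Event.PROBE_SUCCEEDED): State.CLOSED,
    (State.HALF_OPEN, Event.PROBE_FAILED): State.OPEN,
}


def next_state(state: State, event: Event) -> State:
    """Return the new state, or the current state if the event does not apply."""
    return TRANSITIONS.get((state, event), state)
```

Events that have no arrow from the current state, such as a probe result while CLOSED, leave the state unchanged.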

Configuration Parameters

| Parameter | Recommended | Description |
|---|---|---|
| failure_threshold | 5 | Failures before opening |
| success_threshold | 2 | Successes in half-open to close |
| recovery_timeout | 30s | Time before half-open transition |
| sliding_window_size | 10 | Requests to consider for failure rate |
| sliding_window_type | count-based | count-based or time-based (60s) |
| slow_call_threshold | 5s | Calls slower than this count as failures |
| slow_call_rate | 50% | Percentage of slow calls to trip |

Best Practices (2026)

1. Use Sliding Windows, Not Fixed Counters

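from collections import deque
import time

# BAD: Fixed counter resets on success
if failures >= 5:
    open_circuit()
if success:
    failures = 0  # One success resets everything!

# GOOD: Count-based sliding window over the last 10 calls
window = deque(maxlen=10)
window.append(("fail", time.time()))
failure_rate = sum(1 for r, _ in window if r == "fail") / len(window)
# Wait for a full window so one early failure (1/1 = 100%) cannot trip the breaker
if len(window) == window.maxlen and failure_rate >= 0.5:
    open_circuit()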
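A slightly fuller sketch of the count-based window wraps the bookkeeping in a class. The name `SlidingWindow` and the `min_calls` guard are assumptions added for illustration; the guard keeps a nearly empty window from tripping the breaker.

```python
from collections import deque


class SlidingWindow:
    """Tracks the outcome of the most recent calls (True = success)."""

    def __init__(self, size: int = 10, failure_rate_threshold: float = 0.5,
                 min_calls: int = 5):
        self._results = deque(maxlen=size)  # Oldest results fall off automatically
        self._threshold = failure_rate_threshold
        self._min_calls = min_calls

    def record(self, success: bool) -> None:
        self._results.append(success)

    @property
    def failure_rate(self) -> float:
        if not self._results:
            return 0.0
        return self._results.count(False) / len(self._results)

    def should_open(self) -> bool:
        # Require a minimum sample before trusting the rate
        return (len(self._results) >= self._min_calls
                and self.failure_rate >= self._threshold)
```

A single success after a run of failures lowers the rate only slightly, so the breaker still opens; it takes sustained successes to push old failures out of the window.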

2. Separate Health Checks from Circuit State

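from fastapi.responses import JSONResponse

# Health endpoint should NOT check circuit state
@app.get("/health")
async def health():
    return {"status": "healthy"}  # Always returns 200

# Readiness endpoint CAN check circuit state
@app.get("/ready")
async def ready():
    if circuit.is_open:
        # FastAPI would serialize a (body, status) tuple as a 200 response,
        # so set the status code explicitly
        return JSONResponse(
            status_code=503,
            content={"status": "degraded", "reason": "circuit_open"},
        )
    return {"status": "ready"}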

3. Include Observability

def on_state_change(from_state: str, to_state: str, service: str):
    # Metric
    metrics.increment(f"circuit_breaker.{service}.state_change",
                      tags={"from": from_state, "to": to_state})

    # Log
    logger.warning("Circuit breaker state change",
                   service=service, from_state=from_state, to_state=to_state)

    # Alert (only on OPEN)
    if to_state == "OPEN":
        alert_service.send(
            severity="warning",
            message=f"Circuit breaker opened for {service}",
            runbook="https://docs.internal/runbooks/circuit-breaker"
        )

4. Provide Meaningful Fallbacks

import hashlib

async def get_analysis_with_fallback(content: str) -> Analysis:
    try:
        return await circuit_breaker.call(analyze_content, content)
    except CircuitOpenError:
        # Fallback 1: Cached result
        # Built-in hash() is salted per process for str, so use a stable digest as the key
        key = hashlib.sha256(content.encode("utf-8")).hexdigest()
        cached = await cache.get(f"analysis:{key}")
        if cached:
            return Analysis.from_cache(cached, is_stale=True)

        # Fallback 2: Simplified analysis
        return Analysis(
            status="degraded",
            message="Full analysis unavailable, showing basic info",
            basic_info=extract_basic_info(content)
        )

5. Per-Service Breakers

# BAD: Single breaker for all services
global_breaker = CircuitBreaker()

# GOOD: Per-service breakers
breakers = {
    "openai": CircuitBreaker(failure_threshold=3, recovery_timeout=60),
    "anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=30),
    "youtube_api": CircuitBreaker(failure_threshold=10, recovery_timeout=120),
}

Anti-Patterns

1. Opening Too Quickly

# BAD: Opens on first failure
CircuitBreaker(failure_threshold=1)  # One blip = outage

# GOOD: Tolerates transient failures
CircuitBreaker(failure_threshold=5, sliding_window_size=10)

2. Recovery Timeout Too Short

# BAD: Hammers failing service
CircuitBreaker(recovery_timeout=5)  # Tries every 5 seconds

# GOOD: Gives service time to recover
CircuitBreaker(recovery_timeout=30)  # 30 seconds minimum

3. No Fallback

# BAD: Just throws error
async def call():
    if circuit.is_open:
        raise CircuitOpenError()  # User sees error page

# GOOD: Graceful degradation
async def call():
    if circuit.is_open:
        return await fallback_handler()  # User sees partial data

Integration with Other Patterns

Circuit Breaker + Retry

# Retry INSIDE circuit breaker
@circuit_breaker
@retry(max_attempts=3, base_delay=1.0)
async def call_service():
    ...

# Circuit only sees final result after retries exhausted

Circuit Breaker + Bulkhead

# Bulkhead limits concurrency, circuit limits failures
@circuit_breaker(service="analysis")
@bulkhead(tier=2, max_concurrent=3)
async def analyze():
    ...

Circuit Breaker + Timeout

# Timeout INSIDE circuit breaker
@circuit_breaker
@timeout(seconds=30)
async def call_service():
    ...

# Timeout counts as failure toward circuit threshold
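The `@timeout` decorator used above is not defined in this reference. A minimal sketch built on `asyncio.wait_for` could look like this; the decorator name and `seconds` parameter mirror the usage above but remain an assumption.

```python
import asyncio
from functools import wraps


def timeout(seconds: float):
    """Cancel the wrapped coroutine and raise asyncio.TimeoutError after `seconds`."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            return await asyncio.wait_for(fn(*args, **kwargs), timeout=seconds)
        return wrapper
    return decorator


@timeout(seconds=0.01)
async def slow_call():
    await asyncio.sleep(1)


@timeout(seconds=1)
async def fast_call():
    return "ok"
```

Because the timeout surfaces as an ordinary exception, an outer circuit breaker can count it like any other failure.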

Monitoring Queries

Prometheus

# Circuit state changes per minute (rate() is per-second, so scale by 60)
rate(circuit_breaker_state_changes_total[5m]) * 60

# Fraction of the last hour spent in OPEN state (0 to 1), assuming the gauge is 1 while open
avg_over_time(circuit_breaker_state{state="open"}[1h])

# Requests rejected per second due to open circuit
rate(circuit_breaker_rejected_total[5m])

Langfuse (LLM-specific)

# Tag traces with circuit state
trace.update(metadata={
    "circuit_state": circuit.state,
    "circuit_failure_count": circuit.failure_count,
})

Error Classification

Overview

Proper error classification is the foundation of resilience. Different errors require different handling strategies: retry, fallback, fail fast, or alert.

Error Classification Matrix

┌────────────────────────────────────────────────────────────────────┐
│                       Error Classification Matrix                   │
├────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                    TRANSIENT                 PERMANENT              │
│              ┌─────────────────────┬─────────────────────┐         │
│              │                     │                     │         │
│   EXTERNAL   │  • Rate limits      │  • Invalid API key  │         │
│   (API/Net)  │  • Timeouts         │  • 403 Forbidden    │         │
│              │  • 502/503/504      │  • 404 Not Found    │         │
│              │  • Connection reset │  • 400 Bad Request  │         │
│              │                     │                     │         │
│              │  ACTION: Retry      │  ACTION: Fail Fast  │         │
│              │  with backoff       │  Log & Alert        │         │
│              │                     │                     │         │
│              ├─────────────────────┼─────────────────────┤         │
│              │                     │                     │         │
│   INTERNAL   │  • DB connection    │  • Schema error     │         │
│   (System)   │  • Memory pressure  │  • Logic bug        │         │
│              │  • Lock contention  │  • Missing config   │         │
│              │  • Resource exhaust │  • Invalid state    │         │
│              │                     │                     │         │
│              │  ACTION: Retry      │  ACTION: Fail Fast  │         │
│              │  Circuit breaker    │  Fix code, restart  │         │
│              │                     │                     │         │
│              └─────────────────────┴─────────────────────┘         │
│                                                                     │
└────────────────────────────────────────────────────────────────────┘

HTTP Status Code Classification

Retryable (Transient)

| Code | Name | Strategy |
|---|---|---|
| 408 | Request Timeout | Retry immediately |
| 429 | Too Many Requests | Retry with Retry-After header |
| 500 | Internal Server Error | Retry with backoff |
| 502 | Bad Gateway | Retry with backoff |
| 503 | Service Unavailable | Retry with Retry-After |
| 504 | Gateway Timeout | Retry with backoff |

Non-Retryable (Permanent)

| Code | Name | Strategy |
|---|---|---|
| 400 | Bad Request | Log, fix input, fail |
| 401 | Unauthorized | Refresh token or fail |
| 403 | Forbidden | Fail, alert |
| 404 | Not Found | Fail (resource doesn't exist) |
| 405 | Method Not Allowed | Fail, fix code |
| 409 | Conflict | May retry with merge logic |
| 422 | Unprocessable Entity | Fail, fix input |

LLM API Error Classification

OpenAI Errors

OPENAI_RETRYABLE = {
    "rate_limit_exceeded",     # Retry with backoff
    "server_error",            # Retry with backoff
    "timeout",                 # Retry immediately
    "overloaded",              # Retry with longer backoff
}

OPENAI_NON_RETRYABLE = {
    "invalid_api_key",         # Fix config
    "invalid_request_error",   # Fix request
    "context_length_exceeded", # Reduce input (special handling)
    "content_policy_violation",# Change content
    "insufficient_quota",      # Add credits
    "model_not_found",         # Fix model name
}

Anthropic Errors

ANTHROPIC_RETRYABLE = {
    "overloaded_error",        # Retry with backoff
    "api_error",               # Retry with backoff
    "rate_limit_error",        # Retry with Retry-After
}

ANTHROPIC_NON_RETRYABLE = {
    "authentication_error",    # Fix API key
    "permission_error",        # Check permissions
    "invalid_request_error",   # Fix request format
    "not_found_error",         # Fix resource reference
}

Exception Classification Helper

from enum import Enum
from typing import Union, Type
import httpx

class ErrorCategory(Enum):
    RETRYABLE = "retryable"           # Transient, retry with backoff
    NON_RETRYABLE = "non_retryable"   # Permanent, fail fast
    CIRCUIT_TRIP = "circuit_trip"     # Count toward circuit breaker
    ALERTABLE = "alertable"           # Should trigger alert
    DEGRADABLE = "degradable"         # Can fall back to alternative

class ErrorClassifier:
    """Classify errors for resilience handling."""

    RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}
    NON_RETRYABLE_STATUS_CODES = {400, 401, 403, 404, 405, 422}
    ALERTABLE_STATUS_CODES = {401, 403, 500}

    RETRYABLE_EXCEPTIONS = {
        ConnectionError,
        TimeoutError,
        ConnectionResetError,
        ConnectionRefusedError,
        BrokenPipeError,
        httpx.ConnectError,
        httpx.ConnectTimeout,
        httpx.ReadTimeout,
    }

    def classify(self, error: Exception) -> set[ErrorCategory]:
        """Return set of applicable error categories."""
        categories = set()

        # HTTP errors
        if hasattr(error, "status_code"):
            code = error.status_code
            if code in self.RETRYABLE_STATUS_CODES:
                categories.add(ErrorCategory.RETRYABLE)
            if code in self.NON_RETRYABLE_STATUS_CODES:
                categories.add(ErrorCategory.NON_RETRYABLE)
            if code in self.ALERTABLE_STATUS_CODES:
                categories.add(ErrorCategory.ALERTABLE)
            if code >= 500:
                categories.add(ErrorCategory.CIRCUIT_TRIP)

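        # Exception types (isinstance also matches subclasses of the listed types)
        if isinstance(error, tuple(self.RETRYABLE_EXCEPTIONS)):
            categories.add(ErrorCategory.RETRYABLE)
            categories.add(ErrorCategory.CIRCUIT_TRIP)

        # LLM-specific errors (string codes only; other libraries use `code` for other things)
        llm_code = getattr(error, "code", None)
        if isinstance(llm_code, str):
            categories.update(self._classify_llm_error(llm_code))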

        # Default: non-retryable if nothing matched
        if not categories:
            categories.add(ErrorCategory.NON_RETRYABLE)
            categories.add(ErrorCategory.ALERTABLE)

        return categories

    def _classify_llm_error(self, error_code: str) -> set[ErrorCategory]:
        """Classify LLM API error codes."""
        categories = set()

        retryable_codes = {
            "rate_limit_exceeded", "server_error", "timeout",
            "overloaded", "overloaded_error", "api_error",
        }

        if error_code in retryable_codes:
            categories.add(ErrorCategory.RETRYABLE)
            categories.add(ErrorCategory.CIRCUIT_TRIP)
        else:
            categories.add(ErrorCategory.NON_RETRYABLE)

        if error_code in {"context_length_exceeded"}:
            categories.add(ErrorCategory.DEGRADABLE)

        return categories

    def should_retry(self, error: Exception) -> bool:
        """Quick check if error should be retried."""
        return ErrorCategory.RETRYABLE in self.classify(error)

    def should_trip_circuit(self, error: Exception) -> bool:
        """Check if error should count toward circuit breaker."""
        return ErrorCategory.CIRCUIT_TRIP in self.classify(error)

    def should_alert(self, error: Exception) -> bool:
        """Check if error should trigger an alert."""
        return ErrorCategory.ALERTABLE in self.classify(error)

Usage in Resilience Patterns

With Retry

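import asyncio
import random

classifier = ErrorClassifier()

class MaxRetriesExceeded(Exception):
    """Raised when every attempt failed with a retryable error."""

def backoff(attempt: int, base: float = 1.0, max_delay: float = 60.0) -> float:
    # Full-jitter exponential backoff; attempt is 0-based here
    return random.uniform(0, min(base * 2 ** attempt, max_delay))

async def call_with_smart_retry(fn, *args, max_attempts: int = 3, **kwargs):
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await fn(*args, **kwargs)
        except Exception as e:
            last_error = e
            categories = classifier.classify(e)

            if ErrorCategory.NON_RETRYABLE in categories:
                logger.error(f"Non-retryable error: {e}")
                raise

            if ErrorCategory.ALERTABLE in categories:
                await alert_service.send(
                    severity="warning",
                    message=f"Retryable error in {fn.__name__}",
                    error=str(e),
                )

            if attempt < max_attempts - 1:
                await asyncio.sleep(backoff(attempt))

    # Chain the last error so the root cause stays in the traceback
    raise MaxRetriesExceeded(f"{fn.__name__} failed after {max_attempts} attempts") from last_error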

With Circuit Breaker

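class SmartCircuitBreaker:
    def __init__(self, classifier: ErrorClassifier, threshold: int = 5):
        self.classifier = classifier
        self.threshold = threshold
        self.failure_count = 0
        self.is_open = False

    def open_circuit(self):
        # Minimal placeholder: a full breaker would also start the recovery timer
        self.is_open = True

    async def call(self, fn, *args, **kwargs):
        try:
            result = await fn(*args, **kwargs)
            self.failure_count = 0
            return result
        except Exception as e:
            # Only errors classified as CIRCUIT_TRIP count toward the threshold
            if self.classifier.should_trip_circuit(e):
                self.failure_count += 1
                if self.failure_count >= self.threshold:
                    self.open_circuit()
            raise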

With Fallback

async def call_with_fallback(primary_fn, fallback_fn, *args):
    try:
        return await primary_fn(*args)
    except Exception as e:
        categories = classifier.classify(e)

        if ErrorCategory.DEGRADABLE in categories:
            logger.info(f"Degrading to fallback: {e}")
            return await fallback_fn(*args)

        raise

Error Context Enrichment

from datetime import datetime, UTC  # UTC alias requires Python 3.11+

class EnrichedError(Exception):
    """Exception with classification and context."""

    def __init__(
        self,
        original: Exception,
        classifier: ErrorClassifier,
        context: dict | None = None,
    ):
        self.original = original
        self.categories = classifier.classify(original)
        self.context = context or {}
        self.timestamp = datetime.now(UTC)
        self.trace_id = get_current_trace_id()

        super().__init__(str(original))

    @property
    def is_retryable(self) -> bool:
        return ErrorCategory.RETRYABLE in self.categories

    @property
    def should_alert(self) -> bool:
        return ErrorCategory.ALERTABLE in self.categories

    def to_dict(self) -> dict:
        return {
            "error": str(self.original),
            "type": type(self.original).__name__,
            "categories": [c.value for c in self.categories],
            "context": self.context,
            "timestamp": self.timestamp.isoformat(),
            "trace_id": self.trace_id,
        }

Best Practices

  1. Default to non-retryable: Unknown errors should fail fast
  2. Log all classifications: Helps tune classification rules
  3. Include context: Error classification without context is useless
  4. Review regularly: New error types emerge, update rules
  5. Test classification: Unit test your classification logic
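
Practices 1 and 5 can be sketched together. The snippet below is a minimal illustration, assuming a simplified classifier: `SimpleClassifier`, its `RULES` table, and the two-member `ErrorCategory` enum are hypothetical stand-ins, not the ErrorClassifier used elsewhere in this guide.

```python
from enum import Enum


class ErrorCategory(Enum):
    RETRYABLE = "retryable"
    NON_RETRYABLE = "non_retryable"


class SimpleClassifier:
    """Hypothetical stand-in: maps exception types to categories."""

    # Only errors listed here are treated as retryable.
    RULES = {
        TimeoutError: {ErrorCategory.RETRYABLE},
        ConnectionError: {ErrorCategory.RETRYABLE},
    }

    def classify(self, error: Exception) -> set[ErrorCategory]:
        for exc_type, categories in self.RULES.items():
            if isinstance(error, exc_type):
                return set(categories)
        # Unknown errors fail fast (practice 1).
        return {ErrorCategory.NON_RETRYABLE}


def test_classification() -> None:
    """Unit test for the rule table (practice 5)."""
    classifier = SimpleClassifier()
    assert ErrorCategory.RETRYABLE in classifier.classify(TimeoutError("slow"))
    assert ErrorCategory.RETRYABLE in classifier.classify(ConnectionResetError())
    assert classifier.classify(ValueError("bad")) == {ErrorCategory.NON_RETRYABLE}
```

Keeping the rules in a plain table makes it cheap to add a test case whenever a new error type shows up in the logs.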

LLM Resilience

LLM-Specific Resilience Patterns

Overview

LLM APIs have unique failure modes that require specialized resilience patterns. This guide covers fallback chains, token budget management, rate limiting, and cost optimization through resilience.

Unique LLM Failure Modes

┌────────────────────────────────────────────────────────────┐
│                    LLM Failure Taxonomy                     │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   TRANSIENT (Retry)          PERMANENT (Fail Fast)         │
│   ─────────────────          ──────────────────────         │
│   • rate_limit_exceeded      • invalid_api_key             │
│   • model_overloaded         • content_policy_violation    │
│   • server_error             • invalid_request_error       │
│   • timeout                  • insufficient_quota          │
│   • context_length_exceeded* • model_not_found             │
│                                                             │
│   * Can retry with truncation                              │
│                                                             │
│   DEGRADABLE (Fallback)      COSTLY (Budget Control)       │
│   ─────────────────────      ────────────────────────       │
│   • Primary model down       • Large context = high cost   │
│   • Quality below threshold  • Streaming = token overhead  │
│   • Latency too high         • Retries multiply cost       │
│                                                             │
└────────────────────────────────────────────────────────────┘
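
The transient and permanent columns of the taxonomy can be expressed as a lookup table. This is a sketch only: the error codes are the ones shown in the diagram, while `FailureAction` and `action_for` are hypothetical names, and real provider SDKs may report these conditions differently.

```python
from enum import Enum


class FailureAction(Enum):
    RETRY = "retry"
    RETRY_TRUNCATED = "retry_with_truncation"
    FAIL_FAST = "fail_fast"


# Error codes taken from the taxonomy diagram.
ACTIONS = {
    "rate_limit_exceeded": FailureAction.RETRY,
    "model_overloaded": FailureAction.RETRY,
    "server_error": FailureAction.RETRY,
    "timeout": FailureAction.RETRY,
    # Retrying the same prompt cannot succeed; shrink it first.
    "context_length_exceeded": FailureAction.RETRY_TRUNCATED,
    "invalid_api_key": FailureAction.FAIL_FAST,
    "content_policy_violation": FailureAction.FAIL_FAST,
    "invalid_request_error": FailureAction.FAIL_FAST,
    "insufficient_quota": FailureAction.FAIL_FAST,
    "model_not_found": FailureAction.FAIL_FAST,
}


def action_for(error_code: str) -> FailureAction:
    """Return the suggested action; unrecognized codes fail fast."""
    return ACTIONS.get(error_code, FailureAction.FAIL_FAST)
```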

Pattern 1: Fallback Chain

┌────────────────────────────────────────────────────────────┐
│                    LLM Fallback Chain                       │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   Request ─▶ [Primary Model] ──success──▶ Response         │
│                    │                                        │
│                  fail (timeout, rate limit, error)          │
│                    ▼                                        │
│              [Fallback Model] ──success──▶ Response        │
│                    │                                        │
│                  fail                                       │
│                    ▼                                        │
│              [Semantic Cache] ──hit──▶ Response            │
│                    │                                        │
│                  miss                                       │
│                    ▼                                        │
│              [Default Response] ──▶ Graceful Degradation   │
│                                                             │
└────────────────────────────────────────────────────────────┘

Implementation

from dataclasses import dataclass
from typing import Optional, List, Callable, Awaitable

@dataclass
class LLMConfig:
    name: str
    model: str
    api_key: str
    timeout: float = 30.0
    max_tokens: int = 4096
    temperature: float = 0.7

class FallbackChain:
    def __init__(
        self,
        primary: LLMConfig,
        fallbacks: List[LLMConfig],
        cache: Optional[SemanticCache] = None,
        default_response: Optional[Callable[[str], str]] = None,
    ):
        self.primary = primary
        self.fallbacks = fallbacks
        self.cache = cache
        self.default_response = default_response

    async def complete(self, prompt: str, **kwargs) -> LLMResponse:
        # Try primary
        try:
            return await self._call_model(self.primary, prompt, **kwargs)
        except RetryableError as e:
            logger.warning(f"Primary model failed: {e}")

        # Try fallbacks
        for fallback in self.fallbacks:
            try:
                response = await self._call_model(fallback, prompt, **kwargs)
                response.is_fallback = True
                return response
            except RetryableError as e:
                logger.warning(f"Fallback {fallback.name} failed: {e}")

        # Try cache
        if self.cache:
            cached = await self.cache.get_similar(prompt, threshold=0.85)
            if cached:
                logger.info("Returning cached response")
                return LLMResponse(
                    content=cached.content,
                    is_cached=True,
                    cache_similarity=cached.similarity,
                )

        # Default response
        if self.default_response:
            return LLMResponse(
                content=self.default_response(prompt),
                is_degraded=True,
            )

        raise AllModelsFailedError("All LLM options exhausted")
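
| Use Case  | Primary                | Fallback 1             | Fallback 2  | Notes                   |
|-----------|------------------------|------------------------|-------------|-------------------------|
| Analysis  | Claude Sonnet          | GPT-5.2-mini           | Cache       | Quality-first           |
| Chat      | GPT-5.2                | Claude Haiku           | Default msg | Latency-first           |
| Embedding | text-embedding-3-large | text-embedding-3-small | -           | Dimension compatibility |
| Code Gen  | Claude Sonnet          | GPT-5.2                | -           | Quality-first           |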
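
The chain ordering above (models in priority order, then cache, then a default) can be exercised without real providers. The sketch below is an assumption-laden stand-in: `complete_with_fallback`, `flaky_primary`, and `healthy_fallback` are hypothetical names, and the local `RetryableError` only mimics the transient error type used by FallbackChain.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class RetryableError(Exception):
    """Stand-in for the transient error type used by the chain."""


ModelFn = Callable[[str], Awaitable[str]]


async def complete_with_fallback(
    prompt: str,
    models: list[ModelFn],
    cache_lookup: Optional[Callable[[str], Optional[str]]] = None,
    default: str = "Service temporarily unavailable",
) -> tuple[str, str]:
    """Return (content, source) from the first stage that succeeds."""
    for index, model in enumerate(models):
        try:
            return await model(prompt), f"model[{index}]"
        except RetryableError:
            continue  # move on to the next model in the chain

    if cache_lookup is not None:
        cached = cache_lookup(prompt)
        if cached is not None:
            return cached, "cache"

    return default, "default"


async def flaky_primary(prompt: str) -> str:
    raise RetryableError("rate_limit_exceeded")


async def healthy_fallback(prompt: str) -> str:
    return f"echo: {prompt}"
```

Returning the source alongside the content makes it easy to log which stage answered, which is what the `is_fallback`, `is_cached`, and `is_degraded` flags do in the class above.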

Pattern 2: Token Budget Management

┌────────────────────────────────────────────────────────────┐
│                     Token Budget Guard                      │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   Context Window: 128K tokens                              │
│   ┌────────────────────────────────────────────────────┐   │
│   │████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░│   │
│   └────────────────────────────────────────────────────┘   │
│   Used: 32K (25%)         Available: 96K                   │
│                                                             │
│   Budget Allocation:                                        │
│   ├── System prompt:     2K  (fixed)                       │
│   ├── Conversation:     20K  (sliding window)              │
│   ├── Retrieved docs:    8K  (chunked)                     │
│   ├── Output reserve:    2K  (for response)                │
│   └── Safety margin:     5K  (overflow buffer)             │
│                                                             │
│   When approaching limit:                                   │
│   1. Summarize conversation history (4:1 compression)      │
│   2. Reduce retrieved chunks                               │
│   3. Truncate oldest messages                              │
│   4. Fail with "context too large" error                   │
│                                                             │
└────────────────────────────────────────────────────────────┘
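
The figures in the diagram can be checked with a little arithmetic. In this sketch the "Used: 32K" number is read as the planned allocation excluding the safety margin; that reading, and the names `Budget` and `headroom`, are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Budget:
    system_prompt: int = 2_000
    conversation: int = 20_000
    retrieved_docs: int = 8_000
    output_reserve: int = 2_000
    safety_margin: int = 5_000

    @property
    def planned(self) -> int:
        """Tokens the request may use, excluding the safety margin."""
        return (
            self.system_prompt
            + self.conversation
            + self.retrieved_docs
            + self.output_reserve
        )


def headroom(context_window: int, budget: Budget) -> int:
    """Tokens left once the plan and the safety margin are both reserved."""
    return context_window - budget.planned - budget.safety_margin


budget = Budget()
print(budget.planned)             # 32000
print(headroom(128_000, budget))  # 91000
```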

Implementation

import tiktoken
from dataclasses import dataclass
from typing import List

@dataclass
class BudgetAllocation:
    system_prompt: int = 2000
    conversation: int = 20000
    retrieved_docs: int = 8000
    output_reserve: int = 2000
    safety_margin: int = 5000

    @property
    def total_budget(self) -> int:
        return (
            self.system_prompt +
            self.conversation +
            self.retrieved_docs +
            self.output_reserve +
            self.safety_margin
        )

class TokenBudgetGuard:
    def __init__(
        self,
        model: str,
        context_limit: int,
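        allocation: BudgetAllocation | None = None,
    ):
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # tiktoken only maps OpenAI model names; for other providers fall
            # back to a generic encoding and treat counts as approximate.
            self.encoding = tiktoken.get_encoding("cl100k_base")
        self.context_limit = context_limit
        self.allocation = allocation or BudgetAllocation()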

    def count_tokens(self, text: str) -> int:
        return len(self.encoding.encode(text))

    def fit_to_budget(
        self,
        system_prompt: str,
        messages: List[dict],
        retrieved_docs: List[str],
    ) -> tuple[str, List[dict], List[str]]:
        """Fit content to token budget, compressing as needed."""

        # Count fixed costs
        system_tokens = self.count_tokens(system_prompt)
        if system_tokens > self.allocation.system_prompt:
            raise TokenBudgetError("System prompt exceeds budget")

        # Fit messages with sliding window
        fitted_messages = self._fit_messages(
            messages,
            self.allocation.conversation
        )

        # Fit retrieved docs
        fitted_docs = self._fit_docs(
            retrieved_docs,
            self.allocation.retrieved_docs
        )

        return system_prompt, fitted_messages, fitted_docs

    def _fit_messages(self, messages: List[dict], budget: int) -> List[dict]:
        """Keep the most recent messages that fit in budget."""
        fitted: List[dict] = []
        used = 0

        # Walk newest to oldest and stop at the first message that does not
        # fit, so the kept window stays contiguous.
        for msg in reversed(messages):
            tokens = self.count_tokens(msg["content"])
            if used + tokens > budget:
                break
            fitted.insert(0, msg)
            used += tokens

        # Summarize whatever was dropped. _summarize_old_messages is assumed
        # to be defined elsewhere; its output is not counted against budget.
        dropped = messages[: len(messages) - len(fitted)]
        if dropped:
            summary = self._summarize_old_messages(dropped)
            fitted.insert(0, {"role": "system", "content": summary})

        return fitted

    def _fit_docs(self, docs: List[str], budget: int) -> List[str]:
        """Keep highest-scoring docs that fit in budget."""
        fitted = []
        used = 0

        for doc in docs:  # Assume already sorted by relevance
            tokens = self.count_tokens(doc)
            if used + tokens <= budget:
                fitted.append(doc)
                used += tokens
            else:
                break

        return fitted

Pattern 3: Rate Limit Management

from asyncio import Semaphore, sleep
from collections import deque
from time import time

class RateLimiter:
    """Sliding-window rate limiter for LLM APIs (requests and tokens per minute)."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 100000,
    ):
        self.rpm_limit = requests_per_minute
        self.tpm_limit = tokens_per_minute
        self.request_times = deque(maxlen=requests_per_minute)
        self.token_counts = deque(maxlen=1000)  # (tokens, timestamp) pairs
        self.semaphore = Semaphore(requests_per_minute)

    async def acquire(self, estimated_tokens: int):
        """Wait until rate limit allows the request."""
        async with self.semaphore:
            now = time()

            # Check RPM
            while len(self.request_times) >= self.rpm_limit:
                oldest = self.request_times[0]
                wait_time = 60 - (now - oldest)
                if wait_time > 0:
                    await sleep(wait_time)
                    now = time()
                self.request_times.popleft()

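            # Check TPM over the last 60 seconds
            recent = [(t, ts) for t, ts in self.token_counts if now - ts < 60]
            recent_tokens = sum(t for t, _ in recent)
            if recent and recent_tokens + estimated_tokens > self.tpm_limit:
                # Wait until the oldest in-window entry ages out; indexing
                # token_counts[0] could hit a stale or missing entry.
                await sleep(60 - (now - recent[0][1]))
                now = time()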

            # Record this request
            self.request_times.append(now)

    def record_usage(self, actual_tokens: int):
        """Record actual token usage after request completes."""
        self.token_counts.append((actual_tokens, time()))
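
Because the limiter above reads the wall clock and sleeps, it is awkward to unit test. One option, sketched here under the assumption that the caller supplies timestamps, is to separate the "how long must I wait" decision from the waiting itself. `SlidingWindowBudget` and its methods are hypothetical names, not part of the class above.

```python
from collections import deque


class SlidingWindowBudget:
    """Tracks token usage over a rolling window using caller-supplied times."""

    def __init__(self, limit: int, window_seconds: float = 60.0):
        self.limit = limit
        self.window = window_seconds
        self._events: deque[tuple[float, int]] = deque()  # (timestamp, tokens)

    def _prune(self, now: float) -> None:
        # Drop entries that have aged out of the window.
        while self._events and now - self._events[0][0] >= self.window:
            self._events.popleft()

    def seconds_until_allowed(self, now: float, tokens: int) -> float:
        """Return 0.0 if `tokens` fits now, else how long to wait."""
        self._prune(now)
        used = sum(count for _, count in self._events)
        if used + tokens <= self.limit:
            return 0.0
        # Free capacity by waiting for the oldest entries to expire.
        needed = used + tokens - self.limit
        for timestamp, count in self._events:
            needed -= count
            if needed <= 0:
                return self.window - (now - timestamp)
        return float("inf")  # request is larger than the whole limit

    def record(self, now: float, tokens: int) -> None:
        self._events.append((now, tokens))
```

A caller would `await asyncio.sleep(...)` for the returned duration and then call `record` with the actual usage once the request completes.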

Pattern 4: Cost Control Circuit Breaker

class CostCircuitBreaker:
    """Opens when LLM costs exceed budget."""

    def __init__(
        self,
        hourly_budget: float = 10.0,  # $10/hour
        daily_budget: float = 100.0,   # $100/day
        alert_threshold: float = 0.8,  # Alert at 80%
    ):
        self.hourly_budget = hourly_budget
        self.daily_budget = daily_budget
        self.alert_threshold = alert_threshold
        self.hourly_spend = 0.0
        self.daily_spend = 0.0
        self.last_hour_reset = time()
        self.last_day_reset = time()

    def record_cost(self, input_tokens: int, output_tokens: int, model: str):
        """Record cost and check budget."""
        self._reset_if_needed()

        cost = self._calculate_cost(input_tokens, output_tokens, model)
        self.hourly_spend += cost
        self.daily_spend += cost

        # Alert at threshold
        if self.hourly_spend > self.hourly_budget * self.alert_threshold:
            logger.warning(
                f"Hourly LLM budget at {self.hourly_spend/self.hourly_budget:.0%}"
            )

        # Trip circuit at limit
        if self.hourly_spend >= self.hourly_budget:
            raise CostBudgetExceeded("Hourly LLM budget exceeded")

        if self.daily_spend >= self.daily_budget:
            raise CostBudgetExceeded("Daily LLM budget exceeded")

    def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        """Calculate cost in USD; prices are per 1M tokens (Dec 2025)."""
        PRICING = {
            "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
            "gpt-5.2": {"input": 2.5, "output": 10.0},
            "gpt-5.2-mini": {"input": 0.15, "output": 0.60},
            "claude-haiku-4-5-20251101": {"input": 0.80, "output": 4.0},
        }

        prices = PRICING.get(model, {"input": 1.0, "output": 3.0})
        return (
            (input_tokens / 1_000_000) * prices["input"] +
            (output_tokens / 1_000_000) * prices["output"]
        )
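
The class above calls `_reset_if_needed` without showing it. One possible shape for that logic, together with the per-million-token cost arithmetic, is sketched below. `SpendWindows`, `reset_if_needed`, and `request_cost` are hypothetical names, and passing `now` explicitly is an assumption made so the behavior can be tested without a real clock.

```python
from dataclasses import dataclass

HOUR = 3600.0
DAY = 86400.0


@dataclass
class SpendWindows:
    hourly_spend: float = 0.0
    daily_spend: float = 0.0
    last_hour_reset: float = 0.0
    last_day_reset: float = 0.0

    def reset_if_needed(self, now: float) -> None:
        """Zero each counter once its window has elapsed."""
        if now - self.last_hour_reset >= HOUR:
            self.hourly_spend = 0.0
            self.last_hour_reset = now
        if now - self.last_day_reset >= DAY:
            self.daily_spend = 0.0
            self.last_day_reset = now


def request_cost(
    input_tokens: int,
    output_tokens: int,
    input_price: float,
    output_price: float,
) -> float:
    """Cost in USD, with prices expressed per one million tokens."""
    return (input_tokens / 1_000_000) * input_price + (
        output_tokens / 1_000_000
    ) * output_price
```

For example, 500,000 input tokens at 3.0 and 100,000 output tokens at 15.0 cost 1.50 + 1.50 = 3.00.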

Pattern 5: Quality-Aware Fallback

class QualityAwareFallback:
    """Falls back when response quality is below threshold."""

    def __init__(
        self,
        primary_chain: FallbackChain,
        quality_evaluator: Callable[[str, str], Awaitable[float]],  # awaited in complete()
        quality_threshold: float = 0.7,
        max_retries: int = 2,
    ):
        self.chain = primary_chain
        self.evaluate = quality_evaluator
        self.threshold = quality_threshold
        self.max_retries = max_retries

    async def complete(self, prompt: str, **kwargs) -> LLMResponse:
        best_response = None
        best_score = float("-inf")

        for attempt in range(self.max_retries + 1):
            response = await self.chain.complete(prompt, **kwargs)

            # Evaluate quality (the evaluator is awaited, so it must be async)
            quality_score = await self.evaluate(prompt, response.content)
            response.quality_score = quality_score

            if quality_score >= self.threshold:
                return response

            logger.warning(
                f"Response quality {quality_score:.2f} below threshold "
                f"(attempt {attempt + 1})"
            )

            # Remember the highest-scoring response seen so far
            if quality_score > best_score:
                best_response, best_score = response, quality_score

            # Lower the temperature on retry for more deterministic output
            if attempt < self.max_retries:
                kwargs["temperature"] = max(0.3, kwargs.get("temperature", 0.7) - 0.2)

        # Return the best attempt (not merely the last one) with a warning
        best_response.quality_warning = f"Below threshold: {best_score:.2f}"
        return best_response
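
The `quality_evaluator` passed to this class is awaited, so it needs to be an async callable that returns a score. A deliberately crude example follows; `keyword_overlap_evaluator` is a hypothetical name, and keyword overlap is only a placeholder heuristic, not a recommended quality metric.

```python
import asyncio
import re


async def keyword_overlap_evaluator(prompt: str, response: str) -> float:
    """Score in [0, 1]: fraction of prompt keywords echoed in the response."""

    def keywords(text: str) -> set[str]:
        # Words of four or more letters, lowercased.
        return set(re.findall(r"[a-z]{4,}", text.lower()))

    wanted = keywords(prompt)
    if not wanted:
        return 1.0  # nothing to check against
    return len(wanted & keywords(response)) / len(wanted)
```

In practice the evaluator is more likely to be a small grading model or a schema check; the only contract the class relies on is the `(prompt, response) -> float` async signature.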

Best Practices (2026)

  1. Always have a fallback: Even a cached or default response is better than an error
  2. Monitor costs per-request: Track token usage in traces (Langfuse)
  3. Use streaming for long responses: Better UX and partial results on failure
  4. Cache aggressively: A semantic cache with a 0.85+ similarity threshold can save 60-80% of costs on workloads with many near-duplicate prompts
  5. Set appropriate timeouts: 30s for completion, 5s for embeddings
  6. Log all fallback events: Critical for understanding system behavior
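
Practice 5 can be applied with `asyncio.wait_for`. The sketch below assumes the two timeout values from the list; `TIMEOUTS` and `call_with_timeout` are hypothetical names.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")

# Timeouts from the list above, in seconds.
TIMEOUTS = {"completion": 30.0, "embedding": 5.0}


async def call_with_timeout(kind: str, call: Awaitable[T]) -> T:
    """Await `call`, raising asyncio.TimeoutError past the limit for `kind`."""
    return await asyncio.wait_for(call, timeout=TIMEOUTS[kind])
```

A timeout raised here is a natural trigger for the fallback chain, since `timeout` sits in the transient column of the failure taxonomy.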

OrchestKit Integration

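# Example integration for OrchestKit analysis pipeline
import os

llm_chain = FallbackChain(
    primary=LLMConfig(
        name="primary",
        model="claude-sonnet-4-6",
        api_key=os.environ["ANTHROPIC_API_KEY"],  # env var name is an assumption
        timeout=30.0,
    ),
    fallbacks=[
        LLMConfig(
            name="fallback",
            model="gpt-5.2-mini",
            api_key=os.environ["OPENAI_API_KEY"],  # env var name is an assumption
            timeout=20.0,
        ),
    ],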
    cache=semantic_cache,  # Redis-backed
    default_response=lambda p: "Analysis temporarily unavailable",
)

budget_guard = TokenBudgetGuard(
    model="claude-sonnet-4-6",
    context_limit=200000,
    allocation=BudgetAllocation(
        system_prompt=3000,
        conversation=10000,
        retrieved_docs=15000,  # RAG context
        output_reserve=4000,
        safety_margin=5000,
    ),
)

rate_limiter = RateLimiter(
    requests_per_minute=50,  # Leave headroom
    tokens_per_minute=80000,
)

Postgres Advisory Locks

PostgreSQL Advisory Locks

Why PostgreSQL Advisory Locks?

  • No extra infrastructure - uses existing PostgreSQL
  • ACID guarantees - integrated with transactions
  • Two modes - session-level and transaction-level
  • PostgreSQL 18 - enhanced performance and monitoring

Lock Types

TypeScopeReleaseUse Case
Session-levelConnectionExplicit or disconnectLong-running jobs
Transaction-levelTransactionCommit/rollbackData consistency

Session-Level Locks

from contextlib import asynccontextmanager
from typing import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


class PostgresAdvisoryLock:
    """PostgreSQL advisory lock (session-level).

    Lock persists until explicitly released or connection closes.
    Good for: background jobs, cron tasks, singleton processes.
    """

    def __init__(self, session: AsyncSession, lock_id: int):
        self._session = session
        self._lock_id = lock_id
        self._acquired = False

    async def acquire(self, blocking: bool = True) -> bool:
        """Acquire advisory lock.

        Args:
            blocking: If True, wait for lock. If False, return immediately.
        """
        if blocking:
            # pg_advisory_lock blocks until acquired
            await self._session.execute(
                text("SELECT pg_advisory_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            self._acquired = True
            return True
        else:
            # pg_try_advisory_lock returns immediately
            result = await self._session.execute(
                text("SELECT pg_try_advisory_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            self._acquired = result.scalar()
            return self._acquired

    async def release(self) -> bool:
        """Release advisory lock."""
        if not self._acquired:
            return False

        result = await self._session.execute(
            text("SELECT pg_advisory_unlock(:lock_id)"),
            {"lock_id": self._lock_id},
        )
        released = result.scalar()
        if released:
            self._acquired = False
        return released

    @property
    def is_acquired(self) -> bool:
        return self._acquired

    async def __aenter__(self) -> "PostgresAdvisoryLock":
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.release()


@asynccontextmanager
async def advisory_lock(
    session: AsyncSession,
    lock_id: int,
    blocking: bool = True,
) -> AsyncGenerator[bool, None]:
    """Context manager for advisory locks."""
    lock = PostgresAdvisoryLock(session, lock_id)
    acquired = await lock.acquire(blocking=blocking)

    try:
        yield acquired
    finally:
        if acquired:
            await lock.release()

Transaction-Level Locks

class PostgresTransactionLock:
    """PostgreSQL advisory lock (transaction-level).

    Lock automatically released on commit/rollback.
    Good for: ensuring data consistency within a transaction.
    """

    def __init__(self, session: AsyncSession, lock_id: int):
        self._session = session
        self._lock_id = lock_id

    async def acquire(self, blocking: bool = True) -> bool:
        """Acquire transaction-scoped lock."""
        if blocking:
            await self._session.execute(
                text("SELECT pg_advisory_xact_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            return True
        else:
            result = await self._session.execute(
                text("SELECT pg_try_advisory_xact_lock(:lock_id)"),
                {"lock_id": self._lock_id},
            )
            return result.scalar()

    # No release method - automatically released on transaction end

Lock ID Strategies

import hashlib


def string_to_lock_id(name: str) -> int:
    """Convert string to PostgreSQL lock ID (bigint).

    PostgreSQL advisory locks use bigint IDs.
    This converts any string to a consistent ID.
    """
    # Use MD5 hash, take first 8 bytes as signed int64
    hash_bytes = hashlib.md5(name.encode()).digest()[:8]
    return int.from_bytes(hash_bytes, byteorder="big", signed=True)


def composite_lock_id(namespace: int, resource_id: int) -> tuple[int, int]:
    """Create two-key lock ID.

    PostgreSQL supports advisory locks with two int4 keys.
    Useful for namespacing locks.
    """
    return (namespace, resource_id)


# Usage
NAMESPACE_PAYMENT = 1
NAMESPACE_INVENTORY = 2

# Single key lock
lock_id = string_to_lock_id("payment:order-123")
await session.execute(
    text("SELECT pg_advisory_lock(:id)"),
    {"id": lock_id},
)

# Two-key lock
await session.execute(
    text("SELECT pg_advisory_lock(:ns, :id)"),
    {"ns": NAMESPACE_PAYMENT, "id": 12345},
)
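
When the resource is identified by a string but a namespace is still wanted, the two-key form needs both values to fit in int4. A minimal sketch, assuming a 32-bit slice of the same MD5 hash is acceptable; `string_to_int4` and `namespaced_lock_keys` are hypothetical helpers, and a 32-bit hash collides far more readily than the 64-bit one above.

```python
import hashlib

INT4_MIN, INT4_MAX = -(2**31), 2**31 - 1


def string_to_int4(name: str) -> int:
    """Hash a string into the signed 32-bit range used by two-key locks."""
    digest = hashlib.md5(name.encode()).digest()[:4]
    return int.from_bytes(digest, byteorder="big", signed=True)


def namespaced_lock_keys(namespace: int, resource: str) -> tuple[int, int]:
    """Build (namespace, hashed resource) for pg_advisory_lock(int4, int4)."""
    if not INT4_MIN <= namespace <= INT4_MAX:
        raise ValueError("namespace must fit in a signed 32-bit integer")
    return namespace, string_to_int4(resource)
```

A collision only means two unrelated resources share a lock, which costs concurrency rather than correctness.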

Practical Examples

Singleton Job

async def run_scheduled_job(session: AsyncSession):
    """Ensure only one instance runs this job."""
    lock_id = string_to_lock_id("daily-report-job")

    async with advisory_lock(session, lock_id, blocking=False) as acquired:
        if not acquired:
            print("Job already running on another instance")
            return

        print("Running daily report...")
        await generate_daily_report()
        print("Daily report complete")

Transactional Update

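from decimal import Decimal

NAMESPACE_ACCOUNT = 3  # Example value; two-key locks need int4 namespaces and IDs

async def transfer_funds(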
    session: AsyncSession,
    from_account: int,
    to_account: int,
    amount: Decimal,
):
    """Transfer funds with advisory lock for consistency."""
    # Lock both accounts (always in same order to prevent deadlock)
    accounts = sorted([from_account, to_account])

    # Transaction-level locks
    for account_id in accounts:
        await session.execute(
            text("SELECT pg_advisory_xact_lock(:ns, :id)"),
            {"ns": NAMESPACE_ACCOUNT, "id": account_id},
        )

    # Perform transfer (locks auto-release on commit)
    await debit_account(session, from_account, amount)
    await credit_account(session, to_account, amount)

    await session.commit()

PostgreSQL 18 Lock Monitoring

-- View current advisory locks
SELECT
    pid,
    locktype,
    classid,
    objid,
    mode,
    granted
FROM pg_locks
WHERE locktype = 'advisory';

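-- View locks with session info (pg_locks joined to pg_stat_activity)
-- For single bigint keys, classid holds the high 32 bits and objid the low
-- 32 bits (objsubid = 1); for two-key locks they hold the two keys (objsubid = 2).
SELECT
    l.pid,
    l.classid,
    l.objid,
    l.objsubid,
    l.granted,
    a.application_name,
    a.client_addr,
    a.state,
    now() - a.state_change AS time_in_state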
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory';
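
To match a row in pg_locks back to an ID produced by `string_to_lock_id`, the two 32-bit columns have to be recombined. PostgreSQL shows a bigint key with its high-order half in classid and its low-order half in objid (objsubid = 1). The helpers below are a sketch with hypothetical names.

```python
def decode_bigint_key(classid: int, objid: int) -> int:
    """Rebuild the signed 64-bit key from pg_locks columns (objsubid = 1)."""
    unsigned = (classid << 32) | objid
    # Reinterpret the unsigned 64-bit value as signed.
    return unsigned - 2**64 if unsigned >= 2**63 else unsigned


def encode_bigint_key(key: int) -> tuple[int, int]:
    """Split a signed 64-bit key into the (classid, objid) pair pg_locks shows."""
    unsigned = key % 2**64
    return unsigned >> 32, unsigned & 0xFFFFFFFF
```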

Redis Locks

Redis Distributed Locks

Single-Node Redis Lock (Lua Script)

import asyncio
from datetime import timedelta
from uuid import UUID

from uuid_utils import uuid7
import redis.asyncio as redis


class RedisLock:
    """Distributed lock using Redis with Lua scripts.

    Features:
    - Atomic acquire/release with Lua scripts
    - Automatic expiration (prevents deadlocks)
    - Owner validation (only owner can release)
    - Extension support (heartbeat)
    """

    # Lua script for atomic acquire
    ACQUIRE_SCRIPT = """
    if redis.call('exists', KEYS[1]) == 0 then
        redis.call('hset', KEYS[1], 'owner', ARGV[1], 'count', 1)
        redis.call('pexpire', KEYS[1], ARGV[2])
        return 1
    elseif redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
        redis.call('hincrby', KEYS[1], 'count', 1)
        redis.call('pexpire', KEYS[1], ARGV[2])
        return 1
    end
    return 0
    """

    # Lua script for atomic release
    RELEASE_SCRIPT = """
    if redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
        local count = redis.call('hincrby', KEYS[1], 'count', -1)
        if count <= 0 then
            redis.call('del', KEYS[1])
            return 1
        end
        return 1
    end
    return 0
    """

    # Lua script for extending TTL
    EXTEND_SCRIPT = """
    if redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
        redis.call('pexpire', KEYS[1], ARGV[2])
        return 1
    end
    return 0
    """

    def __init__(
        self,
        client: redis.Redis,
        name: str,
        ttl: timedelta = timedelta(seconds=30),
    ):
        self._client = client
        self._name = f"lock:{name}"
        self._ttl_ms = int(ttl.total_seconds() * 1000)
        self._owner_id = str(uuid7())
        self._acquired = False

    async def acquire(self, timeout: timedelta | None = None) -> bool:
        """Acquire lock with optional timeout."""
        deadline = (
            asyncio.get_event_loop().time() + timeout.total_seconds()
            if timeout
            else None
        )

        while True:
            result = await self._client.eval(
                self.ACQUIRE_SCRIPT,
                1,
                self._name,
                self._owner_id,
                self._ttl_ms,
            )

            if result == 1:
                self._acquired = True
                return True

            if deadline is not None and asyncio.get_event_loop().time() >= deadline:
                return False

            # Fixed 50 ms polling interval between attempts
            await asyncio.sleep(0.05)

    async def release(self) -> bool:
        """Release lock (only if owner)."""
        if not self._acquired:
            return False

        result = await self._client.eval(
            self.RELEASE_SCRIPT,
            1,
            self._name,
            self._owner_id,
        )

        if result == 1:
            self._acquired = False
            return True
        return False

    async def extend(self, ttl: timedelta | None = None) -> bool:
        """Extend lock TTL (heartbeat)."""
        if not self._acquired:
            return False

        # Default to the TTL the lock was created with
        ttl_ms = int(ttl.total_seconds() * 1000) if ttl else self._ttl_ms
        result = await self._client.eval(
            self.EXTEND_SCRIPT,
            1,
            self._name,
            self._owner_id,
            ttl_ms,
        )
        return result == 1

    @property
    def is_acquired(self) -> bool:
        return self._acquired

    async def __aenter__(self) -> "RedisLock":
        acquired = await self.acquire(timeout=timedelta(seconds=10))
        if not acquired:
            raise LockAcquisitionError(f"Failed to acquire lock: {self._name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.release()


class LockAcquisitionError(Exception):
    """Raised when lock cannot be acquired."""
    pass

Usage Examples

# Basic usage with context manager
async def process_payment(order_id: UUID, redis_client: redis.Redis):
    lock = RedisLock(redis_client, f"payment:{order_id}")

    async with lock:
        # Only one instance processes this order
        order = await get_order(order_id)
        await charge_payment(order)
        await mark_order_paid(order_id)


# Manual acquire/release with timeout
async def try_process_batch(batch_id: str, redis_client: redis.Redis):
    lock = RedisLock(redis_client, f"batch:{batch_id}", ttl=timedelta(minutes=5))

    if await lock.acquire(timeout=timedelta(seconds=5)):
        try:
            await process_batch(batch_id)
        finally:
            await lock.release()
    else:
        print(f"Batch {batch_id} already being processed")


# Long-running task with heartbeat
async def long_running_task(task_id: str, redis_client: redis.Redis):
    lock = RedisLock(redis_client, f"task:{task_id}", ttl=timedelta(seconds=30))

    async with lock:
        # Start heartbeat in background
        async def heartbeat():
            while lock.is_acquired:
                await lock.extend(timedelta(seconds=30))
                await asyncio.sleep(10)

        heartbeat_task = asyncio.create_task(heartbeat())

        try:
            await do_long_work()
        finally:
            heartbeat_task.cancel()
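
The heartbeat pattern above can be factored into a reusable helper that also waits for the cancelled task to finish and stops extending once the lock is lost. This is a sketch: `run_with_heartbeat` and the `Extendable` protocol are hypothetical names, and any object with an async `extend()` returning a bool (such as RedisLock) is assumed to fit.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable, Protocol, TypeVar

T = TypeVar("T")


class Extendable(Protocol):
    async def extend(self) -> bool: ...


async def run_with_heartbeat(
    lock: Extendable,
    work: Callable[[], Awaitable[T]],
    interval: float,
) -> T:
    """Run `work` while extending `lock` every `interval` seconds."""

    async def heartbeat() -> None:
        while True:
            await asyncio.sleep(interval)
            if not await lock.extend():
                return  # lock was lost; stop extending

    task = asyncio.create_task(heartbeat())
    try:
        return await work()
    finally:
        task.cancel()
        # Wait for the task to finish so no heartbeat outlives the lock.
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

The interval should be well below the lock TTL (the example above uses 10 seconds against a 30 second TTL) so that one missed heartbeat does not let the lock expire.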

Lock with Retry Decorator

from functools import wraps
from typing import Awaitable, Callable, ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")


def with_lock(
    lock_name: str,
    ttl: timedelta = timedelta(seconds=30),
    timeout: timedelta = timedelta(seconds=10),
    retries: int = 3,
):
    """Decorator to acquire lock before function execution."""

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            redis_client = kwargs.get("redis") or args[0]  # Adjust as needed

            for attempt in range(retries):
                lock = RedisLock(redis_client, lock_name, ttl=ttl)

                if await lock.acquire(timeout=timeout):
                    try:
                        return await func(*args, **kwargs)
                    finally:
                        await lock.release()

                await asyncio.sleep(0.1 * (2 ** attempt))  # Exponential backoff

            raise LockAcquisitionError(
                f"Failed to acquire lock after {retries} attempts"
            )

        return wrapper
    return decorator


# Usage
@with_lock("payment-processor", ttl=timedelta(seconds=60))
async def process_payment(redis: redis.Redis, order_id: UUID):
    ...

Redlock Algorithm

Redlock Algorithm (Multi-Node Redis)

Why Redlock?

Single Redis instance = single point of failure. Redlock provides:

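  • Quorum-based locking across N independent Redis nodes
  • Fault tolerance (stays available while a majority, N/2+1 nodes, is reachable)
  • Mutual exclusion, provided clock drift and process pauses stay small relative to the TTL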

Algorithm Overview

┌─────────────────────────────────────────────────────────────┐
│                    Redlock Algorithm                         │
├─────────────────────────────────────────────────────────────┤
│  1. Get current time (T1)                                   │
│  2. Try to acquire lock on N nodes sequentially             │
│  3. Get current time (T2)                                   │
│  4. Calculate elapsed = T2 - T1                             │
│  5. Lock valid if:                                          │
│     - Acquired on majority (N/2 + 1) nodes                  │
│     - Elapsed < TTL - clock_drift                           │
│  6. If valid: use lock with remaining TTL                   │
│  7. If invalid: release on all nodes                        │
└─────────────────────────────────────────────────────────────┘
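
Step 5 is easy to get wrong, so a worked calculation may help. The function below is a sketch with a hypothetical name; the drift allowance (a fraction of the TTL plus 2 ms) matches the implementation below.

```python
def redlock_validity(
    total_nodes: int,
    acquired_nodes: int,
    ttl_ms: int,
    elapsed_ms: int,
    clock_drift_factor: float = 0.01,
) -> tuple[bool, int]:
    """Return (is_valid, remaining_validity_ms) for one acquisition round."""
    quorum = total_nodes // 2 + 1
    # Drift allowance: a fraction of the TTL plus 2 ms.
    drift_ms = int(ttl_ms * clock_drift_factor) + 2
    validity_ms = ttl_ms - elapsed_ms - drift_ms
    return acquired_nodes >= quorum and validity_ms > 0, validity_ms


print(redlock_validity(5, 3, ttl_ms=30_000, elapsed_ms=120))  # (True, 29578)
print(redlock_validity(5, 2, ttl_ms=30_000, elapsed_ms=120))  # (False, 29578)
print(redlock_validity(5, 5, ttl_ms=1_000, elapsed_ms=995))   # (False, -7)
```

The third case shows why the elapsed-time check matters: every node granted the lock, but acquisition took so long that no usable validity time remains.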

Implementation

import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
import time

from uuid_utils import uuid7
import redis.asyncio as redis


@dataclass
class RedlockConfig:
    """Redlock configuration."""

    ttl: timedelta = timedelta(seconds=30)
    retry_count: int = 3
    retry_delay: timedelta = timedelta(milliseconds=200)
    clock_drift_factor: float = 0.01  # 1% of TTL


@dataclass
class RedlockResult:
    """Result of lock acquisition attempt."""

    acquired: bool
    validity_time_ms: int = 0
    resource: str = ""
    owner_id: str = ""


class Redlock:
    """Distributed lock across multiple Redis instances.

    Implements the Redlock algorithm for fault-tolerant locking.
    Requires N Redis instances (recommend N=5 for production).
    """

    ACQUIRE_SCRIPT = """
    if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
        return 1
    end
    return 0
    """

    RELEASE_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    end
    return 0
    """

    def __init__(
        self,
        clients: list[redis.Redis],
        config: RedlockConfig | None = None,
    ):
        if len(clients) < 3:
            raise ValueError("Redlock requires at least 3 Redis instances")

        self._clients = clients
        self._config = config or RedlockConfig()
        self._quorum = len(clients) // 2 + 1

    async def acquire(self, resource: str) -> RedlockResult:
        """Acquire lock on resource across all nodes."""
        owner_id = str(uuid7())
        ttl_ms = int(self._config.ttl.total_seconds() * 1000)

        for attempt in range(self._config.retry_count):
            start_time = time.monotonic()

            # Try to acquire on all nodes
            acquired_count = 0
            for client in self._clients:
                try:
                    result = await asyncio.wait_for(
                        client.eval(
                            self.ACQUIRE_SCRIPT,
                            1,
                            f"lock:{resource}",
                            owner_id,
                            ttl_ms,
                        ),
                        timeout=0.1,  # Fast timeout per node
                    )
                    if result == 1:
                        acquired_count += 1
                except (asyncio.TimeoutError, redis.RedisError):
                    continue

            # Calculate elapsed time
            elapsed_ms = int((time.monotonic() - start_time) * 1000)

            # Calculate validity time with clock drift
            drift_ms = int(ttl_ms * self._config.clock_drift_factor) + 2
            validity_time_ms = ttl_ms - elapsed_ms - drift_ms

            # Check if we acquired quorum with enough validity time
            if acquired_count >= self._quorum and validity_time_ms > 0:
                return RedlockResult(
                    acquired=True,
                    validity_time_ms=validity_time_ms,
                    resource=resource,
                    owner_id=owner_id,
                )

            # Failed - release on all nodes
            await self._release_all(resource, owner_id)

            # Retry with delay + jitter
            if attempt < self._config.retry_count - 1:
                delay = self._config.retry_delay.total_seconds()
                jitter = delay * 0.1 * (0.5 - asyncio.get_event_loop().time() % 1)
                await asyncio.sleep(delay + jitter)

        return RedlockResult(acquired=False, resource=resource)

    async def release(self, result: RedlockResult) -> bool:
        """Release lock on all nodes."""
        if not result.acquired:
            return False
        return await self._release_all(result.resource, result.owner_id)

    async def _release_all(self, resource: str, owner_id: str) -> bool:
        """Release lock on all Redis nodes."""
        released_count = 0

        for client in self._clients:
            try:
                result = await asyncio.wait_for(
                    client.eval(
                        self.RELEASE_SCRIPT,
                        1,
                        f"lock:{resource}",
                        owner_id,
                    ),
                    timeout=0.1,
                )
                if result == 1:
                    released_count += 1
            except (asyncio.TimeoutError, redis.RedisError):
                continue

        return released_count > 0

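    def lock(self, resource: str) -> "RedlockContext":
        """Return an async context manager that holds the lock on `resource`."""
        return RedlockContext(self, resource)


class LockAcquisitionError(Exception):
    """Raised when the lock cannot be acquired on a quorum of nodes."""


class RedlockContext:
    """Async context manager for Redlock: acquires on enter, releases on exit."""

    def __init__(self, redlock: Redlock, resource: str):
        self._redlock = redlock
        self._resource = resource
        self._result: RedlockResult | None = None

    async def __aenter__(self) -> RedlockResult:
        self._result = await self._redlock.acquire(self._resource)
        if not self._result.acquired:
            raise LockAcquisitionError(f"Failed to acquire lock: {self._resource}")
        return self._result

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Release even when the body raised, so the lock is not held until TTL expiry
        if self._result is not None and self._result.acquired:
            await self._redlock.release(self._result)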

Usage

# Setup with multiple Redis instances
redis_clients = [
    redis.from_url("redis://redis1:6379"),
    redis.from_url("redis://redis2:6379"),
    redis.from_url("redis://redis3:6379"),
    redis.from_url("redis://redis4:6379"),
    redis.from_url("redis://redis5:6379"),
]

redlock = Redlock(redis_clients, RedlockConfig(
    ttl=timedelta(seconds=30),
    retry_count=3,
))


# Acquire and use lock
async def critical_operation(resource_id: str):
    result = await redlock.acquire(resource_id)

    if result.acquired:
        try:
            # Use lock for validity_time_ms
            print(f"Lock valid for {result.validity_time_ms}ms")
            await do_critical_work()
        finally:
            await redlock.release(result)
    else:
        print("Failed to acquire lock")

When to Use Redlock vs Single-Node

| Single-Node Redis Lock | Redlock |
|------------------------|---------|
| Development/testing | Production with HA |
| Non-critical operations | Critical operations |
| Single datacenter | Multi-datacenter |
| Cost-sensitive | Reliability-critical |
| Simpler setup | Complex setup |

Retry Strategies

Overview

Retry strategies handle transient failures by automatically re-attempting operations. The key is knowing when to retry, how long to wait, and when to give up.

Core Concepts

Exponential Backoff with Jitter

┌────────────────────────────────────────────────────────────┐
│              Exponential Backoff + Full Jitter             │
├────────────────────────────────────────────────────────────┤
│                                                             │
│   Attempt  Base Delay   With Jitter (random 0-base)        │
│   ───────  ──────────   ─────────────────────────          │
│      1        1s              0.0s - 1.0s                  │
│      2        2s              0.0s - 2.0s                  │
│      3        4s              0.0s - 4.0s                  │
│      4        8s              0.0s - 8.0s                  │
│      5       16s              0.0s - 16.0s                 │
│                                                             │
│   Formula: sleep = random(0, min(cap, base*2^(attempt-1))) │
│                                                             │
│   Full jitter prevents thundering herd when many clients   │
│   retry simultaneously after an outage.                    │
│                                                             │
└────────────────────────────────────────────────────────────┘

Jitter Strategies

| Strategy | Formula | Use Case |
|----------|---------|----------|
| No jitter | base * 2^attempt | Testing only |
| Full jitter | random(0, base * 2^attempt) | Most common, best distribution |
| Equal jitter | (base * 2^attempt)/2 + random(0, (base * 2^attempt)/2) | When a minimum delay is needed |
| Decorrelated | random(base, prev_delay * 3) | Aggressive retry scenarios |
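
The four formulas in the table can be sketched as small functions. This is an illustrative sketch rather than a reference implementation: the function names, the `cap` parameter, and the injectable `rng` are assumptions added here, and `attempt` is counted from 0 so that the first retry waits up to `base` seconds.

```python
import random

_RNG = random.Random()  # Assumed default source of randomness; pass a seeded one in tests


def backoff_no_jitter(base: float, attempt: int, cap: float) -> float:
    """Deterministic exponential delay; `attempt` is counted from 0."""
    return min(cap, base * 2 ** attempt)


def backoff_full_jitter(base: float, attempt: int, cap: float,
                        rng: random.Random = _RNG) -> float:
    """Uniform delay between 0 and the capped exponential ceiling."""
    return rng.uniform(0, backoff_no_jitter(base, attempt, cap))


def backoff_equal_jitter(base: float, attempt: int, cap: float,
                         rng: random.Random = _RNG) -> float:
    """Half of the ceiling is fixed, the other half is random."""
    half = backoff_no_jitter(base, attempt, cap) / 2
    return half + rng.uniform(0, half)


def backoff_decorrelated(base: float, prev_delay: float, cap: float,
                         rng: random.Random = _RNG) -> float:
    """Next delay depends on the previous delay instead of the attempt number."""
    return min(cap, rng.uniform(base, prev_delay * 3))
```

For decorrelated jitter, start with `prev_delay = base` and feed each returned delay back in as the next `prev_delay`.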

Error Classification

Retryable Errors

# HTTP status codes that usually signal a transient failure
RETRYABLE_STATUS_CODES = {
    408: "Request Timeout",
    429: "Too Many Requests",
    500: "Internal Server Error",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
}

# Python exception types (subclasses also match via isinstance)
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    ConnectionResetError,
    BrokenPipeError,
)

# LLM API error codes
RETRYABLE_ERROR_CODES = {
    "rate_limit_exceeded",
    "model_overloaded",
    "server_error",
    "timeout",
    "context_length_exceeded",  # Retry with truncation
}


def is_retryable(error: Exception) -> bool:
    # HTTP errors
    status_code = getattr(error, "status_code", None)
    if status_code is not None:
        return status_code in RETRYABLE_STATUS_CODES

    # Exception types
    if isinstance(error, RETRYABLE_EXCEPTIONS):
        return True

    # LLM API error codes
    return getattr(error, "code", None) in RETRYABLE_ERROR_CODES

Non-Retryable Errors

# HTTP status codes that indicate a client-side problem: retrying will not help
NON_RETRYABLE_STATUS_CODES = {
    400: "Bad Request",
    401: "Unauthorized",
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    422: "Unprocessable Entity",
}

# LLM API error codes
NON_RETRYABLE_ERROR_CODES = {
    "invalid_api_key",
    "invalid_request_error",
    "content_policy_violation",
    "model_not_found",
    "insufficient_quota",
}

Implementation Patterns

Basic Retry Decorator

import asyncio
import random
from functools import wraps
from typing import Awaitable, Callable, Optional, Set, Type, TypeVar

import structlog

# Assumption: a structlog-style logger, since the calls below pass keyword arguments
logger = structlog.get_logger(__name__)

T = TypeVar("T")

def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    retryable_exceptions: Optional[Set[Type[Exception]]] = None,
):
    """Async retry decorator with exponential backoff."""

    retryable = retryable_exceptions or {Exception}

    def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(fn)
        async def wrapper(*args, **kwargs) -> T:
            last_exception = None

            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except tuple(retryable) as e:
                    last_exception = e

                    if attempt == max_attempts:
                        raise

                    # Calculate delay
                    delay = min(base_delay * (exponential_base ** (attempt - 1)), max_delay)

                    # Apply jitter
                    if jitter:
                        delay = random.uniform(0, delay)

                    logger.warning(
                        f"Retry {attempt}/{max_attempts} after {delay:.2f}s",
                        error=str(e),
                        function=fn.__name__,
                    )

                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

Retry with Modification

async def retry_with_truncation(
    fn: Callable[[str], Awaitable[T]],
    content: str,
    max_attempts: int = 3,
) -> T:
    """Retry LLM call, truncating content on context_length_exceeded."""

    for attempt in range(1, max_attempts + 1):
        try:
            return await fn(content)
        except ContextLengthExceededError:
            if attempt == max_attempts:
                raise

            # Truncate content by 25% each retry
            truncate_to = int(len(content) * 0.75)
            content = content[:truncate_to]

            logger.warning(
                f"Truncating content to {truncate_to} chars",
                attempt=attempt,
            )

Retry Budget

import time


class RetryBudget:
    """Limits total retries across all calls to prevent retry storms."""

    def __init__(
        self,
        budget_per_second: float = 10.0,
        min_retries_per_second: float = 1.0,
    ):
        self.max_budget = budget_per_second  # Ceiling the budget refills up to
        self.budget = budget_per_second  # Start with a full budget
        self.min_budget = min_retries_per_second  # Replenish rate per second
        self.last_update = time.monotonic()

    def can_retry(self) -> bool:
        self._replenish()
        return self.budget >= 1.0

    def use_retry(self) -> None:
        self._replenish()
        if self.budget >= 1.0:
            self.budget -= 1.0

    def _replenish(self) -> None:
        # Monotonic clock: unaffected by wall-clock adjustments
        now = time.monotonic()
        elapsed = now - self.last_update
        self.budget = min(
            self.budget + elapsed * self.min_budget,
            self.max_budget,  # Cap at the configured budget, not a hardcoded value
        )
        self.last_update = now

Best Practices (2026)

1. Set Appropriate Limits

# BAD: Too many retries
@retry(max_attempts=10, base_delay=0.1)  # Up to 9 retries, the first 5 within ~3s = hammering

# GOOD: Reasonable limits
@retry(max_attempts=3, base_delay=1.0, max_delay=30.0)

2. Always Use Jitter

# BAD: No jitter (thundering herd)
delay = base * 2 ** attempt  # All clients retry at same time

# GOOD: Full jitter
delay = random.uniform(0, base * 2 ** attempt)  # Spread retries

3. Different Strategies per Operation

# Fast-fail for user-facing
@retry(max_attempts=2, base_delay=0.5, max_delay=2.0)
async def get_user_data():
    ...

# More patient for background jobs
@retry(max_attempts=5, base_delay=2.0, max_delay=60.0)
async def sync_data():
    ...

4. Log All Retries

async def retry_with_logging(fn, *args, max_attempts: int = 3, **kwargs):
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn(*args, **kwargs)
        except RetryableError as e:
            logger.warning(
                "Retry attempt",
                attempt=attempt,
                max_attempts=max_attempts,
                error_type=type(e).__name__,
                error_message=str(e),
                function=fn.__name__,
                # Include trace ID for correlation
                trace_id=get_current_trace_id(),
            )
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error instead of returning None
            await asyncio.sleep(calculate_delay(attempt))

5. Combine with Circuit Breaker

# Retry INSIDE circuit breaker
# Circuit only counts final failures after retries exhausted

@circuit_breaker(failure_threshold=5)
@retry(max_attempts=3)
async def call_external_api():
    ...

# NOT the other way around:
# @retry  # Would retry when circuit is open!
# @circuit_breaker

Anti-Patterns

1. Retrying Non-Retryable Errors

# BAD: Retry everything
@retry(max_attempts=5, retryable_exceptions={Exception})
async def call_api():
    ...  # Will retry 401 Unauthorized 5 times!

# GOOD: Specific exceptions
@retry(
    max_attempts=3,
    retryable_exceptions={
        ConnectionError,
        TimeoutError,
        RateLimitError,
    }
)

2. No Backoff

# BAD: Fixed delay
for attempt in range(5):
    try:
        return await call()
    except Exception:
        await asyncio.sleep(1)  # Same delay every time

# GOOD: Exponential backoff
for attempt in range(5):
    try:
        return await call()
    except Exception:
        await asyncio.sleep(2 ** attempt)  # 1, 2, 4, 8, 16

3. Infinite Retries

# BAD: Never gives up
while True:
    try:
        return await call()
    except Exception:
        await asyncio.sleep(1)

# GOOD: Bounded retries
for attempt in range(max_attempts):
    ...
raise MaxRetriesExceeded()

LLM-Specific Retry Strategies

Rate Limit Handling

async def call_llm_with_rate_limit_handling(prompt: str) -> str:
    for attempt in range(3):
        try:
            return await llm.complete(prompt)
        except RateLimitError as e:
            # Use retry-after header if provided
            retry_after = e.headers.get("retry-after", 60)
            logger.warning(f"Rate limited, waiting {retry_after}s")
            await asyncio.sleep(int(retry_after))

    raise MaxRetriesExceeded("Rate limit persists after retries")
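
The `Retry-After` header may carry either a number of seconds or an HTTP date, so passing it straight to `int()` can raise. A hedged sketch of a tolerant parser follows; the function name `parse_retry_after`, its `default` fallback, and the injectable `now` argument are assumptions for illustration, not part of any client library.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    default: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Return the number of seconds to wait for a Retry-After header value."""
    if value is None:
        return default
    text = str(value).strip()

    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0.0, float(text))
    except ValueError:
        pass

    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(text)
    except (TypeError, ValueError):
        return default  # Unparseable header: fall back to the default wait
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())
```

In the handler above, this would replace the `int(retry_after)` conversion, for example `await asyncio.sleep(parse_retry_after(e.headers.get("retry-after")))`.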

Context Length Handling

async def call_with_context_management(prompt: str, max_tokens: int = 4096) -> str:
    for attempt in range(3):
        try:
            return await llm.complete(prompt, max_tokens=max_tokens)
        except ContextLengthExceededError:
            # Reduce by 25% each attempt (the prompt is reassigned, so cuts compound)
            prompt = truncate_prompt(prompt, ratio=0.75)
            logger.warning(f"Truncated prompt to {len(prompt)} chars")

    raise ContextLengthExceededError("Cannot fit in context after truncation")

Monitoring

# Retry rate
rate(retries_total[5m])

# Retry success rate (retries that eventually succeed)
sum(rate(retry_success_total[5m])) / sum(rate(retries_total[5m]))

# 95th-percentile attempts per call
histogram_quantile(0.95, sum(rate(retry_attempts_bucket[5m])) by (le))

# Retry budget utilization
retry_budget_used / retry_budget_total

Stripe Pattern

Stripe-Style Idempotency Pattern

Stripe's idempotency-key design is widely used as a reference for payment APIs. This section shows how to replicate its core behavior.

How Stripe Does It

  1. Client sends Idempotency-Key header with POST request
  2. Server checks if key was seen before
  3. If yes: return cached response
  4. If no: process request, cache response, return
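
The four steps can be sketched with an in-memory store before bringing in Redis. This is a minimal illustration under stated assumptions: the class name `InMemoryIdempotencyStore` and its `execute` method are hypothetical, responses live in a plain dict with no expiry, and there is no protection against two concurrent requests with the same key.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryIdempotencyStore:
    """Single-process sketch of the check / process / cache / return flow."""

    def __init__(self) -> None:
        self._responses: dict[str, Any] = {}

    async def execute(
        self,
        key: str,
        handler: Callable[[], Awaitable[Any]],
    ) -> tuple[Any, bool]:
        """Run `handler` once per key; return (response, was_replayed)."""
        # Steps 2-3: key seen before, so return the cached response
        if key in self._responses:
            return self._responses[key], True

        # Step 4: process the request, cache the response, return it
        response = await handler()
        self._responses[key] = response
        return response, False
```

Calling `execute` twice with the same key runs the handler once and flags the second result as replayed. If the handler raises, nothing is cached, so the client can retry with the same key.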

Implementation

Request Flow

Client Request
        │
        ▼
┌────────────────┐
│ Check cache    │
│ (Redis)        │
└───────┬────────┘
        │
   ┌────┴────┐
   │ Exists? │
   └────┬────┘
        │
    ┌───┴───┐
   YES      NO
    │        │
    ▼        ▼
┌────────┐ ┌────────────┐
│ Return │ │ Lock key   │
│ cached │ │ (Redis)    │
└────────┘ └─────┬──────┘
                 │
                 ▼
           ┌────────────┐
           │ Process    │
           │ request    │
           └─────┬──────┘
                 │
                 ▼
           ┌────────────┐
           │ Cache      │
           │ response   │
           └─────┬──────┘
                 │
                 ▼
           ┌────────────┐
           │ Return     │
           │ response   │
           └────────────┘

FastAPI Implementation

import json
from typing import Optional

import redis.asyncio as redis
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = redis.from_url("redis://localhost")

IDEMPOTENCY_TTL = 86400  # 24 hours

async def get_idempotency_response(
    key: str,
    path: str,
) -> dict | None:
    """Check for existing idempotent response."""
    cache_key = f"idem:{path}:{key}"
    cached = await redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    return None

async def set_idempotency_response(
    key: str,
    path: str,
    response: dict,
    status_code: int,
) -> None:
    """Cache response for idempotency key."""
    cache_key = f"idem:{path}:{key}"
    await redis_client.setex(
        cache_key,
        IDEMPOTENCY_TTL,
        json.dumps({
            "body": response,
            "status": status_code,
        }),
    )

async def acquire_idempotency_lock(
    key: str,
    path: str,
    timeout: int = 60,
) -> bool:
    """Acquire lock for processing (prevents concurrent duplicates)."""
    lock_key = f"idem_lock:{path}:{key}"
    return await redis_client.set(lock_key, "1", nx=True, ex=timeout)

async def release_idempotency_lock(key: str, path: str) -> None:
    """Release processing lock."""
    lock_key = f"idem_lock:{path}:{key}"
    await redis_client.delete(lock_key)


@app.post("/v1/charges")
async def create_charge(
    request: Request,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    # If no key, process normally (no idempotency)
    if not idempotency_key:
        return await _process_charge(await request.json())

    path = request.url.path

    # Check for cached response
    cached = await get_idempotency_response(idempotency_key, path)
    if cached:
        return JSONResponse(
            content=cached["body"],
            status_code=cached["status"],
            headers={"Idempotent-Replayed": "true"},
        )

    # Try to acquire lock
    if not await acquire_idempotency_lock(idempotency_key, path):
        # Another request is processing with this key
        raise HTTPException(
            status_code=409,
            detail="A request with this idempotency key is already being processed",
        )

    try:
        # Process the request
        body = await request.json()
        result = await _process_charge(body)

        # Cache the response
        await set_idempotency_response(
            idempotency_key,
            path,
            result,
            status_code=200,
        )

        return result

    finally:
        await release_idempotency_lock(idempotency_key, path)

Client Usage

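import httpx
import uuid

# Assumed setup: a shared async client; the base URL is a placeholder
client = httpx.AsyncClient(base_url="https://api.example.com")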

async def create_charge_safely(amount: int, currency: str):
    """Create charge with idempotency protection."""
    idempotency_key = str(uuid.uuid4())

    for attempt in range(3):
        try:
            response = await client.post(
                "/v1/charges",
                headers={"Idempotency-Key": idempotency_key},
                json={"amount": amount, "currency": currency},
            )
            return response.json()
        except httpx.TransportError:
            # Network error - safe to retry with same key
            continue

    raise Exception("Failed after 3 attempts")

Key Principles

  1. Keys are client-generated: Server doesn't generate keys
  2. Keys are scoped to endpoint: Same key on different endpoints = different operations
  3. 24-hour window: Keys expire after 24 hours
  4. Locked during processing: Prevents concurrent duplicates
  5. Only success cached: Errors can be retried
  6. Response fully cached: Body and status code (the implementation above stores both; add headers if clients depend on them)

Token Bucket Algorithm

In-depth guide to the token bucket rate limiting algorithm with Redis implementation.

How Token Bucket Works

┌─────────────────────────────────────────────────────────────┐
│                    TOKEN BUCKET                              │
│                                                              │
│   ┌──────────────────────────────────────────────────┐      │
│   │  Tokens: ●●●●●●●○○○  (7/10 tokens available)     │      │
│   │  Capacity: 10 tokens                              │      │
│   │  Refill Rate: 5 tokens/second                     │      │
│   └──────────────────────────────────────────────────┘      │
│                         │                                    │
│   REQUEST ──────────────┼──────────────────────► ALLOWED    │
│                         │                                    │
│   (Each request consumes 1 token)                           │
│   (Bucket refills at constant rate)                         │
│                                                              │
│   Timeline:                                                  │
│   t=0s: 10 tokens │ 10 requests → 0 tokens                  │
│   t=1s: +5 tokens │ 5 tokens available                      │
│   t=2s: +5 tokens │ 10 tokens (capped at capacity)          │
│                                                              │
└─────────────────────────────────────────────────────────────┘
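
The timeline in the diagram can be reproduced with a single-process bucket. This is a sketch for building intuition, not the Redis-backed limiter: the class name `InMemoryTokenBucket` and the explicit `now` argument (used instead of reading the system clock) are assumptions made so the behavior is easy to step through.

```python
class InMemoryTokenBucket:
    """Token bucket that refills continuously, driven by caller-supplied timestamps."""

    def __init__(self, capacity: float, refill_rate: float, now: float = 0.0):
        self.capacity = capacity
        self.refill_rate = refill_rate  # Tokens added per second
        self.tokens = float(capacity)   # A new bucket starts full
        self.last_refill = now

    def try_consume(self, now: float, tokens: float = 1.0) -> bool:
        """Refill for the elapsed time, then take `tokens` if available."""
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```

With `capacity=10` and `refill_rate=5`, ten calls at `now=0.0` succeed and the eleventh is rejected. One second later five tokens are available again, and after a long idle period the bucket never holds more than ten.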

Algorithm Properties

| Property | Description |
|----------|-------------|
| Burst Capacity | Allows short bursts up to bucket size |
| Smooth Limiting | Tokens refill continuously |
| No Memory | Doesn't track request history |
| Distributed | Works with Redis for multi-server |

Redis Lua Script (Atomic)

-- token_bucket.lua
-- KEYS[1] = bucket key
-- ARGV[1] = bucket capacity
-- ARGV[2] = refill rate (tokens per second)
-- ARGV[3] = current timestamp (milliseconds)
-- ARGV[4] = tokens to consume

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- Get current bucket state
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

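-- Calculate tokens to add based on time elapsed.
-- Keep the fractional part: flooring here while resetting last_refill on
-- every call would discard partial tokens and starve frequent callers.
-- (Redis truncates the returned `remaining` to an integer.)
local elapsed = math.max(0, now - last_refill) / 1000  -- Convert to seconds
local refill = elapsed * refill_rate
tokens = math.min(capacity, tokens + refill)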

-- Check if we can consume tokens
local allowed = 0
local remaining = tokens
local retry_after = 0

if tokens >= requested then
    allowed = 1
    remaining = tokens - requested
else
    -- Calculate when enough tokens will be available
    local needed = requested - tokens
    retry_after = math.ceil(needed / refill_rate)
end

-- Update bucket state
redis.call('HMSET', key,
    'tokens', remaining,
    'last_refill', now
)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)

return {allowed, remaining, retry_after}

Python Implementation

import time
from typing import NamedTuple

import redis.asyncio as redis


class RateLimitResult(NamedTuple):
    allowed: bool
    remaining: int
    retry_after: int  # seconds


class TokenBucket:
    """Token bucket rate limiter with Redis backend."""

    # Load Lua script once
    SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(bucket[1]) or capacity
    local last_refill = tonumber(bucket[2]) or now

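    -- Keep fractional tokens: flooring while resetting last_refill on every
    -- call would discard partial refills for frequent callers
    local elapsed = math.max(0, now - last_refill) / 1000
    local refill = elapsed * refill_rate
    tokens = math.min(capacity, tokens + refill)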

    local allowed = 0
    local remaining = tokens
    local retry_after = 0

    if tokens >= requested then
        allowed = 1
        remaining = tokens - requested
    else
        local needed = requested - tokens
        retry_after = math.ceil(needed / refill_rate)
    end

    redis.call('HMSET', key, 'tokens', remaining, 'last_refill', now)
    redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)

    return {allowed, remaining, retry_after}
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        capacity: int = 100,
        refill_rate: float = 10,  # tokens per second
    ):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._script = self.redis.register_script(self.SCRIPT)

    async def consume(
        self,
        key: str,
        tokens: int = 1,
    ) -> RateLimitResult:
        """
        Try to consume tokens from the bucket.

        Args:
            key: Unique identifier (user_id, ip_address, etc.)
            tokens: Number of tokens to consume

        Returns:
            RateLimitResult with allowed status and metadata
        """
        bucket_key = f"ratelimit:token_bucket:{key}"
        now_ms = int(time.time() * 1000)

        result = await self._script(
            keys=[bucket_key],
            args=[self.capacity, self.refill_rate, now_ms, tokens],
        )

        return RateLimitResult(
            allowed=bool(result[0]),
            remaining=int(result[1]),
            retry_after=int(result[2]),
        )


# Usage with FastAPI
async def get_rate_limiter() -> TokenBucket:
    redis_client = redis.from_url("redis://localhost:6379")
    return TokenBucket(redis_client, capacity=100, refill_rate=10)

Comparison: Token Bucket vs Sliding Window

| Aspect | Token Bucket | Sliding Window |
|--------|--------------|----------------|
| Burst Handling | Allows up to capacity | Spreads evenly |
| Memory | O(1) per key | O(n) request timestamps |
| Precision | Approximate | Exact |
| Use Case | API rate limiting | Strict quotas |
| Redis Operations | 1 HMSET | 1 ZADD + 1 ZREMRANGEBYSCORE |
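
To make the memory and precision rows concrete, here is a single-process sliding window log for comparison. It is a sketch under assumptions: the class name `SlidingWindowLog` is hypothetical, timestamps are passed in by the caller, and a Redis version would keep the same timestamps in a sorted set.

```python
from collections import deque


class SlidingWindowLog:
    """Exact limiter: at most `limit` requests in any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self._timestamps: deque[float] = deque()  # O(n) memory: one entry per request

    def allow(self, now: float) -> bool:
        # Drop timestamps that have aged out of the window
        cutoff = now - self.window
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()

        if len(self._timestamps) < self.limit:
            self._timestamps.append(now)
            return True
        return False
```

Unlike the token bucket, this limiter never admits more than `limit` requests in any window, at the cost of storing every accepted request's timestamp.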

When to Use Token Bucket

Good for:

  • API rate limiting (allows natural bursts)
  • User actions (login attempts, form submissions)
  • Resource protection (database connections)

Not ideal for:

  • Strict per-second quotas
  • Billing-based limits (use sliding window)
  • Fair queuing (use leaky bucket)

See also:

  • examples/fastapi-rate-limiting.md for FastAPI integration
  • checklists/rate-limiting-checklist.md for implementation checklist
  • SKILL.md for sliding window implementation

Checklists (5)

Circuit Breaker Setup

Circuit Breaker Setup Guide

Step-by-step guide for adding circuit breakers to a service.

Step 1: Identify Services

List all external dependencies that need circuit breakers:

┌─────────────────────────────────────────────────────────────┐
│ Service Inventory                                            │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│ External APIs:                                               │
│ □ OpenAI API (LLM)                                          │
│ □ Anthropic API (LLM)                                       │
│ □ YouTube Data API                                          │
│ □ GitHub API                                                │
│ □ arXiv API                                                 │
│                                                              │
│ Internal Services:                                           │
│ □ Embedding service                                         │
│ □ Database (PostgreSQL)                                     │
│ □ Redis cache                                               │
│ □ Semantic search                                           │
│                                                              │
│ For each, answer:                                            │
│ 1. What's the expected failure rate?                        │
│ 2. How long does recovery typically take?                   │
│ 3. What's the fallback behavior?                            │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Step 2: Configure Thresholds

Failure Threshold

| Service Type | Recommended | Reasoning |
|--------------|-------------|-----------|
| LLM API | 3 | APIs can be unstable, fail fast |
| External API | 5 | More tolerant of transient issues |
| Database | 2-3 | DB issues usually need immediate attention |
| Internal service | 3-5 | Depends on service criticality |

Recovery Timeout

| Service Type | Recommended | Reasoning |
|--------------|-------------|-----------|
| LLM API | 60s | Rate limits typically reset in minutes |
| External API | 30-120s | Depends on SLA |
| Database | 15-30s | Should recover quickly |
| Internal service | 15-60s | Depends on restart time |

Slow Call Threshold

| Service Type | Recommended | Reasoning |
|--------------|-------------|-----------|
| LLM API | 30s | LLM calls can be slow |
| External API | 10s | Most APIs should be fast |
| Database | 5s | DB queries should be optimized |
| Internal service | 5-10s | Depends on operation |
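
One way to keep these recommendations in code is a small table of presets. The sketch below is an assumption-laden illustration: `BreakerPreset`, `PRESETS`, and `preset_for` are hypothetical names, and wherever the tables give a range, a single value from that range is picked arbitrarily as a starting point.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class BreakerPreset:
    failure_threshold: int      # Failures before the circuit opens
    recovery_timeout: float     # Seconds to stay open before probing
    slow_call_threshold: float  # Seconds after which a call counts as slow


# Values taken from the three tables above; ranges are collapsed to one value
PRESETS = {
    "llm_api": BreakerPreset(3, 60.0, 30.0),
    "external_api": BreakerPreset(5, 30.0, 10.0),     # Recovery range 30-120s
    "database": BreakerPreset(3, 15.0, 5.0),          # Threshold 2-3, recovery 15-30s
    "internal_service": BreakerPreset(3, 15.0, 5.0),  # All three are ranges
}


def preset_for(service_type: str, **overrides) -> BreakerPreset:
    """Look up a preset and apply per-service overrides."""
    return replace(PRESETS[service_type], **overrides)
```

For example, `preset_for("external_api", recovery_timeout=120.0)` keeps the recommended failure threshold while using the upper end of the recovery range for a dependency with a slow SLA.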

Step 3: Implement Circuit Breaker

Basic Implementation

from resilience import CircuitBreaker, CircuitBreakerFactory

# Option 1: Use factory for common patterns
openai_breaker = CircuitBreakerFactory.for_llm_api("openai")
db_breaker = CircuitBreakerFactory.for_database("postgres")

# Option 2: Custom configuration
custom_breaker = CircuitBreaker(
    name="my-service",
    failure_threshold=5,
    success_threshold=2,
    recovery_timeout=30.0,
    slow_call_threshold=10.0,
)

Wrap Service Calls

# Method 1: Decorator
@openai_breaker
async def call_openai(prompt: str) -> str:
    return await openai_client.complete(prompt)

# Method 2: Explicit call
async def call_openai(prompt: str) -> str:
    return await openai_breaker.call(
        openai_client.complete,
        prompt,
    )
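
The `resilience` module imported above is not shown in this guide. As a rough picture of what a breaker's `call` method does with `failure_threshold`, `success_threshold`, and `recovery_timeout`, here is a minimal sketch. Everything in it is an assumption for illustration: the class is named `SketchCircuitBreaker` to avoid implying it is the real implementation, the clock is injectable for testing, and slow-call tracking is left out.

```python
import asyncio
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SketchCircuitBreaker:
    def __init__(self, name, failure_threshold=5, success_threshold=2,
                 recovery_timeout=30.0, clock=time.monotonic):
        self.name = name
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.state = CircuitState.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    async def call(self, fn, *args, **kwargs):
        if self.state is CircuitState.OPEN:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError(f"circuit {self.name} is open")
            # Timeout elapsed: let probe calls through
            self.state = CircuitState.HALF_OPEN
            self._successes = 0
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self):
        self._failures += 1
        # Any failure while probing, or too many while closed, opens the circuit
        if (self.state is CircuitState.HALF_OPEN
                or self._failures >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self._opened_at = self._clock()

    def _on_success(self):
        if self.state is CircuitState.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self._failures = 0
        else:
            self._failures = 0  # A success while closed resets the failure count
```

The sketch counts consecutive failures only; a production breaker would typically also track slow calls and guard state changes against concurrent callers.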

Step 4: Add Fallback Handling

import hashlib

from resilience import CircuitOpenError

async def analyze_with_fallback(content: str) -> Analysis:
    try:
        return await circuit_breaker.call(primary_analysis, content)

    except CircuitOpenError as e:
        logger.warning(
            f"Circuit open for {e.name}, using fallback",
            time_until_recovery=e.time_until_recovery,
        )

        # Fallback 1: Try cache
        # Use a stable digest: built-in hash() of a str differs between processes
        content_key = hashlib.sha256(content.encode("utf-8")).hexdigest()
        cached = await cache.get(f"analysis:{content_key}")
        if cached:
            return Analysis.from_cache(cached, is_stale=True)

        # Fallback 2: Degraded response
        return Analysis(
            status="degraded",
            message="Full analysis temporarily unavailable",
            basic_info=extract_basic_info(content),
        )

Step 5: Add Observability

Logging

def setup_circuit_logging(breaker: CircuitBreaker):
    def on_state_change(old: str, new: str, name: str):
        logger.warning(
            "circuit_state_change",
            circuit=name,
            old_state=old,
            new_state=new,
        )

        if new == "open":
            # Send alert
            alerting.send(
                severity="warning",
                message=f"Circuit {name} opened",
                runbook="https://docs/runbooks/circuit-breaker",
            )

    breaker._on_state_change = on_state_change

Metrics

from prometheus_client import Gauge, Counter

circuit_state = Gauge(
    "circuit_breaker_state",
    "Circuit breaker state (0=closed, 1=open, 2=half_open)",
    ["service"],
)

circuit_rejections = Counter(
    "circuit_breaker_rejections_total",
    "Total requests rejected by circuit breaker",
    ["service"],
)

def update_metrics(breaker: CircuitBreaker):
    state_map = {"closed": 0, "open": 1, "half_open": 2}
    circuit_state.labels(service=breaker.name).set(
        state_map[breaker.state.value]
    )

Health Endpoint

@app.get("/health/circuits")
async def circuit_health():
    return {
        name: {
            "state": cb.state.value,
            "failure_count": cb._failure_count,
            "time_until_recovery": (
                cb._time_until_recovery()
                if cb.state == CircuitState.OPEN
                else None
            ),
        }
        for name, cb in circuit_breakers.items()
    }

Step 6: Test Circuit Behavior

Unit Tests

@pytest.mark.asyncio
async def test_circuit_opens_on_failures():
    breaker = CircuitBreaker(name="test", failure_threshold=3)

    async def failing_call():
        raise ConnectionError("Failed")

    # Fail 3 times
    for _ in range(3):
        with pytest.raises(ConnectionError):
            await breaker.call(failing_call)

    # Should be open now
    assert breaker.state == CircuitState.OPEN

    # Next call rejected
    with pytest.raises(CircuitOpenError):
        await breaker.call(failing_call)


@pytest.mark.asyncio
async def test_circuit_recovers():
    breaker = CircuitBreaker(
        name="test",
        failure_threshold=1,
        recovery_timeout=0.1,  # Fast for testing
    )

    async def failing_then_succeeding():
        if breaker._failure_count > 0:
            return "success"
        raise ConnectionError("First call fails")

    # Open circuit
    with pytest.raises(ConnectionError):
        await breaker.call(failing_then_succeeding)

    # Wait for recovery
    await asyncio.sleep(0.2)

    # Should succeed now
    result = await breaker.call(failing_then_succeeding)
    assert result == "success"
    assert breaker.state == CircuitState.CLOSED

Integration Tests

@pytest.mark.asyncio
async def test_circuit_isolates_failures():
    """Verify circuit prevents cascade failures."""
    openai_breaker = circuit_breakers["openai"]

    # Simulate OpenAI outage
    with patch("openai.complete", side_effect=ConnectionError):
        # Multiple calls should fail then trip circuit
        for _ in range(5):
            try:
                await analyze_content("test")
            except (ConnectionError, CircuitOpenError):
                pass

    # Circuit should be open
    assert openai_breaker.state == CircuitState.OPEN

    # Anthropic should still work (different circuit)
    anthropic_breaker = circuit_breakers["anthropic"]
    assert anthropic_breaker.state == CircuitState.CLOSED

Step 7: Document and Monitor

Documentation

Add to your service's README:

## Circuit Breakers

| Service | Threshold | Recovery | Fallback |
|---------|-----------|----------|----------|
| openai | 3 failures | 60s | Use gpt-5.2-mini |
| anthropic | 3 failures | 60s | Use cache |
| youtube | 5 failures | 120s | Return partial data |

### Monitoring

- Dashboard: [Grafana Circuit Breakers](...)
- Alerts: PagerDuty channel #resilience
- Runbook: [Circuit Breaker Runbook](...)

Runbook Template

## Circuit Breaker Open - {service}

### Symptoms
- Service returning 503 errors
- Alert: "Circuit {service} opened"
- Dashboard shows circuit in OPEN state

### Impact
- {describe impact on users}

### Resolution
1. Check {service} status page
2. Review logs for failure pattern
3. If transient: wait for auto-recovery
4. If persistent: {escalation steps}

### Verification
1. Circuit state returns to CLOSED
2. Service calls succeeding
3. Metrics returning to normal

Quick Reference

┌─────────────────────────────────────────────────────────────┐
│ Circuit Breaker Quick Reference                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│ CREATE:                                                      │
│   breaker = CircuitBreaker("name", failure_threshold=5)     │
│                                                              │
│ USE:                                                         │
│   @breaker                                                   │
│   async def my_function(): ...                              │
│                                                              │
│   result = await breaker.call(func, *args)                  │
│                                                              │
│ HANDLE:                                                      │
│   try:                                                       │
│       await breaker.call(...)                               │
│   except CircuitOpenError:                                  │
│       return fallback_response()                            │
│                                                              │
│ MONITOR:                                                     │
│   breaker.get_status()                                      │
│   breaker.state == CircuitState.OPEN                        │
│                                                              │
│ RESET (manual):                                              │
│   breaker.reset()                                           │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Distributed Locks Checklist


Lock Selection

  • Chose appropriate lock backend
    • Redis: Fast, TTL-based, requires Redis infrastructure
    • PostgreSQL: No extra infra, integrates with transactions
    • Redlock: Multi-node Redis for high availability
  • Determined lock scope (session vs transaction)
  • Set TTL longer than the worst-case operation time, but short enough that a crashed holder frees the lock promptly

Implementation

Acquire

  • Non-blocking option available (try_lock)
  • Timeout support for blocking acquire
  • Retry logic with exponential backoff
  • Jitter added to prevent thundering herd
  • Unique owner ID generated (UUIDv7)
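
The acquire items above (non-blocking attempt, timeout, exponential backoff, jitter) can be sketched as one small loop. This is a minimal illustration, not the skill's actual lock class: `acquire_with_backoff`, its parameters, and the `try_lock` callable are hypothetical names, and `try_lock` is assumed to be a non-blocking attempt that returns True on success.

```python
import random
import time
from typing import Callable, Optional


def acquire_with_backoff(
    try_lock: Callable[[], bool],
    timeout: float = 5.0,
    base_delay: float = 0.05,
    max_delay: float = 1.0,
    rng: Optional[random.Random] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call try_lock until it succeeds or `timeout` seconds have elapsed."""
    rng = rng or random.Random()
    deadline = clock() + timeout
    attempt = 0
    while True:
        if try_lock():
            return True
        remaining = deadline - clock()
        if remaining <= 0:
            return False
        # Full jitter: wait a random time in [0, min(max_delay, base * 2^attempt)]
        # so that competing clients do not retry in lockstep.
        ceiling = min(max_delay, base_delay * (2 ** attempt))
        sleep(min(rng.uniform(0, ceiling), remaining))
        attempt += 1
```

The `sleep` and `clock` parameters exist only so the loop can be tested without real waiting.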

Release

  • Owner validation (only owner can release)
  • Atomic release (Lua script for Redis)
  • Idempotent release (safe to call twice)
  • Finally block ensures release on exception

Extension

  • Heartbeat/extend for long operations
  • Auto-extend background task option
  • Extension validates ownership

Safety

Mutual Exclusion

  • Atomic acquire (SET NX for Redis)
  • Fencing token or owner ID validated
  • No race conditions in acquire/release
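
To illustrate why a fencing token matters, the sketch below shows a resource that rejects writes from a holder whose lock has since been granted to someone else. `LockService` and `FencedStore` are hypothetical in-memory stand-ins; in practice the token would come from the lock backend and the check would live in the protected resource.

```python
import itertools
from typing import Dict, Tuple


class LockService:
    """Hands out a strictly increasing fencing token with every lock grant."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)

    def grant(self) -> int:
        return next(self._counter)


class FencedStore:
    """Storage that rejects writes carrying a token older than one already seen."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[int, str]] = {}

    def write(self, key: str, value: str, token: int) -> bool:
        highest_seen, _ = self._data.get(key, (0, ""))
        if token < highest_seen:
            return False  # stale holder: its lock expired and was granted again
        self._data[key] = (token, value)
        return True

    def read(self, key: str) -> str:
        return self._data[key][1]
```

A holder that pauses past its TTL still believes it owns the lock; the token comparison is what stops its late write.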

Deadlock Prevention

  • TTL prevents permanent deadlocks
  • Lock ordering for multiple locks
  • Timeout on acquire attempts

Split-Brain Protection

  • Redlock for multi-node Redis
  • Clock drift factored into validity
  • Quorum required for lock acquisition

Error Handling

  • Lock acquisition failures handled gracefully
  • Release failures logged and handled
  • Network partition scenarios considered
  • Retry logic for transient failures

Testing

  • Unit tests for lock logic
  • Integration tests with real backend
  • Concurrent access tests
  • Failure scenario tests (network, timeout)
  • Lock expiration tests

Monitoring

  • Lock acquisition metrics
  • Lock hold duration metrics
  • Failed acquisition alerts
  • Long-held lock alerts
  • Deadlock detection

PostgreSQL Advisory Locks

  • Correct lock function used (session vs xact)
  • Lock ID strategy documented
  • Namespace collisions prevented
  • pg_locks monitoring query available
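
One possible lock ID strategy is to hash a namespaced name into the signed 64-bit range that PostgreSQL advisory lock functions accept. The helper name `advisory_lock_key` and the `namespace:name` layout are assumptions for illustration.

```python
import hashlib
import struct


def advisory_lock_key(namespace: str, name: str) -> int:
    """Map a namespaced lock name to a signed 64-bit advisory lock key."""
    digest = hashlib.sha256(f"{namespace}:{name}".encode("utf-8")).digest()
    # Advisory lock functions take a bigint, so interpret the first
    # 8 bytes of the digest as a signed big-endian 64-bit integer.
    return struct.unpack(">q", digest[:8])[0]
```

The resulting integer can be passed as the argument to `pg_advisory_lock` or `pg_advisory_xact_lock`. Prefixing with a namespace keeps unrelated features from colliding on the same key.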

Redis Locks

  • Lua scripts used for atomicity
  • TTL always set (no deadlocks)
  • Owner ID stored with lock
  • Release validates owner

Redlock (Multi-Node)

  • Minimum 3 Redis instances (recommend 5)
  • Quorum calculated correctly (floor(N/2) + 1, e.g. 3 of 5 nodes)
  • Clock drift factored in
  • Failed nodes don't block acquire
  • Release attempted on all nodes
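
The quorum and clock-drift items can be checked with a few lines of arithmetic. This is a sketch under stated assumptions: the function names are hypothetical, and the drift allowance (1% of the TTL plus 2 ms) follows the convention used by common Redlock client libraries rather than anything specified in this document.

```python
def quorum(n_nodes: int) -> int:
    """Smallest majority of n_nodes."""
    return n_nodes // 2 + 1


def remaining_validity_ms(ttl_ms: int, elapsed_ms: int, drift_factor: float = 0.01) -> int:
    """Time the lock can still be trusted after acquisition took elapsed_ms."""
    drift_ms = int(ttl_ms * drift_factor) + 2  # assumed drift allowance
    return ttl_ms - elapsed_ms - drift_ms


def lock_acquired(successes: int, n_nodes: int, ttl_ms: int, elapsed_ms: int) -> bool:
    """A lock counts only with a majority AND positive remaining validity."""
    return successes >= quorum(n_nodes) and remaining_validity_ms(ttl_ms, elapsed_ms) > 0
```

If either condition fails, the client should release on all nodes, including those where the acquire appeared to fail.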

Production Readiness

  • Lock names are descriptive and namespaced
  • TTL tuned for operation duration
  • Metrics and alerting configured
  • Runbook for lock-related incidents
  • Graceful degradation strategy

Idempotency Checklist

Idempotency Implementation Checklist

Key Generation

  • Keys are deterministic (same input = same key)
  • Keys include all relevant parameters
  • Keys are scoped appropriately (user, endpoint, etc.)
  • Keys use consistent hash algorithm (SHA-256)
  • Keys are reasonable length (32-64 chars)
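
A deterministic, scoped key can be derived as sketched below. The function name `make_idempotency_key` and the choice of scoping fields (user, endpoint, payload) are illustrative assumptions; the important part is canonical serialization before hashing.

```python
import hashlib
import json
from typing import Any, Mapping


def make_idempotency_key(user_id: str, endpoint: str, payload: Mapping[str, Any]) -> str:
    """Return a 64-character SHA-256 hex key that is stable for equal inputs."""
    # sort_keys and fixed separators make the JSON canonical, so the same
    # logical payload always hashes to the same key regardless of dict order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    material = f"{user_id}|{endpoint}|{canonical}"
    return hashlib.sha256(material.encode("utf-8")).hexdigest()
```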

API Endpoints

  • POST/PUT/PATCH endpoints support Idempotency-Key header
  • Idempotency key format is documented
  • Key is validated (format, length)
  • Duplicate requests return cached response
  • Response includes header indicating replay

Storage

  • Redis used for fast lookups
  • Database used for durability
  • TTL configured appropriately (24-72 hours)
  • Cleanup job for expired records
  • Storage sized for expected volume

Race Conditions

  • Database constraint prevents duplicates
  • Check-and-insert is atomic
  • Lost updates are prevented
  • Concurrent requests handled correctly
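
An atomic check-and-insert relies on the database constraint rather than a separate SELECT followed by INSERT. The sketch below uses the standard-library `sqlite3` module purely so it runs anywhere; the table layout and the helper names `open_store` and `claim_key` are assumptions. On PostgreSQL the equivalent statement would be `INSERT ... ON CONFLICT DO NOTHING`.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory table with a primary key on the idempotency key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE idempotency_records ("
        " idempotency_key TEXT PRIMARY KEY,"
        " response_body TEXT)"
    )
    return conn


def claim_key(conn: sqlite3.Connection, key: str) -> bool:
    """Return True if this caller claimed the key, False if it already existed."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO idempotency_records (idempotency_key) VALUES (?)",
            (key,),
        )
    # rowcount is 1 when the row was inserted and 0 when the constraint ignored it.
    return cur.rowcount == 1
```

Only the caller that receives True should process the request; everyone else waits for or replays the stored response.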

Response Handling

  • Only successful responses cached (2xx)
  • Error responses allow retry
  • Response body stored completely
  • Status code preserved
  • Headers preserved if needed

Event Processing

  • Events include idempotency key
  • Consumer checks before processing
  • Processed events tracked
  • At-least-once delivery handled
  • Dead letter queue for failures

Error Cases

  • Missing key handled (process normally or reject)
  • Invalid key format rejected
  • Storage failures don't break processing
  • Timeout during processing handled

Testing

  • Duplicate request returns same response
  • Different keys process independently
  • Race condition tests pass
  • TTL expiration verified
  • Cache miss falls back to database

Documentation

  • Idempotency behavior documented
  • Key format documented
  • TTL window documented
  • Client retry guidance provided

Pre Deployment Resilience

Pre-Deployment Resilience Checklist

Use this checklist before deploying services with resilience patterns.

Circuit Breakers

  • Threshold Configuration

    • Failure threshold tuned: high enough to ignore isolated errors, low enough to trip before callers pile up
    • Recovery timeout allows service to actually recover
    • Sliding window size captures representative sample
  • Fallback Behavior

    • Every circuit breaker has a defined fallback response
    • Fallbacks return meaningful partial data when possible
    • Fallbacks don't call other services whose circuits are open
  • Observability

    • State changes logged with structured logging
    • Metrics exported (Prometheus/Langfuse)
    • Alerts configured for OPEN state
    • Dashboard shows circuit status
  • Testing

    • Unit tests for state transitions
    • Integration tests simulate failure scenarios
    • Chaos testing validates circuit behavior under load

Bulkheads

  • Tier Assignment

    • Critical operations in Tier 1 (highest priority)
    • Standard operations in Tier 2
    • Optional/background operations in Tier 3
    • No critical path through Tier 3
  • Capacity Planning

    • Max concurrent based on downstream capacity
    • Queue sizes prevent memory exhaustion
    • Timeouts shorter than caller's timeout
  • Rejection Handling

    • Rejection policy defined per tier
    • HTTP 503 returned with Retry-After header
    • Rejections logged and counted in metrics
  • Testing

    • Load test validates bulkhead isolation
    • Tier 3 failure doesn't affect Tier 1
    • Queue depth monitored under load
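
The capacity and rejection items above can be pictured with a very small bulkhead that caps in-flight calls and rejects immediately when full. This is a sketch only: `Bulkhead` and `BulkheadFullError` are hypothetical names, there is no queue, and a real implementation would also apply a timeout per call.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class BulkheadFullError(Exception):
    """Raised when the bulkhead has no free slot."""


class Bulkhead:
    """Caps the number of concurrent calls; excess calls are rejected."""

    def __init__(self, max_concurrent: int) -> None:
        self._max = max_concurrent
        self._in_flight = 0

    async def run(self, func: Callable[[], Awaitable[T]]) -> T:
        # No await between the check and the increment, so this is
        # race-free within a single event loop.
        if self._in_flight >= self._max:
            raise BulkheadFullError("bulkhead full")
        self._in_flight += 1
        try:
            return await func()
        finally:
            self._in_flight -= 1
```

An HTTP layer would typically translate `BulkheadFullError` into a 503 with a Retry-After header, as the checklist describes.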

Retry Logic

  • Error Classification

    • Retryable vs non-retryable errors defined
    • HTTP status codes classified correctly
    • LLM API errors handled specifically
  • Backoff Strategy

    • Exponential backoff configured
    • Jitter enabled to prevent thundering herd
    • Max delay caps retry storms
  • Limits

    • Max attempts bounded (typically 3-5)
    • Total retry time < caller's timeout
    • Retry budget prevents system overload
  • Testing

    • Transient failures recovered automatically
    • Non-retryable errors fail immediately
    • Retry budget depletes under sustained failures
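
Error classification, bounded attempts, and a retry budget fit together roughly as follows. All names here (`HTTPError`, `RetryBudget`, `call_with_retry`, `RETRYABLE_STATUS`) are hypothetical, the status set is one reasonable classification rather than a fixed rule, and jitter is left out so the delays stay deterministic in the example.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumed classification: timeouts, throttling, and server-side failures.
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}


class HTTPError(Exception):
    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


class RetryBudget:
    """Permits retries only while they stay under a share of total calls."""

    def __init__(self, ratio: float = 0.2, min_retries: int = 3) -> None:
        self.ratio = ratio
        self.min_retries = min_retries
        self.calls = 0
        self.retries = 0

    def record_call(self) -> None:
        self.calls += 1

    def try_spend(self) -> bool:
        allowed = max(self.min_retries, int(self.calls * self.ratio))
        if self.retries >= allowed:
            return False
        self.retries += 1
        return True


def call_with_retry(
    func: Callable[[], T],
    budget: RetryBudget,
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    budget.record_call()
    for attempt in range(max_attempts):
        try:
            return func()
        except HTTPError as exc:
            last_attempt = attempt == max_attempts - 1
            # Give up on non-retryable errors, on the final attempt,
            # or when the shared budget has no retries left.
            if exc.status not in RETRYABLE_STATUS or last_attempt or not budget.try_spend():
                raise
            sleep(min(max_delay, base_delay * 2 ** attempt))
    raise AssertionError("unreachable")
```

Sharing one `RetryBudget` across callers is what keeps a sustained outage from multiplying load on the failing dependency.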

LLM Resilience

  • Fallback Chain

    • Primary model defined
    • At least one fallback model configured
    • Semantic cache as final fallback
    • Default response for complete outage
  • Token Budget

    • Budget allocation per category
    • Truncation strategy defined
    • Output reserve prevents overflow
  • Rate Limiting

    • Client-side rate limiter configured
    • Respects API provider limits
    • Graceful handling of 429 responses
  • Cost Control

    • Per-request cost tracking
    • Hourly/daily budget alerts
    • Cost circuit breaker configured

Integration

  • Pattern Composition

    • Retry INSIDE circuit breaker
    • Bulkhead wraps retry+circuit
    • Timeout inside all patterns
  • Health Endpoints

    • /health returns 200 (doesn't check circuit)
    • /ready reflects degraded state
    • /resilience shows all pattern status
  • Configuration

    • All thresholds configurable via env vars
    • Defaults documented
    • Per-environment overrides

Observability

  • Logging

    • Structured logging with trace IDs
    • State changes logged at WARN level
    • Rejections logged with context
  • Metrics

    • Circuit state gauge
    • Bulkhead utilization gauge
    • Retry counter
    • Latency histograms
  • Alerting

    • Alert when circuit opens
    • Alert when bulkhead consistently full
    • Alert when retry budget exhausted
    • Runbook links in alerts

Documentation

  • Architecture

    • Resilience patterns documented in ADR
    • Diagram shows pattern composition
    • Tier assignments documented
  • Operations

    • Runbook for circuit open scenarios
    • Runbook for bulkhead exhaustion
    • Manual override procedures documented
  • API Documentation

    • 503 responses documented
    • Retry-After header usage documented
    • Degraded response format documented

Final Verification

  • Load Test

    • System handles expected load
    • Graceful degradation under 2x load
    • Recovery after load spike
  • Chaos Test

    • Dependency failure isolated
    • Recovery automatic when dependency restored
    • No cascading failures
  • Security Review

    • Fallback responses don't leak sensitive data
    • Error messages don't expose internals
    • Rate limits prevent abuse

Rate Limiting Checklist

Rate Limiting Implementation Checklist

Planning

  • Define rate limits for each endpoint category

    • Read endpoints (GET) - higher limits
    • Write endpoints (POST/PUT/DELETE) - lower limits
    • Authentication endpoints - very strict limits
    • Expensive operations (LLM calls, file processing) - strictest limits
  • Choose limiting algorithm

    • Token Bucket - for bursty traffic patterns
    • Sliding Window - for strict quotas
    • Fixed Window - for simple requirements
  • Determine key strategy

    • By IP address (anonymous users)
    • By user ID (authenticated users)
    • By API key (service accounts)
    • By organization (enterprise customers)
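
To make the algorithm choice concrete, here is a minimal in-memory sliding window log, the variant suited to strict quotas. It is a single-process sketch with hypothetical names (`SlidingWindowLimiter`, `is_allowed`); a distributed version would keep the timestamps in Redis instead of a local dict.

```python
from collections import deque
from typing import Deque, Dict


class SlidingWindowLimiter:
    """Allows at most `limit` requests per key in any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window = window_seconds
        self._hits: Dict[str, Deque[float]] = {}

    def is_allowed(self, key: str, now: float) -> bool:
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

Unlike a token bucket, this never admits a burst above `limit` inside one window, at the cost of storing one timestamp per accepted request.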

Implementation

Backend Setup

  • Install dependencies

    pip install slowapi redis
  • Configure Redis connection

    redis_client = Redis.from_url(settings.redis_url)
  • Set up SlowAPI or custom limiter

    limiter = Limiter(key_func=get_user_identifier)
    app.add_middleware(SlowAPIMiddleware)

Route Protection

  • Add @limiter.limit() to all public endpoints
  • Set stricter limits for:
    • Login/register endpoints (prevent brute force)
    • Password reset (prevent enumeration)
    • File upload (prevent abuse)
    • LLM/AI operations (cost control)

Response Headers

  • Include rate limit headers in all responses:

    • X-RateLimit-Limit - max requests in window
    • X-RateLimit-Remaining - requests remaining
    • X-RateLimit-Reset - Unix timestamp when limit resets
  • Include Retry-After header in 429 responses

Error Handling

  • Return proper 429 Too Many Requests status
  • Include helpful error message
    {
      "type": "https://api.example.com/problems/rate-limit-exceeded",
      "title": "Rate Limit Exceeded",
      "status": 429,
      "detail": "You have exceeded 100 requests per minute. Please wait 45 seconds.",
      "retry_after": 45
    }

Tiered Limits

  • Define limits per user tier:

    | Tier | Requests/min | Burst |
    |------|--------------|-------|
    | Anonymous | 10 | 5 |
    | Free | 100 | 20 |
    | Pro | 1000 | 100 |
    | Enterprise | 10000 | 1000 |
  • Implement dynamic limit function

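    def get_tier_limit(key: str) -> str:
        # slowapi calls a dynamic limit with the key_func result when the callable
        # declares a `key` parameter; keys are assumed to look like "pro:user:42"
        tier = key.split(":", 1)[0]
        return TIER_LIMITS.get(tier, "10/minute")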

Distributed Systems

  • Use Redis backend (not in-memory)
  • Configure Redis connection pooling
  • Set appropriate key TTLs
  • Use Lua scripts for atomicity
  • Handle Redis connection failures gracefully

Monitoring

  • Log rate limit hits

    logger.warning("Rate limit exceeded", extra={
        "user_id": user.id,
        "endpoint": request.url.path,
        "limit": limit,
    })
  • Track metrics:

    • Rate limit hits per endpoint
    • Rate limit hits per user
    • Average remaining quota
  • Set up alerts for:

    • Unusual spike in 429 responses
    • Single user hitting limits repeatedly
    • Redis connection failures

Security Considerations

  • Rate limit login endpoints strictly (prevent brute force)
  • Rate limit password reset (prevent enumeration)
  • Consider IP reputation for anonymous limits
  • Don't expose internal rate limit keys
  • Use secure Redis connection (TLS)

Documentation

  • Document rate limits in OpenAPI/Swagger
  • Add rate limit info to API documentation
  • Include examples of handling 429 responses
  • Explain tier limits for customers

Testing

  • Unit test rate limit logic
  • Integration test with Redis
  • Load test to verify limits work
  • Test retry logic in clients
  • Test header values are correct
  • Test limit reset behavior

Client SDK Recommendations

Document recommended client-side handling:

# Python client example
import time
import httpx

def make_request_with_retry(url: str, max_retries: int = 3):
    for attempt in range(max_retries):
        response = httpx.get(url)

        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            time.sleep(retry_after)
            continue

        return response

    raise Exception("Rate limit exceeded after retries")
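
The Retry-After header may carry either a number of seconds or an HTTP date, so a client that calls `int()` on it directly can fail on the date form. A hedged sketch of a tolerant parser follows; `parse_retry_after`, its default, and its cap are hypothetical choices, not part of any SDK.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    now: datetime,
    default: float = 60.0,
    cap: float = 300.0,
) -> float:
    """Return how many seconds to wait, clamped to the range [0, cap]."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        seconds = float(value)
    else:
        try:
            target = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return default  # unparseable header: fall back to the default
        if target.tzinfo is None:
            target = target.replace(tzinfo=timezone.utc)
        seconds = (target - now).total_seconds()
    return max(0.0, min(seconds, cap))
```

Passing `now` explicitly (an aware UTC datetime) keeps the function easy to test; capping the wait protects the client from an unexpectedly large server value.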

Rollout Checklist

  • Deploy with monitoring enabled
  • Start with permissive limits
  • Monitor for false positives
  • Gradually tighten limits
  • Communicate changes to users
  • Provide upgrade path for users hitting limits

Examples (3)

Fastapi Rate Limiting

FastAPI Rate Limiting Examples

Complete examples for implementing rate limiting in FastAPI with Redis.

Installation

pip install slowapi redis

Basic Configuration

# app/core/rate_limit.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from redis import Redis

# Use Redis backend for distributed rate limiting
redis_client = Redis.from_url("redis://localhost:6379", decode_responses=True)

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    default_limits=["100/minute"],
)


def setup_rate_limiting(app):
    """Configure rate limiting for the FastAPI app."""
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
    app.add_middleware(SlowAPIMiddleware)

Route-Level Limiting

# app/api/v1/routes/analyses.py
from fastapi import APIRouter, Request
# Reuse the shared limiter so these routes use the Redis storage configured in
# app/core/rate_limit.py; a fresh Limiter() would default to in-memory storage.
from app.core.rate_limit import limiter

router = APIRouter()

@router.post("/analyses")
@limiter.limit("10/minute")  # Override default
async def create_analysis(request: Request):
    """Create analysis - stricter limit due to resource cost."""
    return {"message": "Analysis created"}

@router.get("/analyses")
@limiter.limit("100/minute")
async def list_analyses(request: Request):
    """List analyses - more permissive."""
    return {"analyses": []}

@router.get("/analyses/{id}")
@limiter.limit("200/minute")
async def get_analysis(request: Request, id: str):
    """Get single analysis - most permissive."""
    return {"id": id}

User-Based Rate Limiting

# app/core/rate_limit.py
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

def get_user_identifier(request: Request) -> str:
    """Get rate limit key from authenticated user or IP."""
    # Try to get user from request state (set by auth middleware)
    user = getattr(request.state, "user", None)
    if user:
        return f"user:{user.id}"

    # Fallback to IP for unauthenticated requests
    return f"ip:{get_remote_address(request)}"

limiter = Limiter(key_func=get_user_identifier)

Tiered Rate Limits

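# app/api/v1/routes/protected.py
from fastapi import APIRouter, Request
from slowapi import Limiter
from slowapi.util import get_remote_address

router = APIRouter()

TIER_LIMITS = {
    "anonymous": "10/minute",
    "free": "100/minute",
    "pro": "1000/minute",
    "enterprise": "10000/minute",
}

def get_tier_key(request: Request) -> str:
    """Rate limit key that carries the user's tier, e.g. "pro:user:42"."""
    user = getattr(request.state, "user", None)
    if not user:
        return f"anonymous:ip:{get_remote_address(request)}"
    return f"{user.tier}:user:{user.id}"

def get_tier_limit(key: str) -> str:
    """Dynamic limit based on user tier.

    slowapi does not pass the request to a dynamic limit; when the callable
    declares a `key` parameter it receives the key_func result instead.
    """
    tier = key.split(":", 1)[0]
    return TIER_LIMITS.get(tier, "100/minute")

# In a real app, also pass storage_uri so limits are shared across instances
limiter = Limiter(key_func=get_tier_key)

@router.post("/generate")
@limiter.limit(get_tier_limit)
async def generate_content(request: Request):
    """Rate limit based on user subscription tier."""
    return {"content": "Generated"}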

Custom Redis Token Bucket

For more control, implement custom rate limiting:

# app/core/rate_limit.py
import time
from typing import NamedTuple
import redis.asyncio as redis
from fastapi import Depends, HTTPException, Request, status

class RateLimitResult(NamedTuple):
    allowed: bool
    remaining: int
    reset_at: float
    retry_after: int

class RedisRateLimiter:
    """Custom rate limiter with token bucket algorithm."""

    SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(bucket[1]) or capacity
    local last_refill = tonumber(bucket[2]) or now

    local elapsed = (now - last_refill) / 1000
    tokens = math.min(capacity, tokens + elapsed * refill_rate)

    if tokens >= 1 then
        tokens = tokens - 1
        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
        return {1, math.floor(tokens), 0}
    else
        local retry_after = math.ceil((1 - tokens) / refill_rate)
        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        return {0, 0, retry_after}
    end
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        capacity: int = 100,
        refill_rate: float = 10,
    ):
        self.redis = redis.from_url(redis_url)
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._script = None

    async def _get_script(self):
        if self._script is None:
            self._script = self.redis.register_script(self.SCRIPT)
        return self._script

    async def check(self, key: str) -> RateLimitResult:
        """Check rate limit for a key."""
        script = await self._get_script()
        now_ms = int(time.time() * 1000)

        result = await script(
            keys=[f"ratelimit:{key}"],
            args=[self.capacity, self.refill_rate, now_ms],
        )

        reset_at = time.time() + (self.capacity / self.refill_rate)

        return RateLimitResult(
            allowed=bool(result[0]),
            remaining=int(result[1]),
            reset_at=reset_at,
            retry_after=int(result[2]),
        )


# FastAPI Dependency
async def rate_limit_dependency(
    request: Request,
    limiter: RedisRateLimiter = Depends(get_rate_limiter),
):
    """Dependency that enforces rate limiting."""
    # Get identifier
    user = getattr(request.state, "user", None)
    key = f"user:{user.id}" if user else f"ip:{request.client.host}"

    result = await limiter.check(key)

    # Set rate limit headers
    request.state.rate_limit = result

    if not result.allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded",
            headers={
                "Retry-After": str(result.retry_after),
                "X-RateLimit-Limit": str(limiter.capacity),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result.reset_at)),
            },
        )


# Middleware to add rate limit headers to all responses
@app.middleware("http")
async def add_rate_limit_headers(request: Request, call_next):
    response = await call_next(request)

    rate_limit = getattr(request.state, "rate_limit", None)
    if rate_limit:
        # Hard-coded to the default RedisRateLimiter capacity; keep the two in sync
        response.headers["X-RateLimit-Limit"] = str(100)
        response.headers["X-RateLimit-Remaining"] = str(rate_limit.remaining)
        response.headers["X-RateLimit-Reset"] = str(int(rate_limit.reset_at))

    return response

Usage in Routes

@router.post("/expensive-operation")
async def expensive_operation(
    request: Request,
    _: None = Depends(rate_limit_dependency),
):
    """This endpoint is rate limited."""
    return {"result": "success"}

Rate Limit by Endpoint Cost

# app/core/rate_limit.py
from functools import wraps
from typing import Callable

class CostBasedLimiter:
    """Rate limiter where different operations cost different tokens."""

    def __init__(self, redis_url: str, capacity: int = 1000):
        self.limiter = RedisRateLimiter(redis_url, capacity=capacity)

    def limit(self, cost: int = 1):
        """Decorator that consumes 'cost' tokens per request."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs):
                key = get_user_identifier(request)

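                # Consume `cost` tokens one check at a time. This is not atomic:
                # a request rejected partway through has already spent tokens.
                # For strict accounting, deduct the full cost in one Lua call.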
                for _ in range(cost):
                    result = await self.limiter.check(key)
                    if not result.allowed:
                        raise HTTPException(
                            status_code=429,
                            detail=f"Rate limit exceeded (operation costs {cost} tokens)",
                        )

                return await func(request, *args, **kwargs)
            return wrapper
        return decorator


cost_limiter = CostBasedLimiter("redis://localhost:6379")

@router.get("/simple")
@cost_limiter.limit(cost=1)  # Cheap operation
async def simple_query(request: Request):
    return {"data": "simple"}

@router.post("/generate")
@cost_limiter.limit(cost=10)  # Expensive operation
async def generate_content(request: Request):
    return {"data": "generated"}

@router.post("/bulk-process")
@cost_limiter.limit(cost=50)  # Very expensive
async def bulk_process(request: Request):
    return {"data": "processed"}
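
The decorator above spends tokens one at a time, so a rejected expensive request can still drain part of the bucket. A cost-aware bucket avoids this by refilling and deducting in a single step. The sketch below shows only that arithmetic in plain Python, with hypothetical names (`Bucket`, `try_consume`); in a distributed setup the same logic would sit inside the Lua script with the cost passed as an extra argument.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: float
    last_refill: float  # seconds


def try_consume(
    bucket: Bucket,
    cost: float,
    now: float,
    capacity: float,
    refill_rate: float,
) -> bool:
    """Refill for elapsed time, then deduct `cost` only if it fits entirely."""
    elapsed = max(0.0, now - bucket.last_refill)
    bucket.tokens = min(capacity, bucket.tokens + elapsed * refill_rate)
    bucket.last_refill = now
    if bucket.tokens < cost:
        return False  # nothing is deducted on rejection
    bucket.tokens -= cost
    return True
```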

Testing Rate Limits

# tests/test_rate_limiting.py
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app

@pytest.mark.asyncio
async def test_rate_limit_enforced():
    # httpx 0.28 removed the AsyncClient(app=...) shortcut; use ASGITransport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # Make requests up to limit
        for _ in range(10):
            response = await client.post("/analyses")
            assert response.status_code == 200

        # Next request should be rate limited
        response = await client.post("/analyses")
        assert response.status_code == 429
        assert "Retry-After" in response.headers

@pytest.mark.asyncio
async def test_rate_limit_headers():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/analyses")

        assert "X-RateLimit-Limit" in response.headers
        assert "X-RateLimit-Remaining" in response.headers
        assert "X-RateLimit-Reset" in response.headers
  • See references/token-bucket-algorithm.md for algorithm details
  • See checklists/rate-limiting-checklist.md for implementation checklist
  • See SKILL.md for sliding window and fixed window algorithms

Idempotency Examples

Idempotency Implementation Examples

FastAPI Idempotency Middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from typing import Callable
import redis.asyncio as redis
import json
import hashlib

app = FastAPI()
redis_client = redis.from_url("redis://localhost:6379")

IDEMPOTENCY_TTL = 86400  # 24 hours


class IdempotencyMiddleware:
    """Stripe-style idempotency middleware."""

    def __init__(self, app: FastAPI):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Only apply to mutating methods
        if request.method not in ("POST", "PUT", "PATCH"):
            await self.app(scope, receive, send)
            return

        # Get idempotency key
        idempotency_key = request.headers.get("Idempotency-Key")
        if not idempotency_key:
            await self.app(scope, receive, send)
            return

        # Check for cached response
        cache_key = f"idem:{request.url.path}:{idempotency_key}"
        cached = await redis_client.get(cache_key)

        if cached:
            cached_response = json.loads(cached)
            response = JSONResponse(
                content=cached_response["body"],
                status_code=cached_response["status"],
                headers={"Idempotent-Replayed": "true"},
            )
            await response(scope, receive, send)
            return

        # Try to acquire lock
        lock_key = f"idem_lock:{request.url.path}:{idempotency_key}"
        acquired = await redis_client.set(lock_key, "1", nx=True, ex=60)

        if not acquired:
            response = JSONResponse(
                content={"error": "Request with this idempotency key is being processed"},
                status_code=409,
            )
            await response(scope, receive, send)
            return

        try:
            # Process request and capture response
            # (Simplified - real implementation needs response capture)
            await self.app(scope, receive, send)
        finally:
            await redis_client.delete(lock_key)


app.add_middleware(IdempotencyMiddleware)

Database-Backed Idempotency

import hashlib
import json
from datetime import UTC, datetime, timedelta
from typing import Callable

from fastapi import HTTPException
from sqlalchemy import Column, DateTime, Index, Integer, String, Text, select
from sqlalchemy.ext.asyncio import AsyncSession


class IdempotencyRecord(Base):
    """Track processed idempotency keys."""
    __tablename__ = "idempotency_records"

    idempotency_key = Column(String(64), primary_key=True)
    endpoint = Column(String(256), nullable=False)
    request_hash = Column(String(64), nullable=False)
    response_body = Column(Text, nullable=True)
    response_status = Column(Integer, default=200)
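    # timezone=True so the timezone-aware datetimes used below are stored correctly
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(UTC))
    expires_at = Column(DateTime(timezone=True), nullable=False)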

    __table_args__ = (
        Index("ix_idempotency_expires", "expires_at"),
        Index("ix_idempotency_endpoint_key", "endpoint", "idempotency_key"),
    )


async def get_or_create_idempotency(
    db: AsyncSession,
    idempotency_key: str,
    endpoint: str,
    request_body: dict,
    process_func: Callable,
) -> tuple[dict, int, bool]:
    """
    Get cached response or process request idempotently.

    Returns:
        (response_body, status_code, was_replayed)
    """
    # Hash the request to detect mismatched bodies
    request_hash = hashlib.sha256(
        json.dumps(request_body, sort_keys=True).encode()
    ).hexdigest()

    # Check for existing record
    result = await db.execute(
        select(IdempotencyRecord).where(
            IdempotencyRecord.idempotency_key == idempotency_key,
            IdempotencyRecord.endpoint == endpoint,
        )
    )
    existing = result.scalar_one_or_none()

    if existing:
        # Verify request body matches
        if existing.request_hash != request_hash:
            raise HTTPException(
                status_code=422,
                detail="Idempotency key reused with different request body",
            )

        # Return cached response
        return (
            json.loads(existing.response_body),
            existing.response_status,
            True,
        )

    # Process the request
    try:
        response_body, status_code = await process_func()

        # Store the result
        record = IdempotencyRecord(
            idempotency_key=idempotency_key,
            endpoint=endpoint,
            request_hash=request_hash,
            response_body=json.dumps(response_body),
            response_status=status_code,
            expires_at=datetime.now(UTC) + timedelta(hours=24),
        )
        db.add(record)
        await db.commit()

        return (response_body, status_code, False)

    except Exception:
        # Don't cache errors - allow retry
        await db.rollback()
        raise


# Usage in endpoint
@app.post("/api/orders")
async def create_order(
    order: OrderCreate,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
    db: AsyncSession = Depends(get_db),
):
    async def process():
        # Actual order creation logic
        new_order = Order(**order.model_dump())
        db.add(new_order)
        await db.commit()
        return {"order_id": str(new_order.id)}, 201

    response, status, replayed = await get_or_create_idempotency(
        db=db,
        idempotency_key=idempotency_key,
        endpoint="/api/orders",
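        # mode="json" converts datetimes, Decimals, UUIDs, etc. to JSON-safe
        # values so the body can be serialized and hashed with json.dumps
        request_body=order.model_dump(mode="json"),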
        process_func=process,
    )

    return JSONResponse(
        content=response,
        status_code=status,
        headers={"Idempotent-Replayed": "true"} if replayed else {},
    )
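
The body check in get_or_create_idempotency depends on hashing a canonical JSON form of the request. The following is a minimal sketch of that step on its own; the helper name request_fingerprint is hypothetical and not part of the original code. It shows that sort_keys=True makes the hash independent of key order.

```python
import hashlib
import json


def request_fingerprint(body: dict) -> str:
    """Hash a request body so that key order does not affect the result."""
    # sort_keys=True gives logically equal dicts the same serialization
    canonical = json.dumps(body, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


first = request_fingerprint({"product": "widget", "quantity": 1})
reordered = request_fingerprint({"quantity": 1, "product": "widget"})
changed = request_fingerprint({"product": "gadget", "quantity": 2})

# Same content in a different key order matches; different content does not
print(first == reordered, first == changed)
```

Values that json.dumps cannot serialize (for example datetime objects) need to be converted before hashing.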

Event Consumer Idempotency

import json
from dataclasses import dataclass
from datetime import datetime
from typing import Callable

import asyncpg


@dataclass
class ProcessedEvent:
    event_id: str
    event_type: str
    processed_at: datetime
    result: str | None


class IdempotentEventProcessor:
    """Process events exactly once using database tracking."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def setup(self):
        """Create tracking table if not exists."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS processed_events (
                    event_id VARCHAR(64) PRIMARY KEY,
                    event_type VARCHAR(128) NOT NULL,
                    processed_at TIMESTAMPTZ DEFAULT NOW(),
                    result TEXT
                )
            """)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS ix_processed_events_type
                ON processed_events (event_type, processed_at)
            """)

    async def is_processed(self, event_id: str) -> bool:
        """Check if event was already processed."""
        async with self.pool.acquire() as conn:
            result = await conn.fetchval(
                "SELECT 1 FROM processed_events WHERE event_id = $1",
                event_id,
            )
            return result is not None

    async def process_event(
        self,
        event_id: str,
        event_type: str,
        handler: Callable,
        *args,
        **kwargs,
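    ) -> tuple[object, bool]:
        """
        Process event idempotently.

        Returns:
            (result, was_duplicate)
        """
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Claim the event. ON CONFLICT DO NOTHING avoids raising
                # UniqueViolationError, which would abort the surrounding
                # transaction and make the follow-up SELECT fail.
                claimed = await conn.fetchval(
                    """
                    INSERT INTO processed_events (event_id, event_type)
                    VALUES ($1, $2)
                    ON CONFLICT (event_id) DO NOTHING
                    RETURNING event_id
                    """,
                    event_id,
                    event_type,
                )

                if claimed is None:
                    # Already processed: return the stored (JSON-decoded) result
                    stored = await conn.fetchval(
                        "SELECT result FROM processed_events WHERE event_id = $1",
                        event_id,
                    )
                    return (json.loads(stored) if stored is not None else None), True

                # Process the event. If the handler raises, the transaction
                # rolls back and the claim is released, so the event can be retried.
                result = await handler(*args, **kwargs)

                # Update with result (falsy results such as 0 or {} are kept)
                await conn.execute(
                    "UPDATE processed_events SET result = $1 WHERE event_id = $2",
                    json.dumps(result) if result is not None else None,
                    event_id,
                )

                return result, False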


# Usage with Kafka consumer
async def consume_orders(processor: IdempotentEventProcessor):
    consumer = AIOKafkaConsumer("orders", bootstrap_servers="localhost:9092")
    await consumer.start()

    try:
        async for msg in consumer:
            event = json.loads(msg.value)
            event_id = event["event_id"]

            result, was_duplicate = await processor.process_event(
                event_id=event_id,
                event_type="order.created",
                handler=handle_order_created,
                order_data=event["data"],
            )

            if was_duplicate:
                logger.info(f"Skipped duplicate event: {event_id}")
            else:
                logger.info(f"Processed event: {event_id}")

    finally:
        await consumer.stop()
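
The claim-then-process flow used by IdempotentEventProcessor can be tried without PostgreSQL or Kafka. The sketch below is a simplified stand-in that uses the standard-library sqlite3 module; make_store and process_once are hypothetical names, and SQLite's INSERT OR IGNORE plays the role of the unique-key claim.

```python
import json
import sqlite3
from typing import Callable


def make_store() -> sqlite3.Connection:
    """Create an in-memory tracking table (stand-in for processed_events)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_events (event_id TEXT PRIMARY KEY, result TEXT)"
    )
    return conn


def process_once(conn: sqlite3.Connection, event_id: str, handler: Callable[[], object]):
    """Run handler at most once per event_id; return (result, was_duplicate)."""
    with conn:  # commits on success, rolls back if the handler raises
        claimed = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        ).rowcount == 1

        if not claimed:
            stored = conn.execute(
                "SELECT result FROM processed_events WHERE event_id = ?",
                (event_id,),
            ).fetchone()[0]
            return (json.loads(stored) if stored is not None else None), True

        result = handler()
        conn.execute(
            "UPDATE processed_events SET result = ? WHERE event_id = ?",
            (json.dumps(result), event_id),
        )
        return result, False
```

Because the claim and the handler's result are written in one transaction, a handler failure removes the claim and the event can be delivered again later.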

Client-Side Retry with Idempotency

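import hashlib
import uuid
from datetime import UTC, datetime

import httpx
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential,
)


def _is_retryable(exc: BaseException) -> bool:
    """Retry transport failures and 5xx responses, never 4xx."""
    if isinstance(exc, httpx.TransportError):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code >= 500
    return False


class IdempotentClient:
    """HTTP client with automatic idempotency key handling."""

    def __init__(self, base_url: str):
        self.client = httpx.AsyncClient(base_url=base_url)

    async def post_idempotent(
        self,
        path: str,
        json: dict,
        idempotency_key: str | None = None,
    ) -> httpx.Response:
        """
        POST with idempotency key.

        Args:
            path: API endpoint path
            json: Request body
            idempotency_key: Optional key (auto-generated if not provided)

        Returns:
            Response from server (may be replayed)
        """
        # Generate the key once, outside the retry loop, so that every
        # attempt sends the same Idempotency-Key
        key = idempotency_key or str(uuid.uuid4())
        return await self._post_with_retry(path, json, key)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.5, min=0.5, max=10),
        retry=retry_if_exception(_is_retryable),
        reraise=True,
    )
    async def _post_with_retry(
        self, path: str, json: dict, key: str
    ) -> httpx.Response:
        response = await self.client.post(
            path,
            json=json,
            headers={"Idempotency-Key": key},
        )

        # 4xx and 5xx both raise; _is_retryable retries only the 5xx case
        if response.status_code >= 400:
            response.raise_for_status()

        return response

    async def create_payment(self, amount: int, currency: str) -> dict:
        """Create payment with idempotency protection."""
        # Use deterministic key based on content.
        # Caveat: two intentional payments with the same amount and currency
        # on the same UTC day share this key and are treated as one request.
        key = hashlib.sha256(
            f"payment:{amount}:{currency}:{datetime.now(UTC).date()}".encode()
        ).hexdigest()

        response = await self.post_idempotent(
            "/api/payments",
            json={"amount": amount, "currency": currency},
            idempotency_key=key,
        )

        return response.json()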


# Usage
async def main():
    client = IdempotentClient("https://api.example.com")

    # Safe to retry - same key prevents duplicate
    payment = await client.create_payment(amount=1000, currency="USD")

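    # Check if it was a replay. This assumes the API echoes a `_replayed`
    # flag in the response body; the FastAPI example above signals replays
    # through the Idempotent-Replayed response header instead.
    if payment.get("_replayed"):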
        print("Payment was already processed")
    else:
        print(f"New payment created: {payment['id']}")
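
To get a feel for what a policy like wait_exponential(multiplier=0.5, min=0.5, max=10) means in practice, the sketch below computes a clamped exponential schedule by hand. backoff_delays is a hypothetical helper written for illustration; it does not call tenacity, and the exact indexing tenacity uses may differ between versions.

```python
def backoff_delays(
    attempts: int,
    multiplier: float = 0.5,
    min_delay: float = 0.5,
    max_delay: float = 10.0,
) -> list[float]:
    """Return the waits between attempts for a clamped exponential policy."""
    delays = []
    # There is one wait between each pair of consecutive attempts
    for retry_index in range(attempts - 1):
        raw = multiplier * (2 ** retry_index)
        delays.append(max(min_delay, min(raw, max_delay)))
    return delays


print(backoff_delays(3))  # waits before the 2nd and 3rd attempt
print(backoff_delays(7))  # longer schedule, capped at max_delay
```

With three attempts the client waits twice at most, so the worst-case added latency stays small; the cap only matters for longer schedules.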

Cleanup Job for Expired Records

import asyncio
from datetime import UTC, datetime, timedelta

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def cleanup_expired_idempotency_records(
    db: AsyncSession,
    retention_days: int = 7,
    batch_size: int = 1000,
):
    """
    Delete expired idempotency records in batches.

    Run this as a scheduled job (e.g., daily).
    """
    cutoff = datetime.now(UTC) - timedelta(days=retention_days)
    total_deleted = 0

    while True:
        # Delete in batches to avoid long locks
        result = await db.execute(
            text("""
                DELETE FROM idempotency_records
                WHERE id IN (
                    SELECT id FROM idempotency_records
                    WHERE expires_at < :cutoff
                    LIMIT :batch_size
                )
            """),
            {"cutoff": cutoff, "batch_size": batch_size},
        )
        await db.commit()

        deleted = result.rowcount
        total_deleted += deleted

        if deleted < batch_size:
            break

        # Small delay to reduce database load
        await asyncio.sleep(0.1)

    return total_deleted


# Redis cleanup (handled by TTL, but can force cleanup)
async def cleanup_redis_idempotency_keys(redis_client, pattern: str = "idem:*"):
    """Scan for idempotency keys stored without a TTL and delete them."""
    cursor = 0
    deleted = 0

    while True:
        cursor, keys = await redis_client.scan(cursor, match=pattern, count=100)

        for key in keys:
            ttl = await redis_client.ttl(key)
            if ttl == -1:  # No TTL set
                await redis_client.delete(key)
                deleted += 1

        if cursor == 0:
            break

    return deleted
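
The batch loop in cleanup_expired_idempotency_records stops as soon as a batch comes back smaller than batch_size. The sketch below reproduces that loop against an in-memory SQLite table so the termination behaviour can be checked locally; delete_in_batches and the integer expires_at column are simplifications made for this example.

```python
import sqlite3


def delete_in_batches(conn: sqlite3.Connection, cutoff: int, batch_size: int) -> int:
    """Delete rows with expires_at < cutoff, batch_size rows per transaction."""
    total_deleted = 0
    while True:
        with conn:  # one short transaction per batch
            cursor = conn.execute(
                """
                DELETE FROM idempotency_records
                WHERE id IN (
                    SELECT id FROM idempotency_records
                    WHERE expires_at < ?
                    LIMIT ?
                )
                """,
                (cutoff, batch_size),
            )
        total_deleted += cursor.rowcount

        # A short batch means nothing expired is left
        if cursor.rowcount < batch_size:
            return total_deleted


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE idempotency_records (id INTEGER PRIMARY KEY, expires_at INTEGER)"
)
# 25 expired rows (expires_at=1) and 5 live rows (expires_at=100)
conn.executemany(
    "INSERT INTO idempotency_records (expires_at) VALUES (?)",
    [(1,)] * 25 + [(100,)] * 5,
)
deleted = delete_in_batches(conn, cutoff=50, batch_size=10)
remaining = conn.execute("SELECT COUNT(*) FROM idempotency_records").fetchone()[0]
```

When the number of expired rows is an exact multiple of the batch size, the loop runs one extra pass that deletes nothing and then exits.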

Testing Idempotency

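import uuid

import pytest
from httpx import AsyncClient

# `app` is the FastAPI application under test (import it from your project).
# httpx 0.28+ removed the `app=` shortcut used below; on newer versions pass
# transport=httpx.ASGITransport(app=app) instead.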


@pytest.mark.asyncio
async def test_idempotent_request_returns_same_response():
    """Same idempotency key returns cached response."""
    async with AsyncClient(app=app, base_url="http://test") as client:
        idempotency_key = str(uuid.uuid4())

        # First request
        response1 = await client.post(
            "/api/orders",
            json={"product": "widget", "quantity": 1},
            headers={"Idempotency-Key": idempotency_key},
        )
        assert response1.status_code == 201
        order_id = response1.json()["order_id"]

        # Second request with same key
        response2 = await client.post(
            "/api/orders",
            json={"product": "widget", "quantity": 1},
            headers={"Idempotency-Key": idempotency_key},
        )
        assert response2.status_code == 201
        assert response2.json()["order_id"] == order_id
        assert response2.headers.get("Idempotent-Replayed") == "true"


@pytest.mark.asyncio
async def test_different_keys_process_independently():
    """Different idempotency keys process as separate requests."""
    async with AsyncClient(app=app, base_url="http://test") as client:
        response1 = await client.post(
            "/api/orders",
            json={"product": "widget", "quantity": 1},
            headers={"Idempotency-Key": str(uuid.uuid4())},
        )
        response2 = await client.post(
            "/api/orders",
            json={"product": "widget", "quantity": 1},
            headers={"Idempotency-Key": str(uuid.uuid4())},
        )

        assert response1.json()["order_id"] != response2.json()["order_id"]


@pytest.mark.asyncio
async def test_mismatched_body_rejected():
    """Reusing key with different body is rejected."""
    async with AsyncClient(app=app, base_url="http://test") as client:
        idempotency_key = str(uuid.uuid4())

        await client.post(
            "/api/orders",
            json={"product": "widget", "quantity": 1},
            headers={"Idempotency-Key": idempotency_key},
        )

        response = await client.post(
            "/api/orders",
            json={"product": "gadget", "quantity": 2},  # Different body!
            headers={"Idempotency-Key": idempotency_key},
        )

        assert response.status_code == 422
        assert "different request body" in response.json()["detail"]

OrchestKit Workflow Resilience

OrchestKit Workflow Resilience Integration

This example shows how to wire resilience patterns into the OrchestKit analysis pipeline.

Current Architecture

┌────────────────────────────────────────────────────────────────────┐
│                    OrchestKit Analysis Pipeline                     │
├────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Content ─▶ [Supervisor] ─▶ [Agent Fan-Out] ─▶ [Aggregate] ─▶ ... │
│                  │              │    │    │                         │
│                  ▼              ▼    ▼    ▼                         │
│              Agent Selection   A1   A2   A3   (Parallel Analysis)  │
│                                │    │    │                         │
│                                ▼    ▼    ▼                         │
│                              [findings, findings, findings]         │
│                                       │                             │
│                                       ▼                             │
│                              [Synthesize] ─▶ [Quality Gate]        │
│                                                                     │
└────────────────────────────────────────────────────────────────────┘

Resilience Layer Integration

┌────────────────────────────────────────────────────────────────────┐
│                    With Resilience Patterns                         │
├────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Content ─▶ [Rate Limiter] ─▶ [Circuit Breaker: LLM] ─▶ ...      │
│                                         │                           │
│                  ┌──────────────────────┼──────────────────────┐   │
│                  │                      ▼                      │   │
│                  │   ┌──────────────────────────────────────┐  │   │
│                  │   │         TIER 1: CRITICAL             │  │   │
│                  │   │  [Supervisor] [Synthesis] [QualityGate]│  │   │
│                  │   │   Bulkhead: 5 concurrent, 300s timeout │  │   │
│                  │   └──────────────────────────────────────┘  │   │
│                  │                                              │   │
│                  │   ┌──────────────────────────────────────┐  │   │
│                  │   │         TIER 2: STANDARD             │  │   │
│                  │   │  [Tech Comparator] [Impl Planner]     │  │   │
│                  │   │  [Security Auditor] [Learning Synth]  │  │   │
│                  │   │   Bulkhead: 3 concurrent, 120s timeout │  │   │
│                  │   └──────────────────────────────────────┘  │   │
│                  │                                              │   │
│                  │   ┌──────────────────────────────────────┐  │   │
│                  │   │         TIER 3: OPTIONAL             │  │   │
│                  │   │  [Enrichment] [Cache Warm]            │  │   │
│                  │   │   Bulkhead: 2 concurrent, 60s timeout  │  │   │
│                  │   └──────────────────────────────────────┘  │   │
│                  │                                              │   │
│                  └──────────────────────────────────────────────┘   │
│                                                                     │
│   Each agent call wrapped with:                                     │
│   @circuit_breaker(name="agent-{agent_type}")                      │
│   @bulkhead(tier=agent.tier)                                       │
│   @retry(max_attempts=2, base_delay=1.0)                           │
│                                                                     │
└────────────────────────────────────────────────────────────────────┘

Implementation

1. Circuit Breaker Registry

# backend/app/shared/resilience/circuit_breakers.py

from app.core.circuit_breaker import CircuitBreaker

# Per-service circuit breakers
circuit_breakers = {
    # LLM APIs
    "openai": CircuitBreaker(
        name="openai",
        failure_threshold=3,
        recovery_timeout=60.0,
    ),
    "anthropic": CircuitBreaker(
        name="anthropic",
        failure_threshold=3,
        recovery_timeout=60.0,
    ),

    # External APIs
    "youtube": CircuitBreaker(
        name="youtube",
        failure_threshold=5,
        recovery_timeout=120.0,
    ),
    "arxiv": CircuitBreaker(
        name="arxiv",
        failure_threshold=5,
        recovery_timeout=60.0,
    ),
    "github": CircuitBreaker(
        name="github",
        failure_threshold=5,
        recovery_timeout=60.0,
    ),

    # Internal services
    "embedding": CircuitBreaker(
        name="embedding",
        failure_threshold=3,
        recovery_timeout=30.0,
    ),
    "database": CircuitBreaker(
        name="database",
        failure_threshold=2,
        recovery_timeout=15.0,
    ),
}

def get_circuit_breaker(service: str) -> CircuitBreaker:
    """Get or create circuit breaker for service."""
    if service not in circuit_breakers:
        circuit_breakers[service] = CircuitBreaker(
            name=service,
            failure_threshold=5,
            recovery_timeout=30.0,
        )
    return circuit_breakers[service]
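
The registry relies on app.core.circuit_breaker.CircuitBreaker, which is not shown in this example. As a rough illustration of the state machine behind failure_threshold and recovery_timeout, here is a minimal sketch. SketchCircuitBreaker, CircuitState and the clock parameter are assumptions made for this example, not the project's real class, and concurrent probes in the half-open state are not handled.

```python
import time
from enum import Enum
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    def __init__(self, name: str, time_until_recovery: float):
        super().__init__(f"circuit '{name}' is open")
        self.time_until_recovery = time_until_recovery


class SketchCircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,  # injectable for tests
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0

    async def call(self, fn: Callable[[], Awaitable[T]]) -> T:
        if self.state is CircuitState.OPEN:
            elapsed = self._clock() - self._opened_at
            if elapsed < self.recovery_timeout:
                # Fail fast while the recovery timeout is still running
                raise CircuitOpenError(self.name, self.recovery_timeout - elapsed)
            self.state = CircuitState.HALF_OPEN  # let one probe through

        try:
            result = await fn()
        except Exception:
            self._failures += 1
            probe_failed = self.state is CircuitState.HALF_OPEN
            if probe_failed or self._failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
                self._opened_at = self._clock()
            raise

        # Any success closes the circuit and resets the failure count
        self._failures = 0
        self.state = CircuitState.CLOSED
        return result
```

This sketch counts consecutive failures; a production breaker would more likely track failures over a sliding window.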

2. Bulkhead Registry

# backend/app/shared/resilience/bulkheads.py

from .bulkhead import Bulkhead, Tier, BulkheadRegistry

# Agent tier assignments
AGENT_TIERS = {
    # Tier 1: Critical
    "supervisor": Tier.CRITICAL,
    "synthesis": Tier.CRITICAL,
    "quality_gate": Tier.CRITICAL,

    # Tier 2: Standard
    "tech_comparator": Tier.STANDARD,
    "implementation_planner": Tier.STANDARD,
    "security_auditor": Tier.STANDARD,
    "learning_synthesizer": Tier.STANDARD,
    "codebase_analyzer": Tier.STANDARD,
    "prerequisite_mapper": Tier.STANDARD,
    "practical_applicator": Tier.STANDARD,
    "complexity_assessor": Tier.STANDARD,

    # Tier 3: Optional
    "enrichment": Tier.OPTIONAL,
    "cache_warm": Tier.OPTIONAL,
    "metrics": Tier.OPTIONAL,
}

# Create registry
bulkhead_registry = BulkheadRegistry()

# Register bulkheads for each tier
for agent_name, tier in AGENT_TIERS.items():
    bulkhead_registry.register(agent_name, tier)

def get_agent_bulkhead(agent_type: str) -> Bulkhead:
    """Get bulkhead for agent type."""
    tier = AGENT_TIERS.get(agent_type, Tier.STANDARD)
    return bulkhead_registry.get_or_create(agent_type, tier)
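
The Bulkhead class imported from .bulkhead is not shown here. The sketch below illustrates one plausible shape for it: a semaphore limits concurrent calls and a counter bounds how many callers may wait. SketchBulkhead and its constructor arguments are assumptions for illustration; tiers and timeouts are left out.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class BulkheadFullError(Exception):
    """Raised when both the running slots and the wait queue are full."""


class SketchBulkhead:
    def __init__(self, name: str, max_concurrent: int, queue_size: int):
        self.name = name
        self._semaphore = asyncio.Semaphore(max_concurrent)
        # Total calls admitted at once: running plus waiting
        self._capacity = max_concurrent + queue_size
        self._in_flight = 0

    async def execute(self, fn: Callable[[], Awaitable[T]]) -> T:
        if self._in_flight >= self._capacity:
            # Reject immediately instead of letting the queue grow unbounded
            raise BulkheadFullError(f"bulkhead '{self.name}' is full")

        self._in_flight += 1
        try:
            async with self._semaphore:  # waits here if all slots are busy
                return await fn()
        finally:
            self._in_flight -= 1
```

Rejecting at admission time keeps a slow dependency from accumulating waiters, which is the property the tier limits above are meant to enforce.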

3. Resilient Agent Wrapper

# backend/app/shared/resilience/agent_wrapper.py

from functools import wraps
from typing import TypeVar, Callable, Awaitable
import structlog

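# Assumption: the exception classes live next to CircuitBreaker and Bulkhead
from app.core.circuit_breaker import CircuitOpenError
from .bulkhead import BulkheadFullError
from .circuit_breakers import get_circuit_breaker
from .bulkheads import get_agent_bulkhead
from .retry_handler import retry, MaxRetriesExceededError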

logger = structlog.get_logger()
T = TypeVar("T")

def resilient_agent(
    agent_type: str,
    llm_service: str = "anthropic",
    max_retries: int = 2,
):
    """
    Decorator to wrap agent execution with resilience patterns.

    Applies (in order):
    1. Circuit breaker for LLM service
    2. Bulkhead for concurrency control
    3. Retry for transient failures

    Example:
        @resilient_agent("tech_comparator", llm_service="anthropic")
        async def run_tech_comparator(content: str) -> AgentOutput:
            ...
    """
    def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(fn)
        async def wrapper(*args, **kwargs) -> T:
            circuit = get_circuit_breaker(llm_service)
            bulkhead = get_agent_bulkhead(agent_type)

            # Track for observability
            logger.info(
                "agent_execution_start",
                agent_type=agent_type,
                circuit_state=circuit.state.value,
                bulkhead_active=bulkhead.stats.current_active,
            )

            async def execute():
                # Retry layer (innermost)
                @retry(max_attempts=max_retries, base_delay=1.0)
                async def with_retry():
                    return await fn(*args, **kwargs)

                return await with_retry()

            try:
                # Bulkhead layer
                async def with_bulkhead():
                    return await bulkhead.execute(execute)

                # Circuit breaker layer (outermost)
                result = await circuit.call(with_bulkhead)

                logger.info(
                    "agent_execution_success",
                    agent_type=agent_type,
                )

                return result

            except CircuitOpenError as e:
                logger.warning(
                    "agent_circuit_open",
                    agent_type=agent_type,
                    time_until_recovery=e.time_until_recovery,
                )
                raise

            except BulkheadFullError as e:
                logger.warning(
                    "agent_bulkhead_full",
                    agent_type=agent_type,
                    tier=e.tier.name,
                )
                raise

            except MaxRetriesExceededError as e:
                logger.error(
                    "agent_max_retries_exceeded",
                    agent_type=agent_type,
                    attempts=e.attempts,
                )
                raise

        return wrapper
    return decorator
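
The wrapper imports retry and MaxRetriesExceededError from .retry_handler, which this example does not show. A minimal sketch of such a decorator, assuming exponential backoff with full jitter, could look like the following. retry_with_backoff, its retry_on parameter and the last_error attribute are hypothetical; only the attempts attribute is taken from how the wrapper uses the exception.

```python
import asyncio
import random
from functools import wraps


class MaxRetriesExceededError(Exception):
    def __init__(self, attempts: int, last_error: Exception):
        super().__init__(f"gave up after {attempts} attempts: {last_error!r}")
        self.attempts = attempts
        self.last_error = last_error


def retry_with_backoff(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple = (ConnectionError, TimeoutError),
):
    """Retry an async function on transient errors (max_attempts >= 1)."""

    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except retry_on as exc:
                    if attempt == max_attempts:
                        raise MaxRetriesExceededError(attempt, exc) from exc
                    # Full jitter: sleep a random time up to the capped backoff
                    cap = min(max_delay, base_delay * 2 ** (attempt - 1))
                    await asyncio.sleep(random.uniform(0, cap))

        return wrapper

    return decorator
```

Errors outside retry_on propagate immediately, so permanent failures such as validation errors are not retried.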

4. Graph Builder Integration

# backend/app/domains/analysis/workflows/graph_builder.py

from app.shared.resilience.agent_wrapper import resilient_agent
from app.shared.resilience.bulkheads import bulkhead_registry
from app.shared.resilience.circuit_breakers import circuit_breakers

async def build_analysis_graph() -> StateGraph:
    """Build the analysis workflow graph with resilience."""

    # Wrap each agent node with resilience
    @resilient_agent("supervisor", llm_service="anthropic")
    async def supervisor_node(state: AnalysisState) -> AnalysisState:
        # Existing supervisor logic
        ...

    @resilient_agent("tech_comparator", llm_service="anthropic")
    async def tech_comparator_node(state: AnalysisState) -> AnalysisState:
        # Existing agent logic
        ...

    # Build graph with wrapped nodes
    graph = StateGraph(AnalysisState)
    graph.add_node("supervisor", supervisor_node)
    graph.add_node("tech_comparator", tech_comparator_node)
    # ... add other nodes

    return graph.compile()


# Health check endpoint, registered once at module level rather than on every
# graph build. `app` is the FastAPI application instance.
@app.get("/health/resilience")
async def resilience_health():
    return {
        "circuit_breakers": {
            name: cb.get_status()
            for name, cb in circuit_breakers.items()
        },
        "bulkheads": bulkhead_registry.get_all_status(),
    }

5. LLM Fallback Chain Integration

# backend/app/shared/resilience/llm_chain.py

import json

from app.shared.resilience.llm_fallback_chain import (
    LLMFallbackChain,
    LLMConfig,
)
from app.shared.services.cache.semantic_cache import SemanticCache

# Configure fallback chain for analysis
analysis_llm_chain = LLMFallbackChain(
    primary=AnthropicProvider(
        LLMConfig(
            name="primary",
            model="claude-sonnet-4-6",
            timeout=60.0,
            max_tokens=8192,
        )
    ),
    fallbacks=[
        OpenAIProvider(
            LLMConfig(
                name="fallback",
                model="gpt-5.2-mini",
                timeout=30.0,
                max_tokens=4096,
            )
        ),
    ],
    cache=SemanticCache(
        redis_client=redis_client,
        threshold=0.85,
    ),
    default_response=lambda p: json.dumps({
        "status": "degraded",
        "message": "Analysis temporarily unavailable",
        "partial_results": None,
    }),
)
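
The configuration above implies an ordering: try the primary provider, then each fallback, then the static default response. LLMFallbackChain itself is not shown, so the following is only a sketch of that ordering. complete_with_fallback, ChainResult and the plain-callable provider type are assumptions, and the semantic cache layer is omitted.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Sequence

# A provider is modelled as an async function from prompt to completion text
Provider = Callable[[str], Awaitable[str]]


@dataclass
class ChainResult:
    content: str
    source: str  # "primary", "fallback", or "default"


async def complete_with_fallback(
    prompt: str,
    primary: Provider,
    fallbacks: Sequence[Provider],
    default_response: Callable[[str], str],
) -> ChainResult:
    candidates = [("primary", primary)] + [("fallback", fb) for fb in fallbacks]
    for source, provider in candidates:
        try:
            return ChainResult(await provider(prompt), source)
        except Exception:
            # Any provider failure moves on to the next candidate
            continue
    # Every provider failed: degrade to the static response
    return ChainResult(default_response(prompt), "default")
```

A real chain would likely log each failure and apply per-provider timeouts before moving on.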

6. Observability Integration

# backend/app/shared/resilience/observability.py

from langfuse import Langfuse

from .bulkheads import bulkhead_registry
from .circuit_breakers import circuit_breakers

langfuse = Langfuse()

def on_circuit_state_change(old_state: str, new_state: str, name: str):
    """Record circuit state changes in Langfuse."""
    langfuse.event(
        name="circuit_breaker_state_change",
        metadata={
            "circuit_name": name,
            "old_state": old_state,
            "new_state": new_state,
        },
        level="WARNING" if new_state == "open" else "INFO",
    )

def on_bulkhead_rejection(name: str, tier: str):
    """Record bulkhead rejections."""
    langfuse.event(
        name="bulkhead_rejection",
        metadata={
            "bulkhead_name": name,
            "tier": tier,
        },
        level="WARNING",
    )

# Wire up callbacks
for name, cb in circuit_breakers.items():
    cb._on_state_change = on_circuit_state_change

for name, bh in bulkhead_registry._bulkheads.items():
    bh._on_rejection = on_bulkhead_rejection

Configuration

Environment Variables

# Circuit Breaker
CIRCUIT_FAILURE_THRESHOLD=5
CIRCUIT_RECOVERY_TIMEOUT=30

# Bulkhead Tier 1
BULKHEAD_TIER1_CONCURRENT=5
BULKHEAD_TIER1_QUEUE=10
BULKHEAD_TIER1_TIMEOUT=300

# Bulkhead Tier 2
BULKHEAD_TIER2_CONCURRENT=3
BULKHEAD_TIER2_QUEUE=5
BULKHEAD_TIER2_TIMEOUT=120

# Bulkhead Tier 3
BULKHEAD_TIER3_CONCURRENT=2
BULKHEAD_TIER3_QUEUE=3
BULKHEAD_TIER3_TIMEOUT=60

# Retry
RETRY_MAX_ATTEMPTS=3
RETRY_BASE_DELAY=1.0
RETRY_MAX_DELAY=30.0
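
One way to consume the bulkhead variables above is a small loader that falls back to the tier defaults from the diagram. This is a sketch only: TierConfig, DEFAULTS and load_tier_config are hypothetical names, and the original does not show how the settings are actually read.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TierConfig:
    max_concurrent: int
    queue_size: int
    timeout_seconds: float


# Defaults mirror the values listed in the environment block
DEFAULTS = {
    1: TierConfig(max_concurrent=5, queue_size=10, timeout_seconds=300.0),
    2: TierConfig(max_concurrent=3, queue_size=5, timeout_seconds=120.0),
    3: TierConfig(max_concurrent=2, queue_size=3, timeout_seconds=60.0),
}


def load_tier_config(tier: int, env: Mapping[str, str] = os.environ) -> TierConfig:
    """Read BULKHEAD_TIER<n>_* variables, using defaults for missing ones."""
    default = DEFAULTS[tier]
    prefix = f"BULKHEAD_TIER{tier}_"
    return TierConfig(
        max_concurrent=int(env.get(prefix + "CONCURRENT", default.max_concurrent)),
        queue_size=int(env.get(prefix + "QUEUE", default.queue_size)),
        timeout_seconds=float(env.get(prefix + "TIMEOUT", default.timeout_seconds)),
    )
```

Passing the environment as a mapping keeps the loader easy to test without touching the real process environment.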

Monitoring Dashboard

┌────────────────────────────────────────────────────────────────────┐
│                   OrchestKit Resilience Dashboard                   │
├────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   CIRCUIT BREAKERS                                                 │
│   ┌─────────────┬──────────┬────────────┬───────────────────┐     │
│   │ Service     │ State    │ Failures   │ Next Recovery     │     │
│   ├─────────────┼──────────┼────────────┼───────────────────┤     │
│   │ anthropic   │ ✅ CLOSED │ 0/3        │ -                 │     │
│   │ openai      │ ✅ CLOSED │ 1/3        │ -                 │     │
│   │ youtube     │ ⚠️ OPEN   │ 5/5        │ 45s               │     │
│   │ embedding   │ ✅ CLOSED │ 0/3        │ -                 │     │
│   └─────────────┴──────────┴────────────┴───────────────────┘     │
│                                                                     │
│   BULKHEADS                                                        │
│   ┌─────────────────┬────────┬─────────┬─────────┬──────────┐     │
│   │ Tier            │ Active │ Queued  │ Max     │ Rejected │     │
│   ├─────────────────┼────────┼─────────┼─────────┼──────────┤     │
│   │ 1: Critical     │ 3/5    │ 1/10    │ 5       │ 0        │     │
│   │ 2: Standard     │ 3/3 ⚠️ │ 4/5     │ 3       │ 12       │     │
│   │ 3: Optional     │ 1/2    │ 0/3     │ 2       │ 45       │     │
│   └─────────────────┴────────┴─────────┴─────────┴──────────┘     │
│                                                                     │
│   RETRY STATS (Last Hour)                                          │
│   • Total Attempts: 1,234                                          │
│   • Success Rate: 94.2%                                            │
│   • Retries Used: 187                                              │
│   • Max Retries Exceeded: 23                                       │
│                                                                     │
└────────────────────────────────────────────────────────────────────┘

Testing Resilience

# backend/tests/integration/test_resilience.py

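import asyncio

import pytest
from unittest.mock import AsyncMock, patch

from app.shared.resilience.circuit_breakers import get_circuit_breaker

# Bulkhead, Tier, the error classes, LLMFallbackChain and the helper
# functions/providers used below come from the project and its test fixtures.


@pytest.mark.asyncio
async def test_circuit_opens_after_failures():
    """Circuit should open after threshold failures."""
    # Unregistered services get the default failure_threshold of 5
    breaker = get_circuit_breaker("test-service")

    # Simulate failures
    for _ in range(5):
        with pytest.raises(ConnectionError):
            await breaker.call(failing_function)

    # Next call should be rejected
    with pytest.raises(CircuitOpenError):
        await breaker.call(failing_function)


@pytest.mark.asyncio
async def test_bulkhead_rejects_when_full():
    """Bulkhead should reject when queue is full."""
    bulkhead = Bulkhead("test", Tier.STANDARD, max_concurrent=1, queue_size=1)

    # Fill the bulkhead: one running call and one queued call
    task1 = asyncio.create_task(bulkhead.execute(slow_function))
    task2 = asyncio.create_task(bulkhead.execute(slow_function))
    # Yield so both tasks actually start before the third call is made
    await asyncio.sleep(0.01)

    try:
        # Third should be rejected
        with pytest.raises(BulkheadFullError):
            await bulkhead.execute(slow_function)
    finally:
        # Clean up the background tasks
        task1.cancel()
        task2.cancel()
        await asyncio.gather(task1, task2, return_exceptions=True)


@pytest.mark.asyncio
async def test_fallback_chain_uses_fallback():
    """Chain should use fallback when primary fails."""
    chain = LLMFallbackChain(
        primary=FailingProvider(),
        fallbacks=[MockProvider()],
    )

    response = await chain.complete("test prompt")
    assert response.is_fallback
    assert response.source == ResponseSource.FALLBACK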