Distributed Systems
Distributed systems patterns for locking, resilience, idempotency, and rate limiting. Use when implementing distributed locks, circuit breakers, retry policies, idempotency keys, token bucket rate limiters, or fault tolerance patterns.
Primary Agent: backend-system-architect
Distributed Systems Patterns
Comprehensive patterns for building reliable distributed systems. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Distributed Locks | 3 | CRITICAL | Redis/Redlock locks, PostgreSQL advisory locks, fencing tokens |
| Resilience | 3 | CRITICAL | Circuit breakers, retry with backoff, bulkhead isolation |
| Idempotency | 3 | HIGH | Idempotency keys, request dedup, database-backed idempotency |
| Rate Limiting | 3 | HIGH | Token bucket, sliding window, distributed rate limits |
| Edge Computing | 2 | HIGH | Edge workers, V8 isolates, CDN caching, geo-routing |
| Event-Driven | 2 | HIGH | Event sourcing, CQRS, transactional outbox, sagas |
Total: 16 rules across 6 categories
Quick Start
# Redis distributed lock with Lua scripts
async with RedisLock(redis_client, "payment:order-123"):
    await process_payment(order_id)

# Circuit breaker for external APIs
@circuit_breaker(failure_threshold=5, recovery_timeout=30)
@retry(max_attempts=3, base_delay=1.0)
async def call_external_api():
    ...

# Idempotent API endpoint
@router.post("/payments")
async def create_payment(
    data: PaymentCreate,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
):
    return await idempotent_execute(db, idempotency_key, "/payments", process)

# Token bucket rate limiting
limiter = TokenBucketLimiter(redis_client, capacity=100, refill_rate=10)
if await limiter.is_allowed(f"user:{user_id}"):
    await handle_request()

Distributed Locks
Coordinate exclusive access to resources across multiple service instances.
| Rule | File | Key Pattern |
|---|---|---|
| Redis & Redlock | rules/locks-redis-redlock.md | Lua scripts, SET NX, multi-node quorum |
| PostgreSQL Advisory | rules/locks-postgres-advisory.md | Session/transaction locks, lock ID strategies |
| Fencing Tokens | rules/locks-fencing-tokens.md | Owner validation, TTL, heartbeat extension |
Resilience
Production-grade fault tolerance for distributed systems.
| Rule | File | Key Pattern |
|---|---|---|
| Circuit Breaker | rules/resilience-circuit-breaker.md | CLOSED/OPEN/HALF_OPEN states, sliding window |
| Retry & Backoff | rules/resilience-retry-backoff.md | Exponential backoff, jitter, error classification |
| Bulkhead Isolation | rules/resilience-bulkhead.md | Semaphore tiers, rejection policies, queue depth |
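For orientation, the CLOSED/OPEN/HALF_OPEN cycle from the table can be sketched as a small state machine. This sketch counts consecutive failures rather than using the sliding window the rule file describes, and the parameter names mirror the Quick Start decorator; treat it as an illustration, not the rule's implementation.

```python
import time

class CircuitBreaker:
    """Minimal CLOSED/OPEN/HALF_OPEN sketch (consecutive-failure counting)."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock                      # injectable for testing
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # fail fast, no call made
            self.state = "HALF_OPEN"                # allow one probe request
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                self.state = "OPEN"                 # trip (or re-trip) the breaker
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "CLOSED"                       # success closes the circuit
        return result
```

The key property is that an open circuit rejects calls without touching the failing dependency, and a single half-open probe decides whether to close again.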
Idempotency
Ensure operations can be safely retried without unintended side effects.
| Rule | File | Key Pattern |
|---|---|---|
| Idempotency Keys | rules/idempotency-keys.md | Deterministic hashing, Stripe-style headers |
| Request Dedup | rules/idempotency-dedup.md | Event consumer dedup, Redis + DB dual layer |
| Database-Backed | rules/idempotency-database.md | Unique constraints, upsert, TTL cleanup |
Rate Limiting
Protect APIs with distributed rate limiting using Redis.
| Rule | File | Key Pattern |
|---|---|---|
| Token Bucket | rules/ratelimit-token-bucket.md | Redis Lua scripts, burst capacity, refill rate |
| Sliding Window | rules/ratelimit-sliding-window.md | Sorted sets, precise counting, no boundary spikes |
| Distributed Limits | rules/ratelimit-distributed.md | SlowAPI + Redis, tiered limits, response headers |
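The token bucket algorithm itself can be sketched in-process to show the refill/consume mechanics; the rule file keeps the bucket in Redis via a Lua script so all instances share one bucket (a purely in-memory bucket is the anti-pattern flagged later for distributed deployments).

```python
import time

class TokenBucket:
    """Single-process illustration of the token bucket algorithm only."""

    def __init__(self, capacity: float, refill_rate: float, clock=time.monotonic):
        self.capacity = capacity          # max burst size
        self.refill_rate = refill_rate    # tokens added per second
        self.tokens = capacity            # start full
        self.clock = clock                # injectable for testing
        self.last = clock()

    def is_allowed(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill based on elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Bursts up to `capacity` pass immediately; sustained traffic is held to `refill_rate` requests per second.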
Edge Computing
Edge runtime patterns for Cloudflare Workers, Vercel Edge, and Deno Deploy.
| Rule | File | Key Pattern |
|---|---|---|
| Edge Workers | rules/edge-workers.md | V8 isolate constraints, Web APIs, geo-routing, auth at edge |
| Edge Caching | rules/edge-caching.md | Cache-aside at edge, CDN headers, KV storage, stale-while-revalidate |
Event-Driven
Event sourcing, CQRS, saga orchestration, and reliable messaging patterns.
| Rule | File | Key Pattern |
|---|---|---|
| Event Sourcing | rules/event-sourcing.md | Event-sourced aggregates, CQRS read models, optimistic concurrency |
| Event Messaging | rules/event-messaging.md | Transactional outbox, saga compensation, idempotent consumers |
Key Decisions
| Decision | Recommendation |
|---|---|
| Lock backend | Redis for speed, PostgreSQL if already using it, Redlock for HA |
| Lock TTL | 2-3x expected operation time |
| Circuit breaker recovery | Half-open probe with sliding window |
| Retry algorithm | Exponential backoff + full jitter |
| Bulkhead isolation | Semaphore-based tiers (Critical/Standard/Optional) |
| Idempotency storage | Redis (speed) + DB (durability), 24-72h TTL |
| Rate limit algorithm | Token bucket for most APIs, sliding window for strict quotas |
| Rate limit storage | Redis (distributed, atomic Lua scripts) |
When NOT to Use
No separate event-sourcing/saga/CQRS skills exist — they are rules within distributed-systems. But most projects never need them.
| Pattern | Interview | Hackathon | MVP | Growth | Enterprise | Simpler Alternative |
|---|---|---|---|---|---|---|
| Event sourcing | OVERKILL | OVERKILL | OVERKILL | OVERKILL | WHEN JUSTIFIED | Append-only table with status column |
| Saga orchestration | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Sequential service calls with manual rollback |
| Circuit breaker | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Try/except with timeout |
| Distributed locks | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Database row-level lock (SELECT FOR UPDATE) |
| CQRS | OVERKILL | OVERKILL | OVERKILL | OVERKILL | WHEN JUSTIFIED | Single model for read/write |
| Transactional outbox | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Direct publish after commit |
| Rate limiting | OVERKILL | OVERKILL | SIMPLE ONLY | APPROPRIATE | REQUIRED | Nginx rate limit or cloud WAF |
Rule of thumb: If you have a single server process, you do not need distributed systems patterns. Use in-process alternatives. Add distribution only when you actually have multiple instances.
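The "database row-level lock" alternative from the table can be as small as a guarded UPDATE, since the database serializes writes to the row. This sketch uses sqlite3 so it runs anywhere; on PostgreSQL the same idea is typically `SELECT ... FOR UPDATE` inside a transaction. Table and function names are illustrative.

```python
import sqlite3

def reserve_item(conn: sqlite3.Connection, item_id: int) -> bool:
    """Atomically decrement stock; returns False when none is left.

    The WHERE guard makes the decrement conditional and atomic, so
    concurrent workers cannot oversell without any distributed lock.
    """
    cur = conn.execute(
        "UPDATE inventory SET qty = qty - 1 WHERE id = ? AND qty > 0",
        (item_id,),
    )
    conn.commit()
    return cur.rowcount == 1   # 0 rows touched means stock was exhausted

# Illustrative schema/data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (id INTEGER PRIMARY KEY, qty INTEGER)")
conn.execute("INSERT INTO inventory VALUES (1, 2)")
```

When the guarded update is not expressive enough (multi-row invariants), escalate to `SELECT ... FOR UPDATE` before reaching for Redis locks.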
Anti-Patterns (FORBIDDEN)
# LOCKS: Never forget TTL (causes deadlocks)
await redis.set(f"lock:{name}", "1")  # WRONG - no expiry!

# LOCKS: Never release without owner check
await redis.delete(f"lock:{name}")  # WRONG - might release others' lock

# RESILIENCE: Never retry non-retryable errors
@retry(max_attempts=5, retryable_exceptions={Exception})  # Retries 401!

# RESILIENCE: Never put retry outside circuit breaker
@retry  # Would retry when circuit is open!
@circuit_breaker
async def call(): ...

# IDEMPOTENCY: Never use non-deterministic keys
key = str(uuid.uuid4())  # Different every time!

# IDEMPOTENCY: Never cache error responses
if response.status_code >= 400:
    await cache_response(key, response)  # Errors should retry!

# RATE LIMITING: Never use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared across instances

Detailed Documentation
| Resource | Description |
|---|---|
| scripts/ | Templates: lock implementations, circuit breaker, rate limiter |
| checklists/ | Pre-flight checklists for each pattern category |
| references/ | Deep dives: Redlock algorithm, bulkhead tiers, token bucket |
| examples/ | Complete integration examples |
Related Skills
- caching: Redis caching patterns, cache as fallback
- background-jobs: Job deduplication, async processing with retry
- observability-monitoring: Metrics and alerting for circuit breaker state changes
- error-handling-rfc9457: Structured error responses for resilience failures
- auth-patterns: API key management, authentication integration
Rules (16)
Configure edge caching with CDN invalidation, TTL, and stale-while-revalidate strategies — HIGH
Edge Caching & CDN Patterns
Cache responses at the edge using Cache API, KV storage, and CDN headers for sub-millisecond response times.
Incorrect — caching without TTL or invalidation:
// WRONG: Cache forever, no way to update
export default {
  async fetch(request: Request) {
    const cache = caches.default;
    const cached = await cache.match(request);
    if (cached) return cached; // Stale forever!
    const response = await fetch(request);
    await cache.put(request, response.clone()); // No expiry!
    return response;
  }
};

Correct — cache-aside with TTL and stale-while-revalidate:
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const cacheKey = new Request(url.toString(), request);
    const cache = caches.default;

    // Check edge cache
    let response = await cache.match(cacheKey);
    if (response) return response;

    // Check KV (global, eventually consistent)
    const kvData = await env.CACHE_KV.get(url.pathname, 'text');
    if (kvData) {
      response = new Response(kvData, {
        headers: {
          'Content-Type': 'application/json',
          'Cache-Control': 'public, max-age=60, stale-while-revalidate=300',
          'CDN-Cache-Control': 'max-age=3600', // CDN caches longer
        },
      });
      // Populate edge cache
      await cache.put(cacheKey, response.clone());
      return response;
    }

    // Fetch from origin
    response = await fetch(request);
    const body = await response.text();

    // Store in KV with TTL
    await env.CACHE_KV.put(url.pathname, body, { expirationTtl: 3600 });
    const cachedResponse = new Response(body, {
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'public, max-age=60, stale-while-revalidate=300',
      },
    });
    await cache.put(cacheKey, cachedResponse.clone());
    return cachedResponse;
  }
};

Cache header strategy:
# Static assets (hashed filenames)
Cache-Control: public, max-age=31536000, immutable

# API responses (frequently changing)
Cache-Control: public, max-age=60, stale-while-revalidate=300

# Personalized content
Cache-Control: private, no-cache
Vary: Cookie, Authorization

Key rules:
- Always set `Cache-Control` with `max-age` and `stale-while-revalidate`
- Use `CDN-Cache-Control` to set different TTLs for CDN vs browser
- Use the `Vary` header for content that changes by user/locale
- Cloudflare KV is eventually consistent (read-after-write may be stale)
- Use purge APIs for immediate invalidation of critical content
- Never cache authenticated/personalized responses without `Vary` or `private`
Deploy edge workers with V8 isolate runtime constraints and fallback handling — HIGH
Edge Workers & Runtime
Deploy code to Cloudflare Workers, Vercel Edge, or Deno Deploy with correct runtime constraints and platform-specific patterns.
Incorrect — using Node.js APIs at edge:
// WRONG: Node.js APIs not available in edge runtime
import fs from 'fs';
import { createHash } from 'crypto';

export default async function handler(req: Request) {
  const data = fs.readFileSync('./config.json'); // FAILS at edge
  const hash = createHash('sha256').update(data); // FAILS at edge
  return new Response(hash.digest('hex'));
}

Correct — using Web APIs at edge:
// Cloudflare Worker with Web Crypto API
export default {
  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    // cf.continent is a two-letter continent code ('EU', 'NA', ...)
    const continent = (request as any).cf?.continent || 'NA';

    // Web Crypto API (available at edge)
    const token = request.headers.get('Authorization')?.replace('Bearer ', '');
    if (token) {
      const key = await crypto.subtle.importKey(
        'raw',
        new TextEncoder().encode(SECRET),
        { name: 'HMAC', hash: 'SHA-256' },
        false,
        ['verify']
      );
    }

    // Geo-based routing
    if (continent === 'EU') {
      return Response.redirect(`https://eu.example.com${url.pathname}`);
    }
    return fetch(request);
  }
};

Vercel Edge Middleware pattern:
// middleware.ts (Next.js Edge Middleware)
import { NextResponse } from 'next/server';
import type { NextRequest } from 'next/server';

export const config = { matcher: ['/dashboard/:path*'] };

export function middleware(request: NextRequest) {
  const token = request.cookies.get('session');
  if (!token) {
    return NextResponse.redirect(new URL('/login', request.url));
  }
  // A/B testing via cookie
  const bucket = request.cookies.get('ab-bucket')?.value || 'control';
  return NextResponse.rewrite(new URL(`/${bucket}${request.nextUrl.pathname}`, request.url));
}

Key rules:
- Edge runtimes use V8 isolates — no `fs`, `path`, `child_process`, or native modules
- Available: `fetch`, `Request`, `Response`, `crypto.subtle`, `TextEncoder`, streams
- Keep bundles < 1MB compressed for fast cold starts (< 1ms on Cloudflare)
- Use KV (Cloudflare) or Edge Config (Vercel) for distributed state
- Use Durable Objects for strong consistency when needed
Guarantee at-least-once delivery using transactional outbox and event messaging patterns — HIGH
Event Messaging & Outbox
Reliable event publishing with transactional outbox, saga orchestration, and message queue patterns.
Incorrect — dual-write without outbox (data loss on failure):
# WRONG: If publish fails, DB has data but event is lost
async def create_order(order: Order):
    await db.insert(order)            # Step 1: succeeds
    await message_broker.publish(     # Step 2: might fail!
        "order.created", order.dict()
    )
    # If publish fails: order exists but no event was sent
    # Downstream services never know about the order

Correct — transactional outbox pattern:
# Outbox table: events written atomically with business data
async def create_order(order: Order, db: AsyncSession):
    async with db.begin():
        # Both in same transaction — atomic!
        db.add(order)
        db.add(OutboxEvent(
            aggregate_id=order.id,
            event_type="order.created",
            payload=order.dict(),
            created_at=datetime.utcnow(),
        ))

# Outbox publisher (separate process, polls for unsent events)
async def publish_outbox_events(db: AsyncSession, broker: MessageBroker):
    while True:
        async with db.begin():
            events = await db.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.created_at)
                .limit(100)
                .with_for_update(skip_locked=True)  # Concurrent workers safe
            )
            for event in events.scalars():
                await broker.publish(event.event_type, event.payload)
                event.published_at = datetime.utcnow()
        await asyncio.sleep(1)

Saga orchestration pattern:
class OrderSaga:
    steps = [
        SagaStep("reserve_inventory", compensate="release_inventory"),
        SagaStep("charge_payment", compensate="refund_payment"),
        SagaStep("ship_order", compensate="cancel_shipment"),
    ]

    async def execute(self, order_id: str):
        completed = []
        for step in self.steps:
            try:
                await step.execute(order_id)
                completed.append(step)
            except Exception:
                # Compensate in reverse order
                for s in reversed(completed):
                    await s.compensate(order_id)
                raise SagaFailed(f"Failed at {step.name}")

Idempotent consumer (prevents duplicate processing):
async def handle_event(event: Event, db: AsyncSession):
    async with db.begin():
        # Check if already processed
        exists = await db.execute(
            select(ProcessedEvent).where(ProcessedEvent.event_id == event.id)
        )
        if exists.scalar():
            return  # Already processed, skip
        # Process the event
        await process_order(event.payload)
        # Mark as processed
        db.add(ProcessedEvent(event_id=event.id, processed_at=datetime.utcnow()))

Key rules:
- Use transactional outbox to atomically save data and events
- All saga steps must have compensating actions
- Every message consumer must be idempotent (use event ID deduplication)
- Use `SKIP LOCKED` for concurrent outbox workers
- Kafka for high-throughput streaming, RabbitMQ for routing, Redis Streams for simplicity
- Use dead letter queues (DLQ) for messages that fail after max retries
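The DLQ rule above can be sketched as a consume loop that retries up to a limit and then parks the poison message instead of retrying forever. The `handler` and `publish_dlq` callables are illustrative stand-ins for real broker bindings.

```python
async def consume_with_dlq(message: dict, handler, publish_dlq, max_attempts: int = 3):
    """Retry the handler; after max_attempts, route the message to the DLQ."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(message)
        except Exception as exc:
            if attempt == max_attempts:
                # Park the poison message with failure context for later triage
                await publish_dlq({
                    "message": message,
                    "error": str(exc),
                    "attempts": attempt,
                })
                return None
```

In a real consumer the DLQ destination is a separate queue/topic so failed messages can be inspected and replayed without blocking healthy traffic.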
Implement event sourcing with CQRS for full audit trails and temporal queries — HIGH
Event Sourcing & CQRS
Store state as immutable events and separate read/write models for scalable, auditable systems.
Incorrect — mutable state without event history:
# WRONG: Direct mutation loses history
class Account:
    def __init__(self, balance: float = 0):
        self.balance = balance

    def deposit(self, amount: float):
        self.balance += amount  # No record of what happened!

    def withdraw(self, amount: float):
        self.balance -= amount  # Can't audit, can't replay

Correct — event-sourced aggregate with CQRS:
from dataclasses import dataclass
from typing import Any

@dataclass
class DomainEvent:
    aggregate_id: str
    version: int
    data: dict[str, Any]

class Account:
    def __init__(self, account_id: str):
        self.id = account_id   # aggregate identity, stamped on every event
        self._changes: list[DomainEvent] = []
        self._version = 0
        self.balance = 0.0

    def deposit(self, amount: float):
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._raise_event("MoneyDeposited", {"amount": amount})

    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._raise_event("MoneyWithdrawn", {"amount": amount})

    def _raise_event(self, event_type: str, data: dict):
        event = DomainEvent(
            aggregate_id=self.id,
            version=self._version + 1,
            data={"type": event_type, **data},
        )
        self._apply(event)
        self._changes.append(event)

    def _apply(self, event: DomainEvent):
        match event.data["type"]:
            case "MoneyDeposited":
                self.balance += event.data["amount"]
            case "MoneyWithdrawn":
                self.balance -= event.data["amount"]
        self._version = event.version

# Event store with optimistic concurrency
class EventStore:
    async def save(self, aggregate_id: str, events: list[DomainEvent], expected_version: int):
        async with self.db.transaction():
            current = await self.db.fetchval(
                "SELECT MAX(version) FROM events WHERE aggregate_id = $1",
                aggregate_id,
            )
            # MAX() is NULL for a brand-new aggregate, so treat it as 0
            if (current or 0) != expected_version:
                raise ConcurrencyError(f"Expected {expected_version}, got {current}")
            for event in events:
                await self.db.execute(
                    "INSERT INTO events (aggregate_id, version, data) VALUES ($1, $2, $3)",
                    aggregate_id, event.version, event.data,
                )

# CQRS: Separate read model projection
class BalanceProjection:
    async def handle(self, event: DomainEvent):
        match event.data["type"]:
            case "MoneyDeposited":
                await self.db.execute(
                    "UPDATE account_balances SET balance = balance + $1 WHERE id = $2",
                    event.data["amount"], event.aggregate_id,
                )
            case "MoneyWithdrawn":
                await self.db.execute(
                    "UPDATE account_balances SET balance = balance - $1 WHERE id = $2",
                    event.data["amount"], event.aggregate_id,
                )

Key rules:
- Events are immutable and named in past tense (`OrderPlaced`, not `PlaceOrder`)
- Use optimistic concurrency with version checks to prevent conflicts
- CQRS read models are eventually consistent — design UX accordingly
- Snapshot every N events (e.g., 100) to avoid replaying long event streams
- Never delete events — use compensating events instead
Store idempotency keys in PostgreSQL with ACID guarantees for safe retries — HIGH
Database-Backed Idempotency
Schema
CREATE TABLE idempotency_records (
    idempotency_key VARCHAR(64) PRIMARY KEY,
    endpoint VARCHAR(256) NOT NULL,
    request_hash VARCHAR(64) NOT NULL,
    response_body TEXT,
    response_status INTEGER DEFAULT 200,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    expires_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX ix_idempotency_expires ON idempotency_records (expires_at);
CREATE INDEX ix_idempotency_endpoint_key ON idempotency_records (endpoint, idempotency_key);

Idempotent Execute
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from datetime import UTC, datetime, timedelta
import json

async def idempotent_execute(
    db: AsyncSession,
    idempotency_key: str,
    endpoint: str,
    operation,
    ttl_hours: int = 24,
) -> tuple[Any, int, bool]:
    """Execute operation idempotently. Returns (response, status, was_replayed)."""
    # Check for existing
    existing = await db.get(ProcessedRequest, idempotency_key)
    if existing and existing.expires_at > datetime.now(UTC):
        return json.loads(existing.response_body), existing.status_code, True

    # Execute operation
    result, status_code = await operation()

    # Store result (upsert to handle races)
    stmt = insert(ProcessedRequest).values(
        idempotency_key=idempotency_key,
        endpoint=endpoint,
        status_code=status_code,
        response_body=json.dumps(result),
        expires_at=datetime.now(UTC) + timedelta(hours=ttl_hours),
    ).on_conflict_do_nothing()
    await db.execute(stmt)
    return result, status_code, False

Request Body Validation
Detect misuse: same idempotency key with different request body.
class IdempotencyService:
    def _hash_request(self, body: dict) -> str:
        return hashlib.sha256(
            json.dumps(body, sort_keys=True, default=str).encode()
        ).hexdigest()

    async def check_idempotency(self, key: str, endpoint: str, body: dict):
        row = await self.db.execute(
            text("SELECT request_hash, response_body, response_status "
                 "FROM idempotency_records "
                 "WHERE idempotency_key = :key AND endpoint = :endpoint"),
            {"key": key, "endpoint": endpoint},
        )
        existing = row.fetchone()
        if not existing:
            return None
        if existing.request_hash != self._hash_request(body):
            raise HTTPException(
                status_code=422,
                detail="Idempotency key reused with different request body",
            )
        return {"body": json.loads(existing.response_body),
                "status": existing.response_status, "replayed": True}

TTL Cleanup Job
async def cleanup_expired_records(db: AsyncSession, batch_size: int = 1000) -> int:
    """Delete expired idempotency records. Run daily via scheduler."""
    total_deleted = 0
    while True:
        result = await db.execute(
            text("""
                DELETE FROM idempotency_records
                WHERE idempotency_key IN (
                    SELECT idempotency_key FROM idempotency_records
                    WHERE expires_at < NOW()
                    LIMIT :batch_size
                )
            """),
            {"batch_size": batch_size},
        )
        await db.commit()
        deleted = result.rowcount
        total_deleted += deleted
        if deleted < batch_size:
            break
        await asyncio.sleep(0.1)  # Reduce DB load
    return total_deleted

FastAPI Endpoint Usage
@app.post("/api/orders", status_code=201)
async def create_order(
    order: OrderCreate,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
    db: AsyncSession = Depends(get_db),
):
    async def process():
        new_order = Order(**order.model_dump())
        db.add(new_order)
        await db.commit()
        return {"order_id": str(new_order.id)}, 201

    response, status, replayed = await idempotent_execute(
        db=db,
        idempotency_key=idempotency_key,
        endpoint="/api/orders",
        operation=process,
    )
    return JSONResponse(
        content=response, status_code=status,
        headers={"Idempotent-Replayed": "true"} if replayed else {},
    )

Key Decisions
| Aspect | Recommendation | Rationale |
|---|---|---|
| Storage | PostgreSQL | ACID guarantees, no extra infra |
| Key format | SHA-256 hash, 32-64 chars | Deterministic, compact |
| TTL | 24-72 hours | Balance storage vs replay window |
| Race handling | ON CONFLICT DO NOTHING | First writer wins |
| Response caching | Status 2xx only | Don't cache errors |
| Cleanup | Batch delete, daily job | Avoid long locks |
Incorrect — Check-then-act pattern has race condition between check and insert:
const existing = await db.query("SELECT * FROM idempotency_records WHERE key = $1", [key]);
if (!existing) {
  await processPayment(data); // Race! Two processes both see "not existing"
  await db.query("INSERT INTO idempotency_records (key, result) VALUES ($1, $2)", [key, result]);
}

Correct — Insert-first pattern atomically claims idempotency key:
try {
  await db.query("INSERT INTO idempotency_records (key, status) VALUES ($1, 'processing')", [key]);
  const result = await processPayment(data);
  await db.query("UPDATE idempotency_records SET result = $1 WHERE key = $2", [result, key]);
} catch (err) {
  // Unique-violation: another request already claimed this key
  return await db.query("SELECT result FROM idempotency_records WHERE key = $1", [key]);
}

Deduplicate requests using dual-layer Redis and database exactly-once processing — HIGH
Event Consumer Deduplication
Process events exactly once using dual-layer dedup: Redis (fast) + Database (durable).
Dual-Layer Dedup Pattern
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

class IdempotentConsumer:
    """Process events exactly once using idempotency keys."""

    def __init__(self, db: AsyncSession, redis: redis.Redis):
        self.db = db
        self.redis = redis

    async def process(self, event: dict, handler) -> tuple[Any, bool]:
        """Process event idempotently. Returns (result, was_duplicate)."""
        idempotency_key = event.get("idempotency_key")
        if not idempotency_key:
            return await handler(event), False

        # Fast path: check Redis cache
        cache_key = f"processed:{idempotency_key}"
        if await self.redis.exists(cache_key):
            return None, True

        # Slow path: check database
        existing = await self.db.execute(
            select(ProcessedEvent)
            .where(ProcessedEvent.idempotency_key == idempotency_key)
        )
        if existing.scalar_one_or_none():
            await self.redis.setex(cache_key, 86400, "1")  # Backfill cache
            return None, True

        # Process with database lock to prevent races
        try:
            async with self.db.begin_nested():
                self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
                await self.db.flush()
                result = await handler(event)
            await self.redis.setex(cache_key, 86400, "1")
            return result, False
        except IntegrityError:
            return None, True  # Another process claimed it

Kafka Consumer Integration
async def consume_orders(processor: IdempotentEventProcessor):
    consumer = AIOKafkaConsumer("orders", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for msg in consumer:
            event = json.loads(msg.value)
            result, was_duplicate = await processor.process_event(
                event_id=event["event_id"],
                event_type="order.created",
                handler=handle_order_created,
                order_data=event["data"],
            )
            if was_duplicate:
                logger.info(f"Skipped duplicate: {event['event_id']}")
    finally:
        await consumer.stop()

Database-Tracked Event Processing
class IdempotentEventProcessor:
    """Track processed events in database for exactly-once semantics."""

    async def process_event(self, event_id: str, event_type: str,
                            handler, *args, **kwargs) -> tuple:
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                try:
                    await conn.execute(
                        "INSERT INTO processed_events (event_id, event_type) VALUES ($1, $2)",
                        event_id, event_type,
                    )
                except UniqueViolationError:
                    existing = await conn.fetchrow(
                        "SELECT result FROM processed_events WHERE event_id = $1",
                        event_id,
                    )
                    return existing["result"], True
                result = await handler(*args, **kwargs)
                await conn.execute(
                    "UPDATE processed_events SET result = $1 WHERE event_id = $2",
                    json.dumps(result) if result else None, event_id,
                )
                return result, False

Key Design Decisions
| Aspect | Recommendation | Rationale |
|---|---|---|
| Fast layer | Redis with TTL | Sub-millisecond lookups for hot path |
| Durable layer | Database unique constraint | Survives Redis restart, handles races |
| Lock strategy | INSERT then process | Claim key before processing to prevent races |
| Cache backfill | On DB hit, write to Redis | Speeds up subsequent duplicate checks |
| TTL | 24-72 hours | Balance storage vs replay window |
Common Mistakes
# NEVER check-then-act without locking
async def bad_process(key):
    if not await exists(key):  # Race condition!
        await process()
        await mark_processed(key)

# CORRECT: Insert first (claims the key atomically)
async def good_process(key):
    try:
        await insert_processed(key)  # Atomic claim
        await process()
    except IntegrityError:
        pass  # Already processed

Incorrect — Redis-only deduplication loses state after restart:
async def process_event(event_id: str):
    if await redis.exists(f"processed:{event_id}"):
        return  # Already processed
    await handle_event(event_id)
    await redis.set(f"processed:{event_id}", "1", ex=86400)
    # Redis restart = all events reprocessed!

Correct — Dual-layer dedup: Redis (fast) + database (durable):
async def process_event(event_id: str):
    # Fast path: Redis check
    if await redis.exists(f"processed:{event_id}"):
        return

    # Durable check: Database with unique constraint
    try:
        await db.execute("INSERT INTO processed_events (event_id) VALUES ($1)", event_id)
        await handle_event(event_id)
        await redis.set(f"processed:{event_id}", "1", ex=86400)
    except UniqueViolationError:
        pass  # Already processed

Generate deterministic idempotency keys using Stripe-style headers for safe retries — HIGH
Idempotency Key Generation & Stripe-Style Header
Deterministic Key Generation
import hashlib
import json
from typing import Any

def generate_idempotency_key(
    *,
    entity_id: str,
    action: str,
    params: dict[str, Any] | None = None,
) -> str:
    """Generate deterministic idempotency key.

    Same input always produces the same key.
    """
    content = f"{entity_id}:{action}"
    if params:
        content += f":{json.dumps(params, sort_keys=True)}"
    return hashlib.sha256(content.encode()).hexdigest()[:32]

# Examples
key = generate_idempotency_key(
    entity_id="order-123",
    action="create",
    params={"amount": 100, "currency": "USD"},
)

Stripe-Style Idempotency Header
Clients send Idempotency-Key header with POST requests. Server caches successful responses and replays them on duplicate requests.
Client Request (Idempotency-Key: abc-123)
    |
    v
Check cache (Redis) --> Exists? --> Return cached response
    |                               (Idempotent-Replayed: true)
    NO
    |
    v
Acquire lock --> Process request --> Cache response --> Return

FastAPI Middleware
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import redis.asyncio as redis
import json

class IdempotencyMiddleware(BaseHTTPMiddleware):
    """Handle Idempotency-Key header for POST/PUT/PATCH."""

    def __init__(self, app, redis_client: redis.Redis, ttl: int = 86400):
        super().__init__(app)
        self.redis = redis_client
        self.ttl = ttl

    async def dispatch(self, request: Request, call_next):
        if request.method not in ("POST", "PUT", "PATCH"):
            return await call_next(request)

        idempotency_key = request.headers.get("Idempotency-Key")
        if not idempotency_key:
            return await call_next(request)

        cache_key = f"idem:{request.url.path}:{idempotency_key}"

        # Check for cached response
        cached = await self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            return Response(
                content=data["body"],
                status_code=data["status"],
                media_type="application/json",
                headers={"X-Idempotent-Replayed": "true"},
            )

        # Process request
        response = await call_next(request)

        # Cache successful responses only
        if 200 <= response.status_code < 300:
            body = b"".join([chunk async for chunk in response.body_iterator])
            await self.redis.setex(
                cache_key, self.ttl,
                json.dumps({"body": body.decode(), "status": response.status_code}),
            )
            return Response(content=body, status_code=response.status_code,
                            media_type=response.media_type)
        return response

Key Principles
- Keys are deterministic -- same input = same key (never use uuid4)
- Keys are scoped to endpoint -- same key on different endpoints = different operations
- 24-hour window -- keys expire after 24 hours
- Only cache success -- errors (4xx/5xx) allow retry
- Lock during processing -- prevents concurrent duplicates
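The last principle, locking during processing, can be sketched in-process: a per-key lock makes concurrent duplicates wait for the first request and replay its cached response instead of re-executing. `asyncio.Lock` here stands in for what would be a Redis `SET NX` lock across instances; the names (`run_idempotent`, `_locks`, `_cache`) are illustrative.

```python
import asyncio

# Per-key registries; distributed setups would use Redis SET NX + a response
# cache instead of process-local dicts.
_locks: dict[str, asyncio.Lock] = {}
_cache: dict[str, str] = {}

async def run_idempotent(key: str, handler):
    """Returns (response, was_replayed); only one caller runs the handler."""
    lock = _locks.setdefault(key, asyncio.Lock())
    async with lock:
        if key in _cache:
            return _cache[key], True      # concurrent duplicate waited, replay
        result = await handler()
        _cache[key] = result              # cache success only (errors propagate)
        return result, False

async def demo():
    _locks.clear(); _cache.clear()
    calls = 0
    async def handler():
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)         # simulate work
        return "order-1"
    results = await asyncio.gather(*(run_idempotent("k1", handler) for _ in range(3)))
    return calls, results
```

Three concurrent requests with the same key produce one handler execution and two replays.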
Common Mistakes
# NEVER use non-deterministic keys
key = str(uuid.uuid4())  # Different every time!

# NEVER include timestamps in keys
key = f"{event.id}:{datetime.now()}"  # Timestamp varies!

# NEVER skip idempotency for financial operations
@router.post("/payments")
async def create_payment(data):
    return await process_payment(data)  # No idempotency!

Incorrect — Random UUID keys make retry detection impossible:
# Client generates new key on each retry
key = str(uuid.uuid4())  # New key every time!
await post("/api/orders", headers={"Idempotency-Key": key}, data=order)
# Retry creates duplicate order

Correct — Deterministic keys ensure retries are detected and deduplicated:
# Client generates same key for same operation
key = hashlib.sha256(f"{order_id}:create:{json.dumps(order_data, sort_keys=True)}".encode()).hexdigest()
await post("/api/orders", headers={"Idempotency-Key": key}, data=order)
# Retry returns cached response

Validate lock ownership with fencing tokens and TTL to prevent data corruption — CRITICAL
Lock Safety: Fencing Tokens, TTL & Heartbeat
Owner Validation (Fencing)
Every lock operation MUST validate the owner before acting. Without this, a slow process whose lock expired can corrupt data when a new owner holds the lock.
# WRONG: No owner check
await redis.delete(f"lock:{name}")  # Might release someone else's lock!

# CORRECT: Atomic owner check via Lua
RELEASE = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""

TTL Management
Lock TTL must be set to prevent deadlocks from crashed processes. Rule of thumb: TTL = 2-3x expected operation duration.
| Operation Duration | Recommended TTL | Rationale |
|---|---|---|
| < 1 second | 5 seconds | Fast operations with margin |
| 1-10 seconds | 30 seconds | Standard processing |
| 10-60 seconds | 3 minutes | Long operations, use heartbeat |
| > 60 seconds | 5 minutes + heartbeat | Must extend during processing |
Heartbeat Extension
For long-running tasks, extend the lock TTL periodically to prevent expiry mid-operation.
import asyncio
from datetime import timedelta
async def long_running_task(task_id: str, redis_client):
lock = RedisLock(redis_client, f"task:{task_id}", ttl=timedelta(seconds=30))
async with lock:
# Background heartbeat extends lock every 10s
async def heartbeat():
while lock.is_acquired:
await lock.extend(timedelta(seconds=30))
await asyncio.sleep(10)
heartbeat_task = asyncio.create_task(heartbeat())
try:
await do_long_work()
finally:
            heartbeat_task.cancel()
Lock Retry with Exponential Backoff
import asyncio
from functools import wraps

class LockAcquisitionError(Exception): ...
def with_lock(lock_name: str, ttl_s: int = 30, retries: int = 3):
"""Decorator to acquire lock before function execution."""
def decorator(func):
@wraps(func)
async def wrapper(*args, redis_client=None, **kwargs):
for attempt in range(retries):
lock = RedisLock(redis_client, lock_name, ttl_ms=ttl_s * 1000)
if await lock.acquire():
try:
return await func(*args, **kwargs)
finally:
await lock.release()
await asyncio.sleep(0.1 * (2 ** attempt)) # Backoff
raise LockAcquisitionError(f"Failed after {retries} attempts")
return wrapper
    return decorator
Lock Ordering (Deadlock Prevention)
When acquiring multiple locks, always acquire in a consistent order.
async def transfer_funds(session, from_account: int, to_account: int, amount):
# Always lock in sorted order to prevent deadlocks
accounts = sorted([from_account, to_account])
for account_id in accounts:
await session.execute(
text("SELECT pg_advisory_xact_lock(:ns, :id)"),
{"ns": NAMESPACE_ACCOUNT, "id": account_id},
)
await debit_account(session, from_account, amount)
await credit_account(session, to_account, amount)
    await session.commit()
Checklist
- Owner ID stored with lock (UUIDv7 recommended)
- Atomic release validates owner via Lua script
- TTL always set (prevents permanent deadlocks)
- Heartbeat for operations > 30 seconds
- Lock ordering for multiple locks
- Retry with exponential backoff + jitter
- Metrics: acquisition time, hold duration, failures
Incorrect — Releasing lock without owner validation can delete another process's lock:
await redis.delete(f"lock:{resource_id}")
# If our lock expired and another process acquired it,
# we just deleted their lock!
Correct — Atomic owner validation via Lua ensures only owner can release:
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
result = await redis.eval(RELEASE_SCRIPT, keys=[f"lock:{resource_id}"], args=[owner_id])
# Only deletes if owner_id matches
Use PostgreSQL advisory locks for session and transaction-level distributed locking — CRITICAL
PostgreSQL Advisory Locks
No extra infrastructure needed -- uses existing PostgreSQL with ACID guarantees.
Lock Types
| Type | Scope | Release | Use Case |
|---|---|---|---|
| Session-level | Connection | Explicit or disconnect | Long-running jobs, singletons |
| Transaction-level | Transaction | Commit/rollback | Data consistency within transactions |
Session-Level Lock
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class PostgresAdvisoryLock:
"""PostgreSQL advisory lock (session-level)."""
def __init__(self, session: AsyncSession, lock_id: int):
self._session = session
self._lock_id = lock_id
self._acquired = False
async def acquire(self, blocking: bool = True) -> bool:
if blocking:
await self._session.execute(
text("SELECT pg_advisory_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
self._acquired = True
return True
else:
result = await self._session.execute(
text("SELECT pg_try_advisory_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
self._acquired = result.scalar()
return self._acquired
async def release(self) -> bool:
if not self._acquired:
return False
result = await self._session.execute(
text("SELECT pg_advisory_unlock(:lock_id)"),
{"lock_id": self._lock_id},
)
released = result.scalar()
if released:
self._acquired = False
return released
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *_):
        await self.release()
Transaction-Level Lock
class PostgresTransactionLock:
"""Auto-released on commit/rollback."""
async def acquire(self, session: AsyncSession, lock_id: int, blocking: bool = True) -> bool:
if blocking:
await session.execute(
text("SELECT pg_advisory_xact_lock(:lock_id)"),
{"lock_id": lock_id},
)
return True
else:
result = await session.execute(
text("SELECT pg_try_advisory_xact_lock(:lock_id)"),
{"lock_id": lock_id},
)
            return result.scalar()
Lock ID Strategy
import hashlib
def string_to_lock_id(name: str) -> int:
"""Convert any string to a PostgreSQL bigint lock ID."""
hash_bytes = hashlib.md5(name.encode()).digest()[:8]
return int.from_bytes(hash_bytes, byteorder="big", signed=True)
# Usage
lock_id = string_to_lock_id("payment:order-123")
# Two-key lock for namespacing
NAMESPACE_PAYMENT = 1
await session.execute(
text("SELECT pg_advisory_lock(:ns, :id)"),
{"ns": NAMESPACE_PAYMENT, "id": 12345},
)
Practical Example: Singleton Job
async def run_scheduled_job(session: AsyncSession):
lock_id = string_to_lock_id("daily-report-job")
lock = PostgresAdvisoryLock(session, lock_id)
if not await lock.acquire(blocking=False):
print("Job already running on another instance")
return
try:
await generate_daily_report()
finally:
        await lock.release()
Monitoring
SELECT l.pid, l.objid as lock_id, l.granted,
a.application_name, a.client_addr,
now() - a.state_change as duration
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory';
Incorrect — Session-level lock without cleanup causes deadlock after crash:
await session.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock_id})
await process_critical_section()
# Process crashes before unlock - lock held forever!
Correct — Transaction-level lock auto-releases on commit/rollback:
async with session.begin():
await session.execute(text("SELECT pg_advisory_xact_lock(:id)"), {"id": lock_id})
await process_critical_section()
# Auto-released on commit/rollback/disconnect
Implement distributed locking with Redis Lua scripts and multi-node Redlock algorithm — CRITICAL
Redis & Redlock Distributed Locks
Single-Node Redis Lock (Lua Script)
from uuid_utils import uuid7
import redis.asyncio as redis
class LockError(Exception):
    """Raised when lock acquisition fails."""

class RedisLock:
"""Redis lock with Lua scripts for atomicity."""
ACQUIRE = """
if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
return 1
end
return 0
"""
RELEASE = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
EXTEND = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
end
return 0
"""
def __init__(self, client: redis.Redis, name: str, ttl_ms: int = 30000):
self._client = client
self._name = f"lock:{name}"
self._owner = str(uuid7())
self._ttl = ttl_ms
async def acquire(self) -> bool:
return await self._client.eval(
self.ACQUIRE, 1, self._name, self._owner, self._ttl
) == 1
async def release(self) -> bool:
return await self._client.eval(
self.RELEASE, 1, self._name, self._owner
) == 1
async def extend(self, ttl_ms: int | None = None) -> bool:
return await self._client.eval(
self.EXTEND, 1, self._name, self._owner, ttl_ms or self._ttl
) == 1
async def __aenter__(self):
if not await self.acquire():
raise LockError(f"Failed to acquire {self._name}")
return self
async def __aexit__(self, *_):
        await self.release()
Redlock Algorithm (Multi-Node HA)
Provides fault-tolerant locking across N Redis instances (recommend N=5).
1. Get current time (T1)
2. Try to acquire lock on N nodes sequentially
3. Get current time (T2)
4. Lock valid if:
- Acquired on majority (N/2 + 1) nodes
- Elapsed (T2-T1) < TTL - clock_drift
5. If valid: use lock with remaining TTL
6. If invalid: release on all nodes
from dataclasses import dataclass
from datetime import timedelta
import time
@dataclass
class RedlockResult:
acquired: bool
validity_time_ms: int = 0
resource: str = ""
owner_id: str = ""
class Redlock:
"""Distributed lock across multiple Redis instances."""
def __init__(self, clients: list[redis.Redis], ttl: timedelta = timedelta(seconds=30)):
if len(clients) < 3:
raise ValueError("Redlock requires at least 3 Redis instances")
self._clients = clients
self._ttl_ms = int(ttl.total_seconds() * 1000)
self._quorum = len(clients) // 2 + 1
async def acquire(self, resource: str, retry_count: int = 3) -> RedlockResult:
owner_id = str(uuid7())
for attempt in range(retry_count):
start = time.monotonic()
            acquired_count = 0
            for c in self._clients:  # sum() cannot consume an async generator
                if await self._try_lock(c, resource, owner_id):
                    acquired_count += 1
elapsed_ms = int((time.monotonic() - start) * 1000)
drift_ms = int(self._ttl_ms * 0.01) + 2
validity = self._ttl_ms - elapsed_ms - drift_ms
if acquired_count >= self._quorum and validity > 0:
return RedlockResult(True, validity, resource, owner_id)
await self._release_all(resource, owner_id)
        return RedlockResult(acquired=False, resource=resource)

    async def _try_lock(self, client: redis.Redis, resource: str, owner_id: str) -> bool:
        try:
            return bool(await client.set(f"lock:{resource}", owner_id, nx=True, px=self._ttl_ms))
        except Exception:
            return False  # Unreachable node counts as not acquired

    async def _release_all(self, resource: str, owner_id: str) -> None:
        for client in self._clients:
            try:
                await client.eval(RedisLock.RELEASE, 1, f"lock:{resource}", owner_id)
            except Exception:
                pass  # Best-effort release on every node
When to Use
| Single-Node Redis Lock | Redlock |
|---|---|
| Development/testing | Production with HA |
| Non-critical operations | Critical operations (payments) |
| Single datacenter | Multi-datacenter |
| Cost-sensitive | Reliability-critical |
Common Mistakes
- Forgetting TTL (causes permanent deadlocks)
- Releasing without owner check (releases someone else's lock)
- Using single Redis for critical operations (SPOF)
- Holding locks across slow external calls without heartbeat
Incorrect — SET without NX allows multiple processes to "acquire" same lock:
await redis.set(f"lock:{name}", owner_id, ex=30)
# Two processes can both succeed!
Correct — SET NX atomically acquires lock only if key doesn't exist:
acquired = await redis.set(f"lock:{name}", owner_id, nx=True, ex=30)
# Only one process succeeds, others get None
Protect APIs with distributed rate limiting using SlowAPI, Redis, and tiered limits — HIGH
Distributed Rate Limiting with SlowAPI and Tiered Limits
SlowAPI + Redis (FastAPI)
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.middleware import SlowAPIMiddleware
limiter = Limiter(
key_func=get_remote_address,
storage_uri="redis://localhost:6379",
strategy="moving-window",
)
app = FastAPI()
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware)
# Endpoint limits
@router.post("/api/v1/auth/login")
@limiter.limit("10/minute") # Strict for auth
async def login(request: Request): ...
@router.get("/api/v1/analyses")
@limiter.limit("100/minute") # Normal for reads
async def list_analyses(request: Request): ...
@router.post("/api/v1/analyses")
@limiter.limit("20/minute") # Moderate for writes
async def create_analysis(request: Request): ...
User-Based Key Function
def get_user_identifier(request: Request) -> str:
"""Rate limit by user ID if authenticated, else IP."""
if hasattr(request.state, "user"):
return f"user:{request.state.user.id}"
return f"ip:{get_remote_address(request)}"
limiter = Limiter(key_func=get_user_identifier)
Tiered Rate Limits
from enum import Enum
class UserTier(Enum):
FREE = "free"
PRO = "pro"
ENTERPRISE = "enterprise"
TIER_LIMITS = {
UserTier.FREE: {"requests": 100, "window": 3600},
UserTier.PRO: {"requests": 1000, "window": 3600},
UserTier.ENTERPRISE: {"requests": 10000, "window": 3600},
}
async def get_rate_limit(user: User) -> str:
limits = TIER_LIMITS[user.tier]
return f"{limits['requests']}/{limits['window']}seconds"
@router.get("/api/v1/data")
@limiter.limit(get_rate_limit)
async def get_data(request: Request, user: User = Depends(get_current_user)):
    ...
Response Headers (X-RateLimit convention; 429 itself is defined in RFC 6585)
async def add_rate_limit_headers(response: Response, limit: int,
remaining: int, reset_at: datetime):
response.headers["X-RateLimit-Limit"] = str(limit)
response.headers["X-RateLimit-Remaining"] = str(remaining)
response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
response.headers["Retry-After"] = str(
int((reset_at - datetime.now(timezone.utc)).seconds)
)429 Error Response
def rate_limit_exceeded_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=429,
content={
"type": "https://api.example.com/errors/rate-limit-exceeded",
"title": "Too Many Requests",
"status": 429,
"detail": "Rate limit exceeded. Please retry after the reset time.",
"instance": str(request.url),
},
headers={"Retry-After": "60", "X-RateLimit-Remaining": "0"},
    )
Algorithm Selection
| Algorithm | Use Case | Burst Handling |
|---|---|---|
| Token Bucket | General API, allows bursts | Excellent |
| Sliding Window | Precise, no burst spikes | Good |
| Leaky Bucket | Steady rate, queue excess | None |
| Fixed Window | Simple, some edge issues | Moderate |
Key Decisions
| Decision | Recommendation |
|---|---|
| Storage | Redis (distributed, atomic) |
| Algorithm | Token bucket for most APIs |
| Key | User ID if auth, else IP + fingerprint |
| Auth endpoints | 10/min (strict) |
| Read endpoints | 100-1000/min (based on tier) |
| Write endpoints | 20-100/min (moderate) |
Common Mistakes
# NEVER use in-memory counters in distributed systems
request_counts = {} # Lost on restart, not shared!
# NEVER skip rate limiting on internal APIs
@router.get("/internal/admin")
async def admin_endpoint(): # No rate limit = vulnerable
...
# NEVER use fixed window without considering edge spikes
Incorrect — In-memory counters don't work in distributed systems:
counters = {} # Per-instance state
@app.post("/api/orders")
async def create_order(user_id: str):
counters[user_id] = counters.get(user_id, 0) + 1
if counters[user_id] > 100:
raise RateLimitExceeded()
# Multiple instances = no shared state
Correct — Redis provides atomic distributed rate limiting across all instances:
@app.post("/api/orders")
@limiter.limit("100/hour")
async def create_order(request: Request, user_id: str):
# Shared Redis state across all instances
    ...
Implement sliding window rate limiting with Redis sorted sets to prevent boundary spikes — HIGH
Sliding Window Rate Limiting
Precise rate limiting that avoids fixed window boundary spikes.
Problem with Fixed Windows
A user can hit 100 at 0:59 and 100 at 1:01 = 200 requests in 2 seconds with a "100/minute" limit.
Sliding window solves this by tracking individual request timestamps.
Redis Implementation
import redis.asyncio as redis
from datetime import datetime, timezone
class SlidingWindowLimiter:
def __init__(self, redis_client: redis.Redis, window_seconds: int = 60):
self.redis = redis_client
self.window = window_seconds
async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
"""Returns (allowed, remaining)."""
now = datetime.now(timezone.utc).timestamp()
window_start = now - self.window
bucket_key = f"ratelimit:sliding:{key}"
pipe = self.redis.pipeline()
pipe.zremrangebyscore(bucket_key, 0, window_start) # Remove old
pipe.zcard(bucket_key) # Count current
pipe.zadd(bucket_key, {str(now): now}) # Add this request
pipe.expire(bucket_key, self.window * 2) # Set expiry
results = await pipe.execute()
current_count = results[1]
        if current_count < limit:
            return True, limit - current_count - 1
        # Caveat: the pipeline already ZADDed this request even though it is
        # rejected, so rejected requests still occupy window slots; the Lua
        # script below avoids this inaccuracy.
        return False, 0
Atomic Lua Script (Better for Production)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
-- Count current entries
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now .. ':' .. math.random())
redis.call('EXPIRE', key, window)
return {1, limit - count - 1, 0}
else
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local retry_after = 0
if oldest[2] then
retry_after = math.ceil((tonumber(oldest[2]) + window * 1000 - now) / 1000)
end
return {0, 0, retry_after}
end
When to Use
| Sliding Window | Token Bucket |
|---|---|
| Strict quotas (billing) | General API limits |
| No burst tolerance | Burst-friendly |
| Higher memory (O(n) timestamps) | O(1) memory |
| Exact counting | Approximate counting |
Best Practices
- Use Redis sorted sets with timestamps as scores
- Clean expired entries on every check (ZREMRANGEBYSCORE)
- Set EXPIRE on the key to auto-cleanup inactive users
- Use pipeline or Lua script for atomicity
- Consider memory: each request stores a member in the sorted set
Incorrect — Fixed window allows 200 requests in 2 seconds with "100/min" limit:
# Fixed window resets at minute boundaries
window_start = int(time.time() / 60) * 60
if get_count(f"{user}:{window_start}") < 100:
allow()
# User can send 100 at 0:59 and 100 at 1:01 = 200 in 2 seconds!
Correct — Sliding window tracks individual timestamps for precise limiting:
now = time.time()
window_start = now - 60 # Last 60 seconds
await redis.zremrangebyscore(key, 0, window_start) # Remove old
count = await redis.zcard(key) # Count current
if count < 100:
await redis.zadd(key, {str(now): now}) # Add request
# Exactly 100 requests per rolling 60-second window
Implement token bucket rate limiting with atomic Redis operations and burst capacity — HIGH
Token Bucket Rate Limiting
Allows bursts up to bucket capacity while maintaining a steady average rate.
How It Works
Bucket: capacity=10, refill_rate=5/sec
t=0s: 10 tokens | 10 requests -> 0 tokens (burst allowed)
t=1s: +5 tokens | 5 tokens available
t=2s: +5 tokens | 10 tokens (capped at capacity)
Redis Implementation (Atomic Lua Script)
import redis.asyncio as redis
from datetime import datetime, timezone
class TokenBucketLimiter:
SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local tokens_requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local current_tokens = tonumber(bucket[1]) or capacity
local last_update = tonumber(bucket[2]) or now
-- Calculate refill
local elapsed = (now - last_update) / 1000
current_tokens = math.min(capacity, current_tokens + elapsed * refill_rate)
-- Check and consume
local allowed = 0
local remaining = math.floor(current_tokens)
local retry_after = 0
if current_tokens >= tokens_requested then
allowed = 1
remaining = math.floor(current_tokens - tokens_requested)
current_tokens = current_tokens - tokens_requested
else
local needed = tokens_requested - current_tokens
retry_after = math.ceil(needed / refill_rate)
end
redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) * 2)
return {allowed, remaining, retry_after}
"""
def __init__(self, redis_client: redis.Redis,
capacity: int = 100, refill_rate: float = 10.0):
self.redis = redis_client
self.capacity = capacity
self.refill_rate = refill_rate
async def is_allowed(self, key: str, tokens: int = 1) -> bool:
now = datetime.now(timezone.utc).timestamp() * 1000
result = await self.redis.eval(
self.SCRIPT, 1, f"ratelimit:token:{key}",
self.capacity, self.refill_rate, tokens, now,
)
        return result[0] == 1
Properties
| Property | Description |
|---|---|
| Burst Capacity | Allows short bursts up to bucket size |
| Smooth Limiting | Tokens refill continuously |
| O(1) Memory | Only stores tokens + timestamp per key |
| Distributed | Atomic via Redis Lua script |
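The O(1) refill arithmetic from the Lua script can be illustrated without Redis; a single-process sketch (class name is an assumption):

```python
import time

class LocalTokenBucket:
    """Single-process illustration of the token bucket refill arithmetic."""
    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = capacity          # start full: burst allowed immediately
        self.last = time.monotonic()

    def allow(self, tokens: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```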
When to Use
Good for: API rate limiting (natural bursts), user actions (login), resource protection
Not ideal for: Strict per-second quotas (use sliding window), billing limits, fair queuing (use leaky bucket)
vs Sliding Window
| Aspect | Token Bucket | Sliding Window |
|---|---|---|
| Burst Handling | Allows up to capacity | Spreads evenly |
| Memory | O(1) per key | O(n) timestamps |
| Precision | Approximate | Exact |
| Redis Operations | 1 HMSET | 1 ZADD + 1 ZREMRANGEBYSCORE |
Incorrect — Check-then-act pattern has race condition in token refill:
const bucket = await redis.get(`bucket:${user}`);
const tokens = JSON.parse(bucket).tokens + elapsed * refillRate;
if (tokens >= 1) {
// Race! Another request can pass this check too
await redis.set(`bucket:${user}`, JSON.stringify({tokens: tokens - 1}));
}
Correct — Atomic Lua script ensures thread-safe token bucket operations:
const result = await redis.eval(TOKEN_BUCKET_SCRIPT, {
  keys: [`bucket:${user}`],
  arguments: [capacity, refillRate, tokensRequested, Date.now()].map(String),
});
// Single atomic operation, no race conditions
Isolate failures with bulkhead partitioning and tier-based resource capacity limits — CRITICAL
Bulkhead Pattern
Isolates failures by partitioning resources into independent pools. One failing component does not bring down the entire system.
Tier-Based Configuration
| Tier | Workers | Queue | Timeout | Use Case |
|---|---|---|---|---|
| 1 (Critical) | 5 | 10 | 180-300s | Synthesis, quality gate, user-facing |
| 2 (Standard) | 8 | 12 | 120s | Analysis agents, data processing |
| 3 (Optional) | 4 | 6 | 60s | Enrichment, caching, analytics |
Implementation
from asyncio import Semaphore, wait_for, TimeoutError
from enum import Enum
class Tier(Enum):
CRITICAL = 1
STANDARD = 2
OPTIONAL = 3
class BulkheadFullError(Exception): ...
class BulkheadTimeoutError(Exception): ...

class Bulkhead:
def __init__(self, tier: Tier, max_concurrent: int, queue_size: int, timeout: float):
self.tier = tier
self.semaphore = Semaphore(max_concurrent)
self.queue_size = queue_size
self.timeout = timeout
self.waiting = 0
self.active = 0
    async def execute(self, fn):
        if self.waiting >= self.queue_size:
            raise BulkheadFullError(f"Tier {self.tier.name} queue full")
        self.waiting += 1
        try:
            await wait_for(self.semaphore.acquire(), timeout=self.timeout)
        except TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} acquisition timeout")
        finally:
            self.waiting -= 1  # Leave the queue whether or not a slot was acquired
        self.active += 1
        try:
            return await wait_for(fn(), timeout=self.timeout)
        except TimeoutError:
            raise BulkheadTimeoutError(f"Tier {self.tier.name} execution timeout")
        finally:
            self.active -= 1
            self.semaphore.release()
Rejection Policies
class RejectionPolicy(Enum):
ABORT = "abort" # Return error immediately
CALLER_RUNS = "caller" # Execute in caller's context (blocking)
DISCARD = "discard" # Silently drop (for optional ops)
QUEUE = "queue" # Wait in bounded queue
TIER_POLICIES = {
Tier.CRITICAL: RejectionPolicy.QUEUE, # Wait for slot
Tier.STANDARD: RejectionPolicy.CALLER_RUNS, # Degrade caller
Tier.OPTIONAL: RejectionPolicy.DISCARD, # Skip if busy
}
Graceful Degradation by Tier
async def run_analysis(content: str):
results = {}
# Tier 1: Must succeed
results["core"] = await tier1_bulkhead.execute(
lambda: analyze_core(content)
)
# Tier 2: Best effort
try:
results["enriched"] = await tier2_bulkhead.execute(
lambda: enrich_analysis(content)
)
except BulkheadFullError:
results["enriched"] = None # Skip enrichment
# Tier 3: Optional (silent failure)
try:
await tier3_bulkhead.execute(lambda: warm_cache(results))
except (BulkheadFullError, BulkheadTimeoutError):
pass
    return results
Best Practices
- Size based on downstream capacity -- if API allows 60 RPM, don't set 100 concurrent
- Monitor queue depth -- alert when consistently > 80% full
- Combine with circuit breaker -- slow calls trigger circuit, clearing bulkhead slots
- Use per-dependency bulkheads -- not per-endpoint (too granular)
- Return 503 with Retry-After -- when rejecting, don't return 500
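The last bullet can be sketched framework-agnostically, mapping bulkhead rejections to 503 plus Retry-After rather than a generic 500 (exception and helper names are assumptions):

```python
class BulkheadFullError(Exception):
    pass

def rejection_to_response(exc: Exception, retry_after_s: int = 5) -> tuple[int, dict[str, str]]:
    # 503 + Retry-After tells well-behaved clients to back off; a bare 500 does not.
    if isinstance(exc, BulkheadFullError):
        return 503, {"Retry-After": str(retry_after_s)}
    return 500, {}
```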
Common Mistakes
- Too many bulkheads (per-endpoint instead of per-dependency)
- Ignoring rejection handling (BulkheadFullError becomes 500)
- No correlation with circuit breaker (slots stay blocked on slow service)
Incorrect — No isolation means slow downstream service blocks all operations:
async def fetch_data():
await slow_external_api() # Takes 30s when degraded
# All requests wait, entire system becomes slow
Correct — Bulkhead isolates slow service, protecting critical operations:
@bulkhead(tier=Tier.OPTIONAL, max_concurrent=3, timeout=5)
async def fetch_data():
await slow_external_api()
# Only 3 concurrent + waiting queue, rest get fast rejection
Prevent cascade failures with circuit breaker thresholds and recovery probe patterns — CRITICAL
Circuit Breaker Pattern
Prevents cascade failures by "tripping" when a downstream service exceeds failure thresholds.
State Machine
                  failures >= threshold
   CLOSED ---------------------------------> OPEN
     ^                                         |
     |                                         | timeout
     | probe succeeds                          | expires
     |             +--------------+            |
     +-------------+  HALF_OPEN   |<-----------+
                   +--------------+
                          |
                          | probe fails
                          v
                         OPEN
- CLOSED: All requests pass through, failures counted in sliding window
- OPEN: All requests rejected immediately, return fallback
- HALF_OPEN: Limited probe requests test recovery
Configuration
| Parameter | Recommended | Description |
|---|---|---|
| failure_threshold | 5 | Failures before opening |
| success_threshold | 2 | Successes in half-open to close |
| recovery_timeout | 30s | Time before half-open transition |
| sliding_window_size | 10 | Requests to consider for failure rate |
| slow_call_threshold | 5-30s | Calls slower than this count as failures |
Implementation
from collections import deque
from enum import Enum
from time import time
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitOpenError(Exception):
    """Raised while the circuit is open and calls are rejected fast."""

class CircuitBreaker:
def __init__(self, name: str, failure_threshold: int = 5,
recovery_timeout: float = 30.0):
self.name = name
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time = None
self._threshold = failure_threshold
self._recovery_timeout = recovery_timeout
async def call(self, fn, *args, **kwargs):
        if self._state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self._state = CircuitState.HALF_OPEN
                self._success_count = 0
else:
raise CircuitOpenError(self.name)
try:
result = await fn(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= 2:
self._state = CircuitState.CLOSED
self._failure_count = 0
def _on_failure(self):
self._last_failure_time = time()
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.OPEN
else:
self._failure_count += 1
if self._failure_count >= self._threshold:
                self._state = CircuitState.OPEN

    def _should_attempt_recovery(self) -> bool:
        return (self._last_failure_time is not None
                and time() - self._last_failure_time >= self._recovery_timeout)
Best Practices
- Use sliding windows, not fixed counters -- one success should not reset everything
- Per-service breakers -- never use a single global breaker
- Always provide fallbacks -- cached data, default response, or partial results
- Separate health from circuit state -- `/health` always returns 200
- Include observability -- every state change = metric + log + alert on OPEN
Pattern Composition
# Retry INSIDE circuit breaker
@circuit_breaker(failure_threshold=5)
@retry(max_attempts=3, backoff=exponential)
async def call_service():
...
# Bulkhead + Circuit breaker
@circuit_breaker(service="analysis")
@bulkhead(tier=Tier.STANDARD, max_concurrent=3)
async def analyze():
    ...
Presets by Service Type
| Service | Threshold | Recovery | Slow Call |
|---|---|---|---|
| LLM API | 3 | 60s | 30s |
| External API | 5 | 30s | 10s |
| Database | 2-3 | 15s | 5s |
Incorrect — No circuit breaker causes cascade failure when downstream is down:
async def call_payment_api():
return await http.post("https://api.payment.com/charge")
# Keeps trying even when API is down, causing 30s timeouts on every request
# Entire service becomes unresponsiveCorrect — Circuit breaker trips on failures, returning fast failures and allowing recovery:
@circuit_breaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_api():
return await http.post("https://api.payment.com/charge")
# After 5 failures, circuit opens and returns fallback immediatelyImplement exponential backoff with jitter to prevent thundering herd on retries — CRITICAL
Retry with Exponential Backoff
Backoff Formula
delay = min(base * 2^attempt, max_delay)
jitter = random(0, delay) # Full jitter (recommended)
sleep(jitter)
| Attempt | Base Delay | With Full Jitter |
|---|---|---|
| 1 | 1s | 0.0s - 1.0s |
| 2 | 2s | 0.0s - 2.0s |
| 3 | 4s | 0.0s - 4.0s |
| 4 | 8s | 0.0s - 8.0s |
Full jitter prevents thundering herd when many clients retry simultaneously.
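The formula above in executable form (function name is an assumption):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, max_delay: float = 60.0) -> float:
    # delay = min(base * 2^(attempt-1), max_delay); full jitter samples [0, delay]
    delay = min(base * 2 ** (attempt - 1), max_delay)
    return random.uniform(0, delay)
```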
Error Classification
RETRYABLE_ERRORS = {
# HTTP Status Codes
408, 429, 500, 502, 503, 504,
# Python Exceptions
ConnectionError, TimeoutError, ConnectionResetError,
# LLM API Errors
"rate_limit_exceeded", "model_overloaded", "server_error",
}
NON_RETRYABLE_ERRORS = {
400, 401, 403, 404, 422,
"invalid_api_key", "content_policy_violation",
"invalid_request_error", "model_not_found",
}Retry Decorator
import asyncio
import random
from functools import wraps
def retry(max_attempts=3, base_delay=1.0, max_delay=60.0, jitter=True):
"""Async retry with exponential backoff."""
def decorator(fn):
@wraps(fn)
async def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return await fn(*args, **kwargs)
except Exception as e:
if not is_retryable(e) or attempt == max_attempts:
raise
delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
if jitter:
delay = random.uniform(0, delay)
await asyncio.sleep(delay)
return wrapper
    return decorator
Retry with Content Truncation (LLM)
async def retry_with_truncation(fn, content: str, max_attempts: int = 3):
"""Retry LLM call, truncating on context_length_exceeded."""
for attempt in range(1, max_attempts + 1):
try:
return await fn(content)
except ContextLengthExceededError:
if attempt == max_attempts:
raise
            content = content[:int(len(content) * 0.75)]
Retry Budget
Prevents retry storms by limiting total retries per time window.
import time

class RetryBudget:
    def __init__(self, replenish_per_second: float = 10.0, max_budget: float = 100.0):
        self.rate = replenish_per_second
        self.max_budget = max_budget
        self.budget = max_budget
        self.last_update = time.time()
    def _replenish(self) -> None:
        now = time.time()
        self.budget = min(self.max_budget, self.budget + (now - self.last_update) * self.rate)
        self.last_update = now
    def can_retry(self) -> bool:
        self._replenish()
        return self.budget >= 1.0
    def use_retry(self) -> None:
        if self.budget >= 1.0:
            self.budget -= 1.0
Presets
| Use Case | Max Attempts | Base Delay | Max Delay |
|---|---|---|---|
| User-facing API | 2 | 0.5s | 2s |
| Background job | 5 | 2.0s | 60s |
| LLM API call | 3 | 1.0s | 60s |
| Rate-limited API | 3 | 2.0s | 120s |
Critical Rules
- Always use jitter -- prevents thundering herd
- Classify errors -- never retry 401/403/404
- Bound retries -- max 3-5 attempts, never infinite
- Retry inside circuit breaker -- circuit only sees final result
- Use Retry-After header -- respect server's backoff request
- Log all retries -- include trace ID for correlation
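The Retry-After rule above can be sketched as a small helper (function name is an assumption):

```python
def retry_after_seconds(headers: dict[str, str], fallback: float) -> float:
    # Respect the server's requested backoff when present. Retry-After may also
    # be an HTTP-date; in that case we fall back to the computed delay.
    value = headers.get("Retry-After")
    if value is None:
        return fallback
    try:
        return float(value)
    except ValueError:
        return fallback
```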
Incorrect — Fixed delay without jitter causes thundering herd when service recovers:
for attempt in range(3):
try:
return await api_call()
except Exception:
await asyncio.sleep(1) # All clients retry at same time!
# 1000 clients = 1000 simultaneous retries
Correct — Exponential backoff with jitter spreads retries over time:
for attempt in range(3):
try:
return await api_call()
except Exception:
delay = min(2 ** attempt, 60) # 1s, 2s, 4s...
        await asyncio.sleep(random.uniform(0, delay))  # Jitter spreads load
References (10)
Bulkhead Pattern
Overview
The bulkhead pattern isolates failures by partitioning system resources into independent pools. Named after ship bulkheads that prevent flooding from spreading, it ensures one failing component doesn't bring down the entire system.
Types of Bulkheads
1. Thread Pool Isolation
Dedicated thread pools per service/operation.
┌────────────────────────────────────────────────────────────┐
│ Thread Pool Bulkhead │
├────────────────────────────────────────────────────────────┤
│ │
│ Service A Pool (5 threads) Service B Pool (3 threads) │
│ ┌─┬─┬─┬─┬─┐ ┌─┬─┬─┐ │
│ │█│█│█│░│░│ │█│░│░│ │
│ └─┴─┴─┴─┴─┘ └─┴─┴─┘ │
│ │
│ If Service A hangs, only 5 threads blocked │
│ Service B continues with its own 3 threads │
│ │
└────────────────────────────────────────────────────────────┘
2. Semaphore Isolation
Limits concurrent executions without dedicated threads.
┌────────────────────────────────────────────────────────────┐
│ Semaphore Bulkhead │
├────────────────────────────────────────────────────────────┤
│ │
│ Semaphore: permits=5, current=3 │
│ │
│ Request 1: acquire() → ✓ (permits=2) │
│ Request 2: acquire() → ✓ (permits=1) │
│ Request 3: acquire() → ✓ (permits=0) │
│ Request 4: acquire() → BLOCKED (queue) or REJECTED │
│ │
│ Request 1: release() → ✓ (permits=1) │
│ Request 4: acquire() → ✓ (permits=0) │
│ │
└────────────────────────────────────────────────────────────┘
3. Tier-Based Bulkheads (Recommended for Multi-Agent)
Group operations by criticality.
┌────────────────────────────────────────────────────────────┐
│ Tier-Based Bulkheads │
├────────────────────────────────────────────────────────────┤
│ │
│ TIER 1: CRITICAL (50% resources) │
│ ├── Synthesis node │
│ ├── Quality gate │
│ └── User-facing responses │
│ │
│ TIER 2: STANDARD (35% resources) │
│ ├── Content analysis agents │
│ ├── Data processing │
│ └── API integrations │
│ │
│ TIER 3: OPTIONAL (15% resources) │
│ ├── Enrichment │
│ ├── Caching warmup │
│ └── Analytics │
│ │
│ When Tier 3 exhausted → operations queued/dropped │
│ Tier 1 & 2 continue unaffected │
│ │
└────────────────────────────────────────────────────────────┘
Configuration for OrchestKit
Agent Tier Assignment
| Tier | Agents | Max Concurrent | Queue Size | Timeout |
|---|---|---|---|---|
| 1 (Critical) | synthesis, quality_gate, supervisor | 5 | 10 | 300s |
| 2 (Standard) | tech_comparator, implementation_planner, security_auditor, learning_synthesizer | 3 | 5 | 120s |
| 3 (Optional) | enrichment, cache_warming, metrics | 2 | 3 | 60s |
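The same limits as plain data, ready to feed a bulkhead factory (a sketch; the dict name is illustrative):

```python
# Tier limits from the table above; timeout values are in seconds
TIER_LIMITS = {
    1: {"max_concurrent": 5, "queue_size": 10, "timeout": 300.0},  # critical
    2: {"max_concurrent": 3, "queue_size": 5, "timeout": 120.0},   # standard
    3: {"max_concurrent": 2, "queue_size": 3, "timeout": 60.0},    # optional
}
```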
Rejection Policies
class RejectionPolicy(Enum):
ABORT = "abort" # Return error immediately
CALLER_RUNS = "caller" # Execute in caller's context (blocking)
DISCARD = "discard" # Silently drop (for optional ops)
QUEUE = "queue" # Wait in bounded queue
# Per-tier policies
TIER_POLICIES = {
1: RejectionPolicy.QUEUE, # Critical: wait for slot
2: RejectionPolicy.CALLER_RUNS, # Standard: degrade caller
3: RejectionPolicy.DISCARD, # Optional: skip if busy
}
Implementation Pattern (Python asyncio)
from asyncio import Semaphore, wait_for, TimeoutError
from collections import defaultdict
from enum import Enum
from typing import TypeVar, Callable, Awaitable
T = TypeVar("T")
class Tier(Enum):
CRITICAL = 1
STANDARD = 2
OPTIONAL = 3
class Bulkhead:
def __init__(self, tier: Tier, max_concurrent: int, queue_size: int, timeout: float):
self.tier = tier
self.semaphore = Semaphore(max_concurrent)
self.queue_size = queue_size
self.timeout = timeout
self.waiting = 0
self.active = 0
async def execute(self, fn: Callable[[], Awaitable[T]]) -> T:
# Reject immediately if the queue is already full
if self.waiting >= self.queue_size:
raise BulkheadFullError(f"Tier {self.tier.name} queue full")
self.waiting += 1
try:
# Acquire a slot, waiting at most self.timeout
await wait_for(self.semaphore.acquire(), timeout=self.timeout)
except TimeoutError:
raise BulkheadTimeoutError(f"Tier {self.tier.name} timeout") from None
finally:
self.waiting -= 1
self.active += 1
try:
return await wait_for(fn(), timeout=self.timeout)
except TimeoutError:
raise BulkheadTimeoutError(f"Tier {self.tier.name} timeout") from None
finally:
self.active -= 1
self.semaphore.release()
Best Practices (2026)
1. Size Based on Downstream Capacity
# BAD: Arbitrary numbers
bulkhead = Bulkhead(max_concurrent=100)
# GOOD: Based on downstream limits
# If OpenAI allows 60 RPM, don't have 100 concurrent
bulkhead = Bulkhead(max_concurrent=10) # 10 concurrent * 6s avg = 60 RPM
2. Monitor Queue Depth
async def execute_with_metrics(self, fn):
# Metric: queue depth
metrics.gauge("bulkhead.queue_depth", self.waiting, tags={"tier": self.tier.name})
# Metric: active requests
metrics.gauge("bulkhead.active", self.active, tags={"tier": self.tier.name})
# Alert when queue consistently > 80% full
if self.waiting > self.queue_size * 0.8:
logger.warning("Bulkhead queue high", tier=self.tier.name, depth=self.waiting)
return await self.execute(fn)
3. Graceful Degradation by Tier
async def run_analysis(content: str) -> Analysis:
results = {}
# Tier 1: Must succeed
results["core"] = await tier1_bulkhead.execute(
lambda: analyze_core(content)
)
# Tier 2: Best effort
try:
results["enriched"] = await tier2_bulkhead.execute(
lambda: enrich_analysis(content)
)
except BulkheadFullError:
results["enriched"] = None # Skip enrichment
# Tier 3: Optional
try:
await tier3_bulkhead.execute(
lambda: warm_cache(results)
)
except (BulkheadFullError, BulkheadTimeoutError):
pass # Don't even log
return Analysis(**results)
4. Dynamic Tier Adjustment
class AdaptiveBulkhead:
"""Adjusts tier capacity based on system load."""
def adjust_for_load(self, cpu_percent: float, memory_percent: float):
if cpu_percent > 80 or memory_percent > 85:
# Reduce optional tier
self.tiers[Tier.OPTIONAL].max_concurrent = 1
self.tiers[Tier.STANDARD].max_concurrent = 2
elif cpu_percent < 50 and memory_percent < 60:
# Restore capacity
self.tiers[Tier.OPTIONAL].max_concurrent = 2
self.tiers[Tier.STANDARD].max_concurrent = 3
Anti-Patterns
1. Too Many Bulkheads
# BAD: Bulkhead per endpoint
bulkheads = {
"/api/v1/users": Bulkhead(5),
"/api/v1/users/{id}": Bulkhead(5),
"/api/v1/users/{id}/profile": Bulkhead(5),
# ... 50 more
}
# Result: Complexity nightmare, no real isolation
# GOOD: Bulkhead per tier/dependency
bulkheads = {
"database_read": Bulkhead(10),
"database_write": Bulkhead(3),
"external_api": Bulkhead(5),
}
2. Ignoring Rejection Handling
# BAD: Exception bubbles up as 500
@app.post("/analyze")
async def analyze(content: str):
return await bulkhead.execute(lambda: do_analysis(content))
# BulkheadFullError → 500 Internal Server Error
# GOOD: Proper error handling
@app.post("/analyze")
async def analyze(content: str):
try:
return await bulkhead.execute(lambda: do_analysis(content))
except BulkheadFullError:
raise HTTPException(
status_code=503,
detail="Service busy, please retry",
headers={"Retry-After": "30"}
)
3. No Correlation with Circuit Breaker
# BAD: Bulkhead fills up, circuit never opens
# All slots blocked on slow service
# GOOD: Combine patterns
@circuit_breaker(failure_threshold=5)
@bulkhead(tier=Tier.STANDARD)
async def call_external_service():
...
# Slow calls → timeouts → circuit opens → bulkhead cleared
Monitoring Dashboard
┌────────────────────────────────────────────────────────────┐
│ Bulkhead Status Dashboard │
├────────────────────────────────────────────────────────────┤
│ │
│ TIER 1: CRITICAL [████████░░] 8/10 active │
│ Queue: 2/10 [██░░░░░░░░] 2/10 queued │
│ Rejected (1h): 0 Timeouts (1h): 1 │
│ │
│ TIER 2: STANDARD [██████████] 3/3 active ⚠️ │
│ Queue: 5/5 FULL [██████████] 5/5 queued ⚠️ │
│ Rejected (1h): 23 Timeouts (1h): 5 │
│ │
│ TIER 3: OPTIONAL [█░░░░░░░░░] 1/2 active │
│ Queue: 0/3 [░░░░░░░░░░] 0/3 queued │
│ Rejected (1h): 156 Timeouts (1h): 0 │
│ │
└────────────────────────────────────────────────────────────┘
Circuit Breaker
Circuit Breaker Pattern
Overview
The circuit breaker pattern prevents cascade failures by "tripping" when a downstream service exceeds failure thresholds. Named after electrical circuit breakers, it protects your system from repeated failures.
States
CLOSED (Normal Operation)
- All requests pass through
- Failures are counted within a sliding window
- Success resets failure count (or decrements in sliding window)
- Transitions to OPEN when failures >= threshold
OPEN (Failing Fast)
- All requests immediately rejected
- Returns fallback response or error
- No calls made to downstream service
- After recovery_timeout, transitions to HALF_OPEN
HALF_OPEN (Recovery Probe)
- Limited requests allowed (probe requests)
- If probe succeeds → CLOSED
- If probe fails → OPEN (reset recovery timer)
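These transitions fit in a few lines. A minimal count-based sketch (a production breaker would add a sliding window and limit half-open probes; names are illustrative):

```python
import time

class CircuitBreaker:
    """Minimal breaker: CLOSED -> OPEN on failures, OPEN -> HALF_OPEN after timeout."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # let a probe request through
                return True
            return False  # fail fast while open
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = time.monotonic()
```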
State Machine
failures >= threshold
CLOSED ──────────────────────────────▶ OPEN
▲ │
│ │
│ probe succeeds timeout │
│ expires │
│ ┌─────────────┐ │
└─────────│ HALF_OPEN │◀────────────┘
└─────────────┘
│
│ probe fails
▼
OPEN
Configuration Parameters
| Parameter | Recommended | Description |
|---|---|---|
| failure_threshold | 5 | Failures before opening |
| success_threshold | 2 | Successes in half-open to close |
| recovery_timeout | 30s | Time before half-open transition |
| sliding_window_size | 10 | Requests to consider for failure rate |
| sliding_window_type | count-based | count-based or time-based (60s) |
| slow_call_threshold | 5s | Calls slower than this count as failures |
| slow_call_rate | 50% | Percentage of slow calls to trip |
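The table maps directly onto a config object; a sketch (field names mirror the table, with `slow_call_rate` expressed as a fraction):

```python
from dataclasses import dataclass

@dataclass
class BreakerConfig:
    """Recommended defaults from the table above."""
    failure_threshold: int = 5        # failures before opening
    success_threshold: int = 2        # half-open successes to close
    recovery_timeout: float = 30.0    # seconds before half-open
    sliding_window_size: int = 10     # requests considered for failure rate
    slow_call_threshold: float = 5.0  # seconds; slower calls count as failures
    slow_call_rate: float = 0.5       # fraction of slow calls that trips
```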
Best Practices (2026)
1. Use Sliding Windows, Not Fixed Counters
# BAD: Fixed counter resets on success
if failures >= 5:
open_circuit()
if success:
failures = 0 # One success resets everything!
# GOOD: Sliding window with time decay
window = deque(maxlen=10)
window.append(("fail", time.time()))
failure_rate = sum(1 for r, _ in window if r == "fail") / len(window)
if failure_rate >= 0.5:
open_circuit()
2. Separate Health Checks from Circuit State
# Health endpoint should NOT check circuit state
@app.get("/health")
async def health():
return {"status": "healthy"} # Always returns 200
# Readiness endpoint CAN check circuit state
@app.get("/ready")
async def ready():
if circuit.is_open:
return {"status": "degraded", "reason": "circuit_open"}, 503
return {"status": "ready"}
3. Include Observability
def on_state_change(from_state: str, to_state: str, service: str):
# Metric
metrics.increment(f"circuit_breaker.{service}.state_change",
tags={"from": from_state, "to": to_state})
# Log
logger.warning("Circuit breaker state change",
service=service, from_state=from_state, to_state=to_state)
# Alert (only on OPEN)
if to_state == "OPEN":
alert_service.send(
severity="warning",
message=f"Circuit breaker opened for {service}",
runbook="https://docs.internal/runbooks/circuit-breaker"
)
4. Provide Meaningful Fallbacks
async def get_analysis_with_fallback(content: str) -> Analysis:
try:
return await circuit_breaker.call(analyze_content, content)
except CircuitOpenError:
# Fallback 1: Cached result
cached = await cache.get(f"analysis:{hash(content)}")
if cached:
return Analysis.from_cache(cached, is_stale=True)
# Fallback 2: Simplified analysis
return Analysis(
status="degraded",
message="Full analysis unavailable, showing basic info",
basic_info=extract_basic_info(content)
)
5. Per-Service Breakers
# BAD: Single breaker for all services
global_breaker = CircuitBreaker()
# GOOD: Per-service breakers
breakers = {
"openai": CircuitBreaker(failure_threshold=3, recovery_timeout=60),
"anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=30),
"youtube_api": CircuitBreaker(failure_threshold=10, recovery_timeout=120),
}
Anti-Patterns
1. Opening Too Quickly
# BAD: Opens on first failure
CircuitBreaker(failure_threshold=1) # One blip = outage
# GOOD: Tolerates transient failures
CircuitBreaker(failure_threshold=5, sliding_window_size=10)
2. Recovery Timeout Too Short
# BAD: Hammers failing service
CircuitBreaker(recovery_timeout=5) # Tries every 5 seconds
# GOOD: Gives service time to recover
CircuitBreaker(recovery_timeout=30) # 30 seconds minimum
3. No Fallback
# BAD: Just throws error
async def call():
if circuit.is_open:
raise CircuitOpenError() # User sees error page
# GOOD: Graceful degradation
async def call():
if circuit.is_open:
return await fallback_handler() # User sees partial data
Integration with Other Patterns
Circuit Breaker + Retry
# Retry INSIDE circuit breaker
@circuit_breaker
@retry(max_attempts=3, backoff=exponential)
async def call_service():
...
# Circuit only sees final result after retries exhausted
Circuit Breaker + Bulkhead
# Bulkhead limits concurrency, circuit limits failures
@circuit_breaker(service="analysis")
@bulkhead(tier=2, max_concurrent=3)
async def analyze():
...
Circuit Breaker + Timeout
# Timeout INSIDE circuit breaker
@circuit_breaker
@timeout(seconds=30)
async def call_service():
...
# Timeout counts as failure toward circuit threshold
Monitoring Queries
Prometheus
# Circuit state changes per minute
rate(circuit_breaker_state_changes_total[5m])
# Percentage of time in OPEN state
avg_over_time(circuit_breaker_state{state="open"}[1h])
# Requests rejected due to open circuit
rate(circuit_breaker_rejected_total[5m])
Langfuse (LLM-specific)
# Tag traces with circuit state
trace.update(metadata={
"circuit_state": circuit.state,
"circuit_failure_count": circuit.failure_count,
})
Error Classification
Overview
Proper error classification is the foundation of resilience. Different errors require different handling strategies: retry, fallback, fail fast, or alert.
Error Classification Matrix
┌────────────────────────────────────────────────────────────────────┐
│ Error Classification Matrix │
├────────────────────────────────────────────────────────────────────┤
│ │
│ TRANSIENT PERMANENT │
│ ┌─────────────────────┬─────────────────────┐ │
│ │ │ │ │
│ EXTERNAL │ • Rate limits │ • Invalid API key │ │
│ (API/Net) │ • Timeouts │ • 403 Forbidden │ │
│ │ • 502/503/504 │ • 404 Not Found │ │
│ │ • Connection reset │ • 400 Bad Request │ │
│ │ │ │ │
│ │ ACTION: Retry │ ACTION: Fail Fast │ │
│ │ with backoff │ Log & Alert │ │
│ │ │ │ │
│ ├─────────────────────┼─────────────────────┤ │
│ │ │ │ │
│ INTERNAL │ • DB connection │ • Schema error │ │
│ (System) │ • Memory pressure │ • Logic bug │ │
│ │ • Lock contention │ • Missing config │ │
│ │ • Resource exhaust │ • Invalid state │ │
│ │ │ │ │
│ │ ACTION: Retry │ ACTION: Fail Fast │ │
│ │ Circuit breaker │ Fix code, restart │ │
│ │ │ │ │
│ └─────────────────────┴─────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────┘
HTTP Status Code Classification
Retryable (Transient)
| Code | Name | Strategy |
|---|---|---|
| 408 | Request Timeout | Retry immediately |
| 429 | Too Many Requests | Retry with Retry-After header |
| 500 | Internal Server Error | Retry with backoff |
| 502 | Bad Gateway | Retry with backoff |
| 503 | Service Unavailable | Retry with Retry-After |
| 504 | Gateway Timeout | Retry with backoff |
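For 429 and 503, prefer the server's `Retry-After` hint over blind backoff; a sketch that handles only the delta-seconds form of the header:

```python
def retry_after_seconds(headers: dict, default: float = 1.0) -> float:
    """Parse a Retry-After header (delta-seconds form only) into a wait time."""
    value = headers.get("Retry-After") or headers.get("retry-after")
    if value is None:
        return default
    try:
        return max(0.0, float(value))
    except ValueError:
        return default  # HTTP-date form not handled in this sketch
```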
Non-Retryable (Permanent)
| Code | Name | Strategy |
|---|---|---|
| 400 | Bad Request | Log, fix input, fail |
| 401 | Unauthorized | Refresh token or fail |
| 403 | Forbidden | Fail, alert |
| 404 | Not Found | Fail (resource doesn't exist) |
| 405 | Method Not Allowed | Fail, fix code |
| 409 | Conflict | May retry with merge logic |
| 422 | Unprocessable Entity | Fail, fix input |
LLM API Error Classification
OpenAI Errors
OPENAI_RETRYABLE = {
"rate_limit_exceeded", # Retry with backoff
"server_error", # Retry with backoff
"timeout", # Retry immediately
"overloaded", # Retry with longer backoff
}
OPENAI_NON_RETRYABLE = {
"invalid_api_key", # Fix config
"invalid_request_error", # Fix request
"context_length_exceeded", # Reduce input (special handling)
"content_policy_violation",# Change content
"insufficient_quota", # Add credits
"model_not_found", # Fix model name
}
Anthropic Errors
ANTHROPIC_RETRYABLE = {
"overloaded_error", # Retry with backoff
"api_error", # Retry with backoff
"rate_limit_error", # Retry with Retry-After
}
ANTHROPIC_NON_RETRYABLE = {
"authentication_error", # Fix API key
"permission_error", # Check permissions
"invalid_request_error", # Fix request format
"not_found_error", # Fix resource reference
}
Exception Classification Helper
from enum import Enum
from typing import Union, Type
import httpx
class ErrorCategory(Enum):
RETRYABLE = "retryable" # Transient, retry with backoff
NON_RETRYABLE = "non_retryable" # Permanent, fail fast
CIRCUIT_TRIP = "circuit_trip" # Count toward circuit breaker
ALERTABLE = "alertable" # Should trigger alert
DEGRADABLE = "degradable" # Can fall back to alternative
class ErrorClassifier:
"""Classify errors for resilience handling."""
RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}
NON_RETRYABLE_STATUS_CODES = {400, 401, 403, 404, 405, 422}
ALERTABLE_STATUS_CODES = {401, 403, 500}
RETRYABLE_EXCEPTIONS = {
ConnectionError,
TimeoutError,
ConnectionResetError,
ConnectionRefusedError,
BrokenPipeError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
}
def classify(self, error: Exception) -> set[ErrorCategory]:
"""Return set of applicable error categories."""
categories = set()
# HTTP errors
if hasattr(error, "status_code"):
code = error.status_code
if code in self.RETRYABLE_STATUS_CODES:
categories.add(ErrorCategory.RETRYABLE)
if code in self.NON_RETRYABLE_STATUS_CODES:
categories.add(ErrorCategory.NON_RETRYABLE)
if code in self.ALERTABLE_STATUS_CODES:
categories.add(ErrorCategory.ALERTABLE)
if code >= 500:
categories.add(ErrorCategory.CIRCUIT_TRIP)
# Exception types
if isinstance(error, tuple(self.RETRYABLE_EXCEPTIONS)): # also matches subclasses
categories.add(ErrorCategory.RETRYABLE)
categories.add(ErrorCategory.CIRCUIT_TRIP)
# LLM-specific errors
if hasattr(error, "code"):
categories.update(self._classify_llm_error(error.code))
# Default: non-retryable if nothing matched
if not categories:
categories.add(ErrorCategory.NON_RETRYABLE)
categories.add(ErrorCategory.ALERTABLE)
return categories
def _classify_llm_error(self, error_code: str) -> set[ErrorCategory]:
"""Classify LLM API error codes."""
categories = set()
retryable_codes = {
"rate_limit_exceeded", "server_error", "timeout",
"overloaded", "overloaded_error", "api_error",
}
if error_code in retryable_codes:
categories.add(ErrorCategory.RETRYABLE)
categories.add(ErrorCategory.CIRCUIT_TRIP)
else:
categories.add(ErrorCategory.NON_RETRYABLE)
if error_code in {"context_length_exceeded"}:
categories.add(ErrorCategory.DEGRADABLE)
return categories
def should_retry(self, error: Exception) -> bool:
"""Quick check if error should be retried."""
return ErrorCategory.RETRYABLE in self.classify(error)
def should_trip_circuit(self, error: Exception) -> bool:
"""Check if error should count toward circuit breaker."""
return ErrorCategory.CIRCUIT_TRIP in self.classify(error)
def should_alert(self, error: Exception) -> bool:
"""Check if error should trigger an alert."""
return ErrorCategory.ALERTABLE in self.classify(error)
Usage in Resilience Patterns
With Retry
classifier = ErrorClassifier()
async def call_with_smart_retry(fn, *args, **kwargs):
for attempt in range(max_attempts):
try:
return await fn(*args, **kwargs)
except Exception as e:
categories = classifier.classify(e)
if ErrorCategory.NON_RETRYABLE in categories:
logger.error(f"Non-retryable error: {e}")
raise
if ErrorCategory.ALERTABLE in categories:
await alert_service.send(
severity="warning",
message=f"Retryable error in {fn.__name__}",
error=str(e),
)
if attempt < max_attempts - 1:
await asyncio.sleep(backoff(attempt))
raise MaxRetriesExceeded()
With Circuit Breaker
class SmartCircuitBreaker:
def __init__(self, classifier: ErrorClassifier):
self.classifier = classifier
self.failure_count = 0
async def call(self, fn, *args, **kwargs):
try:
result = await fn(*args, **kwargs)
self.failure_count = 0
return result
except Exception as e:
if self.classifier.should_trip_circuit(e):
self.failure_count += 1
if self.failure_count >= self.threshold:
self.open_circuit()
raise
With Fallback
async def call_with_fallback(primary_fn, fallback_fn, *args):
try:
return await primary_fn(*args)
except Exception as e:
categories = classifier.classify(e)
if ErrorCategory.DEGRADABLE in categories:
logger.info(f"Degrading to fallback: {e}")
return await fallback_fn(*args)
raise
Error Context Enrichment
from datetime import UTC, datetime
class EnrichedError(Exception):
"""Exception with classification and context."""
def __init__(
self,
original: Exception,
classifier: ErrorClassifier,
context: dict = None,
):
self.original = original
self.categories = classifier.classify(original)
self.context = context or {}
self.timestamp = datetime.now(UTC)
self.trace_id = get_current_trace_id()
super().__init__(str(original))
@property
def is_retryable(self) -> bool:
return ErrorCategory.RETRYABLE in self.categories
@property
def should_alert(self) -> bool:
return ErrorCategory.ALERTABLE in self.categories
def to_dict(self) -> dict:
return {
"error": str(self.original),
"type": type(self.original).__name__,
"categories": [c.value for c in self.categories],
"context": self.context,
"timestamp": self.timestamp.isoformat(),
"trace_id": self.trace_id,
}
Best Practices
- Default to non-retryable: Unknown errors should fail fast
- Log all classifications: Helps tune classification rules
- Include context: Error classification without context is useless
- Review regularly: New error types emerge, update rules
- Test classification: Unit test your classification logic
LLM Resilience
LLM-Specific Resilience Patterns
Overview
LLM APIs have unique failure modes that require specialized resilience patterns. This guide covers fallback chains, token budget management, rate limiting, and cost optimization through resilience.
Unique LLM Failure Modes
┌────────────────────────────────────────────────────────────┐
│ LLM Failure Taxonomy │
├────────────────────────────────────────────────────────────┤
│ │
│ TRANSIENT (Retry) PERMANENT (Fail Fast) │
│ ───────────────── ────────────────────── │
│ • rate_limit_exceeded • invalid_api_key │
│ • model_overloaded • content_policy_violation │
│ • server_error • invalid_request_error │
│ • timeout • insufficient_quota │
│ • context_length_exceeded* • model_not_found │
│ │
│ * Can retry with truncation │
│ │
│ DEGRADABLE (Fallback) COSTLY (Budget Control) │
│ ───────────────────── ──────────────────────── │
│ • Primary model down • Large context = high cost │
│ • Quality below threshold • Streaming = token overhead │
│ • Latency too high • Retries multiply cost │
│ │
└────────────────────────────────────────────────────────────┘
Pattern 1: Fallback Chain
┌────────────────────────────────────────────────────────────┐
│ LLM Fallback Chain │
├────────────────────────────────────────────────────────────┤
│ │
│ Request ─▶ [Primary Model] ──success──▶ Response │
│ │ │
│ fail (timeout, rate limit, error) │
│ ▼ │
│ [Fallback Model] ──success──▶ Response │
│ │ │
│ fail │
│ ▼ │
│ [Semantic Cache] ──hit──▶ Response │
│ │ │
│ miss │
│ ▼ │
│ [Default Response] ──▶ Graceful Degradation │
│ │
└────────────────────────────────────────────────────────────┘
Implementation
from dataclasses import dataclass
from typing import Optional, List, Callable, Awaitable
@dataclass
class LLMConfig:
name: str
model: str
api_key: str
timeout: float = 30.0
max_tokens: int = 4096
temperature: float = 0.7
class FallbackChain:
def __init__(
self,
primary: LLMConfig,
fallbacks: List[LLMConfig],
cache: Optional[SemanticCache] = None,
default_response: Optional[Callable[[str], str]] = None,
):
self.primary = primary
self.fallbacks = fallbacks
self.cache = cache
self.default_response = default_response
async def complete(self, prompt: str, **kwargs) -> LLMResponse:
# Try primary
try:
return await self._call_model(self.primary, prompt, **kwargs)
except RetryableError as e:
logger.warning(f"Primary model failed: {e}")
# Try fallbacks
for fallback in self.fallbacks:
try:
response = await self._call_model(fallback, prompt, **kwargs)
response.is_fallback = True
return response
except RetryableError as e:
logger.warning(f"Fallback {fallback.name} failed: {e}")
# Try cache
if self.cache:
cached = await self.cache.get_similar(prompt, threshold=0.85)
if cached:
logger.info("Returning cached response")
return LLMResponse(
content=cached.content,
is_cached=True,
cache_similarity=cached.similarity,
)
# Default response
if self.default_response:
return LLMResponse(
content=self.default_response(prompt),
is_degraded=True,
)
raise AllModelsFailedError("All LLM options exhausted")
Recommended Fallback Configurations
| Use Case | Primary | Fallback 1 | Fallback 2 | Notes |
|---|---|---|---|---|
| Analysis | Claude Sonnet | GPT-5.2-mini | Cache | Quality-first |
| Chat | GPT-5.2 | Claude Haiku | Default msg | Latency-first |
| Embedding | text-embedding-3-large | text-embedding-3-small | - | Dimension compat |
| Code Gen | Claude Sonnet | GPT-5.2 | - | Quality-first |
Pattern 2: Token Budget Management
┌────────────────────────────────────────────────────────────┐
│ Token Budget Guard │
├────────────────────────────────────────────────────────────┤
│ │
│ Context Window: 128K tokens │
│ ┌────────────────────────────────────────────────────┐ │
│ │████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░│ │
│ └────────────────────────────────────────────────────┘ │
│ Used: 32K (25%) Available: 96K │
│ │
│ Budget Allocation: │
│ ├── System prompt: 2K (fixed) │
│ ├── Conversation: 20K (sliding window) │
│ ├── Retrieved docs: 8K (chunked) │
│ ├── Output reserve: 2K (for response) │
│ └── Safety margin: 5K (overflow buffer) │
│ │
│ When approaching limit: │
│ 1. Summarize conversation history (4:1 compression) │
│ 2. Reduce retrieved chunks │
│ 3. Truncate oldest messages │
│ 4. Fail with "context too large" error │
│ │
└────────────────────────────────────────────────────────────┘
Implementation
import tiktoken
from dataclasses import dataclass
from typing import List
@dataclass
class BudgetAllocation:
system_prompt: int = 2000
conversation: int = 20000
retrieved_docs: int = 8000
output_reserve: int = 2000
safety_margin: int = 5000
@property
def total_budget(self) -> int:
return (
self.system_prompt +
self.conversation +
self.retrieved_docs +
self.output_reserve +
self.safety_margin
)
class TokenBudgetGuard:
def __init__(
self,
model: str,
context_limit: int,
allocation: BudgetAllocation = None,
):
self.encoding = tiktoken.encoding_for_model(model)
self.context_limit = context_limit
self.allocation = allocation or BudgetAllocation()
def count_tokens(self, text: str) -> int:
return len(self.encoding.encode(text))
def fit_to_budget(
self,
system_prompt: str,
messages: List[dict],
retrieved_docs: List[str],
) -> tuple[str, List[dict], List[str]]:
"""Fit content to token budget, compressing as needed."""
# Count fixed costs
system_tokens = self.count_tokens(system_prompt)
if system_tokens > self.allocation.system_prompt:
raise TokenBudgetError("System prompt exceeds budget")
# Fit messages with sliding window
fitted_messages = self._fit_messages(
messages,
self.allocation.conversation
)
# Fit retrieved docs
fitted_docs = self._fit_docs(
retrieved_docs,
self.allocation.retrieved_docs
)
return system_prompt, fitted_messages, fitted_docs
def _fit_messages(self, messages: List[dict], budget: int) -> List[dict]:
"""Keep most recent messages that fit in budget."""
fitted = []
used = 0
# Always keep system message if present
for msg in reversed(messages):
tokens = self.count_tokens(msg["content"])
if used + tokens <= budget:
fitted.insert(0, msg)
used += tokens
elif msg["role"] == "system":
# Summarize old messages
summary = self._summarize_old_messages(messages[:-len(fitted)])
fitted.insert(0, {"role": "system", "content": summary})
break
return fitted
def _fit_docs(self, docs: List[str], budget: int) -> List[str]:
"""Keep highest-scoring docs that fit in budget."""
fitted = []
used = 0
for doc in docs: # Assume already sorted by relevance
tokens = self.count_tokens(doc)
if used + tokens <= budget:
fitted.append(doc)
used += tokens
else:
break
return fitted
Pattern 3: Rate Limit Management
from asyncio import Semaphore, sleep
from collections import deque
from time import time
class RateLimiter:
"""Sliding-window rate limiter for LLM APIs (caps requests/min and tokens/min)."""
def __init__(
self,
requests_per_minute: int = 60,
tokens_per_minute: int = 100000,
):
self.rpm_limit = requests_per_minute
self.tpm_limit = tokens_per_minute
self.request_times = deque(maxlen=self.rpm_limit)
self.token_counts = deque(maxlen=1000)
self.semaphore = Semaphore(self.rpm_limit)
async def acquire(self, estimated_tokens: int):
"""Wait until rate limit allows the request."""
async with self.semaphore:
now = time()
# Check RPM
while len(self.request_times) >= self.rpm_limit:
oldest = self.request_times[0]
wait_time = 60 - (now - oldest)
if wait_time > 0:
await sleep(wait_time)
now = time()
self.request_times.popleft()
# Check TPM
recent_tokens = sum(
t for t, ts in self.token_counts
if now - ts < 60
)
if self.token_counts and recent_tokens + estimated_tokens > self.tpm_limit:
wait_time = 60 - (now - self.token_counts[0][1])
await sleep(max(0.0, wait_time))
# Record this request
self.request_times.append(now)
def record_usage(self, actual_tokens: int):
"""Record actual token usage after request completes."""
self.token_counts.append((actual_tokens, time()))
Pattern 4: Cost Control Circuit Breaker
class CostCircuitBreaker:
"""Opens when LLM costs exceed budget."""
def __init__(
self,
hourly_budget: float = 10.0, # $10/hour
daily_budget: float = 100.0, # $100/day
alert_threshold: float = 0.8, # Alert at 80%
):
self.hourly_budget = hourly_budget
self.daily_budget = daily_budget
self.alert_threshold = alert_threshold
self.hourly_spend = 0.0
self.daily_spend = 0.0
self.last_hour_reset = time()
self.last_day_reset = time()
def record_cost(self, input_tokens: int, output_tokens: int, model: str):
"""Record cost and check budget."""
self._reset_if_needed()
cost = self._calculate_cost(input_tokens, output_tokens, model)
self.hourly_spend += cost
self.daily_spend += cost
# Alert at threshold
if self.hourly_spend > self.hourly_budget * self.alert_threshold:
logger.warning(
f"Hourly LLM budget at {self.hourly_spend/self.hourly_budget:.0%}"
)
# Trip circuit at limit
if self.hourly_spend >= self.hourly_budget:
raise CostBudgetExceeded("Hourly LLM budget exceeded")
if self.daily_spend >= self.daily_budget:
raise CostBudgetExceeded("Daily LLM budget exceeded")
def _reset_if_needed(self):
"""Reset rolling spend counters when their windows elapse."""
now = time()
if now - self.last_hour_reset >= 3600:
self.hourly_spend = 0.0
self.last_hour_reset = now
if now - self.last_day_reset >= 86400:
self.daily_spend = 0.0
self.last_day_reset = now
def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
"""Calculate cost based on model pricing (Dec 2025)."""
PRICING = {
"claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
"gpt-5.2": {"input": 2.5, "output": 10.0},
"gpt-5.2-mini": {"input": 0.15, "output": 0.60},
"claude-haiku-4-5-20251101": {"input": 0.80, "output": 4.0},
}
prices = PRICING.get(model, {"input": 1.0, "output": 3.0})
return (
(input_tokens / 1_000_000) * prices["input"] +
(output_tokens / 1_000_000) * prices["output"]
)
Pattern 5: Quality-Aware Fallback
class QualityAwareFallback:
"""Falls back when response quality is below threshold."""
def __init__(
self,
primary_chain: FallbackChain,
quality_evaluator: Callable[[str, str], float],
quality_threshold: float = 0.7,
max_retries: int = 2,
):
self.chain = primary_chain
self.evaluate = quality_evaluator
self.threshold = quality_threshold
self.max_retries = max_retries
async def complete(self, prompt: str, **kwargs) -> LLMResponse:
for attempt in range(self.max_retries + 1):
response = await self.chain.complete(prompt, **kwargs)
# Evaluate quality
quality_score = await self.evaluate(prompt, response.content)
if quality_score >= self.threshold:
response.quality_score = quality_score
return response
logger.warning(
f"Response quality {quality_score:.2f} below threshold",
attempt=attempt + 1,
)
# Try with different parameters on retry
if attempt < self.max_retries:
kwargs["temperature"] = max(0.3, kwargs.get("temperature", 0.7) - 0.2)
# Return best effort with warning
response.quality_warning = f"Below threshold: {quality_score:.2f}"
return response
Best Practices (2026)
- Always have a fallback: Even a cached or default response is better than an error
- Monitor costs per-request: Track token usage in traces (Langfuse)
- Use streaming for long responses: Better UX and partial results on failure
- Cache aggressively: Semantic cache with 0.85+ similarity saves 60-80% costs
- Set appropriate timeouts: 30s for completion, 5s for embeddings
- Log all fallback events: Critical for understanding system behavior
OrchestKit Integration
# Example integration for OrchestKit analysis pipeline
llm_chain = FallbackChain(
primary=LLMConfig(
name="primary",
model="claude-sonnet-4-6",
timeout=30.0,
),
fallbacks=[
LLMConfig(
name="fallback",
model="gpt-5.2-mini",
timeout=20.0,
),
],
cache=semantic_cache, # Redis-backed
default_response=lambda p: "Analysis temporarily unavailable",
)
budget_guard = TokenBudgetGuard(
model="claude-sonnet-4-6",
context_limit=200000,
allocation=BudgetAllocation(
system_prompt=3000,
conversation=10000,
retrieved_docs=15000, # RAG context
output_reserve=4000,
safety_margin=5000,
),
)
rate_limiter = RateLimiter(
requests_per_minute=50, # Leave headroom
tokens_per_minute=80000,
)
Postgres Advisory Locks
PostgreSQL Advisory Locks
Why PostgreSQL Advisory Locks?
- No extra infrastructure - uses existing PostgreSQL
- ACID guarantees - integrated with transactions
- Two modes - session-level and transaction-level
- PostgreSQL 18 - enhanced performance and monitoring
Lock Types
| Type | Scope | Release | Use Case |
|---|---|---|---|
| Session-level | Connection | Explicit or disconnect | Long-running jobs |
| Transaction-level | Transaction | Commit/rollback | Data consistency |
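`pg_advisory_lock` takes a 64-bit integer key, so string resource names are typically hashed down to one; a sketch (the hash choice is an assumption):

```python
import hashlib

def advisory_lock_id(resource: str) -> int:
    """Map a string resource name to a signed 64-bit advisory lock key."""
    digest = hashlib.sha256(resource.encode()).digest()
    # First 8 bytes as a signed big-endian integer: fits PostgreSQL bigint
    return int.from_bytes(digest[:8], "big", signed=True)
```

Collisions are possible in principle, so keep a single key-derivation scheme per application.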
Session-Level Locks
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class PostgresAdvisoryLock:
"""PostgreSQL advisory lock (session-level).
Lock persists until explicitly released or connection closes.
Good for: background jobs, cron tasks, singleton processes.
"""
def __init__(self, session: AsyncSession, lock_id: int):
self._session = session
self._lock_id = lock_id
self._acquired = False
async def acquire(self, blocking: bool = True) -> bool:
"""Acquire advisory lock.
Args:
blocking: If True, wait for lock. If False, return immediately.
"""
if blocking:
# pg_advisory_lock blocks until acquired
await self._session.execute(
text("SELECT pg_advisory_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
self._acquired = True
return True
else:
# pg_try_advisory_lock returns immediately
result = await self._session.execute(
text("SELECT pg_try_advisory_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
self._acquired = result.scalar()
return self._acquired
async def release(self) -> bool:
"""Release advisory lock."""
if not self._acquired:
return False
result = await self._session.execute(
text("SELECT pg_advisory_unlock(:lock_id)"),
{"lock_id": self._lock_id},
)
released = result.scalar()
if released:
self._acquired = False
return released
@property
def is_acquired(self) -> bool:
return self._acquired
async def __aenter__(self) -> "PostgresAdvisoryLock":
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.release()
@asynccontextmanager
async def advisory_lock(
session: AsyncSession,
lock_id: int,
blocking: bool = True,
) -> AsyncGenerator[bool, None]:
"""Context manager for advisory locks."""
lock = PostgresAdvisoryLock(session, lock_id)
acquired = await lock.acquire(blocking=blocking)
try:
yield acquired
finally:
if acquired:
await lock.release()
Transaction-Level Locks
class PostgresTransactionLock:
"""PostgreSQL advisory lock (transaction-level).
Lock automatically released on commit/rollback.
Good for: ensuring data consistency within a transaction.
"""
def __init__(self, session: AsyncSession, lock_id: int):
self._session = session
self._lock_id = lock_id
async def acquire(self, blocking: bool = True) -> bool:
"""Acquire transaction-scoped lock."""
if blocking:
await self._session.execute(
text("SELECT pg_advisory_xact_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
return True
else:
result = await self._session.execute(
text("SELECT pg_try_advisory_xact_lock(:lock_id)"),
{"lock_id": self._lock_id},
)
return result.scalar()
# No release method - automatically released on transaction end
Lock ID Strategies
import hashlib
def string_to_lock_id(name: str) -> int:
"""Convert string to PostgreSQL lock ID (bigint).
PostgreSQL advisory locks use bigint IDs.
This converts any string to a consistent ID.
"""
# Use MD5 hash, take first 8 bytes as signed int64
hash_bytes = hashlib.md5(name.encode()).digest()[:8]
return int.from_bytes(hash_bytes, byteorder="big", signed=True)
def composite_lock_id(namespace: int, resource_id: int) -> tuple[int, int]:
"""Create two-key lock ID.
PostgreSQL supports advisory locks with two int4 keys.
Useful for namespacing locks.
"""
return (namespace, resource_id)
# Usage
NAMESPACE_PAYMENT = 1
NAMESPACE_INVENTORY = 2
# Single key lock
lock_id = string_to_lock_id("payment:order-123")
await session.execute(
text("SELECT pg_advisory_lock(:id)"),
{"id": lock_id},
)
# Two-key lock
await session.execute(
text("SELECT pg_advisory_lock(:ns, :id)"),
{"ns": NAMESPACE_PAYMENT, "id": 12345},
)
Practical Examples
Singleton Job
async def run_scheduled_job(session: AsyncSession):
"""Ensure only one instance runs this job."""
lock_id = string_to_lock_id("daily-report-job")
async with advisory_lock(session, lock_id, blocking=False) as acquired:
if not acquired:
print("Job already running on another instance")
return
print("Running daily report...")
await generate_daily_report()
print("Daily report complete")
Transactional Update
async def transfer_funds(
session: AsyncSession,
from_account: int,
to_account: int,
amount: Decimal,
):
"""Transfer funds with advisory lock for consistency."""
# Lock both accounts (always in same order to prevent deadlock)
accounts = sorted([from_account, to_account])
# Transaction-level locks
for account_id in accounts:
await session.execute(
text("SELECT pg_advisory_xact_lock(:ns, :id)"),
{"ns": NAMESPACE_ACCOUNT, "id": account_id},
)
# Perform transfer (locks auto-release on commit)
await debit_account(session, from_account, amount)
await credit_account(session, to_account, amount)
await session.commit()
PostgreSQL 18 Lock Monitoring
-- View current advisory locks
SELECT
pid,
locktype,
classid,
objid,
mode,
granted
FROM pg_locks
WHERE locktype = 'advisory';
-- View locks with session info (PostgreSQL 18)
SELECT
l.pid,
l.objid as lock_id,
l.granted,
a.application_name,
a.client_addr,
a.state,
now() - a.state_change as duration
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory';
Redis Distributed Locks
Single-Node Redis Lock (Lua Script)
import asyncio
from datetime import timedelta
from uuid import UUID
from uuid_utils import uuid7
import redis.asyncio as redis
class RedisLock:
"""Distributed lock using Redis with Lua scripts.
Features:
- Atomic acquire/release with Lua scripts
- Automatic expiration (prevents deadlocks)
- Owner validation (only owner can release)
- Extension support (heartbeat)
"""
# Lua script for atomic acquire
ACQUIRE_SCRIPT = """
if redis.call('exists', KEYS[1]) == 0 then
redis.call('hset', KEYS[1], 'owner', ARGV[1], 'count', 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
elseif redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
redis.call('hincrby', KEYS[1], 'count', 1)
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
"""
# Lua script for atomic release
RELEASE_SCRIPT = """
if redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
local count = redis.call('hincrby', KEYS[1], 'count', -1)
if count <= 0 then
redis.call('del', KEYS[1])
return 1
end
return 1
end
return 0
"""
# Lua script for extending TTL
EXTEND_SCRIPT = """
if redis.call('hget', KEYS[1], 'owner') == ARGV[1] then
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
"""
def __init__(
self,
client: redis.Redis,
name: str,
ttl: timedelta = timedelta(seconds=30),
):
self._client = client
self._name = f"lock:{name}"
self._ttl_ms = int(ttl.total_seconds() * 1000)
self._owner_id = str(uuid7())
self._acquired = False
async def acquire(self, timeout: timedelta | None = None) -> bool:
"""Acquire lock with optional timeout."""
deadline = (
asyncio.get_event_loop().time() + timeout.total_seconds()
if timeout
else None
)
while True:
result = await self._client.eval(
self.ACQUIRE_SCRIPT,
1,
self._name,
self._owner_id,
self._ttl_ms,
)
if result == 1:
self._acquired = True
return True
if deadline and asyncio.get_event_loop().time() >= deadline:
return False
# Brief fixed pause before retrying; swap in exponential backoff under heavy contention
await asyncio.sleep(0.05)
async def release(self) -> bool:
"""Release lock (only if owner)."""
if not self._acquired:
return False
result = await self._client.eval(
self.RELEASE_SCRIPT,
1,
self._name,
self._owner_id,
)
if result == 1:
self._acquired = False
return True
return False
async def extend(self, ttl: timedelta | None = None) -> bool:
"""Extend lock TTL (heartbeat)."""
if not self._acquired:
return False
ttl_ms = int((ttl or timedelta(seconds=30)).total_seconds() * 1000)
result = await self._client.eval(
self.EXTEND_SCRIPT,
1,
self._name,
self._owner_id,
ttl_ms,
)
return result == 1
@property
def is_acquired(self) -> bool:
return self._acquired
async def __aenter__(self) -> "RedisLock":
acquired = await self.acquire(timeout=timedelta(seconds=10))
if not acquired:
raise LockAcquisitionError(f"Failed to acquire lock: {self._name}")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.release()
class LockAcquisitionError(Exception):
"""Raised when lock cannot be acquired."""
pass
Usage Examples
# Basic usage with context manager
async def process_payment(order_id: UUID, redis_client: redis.Redis):
lock = RedisLock(redis_client, f"payment:{order_id}")
async with lock:
# Only one instance processes this order
order = await get_order(order_id)
await charge_payment(order)
await mark_order_paid(order_id)
# Manual acquire/release with timeout
async def try_process_batch(batch_id: str, redis_client: redis.Redis):
lock = RedisLock(redis_client, f"batch:{batch_id}", ttl=timedelta(minutes=5))
if await lock.acquire(timeout=timedelta(seconds=5)):
try:
await process_batch(batch_id)
finally:
await lock.release()
else:
print(f"Batch {batch_id} already being processed")
# Long-running task with heartbeat
async def long_running_task(task_id: str, redis_client: redis.Redis):
lock = RedisLock(redis_client, f"task:{task_id}", ttl=timedelta(seconds=30))
async with lock:
# Start heartbeat in background
async def heartbeat():
while lock.is_acquired:
await lock.extend(timedelta(seconds=30))
await asyncio.sleep(10)
heartbeat_task = asyncio.create_task(heartbeat())
try:
await do_long_work()
finally:
heartbeat_task.cancel()
Lock with Retry Decorator
from functools import wraps
from typing import Callable, ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
def with_lock(
lock_name: str,
ttl: timedelta = timedelta(seconds=30),
timeout: timedelta = timedelta(seconds=10),
retries: int = 3,
):
"""Decorator to acquire lock before function execution."""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
redis_client = kwargs.get("redis") or args[0] # Adjust as needed
for attempt in range(retries):
lock = RedisLock(redis_client, lock_name, ttl=ttl)
if await lock.acquire(timeout=timeout):
try:
return await func(*args, **kwargs)
finally:
await lock.release()
await asyncio.sleep(0.1 * (2 ** attempt)) # Exponential backoff
raise LockAcquisitionError(
f"Failed to acquire lock after {retries} attempts"
)
return wrapper
return decorator
# Usage
@with_lock("payment-processor", ttl=timedelta(seconds=60))
async def process_payment(redis: redis.Redis, order_id: UUID):
...
Redlock Algorithm (Multi-Node Redis)
Why Redlock?
Single Redis instance = single point of failure. Redlock provides:
- Distributed consensus across N Redis nodes
- Fault tolerance (works with N/2+1 nodes)
- Safety guarantees (mutual exclusion)
Algorithm Overview
┌─────────────────────────────────────────────────────────────┐
│ Redlock Algorithm │
├─────────────────────────────────────────────────────────────┤
│ 1. Get current time (T1) │
│ 2. Try to acquire lock on N nodes sequentially │
│ 3. Get current time (T2) │
│ 4. Calculate elapsed = T2 - T1 │
│ 5. Lock valid if: │
│ - Acquired on majority (N/2 + 1) nodes │
│ - Elapsed < TTL - clock_drift │
│ 6. If valid: use lock with remaining TTL │
│ 7. If invalid: release on all nodes │
└─────────────────────────────────────────────────────────────┘
Implementation
import asyncio
import random
from dataclasses import dataclass, field
from datetime import timedelta
import time
from uuid_utils import uuid7
import redis.asyncio as redis
@dataclass
class RedlockConfig:
"""Redlock configuration."""
ttl: timedelta = timedelta(seconds=30)
retry_count: int = 3
retry_delay: timedelta = timedelta(milliseconds=200)
clock_drift_factor: float = 0.01 # 1% of TTL
@dataclass
class RedlockResult:
"""Result of lock acquisition attempt."""
acquired: bool
validity_time_ms: int = 0
resource: str = ""
owner_id: str = ""
class Redlock:
"""Distributed lock across multiple Redis instances.
Implements the Redlock algorithm for fault-tolerant locking.
Requires N Redis instances (recommend N=5 for production).
"""
ACQUIRE_SCRIPT = """
if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
return 1
end
return 0
"""
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
def __init__(
self,
clients: list[redis.Redis],
config: RedlockConfig | None = None,
):
if len(clients) < 3:
raise ValueError("Redlock requires at least 3 Redis instances")
self._clients = clients
self._config = config or RedlockConfig()
self._quorum = len(clients) // 2 + 1
async def acquire(self, resource: str) -> RedlockResult:
"""Acquire lock on resource across all nodes."""
owner_id = str(uuid7())
ttl_ms = int(self._config.ttl.total_seconds() * 1000)
for attempt in range(self._config.retry_count):
start_time = time.monotonic()
# Try to acquire on all nodes
acquired_count = 0
for client in self._clients:
try:
result = await asyncio.wait_for(
client.eval(
self.ACQUIRE_SCRIPT,
1,
f"lock:{resource}",
owner_id,
ttl_ms,
),
timeout=0.1, # Fast timeout per node
)
if result == 1:
acquired_count += 1
except (asyncio.TimeoutError, redis.RedisError):
continue
# Calculate elapsed time
elapsed_ms = int((time.monotonic() - start_time) * 1000)
# Calculate validity time with clock drift
drift_ms = int(ttl_ms * self._config.clock_drift_factor) + 2
validity_time_ms = ttl_ms - elapsed_ms - drift_ms
# Check if we acquired quorum with enough validity time
if acquired_count >= self._quorum and validity_time_ms > 0:
return RedlockResult(
acquired=True,
validity_time_ms=validity_time_ms,
resource=resource,
owner_id=owner_id,
)
# Failed - release on all nodes
await self._release_all(resource, owner_id)
# Retry with delay + jitter
if attempt < self._config.retry_count - 1:
delay = self._config.retry_delay.total_seconds()
jitter = random.uniform(0, delay * 0.1)  # random jitter avoids synchronized retries
await asyncio.sleep(delay + jitter)
return RedlockResult(acquired=False, resource=resource)
async def release(self, result: RedlockResult) -> bool:
"""Release lock on all nodes."""
if not result.acquired:
return False
return await self._release_all(result.resource, result.owner_id)
async def _release_all(self, resource: str, owner_id: str) -> bool:
"""Release lock on all Redis nodes."""
released_count = 0
for client in self._clients:
try:
result = await asyncio.wait_for(
client.eval(
self.RELEASE_SCRIPT,
1,
f"lock:{resource}",
owner_id,
),
timeout=0.1,
)
if result == 1:
released_count += 1
except (asyncio.TimeoutError, redis.RedisError):
continue
return released_count > 0
async def __aenter__(self) -> "RedlockContext":
    self._context = RedlockContext(self)
    return self._context
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    await self._context.release()
class RedlockContext:
"""Context manager for Redlock."""
def __init__(self, redlock: Redlock):
self._redlock = redlock
self._result: RedlockResult | None = None
async def acquire(self, resource: str) -> RedlockResult:
self._result = await self._redlock.acquire(resource)
if not self._result.acquired:
raise LockAcquisitionError(f"Failed to acquire lock: {resource}")
return self._result
async def release(self) -> bool:
if self._result:
return await self._redlock.release(self._result)
return False
Usage
# Setup with multiple Redis instances
redis_clients = [
redis.from_url("redis://redis1:6379"),
redis.from_url("redis://redis2:6379"),
redis.from_url("redis://redis3:6379"),
redis.from_url("redis://redis4:6379"),
redis.from_url("redis://redis5:6379"),
]
redlock = Redlock(redis_clients, RedlockConfig(
ttl=timedelta(seconds=30),
retry_count=3,
))
# Acquire and use lock
async def critical_operation(resource_id: str):
result = await redlock.acquire(resource_id)
if result.acquired:
try:
# Use lock for validity_time_ms
print(f"Lock valid for {result.validity_time_ms}ms")
await do_critical_work()
finally:
await redlock.release(result)
else:
print("Failed to acquire lock")
When to Use Redlock vs Single-Node
| Single-Node Redis Lock | Redlock |
|---|---|
| Development/testing | Production with HA |
| Non-critical operations | Critical operations |
| Single datacenter | Multi-datacenter |
| Cost-sensitive | Reliability-critical |
| Simpler setup | Complex setup |
Retry Strategies
Overview
Retry strategies handle transient failures by automatically re-attempting operations. The key is knowing when to retry, how long to wait, and when to give up.
Core Concepts
Exponential Backoff with Jitter
┌────────────────────────────────────────────────────────────┐
│ Exponential Backoff + Full Jitter │
├────────────────────────────────────────────────────────────┤
│ │
│ Attempt Base Delay With Jitter (random 0-base) │
│ ─────── ────────── ───────────────────────── │
│ 1 1s 0.0s - 1.0s │
│ 2 2s 0.0s - 2.0s │
│ 3 4s 0.0s - 4.0s │
│ 4 8s 0.0s - 8.0s │
│ 5 16s 0.0s - 16.0s │
│ │
│ Formula: sleep = random(0, min(cap, base * 2^attempt)) │
│ │
│ Full jitter prevents thundering herd when many clients │
│ retry simultaneously after an outage. │
│ │
└────────────────────────────────────────────────────────────┘
Jitter Strategies
| Strategy | Formula | Use Case |
|---|---|---|
| No jitter | base * 2^attempt | Testing only |
| Full jitter | random(0, base * 2^attempt) | Most common, best distribution |
| Equal jitter | (base * 2^attempt)/2 + random(0, (base * 2^attempt)/2) | When min delay needed |
| Decorrelated | random(base, prev_delay * 3) | Aggressive retry scenarios |
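The four strategies in the table can be sketched directly from their formulas (function names are illustrative):

```python
import random

def no_jitter(base: float, attempt: int, cap: float = 60.0) -> float:
    """base * 2^attempt, capped. Deterministic - testing only."""
    return min(cap, base * 2 ** attempt)

def full_jitter(base: float, attempt: int, cap: float = 60.0) -> float:
    """random(0, base * 2^attempt). Best spread across clients."""
    return random.uniform(0, min(cap, base * 2 ** attempt))

def equal_jitter(base: float, attempt: int, cap: float = 60.0) -> float:
    """Half fixed, half random - guarantees a minimum delay."""
    temp = min(cap, base * 2 ** attempt)
    return temp / 2 + random.uniform(0, temp / 2)

def decorrelated_jitter(base: float, prev_delay: float, cap: float = 60.0) -> float:
    """random(base, prev_delay * 3) - each delay feeds the next."""
    return min(cap, random.uniform(base, prev_delay * 3))
```

For decorrelated jitter, carry the returned delay into the next call as `prev_delay`.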
Error Classification
Retryable Errors
# HTTP status codes that indicate a transient failure
RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}
# Exception types safe to retry
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    ConnectionResetError,
    BrokenPipeError,
)
# LLM API error codes safe to retry
RETRYABLE_ERROR_CODES = {
    "rate_limit_exceeded",
    "model_overloaded",
    "server_error",
    "timeout",
    "context_length_exceeded",  # Retry with truncation
}
def is_retryable(error: Exception) -> bool:
    # HTTP errors
    if hasattr(error, "status_code"):
        return error.status_code in RETRYABLE_STATUS_CODES
    # Exception types
    if isinstance(error, RETRYABLE_EXCEPTIONS):
        return True
    # LLM API error codes
    if hasattr(error, "code"):
        return error.code in RETRYABLE_ERROR_CODES
    return False
Non-Retryable Errors
# Client errors that will fail identically on retry
NON_RETRYABLE_STATUS_CODES = {400, 401, 403, 404, 405, 422}
# LLM API error codes that should not be retried
NON_RETRYABLE_ERROR_CODES = {
    "invalid_api_key",
    "invalid_request_error",
    "content_policy_violation",
    "model_not_found",
    "insufficient_quota",
}
Implementation Patterns
Basic Retry Decorator
import asyncio
import random
from functools import wraps
from typing import TypeVar, Callable, Awaitable, Set, Type
T = TypeVar("T")
def retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: set[type[Exception]] | None = None,
):
"""Async retry decorator with exponential backoff."""
retryable = retryable_exceptions or {Exception}
def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
@wraps(fn)
async def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return await fn(*args, **kwargs)
except tuple(retryable) as e:
last_exception = e
if attempt == max_attempts:
raise
# Calculate delay
delay = min(base_delay * (exponential_base ** (attempt - 1)), max_delay)
# Apply jitter
if jitter:
delay = random.uniform(0, delay)
logger.warning(
f"Retry {attempt}/{max_attempts} after {delay:.2f}s",
error=str(e),
function=fn.__name__,
)
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
Retry with Modification
async def retry_with_truncation(
fn: Callable[[str], Awaitable[T]],
content: str,
max_attempts: int = 3,
) -> T:
"""Retry LLM call, truncating content on context_length_exceeded."""
for attempt in range(1, max_attempts + 1):
try:
return await fn(content)
except ContextLengthExceededError:
if attempt == max_attempts:
raise
# Truncate content by 25% each retry
truncate_to = int(len(content) * 0.75)
content = content[:truncate_to]
logger.warning(
f"Truncating content to {truncate_to} chars",
attempt=attempt,
)
Retry Budget
import time

class RetryBudget:
    """Limits total retries across all calls to prevent retry storms."""
    def __init__(
        self,
        budget_per_second: float = 10.0,
        min_retries_per_second: float = 1.0,
    ):
        self.max_budget = budget_per_second
        self.budget = budget_per_second
        self.replenish_rate = min_retries_per_second
        self.last_update = time.time()
    def can_retry(self) -> bool:
        self._replenish()
        return self.budget >= 1.0
    def use_retry(self) -> None:
        if self.budget >= 1.0:
            self.budget -= 1.0
    def _replenish(self) -> None:
        now = time.time()
        elapsed = now - self.last_update
        # Replenish at the configured rate, capped at the full budget
        self.budget = min(
            self.budget + elapsed * self.replenish_rate,
            self.max_budget,
        )
        self.last_update = now
Best Practices (2026)
1. Set Appropriate Limits
# BAD: Too many retries
@retry(max_attempts=10, base_delay=0.1) # 10 retries in ~3s = hammering
# GOOD: Reasonable limits
@retry(max_attempts=3, base_delay=1.0, max_delay=30.0)
2. Always Use Jitter
# BAD: No jitter (thundering herd)
delay = base * 2 ** attempt # All clients retry at same time
# GOOD: Full jitter
delay = random.uniform(0, base * 2 ** attempt) # Spread retries
3. Different Strategies per Operation
# Fast-fail for user-facing
@retry(max_attempts=2, base_delay=0.5, max_delay=2.0)
async def get_user_data():
...
# More patient for background jobs
@retry(max_attempts=5, base_delay=2.0, max_delay=60.0)
async def sync_data():
...
4. Log All Retries
async def retry_with_logging(fn, *args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return await fn(*args, **kwargs)
except RetryableError as e:
logger.warning(
"Retry attempt",
attempt=attempt,
max_attempts=max_attempts,
error_type=type(e).__name__,
error_message=str(e),
function=fn.__name__,
# Include trace ID for correlation
trace_id=get_current_trace_id(),
)
await asyncio.sleep(calculate_delay(attempt))
5. Combine with Circuit Breaker
# Retry INSIDE circuit breaker
# Circuit only counts final failures after retries exhausted
@circuit_breaker(failure_threshold=5)
@retry(max_attempts=3)
async def call_external_api():
...
# NOT the other way around:
# @retry # Would retry when circuit is open!
# @circuit_breaker
Anti-Patterns
1. Retrying Non-Retryable Errors
# BAD: Retry everything
@retry(max_attempts=5, retryable_exceptions={Exception})
async def call_api():
... # Will retry 401 Unauthorized 5 times!
# GOOD: Specific exceptions
@retry(
max_attempts=3,
retryable_exceptions={
ConnectionError,
TimeoutError,
RateLimitError,
}
)
2. No Backoff
# BAD: Fixed delay
for attempt in range(5):
try:
return await call()
except Exception:
await asyncio.sleep(1) # Same delay every time
# GOOD: Exponential backoff
for attempt in range(5):
try:
return await call()
except Exception:
await asyncio.sleep(2 ** attempt) # 1, 2, 4, 8, 16
3. Infinite Retries
# BAD: Never gives up
while True:
try:
return await call()
except Exception:
await asyncio.sleep(1)
# GOOD: Bounded retries
for attempt in range(max_attempts):
...
raise MaxRetriesExceeded()
LLM-Specific Retry Strategies
Rate Limit Handling
async def call_llm_with_rate_limit_handling(prompt: str) -> str:
for attempt in range(3):
try:
return await llm.complete(prompt)
except RateLimitError as e:
# Use retry-after header if provided
retry_after = e.headers.get("retry-after", 60)
logger.warning(f"Rate limited, waiting {retry_after}s")
await asyncio.sleep(int(retry_after))
raise MaxRetriesExceeded("Rate limit persists after retries")
Context Length Handling
async def call_with_context_management(prompt: str, max_tokens: int = 4096) -> str:
for attempt in range(3):
try:
return await llm.complete(prompt, max_tokens=max_tokens)
except ContextLengthExceededError:
# Reduce by 25% each attempt
prompt = truncate_prompt(prompt, ratio=0.75 ** attempt)
logger.warning(f"Truncated prompt to {len(prompt)} chars")
raise ContextLengthExceededError("Cannot fit in context after truncation")
Monitoring
# Retry rate
rate(retries_total[5m])
# Retry success rate (retries that eventually succeed)
sum(rate(retry_success_total[5m])) / sum(rate(retries_total[5m]))
# Average attempts before success
histogram_quantile(0.95, retry_attempts_bucket)
# Retry budget utilization
retry_budget_used / retry_budget_total
Stripe-Style Idempotency Pattern
Stripe's idempotency implementation is considered the gold standard. Here's how to replicate it.
How Stripe Does It
- Client sends an Idempotency-Key header with the POST request
- Server checks if the key was seen before
- If yes: return cached response
- If no: process request, cache response, return
Implementation
Request Flow
Client Request
│
▼
┌────────────────┐
│ Check cache │
│ (Redis) │
└───────┬────────┘
│
┌────┴────┐
│ Exists? │
└────┬────┘
│
┌───┴───┐
YES NO
│ │
▼ ▼
┌────────┐ ┌────────────┐
│ Return │ │ Lock key │
│ cached │ │ (Redis) │
└────────┘ └─────┬──────┘
│
▼
┌────────────┐
│ Process │
│ request │
└─────┬──────┘
│
▼
┌────────────┐
│ Cache │
│ response │
└─────┬──────┘
│
▼
┌────────────┐
│ Return │
│ response │
└────────────┘
FastAPI Implementation
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional
import redis.asyncio as redis
import json
import hashlib
app = FastAPI()
redis_client = redis.from_url("redis://localhost")
IDEMPOTENCY_TTL = 86400 # 24 hours
async def get_idempotency_response(
key: str,
path: str,
) -> dict | None:
"""Check for existing idempotent response."""
cache_key = f"idem:{path}:{key}"
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
return None
async def set_idempotency_response(
key: str,
path: str,
response: dict,
status_code: int,
) -> None:
"""Cache response for idempotency key."""
cache_key = f"idem:{path}:{key}"
await redis_client.setex(
cache_key,
IDEMPOTENCY_TTL,
json.dumps({
"body": response,
"status": status_code,
}),
)
async def acquire_idempotency_lock(
key: str,
path: str,
timeout: int = 60,
) -> bool:
"""Acquire lock for processing (prevents concurrent duplicates)."""
lock_key = f"idem_lock:{path}:{key}"
return bool(await redis_client.set(lock_key, "1", nx=True, ex=timeout))
async def release_idempotency_lock(key: str, path: str) -> None:
"""Release processing lock."""
lock_key = f"idem_lock:{path}:{key}"
await redis_client.delete(lock_key)
@app.post("/v1/charges")
async def create_charge(
request: Request,
idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
# If no key, process normally (no idempotency)
if not idempotency_key:
return await _process_charge(await request.json())
path = request.url.path
# Check for cached response
cached = await get_idempotency_response(idempotency_key, path)
if cached:
return JSONResponse(
content=cached["body"],
status_code=cached["status"],
headers={"Idempotent-Replayed": "true"},
)
# Try to acquire lock
if not await acquire_idempotency_lock(idempotency_key, path):
# Another request is processing with this key
raise HTTPException(
status_code=409,
detail="A request with this idempotency key is already being processed",
)
try:
# Process the request
body = await request.json()
result = await _process_charge(body)
# Cache the response
await set_idempotency_response(
idempotency_key,
path,
result,
status_code=200,
)
return result
finally:
await release_idempotency_lock(idempotency_key, path)
Client Usage
import httpx
import uuid
# Assumes an httpx.AsyncClient configured with your API's base URL
client = httpx.AsyncClient(base_url="https://api.example.com")
async def create_charge_safely(amount: int, currency: str):
"""Create charge with idempotency protection."""
idempotency_key = str(uuid.uuid4())
for attempt in range(3):
try:
response = await client.post(
"/v1/charges",
headers={"Idempotency-Key": idempotency_key},
json={"amount": amount, "currency": currency},
)
return response.json()
except httpx.TransportError:
# Network error - safe to retry with same key
continue
raise Exception("Failed after 3 attempts")
Key Principles
- Keys are client-generated: Server doesn't generate keys
- Keys are scoped to endpoint: Same key on different endpoints = different operations
- 24-hour window: Keys expire after 24 hours
- Locked during processing: Prevents concurrent duplicates
- Only success cached: Errors can be retried
- Response fully cached: Body, status, and relevant headers
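One detail worth adding when replicating Stripe: reuse of an idempotency key with different request parameters should be rejected, not replayed. A sketch of that check, assuming a request fingerprint is stored alongside the cached response (helper names are illustrative):

```python
import hashlib
import json

def request_fingerprint(body: dict) -> str:
    """Stable hash of the request body, independent of key order."""
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def validate_key_reuse(stored_fingerprint: str, body: dict) -> bool:
    """True only if this body matches the one originally sent with the key."""
    return stored_fingerprint == request_fingerprint(body)
```

On mismatch, return a 422 rather than the cached response, so a client bug (same key, different payload) surfaces immediately.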
Token Bucket Algorithm
In-depth guide to the token bucket rate limiting algorithm with Redis implementation.
How Token Bucket Works
┌─────────────────────────────────────────────────────────────┐
│ TOKEN BUCKET │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Tokens: ●●●●●●●○○○ (7/10 tokens available) │ │
│ │ Capacity: 10 tokens │ │
│ │ Refill Rate: 5 tokens/second │ │
│ └──────────────────────────────────────────────────┘ │
│ │ │
│ REQUEST ──────────────┼──────────────────────► ALLOWED │
│ │ │
│ (Each request consumes 1 token) │
│ (Bucket refills at constant rate) │
│ │
│ Timeline: │
│ t=0s: 10 tokens │ 10 requests → 0 tokens │
│ t=1s: +5 tokens │ 5 tokens available │
│ t=2s: +5 tokens │ 10 tokens (capped at capacity) │
│ │
└─────────────────────────────────────────────────────────────┘
Algorithm Properties
| Property | Description |
|---|---|
| Burst Capacity | Allows short bursts up to bucket size |
| Smooth Limiting | Tokens refill continuously |
| No Memory | Doesn't track request history |
| Distributed | Works with Redis for multi-server |
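Before reaching for Redis, the algorithm itself fits in a few lines. A single-process sketch (illustrative; not a drop-in replacement for the distributed limiter):

```python
import time

class LocalTokenBucket:
    """Single-process token bucket illustrating the core algorithm."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = float(capacity)   # bucket starts full
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Refill continuously based on elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```

The Redis Lua script that follows is the same logic, made atomic and shared across servers.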
Redis Lua Script (Atomic)
-- token_bucket.lua
-- KEYS[1] = bucket key
-- ARGV[1] = bucket capacity
-- ARGV[2] = refill rate (tokens per second)
-- ARGV[3] = current timestamp (milliseconds)
-- ARGV[4] = tokens to consume
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- Get current bucket state
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
-- Calculate tokens to add based on time elapsed
local elapsed = (now - last_refill) / 1000 -- Convert to seconds
local refill = math.floor(elapsed * refill_rate)
tokens = math.min(capacity, tokens + refill)
-- Check if we can consume tokens
local allowed = 0
local remaining = tokens
local retry_after = 0
if tokens >= requested then
allowed = 1
remaining = tokens - requested
else
-- Calculate when enough tokens will be available
local needed = requested - tokens
retry_after = math.ceil(needed / refill_rate)
end
-- Update bucket state
redis.call('HMSET', key,
'tokens', remaining,
'last_refill', now
)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return {allowed, remaining, retry_after}
Python Implementation
import time
from typing import NamedTuple
import redis.asyncio as redis
class RateLimitResult(NamedTuple):
allowed: bool
remaining: int
retry_after: int # seconds
class TokenBucket:
"""Token bucket rate limiter with Redis backend."""
# Load Lua script once
SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
local elapsed = (now - last_refill) / 1000
local refill = math.floor(elapsed * refill_rate)
tokens = math.min(capacity, tokens + refill)
local allowed = 0
local remaining = tokens
local retry_after = 0
if tokens >= requested then
allowed = 1
remaining = tokens - requested
else
local needed = requested - tokens
retry_after = math.ceil(needed / refill_rate)
end
redis.call('HMSET', key, 'tokens', remaining, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return {allowed, remaining, retry_after}
"""
def __init__(
self,
redis_client: redis.Redis,
capacity: int = 100,
refill_rate: float = 10, # tokens per second
):
self.redis = redis_client
self.capacity = capacity
self.refill_rate = refill_rate
self._script = self.redis.register_script(self.SCRIPT)
async def consume(
self,
key: str,
tokens: int = 1,
) -> RateLimitResult:
"""
Try to consume tokens from the bucket.
Args:
key: Unique identifier (user_id, ip_address, etc.)
tokens: Number of tokens to consume
Returns:
RateLimitResult with allowed status and metadata
"""
bucket_key = f"ratelimit:token_bucket:{key}"
now_ms = int(time.time() * 1000)
result = await self._script(
keys=[bucket_key],
args=[self.capacity, self.refill_rate, now_ms, tokens],
)
return RateLimitResult(
allowed=bool(result[0]),
remaining=int(result[1]),
retry_after=int(result[2]),
)
# Usage with FastAPI
async def get_rate_limiter() -> TokenBucket:
redis_client = redis.from_url("redis://localhost:6379")
return TokenBucket(redis_client, capacity=100, refill_rate=10)

Comparison: Token Bucket vs Sliding Window
| Aspect | Token Bucket | Sliding Window |
|---|---|---|
| Burst Handling | Allows up to capacity | Spreads evenly |
| Memory | O(1) per key | O(n) request timestamps |
| Precision | Approximate | Exact |
| Use Case | API rate limiting | Strict quotas |
| Redis Operations | 1 HMSET | 1 ZADD + 1 ZREMRANGEBYSCORE |
When to Use Token Bucket
Good for:
- API rate limiting (allows natural bursts)
- User actions (login attempts, form submissions)
- Resource protection (database connections)
Not ideal for:
- Strict per-second quotas
- Billing-based limits (use sliding window)
- Fair queuing (use leaky bucket)
Related Files
- See examples/fastapi-rate-limiting.md for FastAPI integration
- See checklists/rate-limiting-checklist.md for implementation checklist
- See SKILL.md for sliding window implementation
Checklists (5)
Circuit Breaker Setup
Circuit Breaker Setup Guide
Step-by-step guide for adding circuit breakers to a service.
Step 1: Identify Services
List all external dependencies that need circuit breakers:
┌─────────────────────────────────────────────────────────────┐
│ Service Inventory │
├─────────────────────────────────────────────────────────────┤
│ │
│ External APIs: │
│ □ OpenAI API (LLM) │
│ □ Anthropic API (LLM) │
│ □ YouTube Data API │
│ □ GitHub API │
│ □ arXiv API │
│ │
│ Internal Services: │
│ □ Embedding service │
│ □ Database (PostgreSQL) │
│ □ Redis cache │
│ □ Semantic search │
│ │
│ For each, answer: │
│ 1. What's the expected failure rate? │
│ 2. How long does recovery typically take? │
│ 3. What's the fallback behavior? │
│ │
└─────────────────────────────────────────────────────────────┘

Step 2: Configure Thresholds
Failure Threshold
| Service Type | Recommended | Reasoning |
|---|---|---|
| LLM API | 3 | APIs can be unstable, fail fast |
| External API | 5 | More tolerant of transient issues |
| Database | 2-3 | DB issues usually need immediate attention |
| Internal service | 3-5 | Depends on service criticality |
Recovery Timeout
| Service Type | Recommended | Reasoning |
|---|---|---|
| LLM API | 60s | Rate limits typically reset in minutes |
| External API | 30-120s | Depends on SLA |
| Database | 15-30s | Should recover quickly |
| Internal service | 15-60s | Depends on restart time |
Slow Call Threshold
| Service Type | Recommended | Reasoning |
|---|---|---|
| LLM API | 30s | LLM calls can be slow |
| External API | 10s | Most APIs should be fast |
| Database | 5s | DB queries should be optimized |
| Internal service | 5-10s | Depends on operation |
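The threshold tables above can be encoded as presets so services select a profile instead of repeating raw numbers. This is a sketch: the `CircuitPreset` name and the midpoint values chosen for ranges (e.g. 20s within the 15-30s database recovery window) are illustrative assumptions, not an existing API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CircuitPreset:
    """Bundles the recommended thresholds from the tables in Step 2."""
    failure_threshold: int
    recovery_timeout: float    # seconds
    slow_call_threshold: float # seconds

# One preset per service type; range entries use a midpoint value
PRESETS = {
    "llm_api": CircuitPreset(failure_threshold=3, recovery_timeout=60.0,
                             slow_call_threshold=30.0),
    "external_api": CircuitPreset(failure_threshold=5, recovery_timeout=60.0,
                                  slow_call_threshold=10.0),
    "database": CircuitPreset(failure_threshold=3, recovery_timeout=20.0,
                              slow_call_threshold=5.0),
    "internal": CircuitPreset(failure_threshold=4, recovery_timeout=30.0,
                              slow_call_threshold=7.5),
}

def preset_for(service_type: str) -> CircuitPreset:
    return PRESETS[service_type]
```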
Step 3: Implement Circuit Breaker
Basic Implementation
from resilience import CircuitBreaker, CircuitBreakerFactory
# Option 1: Use factory for common patterns
openai_breaker = CircuitBreakerFactory.for_llm_api("openai")
db_breaker = CircuitBreakerFactory.for_database("postgres")
# Option 2: Custom configuration
custom_breaker = CircuitBreaker(
name="my-service",
failure_threshold=5,
success_threshold=2,
recovery_timeout=30.0,
slow_call_threshold=10.0,
)

Wrap Service Calls
# Method 1: Decorator
@openai_breaker
async def call_openai(prompt: str) -> str:
return await openai_client.complete(prompt)
# Method 2: Explicit call
async def call_openai(prompt: str) -> str:
return await openai_breaker.call(
openai_client.complete,
prompt,
)

Step 4: Add Fallback Handling
from resilience import CircuitOpenError
async def analyze_with_fallback(content: str) -> Analysis:
try:
return await circuit_breaker.call(primary_analysis, content)
except CircuitOpenError as e:
logger.warning(
f"Circuit open for {e.name}, using fallback",
time_until_recovery=e.time_until_recovery,
)
# Fallback 1: Try cache
cached = await cache.get(f"analysis:{hash(content)}")
if cached:
return Analysis.from_cache(cached, is_stale=True)
# Fallback 2: Degraded response
return Analysis(
status="degraded",
message="Full analysis temporarily unavailable",
basic_info=extract_basic_info(content),
)

Step 5: Add Observability
Logging
def setup_circuit_logging(breaker: CircuitBreaker):
def on_state_change(old: str, new: str, name: str):
logger.warning(
"circuit_state_change",
circuit=name,
old_state=old,
new_state=new,
)
if new == "open":
# Send alert
alerting.send(
severity="warning",
message=f"Circuit {name} opened",
runbook="https://docs/runbooks/circuit-breaker",
)
breaker._on_state_change = on_state_change

Metrics
from prometheus_client import Gauge, Counter
circuit_state = Gauge(
"circuit_breaker_state",
"Circuit breaker state (0=closed, 1=open, 2=half_open)",
["service"],
)
circuit_rejections = Counter(
"circuit_breaker_rejections_total",
"Total requests rejected by circuit breaker",
["service"],
)
def update_metrics(breaker: CircuitBreaker):
state_map = {"closed": 0, "open": 1, "half_open": 2}
circuit_state.labels(service=breaker.name).set(
state_map[breaker.state.value]
)

Health Endpoint
@app.get("/health/circuits")
async def circuit_health():
return {
name: {
"state": cb.state.value,
"failure_count": cb._failure_count,
"time_until_recovery": (
cb._time_until_recovery()
if cb.state == CircuitState.OPEN
else None
),
}
for name, cb in circuit_breakers.items()
}

Step 6: Test Circuit Behavior
Unit Tests
@pytest.mark.asyncio
async def test_circuit_opens_on_failures():
breaker = CircuitBreaker(name="test", failure_threshold=3)
async def failing_call():
raise ConnectionError("Failed")
# Fail 3 times
for _ in range(3):
with pytest.raises(ConnectionError):
await breaker.call(failing_call)
# Should be open now
assert breaker.state == CircuitState.OPEN
# Next call rejected
with pytest.raises(CircuitOpenError):
await breaker.call(failing_call)
@pytest.mark.asyncio
async def test_circuit_recovers():
breaker = CircuitBreaker(
name="test",
failure_threshold=1,
recovery_timeout=0.1, # Fast for testing
)
async def failing_then_succeeding():
if breaker._failure_count > 0:
return "success"
raise ConnectionError("First call fails")
# Open circuit
with pytest.raises(ConnectionError):
await breaker.call(failing_then_succeeding)
# Wait for recovery
await asyncio.sleep(0.2)
# Should succeed now
result = await breaker.call(failing_then_succeeding)
assert result == "success"
assert breaker.state == CircuitState.CLOSED

Integration Tests
@pytest.mark.asyncio
async def test_circuit_isolates_failures():
"""Verify circuit prevents cascade failures."""
openai_breaker = circuit_breakers["openai"]
# Simulate OpenAI outage
with patch("openai.complete", side_effect=ConnectionError):
# Multiple calls should fail then trip circuit
for _ in range(5):
try:
await analyze_content("test")
except (ConnectionError, CircuitOpenError):
pass
# Circuit should be open
assert openai_breaker.state == CircuitState.OPEN
# Anthropic should still work (different circuit)
anthropic_breaker = circuit_breakers["anthropic"]
assert anthropic_breaker.state == CircuitState.CLOSED

Step 7: Document and Monitor
Documentation
Add to your service's README:
## Circuit Breakers
| Service | Threshold | Recovery | Fallback |
|---------|-----------|----------|----------|
| openai | 3 failures | 60s | Use gpt-5.2-mini |
| anthropic | 3 failures | 60s | Use cache |
| youtube | 5 failures | 120s | Return partial data |
### Monitoring
- Dashboard: [Grafana Circuit Breakers](...)
- Alerts: PagerDuty channel #resilience
- Runbook: [Circuit Breaker Runbook](...)

Runbook Template
## Circuit Breaker Open - {service}
### Symptoms
- Service returning 503 errors
- Alert: "Circuit {service} opened"
- Dashboard shows circuit in OPEN state
### Impact
- {describe impact on users}
### Resolution
1. Check {service} status page
2. Review logs for failure pattern
3. If transient: wait for auto-recovery
4. If persistent: {escalation steps}
### Verification
1. Circuit state returns to CLOSED
2. Service calls succeeding
3. Metrics returning to normal

Quick Reference
┌─────────────────────────────────────────────────────────────┐
│ Circuit Breaker Quick Reference │
├─────────────────────────────────────────────────────────────┤
│ │
│ CREATE: │
│ breaker = CircuitBreaker("name", failure_threshold=5) │
│ │
│ USE: │
│ @breaker │
│ async def my_function(): ... │
│ │
│ result = await breaker.call(func, *args) │
│ │
│ HANDLE: │
│ try: │
│ await breaker.call(...) │
│ except CircuitOpenError: │
│ return fallback_response() │
│ │
│ MONITOR: │
│ breaker.get_status() │
│ breaker.state == CircuitState.OPEN │
│ │
│ RESET (manual): │
│ breaker.reset() │
│ │
└─────────────────────────────────────────────────────────────┘

Distributed Locks Checklist
Distributed Locks Checklist
Lock Selection
- Chose appropriate lock backend
- Redis: Fast, TTL-based, requires Redis infrastructure
- PostgreSQL: No extra infra, integrates with transactions
- Redlock: Multi-node Redis for high availability
- Determined lock scope (session vs transaction)
- Set appropriate TTL (not too short, not too long)
Implementation
Acquire
- Non-blocking option available (try_lock)
- Timeout support for blocking acquire
- Retry logic with exponential backoff
- Jitter added to prevent thundering herd
- Unique owner ID generated (UUIDv7)
Release
- Owner validation (only owner can release)
- Atomic release (Lua script for Redis)
- Idempotent release (safe to call twice)
- Finally block ensures release on exception
Extension
- Heartbeat/extend for long operations
- Auto-extend background task option
- Extension validates ownership
Safety
Mutual Exclusion
- Atomic acquire (SET NX for Redis)
- Fencing token or owner ID validated
- No race conditions in acquire/release
Deadlock Prevention
- TTL prevents permanent deadlocks
- Lock ordering for multiple locks
- Timeout on acquire attempts
Split-Brain Protection
- Redlock for multi-node Redis
- Clock drift factored into validity
- Quorum required for lock acquisition
Error Handling
- Lock acquisition failures handled gracefully
- Release failures logged and handled
- Network partition scenarios considered
- Retry logic for transient failures
Testing
- Unit tests for lock logic
- Integration tests with real backend
- Concurrent access tests
- Failure scenario tests (network, timeout)
- Lock expiration tests
Monitoring
- Lock acquisition metrics
- Lock hold duration metrics
- Failed acquisition alerts
- Long-held lock alerts
- Deadlock detection
PostgreSQL Advisory Locks
- Correct lock function used (session vs xact)
- Lock ID strategy documented
- Namespace collisions prevented
- pg_locks monitoring query available
Redis Locks
- Lua scripts used for atomicity
- TTL always set (no deadlocks)
- Owner ID stored with lock
- Release validates owner
Redlock (Multi-Node)
- Minimum 3 Redis instances (recommend 5)
- Quorum calculated correctly (N/2 + 1)
- Clock drift factored in
- Failed nodes don't block acquire
- Release attempted on all nodes
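The quorum and validity arithmetic behind these items is small enough to sketch directly; the 1% clock-drift factor and the 2 ms constant follow the canonical Redlock description:

```python
def redlock_quorum(n_instances: int) -> int:
    """Majority quorum: N/2 + 1 (so 3 of 5, or 2 of 3)."""
    return n_instances // 2 + 1

def lock_validity(ttl_ms: int, elapsed_ms: int,
                  clock_drift_factor: float = 0.01) -> int:
    """Remaining validity after acquiring on a quorum: the TTL minus
    the time spent acquiring, minus an allowance for clock drift."""
    drift_ms = int(ttl_ms * clock_drift_factor) + 2
    return ttl_ms - elapsed_ms - drift_ms

quorum = redlock_quorum(5)             # need 3 of 5 instances
validity = lock_validity(10_000, 350)  # 10000 - 350 - 102 = 9548 ms
```

If `validity` comes out non-positive, the acquire took too long relative to the TTL and the client must release on all nodes and treat the attempt as failed.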
Production Readiness
- Lock names are descriptive and namespaced
- TTL tuned for operation duration
- Metrics and alerting configured
- Runbook for lock-related incidents
- Graceful degradation strategy
Idempotency Checklist
Idempotency Implementation Checklist
Key Generation
- Keys are deterministic (same input = same key)
- Keys include all relevant parameters
- Keys are scoped appropriately (user, endpoint, etc.)
- Keys use consistent hash algorithm (SHA-256)
- Keys are reasonable length (32-64 chars)
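A minimal sketch of deterministic key derivation satisfying the items above (the function name and scoping scheme are illustrative): canonical JSON plus SHA-256 yields a stable 64-character key regardless of dict ordering:

```python
import hashlib
import json

def derive_idempotency_key(user_id: str, endpoint: str, payload: dict) -> str:
    """Deterministic: same user + endpoint + payload always hashes to
    the same key. Sorted keys make the JSON canonical."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    material = f"{user_id}:{endpoint}:{canonical}".encode()
    return hashlib.sha256(material).hexdigest()  # 64 hex chars

k1 = derive_idempotency_key("u1", "/payments", {"amount": 100, "currency": "EUR"})
k2 = derive_idempotency_key("u1", "/payments", {"currency": "EUR", "amount": 100})
k3 = derive_idempotency_key("u2", "/payments", {"amount": 100, "currency": "EUR"})
# k1 == k2 (same request, different dict order); k3 differs (other user)
```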
API Endpoints
- POST/PUT/PATCH endpoints support Idempotency-Key header
- Idempotency key format is documented
- Key is validated (format, length)
- Duplicate requests return cached response
- Response includes header indicating replay
Storage
- Redis used for fast lookups
- Database used for durability
- TTL configured appropriately (24-72 hours)
- Cleanup job for expired records
- Storage sized for expected volume
Race Conditions
- Database constraint prevents duplicates
- Check-and-insert is atomic
- Lost updates are prevented
- Concurrent requests handled correctly
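The atomic check-and-insert can be demonstrated with stdlib sqlite3 standing in for PostgreSQL: it is the primary-key constraint, not application logic, that prevents duplicates when two requests race:

```python
import sqlite3

# Of two concurrent requests with the same key, exactly one INSERT
# succeeds; the other hits the constraint and is treated as a replay.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE idempotency_records "
    "(idempotency_key TEXT PRIMARY KEY, response TEXT)"
)

def claim(key: str) -> bool:
    """Return True if this request claimed the key, False on duplicate."""
    try:
        conn.execute(
            "INSERT INTO idempotency_records (idempotency_key) VALUES (?)",
            (key,),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        return False

first = claim("key-123")   # wins the race
second = claim("key-123")  # rejected by the constraint
```

In PostgreSQL the same shape is `INSERT ... ON CONFLICT (idempotency_key) DO NOTHING`, checking the row count to decide who won.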
Response Handling
- Only successful responses cached (2xx)
- Error responses allow retry
- Response body stored completely
- Status code preserved
- Headers preserved if needed
Event Processing
- Events include idempotency key
- Consumer checks before processing
- Processed events tracked
- At-least-once delivery handled
- Dead letter queue for failures
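A consumer-side sketch of these items, with an in-memory set standing in for the processed-events store (production would use Redis or a DB table, and the check-then-add would need to be atomic):

```python
processed: set[str] = set()  # in production: Redis SET or a DB table

def handle_event(event: dict) -> str:
    """At-least-once delivery guarantees duplicates will arrive, so the
    consumer checks the event's idempotency key before doing any work."""
    key = event["idempotency_key"]
    if key in processed:
        return "skipped"   # duplicate delivery; side effects already applied
    # ... apply side effects here ...
    processed.add(key)
    return "processed"

results = [handle_event({"idempotency_key": "evt-1"}) for _ in range(2)]
# First delivery is processed, the redelivery is skipped
```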
Error Cases
- Missing key handled (process normally or reject)
- Invalid key format rejected
- Storage failures don't break processing
- Timeout during processing handled
Testing
- Duplicate request returns same response
- Different keys process independently
- Race condition tests pass
- TTL expiration verified
- Cache miss falls back to database
Documentation
- Idempotency behavior documented
- Key format documented
- TTL window documented
- Client retry guidance provided
Pre Deployment Resilience
Pre-Deployment Resilience Checklist
Use this checklist before deploying services with resilience patterns.
Circuit Breakers
- Threshold Configuration
  - Failure threshold set appropriately (not too low, not too high)
  - Recovery timeout allows service to actually recover
  - Sliding window size captures representative sample
- Fallback Behavior
  - Every circuit breaker has a defined fallback response
  - Fallbacks return meaningful partial data when possible
  - Fallbacks don't call other services whose circuits are open
- Observability
  - State changes logged with structured logging
  - Metrics exported (Prometheus/Langfuse)
  - Alerts configured for OPEN state
  - Dashboard shows circuit status
- Testing
  - Unit tests for state transitions
  - Integration tests simulate failure scenarios
  - Chaos testing validates circuit behavior under load
Bulkheads
- Tier Assignment
  - Critical operations in Tier 1 (highest priority)
  - Standard operations in Tier 2
  - Optional/background operations in Tier 3
  - No critical path through Tier 3
- Capacity Planning
  - Max concurrent based on downstream capacity
  - Queue sizes prevent memory exhaustion
  - Timeouts shorter than caller's timeout
- Rejection Handling
  - Rejection policy defined per tier
  - HTTP 503 returned with Retry-After header
  - Rejections logged and recorded in metrics
- Testing
  - Load test validates bulkhead isolation
  - Tier 3 failure doesn't affect Tier 1
  - Queue depth monitored under load
Retry Logic
- Error Classification
  - Retryable vs non-retryable errors defined
  - HTTP status codes classified correctly
  - LLM API errors handled specifically
- Backoff Strategy
  - Exponential backoff configured
  - Jitter enabled to prevent thundering herd
  - Max delay caps retry storms
- Limits
  - Max attempts bounded (typically 3-5)
  - Total retry time < caller's timeout
  - Retry budget prevents system overload
- Testing
  - Transient failures recovered automatically
  - Non-retryable errors fail immediately
  - Retry budget depletes under sustained failures
LLM Resilience
- Fallback Chain
  - Primary model defined
  - At least one fallback model configured
  - Semantic cache as final fallback
  - Default response for complete outage
- Token Budget
  - Budget allocation per category
  - Truncation strategy defined
  - Output reserve prevents overflow
- Rate Limiting
  - Client-side rate limiter configured
  - Respects API provider limits
  - Graceful handling of 429 responses
- Cost Control
  - Per-request cost tracking
  - Hourly/daily budget alerts
  - Cost circuit breaker configured
Integration
- Pattern Composition
  - Retry INSIDE circuit breaker
  - Bulkhead wraps retry+circuit
  - Timeout inside all patterns
- Health Endpoints
  - /health returns 200 (doesn't check circuit)
  - /ready reflects degraded state
  - /resilience shows all pattern status
- Configuration
  - All thresholds configurable via env vars
  - Defaults documented
  - Per-environment overrides
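The composition order from the checklist (bulkhead wraps circuit breaker, which wraps retry, with the timeout innermost) can be sketched as a single async helper. The names here are illustrative, not the resilience module's API:

```python
import asyncio

async def call_with_resilience(fn, *, semaphore, breaker_allows,
                               attempts=3, timeout=5.0):
    """Nesting order: bulkhead -> circuit breaker -> retry -> timeout."""
    async with semaphore:                # bulkhead: bounded concurrency
        if not breaker_allows():         # circuit breaker: fail fast when open
            raise RuntimeError("circuit open")
        last_exc = None
        for attempt in range(attempts):  # retry: inside the breaker
            try:
                # timeout: innermost, bounds every individual attempt
                return await asyncio.wait_for(fn(), timeout)
            except Exception as exc:
                last_exc = exc
                await asyncio.sleep(0.01 * 2 ** attempt)  # tiny backoff
        raise last_exc

async def demo():
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient")
        return "ok"

    sem = asyncio.Semaphore(2)
    return await call_with_resilience(flaky, semaphore=sem,
                                      breaker_allows=lambda: True)

result = asyncio.run(demo())  # succeeds on the third attempt
```

Putting retry inside the breaker means a burst of retries against a dead dependency still counts toward the failure threshold instead of hiding it.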
Observability
- Logging
  - Structured logging with trace IDs
  - State changes logged at WARN level
  - Rejections logged with context
- Metrics
  - Circuit state gauge
  - Bulkhead utilization gauge
  - Retry counter
  - Latency histograms
- Alerting
  - Alert when circuit opens
  - Alert when bulkhead consistently full
  - Alert when retry budget exhausted
  - Runbook links in alerts
Documentation
- Architecture
  - Resilience patterns documented in ADR
  - Diagram shows pattern composition
  - Tier assignments documented
- Operations
  - Runbook for circuit open scenarios
  - Runbook for bulkhead exhaustion
  - Manual override procedures documented
- API Documentation
  - 503 responses documented
  - Retry-After header usage documented
  - Degraded response format documented
Final Verification
- Load Test
  - System handles expected load
  - Graceful degradation under 2x load
  - Recovery after load spike
- Chaos Test
  - Dependency failure isolated
  - Recovery automatic when dependency restored
  - No cascading failures
- Security Review
  - Fallback responses don't leak sensitive data
  - Error messages don't expose internals
  - Rate limits prevent abuse
Rate Limiting Checklist
Rate Limiting Implementation Checklist
Planning
- Define rate limits for each endpoint category
- Read endpoints (GET) - higher limits
- Write endpoints (POST/PUT/DELETE) - lower limits
- Authentication endpoints - very strict limits
- Expensive operations (LLM calls, file processing) - strictest limits
- Choose limiting algorithm
- Token Bucket - for bursty traffic patterns
- Sliding Window - for strict quotas
- Fixed Window - for simple requirements
- Determine key strategy
- By IP address (anonymous users)
- By user ID (authenticated users)
- By API key (service accounts)
- By organization (enterprise customers)
Implementation
Backend Setup
- Install dependencies: pip install slowapi redis
- Configure Redis connection: redis_client = Redis.from_url(settings.redis_url)
- Set up SlowAPI or a custom limiter:

limiter = Limiter(key_func=get_user_identifier)
app.add_middleware(SlowAPIMiddleware)
Route Protection
- Add @limiter.limit() to all public endpoints
- Set stricter limits for:
- Login/register endpoints (prevent brute force)
- Password reset (prevent enumeration)
- File upload (prevent abuse)
- LLM/AI operations (cost control)
Response Headers
- Include rate limit headers in all responses:
  - X-RateLimit-Limit - max requests in window
  - X-RateLimit-Remaining - requests remaining
  - X-RateLimit-Reset - Unix timestamp when limit resets
- Include Retry-After header in 429 responses
Error Handling
- Return proper 429 Too Many Requests status
- Include a helpful error message:

{
  "type": "https://api.example.com/problems/rate-limit-exceeded",
  "title": "Rate Limit Exceeded",
  "status": 429,
  "detail": "You have exceeded 100 requests per minute. Please wait 45 seconds.",
  "retry_after": 45
}
Tiered Limits
- Define limits per user tier:

| Tier | Requests/min | Burst |
|---|---|---|
| Anonymous | 10 | 5 |
| Free | 100 | 20 |
| Pro | 1000 | 100 |
| Enterprise | 10000 | 1000 |

- Implement a dynamic limit function:

def get_tier_limit(request: Request) -> str:
    user = request.state.user
    return TIER_LIMITS.get(user.tier, "10/minute")
Distributed Systems
- Use Redis backend (not in-memory)
- Configure Redis connection pooling
- Set appropriate key TTLs
- Use Lua scripts for atomicity
- Handle Redis connection failures gracefully
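Graceful handling of Redis failures usually means failing open: a backend outage degrades to "no rate limiting" rather than rejecting every request. A sketch (names illustrative):

```python
def is_allowed_fail_open(check, key: str) -> bool:
    """Wrap any limiter check so a backend outage does not become
    an API outage. `check` may raise ConnectionError."""
    try:
        return check(key)
    except ConnectionError:
        # Log here in real code, then allow: rate limiting is
        # protection, not correctness
        return True

def healthy_check(key: str) -> bool:
    return False  # backend reachable, caller is over quota

def broken_check(key: str) -> bool:
    raise ConnectionError("redis unavailable")

backend_says = is_allowed_fail_open(healthy_check, "user:1")  # respected
outage_says = is_allowed_fail_open(broken_check, "user:1")    # fail open
```

For abuse-sensitive endpoints (login, password reset) the opposite policy, failing closed, may be the safer trade-off; the point is to choose it deliberately.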
Monitoring
- Log rate limit hits:

logger.warning(
    "Rate limit exceeded",
    extra={
        "user_id": user.id,
        "endpoint": request.url.path,
        "limit": limit,
    },
)

- Track metrics:
- Rate limit hits per endpoint
- Rate limit hits per user
- Average remaining quota
- Set up alerts for:
- Unusual spike in 429 responses
- Single user hitting limits repeatedly
- Redis connection failures
Security Considerations
- Rate limit login endpoints strictly (prevent brute force)
- Rate limit password reset (prevent enumeration)
- Consider IP reputation for anonymous limits
- Don't expose internal rate limit keys
- Use secure Redis connection (TLS)
Documentation
- Document rate limits in OpenAPI/Swagger
- Add rate limit info to API documentation
- Include examples of handling 429 responses
- Explain tier limits for customers
Testing
- Unit test rate limit logic
- Integration test with Redis
- Load test to verify limits work
- Test retry logic in clients
- Test header values are correct
- Test limit reset behavior
Client SDK Recommendations
Document recommended client-side handling:
# Python client example
import time
import httpx
def make_request_with_retry(url: str, max_retries: int = 3):
for attempt in range(max_retries):
response = httpx.get(url)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
time.sleep(retry_after)
continue
return response
raise Exception("Rate limit exceeded after retries")

Rollout Checklist
- Deploy with monitoring enabled
- Start with permissive limits
- Monitor for false positives
- Gradually tighten limits
- Communicate changes to users
- Provide upgrade path for users hitting limits
Examples (3)
Fastapi Rate Limiting
FastAPI Rate Limiting Examples
Complete examples for implementing rate limiting in FastAPI with Redis.
SlowAPI Setup (Recommended for Simple Cases)
Installation
pip install slowapi redis

Basic Configuration
# app/core/rate_limit.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from redis import Redis
# Use Redis backend for distributed rate limiting
redis_client = Redis.from_url("redis://localhost:6379", decode_responses=True)
limiter = Limiter(
key_func=get_remote_address,
storage_uri="redis://localhost:6379",
default_limits=["100/minute"],
)
def setup_rate_limiting(app):
"""Configure rate limiting for the FastAPI app."""
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

Route-Level Limiting
# app/api/v1/routes/analyses.py
from fastapi import APIRouter, Request, Depends
from slowapi import Limiter
from slowapi.util import get_remote_address
router = APIRouter()
limiter = Limiter(key_func=get_remote_address)
@router.post("/analyses")
@limiter.limit("10/minute") # Override default
async def create_analysis(request: Request):
"""Create analysis - stricter limit due to resource cost."""
return {"message": "Analysis created"}
@router.get("/analyses")
@limiter.limit("100/minute")
async def list_analyses(request: Request):
"""List analyses - more permissive."""
return {"analyses": []}
@router.get("/analyses/{id}")
@limiter.limit("200/minute")
async def get_analysis(request: Request, id: str):
"""Get single analysis - most permissive."""
return {"id": id}

User-Based Rate Limiting
# app/core/rate_limit.py
from fastapi import Request
from app.api.deps import get_current_user
def get_user_identifier(request: Request) -> str:
"""Get rate limit key from authenticated user or IP."""
# Try to get user from request state (set by auth middleware)
user = getattr(request.state, "user", None)
if user:
return f"user:{user.id}"
# Fallback to IP for unauthenticated requests
return f"ip:{get_remote_address(request)}"
limiter = Limiter(key_func=get_user_identifier)

Tiered Rate Limits
# app/api/v1/routes/protected.py
from fastapi import APIRouter, Request, Depends
from slowapi import Limiter
router = APIRouter()
def get_tier_limit(request: Request) -> str:
"""Dynamic limit based on user tier."""
user = getattr(request.state, "user", None)
if not user:
return "10/minute" # Anonymous
tier_limits = {
"free": "100/minute",
"pro": "1000/minute",
"enterprise": "10000/minute",
}
return tier_limits.get(user.tier, "100/minute")
@router.post("/generate")
@limiter.limit(get_tier_limit)
async def generate_content(request: Request):
"""Rate limit based on user subscription tier."""
return {"content": "Generated"}

Custom Redis Token Bucket
For more control, implement custom rate limiting:
# app/core/rate_limit.py
import time
from typing import NamedTuple
import redis.asyncio as redis
from fastapi import Request, HTTPException, status
class RateLimitResult(NamedTuple):
allowed: bool
remaining: int
reset_at: float
retry_after: int
class RedisRateLimiter:
"""Custom rate limiter with token bucket algorithm."""
SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
local elapsed = (now - last_refill) / 1000
tokens = math.min(capacity, tokens + elapsed * refill_rate)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return {1, math.floor(tokens), 0}
else
local retry_after = math.ceil((1 - tokens) / refill_rate)
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
return {0, 0, retry_after}
end
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
capacity: int = 100,
refill_rate: float = 10,
):
self.redis = redis.from_url(redis_url)
self.capacity = capacity
self.refill_rate = refill_rate
self._script = None
async def _get_script(self):
if self._script is None:
self._script = self.redis.register_script(self.SCRIPT)
return self._script
async def check(self, key: str) -> RateLimitResult:
"""Check rate limit for a key."""
script = await self._get_script()
now_ms = int(time.time() * 1000)
result = await script(
keys=[f"ratelimit:{key}"],
args=[self.capacity, self.refill_rate, now_ms],
)
reset_at = time.time() + (self.capacity / self.refill_rate)
return RateLimitResult(
allowed=bool(result[0]),
remaining=int(result[1]),
reset_at=reset_at,
retry_after=int(result[2]),
)
# FastAPI Dependency
async def rate_limit_dependency(
request: Request,
limiter: RedisRateLimiter = Depends(get_rate_limiter),
):
"""Dependency that enforces rate limiting."""
# Get identifier
user = getattr(request.state, "user", None)
key = f"user:{user.id}" if user else f"ip:{request.client.host}"
result = await limiter.check(key)
# Set rate limit headers
request.state.rate_limit = result
if not result.allowed:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded",
headers={
"Retry-After": str(result.retry_after),
"X-RateLimit-Limit": str(limiter.capacity),
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(int(result.reset_at)),
},
)
# Middleware to add rate limit headers to all responses
@app.middleware("http")
async def add_rate_limit_headers(request: Request, call_next):
response = await call_next(request)
rate_limit = getattr(request.state, "rate_limit", None)
if rate_limit:
response.headers["X-RateLimit-Limit"] = str(100)
response.headers["X-RateLimit-Remaining"] = str(rate_limit.remaining)
response.headers["X-RateLimit-Reset"] = str(int(rate_limit.reset_at))
return response

Usage in Routes
@router.post("/expensive-operation")
async def expensive_operation(
request: Request,
_: None = Depends(rate_limit_dependency),
):
"""This endpoint is rate limited."""
return {"result": "success"}

Rate Limit by Endpoint Cost
# app/core/rate_limit.py
from functools import wraps
from typing import Callable
class CostBasedLimiter:
"""Rate limiter where different operations cost different tokens."""
def __init__(self, redis_url: str, capacity: int = 1000):
self.limiter = RedisRateLimiter(redis_url, capacity=capacity)
def limit(self, cost: int = 1):
"""Decorator that consumes 'cost' tokens per request."""
def decorator(func: Callable):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
key = get_user_identifier(request)
# Consume one token per unit of cost (note: not atomic -- if the
# limit is hit mid-loop, the tokens already taken are not refunded)
for _ in range(cost):
result = await self.limiter.check(key)
if not result.allowed:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded (operation costs {cost} tokens)",
)
return await func(request, *args, **kwargs)
return wrapper
return decorator
cost_limiter = CostBasedLimiter("redis://localhost:6379")
@router.get("/simple")
@cost_limiter.limit(cost=1) # Cheap operation
async def simple_query(request: Request):
return {"data": "simple"}
@router.post("/generate")
@cost_limiter.limit(cost=10) # Expensive operation
async def generate_content(request: Request):
return {"data": "generated"}
@router.post("/bulk-process")
@cost_limiter.limit(cost=50) # Very expensive
async def bulk_process(request: Request):
return {"data": "processed"}

Testing Rate Limits
# tests/test_rate_limiting.py
import pytest
from httpx import AsyncClient
from app.main import app
@pytest.mark.asyncio
async def test_rate_limit_enforced():
async with AsyncClient(app=app, base_url="http://test") as client:
# Make requests up to limit
for _ in range(10):
response = await client.post("/analyses")
assert response.status_code == 200
# Next request should be rate limited
response = await client.post("/analyses")
assert response.status_code == 429
assert "Retry-After" in response.headers
@pytest.mark.asyncio
async def test_rate_limit_headers():
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.get("/analyses")
assert "X-RateLimit-Limit" in response.headers
assert "X-RateLimit-Remaining" in response.headers
assert "X-RateLimit-Reset" in response.headers

Related Files
- See references/token-bucket-algorithm.md for algorithm details
- See checklists/rate-limiting-checklist.md for implementation checklist
- See SKILL.md for sliding window and fixed window algorithms
Idempotency Examples
Idempotency Implementation Examples
FastAPI Idempotency Middleware
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from typing import Callable
import redis.asyncio as redis
import json
import hashlib
app = FastAPI()
redis_client = redis.from_url("redis://localhost:6379")
IDEMPOTENCY_TTL = 86400 # 24 hours
class IdempotencyMiddleware:
"""Stripe-style idempotency middleware."""
def __init__(self, app: FastAPI):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive)
# Only apply to mutating methods
if request.method not in ("POST", "PUT", "PATCH"):
await self.app(scope, receive, send)
return
# Get idempotency key
idempotency_key = request.headers.get("Idempotency-Key")
if not idempotency_key:
await self.app(scope, receive, send)
return
# Check for cached response
cache_key = f"idem:{request.url.path}:{idempotency_key}"
cached = await redis_client.get(cache_key)
if cached:
cached_response = json.loads(cached)
response = JSONResponse(
content=cached_response["body"],
status_code=cached_response["status"],
headers={"Idempotent-Replayed": "true"},
)
await response(scope, receive, send)
return
# Try to acquire lock
lock_key = f"idem_lock:{request.url.path}:{idempotency_key}"
acquired = await redis_client.set(lock_key, "1", nx=True, ex=60)
if not acquired:
response = JSONResponse(
content={"error": "Request with this idempotency key is being processed"},
status_code=409,
)
await response(scope, receive, send)
return
try:
# Process request and capture response
# (Simplified - real implementation needs response capture)
await self.app(scope, receive, send)
finally:
await redis_client.delete(lock_key)
app.add_middleware(IdempotencyMiddleware)

Database-Backed Idempotency
import hashlib
import json
from datetime import UTC, datetime, timedelta
from typing import Callable

from fastapi import Depends, Header, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy import Column, DateTime, Index, Integer, String, Text, select
from sqlalchemy.ext.asyncio import AsyncSession
class IdempotencyRecord(Base):
"""Track processed idempotency keys."""
__tablename__ = "idempotency_records"
idempotency_key = Column(String(64), primary_key=True)
endpoint = Column(String(256), nullable=False)
request_hash = Column(String(64), nullable=False)
response_body = Column(Text, nullable=True)
response_status = Column(Integer, default=200)
created_at = Column(DateTime, default=lambda: datetime.now(UTC))
expires_at = Column(DateTime, nullable=False)
__table_args__ = (
Index("ix_idempotency_expires", "expires_at"),
Index("ix_idempotency_endpoint_key", "endpoint", "idempotency_key"),
)
async def get_or_create_idempotency(
db: AsyncSession,
idempotency_key: str,
endpoint: str,
request_body: dict,
process_func: Callable,
) -> tuple[dict, int, bool]:
"""
Get cached response or process request idempotently.
Returns:
(response_body, status_code, was_replayed)
"""
# Hash the request to detect mismatched bodies
request_hash = hashlib.sha256(
json.dumps(request_body, sort_keys=True).encode()
).hexdigest()
# Check for existing record
result = await db.execute(
select(IdempotencyRecord).where(
IdempotencyRecord.idempotency_key == idempotency_key,
IdempotencyRecord.endpoint == endpoint,
)
)
existing = result.scalar_one_or_none()
if existing:
# Verify request body matches
if existing.request_hash != request_hash:
raise HTTPException(
status_code=422,
detail="Idempotency key reused with different request body",
)
# Return cached response
return (
json.loads(existing.response_body),
existing.response_status,
True,
)
# Process the request
try:
response_body, status_code = await process_func()
# Store the result
record = IdempotencyRecord(
idempotency_key=idempotency_key,
endpoint=endpoint,
request_hash=request_hash,
response_body=json.dumps(response_body),
response_status=status_code,
expires_at=datetime.now(UTC) + timedelta(hours=24),
)
db.add(record)
await db.commit()
return (response_body, status_code, False)
except Exception:
# Don't cache errors - allow retry
await db.rollback()
raise
# Usage in endpoint
@app.post("/api/orders")
async def create_order(
order: OrderCreate,
idempotency_key: str = Header(..., alias="Idempotency-Key"),
db: AsyncSession = Depends(get_db),
):
async def process():
# Actual order creation logic
new_order = Order(**order.model_dump())
db.add(new_order)
await db.commit()
return {"order_id": str(new_order.id)}, 201
response, status, replayed = await get_or_create_idempotency(
db=db,
idempotency_key=idempotency_key,
endpoint="/api/orders",
request_body=order.model_dump(),
process_func=process,
)
return JSONResponse(
content=response,
status_code=status,
headers={"Idempotent-Replayed": "true"} if replayed else {},
)Event Consumer Idempotency
import json
import logging
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import asyncpg
from aiokafka import AIOKafkaConsumer

logger = logging.getLogger(__name__)
@dataclass
class ProcessedEvent:
event_id: str
event_type: str
processed_at: datetime
result: str | None
class IdempotentEventProcessor:
"""Process events exactly once using database tracking."""
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def setup(self):
"""Create tracking table if not exists."""
async with self.pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS processed_events (
event_id VARCHAR(64) PRIMARY KEY,
event_type VARCHAR(128) NOT NULL,
processed_at TIMESTAMPTZ DEFAULT NOW(),
result TEXT
)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS ix_processed_events_type
ON processed_events (event_type, processed_at)
""")
async def is_processed(self, event_id: str) -> bool:
"""Check if event was already processed."""
async with self.pool.acquire() as conn:
result = await conn.fetchval(
"SELECT 1 FROM processed_events WHERE event_id = $1",
event_id,
)
return result is not None
async def process_event(
self,
event_id: str,
event_type: str,
handler: Callable,
*args,
**kwargs,
    ) -> tuple[Any, bool]:
"""
Process event idempotently.
Returns:
(result, was_duplicate)
"""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Insert the tracking record with ON CONFLICT DO NOTHING.
                # (Catching UniqueViolationError would abort the transaction
                # in PostgreSQL, making the follow-up SELECT fail.)
                status = await conn.execute(
                    """
                    INSERT INTO processed_events (event_id, event_type)
                    VALUES ($1, $2)
                    ON CONFLICT (event_id) DO NOTHING
                    """,
                    event_id,
                    event_type,
                )
                if status == "INSERT 0 0":
                    # Already processed - return the stored result
                    existing = await conn.fetchrow(
                        "SELECT result FROM processed_events WHERE event_id = $1",
                        event_id,
                    )
                    return existing["result"], True
# Process the event
result = await handler(*args, **kwargs)
# Update with result
await conn.execute(
"UPDATE processed_events SET result = $1 WHERE event_id = $2",
json.dumps(result) if result else None,
event_id,
)
return result, False
# Usage with Kafka consumer
async def consume_orders(processor: IdempotentEventProcessor):
consumer = AIOKafkaConsumer("orders", bootstrap_servers="localhost:9092")
await consumer.start()
try:
async for msg in consumer:
event = json.loads(msg.value)
event_id = event["event_id"]
result, was_duplicate = await processor.process_event(
event_id=event_id,
event_type="order.created",
handler=handle_order_created,
order_data=event["data"],
)
if was_duplicate:
logger.info(f"Skipped duplicate event: {event_id}")
else:
logger.info(f"Processed event: {event_id}")
finally:
await consumer.stop()Client-Side Retry with Idempotency
import hashlib
import uuid
from datetime import datetime

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class IdempotentClient:
"""HTTP client with automatic idempotency key handling."""
def __init__(self, base_url: str):
self.client = httpx.AsyncClient(base_url=base_url)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.5, min=0.5, max=10),
reraise=True,
)
async def post_idempotent(
self,
path: str,
json: dict,
idempotency_key: str | None = None,
) -> httpx.Response:
"""
POST with idempotency key.
Args:
path: API endpoint path
json: Request body
idempotency_key: Optional key (auto-generated if not provided)
Returns:
Response from server (may be replayed)
"""
key = idempotency_key or str(uuid.uuid4())
response = await self.client.post(
path,
json=json,
headers={"Idempotency-Key": key},
)
# Don't retry client errors (4xx)
if 400 <= response.status_code < 500:
response.raise_for_status()
return response
    async def create_payment(self, amount: int, currency: str) -> dict:
        """Create payment with idempotency protection."""
        # Use deterministic key based on content
        key = hashlib.sha256(
            f"payment:{amount}:{currency}:{datetime.now().date()}".encode()
        ).hexdigest()
        response = await self.post_idempotent(
            "/api/payments",
            json={"amount": amount, "currency": currency},
            idempotency_key=key,
        )
        payment = response.json()
        # Surface the replay marker from the response header for callers
        payment["_replayed"] = response.headers.get("Idempotent-Replayed") == "true"
        return payment
# Usage
async def main():
client = IdempotentClient("https://api.example.com")
# Safe to retry - same key prevents duplicate
payment = await client.create_payment(amount=1000, currency="USD")
# Check if it was a replay
if payment.get("_replayed"):
print("Payment was already processed")
else:
print(f"New payment created: {payment['id']}")Cleanup Job for Expired Records
import asyncio
from datetime import UTC, datetime, timedelta

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def cleanup_expired_idempotency_records(
db: AsyncSession,
retention_days: int = 7,
batch_size: int = 1000,
):
"""
Delete expired idempotency records in batches.
Run this as a scheduled job (e.g., daily).
"""
cutoff = datetime.now(UTC) - timedelta(days=retention_days)
total_deleted = 0
while True:
# Delete in batches to avoid long locks
        result = await db.execute(
            text("""
                DELETE FROM idempotency_records
                WHERE idempotency_key IN (
                    SELECT idempotency_key FROM idempotency_records
                    WHERE expires_at < :cutoff
                    LIMIT :batch_size
                )
            """),
            {"cutoff": cutoff, "batch_size": batch_size},
        )
await db.commit()
deleted = result.rowcount
total_deleted += deleted
if deleted < batch_size:
break
# Small delay to reduce database load
await asyncio.sleep(0.1)
return total_deleted
# Redis cleanup (handled by TTL, but can force cleanup)
async def cleanup_redis_idempotency_keys(redis_client, pattern: str = "idem:*"):
"""Scan and delete expired keys (if TTL not working)."""
cursor = 0
deleted = 0
while True:
cursor, keys = await redis_client.scan(cursor, match=pattern, count=100)
for key in keys:
ttl = await redis_client.ttl(key)
if ttl == -1: # No TTL set
await redis_client.delete(key)
deleted += 1
if cursor == 0:
break
return deletedTesting Idempotency
import uuid

import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_idempotent_request_returns_same_response():
"""Same idempotency key returns cached response."""
async with AsyncClient(app=app, base_url="http://test") as client:
idempotency_key = str(uuid.uuid4())
# First request
response1 = await client.post(
"/api/orders",
json={"product": "widget", "quantity": 1},
headers={"Idempotency-Key": idempotency_key},
)
assert response1.status_code == 201
order_id = response1.json()["order_id"]
# Second request with same key
response2 = await client.post(
"/api/orders",
json={"product": "widget", "quantity": 1},
headers={"Idempotency-Key": idempotency_key},
)
assert response2.status_code == 201
assert response2.json()["order_id"] == order_id
assert response2.headers.get("Idempotent-Replayed") == "true"
@pytest.mark.asyncio
async def test_different_keys_process_independently():
"""Different idempotency keys process as separate requests."""
async with AsyncClient(app=app, base_url="http://test") as client:
response1 = await client.post(
"/api/orders",
json={"product": "widget", "quantity": 1},
headers={"Idempotency-Key": str(uuid.uuid4())},
)
response2 = await client.post(
"/api/orders",
json={"product": "widget", "quantity": 1},
headers={"Idempotency-Key": str(uuid.uuid4())},
)
assert response1.json()["order_id"] != response2.json()["order_id"]
@pytest.mark.asyncio
async def test_mismatched_body_rejected():
"""Reusing key with different body is rejected."""
async with AsyncClient(app=app, base_url="http://test") as client:
idempotency_key = str(uuid.uuid4())
await client.post(
"/api/orders",
json={"product": "widget", "quantity": 1},
headers={"Idempotency-Key": idempotency_key},
)
response = await client.post(
"/api/orders",
json={"product": "gadget", "quantity": 2}, # Different body!
headers={"Idempotency-Key": idempotency_key},
)
assert response.status_code == 422
assert "different request body" in response.json()["detail"]Orchestkit Workflow Resilience
OrchestKit Workflow Resilience Integration
This example shows how to wire resilience patterns into the OrchestKit analysis pipeline.
Current Architecture
┌────────────────────────────────────────────────────────────────────┐
│ OrchestKit Analysis Pipeline │
├────────────────────────────────────────────────────────────────────┤
│ │
│ Content ─▶ [Supervisor] ─▶ [Agent Fan-Out] ─▶ [Aggregate] ─▶ ... │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Agent Selection A1 A2 A3 (Parallel Analysis) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [findings, findings, findings] │
│ │ │
│ ▼ │
│ [Synthesize] ─▶ [Quality Gate] │
│ │
└────────────────────────────────────────────────────────────────────┘Resilience Layer Integration
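The fan-out/aggregate step in the diagram can be sketched with `asyncio.gather` (a minimal illustration; the agent names and signatures are hypothetical, not OrchestKit's actual API):

```python
import asyncio

# Minimal sketch of the agent fan-out above: run all agents concurrently
# and aggregate only the successful findings - one failing agent must not
# sink the whole batch.
async def fan_out(content: str, agents: dict) -> dict:
    results = await asyncio.gather(
        *(agent(content) for agent in agents.values()),
        return_exceptions=True,
    )
    return {
        name: result
        for name, result in zip(agents.keys(), results)
        if not isinstance(result, Exception)
    }
```

The resilience layer described next wraps each of these agent calls individually, so a tripped circuit surfaces here as just another dropped result.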
┌────────────────────────────────────────────────────────────────────┐
│ With Resilience Patterns │
├────────────────────────────────────────────────────────────────────┤
│ │
│ Content ─▶ [Rate Limiter] ─▶ [Circuit Breaker: LLM] ─▶ ... │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ TIER 1: CRITICAL │ │ │
│ │ │ [Supervisor] [Synthesis] [QualityGate]│ │ │
│ │ │ Bulkhead: 5 concurrent, 300s timeout │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ TIER 2: STANDARD │ │ │
│ │ │ [Tech Comparator] [Impl Planner] │ │ │
│ │ │ [Security Auditor] [Learning Synth] │ │ │
│ │ │ Bulkhead: 3 concurrent, 120s timeout │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ TIER 3: OPTIONAL │ │ │
│ │ │ [Enrichment] [Cache Warm] │ │ │
│ │ │ Bulkhead: 2 concurrent, 60s timeout │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────┘ │
│ │
│ Each agent call wrapped with: │
│ @circuit_breaker(name="agent-{agent_type}") │
│ @bulkhead(tier=agent.tier) │
│ @retry(max_attempts=2, base_delay=1.0) │
│ │
└────────────────────────────────────────────────────────────────────┘Implementation
1. Circuit Breaker Registry
# backend/app/shared/resilience/circuit_breakers.py
from app.core.circuit_breaker import CircuitBreaker
# Per-service circuit breakers
circuit_breakers = {
# LLM APIs
"openai": CircuitBreaker(
name="openai",
failure_threshold=3,
recovery_timeout=60.0,
),
"anthropic": CircuitBreaker(
name="anthropic",
failure_threshold=3,
recovery_timeout=60.0,
),
# External APIs
"youtube": CircuitBreaker(
name="youtube",
failure_threshold=5,
recovery_timeout=120.0,
),
"arxiv": CircuitBreaker(
name="arxiv",
failure_threshold=5,
recovery_timeout=60.0,
),
"github": CircuitBreaker(
name="github",
failure_threshold=5,
recovery_timeout=60.0,
),
# Internal services
"embedding": CircuitBreaker(
name="embedding",
failure_threshold=3,
recovery_timeout=30.0,
),
"database": CircuitBreaker(
name="database",
failure_threshold=2,
recovery_timeout=15.0,
),
}
def get_circuit_breaker(service: str) -> CircuitBreaker:
"""Get or create circuit breaker for service."""
if service not in circuit_breakers:
circuit_breakers[service] = CircuitBreaker(
name=service,
failure_threshold=5,
recovery_timeout=30.0,
)
return circuit_breakers[service]2. Bulkhead Registry
# backend/app/shared/resilience/bulkheads.py
from .bulkhead import Bulkhead, Tier, BulkheadRegistry
# Agent tier assignments
AGENT_TIERS = {
# Tier 1: Critical
"supervisor": Tier.CRITICAL,
"synthesis": Tier.CRITICAL,
"quality_gate": Tier.CRITICAL,
# Tier 2: Standard
"tech_comparator": Tier.STANDARD,
"implementation_planner": Tier.STANDARD,
"security_auditor": Tier.STANDARD,
"learning_synthesizer": Tier.STANDARD,
"codebase_analyzer": Tier.STANDARD,
"prerequisite_mapper": Tier.STANDARD,
"practical_applicator": Tier.STANDARD,
"complexity_assessor": Tier.STANDARD,
# Tier 3: Optional
"enrichment": Tier.OPTIONAL,
"cache_warm": Tier.OPTIONAL,
"metrics": Tier.OPTIONAL,
}
# Create registry
bulkhead_registry = BulkheadRegistry()
# Register bulkheads for each tier
for agent_name, tier in AGENT_TIERS.items():
bulkhead_registry.register(agent_name, tier)
def get_agent_bulkhead(agent_type: str) -> Bulkhead:
"""Get bulkhead for agent type."""
tier = AGENT_TIERS.get(agent_type, Tier.STANDARD)
return bulkhead_registry.get_or_create(agent_type, tier)3. Resilient Agent Wrapper
# backend/app/shared/resilience/agent_wrapper.py
from functools import wraps
from typing import TypeVar, Callable, Awaitable
import structlog
from app.core.circuit_breaker import CircuitOpenError
from .bulkhead import BulkheadFullError
from .bulkheads import get_agent_bulkhead
from .circuit_breakers import get_circuit_breaker
from .retry_handler import retry, MaxRetriesExceededError
logger = structlog.get_logger()
T = TypeVar("T")
def resilient_agent(
agent_type: str,
llm_service: str = "anthropic",
max_retries: int = 2,
):
"""
Decorator to wrap agent execution with resilience patterns.
Applies (in order):
1. Circuit breaker for LLM service
2. Bulkhead for concurrency control
3. Retry for transient failures
Example:
@resilient_agent("tech_comparator", llm_service="anthropic")
async def run_tech_comparator(content: str) -> AgentOutput:
...
"""
def decorator(fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
@wraps(fn)
async def wrapper(*args, **kwargs) -> T:
circuit = get_circuit_breaker(llm_service)
bulkhead = get_agent_bulkhead(agent_type)
# Track for observability
logger.info(
"agent_execution_start",
agent_type=agent_type,
circuit_state=circuit.state.value,
bulkhead_active=bulkhead.stats.current_active,
)
async def execute():
# Retry layer (innermost)
@retry(max_attempts=max_retries, base_delay=1.0)
async def with_retry():
return await fn(*args, **kwargs)
return await with_retry()
try:
# Bulkhead layer
async def with_bulkhead():
return await bulkhead.execute(execute)
# Circuit breaker layer (outermost)
result = await circuit.call(with_bulkhead)
logger.info(
"agent_execution_success",
agent_type=agent_type,
)
return result
except CircuitOpenError as e:
logger.warning(
"agent_circuit_open",
agent_type=agent_type,
time_until_recovery=e.time_until_recovery,
)
raise
except BulkheadFullError as e:
logger.warning(
"agent_bulkhead_full",
agent_type=agent_type,
tier=e.tier.name,
)
raise
except MaxRetriesExceededError as e:
logger.error(
"agent_max_retries_exceeded",
agent_type=agent_type,
attempts=e.attempts,
)
raise
return wrapper
return decorator4. Graph Builder Integration
# backend/app/domains/analysis/workflows/graph_builder.py
from app.shared.resilience.agent_wrapper import resilient_agent
from app.shared.resilience.bulkheads import bulkhead_registry
from app.shared.resilience.circuit_breakers import circuit_breakers
async def build_analysis_graph() -> StateGraph:
"""Build the analysis workflow graph with resilience."""
# Wrap each agent node with resilience
@resilient_agent("supervisor", llm_service="anthropic")
async def supervisor_node(state: AnalysisState) -> AnalysisState:
# Existing supervisor logic
...
@resilient_agent("tech_comparator", llm_service="anthropic")
async def tech_comparator_node(state: AnalysisState) -> AnalysisState:
# Existing agent logic
...
# Build graph with wrapped nodes
graph = StateGraph(AnalysisState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("tech_comparator", tech_comparator_node)
# ... add other nodes
    return graph.compile()

# Health check endpoint exposing resilience state. Register it once at module
# level rather than inside the graph builder, which may run more than once.
@app.get("/health/resilience")
async def resilience_health():
    return {
        "circuit_breakers": {
            name: cb.get_status()
            for name, cb in circuit_breakers.items()
        },
        "bulkheads": bulkhead_registry.get_all_status(),
    }
# backend/app/shared/resilience/llm_chain.py
import json

from app.shared.resilience.llm_fallback_chain import (
    LLMFallbackChain,
    LLMConfig,
)
from app.shared.services.cache.semantic_cache import SemanticCache
# Configure fallback chain for analysis
analysis_llm_chain = LLMFallbackChain(
primary=AnthropicProvider(
LLMConfig(
name="primary",
model="claude-sonnet-4-6",
timeout=60.0,
max_tokens=8192,
)
),
fallbacks=[
OpenAIProvider(
LLMConfig(
name="fallback",
model="gpt-5.2-mini",
timeout=30.0,
max_tokens=4096,
)
),
],
cache=SemanticCache(
redis_client=redis_client,
threshold=0.85,
),
default_response=lambda p: json.dumps({
"status": "degraded",
"message": "Analysis temporarily unavailable",
"partial_results": None,
}),
)6. Observability Integration
# backend/app/shared/resilience/observability.py
from langfuse import Langfuse

from .bulkheads import bulkhead_registry
from .circuit_breakers import circuit_breakers

langfuse = Langfuse()
def on_circuit_state_change(old_state: str, new_state: str, name: str):
"""Record circuit state changes in Langfuse."""
langfuse.event(
name="circuit_breaker_state_change",
metadata={
"circuit_name": name,
"old_state": old_state,
"new_state": new_state,
},
level="WARNING" if new_state == "open" else "INFO",
)
def on_bulkhead_rejection(name: str, tier: str):
"""Record bulkhead rejections."""
langfuse.event(
name="bulkhead_rejection",
metadata={
"bulkhead_name": name,
"tier": tier,
},
level="WARNING",
)
# Wire up callbacks
for name, cb in circuit_breakers.items():
cb._on_state_change = on_circuit_state_change
for name, bh in bulkhead_registry._bulkheads.items():
bh._on_rejection = on_bulkhead_rejectionConfiguration
Environment Variables
# Circuit Breaker
CIRCUIT_FAILURE_THRESHOLD=5
CIRCUIT_RECOVERY_TIMEOUT=30
# Bulkhead Tier 1
BULKHEAD_TIER1_CONCURRENT=5
BULKHEAD_TIER1_QUEUE=10
BULKHEAD_TIER1_TIMEOUT=300
# Bulkhead Tier 2
BULKHEAD_TIER2_CONCURRENT=3
BULKHEAD_TIER2_QUEUE=5
BULKHEAD_TIER2_TIMEOUT=120
# Bulkhead Tier 3
BULKHEAD_TIER3_CONCURRENT=2
BULKHEAD_TIER3_QUEUE=3
BULKHEAD_TIER3_TIMEOUT=60
# Retry
RETRY_MAX_ATTEMPTS=3
RETRY_BASE_DELAY=1.0
RETRY_MAX_DELAY=30.0Monitoring Dashboard
┌────────────────────────────────────────────────────────────────────┐
│ OrchestKit Resilience Dashboard │
├────────────────────────────────────────────────────────────────────┤
│ │
│ CIRCUIT BREAKERS │
│ ┌─────────────┬──────────┬────────────┬───────────────────┐ │
│ │ Service │ State │ Failures │ Next Recovery │ │
│ ├─────────────┼──────────┼────────────┼───────────────────┤ │
│ │ anthropic │ ✅ CLOSED │ 0/5 │ - │ │
│ │ openai │ ✅ CLOSED │ 1/5 │ - │ │
│ │ youtube │ ⚠️ OPEN │ 5/5 │ 45s │ │
│ │ embedding │ ✅ CLOSED │ 0/3 │ - │ │
│ └─────────────┴──────────┴────────────┴───────────────────┘ │
│ │
│ BULKHEADS │
│ ┌─────────────────┬────────┬─────────┬─────────┬──────────┐ │
│ │ Tier │ Active │ Queued │ Max │ Rejected │ │
│ ├─────────────────┼────────┼─────────┼─────────┼──────────┤ │
│ │ 1: Critical │ 3/5 │ 1/10 │ 5 │ 0 │ │
│ │ 2: Standard │ 3/3 ⚠️ │ 4/5 │ 3 │ 12 │ │
│ │ 3: Optional │ 1/2 │ 0/3 │ 2 │ 45 │ │
│ └─────────────────┴────────┴─────────┴─────────┴──────────┘ │
│ │
│ RETRY STATS (Last Hour) │
│ • Total Attempts: 1,234 │
│ • Success Rate: 94.2% │
│ • Retries Used: 187 │
│ • Max Retries Exceeded: 23 │
│ │
└────────────────────────────────────────────────────────────────────┘Testing Resilience
# backend/tests/integration/test_resilience.py
import asyncio

import pytest
async def test_circuit_opens_after_failures():
"""Circuit should open after threshold failures."""
breaker = get_circuit_breaker("test-service")
# Simulate failures
for _ in range(5):
with pytest.raises(ConnectionError):
await breaker.call(failing_function)
# Next call should be rejected
with pytest.raises(CircuitOpenError):
await breaker.call(failing_function)
async def test_bulkhead_rejects_when_full():
"""Bulkhead should reject when queue is full."""
bulkhead = Bulkhead("test", Tier.STANDARD, max_concurrent=1, queue_size=1)
    # Fill the bulkhead (one slot running, one queued)
    task1 = asyncio.create_task(bulkhead.execute(slow_function))
    task2 = asyncio.create_task(bulkhead.execute(slow_function))
    await asyncio.sleep(0)  # let the tasks claim their slots
    # Third should be rejected
    with pytest.raises(BulkheadFullError):
        await bulkhead.execute(slow_function)
async def test_fallback_chain_uses_fallback():
"""Chain should use fallback when primary fails."""
chain = LLMFallbackChain(
primary=FailingProvider(),
fallbacks=[MockProvider()],
)
response = await chain.complete("test prompt")
assert response.is_fallback
assert response.source == ResponseSource.FALLBACKDevops Deployment