Python Backend
Python backend patterns for asyncio, FastAPI, SQLAlchemy 2.0 async, and connection pooling. Use when building async Python services, FastAPI endpoints, database sessions, or connection pool tuning.
Primary Agent: backend-system-architect
Python Backend
Patterns for building production Python backends with asyncio, FastAPI, SQLAlchemy 2.0, and connection pooling. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Asyncio | 3 | HIGH | TaskGroup, structured concurrency, cancellation handling |
| FastAPI | 3 | HIGH | Dependencies, middleware, background tasks |
| SQLAlchemy | 3 | HIGH | Async sessions, relationships, migrations |
| Pooling | 3 | MEDIUM | Database pools, HTTP sessions, tuning |
Total: 12 rules across 4 categories
Quick Start
# FastAPI + SQLAlchemy async session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# Asyncio TaskGroup with timeout
async def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
Asyncio
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
Key Patterns
- TaskGroup replaces `gather()` with structured concurrency and auto-cancellation
- `asyncio.timeout()` context manager for composable timeouts
- Semaphore for concurrency limiting (rate-limit HTTP requests)
- `except*` with ExceptionGroup for handling multiple task failures
- `asyncio.to_thread()` for bridging sync code to async
Key Decisions
| Decision | Recommendation |
|---|---|
| Task spawning | TaskGroup not gather() |
| Timeouts | asyncio.timeout() context manager |
| Concurrency limit | asyncio.Semaphore |
| Sync bridge | asyncio.to_thread() |
| Cancellation | Always re-raise CancelledError |
FastAPI
Production-ready FastAPI patterns for lifespan, dependencies, middleware, and settings.
Key Patterns
- Lifespan with `asynccontextmanager` for startup/shutdown resource management
- Dependency injection with class-based services and `Depends()`
- Middleware stack: CORS -> RequestID -> Timing -> Logging
- Pydantic Settings with `.env` and field validation
- Exception handlers with RFC 7807 Problem Details (see the `ProblemException` sketch below)
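The exception-handler pattern above (and the handler shown in the FastAPI lifespan rule later) assumes a small problem-details exception type. A minimal sketch; the class name matches the later handler, but the fields and defaults here are illustrative:
class ProblemException(Exception):
    """Carries an RFC 7807 Problem Details payload."""
    def __init__(self, status_code: int, title: str, detail: str,
                 type_: str = "about:blank") -> None:
        self.status_code = status_code
        self.title = title
        self.detail = detail
        self.type = type_

    def to_problem_detail(self) -> dict:
        # Body to be served as application/problem+json
        return {
            "type": self.type,
            "title": self.title,
            "status": self.status_code,
            "detail": self.detail,
        }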
Key Decisions
| Decision | Recommendation |
|---|---|
| Lifespan | asynccontextmanager (not events) |
| Dependencies | Class-based services with DI |
| Settings | Pydantic Settings with .env |
| Response | ORJSONResponse for performance |
| Health | Check all critical dependencies |
SQLAlchemy
Async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
Key Patterns
- One AsyncSession per request with `expire_on_commit=False`
- `lazy="raise"` on relationships to prevent accidental N+1 queries
- `selectinload` for eager loading collections
- Repository pattern with generic async CRUD
- Bulk inserts chunked 1000-10000 rows for memory management
Key Decisions
| Decision | Recommendation |
|---|---|
| Session scope | One AsyncSession per request |
| Lazy loading | lazy="raise" + explicit loads |
| Eager loading | selectinload for collections |
| expire_on_commit | False (prevents lazy load errors) |
| Pool | pool_pre_ping=True |
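The Quick Reference lists migrations, but no snippet appears in this skill; a minimal async Alembic env.py fragment might look like the following. This is a sketch: module paths and config keys are assumptions.
# migrations/env.py (fragment): run migrations over an async engine
import asyncio
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from app.db.models import Base  # illustrative import path

target_metadata = Base.metadata

def do_run_migrations(connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_migrations_online() -> None:
    engine = create_async_engine(context.config.get_main_option("sqlalchemy.url"))
    async with engine.connect() as connection:
        # run_sync bridges Alembic's sync migration context onto the async connection
        await connection.run_sync(do_run_migrations)
    await engine.dispose()

asyncio.run(run_migrations_online())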
Pooling
Database and HTTP connection pooling for high-performance async Python applications.
Key Patterns
- SQLAlchemy pool with `pool_size`, `max_overflow`, `pool_pre_ping`
- Direct asyncpg pool with `min_size`/`max_size` and connection lifecycle
- aiohttp session with `TCPConnector` limits and DNS caching
- FastAPI lifespan creating and closing pools at startup/shutdown
- Pool monitoring with Prometheus metrics
Pool Sizing Formula
pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
Anti-Patterns (FORBIDDEN)
# NEVER use gather() for new code - no structured concurrency
# NEVER swallow CancelledError - breaks TaskGroup and timeout
# NEVER block the event loop with sync calls (time.sleep, requests.get)
# NEVER use global mutable state for db sessions
# NEVER skip dependency injection (create sessions in routes)
# NEVER share AsyncSession across tasks (race condition)
# NEVER use sync Session in async code (blocks event loop)
# NEVER create engine/pool per request
# NEVER forget to close pools on shutdown
Related Skills
- `ork:architecture-patterns` - Clean architecture and layer separation
- `ork:async-jobs` - Celery/ARQ for background processing
- `streaming-api-patterns` - SSE/WebSocket async patterns
- `ork:database-patterns` - Database schema design
Rules (12)
Handle asyncio cancellation correctly for proper TaskGroup and timeout behavior — HIGH
Cancellation Handling
Proper Cancellation Pattern
async def cancellable_operation(resource_id: str) -> dict:
"""Properly handle cancellation - NEVER swallow CancelledError."""
resource = await acquire_resource(resource_id)
try:
return await process_resource(resource)
except asyncio.CancelledError:
# Clean up but RE-RAISE - this is critical!
await cleanup_resource(resource)
raise # ALWAYS re-raise CancelledError
finally:
await release_resource(resource)
Anti-Patterns
# NEVER swallow CancelledError - breaks structured concurrency
except asyncio.CancelledError:
return None # BREAKS TaskGroup and timeout!
# NEVER use create_task() without TaskGroup - tasks leak
asyncio.create_task(background_work()) # Fire and forget = leaked task
# NEVER yield inside async context managers (PEP 789)
async with asyncio.timeout(10):
yield item  # DANGEROUS - cancellation bugs!
Key Principles
- Always re-raise `CancelledError` after cleanup
- Breaking this rule breaks TaskGroup and timeout behavior
- Use `try/finally` for guaranteed resource cleanup
- Never use bare `create_task()` outside a TaskGroup
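If the cleanup itself must await, `asyncio.shield()` can protect a short, critical finalizer from a second cancellation arriving mid-cleanup. A sketch with illustrative names:
import asyncio

async def run_with_protected_cleanup(work, close) -> None:
    # work() and close() are illustrative awaitable factories
    try:
        await work()
    except asyncio.CancelledError:
        # shield() lets close() run to completion even if this task
        # is cancelled again while awaiting the cleanup
        await asyncio.shield(close())
        raise  # ALWAYS re-raise CancelledError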
Incorrect — Swallowing CancelledError breaks TaskGroup cancellation propagation:
async def broken_task():
try:
await long_running_operation()
except asyncio.CancelledError:
return None  # BUG: TaskGroup cannot cancel properly!
Correct — Re-raising CancelledError allows proper cleanup and cancellation flow:
async def proper_task():
try:
await long_running_operation()
except asyncio.CancelledError:
await cleanup()
raise  # CRITICAL: Propagate cancellation
Use semaphores and sync bridges to prevent resource exhaustion and event loop blocking — HIGH
Structured Concurrency Patterns
Semaphore for Concurrency Limiting
class RateLimitedClient:
"""HTTP client with concurrency limiting."""
def __init__(self, max_concurrent: int = 10):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._session: aiohttp.ClientSession | None = None
async def fetch(self, url: str) -> dict:
async with self._semaphore: # Limit concurrent requests
async with self._session.get(url) as response:
return await response.json()
async def fetch_many(self, urls: list[str]) -> list[dict]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(self.fetch(url)) for url in urls]
return [t.result() for t in tasks]
Sync-to-Async Bridge
import asyncio
from concurrent.futures import ThreadPoolExecutor
# For CPU-bound or blocking sync code
async def run_blocking_operation(data: bytes) -> dict:
"""Run blocking sync code in thread pool."""
return await asyncio.to_thread(cpu_intensive_parse, data)
# For sync code that needs async context
def sync_caller():
"""Call async code from sync context (not in existing loop)."""
return asyncio.run(async_main())
# For sync code within existing async context
async def wrapper_for_sync_lib():
"""Bridge sync library to async - use with care."""
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, sync_blocking_call)
return result
Key Principles
- Use `asyncio.Semaphore` to prevent connection pool exhaustion
- Use `asyncio.to_thread()` for clean sync-to-async bridging
- Never call `asyncio.run()` inside an existing event loop
- Never block the event loop with `time.sleep()` or `requests.get()`
Incorrect — Unlimited concurrent requests exhaust connection pools and memory:
async def fetch_all(urls: list[str]):
tasks = [fetch_url(url) for url in urls] # 10,000 concurrent!
return await asyncio.gather(*tasks)
Correct — Semaphore limits concurrency to prevent resource exhaustion:
async def fetch_all(urls: list[str], max_concurrent: int = 10):
sem = asyncio.Semaphore(max_concurrent)
async def limited_fetch(url):
async with sem:
return await fetch_url(url)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(limited_fetch(url)) for url in urls]
return [t.result() for t in tasks]
Apply TaskGroup for structured concurrency with automatic cancellation on failure — HIGH
TaskGroup & Timeout Patterns
TaskGroup (Replaces gather)
import asyncio
async def fetch_user_data(user_id: str) -> dict:
"""Fetch user data concurrently - all tasks complete or all cancelled."""
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(user_id))
orders_task = tg.create_task(fetch_orders(user_id))
preferences_task = tg.create_task(fetch_preferences(user_id))
# All tasks guaranteed complete here
return {
"user": user_task.result(),
"orders": orders_task.result(),
"preferences": preferences_task.result(),
}
TaskGroup with Timeout
async def fetch_with_timeout(urls: list[str], timeout_sec: float = 30) -> list[dict]:
"""Fetch all URLs with overall timeout - structured concurrency."""
async with asyncio.timeout(timeout_sec):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
Exception Group Handling
async def process_batch(items: list[dict]) -> tuple[list[dict], list[Exception]]:
"""Process batch, collecting both successes and failures."""
results = []
errors = []
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_item(item)) for item in items]
except* ValueError as eg:
errors.extend(eg.exceptions)
except* Exception as eg:
errors.extend(eg.exceptions)
else:
results = [t.result() for t in tasks]
return results, errors
Key Principles
- Use TaskGroup, not `gather()`, for all new code
- Use `asyncio.timeout()` context manager for deadlines
- Handle ExceptionGroup with `except*` for multiple failures
- TaskGroup auto-cancels remaining tasks when one fails
Incorrect — gather() doesn't cancel remaining tasks when one fails:
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_preferences(user_id),
)  # If one fails, others keep running (resource leak)
Correct — TaskGroup auto-cancels all tasks when any fails (structured concurrency):
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(user_id))
orders_task = tg.create_task(fetch_orders(user_id))
prefs_task = tg.create_task(fetch_preferences(user_id))
# If any fails, all are cancelled automatically
Configure FastAPI lifespan management to prevent resource leaks during startup and shutdown — HIGH
FastAPI Lifespan & Health
Lifespan Context Manager
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan with resource management."""
# Startup
app.state.db_engine = create_async_engine(
settings.database_url, pool_size=5, max_overflow=10,
)
app.state.redis = redis.from_url(settings.redis_url)
# Health check connections
async with app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
await app.state.redis.ping()
yield # Application runs
# Shutdown
await app.state.db_engine.dispose()
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)
Health Check Endpoint
@health_router.get("/health")
async def health_check(request: Request):
checks = {}
try:
async with request.app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
checks["database"] = "healthy"
except Exception as e:
checks["database"] = f"unhealthy: {e}"
try:
await request.app.state.redis.ping()
checks["redis"] = "healthy"
except Exception as e:
checks["redis"] = f"unhealthy: {e}"
status = "healthy" if all(v == "healthy" for v in checks.values()) else "unhealthy"
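# Note: consider returning 503 (e.g. via JSONResponse) when unhealthy so load balancers can react; a suggestion, not part of the original handler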
return {"status": status, "checks": checks}Pydantic Settings
from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
database_url: PostgresDsn
db_pool_size: int = Field(default=5, ge=1, le=20)
redis_url: str = "redis://localhost:6379"
api_key: str = Field(min_length=32)
debug: bool = False
@field_validator("database_url", mode="before")
@classmethod
def validate_database_url(cls, v: str) -> str:
if v and "+asyncpg" not in v:
return v.replace("postgresql://", "postgresql+asyncpg://")
return v
Exception Handlers
@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
return JSONResponse(
status_code=exc.status_code,
content=exc.to_problem_detail(),
media_type="application/problem+json",
)
Response Optimization
from fastapi.responses import ORJSONResponse
app = FastAPI(default_response_class=ORJSONResponse)
Incorrect — Creating resources at module level leads to connection leaks:
# Module-level connection (never closed!)
db_engine = create_async_engine(DATABASE_URL)
app = FastAPI()
@app.on_event("startup") # Deprecated
async def startup():
await db_engine.connect()
Correct — Lifespan context manager ensures proper resource cleanup:
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: create resources
app.state.db_engine = create_async_engine(DATABASE_URL)
yield
# Shutdown: guaranteed cleanup
await app.state.db_engine.dispose()
app = FastAPI(lifespan=lifespan)
Design FastAPI dependency injection for testable and maintainable application architecture — HIGH
FastAPI Dependency Injection
Database Session Dependency
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
"""Yield database session from app state."""
async with AsyncSession(
request.app.state.db_engine,
expire_on_commit=False,
) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Service Dependencies
class AnalysisService:
def __init__(self, db: AsyncSession, embeddings: EmbeddingsService, llm: LLMService):
self.db = db
self.embeddings = embeddings
self.llm = llm
def get_analysis_service(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> AnalysisService:
return AnalysisService(
db=db,
embeddings=request.app.state.embeddings,
llm=request.app.state.llm,
)
@router.post("/analyses")
async def create_analysis(
data: AnalysisCreate,
service: AnalysisService = Depends(get_analysis_service),
):
return await service.create(data)
Cached Settings
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
database_url: str
redis_url: str
api_key: str
model_config = {"env_file": ".env"}
@lru_cache
def get_settings() -> Settings:
return Settings()
Authentication Chain
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
db: AsyncSession = Depends(get_db),
) -> User:
token = credentials.credentials
payload = decode_jwt(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(401, "Invalid credentials")
return user
async def get_admin_user(user: User = Depends(get_current_user)) -> User:
if not user.is_admin:
raise HTTPException(403, "Admin access required")
return user
Incorrect — Manually creating dependencies couples code and breaks testability:
@router.post("/analyses")
async def create_analysis(data: AnalysisCreate, request: Request):
db = AsyncSession(request.app.state.db_engine)
service = AnalysisService(db) # Cannot mock in tests
return await service.create(data)
Correct — Depends() enables dependency injection and easy testing:
@router.post("/analyses")
async def create_analysis(
data: AnalysisCreate,
service: AnalysisService = Depends(get_analysis_service),
):
return await service.create(data)
# Tests can override get_analysis_service with mocks
Order FastAPI middleware correctly since it affects every request in the application — HIGH
FastAPI Middleware Patterns
Request ID Middleware
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class RequestIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
Timing Middleware
import time
from starlette.middleware.base import BaseHTTPMiddleware
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return response
Structured Logging Middleware
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
log = logger.bind(
request_id=getattr(request.state, "request_id", None),
method=request.method,
path=request.url.path,
)
try:
response = await call_next(request)
log.info("request_completed", status_code=response.status_code)
return response
except Exception as exc:
log.exception("request_failed", error=str(exc))
raise
CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["*"],
expose_headers=["X-Request-ID", "X-Response-Time"],
)
Middleware Order
Add middleware in this order (last added runs first):
- CORS (outermost)
- RequestID
- Timing
- Logging (innermost)
Incorrect — Wrong middleware order causes missing request IDs in logs:
app.add_middleware(RequestIDMiddleware)  # Added first = innermost, runs second
app.add_middleware(LoggingMiddleware)    # Added last = outermost, runs first - no request_id yet
# Result: Logs missing request_id
app.add_middleware(LoggingMiddleware) # Runs last, has request_id
app.add_middleware(RequestIDMiddleware) # Runs first, sets request_id
# Last added = outermost = runs first
Configure database connection pools correctly to prevent connection exhaustion under load — MEDIUM
Database Connection Pooling
SQLAlchemy Async Pool Configuration
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20, # Steady-state connections
max_overflow=10, # Burst capacity (total max = 30)
pool_pre_ping=True, # Validate before use
pool_recycle=3600, # Recreate connections after 1 hour
pool_timeout=30, # Wait for connection from pool
connect_args={
"command_timeout": 60,
"server_settings": {"statement_timeout": "60000"},
},
)
Direct asyncpg Pool
import asyncpg
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=10,
max_size=20,
max_inactive_connection_lifetime=300,
command_timeout=60,
timeout=30,
setup=setup_connection,
)
async def setup_connection(conn):
await conn.execute("SET timezone TO 'UTC'")
await conn.execute("SET statement_timeout TO '60s'")Pool Sizing
| Parameter | Small Service | Medium Service | High Load |
|---|---|---|---|
| pool_size | 5-10 | 20-50 | 50-100 |
| max_overflow | 5 | 10-20 | 20-50 |
| pool_pre_ping | True | True | Consider False (skip the per-checkout validation round trip; rely on pool_recycle) |
| pool_recycle | 3600 | 1800 | 900 |
pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
Example: (100 concurrent requests / 3 queries per request) * 1.5 = 50
Incorrect — Creating engine per request exhausts database connections:
@app.get("/users")
async def get_users():
engine = create_async_engine(DATABASE_URL) # New pool every request!
async with AsyncSession(engine) as session:
return await session.execute(select(User))
Correct — Reuse single engine from lifespan for all requests:
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.engine = create_async_engine(DATABASE_URL, pool_size=20)
yield
await app.state.engine.dispose()
@app.get("/users")
async def get_users(request: Request):
async with AsyncSession(request.app.state.engine) as session:
return await session.execute(select(User))
Reuse HTTP sessions with connection pooling to prevent churn and improve throughput — MEDIUM
HTTP Connection Pooling
aiohttp Session Pool
import aiohttp
from aiohttp import TCPConnector
connector = TCPConnector(
limit=100, # Total connections
limit_per_host=20, # Per-host limit
keepalive_timeout=30, # Keep-alive duration
ssl=False,  # Disables certificate verification - pass an ssl.SSLContext for real HTTPS
ttl_dns_cache=300, # DNS cache TTL
)
session = aiohttp.ClientSession(
connector=connector,
timeout=aiohttp.ClientTimeout(
total=30, # Total request timeout
connect=10, # Connection timeout
sock_read=20, # Read timeout
),
)
# IMPORTANT: Reuse session across requests
# Create once at startup, close at shutdown
FastAPI Lifespan Integration
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
app.state.http_session = aiohttp.ClientSession(
connector=TCPConnector(limit=100)
)
yield
await app.state.db_pool.close()
await app.state.http_session.close()
app = FastAPI(lifespan=lifespan)
Key Principles
- Never create ClientSession per request (connection churn)
- Create session at startup, close at shutdown
- Set `limit_per_host` to prevent overwhelming a single service
- Configure DNS caching for high-throughput scenarios
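At shutdown, the shared session should be closed explicitly. A small helper sketch; the brief sleep is a commonly cited workaround to let underlying SSL transports finish closing, so tune or drop it as needed:
import asyncio
import aiohttp

async def close_http_session(session: aiohttp.ClientSession) -> None:
    # Close the shared session once, at application shutdown
    await session.close()
    # Give SSL transports a moment to finish closing (workaround; assumption)
    await asyncio.sleep(0.25)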
Incorrect — Creating session per request causes connection churn and poor performance:
@app.get("/fetch")
async def fetch_data():
async with aiohttp.ClientSession() as session: # New session every request!
async with session.get("https://api.example.com") as resp:
return await resp.json()
Correct — Reuse session from lifespan for connection pooling and keep-alive:
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.http_session = aiohttp.ClientSession(
connector=TCPConnector(limit=100, limit_per_host=20)
)
yield
await app.state.http_session.close()
@app.get("/fetch")
async def fetch_data(request: Request):
async with request.app.state.http_session.get("https://api.example.com") as resp:
return await resp.json()
Monitor connection pools to prevent silent exhaustion and stale connection errors — MEDIUM
Pool Monitoring & Tuning
Pool Monitoring with Prometheus
from prometheus_client import Gauge
pool_size = Gauge("db_pool_size", "Current pool size")
pool_available = Gauge("db_pool_available", "Available connections")
async def collect_pool_metrics(pool: asyncpg.Pool):
pool_size.set(pool.get_size())
pool_available.set(pool.get_idle_size())
Connection Exhaustion Diagnosis
# Symptom: "QueuePool limit reached" or timeouts
from sqlalchemy import event
@event.listens_for(engine.sync_engine, "checkout")
def log_checkout(dbapi_conn, conn_record, conn_proxy):
print(f"Connection checked out: {id(dbapi_conn)}")
@event.listens_for(engine.sync_engine, "checkin")
def log_checkin(dbapi_conn, conn_record):
print(f"Connection returned: {id(dbapi_conn)}")
# Fix: Ensure connections are returned
async with session.begin():
pass  # Connection returned here
Stale Connection Handling
# Fix 1: Enable pool_pre_ping
engine = create_async_engine(url, pool_pre_ping=True)
# Fix 2: Reduce pool_recycle
engine = create_async_engine(url, pool_recycle=900)
# Fix 3: Application-level retry
from sqlalchemy.exc import DBAPIError
async def with_retry(session, operation, max_retries=3):
for attempt in range(max_retries):
try:
return await operation(session)
except DBAPIError as e:
if attempt == max_retries - 1:
raise
await session.rollback()
await asyncio.sleep(2 ** attempt)  # simple exponential backoff between retries
Anti-Patterns
# NEVER create engine/pool per request
async def get_data():
engine = create_async_engine(url) # WRONG - pool per request!
# NEVER create ClientSession per request
async def fetch():
async with aiohttp.ClientSession() as session: # WRONG!
return await session.get(url)
# NEVER forget to close pools on shutdown
# NEVER set pool_size too high (exhausts DB connections)
Incorrect — No pool monitoring leads to silent connection exhaustion:
# No visibility into pool state
engine = create_async_engine(url, pool_size=20)
# App runs slow, no idea why (pool exhausted)
Correct — Prometheus metrics reveal pool exhaustion before timeouts occur:
from prometheus_client import Gauge
pool_size_gauge = Gauge("db_pool_size", "DB pool size")
pool_available_gauge = Gauge("db_pool_available", "Available connections")
async def collect_metrics(pool):
pool_size_gauge.set(pool.get_size())
pool_available_gauge.set(pool.get_idle_size())
# Alert when pool_available approaches 0
Implement repository pattern and bulk operations for maintainable SQLAlchemy data access — HIGH
Repository & Bulk Operations
Generic Repository Pattern
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T", bound=Base)
class AsyncRepository(Generic[T]):
"""Generic async repository for CRUD operations."""
def __init__(self, session: AsyncSession, model: type[T]):
self.session = session
self.model = model
async def get(self, id: UUID) -> T | None:
return await self.session.get(self.model, id)
async def get_many(self, ids: list[UUID]) -> list[T]:
result = await self.session.execute(
select(self.model).where(self.model.id.in_(ids))
)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
instance = self.model(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, instance: T, **kwargs) -> T:
for key, value in kwargs.items():
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, instance: T) -> None:
await self.session.delete(instance)
await self.session.flush()
Bulk Operations
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
"""Efficient bulk insert."""
users = [User(**data) for data in users_data]
db.add_all(users)
await db.flush()
return len(users)
async def bulk_insert_chunked(
db: AsyncSession, items: list[dict], chunk_size: int = 1000,
) -> int:
"""Insert large datasets in chunks to manage memory."""
total = 0
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
db.add_all([Item(**data) for data in chunk])
await db.flush()
total += len(chunk)
return total
Key Principles
- Use `flush()` for ID generation without committing the transaction
- Chunk bulk inserts at 1000-10000 rows for memory management
- One repository per aggregate root
- Transaction boundary at the service layer, not repository
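For instance, a service can own one repository per aggregate root while keeping the transaction boundary at the service layer. A sketch; UserService and User are illustrative:
class UserService:
    def __init__(self, session: AsyncSession):
        self.session = session
        self.users = AsyncRepository(session, User)

    async def register(self, email: str) -> User:
        user = await self.users.create(email=email)
        # Commit at the service/request boundary, not inside the repository
        await self.session.commit()
        return user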
Incorrect — Adding entities one-by-one in loop causes N round trips:
async def create_users(db: AsyncSession, users_data: list[dict]):
for user_data in users_data: # 1000 loop iterations
user = User(**user_data)
db.add(user)
await db.flush()  # 1000 round trips to DB!
Correct — Bulk operations reduce round trips and improve performance:
async def create_users(db: AsyncSession, users_data: list[dict]):
users = [User(**data) for data in users_data]
db.add_all(users) # Single round trip
await db.flush()
Configure eager loading for SQLAlchemy relationships to prevent N+1 query performance problems — HIGH
Relationships & Eager Loading
Eager Loading (Avoid N+1)
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select
async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
"""Load user with orders in single query - NO N+1."""
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
"""Load multiple users with orders efficiently."""
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.limit(limit)
)
return list(result.scalars().all())
Concurrent Queries (Session Safety)
async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
"""Sequential queries with same session (safe)."""
# WRONG: Don't share AsyncSession across tasks
# async with asyncio.TaskGroup() as tg:
# tg.create_task(db.execute(...)) # NOT SAFE
# CORRECT: Sequential queries with same session
user = await db.get(User, user_id)
orders_result = await db.execute(
select(Order).where(Order.user_id == user_id).limit(10)
)
return {"user": user, "recent_orders": list(orders_result.scalars().all())}
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
"""Concurrent queries - each task gets its own session."""
async def fetch_user(user_id: UUID) -> dict:
async with async_session_factory() as session:
user = await session.get(User, user_id)
return {"id": user_id, "email": user.email if user else None}
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
return [t.result() for t in tasks]
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Lazy loading | lazy="raise" + explicit loads | Prevents accidental N+1 |
| Eager loading | selectinload for collections | Better than joinedload for async |
| Concurrent queries | Separate sessions per task | AsyncSession is NOT thread-safe |
Incorrect — Lazy loading causes N+1 query problem (1 + 100 queries):
users = await db.execute(select(User).limit(100))
for user in users.scalars():
print(user.orders) # Separate query for EACH user's orders!
# Total queries: 1 (users) + 100 (orders) = 101 queries
Correct — Eager loading with selectinload fetches all data in 2 queries:
users = await db.execute(
select(User).options(selectinload(User.orders)).limit(100)
)
for user in users.scalars():
print(user.orders) # Already loaded, no extra query
# Total queries: 2 (users + orders in batch)
Configure SQLAlchemy sessions and engines correctly to prevent connection leaks and lazy load errors — HIGH
SQLAlchemy Sessions & Models
Engine and Session Factory
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
# Create async engine - ONE per application
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo=False,
)
# Session factory
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Prevent lazy load issues
autoflush=False,
)
FastAPI Dependency
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)) -> UserResponse:
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(404, "User not found")
return UserResponse.model_validate(user)
Model Definition
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # Prevent accidental lazy loads
)
Key Principles
- One `create_async_engine()` per application
- Set `expire_on_commit=False` to prevent lazy load errors after commit
- Set `pool_pre_ping=True` for production connection validation
- Use `lazy="raise"` on all relationships
Incorrect — expire_on_commit=True causes lazy load errors after commit:
async_session_factory = async_sessionmaker(
engine, expire_on_commit=True # Default, causes issues
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
await session.commit()
print(user.email)  # ERROR: expired attributes trigger an implicit refresh, which fails in async
Correct — expire_on_commit=False allows access after commit:
async_session_factory = async_sessionmaker(
engine, expire_on_commit=False # Prevents lazy load errors
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
await session.commit()
print(user.email)  # Works - object still accessible
References (6)
Eager Loading
Eager Loading Patterns for Async SQLAlchemy
The N+1 Problem in Async
# BAD: N+1 queries - one for users, N for orders
async def get_users_bad(db: AsyncSession) -> list[User]:
result = await db.execute(select(User))
users = result.scalars().all()
for user in users:
# This triggers N additional queries (or raises if lazy="raise")
print(user.orders)
return users
# GOOD: Single query with eager loading
async def get_users_good(db: AsyncSession) -> list[User]:
result = await db.execute(
select(User).options(selectinload(User.orders))
)
users = result.scalars().all()
for user in users:
print(user.orders) # Already loaded
return users
Loading Strategies
selectinload (Recommended for Collections)
from sqlalchemy.orm import selectinload
# Loads orders in separate SELECT ... WHERE user_id IN (...)
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.limit(100)
)
joinedload (Best for Single Relations)
from sqlalchemy.orm import joinedload
# Uses LEFT JOIN - good for to-one relationships
result = await db.execute(
select(Order)
.options(joinedload(Order.user))
.where(Order.status == "pending")
)
Nested Eager Loading
# Load user -> orders -> order_items
result = await db.execute(
select(User)
.options(
selectinload(User.orders).selectinload(Order.items)
)
)
# Load user -> orders and user -> addresses
result = await db.execute(
select(User)
.options(
selectinload(User.orders),
selectinload(User.addresses),
)
)
Configuring Models to Prevent Lazy Load
from sqlalchemy.orm import relationship, Mapped
class User(Base):
__tablename__ = "users"
id: Mapped[UUID] = mapped_column(primary_key=True)
# lazy="raise" prevents accidental lazy loading
# Forces explicit eager loading
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # Raises if accessed without eager load
)
# For optional relationships you might want loaded
profile: Mapped["Profile"] = relationship(
lazy="joined", # Always joined (use sparingly)
)
Strategy Comparison
| Strategy | SQL | Best For | Async Safe |
|---|---|---|---|
| `selectinload` | Separate IN query | Collections | Yes |
| `joinedload` | LEFT JOIN | Single/to-one | Yes |
| `subqueryload` | Subquery | Large collections | Yes |
| `lazy="select"` | On access | Never in async | No |
| `lazy="raise"` | Raises error | Forcing explicit | Yes |
Write-Only Loading for Large Collections
from sqlalchemy.orm import WriteOnlyMapped

class User(Base):
    # For very large collections, avoid loading the whole collection.
    # lazy="dynamic" is legacy and not asyncio-compatible; SQLAlchemy 2.0
    # offers write-only relationships for this case.
    orders: WriteOnlyMapped["Order"] = relationship(
        lazy="write_only",  # Exposes a query-builder, never loads implicitly
    )
# Usage
async def get_recent_orders(db: AsyncSession, user_id: UUID) -> list[Order]:
    user = await db.get(User, user_id)
    # The write-only collection builds a SELECT instead of returning rows
    result = await db.execute(
        user.orders.select().order_by(Order.created_at.desc()).limit(10)
    )
    return list(result.scalars().all())
Fastapi Integration
FastAPI + SQLAlchemy 2.0 Async Integration
Complete Setup
# app/db/session.py
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
from app.core.config import settings
engine = create_async_engine(
settings.DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo=settings.DEBUG,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
Dependency Injection
# app/api/deps.py
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_factory
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Provide database session with automatic cleanup."""
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Route with Database Access
# app/api/v1/routes/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db
from app.models.user import User
from app.schemas.user import UserResponse, UserCreate
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: UUID,
db: AsyncSession = Depends(get_db),
) -> UserResponse:
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status.HTTP_404_NOT_FOUND, "User not found")
return UserResponse.model_validate(user)
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db),
) -> UserResponse:
user = User(**user_in.model_dump())
db.add(user)
await db.flush() # Get ID without committing
await db.refresh(user) # Load any defaults
return UserResponse.model_validate(user)
Service Layer Pattern
# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class UserService:
def __init__(self, db: AsyncSession):
self.db = db
async def get(self, user_id: UUID) -> User | None:
return await self.db.get(User, user_id)
async def get_by_email(self, email: str) -> User | None:
result = await self.db.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def create(self, user_in: UserCreate) -> User:
user = User(**user_in.model_dump())
self.db.add(user)
await self.db.flush()
return user
async def update(self, user: User, user_in: UserUpdate) -> User:
for field, value in user_in.model_dump(exclude_unset=True).items():
setattr(user, field, value)
await self.db.flush()
return user
# Usage in route
@router.post("/")
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db),
):
service = UserService(db)
if await service.get_by_email(user_in.email):
raise HTTPException(400, "Email already registered")
return await service.create(user_in)
Transaction Management
# Explicit transaction control
@router.post("/transfer")
async def transfer_funds(
transfer: TransferRequest,
db: AsyncSession = Depends(get_db),
):
async with db.begin(): # Explicit transaction
from_account = await db.get(Account, transfer.from_id, with_for_update=True)
to_account = await db.get(Account, transfer.to_id, with_for_update=True)
if from_account.balance < transfer.amount:
raise HTTPException(400, "Insufficient funds")
from_account.balance -= transfer.amount
to_account.balance += transfer.amount
# Commits automatically on exit, rolls back on exception
Lifespan with Database
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.db.session import engine
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: verify database connection
async with engine.begin() as conn:
await conn.execute(text("SELECT 1"))
yield
# Shutdown: dispose engine
await engine.dispose()
app = FastAPI(lifespan=lifespan)
Middleware Stack
FastAPI Middleware Stack
Complete guide to middleware ordering and implementation in FastAPI.
Middleware Execution Order
REQUEST RESPONSE
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 1. CORS Middleware (outermost) │
│ - Handles preflight requests │
│ - Adds CORS headers to response │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 2. Request ID Middleware │
│ - Generates/extracts request ID │
│ - Adds to response headers │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 3. Timing Middleware │
│ - Records start time │
│ - Calculates duration │
│ - Adds X-Response-Time header │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 4. Logging Middleware │
│ - Logs request details │
│ - Logs response status │
│ - Uses request ID for correlation │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 5. Authentication Middleware (optional) │
│ - Validates JWT/API key │
│ - Sets request.state.user │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ 6. Rate Limit Middleware │
│ - Checks rate limits │
│ - Returns 429 if exceeded │
│ - Adds rate limit headers │
└──────────────────────────────────────────────────────────────┘
│ ▲
▼ │
┌──────────────────────────────────────────────────────────────┐
│ ROUTE HANDLER │
│ (Your endpoint code) │
└──────────────────────────────────────────────────────────────┘
Note: Middleware added LAST executes FIRST (wraps outer).
Middleware Registration Order
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# 6. Rate Limit (added last, runs closest to route)
app.add_middleware(RateLimitMiddleware)
# 5. Authentication (optional)
app.add_middleware(AuthMiddleware)
# 4. Logging
app.add_middleware(LoggingMiddleware)
# 3. Timing
app.add_middleware(TimingMiddleware)
# 2. Request ID
app.add_middleware(RequestIDMiddleware)
# 1. CORS (added first, runs first/last)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Core Middleware Implementations
Request ID Middleware
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class RequestIDMiddleware(BaseHTTPMiddleware):
"""Add unique request ID to each request."""
async def dispatch(self, request: Request, call_next):
# Get from header or generate new
request_id = request.headers.get(
"X-Request-ID",
str(uuid.uuid4()),
)
# Store in request state
request.state.request_id = request_id
# Call next middleware/route
response = await call_next(request)
# Add to response headers
response.headers["X-Request-ID"] = request_id
return response
Timing Middleware
import time
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class TimingMiddleware(BaseHTTPMiddleware):
"""Track request processing time."""
async def dispatch(self, request: Request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start_time
response.headers["X-Response-Time"] = f"{duration:.4f}s"
# Store for logging middleware
request.state.duration = duration
return response
Logging Middleware
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
"""Structured logging for all requests."""
async def dispatch(self, request: Request, call_next):
# Bind request context
log = logger.bind(
request_id=getattr(request.state, "request_id", None),
method=request.method,
path=request.url.path,
client_ip=request.client.host if request.client else None,
)
try:
response = await call_next(request)
log.info(
"request_completed",
status_code=response.status_code,
duration=getattr(request.state, "duration", None),
)
return response
except Exception as exc:
log.exception("request_failed", error=str(exc))
raise
CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
# Production: specify exact origins
allow_origins=[
"https://app.example.com",
"https://admin.example.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["*"],
expose_headers=[
"X-Request-ID",
"X-Response-Time",
"X-RateLimit-Limit",
"X-RateLimit-Remaining",
],
max_age=600, # Preflight cache 10 minutes
)
Advanced Patterns
Conditional Middleware
class ConditionalMiddleware(BaseHTTPMiddleware):
"""Middleware that only applies to certain paths."""
def __init__(self, app, paths: list[str] | None = None, exclude_paths: list[str] | None = None):
super().__init__(app)
self.paths = paths or []
self.exclude_paths = exclude_paths or []
async def dispatch(self, request: Request, call_next):
path = request.url.path
# Skip excluded paths
if any(path.startswith(p) for p in self.exclude_paths):
return await call_next(request)
# Only apply to specific paths if defined
if self.paths and not any(path.startswith(p) for p in self.paths):
return await call_next(request)
# Apply middleware logic
return await self._apply_middleware(request, call_next)
async def _apply_middleware(self, request: Request, call_next):
# Your middleware logic here
return await call_next(request)
Error Handling Middleware
from fastapi.responses import JSONResponse
class ErrorHandlingMiddleware(BaseHTTPMiddleware):
"""Catch unhandled exceptions and return proper responses."""
async def dispatch(self, request: Request, call_next):
try:
return await call_next(request)
except Exception as exc:
logger.exception(
"unhandled_exception",
request_id=getattr(request.state, "request_id", None),
path=request.url.path,
)
return JSONResponse(
status_code=500,
content={
"type": "https://api.example.com/problems/internal-error",
"title": "Internal Server Error",
"status": 500,
"detail": "An unexpected error occurred",
"request_id": getattr(request.state, "request_id", None),
},
media_type="application/problem+json",
)
Request Body Caching
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class BodyCacheMiddleware(BaseHTTPMiddleware):
"""Cache request body for multiple reads."""
async def dispatch(self, request: Request, call_next):
# Only cache for methods with body
if request.method in ("POST", "PUT", "PATCH"):
body = await request.body()
request.state.body = body
# Create new receive that returns cached body
async def receive():
return {"type": "http.request", "body": body}
request._receive = receive
return await call_next(request)
Performance Considerations
Async vs Sync Middleware
# GOOD: Async middleware (non-blocking)
class AsyncMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
await asyncio.sleep(0) # Async operation
return await call_next(request)
# BAD: Sync operations in async middleware
class BadMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
time.sleep(1) # BLOCKS EVENT LOOP!
return await call_next(request)
Middleware vs Dependencies
| Middleware | Dependencies |
|---|---|
| Runs on ALL requests | Runs on specific routes |
| No access to path params | Access to path params |
| Before route matching | After route matching |
| For cross-cutting concerns | For route-specific logic |
Use Middleware for:
- Request ID generation
- Logging
- CORS
- Timing
Use Dependencies for:
- Authentication
- Rate limiting per endpoint
- Request validation
- Database sessions
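As an example of this split, the per-endpoint rate limiting above fits naturally in a dependency. A sketch assuming a redis.asyncio client on app.state; the key format and limits are illustrative:
from fastapi import Depends, HTTPException, Request

async def rate_limit(request: Request) -> None:
    redis = request.app.state.redis
    key = f"rl:{request.client.host}:{request.url.path}"
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, 60)  # fixed 60-second window
    if count > 100:  # illustrative: 100 requests per window
        raise HTTPException(429, "Too Many Requests")

# Usage: @router.get("/reports", dependencies=[Depends(rate_limit)])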
Testing Middleware
# tests/test_middleware.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_request_id_generated(client: AsyncClient):
response = await client.get("/health")
assert "X-Request-ID" in response.headers
# Should be valid UUID
import uuid
uuid.UUID(response.headers["X-Request-ID"])
@pytest.mark.asyncio
async def test_request_id_preserved(client: AsyncClient):
custom_id = "my-custom-id-123"
response = await client.get(
"/health",
headers={"X-Request-ID": custom_id},
)
assert response.headers["X-Request-ID"] == custom_id
@pytest.mark.asyncio
async def test_timing_header_present(client: AsyncClient):
response = await client.get("/health")
assert "X-Response-Time" in response.headers
# Should be a valid duration
duration = float(response.headers["X-Response-Time"].rstrip("s"))
assert duration > 0
Related Files
- See `examples/fastapi-middleware.md` for complete examples
- See `scripts/middleware-stack.py` for copy-paste template
- See SKILL.md for lifespan and dependencies
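The tests above assume a `client` fixture; one possible `tests/conftest.py` sketch using httpx's ASGITransport (the import path for `app` is an assumption):
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from app.main import app  # illustrative import

@pytest_asyncio.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as c:
        yield c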
Pool Sizing
Connection Pool Sizing Guide
Database Pool Sizing
Formula
pool_size = (requests_per_second * avg_query_duration_seconds) * safety_factor
Where:
- requests_per_second: Peak RPS to handle
- avg_query_duration_seconds: Average time a connection is held
- safety_factor: 1.5 - 2.0 for headroom
Examples
| Scenario | RPS | Query Duration | Pool Size |
|---|---|---|---|
| Low traffic API | 10 | 50ms | 5 |
| Medium service | 100 | 50ms | 10 |
| High traffic API | 500 | 50ms | 50 |
| Slow queries | 100 | 200ms | 40 |
| Batch processing | 50 | 500ms | 50 |
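Expressed directly in code (the table values round up beyond the formula for extra headroom):
import math

def db_pool_size(rps: float, avg_query_seconds: float, safety: float = 1.5) -> int:
    # pool_size = RPS * average connection-hold time * safety factor
    return math.ceil(rps * avg_query_seconds * safety)

# db_pool_size(100, 0.05) -> 8 (table: 10); db_pool_size(500, 0.05) -> 38 (table: 50)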
PostgreSQL Connection Limits
-- Check max connections
SHOW max_connections; -- Default: 100
-- Check current connections
SELECT count(*) FROM pg_stat_activity;
-- Check per-database limit
SELECT datname, numbackends FROM pg_stat_database;
Important: Total connections across all app instances must not exceed max_connections.
Total pool size = pool_size * num_instances
Example:
- max_connections = 100
- Reserve 10 for admin
- 90 available for apps
- 3 app instances
- pool_size per instance = 30
max_overflow Setting
max_overflow allows temporary connections above pool_size.
engine = create_async_engine(
url,
pool_size=20, # Normal capacity
max_overflow=10, # Burst capacity (total max = 30)
)
Guidelines
| Traffic Pattern | max_overflow |
|---|---|
| Steady load | 0-25% of pool_size |
| Bursty traffic | 50-100% of pool_size |
| Unpredictable | 100% of pool_size |
HTTP Connection Pool Sizing
aiohttp TCPConnector
connector = TCPConnector(
limit=100, # Total connections
limit_per_host=20, # Per-host limit
)
Guidelines
limit = num_external_services * connections_per_service * safety_factor
limit_per_host = concurrent_requests_to_host * 1.5
Example
| External Service | Concurrent Calls | Connections |
|---|---|---|
| Payment API | 10 | 15 |
| Email service | 5 | 8 |
| Analytics | 20 | 30 |
| Total | 35 | 53 |
Set limit=60 (with headroom).
Monitoring Pool Health
Key Metrics
# SQLAlchemy
pool = engine.pool
print(f"Size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")
# asyncpg
pool = await asyncpg.create_pool(...)
print(f"Size: {pool.get_size()}")
print(f"Free: {pool.get_idle_size()}")
print(f"Min: {pool.get_min_size()}")
print(f"Max: {pool.get_max_size()}")Alert Thresholds
| Metric | Warning | Critical |
|---|---|---|
| Utilization | > 70% | > 90% |
| Wait time | > 100ms | > 1s |
| Overflow usage | > 50% | > 80% |
| Failed checkouts | > 1/min | > 10/min |
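A utilization ratio for these thresholds can be computed from the asyncpg pool API used above; a minimal sketch:
def pool_utilization(pool) -> float:
    # Fraction of the pool currently checked out (asyncpg API)
    used = pool.get_size() - pool.get_idle_size()
    return used / pool.get_max_size()

# Alert: warning above 0.70, critical above 0.90 (matches the table)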
Semaphore Patterns
Semaphore Patterns for Concurrency Limiting
Basic Rate Limiting
import asyncio
import time

import aiohttp
class RateLimitedClient:
"""HTTP client with concurrency and rate limiting."""
def __init__(
self,
max_concurrent: int = 10,
requests_per_second: float = 100,
):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._rate_limiter = AsyncRateLimiter(requests_per_second)
self._session: aiohttp.ClientSession | None = None
async def __aenter__(self):
self._session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
if self._session:
await self._session.close()
async def get(self, url: str) -> dict:
async with self._semaphore:
await self._rate_limiter.acquire()
async with self._session.get(url) as resp:
return await resp.json()
class AsyncRateLimiter:
"""Token bucket rate limiter."""
def __init__(self, rate: float):
self._rate = rate
self._tokens = rate
self._last_update = time.monotonic()  # monotonic clock; avoids deprecated get_event_loop()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
self._tokens = min(
self._rate,
self._tokens + (now - self._last_update) * self._rate
)
self._last_update = now
if self._tokens < 1:
wait_time = (1 - self._tokens) / self._rate
await asyncio.sleep(wait_time)
self._tokens = 0
else:
self._tokens -= 1
Database Connection Limiting
class DatabasePool:
"""Async database pool with connection limiting."""
def __init__(self, dsn: str, max_connections: int = 20):
self._dsn = dsn
self._semaphore = asyncio.Semaphore(max_connections)
self._pool = None
async def execute(self, query: str, *args) -> list:
async with self._semaphore:
async with self._pool.acquire() as conn:
return await conn.fetch(query, *args)
async def execute_many(self, queries: list[tuple[str, tuple]]) -> list:
"""Execute multiple queries with connection limiting."""
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(self.execute(q, *args))
for q, args in queries
]
return [t.result() for t in tasks]
Bounded Work Queue
from collections.abc import Awaitable, Callable
from typing import Any

class BoundedWorkQueue:
"""Process items with bounded concurrency."""
def __init__(self, max_workers: int = 10):
self._semaphore = asyncio.Semaphore(max_workers)
self._results: list = []
async def process_all(
self,
items: list,
processor: Callable[[Any], Awaitable[Any]],
) -> list:
async def bounded_process(item):
async with self._semaphore:
return await processor(item)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(bounded_process(item)) for item in items]
return [t.result() for t in tasks]
Common Pitfalls
# WRONG: Creating semaphore inside coroutine
async def bad_fetch(url: str):
sem = asyncio.Semaphore(10) # New semaphore each call!
async with sem:
return await fetch(url)
# CORRECT: Share semaphore across calls
SEM = asyncio.Semaphore(10)
async def good_fetch(url: str):
async with SEM:
return await fetch(url)
# WRONG: Semaphore without timeout
async with sem:
await potentially_slow_operation() # Can block other tasks indefinitely
# CORRECT: Semaphore with timeout
async with asyncio.timeout(30):
async with sem:
await potentially_slow_operation()
Taskgroup Patterns
TaskGroup Patterns
Basic TaskGroup Usage
import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar
T = TypeVar("T")
async def fetch_all_concurrent(tasks: list[Coroutine[Any, Any, T]]) -> list[T]:
"""Run all tasks concurrently, fail-fast on any exception."""
async with asyncio.TaskGroup() as tg:
created = [tg.create_task(task) for task in tasks]
return [t.result() for t in created]
TaskGroup with Partial Failure Handling
async def fetch_with_partial_failures(urls: list[str]) -> tuple[list[dict], list[str]]:
"""Collect successes and failures separately."""
successes = []
failures = []
tasks: list = []  # bind before the TaskGroup so except* can inspect it
try:
async with asyncio.TaskGroup() as tg:
tasks = [(url, tg.create_task(fetch(url))) for url in urls]
except* Exception as eg:
# TaskGroup failed - collect individual results
for url, task in tasks:
if task.done():
try:
successes.append(task.result())
except Exception:
failures.append(url)
else:
failures.append(url)
else:
successes = [t.result() for _, t in tasks]
return successes, failures
TaskGroup with Timeout per Task
async def fetch_with_individual_timeouts(
items: list[dict],
timeout_per_item: float = 5.0,
) -> list[dict | None]:
"""Each task has its own timeout."""
async def fetch_with_timeout(item: dict) -> dict | None:
try:
async with asyncio.timeout(timeout_per_item):
return await process_item(item)
except asyncio.TimeoutError:
return None
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_with_timeout(item)) for item in items]
return [t.result() for t in tasks]
TaskGroup vs gather Comparison
| Feature | TaskGroup | gather |
|---|---|---|
| Cancellation | Automatic on first failure | Manual with return_exceptions |
| Exception handling | ExceptionGroup | List or raises first |
| Structured concurrency | Yes | No |
| Task cleanup | Guaranteed | Manual |
| Python version | 3.11+ | 3.4+ |
When to Still Use gather
# Only for Python 3.10 compatibility or return_exceptions pattern
results = await asyncio.gather(
*tasks,
return_exceptions=True # Collect all, don't fail fast
)
# Filter successes and failures
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
Checklists (4)
Async Implementation Checklist
Async Implementation Checklist
Before Starting
- Python version >= 3.11 (for TaskGroup, ExceptionGroup)
- Using async-compatible libraries (aiohttp, asyncpg, aiofiles)
- No blocking sync calls in async code paths
TaskGroup Usage
- Using `async with asyncio.TaskGroup()` instead of `asyncio.gather()`
- All tasks created with `tg.create_task()`
- Handling `ExceptionGroup` with `except*` syntax
- No fire-and-forget `create_task()` outside a TaskGroup
Timeout Handling
- Using `async with asyncio.timeout(seconds)` for deadlines
- Timeout values are reasonable for the operation
- Timeout exceptions are caught and handled appropriately
Cancellation Safety
- Never swallowing `asyncio.CancelledError`
- Always re-raising `CancelledError` after cleanup (see the sketch after this list)
- Resources cleaned up in `finally` blocks
- No `yield` inside `async with` timeout/taskgroup contexts
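A minimal sketch of the cleanup-then-re-raise pattern; `acquire_resource`, `do_work`, and `release_resource` are hypothetical helpers standing in for whatever the task holds:
import asyncio

async def cancellation_safe_worker():
    resource = await acquire_resource()  # hypothetical helper
    try:
        await do_work(resource)  # hypothetical helper
    except asyncio.CancelledError:
        # Optional logging goes here - but never swallow the cancellation
        raise
    finally:
        # Runs on success, error, and cancellation alike
        await release_resource(resource)  # hypothetical helper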
Concurrency Limiting
- Using `asyncio.Semaphore` for rate limiting
- Semaphore created once, not per-call
- Semaphore combined with a timeout to prevent deadlock
- Max concurrency matches resource limits (connections, API rate)
Sync-to-Async Bridge
- Using `asyncio.to_thread()` for blocking sync code (see the sketch after this list)
- Not calling `asyncio.run()` inside an async context
- Thread pool sized appropriately for the workload
- CPU-bound work offloaded to a process pool or worker
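A minimal sketch of the `asyncio.to_thread()` bridge around a blocking file hash; truly CPU-bound work belongs in a process pool instead:
import asyncio
import hashlib

def hash_file_sync(path: str) -> str:
    # Blocking I/O: reads the whole file, then hashes it
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

async def hash_file(path: str) -> str:
    # Offload the blocking call to a worker thread so the loop stays responsive
    return await asyncio.to_thread(hash_file_sync, path)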
Testing
- Using `pytest-asyncio` for async tests
- Tests use the `@pytest.mark.asyncio` decorator
- Mocked async functions return coroutines or use `AsyncMock`
- Timeouts added to prevent hanging tests (see the sketch after this list)
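A minimal test sketch, assuming `pytest-asyncio` is installed; the client object is a stand-in for a hypothetical async service:
import asyncio
from unittest.mock import AsyncMock

import pytest

@pytest.mark.asyncio
async def test_get_user_is_awaited():
    client = AsyncMock()  # stands in for a hypothetical async client
    client.get_user.return_value = {"id": 1}
    async with asyncio.timeout(5):  # guard against a hanging await
        result = await client.get_user(1)
    assert result == {"id": 1}
    client.get_user.assert_awaited_once_with(1)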
Performance
- Connection pools are shared (not created per-request)
- HTTP sessions reused across requests
- Batch operations where possible
- Monitoring for event loop blocking (> 100 ms); see the sketch after this list
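One way to detect event loop blocking is a lag probe: sleep for a fixed interval and measure the overshoot. A minimal sketch (the 100 ms threshold mirrors the checklist item above):
import asyncio
import time

async def monitor_loop_lag(threshold: float = 0.1, interval: float = 1.0) -> None:
    # If the loop was blocked, sleep(interval) returns late; the overshoot is the lag
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > threshold:
            print(f"event loop blocked ~{lag * 1000:.0f}ms")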
Connection Pool Checklist
Database Pool Configuration
- Using `create_async_engine` (not the sync engine)
- `pool_size` matches expected concurrent load
- `max_overflow` allows for burst traffic
- `pool_pre_ping=True` for connection validation
- `pool_recycle` set to prevent stale connections
- `pool_timeout` set with a reasonable wait time
Connection Health
- Query timeouts configured (`statement_timeout`)
- Connection timeouts configured (`connect_timeout`)
- Dead connection detection enabled
- Automatic reconnection on failure
HTTP Session Configuration
- `aiohttp.ClientSession` created once and reused
- `TCPConnector` with appropriate limits
- Per-host connection limit set
- DNS cache TTL configured
- SSL context configured if needed
Lifecycle Management
- Pools created at application startup
- Pools closed at application shutdown
- Graceful shutdown waits for active connections
- Context managers used for connection checkout
Monitoring
- Pool size metrics exposed
- Available connections tracked
- Wait time for connections monitored
- Connection errors logged
- Alerts for pool exhaustion
Testing
- Pool behavior tested under load
- Connection failure scenarios tested
- Timeout handling verified
- Pool recovery after failure tested
Performance
- Pool sizing validated with load tests
- Connection reuse verified (no per-request pools)
- Latency impact of pool_pre_ping measured
- max_overflow tuned for burst patterns
FastAPI Production Checklist
Application Setup
Lifespan Management
- Use an `asynccontextmanager` lifespan (not deprecated startup/shutdown events):
  @asynccontextmanager
  async def lifespan(app: FastAPI):
      # startup
      yield
      # shutdown
- Initialize all connections in lifespan:
  - Database engine with connection pool
  - Redis client
  - Task queue (ARQ/Celery)
  - LLM clients
- Clean up all resources on shutdown (reverse order)
- Verify connections on startup (ping/SELECT 1)
- Handle graceful shutdown with active connections
Configuration
- Use Pydantic Settings for configuration:
  class Settings(BaseSettings):
      model_config = SettingsConfigDict(env_file=".env")
- Validate settings on startup
- Use `@lru_cache` for a settings singleton
- Don't hardcode secrets
Response Class
- Use ORJSONResponse for better performance:
app = FastAPI(default_response_class=ORJSONResponse)
Middleware Stack
Order (add in reverse)
- Rate Limiting (innermost)
- Authentication
- Logging
- Timing
- Request ID
- CORS (outermost)
Required Middleware
- CORS: configure for your domains:
  app.add_middleware(
      CORSMiddleware,
      allow_origins=settings.cors_origins,
      allow_credentials=True,
      allow_methods=["*"],
      allow_headers=["*"],
  )
- Request ID: generate a unique ID per request (see the sketch after this list)
- Timing: track response time
- Logging: structured request logging
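A minimal sketch of combined request-ID and timing middleware; the header names are illustrative conventions, not a standard:
import time
import uuid

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def request_id_and_timing(request: Request, call_next):
    # Reuse an inbound ID if a proxy already assigned one
    request_id = request.headers.get("X-Request-ID") or uuid.uuid4().hex
    start = time.perf_counter()
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time-Ms"] = f"{(time.perf_counter() - start) * 1000:.1f}"
    return response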
Dependency Injection
Database Session
- Use async session with proper transaction handling:
  async def get_db(request: Request):
      async with AsyncSession(request.app.state.db_engine) as session:
          try:
              yield session
              await session.commit()
          except Exception:
              await session.rollback()
              raise
Service Dependencies
- Inject dependencies, don't instantiate in routes
- Use factories for complex dependencies
- Consider dependency caching for expensive operations
Error Handling
Exception Handlers
- Register handler for custom exceptions
- Register handler for validation errors
- Register handler for database errors
- Register catch-all for unexpected errors
RFC 9457 Problem Details
- Return `application/problem+json` for errors (see the sketch after this list)
- Include required fields: type, status
- Include a trace ID in error responses
- Don't leak internal details in production
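A minimal sketch of a Problem Details handler; `DomainError` and the `type` URI are hypothetical placeholders:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

class DomainError(Exception):  # hypothetical application exception
    status_code = 422

@app.exception_handler(DomainError)
async def domain_error_handler(request: Request, exc: DomainError):
    return JSONResponse(
        status_code=exc.status_code,
        media_type="application/problem+json",
        content={
            "type": "https://example.com/errors/domain",  # hypothetical URI
            "title": "Domain rule violated",
            "status": exc.status_code,
            "detail": str(exc),
            "instance": str(request.url.path),
        },
    )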
Security
Authentication
- Use HTTPBearer for JWT: `security = HTTPBearer()`
- Validate tokens in a dependency (see the sketch after this list)
- Set appropriate token expiry (15-minute access, 7-day refresh)
- Use bcrypt for password hashing (cost >= 12)
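A minimal token-validation dependency, assuming PyJWT and an HS256 shared secret; adapt the algorithm and claims to your setup:
import jwt  # PyJWT, assumed here
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()
SECRET_KEY = "change-me"  # load from settings in real code

async def get_current_user_id(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return payload["sub"]  # assumes the user id lives in the sub claim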
Input Validation
- Use Pydantic models for all request bodies
- Validate path/query parameters
- Sanitize user input
- Use `Field()` constraints
Headers
- Add security headers (see the sketch after this list):
  - `X-Content-Type-Options: nosniff`
  - `X-Frame-Options: DENY`
  - `Strict-Transport-Security`
- Use HTTPS in production
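A minimal middleware sketch that sets the headers above; tune the HSTS max-age to your rollout:
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains"
    return response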
Performance
Async Best Practices
- Use async database driver (asyncpg)
- Use async Redis client
- Don't block event loop with sync operations
- Use `run_in_executor` for blocking I/O if needed
Connection Pooling
- Configure database pool size:
  create_async_engine(
      url,
      pool_size=5,
      max_overflow=10,
      pool_pre_ping=True,
  )
- Configure Redis max connections
Caching
- Cache expensive computations
- Use proper TTLs
- Implement cache invalidation
Observability
Logging
- Use structured logging (structlog)
- Include request ID in all logs
- Log at appropriate levels
- Don't log sensitive data
Metrics
- Track request latency
- Track error rates
- Track cache hit rates
- Track queue depth
Health Checks
- Implement a `/health` endpoint
- Check all dependencies
- Return proper status codes:
  - 200 for healthy
  - 503 for unhealthy
Documentation
OpenAPI
- Add descriptions to all routes
- Document all response codes
- Include request/response examples
- Tag routes appropriately
API Info
- Set app title and description
- Set version
- Configure docs URL
Testing
Test Configuration
- Use test database
- Mock external services
- Use pytest-asyncio
Test Coverage
- Unit tests for business logic
- Integration tests for routes
- Test error handling
- Test authentication
Deployment
Docker
- Multi-stage build
- Non-root user
- Health check in Dockerfile
Kubernetes
- Readiness probe
- Liveness probe
- Resource limits
- Horizontal pod autoscaler
Environment Variables
- All secrets from environment
- Different configs per environment
- Validate required variables on startup
Quick Reference
| Concern | Solution |
|---|---|
| Startup/Shutdown | asynccontextmanager lifespan |
| Config | Pydantic Settings |
| DB Session | Dependency with context manager |
| Auth | HTTPBearer + JWT validation |
| Errors | RFC 9457 Problem Details |
| Logging | Structlog with request ID |
| Response | ORJSONResponse |
| Middleware | CORS → RequestID → Timing → Logging → Auth → RateLimit |
SQLAlchemy 2.0 Async Checklist
Engine Configuration
- Using `create_async_engine` (not `create_engine`)
- Connection string uses an async driver: `postgresql+asyncpg://`
- `pool_pre_ping=True` enabled for connection validation
- `pool_size` and `max_overflow` set appropriately
- `pool_recycle` set to prevent stale connections (e.g., 3600)
Session Factory
- Using `async_sessionmaker` (not `sessionmaker`)
- `expire_on_commit=False` to prevent lazy-load issues
- `autoflush=False` for explicit control (optional)
- Single factory instance shared across the application
FastAPI Integration
- Database dependency uses an `async with` context manager
- Session yielded to routes, not returned
- Commit on success, rollback on exception
- Session properly closed after the request
Model Definition
- Using `Mapped[]` type hints (SQLAlchemy 2.0 style)
- `mapped_column()` instead of `Column()`
- Relationships have an explicit `lazy=` parameter
- `lazy="raise"` to prevent accidental lazy loads
Eager Loading
- Using `selectinload()` for collections
- Using `joinedload()` for single relationships
- All needed relationships loaded in the query
- No N+1 queries in response serialization
Bulk Operations
- Using `add_all()` for multiple inserts
- Chunking large inserts (1000-10000 rows per batch)
- Using `flush()` between chunks to bound memory
- Batch size tuned for performance
Concurrency
- One `AsyncSession` per task/request, never shared (see the sketch after this list)
- Not using `scoped_session` with async
- Concurrent queries use separate sessions
- Connection pool sized for the concurrent load
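A minimal sketch of session-per-task fan-out; it assumes the `async_session_factory` and `User` model from the examples later in this document:
import asyncio
from uuid import UUID

from sqlalchemy import select

async def load_user(user_id: UUID):
    # Each concurrent task opens its own session from the shared factory
    async with async_session_factory() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

async def load_users(user_ids: list[UUID]):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(load_user(uid)) for uid in user_ids]
    return [t.result() for t in tasks]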
Error Handling
- Proper exception handling around DB operations
- Rollback on errors before re-raising
- Connection errors handled gracefully
- Retry logic for transient failures
Testing
- Using a test database (not production)
- Transactions rolled back after each test (see the sketch after this list)
- Async test fixtures with `pytest-asyncio`
- Database state isolated between tests
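A minimal rollback-isolation fixture, assuming `pytest-asyncio` and the `engine` from the examples later in this document; each test runs inside a transaction that is rolled back afterwards:
import pytest
from sqlalchemy.ext.asyncio import AsyncSession

@pytest.fixture
async def db_session():
    async with engine.connect() as conn:
        trans = await conn.begin()
        session = AsyncSession(bind=conn, expire_on_commit=False)
        try:
            yield session
        finally:
            await session.close()
            await trans.rollback()  # undo everything the test changed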
Performance
- Indexes on frequently queried columns
- `EXPLAIN ANALYZE` run on slow queries
- Connection pool metrics monitored
- Query execution time logged
Examples (4)
Asyncio Advanced Examples
Example 1: Concurrent API Fetching with TaskGroup
import asyncio
import aiohttp
from dataclasses import dataclass
@dataclass
class UserData:
profile: dict
orders: list
preferences: dict
async def fetch_user_dashboard(user_id: str) -> UserData:
"""Fetch all user data concurrently with proper error handling."""
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
profile_task = tg.create_task(
fetch_json(session, f"/api/users/{user_id}")
)
orders_task = tg.create_task(
fetch_json(session, f"/api/users/{user_id}/orders")
)
prefs_task = tg.create_task(
fetch_json(session, f"/api/users/{user_id}/preferences")
)
return UserData(
profile=profile_task.result(),
orders=orders_task.result(),
preferences=prefs_task.result(),
)
async def fetch_json(session: aiohttp.ClientSession, path: str) -> dict:
async with session.get(f"https://api.example.com{path}") as resp:
        return await resp.json()
Example 2: Rate-Limited Bulk Processing
import asyncio
from typing import TypeVar, Callable, Awaitable
T = TypeVar("T")
R = TypeVar("R")
class BulkProcessor:
"""Process items with concurrency and rate limiting."""
def __init__(self, max_concurrent: int = 10, timeout: float = 30.0):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.timeout = timeout
async def process_all(
self,
items: list[T],
processor: Callable[[T], Awaitable[R]],
) -> tuple[list[R], list[tuple[T, Exception]]]:
"""Process all items, returning successes and failures."""
successes: list[R] = []
failures: list[tuple[T, Exception]] = []
async def process_one(item: T) -> tuple[T, R | Exception]:
async with self.semaphore:
try:
async with asyncio.timeout(self.timeout):
result = await processor(item)
return (item, result)
except Exception as e:
return (item, e)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_one(item)) for item in items]
for task in tasks:
item, result = task.result()
if isinstance(result, Exception):
failures.append((item, result))
else:
successes.append(result)
return successes, failures
# Usage
processor = BulkProcessor(max_concurrent=5, timeout=10.0)
successes, failures = await processor.process_all(
items=urls,
processor=fetch_and_parse,
)
print(f"Processed {len(successes)}, failed {len(failures)}")Example 3: Graceful Shutdown with Cleanup
import asyncio
import signal
from contextlib import asynccontextmanager
class AsyncService:
def __init__(self):
self._shutdown_event = asyncio.Event()
self._tasks: set[asyncio.Task] = set()
async def start(self):
"""Start service with graceful shutdown handling."""
loop = asyncio.get_running_loop()
# Handle signals
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(self.shutdown())
)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(self.worker_loop())
tg.create_task(self.health_check_loop())
tg.create_task(self._wait_for_shutdown())
except* asyncio.CancelledError:
pass # Expected on shutdown
async def _wait_for_shutdown(self):
await self._shutdown_event.wait()
raise asyncio.CancelledError()
async def shutdown(self):
"""Graceful shutdown - complete current work."""
print("Shutting down...")
self._shutdown_event.set()
async def worker_loop(self):
while not self._shutdown_event.is_set():
try:
async with asyncio.timeout(5.0):
await self.process_next_job()
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
print("Worker cancelled, finishing current job...")
raise
async def health_check_loop(self):
while not self._shutdown_event.is_set():
await asyncio.sleep(30)
            await self.health_check()
Example 4: Exception Group Handling
import asyncio
async def fetch_from_multiple_sources(query: str) -> list[dict]:
"""Try multiple sources, collect partial results on failures."""
sources = ["source_a", "source_b", "source_c"]
results = []
errors = []
try:
async with asyncio.TaskGroup() as tg:
tasks = {
source: tg.create_task(fetch_from_source(source, query))
for source in sources
}
    except* ConnectionError as eg:
        # Some sources failed with connection errors
        errors.extend(eg.exceptions)
    except* TimeoutError as eg:
        errors.extend(eg.exceptions)
    # Collect successes once, outside the handlers: both except* branches
    # can run for a single ExceptionGroup, and a cancelled sibling would
    # make task.exception() raise CancelledError.
    for task in tasks.values():
        if task.done() and not task.cancelled() and task.exception() is None:
            results.append(task.result())
if errors:
print(f"Partial results: {len(results)} succeeded, {len(errors)} failed")
    return results
Example 5: Async Context Manager with Resource Pool
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator
class ConnectionPool:
"""Async connection pool with proper lifecycle management."""
    def __init__(self, max_size: int = 10):
        self._max_size = max_size
        self._semaphore = asyncio.Semaphore(max_size)
        self._connections: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._dsn: str | None = None
        self._initialized = False
    async def initialize(self, dsn: str):
        """Pre-create connections."""
        self._dsn = dsn  # remembered so unhealthy connections can be replaced
        for _ in range(self._max_size):
            conn = await create_connection(dsn)
            await self._connections.put(conn)
        self._initialized = True
@asynccontextmanager
async def acquire(self) -> AsyncIterator[Connection]:
"""Get a connection from the pool."""
async with self._semaphore:
conn = await self._connections.get()
try:
yield conn
finally:
# Return connection to pool
if conn.is_healthy():
await self._connections.put(conn)
else:
# Replace unhealthy connection
new_conn = await create_connection(self._dsn)
await self._connections.put(new_conn)
async def close(self):
"""Close all connections."""
while not self._connections.empty():
conn = await self._connections.get()
await conn.close()
# Usage
pool = ConnectionPool(max_size=20)
await pool.initialize("postgres://...")
async with pool.acquire() as conn:
    result = await conn.execute("SELECT * FROM users")
Connection Pooling Examples
SQLAlchemy Async Pool with FastAPI
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.pool import AsyncAdaptedQueuePool
# Configure engine with production settings
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost:5432/db",
# Pool sizing for medium-load service
pool_size=20,
max_overflow=10,
# Connection health
pool_pre_ping=True,
pool_recycle=1800, # 30 minutes
# Timeouts
pool_timeout=30,
# Use queue pool (default for async)
poolclass=AsyncAdaptedQueuePool,
# Connection settings
connect_args={
"command_timeout": 60,
"server_settings": {
"statement_timeout": "60000",
"lock_timeout": "10000",
},
},
# Echo SQL in development
echo=False,
)
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage connection pool lifecycle."""
# Startup: pool is lazy-initialized on first use
yield
# Shutdown: close all connections
await engine.dispose()
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Dependency for database sessions."""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
"""Connection checked out, used, returned automatically."""
result = await db.execute(
select(User).where(User.id == user_id)
)
    return result.scalar_one_or_none()
Direct asyncpg Pool
import asyncpg
import json
from contextlib import asynccontextmanager
from fastapi import FastAPI
pool: asyncpg.Pool | None = None
async def setup_connection(conn: asyncpg.Connection):
"""Called for each new connection in pool."""
await conn.execute("SET timezone TO 'UTC'")
await conn.execute("SET statement_timeout TO '60s'")
await conn.execute("SET lock_timeout TO '10s'")
# Register custom type codecs
await conn.set_type_codec(
'json',
encoder=json.dumps,
decoder=json.loads,
schema='pg_catalog'
)
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
# Create pool
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost:5432/db",
# Pool sizing
min_size=10,
max_size=20,
# Connection lifecycle
max_inactive_connection_lifetime=300, # 5 min idle timeout
max_queries=50000, # Recreate after N queries
# Timeouts
command_timeout=60,
timeout=30,
# Setup hook
setup=setup_connection,
)
yield
# Close pool
await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/data/{id}")
async def get_data(id: int):
"""Acquire connection from pool."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM data WHERE id = $1", id
)
return dict(row) if row else None
@app.post("/batch")
async def batch_operation(items: list[dict]):
"""Use transaction for batch operations."""
async with pool.acquire() as conn:
async with conn.transaction():
for item in items:
await conn.execute(
"INSERT INTO items (name, value) VALUES ($1, $2)",
item["name"], item["value"]
)
return {"inserted": len(items)}aiohttp Session with Connection Pool
import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from contextlib import asynccontextmanager
from fastapi import FastAPI
http_session: aiohttp.ClientSession | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global http_session
# Configure connector
connector = TCPConnector(
# Connection limits
limit=100, # Total connections
limit_per_host=20, # Per-host limit
# Keep-alive
keepalive_timeout=30,
enable_cleanup_closed=True,
# DNS caching
ttl_dns_cache=300,
use_dns_cache=True,
# SSL (for production, use proper SSL context)
ssl=False,
)
# Configure timeouts
timeout = ClientTimeout(
total=30, # Total request timeout
connect=10, # Connection timeout
sock_read=20, # Read timeout
sock_connect=5, # Socket connect timeout
)
# Create session
http_session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={"User-Agent": "MyApp/1.0"},
)
yield
# Close session (releases all connections)
await http_session.close()
app = FastAPI(lifespan=lifespan)
@app.get("/external/{resource}")
async def fetch_external(resource: str):
"""Reuse connection from pool."""
async with http_session.get(
f"https://api.example.com/{resource}"
) as response:
return await response.json()
@app.post("/webhook")
async def send_webhook(payload: dict):
"""POST with connection reuse."""
async with http_session.post(
"https://hooks.example.com/incoming",
json=payload,
headers={"X-Webhook-Token": "secret"},
) as response:
return {"status": response.status}Pool Health Monitoring
import asyncio
import time
from fastapi.responses import JSONResponse
from prometheus_client import Gauge, Counter, Histogram
from sqlalchemy import event, text
# Prometheus metrics
pool_size_gauge = Gauge(
"db_pool_size", "Current pool size"
)
pool_checked_out = Gauge(
"db_pool_checked_out", "Connections in use"
)
pool_overflow = Gauge(
"db_pool_overflow", "Overflow connections"
)
pool_checkout_time = Histogram(
"db_pool_checkout_seconds",
"Time to acquire connection",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
pool_errors = Counter(
"db_pool_errors_total",
"Pool errors",
["type"]
)
def setup_pool_monitoring(engine):
"""Register SQLAlchemy pool event listeners."""
@event.listens_for(engine.sync_engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
connection_record.info["checkout_time"] = asyncio.get_event_loop().time()
@event.listens_for(engine.sync_engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
if "checkout_time" in connection_record.info:
duration = asyncio.get_event_loop().time() - connection_record.info["checkout_time"]
pool_checkout_time.observe(duration)
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, connection_record):
pool_size_gauge.inc()
@event.listens_for(engine.sync_engine, "close")
def on_close(dbapi_conn, connection_record):
pool_size_gauge.dec()
async def collect_pool_metrics(engine):
"""Collect pool metrics periodically."""
while True:
pool = engine.pool
pool_size_gauge.set(pool.size())
pool_checked_out.set(pool.checkedout())
pool_overflow.set(pool.overflow())
await asyncio.sleep(10)
# Health check endpoint
@app.get("/health/db")
async def db_health():
"""Check database connectivity."""
try:
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
return {
"status": "healthy",
"pool_size": engine.pool.size(),
"checked_out": engine.pool.checkedout(),
"overflow": engine.pool.overflow(),
}
    except Exception as e:
        pool_errors.labels(type="health_check").inc()
        # FastAPI does not support Flask-style (body, status) tuples;
        # return an explicit JSONResponse with the 503 status
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "error": str(e)},
        )
Connection Retry with Backoff
import asyncio
from functools import wraps
from sqlalchemy.exc import OperationalError, DisconnectionError
def with_db_retry(max_retries: int = 3, base_delay: float = 0.1):
"""Decorator for database operations with retry."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except (OperationalError, DisconnectionError) as e:
last_error = e
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)
continue
raise
raise last_error
return wrapper
return decorator
@with_db_retry(max_retries=3)
async def get_user_with_retry(db: AsyncSession, user_id: int):
"""Database operation with automatic retry on connection errors."""
result = await db.execute(
select(User).where(User.id == user_id)
)
    return result.scalar_one_or_none()
Multi-Database Pool Management
from dataclasses import dataclass
from typing import Dict
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
@dataclass
class DatabaseConfig:
url: str
pool_size: int = 10
max_overflow: int = 5
class MultiDatabaseManager:
"""Manage multiple database connection pools."""
def __init__(self, configs: Dict[str, DatabaseConfig]):
self.engines: Dict[str, AsyncEngine] = {}
self.session_makers: Dict[str, async_sessionmaker] = {}
for name, config in configs.items():
engine = create_async_engine(
config.url,
pool_size=config.pool_size,
max_overflow=config.max_overflow,
pool_pre_ping=True,
)
self.engines[name] = engine
self.session_makers[name] = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
def get_session(self, db_name: str) -> AsyncSession:
"""Get session for specific database."""
return self.session_makers[db_name]()
async def close_all(self):
"""Close all connection pools."""
for engine in self.engines.values():
await engine.dispose()
# Usage
db_manager = MultiDatabaseManager({
"primary": DatabaseConfig(
url="postgresql+asyncpg://user:pass@primary:5432/db",
pool_size=20,
),
"replica": DatabaseConfig(
url="postgresql+asyncpg://user:pass@replica:5432/db",
pool_size=30, # More for read-heavy
),
"analytics": DatabaseConfig(
url="postgresql+asyncpg://user:pass@analytics:5432/db",
pool_size=5, # Less for batch jobs
),
})
@app.get("/users/{id}")
async def get_user(id: int):
"""Read from replica."""
async with db_manager.get_session("replica") as session:
result = await session.execute(
select(User).where(User.id == id)
)
return result.scalar_one_or_none()
@app.post("/users")
async def create_user(user: UserCreate):
"""Write to primary."""
async with db_manager.get_session("primary") as session:
db_user = User(**user.model_dump())
session.add(db_user)
await session.commit()
        return db_user
FastAPI Lifespan Management
Complete examples for managing application lifecycle in FastAPI.
Basic Lifespan
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan context manager.
Code before yield runs on startup.
Code after yield runs on shutdown.
"""
# STARTUP
print("Application starting...")
app.state.started_at = datetime.now(timezone.utc)
yield # Application runs here
# SHUTDOWN
print("Application shutting down...")
app = FastAPI(lifespan=lifespan)
Full Production Lifespan
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import structlog
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
import redis.asyncio as redis
from app.core.config import settings
logger = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Production lifespan with full resource management.
Initializes:
- Database connection pool
- Redis connection
- Background task queue
- LLM clients
"""
logger.info("application_starting", version=settings.app_version)
# =====================================================================
# STARTUP
# =====================================================================
# 1. Database Engine
logger.info("initializing_database")
app.state.db_engine = create_async_engine(
settings.database_url,
pool_size=settings.db_pool_size,
max_overflow=settings.db_max_overflow,
pool_pre_ping=True, # Verify connections
echo=settings.debug,
)
# Verify database connection
async with app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
logger.info("database_connected")
# 2. Redis
logger.info("initializing_redis")
app.state.redis = redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=20,
)
await app.state.redis.ping()
logger.info("redis_connected")
# 3. Task Queue (ARQ)
logger.info("initializing_task_queue")
from arq import create_pool
from arq.connections import RedisSettings
app.state.task_queue = await create_pool(
RedisSettings.from_dsn(settings.redis_url)
)
logger.info("task_queue_connected")
# 4. LLM Clients
logger.info("initializing_llm_clients")
from app.services.llm import LLMService
app.state.llm = LLMService(
openai_key=settings.openai_api_key,
anthropic_key=settings.anthropic_api_key,
)
logger.info("llm_clients_initialized")
# 5. Embeddings Service
logger.info("initializing_embeddings")
from app.services.embeddings import EmbeddingsService
app.state.embeddings = EmbeddingsService(
model=settings.embedding_model,
)
# Warmup embedding model
await app.state.embeddings.embed("warmup")
logger.info("embeddings_initialized")
# Record startup time
app.state.started_at = datetime.now(timezone.utc)
logger.info("application_started")
# =====================================================================
# APPLICATION RUNS HERE
# =====================================================================
yield
# =====================================================================
# SHUTDOWN
# =====================================================================
logger.info("application_stopping")
# Close in reverse order
logger.info("closing_embeddings")
await app.state.embeddings.close()
logger.info("closing_llm_clients")
await app.state.llm.close()
logger.info("closing_task_queue")
await app.state.task_queue.close()
logger.info("closing_redis")
await app.state.redis.close()
logger.info("closing_database")
await app.state.db_engine.dispose()
logger.info("application_stopped")
# Create app with lifespan
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
lifespan=lifespan,
)
Accessing App State in Routes
from typing import AsyncGenerator
from fastapi import APIRouter, Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter()
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
"""Dependency to get database session."""
async with AsyncSession(
request.app.state.db_engine,
expire_on_commit=False,
) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def get_redis(request: Request):
"""Dependency to get Redis client."""
return request.app.state.redis
async def get_task_queue(request: Request):
"""Dependency to get task queue."""
return request.app.state.task_queue
@router.get("/analyses/{id}")
async def get_analysis(
id: str,
db: AsyncSession = Depends(get_db),
redis=Depends(get_redis),
):
# Try cache first
cached = await redis.get(f"analysis:{id}")
if cached:
return json.loads(cached)
# Query database
analysis = await db.get(AnalysisModel, id)
return analysis
@router.post("/analyses")
async def create_analysis(
request: CreateAnalysisRequest,
db: AsyncSession = Depends(get_db),
queue=Depends(get_task_queue),
):
# Create record
    analysis = AnalysisModel(**request.model_dump())  # Pydantic v2
db.add(analysis)
await db.commit()
# Enqueue background processing
await queue.enqueue_job("process_analysis", analysis_id=str(analysis.id))
    return analysis
Health Check Using App State
@router.get("/health")
async def health_check(request: Request):
"""Check health of all dependencies."""
checks = {}
# Database
try:
async with request.app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
checks["database"] = "healthy"
except Exception as e:
checks["database"] = f"unhealthy: {e}"
# Redis
try:
await request.app.state.redis.ping()
checks["redis"] = "healthy"
except Exception as e:
checks["redis"] = f"unhealthy: {e}"
# LLM
try:
if request.app.state.llm.is_available():
checks["llm"] = "healthy"
else:
checks["llm"] = "unhealthy: no providers available"
except Exception as e:
checks["llm"] = f"unhealthy: {e}"
# Overall status
all_healthy = all(v == "healthy" for v in checks.values())
status = "healthy" if all_healthy else "degraded"
return {
"status": status,
"checks": checks,
"uptime_seconds": (datetime.now(timezone.utc) - request.app.state.started_at).total_seconds(),
    }
Graceful Shutdown
import signal
import asyncio
from contextlib import asynccontextmanager
# Track active connections
active_connections: set = set()
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Setup signal handlers for graceful shutdown.
    # loop.add_signal_handler is the asyncio-safe way to react to signals;
    # plain signal.signal handlers cannot reliably create tasks.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig, lambda: asyncio.create_task(graceful_shutdown())
        )
# Startup
app.state.shutting_down = False
yield
# Shutdown
logger.info("waiting_for_active_connections", count=len(active_connections))
# Wait for active requests (up to 30 seconds)
for _ in range(30):
if not active_connections:
break
await asyncio.sleep(1)
logger.info("shutdown_complete")
async def graceful_shutdown():
"""Initiate graceful shutdown."""
app.state.shutting_down = True
logger.info("graceful_shutdown_initiated")
# Middleware to track active connections
@app.middleware("http")
async def track_connections(request: Request, call_next):
if app.state.shutting_down:
return JSONResponse(
status_code=503,
content={"detail": "Server is shutting down"},
)
connection_id = id(request)
active_connections.add(connection_id)
try:
return await call_next(request)
finally:
        active_connections.discard(connection_id)
Testing with Lifespan
# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from app.main import app
@pytest.fixture
async def client():
"""Test client with mocked dependencies."""
# Override app state for testing
app.state.db_engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=True,
)
app.state.redis = FakeRedis()
app.state.task_queue = FakeTaskQueue()
    transport = ASGITransport(app=app)  # newer httpx removed the app= shortcut
    async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.fixture
async def db_session(client):
"""Get database session for tests."""
async with AsyncSession(app.state.db_engine) as session:
        yield session
Environment-Specific Lifespan
from app.core.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan with environment-specific behavior."""
if settings.environment == "test":
# Minimal setup for tests
app.state.db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
app.state.redis = FakeRedis()
yield
return
if settings.environment == "development":
# Development setup
app.state.db_engine = create_async_engine(
settings.database_url,
echo=True, # SQL logging
)
app.state.redis = redis.from_url(settings.redis_url)
yield
await app.state.redis.close()
await app.state.db_engine.dispose()
return
# Production setup (full initialization)
# ... full production initialization code ...
yield
    # ... full cleanup code ...
SQLAlchemy 2.0 Async Examples
Example 1: Complete FastAPI Setup
# app/db/engine.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo=False,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
# app/api/deps.py
from typing import AsyncGenerator
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# app/api/routes/users.py
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter()
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(404, "User not found")
    return user
Example 2: Model with Proper Type Hints
from datetime import datetime, timezone
from uuid import UUID, uuid4
from sqlalchemy import String, ForeignKey, DateTime
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[UUID] = mapped_column(
PG_UUID(as_uuid=True),
primary_key=True,
default=uuid4,
)
email: Mapped[str] = mapped_column(
String(255),
unique=True,
index=True,
)
name: Mapped[str] = mapped_column(String(100))
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
)
# Prevent accidental lazy loading
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise",
)
class Order(Base):
__tablename__ = "orders"
id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True)
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
total_cents: Mapped[int]
status: Mapped[str] = mapped_column(String(20), default="pending")
user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
    items: Mapped[list["OrderItem"]] = relationship(lazy="raise")
Example 3: Repository Pattern
from typing import Generic, TypeVar
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar("T", bound=Base)
class AsyncRepository(Generic[T]):
def __init__(self, session: AsyncSession, model: type[T]):
self.session = session
self.model = model
async def get(self, id: UUID) -> T | None:
return await self.session.get(self.model, id)
async def get_by_ids(self, ids: list[UUID]) -> list[T]:
if not ids:
return []
result = await self.session.execute(
select(self.model).where(self.model.id.in_(ids))
)
return list(result.scalars().all())
async def list(
self,
*,
offset: int = 0,
limit: int = 100,
) -> list[T]:
result = await self.session.execute(
select(self.model).offset(offset).limit(limit)
)
return list(result.scalars().all())
async def create(self, **kwargs) -> T:
instance = self.model(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, instance: T, **kwargs) -> T:
for key, value in kwargs.items():
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, instance: T) -> None:
await self.session.delete(instance)
await self.session.flush()
# Usage
class UserRepository(AsyncRepository[User]):
def __init__(self, session: AsyncSession):
super().__init__(session, User)
async def get_by_email(self, email: str) -> User | None:
result = await self.session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def get_with_orders(self, user_id: UUID) -> User | None:
result = await self.session.execute(
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
        return result.scalar_one_or_none()
Example 4: Bulk Operations
async def bulk_create_users(
db: AsyncSession,
users_data: list[dict],
chunk_size: int = 1000,
) -> int:
"""Efficiently insert many users in chunks."""
total = 0
for i in range(0, len(users_data), chunk_size):
chunk = users_data[i:i + chunk_size]
users = [User(**data) for data in chunk]
db.add_all(users)
await db.flush() # Get IDs, manage memory
total += len(chunk)
return total
async def bulk_update_status(
db: AsyncSession,
order_ids: list[UUID],
new_status: str,
) -> int:
"""Bulk update using UPDATE statement."""
from sqlalchemy import update
result = await db.execute(
update(Order)
.where(Order.id.in_(order_ids))
.values(status=new_status)
)
    return result.rowcount
Example 5: Transaction Management
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_funds(
db: AsyncSession,
from_account_id: UUID,
to_account_id: UUID,
amount: int,
) -> None:
"""Transfer with explicit transaction and row locking."""
async with db.begin(): # Explicit transaction
# Lock rows to prevent concurrent modification
from_account = await db.get(
Account,
from_account_id,
with_for_update=True,
)
to_account = await db.get(
Account,
to_account_id,
with_for_update=True,
)
if not from_account or not to_account:
raise ValueError("Account not found")
if from_account.balance < amount:
raise ValueError("Insufficient funds")
from_account.balance -= amount
to_account.balance += amount
    # Transaction commits on exit, rolls back on exception
Example 6: Complex Queries with Joins
from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload, joinedload
async def get_user_order_summary(
db: AsyncSession,
user_id: UUID,
) -> dict | None:
"""Get user with order statistics."""
# Get user with eager-loaded orders
user_result = await db.execute(
select(User)
.options(selectinload(User.orders))
.where(User.id == user_id)
)
user = user_result.scalar_one_or_none()
if not user:
return None
# Get aggregate stats
stats_result = await db.execute(
select(
func.count(Order.id).label("total_orders"),
func.sum(Order.total_cents).label("total_spent"),
func.avg(Order.total_cents).label("avg_order"),
)
.where(Order.user_id == user_id)
)
stats = stats_result.one()
return {
"user": user,
"total_orders": stats.total_orders,
"total_spent_cents": stats.total_spent or 0,
"avg_order_cents": float(stats.avg_order or 0),
    }